Skip to content

Latest commit

 

History

History
369 lines (284 loc) · 13 KB

File metadata and controls

369 lines (284 loc) · 13 KB

SSE 消息过滤逻辑分析报告

1. 当前实现分析

1.1 SSE 消息推送代码位置

文件:bridge_message_queue.py

  • 类:BridgeMessageQueue
  • 方法:publish(account_id: str, message: dict)
  • 位置:第 48-68 行
async def publish(self, account_id: str, message: dict):
    """发布消息到所有订阅者,同时存入缓冲区"""
    event_id = self._next_event_id()
    message["event_id"] = event_id

    # 存入缓冲区
    with self._lock:
        if account_id not in self._message_buffer:
            self._message_buffer[account_id] = deque(maxlen=self.BUFFER_MAX_SIZE)
        self._message_buffer[account_id].append(message)

        subscribers = list(self._queues.get(account_id, []))

    # 推送到所有订阅者队列
    for q in subscribers:
        try:
            q.put_nowait(message)
        except asyncio.QueueFull:
            logger.warning(f"[Bridge] 订阅者队列已满,丢弃消息 event_id={event_id} account={account_id}")

    logger.debug(f"[Bridge] 消息已发布 event_id={event_id} account={account_id} 订阅者数={len(subscribers)}")

文件:bridge_api.py

  • 端点:GET /api/bridge/messages
  • 函数:stream_messages()
  • 位置:第 68-109 行
@bridge_router.get("/messages")
async def stream_messages(
    request: Request,
    account_id: str = "default",
    last_event_id: Optional[str] = Header(None, alias="Last-Event-ID"),
):
    """SSE 端点:持续推送指定账号的入站消息"""

    queue = bridge_queue.subscribe(account_id)

    async def event_generator():
        try:
            # 如果有 Last-Event-ID,先补发断线期间的消息
            if last_event_id:
                missed = bridge_queue.get_missed_messages(account_id, last_event_id)
                for msg in missed:
                    eid = msg.get("event_id", "")
                    data = json.dumps(msg, ensure_ascii=False)
                    yield f"id: {eid}\nevent: message\ndata: {data}\n\n"

            # 持续监听队列
            while True:
                # 检查客户端是否断开
                if await request.is_disconnected():
                    break

                try:
                    msg = await asyncio.wait_for(queue.get(), timeout=30.0)
                    eid = msg.get("event_id", "")
                    data = json.dumps(msg, ensure_ascii=False)
                    yield f"id: {eid}\nevent: message\ndata: {data}\n\n"
                except asyncio.TimeoutError:
                    # 30 秒无消息,发送心跳保持连接
                    yield ": keepalive\n\n"
        finally:
            bridge_queue.unsubscribe(account_id, queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

1.2 是否有过滤逻辑?

Python 端(Bridge API):❌ 没有过滤逻辑

  • bridge_message_queue.pypublish() 方法直接发布所有消息,没有任何过滤
  • bridge_api.pystream_messages() 端点直接推送队列中的所有消息,没有过滤
  • 结论:Python 端不过滤系统消息,所有消息都会通过 SSE 推送

TypeScript 端(OpenClaw Plugin):✅ 有过滤逻辑

  • 文件:openclaw-plugin/src/inbound-handler.ts
  • 函数:handleBridgeMessage()
  • 位置:第 54-57 行
// 忽略系统消息
if (data.contentType === "system") {
  log?.debug?.(`[Xianyu][${accountId}] Ignoring system message`);
  return;
}

结论:过滤逻辑在 OpenClaw 插件的入站处理器中,而不是在 SSE 消息推送层。


2. 系统消息定义

2.1 系统消息的特征

文件:openclaw-plugin/src/types.ts

export interface BridgeMessageEvent {
  messageId: string;
  conversationId: string;
  senderId: string;
  senderName: string;
  content: string;
  contentType: "text" | "image" | "system";  // ← 系统消息标识
  itemId?: string;
  timestamp: number;
}

2.2 如何识别系统消息

判断条件:data.contentType === "system"

  • 系统消息通过 contentType 字段标识
  • contentType"system" 时,该消息被认为是系统消息
  • 系统消息会被 OpenClaw 插件的入站处理器忽略,不会分发给 Agent

2.3 系统消息的来源

系统消息可能来自:

  1. 闲鱼平台的系统通知(如订单状态变更、平台公告等)
  2. WebSocket 连接的系统事件(如连接建立、断开等)
  3. XianyuLive 实例的内部事件(如 Cookie 刷新、账号状态变更等)

3. 其他插件参考

3.1 飞书插件(Feishu)

文件:clawdbot-feishu-repo/src/bot.ts

飞书插件没有显式的系统消息过滤,但有以下过滤逻辑:

  1. 消息去重(第 653 行):

    if (!tryRecordMessage(messageId, dedupAccountId)) {
      log(`feishu: skipping duplicate message ${messageId}`);
      return;
    }
  2. 权限检查(第 680-720 行):

    • 群组白名单检查
    • 发送者白名单检查
    • DM 策略检查(pairing/allowlist/open)
  3. @提及检查(第 722-738 行):

    if (requireMention && !ctx.mentionedBot) {
      log(`feishu[${account.accountId}]: message in group ${ctx.chatId} did not mention bot, recording to history`);
      if (chatHistories) {
        recordPendingHistoryEntryIfEnabled({
          historyMap: chatHistories,
          historyKey: ctx.chatId,
          limit: historyLimit,
          entry: {
            sender: ctx.senderOpenId,
            body: `${ctx.senderName ?? ctx.senderOpenId}: ${ctx.content}`,
            timestamp: Date.now(),
            messageId: ctx.messageId,
          },
        });
      }
      return;
    }

飞书插件的过滤策略:

  • ✅ 去重过滤
  • ✅ 权限过滤
  • ✅ @提及过滤
  • ❌ 没有系统消息过滤(因为飞书 API 不会推送系统消息)

3.2 钉钉插件(DingTalk)

文件:openclaw-channel-dingtalk-repo/src/runtime.ts

钉钉插件的代码非常简洁,只提供了运行时初始化函数,没有消息处理逻辑。

推测:钉钉插件的消息过滤逻辑可能在其他文件中(未提供完整代码)。

3.3 企业微信插件(WeCom)

文件:openclaw-china-repo/extensions/wecom/src/channel.ts

企业微信插件没有显式的系统消息过滤,但有以下过滤逻辑:

  1. 目标解析(第 42-90 行):

    • 只接受 user:<userid>group:<chatid> 格式的目标
    • 拒绝无效的目标格式
  2. 流式回复检查(第 200-220 行):

    const streamAccepted = appendWecomActiveStreamChunk({
      accountId: account.accountId,
      to: replyTarget,
      chunk: params.text,
      sessionKey: streamContext.sessionKey,
      runId: streamContext.runId,
    });
    if (streamAccepted) {
      return {
        channel: "wecom",
        ok: true,
        messageId: `stream:${Date.now()}`,
      };
    }
    const error = new Error(
      `No active stream available for ${replyTarget}. WeCom message tool is stream-only in current mode.`
    );

企业微信插件的过滤策略:

  • ✅ 目标格式验证
  • ✅ 流式回复检查
  • ❌ 没有系统消息过滤(因为企业微信 API 不会推送系统消息)

4. 问题和建议

4.1 当前是否存在问题?

✅ 不存在问题,现有机制已经正确处理系统消息。

原因:

  1. Python 端(Bridge API)不过滤系统消息是正确的设计

    • SSE 推送层应该是"透明"的,不应该做业务逻辑过滤
    • 所有消息都应该推送到订阅者,由订阅者决定如何处理
  2. TypeScript 端(OpenClaw Plugin)正确过滤了系统消息

    • 入站处理器在接收到消息后,立即检查 contentType
    • 系统消息被忽略,不会分发给 Agent
    • 这是正确的业务逻辑层过滤
  3. 分层设计清晰

    • 传输层(SSE):负责消息推送,不做过滤
    • 业务层(OpenClaw Plugin):负责消息处理,做业务逻辑过滤

4.2 现有机制说明

闲鱼频道的消息过滤机制:

┌─────────────────────────────────────────────────────────────┐
│ XianyuLive (Python)                                         │
│ ├─ WebSocket 接收消息                                       │
│ ├─ 解析消息类型(text/image/system)                       │
│ └─ 调用 bridge_queue.publish() 发布所有消息                │
└─────────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────────┐
│ Bridge API (Python)                                         │
│ ├─ SSE 端点 /api/bridge/messages                           │
│ └─ 推送所有消息(包括系统消息)                            │
└─────────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────────┐
│ OpenClaw Plugin (TypeScript)                                │
│ ├─ BridgeClient 接收 SSE 消息                              │
│ ├─ ConnectionManager 管理连接                              │
│ └─ handleBridgeMessage() 处理消息                          │
│    ├─ ✅ 过滤系统消息 (contentType === "system")           │
│    ├─ ✅ 过滤空内容消息                                    │
│    └─ ✅ 分发有效消息给 Agent                              │
└─────────────────────────────────────────────────────────────┘

过滤规则(在 OpenClaw Plugin 中):

// 1. 忽略系统消息
if (data.contentType === "system") {
  log?.debug?.(`[Xianyu][${accountId}] Ignoring system message`);
  return;
}

// 2. 忽略空内容
if (!data.content || !data.content.trim()) {
  log?.debug?.(`[Xianyu][${accountId}] Ignoring empty message`);
  return;
}

4.3 与其他插件的对比

插件 系统消息过滤 过滤位置 原因
闲鱼(Xianyu) ✅ 有 OpenClaw Plugin 入站处理器 闲鱼 WebSocket 会推送系统消息
飞书(Feishu) ❌ 无 - 飞书 API 不推送系统消息
钉钉(DingTalk) ❓ 未知 - 代码不完整,无法确认
企业微信(WeCom) ❌ 无 - 企业微信 API 不推送系统消息

结论:闲鱼频道的系统消息过滤是必要的,因为闲鱼 WebSocket 会推送系统消息。

4.4 建议

✅ 当前实现已经正确,无需修改。

如果未来需要增强,可以考虑:

  1. 在 Python 端添加可选的消息过滤配置

    • 允许用户配置是否推送系统消息
    • 但这会破坏 SSE 层的透明性,不推荐
  2. 在 OpenClaw Plugin 中添加更多过滤规则

    • 例如:过滤特定类型的系统消息
    • 例如:过滤特定发送者的消息
    • 这些都应该在业务层(OpenClaw Plugin)实现
  3. 添加消息过滤日志

    • 当前已经有 log?.debug?.() 日志
    • 可以考虑添加统计信息(过滤了多少系统消息)

5. 总结

5.1 核心发现

  1. SSE 消息推送层(Python)不过滤系统消息 - 这是正确的设计
  2. OpenClaw 插件(TypeScript)正确过滤了系统消息 - 在入站处理器中
  3. 系统消息通过 contentType === "system" 识别
  4. 其他插件(飞书、企业微信)不需要过滤系统消息 - 因为它们的 API 不推送系统消息

5.2 回答原始问题

问题 1:检查发布到 SSE 的消息是否有过滤系统消息的逻辑?

  • 答:Python 端(Bridge API)没有过滤逻辑,所有消息都会推送到 SSE。

问题 2:检查项目是否有内置的过滤系统消息逻辑?

  • 答:有,在 OpenClaw Plugin 的入站处理器中(inbound-handler.ts 第 54-57 行)。

5.3 最终结论

✅ 当前实现是正确的,不存在问题。

  • SSE 推送层负责消息传输,不做业务逻辑过滤
  • OpenClaw 插件负责消息处理,正确过滤了系统消息
  • 分层设计清晰,符合软件工程最佳实践

无需修改现有代码。