文件:bridge_message_queue.py
- 类:
BridgeMessageQueue - 方法:
publish(account_id: str, message: dict) - 位置:第 48-68 行
async def publish(self, account_id: str, message: dict):
"""发布消息到所有订阅者,同时存入缓冲区"""
event_id = self._next_event_id()
message["event_id"] = event_id
# 存入缓冲区
with self._lock:
if account_id not in self._message_buffer:
self._message_buffer[account_id] = deque(maxlen=self.BUFFER_MAX_SIZE)
self._message_buffer[account_id].append(message)
subscribers = list(self._queues.get(account_id, []))
# 推送到所有订阅者队列
for q in subscribers:
try:
q.put_nowait(message)
except asyncio.QueueFull:
logger.warning(f"[Bridge] 订阅者队列已满,丢弃消息 event_id={event_id} account={account_id}")
logger.debug(f"[Bridge] 消息已发布 event_id={event_id} account={account_id} 订阅者数={len(subscribers)}")文件:bridge_api.py
- 端点:
GET /api/bridge/messages - 函数:
stream_messages() - 位置:第 68-109 行
@bridge_router.get("/messages")
async def stream_messages(
request: Request,
account_id: str = "default",
last_event_id: Optional[str] = Header(None, alias="Last-Event-ID"),
):
"""SSE 端点:持续推送指定账号的入站消息"""
queue = bridge_queue.subscribe(account_id)
async def event_generator():
try:
# 如果有 Last-Event-ID,先补发断线期间的消息
if last_event_id:
missed = bridge_queue.get_missed_messages(account_id, last_event_id)
for msg in missed:
eid = msg.get("event_id", "")
data = json.dumps(msg, ensure_ascii=False)
yield f"id: {eid}\nevent: message\ndata: {data}\n\n"
# 持续监听队列
while True:
# 检查客户端是否断开
if await request.is_disconnected():
break
try:
msg = await asyncio.wait_for(queue.get(), timeout=30.0)
eid = msg.get("event_id", "")
data = json.dumps(msg, ensure_ascii=False)
yield f"id: {eid}\nevent: message\ndata: {data}\n\n"
except asyncio.TimeoutError:
# 30 秒无消息,发送心跳保持连接
yield ": keepalive\n\n"
finally:
bridge_queue.unsubscribe(account_id, queue)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)Python 端(Bridge API):❌ 没有过滤逻辑
bridge_message_queue.py的publish()方法直接发布所有消息,没有任何过滤bridge_api.py的stream_messages()端点直接推送队列中的所有消息,没有过滤- 结论:Python 端不过滤系统消息,所有消息都会通过 SSE 推送
TypeScript 端(OpenClaw Plugin):✅ 有过滤逻辑
- 文件:
openclaw-plugin/src/inbound-handler.ts - 函数:
handleBridgeMessage() - 位置:第 54-57 行
// 忽略系统消息
if (data.contentType === "system") {
log?.debug?.(`[Xianyu][${accountId}] Ignoring system message`);
return;
}结论:过滤逻辑在 OpenClaw 插件的入站处理器中,而不是在 SSE 消息推送层。
文件:openclaw-plugin/src/types.ts
export interface BridgeMessageEvent {
messageId: string;
conversationId: string;
senderId: string;
senderName: string;
content: string;
contentType: "text" | "image" | "system"; // ← 系统消息标识
itemId?: string;
timestamp: number;
}判断条件:data.contentType === "system"
- 系统消息通过
contentType字段标识 - 当
contentType为"system"时,该消息被认为是系统消息 - 系统消息会被 OpenClaw 插件的入站处理器忽略,不会分发给 Agent
系统消息可能来自:
- 闲鱼平台的系统通知(如订单状态变更、平台公告等)
- WebSocket 连接的系统事件(如连接建立、断开等)
- XianyuLive 实例的内部事件(如 Cookie 刷新、账号状态变更等)
文件:clawdbot-feishu-repo/src/bot.ts
飞书插件没有显式的系统消息过滤,但有以下过滤逻辑:
-
消息去重(第 653 行):
if (!tryRecordMessage(messageId, dedupAccountId)) { log(`feishu: skipping duplicate message ${messageId}`); return; }
-
权限检查(第 680-720 行):
- 群组白名单检查
- 发送者白名单检查
- DM 策略检查(pairing/allowlist/open)
-
@提及检查(第 722-738 行):
if (requireMention && !ctx.mentionedBot) { log(`feishu[${account.accountId}]: message in group ${ctx.chatId} did not mention bot, recording to history`); if (chatHistories) { recordPendingHistoryEntryIfEnabled({ historyMap: chatHistories, historyKey: ctx.chatId, limit: historyLimit, entry: { sender: ctx.senderOpenId, body: `${ctx.senderName ?? ctx.senderOpenId}: ${ctx.content}`, timestamp: Date.now(), messageId: ctx.messageId, }, }); } return; }
飞书插件的过滤策略:
- ✅ 去重过滤
- ✅ 权限过滤
- ✅ @提及过滤
- ❌ 没有系统消息过滤(因为飞书 API 不会推送系统消息)
文件:openclaw-channel-dingtalk-repo/src/runtime.ts
钉钉插件的代码非常简洁,只提供了运行时初始化函数,没有消息处理逻辑。
推测:钉钉插件的消息过滤逻辑可能在其他文件中(未提供完整代码)。
文件:openclaw-china-repo/extensions/wecom/src/channel.ts
企业微信插件没有显式的系统消息过滤,但有以下过滤逻辑:
-
目标解析(第 42-90 行):
- 只接受
user:<userid>或group:<chatid>格式的目标 - 拒绝无效的目标格式
- 只接受
-
流式回复检查(第 200-220 行):
const streamAccepted = appendWecomActiveStreamChunk({ accountId: account.accountId, to: replyTarget, chunk: params.text, sessionKey: streamContext.sessionKey, runId: streamContext.runId, }); if (streamAccepted) { return { channel: "wecom", ok: true, messageId: `stream:${Date.now()}`, }; } const error = new Error( `No active stream available for ${replyTarget}. WeCom message tool is stream-only in current mode.` );
企业微信插件的过滤策略:
- ✅ 目标格式验证
- ✅ 流式回复检查
- ❌ 没有系统消息过滤(因为企业微信 API 不会推送系统消息)
✅ 不存在问题,现有机制已经正确处理系统消息。
原因:
-
Python 端(Bridge API)不过滤系统消息是正确的设计
- SSE 推送层应该是"透明"的,不应该做业务逻辑过滤
- 所有消息都应该推送到订阅者,由订阅者决定如何处理
-
TypeScript 端(OpenClaw Plugin)正确过滤了系统消息
- 入站处理器在接收到消息后,立即检查
contentType - 系统消息被忽略,不会分发给 Agent
- 这是正确的业务逻辑层过滤
- 入站处理器在接收到消息后,立即检查
-
分层设计清晰
- 传输层(SSE):负责消息推送,不做过滤
- 业务层(OpenClaw Plugin):负责消息处理,做业务逻辑过滤
闲鱼频道的消息过滤机制:
┌─────────────────────────────────────────────────────────────┐
│ XianyuLive (Python) │
│ ├─ WebSocket 接收消息 │
│ ├─ 解析消息类型(text/image/system) │
│ └─ 调用 bridge_queue.publish() 发布所有消息 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Bridge API (Python) │
│ ├─ SSE 端点 /api/bridge/messages │
│ └─ 推送所有消息(包括系统消息) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ OpenClaw Plugin (TypeScript) │
│ ├─ BridgeClient 接收 SSE 消息 │
│ ├─ ConnectionManager 管理连接 │
│ └─ handleBridgeMessage() 处理消息 │
│ ├─ ✅ 过滤系统消息 (contentType === "system") │
│ ├─ ✅ 过滤空内容消息 │
│ └─ ✅ 分发有效消息给 Agent │
└─────────────────────────────────────────────────────────────┘
过滤规则(在 OpenClaw Plugin 中):
// 1. 忽略系统消息
if (data.contentType === "system") {
log?.debug?.(`[Xianyu][${accountId}] Ignoring system message`);
return;
}
// 2. 忽略空内容
if (!data.content || !data.content.trim()) {
log?.debug?.(`[Xianyu][${accountId}] Ignoring empty message`);
return;
}| 插件 | 系统消息过滤 | 过滤位置 | 原因 |
|---|---|---|---|
| 闲鱼(Xianyu) | ✅ 有 | OpenClaw Plugin 入站处理器 | 闲鱼 WebSocket 会推送系统消息 |
| 飞书(Feishu) | ❌ 无 | - | 飞书 API 不推送系统消息 |
| 钉钉(DingTalk) | ❓ 未知 | - | 代码不完整,无法确认 |
| 企业微信(WeCom) | ❌ 无 | - | 企业微信 API 不推送系统消息 |
结论:闲鱼频道的系统消息过滤是必要的,因为闲鱼 WebSocket 会推送系统消息。
✅ 当前实现已经正确,无需修改。
如果未来需要增强,可以考虑:
-
在 Python 端添加可选的消息过滤配置
- 允许用户配置是否推送系统消息
- 但这会破坏 SSE 层的透明性,不推荐
-
在 OpenClaw Plugin 中添加更多过滤规则
- 例如:过滤特定类型的系统消息
- 例如:过滤特定发送者的消息
- 这些都应该在业务层(OpenClaw Plugin)实现
-
添加消息过滤日志
- 当前已经有
log?.debug?.()日志 - 可以考虑添加统计信息(过滤了多少系统消息)
- 当前已经有
- SSE 消息推送层(Python)不过滤系统消息 - 这是正确的设计
- OpenClaw 插件(TypeScript)正确过滤了系统消息 - 在入站处理器中
- 系统消息通过
contentType === "system"识别 - 其他插件(飞书、企业微信)不需要过滤系统消息 - 因为它们的 API 不推送系统消息
问题 1:检查发布到 SSE 的消息是否有过滤系统消息的逻辑?
- 答:Python 端(Bridge API)没有过滤逻辑,所有消息都会推送到 SSE。
问题 2:检查项目是否有内置的过滤系统消息逻辑?
- 答:有,在 OpenClaw Plugin 的入站处理器中(
inbound-handler.ts第 54-57 行)。
✅ 当前实现是正确的,不存在问题。
- SSE 推送层负责消息传输,不做业务逻辑过滤
- OpenClaw 插件负责消息处理,正确过滤了系统消息
- 分层设计清晰,符合软件工程最佳实践
无需修改现有代码。