diff --git a/docs/event-based-agents/00-overview.md b/docs/event-based-agents/00-overview.md new file mode 100644 index 00000000..6419d534 --- /dev/null +++ b/docs/event-based-agents/00-overview.md @@ -0,0 +1,196 @@ +# Event Based Agents 架构设计总览 + +## 1. 背景与动机 + +### 当前架构的局限性 + +LangBot 当前的平台适配器架构围绕**消息事件**单一场景设计: + +- **事件层面**:只监听 `FriendMessage`(私聊消息)和 `GroupMessage`(群消息)两种事件 +- **API 层面**:只暴露 `send_message` 和 `reply_message` 两个平台 API +- **处理层面**:所有消息统一进入 Pipeline 流水线处理,无法为不同事件类型配置不同处理逻辑 +- **适配器结构**:每个适配器是单个 Python 文件(200-800 行),随着功能增加难以维护 + +这导致以下问题: + +1. **无法处理非消息事件**:新成员入群、好友请求、消息撤回、消息编辑等大部分平台都支持的事件被完全忽略 +2. **平台能力未充分利用**:编辑消息、撤回消息、获取群成员列表、管理群组等 API 无法使用 +3. **插件能力受限**:插件只能监听消息事件、只能发送/回复消息,无法实现更丰富的交互 +4. **处理逻辑不灵活**:所有消息走同一条 Pipeline,无法为入群欢迎、好友自动通过等场景配置独立的处理流程 + +### 设计目标 + +Event Based Agents(EBA)架构旨在将 LangBot 从"消息处理平台"升级为"事件驱动的智能代理平台": + +- **丰富事件**:支持消息、群组、好友、Bot 状态等多种事件类型 +- **丰富 API**:支持消息编辑/撤回、群组管理、用户信息查询等通用 API,以及适配器特有 API 的透传调用 +- **灵活编排**:用户可在 WebUI 上为每个 Bot 的每种事件类型配置不同的处理器 +- **可扩展**:适配器可声明自己支持的事件和 API,平台特有能力通过标准机制暴露 +- **向后兼容**:现有插件无需修改即可在新架构下运行 + +## 2. 架构对比 + +### 现有架构 + +``` +消息平台 (Telegram/Discord/...) + │ + ▼ +平台适配器 (单文件, 只处理消息) + │ FriendMessage / GroupMessage + ▼ +RuntimeBot (注册 on_friend_message / on_group_message 回调) + │ + ▼ +MessageAggregator (消息聚合) + │ + ▼ +QueryPool → Controller → Pipeline (固定阶段链) + │ │ + │ ▼ + │ RequestRunner (local-agent / dify / n8n / ...) + │ + ▼ +adapter.reply_message() / adapter.send_message() +``` + +关键代码路径: +- 适配器基类:`langbot-plugin-sdk/.../abstract/platform/adapter.py` — `AbstractMessagePlatformAdapter` +- 事件定义:`langbot-plugin-sdk/.../builtin/platform/events.py` — 仅 `FriendMessage` / `GroupMessage` +- Bot 管理:`LangBot/src/langbot/pkg/platform/botmgr.py` — `RuntimeBot` 只注册两个消息回调 +- 流水线控制:`LangBot/src/langbot/pkg/pipeline/controller.py` — 从 QueryPool 消费并执行 Pipeline + +### 新架构(Event Based Agents) + +``` +消息平台 (Telegram/Discord/...) + │ + ▼ +平台适配器 (独立目录, 监听所有事件, 实现丰富 API) + │ MessageReceived / MemberJoined / FriendRequest / ... + ▼ +EventBus (统一事件总线) + │ + ▼ +EventRouter (事件路由引擎, 读取 Bot 的 event_handlers 配置) + │ + ├─→ PipelineHandler — 现有流水线(完整 Stage 链) + ├─→ AgentHandler — 直接调用 RequestRunner(轻量 AI 处理) + ├─→ WebhookHandler — POST 到外部服务(Dify/n8n webhook 等) + └─→ PluginHandler — 分发给插件 EventListener + │ + ▼ +统一平台 API + send / reply / edit / delete / getGroupInfo / getUserInfo / callPlatformApi / ... +``` + +## 3. 核心概念 + +### 3.1 统一事件体系 + +所有平台事件统一为命名空间式的事件类型: + +| 命名空间 | 事件 | 说明 | +|----------|------|------| +| `message.*` | `message.received`, `message.edited`, `message.deleted`, `message.reaction` | 消息相关 | +| `group.*` | `group.member_joined`, `group.member_left`, `group.member_banned`, `group.info_updated` | 群组相关 | +| `friend.*` | `friend.request_received`, `friend.added`, `friend.removed` | 好友相关 | +| `bot.*` | `bot.invited_to_group`, `bot.removed_from_group`, `bot.muted`, `bot.unmuted` | Bot 状态 | +| `platform.*` | `platform.{adapter}.{action}` | 适配器特有事件 | + +详见 [01-event-system.md](./01-event-system.md)。 + +### 3.2 统一平台 API + +扩展适配器基类,提供通用 API + 透传机制: + +| 类别 | API | 必需/可选 | +|------|-----|----------| +| 消息 | `send_message`, `reply_message`, `edit_message`, `delete_message`, `forward_message` | send/reply 必需,其余可选 | +| 群组 | `get_group_info`, `get_group_member_list`, `get_group_member_info`, `mute_member`, `kick_member` | 全部可选 | +| 用户 | `get_user_info`, `get_friend_list` | 全部可选 | +| 媒体 | `upload_file`, `get_file_url` | 全部可选 | +| 透传 | `call_platform_api(action, params)` | 可选 | + +详见 [02-platform-api.md](./02-platform-api.md)。 + +### 3.3 适配器新结构 + +每个适配器从单文件迁移到独立目录: + +``` +pkg/platform/adapters/ +├── _base/ # 基类和通用定义 +│ ├── adapter.py +│ ├── events.py +│ ├── entities.py +│ └── api.py +├── telegram/ +│ ├── __init__.py +│ ├── adapter.py # 主适配器类 +│ ├── event_converter.py # 事件转换(多种事件类型) +│ ├── message_converter.py # 消息链转换 +│ ├── api_impl.py # 通用 API 实现 +│ ├── platform_api.py # 平台特有 API +│ ├── types.py # 平台特有类型 +│ └── manifest.yaml +├── discord/ +│ └── ... +``` + +详见 [03-adapter-structure.md](./03-adapter-structure.md)。 + +### 3.4 事件处理器(Event Handler) + +四种处理器类型,用户在 WebUI 的 Bot 管理页面配置: + +| 类型 | 说明 | 适用场景 | +|------|------|----------| +| **pipeline** | 现有流水线机制,完整的多 Stage 处理链(PreProcessor → MessageProcessor → PostProcessor 等) | 复杂消息处理,需要完整的预处理/后处理流程 | +| **agent** | 直接调用 RequestRunner(local-agent / dify / n8n / coze / dashscope / langflow / tbox),从 Pipeline 中解耦 | 轻量级 AI 处理、直接对接外部 LLMOps 平台处理各类事件 | +| **webhook** | 将事件 POST 到外部 URL,根据响应执行动作 | 对接自建服务、Dify/n8n 的 Webhook 触发器、自定义后端 | +| **plugin** | 分发给插件 EventListener 处理 | 插件自定义逻辑 | + +配置存储在 Bot 表的 `event_handlers` JSON 字段中,通过 WebUI 编排面板管理。 + +详见 [04-event-routing.md](./04-event-routing.md)。 + +### 3.5 插件 SDK 改造 + +- 新事件类型全部暴露给插件 +- 新 API 全部通过 `LangBotAPIProxy` 暴露 +- 兼容层保证现有插件零修改运行 + +详见 [05-plugin-sdk.md](./05-plugin-sdk.md)。 + +## 4. 关键设计决策 + +| # | 决策点 | 选择 | 理由 | +|---|--------|------|------| +| 1 | 事件处理器配置粒度 | 每个 Bot 独立配置 | Bot 是用户操作的核心单元,不同 Bot 可能对接不同业务场景 | +| 2 | 适配器特有 API | 统一抽象 + `call_platform_api` 透传 | 通用 API 覆盖大部分场景,透传机制保证灵活性,避免每个适配器导出独立的类型化 API 包 | +| 3 | 向后兼容策略 | 兼容层适配 | 保留旧事件类型和 API 作为新系统的 alias/wrapper,现有插件无需修改 | +| 4 | 处理器配置存储 | Bot 表新增 `event_handlers` JSON 字段 | 简单直接,避免新增关联表;替代现有 `use_pipeline_uuid` | +| 5 | Agent 处理器定位 | 从 Pipeline 中解耦 RequestRunner | 不是所有事件都需要完整 Pipeline Stage 链;Agent 处理器提供轻量级 AI 处理路径,支持所有现有 Runner | +| 6 | 事件命名方式 | 命名空间式(`message.received`) | 清晰的分类层级,便于通配匹配(`message.*`),与 WebUI 配置天然对应 | + +## 5. 文档索引 + +| 文档 | 内容 | +|------|------| +| [01-event-system.md](./01-event-system.md) | 统一事件体系:事件分类、定义、生命周期 | +| [02-platform-api.md](./02-platform-api.md) | 统一平台 API:通用 API、透传 API、实体定义 | +| [03-adapter-structure.md](./03-adapter-structure.md) | 适配器新结构:目录布局、基类、注册机制 | +| [04-event-routing.md](./04-event-routing.md) | 事件路由与编排:路由引擎、处理器类型、WebUI 数据模型 | +| [05-plugin-sdk.md](./05-plugin-sdk.md) | 插件 SDK 改造:新事件/API、兼容层 | +| [06-migration-plan.md](./06-migration-plan.md) | 分阶段迁移计划 | + +## 6. 涉及的代码仓库 + +| 仓库 | 改动范围 | +|------|----------| +| **langbot-plugin-sdk** | 事件定义、实体模型、API 接口、适配器基类、通信协议扩展 | +| **LangBot**(后端) | 适配器实现、事件路由引擎、Bot 实体扩展、数据库迁移、RequestRunner 解耦 | +| **LangBot**(前端) | Bot 事件处理器编排面板 | +| **langbot-wiki** | 新架构文档、插件开发指南更新、适配器开发指南 | +| **langbot-plugin-demo** | 示例更新(使用新事件和 API) | diff --git a/docs/event-based-agents/01-event-system.md b/docs/event-based-agents/01-event-system.md new file mode 100644 index 00000000..bd61cca2 --- /dev/null +++ b/docs/event-based-agents/01-event-system.md @@ -0,0 +1,508 @@ +# 统一事件体系 + +## 1. 设计原则 + +- **命名空间分类**:事件类型采用 `{namespace}.{action}` 格式,如 `message.received` +- **通用优先**:大部分平台都支持的事件抽象为通用事件,定义统一的字段格式 +- **平台特有事件标准化**:各适配器的独有事件通过 `PlatformSpecificEvent` 承载,保留原始数据 +- **向后兼容**:现有 `FriendMessage` / `GroupMessage` 通过兼容层映射到新的 `message.received` 事件 + +## 2. 事件基类层次 + +``` +Event (事件基类) +├── MessageEvent (消息相关事件) +│ ├── MessageReceivedEvent # message.received +│ ├── MessageEditedEvent # message.edited +│ ├── MessageDeletedEvent # message.deleted +│ └── MessageReactionEvent # message.reaction +├── GroupEvent (群组相关事件) +│ ├── MemberJoinedEvent # group.member_joined +│ ├── MemberLeftEvent # group.member_left +│ ├── MemberBannedEvent # group.member_banned +│ ├── MemberUnbannedEvent # group.member_unbanned +│ └── GroupInfoUpdatedEvent # group.info_updated +├── FriendEvent (好友相关事件) +│ ├── FriendRequestReceivedEvent # friend.request_received +│ ├── FriendAddedEvent # friend.added +│ └── FriendRemovedEvent # friend.removed +├── BotEvent (Bot 状态事件) +│ ├── BotInvitedToGroupEvent # bot.invited_to_group +│ ├── BotRemovedFromGroupEvent # bot.removed_from_group +│ ├── BotMutedEvent # bot.muted +│ └── BotUnmutedEvent # bot.unmuted +└── PlatformSpecificEvent # platform.{adapter}.{action} +``` + +## 3. 通用事件定义 + +### 3.1 事件基类 + +```python +class Event(pydantic.BaseModel): + """事件基类""" + + type: str + """事件类型标识,如 'message.received'""" + + timestamp: float + """事件发生的时间戳""" + + bot_uuid: str + """接收到此事件的 Bot UUID""" + + adapter_name: str + """产生此事件的适配器名称""" + + source_platform_object: typing.Optional[typing.Any] = None + """原始平台事件对象,供适配器内部使用""" +``` + +### 3.2 消息事件 + +#### MessageReceivedEvent (`message.received`) + +收到新消息。这是最核心的事件,替代现有的 `FriendMessage` / `GroupMessage`。 + +```python +class MessageReceivedEvent(Event): + """收到新消息""" + + type: str = "message.received" + + message_id: typing.Union[int, str] + """消息 ID""" + + message_chain: MessageChain + """消息内容""" + + sender: User + """发送者""" + + chat_type: ChatType # "private" | "group" + """会话类型""" + + chat_id: typing.Union[int, str] + """会话 ID(私聊为对方用户 ID,群聊为群 ID)""" + + group: typing.Optional[Group] = None + """群信息(仅群聊时存在)""" +``` + +与现有类型的映射关系: +- `chat_type == "private"` → 等价于现有 `FriendMessage` +- `chat_type == "group"` → 等价于现有 `GroupMessage` + +`ChatType` 枚举: + +```python +class ChatType(str, Enum): + PRIVATE = "private" + GROUP = "group" +``` + +#### MessageEditedEvent (`message.edited`) + +消息被编辑。 + +```python +class MessageEditedEvent(Event): + """消息被编辑""" + + type: str = "message.edited" + + message_id: typing.Union[int, str] + """被编辑的消息 ID""" + + new_content: MessageChain + """编辑后的新内容""" + + editor: User + """编辑者""" + + chat_type: ChatType + chat_id: typing.Union[int, str] + group: typing.Optional[Group] = None +``` + +#### MessageDeletedEvent (`message.deleted`) + +消息被删除/撤回。 + +```python +class MessageDeletedEvent(Event): + """消息被删除/撤回""" + + type: str = "message.deleted" + + message_id: typing.Union[int, str] + """被删除的消息 ID""" + + operator: typing.Optional[User] = None + """操作者(可能是发送者自己撤回,也可能是管理员删除)""" + + chat_type: ChatType + chat_id: typing.Union[int, str] + group: typing.Optional[Group] = None +``` + +#### MessageReactionEvent (`message.reaction`) + +消息收到表情回应。 + +```python +class MessageReactionEvent(Event): + """消息收到表情回应""" + + type: str = "message.reaction" + + message_id: typing.Union[int, str] + """被回应的消息 ID""" + + user: User + """回应者""" + + reaction: str + """回应的表情标识(emoji 或平台特定表情 ID)""" + + is_add: bool + """True 为添加回应,False 为移除回应""" + + chat_type: ChatType + chat_id: typing.Union[int, str] + group: typing.Optional[Group] = None +``` + +### 3.3 群组事件 + +#### MemberJoinedEvent (`group.member_joined`) + +新成员加入群组。 + +```python +class MemberJoinedEvent(Event): + """新成员加入群组""" + + type: str = "group.member_joined" + + group: Group + """群组""" + + member: User + """加入的成员""" + + inviter: typing.Optional[User] = None + """邀请者(如有)""" + + join_type: typing.Optional[str] = None + """加入方式:'invite' / 'request' / 'direct' / None""" +``` + +#### MemberLeftEvent (`group.member_left`) + +成员离开群组。 + +```python +class MemberLeftEvent(Event): + """成员离开群组""" + + type: str = "group.member_left" + + group: Group + member: User + + is_kicked: bool = False + """是否被踢出""" + + operator: typing.Optional[User] = None + """操作者(踢出时为管理员)""" +``` + +#### MemberBannedEvent (`group.member_banned`) + +成员被禁言。 + +```python +class MemberBannedEvent(Event): + """成员被禁言""" + + type: str = "group.member_banned" + + group: Group + member: User + operator: typing.Optional[User] = None + duration: typing.Optional[int] = None + """禁言时长(秒),None 表示永久""" +``` + +#### MemberUnbannedEvent (`group.member_unbanned`) + +成员被解除禁言。 + +```python +class MemberUnbannedEvent(Event): + """成员被解除禁言""" + + type: str = "group.member_unbanned" + + group: Group + member: User + operator: typing.Optional[User] = None +``` + +#### GroupInfoUpdatedEvent (`group.info_updated`) + +群组信息被修改。 + +```python +class GroupInfoUpdatedEvent(Event): + """群组信息被修改""" + + type: str = "group.info_updated" + + group: Group + """更新后的群组信息""" + + operator: typing.Optional[User] = None + """操作者""" + + changed_fields: list[str] = [] + """发生变更的字段名列表,如 ['name', 'description']""" +``` + +### 3.4 好友事件 + +#### FriendRequestReceivedEvent (`friend.request_received`) + +收到好友请求。 + +```python +class FriendRequestReceivedEvent(Event): + """收到好友请求""" + + type: str = "friend.request_received" + + request_id: typing.Union[int, str] + """请求 ID,用于后续 approve/reject 操作""" + + user: User + """请求者""" + + message: typing.Optional[str] = None + """验证消息""" +``` + +#### FriendAddedEvent (`friend.added`) + +成功添加好友。 + +```python +class FriendAddedEvent(Event): + """成功添加好友""" + + type: str = "friend.added" + + user: User + """新好友""" +``` + +#### FriendRemovedEvent (`friend.removed`) + +好友被移除。 + +```python +class FriendRemovedEvent(Event): + """好友被移除""" + + type: str = "friend.removed" + + user: User + """被移除的好友""" +``` + +### 3.5 Bot 状态事件 + +#### BotInvitedToGroupEvent (`bot.invited_to_group`) + +Bot 被邀请加入群组。 + +```python +class BotInvitedToGroupEvent(Event): + """Bot 被邀请加入群组""" + + type: str = "bot.invited_to_group" + + group: Group + inviter: typing.Optional[User] = None + + request_id: typing.Optional[typing.Union[int, str]] = None + """邀请请求 ID,某些平台需要 Bot 确认才加入""" +``` + +#### BotRemovedFromGroupEvent (`bot.removed_from_group`) + +Bot 被移出群组。 + +```python +class BotRemovedFromGroupEvent(Event): + """Bot 被移出群组""" + + type: str = "bot.removed_from_group" + + group: Group + operator: typing.Optional[User] = None +``` + +#### BotMutedEvent / BotUnmutedEvent (`bot.muted` / `bot.unmuted`) + +Bot 被禁言/解除禁言。 + +```python +class BotMutedEvent(Event): + """Bot 被禁言""" + + type: str = "bot.muted" + + group: Group + operator: typing.Optional[User] = None + duration: typing.Optional[int] = None + + +class BotUnmutedEvent(Event): + """Bot 被解除禁言""" + + type: str = "bot.unmuted" + + group: Group + operator: typing.Optional[User] = None +``` + +### 3.6 平台特有事件 + +对于无法抽象为通用事件的平台特有事件,使用统一的 `PlatformSpecificEvent` 承载: + +```python +class PlatformSpecificEvent(Event): + """平台特有事件 + + 适配器无法映射到通用事件类型时,使用此类型承载。 + 插件可以通过 adapter_name + action 来识别和处理。 + """ + + type: str = "platform.specific" + + action: str + """平台特有的事件动作标识,如 'channel_created', 'pin_message'""" + + data: dict = {} + """事件数据,结构由具体适配器定义""" +``` + +事件类型字符串格式为 `platform.{adapter_name}.{action}`,例如: +- `platform.telegram.chat_member_updated` — Telegram 的群成员信息更新 +- `platform.discord.channel_created` — Discord 的频道创建 +- `platform.discord.voice_state_update` — Discord 的语音状态变更 +- `platform.slack.app_home_opened` — Slack 的 App Home 打开 + +## 4. 各平台事件支持矩阵 + +下表标注各通用事件在主要平台上的支持情况: + +| 事件 | Telegram | Discord | OneBot(QQ) | 飞书 | 钉钉 | Slack | 微信 | LINE | KOOK | +|------|----------|---------|-----------|------|------|-------|------|------|------| +| `message.received` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `message.edited` | Y | Y | N | Y | N | Y | N | N | Y | +| `message.deleted` | Y | Y | Y | Y | N | Y | Y | N | Y | +| `message.reaction` | Y | Y | Y | Y | Y | Y | N | N | Y | +| `group.member_joined` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `group.member_left` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `group.member_banned` | Y | Y | Y | N | N | N | N | N | N | +| `group.info_updated` | Y | Y | Y | Y | Y | Y | N | N | Y | +| `friend.request_received` | N | Y | Y | N | N | N | Y | Y | Y | +| `friend.added` | N | Y | Y | N | N | N | Y | Y | N | +| `bot.invited_to_group` | Y | Y | Y | Y | Y | Y | Y | N | Y | +| `bot.removed_from_group` | Y | Y | Y | Y | N | N | Y | N | Y | +| `bot.muted` | N | N | Y | N | N | N | N | N | N | + +> 注:此表为初步评估,具体以各平台 SDK/API 文档为准,实施时逐个确认。 + +## 5. 事件生命周期 + +``` +1. 平台 SDK 回调触发 + │ +2. 适配器 EventConverter.target2yiri(raw_event) + │ 将平台原生事件转换为统一 Event 对象 + │ 无法映射的事件 → PlatformSpecificEvent + │ +3. 适配器回调注册的 listener(event, adapter) + │ +4. RuntimeBot 接收事件 + │ +5. EventBus 分发 + │ +6. EventRouter 查询 Bot 的 event_handlers 配置 + │ 匹配事件类型 → 找到对应的 Handler + │ 支持通配符:'message.*' 匹配所有消息事件 + │ 未匹配到 → 走默认 Handler(plugin,保持向后兼容) + │ +7. Handler 处理事件 + │ PipelineHandler → 进入 Pipeline 流水线 + │ AgentHandler → 调用 RequestRunner + │ WebhookHandler → POST 到外部 URL + │ PluginHandler → 分发给插件 EventListener + │ +8. Handler 执行完毕,可能通过 API 执行响应动作 + (发消息、编辑消息、踢人、同意好友请求等) +``` + +## 6. 与现有事件类型的兼容映射 + +为保证现有插件不受影响,建立以下映射关系: + +| 新事件 | 条件 | 旧事件 | +|--------|------|--------| +| `MessageReceivedEvent` (chat_type=private) | — | `FriendMessage` | +| `MessageReceivedEvent` (chat_type=group) | — | `GroupMessage` | + +在插件 SDK 层面: + +| 新事件 | 旧插件事件 | +|--------|-----------| +| `MessageReceivedEvent` (chat_type=private, 非命令) | `PersonNormalMessageReceived` | +| `MessageReceivedEvent` (chat_type=group, 非命令) | `GroupNormalMessageReceived` | +| `MessageReceivedEvent` (chat_type=private, 命令) | `PersonCommandSent` | +| `MessageReceivedEvent` (chat_type=group, 命令) | `GroupCommandSent` | +| `MessageReceivedEvent` (处理完毕后) | `NormalMessageResponded` | + +兼容层在事件分发给插件 EventListener 时自动生成旧格式事件,确保监听旧事件类型的插件仍能正常工作。 + +## 7. 事件类型注册表 + +适配器在 manifest.yaml 中声明自己支持的事件类型: + +```yaml +kind: MessagePlatformAdapter +metadata: + name: telegram +spec: + supported_events: + - message.received + - message.edited + - message.deleted + - message.reaction + - group.member_joined + - group.member_left + - group.member_banned + - group.info_updated + - bot.invited_to_group + - bot.removed_from_group + platform_specific_events: + - chat_member_updated + - chat_join_request +``` + +这份声明用于: +1. WebUI 在配置事件处理器时,只显示当前 Bot 的适配器支持的事件类型 +2. EventRouter 在路由时校验事件类型有效性 +3. 文档自动生成 diff --git a/docs/event-based-agents/02-platform-api.md b/docs/event-based-agents/02-platform-api.md new file mode 100644 index 00000000..f67d11f4 --- /dev/null +++ b/docs/event-based-agents/02-platform-api.md @@ -0,0 +1,546 @@ +# 统一平台 API 与实体定义 + +## 1. 设计原则 + +- **通用 API 抽象**:大部分平台都支持的操作(发消息、获取群信息等)定义为通用 API 方法 +- **required / optional 标记**:每个 API 标记为必需或可选,适配器未实现可选 API 时抛出 `NotSupportedError` +- **透传机制**:适配器特有的操作通过 `call_platform_api(action, params)` 统一入口透传调用 +- **能力声明**:适配器在 manifest 中声明自己支持的 API 列表,供 WebUI 和插件查询 +- **实体统一**:通用实体(User、Group 等)在 SDK 层面统一定义,适配器负责转换 + +## 2. 通用实体定义 + +### 2.1 现有实体回顾 + +当前 SDK 已有以下实体(`langbot_plugin/api/entities/builtin/platform/entities.py`): + +```python +Entity(id) +├── Friend(id, nickname, remark) +├── Group(id, name, permission) +└── GroupMember(id, member_name, permission, group, special_title) +``` + +### 2.2 新实体设计 + +扩展实体体系,保持向后兼容: + +```python +class User(pydantic.BaseModel): + """用户实体(统一表示)""" + + id: typing.Union[int, str] + """用户 ID""" + + nickname: str = "" + """昵称""" + + avatar_url: typing.Optional[str] = None + """头像 URL""" + + is_bot: bool = False + """是否为 Bot""" + + # 以下为可选的扩展信息,不同平台可能部分为空 + username: typing.Optional[str] = None + """用户名(如 Telegram 的 @username)""" + + remark: typing.Optional[str] = None + """备注名""" + + +class Group(pydantic.BaseModel): + """群组实体""" + + id: typing.Union[int, str] + """群组 ID""" + + name: str = "" + """群组名称""" + + description: typing.Optional[str] = None + """群组描述""" + + member_count: typing.Optional[int] = None + """成员数量""" + + avatar_url: typing.Optional[str] = None + """群组头像 URL""" + + owner_id: typing.Optional[typing.Union[int, str]] = None + """群主 ID""" + + +class GroupMember(pydantic.BaseModel): + """群成员实体""" + + user: User + """用户信息""" + + group_id: typing.Union[int, str] + """所属群组 ID""" + + role: MemberRole + """群内角色""" + + display_name: typing.Optional[str] = None + """群内显示名""" + + joined_at: typing.Optional[float] = None + """加入群组的时间戳""" + + title: typing.Optional[str] = None + """群头衔/特殊称号""" + + +class MemberRole(str, Enum): + """群成员角色""" + OWNER = "owner" + ADMIN = "admin" + MEMBER = "member" +``` + +### 2.3 与现有实体的兼容映射 + +| 新实体 | 旧实体 | 映射方式 | +|--------|--------|----------| +| `User` | `Friend` | `User(id=friend.id, nickname=friend.nickname, remark=friend.remark)` | +| `Group` | `Group`(旧) | `Group(id=old.id, name=old.name)` + `permission` 字段弃用 | +| `GroupMember` | `GroupMember`(旧) | `GroupMember(user=User(...), role=..., display_name=old.member_name)` | +| `MemberRole` | `Permission` | `OWNER↔Owner`, `ADMIN↔Administrator`, `MEMBER↔Member` | + +旧实体类保留,标记为 `@deprecated`,内部通过转换方法桥接到新实体。 + +## 3. 通用 API 定义 + +### 3.1 API 方法一览 + +#### 消息 API + +| 方法 | 必需/可选 | 说明 | +|------|----------|------| +| `send_message(target_type, target_id, message)` | **必需** | 主动发送消息 | +| `reply_message(event, message, quote_origin)` | **必需** | 回复一个消息事件 | +| `edit_message(chat_type, chat_id, message_id, new_content)` | 可选 | 编辑已发送的消息 | +| `delete_message(chat_type, chat_id, message_id)` | 可选 | 删除/撤回消息 | +| `forward_message(from_chat, message_id, to_chat_type, to_chat_id)` | 可选 | 转发消息到另一个会话 | +| `get_message(chat_type, chat_id, message_id)` | 可选 | 获取指定消息的内容 | + +#### 群组 API + +| 方法 | 必需/可选 | 说明 | +|------|----------|------| +| `get_group_info(group_id)` | 可选 | 获取群组信息 | +| `get_group_list()` | 可选 | 获取 Bot 加入的群组列表 | +| `get_group_member_list(group_id)` | 可选 | 获取群成员列表 | +| `get_group_member_info(group_id, user_id)` | 可选 | 获取指定群成员信息 | +| `set_group_name(group_id, name)` | 可选 | 修改群名称 | +| `mute_member(group_id, user_id, duration)` | 可选 | 禁言群成员 | +| `unmute_member(group_id, user_id)` | 可选 | 解除禁言 | +| `kick_member(group_id, user_id)` | 可选 | 踢出群成员 | +| `leave_group(group_id)` | 可选 | Bot 退出群组 | + +#### 用户 API + +| 方法 | 必需/可选 | 说明 | +|------|----------|------| +| `get_user_info(user_id)` | 可选 | 获取用户信息 | +| `get_friend_list()` | 可选 | 获取好友列表 | +| `approve_friend_request(request_id, approve, remark)` | 可选 | 处理好友请求 | +| `approve_group_invite(request_id, approve)` | 可选 | 处理入群邀请 | + +#### 媒体 API + +| 方法 | 必需/可选 | 说明 | +|------|----------|------| +| `upload_file(file_data, filename)` | 可选 | 上传文件,返回可引用的文件 ID 或 URL | +| `get_file_url(file_id)` | 可选 | 获取文件下载 URL | + +#### 透传 API + +| 方法 | 必需/可选 | 说明 | +|------|----------|------| +| `call_platform_api(action, params)` | 可选 | 调用适配器特有 API | + +### 3.2 API 方法签名详解 + +```python +class AbstractPlatformAdapter(pydantic.BaseModel, metaclass=abc.ABCMeta): + """平台适配器基类(新版)""" + + # ======== 必需方法 ======== + + @abc.abstractmethod + async def send_message( + self, + target_type: str, # "private" | "group" + target_id: typing.Union[int, str], + message: MessageChain, + ) -> MessageResult: + """主动发送消息 + + Returns: + MessageResult: 包含 message_id 等发送结果 + """ + ... + + @abc.abstractmethod + async def reply_message( + self, + event: MessageReceivedEvent, + message: MessageChain, + quote_origin: bool = False, + ) -> MessageResult: + """回复一个消息事件""" + ... + + # ======== 可选消息方法 ======== + + async def edit_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + new_content: MessageChain, + ) -> None: + """编辑已发送的消息""" + raise NotSupportedError("edit_message") + + async def delete_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> None: + """删除/撤回消息""" + raise NotSupportedError("delete_message") + + async def forward_message( + self, + from_chat_type: str, + from_chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + to_chat_type: str, + to_chat_id: typing.Union[int, str], + ) -> MessageResult: + """转发消息""" + raise NotSupportedError("forward_message") + + async def get_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> MessageReceivedEvent: + """获取指定消息""" + raise NotSupportedError("get_message") + + # ======== 可选群组方法 ======== + + async def get_group_info( + self, + group_id: typing.Union[int, str], + ) -> Group: + """获取群组信息""" + raise NotSupportedError("get_group_info") + + async def get_group_list(self) -> list[Group]: + """获取 Bot 加入的群组列表""" + raise NotSupportedError("get_group_list") + + async def get_group_member_list( + self, + group_id: typing.Union[int, str], + ) -> list[GroupMember]: + """获取群成员列表""" + raise NotSupportedError("get_group_member_list") + + async def get_group_member_info( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> GroupMember: + """获取指定群成员信息""" + raise NotSupportedError("get_group_member_info") + + async def set_group_name( + self, + group_id: typing.Union[int, str], + name: str, + ) -> None: + """修改群名称""" + raise NotSupportedError("set_group_name") + + async def mute_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + duration: int = 0, + ) -> None: + """禁言群成员,duration 为秒数,0 表示永久""" + raise NotSupportedError("mute_member") + + async def unmute_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """解除禁言""" + raise NotSupportedError("unmute_member") + + async def kick_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """踢出群成员""" + raise NotSupportedError("kick_member") + + async def leave_group( + self, + group_id: typing.Union[int, str], + ) -> None: + """Bot 退出群组""" + raise NotSupportedError("leave_group") + + # ======== 可选用户方法 ======== + + async def get_user_info( + self, + user_id: typing.Union[int, str], + ) -> User: + """获取用户信息""" + raise NotSupportedError("get_user_info") + + async def get_friend_list(self) -> list[User]: + """获取好友列表""" + raise NotSupportedError("get_friend_list") + + async def approve_friend_request( + self, + request_id: typing.Union[int, str], + approve: bool = True, + remark: typing.Optional[str] = None, + ) -> None: + """处理好友请求""" + raise NotSupportedError("approve_friend_request") + + async def approve_group_invite( + self, + request_id: typing.Union[int, str], + approve: bool = True, + ) -> None: + """处理入群邀请""" + raise NotSupportedError("approve_group_invite") + + # ======== 可选媒体方法 ======== + + async def upload_file( + self, + file_data: bytes, + filename: str, + ) -> str: + """上传文件,返回文件 ID 或 URL""" + raise NotSupportedError("upload_file") + + async def get_file_url( + self, + file_id: str, + ) -> str: + """获取文件下载 URL""" + raise NotSupportedError("get_file_url") + + # ======== 透传 API ======== + + async def call_platform_api( + self, + action: str, + params: dict = {}, + ) -> dict: + """调用适配器特有 API + + Args: + action: 平台特有的 API 动作标识 + params: 参数字典 + + Returns: + dict: 返回结果 + + Examples: + # Telegram: pin 消息 + await adapter.call_platform_api("pin_message", { + "chat_id": 123456, + "message_id": 789 + }) + + # Discord: 创建频道 + await adapter.call_platform_api("create_channel", { + "guild_id": "...", + "name": "new-channel", + "type": "text" + }) + """ + raise NotSupportedError("call_platform_api") + + # ======== 流式输出(保留现有机制) ======== + + async def reply_message_chunk( + self, + event: MessageReceivedEvent, + bot_message: dict, + message: MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ): + """流式回复消息""" + raise NotSupportedError("reply_message_chunk") + + async def is_stream_output_supported(self) -> bool: + """是否支持流式输出""" + return False + + # ======== 生命周期方法(保留现有) ======== + + @abc.abstractmethod + async def run_async(self): + """启动适配器""" + ... + + @abc.abstractmethod + async def kill(self) -> bool: + """停止适配器""" + ... + + @abc.abstractmethod + def register_listener(self, event_type, callback): + """注册事件监听器""" + ... + + @abc.abstractmethod + def unregister_listener(self, event_type, callback): + """注销事件监听器""" + ... +``` + +### 3.3 返回值类型 + +```python +class MessageResult(pydantic.BaseModel): + """消息发送结果""" + + message_id: typing.Optional[typing.Union[int, str]] = None + """发送成功后的消息 ID""" + + raw: typing.Optional[dict] = None + """平台原始返回数据""" + + +class NotSupportedError(Exception): + """适配器未实现此 API""" + + def __init__(self, api_name: str): + self.api_name = api_name + super().__init__(f"API not supported by this adapter: {api_name}") +``` + +## 4. API 能力声明 + +适配器在 manifest.yaml 中声明支持的 API: + +```yaml +kind: MessagePlatformAdapter +metadata: + name: telegram +spec: + supported_apis: + required: + - send_message + - reply_message + optional: + - edit_message + - delete_message + - get_group_info + - get_group_member_list + - get_user_info + - upload_file + - get_file_url + - call_platform_api + platform_specific_apis: + - action: pin_message + description: "Pin a message in a chat" + params_schema: + chat_id: { type: "string", required: true } + message_id: { type: "string", required: true } + - action: unpin_message + description: "Unpin a message" + params_schema: + chat_id: { type: "string", required: true } + message_id: { type: "string", required: true } +``` + +用途: +1. **WebUI**:在配置界面展示当前 Bot 可用的 API 能力 +2. **插件**:插件可查询某个 Bot 是否支持特定 API,据此决定行为 +3. **文档**:自动生成各适配器的 API 支持矩阵 + +## 5. 各平台 API 支持矩阵 + +| API | Telegram | Discord | OneBot(QQ) | 飞书 | 钉钉 | Slack | 微信 | LINE | KOOK | +|-----|----------|---------|-----------|------|------|-------|------|------|------| +| `send_message` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `reply_message` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `edit_message` | Y | Y | N | Y | N | Y | N | N | Y | +| `delete_message` | Y | Y | Y | Y | N | Y | Y | N | Y | +| `forward_message` | Y | N | Y | Y | N | N | Y | N | N | +| `get_group_info` | Y | Y | Y | Y | Y | Y | N | Y | Y | +| `get_group_member_list` | Y | Y | Y | Y | Y | Y | N | Y | Y | +| `get_user_info` | Y | Y | Y | Y | Y | Y | N | Y | Y | +| `get_friend_list` | N | Y | Y | N | N | N | Y | N | N | +| `mute_member` | Y | Y | Y | N | N | N | N | N | N | +| `kick_member` | Y | Y | Y | N | N | N | N | N | Y | +| `upload_file` | Y | Y | Y | Y | Y | Y | Y | Y | Y | +| `call_platform_api` | Y | Y | Y | Y | Y | Y | Y | Y | Y | + +> 注:此表为初步评估,具体以各平台 SDK/API 文档为准。 + +## 6. MessageChain 扩展 + +### 6.1 保留的通用组件 + +以下 MessageComponent 类型保持不变,继续作为通用消息元素: + +- `Source` — 消息元信息 +- `Plain` — 纯文本 +- `Quote` — 引用回复 +- `At` / `AtAll` — @提及 +- `Image` — 图片 +- `Voice` — 语音 +- `File` — 文件 +- `Forward` — 合并转发 +- `Face` — 表情 +- `Unknown` — 未知类型 + +### 6.2 平台特有组件处理 + +当前 MessageChain 中存在大量微信特有的组件类型(`WeChatMiniPrograms`, `WeChatEmoji`, `WeChatLink` 等)。在新架构下: + +- 这些类型**继续保留**在 SDK 中以保持兼容 +- 新增的平台特有消息组件统一使用 `PlatformComponent` 基类: + +```python +class PlatformComponent(MessageComponent): + """平台特有的消息组件""" + + type: str = "Platform" + + platform: str + """平台标识""" + + component_type: str + """组件类型""" + + data: dict = {} + """组件数据""" +``` + +适配器在转换消息链时,对于无法映射到通用组件的平台特有内容,使用 `PlatformComponent` 承载。 diff --git a/docs/event-based-agents/03-adapter-structure.md b/docs/event-based-agents/03-adapter-structure.md new file mode 100644 index 00000000..44147e6a --- /dev/null +++ b/docs/event-based-agents/03-adapter-structure.md @@ -0,0 +1,479 @@ +# 适配器新目录结构 + +## 1. 设计目标 + +- **模块化**:每个适配器从单文件拆分到独立目录,各模块职责清晰 +- **可维护**:随着事件和 API 的增加,代码量会显著增长,目录结构有助于管理复杂度 +- **一致性**:所有适配器遵循相同的目录布局和文件命名约定 +- **兼容现有发现机制**:保持 YAML manifest + ComponentDiscoveryEngine 的注册体系 + +## 2. 新目录布局 + +### 2.1 整体结构 + +``` +pkg/platform/ +├── __init__.py +├── botmgr.py # PlatformManager + RuntimeBot(重构) +├── event_bus.py # EventBus(新增) +├── event_router.py # EventRouter(新增) +├── logger.py # EventLogger(保留) +├── webhook_pusher.py # WebhookPusher(重构为 WebhookHandler) +│ +├── adapters/ # 适配器(新目录) +│ ├── __init__.py +│ │ +│ ├── telegram/ +│ │ ├── __init__.py +│ │ ├── adapter.py # TelegramAdapter 主类 +│ │ ├── event_converter.py # 平台事件 → 统一事件 +│ │ ├── message_converter.py # MessageChain 互转 +│ │ ├── api_impl.py # 通用 API 实现 +│ │ ├── platform_api.py # call_platform_api 的动作映射 +│ │ ├── types.py # 平台特有类型定义 +│ │ └── manifest.yaml # 适配器清单 +│ │ +│ ├── discord/ +│ │ ├── __init__.py +│ │ ├── adapter.py +│ │ ├── event_converter.py +│ │ ├── message_converter.py +│ │ ├── api_impl.py +│ │ ├── platform_api.py +│ │ ├── types.py +│ │ ├── voice.py # Discord 语音连接管理(特有) +│ │ └── manifest.yaml +│ │ +│ ├── aiocqhttp/ # OneBot v11 (QQ) +│ │ └── ... +│ ├── qqofficial/ +│ │ └── ... +│ ├── lark/ # 飞书 +│ │ └── ... +│ ├── dingtalk/ +│ │ └── ... +│ ├── slack/ +│ │ └── ... +│ ├── wechatpad/ +│ │ └── ... +│ ├── officialaccount/ # 微信公众号 +│ │ └── ... +│ ├── wecom/ # 企业微信 +│ │ └── ... +│ ├── wecombot/ +│ │ └── ... +│ ├── wecomcs/ +│ │ └── ... +│ ├── kook/ +│ │ └── ... +│ ├── line/ +│ │ └── ... +│ ├── satori/ +│ │ └── ... +│ ├── websocket/ # 内置 WebSocket 适配器 +│ │ ├── __init__.py +│ │ ├── adapter.py +│ │ ├── manager.py # WebSocket 连接管理 +│ │ └── manifest.yaml +│ │ +│ └── legacy/ # 旧版适配器(保留一段时间后移除) +│ ├── gewechat/ +│ ├── nakuru/ +│ └── qqbotpy/ +│ +└── handlers/ # 事件处理器实现(新增) + ├── __init__.py + ├── base.py # AbstractEventHandler 基类 + ├── pipeline_handler.py # PipelineHandler + ├── agent_handler.py # AgentHandler + ├── webhook_handler.py # WebhookHandler + └── plugin_handler.py # PluginHandler +``` + +### 2.2 适配器目录内各文件职责 + +以 Telegram 为例: + +| 文件 | 职责 | 关键类/函数 | +|------|------|------------| +| `adapter.py` | 主入口,继承 `AbstractPlatformAdapter`,组装其他模块 | `TelegramAdapter` | +| `event_converter.py` | 将 Telegram 原生事件转换为统一事件类型 | `TelegramEventConverter` — 支持 Message/Edit/Delete/Reaction/MemberJoin 等所有事件 | +| `message_converter.py` | `MessageChain` 与 Telegram 消息格式互转 | `TelegramMessageConverter.yiri2target()` / `target2yiri()` | +| `api_impl.py` | 实现通用 API 方法(edit_message, delete_message, get_group_info 等) | 各 API 方法的 Telegram 实现 | +| `platform_api.py` | 实现 `call_platform_api` 的动作分发表 | `PLATFORM_API_MAP = {"pin_message": ..., "unpin_message": ...}` | +| `types.py` | 平台特有的类型定义 | Telegram 特有的枚举、配置结构等 | +| `manifest.yaml` | 适配器清单:名称、配置 schema、支持的事件和 API 列表 | — | + +## 3. 新基类设计 + +### 3.1 AbstractPlatformAdapter + +新基类继承自现有 `AbstractMessagePlatformAdapter` 并扩展,位于 `langbot-plugin-sdk` 中: + +```python +# langbot_plugin/api/definition/abstract/platform/adapter.py + +class AbstractPlatformAdapter(pydantic.BaseModel, metaclass=abc.ABCMeta): + """平台适配器基类(EBA 版本) + + 相比旧版 AbstractMessagePlatformAdapter: + - 新增通用 API 方法(edit_message, delete_message, get_group_info 等) + - 新增透传 API(call_platform_api) + - 新增能力声明(get_supported_events, get_supported_apis) + - 事件监听器支持所有事件类型,不仅限于消息事件 + """ + + bot_account_id: str = "" + config: dict + logger: AbstractEventLogger = pydantic.Field(exclude=True) + + class Config: + arbitrary_types_allowed = True + + # ---- 能力声明 ---- + + def get_supported_events(self) -> list[str]: + """返回此适配器支持的事件类型列表 + + 默认实现从 manifest.yaml 读取。 + 适配器也可以 override 此方法动态声明。 + """ + return ["message.received"] + + def get_supported_apis(self) -> list[str]: + """返回此适配器支持的 API 列表 + + 默认实现从 manifest.yaml 读取。 + """ + return ["send_message", "reply_message"] + + # ---- 必需方法(抽象) ---- + + @abc.abstractmethod + async def send_message(self, target_type, target_id, message) -> MessageResult: + ... + + @abc.abstractmethod + async def reply_message(self, event, message, quote_origin=False) -> MessageResult: + ... + + @abc.abstractmethod + async def run_async(self): + ... + + @abc.abstractmethod + async def kill(self) -> bool: + ... + + @abc.abstractmethod + def register_listener(self, event_type, callback): + ... + + @abc.abstractmethod + def unregister_listener(self, event_type, callback): + ... + + # ---- 可选方法(默认抛 NotSupportedError) ---- + # edit_message, delete_message, forward_message, + # get_group_info, get_group_member_list, ... + # call_platform_api, ... + # (完整签名见 02-platform-api.md) + + # ---- 流式输出(保留) ---- + + async def reply_message_chunk(self, event, bot_message, message, + quote_origin=False, is_final=False): + raise NotSupportedError("reply_message_chunk") + + async def is_stream_output_supported(self) -> bool: + return False + + # ---- 消息卡片(保留) ---- + + async def create_message_card(self, message_id, event) -> bool: + return False + + async def is_muted(self, group_id) -> bool: + return False +``` + +### 3.2 AbstractMessagePlatformAdapter 兼容 + +旧的 `AbstractMessagePlatformAdapter` 保留为 `AbstractPlatformAdapter` 的类型别名: + +```python +# 向后兼容 +AbstractMessagePlatformAdapter = AbstractPlatformAdapter +``` + +现有适配器代码中的 `AbstractMessagePlatformAdapter` 引用不需要立即修改。 + +### 3.3 EventConverter 新设计 + +现有 `AbstractEventConverter` 只有 `target2yiri` 和 `yiri2target` 两个静态方法,且只处理消息事件。 + +新设计支持多种事件类型: + +```python +class AbstractEventConverter: + """事件转换器基类(EBA 版本) + + 适配器需要实现此转换器,将平台原生事件转换为统一事件。 + """ + + @staticmethod + def target2yiri(raw_event: typing.Any) -> typing.Optional[Event]: + """将平台原生事件转换为统一事件 + + Args: + raw_event: 平台 SDK 回调传入的原始事件对象 + + Returns: + 统一 Event 对象,如果无法转换或不需要处理则返回 None + """ + raise NotImplementedError + + @staticmethod + def yiri2target(event: Event) -> typing.Any: + """将统一事件转换为平台原生事件(一般不需要)""" + raise NotImplementedError +``` + +具体适配器的 EventConverter 实现会是一个分发式的结构: + +```python +class TelegramEventConverter(AbstractEventConverter): + """Telegram 事件转换器""" + + @staticmethod + def target2yiri(update: telegram.Update) -> typing.Optional[Event]: + # 消息事件 + if update.message: + return TelegramEventConverter._convert_message(update) + # 消息编辑 + if update.edited_message: + return TelegramEventConverter._convert_edited_message(update) + # 成员变动 + if update.chat_member: + return TelegramEventConverter._convert_chat_member(update) + # 回调查询(按钮点击等) + if update.callback_query: + return TelegramEventConverter._convert_callback_query(update) + # 其他 → PlatformSpecificEvent + return TelegramEventConverter._convert_platform_specific(update) + + @staticmethod + def _convert_message(update) -> MessageReceivedEvent: + ... + + @staticmethod + def _convert_edited_message(update) -> MessageEditedEvent: + ... + + @staticmethod + def _convert_chat_member(update) -> typing.Union[ + MemberJoinedEvent, MemberLeftEvent, ... + ]: + ... + + @staticmethod + def _convert_platform_specific(update) -> PlatformSpecificEvent: + ... +``` + +## 4. Manifest 文件格式扩展 + +现有 manifest.yaml 只声明 `kind`, `metadata`, `spec.config`, `execution`。 + +新增 `spec.supported_events` 和 `spec.supported_apis`: + +```yaml +apiVersion: v1 +kind: MessagePlatformAdapter + +metadata: + name: telegram + label: + en_US: Telegram + zh_Hans: Telegram + icon: telegram.svg + description: + en_US: Telegram Bot adapter + zh_Hans: Telegram Bot 适配器 + +spec: + config: + # 现有配置 schema(保持不变) + - key: token + label: { en_US: "Bot Token", zh_Hans: "Bot Token" } + type: string + required: true + sensitive: true + # ... + + supported_events: + - message.received + - message.edited + - message.deleted + - message.reaction + - group.member_joined + - group.member_left + - group.member_banned + - group.info_updated + - bot.invited_to_group + - bot.removed_from_group + + supported_apis: + required: + - send_message + - reply_message + optional: + - edit_message + - delete_message + - get_group_info + - get_group_member_list + - get_group_member_info + - get_user_info + - upload_file + - get_file_url + - call_platform_api + + platform_specific_apis: + - action: pin_message + description: { en_US: "Pin a message", zh_Hans: "置顶消息" } + - action: unpin_message + description: { en_US: "Unpin a message", zh_Hans: "取消置顶" } + - action: get_chat_administrators + description: { en_US: "Get chat admins", zh_Hans: "获取群管理员列表" } + +execution: + python: + path: pkg/platform/adapters/telegram/adapter.py + attr: TelegramAdapter +``` + +## 5. 适配器注册与发现 + +### 5.1 Blueprint 更新 + +`templates/components.yaml` 中更新扫描路径: + +```yaml +kind: Blueprint +spec: + components: + MessagePlatformAdapter: + fromDirs: + - path: pkg/platform/adapters/ # 新路径 +``` + +`ComponentDiscoveryEngine` 的递归扫描逻辑不变——它会扫描所有子目录中的 `.yaml` 文件。因此每个适配器目录下的 `manifest.yaml` 会被自动发现。 + +### 5.2 PlatformManager 适配 + +`PlatformManager.initialize()` 的核心逻辑基本不变: + +```python +async def initialize(self): + # 1. 发现适配器组件(自动扫描新目录结构) + self.adapter_components = self.ap.discover.get_components_by_kind('MessagePlatformAdapter') + + # 2. 动态导入适配器类 + for component in self.adapter_components: + self.adapter_dict[component.metadata.name] = component.get_python_component_class() + + # 3. 从数据库加载 Bot 并实例化适配器(不变) + await self.load_bots_from_db() +``` + +变更点: +- `execution.python.path` 从 `pkg/platform/sources/telegram.py` 变为 `pkg/platform/adapters/telegram/adapter.py` +- `get_python_component_class()` 正常工作,因为它按路径动态导入 + +## 6. RuntimeBot 重构 + +### 6.1 现有问题 + +当前 `RuntimeBot.initialize()` 硬编码注册了两个回调: + +```python +# 现有代码 +self.adapter.register_listener(platform_events.FriendMessage, on_friend_message) +self.adapter.register_listener(platform_events.GroupMessage, on_group_message) +``` + +### 6.2 新设计 + +`RuntimeBot` 改为注册一个通用的事件回调: + +```python +class RuntimeBot: + async def initialize(self): + # 注册通用事件回调,接收所有事件类型 + self.adapter.register_listener(Event, self._on_event) + + async def _on_event( + self, + event: Event, + adapter: AbstractPlatformAdapter, + ): + """统一事件入口""" + + # 1. 设置事件的 bot_uuid 和 adapter_name + event.bot_uuid = self.bot_entity.uuid + event.adapter_name = self.bot_entity.adapter + + # 2. 日志记录 + await self._log_event(event) + + # 3. 提交给 EventBus + await self.ap.event_bus.emit(event, adapter) +``` + +适配器侧的 `register_listener` 实现也需调整: +- 当 `event_type` 为 `Event`(基类)时,注册为"接收所有事件"的通配回调 +- 适配器在收到平台原生事件时,通过 `EventConverter.target2yiri()` 转换后,调用所有匹配的回调 + +## 7. 从现有单文件适配器迁移 + +### 7.1 迁移模式 + +以 Telegram 为例,从 `sources/telegram.py`(445 行)拆分: + +| 原代码位置 | → 新文件 | +|-----------|----------| +| `TelegramMessageConverter` 类 | `telegram/message_converter.py` | +| `TelegramEventConverter` 类 | `telegram/event_converter.py`(扩展,支持更多事件) | +| `TelegramAdapter.__init__` / `run_async` / `kill` / `register_listener` | `telegram/adapter.py` | +| `TelegramAdapter.send_message` / `reply_message` / `reply_message_chunk` | `telegram/adapter.py`(消息方法保留在主类)+ `telegram/api_impl.py`(新增 API) | +| 新增代码 | `telegram/api_impl.py`(edit_message, delete_message, get_group_info 等) | +| 新增代码 | `telegram/platform_api.py`(pin_message, unpin_message 等的映射) | +| `telegram.yaml` | `telegram/manifest.yaml`(扩展 supported_events/apis) | + +### 7.2 迁移顺序建议 + +1. **Telegram** — 功能最完整的适配器之一,适合作为模板 +2. **Discord** — 第二个迁移,验证模式的通用性 +3. **AioCQHTTP (OneBot)** — 国内最常用,确保兼容 +4. **其他适配器** — 按使用频率排序 + +### 7.3 渐进式迁移 + +不需要一次性迁移所有适配器。可以采用渐进策略: + +1. 先在 `adapters/` 下建立新适配器 +2. `Blueprint` 同时扫描 `sources/` 和 `adapters/` 两个目录 +3. 旧适配器在 `sources/` 中继续工作 +4. 逐个迁移到新结构 +5. 全部迁移完成后移除 `sources/` 目录 + +```yaml +# 过渡期的 Blueprint +kind: Blueprint +spec: + components: + MessagePlatformAdapter: + fromDirs: + - path: pkg/platform/sources/ # 旧路径(尚未迁移的适配器) + - path: pkg/platform/adapters/ # 新路径(已迁移的适配器) +``` diff --git a/docs/event-based-agents/04-event-routing.md b/docs/event-based-agents/04-event-routing.md new file mode 100644 index 00000000..bc18258d --- /dev/null +++ b/docs/event-based-agents/04-event-routing.md @@ -0,0 +1,738 @@ +# 事件路由与编排 + +## 1. 概述 + +事件路由是 EBA 架构的核心机制:事件从适配器产生后,经由 EventBus 进入 EventRouter,由 EventRouter 根据 Bot 的配置将事件分发到对应的处理器(Handler)。 + +**配置方式**:用户在 WebUI 的 Bot 管理页面通过可视化编排面板管理事件处理器配置,配置数据存储在数据库的 Bot 表 `event_handlers` JSON 字段中。 + +## 2. 数据模型 + +### 2.1 Bot 实体扩展 + +在 `bots` 表新增 `event_handlers` 字段: + +```python +class Bot(Base): + __tablename__ = "bots" + + uuid: str # 主键 + name: str + description: str + adapter: str + adapter_config: dict # JSON + enable: bool + + # 新增 + event_handlers: list # JSON — 事件处理器配置列表 + + # 保留(过渡期后弃用) + use_pipeline_name: str # deprecated + use_pipeline_uuid: str # deprecated + + created_at: datetime + updated_at: datetime +``` + +### 2.2 EventHandler 配置结构 + +`event_handlers` 字段存储一个 JSON 数组,每个元素定义一条事件路由规则: + +```python +class EventHandlerConfig(pydantic.BaseModel): + """单条事件处理器配置""" + + event_type: str + """匹配的事件类型 + + 支持精确匹配和通配符: + - "message.received" — 精确匹配 + - "message.*" — 匹配 message 命名空间下所有事件 + - "group.*" — 匹配 group 命名空间下所有事件 + - "*" — 匹配所有事件(兜底) + """ + + handler_type: str + """处理器类型: "pipeline" | "agent" | "webhook" | "plugin" """ + + handler_config: dict = {} + """处理器的具体配置,结构取决于 handler_type""" + + enabled: bool = True + """是否启用此规则""" + + priority: int = 0 + """优先级,数字越大越先匹配(同一事件类型有多条规则时)""" + + description: str = "" + """规则描述(供 WebUI 显示)""" +``` + +### 2.3 各 Handler 类型的 handler_config 结构 + +#### pipeline + +```json +{ + "handler_type": "pipeline", + "handler_config": { + "pipeline_uuid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" + } +} +``` + +将事件作为消息事件传入现有 Pipeline 流水线。仅适用于 `message.received` 事件。 + +#### agent + +```json +{ + "handler_type": "agent", + "handler_config": { + "runner": "local-agent", + "runner_config": { + "model_uuid": "...", + "prompt": "你是一个群组助理,请处理以下事件:{event_summary}", + "tools_enabled": true + } + } +} +``` + +```json +{ + "handler_type": "agent", + "handler_config": { + "runner": "dify-service-api", + "runner_config": { + "base_url": "https://api.dify.ai/v1", + "api_key": "...", + "app_type": "agent" + } + } +} +``` + +直接调用 RequestRunner 处理事件。可用的 runner 包括: +- `local-agent` — 内置 LLM Agent +- `dify-service-api` — Dify 平台 +- `n8n-service-api` — n8n 工作流 +- `coze-api` — Coze (扣子) +- `dashscope-app-api` — 阿里百炼 +- `langflow-api` — Langflow +- `tbox-app-api` — 蚂蚁 Tbox + +Agent 处理器不经过 Pipeline 的多 Stage 流程,而是直接构建上下文并调用 Runner。适用于所有事件类型。 + +**Agent Handler 与 Pipeline 的关系**: +- Pipeline 是完整的多 Stage 处理链(PreProcessor → MessageProcessor(内含Runner) → PostProcessor → ...),适合复杂消息处理 +- Agent Handler 是轻量级的,直接调用 Runner,跳过 PreProcessor/PostProcessor 等阶段 +- Pipeline 内部的 AI Stage 仍然使用 Runner,所以 Runner 本身被两种 Handler 共享 +- 用户可以根据场景选择:消息处理用 Pipeline(更多控制),其他事件用 Agent(更直接) + +#### webhook + +```json +{ + "handler_type": "webhook", + "handler_config": { + "url": "https://example.com/webhook/langbot-events", + "method": "POST", + "headers": { + "Authorization": "Bearer xxx" + }, + "timeout": 30, + "retry_count": 3, + "retry_interval": 5, + "response_actions": true + } +} +``` + +将事件序列化为 JSON POST 到外部 URL。支持的特性: +- **认证**:通过 headers 配置(Bearer Token、API Key 等) +- **重试**:配置重试次数和间隔 +- **响应动作**:如果 `response_actions` 为 true,解析响应 JSON 中的 `actions` 字段并执行(如发送消息、同意好友请求等) + +Webhook 请求体格式: + +```json +{ + "event": { + "type": "group.member_joined", + "timestamp": 1700000000.0, + "bot_uuid": "...", + "adapter_name": "telegram", + "group": { "id": "...", "name": "..." }, + "member": { "id": "...", "nickname": "..." } + }, + "bot": { + "uuid": "...", + "name": "...", + "adapter": "telegram" + } +} +``` + +响应体格式(当 `response_actions` 为 true 时): + +```json +{ + "actions": [ + { + "type": "send_message", + "params": { + "target_type": "group", + "target_id": "123456", + "message": [{ "type": "Plain", "text": "欢迎新成员!" }] + } + }, + { + "type": "call_platform_api", + "params": { + "action": "pin_message", + "params": { "chat_id": "123456", "message_id": "789" } + } + } + ] +} +``` + +#### plugin + +```json +{ + "handler_type": "plugin", + "handler_config": { + "plugin_filter": [] + } +} +``` + +将事件分发给插件的 EventListener 处理。 + +- `plugin_filter`:可选的插件名过滤列表,为空表示分发给所有插件 +- 沿用现有的插件事件分发机制(按优先级遍历插件,支持 `prevent_postorder`) + +### 2.4 完整配置示例 + +一个 Bot 的 `event_handlers` 配置示例: + +```json +[ + { + "event_type": "message.received", + "handler_type": "pipeline", + "handler_config": { + "pipeline_uuid": "default-pipeline-uuid" + }, + "enabled": true, + "priority": 10, + "description": "消息事件使用默认流水线处理" + }, + { + "event_type": "group.member_joined", + "handler_type": "agent", + "handler_config": { + "runner": "local-agent", + "runner_config": { + "model_uuid": "gpt-4o-mini", + "prompt": "有新成员 {member_name} 加入了群组 {group_name},请生成一条欢迎消息。" + } + }, + "enabled": true, + "priority": 0, + "description": "新成员入群时用 AI 生成欢迎消息" + }, + { + "event_type": "friend.request_received", + "handler_type": "webhook", + "handler_config": { + "url": "https://my-server.com/api/friend-request", + "response_actions": true + }, + "enabled": true, + "priority": 0, + "description": "好友请求转发到自建服务处理" + }, + { + "event_type": "*", + "handler_type": "plugin", + "handler_config": {}, + "enabled": true, + "priority": -100, + "description": "所有事件兜底发给插件处理" + } +] +``` + +## 3. EventBus 设计 + +EventBus 是事件的中转站,接收来自各个 RuntimeBot 的事件,交由 EventRouter 处理。 + +```python +class EventBus: + """事件总线""" + + def __init__(self, ap: Application): + self.ap = ap + self.event_router = EventRouter(ap) + + async def emit( + self, + event: Event, + adapter: AbstractPlatformAdapter, + ): + """接收并分发事件 + + Args: + event: 统一事件对象 + adapter: 产生此事件的适配器实例 + """ + # 1. 全局事件日志 + self.ap.logger.debug( + f"EventBus: {event.type} from bot {event.bot_uuid}" + ) + + # 2. 交由 EventRouter 路由处理 + await self.event_router.route(event, adapter) +``` + +## 4. EventRouter 设计 + +EventRouter 是事件路由引擎,根据 Bot 的 `event_handlers` 配置决定事件的处理方式。 + +```python +class EventRouter: + """事件路由引擎""" + + def __init__(self, ap: Application): + self.ap = ap + self.handlers: dict[str, AbstractEventHandler] = { + "pipeline": PipelineHandler(ap), + "agent": AgentHandler(ap), + "webhook": WebhookHandler(ap), + "plugin": PluginHandler(ap), + } + + async def route( + self, + event: Event, + adapter: AbstractPlatformAdapter, + ): + """路由事件到对应处理器""" + + # 1. 获取 Bot 配置 + bot = await self.ap.platform_mgr.get_bot_by_uuid(event.bot_uuid) + if not bot: + return + + # 2. 获取事件处理器配置 + event_handlers = bot.bot_entity.event_handlers or [] + + # 3. 匹配规则(按 priority 降序排列) + matched_handlers = self._match_handlers(event.type, event_handlers) + + if not matched_handlers: + # 未匹配到任何规则 → 默认交给插件处理(向后兼容) + await self.handlers["plugin"].handle(event, adapter, {}) + return + + # 4. 执行第一个匹配的 Handler + # (未来可扩展为多个 Handler 串行/并行执行) + handler_config = matched_handlers[0] + handler = self.handlers.get(handler_config.handler_type) + + if handler: + await handler.handle(event, adapter, handler_config.handler_config) + else: + self.ap.logger.warning( + f"Unknown handler type: {handler_config.handler_type}" + ) + + def _match_handlers( + self, + event_type: str, + handlers: list[EventHandlerConfig], + ) -> list[EventHandlerConfig]: + """匹配事件类型到处理器配置 + + 匹配规则: + 1. 精确匹配:event_type == handler.event_type + 2. 命名空间通配:handler.event_type 为 "message.*" 时匹配所有 "message.xxx" + 3. 全局通配:handler.event_type 为 "*" 时匹配所有事件 + 4. 按 priority 降序排列 + 5. 只返回 enabled=True 的规则 + """ + matched = [] + for handler in handlers: + if not handler.enabled: + continue + if self._event_type_matches(event_type, handler.event_type): + matched.append(handler) + + matched.sort(key=lambda h: h.priority, reverse=True) + return matched + + @staticmethod + def _event_type_matches(event_type: str, pattern: str) -> bool: + """判断事件类型是否匹配模式""" + if pattern == "*": + return True + if pattern == event_type: + return True + if pattern.endswith(".*"): + namespace = pattern[:-2] + return event_type.startswith(namespace + ".") + return False +``` + +## 5. 事件处理器(Handler)实现 + +### 5.1 Handler 基类 + +```python +class AbstractEventHandler(abc.ABC): + """事件处理器基类""" + + def __init__(self, ap: Application): + self.ap = ap + + @abc.abstractmethod + async def handle( + self, + event: Event, + adapter: AbstractPlatformAdapter, + config: dict, + ) -> None: + """处理事件 + + Args: + event: 统一事件对象 + adapter: 适配器实例(用于调用平台 API 发送响应) + config: handler_config 配置 + """ + ... +``` + +### 5.2 PipelineHandler + +将消息事件注入现有 Pipeline 流水线处理。 + +```python +class PipelineHandler(AbstractEventHandler): + """Pipeline 处理器 — 将事件送入现有 Pipeline 流水线""" + + async def handle(self, event, adapter, config): + pipeline_uuid = config.get("pipeline_uuid") + + if not isinstance(event, MessageReceivedEvent): + self.ap.logger.warning( + f"PipelineHandler only supports MessageReceivedEvent, " + f"got {event.type}" + ) + return + + # 将 MessageReceivedEvent 转换为现有的 Query 并投入 QueryPool + # 复用现有的 MessageAggregator + QueryPool + Pipeline 机制 + launcher_type = ( + LauncherTypes.PERSON + if event.chat_type == ChatType.PRIVATE + else LauncherTypes.GROUP + ) + + await self.ap.msg_aggregator.add_message( + bot_uuid=event.bot_uuid, + launcher_type=launcher_type, + launcher_id=event.chat_id, + sender_id=event.sender.id, + message_event=event.to_legacy_event(), # 转换为 FriendMessage/GroupMessage + message_chain=event.message_chain, + adapter=adapter, + pipeline_uuid=pipeline_uuid, + ) +``` + +### 5.3 AgentHandler + +直接调用 RequestRunner 处理事件,不经过 Pipeline Stage 链。 + +```python +class AgentHandler(AbstractEventHandler): + """Agent 处理器 — 直接调用 RequestRunner 处理事件""" + + async def handle(self, event, adapter, config): + runner_name = config.get("runner", "local-agent") + runner_config = config.get("runner_config", {}) + + # 1. 查找 Runner 类 + runner_cls = None + for r in preregistered_runners: + if r.name == runner_name: + runner_cls = r + break + + if not runner_cls: + self.ap.logger.error(f"Runner not found: {runner_name}") + return + + # 2. 构建事件上下文(将事件信息整理为 Runner 可处理的格式) + event_context = self._build_event_context(event, runner_config) + + # 3. 实例化并调用 Runner + runner = runner_cls(self.ap, self._build_runner_pipeline_config(config)) + + response_messages = [] + async for result in runner.run(event_context): + response_messages.append(result) + + # 4. 发送响应(如果 Runner 产生了回复) + if response_messages and isinstance(event, MessageReceivedEvent): + # 将 Runner 输出转换为 MessageChain 并回复 + reply_chain = self._build_reply_chain(response_messages) + await adapter.reply_message(event, reply_chain) + + def _build_event_context(self, event, runner_config): + """将事件构建为 Runner 可处理的上下文 + + 对于消息事件,直接使用消息内容。 + 对于其他事件,根据 runner_config 中的 prompt 模板生成描述文本。 + """ + ... + + def _build_runner_pipeline_config(self, config): + """将 handler_config 转换为 Runner 需要的 pipeline_config 格式""" + ... +``` + +### 5.4 WebhookHandler + +将事件 POST 到外部 URL。 + +```python +class WebhookHandler(AbstractEventHandler): + """Webhook 处理器 — 将事件 POST 到外部 URL""" + + async def handle(self, event, adapter, config): + url = config.get("url") + method = config.get("method", "POST") + headers = config.get("headers", {}) + timeout = config.get("timeout", 30) + retry_count = config.get("retry_count", 3) + response_actions = config.get("response_actions", False) + + # 1. 构建请求体 + bot = await self.ap.platform_mgr.get_bot_by_uuid(event.bot_uuid) + payload = { + "event": event.model_dump(), + "bot": { + "uuid": bot.bot_entity.uuid, + "name": bot.bot_entity.name, + "adapter": bot.bot_entity.adapter, + } + } + + # 2. 发送请求(带重试) + response = await self._send_with_retry( + url, method, headers, payload, timeout, retry_count + ) + + # 3. 处理响应动作 + if response_actions and response: + await self._execute_response_actions( + response, adapter, event + ) + + async def _execute_response_actions(self, response, adapter, event): + """执行响应中的动作列表""" + actions = response.get("actions", []) + for action in actions: + action_type = action.get("type") + params = action.get("params", {}) + + if action_type == "send_message": + chain = MessageChain.model_validate(params.get("message", [])) + await adapter.send_message( + params["target_type"], + params["target_id"], + chain, + ) + elif action_type == "reply": + chain = MessageChain.model_validate(params.get("message", [])) + await adapter.reply_message(event, chain) + elif action_type == "call_platform_api": + await adapter.call_platform_api( + params["action"], + params.get("params", {}), + ) + elif action_type == "approve_friend_request": + await adapter.approve_friend_request( + params["request_id"], + params.get("approve", True), + ) + # ... 更多动作类型 +``` + +### 5.5 PluginHandler + +将事件分发给插件的 EventListener。 + +```python +class PluginHandler(AbstractEventHandler): + """Plugin 处理器 — 分发给插件 EventListener""" + + async def handle(self, event, adapter, config): + plugin_filter = config.get("plugin_filter", []) + + # 复用现有的插件事件分发机制 + # 通过 plugin_connector 将事件发送给 Plugin Runtime + await self.ap.plugin_connector.emit_event( + event=event, + adapter=adapter, + plugin_filter=plugin_filter, + ) +``` + +## 6. use_pipeline_uuid 迁移 + +### 6.1 自动迁移 + +数据库迁移脚本将现有的 `use_pipeline_uuid` 自动转换为 `event_handlers`: + +```python +# 迁移逻辑 +for bot in all_bots: + if bot.use_pipeline_uuid and not bot.event_handlers: + bot.event_handlers = [ + { + "event_type": "message.received", + "handler_type": "pipeline", + "handler_config": { + "pipeline_uuid": bot.use_pipeline_uuid + }, + "enabled": True, + "priority": 10, + "description": "Auto-migrated from use_pipeline_uuid" + }, + { + "event_type": "*", + "handler_type": "plugin", + "handler_config": {}, + "enabled": True, + "priority": -100, + "description": "Default plugin handler" + } + ] +``` + +### 6.2 过渡期兼容 + +在过渡期内,如果 `event_handlers` 为空且 `use_pipeline_uuid` 非空,EventRouter 自动回退到旧行为: + +```python +# EventRouter.route() 中的兼容逻辑 +if not event_handlers and bot.bot_entity.use_pipeline_uuid: + # 回退:消息事件走 Pipeline,其他事件走 Plugin + if isinstance(event, MessageReceivedEvent): + await self.handlers["pipeline"].handle( + event, adapter, + {"pipeline_uuid": bot.bot_entity.use_pipeline_uuid} + ) + else: + await self.handlers["plugin"].handle(event, adapter, {}) + return +``` + +## 7. WebUI 编排面板数据模型 + +### 7.1 交互设计概要 + +在 WebUI 的 Bot 管理页面,新增"事件处理器"标签页(或区域),呈现为一个**规则列表**: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 事件处理器 [+ 添加规则] │ +├─────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─ 规则 1 ─────────────────────────────────── [启用] [删除] ─┐ │ +│ │ 事件类型: [message.received ▾] │ │ +│ │ 处理器: [Pipeline ▾] │ │ +│ │ Pipeline: [默认流水线 ▾] │ │ +│ │ 优先级: [10] │ │ +│ │ 描述: 消息事件使用默认流水线处理 │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─ 规则 2 ─────────────────────────────────── [启用] [删除] ─┐ │ +│ │ 事件类型: [group.member_joined ▾] │ │ +│ │ 处理器: [Agent ▾] │ │ +│ │ Runner: [local-agent ▾] │ │ +│ │ 模型: [gpt-4o-mini ▾] │ │ +│ │ Prompt: [有新成员加入...] │ │ +│ │ 优先级: [0] │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─ 规则 3 (兜底) ──────────────────────────── [启用] [删除] ─┐ │ +│ │ 事件类型: [* ▾] │ │ +│ │ 处理器: [Plugin ▾] │ │ +│ │ 优先级: [-100] │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 7.2 前端数据结构 + +```typescript +interface EventHandlerRule { + event_type: string; // 下拉选择,选项从适配器 manifest 的 supported_events 获取 + handler_type: string; // "pipeline" | "agent" | "webhook" | "plugin" + handler_config: Record; // 根据 handler_type 动态渲染不同的配置表单 + enabled: boolean; + priority: number; + description: string; +} + +// Bot 编辑接口扩展 +interface BotConfig { + uuid: string; + name: string; + adapter: string; + adapter_config: Record; + enable: boolean; + event_handlers: EventHandlerRule[]; // 新增 +} +``` + +### 7.3 事件类型下拉选项 + +从 Bot 关联的适配器 manifest 中获取 `supported_events`,加上通配符选项: + +``` +- message.received +- message.edited +- message.deleted +- message.reaction +- group.member_joined +- group.member_left +- group.member_banned +- group.info_updated +- friend.request_received +- friend.added +- bot.invited_to_group +- bot.removed_from_group +───────────────── +- message.* (所有消息事件) +- group.* (所有群组事件) +- friend.* (所有好友事件) +- bot.* (所有 Bot 事件) +- * (所有事件) +``` + +### 7.4 HTTP API + +``` +GET /api/v1/bots/{uuid}/event-handlers 获取 Bot 的事件处理器配置 +PUT /api/v1/bots/{uuid}/event-handlers 更新 Bot 的事件处理器配置 +GET /api/v1/adapters/{name}/supported-events 获取适配器支持的事件类型 +GET /api/v1/adapters/{name}/supported-apis 获取适配器支持的 API +``` diff --git a/docs/event-based-agents/05-plugin-sdk.md b/docs/event-based-agents/05-plugin-sdk.md new file mode 100644 index 00000000..9deb7bab --- /dev/null +++ b/docs/event-based-agents/05-plugin-sdk.md @@ -0,0 +1,718 @@ +# 插件 SDK 改造 + +## 1. 概述 + +插件 SDK 需要配合 EBA 架构进行以下改造: + +1. **新事件类型**:将所有通用事件暴露给插件 +2. **新 API**:将新增的平台 API 通过 `LangBotAPIProxy` 暴露给插件 +3. **兼容层**:保证现有插件零修改运行 +4. **通信协议扩展**:新增 action 枚举支持新 API + +## 2. 新事件类型暴露 + +### 2.1 插件事件模型扩展 + +当前插件 SDK 的事件模型(`api/entities/events.py`)只有消息相关事件。需要新增所有通用事件的插件级包装: + +```python +# api/entities/events.py — 新增事件 + +# ---- 消息事件(扩展) ---- + +class MessageEditedReceived(BaseEventModel): + """消息被编辑事件""" + launcher_type: str + launcher_id: typing.Union[int, str] + message_id: typing.Union[int, str] + editor_id: typing.Union[int, str] + new_content: MessageChain + chat_type: str # "private" | "group" + +class MessageDeletedReceived(BaseEventModel): + """消息被删除/撤回事件""" + launcher_type: str + launcher_id: typing.Union[int, str] + message_id: typing.Union[int, str] + operator_id: typing.Optional[typing.Union[int, str]] = None + chat_type: str + +class MessageReactionReceived(BaseEventModel): + """消息表情回应事件""" + launcher_type: str + launcher_id: typing.Union[int, str] + message_id: typing.Union[int, str] + user_id: typing.Union[int, str] + reaction: str + is_add: bool + +# ---- 群组事件 ---- + +class GroupMemberJoined(BaseEventModel): + """新成员加入群组""" + group_id: typing.Union[int, str] + group_name: str + member_id: typing.Union[int, str] + member_name: str + inviter_id: typing.Optional[typing.Union[int, str]] = None + join_type: typing.Optional[str] = None + +class GroupMemberLeft(BaseEventModel): + """成员离开群组""" + group_id: typing.Union[int, str] + group_name: str + member_id: typing.Union[int, str] + member_name: str + is_kicked: bool = False + operator_id: typing.Optional[typing.Union[int, str]] = None + +class GroupMemberBanned(BaseEventModel): + """成员被禁言""" + group_id: typing.Union[int, str] + member_id: typing.Union[int, str] + operator_id: typing.Optional[typing.Union[int, str]] = None + duration: typing.Optional[int] = None + +class GroupMemberUnbanned(BaseEventModel): + """成员被解除禁言""" + group_id: typing.Union[int, str] + member_id: typing.Union[int, str] + operator_id: typing.Optional[typing.Union[int, str]] = None + +class GroupInfoUpdated(BaseEventModel): + """群组信息被修改""" + group_id: typing.Union[int, str] + group_name: str + operator_id: typing.Optional[typing.Union[int, str]] = None + changed_fields: list[str] = [] + +# ---- 好友事件 ---- + +class FriendRequestReceived(BaseEventModel): + """收到好友请求""" + request_id: typing.Union[int, str] + user_id: typing.Union[int, str] + user_name: str + message: typing.Optional[str] = None + +class FriendAdded(BaseEventModel): + """成功添加好友""" + user_id: typing.Union[int, str] + user_name: str + +class FriendRemoved(BaseEventModel): + """好友被移除""" + user_id: typing.Union[int, str] + user_name: str + +# ---- Bot 状态事件 ---- + +class BotInvitedToGroup(BaseEventModel): + """Bot 被邀请加入群组""" + group_id: typing.Union[int, str] + group_name: str + inviter_id: typing.Optional[typing.Union[int, str]] = None + request_id: typing.Optional[typing.Union[int, str]] = None + +class BotRemovedFromGroup(BaseEventModel): + """Bot 被移出群组""" + group_id: typing.Union[int, str] + group_name: str + operator_id: typing.Optional[typing.Union[int, str]] = None + +class BotMuted(BaseEventModel): + """Bot 被禁言""" + group_id: typing.Union[int, str] + operator_id: typing.Optional[typing.Union[int, str]] = None + duration: typing.Optional[int] = None + +class BotUnmuted(BaseEventModel): + """Bot 被解除禁言""" + group_id: typing.Union[int, str] + operator_id: typing.Optional[typing.Union[int, str]] = None + +# ---- 平台特有事件 ---- + +class PlatformSpecificEventReceived(BaseEventModel): + """平台特有事件""" + adapter_name: str + action: str + data: dict = {} +``` + +### 2.2 EventListener 注册方式 + +插件的 EventListener 继续使用 `@self.handler(EventType)` 装饰器注册,只是可以注册的事件类型大幅增加: + +```python +class MyEventListener(EventListener): + def __init__(self, host): + super().__init__(host) + + # 现有方式(继续工作) + @self.handler(PersonNormalMessageReceived) + async def on_person_message(ctx: EventContext): + ... + + # 新事件类型 + @self.handler(GroupMemberJoined) + async def on_member_joined(ctx: EventContext): + group_name = ctx.event.group_name + member_name = ctx.event.member_name + await ctx.reply(MessageChain([ + Plain(f"欢迎 {member_name} 加入 {group_name}!") + ])) + + @self.handler(FriendRequestReceived) + async def on_friend_request(ctx: EventContext): + # 自动通过好友请求 + await ctx.approve_friend_request( + ctx.event.request_id, approve=True + ) + + @self.handler(PlatformSpecificEventReceived) + async def on_platform_event(ctx: EventContext): + if ctx.event.adapter_name == "telegram" and ctx.event.action == "chat_join_request": + ... +``` + +## 3. 新 API 暴露 + +### 3.1 LangBotAPIProxy 扩展 + +在 `LangBotAPIProxy` 中新增以下方法,插件通过 `self.xxx()` 调用(在 BasePlugin 中继承): + +```python +class LangBotAPIProxy: + # ---- 现有方法(保留) ---- + # get_langbot_version, get_bots, get_bot_info, + # send_message, invoke_llm, get/set/delete_plugin_storage, ... + + # ---- 新增消息 API ---- + + async def edit_message( + self, + bot_uuid: str, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + new_content: MessageChain, + ) -> None: + """编辑已发送的消息""" + ... + + async def delete_message( + self, + bot_uuid: str, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> None: + """删除/撤回消息""" + ... + + async def forward_message( + self, + bot_uuid: str, + from_chat_type: str, + from_chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + to_chat_type: str, + to_chat_id: typing.Union[int, str], + ) -> dict: + """转发消息""" + ... + + async def get_message( + self, + bot_uuid: str, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> dict: + """获取指定消息""" + ... + + # ---- 新增群组 API ---- + + async def get_group_info( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + ) -> dict: + """获取群组信息""" + ... + + async def get_group_list( + self, + bot_uuid: str, + ) -> list[dict]: + """获取 Bot 加入的群组列表""" + ... + + async def get_group_member_list( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + ) -> list[dict]: + """获取群成员列表""" + ... + + async def get_group_member_info( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> dict: + """获取指定群成员信息""" + ... + + async def mute_member( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + duration: int = 0, + ) -> None: + """禁言群成员""" + ... + + async def unmute_member( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """解除禁言""" + ... + + async def kick_member( + self, + bot_uuid: str, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """踢出群成员""" + ... + + # ---- 新增用户 API ---- + + async def get_user_info( + self, + bot_uuid: str, + user_id: typing.Union[int, str], + ) -> dict: + """获取用户信息""" + ... + + async def get_friend_list( + self, + bot_uuid: str, + ) -> list[dict]: + """获取好友列表""" + ... + + async def approve_friend_request( + self, + bot_uuid: str, + request_id: typing.Union[int, str], + approve: bool = True, + remark: typing.Optional[str] = None, + ) -> None: + """处理好友请求""" + ... + + async def approve_group_invite( + self, + bot_uuid: str, + request_id: typing.Union[int, str], + approve: bool = True, + ) -> None: + """处理入群邀请""" + ... + + # ---- 新增透传 API ---- + + async def call_platform_api( + self, + bot_uuid: str, + action: str, + params: dict = {}, + ) -> dict: + """调用适配器特有 API + + Examples: + # Telegram: pin 消息 + result = await self.call_platform_api( + bot_uuid, "pin_message", + {"chat_id": 123456, "message_id": 789} + ) + + # Discord: 创建频道 + result = await self.call_platform_api( + bot_uuid, "create_channel", + {"guild_id": "...", "name": "new-channel"} + ) + """ + ... + + # ---- 新增能力查询 API ---- + + async def get_supported_events( + self, + bot_uuid: str, + ) -> list[str]: + """获取指定 Bot 的适配器支持的事件类型""" + ... + + async def get_supported_apis( + self, + bot_uuid: str, + ) -> list[str]: + """获取指定 Bot 的适配器支持的 API""" + ... +``` + +### 3.2 QueryBasedAPIProxy 扩展 + +在事件处理上下文中(EventContext),通过 `QueryBasedAPIProxy` 新增便捷方法: + +```python +class QueryBasedAPIProxy: + # ---- 现有方法(保留) ---- + # reply, get_bot_uuid, set_query_var, get_query_var, + # create_new_conversation, ... + + # ---- 新增便捷方法 ---- + + async def edit_message( + self, + message_id: typing.Union[int, str], + new_content: MessageChain, + ) -> None: + """在当前会话中编辑消息(自动使用当前 bot_uuid 和 chat 信息)""" + ... + + async def delete_message( + self, + message_id: typing.Union[int, str], + ) -> None: + """在当前会话中删除消息""" + ... + + async def approve_friend_request( + self, + request_id: typing.Union[int, str], + approve: bool = True, + remark: typing.Optional[str] = None, + ) -> None: + """处理好友请求(上下文中自动获取 bot_uuid)""" + ... + + async def approve_group_invite( + self, + request_id: typing.Union[int, str], + approve: bool = True, + ) -> None: + """处理入群邀请""" + ... + + async def get_group_info(self) -> dict: + """获取当前群组信息(仅群聊事件中可用)""" + ... + + async def get_group_member_list(self) -> list[dict]: + """获取当前群组成员列表(仅群聊事件中可用)""" + ... + + async def call_platform_api( + self, + action: str, + params: dict = {}, + ) -> dict: + """调用平台特有 API(自动使用当前 bot_uuid)""" + ... +``` + +## 4. 兼容层设计 + +### 4.1 事件兼容层 + +当 PluginHandler 将新的 `MessageReceivedEvent` 分发给插件时,需要同时生成旧格式事件: + +```python +class PluginEventCompatLayer: + """插件事件兼容层 + + 将新的统一事件转换为旧的插件事件格式, + 确保监听旧事件类型的插件仍能正常工作。 + """ + + @staticmethod + def convert_to_legacy_events( + event: Event, + ) -> list[BaseEventModel]: + """将统一事件转换为旧插件事件列表 + + 一个统一事件可能生成多个旧插件事件。 + 例如 MessageReceivedEvent 会同时生成: + - PersonMessageReceived / GroupMessageReceived(总是生成) + - PersonNormalMessageReceived / GroupNormalMessageReceived(非命令时) + - PersonCommandSent / GroupCommandSent(命令时) + """ + legacy_events = [] + + if isinstance(event, MessageReceivedEvent): + if event.chat_type == ChatType.PRIVATE: + legacy_events.append( + PersonMessageReceived( + launcher_type="person", + launcher_id=event.chat_id, + sender_id=event.sender.id, + message_event=event.to_legacy_friend_message(), + message_chain=event.message_chain, + ) + ) + # 命令检测后还会生成 PersonNormalMessageReceived + # 或 PersonCommandSent,在 Pipeline 阶段处理 + elif event.chat_type == ChatType.GROUP: + legacy_events.append( + GroupMessageReceived( + launcher_type="group", + launcher_id=event.chat_id, + sender_id=event.sender.id, + message_event=event.to_legacy_group_message(), + message_chain=event.message_chain, + ) + ) + + # 新事件类型没有旧的对应物,不生成兼容事件 + # 只有监听了新事件类型的插件才会收到 + + return legacy_events +``` + +### 4.2 分发流程 + +``` +统一事件 (MessageReceivedEvent) + │ + ├─→ 转换为旧格式 (PersonMessageReceived / GroupMessageReceived) + │ └─→ 分发给监听旧事件类型的插件 EventListener + │ + └─→ 直接分发为新格式 (MessageReceivedEvent → 对应的插件事件) + └─→ 分发给监听新事件类型的插件 EventListener +``` + +插件 Runtime 在分发事件时检查每个 EventListener 注册的事件类型: +- 如果注册的是旧类型(`PersonMessageReceived` 等),发送兼容层生成的旧格式事件 +- 如果注册的是新类型(`GroupMemberJoined` 等),发送新格式事件 +- 两者可以共存,同一个插件可以同时监听新旧类型 + +### 4.3 API 兼容层 + +现有插件使用的 API 不受影响: + +| 现有 API | 新架构行为 | +|---------|----------| +| `self.send_message(bot_uuid, target_type, target_id, message_chain)` | 不变,直接调用适配器的 `send_message` | +| `ctx.reply(message_chain, quote_origin)` | 不变,在 MessageReceivedEvent 上下文中调用适配器的 `reply_message` | +| `self.get_bots()` | 不变 | +| `self.get_bot_info(bot_uuid)` | 不变 | + +新 API 只是额外新增的方法,不影响现有方法。 + +## 5. 通信协议扩展 + +### 5.1 新增 Action 枚举 + +在 `entities/io/actions/enums.py` 中新增 action: + +```python +class PluginToRuntimeAction(str, Enum): + # ---- 现有 actions(保留) ---- + REGISTER_PLUGIN = "register_plugin" + REPLY = "reply" + SEND_MESSAGE = "send_message" + # ... + + # ---- 新增消息 API ---- + EDIT_MESSAGE = "edit_message" + DELETE_MESSAGE = "delete_message" + FORWARD_MESSAGE = "forward_message" + GET_MESSAGE = "get_message" + + # ---- 新增群组 API ---- + GET_GROUP_INFO = "get_group_info" + GET_GROUP_LIST = "get_group_list" + GET_GROUP_MEMBER_LIST = "get_group_member_list" + GET_GROUP_MEMBER_INFO = "get_group_member_info" + MUTE_MEMBER = "mute_member" + UNMUTE_MEMBER = "unmute_member" + KICK_MEMBER = "kick_member" + + # ---- 新增用户 API ---- + GET_USER_INFO = "get_user_info" + GET_FRIEND_LIST = "get_friend_list" + APPROVE_FRIEND_REQUEST = "approve_friend_request" + APPROVE_GROUP_INVITE = "approve_group_invite" + + # ---- 新增透传 API ---- + CALL_PLATFORM_API = "call_platform_api" + + # ---- 新增能力查询 ---- + GET_SUPPORTED_EVENTS = "get_supported_events" + GET_SUPPORTED_APIS = "get_supported_apis" + + +class RuntimeToPluginAction(str, Enum): + # ---- 现有 actions(保留) ---- + EMIT_EVENT = "emit_event" + # ... + # EMIT_EVENT 的 data 结构扩展以支持新事件类型 +``` + +### 5.2 新增 Action 的请求/响应格式 + +以 `EDIT_MESSAGE` 为例: + +```json +// 请求 (Plugin → Runtime) +{ + "action": "edit_message", + "seq_id": 12345, + "data": { + "bot_uuid": "...", + "chat_type": "group", + "chat_id": "123456", + "message_id": "789", + "new_content": [ + { "type": "Plain", "text": "edited message" } + ] + } +} + +// 响应 (Runtime → Plugin) +{ + "seq_id": 12345, + "code": 0, + "message": "ok", + "data": {} +} +``` + +以 `GET_GROUP_MEMBER_LIST` 为例: + +```json +// 请求 +{ + "action": "get_group_member_list", + "seq_id": 12346, + "data": { + "bot_uuid": "...", + "group_id": "123456" + } +} + +// 响应 +{ + "seq_id": 12346, + "code": 0, + "message": "ok", + "data": { + "members": [ + { + "user": { "id": "111", "nickname": "Alice" }, + "group_id": "123456", + "role": "admin", + "display_name": "管理员Alice" + }, + ... + ] + } +} +``` + +以 `CALL_PLATFORM_API` 为例: + +```json +// 请求 +{ + "action": "call_platform_api", + "seq_id": 12347, + "data": { + "bot_uuid": "...", + "action": "pin_message", + "params": { + "chat_id": "123456", + "message_id": "789" + } + } +} + +// 响应 +{ + "seq_id": 12347, + "code": 0, + "message": "ok", + "data": { + "result": { ... } + } +} +``` + +### 5.3 LangBot 侧 Handler 实现 + +在 `ControlConnectionHandler`(LangBot → Runtime 侧)和 `PluginConnectionHandler`(Runtime → Plugin 侧)中新增对应的 action 处理逻辑: + +```python +# PluginConnectionHandler 中新增 +async def _handle_edit_message(self, data): + bot_uuid = data["bot_uuid"] + bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid) + await bot.adapter.edit_message( + chat_type=data["chat_type"], + chat_id=data["chat_id"], + message_id=data["message_id"], + new_content=MessageChain.model_validate(data["new_content"]), + ) + return {} + +async def _handle_call_platform_api(self, data): + bot_uuid = data["bot_uuid"] + bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid) + result = await bot.adapter.call_platform_api( + action=data["action"], + params=data.get("params", {}), + ) + return {"result": result} +``` + +## 6. 插件开发者迁移指南 + +### 6.1 无需迁移(零修改运行) + +以下场景的现有插件**不需要任何修改**: + +- 使用 `PersonNormalMessageReceived` / `GroupNormalMessageReceived` 监听消息 +- 使用 `PersonCommandSent` / `GroupCommandSent` 处理命令 +- 使用 `ctx.reply()` 回复消息 +- 使用 `self.send_message()` 主动发消息 +- 使用 LLM / 存储 / RAG 等现有 API + +### 6.2 推荐迁移(获得新能力) + +如果插件希望利用新功能,可以: + +1. **监听新事件类型**:在 EventListener 中注册新事件类型的 handler +2. **使用新 API**:调用 `self.edit_message()`, `self.get_group_info()` 等 +3. **使用透传 API**:调用 `self.call_platform_api()` 使用平台特有功能 + +### 6.3 SDK 版本号 + +新功能通过提升 SDK minor 版本发布: + +- 现有版本:`langbot-plugin-sdk >= x.y.z` +- 新版本:`langbot-plugin-sdk >= x.(y+1).0` + +插件的 `manifest.yaml` 中的 `min_sdk_version` 决定是否能使用新 API。使用旧 SDK 版本的插件在新 LangBot 上正常运行(兼容层保证),只是无法调用新 API。 diff --git a/docs/event-based-agents/06-migration-plan.md b/docs/event-based-agents/06-migration-plan.md new file mode 100644 index 00000000..bd8d3b66 --- /dev/null +++ b/docs/event-based-agents/06-migration-plan.md @@ -0,0 +1,429 @@ +# 分阶段迁移计划 + +## 1. 概述 + +EBA 架构涉及 langbot-plugin-sdk、LangBot 后端、LangBot 前端、文档和示例插件等多个仓库的改动。为降低风险、保证系统稳定性,采用分阶段渐进式迁移策略。 + +### 1.1 阶段总览 + +| 阶段 | 名称 | 范围 | 依赖 | +|------|------|------|------| +| Phase 1 | SDK 实体层 | langbot-plugin-sdk | 无 | +| Phase 2 | 适配器重构 | LangBot 后端 | Phase 1 | +| Phase 3 | 核心系统 | LangBot 后端 | Phase 2 | +| Phase 4 | 插件 SDK 集成 | langbot-plugin-sdk + LangBot | Phase 3 | +| Phase 5 | WebUI 编排面板 | LangBot 前端 | Phase 3 | +| Phase 6 | 文档与示例 | langbot-wiki + langbot-plugin-demo | Phase 4, 5 | + +### 1.2 核心原则 + +- **每个阶段结束后系统可运行**:任何阶段完成后,现有功能不受影响 +- **向后兼容贯穿全程**:旧接口在整个迁移期间保持可用 +- **先 SDK 后实现**:先定义好接口和模型,再做具体实现 +- **先核心适配器后边缘**:优先迁移用户量大的适配器 + +--- + +## 2. Phase 1:SDK 实体层 + +**目标**:在 langbot-plugin-sdk 中定义新的事件体系、通用实体、API 接口和适配器基类。 + +**仓库**:`langbot-plugin-sdk` + +### 2.1 任务清单 + +| # | 任务 | 文件/模块 | 说明 | +|---|------|----------|------| +| 1.1 | 定义通用事件基类层次 | `api/entities/builtin/platform/events.py` | 新增 `MessageReceivedEvent`, `MessageEditedEvent`, `GroupMemberJoinedEvent` 等,保留现有 `FriendMessage`/`GroupMessage` | +| 1.2 | 定义平台特有事件基类 | `api/entities/builtin/platform/events.py` | 新增 `PlatformSpecificEvent` | +| 1.3 | 扩展通用实体 | `api/entities/builtin/platform/entities.py` | 新增 `User`(统一 Friend/GroupMember 的基础)、`Channel` 等,保留现有实体 | +| 1.4 | 清理消息组件 | `api/entities/builtin/platform/message.py` | 将 `WeChatMiniPrograms` 等 WeChat 特有组件标记为 platform-specific,不再作为通用组件 | +| 1.5 | 定义新适配器基类 | `api/definition/abstract/platform/adapter.py` | 新增 `AbstractPlatformAdapter`(继承现有 `AbstractMessagePlatformAdapter` 并扩展通用 API 方法),保留旧基类 | +| 1.6 | 定义 API 能力声明 | `api/definition/abstract/platform/capabilities.py`(新文件) | `AdapterCapabilities` 数据类,声明适配器支持的事件和 API | +| 1.7 | 定义 `NotSupportedError` | `api/entities/builtin/platform/errors.py`(新文件) | 可选 API 未实现时抛出的异常 | + +### 2.2 关键设计约束 + +- 所有新增定义以**新增文件或新增类**的方式引入,**不修改**现有类的字段和方法签名 +- 现有 `AbstractMessagePlatformAdapter` 保留不动,新基类 `AbstractPlatformAdapter` 继承它 +- 新事件类与旧事件类并存,通过 `event_type` 字段(命名空间字符串)区分 + +### 2.3 验收标准 + +- [ ] 所有新增类可正常 import 且通过类型检查 +- [ ] 现有 `FriendMessage`, `GroupMessage`, `AbstractMessagePlatformAdapter` 等类行为不变 +- [ ] 新增单元测试覆盖事件序列化/反序列化、实体构造 +- [ ] SDK 版本号 minor bump(如 `0.x.0` → `0.x+1.0`) + +--- + +## 3. Phase 2:适配器重构 + +**目标**:将现有单文件适配器迁移到独立目录结构,实现新事件监听和通用 API。 + +**仓库**:`LangBot`(后端) + +### 3.1 适配器迁移优先级 + +根据用户量和代表性,建议按以下顺序迁移: + +| 优先级 | 适配器 | 理由 | +|--------|--------|------| +| P0 | **Telegram** | 用户量大,API 最完善,适合作为参考实现 | +| P0 | **Discord** | 国际用户主要平台,事件类型丰富 | +| P1 | **aiocqhttp**(OneBot v11) | 国内 QQ 用户主要适配器 | +| P1 | **Satori** | 通用协议适配器,覆盖多个平台 | +| P2 | **Lark** / **DingTalk** / **Slack** | 企业平台,用户量中等 | +| P2 | **qqofficial** / **WeChat 系列** | 国内用户 | +| P3 | **Kook** / **LINE** / **WeCom 系列** | 用户量较小 | +| P3 | **WebSocket** | 内置适配器,相对简单 | +| P4 | **legacy/*** | 遗留适配器,按需决定是否迁移或废弃 | + +### 3.2 单个适配器迁移步骤(以 Telegram 为例) + +| # | 任务 | 说明 | +|---|------|------| +| 2.1 | 创建目录结构 | `pkg/platform/adapters/telegram/` 下创建 `__init__.py`, `adapter.py`, `event_converter.py`, `message_converter.py`, `api_impl.py`, `types.py`, `manifest.yaml` | +| 2.2 | 迁移消息转换器 | 将 `TelegramMessageConverter` 从 `sources/telegram.py` 搬到 `adapters/telegram/message_converter.py`,逻辑不变 | +| 2.3 | 重写事件转换器 | 新的 `TelegramEventConverter` 支持将 Telegram Update 转换为所有通用事件类型(不只是消息),不支持的事件转为 `PlatformSpecificEvent` | +| 2.4 | 实现通用 API | 在 `api_impl.py` 中实现 `edit_message`, `delete_message`, `get_group_info` 等 Telegram 支持的通用 API | +| 2.5 | 实现透传 API | 在 `adapter.py` 中实现 `call_platform_api`,将 action 映射到 Telegram Bot API 调用 | +| 2.6 | 声明能力 | 在 `manifest.yaml` 或适配器类中声明支持的事件和 API 列表 | +| 2.7 | 新建 Adapter 主类 | `TelegramAdapter` 继承 `AbstractPlatformAdapter`(新基类),委托各模块实现 | +| 2.8 | 更新 manifest.yaml | 更新 `execution.python.path` 指向新位置 | +| 2.9 | 验证 | 确保新适配器通过现有消息收发流程的测试 | + +### 3.3 基础设施任务 + +| # | 任务 | 说明 | +|---|------|------| +| 2.A | 创建 `adapters/_base/` | 将 SDK 中新基类的运行时辅助代码放在此处(如事件分发辅助函数) | +| 2.B | 更新 ComponentDiscovery | 使 `discover_blueprint` 支持扫描 `adapters/` 子目录中的 YAML | +| 2.C | 更新 `templates/components.yaml` | 将 `fromDirs` 从 `pkg/platform/sources/` 改为 `pkg/platform/adapters/`(过渡期两个都扫描) | +| 2.D | 保留旧 sources/ | 过渡期不删除旧文件,通过 manifest 的 `deprecated: true` 标记 | + +### 3.4 验收标准 + +- [ ] 已迁移的适配器在新目录结构下正常启动和收发消息 +- [ ] 新事件(如 `message.edited`)在支持的平台上正确触发 +- [ ] 通用 API(如 `edit_message`)在支持的平台上正确执行 +- [ ] 未迁移的适配器(仍在 `sources/`)继续正常工作 +- [ ] ComponentDiscovery 同时扫描新旧目录 + +--- + +## 4. Phase 3:核心系统 + +**目标**:实现 EventBus、EventRouter 和事件处理器框架,将事件从适配器分发到不同的处理器。 + +**仓库**:`LangBot`(后端) + +### 4.1 任务清单 + +| # | 任务 | 文件/模块 | 说明 | +|---|------|----------|------| +| 3.1 | 实现 EventBus | `pkg/platform/event_bus.py`(新文件) | 事件总线:接收适配器事件,进行日志记录,分发给 EventRouter | +| 3.2 | 实现 EventRouter | `pkg/platform/event_router.py`(新文件) | 事件路由引擎:读取 Bot 的 `event_handlers` 配置,匹配事件类型,分发到对应 Handler | +| 3.3 | 实现 PipelineHandler | `pkg/platform/handlers/pipeline_handler.py` | 将 `message.received` 事件转为现有 Query,进入 Pipeline 流水线 | +| 3.4 | 实现 AgentHandler | `pkg/platform/handlers/agent_handler.py` | 直接调用 RequestRunner 处理事件,不经过 Pipeline 多 Stage 流程 | +| 3.5 | 实现 WebhookHandler | `pkg/platform/handlers/webhook_handler.py` | 将事件 POST 到外部 URL,解析响应执行动作(重构现有 WebhookPusher) | +| 3.6 | 实现 PluginHandler | `pkg/platform/handlers/plugin_handler.py` | 将事件分发给插件 EventListener(复用现有 plugin_connector 机制) | +| 3.7 | Bot 实体扩展 | `pkg/entity/persistence/bot.py` | 新增 `event_handlers` JSON 字段 | +| 3.8 | 数据库迁移 | `pkg/persistence/migrations/` | 新增迁移脚本:添加 `event_handlers` 列,将现有 `use_pipeline_uuid` 数据迁移为 `event_handlers` 格式 | +| 3.9 | 重构 RuntimeBot | `pkg/platform/botmgr.py` | 将 `initialize()` 中硬编码的 `on_friend_message`/`on_group_message` 回调替换为通过 EventBus 分发所有事件 | +| 3.10 | 重构 MessageAggregator | `pkg/pipeline/aggregator.py` | 从 RuntimeBot 解耦,作为 PipelineHandler 的内部机制(只对 `message.received` 事件生效) | +| 3.11 | Agent Handler 中 RequestRunner 解耦 | `pkg/provider/runner.py` + handlers | RequestRunner 需要能独立于 Pipeline Stage 运行,为 Agent Handler 提供轻量调用路径 | +| 3.12 | HTTP API 扩展 | `pkg/api/http/controller/` | 新增/更新 Bot API 端点以支持 `event_handlers` 的 CRUD | + +### 4.2 数据迁移策略 + +现有 Bot 表有 `use_pipeline_uuid` 字段,需要自动迁移为 `event_handlers`: + +```python +# 迁移逻辑伪代码 +for bot in all_bots: + if bot.use_pipeline_uuid: + bot.event_handlers = [ + { + "event_type": "message.received", + "handler_type": "pipeline", + "handler_config": { + "pipeline_uuid": bot.use_pipeline_uuid + } + } + ] + else: + bot.event_handlers = [] +``` + +### 4.3 RuntimeBot 重构要点 + +当前 `RuntimeBot.initialize()` 硬编码注册两个回调: + +```python +# 现有代码 (botmgr.py) +self.adapter.register_listener(FriendMessage, on_friend_message) +self.adapter.register_listener(GroupMessage, on_group_message) +``` + +重构后改为注册通用事件回调: + +```python +# 新代码 +async def on_event(event: Event, adapter: AbstractPlatformAdapter): + await self.event_bus.emit( + bot_uuid=self.bot_entity.uuid, + event=event, + adapter=adapter, + ) + +# 注册所有事件类型的统一回调 +self.adapter.register_listener(Event, on_event) +``` + +EventBus 接收事件后,调用 EventRouter 按配置分发。 + +### 4.4 事件处理器执行流程 + +``` +EventBus.emit(bot_uuid, event, adapter) + │ + ▼ +EventRouter.route(bot_uuid, event) + │ 查询 bot.event_handlers 配置 + │ 匹配 event_type(精确匹配 > 通配符 *) + ▼ +匹配到的 Handler(s) + │ + ├── PipelineHandler.handle(event, adapter) + │ │ 仅支持 message.received + │ │ 构造 Query → MessageAggregator → QueryPool → Pipeline + │ └── 沿用现有完整流水线机制 + │ + ├── AgentHandler.handle(event, adapter) + │ │ 根据 handler_config 选择 RequestRunner + │ │ 直接调用 runner.run() 处理事件 + │ └── 将结果通过 adapter API 回复 + │ + ├── WebhookHandler.handle(event, adapter) + │ │ 序列化事件为 JSON + │ │ POST 到 handler_config.url + │ └── 解析响应,执行动作(回复消息、调用 API 等) + │ + └── PluginHandler.handle(event, adapter) + │ 通过 plugin_connector 分发给插件 + └── 插件 EventListener 处理 +``` + +### 4.5 验收标准 + +- [ ] `message.received` 事件通过 PipelineHandler 正确进入现有 Pipeline(与旧行为一致) +- [ ] 新增事件(如 `group.member_joined`)能通过 PluginHandler 分发给插件 +- [ ] AgentHandler 能直接调用 RequestRunner(至少 `local-agent`)处理事件并回复 +- [ ] WebhookHandler 能将事件 POST 到外部 URL +- [ ] 数据库迁移正确执行,`use_pipeline_uuid` 数据迁移到 `event_handlers` +- [ ] 现有 Bot 在不修改配置的情况下行为不变(自动迁移保证) + +--- + +## 5. Phase 4:插件 SDK 集成 + +**目标**:将新事件和 API 通过插件 SDK 暴露给插件开发者,同时实现兼容层。 + +**仓库**:`langbot-plugin-sdk` + `LangBot` + +### 5.1 任务清单 + +| # | 任务 | 说明 | +|---|------|------| +| 4.1 | 新增插件事件包装 | 在 `api/entities/events.py` 中为每个通用事件新增插件级事件类(如 `MessageEditedReceived`, `MemberJoinedReceived`) | +| 4.2 | 兼容层实现 | `PersonMessageReceived` / `GroupMessageReceived` 由新的 `MessageReceivedEvent` 自动生成,旧事件作为新事件的 alias | +| 4.3 | 新 API 暴露 | 在 `LangBotAPIProxy` 中新增方法:`edit_message`, `delete_message`, `get_group_info`, `get_user_info`, `call_platform_api` 等 | +| 4.4 | 通信协议扩展 | 在 `entities/io/actions/enums.py` 中新增 action 枚举(如 `EDIT_MESSAGE`, `DELETE_MESSAGE`, `GET_GROUP_INFO`, `CALL_PLATFORM_API`) | +| 4.5 | Runtime Handler 扩展 | 在 PluginConnectionHandler / ControlConnectionHandler 中添加新 action 的处理逻辑 | +| 4.6 | EventListener 扩展 | 确保 `@handler()` 装饰器支持注册新事件类型 | +| 4.7 | QueryBasedAPI 扩展 | 在 `QueryBasedAPIProxy` 中新增事件上下文相关的 API(如 `get_event_source_adapter`) | + +### 5.2 兼容层详细设计 + +``` +新事件系统 旧事件系统(兼容层) +───────────── ───────────────── +MessageReceivedEvent ┌→ PersonMessageReceived (chat_type == "private") + (chat_type: "private"|"group") ┤ + └→ GroupMessageReceived (chat_type == "group") +``` + +**实现方式**:在 RuntimeEventDispatcher 中,当分发 `MessageReceivedEvent` 给插件时,同时生成对应的旧事件类实例。插件可以用新事件类或旧事件类注册 handler,都能收到。 + +### 5.3 验收标准 + +- [ ] 现有插件(使用旧事件和 API)无需修改即可运行 +- [ ] 新插件可以使用新事件类型(如 `MemberJoinedReceived`)注册 handler +- [ ] 新 API(如 `edit_message`)可通过 `self.edit_message()` 或 `event_context.edit_message()` 调用 +- [ ] 透传 API `call_platform_api` 可正常调用适配器特有功能 +- [ ] 所有新 action 的通信协议正确工作(stdio / WebSocket) + +--- + +## 6. Phase 5:WebUI 编排面板 + +**目标**:在 WebUI 的 Bot 管理页面实现事件处理器的可视化编排。 + +**仓库**:`LangBot`(前端 `web/`) + +### 6.1 任务清单 + +| # | 任务 | 说明 | +|---|------|------| +| 5.1 | Bot 编辑页面扩展 | 在 Bot 编辑页面新增「事件处理」面板 | +| 5.2 | 事件处理器列表组件 | 可视化展示当前 Bot 的 `event_handlers` 列表,支持增删改排序 | +| 5.3 | 事件类型选择器 | 下拉选择事件类型(命名空间分组展示),支持通配符 `*` | +| 5.4 | Handler 类型选择与配置 | 选择 handler 类型后展示对应的配置表单(Pipeline 选择器、Runner 选择器、Webhook URL 等) | +| 5.5 | Pipeline Handler 配置 | 复用现有的 Pipeline 选择 UI(从现有 `use_pipeline_uuid` 选择器迁移) | +| 5.6 | Agent Handler 配置 | Runner 选择器(local-agent / dify / n8n / coze 等)+ Runner 参数配置表单 | +| 5.7 | Webhook Handler 配置 | URL 输入、认证方式选择、Header 配置 | +| 5.8 | Plugin Handler 配置 | 通常无需额外配置,分发给所有匹配的插件 EventListener | +| 5.9 | HTTP API 对接 | 前端调用后端 API 保存/读取 `event_handlers` 配置 | +| 5.10 | 迁移提示 | 对于从旧版本升级的用户,如果检测到 `use_pipeline_uuid` 已自动迁移,展示提示说明 | + +### 6.2 UI 交互设计概要 + +``` +┌─ Bot 编辑页面 ─────────────────────────────────────┐ +│ │ +│ 基本信息 │ 适配器配置 │ ★ 事件处理 │ │ +│ │ +│ ┌─ 事件处理器列表 ────────────────────────────┐ │ +│ │ │ │ +│ │ ① message.received → Pipeline: "主流水线" │ │ +│ │ [编辑] [删除] │ │ +│ │ │ │ +│ │ ② group.member_joined → Agent: local-agent │ │ +│ │ [编辑] [删除] │ │ +│ │ │ │ +│ │ ③ * (默认) → Plugin │ │ +│ │ [编辑] [删除] │ │ +│ │ │ │ +│ │ [+ 添加事件处理器] │ │ +│ │ │ │ +│ └──────────────────────────────────────────────┘ │ +│ │ +│ [保存] [取消] │ +└─────────────────────────────────────────────────────┘ +``` + +### 6.3 验收标准 + +- [ ] 用户可以在 WebUI 上为 Bot 添加/编辑/删除事件处理器 +- [ ] 四种 Handler 类型均有对应的配置表单 +- [ ] 配置保存后正确写入数据库 `event_handlers` 字段 +- [ ] 旧版本升级后,自动迁移的配置在 UI 上正确展示 +- [ ] Pipeline Handler 的行为与旧的 `use_pipeline_uuid` 完全一致 + +--- + +## 7. Phase 6:文档与示例 + +**目标**:更新所有面向开发者的文档和示例。 + +**仓库**:`langbot-wiki`, `langbot-plugin-demo` + +### 7.1 任务清单 + +| # | 任务 | 仓库 | 说明 | +|---|------|------|------| +| 6.1 | EBA 架构概览文档 | langbot-wiki | 面向用户的新架构说明 | +| 6.2 | 适配器开发指南更新 | langbot-wiki | 如何开发一个新的适配器(新目录结构、新基类、事件转换等) | +| 6.3 | 插件开发指南更新 | langbot-wiki | 新事件类型、新 API 的使用说明 | +| 6.4 | 插件迁移指南 | langbot-wiki | 现有插件如何迁移到新事件/API(如果需要使用新能力) | +| 6.5 | 事件处理器配置指南 | langbot-wiki | WebUI 上如何配置事件处理器 | +| 6.6 | 示例插件更新 | langbot-plugin-demo | HelloPlugin 增加新事件监听示例、新 API 调用示例 | +| 6.7 | 新示例插件 | langbot-plugin-demo | 新建一个示例展示非消息事件处理(如入群欢迎) | + +--- + +## 8. 风险评估与缓解 + +### 8.1 技术风险 + +| 风险 | 影响 | 概率 | 缓解措施 | +|------|------|------|----------| +| 适配器迁移中断现有功能 | 高 | 中 | 新旧目录并存,ComponentDiscovery 同时扫描两个目录,逐个适配器迁移验证 | +| 事件模型不兼容导致插件崩溃 | 高 | 低 | 兼容层保证旧事件类型继续工作,新增类不修改旧类 | +| 数据库迁移失败 | 高 | 低 | 迁移脚本做前置校验,`use_pipeline_uuid` 在过渡期保留不删除 | +| RequestRunner 解耦破坏 Pipeline | 高 | 中 | Agent Handler 调用 Runner 的路径独立于 Pipeline,不修改现有 Pipeline Stage 中的 Runner 调用逻辑 | +| 性能回退(EventBus 额外开销) | 中 | 低 | EventBus 在进程内同步分发,无额外序列化/网络开销 | +| 各平台事件差异大难以统一 | 中 | 中 | 通用事件只抽象最大公约数字段,差异部分保留在 `source_platform_object`;不支持的事件走 `PlatformSpecificEvent` | + +### 8.2 兼容性风险 + +| 风险 | 缓解措施 | +|------|----------| +| 现有插件使用旧事件类 | 兼容层自动将新事件转为旧事件分发,两种事件类都能注册 handler | +| 现有插件调用 `reply()` / `send_message()` | 这两个 API 保持不变,只是底层实现可能微调 | +| 第三方基于 `AbstractMessagePlatformAdapter` 开发的适配器 | 旧基类保留,新基类继承旧基类,第三方适配器无需立即迁移 | +| 用户自定义 Pipeline 配置 | Pipeline 机制完整保留,PipelineHandler 只是入口变了(从 RuntimeBot 硬编码变为 EventRouter 配置) | + +### 8.3 回滚策略 + +每个 Phase 独立可回滚: + +- **Phase 1**(SDK 新增类):删除新增文件,回退 SDK 版本号 +- **Phase 2**(适配器目录):恢复 `components.yaml` 的 `fromDirs` 指向旧目录,旧 sources/ 未删除 +- **Phase 3**(核心系统):回退数据库迁移,恢复 RuntimeBot 旧的硬编码回调 +- **Phase 4**(插件集成):回退 SDK 版本,插件使用旧版 SDK +- **Phase 5**(WebUI):前端回退,Bot 编辑页面隐藏事件处理面板 + +--- + +## 9. 里程碑与时间线建议 + +| 里程碑 | 阶段 | 预期产出 | +|--------|------|----------| +| M1 | Phase 1 完成 | SDK 新版本发布,包含新事件/实体/基类定义 | +| M2 | Phase 2 首批适配器(Telegram + Discord) | 两个参考实现,验证目录结构和事件/API 体系 | +| M3 | Phase 3 核心系统 | EventBus + EventRouter + 四种 Handler 可用 | +| M4 | Phase 2 剩余适配器 | 所有活跃适配器迁移完成 | +| M5 | Phase 4 插件集成 | 新 SDK 发布,插件可使用新事件和 API | +| M6 | Phase 5 WebUI | 事件处理器编排面板上线 | +| M7 | Phase 6 文档 | 开发者文档和示例更新完毕 | + +建议 M1-M3 作为第一个大版本发布(如 v5.0),M4-M7 在后续小版本迭代中完成。 + +--- + +## 10. 开发指引 + +### 10.1 分支策略 + +建议在主仓库创建 `feature/eba` 长期特性分支,各 Phase 在子分支上开发后合入特性分支: + +``` +main + └── feature/eba + ├── feature/eba-sdk-entities (Phase 1) + ├── feature/eba-adapter-telegram (Phase 2) + ├── feature/eba-adapter-discord (Phase 2) + ├── feature/eba-core-system (Phase 3) + ├── feature/eba-plugin-sdk (Phase 4) + └── feature/eba-webui (Phase 5) +``` + +### 10.2 测试策略 + +| 层次 | 测试内容 | 工具 | +|------|----------|------| +| 单元测试 | 事件序列化/反序列化、实体构造、API 调用 mock | pytest | +| 集成测试 | EventBus → EventRouter → Handler 全链路 | pytest + asyncio | +| 适配器测试 | 各适配器的事件转换、消息转换、API 调用 | pytest + mock SDK | +| 端到端测试 | 从模拟平台事件到完整处理流程 | staging 环境 | +| 插件兼容性测试 | 旧插件在新系统下的行为 | langbot-plugin-demo | + +### 10.3 代码审查关注点 + +- 新增代码是否影响现有行为 +- 兼容层是否正确映射所有旧事件/API 场景 +- 数据库迁移是否可逆 +- 新 API 的错误处理(`NotSupportedError`)是否一致 +- 事件模型的序列化在 stdio/WebSocket 通信中是否正确