diff --git a/docs/event-based-agents/adapters/00-index.md b/docs/event-based-agents/adapters/00-index.md index cc51b1c0..8bd7b5cf 100644 --- a/docs/event-based-agents/adapters/00-index.md +++ b/docs/event-based-agents/adapters/00-index.md @@ -20,6 +20,8 @@ Current acceptance report: [EBA Adapter Acceptance Report](./acceptance-report.m | OneBot v11 / aiocqhttp | Migrated; Matcha UI plus protocol-level multi-component coverage | [OneBot v11 / aiocqhttp](./aiocqhttp.md) | | DingTalk | Migrated; partial plugin E2E, real UI inbound image/file verified; group gap remains | [DingTalk](./dingtalk.md) | | Lark / Feishu | Migrated; partial live text E2E, media-inbound gap remains | [Lark / Feishu](./lark.md) | +| WeCom | Migrated; private text plugin E2E verified, media/group gaps remain | [WeCom](./wecom.md) | +| WeComBot | Migrated; private text and outbound/API plugin E2E verified, feedback/group gaps remain | [WeComBot](./wecombot.md) | ## Documentation Checklist diff --git a/docs/event-based-agents/adapters/acceptance-report.md b/docs/event-based-agents/adapters/acceptance-report.md index d503d876..7e5ff8e1 100644 --- a/docs/event-based-agents/adapters/acceptance-report.md +++ b/docs/event-based-agents/adapters/acceptance-report.md @@ -9,6 +9,8 @@ Scope: - `aiocqhttp-eba` - `dingtalk-eba` - `lark-eba` +- `wecom-eba` +- `wecombot-eba` This report follows `acceptance-checklist.md`. Evidence levels are intentionally strict: @@ -28,6 +30,8 @@ This report follows `acceptance-checklist.md`. Evidence levels are intentionally | OneBot v11 / aiocqhttp | Partial EBA acceptance | Matcha UI covered real group text and outbound supported components/APIs. Multi-component inbound `Source/Plain/At/Face/Image/Voice/File/Quote` was verified through the real OneBot reverse WebSocket adapter endpoint, but not through Matcha UI upload/send. Matcha blocks file-send and merged-forward APIs. | | DingTalk | Partial EBA acceptance | Real DingTalk UI covered private text, emoji-as-text inbound, private inbound image/file, outbound image/file/quote/mention fallback components, safe SDK APIs, and safe DingTalk platform APIs. Real UI inbound voice/quote and group trigger were not completed. | | Lark / Feishu | Partial EBA acceptance | EBA adapter structure, self-built/store app config, WebSocket/Webhook mode handling, converters, common APIs, platform APIs, and unit tests are in place. One real LangBot organization WebSocket private text event reached `EBAEventProbe`; outbound component sweep was visible in Feishu. Latest real UI image/file sends did not reach local plugin evidence, so media receive remains blocked. | +| WeCom | Partial EBA acceptance | Regular WeCom application-message adapter is split into the EBA directory with manifest, converters, API mixin, platform API map, and unit tests. Private text reached `EBAEventProbe` through standalone runtime and the real WeCom client; safe plugin APIs passed. Real inbound media and broader event coverage remain pending. | +| WeComBot | Partial EBA acceptance | WeCom AI Bot is split into the EBA directory with WebSocket long connection mode and optional webhook mode, EBA message/feedback/platform-specific conversion, cache-backed common APIs, platform API map, unit tests, and a direct live probe. Private text, outbound component sweep, safe common APIs, and all declared WeComBot platform APIs reached `EBAEventProbe`; group, real inbound media, and feedback callback evidence remain pending. | Telegram and DingTalk now have real user-side UI image/file upload evidence in plugin JSONL. Discord and aiocqhttp do not yet have real UI inbound image/file evidence. diff --git a/docs/event-based-agents/adapters/wecom.md b/docs/event-based-agents/adapters/wecom.md new file mode 100644 index 00000000..c217cbd3 --- /dev/null +++ b/docs/event-based-agents/adapters/wecom.md @@ -0,0 +1,130 @@ +# WeCom EBA Adapter + +## Status + +WeCom application messages now have an EBA adapter directory: + +```text +src/langbot/pkg/platform/adapters/wecom/ +├── adapter.py +├── api_impl.py +├── event_converter.py +├── manifest.yaml +├── message_converter.py +├── platform_api.py +└── types.py +``` + +The adapter is registered as `wecom-eba`. + +This record covers the regular WeCom application-message adapter. WeCom AI Bot (`wecombot-eba`) uses a different protocol flow and is documented separately in `wecombot.md`. WeCom Customer Service (`wecomcs`) remains a separate follow-up migration. + +## Configuration + +| Field | Required | Default | Description | +|-------|----------|---------|-------------| +| `webhook_url` | No | `""` | Unified webhook URL copied into the WeCom application callback settings. | +| `corpid` | Yes | `""` | WeCom corporate ID. | +| `secret` | Yes | `""` | WeCom application secret. | +| `token` | Yes | `""` | WeCom callback token. | +| `EncodingAESKey` | Yes | `""` | WeCom callback encryption key. | +| `contacts_secret` | No | `""` | Contacts secret for contact-list based helper APIs. | +| `api_base_url` | No | `https://qyapi.weixin.qq.com/cgi-bin` | WeCom API base URL, overrideable for proxy/private-network deployments. | + +## Events + +WeCom declares these EBA events: + +- `message.received` +- `platform.specific` + +`message.received` currently covers text and image application callbacks. Other WeCom callback types are surfaced as `platform.specific` so plugins can inspect the raw structured payload without crashing the common message path. + +## Common APIs + +| API | Status | Notes | +|-----|--------|-------| +| `send_message` | Supported | Private/person target only. `target_id` must be `user_id|agent_id`. Supports text, image, voice, file, flattened forward, and quote fallback. | +| `reply_message` | Supported | Replies to the original WeCom sender and application agent from `source_platform_object`. | +| `get_message` | Supported from cache | Returns cached inbound `MessageReceivedEvent` by message ID. | +| `get_user_info` | Supported | Uses cached event users first, then WeCom `user/get`. | +| `get_friend_list` | Partial | Returns users seen by this adapter instance. Full contacts listing is not declared as common coverage. | +| `call_platform_api` | Supported | See below. | +| `edit_message` | Not supported | WeCom application messages do not expose a general edit endpoint for sent messages. | +| `delete_message` | Not supported | WeCom application messages do not expose a general delete endpoint for sent messages. | +| `get_group_info` / member APIs | Not supported | Regular WeCom application callbacks handled here are private user messages, not group-chat bot messages. | +| `upload_file` / `get_file_url` | Not supported as common APIs | WeCom media upload is used internally while sending image/voice/file components; no portable standalone common file URL is exposed. | + +## Platform-Specific APIs + +`call_platform_api(action, params)` supports: + +- `check_access_token` +- `refresh_access_token` +- `get_user_info` +- `send_to_all` + +`send_to_all` requires a configured `contacts_secret` with suitable contact visibility and should be treated as a broad-send operation in live testing. + +## Unit Verification + +Covered by: + +```bash +uv run pytest tests/unit_tests/platform/test_wecom_eba_adapter.py +``` + +The unit tests cover: + +- Manifest events/APIs/platform actions match adapter declarations. +- Outbound component conversion for text, image, voice, file, quote fallback, and byte-safe text splitting. +- Text callback conversion to `MessageReceivedEvent`. +- Legacy `FriendMessage` compatibility. +- EBA listener dispatch and inbound message/user cache. +- `send_message`, `reply_message`, and safe platform API dispatch against a mocked WeCom client. + +## Standalone Runtime Plugin E2E Record + +Verified on May 27, 2026 with `EBAEventProbe`, SDK standalone runtime, LangBot core, and a real WeCom desktop client against the server test environment. + +```bash +cd langbot-plugin-sdk +uv run python -m langbot_plugin.cli.__init__ rt --debug-only --ws-control-port 5400 --ws-debug-port 5401 --skip-deps-check + +cd LangBot +uv run main.py --standalone-runtime + +cd data/plugins/LangBot__EBAEventProbe +EBA_PROBE_API=1 EBA_PROBE_COMPONENT_SWEEP=1 EBA_PROBE_PLATFORM_API=1 \ +uv --project /absolute/path/to/langbot-plugin-sdk run python -m langbot_plugin.cli.__init__ run +``` + +Evidence: + +- JSONL: `data/temp/wecom_eba_plugin_probe.jsonl` +- Bot: `wecom-eba` +- Client: real WeCom desktop client +- Environment: `dev.rockchin.top` test server + +Observed and verified: + +- A real private WeCom user message reached the plugin as `MessageReceived` with `adapter_name=wecom-eba`, common sender/chat fields, and `Source + Plain`. +- SDK API calls succeeded through the standalone runtime, including `get_langbot_version`, `get_bots`, `get_bot_info`, `send_message`, plugin/workspace storage, and manifest/list APIs. +- Safe adapter API checks succeeded through the plugin path for cached message/user data and declared safe platform API actions. + +Still required for stricter acceptance: + +- Send a private image and confirm common `Image` reaches the plugin. +- Have the plugin call `send_message` and `reply_message` for text and one media component, then verify the WeCom client receives the bot output. +- Exercise `send_to_all` only with a disposable visible-contact scope. +- Trigger one non-text/image callback, if available, and confirm it becomes `PlatformSpecificEventReceived`. + +## Current Acceptance + +Current status is **partial EBA acceptance**. + +Blocked items: + +- Real inbound image/voice/file evidence was not completed in this run. +- Inbound voice/file callback parsing is not present in the legacy `WecomClient.get_message()` path, so the EBA adapter does not claim those receive components yet. +- Group/member/moderation APIs do not apply to this regular WeCom application-message adapter. diff --git a/docs/event-based-agents/adapters/wecombot.md b/docs/event-based-agents/adapters/wecombot.md new file mode 100644 index 00000000..5eee651f --- /dev/null +++ b/docs/event-based-agents/adapters/wecombot.md @@ -0,0 +1,148 @@ +# WeComBot EBA Adapter + +## Status + +WeCom AI Bot now has an EBA adapter directory: + +```text +src/langbot/pkg/platform/adapters/wecombot/ +├── adapter.py +├── api_impl.py +├── event_converter.py +├── manifest.yaml +├── message_converter.py +├── platform_api.py +└── types.py +``` + +The adapter is registered as `wecombot-eba`. + +This is separate from regular WeCom internal applications (`wecom-eba`). WeComBot supports WebSocket long connection mode, which does not require a webhook URL. Webhook mode remains available when `enable-webhook=true`. + +## Configuration + +| Field | Required | Default | Description | +|-------|----------|---------|-------------| +| `BotId` | Yes for WebSocket mode | `""` | WeCom AI Bot ID. | +| `robot_name` | Yes | `""` | Bot display name used to strip bot mentions from incoming group text. | +| `enable-webhook` | Yes | `false` | `false` uses WebSocket long connection mode; `true` uses webhook callback mode. | +| `webhook_url` | No | `""` | Unified webhook URL, only needed when webhook mode is enabled. | +| `Secret` | Yes for WebSocket mode | `""` | WeCom AI Bot secret for long connection mode. | +| `Corpid` | Yes for webhook mode | `""` | WeCom corporate ID for webhook callback mode. | +| `Token` | Yes for webhook mode | `""` | WeCom callback token. | +| `EncodingAESKey` | Yes for webhook mode; optional for WebSocket media decrypt | `""` | Message encryption/decryption key. | +| `enable-stream-reply` | No | `true` | Enables WeComBot streaming replies. | + +## Events + +WeComBot declares these EBA events: + +- `message.received` +- `feedback.received` +- `platform.specific` + +`message.received` covers private and group messages from the WeComBot SDK. `feedback.received` covers WeComBot like/dislike feedback callbacks. Native SDK events without a common EBA equivalent are emitted as `platform.specific`. + +## Common APIs + +| API | Status | Notes | +|-----|--------|-------| +| `send_message` | Supported in WebSocket mode | Sends proactive markdown/text to a person or group chat ID. Webhook mode raises `NotSupportedError` because the platform callback flow has no proactive send path here. | +| `reply_message` | Supported | Replies through native `req_id` in WebSocket mode or stream finalization/cache in webhook mode. | +| `get_message` | Supported from cache | Returns cached inbound `MessageReceivedEvent` by message ID. | +| `get_user_info` | Supported from cache | WeComBot events carry user info; no full user lookup endpoint is declared. | +| `get_friend_list` | Partial | Returns users observed by this adapter instance. | +| `get_group_info` | Supported from cache | Returns groups observed from inbound group messages. | +| `get_group_member_info` | Supported from cache | Returns observed sender/group-member pairs. | +| `get_group_member_list` | Partial | Returns observed members for the cached group only. | +| `call_platform_api` | Supported | See below. | +| `edit_message` / `delete_message` / `forward_message` | Not supported | WeComBot does not expose portable common APIs for these operations in the current SDK wrapper. | +| `upload_file` / `get_file_url` | Not supported as common APIs | Media is represented inside messages; no portable standalone file upload/URL API is declared. | +| moderation / leave APIs | Not supported | WeComBot does not expose equivalent common moderation operations through this adapter. | + +## Platform-Specific APIs + +`call_platform_api(action, params)` supports: + +- `is_websocket_mode` +- `get_stream_session_status` +- `send_markdown` + +`send_markdown` is only available in WebSocket mode. + +## Unit Verification + +Covered by: + +```bash +PYTHONPATH=/Users/wangqiang/code/python/langbot-plugin-sdk/src uv run pytest tests/unit_tests/platform/test_wecombot_eba_adapter.py +``` + +The unit tests cover: + +- Manifest events/APIs/platform actions match adapter declarations. +- Outbound common components flatten to WeComBot markdown/text. +- Private and group native events become `MessageReceivedEvent`. +- Inbound image, file, voice, and quote components map to common `MessageChain`. +- Legacy `FriendMessage`/`GroupMessage` compatibility. +- EBA listener dispatch, message/user/group/member cache, reply, send, streaming chunk, feedback, and platform API calls. + +## Live Probe + +The direct adapter probe is: + +```bash +PYTHONPATH=/absolute/path/to/langbot-plugin-sdk/src uv run python tests/e2e/live_wecombot_eba_probe.py --help +``` + +Default mode is WebSocket long connection and requires: + +- `WECOMBOT_BOT_ID` +- `WECOMBOT_SECRET` +- `WECOMBOT_ROBOT_NAME` +- optional `WECOMBOT_ENCODING_AES_KEY` + +Webhook mode uses `--webhook` and requires: + +- `WECOMBOT_TOKEN` +- `WECOMBOT_ENCODING_AES_KEY` +- `WECOMBOT_CORPID` + +The probe writes JSONL evidence to `data/temp/wecombot_eba_live_probe.jsonl`, waits for a real WeComBot message, records common EBA event fields and message components, then runs safe cached/common/platform API checks. + +## Standalone Runtime Plugin E2E Record + +Verified on May 27, 2026 with `EBAEventProbe`, SDK standalone runtime, LangBot core, and the real WeCom desktop client in a WeCom AI Bot private chat. + +Evidence: + +- JSONL: `data/temp/wecombot_eba_plugin_probe.jsonl` +- Bot UUID: `9f5d4125-7b6d-4c98-8ca2-111111111111` +- Adapter: `wecombot-eba` +- Client: real WeCom desktop client, private `LangBot` BOT chat +- Mode: WebSocket long connection (`enable-webhook=false`) + +Observed and verified: + +- A real user-side message reached the plugin as `MessageReceived` with `adapter_name=wecombot-eba`, common sender/chat fields, and `Source + Plain`. +- SDK API calls succeeded through the standalone runtime: `get_langbot_version`, `get_bots`, `get_bot_info`, `send_message`, plugin/workspace storage, manifest/list APIs, and safe cached common platform APIs. +- Outbound component sweep was visible in the WeCom client and returned `errcode=0`: plain/mention/face fallback, base64 image marker, quote fallback, file marker, and flattened forward fallback. +- Declared WeComBot platform APIs succeeded through `plugin.call_platform_api`: `is_websocket_mode`, `get_stream_session_status`, and `send_markdown`. +- The `send_markdown` platform API produced visible bot output in the WeCom client. + +Not completed: + +- Clicking the visible WeCom AI feedback button did not produce a `FeedbackReceived` JSONL entry in this run, so `feedback.received` remains unverified at plugin E2E level. +- Group chat inbound and group cache/member coverage still need a real group-side trigger. +- Real inbound image/file/voice from the WeCom client was not exercised. + +## Current Acceptance + +Current status is **partial EBA acceptance**. + +Blocked or limited items: + +- `feedback.received` is implemented and unit-covered, but real plugin E2E feedback evidence was not observed from the desktop client click. +- Outbound image/voice/file are flattened as textual markers because the WeComBot SDK reply/proactive path used here is markdown/text oriented. +- Group member APIs are cache-backed and only know members observed in received messages. +- Destructive or moderation APIs are not declared because the current WeComBot protocol surface does not provide safe common equivalents. diff --git a/src/langbot/libs/wecom_ai_bot_api/api.py b/src/langbot/libs/wecom_ai_bot_api/api.py index b6f45cf2..f4e3a988 100644 --- a/src/langbot/libs/wecom_ai_bot_api/api.py +++ b/src/langbot/libs/wecom_ai_bot_api/api.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import base64 import json @@ -7,7 +9,7 @@ import uuid import xml.etree.ElementTree as ET from dataclasses import dataclass, field import re -from typing import Any, Callable, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Optional, Tuple from urllib.parse import unquote import httpx @@ -16,7 +18,9 @@ from quart import Quart, request, Response, jsonify from langbot.libs.wecom_ai_bot_api import wecombotevent from langbot.libs.wecom_ai_bot_api.WXBizMsgCrypt3 import WXBizMsgCrypt -from langbot.pkg.platform.logger import EventLogger + +if TYPE_CHECKING: + from langbot.pkg.platform.logger import EventLogger @dataclass diff --git a/src/langbot/libs/wecom_ai_bot_api/ws_client.py b/src/langbot/libs/wecom_ai_bot_api/ws_client.py index 5125a704..8ad432fd 100644 --- a/src/langbot/libs/wecom_ai_bot_api/ws_client.py +++ b/src/langbot/libs/wecom_ai_bot_api/ws_client.py @@ -15,13 +15,15 @@ import json import secrets import time import traceback -from typing import Any, Callable, Optional +from typing import TYPE_CHECKING, Any, Callable, Optional import aiohttp from langbot.libs.wecom_ai_bot_api import wecombotevent from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession -from langbot.pkg.platform.logger import EventLogger + +if TYPE_CHECKING: + from langbot.pkg.platform.logger import EventLogger DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com' diff --git a/src/langbot/pkg/api/http/service/bot.py b/src/langbot/pkg/api/http/service/bot.py index 332c8ec7..bc003b90 100644 --- a/src/langbot/pkg/api/http/service/bot.py +++ b/src/langbot/pkg/api/http/service/bot.py @@ -5,6 +5,7 @@ import sqlalchemy import typing from ....core import app +from ....discover import engine from ....entity.persistence import bot as persistence_bot from ....entity.persistence import pipeline as persistence_pipeline @@ -17,6 +18,24 @@ class BotService: def __init__(self, ap: app.Application) -> None: self.ap = ap + def _get_adapter_component(self, adapter_name: str) -> engine.Component | None: + """Return the discovered platform adapter component for an adapter name.""" + for component in self.ap.discover.get_components_by_kind('MessagePlatformAdapter'): + if component.metadata.name == adapter_name: + return component + return None + + def _adapter_declares_webhook_url(self, adapter_name: str) -> bool: + """Whether the adapter manifest declares a generated webhook URL config item.""" + component = self._get_adapter_component(adapter_name) + if component is None: + return False + + for config_item in component.spec.get('config', []): + if config_item.get('type') == 'webhook-url': + return True + return False + async def get_bots(self, include_secret: bool = True) -> list[dict]: """获取所有机器人""" result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_bot.Bot)) @@ -58,17 +77,10 @@ class BotService: if runtime_bot is not None: adapter_runtime_values['bot_account_id'] = runtime_bot.adapter.bot_account_id - # Webhook URL for unified webhook adapters (independent of bot running state) - if persistence_bot['adapter'] in [ - 'wecom', - 'wecombot', - 'officialaccount', - 'qqofficial', - 'slack', - 'wecomcs', - 'LINE', - 'lark', - ]: + # Webhook URL for adapters that declare a generated webhook config item. + # This is manifest-driven so EBA adapters do not need to be mirrored in a + # second hard-coded list. + if self._adapter_declares_webhook_url(persistence_bot['adapter']): webhook_prefix = self.ap.instance_config.data['api'].get('webhook_prefix', 'http://127.0.0.1:5300') extra_webhook_prefix = self.ap.instance_config.data['api'].get('extra_webhook_prefix', '') webhook_url = f'/bots/{bot_uuid}' diff --git a/src/langbot/pkg/pipeline/preproc/preproc.py b/src/langbot/pkg/pipeline/preproc/preproc.py index 83ddce89..02793137 100644 --- a/src/langbot/pkg/pipeline/preproc/preproc.py +++ b/src/langbot/pkg/pipeline/preproc/preproc.py @@ -163,13 +163,21 @@ class PreProcessor(stage.PipelineStage): plain_text = '' quote_msg = query.pipeline_config['trigger'].get('misc', '').get('combine-quote-message') + local_agent_without_vision = ( + selected_runner == 'local-agent' + and llm_model + and not llm_model.model_entity.abilities.__contains__('vision') + ) for me in query.message_chain: if isinstance(me, platform_message.Plain): content_list.append(provider_message.ContentElement.from_text(me.text)) plain_text += me.text elif isinstance(me, platform_message.Image): - if selected_runner != 'local-agent' or ( + if local_agent_without_vision: + content_list.append(provider_message.ContentElement.from_text('[Image]')) + plain_text += '[Image]' + elif selected_runner != 'local-agent' or ( llm_model and llm_model.model_entity.abilities.__contains__('vision') ): if me.base64 is not None: @@ -190,7 +198,10 @@ class PreProcessor(stage.PipelineStage): if isinstance(msg, platform_message.Plain): content_list.append(provider_message.ContentElement.from_text(msg.text)) elif isinstance(msg, platform_message.Image): - if selected_runner != 'local-agent' or ( + if local_agent_without_vision: + content_list.append(provider_message.ContentElement.from_text('[Image]')) + plain_text += '[Image]' + elif selected_runner != 'local-agent' or ( llm_model and llm_model.model_entity.abilities.__contains__('vision') ): if msg.base64 is not None: diff --git a/src/langbot/pkg/platform/adapters/wecom/__init__.py b/src/langbot/pkg/platform/adapters/wecom/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/__init__.py @@ -0,0 +1 @@ + diff --git a/src/langbot/pkg/platform/adapters/wecom/adapter.py b/src/langbot/pkg/platform/adapters/wecom/adapter.py new file mode 100644 index 00000000..50fd5ff3 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/adapter.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import asyncio +import traceback +import typing + +import pydantic + +from langbot.libs.wecom_api.api import WecomClient +from langbot.libs.wecom_api.wecomevent import WecomEvent +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger +from langbot.pkg.platform.adapters.wecom.api_impl import WecomAPIMixin +from langbot.pkg.platform.adapters.wecom.event_converter import WecomEventConverter +from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter +from langbot.pkg.platform.adapters.wecom.platform_api import PLATFORM_API_MAP +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class WecomAdapter(WecomAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter): + bot: WecomClient = pydantic.Field(exclude=True) + + message_converter: WecomMessageConverter = WecomMessageConverter() + event_converter: WecomEventConverter = WecomEventConverter() + + config: dict + bot_uuid: str | None = None + listeners: dict[ + typing.Type[platform_events.Event], + typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], + ] = {} + _message_cache: dict[str, platform_events.MessageReceivedEvent] = {} + _user_cache: dict[str, typing.Any] = {} + + class Config: + arbitrary_types_allowed = True + + def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger): + required_keys = [ + 'corpid', + 'secret', + 'token', + 'EncodingAESKey', + ] + missing_keys = [key for key in required_keys if key not in config] + if missing_keys: + raise Exception(f'WeCom missing required config fields: {missing_keys}') + + bot = WecomClient( + corpid=config['corpid'], + secret=config['secret'], + token=config['token'], + EncodingAESKey=config['EncodingAESKey'], + contacts_secret=config.get('contacts_secret', ''), + logger=logger, + unified_mode=True, + api_base_url=config.get('api_base_url', 'https://qyapi.weixin.qq.com/cgi-bin'), + ) + + super().__init__( + config=config, + logger=logger, + bot=bot, + bot_account_id='', + bot_uuid=None, + listeners={}, + _message_cache={}, + _user_cache={}, + ) + self._register_native_handlers() + + def set_bot_uuid(self, bot_uuid: str): + self.bot_uuid = bot_uuid + + def get_supported_events(self) -> list[str]: + return [ + 'message.received', + 'platform.specific', + ] + + def get_supported_apis(self) -> list[str]: + return [ + 'send_message', + 'reply_message', + 'get_message', + 'get_user_info', + 'get_friend_list', + 'call_platform_api', + ] + + async def send_message( + self, + target_type: str, + target_id: str, + message: platform_message.MessageChain, + ) -> platform_events.MessageResult: + if target_type not in ('person', 'private'): + raise NotSupportedError(f'send_message:{target_type}') + + user_id, agent_id = self._parse_target_id(target_id) + content_list = await WecomMessageConverter.yiri2target(message, self.bot) + raw_results = [] + for content in content_list: + raw_results.append(await self._send_content(user_id, agent_id, content)) + return platform_events.MessageResult(raw={'results': raw_results}) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ) -> platform_events.MessageResult: + wecom_event = await WecomEventConverter.yiri2target(message_source) + if not isinstance(wecom_event, WecomEvent): + raise ValueError('WeCom reply_message requires a WecomEvent source object') + content_list = await WecomMessageConverter.yiri2target(message, self.bot) + raw_results = [] + for content in content_list: + raw_results.append(await self._send_content(wecom_event.user_id, int(wecom_event.agent_id), content)) + return platform_events.MessageResult(message_id=wecom_event.message_id, raw={'results': raw_results}) + + async def call_platform_api(self, action: str, params: dict = {}) -> dict: + handler = PLATFORM_API_MAP.get(action) + if handler is None: + raise NotSupportedError(f'call_platform_api:{action}') + return await handler(self.bot, params) + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + self.listeners[event_type] = callback + + def unregister_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + registered = self.listeners.get(event_type) + if registered is callback: + self.listeners.pop(event_type, None) + + async def handle_unified_webhook(self, bot_uuid: str, path: str, request): + return await self.bot.handle_unified_webhook(request) + + async def run_async(self): + async def keep_alive(): + while True: + await asyncio.sleep(1) + + await self.logger.info('WeCom EBA adapter running in unified webhook mode') + await keep_alive() + + async def kill(self) -> bool: + return True + + async def is_muted(self, group_id: int | None = None) -> bool: + return False + + def _register_native_handlers(self): + async def on_message(event: WecomEvent): + await self._handle_native_event(event) + + self.bot.on_message('text')(on_message) + self.bot.on_message('image')(on_message) + + async def _handle_native_event(self, event: WecomEvent): + self.bot_account_id = event.receiver_id or self.bot_account_id + try: + if platform_events.FriendMessage in self.listeners: + legacy_event = await self.event_converter.target2legacy(event, self.bot) + if legacy_event: + callback = self.listeners.get(type(legacy_event)) + if callback: + await callback(legacy_event, self) + + eba_event = await self.event_converter.target2yiri(event, self.bot) + if eba_event: + self._cache_event(eba_event) + await self._dispatch_eba_event(eba_event) + except Exception: + await self.logger.error(f'Error in wecom native event: {traceback.format_exc()}') + + async def _dispatch_eba_event(self, event: platform_events.EBAEvent): + for event_type in (type(event), platform_events.EBAEvent, platform_events.Event): + callback = self.listeners.get(event_type) + if callback: + await callback(event, self) + return + + def _cache_event(self, event: platform_events.Event): + if not isinstance(event, platform_events.MessageReceivedEvent): + return + self._message_cache[str(event.message_id)] = event + self._user_cache[str(event.sender.id)] = event.sender + + async def _send_content(self, user_id: str, agent_id: int, content: dict): + content_type = content.get('type') + if content_type == 'text': + return await self.bot.send_private_msg(user_id, agent_id, content.get('content', '')) + if content_type == 'image': + return await self.bot.send_image(user_id, agent_id, content['media_id']) + if content_type == 'voice': + return await self.bot.send_voice(user_id, agent_id, content['media_id']) + if content_type == 'file': + return await self.bot.send_file(user_id, agent_id, content['media_id']) + raise NotSupportedError(f'send_content:{content_type}') + + @staticmethod + def _parse_target_id(target_id: str) -> tuple[str, int]: + user_id, sep, agent_id = str(target_id).partition('|') + if not user_id or not sep or not agent_id: + raise ValueError('WeCom target_id must be formatted as "user_id|agent_id"') + return user_id, int(agent_id) diff --git a/src/langbot/pkg/platform/adapters/wecom/api_impl.py b/src/langbot/pkg/platform/adapters/wecom/api_impl.py new file mode 100644 index 00000000..07ec2784 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/api_impl.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import typing + +from langbot.libs.wecom_api.api import WecomClient +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class WecomAPIMixin: + bot: WecomClient + _message_cache: dict[str, platform_events.MessageReceivedEvent] + _user_cache: dict[str, platform_entities.User] + + async def get_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> platform_events.MessageReceivedEvent: + event = self._message_cache.get(str(message_id)) + if event is None: + raise NotSupportedError('get_message:message_not_cached') + return event + + async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User: + cached = self._user_cache.get(str(user_id)) + if cached is not None: + return cached + info = await self.bot.get_user_info(str(user_id)) + return platform_entities.User( + id=info.get('userid') or user_id, + nickname=info.get('name') or str(user_id), + username=info.get('alias') or info.get('userid') or None, + ) + + async def get_friend_list(self) -> list[platform_entities.User]: + return list(self._user_cache.values()) + + async def upload_file(self, file_data: bytes, filename: str) -> str: + raise NotSupportedError('upload_file') + + async def get_file_url(self, file_id: str) -> str: + raise NotSupportedError('get_file_url') + + async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup: + raise NotSupportedError('get_group_info') + + async def get_group_member_list( + self, + group_id: typing.Union[int, str], + ) -> list[platform_entities.UserGroupMember]: + raise NotSupportedError('get_group_member_list') + + async def get_group_member_info( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> platform_entities.UserGroupMember: + raise NotSupportedError('get_group_member_info') + + async def edit_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + new_content: platform_message.MessageChain, + ) -> None: + raise NotSupportedError('edit_message') + + async def delete_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> None: + raise NotSupportedError('delete_message') diff --git a/src/langbot/pkg/platform/adapters/wecom/event_converter.py b/src/langbot/pkg/platform/adapters/wecom/event_converter.py new file mode 100644 index 00000000..94c43fd7 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/event_converter.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import typing + +from langbot.libs.wecom_api.api import WecomClient +from langbot.libs.wecom_api.wecomevent import WecomEvent +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter +from langbot.pkg.platform.adapters.wecom.types import ADAPTER_NAME, make_private_chat_id +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events + + +class WecomEventConverter(abstract_platform_adapter.AbstractEventConverter): + @staticmethod + async def yiri2target(event: platform_events.Event) -> WecomEvent | None: + return getattr(event, 'source_platform_object', None) + + @staticmethod + async def target2legacy(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.FriendMessage | None: + eba_event = await WecomEventConverter.target2yiri(event, bot) + if hasattr(eba_event, 'to_legacy_event'): + return eba_event.to_legacy_event() + if event.type in {'text', 'image'} and eba_event is not None: + friend = platform_entities.Friend( + id=f'u{event.user_id}', + nickname=getattr(getattr(eba_event, 'sender', None), 'nickname', str(event.user_id or '')), + remark='', + ) + return platform_events.FriendMessage( + sender=friend, + message_chain=eba_event.message_chain, + time=getattr(eba_event, 'timestamp', None), + source_platform_object=event, + ) + return None + + @staticmethod + async def target2yiri(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.Event | None: + if event.type in {'text', 'image'}: + return await WecomEventConverter.message_to_eba(event, bot) + return WecomEventConverter.platform_specific(event, f'message.{event.detail_type or event.type or "unknown"}') + + @staticmethod + async def message_to_eba(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.MessageReceivedEvent: + if event.type == 'image': + message_chain = await WecomMessageConverter.target2yiri_image(event.picurl, event.message_id) + else: + message_chain = await WecomMessageConverter.target2yiri_text(event.message, event.message_id) + + sender = await WecomEventConverter.user_from_event(event, bot) + return platform_events.MessageReceivedEvent( + type='message.received', + adapter_name=ADAPTER_NAME, + message_id=event.message_id or '', + message_chain=message_chain, + sender=sender, + chat_type=platform_entities.ChatType.PRIVATE, + chat_id=make_private_chat_id(event.user_id, event.agent_id), + group=None, + timestamp=float(event.timestamp or 0), + source_platform_object=event, + ) + + @staticmethod + async def user_from_event(event: WecomEvent, bot: WecomClient | None = None) -> platform_entities.User: + nickname = str(event.user_id or '') + raw: dict[str, typing.Any] = {} + if bot and event.user_id: + try: + raw = await bot.get_user_info(event.user_id) + nickname = raw.get('name') or nickname + except Exception: + raw = {} + + return platform_entities.User( + id=event.user_id or '', + nickname=nickname, + username=raw.get('alias') or raw.get('userid') or None, + ) + + @staticmethod + def platform_specific(event: WecomEvent, action: str) -> platform_events.PlatformSpecificEvent: + return platform_events.PlatformSpecificEvent( + type='platform.specific', + adapter_name=ADAPTER_NAME, + action=action, + data=dict(event), + timestamp=float(event.timestamp or 0), + source_platform_object=event, + ) diff --git a/src/langbot/pkg/platform/adapters/wecom/manifest.yaml b/src/langbot/pkg/platform/adapters/wecom/manifest.yaml new file mode 100644 index 00000000..c3cc57f1 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/manifest.yaml @@ -0,0 +1,117 @@ +apiVersion: v1 +kind: MessagePlatformAdapter + +metadata: + name: wecom-eba + label: + en_US: WeCom (EBA) + zh_Hans: 企业微信 (EBA) + zh_Hant: 企業微信 (EBA) + description: + en_US: WeCom application message adapter (EBA architecture) + zh_Hans: 企业微信内部应用消息适配器(EBA 架构版本) + zh_Hant: 企業微信內部應用訊息適配器(EBA 架構版本) + icon: wecom.png + +spec: + categories: + - popular + - china + help_links: + zh: https://link.langbot.app/zh/platforms/wecom + en: https://link.langbot.app/en/platforms/wecom + ja: https://link.langbot.app/ja/platforms/wecom + config: + - name: webhook_url + label: + en_US: Webhook Callback URL + zh_Hans: Webhook 回调地址 + zh_Hant: Webhook 回調地址 + description: + en_US: Copy this URL and paste it into your WeCom app's webhook configuration + zh_Hans: 复制此地址并粘贴到企业微信应用的 Webhook 配置中 + zh_Hant: 複製此地址並貼到企業微信應用的 Webhook 設定中 + type: webhook-url + required: false + default: "" + - name: corpid + label: + en_US: Corpid + zh_Hans: 企业ID + zh_Hant: 企業ID + type: string + required: true + default: "" + - name: secret + label: + en_US: Secret + zh_Hans: 密钥 (Secret) + zh_Hant: 密鑰 (Secret) + type: string + required: true + default: "" + - name: token + label: + en_US: Token + zh_Hans: 令牌 (Token) + zh_Hant: 令牌 (Token) + type: string + required: true + default: "" + - name: EncodingAESKey + label: + en_US: EncodingAESKey + zh_Hans: 消息加解密密钥 (EncodingAESKey) + zh_Hant: 訊息加解密密鑰 (EncodingAESKey) + type: string + required: true + default: "" + - name: contacts_secret + label: + en_US: Contacts Secret + zh_Hans: 通讯录密钥 + zh_Hant: 通訊錄密鑰 + type: string + required: false + default: "" + - name: api_base_url + label: + en_US: API Base URL + zh_Hans: API 基础 URL + zh_Hant: API 基礎 URL + description: + en_US: Optional WeCom API base URL for private network or reverse proxy deployments. + zh_Hans: 可选,若部署在内网环境并通过反向代理访问企业微信 API,可根据文档填写此项 + zh_Hant: 可選,若部署在內網環境並透過反向代理存取企業微信 API,可根據文件填寫此項 + type: string + required: false + default: "https://qyapi.weixin.qq.com/cgi-bin" + + supported_events: + - message.received + - platform.specific + + supported_apis: + required: + - send_message + - reply_message + optional: + - get_message + - get_user_info + - get_friend_list + - call_platform_api + + platform_specific_apis: + - action: check_access_token + description: { en_US: "Check whether the current WeCom access token is usable", zh_Hans: "检查当前企业微信 access token 是否可用" } + - action: refresh_access_token + description: { en_US: "Refresh the WeCom access token", zh_Hans: "刷新企业微信 access token" } + - action: get_user_info + description: { en_US: "Get WeCom user information by user ID", zh_Hans: "按用户 ID 获取企业微信用户信息" } + - action: send_to_all + description: { en_US: "Send an application text message to all contacts available to the configured contacts secret", zh_Hans: "使用配置的通讯录密钥向可见成员群发应用文本消息" } + +execution: + python: + path: ./adapter.py + attr: WecomAdapter diff --git a/src/langbot/pkg/platform/adapters/wecom/message_converter.py b/src/langbot/pkg/platform/adapters/wecom/message_converter.py new file mode 100644 index 00000000..e742778d --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/message_converter.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import datetime + +from langbot.libs.wecom_api.api import WecomClient +from langbot.pkg.utils import image +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +def split_string_by_bytes(text: str, limit: int = 2048, encoding: str = 'utf-8') -> list[str]: + """Split text without cutting a multi-byte character in half.""" + bytes_data = text.encode(encoding) + total_len = len(bytes_data) + parts: list[str] = [] + start = 0 + + while start < total_len: + end = min(start + limit, total_len) + chunk = bytes_data[start:end] + part = chunk.decode(encoding, errors='ignore') + part_len = len(part.encode(encoding)) + if part_len == 0 and end < total_len: + start += 1 + continue + parts.append(part) + start += part_len + + return parts + + +class WecomMessageConverter(abstract_platform_adapter.AbstractMessageConverter): + @staticmethod + async def yiri2target(message_chain: platform_message.MessageChain, bot: WecomClient) -> list[dict]: + content_list: list[dict] = [] + + for msg in message_chain: + if isinstance(msg, platform_message.Source): + continue + if isinstance(msg, platform_message.Plain): + content_list.extend({'type': 'text', 'content': chunk} for chunk in split_string_by_bytes(msg.text)) + elif isinstance(msg, platform_message.Image): + content_list.append({'type': 'image', 'media_id': await bot.get_media_id(msg)}) + elif isinstance(msg, platform_message.Voice): + content_list.append({'type': 'voice', 'media_id': await bot.get_media_id(msg)}) + elif isinstance(msg, platform_message.File): + content_list.append({'type': 'file', 'media_id': await bot.get_media_id(msg)}) + elif isinstance(msg, platform_message.Forward): + for node in msg.node_list: + content_list.extend(await WecomMessageConverter.yiri2target(node.message_chain, bot)) + elif isinstance(msg, platform_message.Quote): + if msg.id is not None: + content_list.append({'type': 'text', 'content': f'[Quote {msg.id}] '}) + if msg.origin: + content_list.extend(await WecomMessageConverter.yiri2target(msg.origin, bot)) + elif isinstance(msg, platform_message.At): + content_list.append({'type': 'text', 'content': f'@{msg.display or msg.target}'}) + elif isinstance(msg, platform_message.AtAll): + content_list.append({'type': 'text', 'content': '@all'}) + else: + content_list.append({'type': 'text', 'content': str(msg)}) + + return content_list + + @staticmethod + async def target2yiri_text(message: str | None, message_id: int | str | None = -1) -> platform_message.MessageChain: + return platform_message.MessageChain( + [ + platform_message.Source(id=message_id, time=datetime.datetime.now()), + platform_message.Plain(text=message or ''), + ] + ) + + @staticmethod + async def target2yiri_image(picurl: str, message_id: int | str | None = -1) -> platform_message.MessageChain: + image_base64, image_format = await image.get_wecom_image_base64(pic_url=picurl) + return platform_message.MessageChain( + [ + platform_message.Source(id=message_id, time=datetime.datetime.now()), + platform_message.Image(base64=f'data:image/{image_format};base64,{image_base64}'), + ] + ) diff --git a/src/langbot/pkg/platform/adapters/wecom/platform_api.py b/src/langbot/pkg/platform/adapters/wecom/platform_api.py new file mode 100644 index 00000000..15c937c4 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/platform_api.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import typing + +from langbot.libs.wecom_api.api import WecomClient + + +async def check_access_token(bot: WecomClient, params: dict) -> dict: + return {'valid': await bot.check_access_token()} + + +async def refresh_access_token(bot: WecomClient, params: dict) -> dict: + bot.access_token = await bot.get_access_token(bot.secret) + return {'ok': bool(bot.access_token)} + + +async def get_user_info(bot: WecomClient, params: dict) -> dict: + user_id = params.get('user_id') or params.get('userid') + if not user_id: + raise ValueError('user_id is required') + return await bot.get_user_info(str(user_id)) + + +async def send_to_all(bot: WecomClient, params: dict) -> dict: + content = params.get('content') + agent_id = params.get('agent_id') or params.get('agentid') + if not content: + raise ValueError('content is required') + if agent_id is None: + raise ValueError('agent_id is required') + await bot.send_to_all(str(content), int(agent_id)) + return {'ok': True} + + +PLATFORM_API_MAP: dict[str, typing.Callable[[WecomClient, dict], typing.Awaitable[dict]]] = { + 'check_access_token': check_access_token, + 'refresh_access_token': refresh_access_token, + 'get_user_info': get_user_info, + 'send_to_all': send_to_all, +} diff --git a/src/langbot/pkg/platform/adapters/wecom/types.py b/src/langbot/pkg/platform/adapters/wecom/types.py new file mode 100644 index 00000000..596459e0 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecom/types.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +ADAPTER_NAME = 'wecom-eba' + + +def make_private_chat_id(user_id: str | int | None, agent_id: str | int | None) -> str: + """Build the routable private chat id used by the WeCom EBA adapter.""" + user = str(user_id or '') + agent = str(agent_id or '') + if not user or not agent: + return user + return f'{user}|{agent}' diff --git a/src/langbot/pkg/platform/adapters/wecom/wecom.png b/src/langbot/pkg/platform/adapters/wecom/wecom.png new file mode 100644 index 00000000..8588c20d Binary files /dev/null and b/src/langbot/pkg/platform/adapters/wecom/wecom.png differ diff --git a/src/langbot/pkg/platform/adapters/wecombot/__init__.py b/src/langbot/pkg/platform/adapters/wecombot/__init__.py new file mode 100644 index 00000000..9d48db4f --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/src/langbot/pkg/platform/adapters/wecombot/adapter.py b/src/langbot/pkg/platform/adapters/wecombot/adapter.py new file mode 100644 index 00000000..0ba7864c --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/adapter.py @@ -0,0 +1,282 @@ +from __future__ import annotations + +import asyncio +import time +import traceback +import typing + +import pydantic + +from langbot.libs.wecom_ai_bot_api.api import WecomBotClient +from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent +from langbot.libs.wecom_ai_bot_api.ws_client import WecomBotWsClient +from langbot.pkg.platform.adapters.wecombot.api_impl import WecomBotAPIMixin +from langbot.pkg.platform.adapters.wecombot.event_converter import WecomBotEventConverter +from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter +from langbot.pkg.platform.adapters.wecombot.platform_api import PLATFORM_API_MAP +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class WecomBotAdapter(WecomBotAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter): + bot: typing.Any = pydantic.Field(exclude=True) + + message_converter: WecomBotMessageConverter = WecomBotMessageConverter() + event_converter: WecomBotEventConverter + + config: dict + bot_uuid: str | None = None + bot_name: str = '' + listeners: dict[ + typing.Type[platform_events.Event], + typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], + ] = {} + _message_cache: dict[str, platform_events.MessageReceivedEvent] = {} + _user_cache: dict[str, platform_entities.User] = {} + _group_cache: dict[str, platform_entities.UserGroup] = {} + _member_cache: dict[tuple[str, str], platform_entities.UserGroupMember] = {} + _stream_to_monitoring_msg: dict[str, tuple[str, float]] = {} + _STREAM_MAPPING_TTL: int = 600 + + class Config: + arbitrary_types_allowed = True + + def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger): + enable_webhook = config.get('enable-webhook', False) + bot_name = config.get('robot_name', '') + if not enable_webhook: + required_keys = ['BotId', 'Secret'] + missing_keys = [key for key in required_keys if not config.get(key)] + if missing_keys: + raise Exception(f'WeComBot WebSocket mode missing config: {missing_keys}') + bot = WecomBotWsClient( + bot_id=config['BotId'], + secret=config['Secret'], + logger=logger, + encoding_aes_key=config.get('EncodingAESKey', ''), + ) + else: + required_keys = ['Token', 'EncodingAESKey', 'Corpid'] + missing_keys = [key for key in required_keys if not config.get(key)] + if missing_keys: + raise Exception(f'WeComBot webhook mode missing config: {missing_keys}') + bot = WecomBotClient( + Token=config['Token'], + EnCodingAESKey=config['EncodingAESKey'], + Corpid=config['Corpid'], + logger=logger, + unified_mode=True, + ) + + super().__init__( + config=config, + logger=logger, + bot=bot, + bot_account_id=config.get('BotId', ''), + bot_uuid=None, + bot_name=bot_name, + event_converter=WecomBotEventConverter(bot_name=bot_name), + listeners={}, + _message_cache={}, + _user_cache={}, + _group_cache={}, + _member_cache={}, + _stream_to_monitoring_msg={}, + ) + self._register_native_handlers() + + def set_bot_uuid(self, bot_uuid: str): + self.bot_uuid = bot_uuid + + def get_supported_events(self) -> list[str]: + return [ + 'message.received', + 'feedback.received', + 'platform.specific', + ] + + def get_supported_apis(self) -> list[str]: + return [ + 'send_message', + 'reply_message', + 'get_message', + 'get_user_info', + 'get_friend_list', + 'get_group_info', + 'get_group_member_info', + 'get_group_member_list', + 'call_platform_api', + ] + + async def send_message( + self, + target_type: str, + target_id: str, + message: platform_message.MessageChain, + ) -> platform_events.MessageResult: + if self.config.get('enable-webhook', False): + raise NotSupportedError('send_message:webhook_mode') + if target_type not in ('person', 'private', 'group'): + raise NotSupportedError(f'send_message:{target_type}') + content = await WecomBotMessageConverter.yiri2target(message) + raw = await self.bot.send_message(str(target_id), content) + return platform_events.MessageResult(raw={'result': raw}) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ) -> platform_events.MessageResult: + event = await WecomBotEventConverter.yiri2target(message_source) + if not isinstance(event, WecomBotEvent): + raise ValueError('WeComBot reply_message requires a WecomBotEvent source object') + content = await WecomBotMessageConverter.yiri2target(message) + if not self.config.get('enable-webhook', False) and event.get('req_id'): + raw = await self.bot.reply_text(event.get('req_id'), content) + else: + raw = await self.bot.set_message(event.message_id, content) + return platform_events.MessageResult(message_id=event.message_id, raw={'result': raw}) + + async def reply_message_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message, + message: platform_message.MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ) -> dict: + event = await WecomBotEventConverter.yiri2target(message_source) + if not isinstance(event, WecomBotEvent): + raise ValueError('WeComBot reply_message_chunk requires a WecomBotEvent source object') + content = await WecomBotMessageConverter.yiri2target(message) + success = await self.bot.push_stream_chunk(event.message_id, content, is_final=is_final) + if not success and is_final and not self.config.get('enable-webhook', False) and event.get('req_id'): + await self.bot.reply_text(event.get('req_id'), content) + return {'stream': success} + + async def is_stream_output_supported(self) -> bool: + return self.config.get('enable-stream-reply', True) + + async def call_platform_api(self, action: str, params: dict = {}) -> dict: + handler = PLATFORM_API_MAP.get(action) + if handler is None: + raise NotSupportedError(f'call_platform_api:{action}') + return await handler(self.bot, params) + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + self.listeners[event_type] = callback + + def unregister_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + registered = self.listeners.get(event_type) + if registered is callback: + self.listeners.pop(event_type, None) + + async def handle_unified_webhook(self, bot_uuid: str, path: str, request): + if not self.config.get('enable-webhook', False): + return None + return await self.bot.handle_unified_webhook(request) + + async def run_async(self): + if not self.config.get('enable-webhook', False): + await self.bot.connect() + return + + async def keep_alive(): + while True: + await asyncio.sleep(1) + + await self.logger.info('WeComBot EBA adapter running in unified webhook mode') + await keep_alive() + + async def kill(self) -> bool: + if not self.config.get('enable-webhook', False): + await self.bot.disconnect() + return True + + async def is_muted(self, group_id: int | None = None) -> bool: + return False + + async def on_monitoring_message_created(self, query, monitoring_message_id: str): + try: + stream_id = query.message_event.source_platform_object.stream_id + if stream_id: + self._stream_to_monitoring_msg[stream_id] = (monitoring_message_id, time.time()) + self._cleanup_stream_mapping() + except Exception as e: + await self.logger.debug(f'Failed to map stream_id to monitoring message: {e}') + + def _register_native_handlers(self): + self.bot.on_message('single')(self._handle_native_event) + self.bot.on_message('group')(self._handle_native_event) + if hasattr(self.bot, 'on_feedback'): + self.bot.on_feedback()(self._handle_feedback) + if hasattr(self.bot, 'on_message'): + self.bot.on_message('event')(self._handle_native_event) + + async def _handle_native_event(self, event: WecomBotEvent): + try: + if platform_events.FriendMessage in self.listeners or platform_events.GroupMessage in self.listeners: + legacy_event = await self.event_converter.target2legacy(event) + if legacy_event and type(legacy_event) in self.listeners: + await self.listeners[type(legacy_event)](legacy_event, self) + + eba_event = await self.event_converter.target2yiri(event) + if eba_event: + self._cache_event(eba_event) + await self._dispatch_eba_event(eba_event) + except Exception: + await self.logger.error(f'Error in wecombot native event: {traceback.format_exc()}') + + async def _handle_feedback(self, **kwargs): + try: + event = WecomBotEventConverter.feedback_to_eba(**kwargs) + if event.stream_id and event.stream_id in self._stream_to_monitoring_msg: + monitoring_msg_id, _ = self._stream_to_monitoring_msg[event.stream_id] + event.stream_id = monitoring_msg_id + await self._dispatch_eba_event(event) + except Exception: + await self.logger.error(f'Error in wecombot feedback event: {traceback.format_exc()}') + + async def _dispatch_eba_event(self, event: platform_events.EBAEvent): + for event_type in (type(event), platform_events.EBAEvent, platform_events.Event): + callback = self.listeners.get(event_type) + if callback: + await callback(event, self) + return + + def _cache_event(self, event: platform_events.Event): + if not isinstance(event, platform_events.MessageReceivedEvent): + return + self._message_cache[str(event.message_id)] = event + self._user_cache[str(event.sender.id)] = event.sender + if event.group: + self._group_cache[str(event.group.id)] = event.group + self._member_cache[(str(event.group.id), str(event.sender.id))] = platform_entities.UserGroupMember( + user=event.sender, + group_id=event.group.id, + role=platform_entities.MemberRole.MEMBER, + display_name=event.sender.nickname, + ) + + def _cleanup_stream_mapping(self): + now = time.time() + expired = [key for key, (_, ts) in self._stream_to_monitoring_msg.items() if now - ts > self._STREAM_MAPPING_TTL] + for key in expired: + del self._stream_to_monitoring_msg[key] diff --git a/src/langbot/pkg/platform/adapters/wecombot/api_impl.py b/src/langbot/pkg/platform/adapters/wecombot/api_impl.py new file mode 100644 index 00000000..d2255cf8 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/api_impl.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import typing + +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class WecomBotAPIMixin: + _message_cache: dict[str, platform_events.MessageReceivedEvent] + _user_cache: dict[str, platform_entities.User] + _group_cache: dict[str, platform_entities.UserGroup] + _member_cache: dict[tuple[str, str], platform_entities.UserGroupMember] + + async def get_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> platform_events.MessageReceivedEvent: + event = self._message_cache.get(str(message_id)) + if event is None: + raise NotSupportedError('get_message:message_not_cached') + return event + + async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User: + cached = self._user_cache.get(str(user_id)) + if cached is None: + raise NotSupportedError('get_user_info:not_cached') + return cached + + async def get_friend_list(self) -> list[platform_entities.User]: + return list(self._user_cache.values()) + + async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup: + cached = self._group_cache.get(str(group_id)) + if cached is None: + raise NotSupportedError('get_group_info:not_cached') + return cached + + async def get_group_member_info( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> platform_entities.UserGroupMember: + cached = self._member_cache.get((str(group_id), str(user_id))) + if cached is None: + raise NotSupportedError('get_group_member_info:not_cached') + return cached + + async def get_group_member_list( + self, + group_id: typing.Union[int, str], + ) -> list[platform_entities.UserGroupMember]: + return [member for (cached_group_id, _), member in self._member_cache.items() if cached_group_id == str(group_id)] + + async def upload_file(self, file_data: bytes, filename: str) -> str: + raise NotSupportedError('upload_file') + + async def get_file_url(self, file_id: str) -> str: + raise NotSupportedError('get_file_url') + + async def edit_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + new_content: platform_message.MessageChain, + ) -> None: + raise NotSupportedError('edit_message') + + async def delete_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> None: + raise NotSupportedError('delete_message') + + async def forward_message( + self, + from_chat_type: str, + from_chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + to_chat_type: str, + to_chat_id: typing.Union[int, str], + ) -> platform_events.MessageResult: + raise NotSupportedError('forward_message') + + async def mute_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str], duration: int = 0): + raise NotSupportedError('mute_member') + + async def unmute_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str]): + raise NotSupportedError('unmute_member') + + async def kick_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str]): + raise NotSupportedError('kick_member') + + async def leave_group(self, group_id: typing.Union[int, str]): + raise NotSupportedError('leave_group') diff --git a/src/langbot/pkg/platform/adapters/wecombot/event_converter.py b/src/langbot/pkg/platform/adapters/wecombot/event_converter.py new file mode 100644 index 00000000..5b94ce15 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/event_converter.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +import time +import typing + +from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent +from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter +from langbot.pkg.platform.adapters.wecombot.types import ADAPTER_NAME +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events + + +class WecomBotEventConverter(abstract_platform_adapter.AbstractEventConverter): + def __init__(self, bot_name: str = ''): + self.bot_name = bot_name + + @staticmethod + async def yiri2target(event: platform_events.Event) -> typing.Any: + return getattr(event, 'source_platform_object', None) + + async def target2legacy(self, event: WecomBotEvent) -> platform_events.FriendMessage | platform_events.GroupMessage | None: + eba_event = await self.target2yiri(event) + if not isinstance(eba_event, platform_events.MessageReceivedEvent): + return None + if eba_event.chat_type == platform_entities.ChatType.PRIVATE: + return platform_events.FriendMessage( + sender=platform_entities.Friend(id=eba_event.sender.id, nickname=eba_event.sender.nickname, remark=''), + message_chain=eba_event.message_chain, + time=eba_event.timestamp, + source_platform_object=event, + ) + return platform_events.GroupMessage( + sender=platform_entities.GroupMember( + id=eba_event.sender.id, + permission='MEMBER', + member_name=eba_event.sender.nickname, + group=platform_entities.Group( + id=eba_event.group.id if eba_event.group else eba_event.chat_id, + name=eba_event.group.name if eba_event.group else '', + permission=platform_entities.Permission.Member, + ), + special_title='', + ), + message_chain=eba_event.message_chain, + time=eba_event.timestamp, + source_platform_object=event, + ) + + async def target2yiri(self, event: WecomBotEvent) -> platform_events.Event: + if event.type in {'single', 'group'} and event.msgtype != 'event': + return await self.message_to_eba(event) + return self.platform_specific(event, f'wecombot.{event.get("eventtype") or event.msgtype or event.type or "unknown"}') + + async def message_to_eba(self, event: WecomBotEvent) -> platform_events.MessageReceivedEvent: + sender = platform_entities.User(id=event.userid, nickname=event.username or event.userid) + group = None + chat_type = platform_entities.ChatType.PRIVATE + chat_id = event.userid + if event.type == 'group': + chat_type = platform_entities.ChatType.GROUP + chat_id = str(event.chatid) + group = platform_entities.UserGroup(id=str(event.chatid), name=event.chatname or str(event.chatid)) + + return platform_events.MessageReceivedEvent( + type='message.received', + adapter_name=ADAPTER_NAME, + message_id=event.message_id or '', + message_chain=await WecomBotMessageConverter.target2yiri(event, self.bot_name), + sender=sender, + chat_type=chat_type, + chat_id=chat_id or '', + group=group, + timestamp=time.time(), + source_platform_object=event, + ) + + @staticmethod + def feedback_to_eba( + *, + feedback_id: str, + feedback_type: int, + feedback_content: str | None = None, + inaccurate_reasons: list | None = None, + session=None, + ) -> platform_events.FeedbackReceivedEvent: + session_id = None + user_id = None + message_id = None + stream_id = None + if session: + if getattr(session, 'chat_id', None): + session_id = f'group_{session.chat_id}' + elif getattr(session, 'user_id', None): + session_id = f'person_{session.user_id}' + user_id = getattr(session, 'user_id', None) + message_id = getattr(session, 'msg_id', None) + stream_id = getattr(session, 'stream_id', None) + + return platform_events.FeedbackReceivedEvent( + type='feedback.received', + adapter_name=ADAPTER_NAME, + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content, + inaccurate_reasons=[str(reason) for reason in (inaccurate_reasons or [])] or None, + user_id=user_id, + session_id=session_id, + message_id=message_id, + stream_id=stream_id, + timestamp=time.time(), + source_platform_object=session, + ) + + @staticmethod + def platform_specific(event: WecomBotEvent, action: str) -> platform_events.PlatformSpecificEvent: + return platform_events.PlatformSpecificEvent( + type='platform.specific', + adapter_name=ADAPTER_NAME, + action=action, + data=dict(event), + timestamp=time.time(), + source_platform_object=event, + ) diff --git a/src/langbot/pkg/platform/adapters/wecombot/manifest.yaml b/src/langbot/pkg/platform/adapters/wecombot/manifest.yaml new file mode 100644 index 00000000..bb812ea9 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/manifest.yaml @@ -0,0 +1,158 @@ +apiVersion: v1 +kind: MessagePlatformAdapter + +metadata: + name: wecombot-eba + label: + en_US: WeComBot (EBA) + zh_Hans: 企业微信智能机器人 (EBA) + zh_Hant: 企業微信智慧機器人 (EBA) + description: + en_US: WeCom AI Bot adapter with Event-Based Agents support + zh_Hans: 企业微信智能机器人适配器(EBA 架构版本),支持长连接和 Webhook 两种接入方式 + zh_Hant: 企業微信智慧機器人適配器(EBA 架構版本),支援長連線和 Webhook 兩種接入方式 + icon: wecombot.png + +spec: + categories: + - china + help_links: + zh: https://link.langbot.app/zh/platforms/wecombot + en: https://link.langbot.app/en/platforms/wecombot + ja: https://link.langbot.app/ja/platforms/wecombot + config: + - name: BotId + label: + en_US: BotId + zh_Hans: 机器人ID (BotId) + zh_Hant: 機器人ID (BotId) + type: string + required: true + default: "" + - name: robot_name + label: + en_US: Robot Name + zh_Hans: 机器人名称 + zh_Hant: 機器人名稱 + type: string + required: true + default: "" + - name: enable-webhook + label: + en_US: Enable Webhook Mode + zh_Hans: 启用 Webhook 模式 + zh_Hant: 啟用 Webhook 模式 + description: + en_US: If enabled, the bot will use webhook mode. Otherwise it uses WebSocket long connection mode and does not need a webhook URL. + zh_Hans: 如果启用,机器人将使用 Webhook 模式;否则使用 WebSocket 长连接模式,不需要配置 webhook URL。 + zh_Hant: 如果啟用,機器人將使用 Webhook 模式;否則使用 WebSocket 長連線模式,不需要設定 webhook URL。 + type: boolean + required: true + default: false + - name: webhook_url + label: + en_US: Webhook Callback URL + zh_Hans: Webhook 回调地址 + zh_Hant: Webhook 回調地址 + description: + en_US: Copy this URL into WeComBot callback settings only when webhook mode is enabled. + zh_Hans: 仅在启用 Webhook 模式时复制此地址到企业微信智能机器人回调配置中。 + zh_Hant: 僅在啟用 Webhook 模式時複製此地址到企業微信智慧機器人回調設定中。 + type: webhook-url + required: false + default: "" + show_if: + field: enable-webhook + operator: eq + value: true + - name: Secret + label: + en_US: Secret + zh_Hans: 机器人密钥 (Secret) + zh_Hant: 機器人密鑰 (Secret) + description: + en_US: Required for WebSocket long connection mode. + zh_Hans: 使用 WebSocket 长连接模式时必填。 + zh_Hant: 使用 WebSocket 長連線模式時必填。 + type: string + required: false + default: "" + - name: Corpid + label: + en_US: Corpid + zh_Hans: 企业ID + zh_Hant: 企業ID + description: + en_US: Required for webhook mode. + zh_Hans: 使用 Webhook 模式时必填。 + zh_Hant: 使用 Webhook 模式時必填。 + type: string + required: false + default: "" + - name: Token + label: + en_US: Token + zh_Hans: 令牌 (Token) + zh_Hant: 令牌 (Token) + description: + en_US: Required for webhook mode. + zh_Hans: 使用 Webhook 模式时必填。 + zh_Hant: 使用 Webhook 模式時必填。 + type: string + required: false + default: "" + - name: EncodingAESKey + label: + en_US: EncodingAESKey + zh_Hans: 消息加解密密钥 (EncodingAESKey) + zh_Hant: 訊息加解密密鑰 (EncodingAESKey) + description: + en_US: Required for webhook mode. Optional for WebSocket mode when encrypted files need to be decrypted. + zh_Hans: Webhook 模式必填。WebSocket 模式下如需解密文件则填写。 + zh_Hant: Webhook 模式必填。WebSocket 模式下如需解密檔案則填寫。 + type: string + required: false + default: "" + - name: enable-stream-reply + label: + en_US: Enable Stream Reply + zh_Hans: 启用流式回复 + zh_Hant: 啟用串流回覆 + description: + en_US: If enabled, the bot will use WeComBot streaming replies. + zh_Hans: 如果启用,机器人将使用企业微信智能机器人流式回复。 + zh_Hant: 如果啟用,機器人將使用企業微信智慧機器人串流回覆。 + type: boolean + required: false + default: true + + supported_events: + - message.received + - feedback.received + - platform.specific + + supported_apis: + required: + - send_message + - reply_message + optional: + - get_message + - get_user_info + - get_friend_list + - get_group_info + - get_group_member_info + - get_group_member_list + - call_platform_api + + platform_specific_apis: + - action: is_websocket_mode + description: { en_US: "Return whether the adapter is using WebSocket long connection mode", zh_Hans: "返回当前适配器是否使用 WebSocket 长连接模式" } + - action: get_stream_session_status + description: { en_US: "Inspect stream session state for a received message ID", zh_Hans: "按接收消息 ID 查看流式会话状态" } + - action: send_markdown + description: { en_US: "Send markdown text proactively in WebSocket mode", zh_Hans: "在 WebSocket 模式下主动发送 Markdown 文本" } + +execution: + python: + path: ./adapter.py + attr: WecomBotAdapter diff --git a/src/langbot/pkg/platform/adapters/wecombot/message_converter.py b/src/langbot/pkg/platform/adapters/wecombot/message_converter.py new file mode 100644 index 00000000..dae78b0a --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/message_converter.py @@ -0,0 +1,161 @@ +from __future__ import annotations + +import datetime + +from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class WecomBotMessageConverter(abstract_platform_adapter.AbstractMessageConverter): + @staticmethod + async def yiri2target(message_chain: platform_message.MessageChain) -> str: + content_parts: list[str] = [] + for msg in message_chain: + if isinstance(msg, platform_message.Source): + continue + if isinstance(msg, platform_message.Plain): + content_parts.append(msg.text) + elif isinstance(msg, platform_message.At): + content_parts.append(f'@{msg.display or msg.target}') + elif isinstance(msg, platform_message.AtAll): + content_parts.append('@all') + elif isinstance(msg, platform_message.Image): + content_parts.append('[Image]') + elif isinstance(msg, platform_message.Voice): + content_parts.append('[Voice]') + elif isinstance(msg, platform_message.File): + content_parts.append(f'[File: {msg.name or msg.file_id or msg.url or "file"}]') + elif isinstance(msg, platform_message.Quote): + if msg.id is not None: + content_parts.append(f'[Quote {msg.id}]') + if msg.origin: + content_parts.append(await WecomBotMessageConverter.yiri2target(msg.origin)) + elif isinstance(msg, platform_message.Forward): + for node in msg.node_list: + if node.message_chain: + content_parts.append(await WecomBotMessageConverter.yiri2target(node.message_chain)) + else: + content_parts.append(str(msg)) + return '\n'.join(part for part in content_parts if part) + + @staticmethod + async def target2yiri(event: WecomBotEvent, bot_name: str = '') -> platform_message.MessageChain: + components: list[platform_message.MessageComponent] = [ + platform_message.Source(id=event.message_id, time=datetime.datetime.now()), + ] + if event.type == 'group' and event.ai_bot_id: + components.append(platform_message.At(target=event.ai_bot_id)) + + if event.content: + content = event.content + if bot_name: + content = content.replace(f'@{bot_name}', '').strip() + if content: + components.append(platform_message.Plain(text=content)) + + WecomBotMessageConverter._append_images(components, event.images or ([event.picurl] if event.picurl else [])) + WecomBotMessageConverter._append_file(components, event.file) + WecomBotMessageConverter._append_voice(components, event.voice) + WecomBotMessageConverter._append_video(components, event.video) + WecomBotMessageConverter._append_link(components, event.link) + WecomBotMessageConverter._append_quote(components, event.quote) + + if not any(not isinstance(component, (platform_message.Source, platform_message.At)) for component in components): + components.append(platform_message.Unknown(text=f'[unsupported wecombot msgtype: {event.msgtype or "unknown"}]')) + + return platform_message.MessageChain(components) + + @staticmethod + def _append_images(components: list[platform_message.MessageComponent], images: list[str]): + for image_data in images: + if image_data: + components.append(platform_message.Image(base64=image_data)) + + @staticmethod + def _append_file(components: list[platform_message.MessageComponent], file_info: dict | None): + if not file_info: + return + file_url = file_info.get('download_url') or file_info.get('url') or file_info.get('fileurl') or file_info.get('path') + file_base64 = file_info.get('base64') + file_name = file_info.get('filename') or file_info.get('name') + file_size = file_info.get('filesize') or file_info.get('size') + try: + kwargs = {} + if file_url: + kwargs['url'] = file_url + if file_base64: + kwargs['base64'] = file_base64 + if file_name: + kwargs['name'] = file_name + if file_size is not None: + kwargs['size'] = file_size + if kwargs: + components.append(platform_message.File(**kwargs)) + except Exception: + components.append(platform_message.Unknown(text='[file message unsupported]')) + + @staticmethod + def _append_voice(components: list[platform_message.MessageComponent], voice_info: dict | None): + if not voice_info: + return + voice_payload = voice_info.get('base64') or voice_info.get('url') + if not voice_payload: + return + if voice_info.get('base64') and not voice_payload.startswith('data:'): + voice_payload = f'data:audio/mpeg;base64,{voice_info.get("base64")}' + try: + if voice_payload.startswith('data:'): + components.append(platform_message.Voice(base64=voice_payload)) + else: + components.append(platform_message.Voice(url=voice_payload)) + except Exception: + components.append(platform_message.Unknown(text='[voice message unsupported]')) + + @staticmethod + def _append_video(components: list[platform_message.MessageComponent], video_info: dict | None): + if not video_info: + return + video_payload = ( + video_info.get('base64') + or video_info.get('url') + or video_info.get('download_url') + or video_info.get('fileurl') + ) + if not video_payload: + return + try: + components.append( + platform_message.File( + url=video_payload, + name=video_info.get('filename') or video_info.get('name') or 'video', + size=video_info.get('filesize') or video_info.get('size'), + ) + ) + except Exception: + components.append(platform_message.Unknown(text='[video message unsupported]')) + + @staticmethod + def _append_link(components: list[platform_message.MessageComponent], link: dict | None, prefix: str = ''): + if not link: + return + summary = '\n'.join( + filter(None, [link.get('title', ''), link.get('description') or link.get('digest', ''), link.get('url', '')]) + ) + if summary: + components.append(platform_message.Plain(text=f'{prefix}{summary}')) + + @staticmethod + def _append_quote(components: list[platform_message.MessageComponent], quote_info: dict | None): + if not quote_info: + return + origin: list[platform_message.MessageComponent] = [] + if quote_info.get('content'): + origin.append(platform_message.Plain(text=quote_info.get('content'))) + WecomBotMessageConverter._append_images(origin, quote_info.get('images') or ([quote_info.get('picurl')] if quote_info.get('picurl') else [])) + WecomBotMessageConverter._append_file(origin, quote_info.get('file')) + WecomBotMessageConverter._append_voice(origin, quote_info.get('voice')) + WecomBotMessageConverter._append_video(origin, quote_info.get('video')) + WecomBotMessageConverter._append_link(origin, quote_info.get('link')) + if origin: + components.append(platform_message.Quote(origin=platform_message.MessageChain(origin))) diff --git a/src/langbot/pkg/platform/adapters/wecombot/platform_api.py b/src/langbot/pkg/platform/adapters/wecombot/platform_api.py new file mode 100644 index 00000000..73f03cc1 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/platform_api.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import typing + + +async def is_websocket_mode(bot, params: dict) -> dict: + return {'websocket': hasattr(bot, 'bot_id') and not hasattr(bot, 'Token')} + + +async def get_stream_session_status(bot, params: dict) -> dict: + msg_id = str(params.get('message_id') or params.get('msg_id') or '') + if not msg_id: + raise ValueError('message_id is required') + if hasattr(bot, 'stream_sessions'): + stream_id = bot.stream_sessions.get_stream_id_by_msg(msg_id) + session = bot.stream_sessions.get_session(stream_id) if stream_id else None + return {'stream_id': stream_id, 'active': session is not None} + stream_key = getattr(bot, '_stream_ids', {}).get(msg_id) + if not stream_key: + return {'stream_id': None, 'active': False} + _req_id, _sep, stream_id = stream_key.partition('|') + return {'stream_id': stream_id, 'active': True} + + +async def send_markdown(bot, params: dict) -> dict: + chat_id = params.get('chat_id') or params.get('chatid') or params.get('target_id') + content = params.get('content') + if not chat_id: + raise ValueError('chat_id is required') + if not content: + raise ValueError('content is required') + if not hasattr(bot, 'send_message'): + raise ValueError('send_markdown is only available in WebSocket mode') + result = await bot.send_message(str(chat_id), str(content), msgtype='markdown') + return {'ok': True, 'raw': result} + + +PLATFORM_API_MAP: dict[str, typing.Callable[[typing.Any, dict], typing.Awaitable[dict]]] = { + 'is_websocket_mode': is_websocket_mode, + 'get_stream_session_status': get_stream_session_status, + 'send_markdown': send_markdown, +} diff --git a/src/langbot/pkg/platform/adapters/wecombot/types.py b/src/langbot/pkg/platform/adapters/wecombot/types.py new file mode 100644 index 00000000..8e0d98df --- /dev/null +++ b/src/langbot/pkg/platform/adapters/wecombot/types.py @@ -0,0 +1,3 @@ +from __future__ import annotations + +ADAPTER_NAME = 'wecombot-eba' diff --git a/src/langbot/pkg/platform/adapters/wecombot/wecombot.png b/src/langbot/pkg/platform/adapters/wecombot/wecombot.png new file mode 100644 index 00000000..0734efaf Binary files /dev/null and b/src/langbot/pkg/platform/adapters/wecombot/wecombot.png differ diff --git a/tests/e2e/live_wecom_eba_probe.py b/tests/e2e/live_wecom_eba_probe.py new file mode 100644 index 00000000..2801e1fc --- /dev/null +++ b/tests/e2e/live_wecom_eba_probe.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +import os +from pathlib import Path +from typing import Any + +from quart import Quart, request + +from langbot.pkg.platform.adapters.wecom.adapter import WecomAdapter +from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +TINY_PNG = ( + 'data:image/png;base64,' + 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII=' +) + + +class ProbeLogger(AbstractEventLogger): + async def info(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[info] {text}') + + async def debug(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[debug] {text}') + + async def warning(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[warning] {text}') + + async def error(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[error] {text}') + + +def redact(value: Any) -> Any: + if isinstance(value, dict): + redacted = {} + for key, item in value.items(): + if key.lower() in {'secret', 'token', 'encodingaeskey', 'access_token'}: + redacted[key] = '' + else: + redacted[key] = redact(item) + return redacted + if isinstance(value, list): + return [redact(item) for item in value] + return value + + +def summarize_event(event: platform_events.EBAEvent) -> dict: + data = { + 'type': event.type, + 'adapter_name': event.adapter_name, + 'timestamp': event.timestamp, + } + for field in ('message_id', 'chat_id', 'chat_type', 'action', 'data'): + if hasattr(event, field): + value = getattr(event, field) + if hasattr(value, 'value'): + value = value.value + data[field] = redact(value) + if hasattr(event, 'sender') and event.sender is not None: + data['sender'] = event.sender.model_dump() + if hasattr(event, 'message_chain') and event.message_chain is not None: + data['message_chain'] = event.message_chain.model_dump() + return data + + +def record_api(results: list[dict[str, Any]], name: str, ok: bool, result: Any = None, error: Exception | None = None): + entry = {'name': name, 'ok': ok} + if result is not None: + entry['result'] = redact(result) + if error is not None: + entry['error'] = repr(error) + results.append(entry) + print('WECOM_EBA_API', json.dumps(entry, ensure_ascii=False, default=str)) + + +async def run_api(results: list[dict[str, Any]], name: str, func): + try: + result = await func() + record_api(results, name, True, result) + return result + except Exception as exc: + record_api(results, name, False, error=exc) + return None + + +def config_from_env() -> dict: + required = { + 'corpid': os.getenv('WECOM_CORPID', ''), + 'secret': os.getenv('WECOM_SECRET', ''), + 'token': os.getenv('WECOM_TOKEN', ''), + 'EncodingAESKey': os.getenv('WECOM_ENCODING_AES_KEY', ''), + } + missing = [key for key, value in required.items() if not value] + if missing: + raise RuntimeError(f'Missing required WeCom env vars for fields: {missing}') + return { + **required, + 'contacts_secret': os.getenv('WECOM_CONTACTS_SECRET', ''), + 'api_base_url': os.getenv('WECOM_API_BASE_URL', 'https://qyapi.weixin.qq.com/cgi-bin'), + } + + +async def run_probe(args: argparse.Namespace): + adapter = WecomAdapter(config_from_env(), ProbeLogger()) + events: list[platform_events.EBAEvent] = [] + api_results: list[dict[str, Any]] = [] + first_message = asyncio.Event() + log_path = Path(args.log) + log_path.parent.mkdir(parents=True, exist_ok=True) + + async def listener(event, adapter): + events.append(event) + with log_path.open('a', encoding='utf-8') as fp: + fp.write(json.dumps(summarize_event(event), ensure_ascii=False, default=str) + '\n') + print('WECOM_EBA_EVENT', json.dumps(summarize_event(event), ensure_ascii=False, default=str)) + if isinstance(event, platform_events.MessageReceivedEvent): + first_message.set() + + adapter.register_listener(platform_events.EBAEvent, listener) + + app = Quart(__name__) + + @app.route(args.path, methods=['GET', 'POST']) + async def callback(): + return await adapter.handle_unified_webhook(args.bot_uuid, '', request) + + server_task = asyncio.create_task(app.run_task(host=args.host, port=args.port)) + try: + print(f'READY: configure WeCom callback URL to http://{args.host}:{args.port}{args.path}') + print('READY: send a real WeCom application message to the bot now.') + await asyncio.wait_for(first_message.wait(), timeout=args.timeout) + + source = next(event for event in events if isinstance(event, platform_events.MessageReceivedEvent)) + raw_event = source.source_platform_object + target_id = f'{source.chat_id}|{raw_event.agent_id}' + + if not args.skip_api: + await run_api( + api_results, + 'reply_message:text', + lambda: adapter.reply_message( + source, + platform_message.MessageChain([platform_message.Plain(text='WeCom EBA probe reply')]), + ), + ) + await run_api( + api_results, + 'send_message:text', + lambda: adapter.send_message( + 'person', + target_id, + platform_message.MessageChain([platform_message.Plain(text='WeCom EBA probe send')]), + ), + ) + await run_api( + api_results, + 'send_message:image', + lambda: adapter.send_message( + 'person', + target_id, + platform_message.MessageChain( + [ + platform_message.Plain(text='WeCom EBA probe image'), + platform_message.Image(base64=TINY_PNG), + ] + ), + ), + ) + await run_api( + api_results, + 'get_message', + lambda: adapter.get_message('private', source.chat_id, source.message_id), + ) + await run_api(api_results, 'get_user_info', lambda: adapter.get_user_info(source.sender.id)) + await run_api(api_results, 'get_friend_list', lambda: adapter.get_friend_list()) + await run_api(api_results, 'call_platform_api:check_access_token', lambda: adapter.call_platform_api('check_access_token', {})) + await run_api( + api_results, + 'call_platform_api:get_user_info', + lambda: adapter.call_platform_api('get_user_info', {'user_id': source.sender.id}), + ) + + summary = { + 'events': [event.type for event in events], + 'api_results': api_results, + 'log_path': str(log_path), + } + print('WECOM_EBA_SUMMARY', json.dumps(summary, ensure_ascii=False, default=str)) + return summary + finally: + server_task.cancel() + await adapter.kill() + + +def main(): + parser = argparse.ArgumentParser(description='Live WeCom EBA adapter probe.') + parser.add_argument('--host', default='0.0.0.0') + parser.add_argument('--port', type=int, default=5312) + parser.add_argument('--path', default='/wecom/callback') + parser.add_argument('--timeout', type=int, default=180) + parser.add_argument('--bot-uuid', default='wecom-eba-live-probe') + parser.add_argument('--log', default='data/temp/wecom_eba_live_probe.jsonl') + parser.add_argument('--skip-api', action='store_true') + args = parser.parse_args() + asyncio.run(run_probe(args)) + + +if __name__ == '__main__': + main() diff --git a/tests/e2e/live_wecombot_eba_probe.py b/tests/e2e/live_wecombot_eba_probe.py new file mode 100644 index 00000000..4818dba5 --- /dev/null +++ b/tests/e2e/live_wecombot_eba_probe.py @@ -0,0 +1,203 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +import os +from pathlib import Path +from typing import Any + +from quart import Quart, request + +from langbot.pkg.platform.adapters.wecombot.adapter import WecomBotAdapter +from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class ProbeLogger(AbstractEventLogger): + async def info(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[info] {text}') + + async def debug(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[debug] {text}') + + async def warning(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[warning] {text}') + + async def error(self, text, images=None, message_session_id=None, no_throw=True): + print(f'[error] {text}') + + +def redact(value: Any) -> Any: + if isinstance(value, dict): + return { + key: '' if key.lower() in {'secret', 'token', 'encodingaeskey', 'encrypt', 'aeskey'} else redact(item) + for key, item in value.items() + } + if isinstance(value, list): + return [redact(item) for item in value] + return value + + +def summarize_event(event: platform_events.EBAEvent) -> dict: + data = { + 'type': event.type, + 'adapter_name': event.adapter_name, + 'timestamp': event.timestamp, + } + for field in ('message_id', 'chat_id', 'chat_type', 'action', 'data', 'feedback_id', 'feedback_type'): + if hasattr(event, field): + value = getattr(event, field) + if hasattr(value, 'value'): + value = value.value + data[field] = redact(value) + if hasattr(event, 'sender') and event.sender is not None: + data['sender'] = event.sender.model_dump() + if hasattr(event, 'group') and event.group is not None: + data['group'] = event.group.model_dump() + if hasattr(event, 'message_chain') and event.message_chain is not None: + data['message_chain'] = event.message_chain.model_dump() + return data + + +def record_api(results: list[dict[str, Any]], name: str, ok: bool, result: Any = None, error: Exception | None = None): + entry = {'name': name, 'ok': ok} + if result is not None: + entry['result'] = redact(result) + if error is not None: + entry['error'] = repr(error) + results.append(entry) + print('WECOMBOT_EBA_API', json.dumps(entry, ensure_ascii=False, default=str)) + + +async def run_api(results: list[dict[str, Any]], name: str, func): + try: + result = await func() + record_api(results, name, True, result) + return result + except Exception as exc: + record_api(results, name, False, error=exc) + return None + + +def config_from_env(enable_webhook: bool) -> dict: + config = { + 'BotId': os.getenv('WECOMBOT_BOT_ID', ''), + 'robot_name': os.getenv('WECOMBOT_ROBOT_NAME', ''), + 'enable-webhook': enable_webhook, + 'Secret': os.getenv('WECOMBOT_SECRET', ''), + 'Token': os.getenv('WECOMBOT_TOKEN', ''), + 'EncodingAESKey': os.getenv('WECOMBOT_ENCODING_AES_KEY', ''), + 'Corpid': os.getenv('WECOMBOT_CORPID', ''), + 'enable-stream-reply': os.getenv('WECOMBOT_ENABLE_STREAM_REPLY', '1') != '0', + } + required = ['BotId', 'Secret'] if not enable_webhook else ['Token', 'EncodingAESKey', 'Corpid'] + missing = [key for key in required if not config.get(key)] + if missing: + raise RuntimeError(f'Missing required WeComBot env vars for fields: {missing}') + return config + + +async def run_probe(args: argparse.Namespace): + adapter = WecomBotAdapter(config_from_env(args.webhook), ProbeLogger()) + events: list[platform_events.EBAEvent] = [] + api_results: list[dict[str, Any]] = [] + first_message = asyncio.Event() + log_path = Path(args.log) + log_path.parent.mkdir(parents=True, exist_ok=True) + + async def listener(event, adapter): + events.append(event) + with log_path.open('a', encoding='utf-8') as fp: + fp.write(json.dumps(summarize_event(event), ensure_ascii=False, default=str) + '\n') + print('WECOMBOT_EBA_EVENT', json.dumps(summarize_event(event), ensure_ascii=False, default=str)) + if isinstance(event, platform_events.MessageReceivedEvent): + first_message.set() + + adapter.register_listener(platform_events.EBAEvent, listener) + + run_task = None + server_task = None + if args.webhook: + app = Quart(__name__) + + @app.route(args.path, methods=['GET', 'POST']) + async def callback(): + return await adapter.handle_unified_webhook(args.bot_uuid, '', request) + + server_task = asyncio.create_task(app.run_task(host=args.host, port=args.port)) + print(f'READY: configure WeComBot callback URL to http://{args.host}:{args.port}{args.path}') + else: + run_task = asyncio.create_task(adapter.run_async()) + print('READY: WeComBot WebSocket long connection started; no webhook URL is required.') + + try: + print('READY: send a real WeComBot message to the bot now.') + await asyncio.wait_for(first_message.wait(), timeout=args.timeout) + + source = next(event for event in events if isinstance(event, platform_events.MessageReceivedEvent)) + + if not args.skip_api: + await run_api( + api_results, + 'reply_message:text', + lambda: adapter.reply_message( + source, + platform_message.MessageChain([platform_message.Plain(text='WeComBot EBA probe reply')]), + ), + ) + if not args.webhook: + await run_api( + api_results, + 'send_message:text', + lambda: adapter.send_message( + 'group' if source.chat_type.value == 'group' else 'person', + source.chat_id, + platform_message.MessageChain([platform_message.Plain(text='WeComBot EBA probe send')]), + ), + ) + await run_api(api_results, 'get_message', lambda: adapter.get_message(source.chat_type.value, source.chat_id, source.message_id)) + await run_api(api_results, 'get_user_info', lambda: adapter.get_user_info(source.sender.id)) + if source.group: + await run_api(api_results, 'get_group_info', lambda: adapter.get_group_info(source.group.id)) + await run_api(api_results, 'get_group_member_list', lambda: adapter.get_group_member_list(source.group.id)) + await run_api(api_results, 'call_platform_api:is_websocket_mode', lambda: adapter.call_platform_api('is_websocket_mode', {})) + await run_api( + api_results, + 'call_platform_api:get_stream_session_status', + lambda: adapter.call_platform_api('get_stream_session_status', {'message_id': source.message_id}), + ) + + summary = { + 'events': [event.type for event in events], + 'api_results': api_results, + 'log_path': str(log_path), + 'mode': 'webhook' if args.webhook else 'websocket', + } + print('WECOMBOT_EBA_SUMMARY', json.dumps(summary, ensure_ascii=False, default=str)) + return summary + finally: + if server_task: + server_task.cancel() + if run_task: + run_task.cancel() + await adapter.kill() + + +def main(): + parser = argparse.ArgumentParser(description='Live WeComBot EBA adapter probe.') + parser.add_argument('--webhook', action='store_true', help='Use webhook mode. Default is WebSocket long connection mode.') + parser.add_argument('--host', default='0.0.0.0') + parser.add_argument('--port', type=int, default=5313) + parser.add_argument('--path', default='/wecombot/callback') + parser.add_argument('--timeout', type=int, default=180) + parser.add_argument('--bot-uuid', default='wecombot-eba-live-probe') + parser.add_argument('--log', default='data/temp/wecombot_eba_live_probe.jsonl') + parser.add_argument('--skip-api', action='store_true') + args = parser.parse_args() + asyncio.run(run_probe(args)) + + +if __name__ == '__main__': + main() diff --git a/tests/unit_tests/pipeline/test_preproc_media_fallback.py b/tests/unit_tests/pipeline/test_preproc_media_fallback.py new file mode 100644 index 00000000..91d86d6b --- /dev/null +++ b/tests/unit_tests/pipeline/test_preproc_media_fallback.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from datetime import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest + +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +def _conversation(): + prompt = Mock() + prompt.messages = [] + prompt.copy = Mock(return_value=Mock(messages=[])) + + return SimpleNamespace( + uuid='conversation-uuid', + create_time=datetime.now(), + update_time=datetime.now(), + prompt=prompt, + messages=[], + ) + + +def _prompt_preprocessing_context(): + ctx = Mock() + ctx.event.default_prompt = [] + ctx.event.prompt = [] + return ctx + + +@pytest.mark.asyncio +async def test_preprocessor_keeps_image_placeholder_for_text_only_local_agent(mock_app, sample_query): + model = Mock() + model.model_entity.uuid = 'text-only-model' + model.model_entity.abilities = [] + + mock_app.model_mgr.get_model_by_uuid = AsyncMock(return_value=model) + mock_app.sess_mgr.get_session = AsyncMock( + return_value=SimpleNamespace(launcher_type=sample_query.launcher_type, launcher_id=sample_query.launcher_id) + ) + mock_app.sess_mgr.get_conversation = AsyncMock(return_value=_conversation()) + mock_app.plugin_connector.emit_event = AsyncMock(return_value=_prompt_preprocessing_context()) + + sample_query.pipeline_config = { + 'ai': { + 'runner': {'runner': 'local-agent'}, + 'local-agent': {'model': {'primary': 'text-only-model', 'fallbacks': []}, 'prompt': []}, + }, + 'trigger': {'misc': {'combine-quote-message': False}}, + 'output': {'misc': {'exception-handling': 'show-hint'}}, + } + sample_query.message_chain = platform_message.MessageChain( + [platform_message.Image(base64='data:image/png;base64,AAAA')] + ) + sample_query.messages = [] + sample_query.variables = {} + + from importlib import import_module + + import_module('langbot.pkg.pipeline.pipelinemgr') + preproc_module = import_module('langbot.pkg.pipeline.preproc.preproc') + result = await preproc_module.PreProcessor(mock_app).process(sample_query, 'PreProcessor') + content = result.new_query.user_message.content + + assert len(content) == 1 + assert content[0].type == 'text' + assert content[0].text == '[Image]' + assert result.new_query.variables['user_message_text'] == '[Image]' diff --git a/tests/unit_tests/platform/test_wecom_eba_adapter.py b/tests/unit_tests/platform/test_wecom_eba_adapter.py new file mode 100644 index 00000000..9563c39a --- /dev/null +++ b/tests/unit_tests/platform/test_wecom_eba_adapter.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import pathlib +from unittest.mock import AsyncMock, patch + +import pytest +import yaml + +from langbot.libs.wecom_api.api import WecomClient +from langbot.libs.wecom_api.wecomevent import WecomEvent +from langbot.pkg.platform.adapters.wecom.adapter import WecomAdapter +from langbot.pkg.platform.adapters.wecom.event_converter import WecomEventConverter +from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter, split_string_by_bytes +from langbot.pkg.platform.adapters.wecom.platform_api import PLATFORM_API_MAP +from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class DummyLogger(AbstractEventLogger): + async def info(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def debug(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def warning(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def error(self, text, images=None, message_session_id=None, no_throw=True): + pass + + +class DummyWecomClient(WecomClient): + def __init__(self, *args, **kwargs): + self.corpid = kwargs['corpid'] + self.secret = kwargs['secret'] + self.token = kwargs['token'] + self.aes = kwargs['EncodingAESKey'] + self.secret_for_contacts = kwargs.get('contacts_secret', '') + self.base_url = kwargs.get('api_base_url', 'https://qyapi.weixin.qq.com/cgi-bin') + self.logger = kwargs.get('logger') + self.access_token = '' + self._message_handlers = {} + self.get_media_id = AsyncMock(return_value='media-id') + self.send_private_msg = AsyncMock() + self.send_image = AsyncMock() + self.send_voice = AsyncMock() + self.send_file = AsyncMock() + self.get_user_info = AsyncMock(return_value={'userid': 'user-1', 'name': 'Alice', 'alias': 'alice'}) + self.check_access_token = AsyncMock(return_value=True) + self.get_access_token = AsyncMock(return_value='access-token') + self.send_to_all = AsyncMock() + self.handle_unified_webhook = AsyncMock(return_value='success') + + def on_message(self, msg_type: str): + def decorator(func): + self._message_handlers.setdefault(msg_type, []).append(func) + return func + + return decorator + + +def manifest() -> dict: + manifest_path = ( + pathlib.Path(__file__).parents[3] + / 'src' + / 'langbot' + / 'pkg' + / 'platform' + / 'adapters' + / 'wecom' + / 'manifest.yaml' + ) + return yaml.safe_load(manifest_path.read_text()) + + +def make_adapter() -> WecomAdapter: + config = { + 'corpid': 'corp-id', + 'secret': 'secret', + 'token': 'token', + 'EncodingAESKey': 'encoding-key', + 'contacts_secret': 'contacts-secret', + 'api_base_url': 'https://qyapi.weixin.qq.com/cgi-bin', + } + with patch('langbot.pkg.platform.adapters.wecom.adapter.WecomClient', DummyWecomClient): + return WecomAdapter(config, DummyLogger()) + + +def wecom_event(**overrides) -> WecomEvent: + payload = { + 'ToUserName': overrides.get('to_user', 'corp-id'), + 'FromUserName': overrides.get('from_user', 'user-1'), + 'CreateTime': overrides.get('create_time', 1_714_000_000), + 'MsgType': overrides.get('msg_type', 'text'), + 'Content': overrides.get('content', 'hello'), + 'MsgId': overrides.get('message_id', 12345), + 'AgentID': overrides.get('agent_id', 1000002), + } + if payload['MsgType'] == 'image': + payload['MediaId'] = overrides.get('media_id', 'media-id') + payload['PicUrl'] = overrides.get('picurl', 'https://example.test/a.png') + return WecomEvent.from_payload(payload) + + +def test_wecom_supported_events_match_manifest(): + assert make_adapter().get_supported_events() == manifest()['spec']['supported_events'] + + +def test_wecom_supported_apis_match_manifest(): + supported_apis = make_adapter().get_supported_apis() + manifest_apis = manifest()['spec']['supported_apis'] + + assert supported_apis == manifest_apis['required'] + manifest_apis['optional'] + + +def test_wecom_platform_api_map_matches_manifest(): + manifest_actions = {item['action'] for item in manifest()['spec']['platform_specific_apis']} + + assert set(PLATFORM_API_MAP) == manifest_actions + + +def test_wecom_split_string_by_bytes_keeps_multibyte_boundaries(): + parts = split_string_by_bytes('你好hello', limit=7) + + assert ''.join(parts) == '你好hello' + assert all(len(part.encode('utf-8')) <= 7 for part in parts) + + +@pytest.mark.asyncio +async def test_wecom_message_converter_maps_outbound_components(): + adapter = make_adapter() + content = await WecomMessageConverter.yiri2target( + platform_message.MessageChain( + [ + platform_message.Plain(text='hi'), + platform_message.Image(base64='data:image/png;base64,AAAA'), + platform_message.Voice(base64='data:audio/mp3;base64,BBBB'), + platform_message.File(name='doc.txt', base64='Q0NDQw=='), + platform_message.Quote( + id='origin', + origin=platform_message.MessageChain([platform_message.Plain(text='quoted')]), + ), + ] + ), + adapter.bot, + ) + + assert content[0] == {'type': 'text', 'content': 'hi'} + assert {'type': 'image', 'media_id': 'media-id'} in content + assert {'type': 'voice', 'media_id': 'media-id'} in content + assert {'type': 'file', 'media_id': 'media-id'} in content + assert {'type': 'text', 'content': '[Quote origin] '} in content + assert {'type': 'text', 'content': 'quoted'} in content + + +@pytest.mark.asyncio +async def test_wecom_event_converter_maps_text_message_to_eba_and_legacy(): + adapter = make_adapter() + event = await WecomEventConverter.target2yiri(wecom_event(), adapter.bot) + + assert isinstance(event, platform_events.MessageReceivedEvent) + assert event.adapter_name == 'wecom-eba' + assert event.chat_type == platform_entities.ChatType.PRIVATE + assert event.chat_id == 'user-1|1000002' + assert event.sender.nickname == 'Alice' + assert str(event.message_chain) == 'hello' + + legacy = await WecomEventConverter.target2legacy(wecom_event(), adapter.bot) + assert isinstance(legacy, platform_events.FriendMessage) + assert legacy.sender.id == 'user-1' + assert str(legacy.message_chain) == 'hello' + + +@pytest.mark.asyncio +async def test_wecom_event_converter_maps_image_message_to_eba(): + adapter = make_adapter() + + with patch( + 'langbot.pkg.platform.adapters.wecom.message_converter.image.get_wecom_image_base64', + AsyncMock(return_value=('AAAA', 'png')), + ): + event = await WecomEventConverter.target2yiri( + wecom_event(msg_type='image', content=None, picurl='https://example.test/a.png'), + adapter.bot, + ) + + assert isinstance(event, platform_events.MessageReceivedEvent) + assert event.adapter_name == 'wecom-eba' + assert event.message_id == 12345 + assert isinstance(event.message_chain[1], platform_message.Image) + assert event.message_chain[1].base64 == 'data:image/png;base64,AAAA' + + +@pytest.mark.asyncio +async def test_wecom_adapter_dispatches_and_caches_message_event(): + adapter = make_adapter() + calls: list[platform_events.Event] = [] + + async def listener(event, adapter): + calls.append(event) + + adapter.register_listener(platform_events.MessageReceivedEvent, listener) + await adapter._handle_native_event(wecom_event()) + + assert len(calls) == 1 + received = calls[0] + assert isinstance(received, platform_events.MessageReceivedEvent) + assert adapter.bot_account_id == 'corp-id' + assert received.chat_id == 'user-1|1000002' + assert await adapter.get_message('private', 'user-1|1000002', 12345) == received + assert (await adapter.get_user_info('user-1')).nickname == 'Alice' + + +@pytest.mark.asyncio +async def test_wecom_send_reply_and_platform_api_use_underlying_client(): + adapter = make_adapter() + message = platform_message.MessageChain([platform_message.Plain(text='hello')]) + + await adapter.send_message('person', 'user-1|1000002', message) + adapter.bot.send_private_msg.assert_awaited_once_with('user-1', 1000002, 'hello') + + source_event = await WecomEventConverter.target2yiri(wecom_event(), adapter.bot) + await adapter.reply_message(source_event, message) + assert adapter.bot.send_private_msg.await_count == 2 + + token_status = await adapter.call_platform_api('check_access_token', {}) + user_info = await adapter.call_platform_api('get_user_info', {'user_id': 'user-1'}) + sent_all = await adapter.call_platform_api('send_to_all', {'content': 'notice', 'agent_id': 1000002}) + + assert token_status == {'valid': True} + assert user_info['name'] == 'Alice' + assert sent_all == {'ok': True} diff --git a/tests/unit_tests/platform/test_wecombot_eba_adapter.py b/tests/unit_tests/platform/test_wecombot_eba_adapter.py new file mode 100644 index 00000000..e284abea --- /dev/null +++ b/tests/unit_tests/platform/test_wecombot_eba_adapter.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +import pathlib +from unittest.mock import AsyncMock, patch + +import pytest +import yaml + +from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent +from langbot.pkg.platform.adapters.wecombot.adapter import WecomBotAdapter +from langbot.pkg.platform.adapters.wecombot.event_converter import WecomBotEventConverter +from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter +from langbot.pkg.platform.adapters.wecombot.platform_api import PLATFORM_API_MAP +from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class DummyLogger(AbstractEventLogger): + async def info(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def debug(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def warning(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def error(self, text, images=None, message_session_id=None, no_throw=True): + pass + + +class DummyWecomBotWsClient: + def __init__(self, *args, **kwargs): + self.bot_id = kwargs['bot_id'] + self.secret = kwargs['secret'] + self.encoding_aes_key = kwargs.get('encoding_aes_key', '') + self._message_handlers = {} + self.connect = AsyncMock() + self.disconnect = AsyncMock() + self.send_message = AsyncMock(return_value={'ok': True}) + self.reply_text = AsyncMock(return_value={'reply': True}) + self.push_stream_chunk = AsyncMock(return_value=True) + self.set_message = AsyncMock(return_value={'set': True}) + + def on_message(self, msg_type: str): + def decorator(func): + self._message_handlers.setdefault(msg_type, []).append(func) + return func + + return decorator + + def on_feedback(self): + def decorator(func): + self._message_handlers.setdefault('feedback', []).append(func) + return func + + return decorator + + +class DummyWecomBotClient(DummyWecomBotWsClient): + def __init__(self, *args, **kwargs): + self.Token = kwargs['Token'] + self.EnCodingAESKey = kwargs['EnCodingAESKey'] + self.Corpid = kwargs['Corpid'] + self._message_handlers = {} + self.handle_unified_webhook = AsyncMock(return_value='success') + self.push_stream_chunk = AsyncMock(return_value=True) + self.set_message = AsyncMock(return_value={'set': True}) + + +def manifest() -> dict: + manifest_path = ( + pathlib.Path(__file__).parents[3] + / 'src' + / 'langbot' + / 'pkg' + / 'platform' + / 'adapters' + / 'wecombot' + / 'manifest.yaml' + ) + return yaml.safe_load(manifest_path.read_text()) + + +def make_adapter(enable_webhook: bool = False) -> WecomBotAdapter: + config = { + 'BotId': 'bot-id', + 'robot_name': 'EBA Bot', + 'enable-webhook': enable_webhook, + 'Secret': 'secret', + 'Token': 'token', + 'EncodingAESKey': 'encoding-key', + 'Corpid': 'corp-id', + 'enable-stream-reply': True, + } + with ( + patch('langbot.pkg.platform.adapters.wecombot.adapter.WecomBotWsClient', DummyWecomBotWsClient), + patch('langbot.pkg.platform.adapters.wecombot.adapter.WecomBotClient', DummyWecomBotClient), + ): + return WecomBotAdapter(config, DummyLogger()) + + +def wecombot_event(**overrides) -> WecomBotEvent: + event_type = overrides.get('type', 'single') + payload = { + 'type': event_type, + 'msgtype': overrides.get('msgtype', 'text'), + 'msgid': overrides.get('message_id', 'msg-1'), + 'userid': overrides.get('userid', 'user-1'), + 'username': overrides.get('username', 'Alice'), + 'content': overrides.get('content', 'hello'), + 'aibotid': overrides.get('aibotid', 'bot-id'), + 'req_id': overrides.get('req_id', 'req-1'), + 'stream_id': overrides.get('stream_id', 'stream-1'), + } + if event_type == 'group': + payload.update({'chatid': overrides.get('chatid', 'group-1'), 'chatname': overrides.get('chatname', 'Group')}) + if payload['msgtype'] == 'image': + payload['images'] = overrides.get('images', ['data:image/png;base64,AAAA']) + payload['content'] = overrides.get('content', '') + if payload['msgtype'] == 'file': + payload['file'] = overrides.get('file', {'download_url': 'https://example.test/a.txt', 'filename': 'a.txt'}) + payload['content'] = overrides.get('content', '') + if payload['msgtype'] == 'voice': + payload['voice'] = overrides.get('voice', {'base64': 'BBBB'}) + payload['content'] = overrides.get('content', '') + if 'quote' in overrides: + payload['quote'] = overrides['quote'] + return WecomBotEvent(payload) + + +def test_wecombot_supported_events_match_manifest(): + assert make_adapter().get_supported_events() == manifest()['spec']['supported_events'] + + +def test_wecombot_supported_apis_match_manifest(): + supported_apis = make_adapter().get_supported_apis() + manifest_apis = manifest()['spec']['supported_apis'] + + assert supported_apis == manifest_apis['required'] + manifest_apis['optional'] + + +def test_wecombot_platform_api_map_matches_manifest(): + manifest_actions = {item['action'] for item in manifest()['spec']['platform_specific_apis']} + + assert set(PLATFORM_API_MAP) == manifest_actions + + +@pytest.mark.asyncio +async def test_wecombot_message_converter_maps_outbound_components_to_markdown_text(): + content = await WecomBotMessageConverter.yiri2target( + platform_message.MessageChain( + [ + platform_message.Plain(text='hi'), + platform_message.At(target='user-1', display='Alice'), + platform_message.Image(base64='data:image/png;base64,AAAA'), + platform_message.File(name='a.txt', url='https://example.test/a.txt'), + platform_message.Quote( + id='origin', + origin=platform_message.MessageChain([platform_message.Plain(text='quoted')]), + ), + ] + ) + ) + + assert 'hi' in content + assert '@Alice' in content + assert '[Image]' in content + assert '[File: a.txt]' in content + assert '[Quote origin]' in content + assert 'quoted' in content + + +@pytest.mark.asyncio +async def test_wecombot_event_converter_maps_private_and_group_messages_to_eba(): + private_event = await WecomBotEventConverter(bot_name='EBA Bot').target2yiri( + wecombot_event(content='@EBA Bot hello') + ) + group_event = await WecomBotEventConverter(bot_name='EBA Bot').target2yiri( + wecombot_event(type='group', content='@EBA Bot group hello') + ) + + assert isinstance(private_event, platform_events.MessageReceivedEvent) + assert private_event.adapter_name == 'wecombot-eba' + assert private_event.chat_type == platform_entities.ChatType.PRIVATE + assert private_event.chat_id == 'user-1' + assert str(private_event.message_chain) == 'hello' + + assert isinstance(group_event, platform_events.MessageReceivedEvent) + assert group_event.chat_type == platform_entities.ChatType.GROUP + assert group_event.chat_id == 'group-1' + assert group_event.group.name == 'Group' + assert isinstance(group_event.message_chain[1], platform_message.At) + + +@pytest.mark.asyncio +async def test_wecombot_event_converter_maps_media_and_quote_components(): + event = await WecomBotEventConverter().target2yiri( + wecombot_event( + msgtype='image', + quote={ + 'content': 'quoted', + 'file': {'download_url': 'https://example.test/q.txt', 'filename': 'q.txt'}, + }, + ) + ) + + assert isinstance(event, platform_events.MessageReceivedEvent) + assert any(isinstance(component, platform_message.Image) for component in event.message_chain) + quote = next(component for component in event.message_chain if isinstance(component, platform_message.Quote)) + assert any(isinstance(component, platform_message.File) for component in quote.origin) + + +@pytest.mark.asyncio +async def test_wecombot_adapter_dispatches_eba_and_legacy_and_caches_message_event(): + adapter = make_adapter() + eba_calls: list[platform_events.Event] = [] + legacy_calls: list[platform_events.Event] = [] + + async def eba_listener(event, adapter): + eba_calls.append(event) + + async def legacy_listener(event, adapter): + legacy_calls.append(event) + + adapter.register_listener(platform_events.MessageReceivedEvent, eba_listener) + adapter.register_listener(platform_events.FriendMessage, legacy_listener) + await adapter._handle_native_event(wecombot_event()) + + assert len(eba_calls) == 1 + assert len(legacy_calls) == 1 + received = eba_calls[0] + assert isinstance(received, platform_events.MessageReceivedEvent) + assert await adapter.get_message('private', 'user-1', 'msg-1') == received + assert (await adapter.get_user_info('user-1')).nickname == 'Alice' + + +@pytest.mark.asyncio +async def test_wecombot_send_reply_feedback_and_platform_api_use_underlying_client(): + adapter = make_adapter() + message = platform_message.MessageChain([platform_message.Plain(text='hello')]) + + await adapter.send_message('person', 'user-1', message) + adapter.bot.send_message.assert_awaited_once_with('user-1', 'hello') + + source_event = await WecomBotEventConverter().target2yiri(wecombot_event()) + await adapter.reply_message(source_event, message) + adapter.bot.reply_text.assert_awaited_once_with('req-1', 'hello') + + await adapter.reply_message_chunk(source_event, None, message, is_final=True) + adapter.bot.push_stream_chunk.assert_awaited_once_with('msg-1', 'hello', is_final=True) + + platform_status = await adapter.call_platform_api('is_websocket_mode', {}) + assert platform_status == {'websocket': True} + + feedback_calls: list[platform_events.Event] = [] + + async def feedback_listener(event, adapter): + feedback_calls.append(event) + + adapter.register_listener(platform_events.FeedbackReceivedEvent, feedback_listener) + await adapter._handle_feedback(feedback_id='fb-1', feedback_type=1, inaccurate_reasons=[1, 2], session=None) + assert isinstance(feedback_calls[0], platform_events.FeedbackReceivedEvent) + assert feedback_calls[0].inaccurate_reasons == ['1', '2'] + + +@pytest.mark.asyncio +async def test_wecombot_webhook_mode_rejects_proactive_send(): + adapter = make_adapter(enable_webhook=True) + with pytest.raises(NotSupportedError): + await adapter.send_message('person', 'user-1', platform_message.MessageChain([platform_message.Plain(text='hi')]))