feat(platform): add wecom eba adapters

This commit is contained in:
WangCham
2026-05-27 10:52:17 +08:00
parent 197e117900
commit 7328881e6f
31 changed files with 2842 additions and 17 deletions

View File

@@ -20,6 +20,8 @@ Current acceptance report: [EBA Adapter Acceptance Report](./acceptance-report.m
| OneBot v11 / aiocqhttp | Migrated; Matcha UI plus protocol-level multi-component coverage | [OneBot v11 / aiocqhttp](./aiocqhttp.md) |
| DingTalk | Migrated; partial plugin E2E, real UI inbound image/file verified; group gap remains | [DingTalk](./dingtalk.md) |
| Lark / Feishu | Migrated; partial live text E2E, media-inbound gap remains | [Lark / Feishu](./lark.md) |
| WeCom | Migrated; private text plugin E2E verified, media/group gaps remain | [WeCom](./wecom.md) |
| WeComBot | Migrated; private text and outbound/API plugin E2E verified, feedback/group gaps remain | [WeComBot](./wecombot.md) |
## Documentation Checklist

View File

@@ -9,6 +9,8 @@ Scope:
- `aiocqhttp-eba`
- `dingtalk-eba`
- `lark-eba`
- `wecom-eba`
- `wecombot-eba`
This report follows `acceptance-checklist.md`. Evidence levels are intentionally strict:
@@ -28,6 +30,8 @@ This report follows `acceptance-checklist.md`. Evidence levels are intentionally
| OneBot v11 / aiocqhttp | Partial EBA acceptance | Matcha UI covered real group text and outbound supported components/APIs. Multi-component inbound `Source/Plain/At/Face/Image/Voice/File/Quote` was verified through the real OneBot reverse WebSocket adapter endpoint, but not through Matcha UI upload/send. Matcha blocks file-send and merged-forward APIs. |
| DingTalk | Partial EBA acceptance | Real DingTalk UI covered private text, emoji-as-text inbound, private inbound image/file, outbound image/file/quote/mention fallback components, safe SDK APIs, and safe DingTalk platform APIs. Real UI inbound voice/quote and group trigger were not completed. |
| Lark / Feishu | Partial EBA acceptance | EBA adapter structure, self-built/store app config, WebSocket/Webhook mode handling, converters, common APIs, platform APIs, and unit tests are in place. One real LangBot organization WebSocket private text event reached `EBAEventProbe`; outbound component sweep was visible in Feishu. Latest real UI image/file sends did not reach local plugin evidence, so media receive remains blocked. |
| WeCom | Partial EBA acceptance | Regular WeCom application-message adapter is split into the EBA directory with manifest, converters, API mixin, platform API map, and unit tests. Private text reached `EBAEventProbe` through standalone runtime and the real WeCom client; safe plugin APIs passed. Real inbound media and broader event coverage remain pending. |
| WeComBot | Partial EBA acceptance | WeCom AI Bot is split into the EBA directory with WebSocket long connection mode and optional webhook mode, EBA message/feedback/platform-specific conversion, cache-backed common APIs, platform API map, unit tests, and a direct live probe. Private text, outbound component sweep, safe common APIs, and all declared WeComBot platform APIs reached `EBAEventProbe`; group, real inbound media, and feedback callback evidence remain pending. |
Telegram and DingTalk now have real user-side UI image/file upload evidence in plugin JSONL. Discord and aiocqhttp do not yet have real UI inbound image/file evidence.

View File

@@ -0,0 +1,130 @@
# WeCom EBA Adapter
## Status
WeCom application messages now have an EBA adapter directory:
```text
src/langbot/pkg/platform/adapters/wecom/
├── adapter.py
├── api_impl.py
├── event_converter.py
├── manifest.yaml
├── message_converter.py
├── platform_api.py
└── types.py
```
The adapter is registered as `wecom-eba`.
This record covers the regular WeCom application-message adapter. WeCom AI Bot (`wecombot-eba`) uses a different protocol flow and is documented separately in `wecombot.md`. WeCom Customer Service (`wecomcs`) remains a separate follow-up migration.
## Configuration
| Field | Required | Default | Description |
|-------|----------|---------|-------------|
| `webhook_url` | No | `""` | Unified webhook URL copied into the WeCom application callback settings. |
| `corpid` | Yes | `""` | WeCom corporate ID. |
| `secret` | Yes | `""` | WeCom application secret. |
| `token` | Yes | `""` | WeCom callback token. |
| `EncodingAESKey` | Yes | `""` | WeCom callback encryption key. |
| `contacts_secret` | No | `""` | Contacts secret for contact-list based helper APIs. |
| `api_base_url` | No | `https://qyapi.weixin.qq.com/cgi-bin` | WeCom API base URL, overrideable for proxy/private-network deployments. |
## Events
WeCom declares these EBA events:
- `message.received`
- `platform.specific`
`message.received` currently covers text and image application callbacks. Other WeCom callback types are surfaced as `platform.specific` so plugins can inspect the raw structured payload without crashing the common message path.
## Common APIs
| API | Status | Notes |
|-----|--------|-------|
| `send_message` | Supported | Private/person target only. `target_id` must be `user_id|agent_id`. Supports text, image, voice, file, flattened forward, and quote fallback. |
| `reply_message` | Supported | Replies to the original WeCom sender and application agent from `source_platform_object`. |
| `get_message` | Supported from cache | Returns cached inbound `MessageReceivedEvent` by message ID. |
| `get_user_info` | Supported | Uses cached event users first, then WeCom `user/get`. |
| `get_friend_list` | Partial | Returns users seen by this adapter instance. Full contacts listing is not declared as common coverage. |
| `call_platform_api` | Supported | See below. |
| `edit_message` | Not supported | WeCom application messages do not expose a general edit endpoint for sent messages. |
| `delete_message` | Not supported | WeCom application messages do not expose a general delete endpoint for sent messages. |
| `get_group_info` / member APIs | Not supported | Regular WeCom application callbacks handled here are private user messages, not group-chat bot messages. |
| `upload_file` / `get_file_url` | Not supported as common APIs | WeCom media upload is used internally while sending image/voice/file components; no portable standalone common file URL is exposed. |
## Platform-Specific APIs
`call_platform_api(action, params)` supports:
- `check_access_token`
- `refresh_access_token`
- `get_user_info`
- `send_to_all`
`send_to_all` requires a configured `contacts_secret` with suitable contact visibility and should be treated as a broad-send operation in live testing.
## Unit Verification
Covered by:
```bash
uv run pytest tests/unit_tests/platform/test_wecom_eba_adapter.py
```
The unit tests cover:
- Manifest events/APIs/platform actions match adapter declarations.
- Outbound component conversion for text, image, voice, file, quote fallback, and byte-safe text splitting.
- Text callback conversion to `MessageReceivedEvent`.
- Legacy `FriendMessage` compatibility.
- EBA listener dispatch and inbound message/user cache.
- `send_message`, `reply_message`, and safe platform API dispatch against a mocked WeCom client.
## Standalone Runtime Plugin E2E Record
Verified on May 27, 2026 with `EBAEventProbe`, SDK standalone runtime, LangBot core, and a real WeCom desktop client against the server test environment.
```bash
cd langbot-plugin-sdk
uv run python -m langbot_plugin.cli.__init__ rt --debug-only --ws-control-port 5400 --ws-debug-port 5401 --skip-deps-check
cd LangBot
uv run main.py --standalone-runtime
cd data/plugins/LangBot__EBAEventProbe
EBA_PROBE_API=1 EBA_PROBE_COMPONENT_SWEEP=1 EBA_PROBE_PLATFORM_API=1 \
uv --project /absolute/path/to/langbot-plugin-sdk run python -m langbot_plugin.cli.__init__ run
```
Evidence:
- JSONL: `data/temp/wecom_eba_plugin_probe.jsonl`
- Bot: `wecom-eba`
- Client: real WeCom desktop client
- Environment: `dev.rockchin.top` test server
Observed and verified:
- A real private WeCom user message reached the plugin as `MessageReceived` with `adapter_name=wecom-eba`, common sender/chat fields, and `Source + Plain`.
- SDK API calls succeeded through the standalone runtime, including `get_langbot_version`, `get_bots`, `get_bot_info`, `send_message`, plugin/workspace storage, and manifest/list APIs.
- Safe adapter API checks succeeded through the plugin path for cached message/user data and declared safe platform API actions.
Still required for stricter acceptance:
- Send a private image and confirm common `Image` reaches the plugin.
- Have the plugin call `send_message` and `reply_message` for text and one media component, then verify the WeCom client receives the bot output.
- Exercise `send_to_all` only with a disposable visible-contact scope.
- Trigger one non-text/image callback, if available, and confirm it becomes `PlatformSpecificEventReceived`.
## Current Acceptance
Current status is **partial EBA acceptance**.
Blocked items:
- Real inbound image/voice/file evidence was not completed in this run.
- Inbound voice/file callback parsing is not present in the legacy `WecomClient.get_message()` path, so the EBA adapter does not claim those receive components yet.
- Group/member/moderation APIs do not apply to this regular WeCom application-message adapter.

View File

@@ -0,0 +1,148 @@
# WeComBot EBA Adapter
## Status
WeCom AI Bot now has an EBA adapter directory:
```text
src/langbot/pkg/platform/adapters/wecombot/
├── adapter.py
├── api_impl.py
├── event_converter.py
├── manifest.yaml
├── message_converter.py
├── platform_api.py
└── types.py
```
The adapter is registered as `wecombot-eba`.
This is separate from regular WeCom internal applications (`wecom-eba`). WeComBot supports WebSocket long connection mode, which does not require a webhook URL. Webhook mode remains available when `enable-webhook=true`.
## Configuration
| Field | Required | Default | Description |
|-------|----------|---------|-------------|
| `BotId` | Yes for WebSocket mode | `""` | WeCom AI Bot ID. |
| `robot_name` | Yes | `""` | Bot display name used to strip bot mentions from incoming group text. |
| `enable-webhook` | Yes | `false` | `false` uses WebSocket long connection mode; `true` uses webhook callback mode. |
| `webhook_url` | No | `""` | Unified webhook URL, only needed when webhook mode is enabled. |
| `Secret` | Yes for WebSocket mode | `""` | WeCom AI Bot secret for long connection mode. |
| `Corpid` | Yes for webhook mode | `""` | WeCom corporate ID for webhook callback mode. |
| `Token` | Yes for webhook mode | `""` | WeCom callback token. |
| `EncodingAESKey` | Yes for webhook mode; optional for WebSocket media decrypt | `""` | Message encryption/decryption key. |
| `enable-stream-reply` | No | `true` | Enables WeComBot streaming replies. |
## Events
WeComBot declares these EBA events:
- `message.received`
- `feedback.received`
- `platform.specific`
`message.received` covers private and group messages from the WeComBot SDK. `feedback.received` covers WeComBot like/dislike feedback callbacks. Native SDK events without a common EBA equivalent are emitted as `platform.specific`.
## Common APIs
| API | Status | Notes |
|-----|--------|-------|
| `send_message` | Supported in WebSocket mode | Sends proactive markdown/text to a person or group chat ID. Webhook mode raises `NotSupportedError` because the platform callback flow has no proactive send path here. |
| `reply_message` | Supported | Replies through native `req_id` in WebSocket mode or stream finalization/cache in webhook mode. |
| `get_message` | Supported from cache | Returns cached inbound `MessageReceivedEvent` by message ID. |
| `get_user_info` | Supported from cache | WeComBot events carry user info; no full user lookup endpoint is declared. |
| `get_friend_list` | Partial | Returns users observed by this adapter instance. |
| `get_group_info` | Supported from cache | Returns groups observed from inbound group messages. |
| `get_group_member_info` | Supported from cache | Returns observed sender/group-member pairs. |
| `get_group_member_list` | Partial | Returns observed members for the cached group only. |
| `call_platform_api` | Supported | See below. |
| `edit_message` / `delete_message` / `forward_message` | Not supported | WeComBot does not expose portable common APIs for these operations in the current SDK wrapper. |
| `upload_file` / `get_file_url` | Not supported as common APIs | Media is represented inside messages; no portable standalone file upload/URL API is declared. |
| moderation / leave APIs | Not supported | WeComBot does not expose equivalent common moderation operations through this adapter. |
## Platform-Specific APIs
`call_platform_api(action, params)` supports:
- `is_websocket_mode`
- `get_stream_session_status`
- `send_markdown`
`send_markdown` is only available in WebSocket mode.
## Unit Verification
Covered by:
```bash
PYTHONPATH=/Users/wangqiang/code/python/langbot-plugin-sdk/src uv run pytest tests/unit_tests/platform/test_wecombot_eba_adapter.py
```
The unit tests cover:
- Manifest events/APIs/platform actions match adapter declarations.
- Outbound common components flatten to WeComBot markdown/text.
- Private and group native events become `MessageReceivedEvent`.
- Inbound image, file, voice, and quote components map to common `MessageChain`.
- Legacy `FriendMessage`/`GroupMessage` compatibility.
- EBA listener dispatch, message/user/group/member cache, reply, send, streaming chunk, feedback, and platform API calls.
## Live Probe
The direct adapter probe is:
```bash
PYTHONPATH=/absolute/path/to/langbot-plugin-sdk/src uv run python tests/e2e/live_wecombot_eba_probe.py --help
```
Default mode is WebSocket long connection and requires:
- `WECOMBOT_BOT_ID`
- `WECOMBOT_SECRET`
- `WECOMBOT_ROBOT_NAME`
- optional `WECOMBOT_ENCODING_AES_KEY`
Webhook mode uses `--webhook` and requires:
- `WECOMBOT_TOKEN`
- `WECOMBOT_ENCODING_AES_KEY`
- `WECOMBOT_CORPID`
The probe writes JSONL evidence to `data/temp/wecombot_eba_live_probe.jsonl`, waits for a real WeComBot message, records common EBA event fields and message components, then runs safe cached/common/platform API checks.
## Standalone Runtime Plugin E2E Record
Verified on May 27, 2026 with `EBAEventProbe`, SDK standalone runtime, LangBot core, and the real WeCom desktop client in a WeCom AI Bot private chat.
Evidence:
- JSONL: `data/temp/wecombot_eba_plugin_probe.jsonl`
- Bot UUID: `9f5d4125-7b6d-4c98-8ca2-111111111111`
- Adapter: `wecombot-eba`
- Client: real WeCom desktop client, private `LangBot` BOT chat
- Mode: WebSocket long connection (`enable-webhook=false`)
Observed and verified:
- A real user-side message reached the plugin as `MessageReceived` with `adapter_name=wecombot-eba`, common sender/chat fields, and `Source + Plain`.
- SDK API calls succeeded through the standalone runtime: `get_langbot_version`, `get_bots`, `get_bot_info`, `send_message`, plugin/workspace storage, manifest/list APIs, and safe cached common platform APIs.
- Outbound component sweep was visible in the WeCom client and returned `errcode=0`: plain/mention/face fallback, base64 image marker, quote fallback, file marker, and flattened forward fallback.
- Declared WeComBot platform APIs succeeded through `plugin.call_platform_api`: `is_websocket_mode`, `get_stream_session_status`, and `send_markdown`.
- The `send_markdown` platform API produced visible bot output in the WeCom client.
Not completed:
- Clicking the visible WeCom AI feedback button did not produce a `FeedbackReceived` JSONL entry in this run, so `feedback.received` remains unverified at plugin E2E level.
- Group chat inbound and group cache/member coverage still need a real group-side trigger.
- Real inbound image/file/voice from the WeCom client was not exercised.
## Current Acceptance
Current status is **partial EBA acceptance**.
Blocked or limited items:
- `feedback.received` is implemented and unit-covered, but real plugin E2E feedback evidence was not observed from the desktop client click.
- Outbound image/voice/file are flattened as textual markers because the WeComBot SDK reply/proactive path used here is markdown/text oriented.
- Group member APIs are cache-backed and only know members observed in received messages.
- Destructive or moderation APIs are not declared because the current WeComBot protocol surface does not provide safe common equivalents.

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import asyncio
import base64
import json
@@ -7,7 +9,7 @@ import uuid
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
import re
from typing import Any, Callable, Optional, Tuple
from typing import TYPE_CHECKING, Any, Callable, Optional, Tuple
from urllib.parse import unquote
import httpx
@@ -16,7 +18,9 @@ from quart import Quart, request, Response, jsonify
from langbot.libs.wecom_ai_bot_api import wecombotevent
from langbot.libs.wecom_ai_bot_api.WXBizMsgCrypt3 import WXBizMsgCrypt
from langbot.pkg.platform.logger import EventLogger
if TYPE_CHECKING:
from langbot.pkg.platform.logger import EventLogger
@dataclass

View File

@@ -15,13 +15,15 @@ import json
import secrets
import time
import traceback
from typing import Any, Callable, Optional
from typing import TYPE_CHECKING, Any, Callable, Optional
import aiohttp
from langbot.libs.wecom_ai_bot_api import wecombotevent
from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession
from langbot.pkg.platform.logger import EventLogger
if TYPE_CHECKING:
from langbot.pkg.platform.logger import EventLogger
DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com'

View File

@@ -5,6 +5,7 @@ import sqlalchemy
import typing
from ....core import app
from ....discover import engine
from ....entity.persistence import bot as persistence_bot
from ....entity.persistence import pipeline as persistence_pipeline
@@ -17,6 +18,24 @@ class BotService:
def __init__(self, ap: app.Application) -> None:
self.ap = ap
def _get_adapter_component(self, adapter_name: str) -> engine.Component | None:
"""Return the discovered platform adapter component for an adapter name."""
for component in self.ap.discover.get_components_by_kind('MessagePlatformAdapter'):
if component.metadata.name == adapter_name:
return component
return None
def _adapter_declares_webhook_url(self, adapter_name: str) -> bool:
"""Whether the adapter manifest declares a generated webhook URL config item."""
component = self._get_adapter_component(adapter_name)
if component is None:
return False
for config_item in component.spec.get('config', []):
if config_item.get('type') == 'webhook-url':
return True
return False
async def get_bots(self, include_secret: bool = True) -> list[dict]:
"""获取所有机器人"""
result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_bot.Bot))
@@ -58,17 +77,10 @@ class BotService:
if runtime_bot is not None:
adapter_runtime_values['bot_account_id'] = runtime_bot.adapter.bot_account_id
# Webhook URL for unified webhook adapters (independent of bot running state)
if persistence_bot['adapter'] in [
'wecom',
'wecombot',
'officialaccount',
'qqofficial',
'slack',
'wecomcs',
'LINE',
'lark',
]:
# Webhook URL for adapters that declare a generated webhook config item.
# This is manifest-driven so EBA adapters do not need to be mirrored in a
# second hard-coded list.
if self._adapter_declares_webhook_url(persistence_bot['adapter']):
webhook_prefix = self.ap.instance_config.data['api'].get('webhook_prefix', 'http://127.0.0.1:5300')
extra_webhook_prefix = self.ap.instance_config.data['api'].get('extra_webhook_prefix', '')
webhook_url = f'/bots/{bot_uuid}'

View File

@@ -163,13 +163,21 @@ class PreProcessor(stage.PipelineStage):
plain_text = ''
quote_msg = query.pipeline_config['trigger'].get('misc', '').get('combine-quote-message')
local_agent_without_vision = (
selected_runner == 'local-agent'
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
)
for me in query.message_chain:
if isinstance(me, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(me.text))
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or (
if local_agent_without_vision:
content_list.append(provider_message.ContentElement.from_text('[Image]'))
plain_text += '[Image]'
elif selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if me.base64 is not None:
@@ -190,7 +198,10 @@ class PreProcessor(stage.PipelineStage):
if isinstance(msg, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or (
if local_agent_without_vision:
content_list.append(provider_message.ContentElement.from_text('[Image]'))
plain_text += '[Image]'
elif selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if msg.base64 is not None:

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,221 @@
from __future__ import annotations
import asyncio
import traceback
import typing
import pydantic
from langbot.libs.wecom_api.api import WecomClient
from langbot.libs.wecom_api.wecomevent import WecomEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
from langbot.pkg.platform.adapters.wecom.api_impl import WecomAPIMixin
from langbot.pkg.platform.adapters.wecom.event_converter import WecomEventConverter
from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter
from langbot.pkg.platform.adapters.wecom.platform_api import PLATFORM_API_MAP
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class WecomAdapter(WecomAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter):
bot: WecomClient = pydantic.Field(exclude=True)
message_converter: WecomMessageConverter = WecomMessageConverter()
event_converter: WecomEventConverter = WecomEventConverter()
config: dict
bot_uuid: str | None = None
listeners: dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
] = {}
_message_cache: dict[str, platform_events.MessageReceivedEvent] = {}
_user_cache: dict[str, typing.Any] = {}
class Config:
arbitrary_types_allowed = True
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger):
required_keys = [
'corpid',
'secret',
'token',
'EncodingAESKey',
]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise Exception(f'WeCom missing required config fields: {missing_keys}')
bot = WecomClient(
corpid=config['corpid'],
secret=config['secret'],
token=config['token'],
EncodingAESKey=config['EncodingAESKey'],
contacts_secret=config.get('contacts_secret', ''),
logger=logger,
unified_mode=True,
api_base_url=config.get('api_base_url', 'https://qyapi.weixin.qq.com/cgi-bin'),
)
super().__init__(
config=config,
logger=logger,
bot=bot,
bot_account_id='',
bot_uuid=None,
listeners={},
_message_cache={},
_user_cache={},
)
self._register_native_handlers()
def set_bot_uuid(self, bot_uuid: str):
self.bot_uuid = bot_uuid
def get_supported_events(self) -> list[str]:
return [
'message.received',
'platform.specific',
]
def get_supported_apis(self) -> list[str]:
return [
'send_message',
'reply_message',
'get_message',
'get_user_info',
'get_friend_list',
'call_platform_api',
]
async def send_message(
self,
target_type: str,
target_id: str,
message: platform_message.MessageChain,
) -> platform_events.MessageResult:
if target_type not in ('person', 'private'):
raise NotSupportedError(f'send_message:{target_type}')
user_id, agent_id = self._parse_target_id(target_id)
content_list = await WecomMessageConverter.yiri2target(message, self.bot)
raw_results = []
for content in content_list:
raw_results.append(await self._send_content(user_id, agent_id, content))
return platform_events.MessageResult(raw={'results': raw_results})
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
) -> platform_events.MessageResult:
wecom_event = await WecomEventConverter.yiri2target(message_source)
if not isinstance(wecom_event, WecomEvent):
raise ValueError('WeCom reply_message requires a WecomEvent source object')
content_list = await WecomMessageConverter.yiri2target(message, self.bot)
raw_results = []
for content in content_list:
raw_results.append(await self._send_content(wecom_event.user_id, int(wecom_event.agent_id), content))
return platform_events.MessageResult(message_id=wecom_event.message_id, raw={'results': raw_results})
async def call_platform_api(self, action: str, params: dict = {}) -> dict:
handler = PLATFORM_API_MAP.get(action)
if handler is None:
raise NotSupportedError(f'call_platform_api:{action}')
return await handler(self.bot, params)
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
registered = self.listeners.get(event_type)
if registered is callback:
self.listeners.pop(event_type, None)
async def handle_unified_webhook(self, bot_uuid: str, path: str, request):
return await self.bot.handle_unified_webhook(request)
async def run_async(self):
async def keep_alive():
while True:
await asyncio.sleep(1)
await self.logger.info('WeCom EBA adapter running in unified webhook mode')
await keep_alive()
async def kill(self) -> bool:
return True
async def is_muted(self, group_id: int | None = None) -> bool:
return False
def _register_native_handlers(self):
async def on_message(event: WecomEvent):
await self._handle_native_event(event)
self.bot.on_message('text')(on_message)
self.bot.on_message('image')(on_message)
async def _handle_native_event(self, event: WecomEvent):
self.bot_account_id = event.receiver_id or self.bot_account_id
try:
if platform_events.FriendMessage in self.listeners:
legacy_event = await self.event_converter.target2legacy(event, self.bot)
if legacy_event:
callback = self.listeners.get(type(legacy_event))
if callback:
await callback(legacy_event, self)
eba_event = await self.event_converter.target2yiri(event, self.bot)
if eba_event:
self._cache_event(eba_event)
await self._dispatch_eba_event(eba_event)
except Exception:
await self.logger.error(f'Error in wecom native event: {traceback.format_exc()}')
async def _dispatch_eba_event(self, event: platform_events.EBAEvent):
for event_type in (type(event), platform_events.EBAEvent, platform_events.Event):
callback = self.listeners.get(event_type)
if callback:
await callback(event, self)
return
def _cache_event(self, event: platform_events.Event):
if not isinstance(event, platform_events.MessageReceivedEvent):
return
self._message_cache[str(event.message_id)] = event
self._user_cache[str(event.sender.id)] = event.sender
async def _send_content(self, user_id: str, agent_id: int, content: dict):
content_type = content.get('type')
if content_type == 'text':
return await self.bot.send_private_msg(user_id, agent_id, content.get('content', ''))
if content_type == 'image':
return await self.bot.send_image(user_id, agent_id, content['media_id'])
if content_type == 'voice':
return await self.bot.send_voice(user_id, agent_id, content['media_id'])
if content_type == 'file':
return await self.bot.send_file(user_id, agent_id, content['media_id'])
raise NotSupportedError(f'send_content:{content_type}')
@staticmethod
def _parse_target_id(target_id: str) -> tuple[str, int]:
user_id, sep, agent_id = str(target_id).partition('|')
if not user_id or not sep or not agent_id:
raise ValueError('WeCom target_id must be formatted as "user_id|agent_id"')
return user_id, int(agent_id)

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
import typing
from langbot.libs.wecom_api.api import WecomClient
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class WecomAPIMixin:
bot: WecomClient
_message_cache: dict[str, platform_events.MessageReceivedEvent]
_user_cache: dict[str, platform_entities.User]
async def get_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> platform_events.MessageReceivedEvent:
event = self._message_cache.get(str(message_id))
if event is None:
raise NotSupportedError('get_message:message_not_cached')
return event
async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User:
cached = self._user_cache.get(str(user_id))
if cached is not None:
return cached
info = await self.bot.get_user_info(str(user_id))
return platform_entities.User(
id=info.get('userid') or user_id,
nickname=info.get('name') or str(user_id),
username=info.get('alias') or info.get('userid') or None,
)
async def get_friend_list(self) -> list[platform_entities.User]:
return list(self._user_cache.values())
async def upload_file(self, file_data: bytes, filename: str) -> str:
raise NotSupportedError('upload_file')
async def get_file_url(self, file_id: str) -> str:
raise NotSupportedError('get_file_url')
async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup:
raise NotSupportedError('get_group_info')
async def get_group_member_list(
self,
group_id: typing.Union[int, str],
) -> list[platform_entities.UserGroupMember]:
raise NotSupportedError('get_group_member_list')
async def get_group_member_info(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> platform_entities.UserGroupMember:
raise NotSupportedError('get_group_member_info')
async def edit_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
new_content: platform_message.MessageChain,
) -> None:
raise NotSupportedError('edit_message')
async def delete_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> None:
raise NotSupportedError('delete_message')

View File

@@ -0,0 +1,91 @@
from __future__ import annotations
import typing
from langbot.libs.wecom_api.api import WecomClient
from langbot.libs.wecom_api.wecomevent import WecomEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter
from langbot.pkg.platform.adapters.wecom.types import ADAPTER_NAME, make_private_chat_id
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
class WecomEventConverter(abstract_platform_adapter.AbstractEventConverter):
@staticmethod
async def yiri2target(event: platform_events.Event) -> WecomEvent | None:
return getattr(event, 'source_platform_object', None)
@staticmethod
async def target2legacy(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.FriendMessage | None:
eba_event = await WecomEventConverter.target2yiri(event, bot)
if hasattr(eba_event, 'to_legacy_event'):
return eba_event.to_legacy_event()
if event.type in {'text', 'image'} and eba_event is not None:
friend = platform_entities.Friend(
id=f'u{event.user_id}',
nickname=getattr(getattr(eba_event, 'sender', None), 'nickname', str(event.user_id or '')),
remark='',
)
return platform_events.FriendMessage(
sender=friend,
message_chain=eba_event.message_chain,
time=getattr(eba_event, 'timestamp', None),
source_platform_object=event,
)
return None
@staticmethod
async def target2yiri(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.Event | None:
if event.type in {'text', 'image'}:
return await WecomEventConverter.message_to_eba(event, bot)
return WecomEventConverter.platform_specific(event, f'message.{event.detail_type or event.type or "unknown"}')
@staticmethod
async def message_to_eba(event: WecomEvent, bot: WecomClient | None = None) -> platform_events.MessageReceivedEvent:
if event.type == 'image':
message_chain = await WecomMessageConverter.target2yiri_image(event.picurl, event.message_id)
else:
message_chain = await WecomMessageConverter.target2yiri_text(event.message, event.message_id)
sender = await WecomEventConverter.user_from_event(event, bot)
return platform_events.MessageReceivedEvent(
type='message.received',
adapter_name=ADAPTER_NAME,
message_id=event.message_id or '',
message_chain=message_chain,
sender=sender,
chat_type=platform_entities.ChatType.PRIVATE,
chat_id=make_private_chat_id(event.user_id, event.agent_id),
group=None,
timestamp=float(event.timestamp or 0),
source_platform_object=event,
)
@staticmethod
async def user_from_event(event: WecomEvent, bot: WecomClient | None = None) -> platform_entities.User:
nickname = str(event.user_id or '')
raw: dict[str, typing.Any] = {}
if bot and event.user_id:
try:
raw = await bot.get_user_info(event.user_id)
nickname = raw.get('name') or nickname
except Exception:
raw = {}
return platform_entities.User(
id=event.user_id or '',
nickname=nickname,
username=raw.get('alias') or raw.get('userid') or None,
)
@staticmethod
def platform_specific(event: WecomEvent, action: str) -> platform_events.PlatformSpecificEvent:
return platform_events.PlatformSpecificEvent(
type='platform.specific',
adapter_name=ADAPTER_NAME,
action=action,
data=dict(event),
timestamp=float(event.timestamp or 0),
source_platform_object=event,
)

View File

@@ -0,0 +1,117 @@
apiVersion: v1
kind: MessagePlatformAdapter
metadata:
name: wecom-eba
label:
en_US: WeCom (EBA)
zh_Hans: 企业微信 (EBA)
zh_Hant: 企業微信 (EBA)
description:
en_US: WeCom application message adapter (EBA architecture)
zh_Hans: 企业微信内部应用消息适配器EBA 架构版本)
zh_Hant: 企業微信內部應用訊息適配器EBA 架構版本)
icon: wecom.png
spec:
categories:
- popular
- china
help_links:
zh: https://link.langbot.app/zh/platforms/wecom
en: https://link.langbot.app/en/platforms/wecom
ja: https://link.langbot.app/ja/platforms/wecom
config:
- name: webhook_url
label:
en_US: Webhook Callback URL
zh_Hans: Webhook 回调地址
zh_Hant: Webhook 回調地址
description:
en_US: Copy this URL and paste it into your WeCom app's webhook configuration
zh_Hans: 复制此地址并粘贴到企业微信应用的 Webhook 配置中
zh_Hant: 複製此地址並貼到企業微信應用的 Webhook 設定中
type: webhook-url
required: false
default: ""
- name: corpid
label:
en_US: Corpid
zh_Hans: 企业ID
zh_Hant: 企業ID
type: string
required: true
default: ""
- name: secret
label:
en_US: Secret
zh_Hans: 密钥 (Secret)
zh_Hant: 密鑰 (Secret)
type: string
required: true
default: ""
- name: token
label:
en_US: Token
zh_Hans: 令牌 (Token)
zh_Hant: 令牌 (Token)
type: string
required: true
default: ""
- name: EncodingAESKey
label:
en_US: EncodingAESKey
zh_Hans: 消息加解密密钥 (EncodingAESKey)
zh_Hant: 訊息加解密密鑰 (EncodingAESKey)
type: string
required: true
default: ""
- name: contacts_secret
label:
en_US: Contacts Secret
zh_Hans: 通讯录密钥
zh_Hant: 通訊錄密鑰
type: string
required: false
default: ""
- name: api_base_url
label:
en_US: API Base URL
zh_Hans: API 基础 URL
zh_Hant: API 基礎 URL
description:
en_US: Optional WeCom API base URL for private network or reverse proxy deployments.
zh_Hans: 可选,若部署在内网环境并通过反向代理访问企业微信 API可根据文档填写此项
zh_Hant: 可選,若部署在內網環境並透過反向代理存取企業微信 API可根據文件填寫此項
type: string
required: false
default: "https://qyapi.weixin.qq.com/cgi-bin"
supported_events:
- message.received
- platform.specific
supported_apis:
required:
- send_message
- reply_message
optional:
- get_message
- get_user_info
- get_friend_list
- call_platform_api
platform_specific_apis:
- action: check_access_token
description: { en_US: "Check whether the current WeCom access token is usable", zh_Hans: "检查当前企业微信 access token 是否可用" }
- action: refresh_access_token
description: { en_US: "Refresh the WeCom access token", zh_Hans: "刷新企业微信 access token" }
- action: get_user_info
description: { en_US: "Get WeCom user information by user ID", zh_Hans: "按用户 ID 获取企业微信用户信息" }
- action: send_to_all
description: { en_US: "Send an application text message to all contacts available to the configured contacts secret", zh_Hans: "使用配置的通讯录密钥向可见成员群发应用文本消息" }
execution:
python:
path: ./adapter.py
attr: WecomAdapter

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
import datetime
from langbot.libs.wecom_api.api import WecomClient
from langbot.pkg.utils import image
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot_plugin.api.entities.builtin.platform import message as platform_message
def split_string_by_bytes(text: str, limit: int = 2048, encoding: str = 'utf-8') -> list[str]:
"""Split text without cutting a multi-byte character in half."""
bytes_data = text.encode(encoding)
total_len = len(bytes_data)
parts: list[str] = []
start = 0
while start < total_len:
end = min(start + limit, total_len)
chunk = bytes_data[start:end]
part = chunk.decode(encoding, errors='ignore')
part_len = len(part.encode(encoding))
if part_len == 0 and end < total_len:
start += 1
continue
parts.append(part)
start += part_len
return parts
class WecomMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain, bot: WecomClient) -> list[dict]:
content_list: list[dict] = []
for msg in message_chain:
if isinstance(msg, platform_message.Source):
continue
if isinstance(msg, platform_message.Plain):
content_list.extend({'type': 'text', 'content': chunk} for chunk in split_string_by_bytes(msg.text))
elif isinstance(msg, platform_message.Image):
content_list.append({'type': 'image', 'media_id': await bot.get_media_id(msg)})
elif isinstance(msg, platform_message.Voice):
content_list.append({'type': 'voice', 'media_id': await bot.get_media_id(msg)})
elif isinstance(msg, platform_message.File):
content_list.append({'type': 'file', 'media_id': await bot.get_media_id(msg)})
elif isinstance(msg, platform_message.Forward):
for node in msg.node_list:
content_list.extend(await WecomMessageConverter.yiri2target(node.message_chain, bot))
elif isinstance(msg, platform_message.Quote):
if msg.id is not None:
content_list.append({'type': 'text', 'content': f'[Quote {msg.id}] '})
if msg.origin:
content_list.extend(await WecomMessageConverter.yiri2target(msg.origin, bot))
elif isinstance(msg, platform_message.At):
content_list.append({'type': 'text', 'content': f'@{msg.display or msg.target}'})
elif isinstance(msg, platform_message.AtAll):
content_list.append({'type': 'text', 'content': '@all'})
else:
content_list.append({'type': 'text', 'content': str(msg)})
return content_list
@staticmethod
async def target2yiri_text(message: str | None, message_id: int | str | None = -1) -> platform_message.MessageChain:
return platform_message.MessageChain(
[
platform_message.Source(id=message_id, time=datetime.datetime.now()),
platform_message.Plain(text=message or ''),
]
)
@staticmethod
async def target2yiri_image(picurl: str, message_id: int | str | None = -1) -> platform_message.MessageChain:
image_base64, image_format = await image.get_wecom_image_base64(pic_url=picurl)
return platform_message.MessageChain(
[
platform_message.Source(id=message_id, time=datetime.datetime.now()),
platform_message.Image(base64=f'data:image/{image_format};base64,{image_base64}'),
]
)

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
import typing
from langbot.libs.wecom_api.api import WecomClient
async def check_access_token(bot: WecomClient, params: dict) -> dict:
return {'valid': await bot.check_access_token()}
async def refresh_access_token(bot: WecomClient, params: dict) -> dict:
bot.access_token = await bot.get_access_token(bot.secret)
return {'ok': bool(bot.access_token)}
async def get_user_info(bot: WecomClient, params: dict) -> dict:
user_id = params.get('user_id') or params.get('userid')
if not user_id:
raise ValueError('user_id is required')
return await bot.get_user_info(str(user_id))
async def send_to_all(bot: WecomClient, params: dict) -> dict:
content = params.get('content')
agent_id = params.get('agent_id') or params.get('agentid')
if not content:
raise ValueError('content is required')
if agent_id is None:
raise ValueError('agent_id is required')
await bot.send_to_all(str(content), int(agent_id))
return {'ok': True}
PLATFORM_API_MAP: dict[str, typing.Callable[[WecomClient, dict], typing.Awaitable[dict]]] = {
'check_access_token': check_access_token,
'refresh_access_token': refresh_access_token,
'get_user_info': get_user_info,
'send_to_all': send_to_all,
}

View File

@@ -0,0 +1,12 @@
from __future__ import annotations
ADAPTER_NAME = 'wecom-eba'
def make_private_chat_id(user_id: str | int | None, agent_id: str | int | None) -> str:
"""Build the routable private chat id used by the WeCom EBA adapter."""
user = str(user_id or '')
agent = str(agent_id or '')
if not user or not agent:
return user
return f'{user}|{agent}'

Binary file not shown.

After

Width:  |  Height:  |  Size: 257 KiB

View File

@@ -0,0 +1 @@
from __future__ import annotations

View File

@@ -0,0 +1,282 @@
from __future__ import annotations
import asyncio
import time
import traceback
import typing
import pydantic
from langbot.libs.wecom_ai_bot_api.api import WecomBotClient
from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent
from langbot.libs.wecom_ai_bot_api.ws_client import WecomBotWsClient
from langbot.pkg.platform.adapters.wecombot.api_impl import WecomBotAPIMixin
from langbot.pkg.platform.adapters.wecombot.event_converter import WecomBotEventConverter
from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter
from langbot.pkg.platform.adapters.wecombot.platform_api import PLATFORM_API_MAP
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class WecomBotAdapter(WecomBotAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter):
bot: typing.Any = pydantic.Field(exclude=True)
message_converter: WecomBotMessageConverter = WecomBotMessageConverter()
event_converter: WecomBotEventConverter
config: dict
bot_uuid: str | None = None
bot_name: str = ''
listeners: dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
] = {}
_message_cache: dict[str, platform_events.MessageReceivedEvent] = {}
_user_cache: dict[str, platform_entities.User] = {}
_group_cache: dict[str, platform_entities.UserGroup] = {}
_member_cache: dict[tuple[str, str], platform_entities.UserGroupMember] = {}
_stream_to_monitoring_msg: dict[str, tuple[str, float]] = {}
_STREAM_MAPPING_TTL: int = 600
class Config:
arbitrary_types_allowed = True
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger):
enable_webhook = config.get('enable-webhook', False)
bot_name = config.get('robot_name', '')
if not enable_webhook:
required_keys = ['BotId', 'Secret']
missing_keys = [key for key in required_keys if not config.get(key)]
if missing_keys:
raise Exception(f'WeComBot WebSocket mode missing config: {missing_keys}')
bot = WecomBotWsClient(
bot_id=config['BotId'],
secret=config['Secret'],
logger=logger,
encoding_aes_key=config.get('EncodingAESKey', ''),
)
else:
required_keys = ['Token', 'EncodingAESKey', 'Corpid']
missing_keys = [key for key in required_keys if not config.get(key)]
if missing_keys:
raise Exception(f'WeComBot webhook mode missing config: {missing_keys}')
bot = WecomBotClient(
Token=config['Token'],
EnCodingAESKey=config['EncodingAESKey'],
Corpid=config['Corpid'],
logger=logger,
unified_mode=True,
)
super().__init__(
config=config,
logger=logger,
bot=bot,
bot_account_id=config.get('BotId', ''),
bot_uuid=None,
bot_name=bot_name,
event_converter=WecomBotEventConverter(bot_name=bot_name),
listeners={},
_message_cache={},
_user_cache={},
_group_cache={},
_member_cache={},
_stream_to_monitoring_msg={},
)
self._register_native_handlers()
def set_bot_uuid(self, bot_uuid: str):
self.bot_uuid = bot_uuid
def get_supported_events(self) -> list[str]:
return [
'message.received',
'feedback.received',
'platform.specific',
]
def get_supported_apis(self) -> list[str]:
return [
'send_message',
'reply_message',
'get_message',
'get_user_info',
'get_friend_list',
'get_group_info',
'get_group_member_info',
'get_group_member_list',
'call_platform_api',
]
async def send_message(
self,
target_type: str,
target_id: str,
message: platform_message.MessageChain,
) -> platform_events.MessageResult:
if self.config.get('enable-webhook', False):
raise NotSupportedError('send_message:webhook_mode')
if target_type not in ('person', 'private', 'group'):
raise NotSupportedError(f'send_message:{target_type}')
content = await WecomBotMessageConverter.yiri2target(message)
raw = await self.bot.send_message(str(target_id), content)
return platform_events.MessageResult(raw={'result': raw})
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
) -> platform_events.MessageResult:
event = await WecomBotEventConverter.yiri2target(message_source)
if not isinstance(event, WecomBotEvent):
raise ValueError('WeComBot reply_message requires a WecomBotEvent source object')
content = await WecomBotMessageConverter.yiri2target(message)
if not self.config.get('enable-webhook', False) and event.get('req_id'):
raw = await self.bot.reply_text(event.get('req_id'), content)
else:
raw = await self.bot.set_message(event.message_id, content)
return platform_events.MessageResult(message_id=event.message_id, raw={'result': raw})
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
) -> dict:
event = await WecomBotEventConverter.yiri2target(message_source)
if not isinstance(event, WecomBotEvent):
raise ValueError('WeComBot reply_message_chunk requires a WecomBotEvent source object')
content = await WecomBotMessageConverter.yiri2target(message)
success = await self.bot.push_stream_chunk(event.message_id, content, is_final=is_final)
if not success and is_final and not self.config.get('enable-webhook', False) and event.get('req_id'):
await self.bot.reply_text(event.get('req_id'), content)
return {'stream': success}
async def is_stream_output_supported(self) -> bool:
return self.config.get('enable-stream-reply', True)
async def call_platform_api(self, action: str, params: dict = {}) -> dict:
handler = PLATFORM_API_MAP.get(action)
if handler is None:
raise NotSupportedError(f'call_platform_api:{action}')
return await handler(self.bot, params)
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
registered = self.listeners.get(event_type)
if registered is callback:
self.listeners.pop(event_type, None)
async def handle_unified_webhook(self, bot_uuid: str, path: str, request):
if not self.config.get('enable-webhook', False):
return None
return await self.bot.handle_unified_webhook(request)
async def run_async(self):
if not self.config.get('enable-webhook', False):
await self.bot.connect()
return
async def keep_alive():
while True:
await asyncio.sleep(1)
await self.logger.info('WeComBot EBA adapter running in unified webhook mode')
await keep_alive()
async def kill(self) -> bool:
if not self.config.get('enable-webhook', False):
await self.bot.disconnect()
return True
async def is_muted(self, group_id: int | None = None) -> bool:
return False
async def on_monitoring_message_created(self, query, monitoring_message_id: str):
try:
stream_id = query.message_event.source_platform_object.stream_id
if stream_id:
self._stream_to_monitoring_msg[stream_id] = (monitoring_message_id, time.time())
self._cleanup_stream_mapping()
except Exception as e:
await self.logger.debug(f'Failed to map stream_id to monitoring message: {e}')
def _register_native_handlers(self):
self.bot.on_message('single')(self._handle_native_event)
self.bot.on_message('group')(self._handle_native_event)
if hasattr(self.bot, 'on_feedback'):
self.bot.on_feedback()(self._handle_feedback)
if hasattr(self.bot, 'on_message'):
self.bot.on_message('event')(self._handle_native_event)
async def _handle_native_event(self, event: WecomBotEvent):
try:
if platform_events.FriendMessage in self.listeners or platform_events.GroupMessage in self.listeners:
legacy_event = await self.event_converter.target2legacy(event)
if legacy_event and type(legacy_event) in self.listeners:
await self.listeners[type(legacy_event)](legacy_event, self)
eba_event = await self.event_converter.target2yiri(event)
if eba_event:
self._cache_event(eba_event)
await self._dispatch_eba_event(eba_event)
except Exception:
await self.logger.error(f'Error in wecombot native event: {traceback.format_exc()}')
async def _handle_feedback(self, **kwargs):
try:
event = WecomBotEventConverter.feedback_to_eba(**kwargs)
if event.stream_id and event.stream_id in self._stream_to_monitoring_msg:
monitoring_msg_id, _ = self._stream_to_monitoring_msg[event.stream_id]
event.stream_id = monitoring_msg_id
await self._dispatch_eba_event(event)
except Exception:
await self.logger.error(f'Error in wecombot feedback event: {traceback.format_exc()}')
async def _dispatch_eba_event(self, event: platform_events.EBAEvent):
for event_type in (type(event), platform_events.EBAEvent, platform_events.Event):
callback = self.listeners.get(event_type)
if callback:
await callback(event, self)
return
def _cache_event(self, event: platform_events.Event):
if not isinstance(event, platform_events.MessageReceivedEvent):
return
self._message_cache[str(event.message_id)] = event
self._user_cache[str(event.sender.id)] = event.sender
if event.group:
self._group_cache[str(event.group.id)] = event.group
self._member_cache[(str(event.group.id), str(event.sender.id))] = platform_entities.UserGroupMember(
user=event.sender,
group_id=event.group.id,
role=platform_entities.MemberRole.MEMBER,
display_name=event.sender.nickname,
)
def _cleanup_stream_mapping(self):
now = time.time()
expired = [key for key, (_, ts) in self._stream_to_monitoring_msg.items() if now - ts > self._STREAM_MAPPING_TTL]
for key in expired:
del self._stream_to_monitoring_msg[key]

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import typing
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class WecomBotAPIMixin:
_message_cache: dict[str, platform_events.MessageReceivedEvent]
_user_cache: dict[str, platform_entities.User]
_group_cache: dict[str, platform_entities.UserGroup]
_member_cache: dict[tuple[str, str], platform_entities.UserGroupMember]
async def get_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> platform_events.MessageReceivedEvent:
event = self._message_cache.get(str(message_id))
if event is None:
raise NotSupportedError('get_message:message_not_cached')
return event
async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User:
cached = self._user_cache.get(str(user_id))
if cached is None:
raise NotSupportedError('get_user_info:not_cached')
return cached
async def get_friend_list(self) -> list[platform_entities.User]:
return list(self._user_cache.values())
async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup:
cached = self._group_cache.get(str(group_id))
if cached is None:
raise NotSupportedError('get_group_info:not_cached')
return cached
async def get_group_member_info(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> platform_entities.UserGroupMember:
cached = self._member_cache.get((str(group_id), str(user_id)))
if cached is None:
raise NotSupportedError('get_group_member_info:not_cached')
return cached
async def get_group_member_list(
self,
group_id: typing.Union[int, str],
) -> list[platform_entities.UserGroupMember]:
return [member for (cached_group_id, _), member in self._member_cache.items() if cached_group_id == str(group_id)]
async def upload_file(self, file_data: bytes, filename: str) -> str:
raise NotSupportedError('upload_file')
async def get_file_url(self, file_id: str) -> str:
raise NotSupportedError('get_file_url')
async def edit_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
new_content: platform_message.MessageChain,
) -> None:
raise NotSupportedError('edit_message')
async def delete_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> None:
raise NotSupportedError('delete_message')
async def forward_message(
self,
from_chat_type: str,
from_chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
to_chat_type: str,
to_chat_id: typing.Union[int, str],
) -> platform_events.MessageResult:
raise NotSupportedError('forward_message')
async def mute_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str], duration: int = 0):
raise NotSupportedError('mute_member')
async def unmute_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str]):
raise NotSupportedError('unmute_member')
async def kick_member(self, group_id: typing.Union[int, str], user_id: typing.Union[int, str]):
raise NotSupportedError('kick_member')
async def leave_group(self, group_id: typing.Union[int, str]):
raise NotSupportedError('leave_group')

View File

@@ -0,0 +1,124 @@
from __future__ import annotations
import time
import typing
from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent
from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter
from langbot.pkg.platform.adapters.wecombot.types import ADAPTER_NAME
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
class WecomBotEventConverter(abstract_platform_adapter.AbstractEventConverter):
def __init__(self, bot_name: str = ''):
self.bot_name = bot_name
@staticmethod
async def yiri2target(event: platform_events.Event) -> typing.Any:
return getattr(event, 'source_platform_object', None)
async def target2legacy(self, event: WecomBotEvent) -> platform_events.FriendMessage | platform_events.GroupMessage | None:
eba_event = await self.target2yiri(event)
if not isinstance(eba_event, platform_events.MessageReceivedEvent):
return None
if eba_event.chat_type == platform_entities.ChatType.PRIVATE:
return platform_events.FriendMessage(
sender=platform_entities.Friend(id=eba_event.sender.id, nickname=eba_event.sender.nickname, remark=''),
message_chain=eba_event.message_chain,
time=eba_event.timestamp,
source_platform_object=event,
)
return platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=eba_event.sender.id,
permission='MEMBER',
member_name=eba_event.sender.nickname,
group=platform_entities.Group(
id=eba_event.group.id if eba_event.group else eba_event.chat_id,
name=eba_event.group.name if eba_event.group else '',
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=eba_event.message_chain,
time=eba_event.timestamp,
source_platform_object=event,
)
async def target2yiri(self, event: WecomBotEvent) -> platform_events.Event:
if event.type in {'single', 'group'} and event.msgtype != 'event':
return await self.message_to_eba(event)
return self.platform_specific(event, f'wecombot.{event.get("eventtype") or event.msgtype or event.type or "unknown"}')
async def message_to_eba(self, event: WecomBotEvent) -> platform_events.MessageReceivedEvent:
sender = platform_entities.User(id=event.userid, nickname=event.username or event.userid)
group = None
chat_type = platform_entities.ChatType.PRIVATE
chat_id = event.userid
if event.type == 'group':
chat_type = platform_entities.ChatType.GROUP
chat_id = str(event.chatid)
group = platform_entities.UserGroup(id=str(event.chatid), name=event.chatname or str(event.chatid))
return platform_events.MessageReceivedEvent(
type='message.received',
adapter_name=ADAPTER_NAME,
message_id=event.message_id or '',
message_chain=await WecomBotMessageConverter.target2yiri(event, self.bot_name),
sender=sender,
chat_type=chat_type,
chat_id=chat_id or '',
group=group,
timestamp=time.time(),
source_platform_object=event,
)
@staticmethod
def feedback_to_eba(
*,
feedback_id: str,
feedback_type: int,
feedback_content: str | None = None,
inaccurate_reasons: list | None = None,
session=None,
) -> platform_events.FeedbackReceivedEvent:
session_id = None
user_id = None
message_id = None
stream_id = None
if session:
if getattr(session, 'chat_id', None):
session_id = f'group_{session.chat_id}'
elif getattr(session, 'user_id', None):
session_id = f'person_{session.user_id}'
user_id = getattr(session, 'user_id', None)
message_id = getattr(session, 'msg_id', None)
stream_id = getattr(session, 'stream_id', None)
return platform_events.FeedbackReceivedEvent(
type='feedback.received',
adapter_name=ADAPTER_NAME,
feedback_id=feedback_id,
feedback_type=feedback_type,
feedback_content=feedback_content,
inaccurate_reasons=[str(reason) for reason in (inaccurate_reasons or [])] or None,
user_id=user_id,
session_id=session_id,
message_id=message_id,
stream_id=stream_id,
timestamp=time.time(),
source_platform_object=session,
)
@staticmethod
def platform_specific(event: WecomBotEvent, action: str) -> platform_events.PlatformSpecificEvent:
return platform_events.PlatformSpecificEvent(
type='platform.specific',
adapter_name=ADAPTER_NAME,
action=action,
data=dict(event),
timestamp=time.time(),
source_platform_object=event,
)

View File

@@ -0,0 +1,158 @@
apiVersion: v1
kind: MessagePlatformAdapter
metadata:
name: wecombot-eba
label:
en_US: WeComBot (EBA)
zh_Hans: 企业微信智能机器人 (EBA)
zh_Hant: 企業微信智慧機器人 (EBA)
description:
en_US: WeCom AI Bot adapter with Event-Based Agents support
zh_Hans: 企业微信智能机器人适配器EBA 架构版本),支持长连接和 Webhook 两种接入方式
zh_Hant: 企業微信智慧機器人適配器EBA 架構版本),支援長連線和 Webhook 兩種接入方式
icon: wecombot.png
spec:
categories:
- china
help_links:
zh: https://link.langbot.app/zh/platforms/wecombot
en: https://link.langbot.app/en/platforms/wecombot
ja: https://link.langbot.app/ja/platforms/wecombot
config:
- name: BotId
label:
en_US: BotId
zh_Hans: 机器人ID (BotId)
zh_Hant: 機器人ID (BotId)
type: string
required: true
default: ""
- name: robot_name
label:
en_US: Robot Name
zh_Hans: 机器人名称
zh_Hant: 機器人名稱
type: string
required: true
default: ""
- name: enable-webhook
label:
en_US: Enable Webhook Mode
zh_Hans: 启用 Webhook 模式
zh_Hant: 啟用 Webhook 模式
description:
en_US: If enabled, the bot will use webhook mode. Otherwise it uses WebSocket long connection mode and does not need a webhook URL.
zh_Hans: 如果启用,机器人将使用 Webhook 模式;否则使用 WebSocket 长连接模式,不需要配置 webhook URL。
zh_Hant: 如果啟用,機器人將使用 Webhook 模式;否則使用 WebSocket 長連線模式,不需要設定 webhook URL。
type: boolean
required: true
default: false
- name: webhook_url
label:
en_US: Webhook Callback URL
zh_Hans: Webhook 回调地址
zh_Hant: Webhook 回調地址
description:
en_US: Copy this URL into WeComBot callback settings only when webhook mode is enabled.
zh_Hans: 仅在启用 Webhook 模式时复制此地址到企业微信智能机器人回调配置中。
zh_Hant: 僅在啟用 Webhook 模式時複製此地址到企業微信智慧機器人回調設定中。
type: webhook-url
required: false
default: ""
show_if:
field: enable-webhook
operator: eq
value: true
- name: Secret
label:
en_US: Secret
zh_Hans: 机器人密钥 (Secret)
zh_Hant: 機器人密鑰 (Secret)
description:
en_US: Required for WebSocket long connection mode.
zh_Hans: 使用 WebSocket 长连接模式时必填。
zh_Hant: 使用 WebSocket 長連線模式時必填。
type: string
required: false
default: ""
- name: Corpid
label:
en_US: Corpid
zh_Hans: 企业ID
zh_Hant: 企業ID
description:
en_US: Required for webhook mode.
zh_Hans: 使用 Webhook 模式时必填。
zh_Hant: 使用 Webhook 模式時必填。
type: string
required: false
default: ""
- name: Token
label:
en_US: Token
zh_Hans: 令牌 (Token)
zh_Hant: 令牌 (Token)
description:
en_US: Required for webhook mode.
zh_Hans: 使用 Webhook 模式时必填。
zh_Hant: 使用 Webhook 模式時必填。
type: string
required: false
default: ""
- name: EncodingAESKey
label:
en_US: EncodingAESKey
zh_Hans: 消息加解密密钥 (EncodingAESKey)
zh_Hant: 訊息加解密密鑰 (EncodingAESKey)
description:
en_US: Required for webhook mode. Optional for WebSocket mode when encrypted files need to be decrypted.
zh_Hans: Webhook 模式必填。WebSocket 模式下如需解密文件则填写。
zh_Hant: Webhook 模式必填。WebSocket 模式下如需解密檔案則填寫。
type: string
required: false
default: ""
- name: enable-stream-reply
label:
en_US: Enable Stream Reply
zh_Hans: 启用流式回复
zh_Hant: 啟用串流回覆
description:
en_US: If enabled, the bot will use WeComBot streaming replies.
zh_Hans: 如果启用,机器人将使用企业微信智能机器人流式回复。
zh_Hant: 如果啟用,機器人將使用企業微信智慧機器人串流回覆。
type: boolean
required: false
default: true
supported_events:
- message.received
- feedback.received
- platform.specific
supported_apis:
required:
- send_message
- reply_message
optional:
- get_message
- get_user_info
- get_friend_list
- get_group_info
- get_group_member_info
- get_group_member_list
- call_platform_api
platform_specific_apis:
- action: is_websocket_mode
description: { en_US: "Return whether the adapter is using WebSocket long connection mode", zh_Hans: "返回当前适配器是否使用 WebSocket 长连接模式" }
- action: get_stream_session_status
description: { en_US: "Inspect stream session state for a received message ID", zh_Hans: "按接收消息 ID 查看流式会话状态" }
- action: send_markdown
description: { en_US: "Send markdown text proactively in WebSocket mode", zh_Hans: "在 WebSocket 模式下主动发送 Markdown 文本" }
execution:
python:
path: ./adapter.py
attr: WecomBotAdapter

View File

@@ -0,0 +1,161 @@
from __future__ import annotations
import datetime
from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class WecomBotMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain) -> str:
content_parts: list[str] = []
for msg in message_chain:
if isinstance(msg, platform_message.Source):
continue
if isinstance(msg, platform_message.Plain):
content_parts.append(msg.text)
elif isinstance(msg, platform_message.At):
content_parts.append(f'@{msg.display or msg.target}')
elif isinstance(msg, platform_message.AtAll):
content_parts.append('@all')
elif isinstance(msg, platform_message.Image):
content_parts.append('[Image]')
elif isinstance(msg, platform_message.Voice):
content_parts.append('[Voice]')
elif isinstance(msg, platform_message.File):
content_parts.append(f'[File: {msg.name or msg.file_id or msg.url or "file"}]')
elif isinstance(msg, platform_message.Quote):
if msg.id is not None:
content_parts.append(f'[Quote {msg.id}]')
if msg.origin:
content_parts.append(await WecomBotMessageConverter.yiri2target(msg.origin))
elif isinstance(msg, platform_message.Forward):
for node in msg.node_list:
if node.message_chain:
content_parts.append(await WecomBotMessageConverter.yiri2target(node.message_chain))
else:
content_parts.append(str(msg))
return '\n'.join(part for part in content_parts if part)
@staticmethod
async def target2yiri(event: WecomBotEvent, bot_name: str = '') -> platform_message.MessageChain:
components: list[platform_message.MessageComponent] = [
platform_message.Source(id=event.message_id, time=datetime.datetime.now()),
]
if event.type == 'group' and event.ai_bot_id:
components.append(platform_message.At(target=event.ai_bot_id))
if event.content:
content = event.content
if bot_name:
content = content.replace(f'@{bot_name}', '').strip()
if content:
components.append(platform_message.Plain(text=content))
WecomBotMessageConverter._append_images(components, event.images or ([event.picurl] if event.picurl else []))
WecomBotMessageConverter._append_file(components, event.file)
WecomBotMessageConverter._append_voice(components, event.voice)
WecomBotMessageConverter._append_video(components, event.video)
WecomBotMessageConverter._append_link(components, event.link)
WecomBotMessageConverter._append_quote(components, event.quote)
if not any(not isinstance(component, (platform_message.Source, platform_message.At)) for component in components):
components.append(platform_message.Unknown(text=f'[unsupported wecombot msgtype: {event.msgtype or "unknown"}]'))
return platform_message.MessageChain(components)
@staticmethod
def _append_images(components: list[platform_message.MessageComponent], images: list[str]):
for image_data in images:
if image_data:
components.append(platform_message.Image(base64=image_data))
@staticmethod
def _append_file(components: list[platform_message.MessageComponent], file_info: dict | None):
if not file_info:
return
file_url = file_info.get('download_url') or file_info.get('url') or file_info.get('fileurl') or file_info.get('path')
file_base64 = file_info.get('base64')
file_name = file_info.get('filename') or file_info.get('name')
file_size = file_info.get('filesize') or file_info.get('size')
try:
kwargs = {}
if file_url:
kwargs['url'] = file_url
if file_base64:
kwargs['base64'] = file_base64
if file_name:
kwargs['name'] = file_name
if file_size is not None:
kwargs['size'] = file_size
if kwargs:
components.append(platform_message.File(**kwargs))
except Exception:
components.append(platform_message.Unknown(text='[file message unsupported]'))
@staticmethod
def _append_voice(components: list[platform_message.MessageComponent], voice_info: dict | None):
if not voice_info:
return
voice_payload = voice_info.get('base64') or voice_info.get('url')
if not voice_payload:
return
if voice_info.get('base64') and not voice_payload.startswith('data:'):
voice_payload = f'data:audio/mpeg;base64,{voice_info.get("base64")}'
try:
if voice_payload.startswith('data:'):
components.append(platform_message.Voice(base64=voice_payload))
else:
components.append(platform_message.Voice(url=voice_payload))
except Exception:
components.append(platform_message.Unknown(text='[voice message unsupported]'))
@staticmethod
def _append_video(components: list[platform_message.MessageComponent], video_info: dict | None):
if not video_info:
return
video_payload = (
video_info.get('base64')
or video_info.get('url')
or video_info.get('download_url')
or video_info.get('fileurl')
)
if not video_payload:
return
try:
components.append(
platform_message.File(
url=video_payload,
name=video_info.get('filename') or video_info.get('name') or 'video',
size=video_info.get('filesize') or video_info.get('size'),
)
)
except Exception:
components.append(platform_message.Unknown(text='[video message unsupported]'))
@staticmethod
def _append_link(components: list[platform_message.MessageComponent], link: dict | None, prefix: str = ''):
if not link:
return
summary = '\n'.join(
filter(None, [link.get('title', ''), link.get('description') or link.get('digest', ''), link.get('url', '')])
)
if summary:
components.append(platform_message.Plain(text=f'{prefix}{summary}'))
@staticmethod
def _append_quote(components: list[platform_message.MessageComponent], quote_info: dict | None):
if not quote_info:
return
origin: list[platform_message.MessageComponent] = []
if quote_info.get('content'):
origin.append(platform_message.Plain(text=quote_info.get('content')))
WecomBotMessageConverter._append_images(origin, quote_info.get('images') or ([quote_info.get('picurl')] if quote_info.get('picurl') else []))
WecomBotMessageConverter._append_file(origin, quote_info.get('file'))
WecomBotMessageConverter._append_voice(origin, quote_info.get('voice'))
WecomBotMessageConverter._append_video(origin, quote_info.get('video'))
WecomBotMessageConverter._append_link(origin, quote_info.get('link'))
if origin:
components.append(platform_message.Quote(origin=platform_message.MessageChain(origin)))

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import typing
async def is_websocket_mode(bot, params: dict) -> dict:
return {'websocket': hasattr(bot, 'bot_id') and not hasattr(bot, 'Token')}
async def get_stream_session_status(bot, params: dict) -> dict:
msg_id = str(params.get('message_id') or params.get('msg_id') or '')
if not msg_id:
raise ValueError('message_id is required')
if hasattr(bot, 'stream_sessions'):
stream_id = bot.stream_sessions.get_stream_id_by_msg(msg_id)
session = bot.stream_sessions.get_session(stream_id) if stream_id else None
return {'stream_id': stream_id, 'active': session is not None}
stream_key = getattr(bot, '_stream_ids', {}).get(msg_id)
if not stream_key:
return {'stream_id': None, 'active': False}
_req_id, _sep, stream_id = stream_key.partition('|')
return {'stream_id': stream_id, 'active': True}
async def send_markdown(bot, params: dict) -> dict:
chat_id = params.get('chat_id') or params.get('chatid') or params.get('target_id')
content = params.get('content')
if not chat_id:
raise ValueError('chat_id is required')
if not content:
raise ValueError('content is required')
if not hasattr(bot, 'send_message'):
raise ValueError('send_markdown is only available in WebSocket mode')
result = await bot.send_message(str(chat_id), str(content), msgtype='markdown')
return {'ok': True, 'raw': result}
PLATFORM_API_MAP: dict[str, typing.Callable[[typing.Any, dict], typing.Awaitable[dict]]] = {
'is_websocket_mode': is_websocket_mode,
'get_stream_session_status': get_stream_session_status,
'send_markdown': send_markdown,
}

View File

@@ -0,0 +1,3 @@
from __future__ import annotations
ADAPTER_NAME = 'wecombot-eba'

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

View File

@@ -0,0 +1,214 @@
from __future__ import annotations
import argparse
import asyncio
import json
import os
from pathlib import Path
from typing import Any
from quart import Quart, request
from langbot.pkg.platform.adapters.wecom.adapter import WecomAdapter
from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
TINY_PNG = (
'data:image/png;base64,'
'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII='
)
class ProbeLogger(AbstractEventLogger):
async def info(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[info] {text}')
async def debug(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[debug] {text}')
async def warning(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[warning] {text}')
async def error(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[error] {text}')
def redact(value: Any) -> Any:
if isinstance(value, dict):
redacted = {}
for key, item in value.items():
if key.lower() in {'secret', 'token', 'encodingaeskey', 'access_token'}:
redacted[key] = '<redacted>'
else:
redacted[key] = redact(item)
return redacted
if isinstance(value, list):
return [redact(item) for item in value]
return value
def summarize_event(event: platform_events.EBAEvent) -> dict:
data = {
'type': event.type,
'adapter_name': event.adapter_name,
'timestamp': event.timestamp,
}
for field in ('message_id', 'chat_id', 'chat_type', 'action', 'data'):
if hasattr(event, field):
value = getattr(event, field)
if hasattr(value, 'value'):
value = value.value
data[field] = redact(value)
if hasattr(event, 'sender') and event.sender is not None:
data['sender'] = event.sender.model_dump()
if hasattr(event, 'message_chain') and event.message_chain is not None:
data['message_chain'] = event.message_chain.model_dump()
return data
def record_api(results: list[dict[str, Any]], name: str, ok: bool, result: Any = None, error: Exception | None = None):
entry = {'name': name, 'ok': ok}
if result is not None:
entry['result'] = redact(result)
if error is not None:
entry['error'] = repr(error)
results.append(entry)
print('WECOM_EBA_API', json.dumps(entry, ensure_ascii=False, default=str))
async def run_api(results: list[dict[str, Any]], name: str, func):
try:
result = await func()
record_api(results, name, True, result)
return result
except Exception as exc:
record_api(results, name, False, error=exc)
return None
def config_from_env() -> dict:
required = {
'corpid': os.getenv('WECOM_CORPID', ''),
'secret': os.getenv('WECOM_SECRET', ''),
'token': os.getenv('WECOM_TOKEN', ''),
'EncodingAESKey': os.getenv('WECOM_ENCODING_AES_KEY', ''),
}
missing = [key for key, value in required.items() if not value]
if missing:
raise RuntimeError(f'Missing required WeCom env vars for fields: {missing}')
return {
**required,
'contacts_secret': os.getenv('WECOM_CONTACTS_SECRET', ''),
'api_base_url': os.getenv('WECOM_API_BASE_URL', 'https://qyapi.weixin.qq.com/cgi-bin'),
}
async def run_probe(args: argparse.Namespace):
adapter = WecomAdapter(config_from_env(), ProbeLogger())
events: list[platform_events.EBAEvent] = []
api_results: list[dict[str, Any]] = []
first_message = asyncio.Event()
log_path = Path(args.log)
log_path.parent.mkdir(parents=True, exist_ok=True)
async def listener(event, adapter):
events.append(event)
with log_path.open('a', encoding='utf-8') as fp:
fp.write(json.dumps(summarize_event(event), ensure_ascii=False, default=str) + '\n')
print('WECOM_EBA_EVENT', json.dumps(summarize_event(event), ensure_ascii=False, default=str))
if isinstance(event, platform_events.MessageReceivedEvent):
first_message.set()
adapter.register_listener(platform_events.EBAEvent, listener)
app = Quart(__name__)
@app.route(args.path, methods=['GET', 'POST'])
async def callback():
return await adapter.handle_unified_webhook(args.bot_uuid, '', request)
server_task = asyncio.create_task(app.run_task(host=args.host, port=args.port))
try:
print(f'READY: configure WeCom callback URL to http://{args.host}:{args.port}{args.path}')
print('READY: send a real WeCom application message to the bot now.')
await asyncio.wait_for(first_message.wait(), timeout=args.timeout)
source = next(event for event in events if isinstance(event, platform_events.MessageReceivedEvent))
raw_event = source.source_platform_object
target_id = f'{source.chat_id}|{raw_event.agent_id}'
if not args.skip_api:
await run_api(
api_results,
'reply_message:text',
lambda: adapter.reply_message(
source,
platform_message.MessageChain([platform_message.Plain(text='WeCom EBA probe reply')]),
),
)
await run_api(
api_results,
'send_message:text',
lambda: adapter.send_message(
'person',
target_id,
platform_message.MessageChain([platform_message.Plain(text='WeCom EBA probe send')]),
),
)
await run_api(
api_results,
'send_message:image',
lambda: adapter.send_message(
'person',
target_id,
platform_message.MessageChain(
[
platform_message.Plain(text='WeCom EBA probe image'),
platform_message.Image(base64=TINY_PNG),
]
),
),
)
await run_api(
api_results,
'get_message',
lambda: adapter.get_message('private', source.chat_id, source.message_id),
)
await run_api(api_results, 'get_user_info', lambda: adapter.get_user_info(source.sender.id))
await run_api(api_results, 'get_friend_list', lambda: adapter.get_friend_list())
await run_api(api_results, 'call_platform_api:check_access_token', lambda: adapter.call_platform_api('check_access_token', {}))
await run_api(
api_results,
'call_platform_api:get_user_info',
lambda: adapter.call_platform_api('get_user_info', {'user_id': source.sender.id}),
)
summary = {
'events': [event.type for event in events],
'api_results': api_results,
'log_path': str(log_path),
}
print('WECOM_EBA_SUMMARY', json.dumps(summary, ensure_ascii=False, default=str))
return summary
finally:
server_task.cancel()
await adapter.kill()
def main():
parser = argparse.ArgumentParser(description='Live WeCom EBA adapter probe.')
parser.add_argument('--host', default='0.0.0.0')
parser.add_argument('--port', type=int, default=5312)
parser.add_argument('--path', default='/wecom/callback')
parser.add_argument('--timeout', type=int, default=180)
parser.add_argument('--bot-uuid', default='wecom-eba-live-probe')
parser.add_argument('--log', default='data/temp/wecom_eba_live_probe.jsonl')
parser.add_argument('--skip-api', action='store_true')
args = parser.parse_args()
asyncio.run(run_probe(args))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,203 @@
from __future__ import annotations
import argparse
import asyncio
import json
import os
from pathlib import Path
from typing import Any
from quart import Quart, request
from langbot.pkg.platform.adapters.wecombot.adapter import WecomBotAdapter
from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class ProbeLogger(AbstractEventLogger):
async def info(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[info] {text}')
async def debug(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[debug] {text}')
async def warning(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[warning] {text}')
async def error(self, text, images=None, message_session_id=None, no_throw=True):
print(f'[error] {text}')
def redact(value: Any) -> Any:
if isinstance(value, dict):
return {
key: '<redacted>' if key.lower() in {'secret', 'token', 'encodingaeskey', 'encrypt', 'aeskey'} else redact(item)
for key, item in value.items()
}
if isinstance(value, list):
return [redact(item) for item in value]
return value
def summarize_event(event: platform_events.EBAEvent) -> dict:
data = {
'type': event.type,
'adapter_name': event.adapter_name,
'timestamp': event.timestamp,
}
for field in ('message_id', 'chat_id', 'chat_type', 'action', 'data', 'feedback_id', 'feedback_type'):
if hasattr(event, field):
value = getattr(event, field)
if hasattr(value, 'value'):
value = value.value
data[field] = redact(value)
if hasattr(event, 'sender') and event.sender is not None:
data['sender'] = event.sender.model_dump()
if hasattr(event, 'group') and event.group is not None:
data['group'] = event.group.model_dump()
if hasattr(event, 'message_chain') and event.message_chain is not None:
data['message_chain'] = event.message_chain.model_dump()
return data
def record_api(results: list[dict[str, Any]], name: str, ok: bool, result: Any = None, error: Exception | None = None):
entry = {'name': name, 'ok': ok}
if result is not None:
entry['result'] = redact(result)
if error is not None:
entry['error'] = repr(error)
results.append(entry)
print('WECOMBOT_EBA_API', json.dumps(entry, ensure_ascii=False, default=str))
async def run_api(results: list[dict[str, Any]], name: str, func):
try:
result = await func()
record_api(results, name, True, result)
return result
except Exception as exc:
record_api(results, name, False, error=exc)
return None
def config_from_env(enable_webhook: bool) -> dict:
config = {
'BotId': os.getenv('WECOMBOT_BOT_ID', ''),
'robot_name': os.getenv('WECOMBOT_ROBOT_NAME', ''),
'enable-webhook': enable_webhook,
'Secret': os.getenv('WECOMBOT_SECRET', ''),
'Token': os.getenv('WECOMBOT_TOKEN', ''),
'EncodingAESKey': os.getenv('WECOMBOT_ENCODING_AES_KEY', ''),
'Corpid': os.getenv('WECOMBOT_CORPID', ''),
'enable-stream-reply': os.getenv('WECOMBOT_ENABLE_STREAM_REPLY', '1') != '0',
}
required = ['BotId', 'Secret'] if not enable_webhook else ['Token', 'EncodingAESKey', 'Corpid']
missing = [key for key in required if not config.get(key)]
if missing:
raise RuntimeError(f'Missing required WeComBot env vars for fields: {missing}')
return config
async def run_probe(args: argparse.Namespace):
adapter = WecomBotAdapter(config_from_env(args.webhook), ProbeLogger())
events: list[platform_events.EBAEvent] = []
api_results: list[dict[str, Any]] = []
first_message = asyncio.Event()
log_path = Path(args.log)
log_path.parent.mkdir(parents=True, exist_ok=True)
async def listener(event, adapter):
events.append(event)
with log_path.open('a', encoding='utf-8') as fp:
fp.write(json.dumps(summarize_event(event), ensure_ascii=False, default=str) + '\n')
print('WECOMBOT_EBA_EVENT', json.dumps(summarize_event(event), ensure_ascii=False, default=str))
if isinstance(event, platform_events.MessageReceivedEvent):
first_message.set()
adapter.register_listener(platform_events.EBAEvent, listener)
run_task = None
server_task = None
if args.webhook:
app = Quart(__name__)
@app.route(args.path, methods=['GET', 'POST'])
async def callback():
return await adapter.handle_unified_webhook(args.bot_uuid, '', request)
server_task = asyncio.create_task(app.run_task(host=args.host, port=args.port))
print(f'READY: configure WeComBot callback URL to http://{args.host}:{args.port}{args.path}')
else:
run_task = asyncio.create_task(adapter.run_async())
print('READY: WeComBot WebSocket long connection started; no webhook URL is required.')
try:
print('READY: send a real WeComBot message to the bot now.')
await asyncio.wait_for(first_message.wait(), timeout=args.timeout)
source = next(event for event in events if isinstance(event, platform_events.MessageReceivedEvent))
if not args.skip_api:
await run_api(
api_results,
'reply_message:text',
lambda: adapter.reply_message(
source,
platform_message.MessageChain([platform_message.Plain(text='WeComBot EBA probe reply')]),
),
)
if not args.webhook:
await run_api(
api_results,
'send_message:text',
lambda: adapter.send_message(
'group' if source.chat_type.value == 'group' else 'person',
source.chat_id,
platform_message.MessageChain([platform_message.Plain(text='WeComBot EBA probe send')]),
),
)
await run_api(api_results, 'get_message', lambda: adapter.get_message(source.chat_type.value, source.chat_id, source.message_id))
await run_api(api_results, 'get_user_info', lambda: adapter.get_user_info(source.sender.id))
if source.group:
await run_api(api_results, 'get_group_info', lambda: adapter.get_group_info(source.group.id))
await run_api(api_results, 'get_group_member_list', lambda: adapter.get_group_member_list(source.group.id))
await run_api(api_results, 'call_platform_api:is_websocket_mode', lambda: adapter.call_platform_api('is_websocket_mode', {}))
await run_api(
api_results,
'call_platform_api:get_stream_session_status',
lambda: adapter.call_platform_api('get_stream_session_status', {'message_id': source.message_id}),
)
summary = {
'events': [event.type for event in events],
'api_results': api_results,
'log_path': str(log_path),
'mode': 'webhook' if args.webhook else 'websocket',
}
print('WECOMBOT_EBA_SUMMARY', json.dumps(summary, ensure_ascii=False, default=str))
return summary
finally:
if server_task:
server_task.cancel()
if run_task:
run_task.cancel()
await adapter.kill()
def main():
parser = argparse.ArgumentParser(description='Live WeComBot EBA adapter probe.')
parser.add_argument('--webhook', action='store_true', help='Use webhook mode. Default is WebSocket long connection mode.')
parser.add_argument('--host', default='0.0.0.0')
parser.add_argument('--port', type=int, default=5313)
parser.add_argument('--path', default='/wecombot/callback')
parser.add_argument('--timeout', type=int, default=180)
parser.add_argument('--bot-uuid', default='wecombot-eba-live-probe')
parser.add_argument('--log', default='data/temp/wecombot_eba_live_probe.jsonl')
parser.add_argument('--skip-api', action='store_true')
args = parser.parse_args()
asyncio.run(run_probe(args))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
from langbot_plugin.api.entities.builtin.platform import message as platform_message
def _conversation():
prompt = Mock()
prompt.messages = []
prompt.copy = Mock(return_value=Mock(messages=[]))
return SimpleNamespace(
uuid='conversation-uuid',
create_time=datetime.now(),
update_time=datetime.now(),
prompt=prompt,
messages=[],
)
def _prompt_preprocessing_context():
ctx = Mock()
ctx.event.default_prompt = []
ctx.event.prompt = []
return ctx
@pytest.mark.asyncio
async def test_preprocessor_keeps_image_placeholder_for_text_only_local_agent(mock_app, sample_query):
model = Mock()
model.model_entity.uuid = 'text-only-model'
model.model_entity.abilities = []
mock_app.model_mgr.get_model_by_uuid = AsyncMock(return_value=model)
mock_app.sess_mgr.get_session = AsyncMock(
return_value=SimpleNamespace(launcher_type=sample_query.launcher_type, launcher_id=sample_query.launcher_id)
)
mock_app.sess_mgr.get_conversation = AsyncMock(return_value=_conversation())
mock_app.plugin_connector.emit_event = AsyncMock(return_value=_prompt_preprocessing_context())
sample_query.pipeline_config = {
'ai': {
'runner': {'runner': 'local-agent'},
'local-agent': {'model': {'primary': 'text-only-model', 'fallbacks': []}, 'prompt': []},
},
'trigger': {'misc': {'combine-quote-message': False}},
'output': {'misc': {'exception-handling': 'show-hint'}},
}
sample_query.message_chain = platform_message.MessageChain(
[platform_message.Image(base64='data:image/png;base64,AAAA')]
)
sample_query.messages = []
sample_query.variables = {}
from importlib import import_module
import_module('langbot.pkg.pipeline.pipelinemgr')
preproc_module = import_module('langbot.pkg.pipeline.preproc.preproc')
result = await preproc_module.PreProcessor(mock_app).process(sample_query, 'PreProcessor')
content = result.new_query.user_message.content
assert len(content) == 1
assert content[0].type == 'text'
assert content[0].text == '[Image]'
assert result.new_query.variables['user_message_text'] == '[Image]'

View File

@@ -0,0 +1,235 @@
from __future__ import annotations
import pathlib
from unittest.mock import AsyncMock, patch
import pytest
import yaml
from langbot.libs.wecom_api.api import WecomClient
from langbot.libs.wecom_api.wecomevent import WecomEvent
from langbot.pkg.platform.adapters.wecom.adapter import WecomAdapter
from langbot.pkg.platform.adapters.wecom.event_converter import WecomEventConverter
from langbot.pkg.platform.adapters.wecom.message_converter import WecomMessageConverter, split_string_by_bytes
from langbot.pkg.platform.adapters.wecom.platform_api import PLATFORM_API_MAP
from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class DummyLogger(AbstractEventLogger):
async def info(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def debug(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def warning(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def error(self, text, images=None, message_session_id=None, no_throw=True):
pass
class DummyWecomClient(WecomClient):
def __init__(self, *args, **kwargs):
self.corpid = kwargs['corpid']
self.secret = kwargs['secret']
self.token = kwargs['token']
self.aes = kwargs['EncodingAESKey']
self.secret_for_contacts = kwargs.get('contacts_secret', '')
self.base_url = kwargs.get('api_base_url', 'https://qyapi.weixin.qq.com/cgi-bin')
self.logger = kwargs.get('logger')
self.access_token = ''
self._message_handlers = {}
self.get_media_id = AsyncMock(return_value='media-id')
self.send_private_msg = AsyncMock()
self.send_image = AsyncMock()
self.send_voice = AsyncMock()
self.send_file = AsyncMock()
self.get_user_info = AsyncMock(return_value={'userid': 'user-1', 'name': 'Alice', 'alias': 'alice'})
self.check_access_token = AsyncMock(return_value=True)
self.get_access_token = AsyncMock(return_value='access-token')
self.send_to_all = AsyncMock()
self.handle_unified_webhook = AsyncMock(return_value='success')
def on_message(self, msg_type: str):
def decorator(func):
self._message_handlers.setdefault(msg_type, []).append(func)
return func
return decorator
def manifest() -> dict:
manifest_path = (
pathlib.Path(__file__).parents[3]
/ 'src'
/ 'langbot'
/ 'pkg'
/ 'platform'
/ 'adapters'
/ 'wecom'
/ 'manifest.yaml'
)
return yaml.safe_load(manifest_path.read_text())
def make_adapter() -> WecomAdapter:
config = {
'corpid': 'corp-id',
'secret': 'secret',
'token': 'token',
'EncodingAESKey': 'encoding-key',
'contacts_secret': 'contacts-secret',
'api_base_url': 'https://qyapi.weixin.qq.com/cgi-bin',
}
with patch('langbot.pkg.platform.adapters.wecom.adapter.WecomClient', DummyWecomClient):
return WecomAdapter(config, DummyLogger())
def wecom_event(**overrides) -> WecomEvent:
payload = {
'ToUserName': overrides.get('to_user', 'corp-id'),
'FromUserName': overrides.get('from_user', 'user-1'),
'CreateTime': overrides.get('create_time', 1_714_000_000),
'MsgType': overrides.get('msg_type', 'text'),
'Content': overrides.get('content', 'hello'),
'MsgId': overrides.get('message_id', 12345),
'AgentID': overrides.get('agent_id', 1000002),
}
if payload['MsgType'] == 'image':
payload['MediaId'] = overrides.get('media_id', 'media-id')
payload['PicUrl'] = overrides.get('picurl', 'https://example.test/a.png')
return WecomEvent.from_payload(payload)
def test_wecom_supported_events_match_manifest():
assert make_adapter().get_supported_events() == manifest()['spec']['supported_events']
def test_wecom_supported_apis_match_manifest():
supported_apis = make_adapter().get_supported_apis()
manifest_apis = manifest()['spec']['supported_apis']
assert supported_apis == manifest_apis['required'] + manifest_apis['optional']
def test_wecom_platform_api_map_matches_manifest():
manifest_actions = {item['action'] for item in manifest()['spec']['platform_specific_apis']}
assert set(PLATFORM_API_MAP) == manifest_actions
def test_wecom_split_string_by_bytes_keeps_multibyte_boundaries():
parts = split_string_by_bytes('你好hello', limit=7)
assert ''.join(parts) == '你好hello'
assert all(len(part.encode('utf-8')) <= 7 for part in parts)
@pytest.mark.asyncio
async def test_wecom_message_converter_maps_outbound_components():
adapter = make_adapter()
content = await WecomMessageConverter.yiri2target(
platform_message.MessageChain(
[
platform_message.Plain(text='hi'),
platform_message.Image(base64='data:image/png;base64,AAAA'),
platform_message.Voice(base64='data:audio/mp3;base64,BBBB'),
platform_message.File(name='doc.txt', base64='Q0NDQw=='),
platform_message.Quote(
id='origin',
origin=platform_message.MessageChain([platform_message.Plain(text='quoted')]),
),
]
),
adapter.bot,
)
assert content[0] == {'type': 'text', 'content': 'hi'}
assert {'type': 'image', 'media_id': 'media-id'} in content
assert {'type': 'voice', 'media_id': 'media-id'} in content
assert {'type': 'file', 'media_id': 'media-id'} in content
assert {'type': 'text', 'content': '[Quote origin] '} in content
assert {'type': 'text', 'content': 'quoted'} in content
@pytest.mark.asyncio
async def test_wecom_event_converter_maps_text_message_to_eba_and_legacy():
adapter = make_adapter()
event = await WecomEventConverter.target2yiri(wecom_event(), adapter.bot)
assert isinstance(event, platform_events.MessageReceivedEvent)
assert event.adapter_name == 'wecom-eba'
assert event.chat_type == platform_entities.ChatType.PRIVATE
assert event.chat_id == 'user-1|1000002'
assert event.sender.nickname == 'Alice'
assert str(event.message_chain) == 'hello'
legacy = await WecomEventConverter.target2legacy(wecom_event(), adapter.bot)
assert isinstance(legacy, platform_events.FriendMessage)
assert legacy.sender.id == 'user-1'
assert str(legacy.message_chain) == 'hello'
@pytest.mark.asyncio
async def test_wecom_event_converter_maps_image_message_to_eba():
adapter = make_adapter()
with patch(
'langbot.pkg.platform.adapters.wecom.message_converter.image.get_wecom_image_base64',
AsyncMock(return_value=('AAAA', 'png')),
):
event = await WecomEventConverter.target2yiri(
wecom_event(msg_type='image', content=None, picurl='https://example.test/a.png'),
adapter.bot,
)
assert isinstance(event, platform_events.MessageReceivedEvent)
assert event.adapter_name == 'wecom-eba'
assert event.message_id == 12345
assert isinstance(event.message_chain[1], platform_message.Image)
assert event.message_chain[1].base64 == 'data:image/png;base64,AAAA'
@pytest.mark.asyncio
async def test_wecom_adapter_dispatches_and_caches_message_event():
adapter = make_adapter()
calls: list[platform_events.Event] = []
async def listener(event, adapter):
calls.append(event)
adapter.register_listener(platform_events.MessageReceivedEvent, listener)
await adapter._handle_native_event(wecom_event())
assert len(calls) == 1
received = calls[0]
assert isinstance(received, platform_events.MessageReceivedEvent)
assert adapter.bot_account_id == 'corp-id'
assert received.chat_id == 'user-1|1000002'
assert await adapter.get_message('private', 'user-1|1000002', 12345) == received
assert (await adapter.get_user_info('user-1')).nickname == 'Alice'
@pytest.mark.asyncio
async def test_wecom_send_reply_and_platform_api_use_underlying_client():
adapter = make_adapter()
message = platform_message.MessageChain([platform_message.Plain(text='hello')])
await adapter.send_message('person', 'user-1|1000002', message)
adapter.bot.send_private_msg.assert_awaited_once_with('user-1', 1000002, 'hello')
source_event = await WecomEventConverter.target2yiri(wecom_event(), adapter.bot)
await adapter.reply_message(source_event, message)
assert adapter.bot.send_private_msg.await_count == 2
token_status = await adapter.call_platform_api('check_access_token', {})
user_info = await adapter.call_platform_api('get_user_info', {'user_id': 'user-1'})
sent_all = await adapter.call_platform_api('send_to_all', {'content': 'notice', 'agent_id': 1000002})
assert token_status == {'valid': True}
assert user_info['name'] == 'Alice'
assert sent_all == {'ok': True}

View File

@@ -0,0 +1,274 @@
from __future__ import annotations
import pathlib
from unittest.mock import AsyncMock, patch
import pytest
import yaml
from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent
from langbot.pkg.platform.adapters.wecombot.adapter import WecomBotAdapter
from langbot.pkg.platform.adapters.wecombot.event_converter import WecomBotEventConverter
from langbot.pkg.platform.adapters.wecombot.message_converter import WecomBotMessageConverter
from langbot.pkg.platform.adapters.wecombot.platform_api import PLATFORM_API_MAP
from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class DummyLogger(AbstractEventLogger):
async def info(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def debug(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def warning(self, text, images=None, message_session_id=None, no_throw=True):
pass
async def error(self, text, images=None, message_session_id=None, no_throw=True):
pass
class DummyWecomBotWsClient:
def __init__(self, *args, **kwargs):
self.bot_id = kwargs['bot_id']
self.secret = kwargs['secret']
self.encoding_aes_key = kwargs.get('encoding_aes_key', '')
self._message_handlers = {}
self.connect = AsyncMock()
self.disconnect = AsyncMock()
self.send_message = AsyncMock(return_value={'ok': True})
self.reply_text = AsyncMock(return_value={'reply': True})
self.push_stream_chunk = AsyncMock(return_value=True)
self.set_message = AsyncMock(return_value={'set': True})
def on_message(self, msg_type: str):
def decorator(func):
self._message_handlers.setdefault(msg_type, []).append(func)
return func
return decorator
def on_feedback(self):
def decorator(func):
self._message_handlers.setdefault('feedback', []).append(func)
return func
return decorator
class DummyWecomBotClient(DummyWecomBotWsClient):
def __init__(self, *args, **kwargs):
self.Token = kwargs['Token']
self.EnCodingAESKey = kwargs['EnCodingAESKey']
self.Corpid = kwargs['Corpid']
self._message_handlers = {}
self.handle_unified_webhook = AsyncMock(return_value='success')
self.push_stream_chunk = AsyncMock(return_value=True)
self.set_message = AsyncMock(return_value={'set': True})
def manifest() -> dict:
manifest_path = (
pathlib.Path(__file__).parents[3]
/ 'src'
/ 'langbot'
/ 'pkg'
/ 'platform'
/ 'adapters'
/ 'wecombot'
/ 'manifest.yaml'
)
return yaml.safe_load(manifest_path.read_text())
def make_adapter(enable_webhook: bool = False) -> WecomBotAdapter:
config = {
'BotId': 'bot-id',
'robot_name': 'EBA Bot',
'enable-webhook': enable_webhook,
'Secret': 'secret',
'Token': 'token',
'EncodingAESKey': 'encoding-key',
'Corpid': 'corp-id',
'enable-stream-reply': True,
}
with (
patch('langbot.pkg.platform.adapters.wecombot.adapter.WecomBotWsClient', DummyWecomBotWsClient),
patch('langbot.pkg.platform.adapters.wecombot.adapter.WecomBotClient', DummyWecomBotClient),
):
return WecomBotAdapter(config, DummyLogger())
def wecombot_event(**overrides) -> WecomBotEvent:
event_type = overrides.get('type', 'single')
payload = {
'type': event_type,
'msgtype': overrides.get('msgtype', 'text'),
'msgid': overrides.get('message_id', 'msg-1'),
'userid': overrides.get('userid', 'user-1'),
'username': overrides.get('username', 'Alice'),
'content': overrides.get('content', 'hello'),
'aibotid': overrides.get('aibotid', 'bot-id'),
'req_id': overrides.get('req_id', 'req-1'),
'stream_id': overrides.get('stream_id', 'stream-1'),
}
if event_type == 'group':
payload.update({'chatid': overrides.get('chatid', 'group-1'), 'chatname': overrides.get('chatname', 'Group')})
if payload['msgtype'] == 'image':
payload['images'] = overrides.get('images', ['data:image/png;base64,AAAA'])
payload['content'] = overrides.get('content', '')
if payload['msgtype'] == 'file':
payload['file'] = overrides.get('file', {'download_url': 'https://example.test/a.txt', 'filename': 'a.txt'})
payload['content'] = overrides.get('content', '')
if payload['msgtype'] == 'voice':
payload['voice'] = overrides.get('voice', {'base64': 'BBBB'})
payload['content'] = overrides.get('content', '')
if 'quote' in overrides:
payload['quote'] = overrides['quote']
return WecomBotEvent(payload)
def test_wecombot_supported_events_match_manifest():
assert make_adapter().get_supported_events() == manifest()['spec']['supported_events']
def test_wecombot_supported_apis_match_manifest():
supported_apis = make_adapter().get_supported_apis()
manifest_apis = manifest()['spec']['supported_apis']
assert supported_apis == manifest_apis['required'] + manifest_apis['optional']
def test_wecombot_platform_api_map_matches_manifest():
manifest_actions = {item['action'] for item in manifest()['spec']['platform_specific_apis']}
assert set(PLATFORM_API_MAP) == manifest_actions
@pytest.mark.asyncio
async def test_wecombot_message_converter_maps_outbound_components_to_markdown_text():
content = await WecomBotMessageConverter.yiri2target(
platform_message.MessageChain(
[
platform_message.Plain(text='hi'),
platform_message.At(target='user-1', display='Alice'),
platform_message.Image(base64='data:image/png;base64,AAAA'),
platform_message.File(name='a.txt', url='https://example.test/a.txt'),
platform_message.Quote(
id='origin',
origin=platform_message.MessageChain([platform_message.Plain(text='quoted')]),
),
]
)
)
assert 'hi' in content
assert '@Alice' in content
assert '[Image]' in content
assert '[File: a.txt]' in content
assert '[Quote origin]' in content
assert 'quoted' in content
@pytest.mark.asyncio
async def test_wecombot_event_converter_maps_private_and_group_messages_to_eba():
private_event = await WecomBotEventConverter(bot_name='EBA Bot').target2yiri(
wecombot_event(content='@EBA Bot hello')
)
group_event = await WecomBotEventConverter(bot_name='EBA Bot').target2yiri(
wecombot_event(type='group', content='@EBA Bot group hello')
)
assert isinstance(private_event, platform_events.MessageReceivedEvent)
assert private_event.adapter_name == 'wecombot-eba'
assert private_event.chat_type == platform_entities.ChatType.PRIVATE
assert private_event.chat_id == 'user-1'
assert str(private_event.message_chain) == 'hello'
assert isinstance(group_event, platform_events.MessageReceivedEvent)
assert group_event.chat_type == platform_entities.ChatType.GROUP
assert group_event.chat_id == 'group-1'
assert group_event.group.name == 'Group'
assert isinstance(group_event.message_chain[1], platform_message.At)
@pytest.mark.asyncio
async def test_wecombot_event_converter_maps_media_and_quote_components():
event = await WecomBotEventConverter().target2yiri(
wecombot_event(
msgtype='image',
quote={
'content': 'quoted',
'file': {'download_url': 'https://example.test/q.txt', 'filename': 'q.txt'},
},
)
)
assert isinstance(event, platform_events.MessageReceivedEvent)
assert any(isinstance(component, platform_message.Image) for component in event.message_chain)
quote = next(component for component in event.message_chain if isinstance(component, platform_message.Quote))
assert any(isinstance(component, platform_message.File) for component in quote.origin)
@pytest.mark.asyncio
async def test_wecombot_adapter_dispatches_eba_and_legacy_and_caches_message_event():
adapter = make_adapter()
eba_calls: list[platform_events.Event] = []
legacy_calls: list[platform_events.Event] = []
async def eba_listener(event, adapter):
eba_calls.append(event)
async def legacy_listener(event, adapter):
legacy_calls.append(event)
adapter.register_listener(platform_events.MessageReceivedEvent, eba_listener)
adapter.register_listener(platform_events.FriendMessage, legacy_listener)
await adapter._handle_native_event(wecombot_event())
assert len(eba_calls) == 1
assert len(legacy_calls) == 1
received = eba_calls[0]
assert isinstance(received, platform_events.MessageReceivedEvent)
assert await adapter.get_message('private', 'user-1', 'msg-1') == received
assert (await adapter.get_user_info('user-1')).nickname == 'Alice'
@pytest.mark.asyncio
async def test_wecombot_send_reply_feedback_and_platform_api_use_underlying_client():
adapter = make_adapter()
message = platform_message.MessageChain([platform_message.Plain(text='hello')])
await adapter.send_message('person', 'user-1', message)
adapter.bot.send_message.assert_awaited_once_with('user-1', 'hello')
source_event = await WecomBotEventConverter().target2yiri(wecombot_event())
await adapter.reply_message(source_event, message)
adapter.bot.reply_text.assert_awaited_once_with('req-1', 'hello')
await adapter.reply_message_chunk(source_event, None, message, is_final=True)
adapter.bot.push_stream_chunk.assert_awaited_once_with('msg-1', 'hello', is_final=True)
platform_status = await adapter.call_platform_api('is_websocket_mode', {})
assert platform_status == {'websocket': True}
feedback_calls: list[platform_events.Event] = []
async def feedback_listener(event, adapter):
feedback_calls.append(event)
adapter.register_listener(platform_events.FeedbackReceivedEvent, feedback_listener)
await adapter._handle_feedback(feedback_id='fb-1', feedback_type=1, inaccurate_reasons=[1, 2], session=None)
assert isinstance(feedback_calls[0], platform_events.FeedbackReceivedEvent)
assert feedback_calls[0].inaccurate_reasons == ['1', '2']
@pytest.mark.asyncio
async def test_wecombot_webhook_mode_rejects_proactive_send():
adapter = make_adapter(enable_webhook=True)
with pytest.raises(NotSupportedError):
await adapter.send_message('person', 'user-1', platform_message.MessageChain([platform_message.Plain(text='hi')]))