From f10ea82418e36c7dccba0b6bf4e249b19aa54f1c Mon Sep 17 00:00:00 2001 From: fdc310 <2213070223@qq.com> Date: Tue, 16 Jun 2026 00:07:11 +0800 Subject: [PATCH] feat(wecom): implement Dify human input pause handling with button interaction support --- src/langbot/libs/wecom_ai_bot_api/api.py | 218 ++++++++++++ .../libs/wecom_ai_bot_api/ws_client.py | 144 +++++++- src/langbot/pkg/platform/sources/wecombot.py | 318 +++++++++++++++++- 3 files changed, 675 insertions(+), 5 deletions(-) diff --git a/src/langbot/libs/wecom_ai_bot_api/api.py b/src/langbot/libs/wecom_ai_bot_api/api.py index b6f45cf2..1ed006da 100644 --- a/src/langbot/libs/wecom_ai_bot_api/api.py +++ b/src/langbot/libs/wecom_ai_bot_api/api.py @@ -67,6 +67,16 @@ class StreamSession: # 反馈 ID,用于接收用户点赞/点踩反馈 feedback_id: Optional[str] = None + # Dify 人工输入暂停态:runner 把 _form_data 传过来时填充。 + # 一旦设置,下次企微 followup 请求时返回 button_interaction 模板卡 + # 替代 stream chunk。点击按钮会回调 template_card_event,EventKey + # 就是 Dify 的 action_id。 + pending_form: Optional[dict] = None + + # template_card task_id(企微要求 button_interaction 必填且不可重复)。 + # 创建 pending_form 时生成;按钮点击回调里用来反查 session。 + pending_form_task_id: Optional[str] = None + class StreamSessionManager: """管理 stream 会话的生命周期,并负责队列的生产消费。""" @@ -83,6 +93,9 @@ class StreamSessionManager: self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射 self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话 self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射 + # task_id (button_interaction template_card 的) -> stream_id 映射, + # 用于按钮点击回调里反查 pending_form。 + self._task_index: dict[str, str] = {} def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]: if not msg_id: @@ -118,6 +131,40 @@ class StreamSessionManager: if feedback_id and stream_id: self._feedback_index[feedback_id] = stream_id + def set_pending_form(self, stream_id: str, form_data: dict, task_id: str) -> None: + """把 Dify 人工输入暂停态绑定到 stream session。 + + 下一次企微 followup 请求时,adapter 检测到 pending_form, + 返回 button_interaction 模板卡而不是 stream chunk。 + """ + session = self._sessions.get(stream_id) + if not session: + return + session.pending_form = form_data + session.pending_form_task_id = task_id + if task_id: + self._task_index[task_id] = stream_id + + def get_session_by_task_id(self, task_id: str) -> Optional[StreamSession]: + """按按钮点击回调里的 TaskId 反查 session。""" + if not task_id: + return None + stream_id = self._task_index.get(task_id) + if not stream_id: + return None + return self._sessions.get(stream_id) + + def clear_pending_form(self, stream_id: str) -> None: + """按钮点击消费完后清掉 pending_form,避免重复弹卡。""" + session = self._sessions.get(stream_id) + if not session: + return + task_id = session.pending_form_task_id + session.pending_form = None + session.pending_form_task_id = None + if task_id: + self._task_index.pop(task_id, None) + def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]: """根据企业微信回调创建或获取会话。 @@ -723,6 +770,79 @@ async def parse_wecom_bot_message( return message_data +def build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]: + """Build a `template_card` (button_interaction) WeCom payload. + + Shared by both the webhook-mode client (returns the payload as the + response to a stream-followup callback) and the ws_client (sends it + as a reply frame). Output shape is `{"msgtype": "template_card", + "template_card": {...}}` per the WeCom spec. + + Args: + form_data: Dify human-input form data with keys ``actions`` (list of + ``{id, title, button_style}``), ``node_title``, ``form_content``. + task_id: Unique per-card identifier. WeCom requires this for + button_interaction. The click callback returns it as TaskId so we + can find the originating session. + + Notes: + * ``button.key`` is set directly to the Dify ``action_id``. The click + callback's ``EventKey`` carries this back unchanged (1024-byte limit + per the spec, far more than we ever need). + * WeCom caps the button list at 6. Extra actions are appended to + ``sub_title_text`` so users can still reply with the id as text. + * Styles map ``primary``→1 (blue), ``danger``→2 (red), default→0 + (gray). First button is auto-promoted to primary when no style. + """ + actions = list(form_data.get('actions') or []) + node_title = (form_data.get('node_title') or '').strip() or '人工介入' + form_content = (form_data.get('form_content') or '').strip() + + visible_actions = actions[:6] + overflow = actions[6:] + + sub_title_parts: list[str] = [] + if form_content: + sub_title_parts.append(form_content) + if overflow: + extra_lines = [f' - {a.get("title") or a.get("id") or ""} (回复 id: {a.get("id") or ""})' for a in overflow] + sub_title_parts.append(f'另有 {len(overflow)} 个选项不在按钮列表中,可直接回复 id:\n' + '\n'.join(extra_lines)) + sub_title_text = '\n\n'.join(sub_title_parts) or '请选择一个操作以继续。' + + button_list = [] + for idx, action in enumerate(visible_actions): + action_id = str(action.get('id') or '') + title = str(action.get('title') or action_id or f'选项 {idx + 1}') + style_raw = (action.get('button_style') or '').lower() + if style_raw == 'primary' or (style_raw == '' and idx == 0): + style = 1 + elif style_raw == 'danger': + style = 2 + else: + style = 0 + button_list.append( + { + 'text': title, + 'style': style, + 'key': action_id, + } + ) + + card = { + 'card_type': 'button_interaction', + 'main_title': { + 'title': node_title, + }, + 'sub_title_text': sub_title_text, + 'button_list': button_list, + 'task_id': task_id, + } + return { + 'msgtype': 'template_card', + 'template_card': card, + } + + class WecomBotClient: def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False): """企业微信智能机器人客户端。 @@ -761,6 +881,7 @@ class WecomBotClient: self.stream_poll_timeout = 0.5 self._feedback_callback: Optional[Callable] = None + self._card_action_callback: Optional[Callable] = None def set_feedback_callback(self, callback: Callable) -> None: """设置反馈回调函数。 @@ -770,6 +891,19 @@ class WecomBotClient: """ self._feedback_callback = callback + def set_card_action_callback(self, callback: Callable) -> None: + """设置按钮卡片点击回调函数。 + + Signature: ``async def callback(session, action_id, task_id, raw_event) -> None`` + + ``session`` is the StreamSession the card was attached to; + ``action_id`` is the Dify action_id reflected back via the + button's ``key`` field; ``task_id`` is the card's task_id + (matches ``session.pending_form_task_id``); ``raw_event`` is the + decoded callback JSON for any extra fields the adapter wants. + """ + self._card_action_callback = callback + @staticmethod def _build_stream_payload( stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None @@ -800,6 +934,12 @@ class WecomBotClient: 'stream': stream_payload, } + @staticmethod + def _build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]: + """Class-level shim — delegates to module-level builder so ws_client + can reuse the exact same payload shape without importing the class.""" + return build_button_interaction_payload(form_data, task_id) + async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]: """对响应进行加密封装并返回给企业微信。 @@ -892,6 +1032,22 @@ class WecomBotClient: return await self._encrypt_and_reply(self._build_stream_payload('', '', True), nonce) session = self.stream_sessions.get_session(stream_id) + + # If a Dify human-input pause arrived during this stream, switch + # the response from `msgtype: stream` to `msgtype: template_card` + # (button_interaction). The session's stream is also marked + # finished so future followups aren't expected (assuming the + # WeCom client treats template_card as the terminal response — + # we'll know from the next callback whether it kept polling). + if session and session.pending_form and session.pending_form_task_id: + await self.logger.info( + f'WeComBot: returning button_interaction for stream_id={stream_id} ' + f'task_id={session.pending_form_task_id} actions={len(session.pending_form.get("actions") or [])}' + ) + card_payload = self._build_button_interaction_payload(session.pending_form, session.pending_form_task_id) + self.stream_sessions.mark_finished(stream_id) + return await self._encrypt_and_reply(card_payload, nonce) + chunk = await self.stream_sessions.consume(stream_id, timeout=self.stream_poll_timeout) if not chunk: @@ -1000,11 +1156,50 @@ class WecomBotClient: if event_type == 'feedback_event': return await self._handle_feedback_event(msg_json, nonce) + # Button click on a button_interaction template_card. The WeCom doc + # calls this `template_card_event`; some routes wrap the button + # event payload inside `event.template_card_event`. + if event_type == 'template_card_event': + return await self._handle_template_card_event(msg_json, nonce) + if msg_json.get('msgtype') == 'stream': return await self._handle_post_followup_response(msg_json, nonce) return await self._handle_post_initial_response(msg_json, nonce) + async def _handle_template_card_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]: + """Handle a button click on a button_interaction template_card. + + WeCom carries the click info in ``event.template_card_event`` with + ``TaskId`` matching the card we created and ``EventKey`` carrying + the button's ``key`` (which we set to the Dify ``action_id``). + """ + try: + tce = msg_json.get('event', {}).get('template_card_event', {}) + task_id = tce.get('TaskId') or tce.get('task_id') or '' + event_key = tce.get('EventKey') or tce.get('event_key') or '' + card_type = tce.get('CardType') or tce.get('card_type') or '' + + await self.logger.info(f'收到按钮点击: task_id={task_id} event_key={event_key!r} card_type={card_type}') + + session = self.stream_sessions.get_session_by_task_id(task_id) + if session is None: + await self.logger.warning(f'未找到 task_id={task_id} 对应的 session,按钮点击被丢弃') + else: + if self._card_action_callback is not None: + try: + await self._card_action_callback(session, event_key, task_id, msg_json) + except Exception: + await self.logger.error(f'card action callback raised: {traceback.format_exc()}') + # Drop the form so a fresh chunk/followup doesn't re-render + # the same card (and so the task_id can be GC'd). + self.stream_sessions.clear_pending_form(session.stream_id) + except Exception: + await self.logger.error(f'_handle_template_card_event error: {traceback.format_exc()}') + + # WeCom expects an empty success ack for event callbacks. + return await self._encrypt_and_reply({}, nonce) + async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]: """处理企业微信用户反馈事件(点赞/点踩)。 @@ -1114,6 +1309,29 @@ class WecomBotClient: self.stream_sessions.mark_finished(stream_id) return True + async def push_form_pause( + self, msg_id: str, form_data: dict, task_id: Optional[str] = None + ) -> tuple[bool, Optional[str], Optional[str]]: + """Attach a Dify human-input pause to the active stream session. + + On the next WeCom followup poll, the response switches from + ``msgtype: stream`` to ``msgtype: template_card`` (button_interaction) + carrying the buttons. ``task_id`` is auto-generated if not provided + and is what the button-click callback uses to look the session back up. + + Returns: + ``(ok, stream_id, task_id)``. ``ok`` is False if the + adapter's msg_id maps to no stream session (e.g. non-stream mode). + """ + stream_id = self.stream_sessions.get_stream_id_by_msg(msg_id) + if not stream_id: + return False, None, None + if not task_id: + # WeCom requires task_id [A-Za-z0-9_-@], <= 128 bytes, unique per bot. + task_id = f'dify-{uuid.uuid4().hex[:24]}' + self.stream_sessions.set_pending_form(stream_id, form_data, task_id) + return True, stream_id, task_id + async def set_message(self, msg_id: str, content: str): """兼容旧逻辑:若无法流式返回则缓存最终结果。 diff --git a/src/langbot/libs/wecom_ai_bot_api/ws_client.py b/src/langbot/libs/wecom_ai_bot_api/ws_client.py index 5125a704..64e79eac 100644 --- a/src/langbot/libs/wecom_ai_bot_api/ws_client.py +++ b/src/langbot/libs/wecom_ai_bot_api/ws_client.py @@ -20,7 +20,11 @@ from typing import Any, Callable, Optional import aiohttp from langbot.libs.wecom_ai_bot_api import wecombotevent -from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession +from langbot.libs.wecom_ai_bot_api.api import ( + parse_wecom_bot_message, + StreamSession, + build_button_interaction_payload, +) from langbot.pkg.platform.logger import EventLogger DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com' @@ -103,6 +107,18 @@ class WecomBotWsClient: # msg_id -> feedback_id (for associating feedback with message) self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id + # Dify human-input pause state for ws mode. Keys are task_id (echoed + # back in template_card_event.TaskId so we can rebuild the session + # context on click). + # task_id -> {form_data, msg_id, user_id, chat_id, stream_id, req_id} + self._pending_forms_by_task: dict[str, dict] = {} + # Reverse: msg_id -> task_id (for cleanup when stream finishes). + self._task_id_by_msg: dict[str, str] = {} + # Optional card-action callback registered by the adapter. + # Signature mirrors the http-mode WecomBotClient: + # async def callback(session, action_id, task_id, raw_event) -> None + self._card_action_callback: Optional[Callable] = None + # ── Public API ────────────────────────────────────────────────── async def connect(self): @@ -236,6 +252,83 @@ class WecomBotWsClient: } return await self._send_reply(req_id, body) + async def reply_template_card(self, req_id: str, card_payload: dict[str, Any]) -> Optional[dict]: + """Send a template_card (button_interaction etc.) reply. + + Args: + req_id: The req_id from the original message frame. + card_payload: Body produced by ``build_button_interaction_payload``; + must contain ``msgtype`` and ``template_card`` keys. + + Returns: + ACK frame dict, or None on failure. + """ + return await self._send_reply(req_id, card_payload) + + def set_card_action_callback(self, callback: Callable) -> None: + """Register the button-click handler. + + ``async def callback(session, action_id, task_id, raw_event) -> None`` + — same signature as the http-mode WecomBotClient version so the + adapter can hand both off to the same coroutine. + """ + self._card_action_callback = callback + + async def push_form_pause( + self, msg_id: str, form_data: dict, task_id: Optional[str] = None + ) -> tuple[bool, Optional[str], Optional[str]]: + """Attach a Dify human-input pause to the active stream and send + the button_interaction card immediately. + + ws mode has no notion of polled "followup" responses — each reply + is a one-shot frame send. So unlike the http path (which defers + card delivery to the next followup), here we just craft the card + and reply with it on the original req_id. The corresponding stream + session is then torn down so subsequent chunks don't re-send. + + Returns: + ``(ok, stream_id, task_id)``. ``ok=False`` if no active stream + for this msg_id (e.g. message arrived in non-stream mode). + """ + key = self._stream_ids.get(msg_id) + if not key: + return False, None, None + req_id, stream_id = key.split('|', 1) + + if not task_id: + task_id = f'dify-{secrets.token_hex(12)}' + + session_info = self._stream_sessions.get(msg_id) or {} + self._pending_forms_by_task[task_id] = { + 'form_data': form_data, + 'msg_id': msg_id, + 'user_id': session_info.get('user_id', ''), + 'chat_id': session_info.get('chat_id', ''), + 'stream_id': stream_id, + 'req_id': req_id, + } + self._task_id_by_msg[msg_id] = task_id + + card_payload = build_button_interaction_payload(form_data, task_id) + try: + await self.reply_template_card(req_id, card_payload) + except Exception: + await self.logger.error(f'Failed to send button_interaction card: {traceback.format_exc()}') + # Roll back the bookkeeping so the next attempt isn't blocked. + self._pending_forms_by_task.pop(task_id, None) + self._task_id_by_msg.pop(msg_id, None) + return False, stream_id, None + + # Tear down the stream — WeCom expects either stream chunks OR a + # template_card, not both on the same req_id. Subsequent + # push_stream_chunk calls for this msg_id become no-ops. + self._stream_ids.pop(msg_id, None) + self._stream_last_content.pop(msg_id, None) + # Keep _stream_sessions so the button callback can still resolve + # user/chat context; it gets cleaned up when the click fires. + + return True, stream_id, task_id + async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]: """Proactively send a message to a specified chat. @@ -258,6 +351,23 @@ class WecomBotWsClient: body['text'] = {'content': content} return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG) + async def send_template_card(self, chat_id: str, card_payload: dict[str, Any]) -> Optional[dict]: + """Proactively push a template_card to a chat. + + Used for the resumed-workflow path (button click → new query): + synthetic events have no inbound req_id to reply against, so we + fall back to proactive ``aibot_send_msg`` instead of reply mode. + + Args: + chat_id: userid (single chat) or chatid (group chat). + card_payload: ``{"msgtype": "template_card", "template_card": {...}}`` + as produced by :func:`build_button_interaction_payload`. + """ + req_id = _generate_req_id(CMD_SEND_MSG) + body = dict(card_payload) + body['chatid'] = chat_id + return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG) + async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool: """Push a streaming chunk for a given message ID. @@ -568,6 +678,38 @@ class WecomBotWsClient: await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}') return + if event_type == 'template_card_event': + tce = event_info.get('template_card_event', {}) + task_id = tce.get('TaskId') or tce.get('task_id') or '' + event_key = tce.get('EventKey') or tce.get('event_key') or '' + card_type = tce.get('CardType') or tce.get('card_type') or '' + await self.logger.info( + f'收到按钮点击 (ws): task_id={task_id} event_key={event_key!r} card_type={card_type}' + ) + pending = self._pending_forms_by_task.get(task_id) + if pending is None: + await self.logger.warning(f'未找到 task_id={task_id} 对应的 pending_form (ws),按钮点击被丢弃') + elif self._card_action_callback is not None: + try: + session = StreamSession( + stream_id=pending.get('stream_id', ''), + msg_id=pending.get('msg_id', ''), + chat_id=pending.get('chat_id') or None, + user_id=pending.get('user_id') or None, + ) + session.pending_form = pending.get('form_data') + session.pending_form_task_id = task_id + await self._card_action_callback(session, event_key, task_id, body) + except Exception: + await self.logger.error(f'card action callback raised (ws): {traceback.format_exc()}') + # Consume — drop bookkeeping so a stale click can't re-fire. + self._pending_forms_by_task.pop(task_id, None) + msg_id = pending.get('msg_id', '') + if msg_id: + self._task_id_by_msg.pop(msg_id, None) + self._stream_sessions.pop(msg_id, None) + return + event = wecombotevent.WecomBotEvent(message_data) if event_type in self._message_handlers: diff --git a/src/langbot/pkg/platform/sources/wecombot.py b/src/langbot/pkg/platform/sources/wecombot.py index dd726544..84cde168 100644 --- a/src/langbot/pkg/platform/sources/wecombot.py +++ b/src/langbot/pkg/platform/sources/wecombot.py @@ -296,6 +296,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): listeners: dict = {} _stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp) _STREAM_MAPPING_TTL = 600 # 10 minutes + ap: typing.Any = None def __init__(self, config: dict, logger: EventLogger): enable_webhook = config.get('enable-webhook', False) @@ -336,6 +337,12 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): _stream_to_monitoring_msg={}, ) + # Both WecomBotClient (webhook) and WecomBotWsClient (ws long-conn) + # expose ``set_card_action_callback``. Wire the click handler so + # Dify human-input button taps resume the workflow on either mode. + if hasattr(self.bot, 'set_card_action_callback'): + self.bot.set_card_action_callback(self._on_card_action) + async def reply_message( self, message_source: platform_events.MessageEvent, @@ -345,15 +352,37 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): content = await self.message_converter.yiri2target(message) _ws_mode = not self.config.get('enable-webhook', False) + event = message_source.source_platform_object + # Synthetic events (button-click resume queries) have no inbound + # platform object. Fall back to a proactive send so error + # messages and one-shot replies still reach the user. + if event is None: + if _ws_mode: + if isinstance(message_source, platform_events.GroupMessage): + chat_id = str(message_source.group.id) + else: + chat_id = str(message_source.sender.id) + try: + await self.bot.send_message(chat_id, content) + except Exception: + await self.logger.error( + f'WeComBot: proactive reply for synthetic event failed: {traceback.format_exc()}' + ) + else: + await self.logger.warning( + 'WeComBot webhook mode cannot reply to a synthetic event ' + '(no req_id and no proactive-send credentials); dropping.' + ) + return + if _ws_mode: - event = message_source.source_platform_object - req_id = event.get('req_id', '') + req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '') if req_id: await self.bot.reply_text(req_id, content) else: await self.bot.set_message(event.message_id, content) else: - await self.bot.set_message(message_source.source_platform_object.message_id, content) + await self.bot.set_message(event.message_id, content) async def reply_message_chunk( self, @@ -364,9 +393,56 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): is_final: bool = False, ): content = await self.message_converter.yiri2target(message) - msg_id = message_source.source_platform_object.message_id _ws_mode = not self.config.get('enable-webhook', False) + # Synthetic events (e.g. button-click triggered form resume) have + # no inbound platform message — no msg_id, no req_id, no stream + # session. The output must go via the proactive-send path instead + # of the stream/reply path. + spo = message_source.source_platform_object + if spo is None: + return await self._handle_synthetic_chunk(message_source, bot_message, content, is_final, _ws_mode) + + msg_id = spo.message_id + + # Dify human-input pause: when the runner attaches `_form_data` to + # the final chunk, hand the button_interaction card off to the + # underlying client. In webhook mode the card is queued for the + # next followup poll; in ws mode it's sent as a reply frame + # immediately. Falls back to plain text when the bot has no active + # stream session for this msg_id (rare). + form_data = getattr(bot_message, '_form_data', None) + if form_data and is_final: + if hasattr(self.bot, 'push_form_pause'): + ok, stream_id, task_id = await self.bot.push_form_pause(msg_id, form_data) + if ok: + await self.logger.info( + f'WeComBot: pending button_interaction registered ' + f'stream_id={stream_id} task_id={task_id} ws_mode={_ws_mode}' + ) + return {'stream': True, 'form': True, 'task_id': task_id} + await self.logger.warning( + 'WeComBot: cannot register form pause (no active stream session); falling back to plain text' + ) + try: + from langbot.pkg.provider.runners.difysvapi import _format_human_input_text + + fallback = _format_human_input_text( + form_data.get('node_title', ''), + form_data.get('form_content', ''), + form_data.get('actions', []) or [], + ) + except Exception: + fallback = content or '(人工输入)' + if _ws_mode: + event = message_source.source_platform_object + req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '') + if req_id: + await self.bot.reply_text(req_id, fallback) + else: + await self.bot.set_message(msg_id, fallback) + return {'stream': False, 'form': True, 'fallback': True} + if _ws_mode: success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final) if not success and is_final: @@ -385,6 +461,129 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): """Whether streaming output is enabled for this bot instance.""" return self.config.get('enable-stream-reply', True) + async def _handle_synthetic_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message, + content: str, + is_final: bool, + ws_mode: bool, + ) -> dict: + """Handle reply_message_chunk for synthetic events (button clicks). + + Synthetic events have no inbound message → no msg_id, no req_id, + no stream session. We can't do incremental streaming, so we + buffer chunks per-conversation and flush on ``is_final`` via the + proactive send path. + + Buffer keyed by ``(launcher_type, launcher_id)`` from the + synthetic event itself. Only ws mode has a usable proactive-send + path right now (``ws_client.send_message`` / + ``ws_client.send_template_card``); webhook mode requires a + corpid/secret we don't have, so it logs and drops. + """ + if isinstance(message_source, platform_events.GroupMessage): + chat_id = str(message_source.group.id) + else: + chat_id = str(message_source.sender.id) + + form_data = getattr(bot_message, '_form_data', None) + + # Buffer streaming content until is_final. + buf_key = chat_id + if not hasattr(self, '_synthetic_buffers'): + # Attribute-not-declared trick: pydantic forbids dynamic attrs + # on the model, but plain instance dicts via object.__setattr__ + # do work. Lazy-create on first call. + object.__setattr__(self, '_synthetic_buffers', {}) + buffers: dict[str, str] = self._synthetic_buffers + if content and not form_data: + buffers[buf_key] = buffers.get(buf_key, '') + content + + if not is_final: + return {'stream': True, 'synthetic': True, 'buffered': True} + + final_content = buffers.pop(buf_key, '') + if content and final_content.startswith(content): + # is_final chunk re-emitted the full accumulated text — keep + # whichever is longer. + final_content = final_content if len(final_content) >= len(content) else content + elif content and not final_content: + final_content = content + + if not ws_mode: + await self.logger.warning( + 'WeComBot webhook mode cannot proactively push synthetic-event ' + 'output (no corpid/secret); the resume reply is dropped. ' + f'content_len={len(final_content)} form_data_present={form_data is not None}' + ) + return {'stream': False, 'synthetic': True, 'dropped': True} + + # ws mode: proactive send. + try: + if form_data: + # Determine user_id / chat_id for the routing context of any + # subsequent click on this card. + if isinstance(message_source, platform_events.GroupMessage): + routing_chat_id = str(message_source.group.id) + routing_user_id = str(message_source.sender.id) + else: + routing_chat_id = '' + routing_user_id = str(message_source.sender.id) + payload = self._build_button_interaction_payload_from_form( + form_data, + user_id=routing_user_id, + chat_id=routing_chat_id, + ) + await self.bot.send_template_card(chat_id, payload) + await self.logger.info( + f'WeComBot ws: proactively sent template_card for synthetic event ' + f'chat_id={chat_id} form_token={form_data.get("form_token")!r} ' + f'workflow_run_id={form_data.get("workflow_run_id")!r}' + ) + elif final_content: + await self.bot.send_message(chat_id, final_content) + await self.logger.info( + f'WeComBot ws: proactively sent text for synthetic event chat_id={chat_id} len={len(final_content)}' + ) + except Exception: + await self.logger.error(f'WeComBot: synthetic event proactive send failed: {traceback.format_exc()}') + return {'stream': False, 'synthetic': True, 'error': True} + + return {'stream': True, 'synthetic': True} + + def _build_button_interaction_payload_from_form( + self, form_data: dict, *, user_id: str = '', chat_id: str = '' + ) -> dict: + """Build a button_interaction payload + track task_id for click resolution. + + Unlike the inbound-event path (where push_form_pause registers the + task_id with the active stream session), proactive sends still + need the task_id registered so button clicks find pending_form. + For ws mode we stash it directly on the ws_client's pending dict. + """ + from langbot.libs.wecom_ai_bot_api.api import build_button_interaction_payload + import secrets as _secrets + + task_id = f'dify-{_secrets.token_hex(12)}' + payload = build_button_interaction_payload(form_data, task_id) + + # Register task_id → form_data so the click callback can find it. + # user_id / chat_id are required so _on_card_action can route the + # resulting synthetic query back to the right user. msg_id / req_id + # / stream_id are intentionally empty — synthetic cards have no + # inbound message to anchor on. + if hasattr(self.bot, '_pending_forms_by_task'): + self.bot._pending_forms_by_task[task_id] = { + 'form_data': form_data, + 'msg_id': '', + 'user_id': user_id, + 'chat_id': chat_id, + 'stream_id': '', + 'req_id': '', + } + return payload + async def send_message(self, target_type, target_id, message): _ws_mode = not self.config.get('enable-webhook', False) if _ws_mode: @@ -531,3 +730,114 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): async def is_muted(self, group_id: int) -> bool: pass + + # ------------------------------------------------------------------ + # Dify human-input button-interaction click handling + # ------------------------------------------------------------------ + + async def _on_card_action(self, session, action_id: str, task_id: str, raw_event: dict) -> None: + """Translate a button click on a button_interaction card into a + synthetic ``_dify_form_action`` query enqueued on the pool. + + Pattern mirrors DingTalk / Lark / Telegram so the runner's + ``_merge_pending_form_action`` path resumes the workflow. + """ + import langbot_plugin.api.entities.builtin.provider.session as provider_session + + form = session.pending_form or {} + await self.logger.info( + f'WeComBot _on_card_action: task_id={task_id} action_id={action_id!r} ' + f'form_token={form.get("form_token")!r} workflow_run_id={form.get("workflow_run_id")!r} ' + f'session.user_id={session.user_id!r} session.chat_id={session.chat_id!r}' + ) + + actions = form.get('actions') or [] + clean_action_id = (action_id or '').strip() + action_title = clean_action_id + for a in actions: + if str(a.get('id', '')) == clean_action_id: + action_title = a.get('title') or clean_action_id + break + + launcher_id = session.user_id or session.chat_id or '' + sender_user_id = session.user_id or launcher_id + # WeCom AI bot has both single-chat and group-chat; chat_id present + # indicates group context. + if session.chat_id: + launcher_type = provider_session.LauncherTypes.GROUP + launcher_id = session.chat_id + else: + launcher_type = provider_session.LauncherTypes.PERSON + launcher_id = session.user_id or '' + + form_action_data = { + 'form_token': form.get('form_token', ''), + 'workflow_run_id': form.get('workflow_run_id', ''), + 'action_id': clean_action_id, + 'action_title': action_title, + 'node_title': form.get('node_title', ''), + 'user': f'{launcher_type.value}_{launcher_id}', + 'inputs': {}, + } + + message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')]) + + if launcher_type == provider_session.LauncherTypes.GROUP: + synthetic_event = platform_events.GroupMessage( + sender=platform_entities.GroupMember( + id=sender_user_id, + member_name='', + permission=platform_entities.Permission.Member, + group=platform_entities.Group( + id=launcher_id, + name='', + permission=platform_entities.Permission.Member, + ), + special_title='', + ), + message_chain=message_chain, + time=int(time.time()), + source_platform_object=None, + ) + else: + synthetic_event = platform_events.FriendMessage( + sender=platform_entities.Friend( + id=sender_user_id, + nickname='', + remark='', + ), + message_chain=message_chain, + time=int(time.time()), + source_platform_object=None, + ) + + if self.ap is None: + await self.logger.error('WeComBot: ap not injected; cannot enqueue button-click query') + return + + bot_uuid = '' + pipeline_uuid = None + for bot in self.ap.platform_mgr.bots: + if bot.adapter is self: + bot_uuid = bot.bot_entity.uuid + pipeline_uuid = bot.bot_entity.use_pipeline_uuid + break + + try: + await self.ap.query_pool.add_query( + bot_uuid=bot_uuid, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=sender_user_id, + message_event=synthetic_event, + message_chain=message_chain, + adapter=self, + pipeline_uuid=pipeline_uuid, + variables={ + '_dify_form_action': form_action_data, + '_routed_by_rule': True, + }, + ) + await self.logger.info(f'WeComBot: button-click query enqueued action_id={clean_action_id!r}') + except Exception: + await self.logger.error(f'WeComBot: enqueue button-click query failed: {traceback.format_exc()}')