feat(wecom): implement Dify human input pause handling with button interaction support

This commit is contained in:
fdc310
2026-06-16 00:07:11 +08:00
parent a32c4d152f
commit f10ea82418
3 changed files with 675 additions and 5 deletions

View File

@@ -67,6 +67,16 @@ class StreamSession:
# 反馈 ID用于接收用户点赞/点踩反馈
feedback_id: Optional[str] = None
# Dify 人工输入暂停态runner 把 _form_data 传过来时填充。
# 一旦设置,下次企微 followup 请求时返回 button_interaction 模板卡
# 替代 stream chunk。点击按钮会回调 template_card_eventEventKey
# 就是 Dify 的 action_id。
pending_form: Optional[dict] = None
# template_card task_id企微要求 button_interaction 必填且不可重复)。
# 创建 pending_form 时生成;按钮点击回调里用来反查 session。
pending_form_task_id: Optional[str] = None
class StreamSessionManager:
"""管理 stream 会话的生命周期,并负责队列的生产消费。"""
@@ -83,6 +93,9 @@ class StreamSessionManager:
self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射
self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话
self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射
# task_id (button_interaction template_card 的) -> stream_id 映射,
# 用于按钮点击回调里反查 pending_form。
self._task_index: dict[str, str] = {}
def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]:
if not msg_id:
@@ -118,6 +131,40 @@ class StreamSessionManager:
if feedback_id and stream_id:
self._feedback_index[feedback_id] = stream_id
def set_pending_form(self, stream_id: str, form_data: dict, task_id: str) -> None:
"""把 Dify 人工输入暂停态绑定到 stream session。
下一次企微 followup 请求时adapter 检测到 pending_form
返回 button_interaction 模板卡而不是 stream chunk。
"""
session = self._sessions.get(stream_id)
if not session:
return
session.pending_form = form_data
session.pending_form_task_id = task_id
if task_id:
self._task_index[task_id] = stream_id
def get_session_by_task_id(self, task_id: str) -> Optional[StreamSession]:
"""按按钮点击回调里的 TaskId 反查 session。"""
if not task_id:
return None
stream_id = self._task_index.get(task_id)
if not stream_id:
return None
return self._sessions.get(stream_id)
def clear_pending_form(self, stream_id: str) -> None:
"""按钮点击消费完后清掉 pending_form避免重复弹卡。"""
session = self._sessions.get(stream_id)
if not session:
return
task_id = session.pending_form_task_id
session.pending_form = None
session.pending_form_task_id = None
if task_id:
self._task_index.pop(task_id, None)
def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]:
"""根据企业微信回调创建或获取会话。
@@ -723,6 +770,79 @@ async def parse_wecom_bot_message(
return message_data
def build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
"""Build a `template_card` (button_interaction) WeCom payload.
Shared by both the webhook-mode client (returns the payload as the
response to a stream-followup callback) and the ws_client (sends it
as a reply frame). Output shape is `{"msgtype": "template_card",
"template_card": {...}}` per the WeCom spec.
Args:
form_data: Dify human-input form data with keys ``actions`` (list of
``{id, title, button_style}``), ``node_title``, ``form_content``.
task_id: Unique per-card identifier. WeCom requires this for
button_interaction. The click callback returns it as TaskId so we
can find the originating session.
Notes:
* ``button.key`` is set directly to the Dify ``action_id``. The click
callback's ``EventKey`` carries this back unchanged (1024-byte limit
per the spec, far more than we ever need).
* WeCom caps the button list at 6. Extra actions are appended to
``sub_title_text`` so users can still reply with the id as text.
* Styles map ``primary``→1 (blue), ``danger``→2 (red), default→0
(gray). First button is auto-promoted to primary when no style.
"""
actions = list(form_data.get('actions') or [])
node_title = (form_data.get('node_title') or '').strip() or '人工介入'
form_content = (form_data.get('form_content') or '').strip()
visible_actions = actions[:6]
overflow = actions[6:]
sub_title_parts: list[str] = []
if form_content:
sub_title_parts.append(form_content)
if overflow:
extra_lines = [f' - {a.get("title") or a.get("id") or ""} (回复 id: {a.get("id") or ""})' for a in overflow]
sub_title_parts.append(f'另有 {len(overflow)} 个选项不在按钮列表中,可直接回复 id\n' + '\n'.join(extra_lines))
sub_title_text = '\n\n'.join(sub_title_parts) or '请选择一个操作以继续。'
button_list = []
for idx, action in enumerate(visible_actions):
action_id = str(action.get('id') or '')
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
style_raw = (action.get('button_style') or '').lower()
if style_raw == 'primary' or (style_raw == '' and idx == 0):
style = 1
elif style_raw == 'danger':
style = 2
else:
style = 0
button_list.append(
{
'text': title,
'style': style,
'key': action_id,
}
)
card = {
'card_type': 'button_interaction',
'main_title': {
'title': node_title,
},
'sub_title_text': sub_title_text,
'button_list': button_list,
'task_id': task_id,
}
return {
'msgtype': 'template_card',
'template_card': card,
}
class WecomBotClient:
def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False):
"""企业微信智能机器人客户端。
@@ -761,6 +881,7 @@ class WecomBotClient:
self.stream_poll_timeout = 0.5
self._feedback_callback: Optional[Callable] = None
self._card_action_callback: Optional[Callable] = None
def set_feedback_callback(self, callback: Callable) -> None:
"""设置反馈回调函数。
@@ -770,6 +891,19 @@ class WecomBotClient:
"""
self._feedback_callback = callback
def set_card_action_callback(self, callback: Callable) -> None:
"""设置按钮卡片点击回调函数。
Signature: ``async def callback(session, action_id, task_id, raw_event) -> None``
``session`` is the StreamSession the card was attached to;
``action_id`` is the Dify action_id reflected back via the
button's ``key`` field; ``task_id`` is the card's task_id
(matches ``session.pending_form_task_id``); ``raw_event`` is the
decoded callback JSON for any extra fields the adapter wants.
"""
self._card_action_callback = callback
@staticmethod
def _build_stream_payload(
stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None
@@ -800,6 +934,12 @@ class WecomBotClient:
'stream': stream_payload,
}
@staticmethod
def _build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
"""Class-level shim — delegates to module-level builder so ws_client
can reuse the exact same payload shape without importing the class."""
return build_button_interaction_payload(form_data, task_id)
async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""对响应进行加密封装并返回给企业微信。
@@ -892,6 +1032,22 @@ class WecomBotClient:
return await self._encrypt_and_reply(self._build_stream_payload('', '', True), nonce)
session = self.stream_sessions.get_session(stream_id)
# If a Dify human-input pause arrived during this stream, switch
# the response from `msgtype: stream` to `msgtype: template_card`
# (button_interaction). The session's stream is also marked
# finished so future followups aren't expected (assuming the
# WeCom client treats template_card as the terminal response —
# we'll know from the next callback whether it kept polling).
if session and session.pending_form and session.pending_form_task_id:
await self.logger.info(
f'WeComBot: returning button_interaction for stream_id={stream_id} '
f'task_id={session.pending_form_task_id} actions={len(session.pending_form.get("actions") or [])}'
)
card_payload = self._build_button_interaction_payload(session.pending_form, session.pending_form_task_id)
self.stream_sessions.mark_finished(stream_id)
return await self._encrypt_and_reply(card_payload, nonce)
chunk = await self.stream_sessions.consume(stream_id, timeout=self.stream_poll_timeout)
if not chunk:
@@ -1000,11 +1156,50 @@ class WecomBotClient:
if event_type == 'feedback_event':
return await self._handle_feedback_event(msg_json, nonce)
# Button click on a button_interaction template_card. The WeCom doc
# calls this `template_card_event`; some routes wrap the button
# event payload inside `event.template_card_event`.
if event_type == 'template_card_event':
return await self._handle_template_card_event(msg_json, nonce)
if msg_json.get('msgtype') == 'stream':
return await self._handle_post_followup_response(msg_json, nonce)
return await self._handle_post_initial_response(msg_json, nonce)
async def _handle_template_card_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""Handle a button click on a button_interaction template_card.
WeCom carries the click info in ``event.template_card_event`` with
``TaskId`` matching the card we created and ``EventKey`` carrying
the button's ``key`` (which we set to the Dify ``action_id``).
"""
try:
tce = msg_json.get('event', {}).get('template_card_event', {})
task_id = tce.get('TaskId') or tce.get('task_id') or ''
event_key = tce.get('EventKey') or tce.get('event_key') or ''
card_type = tce.get('CardType') or tce.get('card_type') or ''
await self.logger.info(f'收到按钮点击: task_id={task_id} event_key={event_key!r} card_type={card_type}')
session = self.stream_sessions.get_session_by_task_id(task_id)
if session is None:
await self.logger.warning(f'未找到 task_id={task_id} 对应的 session按钮点击被丢弃')
else:
if self._card_action_callback is not None:
try:
await self._card_action_callback(session, event_key, task_id, msg_json)
except Exception:
await self.logger.error(f'card action callback raised: {traceback.format_exc()}')
# Drop the form so a fresh chunk/followup doesn't re-render
# the same card (and so the task_id can be GC'd).
self.stream_sessions.clear_pending_form(session.stream_id)
except Exception:
await self.logger.error(f'_handle_template_card_event error: {traceback.format_exc()}')
# WeCom expects an empty success ack for event callbacks.
return await self._encrypt_and_reply({}, nonce)
async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""处理企业微信用户反馈事件(点赞/点踩)。
@@ -1114,6 +1309,29 @@ class WecomBotClient:
self.stream_sessions.mark_finished(stream_id)
return True
async def push_form_pause(
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""Attach a Dify human-input pause to the active stream session.
On the next WeCom followup poll, the response switches from
``msgtype: stream`` to ``msgtype: template_card`` (button_interaction)
carrying the buttons. ``task_id`` is auto-generated if not provided
and is what the button-click callback uses to look the session back up.
Returns:
``(ok, stream_id, task_id)``. ``ok`` is False if the
adapter's msg_id maps to no stream session (e.g. non-stream mode).
"""
stream_id = self.stream_sessions.get_stream_id_by_msg(msg_id)
if not stream_id:
return False, None, None
if not task_id:
# WeCom requires task_id [A-Za-z0-9_-@], <= 128 bytes, unique per bot.
task_id = f'dify-{uuid.uuid4().hex[:24]}'
self.stream_sessions.set_pending_form(stream_id, form_data, task_id)
return True, stream_id, task_id
async def set_message(self, msg_id: str, content: str):
"""兼容旧逻辑:若无法流式返回则缓存最终结果。

View File

@@ -20,7 +20,11 @@ from typing import Any, Callable, Optional
import aiohttp
from langbot.libs.wecom_ai_bot_api import wecombotevent
from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession
from langbot.libs.wecom_ai_bot_api.api import (
parse_wecom_bot_message,
StreamSession,
build_button_interaction_payload,
)
from langbot.pkg.platform.logger import EventLogger
DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com'
@@ -103,6 +107,18 @@ class WecomBotWsClient:
# msg_id -> feedback_id (for associating feedback with message)
self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id
# Dify human-input pause state for ws mode. Keys are task_id (echoed
# back in template_card_event.TaskId so we can rebuild the session
# context on click).
# task_id -> {form_data, msg_id, user_id, chat_id, stream_id, req_id}
self._pending_forms_by_task: dict[str, dict] = {}
# Reverse: msg_id -> task_id (for cleanup when stream finishes).
self._task_id_by_msg: dict[str, str] = {}
# Optional card-action callback registered by the adapter.
# Signature mirrors the http-mode WecomBotClient:
# async def callback(session, action_id, task_id, raw_event) -> None
self._card_action_callback: Optional[Callable] = None
# ── Public API ──────────────────────────────────────────────────
async def connect(self):
@@ -236,6 +252,83 @@ class WecomBotWsClient:
}
return await self._send_reply(req_id, body)
async def reply_template_card(self, req_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
"""Send a template_card (button_interaction etc.) reply.
Args:
req_id: The req_id from the original message frame.
card_payload: Body produced by ``build_button_interaction_payload``;
must contain ``msgtype`` and ``template_card`` keys.
Returns:
ACK frame dict, or None on failure.
"""
return await self._send_reply(req_id, card_payload)
def set_card_action_callback(self, callback: Callable) -> None:
"""Register the button-click handler.
``async def callback(session, action_id, task_id, raw_event) -> None``
— same signature as the http-mode WecomBotClient version so the
adapter can hand both off to the same coroutine.
"""
self._card_action_callback = callback
async def push_form_pause(
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""Attach a Dify human-input pause to the active stream and send
the button_interaction card immediately.
ws mode has no notion of polled "followup" responses — each reply
is a one-shot frame send. So unlike the http path (which defers
card delivery to the next followup), here we just craft the card
and reply with it on the original req_id. The corresponding stream
session is then torn down so subsequent chunks don't re-send.
Returns:
``(ok, stream_id, task_id)``. ``ok=False`` if no active stream
for this msg_id (e.g. message arrived in non-stream mode).
"""
key = self._stream_ids.get(msg_id)
if not key:
return False, None, None
req_id, stream_id = key.split('|', 1)
if not task_id:
task_id = f'dify-{secrets.token_hex(12)}'
session_info = self._stream_sessions.get(msg_id) or {}
self._pending_forms_by_task[task_id] = {
'form_data': form_data,
'msg_id': msg_id,
'user_id': session_info.get('user_id', ''),
'chat_id': session_info.get('chat_id', ''),
'stream_id': stream_id,
'req_id': req_id,
}
self._task_id_by_msg[msg_id] = task_id
card_payload = build_button_interaction_payload(form_data, task_id)
try:
await self.reply_template_card(req_id, card_payload)
except Exception:
await self.logger.error(f'Failed to send button_interaction card: {traceback.format_exc()}')
# Roll back the bookkeeping so the next attempt isn't blocked.
self._pending_forms_by_task.pop(task_id, None)
self._task_id_by_msg.pop(msg_id, None)
return False, stream_id, None
# Tear down the stream — WeCom expects either stream chunks OR a
# template_card, not both on the same req_id. Subsequent
# push_stream_chunk calls for this msg_id become no-ops.
self._stream_ids.pop(msg_id, None)
self._stream_last_content.pop(msg_id, None)
# Keep _stream_sessions so the button callback can still resolve
# user/chat context; it gets cleaned up when the click fires.
return True, stream_id, task_id
async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]:
"""Proactively send a message to a specified chat.
@@ -258,6 +351,23 @@ class WecomBotWsClient:
body['text'] = {'content': content}
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
async def send_template_card(self, chat_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
"""Proactively push a template_card to a chat.
Used for the resumed-workflow path (button click → new query):
synthetic events have no inbound req_id to reply against, so we
fall back to proactive ``aibot_send_msg`` instead of reply mode.
Args:
chat_id: userid (single chat) or chatid (group chat).
card_payload: ``{"msgtype": "template_card", "template_card": {...}}``
as produced by :func:`build_button_interaction_payload`.
"""
req_id = _generate_req_id(CMD_SEND_MSG)
body = dict(card_payload)
body['chatid'] = chat_id
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool:
"""Push a streaming chunk for a given message ID.
@@ -568,6 +678,38 @@ class WecomBotWsClient:
await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}')
return
if event_type == 'template_card_event':
tce = event_info.get('template_card_event', {})
task_id = tce.get('TaskId') or tce.get('task_id') or ''
event_key = tce.get('EventKey') or tce.get('event_key') or ''
card_type = tce.get('CardType') or tce.get('card_type') or ''
await self.logger.info(
f'收到按钮点击 (ws): task_id={task_id} event_key={event_key!r} card_type={card_type}'
)
pending = self._pending_forms_by_task.get(task_id)
if pending is None:
await self.logger.warning(f'未找到 task_id={task_id} 对应的 pending_form (ws),按钮点击被丢弃')
elif self._card_action_callback is not None:
try:
session = StreamSession(
stream_id=pending.get('stream_id', ''),
msg_id=pending.get('msg_id', ''),
chat_id=pending.get('chat_id') or None,
user_id=pending.get('user_id') or None,
)
session.pending_form = pending.get('form_data')
session.pending_form_task_id = task_id
await self._card_action_callback(session, event_key, task_id, body)
except Exception:
await self.logger.error(f'card action callback raised (ws): {traceback.format_exc()}')
# Consume — drop bookkeeping so a stale click can't re-fire.
self._pending_forms_by_task.pop(task_id, None)
msg_id = pending.get('msg_id', '')
if msg_id:
self._task_id_by_msg.pop(msg_id, None)
self._stream_sessions.pop(msg_id, None)
return
event = wecombotevent.WecomBotEvent(message_data)
if event_type in self._message_handlers:

View File

@@ -296,6 +296,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
listeners: dict = {}
_stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp)
_STREAM_MAPPING_TTL = 600 # 10 minutes
ap: typing.Any = None
def __init__(self, config: dict, logger: EventLogger):
enable_webhook = config.get('enable-webhook', False)
@@ -336,6 +337,12 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
_stream_to_monitoring_msg={},
)
# Both WecomBotClient (webhook) and WecomBotWsClient (ws long-conn)
# expose ``set_card_action_callback``. Wire the click handler so
# Dify human-input button taps resume the workflow on either mode.
if hasattr(self.bot, 'set_card_action_callback'):
self.bot.set_card_action_callback(self._on_card_action)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
@@ -345,15 +352,37 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
content = await self.message_converter.yiri2target(message)
_ws_mode = not self.config.get('enable-webhook', False)
event = message_source.source_platform_object
# Synthetic events (button-click resume queries) have no inbound
# platform object. Fall back to a proactive send so error
# messages and one-shot replies still reach the user.
if event is None:
if _ws_mode:
if isinstance(message_source, platform_events.GroupMessage):
chat_id = str(message_source.group.id)
else:
chat_id = str(message_source.sender.id)
try:
await self.bot.send_message(chat_id, content)
except Exception:
await self.logger.error(
f'WeComBot: proactive reply for synthetic event failed: {traceback.format_exc()}'
)
else:
await self.logger.warning(
'WeComBot webhook mode cannot reply to a synthetic event '
'(no req_id and no proactive-send credentials); dropping.'
)
return
if _ws_mode:
event = message_source.source_platform_object
req_id = event.get('req_id', '')
req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
if req_id:
await self.bot.reply_text(req_id, content)
else:
await self.bot.set_message(event.message_id, content)
else:
await self.bot.set_message(message_source.source_platform_object.message_id, content)
await self.bot.set_message(event.message_id, content)
async def reply_message_chunk(
self,
@@ -364,9 +393,56 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
is_final: bool = False,
):
content = await self.message_converter.yiri2target(message)
msg_id = message_source.source_platform_object.message_id
_ws_mode = not self.config.get('enable-webhook', False)
# Synthetic events (e.g. button-click triggered form resume) have
# no inbound platform message — no msg_id, no req_id, no stream
# session. The output must go via the proactive-send path instead
# of the stream/reply path.
spo = message_source.source_platform_object
if spo is None:
return await self._handle_synthetic_chunk(message_source, bot_message, content, is_final, _ws_mode)
msg_id = spo.message_id
# Dify human-input pause: when the runner attaches `_form_data` to
# the final chunk, hand the button_interaction card off to the
# underlying client. In webhook mode the card is queued for the
# next followup poll; in ws mode it's sent as a reply frame
# immediately. Falls back to plain text when the bot has no active
# stream session for this msg_id (rare).
form_data = getattr(bot_message, '_form_data', None)
if form_data and is_final:
if hasattr(self.bot, 'push_form_pause'):
ok, stream_id, task_id = await self.bot.push_form_pause(msg_id, form_data)
if ok:
await self.logger.info(
f'WeComBot: pending button_interaction registered '
f'stream_id={stream_id} task_id={task_id} ws_mode={_ws_mode}'
)
return {'stream': True, 'form': True, 'task_id': task_id}
await self.logger.warning(
'WeComBot: cannot register form pause (no active stream session); falling back to plain text'
)
try:
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
fallback = _format_human_input_text(
form_data.get('node_title', ''),
form_data.get('form_content', ''),
form_data.get('actions', []) or [],
)
except Exception:
fallback = content or '(人工输入)'
if _ws_mode:
event = message_source.source_platform_object
req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
if req_id:
await self.bot.reply_text(req_id, fallback)
else:
await self.bot.set_message(msg_id, fallback)
return {'stream': False, 'form': True, 'fallback': True}
if _ws_mode:
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
if not success and is_final:
@@ -385,6 +461,129 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""Whether streaming output is enabled for this bot instance."""
return self.config.get('enable-stream-reply', True)
async def _handle_synthetic_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
content: str,
is_final: bool,
ws_mode: bool,
) -> dict:
"""Handle reply_message_chunk for synthetic events (button clicks).
Synthetic events have no inbound message → no msg_id, no req_id,
no stream session. We can't do incremental streaming, so we
buffer chunks per-conversation and flush on ``is_final`` via the
proactive send path.
Buffer keyed by ``(launcher_type, launcher_id)`` from the
synthetic event itself. Only ws mode has a usable proactive-send
path right now (``ws_client.send_message`` /
``ws_client.send_template_card``); webhook mode requires a
corpid/secret we don't have, so it logs and drops.
"""
if isinstance(message_source, platform_events.GroupMessage):
chat_id = str(message_source.group.id)
else:
chat_id = str(message_source.sender.id)
form_data = getattr(bot_message, '_form_data', None)
# Buffer streaming content until is_final.
buf_key = chat_id
if not hasattr(self, '_synthetic_buffers'):
# Attribute-not-declared trick: pydantic forbids dynamic attrs
# on the model, but plain instance dicts via object.__setattr__
# do work. Lazy-create on first call.
object.__setattr__(self, '_synthetic_buffers', {})
buffers: dict[str, str] = self._synthetic_buffers
if content and not form_data:
buffers[buf_key] = buffers.get(buf_key, '') + content
if not is_final:
return {'stream': True, 'synthetic': True, 'buffered': True}
final_content = buffers.pop(buf_key, '')
if content and final_content.startswith(content):
# is_final chunk re-emitted the full accumulated text — keep
# whichever is longer.
final_content = final_content if len(final_content) >= len(content) else content
elif content and not final_content:
final_content = content
if not ws_mode:
await self.logger.warning(
'WeComBot webhook mode cannot proactively push synthetic-event '
'output (no corpid/secret); the resume reply is dropped. '
f'content_len={len(final_content)} form_data_present={form_data is not None}'
)
return {'stream': False, 'synthetic': True, 'dropped': True}
# ws mode: proactive send.
try:
if form_data:
# Determine user_id / chat_id for the routing context of any
# subsequent click on this card.
if isinstance(message_source, platform_events.GroupMessage):
routing_chat_id = str(message_source.group.id)
routing_user_id = str(message_source.sender.id)
else:
routing_chat_id = ''
routing_user_id = str(message_source.sender.id)
payload = self._build_button_interaction_payload_from_form(
form_data,
user_id=routing_user_id,
chat_id=routing_chat_id,
)
await self.bot.send_template_card(chat_id, payload)
await self.logger.info(
f'WeComBot ws: proactively sent template_card for synthetic event '
f'chat_id={chat_id} form_token={form_data.get("form_token")!r} '
f'workflow_run_id={form_data.get("workflow_run_id")!r}'
)
elif final_content:
await self.bot.send_message(chat_id, final_content)
await self.logger.info(
f'WeComBot ws: proactively sent text for synthetic event chat_id={chat_id} len={len(final_content)}'
)
except Exception:
await self.logger.error(f'WeComBot: synthetic event proactive send failed: {traceback.format_exc()}')
return {'stream': False, 'synthetic': True, 'error': True}
return {'stream': True, 'synthetic': True}
def _build_button_interaction_payload_from_form(
self, form_data: dict, *, user_id: str = '', chat_id: str = ''
) -> dict:
"""Build a button_interaction payload + track task_id for click resolution.
Unlike the inbound-event path (where push_form_pause registers the
task_id with the active stream session), proactive sends still
need the task_id registered so button clicks find pending_form.
For ws mode we stash it directly on the ws_client's pending dict.
"""
from langbot.libs.wecom_ai_bot_api.api import build_button_interaction_payload
import secrets as _secrets
task_id = f'dify-{_secrets.token_hex(12)}'
payload = build_button_interaction_payload(form_data, task_id)
# Register task_id → form_data so the click callback can find it.
# user_id / chat_id are required so _on_card_action can route the
# resulting synthetic query back to the right user. msg_id / req_id
# / stream_id are intentionally empty — synthetic cards have no
# inbound message to anchor on.
if hasattr(self.bot, '_pending_forms_by_task'):
self.bot._pending_forms_by_task[task_id] = {
'form_data': form_data,
'msg_id': '',
'user_id': user_id,
'chat_id': chat_id,
'stream_id': '',
'req_id': '',
}
return payload
async def send_message(self, target_type, target_id, message):
_ws_mode = not self.config.get('enable-webhook', False)
if _ws_mode:
@@ -531,3 +730,114 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
async def is_muted(self, group_id: int) -> bool:
pass
# ------------------------------------------------------------------
# Dify human-input button-interaction click handling
# ------------------------------------------------------------------
async def _on_card_action(self, session, action_id: str, task_id: str, raw_event: dict) -> None:
"""Translate a button click on a button_interaction card into a
synthetic ``_dify_form_action`` query enqueued on the pool.
Pattern mirrors DingTalk / Lark / Telegram so the runner's
``_merge_pending_form_action`` path resumes the workflow.
"""
import langbot_plugin.api.entities.builtin.provider.session as provider_session
form = session.pending_form or {}
await self.logger.info(
f'WeComBot _on_card_action: task_id={task_id} action_id={action_id!r} '
f'form_token={form.get("form_token")!r} workflow_run_id={form.get("workflow_run_id")!r} '
f'session.user_id={session.user_id!r} session.chat_id={session.chat_id!r}'
)
actions = form.get('actions') or []
clean_action_id = (action_id or '').strip()
action_title = clean_action_id
for a in actions:
if str(a.get('id', '')) == clean_action_id:
action_title = a.get('title') or clean_action_id
break
launcher_id = session.user_id or session.chat_id or ''
sender_user_id = session.user_id or launcher_id
# WeCom AI bot has both single-chat and group-chat; chat_id present
# indicates group context.
if session.chat_id:
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = session.chat_id
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = session.user_id or ''
form_action_data = {
'form_token': form.get('form_token', ''),
'workflow_run_id': form.get('workflow_run_id', ''),
'action_id': clean_action_id,
'action_title': action_title,
'node_title': form.get('node_title', ''),
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=sender_user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=sender_user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
if self.ap is None:
await self.logger.error('WeComBot: ap not injected; cannot enqueue button-click query')
return
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
try:
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
await self.logger.info(f'WeComBot: button-click query enqueued action_id={clean_action_id!r}')
except Exception:
await self.logger.error(f'WeComBot: enqueue button-click query failed: {traceback.format_exc()}')