From e6cfee541fcf1b6b8a3bc76ab67df8c27b4a6717 Mon Sep 17 00:00:00 2001 From: fdc310 <2213070223@qq.com> Date: Tue, 16 Jun 2026 19:37:16 +0800 Subject: [PATCH] feat(qqofficial): implement Dify human input button interaction handling and markdown keyboard support --- src/langbot/libs/qq_official_api/api.py | 270 ++++++++- .../pkg/platform/sources/qqofficial.py | 526 +++++++++++++++++- 2 files changed, 786 insertions(+), 10 deletions(-) diff --git a/src/langbot/libs/qq_official_api/api.py b/src/langbot/libs/qq_official_api/api.py index db3194b6c..7e2b8bcd0 100644 --- a/src/langbot/libs/qq_official_api/api.py +++ b/src/langbot/libs/qq_official_api/api.py @@ -12,6 +12,70 @@ import traceback from cryptography.hazmat.primitives.asymmetric import ed25519 +def build_keyboard_from_form(form_data: dict, *, buttons_per_row: int = 2) -> dict: + """Build a QQ keyboard JSON payload from a Dify human-input form_data. + + Each Dify ``action`` becomes a callback button (``action.type=1``) + whose ``data`` is set directly to the Dify ``action_id``. The + INTERACTION_CREATE event carries this back as + ``data.resolved.button_data`` so the adapter can match the click to + the originating form. + + Layout limits per spec: max 5 rows, max 5 buttons per row. We default + to 2 buttons per row for legibility; longer lists wrap onto more rows + and anything past row 5 is dropped (10 visible by default, 25 at most). + + Args: + form_data: Dify ``{"actions": [{"id", "title", "button_style"}, ...]}``. + buttons_per_row: 1..5. Mobile UI looks best at 2. + + Returns: + ``{"content": {"rows": [{"buttons": [...]}]}}``. + """ + actions = list(form_data.get('actions') or [])[:25] # 5×5 hard cap + buttons_per_row = max(1, min(5, buttons_per_row)) + + def _button(idx: int, action: dict) -> dict: + action_id = str(action.get('id') or '') + label = str(action.get('title') or action_id or f'选项 {idx + 1}') + style_raw = (action.get('button_style') or '').lower() + # QQ: 0 灰色线框, 1 蓝色线框. 
Highlight the primary / first action. + if style_raw == 'primary' or (style_raw == '' and idx == 0): + style = 1 + else: + style = 0 + return { + 'id': str(idx + 1), + 'render_data': { + 'label': label, + # Shown after the user clicks — gives local "已选择" feedback + # without a follow-up message. Style mimics DingTalk/Lark's + # in-card selection state. + 'visited_label': f'✓ {label}', + 'style': style, + }, + 'action': { + 'type': 1, # callback button + 'permission': {'type': 2}, # everyone can click + 'data': action_id, + 'unsupport_tips': '当前客户端版本不支持此按钮,请升级 QQ', + }, + } + + rows = [] + for row_start in range(0, len(actions), buttons_per_row): + row_actions = actions[row_start : row_start + buttons_per_row] + rows.append( + { + 'buttons': [_button(row_start + j, a) for j, a in enumerate(row_actions)], + } + ) + if len(rows) >= 5: + break + + return {'content': {'rows': rows}} + + class QQOfficialClient: def __init__(self, secret: str, token: str, app_id: str, logger: None, unified_mode: bool = False): self.unified_mode = unified_mode @@ -30,6 +94,10 @@ class QQOfficialClient: self.token = token self.app_id = app_id self._message_handlers = {} + # Single optional handler for INTERACTION_CREATE (button click). We + # don't multiplex like message handlers — only the adapter cares, + # and the click<->resume path needs a single source of truth. + self._interaction_handler: Optional[Callable[[Dict[str, Any], Optional[str]], Any]] = None self.base_url = 'https://api.sgroup.qq.com' self.access_token = '' self.access_token_expiry_time = None @@ -107,6 +175,23 @@ class QQOfficialClient: return response, 200 if payload.get('op') == 0: + # INTERACTION_CREATE (button click) skips ``get_message`` — + # that helper only flattens message-event fields and would + # drop ``data.resolved.button_data`` / ``data.button_id``. 
+ if payload.get('t') == 'INTERACTION_CREATE': + if self._interaction_handler: + try: + d = payload.get('d') or {} + # Top-level ``id`` is the ws/event id used as + # ``event_id`` for passive replies. ``d.id`` + # is the interaction id used for ACK. Do not + # confuse the two — QQ rejects misuse with + # 40034025. + ws_event_id = payload.get('id') + await self._interaction_handler(d, ws_event_id) + except Exception: + await self.logger.error(f'Error in interaction handler: {traceback.format_exc()}') + return {'code': 0, 'message': 'success'} message_data = await self.get_message(payload) if message_data: event = QQOfficialEvent.from_payload(message_data) @@ -133,6 +218,21 @@ class QQOfficialClient: return decorator + def on_interaction(self): + """Register a single handler for INTERACTION_CREATE events. + + The handler receives ``(data_dict, ws_event_id)`` — the raw ``d`` + payload plus the top-level event ``id``, which is reused as the + ``event_id`` on the resumed reply within 30 minutes. The PUT + /interactions/{id} ack uses the interaction id in ``d['id']``. + """ + + def decorator(func: Callable[[Dict[str, Any], Optional[str]], Any]): + self._interaction_handler = func + return func + + return decorator + async def _handle_message(self, event: QQOfficialEvent): """处理消息事件""" msg_type = event.t @@ -177,8 +277,20 @@ class QQOfficialClient: content_type = attachment.get('content_type', '') return content_type.startswith('image/') - async def send_private_text_msg(self, user_openid: str, content: str, msg_id: str): - """发送私聊消息""" + async def send_private_text_msg( + self, + user_openid: str, + content: str, + msg_id: Optional[str] = None, + event_id: Optional[str] = None, + msg_seq: int = 1, + ): + """Send a c2c text message. + + Either ``msg_id`` (inbound user msg, free passive reply) or + ``event_id`` (e.g. INTERACTION_CREATE id, valid 30 min) is + required. Without either, the call costs the proactive-send quota. 
+ """ if not await self.check_access_token(): await self.get_access_token() @@ -188,11 +300,15 @@ class QQOfficialClient: 'Authorization': f'QQBot {self.access_token}', 'Content-Type': 'application/json', } - data = { + data: dict[str, Any] = { 'content': content, 'msg_type': 0, - 'msg_id': msg_id, + 'msg_seq': msg_seq, } + if msg_id: + data['msg_id'] = msg_id + if event_id: + data['event_id'] = event_id response = await client.post(url, headers=headers, json=data) response_data = response.json() if response.status_code == 200: @@ -201,8 +317,19 @@ class QQOfficialClient: await self.logger.error(f'Failed to send private message: {response_data}') raise ValueError(response) - async def send_group_text_msg(self, group_openid: str, content: str, msg_id: str): - """发送群聊消息""" + async def send_group_text_msg( + self, + group_openid: str, + content: str, + msg_id: Optional[str] = None, + event_id: Optional[str] = None, + msg_seq: int = 1, + ): + """Send a group text message. + + Either ``msg_id`` or ``event_id`` is required (see + :meth:`send_private_text_msg` for the distinction). 
+ """ if not await self.check_access_token(): await self.get_access_token() @@ -212,11 +339,15 @@ class QQOfficialClient: 'Authorization': f'QQBot {self.access_token}', 'Content-Type': 'application/json', } - data = { + data: dict[str, Any] = { 'content': content, 'msg_type': 0, - 'msg_id': msg_id, + 'msg_seq': msg_seq, } + if msg_id: + data['msg_id'] = msg_id + if event_id: + data['event_id'] = event_id response = await client.post(url, headers=headers, json=data) if response.status_code == 200: return @@ -485,6 +616,106 @@ class QQOfficialClient: raise Exception(f'Failed to send stream message: HTTP {response.status_code} {response.text}') return response.json() + async def send_markdown_keyboard( + self, + target_type: str, + target_id: str, + markdown_content: str, + keyboard: dict, + msg_id: Optional[str] = None, + event_id: Optional[str] = None, + msg_seq: int = 1, + ) -> dict: + """Send a ``msg_type=2`` (markdown) message carrying a keyboard. + + The keyboard ride-along is the only documented way to attach + buttons in QQ official; pure keyboard-only messages are not + accepted by the server (markdown content is required). + + Args: + target_type: 'c2c' (single chat), 'group', 'channel' (text + channel — uses POST /channels/{id}/messages instead of v2). + target_id: openid for c2c/group, channel_id for channel. + markdown_content: Plain markdown text shown above the buttons. + keyboard: ``{'content': {'rows': [{'buttons': [...]}]}}`` per + the official spec. Use :func:`build_keyboard_from_form` + to construct from Dify form_data. + msg_id: Inbound user message id; turns this into a passive + reply (preferred — no monthly quota cost). + event_id: Use ``INTERACTION_CREATE`` event id from a prior + button click to keep within the 30-minute passive window + without an inbound msg_id. + msg_seq: De-dup counter when reusing msg_id. 
+ """ + if not await self.check_access_token(): + await self.get_access_token() + + if target_type == 'c2c': + url = f'{self.base_url}/v2/users/{target_id}/messages' + elif target_type == 'group': + url = f'{self.base_url}/v2/groups/{target_id}/messages' + elif target_type == 'channel': + url = f'{self.base_url}/channels/{target_id}/messages' + else: + raise ValueError(f'Unsupported target_type for markdown+keyboard: {target_type}') + + body: dict[str, Any] = { + 'msg_type': 2, + 'markdown': {'content': markdown_content}, + 'keyboard': keyboard, + 'msg_seq': msg_seq, + } + if msg_id: + body['msg_id'] = msg_id + if event_id: + body['event_id'] = event_id + + async with httpx.AsyncClient(timeout=30) as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + 'Content-Type': 'application/json', + } + response = await client.post(url, headers=headers, json=body) + if response.status_code != 200: + await self.logger.error( + f'Failed to send markdown+keyboard: HTTP {response.status_code} {response.text}' + ) + raise Exception(f'Failed to send markdown+keyboard: HTTP {response.status_code} {response.text}') + return response.json() + + async def ack_interaction(self, interaction_id: str, code: int = 0) -> None: + """Acknowledge a button-click INTERACTION_CREATE event. + + QQ keeps the client in a loading spinner until this ack is + received. Should be called as soon as the click is parsed, before + any heavier downstream work (the actual workflow resume can run + async). + + Args: + interaction_id: The ``id`` field from the INTERACTION_CREATE event. + code: 0=success, 1=fail, 2=rate-limited, 3=duplicate, 4=no + permission, 5=admin only. Default 0. 
+ """ + if not interaction_id: + return + if not await self.check_access_token(): + await self.get_access_token() + + url = f'{self.base_url}/interactions/{interaction_id}' + async with httpx.AsyncClient(timeout=10) as client: + headers = { + 'Authorization': f'QQBot {self.access_token}', + 'Content-Type': 'application/json', + } + try: + response = await client.put(url, headers=headers, json={'code': code}) + if response.status_code >= 400: + await self.logger.warning( + f'ack_interaction non-success: HTTP {response.status_code} {response.text}' + ) + except Exception as e: + await self.logger.warning(f'ack_interaction error (non-fatal): {e}') + async def is_token_expired(self): """检查token是否过期""" if self.access_token_expiry_time is None: @@ -653,6 +884,12 @@ class QQOfficialClient: d = payload.get('d', {}) s = payload.get('s') t = payload.get('t') + # Top-level event id, distinct from `d.id`. Per QQ + # spec this is the only value accepted as ``event_id`` + # in subsequent passive-reply send-message calls + # (``d.id`` for INTERACTION_CREATE is the interaction + # id, used solely for PUT /interactions/{id} ack). + ws_event_id = payload.get('id') if not isinstance(d, dict): d = {} @@ -731,7 +968,22 @@ class QQOfficialClient: else: await self.logger.debug(f'Received event: {t}, seq={s}') - if on_event: + # INTERACTION_CREATE bypasses the regular + # on_event dispatcher so the adapter sees the + # top-level ws_event_id (needed as event_id + # for the resumed reply) — same shape as the + # webhook handler. 
+ if t == 'INTERACTION_CREATE': + if self._interaction_handler: + try: + result = self._interaction_handler(d, ws_event_id) + if asyncio.iscoroutine(result): + await result + except Exception: + await self.logger.error( + f'Error in interaction handler (ws): {traceback.format_exc()}' + ) + elif on_event: try: result = on_event(t, d) if asyncio.iscoroutine(result): diff --git a/src/langbot/pkg/platform/sources/qqofficial.py b/src/langbot/pkg/platform/sources/qqofficial.py index 8af406972..1c2f6fe86 100644 --- a/src/langbot/pkg/platform/sources/qqofficial.py +++ b/src/langbot/pkg/platform/sources/qqofficial.py @@ -11,7 +11,7 @@ import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platf import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.entities as platform_entities -from langbot.libs.qq_official_api.api import QQOfficialClient +from langbot.libs.qq_official_api.api import QQOfficialClient, build_keyboard_from_form from langbot.libs.qq_official_api.qqofficialevent import QQOfficialEvent from ...utils import image from ..logger import EventLogger @@ -191,6 +191,7 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter enable_webhook: bool = False message_converter: QQOfficialMessageConverter = QQOfficialMessageConverter() event_converter: QQOfficialEventConverter = QQOfficialEventConverter() + ap: typing.Any = None def __init__(self, config: dict, logger: EventLogger): enable_webhook = config.get('enable-webhook', False) @@ -216,6 +217,31 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter self._stream_ctx_ts: dict[str, float] = {} self._fallback_text: dict[str, str] = {} self._fallback_text_ts: dict[str, float] = {} + # Dify form-action bookkeeping for the human-input button flow. 
+ # session_key = "{scene}_{id}" where scene is c2c/group/channel and + # id is user_openid / group_openid / channel_id. + # session_key -> {form_data, msg_id, sender_id, target_type, + # target_id, source_event_t, posted_at} + # Set when we send a markdown+keyboard card and popped when + # INTERACTION_CREATE fires — we look up the form by + # session_key (button's `data` carries the action_id). The + # passive-reply event_id for the resumed-workflow query is + # cached separately in ``_session_event_ids`` (30-min validity). + self._pending_forms: dict[str, dict] = {} + # session_key -> most recent ``INTERACTION_CREATE`` event_id, used + # as the passive event_id for the resumed query's LLM output. + self._session_event_ids: dict[str, dict] = {} + # Per-anchor msg_seq counter. QQ accepts up to 5 passive replies + # per (msg_id|event_id) within 60 min, but each reuse needs a + # fresh ``msg_seq`` — re-sending with msg_seq=1 is silently dedup'd. + self._anchor_msg_seq: dict[str, int] = {} + + # Wire button-click handler so webhook mode catches INTERACTION_CREATE. + # (ws mode reaches the same handler: the client's ws loop calls it + # directly instead of on_event, passing the top-level event id.) + @self.bot.on_interaction() + async def _on_interaction(event_data: dict, interaction_id: typing.Optional[str]): + await self._handle_interaction_create(event_data, interaction_id) async def reply_message( self, @@ -227,6 +253,13 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter message_source, ) + # Synthetic event (button-click resume): no inbound platform + # object → no msg_id. Route via the cached INTERACTION_CREATE + # event_id (valid 30 min, no quota cost). 
+ if qq_official_event is None: + await self._reply_synthetic(message_source, message) + return + content_list = await QQOfficialMessageConverter.yiri2target(message) # 确定 target_type 和 target_id @@ -376,6 +409,9 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter await self.logger.info('QQ Official WebSocket connected and ready') async def on_event(event_type: str, event_data: dict): + # INTERACTION_CREATE is dispatched via bot.on_interaction() + # (registered in __init__) so we get the top-level ws_event_id + # — needed as the passive-reply event_id. It never reaches here. # 只处理消息事件,忽略 READY/RESUMED 等系统事件 message_event_types = { 'C2C_MESSAGE_CREATE', @@ -439,6 +475,12 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter async def create_message_card(self, message_id: str, event: platform_events.MessageEvent) -> bool: source = event.source_platform_object + # Synthetic events (button-click resume) have no source object — + # they ride a cached INTERACTION_CREATE event_id, not a streamable + # msg_id. Skip stream setup; reply_message handles the one-shot + # send at is_final. + if source is None: + return False # Streaming API only supports C2C private chat if source.t != 'C2C_MESSAGE_CREATE': return False @@ -469,6 +511,29 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter ): # Periodically clean up stale stream contexts await self._cleanup_stale_streams() + + # Dify human-input pause: when the runner attaches `_form_data` to + # the final chunk, finalize any in-flight stream session and send + # a markdown + keyboard message instead. Plain-text content from + # earlier chunks is already on the stream; we close it cleanly + # and the buttons land as a separate reply. 
+ form_data = getattr(bot_message, '_form_data', None) if not isinstance(bot_message, dict) else None + if is_final: + _resume = getattr(bot_message, '_resume_from_form', None) if not isinstance(bot_message, dict) else None + _open_new = getattr(bot_message, '_open_new_card', None) if not isinstance(bot_message, dict) else None + if self.ap is not None: + self.ap.logger.info( + f'QQ Official reply_message_chunk final: ' + f'type={type(bot_message).__name__} ' + f'is_final={is_final} ' + f'form_data_present={form_data is not None} ' + f'resume_from_form={_resume} open_new_card={_open_new} ' + f'content_len={len(getattr(bot_message, "content", "") or "")}' + ) + if form_data and is_final: + await self._handle_form_chunk(message_source, message, form_data) + return + # 提取纯文本内容(当前 chunk 的文本) text_parts = [] for msg in message: @@ -557,3 +622,462 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter ], ): return super().unregister_listener(event_type, callback) + + # ------------------------------------------------------------------ + # Dify human-input button-interaction support + # ------------------------------------------------------------------ + + _PENDING_FORM_TTL = 1800 # 30 min — matches QQ passive-reply window. + _MAX_REPLIES_PER_ANCHOR = 5 # QQ hard limit per msg_id / event_id. + + def _next_msg_seq(self, anchor: str) -> typing.Optional[int]: + """Return the next msg_seq for an anchor, or ``None`` if the + anchor has already been used 5 times (further sends would be + silently dropped by QQ).""" + if not anchor: + return 1 + used = self._anchor_msg_seq.get(anchor, 0) + if used >= self._MAX_REPLIES_PER_ANCHOR: + return None + self._anchor_msg_seq[anchor] = used + 1 + return used + 1 + + async def _reply_synthetic( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + ) -> None: + """Deliver a reply for a synthetic (button-click-resume) event. 
+ + Synthetic events have ``source_platform_object=None`` and no + fresh inbound msg_id. The previous INTERACTION_CREATE id we + cached in :attr:`_session_event_ids` is a valid passive-reply + anchor (``event_id``) for up to 30 minutes — use it. + """ + if isinstance(message_source, platform_events.GroupMessage): + target_type = 'group' + group = getattr(message_source, 'group', None) or ( + message_source.sender.group if hasattr(message_source.sender, 'group') else None + ) + target_id = str(group.id) if group else None + else: + target_type = 'c2c' + target_id = str(message_source.sender.id) if message_source.sender else None + + if not target_id: + await self.logger.warning('QQ Official: synthetic reply has no target_id; dropping') + return + + session_key = f'{target_type}_{target_id}' + cached = self._session_event_ids.get(session_key) + event_id = cached.get('event_id') if cached else None + if cached and (time.time() - cached.get('posted_at', 0)) > self._PENDING_FORM_TTL: + event_id = None + + if not event_id: + await self.logger.warning( + f'QQ Official: no cached event_id for {session_key}; ' + f'cannot deliver synthetic reply within passive-reply window' + ) + return + + content_list = await QQOfficialMessageConverter.yiri2target(message) + text_parts = [c['content'] for c in content_list if c.get('type') == 'text' and c.get('content')] + if not text_parts: + await self.logger.info('QQ Official: synthetic reply has no text content; skipping') + return + text = '\n\n'.join(text_parts) + + msg_seq = self._next_msg_seq(event_id) + if msg_seq is None: + await self.logger.warning( + f'QQ Official: anchor {event_id!r} exhausted (>5 passive replies); ' + f'cannot deliver synthetic reply for {session_key}' + ) + return + + try: + if target_type == 'c2c': + await self.bot.send_private_text_msg( + user_openid=target_id, + content=text, + event_id=event_id, + msg_seq=msg_seq, + ) + elif target_type == 'group': + await self.bot.send_group_text_msg( + 
group_openid=target_id, + content=text, + event_id=event_id, + msg_seq=msg_seq, + ) + except Exception: + await self.logger.error(f'QQ Official: synthetic reply delivery failed: {traceback.format_exc()}') + + def _resolve_target_from_source(self, source: QQOfficialEvent) -> typing.Optional[tuple[str, str]]: + """Return ``(target_type, target_id)`` for sending a reply, or + ``None`` if the scene cannot host a markdown+keyboard message.""" + if source is None: + return None + if source.t == 'C2C_MESSAGE_CREATE': + return 'c2c', source.user_openid + if source.t == 'GROUP_AT_MESSAGE_CREATE': + return 'group', source.group_openid + if source.t == 'AT_MESSAGE_CREATE': + return 'channel', source.channel_id + # DIRECT_MESSAGE_CREATE uses the guild DM API which does not accept + # markdown+keyboard at the time of writing — caller falls back to text. + return None + + def _resolve_target_from_event( + self, message_source: platform_events.MessageEvent + ) -> typing.Optional[tuple[str, str]]: + """Resolve ``(target_type, target_id)`` from the public event. + + Prefers the platform-native source when present; falls back to + the synthesized event's sender/group fields so button-click + resume queries can still find a destination. 
+ """ + source = message_source.source_platform_object + if source is not None: + return self._resolve_target_from_source(source) + if isinstance(message_source, platform_events.GroupMessage): + group = getattr(message_source, 'group', None) or ( + message_source.sender.group + if message_source.sender and hasattr(message_source.sender, 'group') + else None + ) + if group and getattr(group, 'id', None): + return 'group', str(group.id) + if isinstance(message_source, platform_events.FriendMessage): + if message_source.sender and getattr(message_source.sender, 'id', None): + return 'c2c', str(message_source.sender.id) + return None + + def _prune_pending_forms(self) -> None: + now = time.time() + stale = [k for k, v in self._pending_forms.items() if now - v.get('posted_at', 0) > self._PENDING_FORM_TTL] + for k in stale: + self._pending_forms.pop(k, None) + stale_e = [ + k for k, v in self._session_event_ids.items() if now - v.get('posted_at', 0) > self._PENDING_FORM_TTL + ] + for k in stale_e: + self._session_event_ids.pop(k, None) + + async def _handle_form_chunk( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + form_data: dict, + ) -> None: + """Send the markdown + keyboard form prompt for a Dify pause. + + Called from ``reply_message_chunk`` when the runner attaches + ``_form_data`` to the final chunk. Replaces what would otherwise + be a plain-text numbered-list fallback. + """ + if self.ap is not None: + self.ap.logger.info( + f'QQ Official _handle_form_chunk entered; ' + f'source_present={message_source.source_platform_object is not None} ' + f'form_actions={len(form_data.get("actions") or [])}' + ) + self._prune_pending_forms() + + source = message_source.source_platform_object + scene_target = self._resolve_target_from_event(message_source) + if scene_target is None: + # No rich-UI fit — fall through to existing text path. 
+ await self.logger.info('QQ Official: form chunk on unsupported scene; falling back to text') + text_parts = [m.text for m in message if type(m) is platform_message.Plain] + fallback_msg = platform_message.MessageChain([platform_message.Plain(text='\n\n'.join(text_parts))]) + try: + await self.reply_message(message_source, fallback_msg) + except Exception: + await self.logger.error(f'QQ Official: form fallback text send failed: {traceback.format_exc()}') + return + + target_type, target_id = scene_target + session_key = f'{target_type}_{target_id}' + + # Cancel any in-flight stream / fallback ctx so plain-text prefix + # doesn't continue alongside the keyboard message. + msg_id = getattr(source, 'd_id', '') or '' if source is not None else '' + if msg_id: + self._stream_ctx.pop(msg_id, None) + self._stream_ctx_ts.pop(msg_id, None) + self._fallback_text.pop(msg_id, None) + self._fallback_text_ts.pop(msg_id, None) + + node_title = form_data.get('node_title') or 'Confirmation needed' + form_content = form_data.get('form_content') or '' + parts = [f'### {node_title}'] + if form_content.strip(): + parts.append(form_content.strip()) + parts.append('请点击下方按钮选择:') + markdown_content = '\n\n'.join(parts) + + keyboard = build_keyboard_from_form(form_data, buttons_per_row=2) + if not keyboard.get('content', {}).get('rows'): + # No actions to render — fall back to plain text. + text_msg = platform_message.MessageChain([platform_message.Plain(text=markdown_content)]) + try: + await self.reply_message(message_source, text_msg) + except Exception: + await self.logger.error(f'QQ Official: empty-keyboard fallback send failed: {traceback.format_exc()}') + return + + # Prefer the inbound msg_id (no quota cost). If the source is a + # synthetic event from a prior click, the cached interaction id + # serves as event_id for up to 30 min. 
+ event_id = None + if not msg_id: + cached = self._session_event_ids.get(session_key) + if cached and (time.time() - cached.get('posted_at', 0)) < self._PENDING_FORM_TTL: + event_id = cached.get('event_id') + + anchor = msg_id or event_id or '' + msg_seq = self._next_msg_seq(anchor) + if msg_seq is None: + await self.logger.warning( + f'QQ Official: anchor {anchor!r} exhausted (>5 passive replies); ' + f'cannot deliver form card for session={session_key}' + ) + return + + try: + await self.bot.send_markdown_keyboard( + target_type=target_type, + target_id=target_id, + markdown_content=markdown_content, + keyboard=keyboard, + msg_id=msg_id if (msg_id and not event_id) else None, + event_id=event_id, + msg_seq=msg_seq, + ) + if self.ap is not None: + self.ap.logger.info( + f'QQ Official: form card sent ' + f'target={target_type}/{target_id} ' + f'msg_id={msg_id!r} event_id={event_id!r} msg_seq={msg_seq}' + ) + except Exception: + if self.ap is not None: + self.ap.logger.error( + f'QQ Official: send_markdown_keyboard failed, falling back to text: {traceback.format_exc()}' + ) + await self.logger.error( + f'QQ Official: send_markdown_keyboard failed, falling back to text: {traceback.format_exc()}' + ) + text_msg = platform_message.MessageChain([platform_message.Plain(text=markdown_content)]) + try: + await self.reply_message(message_source, text_msg) + except Exception: + pass + return + + sender_id = '' + if source is not None: + sender_id = ( + getattr(source, 'user_openid', None) + or getattr(source, 'member_openid', None) + or getattr(source, 'd_author_id', None) + or '' + ) + if not sender_id and message_source.sender is not None: + sender_id = str(getattr(message_source.sender, 'id', '') or '') + self._pending_forms[session_key] = { + 'form_data': form_data, + 'msg_id': msg_id, + 'sender_id': sender_id, + 'target_type': target_type, + 'target_id': target_id, + 'source_event_t': source.t if source is not None else None, + 'posted_at': time.time(), + } + await 
self.logger.info( + f'QQ Official: form posted session={session_key} actions={len(form_data.get("actions") or [])}' + ) + + async def _handle_interaction_create( + self, + event_data: dict, + ws_event_id: typing.Optional[str] = None, + ) -> None: + """Handle a button-click INTERACTION_CREATE event. + + Two IDs at play (QQ keeps them separate): + ws_event_id top-level payload ``id`` (or webhook ``X-Bot- + Event-Id``). The ONLY value accepted as + ``event_id`` for subsequent passive replies. + d['id'] the interaction id — used for PUT + /interactions/{id} ack. Cannot be reused as + event_id (QQ returns 40034025 if you try). + + Layout (https://bot.q.qq.com/.../msg-btn.html): + chat_type 0 channel / 1 group / 2 c2c + data.resolved.button_data what we set as ``action.data`` + data.resolved.button_id ``id`` field on the button row + """ + import langbot_plugin.api.entities.builtin.provider.session as provider_session + + if self.ap is not None: + self.ap.logger.info( + f'QQ Official _handle_interaction_create entered; ' + f'ws_event_id={ws_event_id!r} ' + f'interaction_id={(event_data.get("id") if isinstance(event_data, dict) else None)!r} ' + f'chat_type={event_data.get("chat_type") if isinstance(event_data, dict) else None}' + ) + + if not isinstance(event_data, dict): + await self.logger.warning(f'QQ Official: INTERACTION_CREATE event_data is not dict: {type(event_data)}') + return + + # ACK uses the interaction id, NOT the ws event id. 
+ interaction_id = event_data.get('id') or '' + if interaction_id: + asyncio.create_task(self.bot.ack_interaction(interaction_id, code=0)) + + resolved = (event_data.get('data') or {}).get('resolved') or {} + action_id = str(resolved.get('button_data') or resolved.get('button_id') or '').strip() + if not action_id: + await self.logger.warning('QQ Official: INTERACTION_CREATE missing button_data/button_id; ignoring') + return + + chat_type = event_data.get('chat_type') + scene_target: typing.Optional[tuple[str, str]] = None + if chat_type == 2 or event_data.get('user_openid'): + scene_target = ('c2c', event_data.get('user_openid') or '') + elif chat_type == 1 or event_data.get('group_openid'): + scene_target = ('group', event_data.get('group_openid') or '') + elif chat_type == 0 or event_data.get('channel_id'): + scene_target = ('channel', event_data.get('channel_id') or '') + + if not scene_target or not scene_target[1]: + await self.logger.warning(f'QQ Official: INTERACTION_CREATE missing scene/target; raw={event_data}') + return + + target_type, target_id = scene_target + session_key = f'{target_type}_{target_id}' + + self._prune_pending_forms() + pending = self._pending_forms.pop(session_key, None) + if not pending: + await self.logger.warning( + f'QQ Official: no pending form for session {session_key}; click ignored (action_id={action_id!r})' + ) + return + + # Cache ws_event_id so a follow-up pause / text reply can use it + # as event_id for passive delivery (30-min window). Falls back to + # the interaction_id only if no ws_event_id was provided (e.g. + # tests / older payload shape) — QQ will reject that value but + # we log so the mismatch is debuggable. + cached_event_id = ws_event_id or interaction_id + if cached_event_id: + self._session_event_ids[session_key] = { + 'event_id': cached_event_id, + 'posted_at': time.time(), + } + # New anchor → fresh 5-reply budget. 
+ self._anchor_msg_seq[cached_event_id] = 0 + if self.ap is not None and not ws_event_id: + self.ap.logger.warning( + 'QQ Official: INTERACTION_CREATE lacked ws_event_id; ' + 'falling back to interaction_id (passive reply may be rejected)' + ) + + form_data: dict = pending.get('form_data') or {} + actions = form_data.get('actions') or [] + matched = next( + (a for a in actions if str(a.get('id', '')) == action_id), + None, + ) + action_title = (matched or {}).get('title') or action_id + + sender_id = pending.get('sender_id') or event_data.get('user_openid') or event_data.get('member_openid') or '' + + # Build resume payload matching the shape every other adapter uses + # (DingTalk / Lark / Telegram / WeCom). The runner's + # _merge_pending_form_action consumes this verbatim. + if target_type == 'group' or target_type == 'channel': + launcher_type = provider_session.LauncherTypes.GROUP + launcher_id = target_id + else: + launcher_type = provider_session.LauncherTypes.PERSON + launcher_id = sender_id or target_id + + form_action_data = { + 'form_token': form_data.get('form_token', ''), + 'workflow_run_id': form_data.get('workflow_run_id', ''), + 'action_id': action_id, + 'action_title': action_title, + 'node_title': form_data.get('node_title', ''), + 'user': f'{launcher_type.value}_{launcher_id}', + 'inputs': {}, + } + + message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')]) + + if launcher_type == provider_session.LauncherTypes.GROUP: + synthetic_event: platform_events.MessageEvent = platform_events.GroupMessage( + sender=platform_entities.GroupMember( + id=sender_id or launcher_id, + member_name='', + permission='MEMBER', + group=platform_entities.Group( + id=launcher_id, + name='', + permission=platform_entities.Permission.Member, + ), + special_title='', + ), + message_chain=message_chain, + time=int(time.time()), + source_platform_object=None, + ) + else: + synthetic_event = platform_events.FriendMessage( + 
sender=platform_entities.Friend( + id=sender_id or launcher_id, + nickname='', + remark='', + ), + message_chain=message_chain, + time=int(time.time()), + source_platform_object=None, + ) + + if self.ap is None: + await self.logger.error('QQ Official: ap not injected; cannot enqueue button-click query') + return + + bot_uuid = '' + pipeline_uuid = None + for bot in self.ap.platform_mgr.bots: + if bot.adapter is self: + bot_uuid = bot.bot_entity.uuid + pipeline_uuid = bot.bot_entity.use_pipeline_uuid + break + + try: + await self.ap.query_pool.add_query( + bot_uuid=bot_uuid, + launcher_type=launcher_type, + launcher_id=launcher_id, + sender_id=sender_id or launcher_id, + message_event=synthetic_event, + message_chain=message_chain, + adapter=self, + pipeline_uuid=pipeline_uuid, + variables={ + '_dify_form_action': form_action_data, + '_routed_by_rule': True, + }, + ) + await self.logger.info( + f'QQ Official: button-click query enqueued action_id={action_id!r} session={session_key}' + ) + except Exception: + await self.logger.error(f'QQ Official: enqueue button-click query failed: {traceback.format_exc()}')