From c7efa4dd7f93f45cb86ad859ee9292e2a56883b7 Mon Sep 17 00:00:00 2001 From: fdc310 <82008029+fdc310@users.noreply.github.com> Date: Fri, 3 Apr 2026 15:03:41 +0800 Subject: [PATCH] feat: add wecombot ws on_feedback (#2098) * feat: add wecombot ws on_feedback * feat:lark on_feedback but bug * feat: Add lark feedback processing function and event handling logic --- .../libs/wecom_ai_bot_api/ws_client.py | 107 ++++++++++++++++-- src/langbot/pkg/platform/sources/lark.py | 107 +++++++++++++++++- src/langbot/pkg/platform/sources/wecombot.py | 5 + 3 files changed, 208 insertions(+), 11 deletions(-) diff --git a/src/langbot/libs/wecom_ai_bot_api/ws_client.py b/src/langbot/libs/wecom_ai_bot_api/ws_client.py index 332b4eb9..5125a704 100644 --- a/src/langbot/libs/wecom_ai_bot_api/ws_client.py +++ b/src/langbot/libs/wecom_ai_bot_api/ws_client.py @@ -20,7 +20,7 @@ from typing import Any, Callable, Optional import aiohttp from langbot.libs.wecom_ai_bot_api import wecombotevent -from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message +from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession from langbot.pkg.platform.logger import EventLogger DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com' @@ -96,6 +96,12 @@ class WecomBotWsClient: self._stream_ids: dict[str, str] = {} # msg_id -> req_id|stream_id # Dedup: skip sending when content hasn't changed self._stream_last_content: dict[str, str] = {} # msg_id -> last content sent + # Stream session info for feedback tracking + self._stream_sessions: dict[str, dict] = {} # msg_id -> session info + # Feedback tracking: feedback_id -> session info + self._feedback_sessions: dict[str, dict] = {} # feedback_id -> {msg_id, user_id, chat_id, stream_id, req_id} + # msg_id -> feedback_id (for associating feedback with message) + self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id # ── Public API ────────────────────────────────────────────────── @@ -164,12 +170,27 @@ class WecomBotWsClient: return decorator + def on_feedback(self) -> Callable: + """Decorator to register a feedback event handler. + + Same interface as WecomBotClient.on_feedback for compatibility. + """ + + def decorator(func: Callable): + if 'feedback' not in self._message_handlers: + self._message_handlers['feedback'] = [] + self._message_handlers['feedback'].append(func) + return func + + return decorator + async def reply_stream( self, req_id: str, stream_id: str, content: str, finish: bool = False, + feedback_id: str = '', ) -> Optional[dict]: """Send a streaming reply frame. @@ -178,17 +199,22 @@ class WecomBotWsClient: stream_id: The stream ID for this streaming session. content: The content to send (supports Markdown). finish: Whether this is the final chunk. + feedback_id: Optional feedback ID for receiving user feedback (like/dislike). Returns: The ACK frame dict, or None on failure. """ + stream_payload = { + 'id': stream_id, + 'finish': finish, + 'content': content, + } + if feedback_id: + stream_payload['feedback'] = {'id': feedback_id} + body = { 'msgtype': 'stream', - 'stream': { - 'id': stream_id, - 'finish': finish, - 'content': content, - }, + 'stream': stream_payload, } return await self._send_reply(req_id, body) @@ -253,11 +279,23 @@ class WecomBotWsClient: # Skip sending if content hasn't changed (e.g. during tool call argument streaming) if not is_final and content == self._stream_last_content.get(msg_id): return True - await self.reply_stream(req_id, stream_id, content, finish=is_final) + + # Generate feedback_id for final chunk + feedback_id = '' + if is_final: + feedback_id = _generate_req_id('feedback') + self._msg_feedback_ids[msg_id] = feedback_id + # Store session info for feedback tracking + session_info = self._stream_sessions.get(msg_id) + if session_info: + self._feedback_sessions[feedback_id] = session_info + + await self.reply_stream(req_id, stream_id, content, finish=is_final, feedback_id=feedback_id) self._stream_last_content[msg_id] = content if is_final: self._stream_ids.pop(msg_id, None) self._stream_last_content.pop(msg_id, None) + self._stream_sessions.pop(msg_id, None) return True except Exception: await self.logger.error(f'Failed to push stream chunk: {traceback.format_exc()}') @@ -445,6 +483,15 @@ class WecomBotWsClient: msg_id = message_data.get('msgid', '') if msg_id: self._stream_ids[msg_id] = f'{req_id}|{stream_id}' + # Store session info for feedback tracking + self._stream_sessions[msg_id] = { + 'req_id': req_id, + 'stream_id': stream_id, + 'msg_id': msg_id, + 'user_id': message_data.get('userid', ''), + 'chat_id': message_data.get('chatid', ''), + 'chat_type': message_data.get('type', 'single'), + } message_data['stream_id'] = stream_id message_data['req_id'] = req_id @@ -454,7 +501,7 @@ class WecomBotWsClient: await self.logger.error(f'Error in message callback: {traceback.format_exc()}') async def _handle_event_callback(self, frame: dict): - """Handle an incoming event callback frame (enter_chat, template_card_event, etc.).""" + """Handle an incoming event callback frame (enter_chat, template_card_event, feedback_event, disconnected_event).""" try: body = frame.get('body', {}) req_id = frame.get('headers', {}).get('req_id', '') @@ -479,14 +526,54 @@ class WecomBotWsClient: if body.get('chatid'): message_data['chatid'] = body.get('chatid', '') + if event_type == 'feedback_event': + feedback_event = event_info.get('feedback_event', {}) + feedback_id = feedback_event.get('id', '') + feedback_type = feedback_event.get('type', 0) + feedback_content = feedback_event.get('content', '') + inaccurate_reasons = feedback_event.get('inaccurate_reason_list', []) + + await self.logger.info( + f'收到用户反馈事件: feedback_id={feedback_id}, type={feedback_type}, ' + f'content={feedback_content}, reasons={inaccurate_reasons}' + ) + + # Look up session by feedback_id + session_info = self._feedback_sessions.get(feedback_id) + session = None + if session_info: + session = StreamSession( + stream_id=session_info.get('stream_id', ''), + msg_id=session_info.get('msg_id', ''), + chat_id=session_info.get('chat_id') or None, + user_id=session_info.get('user_id') or None, + feedback_id=feedback_id, + ) + await self.logger.info( + f'反馈关联到会话: stream_id={session.stream_id}, msg_id={session.msg_id}, user_id={session.user_id}' + ) + else: + await self.logger.warning(f'未找到 feedback_id={feedback_id} 对应的会话') + + for handler in self._message_handlers.get('feedback', []): + try: + await handler( + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content, + inaccurate_reasons=inaccurate_reasons, + session=session, + ) + except Exception: + await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}') + return + event = wecombotevent.WecomBotEvent(message_data) - # Dispatch to event-specific handlers if event_type in self._message_handlers: for handler in self._message_handlers[event_type]: await handler(event) - # Also dispatch to generic 'event' handlers if 'event' in self._message_handlers: for handler in self._message_handlers['event']: await handler(event) diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py index c115243b..ad9ae163 100644 --- a/src/langbot/pkg/platform/sources/lark.py +++ b/src/langbot/pkg/platform/sources/lark.py @@ -797,8 +797,65 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): asyncio.create_task(on_message(event)) + def sync_on_card_action(event): + try: + action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {}) + action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else '' + + if action_value == '有帮助': + feedback_type = 1 + elif action_value == '无帮助': + feedback_type = 2 + else: + from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + + return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}}) + + operator = getattr(event.event, 'operator', None) + context = getattr(event.event, 'context', None) + + user_id = getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) + open_chat_id = getattr(context, 'open_chat_id', None) + open_message_id = getattr(context, 'open_message_id', None) + + if open_chat_id: + session_id = f'group_{open_chat_id}' + elif user_id: + session_id = f'person_{user_id}' + else: + session_id = None + + feedback_event = platform_events.FeedbackEvent( + feedback_id=getattr(event.header, 'event_id', str(uuid.uuid4())), + feedback_type=feedback_type, + feedback_content=action_value, + user_id=user_id, + session_id=session_id, + message_id=open_message_id, + source_platform_object=event, + ) + + if platform_events.FeedbackEvent in self.listeners: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) + else: + loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) + + from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + + return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}}) + except Exception: + asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}')) + from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + + return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}}) + event_handler = ( - lark_oapi.EventDispatcherHandler.builder('', '').register_p2_im_message_receive_v1(sync_on_message).build() + lark_oapi.EventDispatcherHandler.builder('', '') + .register_p2_im_message_receive_v1(sync_on_message) + .register_p2_card_action_trigger(sync_on_card_action) + .build() ) bot_account_id = config['bot_name'] @@ -1088,6 +1145,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): 'size': 'medium', 'icon': {'tag': 'standard_icon', 'token': 'thumbsup_outlined'}, 'hover_tips': {'tag': 'plain_text', 'content': '有帮助'}, + 'behaviors': [{'type': 'callback', 'value': {'feedback': '有帮助'}}], 'margin': '0px 0px 0px 0px', } ], @@ -1111,6 +1169,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): 'size': 'medium', 'icon': {'tag': 'standard_icon', 'token': 'thumbdown_outlined'}, 'hover_tips': {'tag': 'plain_text', 'content': '无帮助'}, + 'behaviors': [{'type': 'callback', 'value': {'feedback': '无帮助'}}], 'margin': '0px 0px 0px 0px', } ], @@ -1472,6 +1531,52 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): if event.__class__ in self.listeners: await self.listeners[event.__class__](event, self) + elif 'card.action.trigger' == type: + try: + event_data = data.get('event', {}) + operator = event_data.get('operator', {}) + action = event_data.get('action', {}) + context_data = event_data.get('context', {}) + + action_value_obj = action.get('value', {}) + action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else '' + + if action_value == '有帮助': + feedback_type = 1 + elif action_value == '无帮助': + feedback_type = 2 + else: + return {'toast': {'type': 'success', 'content': '操作成功'}} + + user_id = operator.get('open_id') or operator.get('user_id') + open_chat_id = context_data.get('open_chat_id') + open_message_id = context_data.get('open_message_id') + + if open_chat_id: + session_id = f'group_{open_chat_id}' + elif user_id: + session_id = f'person_{user_id}' + else: + session_id = None + + feedback_event = platform_events.FeedbackEvent( + feedback_id=data.get('header', {}).get('event_id', str(uuid.uuid4())), + feedback_type=feedback_type, + feedback_content=action_value, + user_id=user_id, + session_id=session_id, + message_id=open_message_id, + source_platform_object=data, + ) + + if platform_events.FeedbackEvent in self.listeners: + await self.listeners[platform_events.FeedbackEvent](feedback_event, self) + + return {'toast': {'type': 'success', 'content': '感谢您的反馈'}} + except Exception: + await self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}') + return {'toast': {'type': 'error', 'content': '反馈处理失败'}} + elif 'im.chat.member.bot.added_v1' == type: try: bot_added_welcome_msg = self.config.get('bot_added_welcome', '') diff --git a/src/langbot/pkg/platform/sources/wecombot.py b/src/langbot/pkg/platform/sources/wecombot.py index e97b5b95..d102f1ce 100644 --- a/src/langbot/pkg/platform/sources/wecombot.py +++ b/src/langbot/pkg/platform/sources/wecombot.py @@ -343,6 +343,11 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): message_id = session.msg_id stream_id = session.stream_id + await self.logger.info( + f'Feedback event: feedback_id={feedback_id}, type={feedback_type}, ' + f'session_id={session_id}, user_id={user_id}, message_id={message_id}' + ) + event = platform_events.FeedbackEvent( feedback_id=feedback_id, feedback_type=feedback_type,