diff --git a/.gitignore b/.gitignore index 7d870c33..557d6e3d 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,11 @@ src/langbot/web/ /dist /build *.egg-info + +# Docker 部署产生的本地文件 +docker/data/ +docker/docker-compose.override.yaml + +# 备份目录 +LangBot_backup_*/ +*.bak diff --git a/Dockerfile b/Dockerfile index b30d21b0..6e00dc2c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,14 +10,18 @@ FROM python:3.12.7-slim WORKDIR /app +# Use Chinese mirror for faster and more reliable package downloads +RUN sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list.d/debian.sources 2>/dev/null || \ + sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list 2>/dev/null || true + COPY . . COPY --from=node /app/web/out ./web/out RUN apt update \ && apt install gcc -y \ - && python -m pip install --no-cache-dir uv \ - && uv sync \ + && python -m pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple uv \ + && uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple \ && touch /.dockerenv CMD [ "uv", "run", "--no-sync", "main.py" ] \ No newline at end of file diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 6971477d..d3ba8ad9 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -34,4 +34,4 @@ services: networks: langbot_network: - driver: bridge + driver: bridge \ No newline at end of file diff --git a/src/langbot/libs/wecom_ai_bot_api/api.py b/src/langbot/libs/wecom_ai_bot_api/api.py index 70b6c79c..765bab0d 100644 --- a/src/langbot/libs/wecom_ai_bot_api/api.py +++ b/src/langbot/libs/wecom_ai_bot_api/api.py @@ -64,6 +64,9 @@ class StreamSession: # 缓存最近一次片段,处理重试或超时兜底 last_chunk: Optional[StreamChunk] = None + # 反馈 ID,用于接收用户点赞/点踩反馈 + feedback_id: Optional[str] = None + class StreamSessionManager: """管理 stream 会话的生命周期,并负责队列的生产消费。""" @@ -74,6 +77,7 @@ class StreamSessionManager: self.ttl = ttl # 超时时间(秒),超过该时间未被访问的会话会被清理由 cleanup self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射 self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话 + self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射 def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]: if not msg_id: @@ -83,6 +87,32 @@ class StreamSessionManager: def get_session(self, stream_id: str) -> Optional[StreamSession]: return self._sessions.get(stream_id) + def get_session_by_feedback_id(self, feedback_id: str) -> Optional[StreamSession]: + """根据 feedback_id 查找会话。 + + Args: + feedback_id: 企业微信反馈事件中的反馈 ID。 + + Returns: + Optional[StreamSession]: 找到的会话实例,未找到返回 None。 + """ + if not feedback_id: + return None + stream_id = self._feedback_index.get(feedback_id) + if stream_id: + return self._sessions.get(stream_id) + return None + + def register_feedback_id(self, stream_id: str, feedback_id: str) -> None: + """注册 feedback_id 与 stream_id 的映射。 + + Args: + stream_id: 企业微信流式会话 ID。 + feedback_id: 反馈 ID。 + """ + if feedback_id and stream_id: + self._feedback_index[feedback_id] = stream_id + def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]: """根据企业微信回调创建或获取会话。 @@ -597,14 +627,27 @@ class WecomBotClient: self.stream_sessions = StreamSessionManager(logger=logger) self.stream_poll_timeout = 0.5 + self._feedback_callback: Optional[Callable] = None + + def set_feedback_callback(self, callback: Callable) -> None: + """设置反馈回调函数。 + + Args: + callback: 反馈回调函数,签名: async def callback(feedback_id, feedback_type, feedback_content, inaccurate_reasons, session) + """ + self._feedback_callback = callback + @staticmethod - def _build_stream_payload(stream_id: str, content: str, finish: bool) -> dict[str, Any]: + def _build_stream_payload( + stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None + ) -> dict[str, Any]: """按照企业微信协议拼装返回报文。 Args: stream_id: 企业微信会话 ID。 content: 推送的文本内容。 finish: 是否为最终片段。 + feedback_id: 反馈 ID,用于接收用户点赞/点踩反馈。 Returns: dict[str, Any]: 可直接加密返回的 payload。 @@ -612,13 +655,16 @@ class WecomBotClient: Example: 组装 `{'msgtype': 'stream', 'stream': {'id': 'sid', ...}}` 结构。 """ + stream_payload = { + 'id': stream_id, + 'finish': finish, + 'content': content, + } + if feedback_id: + stream_payload['feedback'] = {'id': feedback_id} return { 'msgtype': 'stream', - 'stream': { - 'id': stream_id, - 'finish': finish, - 'content': content, - }, + 'stream': stream_payload, } async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]: @@ -674,9 +720,14 @@ class WecomBotClient: """ session, is_new = self.stream_sessions.create_or_get(msg_json) + feedback_id = str(uuid.uuid4()) + session.feedback_id = feedback_id + self.stream_sessions.register_feedback_id(session.stream_id, feedback_id) + message_data = await self.get_message(msg_json) if message_data: message_data['stream_id'] = session.stream_id + message_data['feedback_id'] = feedback_id try: event = wecombotevent.WecomBotEvent(message_data) except Exception: @@ -685,7 +736,7 @@ class WecomBotClient: if is_new: asyncio.create_task(self._dispatch_event(event)) - payload = self._build_stream_payload(session.stream_id, '', False) + payload = self._build_stream_payload(session.stream_id, '', False, feedback_id) return await self._encrypt_and_reply(payload, nonce) async def _handle_post_followup_response(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]: @@ -810,11 +861,79 @@ class WecomBotClient: msg_json = json.loads(decrypted_xml) + event = msg_json.get('event', {}) + event_type = event.get('eventtype', '') + + if event_type == 'feedback_event': + return await self._handle_feedback_event(msg_json, nonce) + if msg_json.get('msgtype') == 'stream': return await self._handle_post_followup_response(msg_json, nonce) return await self._handle_post_initial_response(msg_json, nonce) + async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]: + """处理企业微信用户反馈事件(点赞/点踩)。 + + Args: + msg_json: 解密后的企业微信反馈事件 JSON。 + nonce: 企业微信回调参数 nonce。 + + Returns: + Tuple[Response, int]: Quart Response 及状态码。 + + Note: + 企业微信协议要求:反馈事件目前仅支持回复空包。 + """ + try: + feedback_event = msg_json.get('event', {}).get('feedback_event', {}) + feedback_id = feedback_event.get('id', '') + feedback_type = feedback_event.get('type', 0) + feedback_content = feedback_event.get('content', '') + inaccurate_reasons = feedback_event.get('inaccurate_reason_list', []) + + await self.logger.info( + f'收到用户反馈事件: feedback_id={feedback_id}, type={feedback_type}, ' + f'content={feedback_content}, reasons={inaccurate_reasons}' + ) + + session = self.stream_sessions.get_session_by_feedback_id(feedback_id) + if session: + await self.logger.info( + f'反馈关联到会话: stream_id={session.stream_id}, msg_id={session.msg_id}, ' + f'user_id={session.user_id}' + ) + for handler in self._message_handlers.get('feedback', []): + try: + await handler( + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content, + inaccurate_reasons=inaccurate_reasons, + session=session, + ) + except Exception: + await self.logger.error(traceback.format_exc()) + + if self._feedback_callback: + try: + await self._feedback_callback( + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content, + inaccurate_reasons=inaccurate_reasons, + session=session, + ) + except Exception: + await self.logger.error(traceback.format_exc()) + else: + await self.logger.warning(f'未找到 feedback_id={feedback_id} 对应的会话') + + except Exception: + await self.logger.error(traceback.format_exc()) + + return await self._encrypt_and_reply({}, nonce) + async def get_message(self, msg_json): return await parse_wecom_bot_message(msg_json, self.EnCodingAESKey, self.logger) @@ -883,6 +1002,15 @@ class WecomBotClient: return decorator + def on_feedback(self): + def decorator(func: Callable): + if 'feedback' not in self._message_handlers: + self._message_handlers['feedback'] = [] + self._message_handlers['feedback'].append(func) + return func + + return decorator + async def download_url_to_base64(self, download_url, encoding_aes_key): data, _filename = await download_encrypted_file(download_url, encoding_aes_key, self.logger) if data: diff --git a/src/langbot/libs/wecom_ai_bot_api/wecombotevent.py b/src/langbot/libs/wecom_ai_bot_api/wecombotevent.py index bc105cf8..327441e4 100644 --- a/src/langbot/libs/wecom_ai_bot_api/wecombotevent.py +++ b/src/langbot/libs/wecom_ai_bot_api/wecombotevent.py @@ -133,3 +133,17 @@ class WecomBotEvent(dict): AI Bot ID """ return self.get('aibotid', '') + + @property + def feedback_id(self) -> str: + """ + 反馈 ID,用于关联用户点赞/点踩反馈 + """ + return self.get('feedback_id', '') + + @property + def stream_id(self) -> str: + """ + 流式消息 ID + """ + return self.get('stream_id', '') diff --git a/src/langbot/pkg/api/http/controller/groups/monitoring.py b/src/langbot/pkg/api/http/controller/groups/monitoring.py index 3cf08b7c..11c9e272 100644 --- a/src/langbot/pkg/api/http/controller/groups/monitoring.py +++ b/src/langbot/pkg/api/http/controller/groups/monitoring.py @@ -456,6 +456,31 @@ class MonitoringRouterGroup(group.RouterGroup): 'platform', 'user_id', ] + elif export_type == 'feedback': + data = await self.ap.monitoring_service.export_feedback( + bot_ids=bot_ids if bot_ids else None, + pipeline_ids=pipeline_ids if pipeline_ids else None, + start_time=start_time, + end_time=end_time, + limit=limit, + ) + headers = [ + 'id', + 'timestamp', + 'feedback_id', + 'feedback_type', + 'feedback_content', + 'inaccurate_reasons', + 'bot_id', + 'bot_name', + 'pipeline_id', + 'pipeline_name', + 'session_id', + 'message_id', + 'stream_id', + 'user_id', + 'platform', + ] else: return self.error(message=f'Invalid export type: {export_type}', code=400) @@ -486,3 +511,63 @@ class MonitoringRouterGroup(group.RouterGroup): ) return response, 200 + + @self.route('/feedback/stats', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_feedback_stats() -> str: + """Get feedback statistics""" + # Parse query parameters + bot_ids = quart.request.args.getlist('botId') + pipeline_ids = quart.request.args.getlist('pipelineId') + start_time_str = quart.request.args.get('startTime') + end_time_str = quart.request.args.get('endTime') + + # Parse datetime + start_time = parse_iso_datetime(start_time_str) + end_time = parse_iso_datetime(end_time_str) + + stats = await self.ap.monitoring_service.get_feedback_stats( + bot_ids=bot_ids if bot_ids else None, + pipeline_ids=pipeline_ids if pipeline_ids else None, + start_time=start_time, + end_time=end_time, + ) + + return self.success(data=stats) + + @self.route('/feedback', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_feedback() -> str: + """Get feedback list""" + # Parse query parameters + bot_ids = quart.request.args.getlist('botId') + pipeline_ids = quart.request.args.getlist('pipelineId') + feedback_type_str = quart.request.args.get('feedbackType') + start_time_str = quart.request.args.get('startTime') + end_time_str = quart.request.args.get('endTime') + limit = int(quart.request.args.get('limit', 100)) + offset = int(quart.request.args.get('offset', 0)) + + # Parse datetime + start_time = parse_iso_datetime(start_time_str) + end_time = parse_iso_datetime(end_time_str) + + # Parse feedback type + feedback_type = int(feedback_type_str) if feedback_type_str else None + + feedback_list, total = await self.ap.monitoring_service.get_feedback_list( + bot_ids=bot_ids if bot_ids else None, + pipeline_ids=pipeline_ids if pipeline_ids else None, + feedback_type=feedback_type, + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset, + ) + + return self.success( + data={ + 'feedback': feedback_list, + 'total': total, + 'limit': limit, + 'offset': offset, + } + ) diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index d2267a14..764b6b45 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -1183,3 +1183,268 @@ class MonitoringService: } for row in rows ] + + # ========== Feedback Methods ========== + + async def record_feedback( + self, + feedback_id: str, + feedback_type: int, + feedback_content: str | None = None, + inaccurate_reasons: list[str] | None = None, + bot_id: str | None = None, + bot_name: str | None = None, + pipeline_id: str | None = None, + pipeline_name: str | None = None, + session_id: str | None = None, + message_id: str | None = None, + stream_id: str | None = None, + user_id: str | None = None, + platform: str | None = None, + ) -> str: + """Record user feedback (like/dislike) from AI Bot conversation. + + Args: + feedback_id: Unique feedback identifier from platform (e.g., WeChat Work) + feedback_type: 1 = like (thumbs up), 2 = dislike (thumbs down) + feedback_content: Optional user feedback text + inaccurate_reasons: List of reasons for inaccurate response (for dislike) + bot_id: Bot ID + bot_name: Bot name + pipeline_id: Pipeline ID + pipeline_name: Pipeline name + session_id: Session ID + message_id: Message ID + stream_id: Stream ID (for WeChat Work streaming messages) + user_id: User ID + platform: Platform name (e.g., 'wecom') + + Returns: + The record ID + """ + import json + + record_id = str(uuid.uuid4()) + record_data = { + 'id': record_id, + 'timestamp': datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None), + 'feedback_id': feedback_id, + 'feedback_type': feedback_type, + 'feedback_content': feedback_content, + 'inaccurate_reasons': json.dumps(inaccurate_reasons, ensure_ascii=False) if inaccurate_reasons else None, + 'bot_id': bot_id, + 'bot_name': bot_name, + 'pipeline_id': pipeline_id, + 'pipeline_name': pipeline_name, + 'session_id': session_id, + 'message_id': message_id, + 'stream_id': stream_id, + 'user_id': user_id, + 'platform': platform, + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_monitoring.MonitoringFeedback).values(record_data) + ) + + return record_id + + async def get_feedback_stats( + self, + bot_ids: list[str] | None = None, + pipeline_ids: list[str] | None = None, + start_time: datetime.datetime | None = None, + end_time: datetime.datetime | None = None, + ) -> dict: + """Get feedback statistics. + + Returns: + Dictionary with total likes, dislikes, and breakdown by bot/pipeline + """ + conditions = [] + + if bot_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids)) + if pipeline_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids)) + if start_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time) + if end_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time) + + # Get total likes (feedback_type = 1) + likes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where( + persistence_monitoring.MonitoringFeedback.feedback_type == 1 + ) + if conditions: + likes_query = likes_query.where(sqlalchemy.and_(*conditions)) + likes_result = await self.ap.persistence_mgr.execute_async(likes_query) + total_likes = likes_result.scalar() or 0 + + # Get total dislikes (feedback_type = 2) + dislikes_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)).where( + persistence_monitoring.MonitoringFeedback.feedback_type == 2 + ) + if conditions: + dislikes_query = dislikes_query.where(sqlalchemy.and_(*conditions)) + dislikes_result = await self.ap.persistence_mgr.execute_async(dislikes_query) + total_dislikes = dislikes_result.scalar() or 0 + + # Get total feedback count + total_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)) + if conditions: + total_query = total_query.where(sqlalchemy.and_(*conditions)) + total_result = await self.ap.persistence_mgr.execute_async(total_query) + total_feedback = total_result.scalar() or 0 + + # Calculate satisfaction rate + satisfaction_rate = (total_likes / total_feedback * 100) if total_feedback > 0 else 0 + + # Get feedback by bot + bot_stats_query = ( + sqlalchemy.select( + persistence_monitoring.MonitoringFeedback.bot_id, + persistence_monitoring.MonitoringFeedback.bot_name, + sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id).label('total'), + sqlalchemy.func.sum( + sqlalchemy.case( + (persistence_monitoring.MonitoringFeedback.feedback_type == 1, 1), + else_=0 + ) + ).label('likes'), + sqlalchemy.func.sum( + sqlalchemy.case( + (persistence_monitoring.MonitoringFeedback.feedback_type == 2, 1), + else_=0 + ) + ).label('dislikes'), + ) + .group_by( + persistence_monitoring.MonitoringFeedback.bot_id, + persistence_monitoring.MonitoringFeedback.bot_name, + ) + ) + if conditions: + bot_stats_query = bot_stats_query.where(sqlalchemy.and_(*conditions)) + bot_stats_result = await self.ap.persistence_mgr.execute_async(bot_stats_query) + bot_stats = [ + { + 'bot_id': row.bot_id, + 'bot_name': row.bot_name, + 'total': row.total, + 'likes': row.likes or 0, + 'dislikes': row.dislikes or 0, + } + for row in bot_stats_result.all() + ] + + return { + 'total_feedback': total_feedback, + 'total_likes': total_likes, + 'total_dislikes': total_dislikes, + 'satisfaction_rate': round(satisfaction_rate, 2), + 'by_bot': bot_stats, + } + + async def get_feedback_list( + self, + bot_ids: list[str] | None = None, + pipeline_ids: list[str] | None = None, + feedback_type: int | None = None, + start_time: datetime.datetime | None = None, + end_time: datetime.datetime | None = None, + limit: int = 100, + offset: int = 0, + ) -> tuple[list[dict], int]: + """Get feedback list with filters.""" + conditions = [] + + if bot_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids)) + if pipeline_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids)) + if feedback_type is not None: + conditions.append(persistence_monitoring.MonitoringFeedback.feedback_type == feedback_type) + if start_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time) + if end_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time) + + # Get total count + count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringFeedback.id)) + if conditions: + count_query = count_query.where(sqlalchemy.and_(*conditions)) + count_result = await self.ap.persistence_mgr.execute_async(count_query) + total = count_result.scalar() or 0 + + # Get feedback list + query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by( + persistence_monitoring.MonitoringFeedback.timestamp.desc() + ) + if conditions: + query = query.where(sqlalchemy.and_(*conditions)) + query = query.limit(limit).offset(offset) + + result = await self.ap.persistence_mgr.execute_async(query) + rows = result.all() + + return ( + [ + self.ap.persistence_mgr.serialize_model( + persistence_monitoring.MonitoringFeedback, row[0] if isinstance(row, tuple) else row + ) + for row in rows + ], + total, + ) + + async def export_feedback( + self, + bot_ids: list[str] | None = None, + pipeline_ids: list[str] | None = None, + start_time: datetime.datetime | None = None, + end_time: datetime.datetime | None = None, + limit: int = 100000, + ) -> list[dict]: + """Export feedback as list of dictionaries for CSV conversion.""" + conditions = [] + + if bot_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.bot_id.in_(bot_ids)) + if pipeline_ids: + conditions.append(persistence_monitoring.MonitoringFeedback.pipeline_id.in_(pipeline_ids)) + if start_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp >= start_time) + if end_time: + conditions.append(persistence_monitoring.MonitoringFeedback.timestamp <= end_time) + + query = sqlalchemy.select(persistence_monitoring.MonitoringFeedback).order_by( + persistence_monitoring.MonitoringFeedback.timestamp.desc() + ) + if conditions: + query = query.where(sqlalchemy.and_(*conditions)) + query = query.limit(limit) + + result = await self.ap.persistence_mgr.execute_async(query) + rows = result.all() + + return [ + { + 'id': row[0].id if isinstance(row, tuple) else row.id, + 'timestamp': self._format_timestamp(row[0].timestamp if isinstance(row, tuple) else row.timestamp), + 'feedback_id': row[0].feedback_id if isinstance(row, tuple) else row.feedback_id, + 'feedback_type': 'like' if (row[0].feedback_type if isinstance(row, tuple) else row.feedback_type) == 1 else 'dislike', + 'feedback_content': row[0].feedback_content if isinstance(row, tuple) else row.feedback_content, + 'inaccurate_reasons': row[0].inaccurate_reasons if isinstance(row, tuple) else row.inaccurate_reasons, + 'bot_id': row[0].bot_id if isinstance(row, tuple) else row.bot_id, + 'bot_name': row[0].bot_name if isinstance(row, tuple) else row.bot_name, + 'pipeline_id': row[0].pipeline_id if isinstance(row, tuple) else row.pipeline_id, + 'pipeline_name': row[0].pipeline_name if isinstance(row, tuple) else row.pipeline_name, + 'session_id': row[0].session_id if isinstance(row, tuple) else row.session_id, + 'message_id': row[0].message_id if isinstance(row, tuple) else row.message_id, + 'stream_id': row[0].stream_id if isinstance(row, tuple) else row.stream_id, + 'user_id': row[0].user_id if isinstance(row, tuple) else row.user_id, + 'platform': row[0].platform if isinstance(row, tuple) else row.platform, + } + for row in rows + ] diff --git a/src/langbot/pkg/entity/persistence/monitoring.py b/src/langbot/pkg/entity/persistence/monitoring.py index 6647bcb0..01e4fdd3 100644 --- a/src/langbot/pkg/entity/persistence/monitoring.py +++ b/src/langbot/pkg/entity/persistence/monitoring.py @@ -106,3 +106,26 @@ class MonitoringEmbeddingCall(Base): session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) call_type = sqlalchemy.Column(sqlalchemy.String(50), nullable=True) # embedding, retrieve + + +class MonitoringFeedback(Base): + """User feedback records (like/dislike) from AI Bot conversations""" + + __tablename__ = 'monitoring_feedback' + + id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + timestamp = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True) + feedback_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, unique=True, index=True) + feedback_type = sqlalchemy.Column(sqlalchemy.Integer, nullable=False) # 1=like, 2=dislike + feedback_content = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # User feedback text + inaccurate_reasons = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # JSON list of inaccurate reasons + # Context fields + bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + stream_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # e.g., wecom diff --git a/src/langbot/pkg/persistence/migrations/dbm025_feedback_stats.py b/src/langbot/pkg/persistence/migrations/dbm025_feedback_stats.py new file mode 100644 index 00000000..d7146b99 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm025_feedback_stats.py @@ -0,0 +1,75 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(25) +class DBMigrateFeedbackStats(migration.DBMigration): + """Add monitoring_feedback table for storing user feedback from AI Bot conversations""" + + async def _table_exists(self, table_name: str) -> bool: + """Check if a table exists (works for both SQLite and PostgreSQL).""" + if self.ap.persistence_mgr.db.name == 'postgresql': + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = :table_name);' + ).bindparams(table_name=table_name) + ) + return bool(result.scalar()) + else: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text("SELECT name FROM sqlite_master WHERE type='table' AND name=:table_name;").bindparams( + table_name=table_name + ) + ) + return result.first() is not None + + async def upgrade(self): + """Create monitoring_feedback table.""" + if await self._table_exists('monitoring_feedback'): + self.ap.logger.debug('monitoring_feedback table already exists, skipping migration.') + return + + # Create monitoring_feedback table with all columns + create_table_sql = ''' + CREATE TABLE monitoring_feedback ( + id VARCHAR(255) PRIMARY KEY, + timestamp DATETIME NOT NULL, + feedback_id VARCHAR(255) NOT NULL UNIQUE, + feedback_type INTEGER NOT NULL, + feedback_content TEXT, + inaccurate_reasons TEXT, + bot_id VARCHAR(255), + bot_name VARCHAR(255), + pipeline_id VARCHAR(255), + pipeline_name VARCHAR(255), + session_id VARCHAR(255), + message_id VARCHAR(255), + stream_id VARCHAR(255), + user_id VARCHAR(255), + platform VARCHAR(255) + ) + ''' + await self.ap.persistence_mgr.execute_async(sqlalchemy.text(create_table_sql)) + + # Create indexes + indexes = [ + 'CREATE INDEX ix_monitoring_feedback_timestamp ON monitoring_feedback (timestamp)', + 'CREATE UNIQUE INDEX ix_monitoring_feedback_feedback_id ON monitoring_feedback (feedback_id)', + 'CREATE INDEX ix_monitoring_feedback_bot_id ON monitoring_feedback (bot_id)', + 'CREATE INDEX ix_monitoring_feedback_pipeline_id ON monitoring_feedback (pipeline_id)', + 'CREATE INDEX ix_monitoring_feedback_session_id ON monitoring_feedback (session_id)', + 'CREATE INDEX ix_monitoring_feedback_message_id ON monitoring_feedback (message_id)', + 'CREATE INDEX ix_monitoring_feedback_stream_id ON monitoring_feedback (stream_id)', + ] + for index_sql in indexes: + await self.ap.persistence_mgr.execute_async(sqlalchemy.text(index_sql)) + + self.ap.logger.info('Created monitoring_feedback table with indexes.') + + async def downgrade(self): + """Drop monitoring_feedback table.""" + if await self._table_exists('monitoring_feedback'): + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('DROP TABLE monitoring_feedback') + ) + self.ap.logger.info('Dropped monitoring_feedback table.') diff --git a/src/langbot/pkg/pipeline/monitoring_helper.py b/src/langbot/pkg/pipeline/monitoring_helper.py index 19467cc8..12f4efe9 100644 --- a/src/langbot/pkg/pipeline/monitoring_helper.py +++ b/src/langbot/pkg/pipeline/monitoring_helper.py @@ -353,3 +353,62 @@ class LLMCallMonitor: ) return False # Don't suppress exceptions + + +class FeedbackMonitor: + """Helper for recording user feedback from AI Bot conversations""" + + @staticmethod + async def record_feedback( + ap: app.Application, + feedback_id: str, + feedback_type: int, + feedback_content: str | None = None, + inaccurate_reasons: list[str] | None = None, + bot_id: str | None = None, + bot_name: str | None = None, + pipeline_id: str | None = None, + pipeline_name: str | None = None, + session_id: str | None = None, + message_id: str | None = None, + stream_id: str | None = None, + user_id: str | None = None, + platform: str = 'wecom', + ): + """Record user feedback (like/dislike) from AI Bot conversation. + + Args: + ap: Application instance + feedback_id: Unique feedback identifier from platform + feedback_type: 1 = like, 2 = dislike + feedback_content: Optional user feedback text + inaccurate_reasons: List of reasons for inaccurate response + bot_id: Bot UUID + bot_name: Bot name + pipeline_id: Pipeline UUID + pipeline_name: Pipeline name + session_id: Session ID + message_id: Message ID + stream_id: Stream ID + user_id: User ID + platform: Platform name (default: wecom) + """ + try: + await ap.monitoring_service.record_feedback( + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content, + inaccurate_reasons=inaccurate_reasons, + bot_id=bot_id, + bot_name=bot_name, + pipeline_id=pipeline_id, + pipeline_name=pipeline_name, + session_id=session_id, + message_id=message_id, + stream_id=stream_id, + user_id=user_id, + platform=platform, + ) + ap.logger.info(f'Recorded feedback: feedback_id={feedback_id}, type={feedback_type}') + except Exception as e: + ap.logger.error(f'Failed to record feedback: {e}') diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 44874cfb..46e22cf1 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -9,6 +9,7 @@ from ..core import app, entities as core_entities, taskmgr from ..discover import engine from ..entity.persistence import bot as persistence_bot +from ..entity.persistence import pipeline as persistence_pipeline from ..entity.errors import platform as platform_errors @@ -267,12 +268,35 @@ class PlatformManager: adapter_inst = self.adapter_dict[bot_entity.adapter]( bot_entity.adapter_config, logger, + ap=self.ap, ) # 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook) if hasattr(adapter_inst, 'set_bot_uuid'): adapter_inst.set_bot_uuid(bot_entity.uuid) + # 如果 adapter 支持 set_bot_info 方法,设置 bot 信息(用于监控记录) + if hasattr(adapter_inst, 'set_bot_info'): + pipeline_name = '' + if bot_entity.use_pipeline_uuid: + try: + pipeline_result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_pipeline.LegacyPipeline.name).where( + persistence_pipeline.LegacyPipeline.uuid == bot_entity.use_pipeline_uuid + ) + ) + pipeline_row = pipeline_result.first() + if pipeline_row: + pipeline_name = pipeline_row[0] + except Exception: + pass + adapter_inst.set_bot_info( + bot_id=bot_entity.uuid, + bot_name=bot_entity.name, + pipeline_id=bot_entity.use_pipeline_uuid or '', + pipeline_name=pipeline_name, + ) + runtime_bot = RuntimeBot(ap=self.ap, bot_entity=bot_entity, adapter=adapter_inst, logger=logger) await runtime_bot.initialize() diff --git a/src/langbot/pkg/platform/sources/dingtalk.py b/src/langbot/pkg/platform/sources/dingtalk.py index bd9d0fa3..89d8566f 100644 --- a/src/langbot/pkg/platform/sources/dingtalk.py +++ b/src/langbot/pkg/platform/sources/dingtalk.py @@ -139,7 +139,7 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): dict # 回复卡片消息字典,key为消息id,value为回复卡片实例id,用于在流式消息时判断是否发送到指定卡片 ) - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): required_keys = [ 'client_id', 'client_secret', diff --git a/src/langbot/pkg/platform/sources/line.py b/src/langbot/pkg/platform/sources/line.py index 3d0f75c7..23ad6ea9 100644 --- a/src/langbot/pkg/platform/sources/line.py +++ b/src/langbot/pkg/platform/sources/line.py @@ -136,7 +136,7 @@ class LINEAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识 - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): configuration = Configuration(access_token=config['channel_access_token']) line_webhook = WebhookHandler(config['channel_secret']) parser = WebhookParser(config['channel_secret']) diff --git a/src/langbot/pkg/platform/sources/officialaccount.py b/src/langbot/pkg/platform/sources/officialaccount.py index 288991d6..fd71b846 100644 --- a/src/langbot/pkg/platform/sources/officialaccount.py +++ b/src/langbot/pkg/platform/sources/officialaccount.py @@ -60,7 +60,7 @@ class OfficialAccountAdapter(abstract_platform_adapter.AbstractMessagePlatformAd bot: typing.Union[OAClient, OAClientForLongerResponse] = pydantic.Field(exclude=True) bot_uuid: str = None - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): # 校验必填项 required_keys = ['token', 'EncodingAESKey', 'AppSecret', 'AppID', 'Mode'] missing_keys = [k for k in required_keys if k not in config] diff --git a/src/langbot/pkg/platform/sources/qqofficial.py b/src/langbot/pkg/platform/sources/qqofficial.py index 354afc41..224d3f88 100644 --- a/src/langbot/pkg/platform/sources/qqofficial.py +++ b/src/langbot/pkg/platform/sources/qqofficial.py @@ -132,7 +132,7 @@ class QQOfficialAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter message_converter: QQOfficialMessageConverter = QQOfficialMessageConverter() event_converter: QQOfficialEventConverter = QQOfficialEventConverter() - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): bot = QQOfficialClient( app_id=config['appid'], secret=config['secret'], token=config['token'], logger=logger, unified_mode=True ) diff --git a/src/langbot/pkg/platform/sources/slack.py b/src/langbot/pkg/platform/sources/slack.py index e577e3bd..5009ca33 100644 --- a/src/langbot/pkg/platform/sources/slack.py +++ b/src/langbot/pkg/platform/sources/slack.py @@ -99,7 +99,7 @@ class SlackAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): event_converter: SlackEventConverter = SlackEventConverter() config: dict - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): required_keys = [ 'bot_token', 'signing_secret', diff --git a/src/langbot/pkg/platform/sources/wechatpad.py b/src/langbot/pkg/platform/sources/wechatpad.py index 1c9c5ee2..f1e5f6d4 100644 --- a/src/langbot/pkg/platform/sources/wechatpad.py +++ b/src/langbot/pkg/platform/sources/wechatpad.py @@ -539,7 +539,7 @@ class WeChatPadAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter) typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], ] = {} - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): quart_app = quart.Quart(__name__) message_converter = WeChatPadMessageConverter(config, logger) diff --git a/src/langbot/pkg/platform/sources/wecom.py b/src/langbot/pkg/platform/sources/wecom.py index 27c77ad3..baa06a85 100644 --- a/src/langbot/pkg/platform/sources/wecom.py +++ b/src/langbot/pkg/platform/sources/wecom.py @@ -206,7 +206,7 @@ class WecomAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): config: dict bot_uuid: str = None - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap=None, **kwargs): # 校验必填项 required_keys = [ 'corpid', diff --git a/src/langbot/pkg/platform/sources/wecombot.py b/src/langbot/pkg/platform/sources/wecombot.py index 07372ee7..cc47b9ac 100644 --- a/src/langbot/pkg/platform/sources/wecombot.py +++ b/src/langbot/pkg/platform/sources/wecombot.py @@ -10,8 +10,10 @@ import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.entities as platform_entities from ..logger import EventLogger from langbot.libs.wecom_ai_bot_api.wecombotevent import WecomBotEvent -from langbot.libs.wecom_ai_bot_api.api import WecomBotClient +from langbot.libs.wecom_ai_bot_api.api import WecomBotClient, StreamSession from langbot.libs.wecom_ai_bot_api.ws_client import WecomBotWsClient +from ...core import app as langbot_app +from ...pipeline.monitoring_helper import FeedbackMonitor class WecomBotMessageConverter(abstract_platform_adapter.AbstractMessageConverter): @@ -192,8 +194,10 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): _ws_mode: bool = False bot_name: str = '' listeners: dict = {} + ap: langbot_app.Application = None # Application reference for monitoring + _bot_info: dict = None # Bot info for monitoring (bot_id, bot_name, pipeline_id, pipeline_name) - def __init__(self, config: dict, logger: EventLogger): + def __init__(self, config: dict, logger: EventLogger, ap: langbot_app.Application = None, **kwargs): enable_webhook = config.get('enable-webhook', False) bot_name = config.get('robot_name', '') @@ -228,8 +232,14 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): bot_account_id=bot_account_id, bot_name=bot_name, event_converter=event_converter, + **kwargs, ) self.listeners = {} + object.__setattr__(self, '_ws_mode', ws_mode) + object.__setattr__(self, 'ap', ap) + + # Register feedback handler for monitoring + self._register_feedback_handler() async def reply_message( self, @@ -318,6 +328,66 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): """设置 bot UUID(用于生成 webhook URL)""" self.bot_uuid = bot_uuid + def set_bot_info( + self, + bot_id: str, + bot_name: str, + pipeline_id: str, + pipeline_name: str, + ): + """设置 bot 信息(用于监控记录)""" + self._bot_info = { + 'bot_id': bot_id, + 'bot_name': bot_name, + 'pipeline_id': pipeline_id, + 'pipeline_name': pipeline_name, + } + + def _register_feedback_handler(self): + """注册用户反馈处理器,用于持久化反馈数据到监控服务""" + + async def handle_feedback( + feedback_id: str, + feedback_type: int, + feedback_content: str, + inaccurate_reasons: list[str], + session: StreamSession, + ): + """处理用户反馈事件,持久化到监控服务""" + if not self.ap or not self._bot_info: + return + + try: + # Build session_id from session info + session_id = None + if session.chat_id: + session_id = f'group_{session.chat_id}' + elif session.user_id: + session_id = f'person_{session.user_id}' + + await FeedbackMonitor.record_feedback( + ap=self.ap, + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_content if feedback_content else None, + inaccurate_reasons=inaccurate_reasons if inaccurate_reasons else None, + bot_id=self._bot_info['bot_id'], + bot_name=self._bot_info['bot_name'], + pipeline_id=self._bot_info['pipeline_id'], + pipeline_name=self._bot_info['pipeline_name'], + session_id=session_id, + message_id=session.msg_id if session else None, + stream_id=session.stream_id if session else None, + user_id=session.user_id if session else None, + platform='wecom', + ) + except Exception: + await self.logger.error(f'Failed to record feedback: {traceback.format_exc()}') + + # Register the feedback handler with the bot client + if hasattr(self.bot, 'on_feedback'): + self.bot.on_feedback()(handle_feedback) + async def handle_unified_webhook(self, bot_uuid: str, path: str, request): _ws_mode = not self.config.get('enable-webhook', False) if _ws_mode: diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index ea0a682f..4fad9069 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 24 +required_database_version = 25 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/web/src/app/home/monitoring/components/FeedbackCard.tsx b/web/src/app/home/monitoring/components/FeedbackCard.tsx new file mode 100644 index 00000000..fd9835bd --- /dev/null +++ b/web/src/app/home/monitoring/components/FeedbackCard.tsx @@ -0,0 +1,160 @@ +'use client'; + +import React from 'react'; +import { useTranslation } from 'react-i18next'; +import { ThumbsUp, ThumbsDown, TrendingUp, TrendingDown, Minus } from 'lucide-react'; + +interface FeedbackCardProps { + title: string; + value: number | string; + subtitle?: string; + icon: React.ReactNode; + trend?: { + value: number; + direction: 'up' | 'down' | 'neutral'; + }; + variant?: 'default' | 'success' | 'warning' | 'danger'; + loading?: boolean; +} + +export function FeedbackCard({ + title, + value, + subtitle, + icon, + trend, + variant = 'default', + loading = false, +}: FeedbackCardProps) { + const variantStyles = { + default: 'bg-white dark:bg-[#2a2a2e] border-gray-200 dark:border-gray-700', + success: 'bg-green-50 dark:bg-green-900/20 border-green-200 dark:border-green-800', + warning: 'bg-yellow-50 dark:bg-yellow-900/20 border-yellow-200 dark:border-yellow-800', + danger: 'bg-red-50 dark:bg-red-900/20 border-red-200 dark:border-red-800', + }; + + const iconStyles = { + default: 'text-gray-500 dark:text-gray-400', + success: 'text-green-500 dark:text-green-400', + warning: 'text-yellow-500 dark:text-yellow-400', + danger: 'text-red-500 dark:text-red-400', + }; + + const trendStyles = { + up: 'text-green-500', + down: 'text-red-500', + neutral: 'text-gray-500', + }; + + if (loading) { + return ( +
+ {title} +
++ {value} +
+ {subtitle && ( ++ {subtitle} +
+ )} + {trend && ( ++ {t('monitoring.feedback.noFeedback')} +
++ {t('monitoring.feedback.noFeedbackDescription')} +
++ {item.feedbackContent} +
+ )} + + {item.inaccurateReasons && item.inaccurateReasons.length > 0 && ( ++ {item.feedbackContent} +
+