From f8010a20eb2592d9e174875d640ff18fa4c9e4cc Mon Sep 17 00:00:00 2001 From: 6mvp6 <119733319+6mvp6@users.noreply.github.com> Date: Sat, 18 Apr 2026 12:56:41 +0800 Subject: [PATCH] =?UTF-8?q?feat(monitoring):=20=E5=85=B3=E8=81=94=E5=8F=8D?= =?UTF-8?q?=E9=A6=88=E8=AE=B0=E5=BD=95=E4=B8=8E=E6=B6=88=E6=81=AFID?= =?UTF-8?q?=EF=BC=8C=E6=96=B0=E5=A2=9E=E5=8F=8D=E9=A6=88=E5=AF=BC=E5=87=BA?= =?UTF-8?q?=20(#2120)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(monitoring): link feedback to LangBot message ID and add feedback export - Add pipeline→adapter notification hook so monitoring message ID is passed back to WecomBotAdapter after creation - Store stream_id→monitoring_message_id mapping with 10-min TTL cleanup - Replace feedback record stream_id with LangBot monitoring message ID so feedback can be linked to actual message records - Rename streamId label to "Related Query ID" in all 7 i18n locales - Remove non-functional message ID jump button from FeedbackList - Add feedback export option to ExportDropdown (backend already implemented) Co-Authored-By: Claude Opus 4.6 * feat(monitoring): add combined refresh handler for monitoring and feedback data * fix(wecombot): improve stream ID mapping and error logging in WecomBotAdapter * feat(lark): add monitoring message ID mapping for feedback correlation * feat(lark): rename monitoring message ID mappings for clarity and consistency feat(feedback): add button to view conversation for feedback items * feat(bot-session-monitor): add feedback handling for bot messages with visual indicators * feat(bot-session-monitor): enhance feedback display with hover content for like/dislike indicators * fix(dingtalk): use voice recognition text instead of raw audio binary When DingTalk sends a voice message to the bot, the callback JSON contains a 'recognition' field with the speech-to-text result (powered by Qwen). Previously, LangBot only extracted the 'downloadCode' to download the raw audio binary and passed it as 'file_base64' to LLM APIs, which caused 400 errors since most models don't support this content type. This patch: - Extracts the 'recognition' field from DingTalk audio message content - Uses it as plain text input to the LLM instead of raw audio - Falls back to audio binary only when no recognition text is available - Fixes duplicate text issue for audio messages with recognition Fixes voice messages returning 'Request failed' on all LLM models. * fix: add filereader for dingtalk,lark (#2122) * fix: add filereader for dingtalk * feat: add lark * feat: update uv.lock * chore: update version to 4.9.6 in pyproject.toml, __init__.py, and uv.lock * fix: update langbot-plugin version to 0.3.8 * fix: update langbot-plugin version to 0.3.8 * fix(wecombot): extend StreamSession TTL for feedback sessions to prevent context data loss StreamSessionManager.cleanup() removes sessions after 60s TTL, but feedback events (like → cancel → dislike) can arrive later. When the session expires before the dislike event, all context fields (session_id, user_id, message_id, stream_id) are lost because get_session_by_feedback_id() returns None. Fix: Sessions with registered feedback_ids now use a 10-minute TTL, aligned with the adapter's _stream_to_monitoring_msg TTL in wecombot.py. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: 6mvp6 <13727783693@163.com> Co-authored-by: Claude Opus 4.6 Co-authored-by: fdc310 <2213070223@qq.com> Co-authored-by: haiyangbg Co-authored-by: Guanchao Wang Co-authored-by: Rock Chin <1010553892@qq.com> --- src/langbot/libs/wecom_ai_bot_api/api.py | 15 ++- src/langbot/pkg/pipeline/pipelinemgr.py | 3 + src/langbot/pkg/platform/sources/lark.py | 49 ++++++++ src/langbot/pkg/platform/sources/wecombot.py | 30 ++++- .../bot-session/BotSessionMonitor.tsx | 114 +++++++++++++++--- .../monitoring/components/ExportDropdown.tsx | 9 +- .../monitoring/components/FeedbackList.tsx | 31 ++--- web/src/app/home/monitoring/page.tsx | 16 ++- web/src/i18n/locales/en-US.ts | 3 +- web/src/i18n/locales/es-ES.ts | 3 +- web/src/i18n/locales/ja-JP.ts | 3 +- web/src/i18n/locales/th-TH.ts | 3 +- web/src/i18n/locales/vi-VN.ts | 3 +- web/src/i18n/locales/zh-Hans.ts | 3 +- web/src/i18n/locales/zh-Hant.ts | 3 +- 15 files changed, 241 insertions(+), 47 deletions(-) diff --git a/src/langbot/libs/wecom_ai_bot_api/api.py b/src/langbot/libs/wecom_ai_bot_api/api.py index bf77743d..b6f45cf2 100644 --- a/src/langbot/libs/wecom_ai_bot_api/api.py +++ b/src/langbot/libs/wecom_ai_bot_api/api.py @@ -71,6 +71,11 @@ class StreamSession: class StreamSessionManager: """管理 stream 会话的生命周期,并负责队列的生产消费。""" + # Sessions with registered feedback_ids use a longer TTL to survive the + # full like → cancel → dislike feedback flow. Must align with the adapter's + # _stream_to_monitoring_msg TTL (wecombot.py). + _FEEDBACK_SESSION_TTL = 600 # 10 minutes + def __init__(self, logger: EventLogger, ttl: int = 60) -> None: self.logger = logger @@ -214,11 +219,17 @@ class StreamSessionManager: session.last_access = time.time() def cleanup(self) -> None: - """定期清理过期会话,防止队列与映射无上限累积。""" + """定期清理过期会话,防止队列与映射无上限累积。 + + 已注册 feedback_id 的会话使用更长的 TTL,确保用户在点赞/取消/点踩流程中 + 不会因为 session 被提前清除而丢失上下文信息。 + """ now = time.time() expired: list[str] = [] for stream_id, session in self._sessions.items(): - if now - session.last_access > self.ttl: + # Sessions with registered feedback_ids use a longer TTL + effective_ttl = self._FEEDBACK_SESSION_TTL if session.feedback_id else self.ttl + if now - session.last_access > effective_ttl: expired.append(stream_id) for stream_id in expired: diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 40f93cda..1426fe3d 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -297,6 +297,9 @@ class RuntimePipeline: ) # Store message_id in query variables for LLM call monitoring query.variables['_monitoring_message_id'] = message_id + # Notify adapter so it can map platform-specific IDs to monitoring message ID + if hasattr(query.adapter, 'on_monitoring_message_created'): + await query.adapter.on_monitoring_message_created(query, message_id) except Exception as e: self.ap.logger.error(f'Failed to record query start: {e}') diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py index 364382c7..c73528a7 100644 --- a/src/langbot/pkg/platform/sources/lark.py +++ b/src/langbot/pkg/platform/sources/lark.py @@ -787,6 +787,13 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): card_id_dict: dict[str, str] # 消息id到卡片id的映射,便于创建卡片后的发送消息到指定卡片 + # Monitoring message ID mapping for feedback correlation + # Temp: user Lark message ID → monitoring_message_id (populated by on_monitoring_message_created, consumed by create_message_card) + pending_monitoring_msg: dict[str, str] + # Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks) + reply_to_monitoring_msg: dict[str, tuple[str, float]] + _MONITORING_MAPPING_TTL = 600 # 10 minutes + seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识 bot_uuid: str = None # 机器人UUID app_ticket: str = None # 商店应用用到 @@ -833,6 +840,11 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): else: session_id = None + # Resolve monitoring message ID from reply message mapping + monitoring_msg_id = None + if open_message_id and open_message_id in self.reply_to_monitoring_msg: + monitoring_msg_id = self.reply_to_monitoring_msg[open_message_id][0] + feedback_event = platform_events.FeedbackEvent( feedback_id=getattr(event.header, 'event_id', str(uuid.uuid4())), feedback_type=feedback_type, @@ -840,6 +852,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): user_id=user_id, session_id=session_id, message_id=open_message_id, + stream_id=monitoring_msg_id, source_platform_object=event, ) @@ -878,6 +891,8 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): logger=logger, lark_tenant_key=config.get('lark_tenant_key', ''), card_id_dict={}, + pending_monitoring_msg={}, + reply_to_monitoring_msg={}, seq=1, listeners={}, quart_app=quart_app, @@ -1018,6 +1033,22 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): is_stream = True return is_stream + async def on_monitoring_message_created(self, query, monitoring_message_id: str): + """Called by pipeline after monitoring message is created, to map user message ID to monitoring message ID.""" + try: + user_msg_id = query.message_event.message_chain.message_id + if user_msg_id: + self.pending_monitoring_msg[user_msg_id] = monitoring_message_id + except Exception as e: + await self.logger.debug(f'Failed to map message to monitoring message: {e}') + + def _cleanup_monitoring_mapping(self): + """Remove entries older than TTL from the reply-to-monitoring mapping.""" + now = time.time() + expired = [k for k, (_, ts) in self.reply_to_monitoring_msg.items() if now - ts > self._MONITORING_MAPPING_TTL] + for k in expired: + del self.reply_to_monitoring_msg[k] + async def create_card_id(self, message_id): try: # self.logger.debug('飞书支持stream输出,创建卡片......') @@ -1257,6 +1288,18 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): raise Exception( f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' ) + + # Transfer monitoring message mapping: user msg ID → reply msg ID + try: + user_msg_id = event.message_chain.message_id + reply_msg_id = getattr(response.data, 'message_id', None) + monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None) + if reply_msg_id and monitoring_msg_id: + self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time()) + self._cleanup_monitoring_mapping() + except Exception as e: + asyncio.create_task(self.logger.debug(f'Failed to transfer monitoring mapping in create_message_card: {e}')) + return True async def reply_message( @@ -1567,6 +1610,11 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): else: session_id = None + # Resolve monitoring message ID from reply message mapping + monitoring_msg_id = None + if open_message_id and open_message_id in self.reply_to_monitoring_msg: + monitoring_msg_id = self.reply_to_monitoring_msg[open_message_id][0] + feedback_event = platform_events.FeedbackEvent( feedback_id=data.get('header', {}).get('event_id', str(uuid.uuid4())), feedback_type=feedback_type, @@ -1574,6 +1622,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): user_id=user_id, session_id=session_id, message_id=open_message_id, + stream_id=monitoring_msg_id, source_platform_object=data, ) diff --git a/src/langbot/pkg/platform/sources/wecombot.py b/src/langbot/pkg/platform/sources/wecombot.py index 98a403a6..dd726544 100644 --- a/src/langbot/pkg/platform/sources/wecombot.py +++ b/src/langbot/pkg/platform/sources/wecombot.py @@ -1,6 +1,7 @@ from __future__ import annotations import typing import asyncio +import time import traceback import datetime @@ -293,6 +294,8 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): _ws_mode: bool = False bot_name: str = '' listeners: dict = {} + _stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp) + _STREAM_MAPPING_TTL = 600 # 10 minutes def __init__(self, config: dict, logger: EventLogger): enable_webhook = config.get('enable-webhook', False) @@ -329,8 +332,9 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): bot_account_id=bot_account_id, bot_name=bot_name, event_converter=event_converter, + listeners={}, + _stream_to_monitoring_msg={}, ) - self.listeners = {} async def reply_message( self, @@ -422,6 +426,23 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): """设置 bot UUID(用于生成 webhook URL)""" self.bot_uuid = bot_uuid + async def on_monitoring_message_created(self, query, monitoring_message_id: str): + """Called by pipeline after monitoring message is created, to map stream_id to monitoring message ID.""" + try: + stream_id = query.message_event.source_platform_object.stream_id + if stream_id: + self._stream_to_monitoring_msg[stream_id] = (monitoring_message_id, time.time()) + self._cleanup_stream_mapping() + except Exception as e: + await self.logger.debug(f'Failed to map stream_id to monitoring message: {e}') + + def _cleanup_stream_mapping(self): + """Remove entries older than TTL from the stream_id to monitoring message mapping.""" + now = time.time() + expired = [k for k, (_, ts) in self._stream_to_monitoring_msg.items() if now - ts > self._STREAM_MAPPING_TTL] + for k in expired: + del self._stream_to_monitoring_msg[k] + async def _on_feedback(self, **kwargs): """Handle feedback event from WeChat Work AI Bot SDK and dispatch as FeedbackEvent.""" try: @@ -447,6 +468,11 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): message_id = session.msg_id stream_id = session.stream_id + # Resolve stream_id to LangBot monitoring message ID if available + monitoring_msg_id = None + if stream_id and stream_id in self._stream_to_monitoring_msg: + monitoring_msg_id = self._stream_to_monitoring_msg[stream_id][0] + await self.logger.info( f'Feedback event: feedback_id={feedback_id}, type={feedback_type}, ' f'session_id={session_id}, user_id={user_id}, message_id={message_id}' @@ -460,7 +486,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): user_id=user_id, session_id=session_id, message_id=message_id, - stream_id=stream_id, + stream_id=monitoring_msg_id or stream_id, source_platform_object=session, ) diff --git a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx index fbaf6fb3..9b7d8928 100644 --- a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx +++ b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx @@ -10,7 +10,15 @@ import { useTranslation } from 'react-i18next'; import { httpClient } from '@/app/infra/http/HttpClient'; import { ScrollArea } from '@/components/ui/scroll-area'; import { cn } from '@/lib/utils'; -import { Ban, Bot, Copy, Check, Workflow } from 'lucide-react'; +import { + Ban, + Bot, + Copy, + Check, + Workflow, + ThumbsUp, + ThumbsDown, +} from 'lucide-react'; import { MessageChainComponent, Plain, @@ -54,6 +62,12 @@ interface SessionMessage { role?: string | null; } +interface SessionFeedback { + feedback_type: number; // 1=like, 2=dislike + feedback_content?: string | null; + stream_id?: string | null; +} + export interface BotSessionMonitorHandle { refreshSessions: () => Promise; } @@ -75,6 +89,9 @@ const BotSessionMonitor = forwardRef< const [loadingSessions, setLoadingSessions] = useState(false); const [loadingMessages, setLoadingMessages] = useState(false); const [copiedUserId, setCopiedUserId] = useState(false); + const [feedbackMap, setFeedbackMap] = useState< + Record + >({}); const messagesContainerRef = useRef(null); const parseSessionType = (sessionId: string): string | null => { @@ -117,21 +134,50 @@ const BotSessionMonitor = forwardRef< [loadSessions], ); - const loadMessages = useCallback(async (sessionId: string) => { - setLoadingMessages(true); - try { - const response = await httpClient.getSessionMessages(sessionId); - const sorted = (response.messages ?? []).sort( - (a, b) => - new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(), - ); - setMessages(sorted); - } catch (error) { - console.error('Failed to load session messages:', error); - } finally { - setLoadingMessages(false); - } - }, []); + const loadMessages = useCallback( + async (sessionId: string) => { + setLoadingMessages(true); + try { + const messagesRes = await httpClient.getSessionMessages(sessionId); + const sorted = (messagesRes.messages ?? []).sort( + (a, b) => + new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(), + ); + setMessages(sorted); + + // Collect user message IDs for feedback matching + const userMsgIds = new Set( + sorted.filter((m) => !m.role || m.role === 'user').map((m) => m.id), + ); + + if (userMsgIds.size > 0) { + // Fetch feedback for this bot, then match by stream_id locally + const feedbackRes = await httpClient.get<{ + feedback: SessionFeedback[]; + }>( + `/api/v1/monitoring/feedback?botId=${encodeURIComponent(botId)}&limit=200`, + ); + + const map: Record = {}; + if (feedbackRes?.feedback) { + for (const fb of feedbackRes.feedback) { + if (fb.stream_id && userMsgIds.has(fb.stream_id)) { + map[fb.stream_id] = fb; + } + } + } + setFeedbackMap(map); + } else { + setFeedbackMap({}); + } + } catch (error) { + console.error('Failed to load session messages:', error); + } finally { + setLoadingMessages(false); + } + }, + [botId], + ); useEffect(() => { loadSessions(); @@ -479,11 +525,21 @@ const BotSessionMonitor = forwardRef< {t('bots.sessionMonitor.noMessages')} ) : ( - messages.map((msg) => { + messages.map((msg, msgIndex) => { const isUser = isUserMessage(msg); const isDiscarded = msg.status === 'discarded' || msg.pipeline_id === PIPELINE_DISCARD; + // For bot replies, find feedback linked to the preceding user message + let msgFeedback: SessionFeedback | undefined; + if (!isUser) { + for (let i = msgIndex - 1; i >= 0; i--) { + if (isUserMessage(messages[i])) { + msgFeedback = feedbackMap[messages[i].id]; + break; + } + } + } return (
)} + {/* Feedback indicator — same line, pushed right */} + {!isUser && + msgFeedback && + (msgFeedback.feedback_type === 1 ? ( + + + {t('monitoring.feedback.like')} + {msgFeedback.feedback_content && ( + + {msgFeedback.feedback_content} + + )} + + ) : ( + + + {t('monitoring.feedback.dislike')} + {msgFeedback.feedback_content && ( + + {msgFeedback.feedback_content} + + )} + + ))}
diff --git a/web/src/app/home/monitoring/components/ExportDropdown.tsx b/web/src/app/home/monitoring/components/ExportDropdown.tsx index 47a3e87d..29dde3fd 100644 --- a/web/src/app/home/monitoring/components/ExportDropdown.tsx +++ b/web/src/app/home/monitoring/components/ExportDropdown.tsx @@ -7,6 +7,7 @@ import { AlertCircle, Users, Layers, + ThumbsUp, } from 'lucide-react'; import { Button } from '@/components/ui/button'; import { @@ -25,7 +26,8 @@ export type ExportType = | 'llm-calls' | 'embedding-calls' | 'errors' - | 'sessions'; + | 'sessions' + | 'feedback'; interface ExportDropdownProps { filterState: FilterState; @@ -162,6 +164,11 @@ export function ExportDropdown({ filterState }: ExportDropdownProps) { label: t('monitoring.export.sessions'), icon: , }, + { + type: 'feedback', + label: t('monitoring.export.feedback'), + icon: , + }, ]; return ( diff --git a/web/src/app/home/monitoring/components/FeedbackList.tsx b/web/src/app/home/monitoring/components/FeedbackList.tsx index 407d5e56..a64fdec1 100644 --- a/web/src/app/home/monitoring/components/FeedbackList.tsx +++ b/web/src/app/home/monitoring/components/FeedbackList.tsx @@ -127,6 +127,20 @@ export function FeedbackList({ {item.platform} )} + {item.streamId && onViewMessage && ( + + )} {item.feedbackContent && ( @@ -221,21 +235,8 @@ export function FeedbackList({
{t('monitoring.feedback.messageId')}
-
- {item.messageId} - {onViewMessage && ( - - )} +
+ {item.messageId}
)} diff --git a/web/src/app/home/monitoring/page.tsx b/web/src/app/home/monitoring/page.tsx index 02a60580..6fa6f121 100644 --- a/web/src/app/home/monitoring/page.tsx +++ b/web/src/app/home/monitoring/page.tsx @@ -1,4 +1,4 @@ -import React, { Suspense, useState, useMemo } from 'react'; +import React, { Suspense, useState, useMemo, useCallback } from 'react'; import { useTranslation } from 'react-i18next'; import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs'; import { Button } from '@/components/ui/button'; @@ -69,6 +69,9 @@ function MonitoringPageContent() { useMonitoringFilters(); const { data, loading, refetch } = useMonitoringData(filterState); + // Counter to force feedbackTimeRange recomputation on manual refresh + const [feedbackRefreshKey, setFeedbackRefreshKey] = useState(0); + // Get time range for feedback data const feedbackTimeRange = useMemo(() => { const now = new Date(); @@ -106,7 +109,8 @@ function MonitoringPageContent() { startTime: startTime?.toISOString(), endTime: endTime.toISOString(), }; - }, [filterState.timeRange, filterState.customDateRange]); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [filterState.timeRange, filterState.customDateRange, feedbackRefreshKey]); // Feedback data hook const { @@ -127,6 +131,12 @@ function MonitoringPageContent() { limit: 50, }); + // Combined refresh handler for both monitoring and feedback data + const handleRefresh = useCallback(() => { + refetch(); + setFeedbackRefreshKey((k) => k + 1); + }, [refetch]); + const [expandedMessageId, setExpandedMessageId] = useState( null, ); @@ -265,7 +275,7 @@ function MonitoringPageContent() {