From 89064a9d5b44c14d45296c6d0122d0e2f843dea5 Mon Sep 17 00:00:00 2001 From: Guanchao Wang Date: Thu, 12 Mar 2026 01:27:22 +0800 Subject: [PATCH] feat: add support for username (#2047) * feat: add support for username * fix: lint * fix: migerations * fix: change to version 21 * fix: remove duplicate dbm021 migration and rename dbm022 * feat: add user_id and user_name display with copy functionality in BotSessionMonitor --------- Co-authored-by: wangcham Co-authored-by: Junyan Qin --- .../libs/wecom_customer_service_api/api.py | 55 ++++++++++++++ .../pkg/api/http/service/monitoring.py | 4 + .../pkg/entity/persistence/monitoring.py | 2 + .../migrations/dbm022_monitoring_user_name.py | 73 +++++++++++++++++++ src/langbot/pkg/pipeline/monitoring_helper.py | 31 ++++++++ src/langbot/pkg/platform/sources/wecomcs.py | 19 ++++- src/langbot/pkg/utils/constants.py | 2 +- .../bot-session/BotSessionMonitor.tsx | 72 ++++++++++++++++-- web/src/app/infra/http/BackendClient.ts | 2 + 9 files changed, 250 insertions(+), 10 deletions(-) create mode 100644 src/langbot/pkg/persistence/migrations/dbm022_monitoring_user_name.py diff --git a/src/langbot/libs/wecom_customer_service_api/api.py b/src/langbot/libs/wecom_customer_service_api/api.py index e1b94879..70270b72 100644 --- a/src/langbot/libs/wecom_customer_service_api/api.py +++ b/src/langbot/libs/wecom_customer_service_api/api.py @@ -10,6 +10,7 @@ from typing import Callable from .wecomcsevent import WecomCSEvent import langbot_plugin.api.entities.builtin.platform.message as platform_message import aiofiles +import time class WecomCSClient: @@ -34,6 +35,10 @@ class WecomCSClient: self.unified_mode = unified_mode self.app = Quart(__name__) + # Customer info cache: {external_userid: (info_dict, timestamp)} + self._customer_cache: dict[str, tuple[dict, float]] = {} + self._cache_ttl = 60 # Cache TTL in seconds (1 minute) + # 只有在非统一模式下才注册独立路由 if not self.unified_mode: self.app.add_url_rule( @@ -378,3 +383,53 @@ class WecomCSClient: async def get_media_id(self, image: platform_message.Image): media_id = await self.upload_to_work(image=image) return media_id + + async def get_customer_info(self, external_userid: str) -> dict | None: + """ + Get customer information by external_userid with caching. + + Uses a 1-minute cache to avoid repeated API calls for the same user. + + Args: + external_userid: The external user ID of the customer. + + Returns: + Customer info dict with 'nickname', 'avatar', etc., or None if not found. + """ + # Check cache first + current_time = time.time() + if external_userid in self._customer_cache: + cached_info, cached_time = self._customer_cache[external_userid] + if current_time - cached_time < self._cache_ttl: + return cached_info + + # Cache miss or expired, fetch from API + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + + url = f'{self.base_url}/kf/customer/batchget?access_token={self.access_token}' + + payload = { + 'external_userid_list': [external_userid], + } + + async with httpx.AsyncClient() as client: + response = await client.post(url, json=payload) + data = response.json() + + if data.get('errcode') in [40014, 42001]: + self.access_token = await self.get_access_token(self.secret) + return await self.get_customer_info(external_userid) + + if data.get('errcode', 0) != 0: + if self.logger: + await self.logger.warning(f'Failed to get customer info: {data}') + return None + + customer_list = data.get('customer_list', []) + if customer_list: + customer_info = customer_list[0] + # Store in cache + self._customer_cache[external_userid] = (customer_info, current_time) + return customer_info + return None diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index 886b4ccc..33504aec 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -30,6 +30,7 @@ class MonitoringService: level: str = 'info', platform: str | None = None, user_id: str | None = None, + user_name: str | None = None, runner_name: str | None = None, variables: str | None = None, role: str = 'user', @@ -49,6 +50,7 @@ class MonitoringService: 'level': level, 'platform': platform, 'user_id': user_id, + 'user_name': user_name, 'runner_name': runner_name, 'variables': variables, 'role': role, @@ -152,6 +154,7 @@ class MonitoringService: pipeline_name: str, platform: str | None = None, user_id: str | None = None, + user_name: str | None = None, ) -> None: """Record a new session""" session_data = { @@ -166,6 +169,7 @@ class MonitoringService: 'is_active': True, 'platform': platform, 'user_id': user_id, + 'user_name': user_name, } await self.ap.persistence_mgr.execute_async( diff --git a/src/langbot/pkg/entity/persistence/monitoring.py b/src/langbot/pkg/entity/persistence/monitoring.py index 82d8ece5..6647bcb0 100644 --- a/src/langbot/pkg/entity/persistence/monitoring.py +++ b/src/langbot/pkg/entity/persistence/monitoring.py @@ -20,6 +20,7 @@ class MonitoringMessage(Base): level = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) # info, warning, error, debug platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # User display name runner_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # Runner name for this query variables = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # Query variables as JSON string role = sqlalchemy.Column(sqlalchemy.String(50), nullable=True, default='user') # user, assistant @@ -64,6 +65,7 @@ class MonitoringSession(Base): is_active = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False, default=True, index=True) platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # User display name class MonitoringError(Base): diff --git a/src/langbot/pkg/persistence/migrations/dbm022_monitoring_user_name.py b/src/langbot/pkg/persistence/migrations/dbm022_monitoring_user_name.py new file mode 100644 index 00000000..b66adc73 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm022_monitoring_user_name.py @@ -0,0 +1,73 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(22) +class DBMigrateMonitoringUserId(migration.DBMigration): + """Add user_id and user_name columns to monitoring_sessions table + + This migration adds the missing user_id column and also ensures user_name + column exists (in case migration 21 failed or was skipped). + """ + + async def _table_exists(self, table_name: str) -> bool: + """Check if a table exists (works for both SQLite and PostgreSQL).""" + if self.ap.persistence_mgr.db.name == 'postgresql': + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = :table_name);' + ).bindparams(table_name=table_name) + ) + return bool(result.scalar()) + else: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text("SELECT name FROM sqlite_master WHERE type='table' AND name=:table_name;").bindparams( + table_name=table_name + ) + ) + return result.first() is not None + + async def _get_table_columns(self, table_name: str) -> list[str]: + """Get column names from a table (works for both SQLite and PostgreSQL).""" + if self.ap.persistence_mgr.db.name == 'postgresql': + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.text( + 'SELECT column_name FROM information_schema.columns WHERE table_name = :table_name;' + ).bindparams(table_name=table_name) + ) + return [row[0] for row in result.fetchall()] + else: + if not table_name.isidentifier(): + raise ValueError(f'Invalid table name: {table_name}') + result = await self.ap.persistence_mgr.execute_async(sqlalchemy.text(f'PRAGMA table_info({table_name});')) + return [row[1] for row in result.fetchall()] + + async def _add_column_if_not_exists(self, table_name: str, column_name: str, column_type: str): + """Add a column to a table if it does not already exist.""" + columns = await self._get_table_columns(table_name) + if column_name in columns: + self.ap.logger.debug('%s column already exists in %s.', column_name, table_name) + return + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text(f'ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type};') + ) + self.ap.logger.info('Added %s column to %s table.', column_name, table_name) + + async def upgrade(self): + # Check if monitoring_sessions table exists + if not await self._table_exists('monitoring_sessions'): + self.ap.logger.warning('monitoring_sessions table does not exist, skipping migration.') + return + + # Add user_id column to monitoring_sessions table + await self._add_column_if_not_exists('monitoring_sessions', 'user_id', 'VARCHAR(255)') + + # Add user_name column to monitoring_sessions table (in case migration 21 failed) + await self._add_column_if_not_exists('monitoring_sessions', 'user_name', 'VARCHAR(255)') + + # Add user_name column to monitoring_messages table (in case migration 21 failed) + if await self._table_exists('monitoring_messages'): + await self._add_column_if_not_exists('monitoring_messages', 'user_name', 'VARCHAR(255)') + + async def downgrade(self): + pass diff --git a/src/langbot/pkg/pipeline/monitoring_helper.py b/src/langbot/pkg/pipeline/monitoring_helper.py index 6ad9a30d..19467cc8 100644 --- a/src/langbot/pkg/pipeline/monitoring_helper.py +++ b/src/langbot/pkg/pipeline/monitoring_helper.py @@ -34,6 +34,15 @@ class MonitoringHelper: # Check if session exists, if not, record session start session_id = f'{query.launcher_type}_{query.launcher_id}' + # Get sender name from message event + sender_name = None + if hasattr(query, 'message_event'): + if hasattr(query.message_event, 'sender'): + if hasattr(query.message_event.sender, 'nickname'): + sender_name = query.message_event.sender.nickname + elif hasattr(query.message_event.sender, 'member_name'): + sender_name = query.message_event.sender.member_name + # Try to record message # Use JSON serialization to preserve message chain structure (including image URLs, etc.) if hasattr(query, 'message_chain') and hasattr(query.message_chain, 'model_dump'): @@ -57,6 +66,7 @@ class MonitoringHelper: if hasattr(query.launcher_type, 'value') else str(query.launcher_type), user_id=query.sender_id, + user_name=sender_name, runner_name=runner_name, variables=None, # Will be updated in record_query_success ) @@ -80,6 +90,7 @@ class MonitoringHelper: if hasattr(query.launcher_type, 'value') else str(query.launcher_type), user_id=query.sender_id, + user_name=sender_name, ) return message_id @@ -128,6 +139,15 @@ class MonitoringHelper: try: session_id = f'{query.launcher_type}_{query.launcher_id}' + # Get sender name from message event + sender_name = None + if hasattr(query, 'message_event'): + if hasattr(query.message_event, 'sender'): + if hasattr(query.message_event.sender, 'nickname'): + sender_name = query.message_event.sender.nickname + elif hasattr(query.message_event.sender, 'member_name'): + sender_name = query.message_event.sender.member_name + # Extract response content from resp_message_chain if hasattr(query, 'resp_message_chain') and query.resp_message_chain: # Serialize the last response message chain @@ -162,6 +182,7 @@ class MonitoringHelper: if hasattr(query.launcher_type, 'value') else str(query.launcher_type), user_id=query.sender_id, + user_name=sender_name, runner_name=runner_name, role='assistant', ) @@ -183,6 +204,15 @@ class MonitoringHelper: try: session_id = f'{query.launcher_type}_{query.launcher_id}' + # Get sender name from message event + sender_name = None + if hasattr(query, 'message_event'): + if hasattr(query.message_event, 'sender'): + if hasattr(query.message_event.sender, 'nickname'): + sender_name = query.message_event.sender.nickname + elif hasattr(query.message_event.sender, 'member_name'): + sender_name = query.message_event.sender.member_name + # Record error message message_id = await ap.monitoring_service.record_message( bot_id=bot_id, @@ -197,6 +227,7 @@ class MonitoringHelper: if hasattr(query.launcher_type, 'value') else str(query.launcher_type), user_id=query.sender_id, + user_name=sender_name, runner_name=runner_name, ) diff --git a/src/langbot/pkg/platform/sources/wecomcs.py b/src/langbot/pkg/platform/sources/wecomcs.py index 536429cc..9af809f7 100644 --- a/src/langbot/pkg/platform/sources/wecomcs.py +++ b/src/langbot/pkg/platform/sources/wecomcs.py @@ -81,22 +81,33 @@ class WecomEventConverter(abstract_platform_adapter.AbstractEventConverter): return event.source_platform_object @staticmethod - async def target2yiri(event: WecomCSEvent): + async def target2yiri(event: WecomCSEvent, bot: WecomCSClient = None): """ 将 WecomEvent 转换为平台的 FriendMessage 对象。 Args: event (WecomEvent): 企业微信客服事件。 + bot (WecomCSClient): 企业微信客服客户端,用于获取用户信息。 Returns: platform_events.FriendMessage: 转换后的 FriendMessage 对象。 """ + # Try to get customer nickname from WeChat API + nickname = str(event.user_id) + if bot and event.user_id: + try: + customer_info = await bot.get_customer_info(event.user_id) + if customer_info and customer_info.get('nickname'): + nickname = customer_info.get('nickname') + except Exception: + pass # Fall back to user_id as nickname + # 转换消息链 if event.type == 'text': yiri_chain = await WecomMessageConverter.target2yiri(event.message, event.message_id) friend = platform_entities.Friend( id=f'u{event.user_id}', - nickname=str(event.user_id), + nickname=nickname, remark='', ) @@ -106,7 +117,7 @@ class WecomEventConverter(abstract_platform_adapter.AbstractEventConverter): elif event.type == 'image': friend = platform_entities.Friend( id=f'u{event.user_id}', - nickname=str(event.user_id), + nickname=nickname, remark='', ) @@ -187,7 +198,7 @@ class WecomCSAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): async def on_message(event: WecomCSEvent): self.bot_account_id = event.receiver_id try: - return await callback(await self.event_converter.target2yiri(event), self) + return await callback(await self.event_converter.target2yiri(event, self.bot), self) except Exception: await self.logger.error(f'Error in wecomcs callback: {traceback.format_exc()}') diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index 8d8e972a..d87efec7 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 21 +required_database_version = 22 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx index f3c8e19e..94734485 100644 --- a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx +++ b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx @@ -6,6 +6,7 @@ import { httpClient } from '@/app/infra/http/HttpClient'; import { ScrollArea } from '@/components/ui/scroll-area'; import { Button } from '@/components/ui/button'; import { cn } from '@/lib/utils'; +import { Copy, Check } from 'lucide-react'; import { MessageChainComponent, Plain, @@ -27,6 +28,7 @@ interface SessionInfo { is_active: boolean; platform?: string | null; user_id?: string | null; + user_name?: string | null; } interface SessionMessage { @@ -60,8 +62,29 @@ export default function BotSessionMonitor({ botId }: BotSessionMonitorProps) { const [messages, setMessages] = useState([]); const [loadingSessions, setLoadingSessions] = useState(false); const [loadingMessages, setLoadingMessages] = useState(false); + const [copiedUserId, setCopiedUserId] = useState(false); const messagesContainerRef = useRef(null); + const parseSessionType = (sessionId: string): string | null => { + const idx = sessionId.indexOf('_'); + if (idx === -1) return null; + const type = sessionId.slice(0, idx); + if (type === 'person' || type === 'group') return type; + return null; + }; + + const abbreviateId = (id: string): string => { + if (id.length <= 10) return id; + return `${id.slice(0, 4)}..${id.slice(-4)}`; + }; + + const copyUserId = (userId: string) => { + navigator.clipboard.writeText(userId).then(() => { + setCopiedUserId(true); + setTimeout(() => setCopiedUserId(false), 2000); + }); + }; + const loadSessions = useCallback(async () => { setLoadingSessions(true); try { @@ -338,24 +361,36 @@ export default function BotSessionMonitor({ botId }: BotSessionMonitorProps) { >
- {session.user_id || session.session_id.slice(0, 12)} + {session.user_name || + session.user_id || + session.session_id.slice(0, 12)} {formatRelativeTime(session.last_activity)}
+ {parseSessionType(session.session_id) && ( + + {parseSessionType(session.session_id)} + + )} {session.platform && ( {session.platform} )} + {session.user_id && ( + + {abbreviateId(session.user_id)} + + )} {session.is_active && ( )} - {session.pipeline_name} + {session.pipeline_name}
); @@ -377,15 +412,42 @@ export default function BotSessionMonitor({ botId }: BotSessionMonitorProps) {
- {selectedSession?.user_id || selectedSessionId.slice(0, 20)} + {selectedSession?.user_name || + selectedSession?.user_id || + selectedSessionId.slice(0, 20)}
+ {parseSessionType(selectedSessionId) && ( + {parseSessionType(selectedSessionId)} + )} {selectedSession?.platform && ( - {selectedSession.platform} + <> + {parseSessionType(selectedSessionId) && ·} + {selectedSession.platform} + + )} + {selectedSession?.user_id && ( + <> + · + + {selectedSession.user_id} + + + )} {selectedSession?.pipeline_name && ( <> - {selectedSession?.platform && ·} + · {selectedSession.pipeline_name} )} diff --git a/web/src/app/infra/http/BackendClient.ts b/web/src/app/infra/http/BackendClient.ts index 1c3609c4..d234ddd1 100644 --- a/web/src/app/infra/http/BackendClient.ts +++ b/web/src/app/infra/http/BackendClient.ts @@ -356,6 +356,7 @@ export class BackendClient extends BaseHttpClient { is_active: boolean; platform: string | null; user_id: string | null; + user_name: string | null; }>; total: number; }> { @@ -384,6 +385,7 @@ export class BackendClient extends BaseHttpClient { level: string; platform: string | null; user_id: string | null; + user_name: string | null; runner_name: string | null; variables: string | null; role: string | null;