From db68c5d0c9e97df5b48d53e69ea49f488b4a7fbc Mon Sep 17 00:00:00 2001 From: RockChinQ Date: Sat, 4 Apr 2026 23:47:46 +0800 Subject: [PATCH] feat(human-takeover): add human customer service takeover feature Add ability for admin operators to take over user conversation sessions from the AI bot, manually reply as the bot, and release sessions back to AI processing. Backend: - New HumanTakeoverSession DB model and migration (v26) - HumanTakeoverService with in-memory cache for hot-path performance - REST API endpoints for takeover/release/send-message/list/detail - Message interception in botmgr.py (after webhook, before pipeline) Frontend: - Takeover/release controls in BotSessionMonitor chat header - Operator message input bar with visual distinction (orange theme) - Taken-over session indicators in session list - 3-second auto-refresh polling during active takeover - Full i18n coverage across all 7 locales --- .../http/controller/groups/human_takeover.py | 97 +++++ .../pkg/api/http/service/human_takeover.py | 314 ++++++++++++++++ src/langbot/pkg/core/app.py | 3 + src/langbot/pkg/core/stages/build_app.py | 5 + .../pkg/entity/persistence/human_takeover.py | 36 ++ .../dbm026_human_takeover_sessions.py | 36 ++ src/langbot/pkg/platform/botmgr.py | 85 +++++ src/langbot/pkg/utils/constants.py | 2 +- .../bot-session/BotSessionMonitor.tsx | 353 +++++++++++++++--- web/src/app/infra/http/BackendClient.ts | 86 +++++ web/src/i18n/locales/en-US.ts | 14 + web/src/i18n/locales/es-ES.ts | 14 + web/src/i18n/locales/ja-JP.ts | 14 + web/src/i18n/locales/th-TH.ts | 14 + web/src/i18n/locales/vi-VN.ts | 14 + web/src/i18n/locales/zh-Hans.ts | 14 + web/src/i18n/locales/zh-Hant.ts | 14 + 17 files changed, 1055 insertions(+), 60 deletions(-) create mode 100644 src/langbot/pkg/api/http/controller/groups/human_takeover.py create mode 100644 src/langbot/pkg/api/http/service/human_takeover.py create mode 100644 src/langbot/pkg/entity/persistence/human_takeover.py create mode 100644 src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py diff --git a/src/langbot/pkg/api/http/controller/groups/human_takeover.py b/src/langbot/pkg/api/http/controller/groups/human_takeover.py new file mode 100644 index 00000000..32e23f6b --- /dev/null +++ b/src/langbot/pkg/api/http/controller/groups/human_takeover.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import quart + +from .. import group + + +@group.group_class('human-takeover', '/api/v1/human-takeover') +class HumanTakeoverRouterGroup(group.RouterGroup): + async def initialize(self) -> None: + @self.route('/sessions', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_sessions(): + """Get list of takeover sessions, optionally filtered by bot UUID.""" + bot_uuid = quart.request.args.get('botUuid') + limit = int(quart.request.args.get('limit', 100)) + offset = int(quart.request.args.get('offset', 0)) + + sessions, total = await self.ap.human_takeover_service.get_active_sessions( + bot_uuid=bot_uuid if bot_uuid else None, + limit=limit, + offset=offset, + ) + + return self.success( + data={ + 'sessions': sessions, + 'total': total, + 'limit': limit, + 'offset': offset, + } + ) + + @self.route('/sessions/', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_session_detail(session_id: str): + """Get detail for a specific takeover session.""" + detail = await self.ap.human_takeover_service.get_session_detail(session_id) + if not detail: + return self.success(data={'found': False, 'session_id': session_id}) + return self.success(data={'found': True, 'session': detail}) + + @self.route('/sessions//takeover', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def takeover_session(session_id: str, user_email: str = None): + """Take over a conversation session.""" + data = await quart.request.get_json(silent=True) or {} + + bot_uuid = data.get('bot_uuid') + if not bot_uuid: + return self.fail(-1, 'bot_uuid is required') + + platform = data.get('platform') + user_id = data.get('user_id') + user_name = data.get('user_name') + + try: + result = await self.ap.human_takeover_service.takeover_session( + session_id=session_id, + bot_uuid=bot_uuid, + taken_by=user_email or data.get('taken_by'), + platform=platform, + user_id=user_id, + user_name=user_name, + ) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + + @self.route('/sessions//release', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def release_session(session_id: str): + """Release a taken-over session back to AI pipeline.""" + try: + result = await self.ap.human_takeover_service.release_session(session_id) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + + @self.route('/sessions//message', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def send_message(session_id: str, user_email: str = None): + """Send a message from the operator to the user.""" + data = await quart.request.get_json(silent=True) or {} + + message_text = data.get('message') + if not message_text: + return self.fail(-1, 'message is required') + + operator_name = user_email or data.get('operator_name', 'Operator') + + try: + result = await self.ap.human_takeover_service.send_message( + session_id=session_id, + message_text=message_text, + operator_name=operator_name, + ) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + except RuntimeError as e: + return self.fail(-2, str(e)) diff --git a/src/langbot/pkg/api/http/service/human_takeover.py b/src/langbot/pkg/api/http/service/human_takeover.py new file mode 100644 index 00000000..e9dd652a --- /dev/null +++ b/src/langbot/pkg/api/http/service/human_takeover.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import uuid +import datetime +import json +import logging + +import sqlalchemy + +from ....core import app +from ....entity.persistence import human_takeover as persistence_human_takeover + +import langbot_plugin.api.entities.builtin.platform.message as platform_message + + +class HumanTakeoverService: + """Human takeover service. + + Manages operator takeover of user conversation sessions, bypassing + the normal AI pipeline. Uses an in-memory cache for fast synchronous + lookups on the hot message path, backed by database persistence. + """ + + ap: app.Application + + # In-memory cache: session_id -> HumanTakeoverSession record id + # Only contains sessions with status='active' + _active_sessions: dict[str, str] + + logger: logging.Logger + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + self._active_sessions = {} + self.logger = logging.getLogger('human-takeover') + + async def initialize(self) -> None: + """Load active takeover sessions from DB into memory cache.""" + try: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).where( + persistence_human_takeover.HumanTakeoverSession.status == 'active' + ) + ) + rows = result.all() + for row in rows: + session = row[0] if isinstance(row, tuple) else row + self._active_sessions[session.session_id] = session.id + self.logger.info(f'Loaded {len(self._active_sessions)} active takeover sessions from DB') + except Exception as e: + self.logger.warning(f'Failed to load active takeover sessions: {e}') + + def is_taken_over(self, session_id: str) -> bool: + """Check if a session is currently under human takeover. + + This is a synchronous in-memory lookup for performance, since it + is called on every incoming message (hot path). + """ + return session_id in self._active_sessions + + async def takeover_session( + self, + session_id: str, + bot_uuid: str, + taken_by: str | None = None, + platform: str | None = None, + user_id: str | None = None, + user_name: str | None = None, + ) -> dict: + """Take over a conversation session. + + Args: + session_id: The session to take over (e.g. 'person_123' or 'group_456'). + bot_uuid: UUID of the bot whose session is being taken over. + taken_by: Email/username of the admin performing the takeover. + platform: Platform name. + user_id: The end-user's ID in the session. + user_name: The end-user's display name. + + Returns: + Dict with the created takeover session record. + + Raises: + ValueError: If the session is already taken over. + """ + if self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is already taken over') + + record_id = str(uuid.uuid4()) + now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + record_data = { + 'id': record_id, + 'session_id': session_id, + 'bot_uuid': bot_uuid, + 'status': 'active', + 'taken_by': taken_by, + 'taken_at': now, + 'released_at': None, + 'platform': platform, + 'user_id': user_id, + 'user_name': user_name, + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_human_takeover.HumanTakeoverSession).values(record_data) + ) + + # Update in-memory cache + self._active_sessions[session_id] = record_id + + self.logger.info(f'Session {session_id} taken over by {taken_by}') + + return record_data + + async def release_session(self, session_id: str) -> dict: + """Release a taken-over session back to AI pipeline processing. + + Args: + session_id: The session to release. + + Returns: + Dict with the updated takeover session record. + + Raises: + ValueError: If the session is not currently taken over. + """ + if not self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is not currently taken over') + + now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.update(persistence_human_takeover.HumanTakeoverSession) + .where( + sqlalchemy.and_( + persistence_human_takeover.HumanTakeoverSession.session_id == session_id, + persistence_human_takeover.HumanTakeoverSession.status == 'active', + ) + ) + .values(status='released', released_at=now) + ) + + # Remove from in-memory cache + self._active_sessions.pop(session_id, None) + + self.logger.info(f'Session {session_id} released back to AI pipeline') + + return { + 'session_id': session_id, + 'status': 'released', + 'released_at': now.isoformat(), + } + + async def send_message( + self, + session_id: str, + message_text: str, + operator_name: str | None = None, + ) -> dict: + """Send a message from the operator to the user via the platform adapter. + + Args: + session_id: The taken-over session ID (e.g. 'person_123' or 'group_456'). + message_text: The text message to send. + operator_name: Name of the operator sending the message. + + Returns: + Dict with send result info. + + Raises: + ValueError: If the session is not currently taken over. + RuntimeError: If the bot or adapter cannot be found. + """ + if not self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is not currently taken over') + + # Look up the takeover record to get bot_uuid + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).where( + sqlalchemy.and_( + persistence_human_takeover.HumanTakeoverSession.session_id == session_id, + persistence_human_takeover.HumanTakeoverSession.status == 'active', + ) + ) + ) + row = result.first() + if not row: + raise RuntimeError(f'Active takeover record not found for session {session_id}') + + takeover_record = row[0] if isinstance(row, tuple) else row + bot_uuid = takeover_record.bot_uuid + + # Get the runtime bot + runtime_bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid) + if not runtime_bot: + raise RuntimeError(f'Bot {bot_uuid} not found or not running') + + # Parse session_id to determine target_type and target_id + # Format: 'person_{id}' or 'group_{id}' + if session_id.startswith('person_'): + target_type = 'person' + target_id = session_id[len('person_') :] + elif session_id.startswith('group_'): + target_type = 'group' + target_id = session_id[len('group_') :] + else: + raise ValueError(f'Invalid session_id format: {session_id}') + + # Build message chain + message_chain = platform_message.MessageChain([platform_message.Plain(text=message_text)]) + + # Send via adapter + await runtime_bot.adapter.send_message(target_type, target_id, message_chain) + + # Record the operator message in monitoring + bot_name = runtime_bot.bot_entity.name or bot_uuid + try: + message_content = json.dumps(message_chain.model_dump(), ensure_ascii=False) + except Exception: + message_content = message_text + + await self.ap.monitoring_service.record_message( + bot_id=bot_uuid, + bot_name=bot_name, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=message_content, + session_id=session_id, + status='success', + level='info', + platform=takeover_record.platform, + user_id=operator_name or 'operator', + user_name=operator_name or 'Operator', + role='operator', + ) + + self.logger.info(f'Operator message sent to session {session_id}: {message_text[:50]}...') + + return { + 'session_id': session_id, + 'message_sent': True, + } + + async def get_active_sessions( + self, + bot_uuid: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> tuple[list[dict], int]: + """Get list of active (or all) takeover sessions. + + Args: + bot_uuid: Optional filter by bot UUID. + limit: Maximum number of results. + offset: Pagination offset. + + Returns: + Tuple of (list of session dicts, total count). + """ + conditions = [] + + if bot_uuid: + conditions.append(persistence_human_takeover.HumanTakeoverSession.bot_uuid == bot_uuid) + + # Count + count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_human_takeover.HumanTakeoverSession.id)) + if conditions: + count_query = count_query.where(sqlalchemy.and_(*conditions)) + + count_result = await self.ap.persistence_mgr.execute_async(count_query) + total = count_result.scalar() or 0 + + # Fetch records + query = sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).order_by( + persistence_human_takeover.HumanTakeoverSession.taken_at.desc() + ) + if conditions: + query = query.where(sqlalchemy.and_(*conditions)) + + query = query.limit(limit).offset(offset) + + result = await self.ap.persistence_mgr.execute_async(query) + rows = result.all() + + sessions = [] + for row in rows: + session = row[0] if isinstance(row, tuple) else row + sessions.append( + self.ap.persistence_mgr.serialize_model(persistence_human_takeover.HumanTakeoverSession, session) + ) + + return sessions, total + + async def get_session_detail(self, session_id: str) -> dict | None: + """Get detail for a specific takeover session. + + Args: + session_id: The session ID to look up. + + Returns: + Session dict or None if not found. + """ + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession) + .where(persistence_human_takeover.HumanTakeoverSession.session_id == session_id) + .order_by(persistence_human_takeover.HumanTakeoverSession.taken_at.desc()) + ) + row = result.first() + if not row: + return None + + session = row[0] if isinstance(row, tuple) else row + return self.ap.persistence_mgr.serialize_model(persistence_human_takeover.HumanTakeoverSession, session) diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index e515cfb9..6912a1c8 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -31,6 +31,7 @@ from ..api.http.service import mcp as mcp_service from ..api.http.service import apikey as apikey_service from ..api.http.service import webhook as webhook_service from ..api.http.service import monitoring as monitoring_service +from ..api.http.service import human_takeover as human_takeover_service from ..discover import engine as discover_engine from ..storage import mgr as storagemgr @@ -153,6 +154,8 @@ class Application: monitoring_service: monitoring_service.MonitoringService = None + human_takeover_service: human_takeover_service.HumanTakeoverService = None + def __init__(self): pass diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 62f0ae7b..74193d31 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -28,6 +28,7 @@ from ...api.http.service import mcp as mcp_service from ...api.http.service import apikey as apikey_service from ...api.http.service import webhook as webhook_service from ...api.http.service import monitoring as monitoring_service +from ...api.http.service import human_takeover as human_takeover_service from ...discover import engine as discover_engine from ...storage import mgr as storagemgr from ...utils import logcache @@ -164,6 +165,10 @@ class BuildAppStage(stage.BootingStage): monitoring_service_inst = monitoring_service.MonitoringService(ap) ap.monitoring_service = monitoring_service_inst + human_takeover_service_inst = human_takeover_service.HumanTakeoverService(ap) + await human_takeover_service_inst.initialize() + ap.human_takeover_service = human_takeover_service_inst + async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None: await asyncio.sleep(3) await plugin_connector_inst.initialize() diff --git a/src/langbot/pkg/entity/persistence/human_takeover.py b/src/langbot/pkg/entity/persistence/human_takeover.py new file mode 100644 index 00000000..e1857a84 --- /dev/null +++ b/src/langbot/pkg/entity/persistence/human_takeover.py @@ -0,0 +1,36 @@ +import sqlalchemy + +from .base import Base + + +class HumanTakeoverSession(Base): + """Human takeover session records. + + Tracks which conversation sessions are currently under human operator control, + bypassing the normal AI pipeline processing. + """ + + __tablename__ = 'human_takeover_sessions' + + id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, unique=True, index=True) + """Corresponds to monitoring_sessions.session_id, format: 'person_{id}' or 'group_{id}'""" + + bot_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) + """UUID of the bot whose session is being taken over""" + + status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, default='active', index=True) + """Takeover status: 'active' or 'released'""" + + taken_by = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Email/username of the admin who took over the session""" + + taken_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) + """Timestamp when the takeover started""" + + released_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True) + """Timestamp when the takeover was released (null if still active)""" + + platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) diff --git a/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py b/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py new file mode 100644 index 00000000..699e50d2 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py @@ -0,0 +1,36 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(26) +class DBMigrateHumanTakeoverSessions(migration.DBMigration): + """Create human_takeover_sessions table for human operator takeover support""" + + async def upgrade(self): + sql_text = sqlalchemy.text(""" + CREATE TABLE IF NOT EXISTS human_takeover_sessions ( + id VARCHAR(255) PRIMARY KEY, + session_id VARCHAR(255) NOT NULL UNIQUE, + bot_uuid VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL DEFAULT 'active', + taken_by VARCHAR(255), + taken_at DATETIME NOT NULL, + released_at DATETIME, + platform VARCHAR(255), + user_id VARCHAR(255), + user_name VARCHAR(255) + ) + """) + await self.ap.persistence_mgr.execute_async(sql_text) + + # Create indexes + for idx_sql in [ + 'CREATE INDEX IF NOT EXISTS idx_hts_session_id ON human_takeover_sessions (session_id)', + 'CREATE INDEX IF NOT EXISTS idx_hts_bot_uuid ON human_takeover_sessions (bot_uuid)', + 'CREATE INDEX IF NOT EXISTS idx_hts_status ON human_takeover_sessions (status)', + ]: + await self.ap.persistence_mgr.execute_async(sqlalchemy.text(idx_sql)) + + async def downgrade(self): + sql_text = sqlalchemy.text('DROP TABLE IF EXISTS human_takeover_sessions') + await self.ap.persistence_mgr.execute_async(sql_text) diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 6ae0f4c2..93df0747 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -220,6 +220,47 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + # Check if session is under human takeover + person_session_id = f'person_{event.sender.id}' + if ( + hasattr(self.ap, 'human_takeover_service') + and self.ap.human_takeover_service + and self.ap.human_takeover_service.is_taken_over(person_session_id) + ): + # Session is taken over: record message to monitoring then stop + await self.logger.info( + f'Person message intercepted by human takeover for session {person_session_id}' + ) + try: + if hasattr(event.message_chain, 'model_dump'): + msg_content = json.dumps(event.message_chain.model_dump(), ensure_ascii=False) + else: + msg_content = str(event.message_chain) + + sender_name = None + if hasattr(event, 'sender') and hasattr(event.sender, 'nickname'): + sender_name = event.sender.nickname + + await self.ap.monitoring_service.record_message( + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=msg_content, + session_id=person_session_id, + status='success', + level='info', + platform=adapter.__class__.__name__, + user_id=str(event.sender.id), + user_name=sender_name, + role='user', + ) + + await self.ap.monitoring_service.update_session_activity(person_session_id) + except Exception as e: + await self.logger.error(f'Failed to record takeover message: {e}') + return + launcher_id = event.sender.id if hasattr(adapter, 'get_launcher_id'): @@ -281,6 +322,50 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + # Check if session is under human takeover + group_session_id = f'group_{event.group.id}' + if ( + hasattr(self.ap, 'human_takeover_service') + and self.ap.human_takeover_service + and self.ap.human_takeover_service.is_taken_over(group_session_id) + ): + # Session is taken over: record message to monitoring then stop + await self.logger.info( + f'Group message intercepted by human takeover for session {group_session_id}' + ) + try: + if hasattr(event.message_chain, 'model_dump'): + msg_content = json.dumps(event.message_chain.model_dump(), ensure_ascii=False) + else: + msg_content = str(event.message_chain) + + sender_name = None + if hasattr(event, 'sender'): + if hasattr(event.sender, 'member_name'): + sender_name = event.sender.member_name + elif hasattr(event.sender, 'nickname'): + sender_name = event.sender.nickname + + await self.ap.monitoring_service.record_message( + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=msg_content, + session_id=group_session_id, + status='success', + level='info', + platform=adapter.__class__.__name__, + user_id=str(event.sender.id), + user_name=sender_name, + role='user', + ) + + await self.ap.monitoring_service.update_session_activity(group_session_id) + except Exception as e: + await self.logger.error(f'Failed to record takeover message: {e}') + return + launcher_id = event.group.id if hasattr(adapter, 'get_launcher_id'): diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index 4fad9069..f97255ab 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 25 +required_database_version = 26 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx index fbaf6fb3..231caeef 100644 --- a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx +++ b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx @@ -10,7 +10,7 @@ import { useTranslation } from 'react-i18next'; import { httpClient } from '@/app/infra/http/HttpClient'; import { ScrollArea } from '@/components/ui/scroll-area'; import { cn } from '@/lib/utils'; -import { Ban, Bot, Copy, Check, Workflow } from 'lucide-react'; +import { Ban, Bot, Copy, Check, Workflow, UserCheck, Send } from 'lucide-react'; import { MessageChainComponent, Plain, @@ -77,6 +77,16 @@ const BotSessionMonitor = forwardRef< const [copiedUserId, setCopiedUserId] = useState(false); const messagesContainerRef = useRef(null); + // Human takeover state + const [isTakenOver, setIsTakenOver] = useState(false); + const [takeoverLoading, setTakeoverLoading] = useState(false); + const [operatorMessage, setOperatorMessage] = useState(''); + const [sendingMessage, setSendingMessage] = useState(false); + // Track which sessions are taken over for showing badges in the list + const [takenOverSessions, setTakenOverSessions] = useState>( + new Set(), + ); + const parseSessionType = (sessionId: string): string | null => { const idx = sessionId.indexOf('_'); if (idx === -1) return null; @@ -109,6 +119,24 @@ const BotSessionMonitor = forwardRef< } }, [botId]); + // Load active takeover sessions to know which ones show a badge + const loadTakeoverStatus = useCallback(async () => { + try { + const response = await httpClient.getHumanTakeoverSessions({ + botUuid: botId, + }); + const activeIds = new Set(); + for (const session of response.sessions ?? []) { + if (session.status === 'active') { + activeIds.add(session.session_id); + } + } + setTakenOverSessions(activeIds); + } catch { + // Silently ignore — takeover feature may not be available + } + }, [botId]); + useImperativeHandle( ref, () => ({ @@ -133,17 +161,45 @@ const BotSessionMonitor = forwardRef< } }, []); + // Check takeover status for selected session + const checkTakeoverStatus = useCallback( + async (sessionId: string) => { + try { + const response = + await httpClient.getHumanTakeoverSessionDetail(sessionId); + const isActive = + response.found && response.session?.status === 'active'; + setIsTakenOver(isActive); + } catch { + setIsTakenOver(false); + } + }, + [], + ); + useEffect(() => { loadSessions(); - }, [loadSessions]); + loadTakeoverStatus(); + }, [loadSessions, loadTakeoverStatus]); useEffect(() => { if (selectedSessionId) { loadMessages(selectedSessionId); + checkTakeoverStatus(selectedSessionId); } else { setMessages([]); + setIsTakenOver(false); } - }, [selectedSessionId, loadMessages]); + }, [selectedSessionId, loadMessages, checkTakeoverStatus]); + + // Auto-refresh messages when session is taken over (polling) + useEffect(() => { + if (!selectedSessionId || !isTakenOver) return; + const interval = setInterval(() => { + loadMessages(selectedSessionId); + }, 3000); + return () => clearInterval(interval); + }, [selectedSessionId, isTakenOver, loadMessages]); useEffect(() => { if (messages.length === 0) return; @@ -160,6 +216,76 @@ const BotSessionMonitor = forwardRef< }); }, [messages]); + const handleTakeover = async () => { + if (!selectedSessionId || !selectedSession) return; + if (!confirm(t('bots.sessionMonitor.takeoverConfirm'))) return; + + setTakeoverLoading(true); + try { + await httpClient.takeoverSession(selectedSessionId, { + bot_uuid: botId, + platform: selectedSession.platform ?? undefined, + user_id: selectedSession.user_id ?? undefined, + user_name: selectedSession.user_name ?? undefined, + }); + setIsTakenOver(true); + setTakenOverSessions((prev) => new Set(prev).add(selectedSessionId)); + } catch (error) { + console.error('Takeover failed:', error); + alert(t('bots.sessionMonitor.takeoverFailed')); + } finally { + setTakeoverLoading(false); + } + }; + + const handleRelease = async () => { + if (!selectedSessionId) return; + if (!confirm(t('bots.sessionMonitor.releaseConfirm'))) return; + + setTakeoverLoading(true); + try { + await httpClient.releaseSession(selectedSessionId); + setIsTakenOver(false); + setTakenOverSessions((prev) => { + const next = new Set(prev); + next.delete(selectedSessionId); + return next; + }); + } catch (error) { + console.error('Release failed:', error); + alert(t('bots.sessionMonitor.releaseFailed')); + } finally { + setTakeoverLoading(false); + } + }; + + const handleSendMessage = async () => { + if (!selectedSessionId || !operatorMessage.trim()) return; + + setSendingMessage(true); + try { + await httpClient.sendTakeoverMessage( + selectedSessionId, + operatorMessage.trim(), + ); + setOperatorMessage(''); + // Reload messages to show the sent one + await loadMessages(selectedSessionId); + } catch (error) { + console.error('Send message failed:', error); + alert(t('bots.sessionMonitor.sendFailed')); + } finally { + setSendingMessage(false); + } + }; + + const handleMessageKeyDown = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + handleSendMessage(); + } + }; + const parseMessageChain = (content: string): MessageChainComponent[] => { try { const parsed = JSON.parse(content); @@ -173,11 +299,16 @@ const BotSessionMonitor = forwardRef< }; const isUserMessage = (msg: SessionMessage): boolean => { + if (msg.role === 'operator') return false; if (msg.role === 'assistant') return false; if (msg.role === 'user') return true; return !msg.runner_name; }; + const isOperatorMessage = (msg: SessionMessage): boolean => { + return msg.role === 'operator'; + }; + const renderMessageComponent = ( component: MessageChainComponent, index: number, @@ -243,7 +374,7 @@ const BotSessionMonitor = forwardRef< key={index} className="inline-flex items-center gap-1 text-muted-foreground text-xs" > - 🎙 [Voice] + [Voice] ); } @@ -277,7 +408,7 @@ const BotSessionMonitor = forwardRef< const file = component as MessageChainComponent & { name?: string }; return ( - 📎 {file.name || 'File'} + [{file.name || 'File'}] ); } @@ -337,6 +468,22 @@ const BotSessionMonitor = forwardRef< (s) => s.session_id === selectedSessionId, ); + const getMessageRoleLabel = (msg: SessionMessage): string => { + if (isOperatorMessage(msg)) { + return t('bots.sessionMonitor.operatorMessage', { + defaultValue: 'Operator', + }); + } + if (isUserMessage(msg)) { + return t('bots.sessionMonitor.userMessage', { + defaultValue: 'User', + }); + } + return t('bots.sessionMonitor.botMessage', { + defaultValue: 'Assistant', + }); + }; + return (
{/* Left Panel: Session List */} @@ -355,6 +502,9 @@ const BotSessionMonitor = forwardRef<
{sessions.map((session) => { const isSelected = selectedSessionId === session.session_id; + const sessionTakenOver = takenOverSessions.has( + session.session_id, + ); return ( + + )} + {isTakenOver ? ( + <> + · + + + {t('bots.sessionMonitor.takenOver', { + defaultValue: 'Taken Over', + })} + + + ) : ( + selectedSession?.is_active && ( + <> + · + + + Active + + + ) + )} +
-
- {parseSessionType(selectedSessionId) && ( - {parseSessionType(selectedSessionId)} - )} - {selectedSession?.platform && ( - <> - {parseSessionType(selectedSessionId) && ·} - {selectedSession.platform} - - )} - {selectedSession?.user_id && ( - <> - · - - {selectedSession.user_id} - - - - )} - {selectedSession?.is_active && ( - <> - · - - - Active - - + {/* Takeover / Release button */} +
+ {isTakenOver ? ( + + ) : ( + )}
@@ -481,6 +678,7 @@ const BotSessionMonitor = forwardRef< ) : ( messages.map((msg) => { const isUser = isUserMessage(msg); + const isOperator = isOperatorMessage(msg); const isDiscarded = msg.status === 'discarded' || msg.pipeline_id === PIPELINE_DISCARD; @@ -497,7 +695,9 @@ const BotSessionMonitor = forwardRef< 'max-w-3xl px-4 py-2.5 rounded-2xl text-sm', isUser ? 'bg-primary/10 rounded-br-sm' - : 'bg-muted rounded-bl-sm', + : isOperator + ? 'bg-orange-100/80 dark:bg-orange-900/30 rounded-bl-sm' + : 'bg-muted rounded-bl-sm', msg.status === 'error' && 'ring-1 ring-red-400/50', isDiscarded && 'opacity-60', )} @@ -509,14 +709,13 @@ const BotSessionMonitor = forwardRef< 'text-[11px] mt-1.5 flex items-center gap-1.5 text-muted-foreground', )} > - - {isUser - ? t('bots.sessionMonitor.userMessage', { - defaultValue: 'User', - }) - : t('bots.sessionMonitor.botMessage', { - defaultValue: 'Assistant', - })} + + {getMessageRoleLabel(msg)} {formatTime(msg.timestamp)} @@ -528,12 +727,21 @@ const BotSessionMonitor = forwardRef< defaultValue: 'Discarded', })} - ) : msg.pipeline_name ? ( + ) : msg.pipeline_name && + msg.pipeline_name !== 'Human Takeover' ? ( {msg.pipeline_name} ) : null} + {isOperator && ( + + + {t('bots.sessionMonitor.humanTakeover', { + defaultValue: 'Human Takeover', + })} + + )} {msg.status === 'error' && ( error )} @@ -551,6 +759,33 @@ const BotSessionMonitor = forwardRef< )} + + {/* Operator Message Input (only shown when session is taken over) */} + {isTakenOver && ( +
+
+ setOperatorMessage(e.target.value)} + onKeyDown={handleMessageKeyDown} + placeholder={t('bots.sessionMonitor.sendMessage', { + defaultValue: 'Send message as operator...', + })} + disabled={sendingMessage} + className="flex-1 h-9 px-3 rounded-md border bg-background text-sm placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-ring disabled:opacity-50" + /> + +
+
+ )} )} diff --git a/web/src/app/infra/http/BackendClient.ts b/web/src/app/infra/http/BackendClient.ts index ecc0cce3..65c5c28a 100644 --- a/web/src/app/infra/http/BackendClient.ts +++ b/web/src/app/infra/http/BackendClient.ts @@ -1036,6 +1036,92 @@ export class BackendClient extends BaseHttpClient { return this.get(`/api/v1/monitoring/overview?${queryParams.toString()}`); } + // ============ Human Takeover API ============ + + public getHumanTakeoverSessions(params: { + botUuid?: string; + limit?: number; + offset?: number; + }): Promise<{ + sessions: Array<{ + id: string; + session_id: string; + bot_uuid: string; + status: string; + taken_by: string | null; + taken_at: string; + released_at: string | null; + platform: string | null; + user_id: string | null; + user_name: string | null; + }>; + total: number; + limit: number; + offset: number; + }> { + const queryParams = new URLSearchParams(); + if (params.botUuid) queryParams.append('botUuid', params.botUuid); + if (params.limit) queryParams.append('limit', params.limit.toString()); + if (params.offset) queryParams.append('offset', params.offset.toString()); + return this.get( + `/api/v1/human-takeover/sessions?${queryParams.toString()}`, + ); + } + + public getHumanTakeoverSessionDetail(sessionId: string): Promise<{ + found: boolean; + session_id?: string; + session?: { + id: string; + session_id: string; + bot_uuid: string; + status: string; + taken_by: string | null; + taken_at: string; + released_at: string | null; + platform: string | null; + user_id: string | null; + user_name: string | null; + }; + }> { + return this.get(`/api/v1/human-takeover/sessions/${sessionId}`); + } + + public takeoverSession( + sessionId: string, + params: { + bot_uuid: string; + platform?: string; + user_id?: string; + user_name?: string; + }, + ): Promise { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/takeover`, + params, + ); + } + + public releaseSession(sessionId: string): Promise { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/release`, + {}, + ); + } + + public sendTakeoverMessage( + sessionId: string, + message: string, + ): Promise<{ + session_id: string; + message_sent: boolean; + }> { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/message`, + { message }, + ); + } + // ============ Survey API ============ public getSurveyPending(): Promise<{ survey: { diff --git a/web/src/i18n/locales/en-US.ts b/web/src/i18n/locales/en-US.ts index 192165f2..69191a71 100644 --- a/web/src/i18n/locales/en-US.ts +++ b/web/src/i18n/locales/en-US.ts @@ -391,6 +391,20 @@ const enUS = { discarded: 'Discarded', userMessage: 'User', botMessage: 'Assistant', + operatorMessage: 'Operator', + humanTakeover: 'Human Takeover', + takeoverBtn: 'Take Over', + releaseBtn: 'Release', + takeoverConfirm: 'Take over this session? The AI bot will stop responding until released.', + releaseConfirm: 'Release this session? The AI bot will resume responding.', + takeoverSuccess: 'Session taken over successfully', + releaseSuccess: 'Session released successfully', + takeoverFailed: 'Failed to take over session', + releaseFailed: 'Failed to release session', + sendMessage: 'Send message as operator...', + sendBtn: 'Send', + sendFailed: 'Failed to send message', + takenOver: 'Taken Over', }, }, plugins: { diff --git a/web/src/i18n/locales/es-ES.ts b/web/src/i18n/locales/es-ES.ts index 4a296276..25d7bc67 100644 --- a/web/src/i18n/locales/es-ES.ts +++ b/web/src/i18n/locales/es-ES.ts @@ -401,6 +401,20 @@ const esES = { discarded: 'Descartado', userMessage: 'Usuario', botMessage: 'Asistente', + operatorMessage: 'Operador', + humanTakeover: 'Toma de control humana', + takeoverBtn: 'Tomar control', + releaseBtn: 'Liberar', + takeoverConfirm: '¿Tomar control de esta sesión? El bot de IA dejará de responder hasta que se libere.', + releaseConfirm: '¿Liberar esta sesión? El bot de IA reanudará las respuestas.', + takeoverSuccess: 'Sesión tomada exitosamente', + releaseSuccess: 'Sesión liberada exitosamente', + takeoverFailed: 'Error al tomar control de la sesión', + releaseFailed: 'Error al liberar la sesión', + sendMessage: 'Enviar mensaje como operador...', + sendBtn: 'Enviar', + sendFailed: 'Error al enviar el mensaje', + takenOver: 'Tomada', }, }, plugins: { diff --git a/web/src/i18n/locales/ja-JP.ts b/web/src/i18n/locales/ja-JP.ts index 7153379b..de78f68d 100644 --- a/web/src/i18n/locales/ja-JP.ts +++ b/web/src/i18n/locales/ja-JP.ts @@ -392,6 +392,20 @@ discarded: '破棄済み', userMessage: 'ユーザー', botMessage: 'アシスタント', + operatorMessage: 'オペレーター', + humanTakeover: '有人対応', + takeoverBtn: '引き継ぐ', + releaseBtn: '解除', + takeoverConfirm: 'このセッションを引き継ぎますか?解除するまでAIボットは応答を停止します。', + releaseConfirm: 'このセッションを解除しますか?AIボットが応答を再開します。', + takeoverSuccess: 'セッションの引き継ぎに成功しました', + releaseSuccess: 'セッションの解除に成功しました', + takeoverFailed: 'セッションの引き継ぎに失敗しました', + releaseFailed: 'セッションの解除に失敗しました', + sendMessage: 'オペレーターとしてメッセージを送信...', + sendBtn: '送信', + sendFailed: 'メッセージの送信に失敗しました', + takenOver: '引き継ぎ中', }, }, plugins: { diff --git a/web/src/i18n/locales/th-TH.ts b/web/src/i18n/locales/th-TH.ts index 361facab..71d6cccd 100644 --- a/web/src/i18n/locales/th-TH.ts +++ b/web/src/i18n/locales/th-TH.ts @@ -386,6 +386,20 @@ const thTH = { discarded: 'ถูกละทิ้ง', userMessage: 'ผู้ใช้', botMessage: 'ผู้ช่วย', + operatorMessage: 'เจ้าหน้าที่', + humanTakeover: 'เจ้าหน้าที่รับช่วง', + takeoverBtn: 'รับช่วง', + releaseBtn: 'ปล่อย', + takeoverConfirm: 'รับช่วงเซสชันนี้หรือไม่? บอท AI จะหยุดตอบจนกว่าจะปล่อย', + releaseConfirm: 'ปล่อยเซสชันนี้หรือไม่? บอท AI จะกลับมาตอบอีกครั้ง', + takeoverSuccess: 'รับช่วงเซสชันสำเร็จ', + releaseSuccess: 'ปล่อยเซสชันสำเร็จ', + takeoverFailed: 'รับช่วงเซสชันล้มเหลว', + releaseFailed: 'ปล่อยเซสชันล้มเหลว', + sendMessage: 'ส่งข้อความในฐานะเจ้าหน้าที่...', + sendBtn: 'ส่ง', + sendFailed: 'ส่งข้อความล้มเหลว', + takenOver: 'ถูกรับช่วงแล้ว', }, }, plugins: { diff --git a/web/src/i18n/locales/vi-VN.ts b/web/src/i18n/locales/vi-VN.ts index 22bff3e4..d330ab52 100644 --- a/web/src/i18n/locales/vi-VN.ts +++ b/web/src/i18n/locales/vi-VN.ts @@ -395,6 +395,20 @@ const viVN = { discarded: 'Đã loại bỏ', userMessage: 'Người dùng', botMessage: 'Trợ lý', + operatorMessage: 'Nhân viên', + humanTakeover: 'Tiếp nhận thủ công', + takeoverBtn: 'Tiếp nhận', + releaseBtn: 'Giải phóng', + takeoverConfirm: 'Tiếp nhận phiên này? Bot AI sẽ ngừng phản hồi cho đến khi được giải phóng.', + releaseConfirm: 'Giải phóng phiên này? Bot AI sẽ tiếp tục phản hồi.', + takeoverSuccess: 'Tiếp nhận phiên thành công', + releaseSuccess: 'Giải phóng phiên thành công', + takeoverFailed: 'Tiếp nhận phiên thất bại', + releaseFailed: 'Giải phóng phiên thất bại', + sendMessage: 'Gửi tin nhắn với tư cách nhân viên...', + sendBtn: 'Gửi', + sendFailed: 'Gửi tin nhắn thất bại', + takenOver: 'Đã tiếp nhận', }, }, plugins: { diff --git a/web/src/i18n/locales/zh-Hans.ts b/web/src/i18n/locales/zh-Hans.ts index 7822bcfa..2950b232 100644 --- a/web/src/i18n/locales/zh-Hans.ts +++ b/web/src/i18n/locales/zh-Hans.ts @@ -376,6 +376,20 @@ const zhHans = { discarded: '已丢弃', userMessage: '用户', botMessage: '助手', + operatorMessage: '人工客服', + humanTakeover: '人工接管', + takeoverBtn: '接管', + releaseBtn: '释放', + takeoverConfirm: '确定接管此会话?AI 机器人将停止回复,直到释放。', + releaseConfirm: '确定释放此会话?AI 机器人将恢复回复。', + takeoverSuccess: '会话接管成功', + releaseSuccess: '会话释放成功', + takeoverFailed: '会话接管失败', + releaseFailed: '会话释放失败', + sendMessage: '以人工客服身份发送消息...', + sendBtn: '发送', + sendFailed: '消息发送失败', + takenOver: '已接管', }, }, plugins: { diff --git a/web/src/i18n/locales/zh-Hant.ts b/web/src/i18n/locales/zh-Hant.ts index e100dbd1..87ba5939 100644 --- a/web/src/i18n/locales/zh-Hant.ts +++ b/web/src/i18n/locales/zh-Hant.ts @@ -371,6 +371,20 @@ const zhHant = { discarded: '已丟棄', userMessage: '使用者', botMessage: '助手', + operatorMessage: '人工客服', + humanTakeover: '人工接管', + takeoverBtn: '接管', + releaseBtn: '釋放', + takeoverConfirm: '確定接管此會話?AI 機器人將停止回覆,直到釋放。', + releaseConfirm: '確定釋放此會話?AI 機器人將恢復回覆。', + takeoverSuccess: '會話接管成功', + releaseSuccess: '會話釋放成功', + takeoverFailed: '會話接管失敗', + releaseFailed: '會話釋放失敗', + sendMessage: '以人工客服身份發送訊息...', + sendBtn: '發送', + sendFailed: '訊息發送失敗', + takenOver: '已接管', }, }, plugins: {