diff --git a/src/langbot/pkg/api/http/controller/groups/human_takeover.py b/src/langbot/pkg/api/http/controller/groups/human_takeover.py new file mode 100644 index 00000000..32e23f6b --- /dev/null +++ b/src/langbot/pkg/api/http/controller/groups/human_takeover.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import quart + +from .. import group + + +@group.group_class('human-takeover', '/api/v1/human-takeover') +class HumanTakeoverRouterGroup(group.RouterGroup): + async def initialize(self) -> None: + @self.route('/sessions', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_sessions(): + """Get list of takeover sessions, optionally filtered by bot UUID.""" + bot_uuid = quart.request.args.get('botUuid') + limit = int(quart.request.args.get('limit', 100)) + offset = int(quart.request.args.get('offset', 0)) + + sessions, total = await self.ap.human_takeover_service.get_active_sessions( + bot_uuid=bot_uuid if bot_uuid else None, + limit=limit, + offset=offset, + ) + + return self.success( + data={ + 'sessions': sessions, + 'total': total, + 'limit': limit, + 'offset': offset, + } + ) + + @self.route('/sessions/', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def get_session_detail(session_id: str): + """Get detail for a specific takeover session.""" + detail = await self.ap.human_takeover_service.get_session_detail(session_id) + if not detail: + return self.success(data={'found': False, 'session_id': session_id}) + return self.success(data={'found': True, 'session': detail}) + + @self.route('/sessions//takeover', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def takeover_session(session_id: str, user_email: str = None): + """Take over a conversation session.""" + data = await quart.request.get_json(silent=True) or {} + + bot_uuid = data.get('bot_uuid') + if not bot_uuid: + return self.fail(-1, 'bot_uuid is required') + + platform = data.get('platform') + user_id = data.get('user_id') + user_name = data.get('user_name') + + try: + result = await self.ap.human_takeover_service.takeover_session( + session_id=session_id, + bot_uuid=bot_uuid, + taken_by=user_email or data.get('taken_by'), + platform=platform, + user_id=user_id, + user_name=user_name, + ) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + + @self.route('/sessions//release', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def release_session(session_id: str): + """Release a taken-over session back to AI pipeline.""" + try: + result = await self.ap.human_takeover_service.release_session(session_id) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + + @self.route('/sessions//message', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) + async def send_message(session_id: str, user_email: str = None): + """Send a message from the operator to the user.""" + data = await quart.request.get_json(silent=True) or {} + + message_text = data.get('message') + if not message_text: + return self.fail(-1, 'message is required') + + operator_name = user_email or data.get('operator_name', 'Operator') + + try: + result = await self.ap.human_takeover_service.send_message( + session_id=session_id, + message_text=message_text, + operator_name=operator_name, + ) + return self.success(data=result) + except ValueError as e: + return self.fail(-1, str(e)) + except RuntimeError as e: + return self.fail(-2, str(e)) diff --git a/src/langbot/pkg/api/http/service/human_takeover.py b/src/langbot/pkg/api/http/service/human_takeover.py new file mode 100644 index 00000000..e9dd652a --- /dev/null +++ b/src/langbot/pkg/api/http/service/human_takeover.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import uuid +import datetime +import json +import logging + +import sqlalchemy + +from ....core import app +from ....entity.persistence import human_takeover as persistence_human_takeover + +import langbot_plugin.api.entities.builtin.platform.message as platform_message + + +class HumanTakeoverService: + """Human takeover service. + + Manages operator takeover of user conversation sessions, bypassing + the normal AI pipeline. Uses an in-memory cache for fast synchronous + lookups on the hot message path, backed by database persistence. + """ + + ap: app.Application + + # In-memory cache: session_id -> HumanTakeoverSession record id + # Only contains sessions with status='active' + _active_sessions: dict[str, str] + + logger: logging.Logger + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + self._active_sessions = {} + self.logger = logging.getLogger('human-takeover') + + async def initialize(self) -> None: + """Load active takeover sessions from DB into memory cache.""" + try: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).where( + persistence_human_takeover.HumanTakeoverSession.status == 'active' + ) + ) + rows = result.all() + for row in rows: + session = row[0] if isinstance(row, tuple) else row + self._active_sessions[session.session_id] = session.id + self.logger.info(f'Loaded {len(self._active_sessions)} active takeover sessions from DB') + except Exception as e: + self.logger.warning(f'Failed to load active takeover sessions: {e}') + + def is_taken_over(self, session_id: str) -> bool: + """Check if a session is currently under human takeover. + + This is a synchronous in-memory lookup for performance, since it + is called on every incoming message (hot path). + """ + return session_id in self._active_sessions + + async def takeover_session( + self, + session_id: str, + bot_uuid: str, + taken_by: str | None = None, + platform: str | None = None, + user_id: str | None = None, + user_name: str | None = None, + ) -> dict: + """Take over a conversation session. + + Args: + session_id: The session to take over (e.g. 'person_123' or 'group_456'). + bot_uuid: UUID of the bot whose session is being taken over. + taken_by: Email/username of the admin performing the takeover. + platform: Platform name. + user_id: The end-user's ID in the session. + user_name: The end-user's display name. + + Returns: + Dict with the created takeover session record. + + Raises: + ValueError: If the session is already taken over. + """ + if self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is already taken over') + + record_id = str(uuid.uuid4()) + now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + record_data = { + 'id': record_id, + 'session_id': session_id, + 'bot_uuid': bot_uuid, + 'status': 'active', + 'taken_by': taken_by, + 'taken_at': now, + 'released_at': None, + 'platform': platform, + 'user_id': user_id, + 'user_name': user_name, + } + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_human_takeover.HumanTakeoverSession).values(record_data) + ) + + # Update in-memory cache + self._active_sessions[session_id] = record_id + + self.logger.info(f'Session {session_id} taken over by {taken_by}') + + return record_data + + async def release_session(self, session_id: str) -> dict: + """Release a taken-over session back to AI pipeline processing. + + Args: + session_id: The session to release. + + Returns: + Dict with the updated takeover session record. + + Raises: + ValueError: If the session is not currently taken over. + """ + if not self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is not currently taken over') + + now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.update(persistence_human_takeover.HumanTakeoverSession) + .where( + sqlalchemy.and_( + persistence_human_takeover.HumanTakeoverSession.session_id == session_id, + persistence_human_takeover.HumanTakeoverSession.status == 'active', + ) + ) + .values(status='released', released_at=now) + ) + + # Remove from in-memory cache + self._active_sessions.pop(session_id, None) + + self.logger.info(f'Session {session_id} released back to AI pipeline') + + return { + 'session_id': session_id, + 'status': 'released', + 'released_at': now.isoformat(), + } + + async def send_message( + self, + session_id: str, + message_text: str, + operator_name: str | None = None, + ) -> dict: + """Send a message from the operator to the user via the platform adapter. + + Args: + session_id: The taken-over session ID (e.g. 'person_123' or 'group_456'). + message_text: The text message to send. + operator_name: Name of the operator sending the message. + + Returns: + Dict with send result info. + + Raises: + ValueError: If the session is not currently taken over. + RuntimeError: If the bot or adapter cannot be found. + """ + if not self.is_taken_over(session_id): + raise ValueError(f'Session {session_id} is not currently taken over') + + # Look up the takeover record to get bot_uuid + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).where( + sqlalchemy.and_( + persistence_human_takeover.HumanTakeoverSession.session_id == session_id, + persistence_human_takeover.HumanTakeoverSession.status == 'active', + ) + ) + ) + row = result.first() + if not row: + raise RuntimeError(f'Active takeover record not found for session {session_id}') + + takeover_record = row[0] if isinstance(row, tuple) else row + bot_uuid = takeover_record.bot_uuid + + # Get the runtime bot + runtime_bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid) + if not runtime_bot: + raise RuntimeError(f'Bot {bot_uuid} not found or not running') + + # Parse session_id to determine target_type and target_id + # Format: 'person_{id}' or 'group_{id}' + if session_id.startswith('person_'): + target_type = 'person' + target_id = session_id[len('person_') :] + elif session_id.startswith('group_'): + target_type = 'group' + target_id = session_id[len('group_') :] + else: + raise ValueError(f'Invalid session_id format: {session_id}') + + # Build message chain + message_chain = platform_message.MessageChain([platform_message.Plain(text=message_text)]) + + # Send via adapter + await runtime_bot.adapter.send_message(target_type, target_id, message_chain) + + # Record the operator message in monitoring + bot_name = runtime_bot.bot_entity.name or bot_uuid + try: + message_content = json.dumps(message_chain.model_dump(), ensure_ascii=False) + except Exception: + message_content = message_text + + await self.ap.monitoring_service.record_message( + bot_id=bot_uuid, + bot_name=bot_name, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=message_content, + session_id=session_id, + status='success', + level='info', + platform=takeover_record.platform, + user_id=operator_name or 'operator', + user_name=operator_name or 'Operator', + role='operator', + ) + + self.logger.info(f'Operator message sent to session {session_id}: {message_text[:50]}...') + + return { + 'session_id': session_id, + 'message_sent': True, + } + + async def get_active_sessions( + self, + bot_uuid: str | None = None, + limit: int = 100, + offset: int = 0, + ) -> tuple[list[dict], int]: + """Get list of active (or all) takeover sessions. + + Args: + bot_uuid: Optional filter by bot UUID. + limit: Maximum number of results. + offset: Pagination offset. + + Returns: + Tuple of (list of session dicts, total count). + """ + conditions = [] + + if bot_uuid: + conditions.append(persistence_human_takeover.HumanTakeoverSession.bot_uuid == bot_uuid) + + # Count + count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_human_takeover.HumanTakeoverSession.id)) + if conditions: + count_query = count_query.where(sqlalchemy.and_(*conditions)) + + count_result = await self.ap.persistence_mgr.execute_async(count_query) + total = count_result.scalar() or 0 + + # Fetch records + query = sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession).order_by( + persistence_human_takeover.HumanTakeoverSession.taken_at.desc() + ) + if conditions: + query = query.where(sqlalchemy.and_(*conditions)) + + query = query.limit(limit).offset(offset) + + result = await self.ap.persistence_mgr.execute_async(query) + rows = result.all() + + sessions = [] + for row in rows: + session = row[0] if isinstance(row, tuple) else row + sessions.append( + self.ap.persistence_mgr.serialize_model(persistence_human_takeover.HumanTakeoverSession, session) + ) + + return sessions, total + + async def get_session_detail(self, session_id: str) -> dict | None: + """Get detail for a specific takeover session. + + Args: + session_id: The session ID to look up. + + Returns: + Session dict or None if not found. + """ + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_human_takeover.HumanTakeoverSession) + .where(persistence_human_takeover.HumanTakeoverSession.session_id == session_id) + .order_by(persistence_human_takeover.HumanTakeoverSession.taken_at.desc()) + ) + row = result.first() + if not row: + return None + + session = row[0] if isinstance(row, tuple) else row + return self.ap.persistence_mgr.serialize_model(persistence_human_takeover.HumanTakeoverSession, session) diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index e515cfb9..6912a1c8 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -31,6 +31,7 @@ from ..api.http.service import mcp as mcp_service from ..api.http.service import apikey as apikey_service from ..api.http.service import webhook as webhook_service from ..api.http.service import monitoring as monitoring_service +from ..api.http.service import human_takeover as human_takeover_service from ..discover import engine as discover_engine from ..storage import mgr as storagemgr @@ -153,6 +154,8 @@ class Application: monitoring_service: monitoring_service.MonitoringService = None + human_takeover_service: human_takeover_service.HumanTakeoverService = None + def __init__(self): pass diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 62f0ae7b..74193d31 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -28,6 +28,7 @@ from ...api.http.service import mcp as mcp_service from ...api.http.service import apikey as apikey_service from ...api.http.service import webhook as webhook_service from ...api.http.service import monitoring as monitoring_service +from ...api.http.service import human_takeover as human_takeover_service from ...discover import engine as discover_engine from ...storage import mgr as storagemgr from ...utils import logcache @@ -164,6 +165,10 @@ class BuildAppStage(stage.BootingStage): monitoring_service_inst = monitoring_service.MonitoringService(ap) ap.monitoring_service = monitoring_service_inst + human_takeover_service_inst = human_takeover_service.HumanTakeoverService(ap) + await human_takeover_service_inst.initialize() + ap.human_takeover_service = human_takeover_service_inst + async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None: await asyncio.sleep(3) await plugin_connector_inst.initialize() diff --git a/src/langbot/pkg/entity/persistence/human_takeover.py b/src/langbot/pkg/entity/persistence/human_takeover.py new file mode 100644 index 00000000..e1857a84 --- /dev/null +++ b/src/langbot/pkg/entity/persistence/human_takeover.py @@ -0,0 +1,36 @@ +import sqlalchemy + +from .base import Base + + +class HumanTakeoverSession(Base): + """Human takeover session records. + + Tracks which conversation sessions are currently under human operator control, + bypassing the normal AI pipeline processing. + """ + + __tablename__ = 'human_takeover_sessions' + + id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, unique=True, index=True) + """Corresponds to monitoring_sessions.session_id, format: 'person_{id}' or 'group_{id}'""" + + bot_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) + """UUID of the bot whose session is being taken over""" + + status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, default='active', index=True) + """Takeover status: 'active' or 'released'""" + + taken_by = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Email/username of the admin who took over the session""" + + taken_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) + """Timestamp when the takeover started""" + + released_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True) + """Timestamp when the takeover was released (null if still active)""" + + platform = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + user_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) diff --git a/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py b/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py new file mode 100644 index 00000000..699e50d2 --- /dev/null +++ b/src/langbot/pkg/persistence/migrations/dbm026_human_takeover_sessions.py @@ -0,0 +1,36 @@ +import sqlalchemy +from .. import migration + + +@migration.migration_class(26) +class DBMigrateHumanTakeoverSessions(migration.DBMigration): + """Create human_takeover_sessions table for human operator takeover support""" + + async def upgrade(self): + sql_text = sqlalchemy.text(""" + CREATE TABLE IF NOT EXISTS human_takeover_sessions ( + id VARCHAR(255) PRIMARY KEY, + session_id VARCHAR(255) NOT NULL UNIQUE, + bot_uuid VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL DEFAULT 'active', + taken_by VARCHAR(255), + taken_at DATETIME NOT NULL, + released_at DATETIME, + platform VARCHAR(255), + user_id VARCHAR(255), + user_name VARCHAR(255) + ) + """) + await self.ap.persistence_mgr.execute_async(sql_text) + + # Create indexes + for idx_sql in [ + 'CREATE INDEX IF NOT EXISTS idx_hts_session_id ON human_takeover_sessions (session_id)', + 'CREATE INDEX IF NOT EXISTS idx_hts_bot_uuid ON human_takeover_sessions (bot_uuid)', + 'CREATE INDEX IF NOT EXISTS idx_hts_status ON human_takeover_sessions (status)', + ]: + await self.ap.persistence_mgr.execute_async(sqlalchemy.text(idx_sql)) + + async def downgrade(self): + sql_text = sqlalchemy.text('DROP TABLE IF EXISTS human_takeover_sessions') + await self.ap.persistence_mgr.execute_async(sql_text) diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 6ae0f4c2..93df0747 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -220,6 +220,47 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + # Check if session is under human takeover + person_session_id = f'person_{event.sender.id}' + if ( + hasattr(self.ap, 'human_takeover_service') + and self.ap.human_takeover_service + and self.ap.human_takeover_service.is_taken_over(person_session_id) + ): + # Session is taken over: record message to monitoring then stop + await self.logger.info( + f'Person message intercepted by human takeover for session {person_session_id}' + ) + try: + if hasattr(event.message_chain, 'model_dump'): + msg_content = json.dumps(event.message_chain.model_dump(), ensure_ascii=False) + else: + msg_content = str(event.message_chain) + + sender_name = None + if hasattr(event, 'sender') and hasattr(event.sender, 'nickname'): + sender_name = event.sender.nickname + + await self.ap.monitoring_service.record_message( + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=msg_content, + session_id=person_session_id, + status='success', + level='info', + platform=adapter.__class__.__name__, + user_id=str(event.sender.id), + user_name=sender_name, + role='user', + ) + + await self.ap.monitoring_service.update_session_activity(person_session_id) + except Exception as e: + await self.logger.error(f'Failed to record takeover message: {e}') + return + launcher_id = event.sender.id if hasattr(adapter, 'get_launcher_id'): @@ -281,6 +322,50 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + # Check if session is under human takeover + group_session_id = f'group_{event.group.id}' + if ( + hasattr(self.ap, 'human_takeover_service') + and self.ap.human_takeover_service + and self.ap.human_takeover_service.is_taken_over(group_session_id) + ): + # Session is taken over: record message to monitoring then stop + await self.logger.info( + f'Group message intercepted by human takeover for session {group_session_id}' + ) + try: + if hasattr(event.message_chain, 'model_dump'): + msg_content = json.dumps(event.message_chain.model_dump(), ensure_ascii=False) + else: + msg_content = str(event.message_chain) + + sender_name = None + if hasattr(event, 'sender'): + if hasattr(event.sender, 'member_name'): + sender_name = event.sender.member_name + elif hasattr(event.sender, 'nickname'): + sender_name = event.sender.nickname + + await self.ap.monitoring_service.record_message( + bot_id=self.bot_entity.uuid, + bot_name=self.bot_entity.name or self.bot_entity.uuid, + pipeline_id='__human_takeover__', + pipeline_name='Human Takeover', + message_content=msg_content, + session_id=group_session_id, + status='success', + level='info', + platform=adapter.__class__.__name__, + user_id=str(event.sender.id), + user_name=sender_name, + role='user', + ) + + await self.ap.monitoring_service.update_session_activity(group_session_id) + except Exception as e: + await self.logger.error(f'Failed to record takeover message: {e}') + return + launcher_id = event.group.id if hasattr(adapter, 'get_launcher_id'): diff --git a/src/langbot/pkg/utils/constants.py b/src/langbot/pkg/utils/constants.py index 4fad9069..f97255ab 100644 --- a/src/langbot/pkg/utils/constants.py +++ b/src/langbot/pkg/utils/constants.py @@ -2,7 +2,7 @@ import langbot semantic_version = f'v{langbot.__version__}' -required_database_version = 25 +required_database_version = 26 """Tag the version of the database schema, used to check if the database needs to be migrated""" debug_mode = False diff --git a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx index fbaf6fb3..231caeef 100644 --- a/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx +++ b/web/src/app/home/bots/components/bot-session/BotSessionMonitor.tsx @@ -10,7 +10,7 @@ import { useTranslation } from 'react-i18next'; import { httpClient } from '@/app/infra/http/HttpClient'; import { ScrollArea } from '@/components/ui/scroll-area'; import { cn } from '@/lib/utils'; -import { Ban, Bot, Copy, Check, Workflow } from 'lucide-react'; +import { Ban, Bot, Copy, Check, Workflow, UserCheck, Send } from 'lucide-react'; import { MessageChainComponent, Plain, @@ -77,6 +77,16 @@ const BotSessionMonitor = forwardRef< const [copiedUserId, setCopiedUserId] = useState(false); const messagesContainerRef = useRef(null); + // Human takeover state + const [isTakenOver, setIsTakenOver] = useState(false); + const [takeoverLoading, setTakeoverLoading] = useState(false); + const [operatorMessage, setOperatorMessage] = useState(''); + const [sendingMessage, setSendingMessage] = useState(false); + // Track which sessions are taken over for showing badges in the list + const [takenOverSessions, setTakenOverSessions] = useState>( + new Set(), + ); + const parseSessionType = (sessionId: string): string | null => { const idx = sessionId.indexOf('_'); if (idx === -1) return null; @@ -109,6 +119,24 @@ const BotSessionMonitor = forwardRef< } }, [botId]); + // Load active takeover sessions to know which ones show a badge + const loadTakeoverStatus = useCallback(async () => { + try { + const response = await httpClient.getHumanTakeoverSessions({ + botUuid: botId, + }); + const activeIds = new Set(); + for (const session of response.sessions ?? []) { + if (session.status === 'active') { + activeIds.add(session.session_id); + } + } + setTakenOverSessions(activeIds); + } catch { + // Silently ignore — takeover feature may not be available + } + }, [botId]); + useImperativeHandle( ref, () => ({ @@ -133,17 +161,45 @@ const BotSessionMonitor = forwardRef< } }, []); + // Check takeover status for selected session + const checkTakeoverStatus = useCallback( + async (sessionId: string) => { + try { + const response = + await httpClient.getHumanTakeoverSessionDetail(sessionId); + const isActive = + response.found && response.session?.status === 'active'; + setIsTakenOver(isActive); + } catch { + setIsTakenOver(false); + } + }, + [], + ); + useEffect(() => { loadSessions(); - }, [loadSessions]); + loadTakeoverStatus(); + }, [loadSessions, loadTakeoverStatus]); useEffect(() => { if (selectedSessionId) { loadMessages(selectedSessionId); + checkTakeoverStatus(selectedSessionId); } else { setMessages([]); + setIsTakenOver(false); } - }, [selectedSessionId, loadMessages]); + }, [selectedSessionId, loadMessages, checkTakeoverStatus]); + + // Auto-refresh messages when session is taken over (polling) + useEffect(() => { + if (!selectedSessionId || !isTakenOver) return; + const interval = setInterval(() => { + loadMessages(selectedSessionId); + }, 3000); + return () => clearInterval(interval); + }, [selectedSessionId, isTakenOver, loadMessages]); useEffect(() => { if (messages.length === 0) return; @@ -160,6 +216,76 @@ const BotSessionMonitor = forwardRef< }); }, [messages]); + const handleTakeover = async () => { + if (!selectedSessionId || !selectedSession) return; + if (!confirm(t('bots.sessionMonitor.takeoverConfirm'))) return; + + setTakeoverLoading(true); + try { + await httpClient.takeoverSession(selectedSessionId, { + bot_uuid: botId, + platform: selectedSession.platform ?? undefined, + user_id: selectedSession.user_id ?? undefined, + user_name: selectedSession.user_name ?? undefined, + }); + setIsTakenOver(true); + setTakenOverSessions((prev) => new Set(prev).add(selectedSessionId)); + } catch (error) { + console.error('Takeover failed:', error); + alert(t('bots.sessionMonitor.takeoverFailed')); + } finally { + setTakeoverLoading(false); + } + }; + + const handleRelease = async () => { + if (!selectedSessionId) return; + if (!confirm(t('bots.sessionMonitor.releaseConfirm'))) return; + + setTakeoverLoading(true); + try { + await httpClient.releaseSession(selectedSessionId); + setIsTakenOver(false); + setTakenOverSessions((prev) => { + const next = new Set(prev); + next.delete(selectedSessionId); + return next; + }); + } catch (error) { + console.error('Release failed:', error); + alert(t('bots.sessionMonitor.releaseFailed')); + } finally { + setTakeoverLoading(false); + } + }; + + const handleSendMessage = async () => { + if (!selectedSessionId || !operatorMessage.trim()) return; + + setSendingMessage(true); + try { + await httpClient.sendTakeoverMessage( + selectedSessionId, + operatorMessage.trim(), + ); + setOperatorMessage(''); + // Reload messages to show the sent one + await loadMessages(selectedSessionId); + } catch (error) { + console.error('Send message failed:', error); + alert(t('bots.sessionMonitor.sendFailed')); + } finally { + setSendingMessage(false); + } + }; + + const handleMessageKeyDown = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + handleSendMessage(); + } + }; + const parseMessageChain = (content: string): MessageChainComponent[] => { try { const parsed = JSON.parse(content); @@ -173,11 +299,16 @@ const BotSessionMonitor = forwardRef< }; const isUserMessage = (msg: SessionMessage): boolean => { + if (msg.role === 'operator') return false; if (msg.role === 'assistant') return false; if (msg.role === 'user') return true; return !msg.runner_name; }; + const isOperatorMessage = (msg: SessionMessage): boolean => { + return msg.role === 'operator'; + }; + const renderMessageComponent = ( component: MessageChainComponent, index: number, @@ -243,7 +374,7 @@ const BotSessionMonitor = forwardRef< key={index} className="inline-flex items-center gap-1 text-muted-foreground text-xs" > - 🎙 [Voice] + [Voice] ); } @@ -277,7 +408,7 @@ const BotSessionMonitor = forwardRef< const file = component as MessageChainComponent & { name?: string }; return ( - 📎 {file.name || 'File'} + [{file.name || 'File'}] ); } @@ -337,6 +468,22 @@ const BotSessionMonitor = forwardRef< (s) => s.session_id === selectedSessionId, ); + const getMessageRoleLabel = (msg: SessionMessage): string => { + if (isOperatorMessage(msg)) { + return t('bots.sessionMonitor.operatorMessage', { + defaultValue: 'Operator', + }); + } + if (isUserMessage(msg)) { + return t('bots.sessionMonitor.userMessage', { + defaultValue: 'User', + }); + } + return t('bots.sessionMonitor.botMessage', { + defaultValue: 'Assistant', + }); + }; + return (
{/* Left Panel: Session List */} @@ -355,6 +502,9 @@ const BotSessionMonitor = forwardRef<
{sessions.map((session) => { const isSelected = selectedSessionId === session.session_id; + const sessionTakenOver = takenOverSessions.has( + session.session_id, + ); return ( + + )} + {isTakenOver ? ( + <> + · + + + {t('bots.sessionMonitor.takenOver', { + defaultValue: 'Taken Over', + })} + + + ) : ( + selectedSession?.is_active && ( + <> + · + + + Active + + + ) + )} +
-
- {parseSessionType(selectedSessionId) && ( - {parseSessionType(selectedSessionId)} - )} - {selectedSession?.platform && ( - <> - {parseSessionType(selectedSessionId) && ·} - {selectedSession.platform} - - )} - {selectedSession?.user_id && ( - <> - · - - {selectedSession.user_id} - - - - )} - {selectedSession?.is_active && ( - <> - · - - - Active - - + {/* Takeover / Release button */} +
+ {isTakenOver ? ( + + ) : ( + )}
@@ -481,6 +678,7 @@ const BotSessionMonitor = forwardRef< ) : ( messages.map((msg) => { const isUser = isUserMessage(msg); + const isOperator = isOperatorMessage(msg); const isDiscarded = msg.status === 'discarded' || msg.pipeline_id === PIPELINE_DISCARD; @@ -497,7 +695,9 @@ const BotSessionMonitor = forwardRef< 'max-w-3xl px-4 py-2.5 rounded-2xl text-sm', isUser ? 'bg-primary/10 rounded-br-sm' - : 'bg-muted rounded-bl-sm', + : isOperator + ? 'bg-orange-100/80 dark:bg-orange-900/30 rounded-bl-sm' + : 'bg-muted rounded-bl-sm', msg.status === 'error' && 'ring-1 ring-red-400/50', isDiscarded && 'opacity-60', )} @@ -509,14 +709,13 @@ const BotSessionMonitor = forwardRef< 'text-[11px] mt-1.5 flex items-center gap-1.5 text-muted-foreground', )} > - - {isUser - ? t('bots.sessionMonitor.userMessage', { - defaultValue: 'User', - }) - : t('bots.sessionMonitor.botMessage', { - defaultValue: 'Assistant', - })} + + {getMessageRoleLabel(msg)} {formatTime(msg.timestamp)} @@ -528,12 +727,21 @@ const BotSessionMonitor = forwardRef< defaultValue: 'Discarded', })} - ) : msg.pipeline_name ? ( + ) : msg.pipeline_name && + msg.pipeline_name !== 'Human Takeover' ? ( {msg.pipeline_name} ) : null} + {isOperator && ( + + + {t('bots.sessionMonitor.humanTakeover', { + defaultValue: 'Human Takeover', + })} + + )} {msg.status === 'error' && ( error )} @@ -551,6 +759,33 @@ const BotSessionMonitor = forwardRef< )} + + {/* Operator Message Input (only shown when session is taken over) */} + {isTakenOver && ( +
+
+ setOperatorMessage(e.target.value)} + onKeyDown={handleMessageKeyDown} + placeholder={t('bots.sessionMonitor.sendMessage', { + defaultValue: 'Send message as operator...', + })} + disabled={sendingMessage} + className="flex-1 h-9 px-3 rounded-md border bg-background text-sm placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-ring disabled:opacity-50" + /> + +
+
+ )} )} diff --git a/web/src/app/infra/http/BackendClient.ts b/web/src/app/infra/http/BackendClient.ts index ecc0cce3..65c5c28a 100644 --- a/web/src/app/infra/http/BackendClient.ts +++ b/web/src/app/infra/http/BackendClient.ts @@ -1036,6 +1036,92 @@ export class BackendClient extends BaseHttpClient { return this.get(`/api/v1/monitoring/overview?${queryParams.toString()}`); } + // ============ Human Takeover API ============ + + public getHumanTakeoverSessions(params: { + botUuid?: string; + limit?: number; + offset?: number; + }): Promise<{ + sessions: Array<{ + id: string; + session_id: string; + bot_uuid: string; + status: string; + taken_by: string | null; + taken_at: string; + released_at: string | null; + platform: string | null; + user_id: string | null; + user_name: string | null; + }>; + total: number; + limit: number; + offset: number; + }> { + const queryParams = new URLSearchParams(); + if (params.botUuid) queryParams.append('botUuid', params.botUuid); + if (params.limit) queryParams.append('limit', params.limit.toString()); + if (params.offset) queryParams.append('offset', params.offset.toString()); + return this.get( + `/api/v1/human-takeover/sessions?${queryParams.toString()}`, + ); + } + + public getHumanTakeoverSessionDetail(sessionId: string): Promise<{ + found: boolean; + session_id?: string; + session?: { + id: string; + session_id: string; + bot_uuid: string; + status: string; + taken_by: string | null; + taken_at: string; + released_at: string | null; + platform: string | null; + user_id: string | null; + user_name: string | null; + }; + }> { + return this.get(`/api/v1/human-takeover/sessions/${sessionId}`); + } + + public takeoverSession( + sessionId: string, + params: { + bot_uuid: string; + platform?: string; + user_id?: string; + user_name?: string; + }, + ): Promise { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/takeover`, + params, + ); + } + + public releaseSession(sessionId: string): Promise { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/release`, + {}, + ); + } + + public sendTakeoverMessage( + sessionId: string, + message: string, + ): Promise<{ + session_id: string; + message_sent: boolean; + }> { + return this.post( + `/api/v1/human-takeover/sessions/${sessionId}/message`, + { message }, + ); + } + // ============ Survey API ============ public getSurveyPending(): Promise<{ survey: { diff --git a/web/src/i18n/locales/en-US.ts b/web/src/i18n/locales/en-US.ts index 192165f2..69191a71 100644 --- a/web/src/i18n/locales/en-US.ts +++ b/web/src/i18n/locales/en-US.ts @@ -391,6 +391,20 @@ const enUS = { discarded: 'Discarded', userMessage: 'User', botMessage: 'Assistant', + operatorMessage: 'Operator', + humanTakeover: 'Human Takeover', + takeoverBtn: 'Take Over', + releaseBtn: 'Release', + takeoverConfirm: 'Take over this session? The AI bot will stop responding until released.', + releaseConfirm: 'Release this session? The AI bot will resume responding.', + takeoverSuccess: 'Session taken over successfully', + releaseSuccess: 'Session released successfully', + takeoverFailed: 'Failed to take over session', + releaseFailed: 'Failed to release session', + sendMessage: 'Send message as operator...', + sendBtn: 'Send', + sendFailed: 'Failed to send message', + takenOver: 'Taken Over', }, }, plugins: { diff --git a/web/src/i18n/locales/es-ES.ts b/web/src/i18n/locales/es-ES.ts index 4a296276..25d7bc67 100644 --- a/web/src/i18n/locales/es-ES.ts +++ b/web/src/i18n/locales/es-ES.ts @@ -401,6 +401,20 @@ const esES = { discarded: 'Descartado', userMessage: 'Usuario', botMessage: 'Asistente', + operatorMessage: 'Operador', + humanTakeover: 'Toma de control humana', + takeoverBtn: 'Tomar control', + releaseBtn: 'Liberar', + takeoverConfirm: '¿Tomar control de esta sesión? El bot de IA dejará de responder hasta que se libere.', + releaseConfirm: '¿Liberar esta sesión? El bot de IA reanudará las respuestas.', + takeoverSuccess: 'Sesión tomada exitosamente', + releaseSuccess: 'Sesión liberada exitosamente', + takeoverFailed: 'Error al tomar control de la sesión', + releaseFailed: 'Error al liberar la sesión', + sendMessage: 'Enviar mensaje como operador...', + sendBtn: 'Enviar', + sendFailed: 'Error al enviar el mensaje', + takenOver: 'Tomada', }, }, plugins: { diff --git a/web/src/i18n/locales/ja-JP.ts b/web/src/i18n/locales/ja-JP.ts index 7153379b..de78f68d 100644 --- a/web/src/i18n/locales/ja-JP.ts +++ b/web/src/i18n/locales/ja-JP.ts @@ -392,6 +392,20 @@ discarded: '破棄済み', userMessage: 'ユーザー', botMessage: 'アシスタント', + operatorMessage: 'オペレーター', + humanTakeover: '有人対応', + takeoverBtn: '引き継ぐ', + releaseBtn: '解除', + takeoverConfirm: 'このセッションを引き継ぎますか?解除するまでAIボットは応答を停止します。', + releaseConfirm: 'このセッションを解除しますか?AIボットが応答を再開します。', + takeoverSuccess: 'セッションの引き継ぎに成功しました', + releaseSuccess: 'セッションの解除に成功しました', + takeoverFailed: 'セッションの引き継ぎに失敗しました', + releaseFailed: 'セッションの解除に失敗しました', + sendMessage: 'オペレーターとしてメッセージを送信...', + sendBtn: '送信', + sendFailed: 'メッセージの送信に失敗しました', + takenOver: '引き継ぎ中', }, }, plugins: { diff --git a/web/src/i18n/locales/th-TH.ts b/web/src/i18n/locales/th-TH.ts index 361facab..71d6cccd 100644 --- a/web/src/i18n/locales/th-TH.ts +++ b/web/src/i18n/locales/th-TH.ts @@ -386,6 +386,20 @@ const thTH = { discarded: 'ถูกละทิ้ง', userMessage: 'ผู้ใช้', botMessage: 'ผู้ช่วย', + operatorMessage: 'เจ้าหน้าที่', + humanTakeover: 'เจ้าหน้าที่รับช่วง', + takeoverBtn: 'รับช่วง', + releaseBtn: 'ปล่อย', + takeoverConfirm: 'รับช่วงเซสชันนี้หรือไม่? บอท AI จะหยุดตอบจนกว่าจะปล่อย', + releaseConfirm: 'ปล่อยเซสชันนี้หรือไม่? บอท AI จะกลับมาตอบอีกครั้ง', + takeoverSuccess: 'รับช่วงเซสชันสำเร็จ', + releaseSuccess: 'ปล่อยเซสชันสำเร็จ', + takeoverFailed: 'รับช่วงเซสชันล้มเหลว', + releaseFailed: 'ปล่อยเซสชันล้มเหลว', + sendMessage: 'ส่งข้อความในฐานะเจ้าหน้าที่...', + sendBtn: 'ส่ง', + sendFailed: 'ส่งข้อความล้มเหลว', + takenOver: 'ถูกรับช่วงแล้ว', }, }, plugins: { diff --git a/web/src/i18n/locales/vi-VN.ts b/web/src/i18n/locales/vi-VN.ts index 22bff3e4..d330ab52 100644 --- a/web/src/i18n/locales/vi-VN.ts +++ b/web/src/i18n/locales/vi-VN.ts @@ -395,6 +395,20 @@ const viVN = { discarded: 'Đã loại bỏ', userMessage: 'Người dùng', botMessage: 'Trợ lý', + operatorMessage: 'Nhân viên', + humanTakeover: 'Tiếp nhận thủ công', + takeoverBtn: 'Tiếp nhận', + releaseBtn: 'Giải phóng', + takeoverConfirm: 'Tiếp nhận phiên này? Bot AI sẽ ngừng phản hồi cho đến khi được giải phóng.', + releaseConfirm: 'Giải phóng phiên này? Bot AI sẽ tiếp tục phản hồi.', + takeoverSuccess: 'Tiếp nhận phiên thành công', + releaseSuccess: 'Giải phóng phiên thành công', + takeoverFailed: 'Tiếp nhận phiên thất bại', + releaseFailed: 'Giải phóng phiên thất bại', + sendMessage: 'Gửi tin nhắn với tư cách nhân viên...', + sendBtn: 'Gửi', + sendFailed: 'Gửi tin nhắn thất bại', + takenOver: 'Đã tiếp nhận', }, }, plugins: { diff --git a/web/src/i18n/locales/zh-Hans.ts b/web/src/i18n/locales/zh-Hans.ts index 7822bcfa..2950b232 100644 --- a/web/src/i18n/locales/zh-Hans.ts +++ b/web/src/i18n/locales/zh-Hans.ts @@ -376,6 +376,20 @@ const zhHans = { discarded: '已丢弃', userMessage: '用户', botMessage: '助手', + operatorMessage: '人工客服', + humanTakeover: '人工接管', + takeoverBtn: '接管', + releaseBtn: '释放', + takeoverConfirm: '确定接管此会话?AI 机器人将停止回复,直到释放。', + releaseConfirm: '确定释放此会话?AI 机器人将恢复回复。', + takeoverSuccess: '会话接管成功', + releaseSuccess: '会话释放成功', + takeoverFailed: '会话接管失败', + releaseFailed: '会话释放失败', + sendMessage: '以人工客服身份发送消息...', + sendBtn: '发送', + sendFailed: '消息发送失败', + takenOver: '已接管', }, }, plugins: { diff --git a/web/src/i18n/locales/zh-Hant.ts b/web/src/i18n/locales/zh-Hant.ts index e100dbd1..87ba5939 100644 --- a/web/src/i18n/locales/zh-Hant.ts +++ b/web/src/i18n/locales/zh-Hant.ts @@ -371,6 +371,20 @@ const zhHant = { discarded: '已丟棄', userMessage: '使用者', botMessage: '助手', + operatorMessage: '人工客服', + humanTakeover: '人工接管', + takeoverBtn: '接管', + releaseBtn: '釋放', + takeoverConfirm: '確定接管此會話?AI 機器人將停止回覆,直到釋放。', + releaseConfirm: '確定釋放此會話?AI 機器人將恢復回覆。', + takeoverSuccess: '會話接管成功', + releaseSuccess: '會話釋放成功', + takeoverFailed: '會話接管失敗', + releaseFailed: '會話釋放失敗', + sendMessage: '以人工客服身份發送訊息...', + sendBtn: '發送', + sendFailed: '訊息發送失敗', + takenOver: '已接管', }, }, plugins: {