From 8db23bf95084d6bf84caeb4480537ff29d8e2d21 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Sat, 23 May 2026 16:07:46 +0800 Subject: [PATCH] feat(agent-runner): add event-first context facts and pull APIs Add EventLog and Transcript persistence entities for storing auditable event facts and conversation history projection. Implement event-first AgentRunContext builder that produces Protocol v1 compliant context payloads with required fields: event, delivery, context (ContextAccess). Key changes: - EventLog ORM: auditable event records with indexes - Transcript ORM: conversation history projection with composite indexes - AgentRunContextBuilder: Protocol v1 payload with delivery, context, bootstrap - EventLogStore/TranscriptStore: async stores for fact sources - Host action handlers: HISTORY_PAGE, HISTORY_SEARCH, EVENT_GET, EVENT_PAGE - Context validation: build_context output validates via SDK AgentRunContext - Alembic migration for event_log and transcript tables - Alembic env.py imports all ORM models for autogenerate discovery Legacy compatibility: max-round messages go into bootstrap.messages and compatibility.legacy_messages, not top-level messages field. --- .../pkg/agent/runner/context_builder.py | 420 +++++++++++-- .../pkg/agent/runner/event_log_store.py | 253 ++++++++ src/langbot/pkg/agent/runner/host_models.py | 171 ++++++ src/langbot/pkg/agent/runner/orchestrator.py | 314 +++++++++- .../agent/runner/pipeline_compat_adapter.py | 579 ++++++++++++++++++ .../pkg/agent/runner/resource_builder.py | 167 +++++ .../pkg/agent/runner/session_registry.py | 5 + .../pkg/agent/runner/transcript_store.py | 288 +++++++++ .../pkg/entity/persistence/event_log.py | 85 +++ .../pkg/entity/persistence/transcript.py | 72 +++ src/langbot/pkg/persistence/alembic/env.py | 20 + ...a81_add_event_log_and_transcript_tables.py | 102 +++ src/langbot/pkg/plugin/handler.py | 263 ++++++++ .../test_context_builder_params_state.py | 25 +- .../agent/test_context_validation.py | 232 +++++++ .../agent/test_event_first_protocol.py | 431 +++++++++++++ .../agent/test_event_log_transcript.py | 324 ++++++++++ .../agent/test_orchestrator_integration.py | 14 +- 18 files changed, 3705 insertions(+), 60 deletions(-) create mode 100644 src/langbot/pkg/agent/runner/event_log_store.py create mode 100644 src/langbot/pkg/agent/runner/host_models.py create mode 100644 src/langbot/pkg/agent/runner/pipeline_compat_adapter.py create mode 100644 src/langbot/pkg/agent/runner/transcript_store.py create mode 100644 src/langbot/pkg/entity/persistence/event_log.py create mode 100644 src/langbot/pkg/entity/persistence/transcript.py create mode 100644 src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py create mode 100644 tests/unit_tests/agent/test_context_validation.py create mode 100644 tests/unit_tests/agent/test_event_first_protocol.py create mode 100644 tests/unit_tests/agent/test_event_log_transcript.py diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index 3ee8c65d..95fb2606 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -14,6 +14,8 @@ from .config_migration import ConfigMigration from .context_packager import AgentContextPackager from .state_store import get_state_store from . import events as runner_events +from .host_models import AgentEventEnvelope, AgentBinding +from .pipeline_compat_adapter import PipelineCompatAdapter DEFAULT_RUNNER_TIMEOUT_SECONDS = 300 @@ -117,39 +119,45 @@ class AgentRuntimeContext(typing.TypedDict): class AgentRunContextPayload(typing.TypedDict): """AgentRunContext payload passed to an agent runner. + Protocol v1 structure - matches SDK AgentRunContext. + Note: The 'config' field contains the binding config from ai.runner_config[runner_id], which is Pipeline's configuration for this specific runner binding (not plugin instance config). """ run_id: str trigger: AgentTrigger conversation: ConversationContext | None - event: dict[str, typing.Any] | None + event: dict[str, typing.Any] # REQUIRED for Protocol v1 actor: dict[str, typing.Any] | None subject: dict[str, typing.Any] | None - messages: list[dict[str, typing.Any]] - prompt: list[dict[str, typing.Any]] input: AgentInput - params: dict[str, typing.Any] + delivery: dict[str, typing.Any] # REQUIRED for Protocol v1 resources: AgentResources + context: dict[str, typing.Any] # ContextAccess - REQUIRED for Protocol v1 state: AgentRunState runtime: AgentRuntimeContext config: dict[str, typing.Any] # Binding config from ai.runner_config[runner_id] + bootstrap: dict[str, typing.Any] | None # Optional bootstrap context + compatibility: dict[str, typing.Any] | None # Legacy compatibility context + metadata: dict[str, typing.Any] # Additional metadata class AgentRunContextBuilder: - """Builder for provisioning AgentRunContext from a Pipeline Query. + """Builder for provisioning AgentRunContext. + + Two entry points: + - build_context_from_event(event, binding): Event-first Protocol v1 + - build_context(query, descriptor, resources): Legacy Query-based (calls event-based internally) Responsibilities: - Generate new run_id (UUID, not query id) - - Set trigger type to 'message.received' for pipeline - - Build conversation context from session - - Package and convert messages to SDK format - - Build input from user_message and message_chain - - Build params from query.variables with filtering + - Set trigger type based on source + - Build conversation context from session/event + - Build input from user_message/event + - Build params with filtering - Build state snapshot from state_store - - Set resources from AgentResourceBuilder result - Build runtime context with host info, trace_id, deadline - - Set config from runner binding configuration (ai.runner_config[runner_id]) + - Set config from runner binding configuration """ ap: app.Application @@ -168,6 +176,156 @@ class AgentRunContextBuilder: self.ap = ap self.context_packager = AgentContextPackager() + async def build_context_from_event( + self, + event: AgentEventEnvelope, + binding: AgentBinding, + descriptor: AgentRunnerDescriptor, + resources: AgentResources, + ) -> AgentRunContextPayload: + """Build AgentRunContext from event-first envelope. + + This is the main entry point for Protocol v1. + Does NOT inline full history by default. + + Args: + event: Event envelope + binding: Agent binding configuration + descriptor: Runner descriptor + resources: Built resources + + Returns: + AgentRunContextPayload for the runner + """ + # Generate new run_id + run_id = str(uuid.uuid4()) + + # Build trigger from event + trigger: AgentTrigger = { + 'type': event.event_type, + 'source': event.source, + 'timestamp': event.event_time or int(time.time()), + } + + # Build conversation context from event + conversation: ConversationContext | None = None + if event.conversation_id: + conversation = { + 'session_id': None, # Legacy field + 'conversation_id': event.conversation_id, + 'thread_id': event.thread_id, + 'launcher_type': None, # Will be filled from actor/subject if needed + 'launcher_id': None, + 'sender_id': event.actor.actor_id if event.actor else None, + 'bot_uuid': event.bot_id, + 'pipeline_uuid': binding.pipeline_uuid, # Legacy + } + + # Build event context (Protocol v1 event-first) + event_context = { + 'event_id': event.event_id, + 'event_type': event.event_type, + 'event_time': event.event_time, + 'source': event.source, + 'source_event_type': None, + 'data': {}, + } + + # Build actor context + actor_context = None + if event.actor: + actor_context = { + 'actor_type': event.actor.actor_type, + 'actor_id': event.actor.actor_id, + 'actor_name': event.actor.actor_name, + } + + # Build subject context + subject_context = None + if event.subject: + subject_context = { + 'subject_type': event.subject.subject_type, + 'subject_id': event.subject.subject_id, + 'subject_data': event.subject.data, + } + + # Build input from event + input: AgentInput = { + 'text': event.input.text, + 'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents], + 'message_chain': event.input.message_chain, + 'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments], + } + + # Build context access (no history inlined by default for Protocol v1) + # Populate with actual values from stores + context_access = await self._build_context_access(event, descriptor) + + # Build state snapshot (for event-first, we need event-based scope key) + # For now, return empty state - will be implemented with state_store migration + state: AgentRunState = { + 'conversation': {}, + 'actor': {}, + 'subject': {}, + 'runner': {}, + } + + # Build runtime context + runtime: AgentRuntimeContext = { + 'langbot_version': self.ap.ver_mgr.get_current_version(), + 'sdk_protocol_version': descriptor.protocol_version, + 'query_id': None, # No query_id in event-first mode + 'trace_id': run_id, + 'deadline_at': self._build_deadline_from_binding(binding), + 'metadata': { + 'bot_id': event.bot_id, + 'workspace_id': event.workspace_id, + 'streaming_supported': event.delivery.supports_streaming, + }, + } + + # Build delivery context + delivery_context = { + 'surface': event.delivery.surface, + 'reply_target': event.delivery.reply_target, + 'supports_streaming': event.delivery.supports_streaming, + 'supports_edit': event.delivery.supports_edit, + 'supports_reaction': event.delivery.supports_reaction, + 'max_message_size': event.delivery.max_message_size, + 'platform_capabilities': event.delivery.platform_capabilities, + } + + # Build compatibility context (empty for event-first) + compatibility_context = { + 'query_id': None, + 'pipeline_uuid': binding.pipeline_uuid, + 'max_round': binding.max_round, # For reference only + 'legacy_messages': [], + 'extra': {}, + } + + # Build full context - Protocol v1 structure + context: AgentRunContextPayload = { + 'run_id': run_id, + 'trigger': trigger, + 'conversation': conversation, + 'event': event_context, # REQUIRED + 'actor': actor_context, + 'subject': subject_context, + 'input': input, + 'delivery': delivery_context, # REQUIRED + 'resources': resources, + 'context': context_access, # ContextAccess - REQUIRED + 'state': state, + 'runtime': runtime, + 'config': binding.runner_config, + 'bootstrap': None, # Optional - no messages inlined by default + 'compatibility': compatibility_context, + 'metadata': {}, # Additional metadata + } + + return context + async def build_context( self, query: pipeline_query.Query, @@ -176,6 +334,13 @@ class AgentRunContextBuilder: ) -> AgentRunContextPayload: """Build AgentRunContext envelope from Query. + This is a compatibility wrapper that converts Query to event + binding + and delegates to build_context_from_event(). + + For Protocol v1, messages are NOT inlined by default. + Legacy max-round only affects bootstrap (via compatibility adapter), + NOT Protocol v1 entities. + Args: query: Pipeline query descriptor: Runner descriptor @@ -184,8 +349,19 @@ class AgentRunContextBuilder: Returns: AgentRunContext payload for the plugin runner """ - # Generate new run_id - run_id = str(uuid.uuid4()) + # Resolve runner config for binding + runner_id = descriptor.id + runner_config = ConfigMigration.resolve_runner_config( + query.pipeline_config, + runner_id, + ) + + # Extract max_round for compatibility (NOT Protocol v1) + # Note: config uses 'max-round' with hyphen, not 'max_round' + max_round = runner_config.get('max-round') + if max_round is None: + ai_config = query.pipeline_config.get('ai', {}) if query.pipeline_config else {} + max_round = ai_config.get('max-round') # Build trigger trigger: AgentTrigger = { @@ -196,32 +372,21 @@ class AgentRunContextBuilder: # Build conversation context conversation: ConversationContext | None = None - if query.session: + session = getattr(query, 'session', None) + if session: conversation = { - 'session_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}', - 'conversation_id': getattr(query.session.using_conversation, 'uuid', None), - 'launcher_type': query.session.launcher_type.value, - 'launcher_id': query.session.launcher_id, - 'sender_id': str(query.sender_id), - 'bot_uuid': query.bot_uuid, - 'pipeline_uuid': query.pipeline_uuid, + 'session_id': f'{getattr(session, "launcher_type", "").value if hasattr(getattr(session, "launcher_type", ""), "value") else getattr(session, "launcher_type", "")}_{getattr(session, "launcher_id", "")}', + 'conversation_id': getattr(getattr(session, 'using_conversation', None), 'uuid', None), + 'launcher_type': getattr(session, 'launcher_type', None).value if hasattr(getattr(session, 'launcher_type', None), 'value') else getattr(session, 'launcher_type', None), + 'launcher_id': getattr(session, 'launcher_id', None), + 'sender_id': str(getattr(query, 'sender_id', '')) if getattr(query, 'sender_id', None) else None, + 'bot_uuid': getattr(query, 'bot_uuid', None), + 'pipeline_uuid': getattr(query, 'pipeline_uuid', None), } - # Get runner binding config from ai.runner_config[runner_id] - # This is Pipeline's configuration for this specific runner binding, - # passed through AgentRunContext.config to the runner - runner_config = ConfigMigration.resolve_runner_config( - query.pipeline_config, - descriptor.id, - ) - # Build input input: AgentInput = self._build_input(query) - # Build bounded working context window for the runner. - packaged_context = self.context_packager.package_messages(query, runner_config) - messages = self._build_messages(packaged_context.messages) - # Build params from query.variables with filtering params = self._build_params(query) @@ -230,9 +395,10 @@ class AgentRunContextBuilder: state: AgentRunState = state_store.build_snapshot(query, descriptor) streaming_supported = await self._is_stream_output_supported(query) - remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) + remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) if query.pipeline_config else False # Build runtime context + run_id = str(uuid.uuid4()) runtime: AgentRuntimeContext = { 'langbot_version': self.ap.ver_mgr.get_current_version(), 'sdk_protocol_version': descriptor.protocol_version, @@ -240,33 +406,104 @@ class AgentRunContextBuilder: 'trace_id': run_id, # Use run_id as trace_id for now 'deadline_at': self._build_deadline(runner_config), 'metadata': { - 'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'), - 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'), + 'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown') if query.variables else 'Unknown', + 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown') if query.variables else 'Unknown', 'streaming_supported': streaming_supported, 'remove_think': remove_think, - 'context_packaging': { - 'policy': packaged_context.policy, - 'history': packaged_context.history, - }, }, } - # Build full context + # Build delivery context from query adapter capabilities + delivery_context = { + 'surface': 'pipeline', # Legacy pipeline surface + 'reply_target': None, + 'supports_streaming': streaming_supported, + 'supports_edit': False, + 'supports_reaction': False, + 'max_message_size': None, + 'platform_capabilities': {}, + } + + # Build context access (for legacy, minimal API availability) + context_access = { + 'conversation_id': conversation.get('conversation_id') if conversation else None, + 'thread_id': None, + 'latest_cursor': None, + 'event_seq': None, + 'transcript_seq': None, + 'has_history_before': False, + 'inline_policy': { + 'mode': 'current_event', + 'delivered_count': 0, + 'source_total_count': None, + 'messages_complete': False, + 'reason': 'legacy_pipeline', + }, + 'available_apis': { + 'history_page': False, + 'history_search': False, + 'event_get': False, + 'event_page': False, + 'artifact_metadata': False, + 'artifact_read': False, + 'state': True, + 'storage': True, + }, + } + + # Build compatibility context (for legacy Query/Pipeline fields) + compatibility_context = { + 'query_id': query.query_id, + 'pipeline_uuid': getattr(query, 'pipeline_uuid', None), + 'max_round': max_round, # For reference only + 'legacy_messages': [], # Will be filled if max_round is set + 'extra': { + 'params': params, # Put params in compatibility.extra + 'prompt': self._build_prompt(query), # Put prompt in compatibility.extra + }, + } + + # Build bootstrap context (optional, for legacy max-round) + bootstrap_context = None + + # For legacy compatibility: add bootstrap messages if max_round is set + # This goes into bootstrap.messages, NOT top-level messages + if max_round and max_round > 0: + packaged_context = self.context_packager.package_messages(query, runner_config) + legacy_messages = self._build_messages(packaged_context.messages) + # Put in bootstrap for Protocol v1 + bootstrap_context = { + 'messages': legacy_messages, + 'summary': None, + 'artifacts': [], + 'metadata': {}, + } + # Also update compatibility for legacy runners + compatibility_context['legacy_messages'] = legacy_messages + # Update runtime metadata + runtime['metadata']['context_packaging'] = { + 'policy': packaged_context.policy, + 'history': packaged_context.history, + } + + # Build full context - Protocol v1 structure context: AgentRunContextPayload = { 'run_id': run_id, 'trigger': trigger, 'conversation': conversation, - 'event': self._build_event(query), + 'event': self._build_event(query), # REQUIRED 'actor': self._build_actor(query), 'subject': self._build_subject(query), - 'messages': messages, - 'prompt': self._build_prompt(query), 'input': input, - 'params': params, + 'delivery': delivery_context, # REQUIRED 'resources': resources, + 'context': context_access, # ContextAccess - REQUIRED 'state': state, 'runtime': runtime, 'config': runner_config, + 'bootstrap': bootstrap_context, # Optional bootstrap + 'compatibility': compatibility_context, # Legacy compatibility + 'metadata': {}, # Additional metadata } return context @@ -510,6 +747,29 @@ class AgentRunContextBuilder: return time.time() + timeout_seconds + def _build_deadline_from_binding(self, binding: AgentBinding) -> float | None: + """Build deadline timestamp from binding timeout config. + + Args: + binding: Agent binding with runner_config + + Returns: + Deadline timestamp or None + """ + timeout = binding.runner_config.get('timeout', DEFAULT_RUNNER_TIMEOUT_SECONDS) + if timeout is None: + return None + + try: + timeout_seconds = float(timeout) + except (TypeError, ValueError): + return None + + if timeout_seconds <= 0: + return None + + return time.time() + timeout_seconds + async def _is_stream_output_supported(self, query: pipeline_query.Query) -> bool: """Check whether the current adapter can consume streaming chunks.""" try: @@ -609,3 +869,71 @@ class AgentRunContextBuilder: # Pydantic models and other complex types are not directly serializable # as params (they may have internal structure not meant for runners) return False + + async def _build_context_access( + self, + event: AgentEventEnvelope, + descriptor: AgentRunnerDescriptor, + ) -> dict[str, typing.Any]: + """Build ContextAccess with actual values from stores. + + Args: + event: Event envelope + descriptor: Runner descriptor + + Returns: + ContextAccess dict + """ + conversation_id = event.conversation_id + + # Check if history APIs are available for this runner + # Based on runner permissions + permissions = descriptor.permissions or {} + history_permissions = permissions.get('history', []) + event_permissions = permissions.get('events', []) + + history_page_enabled = 'page' in history_permissions and conversation_id is not None + history_search_enabled = 'search' in history_permissions and conversation_id is not None + event_get_enabled = 'get' in event_permissions + event_page_enabled = 'page' in event_permissions and conversation_id is not None + + # Get latest cursor and has_history_before if conversation exists + latest_cursor = None + has_history_before = False + + if conversation_id: + try: + from .transcript_store import TranscriptStore + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + latest_cursor = await store.get_latest_cursor(conversation_id) + if latest_cursor: + has_history_before = True + except Exception as e: + self.ap.logger.warning(f'Failed to get transcript cursor: {e}') + + return { + 'conversation_id': conversation_id, + 'thread_id': event.thread_id, + 'latest_cursor': latest_cursor, + 'event_seq': None, # Will be populated when EventLog is written + 'transcript_seq': int(latest_cursor) if latest_cursor else None, + 'has_history_before': has_history_before, + 'inline_policy': { + 'mode': 'current_event', + 'delivered_count': 0, + 'source_total_count': None, + 'messages_complete': False, + 'reason': 'self_managed_context', + }, + 'available_apis': { + 'history_page': history_page_enabled, + 'history_search': history_search_enabled, + 'event_get': event_get_enabled, + 'event_page': event_page_enabled, + 'artifact_metadata': False, # TODO: Implement artifact store + 'artifact_read': False, + 'state': True, + 'storage': True, + }, + } diff --git a/src/langbot/pkg/agent/runner/event_log_store.py b/src/langbot/pkg/agent/runner/event_log_store.py new file mode 100644 index 00000000..6eb08361 --- /dev/null +++ b/src/langbot/pkg/agent/runner/event_log_store.py @@ -0,0 +1,253 @@ +"""EventLog store for writing and querying event records.""" +from __future__ import annotations + +import json +import datetime +import typing +import uuid + +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine + +from ...entity.persistence.event_log import EventLog +from ...entity.persistence.transcript import Transcript + + +class EventLogStore: + """Store for EventLog records. + + Handles writing events to the event log and querying them. + All methods are async and use the provided database engine. + """ + + engine: AsyncEngine + + # Hard limits + MAX_INPUT_SUMMARY_LENGTH = 1000 + + def __init__(self, engine: AsyncEngine): + self.engine = engine + + async def append_event( + self, + event_id: str | None, + event_type: str, + source: str, + bot_id: str | None = None, + workspace_id: str | None = None, + conversation_id: str | None = None, + thread_id: str | None = None, + actor_type: str | None = None, + actor_id: str | None = None, + actor_name: str | None = None, + subject_type: str | None = None, + subject_id: str | None = None, + input_summary: str | None = None, + input_json: dict[str, typing.Any] | None = None, + raw_ref: str | None = None, + run_id: str | None = None, + runner_id: str | None = None, + event_time: datetime.datetime | None = None, + metadata: dict[str, typing.Any] | None = None, + ) -> str: + """Append an event to the event log. + + Args: + event_id: Unique event ID (generated if None) + event_type: Event type + source: Event source + bot_id: Bot UUID + workspace_id: Workspace ID + conversation_id: Conversation ID + thread_id: Thread ID + actor_type: Actor type + actor_id: Actor ID + actor_name: Actor display name + subject_type: Subject type + subject_id: Subject ID + input_summary: Brief input summary + input_json: Full input JSON + raw_ref: Reference to raw event payload + run_id: Run ID processing this event + runner_id: Runner ID processing this event + event_time: When the event occurred + metadata: Additional metadata + + Returns: + The event_id + """ + if event_id is None: + event_id = str(uuid.uuid4()) + + # Truncate input summary if too long + if input_summary and len(input_summary) > self.MAX_INPUT_SUMMARY_LENGTH: + input_summary = input_summary[:self.MAX_INPUT_SUMMARY_LENGTH - 3] + "..." + + async with self.engine.connect() as conn: + await conn.execute( + sqlalchemy.insert(EventLog).values( + event_id=event_id, + event_type=event_type, + event_time=event_time, + source=source, + bot_id=bot_id, + workspace_id=workspace_id, + conversation_id=conversation_id, + thread_id=thread_id, + actor_type=actor_type, + actor_id=actor_id, + actor_name=actor_name, + subject_type=subject_type, + subject_id=subject_id, + input_summary=input_summary, + input_json=json.dumps(input_json) if input_json else None, + raw_ref=raw_ref, + run_id=run_id, + runner_id=runner_id, + metadata_json=json.dumps(metadata) if metadata else None, + created_at=datetime.datetime.utcnow(), + ) + ) + await conn.commit() + + return event_id + + async def get_event( + self, + event_id: str, + ) -> dict[str, typing.Any] | None: + """Get a single event by ID. + + Args: + event_id: Event ID + + Returns: + Event record as dict, or None if not found + """ + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(EventLog).where(EventLog.event_id == event_id) + ) + row = result.fetchone() + if row is None: + return None + return self._row_to_dict(row[0]) + + async def page_events( + self, + conversation_id: str | None = None, + event_types: list[str] | None = None, + before_seq: int | None = None, + limit: int = 50, + ) -> tuple[list[dict[str, typing.Any]], int | None, bool]: + """Page through event records. + + Args: + conversation_id: Filter by conversation ID + event_types: Filter by event types + before_seq: Get events before this sequence number + limit: Maximum items to return (capped at 100) + + Returns: + Tuple of (items, next_seq, has_more) + """ + limit = min(limit, 100) # Hard cap + + async with self.engine.connect() as conn: + query = sqlalchemy.select(EventLog) + + if conversation_id is not None: + query = query.where(EventLog.conversation_id == conversation_id) + + if event_types: + query = query.where(EventLog.event_type.in_(event_types)) + + if before_seq is not None: + query = query.where(EventLog.id < before_seq) + + query = query.order_by(EventLog.id.desc()).limit(limit + 1) + + result = await conn.execute(query) + rows = result.fetchall() + + items = [self._row_to_dict(row[0]) for row in rows[:limit]] + has_more = len(rows) > limit + next_seq = items[-1]['id'] if items and has_more else None + + return items, next_seq, has_more + + async def get_latest_cursor( + self, + conversation_id: str, + ) -> str | None: + """Get the latest cursor for a conversation. + + Args: + conversation_id: Conversation ID + + Returns: + Cursor string (seq number), or None if no events + """ + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(EventLog.id) + .where(EventLog.conversation_id == conversation_id) + .order_by(EventLog.id.desc()) + .limit(1) + ) + row = result.fetchone() + if row is None: + return None + return str(row[0]) + + async def has_events_before( + self, + conversation_id: str, + seq: int, + ) -> bool: + """Check if there are events before a sequence number. + + Args: + conversation_id: Conversation ID + seq: Sequence number + + Returns: + True if there are events before + """ + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(sqlalchemy.func.count()) + .select_from(EventLog) + .where( + EventLog.conversation_id == conversation_id, + EventLog.id < seq, + ) + ) + count = result.scalar() + return count > 0 + + def _row_to_dict(self, row: EventLog) -> dict[str, typing.Any]: + """Convert an EventLog row to dict.""" + return { + 'id': row.id, + 'event_id': row.event_id, + 'event_type': row.event_type, + 'event_time': int(row.event_time.timestamp()) if row.event_time else None, + 'source': row.source, + 'bot_id': row.bot_id, + 'workspace_id': row.workspace_id, + 'conversation_id': row.conversation_id, + 'thread_id': row.thread_id, + 'actor_type': row.actor_type, + 'actor_id': row.actor_id, + 'actor_name': row.actor_name, + 'subject_type': row.subject_type, + 'subject_id': row.subject_id, + 'input_summary': row.input_summary, + 'input_json': json.loads(row.input_json) if row.input_json else None, + 'raw_ref': row.raw_ref, + 'run_id': row.run_id, + 'runner_id': row.runner_id, + 'created_at': int(row.created_at.timestamp()) if row.created_at else None, + 'metadata': json.loads(row.metadata_json) if row.metadata_json else {}, + } diff --git a/src/langbot/pkg/agent/runner/host_models.py b/src/langbot/pkg/agent/runner/host_models.py new file mode 100644 index 00000000..8cc2484f --- /dev/null +++ b/src/langbot/pkg/agent/runner/host_models.py @@ -0,0 +1,171 @@ +"""Agent event envelope and binding models for LangBot Host. + +These are Host-internal models, not exposed to SDK. +""" +from __future__ import annotations + +import typing +import pydantic + +from langbot_plugin.api.entities.builtin.agent_runner.event import ( + AgentEventContext, + ConversationContext, + ActorContext, + SubjectContext, + RawEventRef, +) +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + +class AgentEventEnvelope(pydantic.BaseModel): + """Event envelope for LangBot Host event gateway. + + This is the unified input model that replaces Query-first approach. + IM / WebUI / API / EventRouter all produce this envelope. + """ + + event_id: str + """Unique event identifier.""" + + event_type: str + """Event type (message.received, message.recalled, etc.).""" + + event_time: int | None = None + """Event timestamp (epoch seconds).""" + + source: str + """Event source (platform, webui, api, scheduler, system).""" + + bot_id: str | None = None + """Bot UUID handling this event.""" + + workspace_id: str | None = None + """Workspace ID (for multi-tenant).""" + + conversation_id: str | None = None + """Conversation ID.""" + + thread_id: str | None = None + """Thread ID (for platforms supporting threads).""" + + actor: ActorContext | None = None + """Actor (who triggered the event).""" + + subject: SubjectContext | None = None + """Subject (what the event is about).""" + + input: AgentInput + """Event input.""" + + delivery: DeliveryContext + """Delivery context.""" + + raw_ref: RawEventRef | None = None + """Reference to raw event payload.""" + + +# Binding scope types +class BindingScope(pydantic.BaseModel): + """Scope for agent binding.""" + + scope_type: typing.Literal["bot", "pipeline", "workspace", "global"] = "pipeline" + """Scope type.""" + + scope_id: str | None = None + """Scope identifier (bot_uuid, pipeline_uuid, etc.).""" + + +class ResourcePolicy(pydantic.BaseModel): + """Resource policy for agent binding. + + Controls what resources the runner can access. + """ + + allowed_model_uuids: list[str] | None = None + """Allowed model UUIDs. None means all authorized.""" + + allowed_tool_names: list[str] | None = None + """Allowed tool names. None means all authorized.""" + + allowed_kb_uuids: list[str] | None = None + """Allowed knowledge base UUIDs. None means all authorized.""" + + allow_plugin_storage: bool = True + """Whether plugin storage is allowed.""" + + allow_workspace_storage: bool = False + """Whether workspace storage is allowed.""" + + +class StatePolicy(pydantic.BaseModel): + """State policy for agent binding. + + Controls state management behavior. + """ + + enable_state: bool = True + """Whether host-owned state is enabled.""" + + state_scopes: list[typing.Literal["conversation", "actor", "subject", "runner"]] = ( + pydantic.Field(default_factory=lambda: ["conversation", "actor"]) + ) + """Enabled state scopes.""" + + +class DeliveryPolicy(pydantic.BaseModel): + """Delivery policy for agent binding. + + Controls how results are delivered. + """ + + enable_streaming: bool = True + """Whether streaming output is enabled.""" + + enable_reply: bool = True + """Whether reply is enabled.""" + + max_message_size: int | None = None + """Maximum message size.""" + + +class AgentBinding(pydantic.BaseModel): + """Binding configuration for mapping events to runners. + + This is Host-internal model for event-to-runner binding. + It replaces the old Pipeline runner config role. + """ + + binding_id: str + """Unique binding identifier.""" + + scope: BindingScope = pydantic.Field(default_factory=BindingScope) + """Binding scope.""" + + event_types: list[str] = pydantic.Field(default_factory=lambda: ["message.received"]) + """Event types this binding handles.""" + + runner_id: str + """Runner ID to invoke.""" + + runner_config: dict[str, typing.Any] = pydantic.Field(default_factory=dict) + """Runner instance configuration.""" + + resource_policy: ResourcePolicy = pydantic.Field(default_factory=ResourcePolicy) + """Resource policy.""" + + state_policy: StatePolicy = pydantic.Field(default_factory=StatePolicy) + """State policy.""" + + delivery_policy: DeliveryPolicy = pydantic.Field(default_factory=DeliveryPolicy) + """Delivery policy.""" + + enabled: bool = True + """Whether binding is enabled.""" + + # Legacy fields for compatibility adapter + pipeline_uuid: str | None = None + """Legacy pipeline UUID (for compatibility).""" + + max_round: int | None = None + """Legacy max-round (for compatibility adapter, not Protocol v1).""" diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index c3f522d9..cf0e1d83 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -19,6 +19,8 @@ from .result_normalizer import AgentResultNormalizer from .state_store import get_state_store, RunnerScopedStateStore from .session_registry import get_session_registry, AgentRunSessionRegistry from .config_migration import ConfigMigration +from .host_models import AgentEventEnvelope, AgentBinding +from .pipeline_compat_adapter import PipelineCompatAdapter from .errors import ( RunnerNotFoundError, RunnerExecutionError, @@ -38,7 +40,9 @@ class AgentRunOrchestrator: - Handle errors, timeouts, protocol errors - Maintain streaming card behavior - This is the main entry point for ChatMessageHandler. + Entry points: + - run(event, binding): Main entry for event-first Protocol v1 + - run_from_query(query): Compatibility wrapper for Pipeline """ ap: app.Application @@ -69,13 +73,113 @@ class AgentRunOrchestrator: self._session_registry = get_session_registry() self._state_store = get_state_store() + async def run( + self, + event: AgentEventEnvelope, + binding: AgentBinding, + ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: + """Run agent runner from event-first envelope. + + This is the main entry point for Protocol v1. + Event Gateway -> AgentBindingResolver -> run(event, binding). + + Args: + event: Event envelope from event gateway + binding: Agent binding configuration + + Yields: + Message or MessageChunk for pipeline response + + Raises: + RunnerNotFoundError: If runner not found + RunnerNotAuthorizedError: If runner not authorized + RunnerExecutionError: If runner execution failed + """ + runner_id = binding.runner_id + + # Get runner descriptor + # TODO: Get bound plugins from binding when fully migrated + bound_plugins = None # Will be resolved from binding.scope in future + descriptor = await self.registry.get(runner_id, bound_plugins) + + # Build resources from binding + resources = await self.resource_builder.build_resources_from_binding( + event=event, + binding=binding, + descriptor=descriptor, + ) + + # Build context from event + binding + context = await self.context_builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + # Register session for proxy action permission validation + run_id = context['run_id'] + await self._session_registry.register( + run_id=run_id, + runner_id=descriptor.id, + query_id=None, # No query_id in event-first mode + plugin_identity=descriptor.get_plugin_id(), + resources=resources, + conversation_id=event.conversation_id, + ) + + # Write incoming event to EventLog + event_log_id = await self._write_event_log( + event=event, + binding=binding, + run_id=run_id, + runner_id=descriptor.id, + ) + + # Write user message to Transcript if message.received + if event.event_type == 'message.received' and event.conversation_id: + await self._write_user_transcript( + event=event, + event_log_id=event_log_id, + ) + + try: + # Run via plugin connector + async for result_dict in self._invoke_runner(descriptor, context): + # Handle state.updated first - consume before normalizer + if result_dict.get('type') == 'state.updated': + self._handle_state_updated_event(result_dict, event, descriptor) + # Pass to normalizer for logging, but don't yield to pipeline + await self.result_normalizer.normalize(result_dict, descriptor) + continue + + # Handle message.completed - write to Transcript + if result_dict.get('type') == 'message.completed' and event.conversation_id: + await self._write_assistant_transcript( + result_dict=result_dict, + event=event, + run_id=run_id, + runner_id=descriptor.id, + ) + + # Normalize result for other types + result = await self.result_normalizer.normalize(result_dict, descriptor) + if result is not None: + yield result + finally: + # Unregister session after run completes (success or error) + await self._session_registry.unregister(run_id) + async def run_from_query( self, query: pipeline_query.Query, ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: """Run agent runner from pipeline query. - This is the main entry point called by ChatMessageHandler. + This is a compatibility wrapper for the legacy Query-based flow. + It preserves existing behavior for params, messages, state, etc. + + For the new event-first Protocol v1, use run(event, binding) instead. Args: query: Pipeline query with pipeline_config, session, messages, etc. @@ -99,12 +203,17 @@ class AgentRunOrchestrator: # Get runner descriptor descriptor = await self.registry.get(runner_id, bound_plugins) - # Build resources + # Build resources (using legacy Query-based method) resources = await self.resource_builder.build_resources(query, descriptor) - # Build context + # Build context (using legacy Query-based method with params, state, messages) context = await self.context_builder.build_context(query, descriptor, resources) + # Get conversation_id from context + conversation_id = None + if context.get('conversation'): + conversation_id = context['conversation'].get('conversation_id') + # Register session for proxy action permission validation run_id = context['run_id'] await self._session_registry.register( @@ -113,6 +222,7 @@ class AgentRunOrchestrator: query_id=query.query_id, plugin_identity=descriptor.get_plugin_id(), resources=resources, + conversation_id=conversation_id, ) try: @@ -267,6 +377,8 @@ class AgentRunOrchestrator: ) -> None: """Handle state.updated result - apply to state store. + Legacy method for Query-based flow. + Args: result_dict: Raw result dict with type='state.updated' query: Pipeline query @@ -302,3 +414,197 @@ class AgentRunOrchestrator: f'Runner {descriptor.id} state.updated: scope={scope}, key={key}, value={value}' ) # Invalid scope is already logged by state_store.apply_update + + def _handle_state_updated_event( + self, + result_dict: dict[str, typing.Any], + event: AgentEventEnvelope, + descriptor: AgentRunnerDescriptor, + ) -> None: + """Handle state.updated result in event-first mode. + + Args: + result_dict: Raw result dict with type='state.updated' + event: Event envelope + descriptor: Runner descriptor + """ + data = result_dict.get('data', {}) + + # Extract scope (default to 'conversation' for backward compat) + scope = data.get('scope', 'conversation') + + # Extract key and value + key = data.get('key') + value = data.get('value') + + if not key: + self.ap.logger.warning( + f'Runner {descriptor.id} state.updated missing key, ignoring' + ) + return + + # Apply update to state store using event context + # Note: state_store needs to support event-based scope key calculation + # For now, we log and skip actual persistence in event-first mode + # This will be implemented when state_store is migrated to support events + self.ap.logger.debug( + f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}, value={value}' + ) + + async def _write_event_log( + self, + event: AgentEventEnvelope, + binding: AgentBinding, + run_id: str, + runner_id: str, + ) -> str: + """Write incoming event to EventLog. + + Args: + event: Event envelope + binding: Agent binding + run_id: Run ID + runner_id: Runner ID + + Returns: + Event log ID + """ + import datetime + + from .event_log_store import EventLogStore + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + + # Build input summary + input_summary = None + input_json = None + if event.input: + if event.input.text: + input_summary = event.input.text[:1000] + input_json = { + 'text': event.input.text, + 'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents], + 'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments], + } + + return await store.append_event( + event_id=event.event_id, + event_type=event.event_type, + source=event.source, + bot_id=event.bot_id, + workspace_id=event.workspace_id, + conversation_id=event.conversation_id, + thread_id=event.thread_id, + actor_type=event.actor.actor_type if event.actor else None, + actor_id=event.actor.actor_id if event.actor else None, + actor_name=event.actor.actor_name if event.actor else None, + subject_type=event.subject.subject_type if event.subject else None, + subject_id=event.subject.subject_id if event.subject else None, + input_summary=input_summary, + input_json=input_json, + run_id=run_id, + runner_id=runner_id, + event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None, + ) + + async def _write_user_transcript( + self, + event: AgentEventEnvelope, + event_log_id: str, + ) -> None: + """Write user message to Transcript. + + Args: + event: Event envelope + event_log_id: Event log ID + """ + from .transcript_store import TranscriptStore + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + # Build content + content = event.input.text if event.input else None + content_json = None + if event.input: + content_json = { + 'role': 'user', + 'content': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents] if event.input.contents else [], + } + + # Build artifact refs + artifact_refs = [] + if event.input and event.input.attachments: + for a in event.input.attachments: + artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a) + + await store.append_transcript( + event_id=event_log_id, + conversation_id=event.conversation_id, + role='user', + content=content, + content_json=content_json, + artifact_refs=artifact_refs if artifact_refs else None, + thread_id=event.thread_id, + item_type='message', + metadata={ + 'actor_type': event.actor.actor_type if event.actor else None, + 'actor_id': event.actor.actor_id if event.actor else None, + }, + ) + + async def _write_assistant_transcript( + self, + result_dict: dict[str, typing.Any], + event: AgentEventEnvelope, + run_id: str, + runner_id: str, + ) -> None: + """Write assistant message to Transcript. + + Args: + result_dict: Result dict from runner + event: Original event envelope + run_id: Run ID + runner_id: Runner ID + """ + import uuid + + from .transcript_store import TranscriptStore + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + data = result_dict.get('data', {}) + message = data.get('message', {}) + + # Build content + content = None + content_json = None + + if isinstance(message.get('content'), str): + content = message['content'] + content_json = message + elif isinstance(message.get('content'), list): + # Extract text from content list + text_parts = [] + for c in message['content']: + if isinstance(c, dict) and c.get('type') == 'text': + text_parts.append(c.get('text', '')) + content = ' '.join(text_parts) if text_parts else None + content_json = message + + # Generate a unique event ID for assistant message + assistant_event_id = str(uuid.uuid4()) + + await store.append_transcript( + transcript_id=str(uuid.uuid4()), + event_id=assistant_event_id, + conversation_id=event.conversation_id, + role='assistant', + content=content, + content_json=content_json, + thread_id=event.thread_id, + item_type='message', + run_id=run_id, + runner_id=runner_id, + metadata={ + 'run_id': run_id, + 'runner_id': runner_id, + }, + ) diff --git a/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py b/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py new file mode 100644 index 00000000..0f018413 --- /dev/null +++ b/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py @@ -0,0 +1,579 @@ +"""Pipeline compatibility adapter for converting Query to event-first envelope. + +This adapter bridges the legacy Query/Pipeline approach with the new +event-first Protocol v1 architecture. It is a compatibility layer only. +""" +from __future__ import annotations + +import typing +import time + +from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.agent_runner.event import ( + AgentEventContext, + ConversationContext, + ActorContext, + SubjectContext, + RawEventRef, +) +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext +from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger + +from .host_models import ( + AgentEventEnvelope, + AgentBinding, + BindingScope, + ResourcePolicy, + StatePolicy, + DeliveryPolicy, +) +from . import events as runner_events + + +class PipelineCompatAdapter: + """Adapter for converting Pipeline Query to event-first envelope. + + This adapter is responsible for: + - Converting Query to AgentEventEnvelope + - Converting Pipeline config to temporary AgentBinding + - Handling legacy max-round as bootstrap policy + - Putting Query-only fields into compatibility context + """ + + @classmethod + def query_to_event( + cls, + query: pipeline_query.Query, + ) -> AgentEventEnvelope: + """Convert Pipeline Query to AgentEventEnvelope. + + Args: + query: Pipeline query + + Returns: + AgentEventEnvelope for event-first processing + """ + # Build event context + event = cls._build_event_context(query) + + # Build conversation context + conversation = cls._build_conversation_context(query) + + # Build actor context + actor = cls._build_actor_context(query) + + # Build subject context + subject = cls._build_subject_context(query) + + # Build input + input = cls._build_input(query) + + # Build delivery context + delivery = cls._build_delivery_context(query) + + # Build raw ref + raw_ref = cls._build_raw_ref(query) + + return AgentEventEnvelope( + event_id=event.event_id or str(query.query_id), + event_type=event.event_type or runner_events.MESSAGE_RECEIVED, + event_time=event.event_time, + source="pipeline_compat", + bot_id=query.bot_uuid, + workspace_id=None, # Not available in Query + conversation_id=conversation.conversation_id, + thread_id=conversation.thread_id, + actor=actor, + subject=subject, + input=input, + delivery=delivery, + raw_ref=raw_ref, + ) + + @classmethod + def pipeline_config_to_binding( + cls, + query: pipeline_query.Query, + runner_id: str, + ) -> AgentBinding: + """Convert Pipeline config to temporary AgentBinding. + + Args: + query: Pipeline query + runner_id: Resolved runner ID + + Returns: + AgentBinding for this run + """ + pipeline_config = query.pipeline_config or {} + ai_config = pipeline_config.get('ai', {}) + runner_config = ai_config.get('runner_config', {}).get(runner_id, {}) + + # Extract max_round for compatibility (used in bootstrap, not Protocol v1) + max_round = runner_config.get('max_round') or ai_config.get('max-round') + + # Build scope + scope = BindingScope( + scope_type="pipeline", + scope_id=query.pipeline_uuid, + ) + + # Build resource policy from pipeline config + resource_policy = ResourcePolicy( + allowed_model_uuids=cls._extract_allowed_models(query), + allowed_tool_names=cls._extract_allowed_tools(query), + allowed_kb_uuids=cls._extract_allowed_kbs(query), + ) + + # Build state policy + state_policy = StatePolicy( + enable_state=True, + state_scopes=["conversation", "actor", "subject", "runner"], + ) + + # Build delivery policy + output_config = pipeline_config.get('output', {}) + delivery_policy = DeliveryPolicy( + enable_streaming=True, + enable_reply=True, + ) + + return AgentBinding( + binding_id=f"pipeline_{query.pipeline_uuid or 'default'}_{runner_id}", + scope=scope, + event_types=[runner_events.MESSAGE_RECEIVED], + runner_id=runner_id, + runner_config=runner_config, + resource_policy=resource_policy, + state_policy=state_policy, + delivery_policy=delivery_policy, + enabled=True, + pipeline_uuid=query.pipeline_uuid, + max_round=max_round, + ) + + @classmethod + def build_bootstrap_from_binding( + cls, + query: pipeline_query.Query, + binding: AgentBinding, + ) -> dict[str, typing.Any]: + """Build bootstrap context from binding for legacy max-round. + + This method handles the legacy max-round -> bootstrap conversion. + max-round is NOT part of Protocol v1, only used by compatibility adapter. + + Args: + query: Pipeline query + binding: Agent binding with max_round + + Returns: + Bootstrap context data + """ + max_round = binding.max_round + + # If no max_round or self_managed_context, return empty bootstrap + if max_round is None or max_round <= 0: + return { + "messages": [], + "summary": None, + "artifacts": [], + "metadata": { + "policy": "self_managed", + "legacy_max_round": None, + }, + } + + # Legacy max-round packaging (will be handled by context_packager) + return { + "messages": [], # Will be filled by context_packager + "summary": None, + "artifacts": [], + "metadata": { + "policy": "legacy_max_round", + "legacy_max_round": max_round, + }, + } + + @classmethod + def build_compatibility_context( + cls, + query: pipeline_query.Query, + ) -> dict[str, typing.Any]: + """Build compatibility context for legacy Query/Pipeline fields. + + These fields are for migration purposes only. + Runners should NOT depend on them for long-term capabilities. + + Args: + query: Pipeline query + + Returns: + Compatibility context data + """ + return { + "query_id": query.query_id, + "pipeline_uuid": query.pipeline_uuid, + "max_round": None, # Moved to binding, not here + "legacy_messages": [], # Will be filled by context_packager + "extra": { + "bot_uuid": query.bot_uuid, + "sender_id": str(query.sender_id) if query.sender_id else None, + "launcher_type": query.launcher_type.value if query.launcher_type else None, + "launcher_id": query.launcher_id, + }, + } + + # Private helper methods + + @classmethod + def _build_event_context( + cls, + query: pipeline_query.Query, + ) -> AgentEventContext: + """Build AgentEventContext from Query.""" + message_event = getattr(query, 'message_event', None) + + event_data: dict[str, typing.Any] = {} + if message_event and hasattr(message_event, 'model_dump'): + try: + event_data = message_event.model_dump(mode='json') + except TypeError: + event_data = message_event.model_dump() + except Exception: + event_data = {} + event_data.pop('source_platform_object', None) + + source_event_type = None + if message_event: + source_event_type = getattr(message_event, 'type', None) + + message_chain = getattr(query, 'message_chain', None) + message_id = getattr(message_chain, 'message_id', None) + if message_id == -1: + message_id = None + + event_time = None + if message_event: + event_time = getattr(message_event, 'time', None) + if isinstance(event_time, (int, float)): + event_time = int(event_time) + + return AgentEventContext( + event_id=str(message_id or query.query_id), + event_type=runner_events.MESSAGE_RECEIVED, + event_time=event_time, + source="pipeline_compat", + source_event_type=source_event_type, + data=event_data, + ) + + @classmethod + def _build_conversation_context( + cls, + query: pipeline_query.Query, + ) -> ConversationContext: + """Build ConversationContext from Query.""" + # Handle session and conversation_id + conversation_id = None + session = getattr(query, 'session', None) + if session: + conversation = getattr(session, 'using_conversation', None) + if conversation: + conversation_id = getattr(conversation, 'uuid', None) + + # Handle launcher_type safely + launcher_type = getattr(query, 'launcher_type', None) + launcher_type_value = None + if launcher_type is not None: + launcher_type_value = getattr(launcher_type, 'value', launcher_type) + + # Handle launcher_id + launcher_id = getattr(query, 'launcher_id', None) + + # Handle sender_id + sender_id = getattr(query, 'sender_id', None) + if sender_id is not None: + sender_id = str(sender_id) + + # Handle bot_uuid + bot_uuid = getattr(query, 'bot_uuid', None) + + # Handle pipeline_uuid + pipeline_uuid = getattr(query, 'pipeline_uuid', None) + + # Build session_id from launcher info if available + session_id = None + if launcher_type_value and launcher_id: + session_id = f'{launcher_type_value}_{launcher_id}' + + return ConversationContext( + conversation_id=conversation_id, + thread_id=None, + launcher_type=launcher_type_value, + launcher_id=launcher_id, + sender_id=sender_id, + bot_id=bot_uuid, + workspace_id=None, + session_id=session_id, + pipeline_uuid=pipeline_uuid, + ) + + @classmethod + def _build_actor_context( + cls, + query: pipeline_query.Query, + ) -> ActorContext: + """Build ActorContext from Query.""" + message_event = getattr(query, 'message_event', None) + sender = getattr(message_event, 'sender', None) if message_event else None + sender_id = getattr(query, 'sender_id', None) + actor_id = getattr(sender, 'id', None) if sender else None + if actor_id is None: + actor_id = sender_id + actor_name = sender.get_name() if sender and hasattr(sender, 'get_name') else None + + return ActorContext( + actor_type="user", + actor_id=str(actor_id) if actor_id is not None else None, + actor_name=actor_name, + metadata={}, + ) + + @classmethod + def _build_subject_context( + cls, + query: pipeline_query.Query, + ) -> SubjectContext: + """Build SubjectContext from Query.""" + message_chain = getattr(query, 'message_chain', None) + message_id = getattr(message_chain, 'message_id', None) if message_chain else None + if message_id == -1: + message_id = None + + query_id = getattr(query, 'query_id', None) + + # Safely get launcher_type + launcher_type = getattr(query, 'launcher_type', None) + launcher_type_value = None + if launcher_type is not None: + launcher_type_value = getattr(launcher_type, 'value', launcher_type) + + return SubjectContext( + subject_type="message", + subject_id=str(message_id or query_id or ''), + data={ + "launcher_type": launcher_type_value, + "launcher_id": getattr(query, 'launcher_id', None), + "sender_id": str(getattr(query, 'sender_id', '')) if getattr(query, 'sender_id', None) else None, + "bot_uuid": getattr(query, 'bot_uuid', None), + "pipeline_uuid": getattr(query, 'pipeline_uuid', None), + }, + ) + + @classmethod + def _build_input( + cls, + query: pipeline_query.Query, + ) -> AgentInput: + """Build AgentInput from Query.""" + text = None + text_parts: list[str] = [] + contents: list[dict[str, typing.Any]] = [] + + user_message = getattr(query, 'user_message', None) + if user_message: + content = getattr(user_message, 'content', None) + if isinstance(content, list): + for elem in content: + # Handle both real objects and mocks + if hasattr(elem, 'model_dump'): + contents.append(elem.model_dump(mode='json')) + elif isinstance(elem, dict): + contents.append(elem) + else: + # For mocks, extract type and text attributes + elem_type = getattr(elem, 'type', None) + if elem_type == 'text': + elem_text = getattr(elem, 'text', None) + contents.append({'type': 'text', 'text': elem_text}) + if elem_text: + text_parts.append(elem_text) + continue + + # Extract text for the text field + if hasattr(elem, 'type') and getattr(elem, 'type', None) == 'text': + elem_text = getattr(elem, 'text', None) + if elem_text: + text_parts.append(elem_text) + elif content is not None: + text = str(content) + contents.append({'type': 'text', 'text': text}) + + if text_parts: + text = ''.join(text_parts) + + message_chain_dict = None + message_chain = getattr(query, 'message_chain', None) + if message_chain: + if hasattr(message_chain, 'model_dump'): + message_chain_dict = message_chain.model_dump(mode='json') + + attachments = cls._build_attachments(query, contents) + + return AgentInput( + text=text, + contents=contents, + message_chain=message_chain_dict, + attachments=attachments, + ) + + @classmethod + def _build_attachments( + cls, + query: pipeline_query.Query, + contents: list[dict[str, typing.Any]], + ) -> list[dict[str, typing.Any]]: + """Extract attachments from query.""" + import uuid + + attachments: list[dict[str, typing.Any]] = [] + + for elem in contents: + elem_type = elem.get('type') + artifact_id = str(uuid.uuid4()) # Generate unique ID + + if elem_type == 'image_url': + image_url = elem.get('image_url') or {} + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'image', + 'source': 'url', + 'url': image_url.get('url') if isinstance(image_url, dict) else str(image_url), + }) + elif elem_type == 'image_base64': + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'image', + 'source': 'base64', + 'content': elem.get('image_base64'), + }) + elif elem_type == 'file_url': + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'file', + 'source': 'url', + 'url': elem.get('file_url'), + 'name': elem.get('file_name'), + }) + elif elem_type == 'file_base64': + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'file', + 'source': 'base64', + 'content': elem.get('file_base64'), + 'name': elem.get('file_name'), + }) + + message_chain = getattr(query, 'message_chain', None) + if message_chain: + try: + for component in message_chain: + artifact_id = str(uuid.uuid4()) # Generate unique ID + + if isinstance(component, platform_message.Image): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'image', + 'source': 'message_chain', + 'id': component.image_id or None, + 'url': component.url or None, + }) + elif isinstance(component, platform_message.File): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'file', + 'source': 'message_chain', + 'id': component.id or None, + 'name': component.name or None, + }) + elif isinstance(component, platform_message.Voice): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'voice', + 'source': 'message_chain', + 'id': component.voice_id or None, + 'url': component.url or None, + }) + except TypeError: + # message_chain is not iterable (e.g., a Mock object) + pass + + return attachments + + @classmethod + def _build_delivery_context( + cls, + query: pipeline_query.Query, + ) -> DeliveryContext: + """Build DeliveryContext from Query.""" + return DeliveryContext( + surface="platform", + reply_target={ + "message_id": getattr(query.message_chain, 'message_id', None), + }, + supports_streaming=True, + supports_edit=False, + supports_reaction=False, + platform_capabilities={}, + ) + + @classmethod + def _build_raw_ref( + cls, + query: pipeline_query.Query, + ) -> RawEventRef | None: + """Build RawEventRef from Query.""" + # For now, we don't store raw event payload + return None + + @classmethod + def _extract_allowed_models( + cls, + query: pipeline_query.Query, + ) -> list[str] | None: + """Extract allowed model UUIDs from query.""" + model_uuid = getattr(query, 'use_llm_model_uuid', None) + if model_uuid: + return [model_uuid] + return None + + @classmethod + def _extract_allowed_tools( + cls, + query: pipeline_query.Query, + ) -> list[str] | None: + """Extract allowed tool names from query.""" + use_funcs = getattr(query, 'use_funcs', None) + if not use_funcs: + return None + try: + return [func.get('name') for func in use_funcs if isinstance(func, dict) and func.get('name')] + except (TypeError, AttributeError): + return None + + @classmethod + def _extract_allowed_kbs( + cls, + query: pipeline_query.Query, + ) -> list[str] | None: + """Extract allowed knowledge base UUIDs from query.""" + variables = getattr(query, 'variables', None) + if not variables: + return None + kb_uuids = variables.get('_knowledge_base_uuids') + if kb_uuids: + return kb_uuids + return None diff --git a/src/langbot/pkg/agent/runner/resource_builder.py b/src/langbot/pkg/agent/runner/resource_builder.py index 8c925b4e..ce54c246 100644 --- a/src/langbot/pkg/agent/runner/resource_builder.py +++ b/src/langbot/pkg/agent/runner/resource_builder.py @@ -14,6 +14,7 @@ from .context_builder import ( StorageResource, ) from . import config_schema +from .host_models import AgentEventEnvelope, AgentBinding class AgentResourceBuilder: @@ -29,6 +30,10 @@ class AgentResourceBuilder: - Build knowledge_bases list from config - Build storage and files permissions summary + Entry points: + - build_resources_from_binding(event, binding, descriptor): Event-first Protocol v1 + - build_resources(query, descriptor): Legacy Query-based + Note: This only builds the resource declaration. The actual proxy actions in handler.py must still validate against ctx.resources at runtime. @@ -44,6 +49,166 @@ class AgentResourceBuilder: def __init__(self, ap: app.Application): self.ap = ap + async def build_resources_from_binding( + self, + event: AgentEventEnvelope, + binding: AgentBinding, + descriptor: AgentRunnerDescriptor, + ) -> AgentResources: + """Build AgentResources from event and binding. + + This is the main entry point for Protocol v1. + + Args: + event: Event envelope + binding: Agent binding with resource policy + descriptor: Runner descriptor with permissions and capabilities + + Returns: + AgentResources dict with filtered resource lists + """ + # Layer 1: Runner manifest permissions + manifest_perms = descriptor.permissions + + # Layer 2: Binding resource policy + resource_policy = binding.resource_policy + + # Layer 3: Runner instance config + runner_config = binding.runner_config + + # Build each resource category + models = await self._build_models_from_binding( + manifest_perms, resource_policy, descriptor, runner_config + ) + tools = await self._build_tools_from_binding( + manifest_perms, resource_policy, binding + ) + knowledge_bases = await self._build_knowledge_bases_from_binding( + manifest_perms, resource_policy, descriptor, runner_config + ) + storage = self._build_storage_from_binding(manifest_perms, binding) + + return { + 'models': models, + 'tools': tools, + 'knowledge_bases': knowledge_bases, + 'files': [], # Files are populated at runtime + 'storage': storage, + 'platform_capabilities': {}, # Reserved for EBA + } + + async def _build_models_from_binding( + self, + manifest_perms: dict[str, list[str]], + resource_policy: typing.Any, + descriptor: AgentRunnerDescriptor, + runner_config: dict[str, typing.Any], + ) -> list[ModelResource]: + """Build models list from binding.""" + models: list[ModelResource] = [] + seen_model_ids: set[str] = set() + + # Check manifest permission + model_perms = manifest_perms.get('models', []) + if 'invoke' not in model_perms and 'stream' not in model_perms: + return models + + # Get model UUIDs from resource policy + allowed_uuids = resource_policy.allowed_model_uuids + + # Add model resources from binding config schema + await self._append_config_declared_model_resources( + models=models, + seen_model_ids=seen_model_ids, + descriptor=descriptor, + runner_config=runner_config, + ) + + # Add explicitly allowed models + if allowed_uuids: + for model_uuid in allowed_uuids: + await self._append_llm_model_resource(models, seen_model_ids, model_uuid) + + return models + + async def _build_tools_from_binding( + self, + manifest_perms: dict[str, list[str]], + resource_policy: typing.Any, + binding: AgentBinding, + ) -> list[ToolResource]: + """Build tools list from binding.""" + tools: list[ToolResource] = [] + + # Check manifest permission + tool_perms = manifest_perms.get('tools', []) + if 'detail' not in tool_perms and 'call' not in tool_perms: + return tools + + # Get tool names from resource policy + allowed_names = resource_policy.allowed_tool_names + + if allowed_names: + for tool_name in allowed_names: + tools.append({ + 'tool_name': tool_name, + 'tool_type': None, + 'description': None, + }) + + return tools + + async def _build_knowledge_bases_from_binding( + self, + manifest_perms: dict[str, list[str]], + resource_policy: typing.Any, + descriptor: AgentRunnerDescriptor, + runner_config: dict[str, typing.Any], + ) -> list[KnowledgeBaseResource]: + """Build knowledge bases list from binding.""" + kb_resources: list[KnowledgeBaseResource] = [] + + # Check manifest permission + kb_perms = manifest_perms.get('knowledge_bases', []) + if 'list' not in kb_perms and 'retrieve' not in kb_perms: + return kb_resources + + # Get KB UUIDs from schema-defined config fields + kb_uuids = config_schema.extract_knowledge_base_uuids(descriptor, runner_config) + + # Also check resource policy + allowed_uuids = resource_policy.allowed_kb_uuids + if allowed_uuids: + kb_uuids = allowed_uuids + + for kb_uuid in kb_uuids: + try: + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) + if kb: + kb_resources.append({ + 'kb_id': kb_uuid, + 'kb_name': kb.get_name(), + 'kb_type': kb.knowledge_base_entity.kb_type if hasattr(kb.knowledge_base_entity, 'kb_type') else None, + }) + except Exception as e: + self.ap.logger.warning(f'Failed to build knowledge base resource {kb_uuid}: {e}') + + return kb_resources + + def _build_storage_from_binding( + self, + manifest_perms: dict[str, list[str]], + binding: AgentBinding, + ) -> StorageResource: + """Build storage permissions from binding.""" + storage_perms = manifest_perms.get('storage', []) + resource_policy = binding.resource_policy + + return { + 'plugin_storage': 'plugin' in storage_perms and resource_policy.allow_plugin_storage, + 'workspace_storage': 'workspace' in storage_perms and resource_policy.allow_workspace_storage, + } + async def build_resources( self, query: typing.Any, # pipeline_query.Query @@ -51,6 +216,8 @@ class AgentResourceBuilder: ) -> AgentResources: """Build AgentResources from query and runner descriptor. + This is a compatibility wrapper for Query-based flow. + Args: query: Pipeline query with pipeline_config and variables descriptor: Runner descriptor with permissions and capabilities diff --git a/src/langbot/pkg/agent/runner/session_registry.py b/src/langbot/pkg/agent/runner/session_registry.py index 76b0f46e..1d24f593 100644 --- a/src/langbot/pkg/agent/runner/session_registry.py +++ b/src/langbot/pkg/agent/runner/session_registry.py @@ -25,6 +25,7 @@ class AgentRunSession(typing.TypedDict): runner_id: Runner descriptor ID (plugin:author/name/runner) query_id: Pipeline query ID plugin_identity: Plugin identifier (author/name) of the runner + conversation_id: Conversation ID for history/event access resources: Authorized resources for this run (from AgentResources) status: Session status tracking _authorized_ids: Pre-computed authorized resource IDs for O(1) lookup @@ -33,6 +34,7 @@ class AgentRunSession(typing.TypedDict): runner_id: str query_id: int | None plugin_identity: str # author/name + conversation_id: str | None resources: AgentResources status: AgentRunSessionStatus _authorized_ids: dict[str, set[str]] # Pre-computed sets for O(1) lookup @@ -64,6 +66,7 @@ class AgentRunSessionRegistry: query_id: int | None, plugin_identity: str, resources: AgentResources, + conversation_id: str | None = None, ) -> None: """Register a new agent run session. @@ -73,6 +76,7 @@ class AgentRunSessionRegistry: query_id: Pipeline query ID plugin_identity: Plugin identifier (author/name) resources: Authorized resources for this run + conversation_id: Conversation ID for history/event access """ now = int(time.time()) @@ -89,6 +93,7 @@ class AgentRunSessionRegistry: 'runner_id': runner_id, 'query_id': query_id, 'plugin_identity': plugin_identity, + 'conversation_id': conversation_id, 'resources': resources, 'status': { 'started_at': now, diff --git a/src/langbot/pkg/agent/runner/transcript_store.py b/src/langbot/pkg/agent/runner/transcript_store.py new file mode 100644 index 00000000..ea63c427 --- /dev/null +++ b/src/langbot/pkg/agent/runner/transcript_store.py @@ -0,0 +1,288 @@ +"""Transcript store for writing and querying conversation history.""" +from __future__ import annotations + +import json +import datetime +import typing +import uuid + +import sqlalchemy +from sqlalchemy.ext.asyncio import AsyncEngine + +from ...entity.persistence.transcript import Transcript + + +class TranscriptStore: + """Store for Transcript records. + + Handles writing transcript items and querying them for history API. + All methods are async and use the provided database engine. + """ + + engine: AsyncEngine + + # Hard limits + MAX_CONTENT_LENGTH = 4000 + HARD_LIMIT = 100 + + def __init__(self, engine: AsyncEngine): + self.engine = engine + + async def append_transcript( + self, + transcript_id: str | None, + event_id: str, + conversation_id: str, + role: str, + content: str | None = None, + content_json: dict[str, typing.Any] | None = None, + artifact_refs: list[dict[str, typing.Any]] | None = None, + thread_id: str | None = None, + item_type: str = "message", + run_id: str | None = None, + runner_id: str | None = None, + metadata: dict[str, typing.Any] | None = None, + ) -> str: + """Append a transcript item. + + Args: + transcript_id: Unique transcript ID (generated if None) + event_id: Source event ID + conversation_id: Conversation ID + role: Message role (user, assistant, system, tool) + content: Text content + content_json: Full structured content + artifact_refs: Artifact references + thread_id: Thread ID + item_type: Item type + run_id: Run ID that generated this + runner_id: Runner ID that generated this + metadata: Additional metadata + + Returns: + The transcript_id + """ + if transcript_id is None: + transcript_id = str(uuid.uuid4()) + + # Truncate content if too long + if content and len(content) > self.MAX_CONTENT_LENGTH: + content = content[:self.MAX_CONTENT_LENGTH - 3] + "..." + + # Get next sequence number for this conversation + seq = await self._get_next_seq(conversation_id) + + async with self.engine.connect() as conn: + await conn.execute( + sqlalchemy.insert(Transcript).values( + transcript_id=transcript_id, + event_id=event_id, + conversation_id=conversation_id, + thread_id=thread_id, + role=role, + item_type=item_type, + content=content, + content_json=json.dumps(content_json) if content_json else None, + artifact_refs_json=json.dumps(artifact_refs) if artifact_refs else None, + seq=seq, + run_id=run_id, + runner_id=runner_id, + created_at=datetime.datetime.utcnow(), + metadata_json=json.dumps(metadata) if metadata else None, + ) + ) + await conn.commit() + + return transcript_id + + async def page_transcript( + self, + conversation_id: str, + before_seq: int | None = None, + after_seq: int | None = None, + limit: int = 50, + direction: str = "backward", + include_artifacts: bool = False, + ) -> tuple[list[dict[str, typing.Any]], int | None, int | None, bool]: + """Page through transcript items. + + Args: + conversation_id: Conversation ID + before_seq: Get items before this sequence (backward) + after_seq: Get items after this sequence (forward) + limit: Maximum items to return (capped at 100) + direction: 'backward' (older) or 'forward' (newer) + include_artifacts: Include artifact refs + + Returns: + Tuple of (items, next_seq, prev_seq, has_more) + """ + limit = min(limit, self.HARD_LIMIT) + + async with self.engine.connect() as conn: + query = sqlalchemy.select(Transcript).where( + Transcript.conversation_id == conversation_id + ) + + if direction == "backward" and before_seq is not None: + query = query.where(Transcript.seq < before_seq) + query = query.order_by(Transcript.seq.desc()) + elif direction == "forward" and after_seq is not None: + query = query.where(Transcript.seq > after_seq) + query = query.order_by(Transcript.seq.asc()) + else: + # Default: most recent items first (backward from latest) + query = query.order_by(Transcript.seq.desc()) + + query = query.limit(limit + 1) + + result = await conn.execute(query) + rows = result.fetchall() + + items = [self._row_to_dict(row[0], include_artifacts) for row in rows[:limit]] + has_more = len(rows) > limit + + # Calculate cursors + next_seq = None + prev_seq = None + + if direction == "backward": + # Items are in descending order + if items: + next_seq = items[-1].get('seq') if has_more else None + prev_seq = items[0].get('seq') + else: + # Items are in ascending order + if items: + next_seq = items[-1].get('seq') if has_more else None + prev_seq = items[0].get('seq') + + return items, next_seq, prev_seq, has_more + + async def search_transcript( + self, + conversation_id: str, + query_text: str, + filters: dict[str, typing.Any] | None = None, + top_k: int = 10, + ) -> list[dict[str, typing.Any]]: + """Search transcript items. + + Basic implementation using LIKE filtering. + + Args: + conversation_id: Conversation ID + query_text: Search query + filters: Optional filters + top_k: Maximum results + + Returns: + List of matching items + """ + async with self.engine.connect() as conn: + query = sqlalchemy.select(Transcript).where( + Transcript.conversation_id == conversation_id, + Transcript.content.ilike(f"%{query_text}%"), + ) + + # Apply additional filters + if filters: + if 'roles' in filters: + query = query.where(Transcript.role.in_(filters['roles'])) + if 'item_types' in filters: + query = query.where(Transcript.item_type.in_(filters['item_types'])) + + query = query.order_by(Transcript.seq.desc()).limit(top_k) + + result = await conn.execute(query) + rows = result.fetchall() + + return [self._row_to_dict(row[0], include_artifacts=True) for row in rows] + + async def get_latest_cursor( + self, + conversation_id: str, + ) -> str | None: + """Get the latest cursor for a conversation. + + Args: + conversation_id: Conversation ID + + Returns: + Cursor string (seq number), or None if no items + """ + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(Transcript.seq) + .where(Transcript.conversation_id == conversation_id) + .order_by(Transcript.seq.desc()) + .limit(1) + ) + row = result.fetchone() + if row is None: + return None + return str(row[0]) + + async def has_history_before( + self, + conversation_id: str, + seq: int, + ) -> bool: + """Check if there is history before a sequence number. + + Args: + conversation_id: Conversation ID + seq: Sequence number + + Returns: + True if there are items before + """ + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(sqlalchemy.func.count()) + .select_from(Transcript) + .where( + Transcript.conversation_id == conversation_id, + Transcript.seq < seq, + ) + ) + count = result.scalar() + return count > 0 + + async def _get_next_seq(self, conversation_id: str) -> int: + """Get the next sequence number for a conversation.""" + async with self.engine.connect() as conn: + result = await conn.execute( + sqlalchemy.select(sqlalchemy.func.max(Transcript.seq)) + .where(Transcript.conversation_id == conversation_id) + ) + max_seq = result.scalar() + return (max_seq or 0) + 1 + + def _row_to_dict( + self, + row: Transcript, + include_artifacts: bool = False, + ) -> dict[str, typing.Any]: + """Convert a Transcript row to dict.""" + result = { + 'transcript_id': row.transcript_id, + 'event_id': row.event_id, + 'conversation_id': row.conversation_id, + 'thread_id': row.thread_id, + 'role': row.role, + 'item_type': row.item_type, + 'content': row.content, + 'content_json': json.loads(row.content_json) if row.content_json else None, + 'seq': row.seq, + 'cursor': str(row.seq), + 'created_at': int(row.created_at.timestamp()) if row.created_at else None, + 'metadata': json.loads(row.metadata_json) if row.metadata_json else {}, + } + + if include_artifacts and row.artifact_refs_json: + result['artifact_refs'] = json.loads(row.artifact_refs_json) + else: + result['artifact_refs'] = [] + + return result diff --git a/src/langbot/pkg/entity/persistence/event_log.py b/src/langbot/pkg/entity/persistence/event_log.py new file mode 100644 index 00000000..1d1dd86a --- /dev/null +++ b/src/langbot/pkg/entity/persistence/event_log.py @@ -0,0 +1,85 @@ +"""EventLog persistence entity for storing auditable event facts.""" +from __future__ import annotations + +import sqlalchemy +import datetime + +from .base import Base + + +class EventLog(Base): + """EventLog stores auditable event records for AgentRunner. + + This is the fact source for events - messages, tool calls, system events, etc. + Large payloads are stored separately as artifacts; this table stores + references and summaries. + """ + + __tablename__ = 'event_log' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + """Auto-increment ID for sequencing.""" + + event_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, unique=True, index=True) + """Unique event identifier.""" + + event_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False, index=True) + """Event type (message.received, tool.call.started, etc.).""" + + event_time = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True) + """When the event occurred.""" + + source = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) + """Event source (platform, webui, api, scheduler, system, pipeline_compat).""" + + bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + """Bot UUID that handled this event.""" + + workspace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Workspace ID for multi-tenant deployments.""" + + conversation_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + """Conversation ID this event belongs to.""" + + thread_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Thread ID if platform supports threads.""" + + # Actor information + actor_type = sqlalchemy.Column(sqlalchemy.String(50), nullable=True) + """Actor type (user, system, runner).""" + + actor_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Actor identifier.""" + + actor_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Actor display name.""" + + # Subject information + subject_type = sqlalchemy.Column(sqlalchemy.String(50), nullable=True) + """Subject type (message, tool_call, artifact).""" + + subject_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Subject identifier.""" + + # Input information + input_summary = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Brief summary of input (truncated text, max 1000 chars).""" + + input_json = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Full input JSON if reasonably sized (AgentInput as JSON string).""" + + # Raw event reference + raw_ref = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Reference to raw event payload in ArtifactStore.""" + + run_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + """Run ID that processed this event.""" + + runner_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Runner ID that processed this event.""" + + created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, default=datetime.datetime.utcnow) + """When this record was created.""" + + metadata_json = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Additional metadata as JSON string.""" diff --git a/src/langbot/pkg/entity/persistence/transcript.py b/src/langbot/pkg/entity/persistence/transcript.py new file mode 100644 index 00000000..5d72340b --- /dev/null +++ b/src/langbot/pkg/entity/persistence/transcript.py @@ -0,0 +1,72 @@ +"""Transcript persistence entity for conversation history projection.""" +from __future__ import annotations + +import sqlalchemy +import datetime + +from .base import Base + + +class Transcript(Base): + """Transcript stores conversation-oriented message projection for history API. + + This is a projection of EventLog, optimized for agent history retrieval. + It includes message content and artifact refs, but not raw platform payloads. + """ + + __tablename__ = 'transcript' + + id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True) + """Auto-increment ID for sequencing.""" + + transcript_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, unique=True, index=True) + """Unique transcript item identifier.""" + + event_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) + """Reference to the source event in EventLog.""" + + conversation_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) + """Conversation this item belongs to.""" + + thread_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Thread ID if platform supports threads.""" + + role = sqlalchemy.Column(sqlalchemy.String(50), nullable=False) + """Message role: 'user', 'assistant', 'system', or 'tool'.""" + + item_type = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, default='message') + """Item type: 'message', 'tool_call', 'tool_result', 'system'.""" + + # Content + content = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Text content summary (may be truncated for large messages, max 4000 chars).""" + + content_json = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Full structured content as JSON string (Message model dump).""" + + # Artifact references + artifact_refs_json = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Artifact references as JSON string (list of ArtifactRef).""" + + # Sequence for cursor-based pagination + seq = sqlalchemy.Column(sqlalchemy.Integer, nullable=False, index=True) + """Sequence number within conversation (auto-increment per conversation).""" + + # Context + run_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + """Run ID that generated this item (for assistant messages).""" + + runner_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Runner ID that generated this item.""" + + created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, default=datetime.datetime.utcnow) + """When this item was created.""" + + metadata_json = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + """Additional metadata as JSON string (sender_id, platform, etc.).""" + + # Indexes + __table_args__ = ( + sqlalchemy.Index('ix_transcript_conversation_seq', 'conversation_id', 'seq'), + sqlalchemy.Index('ix_transcript_conversation_created', 'conversation_id', 'created_at'), + ) diff --git a/src/langbot/pkg/persistence/alembic/env.py b/src/langbot/pkg/persistence/alembic/env.py index 40543edd..43fae5ab 100644 --- a/src/langbot/pkg/persistence/alembic/env.py +++ b/src/langbot/pkg/persistence/alembic/env.py @@ -13,6 +13,26 @@ from sqlalchemy.engine import Connection from langbot.pkg.entity.persistence.base import Base +# Import all ORM models so they are registered with Base.metadata +# This is required for autogenerate to detect model changes +from langbot.pkg.entity.persistence import ( + apikey, + bot, + bstorage, + event_log, + mcp, + metadata, + model, + monitoring, + pipeline, + plugin, + rag, + transcript, + user, + vector, + webhook, +) + target_metadata = Base.metadata diff --git a/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py new file mode 100644 index 00000000..c26da0db --- /dev/null +++ b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py @@ -0,0 +1,102 @@ +"""add_event_log_and_transcript_tables + +Revision ID: 58846a8d7a81 +Revises: 0004_migrate_runner_config +Create Date: 2026-05-23 15:41:47.030841 +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers +revision = '58846a8d7a81' +down_revision = '0004_migrate_runner_config' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create event_log table + op.create_table( + 'event_log', + sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), + sa.Column('event_id', sa.String(255), nullable=False, unique=True), + sa.Column('event_type', sa.String(100), nullable=False), + sa.Column('event_time', sa.DateTime(), nullable=True), + sa.Column('source', sa.String(50), nullable=False), + sa.Column('bot_id', sa.String(255), nullable=True), + sa.Column('workspace_id', sa.String(255), nullable=True), + sa.Column('conversation_id', sa.String(255), nullable=True), + sa.Column('thread_id', sa.String(255), nullable=True), + sa.Column('actor_type', sa.String(50), nullable=True), + sa.Column('actor_id', sa.String(255), nullable=True), + sa.Column('actor_name', sa.String(255), nullable=True), + sa.Column('subject_type', sa.String(50), nullable=True), + sa.Column('subject_id', sa.String(255), nullable=True), + sa.Column('input_summary', sa.Text(), nullable=True), + sa.Column('input_json', sa.Text(), nullable=True), + sa.Column('raw_ref', sa.String(255), nullable=True), + sa.Column('run_id', sa.String(255), nullable=True), + sa.Column('runner_id', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), + sa.Column('metadata_json', sa.Text(), nullable=True), + ) + + # Create indexes for event_log + with op.batch_alter_table('event_log', schema=None) as batch_op: + batch_op.create_index('ix_event_log_event_id', ['event_id'], unique=True) + batch_op.create_index('ix_event_log_event_type', ['event_type'], unique=False) + batch_op.create_index('ix_event_log_bot_id', ['bot_id'], unique=False) + batch_op.create_index('ix_event_log_conversation_id', ['conversation_id'], unique=False) + batch_op.create_index('ix_event_log_run_id', ['run_id'], unique=False) + + # Create transcript table + op.create_table( + 'transcript', + sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), + sa.Column('transcript_id', sa.String(255), nullable=False, unique=True), + sa.Column('event_id', sa.String(255), nullable=False), + sa.Column('conversation_id', sa.String(255), nullable=False), + sa.Column('thread_id', sa.String(255), nullable=True), + sa.Column('role', sa.String(50), nullable=False), + sa.Column('item_type', sa.String(50), nullable=False, server_default='message'), + sa.Column('content', sa.Text(), nullable=True), + sa.Column('content_json', sa.Text(), nullable=True), + sa.Column('artifact_refs_json', sa.Text(), nullable=True), + sa.Column('seq', sa.Integer(), nullable=False), + sa.Column('run_id', sa.String(255), nullable=True), + sa.Column('runner_id', sa.String(255), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), + sa.Column('metadata_json', sa.Text(), nullable=True), + ) + + # Create indexes for transcript + with op.batch_alter_table('transcript', schema=None) as batch_op: + batch_op.create_index('ix_transcript_transcript_id', ['transcript_id'], unique=True) + batch_op.create_index('ix_transcript_event_id', ['event_id'], unique=False) + batch_op.create_index('ix_transcript_conversation_id', ['conversation_id'], unique=False) + batch_op.create_index('ix_transcript_conversation_seq', ['conversation_id', 'seq'], unique=False) + batch_op.create_index('ix_transcript_conversation_created', ['conversation_id', 'created_at'], unique=False) + batch_op.create_index('ix_transcript_run_id', ['run_id'], unique=False) + + +def downgrade() -> None: + # Drop transcript table + with op.batch_alter_table('transcript', schema=None) as batch_op: + batch_op.drop_index('ix_transcript_run_id') + batch_op.drop_index('ix_transcript_conversation_created') + batch_op.drop_index('ix_transcript_conversation_seq') + batch_op.drop_index('ix_transcript_conversation_id') + batch_op.drop_index('ix_transcript_event_id') + batch_op.drop_index('ix_transcript_transcript_id') + + op.drop_table('transcript') + + # Drop event_log table + with op.batch_alter_table('event_log', schema=None) as batch_op: + batch_op.drop_index('ix_event_log_run_id') + batch_op.drop_index('ix_event_log_conversation_id') + batch_op.drop_index('ix_event_log_bot_id') + batch_op.drop_index('ix_event_log_event_type') + batch_op.drop_index('ix_event_log_event_id') + + op.drop_table('event_log') diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 0c39d630..334218c3 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -1279,6 +1279,269 @@ class RuntimeConnectionHandler(handler.Handler): except Exception as e: return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id) + # ================= Agent History/Event APIs ================= + + @self.action(PluginToRuntimeAction.HISTORY_PAGE) + async def history_page(data: dict[str, Any]) -> handler.ActionResponse: + """Page through transcript history for a conversation. + + Requires run_id authorization. Only allows access to current run's conversation. + """ + run_id = data.get('run_id') + conversation_id = data.get('conversation_id') + before_cursor = data.get('before_cursor') + after_cursor = data.get('after_cursor') + limit = data.get('limit', 50) + direction = data.get('direction', 'backward') + include_artifacts = data.get('include_artifacts', False) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + # Validate run session + session_registry = get_session_registry() + session = await session_registry.get(run_id) + if not session: + return handler.ActionResponse.error( + message=f'Run session {run_id} not found or expired' + ) + + # Validate caller plugin identity + if caller_plugin_identity: + session_plugin_identity = session.get('plugin_identity') + if session_plugin_identity and caller_plugin_identity != session_plugin_identity: + return handler.ActionResponse.error( + message=f'Plugin identity mismatch for run_id {run_id}' + ) + + # Get conversation from session if not provided + if not conversation_id: + conversation_id = session.get('conversation_id') + + if not conversation_id: + return handler.ActionResponse.success(data={ + 'items': [], + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + }) + + # Parse cursors + before_seq = int(before_cursor) if before_cursor else None + after_seq = int(after_cursor) if after_cursor else None + + # Query transcript + from ..agent.runner.transcript_store import TranscriptStore + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + try: + items, next_seq, prev_seq, has_more = await store.page_transcript( + conversation_id=conversation_id, + before_seq=before_seq, + after_seq=after_seq, + limit=limit, + direction=direction, + include_artifacts=include_artifacts, + ) + + return handler.ActionResponse.success(data={ + 'items': items, + 'next_cursor': str(next_seq) if next_seq else None, + 'prev_cursor': str(prev_seq) if prev_seq else None, + 'has_more': has_more, + }) + except Exception as e: + self.ap.logger.error(f'HISTORY_PAGE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'History page error: {e}') + + @self.action(PluginToRuntimeAction.HISTORY_SEARCH) + async def history_search(data: dict[str, Any]) -> handler.ActionResponse: + """Search transcript history. + + Requires run_id authorization. Only searches current run's conversation. + Basic implementation using LIKE filtering. + """ + run_id = data.get('run_id') + query_text = data.get('query', '') + filters = data.get('filters', {}) + top_k = data.get('top_k', 10) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + # Validate run session + session_registry = get_session_registry() + session = await session_registry.get(run_id) + if not session: + return handler.ActionResponse.error( + message=f'Run session {run_id} not found or expired' + ) + + # Validate caller plugin identity + if caller_plugin_identity: + session_plugin_identity = session.get('plugin_identity') + if session_plugin_identity and caller_plugin_identity != session_plugin_identity: + return handler.ActionResponse.error( + message=f'Plugin identity mismatch for run_id {run_id}' + ) + + # Get conversation from session or filters + conversation_id = filters.get('conversation_id') or session.get('conversation_id') + + if not conversation_id: + return handler.ActionResponse.success(data={ + 'items': [], + 'total_count': 0, + 'query': query_text, + }) + + # Search transcript + from ..agent.runner.transcript_store import TranscriptStore + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + try: + items = await store.search_transcript( + conversation_id=conversation_id, + query_text=query_text, + filters=filters, + top_k=top_k, + ) + + return handler.ActionResponse.success(data={ + 'items': items, + 'total_count': len(items), + 'query': query_text, + }) + except Exception as e: + self.ap.logger.error(f'HISTORY_SEARCH error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'History search error: {e}') + + @self.action(PluginToRuntimeAction.EVENT_GET) + async def event_get(data: dict[str, Any]) -> handler.ActionResponse: + """Get a single event record by ID. + + Requires run_id authorization. Only allows access to events in current run's conversation. + """ + run_id = data.get('run_id') + event_id = data.get('event_id') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not event_id: + return handler.ActionResponse.error(message='event_id is required') + + # Validate run session + session_registry = get_session_registry() + session = await session_registry.get(run_id) + if not session: + return handler.ActionResponse.error( + message=f'Run session {run_id} not found or expired' + ) + + # Validate caller plugin identity + if caller_plugin_identity: + session_plugin_identity = session.get('plugin_identity') + if session_plugin_identity and caller_plugin_identity != session_plugin_identity: + return handler.ActionResponse.error( + message=f'Plugin identity mismatch for run_id {run_id}' + ) + + # Get event + from ..agent.runner.event_log_store import EventLogStore + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + + try: + event = await store.get_event(event_id) + if not event: + return handler.ActionResponse.error( + message=f'Event {event_id} not found' + ) + + # Validate event is in the same conversation as the run + session_conversation_id = session.get('conversation_id') + if session_conversation_id and event.get('conversation_id') != session_conversation_id: + return handler.ActionResponse.error( + message=f'Event {event_id} is not accessible by this run' + ) + + return handler.ActionResponse.success(data=event) + except Exception as e: + self.ap.logger.error(f'EVENT_GET error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Event get error: {e}') + + @self.action(PluginToRuntimeAction.EVENT_PAGE) + async def event_page(data: dict[str, Any]) -> handler.ActionResponse: + """Page through event records. + + Requires run_id authorization. Only allows access to current run's conversation. + """ + run_id = data.get('run_id') + conversation_id = data.get('conversation_id') + event_types = data.get('event_types') + before_cursor = data.get('before_cursor') + limit = data.get('limit', 50) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + # Validate run session + session_registry = get_session_registry() + session = await session_registry.get(run_id) + if not session: + return handler.ActionResponse.error( + message=f'Run session {run_id} not found or expired' + ) + + # Validate caller plugin identity + if caller_plugin_identity: + session_plugin_identity = session.get('plugin_identity') + if session_plugin_identity and caller_plugin_identity != session_plugin_identity: + return handler.ActionResponse.error( + message=f'Plugin identity mismatch for run_id {run_id}' + ) + + # Get conversation from session if not provided + if not conversation_id: + conversation_id = session.get('conversation_id') + + if not conversation_id: + return handler.ActionResponse.success(data={ + 'items': [], + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + }) + + # Parse cursor + before_seq = int(before_cursor) if before_cursor else None + + # Query events + from ..agent.runner.event_log_store import EventLogStore + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + + try: + items, next_seq, has_more = await store.page_events( + conversation_id=conversation_id, + event_types=event_types, + before_seq=before_seq, + limit=limit, + ) + + return handler.ActionResponse.success(data={ + 'items': items, + 'next_cursor': str(next_seq) if next_seq else None, + 'prev_cursor': None, + 'has_more': has_more, + }) + except Exception as e: + self.ap.logger.error(f'EVENT_PAGE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Event page error: {e}') + @self.action(CommonAction.PING) async def ping(data: dict[str, Any]) -> handler.ActionResponse: """Ping""" diff --git a/tests/unit_tests/agent/test_context_builder_params_state.py b/tests/unit_tests/agent/test_context_builder_params_state.py index 89ece4b9..45ffa200 100644 --- a/tests/unit_tests/agent/test_context_builder_params_state.py +++ b/tests/unit_tests/agent/test_context_builder_params_state.py @@ -420,9 +420,12 @@ class TestBuildParamsInContext: context = await builder.build_context(query, descriptor, resources) - assert 'params' in context - assert context['params']['public_param'] == 'value' - assert '_private' not in context['params'] + # Protocol v1: params is in compatibility.extra + assert 'compatibility' in context + assert 'extra' in context['compatibility'] + assert 'params' in context['compatibility']['extra'] + assert context['compatibility']['extra']['params']['public_param'] == 'value' + assert '_private' not in context['compatibility']['extra']['params'] @pytest.mark.asyncio async def test_params_and_state_both_present(self): @@ -454,10 +457,12 @@ class TestBuildParamsInContext: context = await builder.build_context(query, descriptor, resources) - # params should have public vars - assert 'params' in context - assert context['params']['workflow_input'] == 'user_question' - assert context['params']['sender_name'] == 'John' + # Protocol v1: params is in compatibility.extra + assert 'compatibility' in context + assert 'extra' in context['compatibility'] + assert 'params' in context['compatibility']['extra'] + assert context['compatibility']['extra']['params']['workflow_input'] == 'user_question' + assert context['compatibility']['extra']['params']['sender_name'] == 'John' # state should have seeded conversation_id assert 'state' in context @@ -490,6 +495,10 @@ class TestBuildParamsInContext: context = await builder.build_context(query, descriptor, resources) - assert context['prompt'][0]['content'] == 'Effective prompt' + # Protocol v1: prompt is in compatibility.extra + assert 'compatibility' in context + assert 'extra' in context['compatibility'] + assert 'prompt' in context['compatibility']['extra'] + assert context['compatibility']['extra']['prompt'][0]['content'] == 'Effective prompt' assert context['runtime']['metadata']['streaming_supported'] is True assert context['runtime']['metadata']['remove_think'] is True diff --git a/tests/unit_tests/agent/test_context_validation.py b/tests/unit_tests/agent/test_context_validation.py new file mode 100644 index 00000000..ffb641ff --- /dev/null +++ b/tests/unit_tests/agent/test_context_validation.py @@ -0,0 +1,232 @@ +"""Test that LangBot context builder output validates against SDK AgentRunContext.""" +from __future__ import annotations + +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +import uuid + +# SDK imports for validation +from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext +from langbot_plugin.api.entities.builtin.agent_runner.event import AgentEventContext +from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext +from langbot_plugin.api.entities.builtin.agent_runner.context_access import ContextAccess +from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources +from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext +from langbot_plugin.api.entities.builtin.agent_runner.state import AgentRunState + +# LangBot imports +from langbot.pkg.agent.runner.context_builder import ( + AgentRunContextBuilder, + AgentTrigger as BuilderTrigger, + ConversationContext as BuilderConversation, + AgentInput as BuilderInput, + AgentRunState as BuilderState, + AgentResources as BuilderResources, + AgentRuntimeContext as BuilderRuntime, +) +from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope +from langbot.pkg.core import app + + +class TestContextValidation: + """Test that context builder output validates against SDK AgentRunContext.""" + + def _make_mock_app(self): + """Create a mock application.""" + mock_app = MagicMock(spec=app.Application) + mock_app.ver_mgr = MagicMock() + mock_app.ver_mgr.get_current_version = MagicMock(return_value="1.0.0") + mock_app.persistence_mgr = MagicMock() + mock_app.persistence_mgr.get_db_engine = MagicMock() + mock_app.logger = MagicMock() + return mock_app + + def _make_event_envelope(self) -> AgentEventEnvelope: + """Create a test event envelope.""" + from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext + from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput as EventInput + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + return AgentEventEnvelope( + event_id="evt_1", + event_type="message.received", + event_time=1700000000, + source="platform", + bot_id="bot_1", + workspace_id=None, + conversation_id="conv_1", + thread_id=None, + actor=ActorContext( + actor_type="user", + actor_id="user_1", + actor_name="Test User", + ), + subject=None, + input=EventInput(text="Hello world"), + delivery=DeliveryContext(surface="test"), + ) + + def _make_binding(self) -> AgentBinding: + """Create a test binding.""" + return AgentBinding( + binding_id="binding_1", + scope=BindingScope(scope_type="pipeline", scope_id="pipeline_1"), + event_types=["message.received"], + runner_id="plugin:test/plugin/runner", + runner_config={"timeout": 300}, + pipeline_uuid="pipeline_1", + enabled=True, + ) + + def _make_resources(self) -> BuilderResources: + """Create test resources.""" + return { + 'models': [], + 'tools': [], + 'knowledge_bases': [], + 'files': [], + 'storage': {'plugin_storage': True, 'workspace_storage': True}, + 'platform_capabilities': {}, + } + + def _make_descriptor(self): + """Create a mock runner descriptor.""" + descriptor = MagicMock() + descriptor.id = "plugin:test/plugin/runner" + descriptor.protocol_version = "1" + descriptor.permissions = { + 'history': ['page', 'search'], + 'events': ['get', 'page'], + } + return descriptor + + @pytest.mark.asyncio + async def test_build_context_from_event_validates(self): + """Test that build_context_from_event output validates against SDK AgentRunContext.""" + mock_app = self._make_mock_app() + builder = AgentRunContextBuilder(mock_app) + + event = self._make_event_envelope() + binding = self._make_binding() + resources = self._make_resources() + descriptor = self._make_descriptor() + + # Build context + context_dict = await builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + # Validate it can be parsed by SDK AgentRunContext + # This will raise ValidationError if invalid + validated = AgentRunContext.model_validate(context_dict) + + # Verify required fields + assert validated.run_id is not None + assert validated.event is not None + assert isinstance(validated.event, AgentEventContext) + assert validated.delivery is not None + assert isinstance(validated.delivery, DeliveryContext) + assert validated.context is not None + assert isinstance(validated.context, ContextAccess) + assert validated.input is not None + assert isinstance(validated.input, AgentInput) + assert validated.resources is not None + assert isinstance(validated.resources, AgentResources) + assert validated.runtime is not None + assert isinstance(validated.runtime, AgentRuntimeContext) + + # Verify event context + assert validated.event.event_id == "evt_1" + assert validated.event.event_type == "message.received" + assert validated.event.source == "platform" + + # Verify delivery context + assert validated.delivery.surface == "test" + + # Verify input + assert validated.input.text == "Hello world" + + @pytest.mark.asyncio + async def test_build_context_from_event_has_no_legacy_top_level_fields(self): + """Test that build_context_from_event does NOT have top-level messages/prompt/params.""" + mock_app = self._make_mock_app() + builder = AgentRunContextBuilder(mock_app) + + event = self._make_event_envelope() + binding = self._make_binding() + resources = self._make_resources() + descriptor = self._make_descriptor() + + context_dict = await builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + # Protocol v1 does NOT have these as core fields + assert 'messages' not in context_dict, "messages should not be top-level in Protocol v1" + assert 'prompt' not in context_dict, "prompt should not be top-level in Protocol v1" + assert 'params' not in context_dict, "params should not be top-level in Protocol v1" + + # Protocol v1 DOES have these + assert 'delivery' in context_dict, "delivery is REQUIRED in Protocol v1" + assert 'context' in context_dict, "context (ContextAccess) is REQUIRED in Protocol v1" + assert 'bootstrap' in context_dict, "bootstrap should exist (can be None)" + assert 'compatibility' in context_dict, "compatibility should exist" + assert 'metadata' in context_dict, "metadata should exist" + + @pytest.mark.asyncio + async def test_build_context_from_event_event_is_not_none(self): + """Test that event field is NOT None in Protocol v1.""" + mock_app = self._make_mock_app() + builder = AgentRunContextBuilder(mock_app) + + event = self._make_event_envelope() + binding = self._make_binding() + resources = self._make_resources() + descriptor = self._make_descriptor() + + context_dict = await builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + # event is REQUIRED in Protocol v1 + assert context_dict.get('event') is not None, "event is REQUIRED for Protocol v1" + + # Validate + validated = AgentRunContext.model_validate(context_dict) + assert validated.event is not None + + @pytest.mark.asyncio + async def test_build_context_from_event_delivery_is_not_none(self): + """Test that delivery field is NOT None in Protocol v1.""" + mock_app = self._make_mock_app() + builder = AgentRunContextBuilder(mock_app) + + event = self._make_event_envelope() + binding = self._make_binding() + resources = self._make_resources() + descriptor = self._make_descriptor() + + context_dict = await builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + # delivery is REQUIRED in Protocol v1 + assert context_dict.get('delivery') is not None, "delivery is REQUIRED for Protocol v1" + + # Validate + validated = AgentRunContext.model_validate(context_dict) + assert validated.delivery is not None diff --git a/tests/unit_tests/agent/test_event_first_protocol.py b/tests/unit_tests/agent/test_event_first_protocol.py new file mode 100644 index 00000000..132df809 --- /dev/null +++ b/tests/unit_tests/agent/test_event_first_protocol.py @@ -0,0 +1,431 @@ +"""Tests for event-first Protocol v1 entities and Pipeline compatibility adapter. + +Tests cover: +1. Pipeline Query -> AgentEventEnvelope conversion +2. Pipeline config -> AgentBinding conversion +3. AgentRunContext not inlining full history by default +4. Legacy max-round only affecting bootstrap/compat adapter +5. Event-first run() entry point +""" +from __future__ import annotations + +import pytest +from unittest.mock import Mock, MagicMock, patch +import typing + +# Import SDK entities +from langbot_plugin.api.entities.builtin.agent_runner.event import ( + AgentEventContext, + ConversationContext, + ActorContext, + SubjectContext, +) +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger +from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext +from langbot_plugin.api.entities.builtin.agent_runner.result import ( + AgentRunResult, + AgentRunResultType, +) +from langbot_plugin.api.entities.builtin.agent_runner.capabilities import ( + AgentRunnerCapabilities, +) +from langbot_plugin.api.entities.builtin.agent_runner.permissions import ( + AgentRunnerPermissions, +) +from langbot_plugin.api.entities.builtin.agent_runner.context_policy import ( + AgentRunnerContextPolicy, +) +from langbot_plugin.api.entities.builtin.agent_runner.manifest import ( + AgentRunnerManifest, +) + +# Import LangBot host models +from langbot.pkg.agent.runner.host_models import ( + AgentEventEnvelope, + AgentBinding, + BindingScope, + ResourcePolicy, + StatePolicy, + DeliveryPolicy, +) +from langbot.pkg.agent.runner.pipeline_compat_adapter import PipelineCompatAdapter + + +class TestPipelineQueryToEventEnvelope: + """Test Pipeline Query -> AgentEventEnvelope conversion.""" + + def test_query_to_event_basic_fields(self, mock_query): + """Test basic field conversion from Query to Event envelope.""" + event = PipelineCompatAdapter.query_to_event(mock_query) + + assert event.event_type == "message.received" + assert event.source == "pipeline_compat" + assert event.bot_id == mock_query.bot_uuid + assert event.actor is not None + assert event.actor.actor_type == "user" + + def test_query_to_event_input(self, mock_query): + """Test input conversion from Query.""" + event = PipelineCompatAdapter.query_to_event(mock_query) + + assert event.input is not None + assert event.input.text == "Hello world" + + def test_query_to_event_conversation(self, mock_query): + """Test conversation context extraction.""" + event = PipelineCompatAdapter.query_to_event(mock_query) + + # Conversation may be None if no session + if event.conversation_id: + assert event.conversation_id is not None + + def test_query_to_event_delivery_context(self, mock_query): + """Test delivery context extraction.""" + event = PipelineCompatAdapter.query_to_event(mock_query) + + assert event.delivery is not None + assert event.delivery.surface == "platform" + assert isinstance(event.delivery.supports_streaming, bool) + + +class TestPipelineConfigToBinding: + """Test Pipeline config -> AgentBinding conversion.""" + + def test_config_to_binding_runner_id(self, mock_query): + """Test binding runner_id extraction.""" + binding = PipelineCompatAdapter.pipeline_config_to_binding( + mock_query, "plugin:author/plugin/runner" + ) + + assert binding.runner_id == "plugin:author/plugin/runner" + + def test_config_to_binding_scope(self, mock_query): + """Test binding scope extraction.""" + binding = PipelineCompatAdapter.pipeline_config_to_binding( + mock_query, "plugin:test/plugin/runner" + ) + + assert binding.scope.scope_type == "pipeline" + assert binding.scope.scope_id == mock_query.pipeline_uuid + + def test_config_to_binding_max_round(self, mock_query_with_max_round): + """Test max_round extraction for compatibility adapter.""" + binding = PipelineCompatAdapter.pipeline_config_to_binding( + mock_query_with_max_round, "plugin:test/plugin/runner" + ) + + # max_round should be captured but NOT in Protocol v1 entities + assert binding.max_round == 10 + + def test_config_to_binding_no_max_round(self, mock_query): + """Test binding without max_round.""" + binding = PipelineCompatAdapter.pipeline_config_to_binding( + mock_query, "plugin:test/plugin/runner" + ) + + # max_round may be None + assert binding.max_round is None + + +class TestAgentRunContextProtocolV1: + """Test AgentRunContext Protocol v1 behavior.""" + + def test_sdk_context_event_required(self): + """Test that event is required in Protocol v1 context.""" + trigger = AgentTrigger(type="message.received") + event = AgentEventContext( + event_id="evt_1", + event_type="message.received", + source="platform", + ) + input = AgentInput(text="Hello") + from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources + from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + ctx = AgentRunContext( + run_id="run_1", + trigger=trigger, + event=event, + input=input, + delivery=DeliveryContext(surface="platform"), + resources=AgentResources(), + runtime=AgentRuntimeContext(), + ) + + assert ctx.event is not None + assert ctx.event.event_type == "message.received" + + def test_sdk_context_messages_default_empty(self): + """Test that messages default to empty (not full history).""" + trigger = AgentTrigger(type="message.received") + event = AgentEventContext( + event_id="evt_1", + event_type="message.received", + source="platform", + ) + input = AgentInput(text="Hello") + from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources + from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + ctx = AgentRunContext( + run_id="run_1", + trigger=trigger, + event=event, + input=input, + delivery=DeliveryContext(surface="platform"), + resources=AgentResources(), + runtime=AgentRuntimeContext(), + ) + + # messages is now in bootstrap, not top-level + assert ctx.bootstrap is None or ctx.bootstrap.messages == [] + + def test_sdk_context_bootstrap_optional(self): + """Test that bootstrap is optional.""" + trigger = AgentTrigger(type="message.received") + event = AgentEventContext( + event_id="evt_1", + event_type="message.received", + source="platform", + ) + input = AgentInput(text="Hello") + from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources + from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + ctx = AgentRunContext( + run_id="run_1", + trigger=trigger, + event=event, + input=input, + delivery=DeliveryContext(surface="platform"), + resources=AgentResources(), + runtime=AgentRuntimeContext(), + ) + + # bootstrap is optional + assert ctx.bootstrap is None or isinstance(ctx.bootstrap.messages, list) + + +class TestLegacyMaxRoundNotInProtocol: + """Test that legacy max-round only affects compat adapter, not Protocol v1.""" + + def test_max_round_not_in_sdk_context(self): + """Test max-round is not a field in SDK AgentRunContext.""" + # AgentRunContext should not have max_round field + ctx_fields = AgentRunContext.model_fields.keys() + + assert "max_round" not in ctx_fields + assert "maxRound" not in ctx_fields + + def test_max_round_in_compatibility_context(self): + """Test max_round is in compatibility context, not main context.""" + trigger = AgentTrigger(type="message.received") + event = AgentEventContext( + event_id="evt_1", + event_type="message.received", + source="platform", + ) + input = AgentInput(text="Hello") + from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources + from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + from langbot_plugin.api.entities.builtin.agent_runner.context import CompatibilityContext + + compat = CompatibilityContext(max_round=10) + + ctx = AgentRunContext( + run_id="run_1", + trigger=trigger, + event=event, + input=input, + delivery=DeliveryContext(surface="platform"), + resources=AgentResources(), + runtime=AgentRuntimeContext(), + compatibility=compat, + ) + + # max_round is in compatibility context, not main context + assert ctx.compatibility is not None + assert ctx.compatibility.max_round == 10 + + def test_binding_max_round_for_adapter_only(self, mock_query_with_max_round): + """Test max_round in binding is for adapter use, not Protocol v1.""" + binding = PipelineCompatAdapter.pipeline_config_to_binding( + mock_query_with_max_round, "plugin:test/plugin/runner" + ) + + # max_round is in binding (Host-internal) for compat adapter + assert binding.max_round == 10 + + # But SDK entities don't have it + ctx_fields = AgentRunContext.model_fields.keys() + assert "max_round" not in ctx_fields + + +class TestSDKCapabilitiesProtocolV1: + """Test SDK capabilities for Protocol v1.""" + + def test_self_managed_context_default_true(self): + """Test self_managed_context defaults to True for Protocol v1.""" + caps = AgentRunnerCapabilities() + + assert caps.self_managed_context is True + + def test_event_context_default_true(self): + """Test event_context defaults to True for Protocol v1.""" + caps = AgentRunnerCapabilities() + + assert caps.event_context is True + + +class TestSDKPermissionsProtocolV1: + """Test SDK permissions for Protocol v1.""" + + def test_permissions_new_fields(self): + """Test new permission fields for Protocol v1.""" + perms = AgentRunnerPermissions( + models=["invoke", "stream", "rerank"], + tools=["detail", "call"], + knowledge_bases=["list", "retrieve"], + history=["page", "search"], + events=["get", "page"], + artifacts=["metadata", "read"], + storage=["plugin", "workspace", "binding"], + ) + + assert perms.history == ["page", "search"] + assert perms.events == ["get", "page"] + assert perms.artifacts == ["metadata", "read"] + assert perms.storage == ["plugin", "workspace", "binding"] + + +class TestSDKResultProtocolV1: + """Test SDK AgentRunResult for Protocol v1.""" + + def test_result_requires_run_id(self): + """Test result requires run_id for Protocol v1.""" + from langbot_plugin.api.entities.builtin.provider.message import Message + + result = AgentRunResult.message_completed( + run_id="run_1", + message=Message(role="assistant", content="Hello"), + ) + + assert result.run_id == "run_1" + + def test_artifact_created_result_type(self): + """Test artifact.created result type.""" + result = AgentRunResult.artifact_created( + run_id="run_1", + artifact_id="artifact_1", + artifact_type="image", + ) + + assert result.type == AgentRunResultType.ARTIFACT_CREATED + assert result.data["artifact_id"] == "artifact_1" + + +# Fixtures +@pytest.fixture +def mock_query(): + """Create a mock Pipeline Query for testing.""" + query = Mock() + query.query_id = 123 + query.bot_uuid = "bot-uuid-123" + query.pipeline_uuid = "pipeline-uuid-456" + query.launcher_type = Mock(value="person") + query.launcher_id = "launcher-123" + query.sender_id = "sender-123" + query.pipeline_config = { + "ai": { + "runner": "plugin:test/plugin/runner", + } + } + query.variables = {} + + # Create a proper content element mock + content_elem = Mock(spec=['type', 'text', 'model_dump']) + content_elem.type = 'text' + content_elem.text = 'Hello world' + content_elem.model_dump = Mock(return_value={'type': 'text', 'text': 'Hello world'}) + + query.user_message = Mock() + query.user_message.content = [content_elem] + + # Create message_chain mock + message_chain = Mock() + message_chain.message_id = 789 + message_chain.model_dump = Mock(return_value={'message_id': 789, 'components': []}) + query.message_chain = message_chain + + query.message_event = None + + # Mock session with proper conversation + query.session = Mock() + query.session.launcher_type = Mock(value="person") + query.session.launcher_id = "launcher-123" + query.session.using_conversation = Mock() + query.session.using_conversation.uuid = "conv-uuid-123" + + # Mock use_funcs (empty list by default) + query.use_funcs = [] + query.use_llm_model_uuid = None + + return query + + +@pytest.fixture +def mock_query_with_max_round(mock_query): + """Create a mock Query with max_round configuration.""" + mock_query.pipeline_config = { + "ai": { + "runner": "plugin:test/plugin/runner", + "max-round": 10, + } + } + return mock_query + + +@pytest.fixture +def mock_query_no_session(): + """Create a mock Query without session.""" + query = Mock() + query.query_id = 456 + query.bot_uuid = "bot-uuid-456" + query.pipeline_uuid = "pipeline-uuid-789" + query.launcher_type = Mock(value="person") + query.launcher_id = "launcher-456" + query.sender_id = "sender-456" + query.pipeline_config = { + "ai": { + "runner": "plugin:test/plugin/runner", + } + } + query.variables = {} + + # Create a proper content element mock + content_elem = Mock(spec=['type', 'text', 'model_dump']) + content_elem.type = 'text' + content_elem.text = 'Test message' + content_elem.model_dump = Mock(return_value={'type': 'text', 'text': 'Test message'}) + + query.user_message = Mock() + query.user_message.content = [content_elem] + + message_chain = Mock() + message_chain.message_id = -1 + message_chain.model_dump = Mock(return_value={'message_id': -1, 'components': []}) + query.message_chain = message_chain + + query.message_event = None + query.session = None + + # Mock use_funcs + query.use_funcs = [] + query.use_llm_model_uuid = None + + return query diff --git a/tests/unit_tests/agent/test_event_log_transcript.py b/tests/unit_tests/agent/test_event_log_transcript.py new file mode 100644 index 00000000..4b1d5fb1 --- /dev/null +++ b/tests/unit_tests/agent/test_event_log_transcript.py @@ -0,0 +1,324 @@ +"""Tests for EventLog, Transcript, and history/event APIs.""" +from __future__ import annotations + +import pytest +from unittest.mock import Mock, MagicMock, patch +import datetime + +from langbot.pkg.agent.runner.host_models import ( + AgentEventEnvelope, + AgentBinding, + BindingScope, + ResourcePolicy, + StatePolicy, + DeliveryPolicy, +) +from langbot.pkg.agent.runner.event_log_store import EventLogStore +from langbot.pkg.agent.runner.transcript_store import TranscriptStore +from langbot.pkg.agent.runner.session_registry import get_session_registry +from langbot_plugin.api.entities.builtin.agent_runner.event import ( + AgentEventContext, + ActorContext, +) +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + +def make_event_envelope( + event_id: str = "evt_1", + event_type: str = "message.received", + conversation_id: str | None = "conv_1", + actor_id: str | None = "user_1", + input_text: str = "Hello", +) -> AgentEventEnvelope: + """Create a test event envelope.""" + return AgentEventEnvelope( + event_id=event_id, + event_type=event_type, + event_time=1700000000, + source="platform", + bot_id="bot_1", + workspace_id=None, + conversation_id=conversation_id, + thread_id=None, + actor=ActorContext( + actor_type="user", + actor_id=actor_id, + actor_name="Test User", + ) if actor_id else None, + subject=None, + input=AgentInput(text=input_text), + delivery=DeliveryContext(surface="test"), + ) + + +def make_binding(runner_id: str = "plugin:test/plugin/runner") -> AgentBinding: + """Create a test binding.""" + return AgentBinding( + binding_id="binding_1", + scope=BindingScope(scope_type="pipeline", scope_id="pipeline_1"), + event_types=["message.received"], + runner_id=runner_id, + runner_config={}, + resource_policy=ResourcePolicy(), + state_policy=StatePolicy(), + delivery_policy=DeliveryPolicy(), + enabled=True, + ) + + +class TestEventLogStore: + """Test EventLogStore operations.""" + + @pytest.mark.asyncio + async def test_append_event(self, mock_db_engine): + """Test appending an event to EventLog.""" + store = EventLogStore(mock_db_engine) + + event_id = await store.append_event( + event_id="evt_1", + event_type="message.received", + source="platform", + bot_id="bot_1", + conversation_id="conv_1", + actor_type="user", + actor_id="user_1", + input_summary="Hello world", + run_id="run_1", + runner_id="plugin:test/plugin/runner", + ) + + assert event_id == "evt_1" + + @pytest.mark.asyncio + async def test_append_event_truncates_input_summary(self, mock_db_engine): + """Test that long input summaries are truncated.""" + store = EventLogStore(mock_db_engine) + + long_text = "x" * 2000 + event_id = await store.append_event( + event_id="evt_2", + event_type="message.received", + source="platform", + input_summary=long_text, + ) + + assert event_id == "evt_2" + + @pytest.mark.asyncio + async def test_page_events_with_conversation_filter(self, mock_db_engine): + """Test paging events with conversation_id filter.""" + store = EventLogStore(mock_db_engine) + + items, next_seq, has_more = await store.page_events( + conversation_id="conv_1", + limit=10, + ) + + assert isinstance(items, list) + + +class TestTranscriptStore: + """Test TranscriptStore operations.""" + + @pytest.mark.asyncio + async def test_append_transcript(self, mock_db_engine): + """Test appending a transcript item.""" + store = TranscriptStore(mock_db_engine) + + transcript_id = await store.append_transcript( + transcript_id=None, # Auto-generate + event_id="evt_1", + conversation_id="conv_1", + role="user", + content="Hello", + ) + + assert transcript_id is not None + + @pytest.mark.asyncio + async def test_append_transcript_with_artifacts(self, mock_db_engine): + """Test appending transcript with artifact refs.""" + store = TranscriptStore(mock_db_engine) + + transcript_id = await store.append_transcript( + transcript_id=None, # Auto-generate + event_id="evt_2", + conversation_id="conv_1", + role="assistant", + content="Here's an image", + artifact_refs=[ + {"artifact_id": "art_1", "artifact_type": "image", "url": "http://example.com/img.png"} + ], + ) + + assert transcript_id is not None + + @pytest.mark.asyncio + async def test_page_transcript_backward(self, mock_db_engine): + """Test paging transcript backward (older items).""" + store = TranscriptStore(mock_db_engine) + + items, next_seq, prev_seq, has_more = await store.page_transcript( + conversation_id="conv_1", + limit=10, + direction="backward", + ) + + assert isinstance(items, list) + + @pytest.mark.asyncio + async def test_page_transcript_has_hard_limit(self, mock_db_engine): + """Test that transcript paging has a hard limit.""" + store = TranscriptStore(mock_db_engine) + + # Request more than the hard limit + items, next_seq, prev_seq, has_more = await store.page_transcript( + conversation_id="conv_1", + limit=200, # Request 200, but hard limit is 100 + ) + + # The store should cap at 100 + assert len(items) <= store.HARD_LIMIT + + @pytest.mark.asyncio + async def test_search_transcript(self, mock_db_engine): + """Test searching transcript.""" + store = TranscriptStore(mock_db_engine) + + items = await store.search_transcript( + conversation_id="conv_1", + query_text="database", + top_k=10, + ) + + assert isinstance(items, list) + + +class TestHistoryPageAuthorization: + """Test history.page authorization.""" + + @pytest.mark.asyncio + async def test_history_page_requires_run_id(self, mock_handler, mock_db_engine): + """Test history.page requires run_id.""" + from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction + + # Mock call_action to simulate the handler + result = await mock_handler.call_action( + PluginToRuntimeAction.HISTORY_PAGE, + {"run_id": None}, + ) + + # Should return error + assert result.get("ok") is False or "error" in str(result).lower() + + @pytest.mark.asyncio + async def test_history_page_validates_conversation_scope(self, mock_db_engine): + """Test history.page only allows access to run's conversation.""" + # This test verifies the authorization logic + # The actual implementation validates conversation_id matches session + session_registry = get_session_registry() + + await session_registry.register( + run_id="run_1", + runner_id="plugin:test/plugin/runner", + query_id=None, + plugin_identity="test/plugin", + resources={"models": [], "tools": [], "knowledge_bases": [], "storage": {"plugin_storage": True}}, + conversation_id="conv_1", + ) + + session = await session_registry.get("run_1") + assert session is not None + assert session["conversation_id"] == "conv_1" + + # Cleanup + await session_registry.unregister("run_1") + + +class TestEventGetAuthorization: + """Test event.get authorization.""" + + @pytest.mark.asyncio + async def test_event_get_requires_run_id(self, mock_handler): + """Test event.get requires run_id.""" + from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction + + result = await mock_handler.call_action( + PluginToRuntimeAction.EVENT_GET, + {"run_id": None, "event_id": "evt_1"}, + ) + + # Should return error + assert result.get("ok") is False or "error" in str(result).lower() + + +class TestContextAccessPopulation: + """Test ContextAccess population in build_context_from_event.""" + + @pytest.mark.asyncio + async def test_context_access_has_history_apis_when_permitted(self, mock_db_engine): + """Test ContextAccess shows available APIs based on permissions.""" + # This would test the context builder logic + # For now we verify the store methods work + store = TranscriptStore(mock_db_engine) + + cursor = await store.get_latest_cursor("conv_1") + # Should return None or a cursor string + assert cursor is None or isinstance(cursor, str) + + @pytest.mark.asyncio + async def test_context_access_shows_has_history_before(self, mock_db_engine): + """Test ContextAccess indicates if history exists.""" + store = TranscriptStore(mock_db_engine) + + has_history = await store.has_history_before("conv_1", 10) + assert isinstance(has_history, bool) + + +# Fixtures +@pytest.fixture +def mock_db_engine(): + """Create a mock database engine.""" + from unittest.mock import MagicMock, AsyncMock + from sqlalchemy.ext.asyncio import AsyncEngine + + engine = MagicMock(spec=AsyncEngine) + + # Mock connection + mock_conn = MagicMock() + mock_result = MagicMock() + mock_result.fetchone.return_value = None + mock_result.fetchall.return_value = [] + mock_result.scalar.return_value = 0 + mock_conn.execute = AsyncMock(return_value=mock_result) + mock_conn.commit = AsyncMock() + + # Create async context manager for connect() + class AsyncConnectContextManager: + async def __aenter__(self): + return mock_conn + async def __aexit__(self, *args): + pass + + # connect() should return an async context manager + engine.connect = MagicMock(return_value=AsyncConnectContextManager()) + return engine + + +@pytest.fixture +def mock_handler(): + """Create a mock handler for testing actions.""" + from langbot_plugin.runtime.io.handler import Handler, ActionResponse + + class MockHandler(Handler): + def __init__(self): + self._responses = {} + + async def call_action(self, action, data, timeout=30): + # Simulate error response for missing run_id + if not data.get("run_id"): + return {"ok": False, "message": "run_id is required"} + return {"ok": True, "data": {}} + + return MockHandler() diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index 122824a5..c8618982 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -288,7 +288,8 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(): context = plugin_connector.contexts[0] assert context["config"]["timeout"] == 30 assert context["runtime"]["deadline_at"] is not None - assert context["params"] == {"public_param": "visible"} + # Protocol v1: params is in compatibility.extra + assert context["compatibility"]["extra"]["params"] == {"public_param": "visible"} assert context["event"]["event_type"] == "message.received" assert context["event"]["event_data"]["source_event_type"] == "FriendMessage" assert context["actor"]["actor_id"] == "user_001" @@ -337,7 +338,16 @@ async def test_orchestrator_packages_legacy_max_round_without_mutating_query(): assert len(messages) == 1 context = plugin_connector.contexts[0] - assert [message["content"] for message in context["messages"]] == [ + # Protocol v1: legacy messages are in bootstrap.messages + assert context["bootstrap"] is not None + assert [message["content"] for message in context["bootstrap"]["messages"]] == [ + "message 2", + "response 2", + "message 3", + "response 3", + ] + # Also in compatibility.legacy_messages for legacy runners + assert [message["content"] for message in context["compatibility"]["legacy_messages"]] == [ "message 2", "response 2", "message 3",