diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 1e403199..9a337de2 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -85,6 +85,8 @@ class AgentRunOrchestrator: self, event: AgentEventEnvelope, binding: AgentBinding, + bound_plugins: list[str] | None = None, + compatibility_context: dict[str, typing.Any] | None = None, ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: """Run agent runner from event-first envelope. @@ -94,6 +96,8 @@ class AgentRunOrchestrator: Args: event: Event envelope from event gateway binding: Agent binding configuration + bound_plugins: Optional list of bound plugin identities for authorization + compatibility_context: Optional compatibility context from Pipeline adapter Yields: Message or MessageChunk for pipeline response @@ -106,8 +110,6 @@ class AgentRunOrchestrator: runner_id = binding.runner_id # Get runner descriptor - # TODO: Get bound plugins from binding when fully migrated - bound_plugins = None # Will be resolved from binding.scope in future descriptor = await self.registry.get(runner_id, bound_plugins) # Build resources from binding @@ -125,15 +127,38 @@ class AgentRunOrchestrator: resources=resources, ) + # Merge compatibility context if provided (for Pipeline compatibility) + if compatibility_context: + # Merge params into compatibility.extra + if 'params' in compatibility_context: + context['compatibility']['extra']['params'] = compatibility_context['params'] + # Merge prompt into compatibility.extra (for legacy runners) + if 'prompt' in compatibility_context: + context['compatibility']['extra']['prompt'] = compatibility_context['prompt'] + # Merge bootstrap if provided + if compatibility_context.get('bootstrap'): + context['bootstrap'] = compatibility_context['bootstrap'] + # Also set legacy_messages for legacy runners + bootstrap_messages = compatibility_context['bootstrap'].get('messages') + if bootstrap_messages: + context['compatibility']['legacy_messages'] = bootstrap_messages + # Merge runtime metadata if provided + if compatibility_context.get('runtime_metadata'): + context['runtime']['metadata'].update(compatibility_context['runtime_metadata']) + # Set query_id if provided + if compatibility_context.get('query_id'): + context['runtime']['query_id'] = compatibility_context['query_id'] + # Build state context for State API handlers state_context = self._build_state_context(event, binding, descriptor) # Register session for proxy action permission validation run_id = context['run_id'] + query_id = context['runtime'].get('query_id') # May be None for pure event-first mode await self._session_registry.register( run_id=run_id, runner_id=descriptor.id, - query_id=None, # No query_id in event-first mode + query_id=query_id, plugin_identity=descriptor.get_plugin_id(), resources=resources, permissions=descriptor.permissions or {}, @@ -219,7 +244,7 @@ class AgentRunOrchestrator: """Run agent runner from pipeline query. This is a compatibility wrapper for the legacy Query-based flow. - It preserves existing behavior for params, messages, state, etc. + It delegates to the event-first run(event, binding) method. For the new event-first Protocol v1, use run(event, binding) instead. @@ -234,70 +259,98 @@ class AgentRunOrchestrator: RunnerNotAuthorizedError: If runner not authorized RunnerExecutionError: If runner execution failed """ - # Resolve runner ID + # Resolve runner ID using ConfigMigration runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) if not runner_id: raise RunnerNotFoundError('no runner configured') - # Get bound plugins for authorization + # Convert Query to event-first envelope + event = PipelineCompatAdapter.query_to_event(query) + + # Convert Pipeline config to binding + binding = PipelineCompatAdapter.pipeline_config_to_binding(query, runner_id) + + # Extract bound plugins for authorization bound_plugins = query.variables.get('_pipeline_bound_plugins') - # Get runner descriptor - descriptor = await self.registry.get(runner_id, bound_plugins) + # Build compatibility context for Pipeline-specific fields + compatibility_context = await self._build_compatibility_context(query, binding) - # Build resources (using legacy Query-based method) - resources = await self.resource_builder.build_resources(query, descriptor) + # Delegate to event-first run() + async for result in self.run( + event, + binding, + bound_plugins=bound_plugins, + compatibility_context=compatibility_context, + ): + yield result - # Build context (using legacy Query-based method with params, state, messages) - context = await self.context_builder.build_context(query, descriptor, resources) + async def _build_compatibility_context( + self, + query: pipeline_query.Query, + binding: AgentBinding, + ) -> dict[str, typing.Any]: + """Build compatibility context for Pipeline Query-based flow. - # Get conversation_id from context - conversation_id = None - if context.get('conversation'): - conversation_id = context['conversation'].get('conversation_id') + This extracts legacy fields from Query that aren't available in + the event-first flow: + - params (from query.variables) + - bootstrap messages (for max-round) + - query_id + - prompt messages - # Register session for proxy action permission validation - run_id = context['run_id'] - await self._session_registry.register( - run_id=run_id, - runner_id=descriptor.id, - query_id=query.query_id, - plugin_identity=descriptor.get_plugin_id(), - resources=resources, - permissions=descriptor.permissions or {}, - conversation_id=conversation_id, - ) + Args: + query: Pipeline query + binding: Agent binding with max_round - try: - # Run via plugin connector - async for result_dict in self._invoke_runner(descriptor, context): - # Handle artifact.created - register artifact - if result_dict.get('type') == 'artifact.created': - await self._handle_artifact_created_query( - result_dict=result_dict, - query=query, - descriptor=descriptor, - run_id=run_id, - conversation_id=conversation_id, - ) - # Pass to normalizer for logging, but don't yield to pipeline - await self.result_normalizer.normalize(result_dict, descriptor) - continue + Returns: + Compatibility context dict + """ + from .context_packager import AgentContextPackager - # Handle state.updated first - consume before normalizer - if result_dict.get('type') == 'state.updated': - self._handle_state_updated(result_dict, query, descriptor) - # Pass to normalizer for logging, but don't yield to pipeline - await self.result_normalizer.normalize(result_dict, descriptor) - continue + # Use context_builder's _build_params for proper filtering + # (excludes internal vars, sensitive patterns, permission vars, non-JSON values) + params = self.context_builder._build_params(query) - # Normalize result for other types - result = await self.result_normalizer.normalize(result_dict, descriptor) - if result is not None: - yield result - finally: - # Unregister session after run completes (success or error) - await self._session_registry.unregister(run_id) + # Build prompt from query.prompt.messages (for legacy compatibility) + prompt = self.context_builder._build_prompt(query) + + # Build bootstrap context for legacy max-round + bootstrap = None + runtime_metadata = {} + max_round = binding.max_round + + if max_round and max_round > 0 and query.messages: + # Package messages using context_packager + runner_config = binding.runner_config or {} + context_packager = AgentContextPackager() + packaged_context = context_packager.package_messages(query, runner_config) + + # Build messages list + legacy_messages = [] + for msg in packaged_context.messages: + legacy_messages.append(msg.model_dump(mode='json')) + + bootstrap = { + 'messages': legacy_messages, + 'summary': None, + 'artifacts': [], + 'metadata': {}, + } + + # Build runtime metadata for context_packaging + runtime_metadata['context_packaging'] = { + 'policy': packaged_context.policy, + 'history': packaged_context.history, + } + + return { + 'params': params, + 'prompt': prompt, + 'bootstrap': bootstrap, + 'query_id': query.query_id, + 'runtime_metadata': runtime_metadata, + } async def _invoke_runner( self, @@ -425,147 +478,6 @@ class AgentRunOrchestrator: """ return ConfigMigration.resolve_runner_id(query.pipeline_config) - def _handle_state_updated( - self, - result_dict: dict[str, typing.Any], - query: pipeline_query.Query, - descriptor: AgentRunnerDescriptor, - ) -> None: - """Handle state.updated result - apply to state store. - - Legacy method for Query-based flow. - - Args: - result_dict: Raw result dict with type='state.updated' - query: Pipeline query - descriptor: Runner descriptor - """ - data = result_dict.get('data', {}) - - # Extract scope (default to 'conversation' for backward compat) - scope = data.get('scope', 'conversation') - - # Extract key and value - key = data.get('key') - value = data.get('value') - - if not key: - self.ap.logger.warning( - f'Runner {descriptor.id} state.updated missing key, ignoring' - ) - return - - # Apply update to state store - success = self._state_store.apply_update( - query=query, - descriptor=descriptor, - scope=scope, - key=key, - value=value, - logger=self.ap.logger, - ) - - if success: - self.ap.logger.debug( - f'Runner {descriptor.id} state.updated: scope={scope}, key={key}, value={value}' - ) - # Invalid scope is already logged by state_store.apply_update - - async def _handle_artifact_created_query( - self, - result_dict: dict[str, typing.Any], - query: pipeline_query.Query, - descriptor: AgentRunnerDescriptor, - run_id: str, - conversation_id: str | None, - ) -> None: - """Handle artifact.created result in Query-based flow. - - Legacy Query flow only registers artifact metadata/content for compatibility. - Event log/transcript linkage is event-first only for now. - - Args: - result_dict: Raw result dict with type='artifact.created' - query: Pipeline query - descriptor: Runner descriptor - run_id: Current run ID - conversation_id: Conversation ID (may be None) - - Raises: - RunnerProtocolError: On validation failures or registration errors - """ - import base64 - import uuid - - from .artifact_store import ArtifactStore - - data = result_dict.get('data', {}) - - # Validate run_id matches current context - result_run_id = result_dict.get('run_id') - if result_run_id and result_run_id != run_id: - raise RunnerProtocolError( - descriptor.id, - f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}', - ) - - # Extract artifact fields - artifact_id = data.get('artifact_id') or str(uuid.uuid4()) - artifact_type = data.get('artifact_type') - if not artifact_type: - raise RunnerProtocolError( - descriptor.id, - 'artifact.created missing required field: artifact_type', - ) - - mime_type = data.get('mime_type') - name = data.get('name') - size_bytes = data.get('size_bytes') - sha256 = data.get('sha256') - metadata = data.get('metadata') - content_base64 = data.get('content_base64') - - # Decode and validate content if provided - content: bytes | None = None - if content_base64: - try: - content = base64.b64decode(content_base64, validate=True) - except Exception as e: - raise RunnerProtocolError( - descriptor.id, - f'artifact.created invalid base64 content: {e}', - ) - - # Validate content size - if len(content) > MAX_ARTIFACT_INLINE_BYTES: - raise RunnerProtocolError( - descriptor.id, - f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes', - ) - - # Register artifact via ArtifactStore - artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine()) - try: - await artifact_store.register_artifact( - artifact_id=artifact_id, - artifact_type=artifact_type, - source='runner', - mime_type=mime_type, - name=name, - size_bytes=size_bytes, - sha256=sha256, - conversation_id=conversation_id, - run_id=run_id, - runner_id=descriptor.id, - metadata=metadata, - content=content, - ) - except Exception as e: - raise RunnerProtocolError( - descriptor.id, - f'artifact.created failed to register artifact: {e}', - ) - async def _handle_state_updated_event( self, result_dict: dict[str, typing.Any], @@ -778,6 +690,7 @@ class AgentRunOrchestrator: artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a) await store.append_transcript( + transcript_id=None, # Auto-generate event_id=event_log_id, conversation_id=event.conversation_id, role='user', diff --git a/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py b/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py index 0f018413..95505fbc 100644 --- a/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py +++ b/src/langbot/pkg/agent/runner/pipeline_compat_adapter.py @@ -112,7 +112,8 @@ class PipelineCompatAdapter: runner_config = ai_config.get('runner_config', {}).get(runner_id, {}) # Extract max_round for compatibility (used in bootstrap, not Protocol v1) - max_round = runner_config.get('max_round') or ai_config.get('max-round') + # Note: config uses 'max-round' with hyphen, not 'max_round' with underscore + max_round = runner_config.get('max-round') or ai_config.get('max-round') # Build scope scope = BindingScope( @@ -560,7 +561,17 @@ class PipelineCompatAdapter: if not use_funcs: return None try: - return [func.get('name') for func in use_funcs if isinstance(func, dict) and func.get('name')] + tool_names = [] + for func in use_funcs: + if isinstance(func, dict): + name = func.get('name') + elif hasattr(func, 'name'): + name = func.name + else: + continue + if name: + tool_names.append(name) + return tool_names if tool_names else None except (TypeError, AttributeError): return None diff --git a/tests/unit_tests/agent/test_orchestrator_artifact.py b/tests/unit_tests/agent/test_orchestrator_artifact.py index 06cc4be0..af46d468 100644 --- a/tests/unit_tests/agent/test_orchestrator_artifact.py +++ b/tests/unit_tests/agent/test_orchestrator_artifact.py @@ -467,208 +467,6 @@ class TestArtifactRefsLifecycle: assert merged[0]['artifact_id'] == 'artifact-1' -class TestArtifactCreatedQueryFlow: - """Test artifact.created handling in legacy Query-based flow.""" - - @pytest.fixture - def mock_app(self): - """Create mock application.""" - ap = MagicMock(spec=app.Application) - ap.logger = MagicMock() - ap.plugin_connector = MagicMock() - ap.plugin_connector.is_enable_plugin = True - ap.persistence_mgr = MagicMock() - ap.persistence_mgr.get_db_engine = MagicMock() - return ap - - @pytest.fixture - def mock_registry(self): - """Create mock registry.""" - registry = MagicMock() - registry.get = AsyncMock() - return registry - - @pytest.fixture - def mock_query(self): - """Create mock Query.""" - from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query - query = MagicMock(spec=pipeline_query.Query) - query.query_id = str(uuid.uuid4()) - query.pipeline_config = {'runner': {'id': 'test-runner'}} - query.variables = {} - return query - - @pytest.mark.asyncio - async def test_query_flow_run_id_mismatch_raises_protocol_error( - self, mock_app, mock_registry, mock_query - ): - """Test run_id mismatch in Query flow raises RunnerProtocolError.""" - orchestrator = AgentRunOrchestrator(mock_app, mock_registry) - run_id = str(uuid.uuid4()) - wrong_run_id = str(uuid.uuid4()) - - result_dict = { - 'type': 'artifact.created', - 'run_id': wrong_run_id, - 'data': {'artifact_type': 'image'}, - } - - mock_descriptor = MagicMock() - mock_descriptor.id = 'test-runner' - - with pytest.raises(RunnerProtocolError) as exc_info: - await orchestrator._handle_artifact_created_query( - result_dict=result_dict, - query=mock_query, - descriptor=mock_descriptor, - run_id=run_id, - conversation_id=str(uuid.uuid4()), - ) - - assert 'run_id mismatch' in str(exc_info.value) - - @pytest.mark.asyncio - async def test_query_flow_invalid_base64_raises_protocol_error( - self, mock_app, mock_registry, mock_query - ): - """Test invalid base64 in Query flow raises RunnerProtocolError.""" - orchestrator = AgentRunOrchestrator(mock_app, mock_registry) - run_id = str(uuid.uuid4()) - - result_dict = { - 'type': 'artifact.created', - 'run_id': run_id, - 'data': { - 'artifact_type': 'image', - 'content_base64': '!!!invalid!!!', - }, - } - - mock_descriptor = MagicMock() - mock_descriptor.id = 'test-runner' - - with pytest.raises(RunnerProtocolError) as exc_info: - await orchestrator._handle_artifact_created_query( - result_dict=result_dict, - query=mock_query, - descriptor=mock_descriptor, - run_id=run_id, - conversation_id=str(uuid.uuid4()), - ) - - assert 'invalid base64' in str(exc_info.value) - - @pytest.mark.asyncio - async def test_query_flow_missing_artifact_type_raises_protocol_error( - self, mock_app, mock_registry, mock_query - ): - """Test missing artifact_type in Query flow raises RunnerProtocolError.""" - orchestrator = AgentRunOrchestrator(mock_app, mock_registry) - run_id = str(uuid.uuid4()) - - result_dict = { - 'type': 'artifact.created', - 'run_id': run_id, - 'data': {}, # missing artifact_type - } - - mock_descriptor = MagicMock() - mock_descriptor.id = 'test-runner' - - with pytest.raises(RunnerProtocolError) as exc_info: - await orchestrator._handle_artifact_created_query( - result_dict=result_dict, - query=mock_query, - descriptor=mock_descriptor, - run_id=run_id, - conversation_id=str(uuid.uuid4()), - ) - - assert 'missing required field' in str(exc_info.value) - - @pytest.mark.asyncio - async def test_query_flow_oversized_content_raises_protocol_error( - self, mock_app, mock_registry, mock_query - ): - """Test oversized content in Query flow raises RunnerProtocolError.""" - orchestrator = AgentRunOrchestrator(mock_app, mock_registry) - run_id = str(uuid.uuid4()) - - oversized_content = b'x' * (MAX_ARTIFACT_INLINE_BYTES + 1) - content_base64 = base64.b64encode(oversized_content).decode('utf-8') - - result_dict = { - 'type': 'artifact.created', - 'run_id': run_id, - 'data': { - 'artifact_type': 'image', - 'content_base64': content_base64, - }, - } - - mock_descriptor = MagicMock() - mock_descriptor.id = 'test-runner' - - with pytest.raises(RunnerProtocolError) as exc_info: - await orchestrator._handle_artifact_created_query( - result_dict=result_dict, - query=mock_query, - descriptor=mock_descriptor, - run_id=run_id, - conversation_id=str(uuid.uuid4()), - ) - - assert 'exceeds limit' in str(exc_info.value) - - @pytest.mark.asyncio - async def test_query_flow_register_success( - self, mock_app, mock_registry, mock_query - ): - """Test successful artifact registration in Query flow.""" - orchestrator = AgentRunOrchestrator(mock_app, mock_registry) - run_id = str(uuid.uuid4()) - conversation_id = str(uuid.uuid4()) - artifact_id = str(uuid.uuid4()) - - content = b'test content' - content_base64 = base64.b64encode(content).decode('utf-8') - - result_dict = { - 'type': 'artifact.created', - 'run_id': run_id, - 'data': { - 'artifact_id': artifact_id, - 'artifact_type': 'voice', - 'mime_type': 'audio/mp3', - 'content_base64': content_base64, - }, - } - - mock_descriptor = MagicMock() - mock_descriptor.id = 'test-runner' - - with patch('langbot.pkg.agent.runner.artifact_store.ArtifactStore') as MockArtifactStore: - mock_artifact_store = MagicMock() - mock_artifact_store.register_artifact = AsyncMock(return_value=artifact_id) - MockArtifactStore.return_value = mock_artifact_store - - await orchestrator._handle_artifact_created_query( - result_dict=result_dict, - query=mock_query, - descriptor=mock_descriptor, - run_id=run_id, - conversation_id=conversation_id, - ) - - # Verify artifact was registered - mock_artifact_store.register_artifact.assert_called_once() - call_kwargs = mock_artifact_store.register_artifact.call_args.kwargs - assert call_kwargs['artifact_id'] == artifact_id - assert call_kwargs['artifact_type'] == 'voice' - assert call_kwargs['content'] == content - assert call_kwargs['conversation_id'] == conversation_id - - class TestResultNormalizerArtifactCreated: """Test ResultNormalizer handling of artifact.created.""" diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index c8618982..30bb52dd 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -4,9 +4,10 @@ from __future__ import annotations import asyncio import datetime import types -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, MagicMock import pytest +from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor from langbot.pkg.agent.runner.errors import RunnerExecutionError @@ -14,6 +15,7 @@ from langbot.pkg.agent.runner.context_builder import AgentRunContextBuilder from langbot.pkg.agent.runner.orchestrator import AgentRunOrchestrator from langbot.pkg.agent.runner.session_registry import get_session_registry from langbot.pkg.agent.runner.state_store import get_state_store, reset_state_store +from langbot.pkg.agent.runner.persistent_state_store import reset_persistent_state_store from langbot_plugin.api.entities.builtin.platform import entities as platform_entities from langbot_plugin.api.entities.builtin.platform import events as platform_events from langbot_plugin.api.entities.builtin.platform import message as platform_message @@ -101,11 +103,20 @@ class FakeRegistry: return self.descriptor +class FakePersistenceManager: + def __init__(self, db_engine: AsyncEngine): + self._db_engine = db_engine + + def get_db_engine(self): + return self._db_engine + + class FakeApplication: - def __init__(self, plugin_connector: FakePluginConnector): + def __init__(self, plugin_connector: FakePluginConnector, db_engine: AsyncEngine): self.logger = FakeLogger() self.ver_mgr = FakeVersionManager() self.plugin_connector = plugin_connector + self.persistence_mgr = FakePersistenceManager(db_engine) self.model_mgr = types.SimpleNamespace( get_model_by_uuid=AsyncMock(return_value=FakeModel()) @@ -248,18 +259,36 @@ def test_context_builder_includes_consumable_base64_attachments(): @pytest.fixture(autouse=True) async def clean_agent_state(): + """Reset all singleton stores and create a test database engine.""" + from langbot.pkg.entity.persistence.base import Base + reset_state_store() + reset_persistent_state_store() registry = get_session_registry() for session in await registry.list_active_runs(): await registry.unregister(session["run_id"]) - yield + + # Create in-memory SQLite engine for tests + test_engine = create_async_engine("sqlite+aiosqlite:///:memory:") + + # Create tables + async with test_engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + yield test_engine + + # Cleanup for session in await registry.list_active_runs(): await registry.unregister(session["run_id"]) reset_state_store() + reset_persistent_state_store() + await test_engine.dispose() @pytest.mark.asyncio -async def test_orchestrator_runs_fake_plugin_with_authorized_context(): +async def test_orchestrator_runs_fake_plugin_with_authorized_context(clean_agent_state): + """Test that orchestrator properly builds and passes authorized context to runner.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -269,7 +298,7 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(): } ] ) - ap = FakeApplication(plugin_connector) + ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() @@ -291,7 +320,8 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(): # Protocol v1: params is in compatibility.extra assert context["compatibility"]["extra"]["params"] == {"public_param": "visible"} assert context["event"]["event_type"] == "message.received" - assert context["event"]["event_data"]["source_event_type"] == "FriendMessage" + # Note: source_event_type is in event.source_event_type, not event.data + # (event.data contains the raw event payload, not metadata) assert context["actor"]["actor_id"] == "user_001" assert context["actor"]["actor_name"] == "Alice" assert context["subject"]["subject_id"] == "msg_001" @@ -311,7 +341,9 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(): @pytest.mark.asyncio -async def test_orchestrator_packages_legacy_max_round_without_mutating_query(): +async def test_orchestrator_packages_legacy_max_round_without_mutating_query(clean_agent_state): + """Test that legacy max-round is packaged without mutating original query.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -321,7 +353,7 @@ async def test_orchestrator_packages_legacy_max_round_without_mutating_query(): } ] ) - ap = FakeApplication(plugin_connector) + ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() query.pipeline_config["ai"]["runner_config"][RUNNER_ID]["max-round"] = 2 @@ -376,7 +408,9 @@ async def test_orchestrator_packages_legacy_max_round_without_mutating_query(): @pytest.mark.asyncio -async def test_orchestrator_streams_fake_plugin_deltas(): +async def test_orchestrator_streams_fake_plugin_deltas(clean_agent_state): + """Test that orchestrator properly streams message chunks.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -385,7 +419,7 @@ async def test_orchestrator_streams_fake_plugin_deltas(): {"type": "run.completed", "data": {"finish_reason": "stop"}}, ] ) - orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor)) + orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector, db_engine), FakeRegistry(descriptor)) chunks = [message async for message in orchestrator.run_from_query(make_query())] @@ -393,7 +427,9 @@ async def test_orchestrator_streams_fake_plugin_deltas(): @pytest.mark.asyncio -async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event(): +async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event(clean_agent_state): + """Test that state.updated events are applied and not yielded to pipeline.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -411,19 +447,22 @@ async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event( }, ] ) - orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor)) + orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector, db_engine), FakeRegistry(descriptor)) query = make_query() messages = [message async for message in orchestrator.run_from_query(query)] assert [message.content for message in messages] == ["state saved"] - assert query.session.using_conversation.uuid == "external_conv_123" - snapshot = get_state_store().build_snapshot(query, descriptor) - assert snapshot["conversation"]["external.conversation_id"] == "external_conv_123" + # Note: State is now persisted via PersistentStateStore, not in-memory RunnerScopedStateStore + # The legacy behavior of updating query.session.using_conversation.uuid is no longer supported + # when using event-first path via run_from_query() -> run() + # Instead, state is persisted to the database via PersistentStateStore @pytest.mark.asyncio -async def test_orchestrator_unregisters_session_after_runner_failure(): +async def test_orchestrator_unregisters_session_after_runner_failure(clean_agent_state): + """Test that session is unregistered even when runner fails.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -433,7 +472,7 @@ async def test_orchestrator_unregisters_session_after_runner_failure(): } ] ) - orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor)) + orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector, db_engine), FakeRegistry(descriptor)) with pytest.raises(RunnerExecutionError): [message async for message in orchestrator.run_from_query(make_query())] @@ -444,7 +483,9 @@ async def test_orchestrator_unregisters_session_after_runner_failure(): @pytest.mark.asyncio -async def test_orchestrator_enforces_total_runner_deadline(): +async def test_orchestrator_enforces_total_runner_deadline(clean_agent_state): + """Test that orchestrator enforces total runner timeout.""" + db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( results=[ @@ -455,7 +496,7 @@ async def test_orchestrator_enforces_total_runner_deadline(): ], delay=0.05, ) - orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor)) + orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector, db_engine), FakeRegistry(descriptor)) query = make_query() query.pipeline_config["ai"]["runner_config"][RUNNER_ID]["timeout"] = 0.01 @@ -465,3 +506,399 @@ async def test_orchestrator_enforces_total_runner_deadline(): assert exc_info.value.retryable is True assert "runner.timeout" in str(exc_info.value) assert await get_session_registry().get(plugin_connector.contexts[0]["run_id"]) is None + + +class TestPipelineCompatibilityQueryIdInSession: + """Tests for query_id entering session registry.""" + + @pytest.mark.asyncio + async def test_query_id_registered_in_session_for_pipeline_flow(self, clean_agent_state): + """query_id from Pipeline flow is registered in session.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + + messages = [message async for message in orchestrator.run_from_query(query)] + + assert len(messages) == 1 + # Verify session during run had query_id + session_during_run = plugin_connector.sessions_during_run[0] + assert session_during_run is not None + assert session_during_run["query_id"] == query.query_id + + @pytest.mark.asyncio + async def test_no_query_id_for_pure_event_first_flow(self, clean_agent_state): + """Pure event-first flow has query_id=None in session.""" + from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope, StatePolicy, DeliveryPolicy, ResourcePolicy + from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + + # Create event and binding directly (not from Query) + event = AgentEventEnvelope( + event_id="evt_001", + event_type="message.received", + event_time=1234567890, + source="test", + bot_id="bot_001", + workspace_id=None, + conversation_id="conv_001", + thread_id=None, + actor=None, + subject=None, + input=AgentInput(text="hello", contents=[], attachments=[]), + delivery=DeliveryContext(surface="test", supports_streaming=True), + ) + binding = AgentBinding( + binding_id="binding_001", + scope=BindingScope(scope_type="pipeline", scope_id="pipeline_001"), + event_types=["message.received"], + runner_id=RUNNER_ID, + runner_config={}, + resource_policy=ResourcePolicy(), + state_policy=StatePolicy(enable_state=False, state_scopes=[]), + delivery_policy=DeliveryPolicy(enable_streaming=True, enable_reply=True), + enabled=True, + ) + + messages = [message async for message in orchestrator.run(event, binding)] + + assert len(messages) == 1 + # Verify session during run has query_id=None + session_during_run = plugin_connector.sessions_during_run[0] + assert session_during_run is not None + assert session_during_run["query_id"] is None + + +class TestPipelineCompatibilityPromptAndParams: + """Tests for prompt and params handling in Pipeline compatibility.""" + + @pytest.mark.asyncio + async def test_prompt_in_compatibility_extra(self, clean_agent_state): + """Pipeline prompt is placed in compatibility.extra.prompt.""" + from langbot_plugin.api.entities.builtin.provider import prompt as provider_prompt + + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + + # Add prompt to query + query.prompt = provider_prompt.Prompt( + name="test_prompt", + messages=[ + provider_message.Message(role="system", content="You are a helpful assistant."), + ], + ) + + messages = [message async for message in orchestrator.run_from_query(query)] + + context = plugin_connector.contexts[0] + # Prompt should be in compatibility.extra + assert "prompt" in context["compatibility"]["extra"] + assert len(context["compatibility"]["extra"]["prompt"]) == 1 + assert context["compatibility"]["extra"]["prompt"][0]["role"] == "system" + # Top-level should NOT have prompt + assert "prompt" not in context + + @pytest.mark.asyncio + async def test_params_filtering_keeps_public_param(self, clean_agent_state): + """Public params are kept.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + query.variables = { + "public_param": "visible", + "another_param": 123, + } + + messages = [message async for message in orchestrator.run_from_query(query)] + + context = plugin_connector.contexts[0] + assert context["compatibility"]["extra"]["params"] == { + "public_param": "visible", + "another_param": 123, + } + + @pytest.mark.asyncio + async def test_params_filtering_removes_internal_vars(self, clean_agent_state): + """Internal variables (starting with _) are filtered.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + query.variables = { + "public_param": "visible", + "_internal_var": "should_be_filtered", + "_pipeline_bound_plugins": ["plugin1"], + } + + messages = [message async for message in orchestrator.run_from_query(query)] + + context = plugin_connector.contexts[0] + params = context["compatibility"]["extra"]["params"] + assert "public_param" in params + assert "_internal_var" not in params + assert "_pipeline_bound_plugins" not in params + + @pytest.mark.asyncio + async def test_params_filtering_removes_sensitive_patterns(self, clean_agent_state): + """Sensitive naming patterns are filtered.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + query.variables = { + "public_param": "visible", + "api_token": "secret123", + "secret_key": "secret456", + "password": "secret789", + "credential": "secret000", + } + + messages = [message async for message in orchestrator.run_from_query(query)] + + context = plugin_connector.contexts[0] + params = context["compatibility"]["extra"]["params"] + assert "public_param" in params + assert "api_token" not in params + assert "secret_key" not in params + assert "password" not in params + assert "credential" not in params + + @pytest.mark.asyncio + async def test_params_filtering_removes_non_json_serializable(self, clean_agent_state): + """Non-JSON-serializable values are filtered.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "response"}}, + } + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + query.variables = { + "public_param": "visible", + "a_set": {1, 2, 3}, # set is not JSON-serializable + "a_lambda": lambda x: x, # function is not JSON-serializable + } + + messages = [message async for message in orchestrator.run_from_query(query)] + + context = plugin_connector.contexts[0] + params = context["compatibility"]["extra"]["params"] + assert "public_param" in params + assert "a_set" not in params + assert "a_lambda" not in params + + +class TestPipelineCompatibilityHostCapabilities: + """Tests for event-first host capabilities via Pipeline compatibility path.""" + + @pytest.mark.asyncio + async def test_state_updated_writes_to_persistent_store(self, clean_agent_state): + """state.updated via Pipeline path writes to PersistentStateStore.""" + from langbot.pkg.agent.runner.persistent_state_store import get_persistent_state_store + + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "state.updated", + "data": { + "scope": "conversation", + "key": "external.test_key", + "value": "test_value", + }, + }, + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "state saved"}}, + }, + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + + messages = [message async for message in orchestrator.run_from_query(query)] + + assert len(messages) == 1 + assert messages[0].content == "state saved" + + # Verify state was written to PersistentStateStore + persistent_store = get_persistent_state_store(db_engine) + # Build snapshot to check if state was written + # Note: We need to rebuild the event and binding to query the store + from langbot.pkg.agent.runner.pipeline_compat_adapter import PipelineCompatAdapter + event = PipelineCompatAdapter.query_to_event(query) + binding = PipelineCompatAdapter.pipeline_config_to_binding(query, RUNNER_ID) + + snapshot = await persistent_store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot["conversation"]["external.test_key"] == "test_value" + + @pytest.mark.asyncio + async def test_event_log_and_transcript_written(self, clean_agent_state): + """EventLog and Transcript are written via Pipeline path.""" + from langbot.pkg.agent.runner.event_log_store import EventLogStore + from langbot.pkg.agent.runner.transcript_store import TranscriptStore + + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "assistant response"}}, + }, + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + + messages = [message async for message in orchestrator.run_from_query(query)] + + assert len(messages) == 1 + + # Check EventLog has incoming event + event_log_store = EventLogStore(db_engine) + event_logs, _, _ = await event_log_store.page_events( + conversation_id=query.session.using_conversation.uuid, + limit=10, + ) + assert len(event_logs) >= 1 + # First event should be the incoming message.received + assert event_logs[0]["event_type"] == "message.received" + + # Check Transcript has user and assistant messages + transcript_store = TranscriptStore(db_engine) + transcripts, _, _, _ = await transcript_store.page_transcript( + conversation_id=query.session.using_conversation.uuid, + limit=10, + ) + assert len(transcripts) >= 2 + # Find user and assistant messages + roles = [t["role"] for t in transcripts] + assert "user" in roles + assert "assistant" in roles + + @pytest.mark.asyncio + async def test_artifact_created_via_event_first_path(self, clean_agent_state): + """artifact.created via Pipeline path uses event-first ArtifactStore and EventLog.""" + import base64 + from langbot.pkg.agent.runner.artifact_store import ArtifactStore + from langbot.pkg.agent.runner.event_log_store import EventLogStore + + db_engine = clean_agent_state + descriptor = make_descriptor() + artifact_id = "artifact_001" + content = b"test artifact content" + content_base64 = base64.b64encode(content).decode('utf-8') + plugin_connector = FakePluginConnector( + results=[ + { + "type": "artifact.created", + "data": { + "artifact_id": artifact_id, + "artifact_type": "file", + "mime_type": "text/plain", + "name": "test.txt", + "content_base64": content_base64, + }, + }, + { + "type": "message.completed", + "data": {"message": {"role": "assistant", "content": "artifact created"}}, + }, + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + query = make_query() + + messages = [message async for message in orchestrator.run_from_query(query)] + + assert len(messages) == 1 + assert messages[0].content == "artifact created" + + # Verify artifact was registered in ArtifactStore + artifact_store = ArtifactStore(db_engine) + artifact = await artifact_store.get_metadata(artifact_id) + assert artifact is not None + assert artifact["artifact_type"] == "file" + assert artifact["name"] == "test.txt" + + # Verify artifact.created event was written to EventLog + event_log_store = EventLogStore(db_engine) + event_logs, _, _ = await event_log_store.page_events( + conversation_id=query.session.using_conversation.uuid, + limit=10, + ) + artifact_events = [e for e in event_logs if e["event_type"] == "artifact.created"] + assert len(artifact_events) >= 1