From 754f7197c54933acb7d0341ea912ffa98bb7540e Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Fri, 5 Jun 2026 23:57:44 +0800 Subject: [PATCH] refactor agent runner orchestration boundaries --- src/langbot/pkg/agent/runner/__init__.py | 2 + .../pkg/agent/runner/default_config.py | 72 ++ src/langbot/pkg/agent/runner/invoker.py | 131 ++++ src/langbot/pkg/agent/runner/orchestrator.py | 720 ++---------------- src/langbot/pkg/agent/runner/query_bridge.py | 56 ++ src/langbot/pkg/agent/runner/run_journal.py | 437 +++++++++++ src/langbot/pkg/api/http/service/model.py | 48 +- src/langbot/pkg/api/http/service/pipeline.py | 21 +- src/langbot/pkg/core/app.py | 4 +- src/langbot/pkg/core/stages/build_app.py | 5 +- src/langbot/pkg/provider/modelmgr/modelmgr.py | 14 +- .../api/service/test_model_service.py | 2 + .../api/test_pipeline_service_defaults.py | 9 +- .../unit_tests/plugin/test_handler_actions.py | 5 + tests/unit_tests/provider/test_skill_tools.py | 6 +- 15 files changed, 802 insertions(+), 730 deletions(-) create mode 100644 src/langbot/pkg/agent/runner/default_config.py create mode 100644 src/langbot/pkg/agent/runner/invoker.py create mode 100644 src/langbot/pkg/agent/runner/query_bridge.py create mode 100644 src/langbot/pkg/agent/runner/run_journal.py diff --git a/src/langbot/pkg/agent/runner/__init__.py b/src/langbot/pkg/agent/runner/__init__.py index c9937f16..cebed269 100644 --- a/src/langbot/pkg/agent/runner/__init__.py +++ b/src/langbot/pkg/agent/runner/__init__.py @@ -16,6 +16,7 @@ from .resource_builder import AgentResourceBuilder from .result_normalizer import AgentResultNormalizer from .orchestrator import AgentRunOrchestrator from .config_migration import ConfigMigration +from .default_config import AgentRunnerDefaultConfigService from .binding_resolver import AgentBindingResolver, AgentBindingResolutionError from .session_registry import ( AgentRunSessionRegistry, @@ -47,6 +48,7 @@ __all__ = [ 'AgentResultNormalizer', 'AgentRunOrchestrator', 'ConfigMigration', + 'AgentRunnerDefaultConfigService', 'AgentBindingResolver', 'AgentBindingResolutionError', 'AgentRunSessionRegistry', diff --git a/src/langbot/pkg/agent/runner/default_config.py b/src/langbot/pkg/agent/runner/default_config.py new file mode 100644 index 00000000..6c884fb4 --- /dev/null +++ b/src/langbot/pkg/agent/runner/default_config.py @@ -0,0 +1,72 @@ +"""Default AgentRunner binding configuration helpers.""" + +from __future__ import annotations + +import sqlalchemy + +from ...core import app +from ...entity.persistence import pipeline as persistence_pipeline +from . import config_schema +from .config_migration import ConfigMigration + + +class AgentRunnerDefaultConfigService: + """Apply AgentRunner schema-defined defaults to host binding config.""" + + ap: app.Application + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + + async def _get_runner_descriptor(self, runner_id: str): + registry = getattr(self.ap, 'agent_runner_registry', None) + if registry is None: + return None + try: + return await registry.get(runner_id, bound_plugins=None) + except Exception as e: + logger = getattr(self.ap, 'logger', None) + if logger: + logger.warning(f'Failed to load AgentRunner descriptor while setting default model: {e}') + return None + + async def auto_set_default_pipeline_llm_model(self, model_uuid: str) -> bool: + """Set model_uuid into the default pipeline runner config when the selector is empty.""" + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_pipeline.LegacyPipeline).where( + persistence_pipeline.LegacyPipeline.is_default == True + ) + ) + pipeline = result.first() + if pipeline is None: + return False + + return await self.set_pipeline_llm_model_if_empty(pipeline, model_uuid) + + async def set_pipeline_llm_model_if_empty( + self, + pipeline: persistence_pipeline.LegacyPipeline, + model_uuid: str, + ) -> bool: + """Set model_uuid into a pipeline's schema-defined LLM selector if it is empty.""" + pipeline_config = pipeline.config + if not isinstance(pipeline_config, dict): + return False + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + if not runner_id: + return False + + descriptor = await self._get_runner_descriptor(runner_id) + if descriptor is None: + return False + + ai_config = pipeline_config.setdefault('ai', {}) + runner_configs = ai_config.setdefault('runner_config', {}) + runner_config = runner_configs.setdefault(runner_id, {}) + + if not config_schema.set_empty_llm_model_selection(descriptor, runner_config, model_uuid): + return False + + await self.ap.pipeline_service.update_pipeline(pipeline.uuid, {'config': pipeline_config}) + return True diff --git a/src/langbot/pkg/agent/runner/invoker.py b/src/langbot/pkg/agent/runner/invoker.py new file mode 100644 index 00000000..4f45747b --- /dev/null +++ b/src/langbot/pkg/agent/runner/invoker.py @@ -0,0 +1,131 @@ +"""Plugin-runtime invocation for AgentRunner executions.""" + +from __future__ import annotations + +import asyncio +import time +import traceback +import typing + +from langbot_plugin.entities.io.errors import ActionCallTimeoutError + +from ...core import app +from .context_builder import AgentRunContextPayload +from .descriptor import AgentRunnerDescriptor +from .errors import RunnerExecutionError + + +class AgentRunnerInvoker: + """Invoke an AgentRunner through the plugin runtime. + + This keeps runtime transport, deadline enforcement, and transport error + mapping out of the orchestration state machine. + """ + + ap: app.Application + + def __init__(self, ap: app.Application): + self.ap = ap + + async def invoke( + self, + descriptor: AgentRunnerDescriptor, + context: AgentRunContextPayload, + ) -> typing.AsyncGenerator[dict[str, typing.Any], None]: + """Invoke the runner and yield raw result dictionaries.""" + if not self.ap.plugin_connector.is_enable_plugin: + raise RunnerExecutionError( + descriptor.id, + 'Plugin system is disabled', + retryable=False, + ) + + try: + gen = self.ap.plugin_connector.run_agent( + plugin_author=descriptor.plugin_author, + plugin_name=descriptor.plugin_name, + runner_name=descriptor.runner_name, + context=context, + ) + + while True: + try: + result_dict = await self._next_with_deadline(gen, descriptor, context) + except StopAsyncIteration: + break + yield result_dict + + except asyncio.TimeoutError as e: + raise RunnerExecutionError( + descriptor.id, + 'Runner timed out (code: runner.timeout)', + retryable=True, + ) from e + except ActionCallTimeoutError as e: + raise RunnerExecutionError( + descriptor.id, + f'{e} (code: runner.timeout)', + retryable=True, + ) from e + except RunnerExecutionError: + raise + except Exception as e: + self.ap.logger.error( + f'Runner {descriptor.id} unexpected error: {traceback.format_exc()}' + ) + raise RunnerExecutionError( + descriptor.id, + str(e), + retryable=False, + ) + + async def _next_with_deadline( + self, + gen: typing.AsyncGenerator[dict[str, typing.Any], None], + descriptor: AgentRunnerDescriptor, + context: AgentRunContextPayload, + ) -> dict[str, typing.Any]: + """Read the next runner result while enforcing the run deadline.""" + remaining = self._remaining_deadline_seconds(context) + if remaining is not None and remaining <= 0: + await self._close_generator(gen, descriptor) + raise asyncio.TimeoutError + + try: + if remaining is None: + return await anext(gen) + return await asyncio.wait_for(anext(gen), timeout=remaining) + except StopAsyncIteration: + if self._is_deadline_exhausted(context): + raise asyncio.TimeoutError + raise + except asyncio.TimeoutError: + await self._close_generator(gen, descriptor) + raise + + def _remaining_deadline_seconds( + self, + context: AgentRunContextPayload, + ) -> float | None: + runtime = context.get('runtime') or {} + deadline_at = runtime.get('deadline_at') + if deadline_at is None: + return None + try: + return float(deadline_at) - time.time() + except (TypeError, ValueError): + return None + + def _is_deadline_exhausted(self, context: AgentRunContextPayload) -> bool: + remaining = self._remaining_deadline_seconds(context) + return remaining is not None and remaining <= 0 + + async def _close_generator( + self, + gen: typing.AsyncGenerator[dict[str, typing.Any], None], + descriptor: AgentRunnerDescriptor, + ) -> None: + try: + await gen.aclose() + except Exception as e: + self.ap.logger.warning(f'Failed to close timed-out runner {descriptor.id}: {e}') diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 1064da95..9416f921 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -1,72 +1,48 @@ """Agent run orchestrator for coordinating runner execution.""" + from __future__ import annotations import typing -import traceback -import asyncio -import time from langbot_plugin.api.entities.builtin.provider import message as provider_message from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query -from langbot_plugin.entities.io.errors import ActionCallTimeoutError from ...core import app -from .descriptor import AgentRunnerDescriptor -from .registry import AgentRunnerRegistry +from .binding_resolver import AgentBindingResolver from .context_builder import AgentRunContextBuilder, AgentRunContextPayload +from .descriptor import AgentRunnerDescriptor +from .host_models import AgentBinding, AgentEventEnvelope +from .invoker import AgentRunnerInvoker +from .query_bridge import QueryRunBridge +from .registry import AgentRunnerRegistry from .resource_builder import AgentResourceBuilder from .result_normalizer import AgentResultNormalizer -from .persistent_state_store import get_persistent_state_store, PersistentStateStore -from .session_registry import get_session_registry, AgentRunSessionRegistry -from .config_migration import ConfigMigration -from .host_models import AgentEventEnvelope, AgentBinding -from .query_entry_adapter import QueryEntryAdapter -from .binding_resolver import AgentBindingResolver +from .run_journal import AgentRunJournal, MAX_ARTIFACT_INLINE_BYTES as _MAX_ARTIFACT_INLINE_BYTES +from .session_registry import AgentRunSessionRegistry, get_session_registry from .state_scope import build_state_context -from .errors import ( - RunnerNotFoundError, - RunnerExecutionError, - RunnerProtocolError, -) -# Maximum inline artifact content size (1MB) -MAX_ARTIFACT_INLINE_BYTES = 1 * 1024 * 1024 +MAX_ARTIFACT_INLINE_BYTES = _MAX_ARTIFACT_INLINE_BYTES class AgentRunOrchestrator: - """Orchestrator for agent runner execution. + """Coordinate one AgentRunner execution. - Responsibilities: - - Resolve runner ID from current Agent/runner config - - Get runner descriptor from registry - - Provision AgentRunContext envelope from Query - - Build AgentResources with permission filtering - - Invoke plugin runtime RUN_AGENT action - - Normalize AgentRunResult to Pipeline messages - - Handle errors, timeouts, protocol errors - - Maintain streaming card behavior - - Entry points: - - run(event, binding): Main entry for event-first Protocol v1 - - run_from_query(query): current Query entry adapter wrapper + The orchestrator keeps the run state machine readable and delegates + transport, Query bridging, and persistence side effects to narrower + collaborators. """ ap: app.Application - registry: AgentRunnerRegistry - context_builder: AgentRunContextBuilder - resource_builder: AgentResourceBuilder - result_normalizer: AgentResultNormalizer - binding_resolver: AgentBindingResolver - - # Cached singleton references (set in __init__) + query_bridge: QueryRunBridge + invoker: AgentRunnerInvoker + journal: AgentRunJournal _session_registry: AgentRunSessionRegistry - _persistent_state_store: PersistentStateStore | None def __init__( self, @@ -79,9 +55,10 @@ class AgentRunOrchestrator: self.resource_builder = AgentResourceBuilder(ap) self.result_normalizer = AgentResultNormalizer(ap) self.binding_resolver = AgentBindingResolver() - # Cache singleton references to avoid per-request getter calls + self.query_bridge = QueryRunBridge(self.binding_resolver) + self.invoker = AgentRunnerInvoker(ap) + self.journal = AgentRunJournal(ap) self._session_registry = get_session_registry() - self._persistent_state_store = None # Lazy init on first use async def run( self, @@ -90,38 +67,16 @@ class AgentRunOrchestrator: bound_plugins: list[str] | None = None, adapter_context: dict[str, typing.Any] | None = None, ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: - """Run agent runner from event-first envelope. - - This is the main entry point for Protocol v1. - Event Gateway -> AgentBindingResolver -> run(event, binding). - - Args: - event: Event envelope from event gateway - binding: Agent binding - bound_plugins: Optional list of bound plugin identities for authorization - adapter_context: Optional context from an entry adapter - - Yields: - Message or MessageChunk for pipeline response - - Raises: - RunnerNotFoundError: If runner not found - RunnerNotAuthorizedError: If runner not authorized - RunnerExecutionError: If runner execution failed - """ + """Run an AgentRunner from an event-first envelope.""" runner_id = binding.runner_id - - # Get runner descriptor descriptor = await self.registry.get(runner_id, bound_plugins) - # Build resources from binding resources = await self.resource_builder.build_resources_from_binding( event=event, binding=binding, descriptor=descriptor, ) - # Build context from event + binding context = await self.context_builder.build_context_from_event( event=event, binding=binding, @@ -130,20 +85,14 @@ class AgentRunOrchestrator: ) session_query_id = None - - # Merge adapter context if provided if adapter_context: session_query_id = adapter_context.get('query_id') - # Merge params into adapter.extra if 'params' in adapter_context: context['adapter']['extra']['params'] = adapter_context['params'] if adapter_context.get('prompt_get'): context['context']['available_apis']['prompt_get'] = True - # Build state context for State API handlers state_context = build_state_context(event, binding, descriptor) - - # Register session for proxy action permission validation run_id = context['run_id'] await self._session_registry.register( run_id=run_id, @@ -160,65 +109,53 @@ class AgentRunOrchestrator: state_context=state_context, ) - # Write incoming event to EventLog - event_log_id = await self._write_event_log( + event_log_id = await self.journal.write_event_log( event=event, binding=binding, run_id=run_id, runner_id=descriptor.id, ) - - # Register incoming attachments so input/transcript artifact_refs are resolvable. - await self._register_input_artifacts( + await self.journal.register_input_artifacts( event=event, run_id=run_id, runner_id=descriptor.id, ) - - # Write user message to Transcript if message.received if event.event_type == 'message.received' and event.conversation_id: - await self._write_user_transcript( + await self.journal.write_user_transcript( event=event, event_log_id=event_log_id, ) - # Track artifact refs for assistant transcript (cleared after each message.completed) pending_artifact_refs: list[dict[str, typing.Any]] = [] try: - # Run via plugin connector - async for result_dict in self._invoke_runner(descriptor, context): - # Handle artifact.created first - consume before normalizer - if result_dict.get('type') == 'artifact.created': - artifact_ref = await self._handle_artifact_created( + async for result_dict in self.invoker.invoke(descriptor, context): + result_type = result_dict.get('type') + + if result_type == 'artifact.created': + artifact_ref = await self.journal.handle_artifact_created( result_dict=result_dict, event=event, run_id=run_id, runner_id=descriptor.id, ) pending_artifact_refs.append(artifact_ref) - # Pass to normalizer for logging, but don't yield to pipeline await self.result_normalizer.normalize(result_dict, descriptor) continue - # Handle state.updated first - consume before normalizer - if result_dict.get('type') == 'state.updated': - await self._handle_state_updated_event(result_dict, event, binding, descriptor) - # Pass to normalizer for logging, but don't yield to pipeline + if result_type == 'state.updated': + await self.journal.handle_state_updated_event(result_dict, event, binding, descriptor) await self.result_normalizer.normalize(result_dict, descriptor) continue - # Handle message.completed - write to Transcript - if result_dict.get('type') == 'message.completed' and event.conversation_id: - # Merge pending artifact refs with message's own refs - merged_refs = self._merge_artifact_refs( + if result_type == 'message.completed' and event.conversation_id: + merged_refs = self.journal.merge_artifact_refs( pending_artifact_refs, result_dict, ) - # Clear pending refs after attaching to this message pending_artifact_refs.clear() - await self._write_assistant_transcript( + await self.journal.write_assistant_transcript( result_dict=result_dict, event=event, run_id=run_id, @@ -226,127 +163,38 @@ class AgentRunOrchestrator: artifact_refs=merged_refs if merged_refs else None, ) - # Normalize result for other types result = await self.result_normalizer.normalize(result_dict, descriptor) if result is not None: yield result finally: - # Unregister session after run completes (success or error) await self._session_registry.unregister(run_id) async def run_from_query( self, query: pipeline_query.Query, ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: - """Run agent runner from pipeline query. - - This is the Query entry adapter wrapper for the query-based flow. - It delegates to the event-first run(event, binding) method. - - For the new event-first Protocol v1, use run(event, binding) instead. - - Args: - query: Pipeline query with pipeline_config, session, messages, etc. - - Yields: - Message or MessageChunk for pipeline response - - Raises: - RunnerNotFoundError: If runner not found - RunnerNotAuthorizedError: If runner not authorized - RunnerExecutionError: If runner execution failed - """ - # Resolve runner ID using ConfigMigration - runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) - if not runner_id: - raise RunnerNotFoundError('no runner configured') - - # Convert Query to event-first envelope - event = QueryEntryAdapter.query_to_event(query) - - # Project the current Pipeline adapter config into target Agent config. - # exactly one effective binding for this event. - agent_config = QueryEntryAdapter.config_to_agent_config(query, runner_id) - binding = self.binding_resolver.resolve_one(event, [agent_config]) - - # Extract bound plugins for authorization - bound_plugins = query.variables.get('_pipeline_bound_plugins') - - # Build adapter context for Query-specific fields - adapter_context = QueryEntryAdapter.build_adapter_context(query, binding) - - # Delegate to event-first run() + """Run an AgentRunner from the current Pipeline Query entry point.""" + plan = self.query_bridge.build_plan(query) async for result in self.run( - event, - binding, - bound_plugins=bound_plugins, - adapter_context=adapter_context, + plan.event, + plan.binding, + bound_plugins=plan.bound_plugins, + adapter_context=plan.adapter_context, ): yield result + def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None: + """Resolve runner ID for telemetry/logging without full execution.""" + return self.query_bridge.resolve_runner_id_for_telemetry(query) + async def _invoke_runner( self, descriptor: AgentRunnerDescriptor, context: AgentRunContextPayload, ) -> typing.AsyncGenerator[dict[str, typing.Any], None]: - """Invoke runner via plugin connector. - - Args: - descriptor: Runner descriptor - context: AgentRunContext dict - - Yields: - Raw result dicts from plugin runtime - - Raises: - RunnerExecutionError: If plugin system disabled or runtime error - """ - if not self.ap.plugin_connector.is_enable_plugin: - raise RunnerExecutionError( - descriptor.id, - 'Plugin system is disabled', - retryable=False, - ) - - try: - gen = self.ap.plugin_connector.run_agent( - plugin_author=descriptor.plugin_author, - plugin_name=descriptor.plugin_name, - runner_name=descriptor.runner_name, - context=context, - ) - - while True: - try: - result_dict = await self._next_with_deadline(gen, descriptor, context) - except StopAsyncIteration: - break - yield result_dict - - except asyncio.TimeoutError as e: - raise RunnerExecutionError( - descriptor.id, - 'Runner timed out (code: runner.timeout)', - retryable=True, - ) from e - except ActionCallTimeoutError as e: - raise RunnerExecutionError( - descriptor.id, - f'{e} (code: runner.timeout)', - retryable=True, - ) from e - except RunnerExecutionError: - raise - except Exception as e: - # Wrap unexpected errors - self.ap.logger.error( - f'Runner {descriptor.id} unexpected error: {traceback.format_exc()}' - ) - raise RunnerExecutionError( - descriptor.id, - str(e), - retryable=False, - ) + """Compatibility delegate for older tests and internal callers.""" + async for result in self.invoker.invoke(descriptor, context): + yield result async def _next_with_deadline( self, @@ -354,61 +202,23 @@ class AgentRunOrchestrator: descriptor: AgentRunnerDescriptor, context: AgentRunContextPayload, ) -> dict[str, typing.Any]: - """Read the next runner result while enforcing the run deadline.""" - remaining = self._remaining_deadline_seconds(context) - if remaining is not None and remaining <= 0: - await self._close_generator(gen, descriptor) - raise asyncio.TimeoutError - - try: - if remaining is None: - return await anext(gen) - return await asyncio.wait_for(anext(gen), timeout=remaining) - except StopAsyncIteration: - if self._is_deadline_exhausted(context): - raise asyncio.TimeoutError - raise - except asyncio.TimeoutError: - await self._close_generator(gen, descriptor) - raise + return await self.invoker._next_with_deadline(gen, descriptor, context) def _remaining_deadline_seconds( self, context: AgentRunContextPayload, ) -> float | None: - runtime = context.get('runtime') or {} - deadline_at = runtime.get('deadline_at') - if deadline_at is None: - return None - try: - return float(deadline_at) - time.time() - except (TypeError, ValueError): - return None + return self.invoker._remaining_deadline_seconds(context) def _is_deadline_exhausted(self, context: AgentRunContextPayload) -> bool: - remaining = self._remaining_deadline_seconds(context) - return remaining is not None and remaining <= 0 + return self.invoker._is_deadline_exhausted(context) async def _close_generator( self, gen: typing.AsyncGenerator[dict[str, typing.Any], None], descriptor: AgentRunnerDescriptor, ) -> None: - try: - await gen.aclose() - except Exception as e: - self.ap.logger.warning(f'Failed to close timed-out runner {descriptor.id}: {e}') - - def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None: - """Resolve runner ID for telemetry/logging without full execution. - - Args: - query: Pipeline query - - Returns: - Runner ID string, or None - """ - return ConfigMigration.resolve_runner_id(query.pipeline_config) + await self.invoker._close_generator(gen, descriptor) async def _handle_state_updated_event( self, @@ -417,60 +227,7 @@ class AgentRunOrchestrator: binding: AgentBinding, descriptor: AgentRunnerDescriptor, ) -> None: - """Handle state.updated result in event-first mode. - - Persists state to database via PersistentStateStore. - - Args: - result_dict: Raw result dict with type='state.updated' - event: Event envelope - binding: Agent binding - descriptor: Runner descriptor - """ - data = result_dict.get('data', {}) - - scope = data.get('scope') - if not scope: - raise RunnerProtocolError( - descriptor.id, - 'state.updated missing required field: scope', - ) - - # Extract key and value - key = data.get('key') - value = data.get('value') - - if not key: - raise RunnerProtocolError( - descriptor.id, - 'state.updated missing required field: key', - ) - - # Lazy init persistent state store - if self._persistent_state_store is None: - self._persistent_state_store = get_persistent_state_store( - self.ap.persistence_mgr.get_db_engine() - ) - - # Apply update to persistent state store - success, error = await self._persistent_state_store.apply_update_from_event( - event=event, - binding=binding, - descriptor=descriptor, - scope=scope, - key=key, - value=value, - logger=self.ap.logger, - ) - - if success: - self.ap.logger.debug( - f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}' - ) - elif error: - self.ap.logger.warning( - f'Runner {descriptor.id} state.updated rejected: {error}' - ) + await self.journal.handle_state_updated_event(result_dict, event, binding, descriptor) async def _write_event_log( self, @@ -479,53 +236,7 @@ class AgentRunOrchestrator: run_id: str, runner_id: str, ) -> str: - """Write incoming event to EventLog. - - Args: - event: Event envelope - binding: Agent binding - run_id: Run ID - runner_id: Runner ID - - Returns: - Event log ID - """ - import datetime - - from .event_log_store import EventLogStore - store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) - - # Build input summary - input_summary = None - input_json = None - if event.input: - if event.input.text: - input_summary = event.input.text[:1000] - input_json = { - 'text': event.input.text, - 'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents], - 'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments], - } - - return await store.append_event( - event_id=event.event_id, - event_type=event.event_type, - source=event.source, - bot_id=event.bot_id, - workspace_id=event.workspace_id, - conversation_id=event.conversation_id, - thread_id=event.thread_id, - actor_type=event.actor.actor_type if event.actor else None, - actor_id=event.actor.actor_id if event.actor else None, - actor_name=event.actor.actor_name if event.actor else None, - subject_type=event.subject.subject_type if event.subject else None, - subject_id=event.subject.subject_id if event.subject else None, - input_summary=input_summary, - input_json=input_json, - run_id=run_id, - runner_id=runner_id, - event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None, - ) + return await self.journal.write_event_log(event, binding, run_id, runner_id) async def _register_input_artifacts( self, @@ -533,135 +244,20 @@ class AgentRunOrchestrator: run_id: str, runner_id: str, ) -> None: - """Register current-event attachments referenced by AgentInput.""" - if not event.input or not event.input.attachments: - return - - from .artifact_store import ArtifactStore - store = ArtifactStore(self.ap.persistence_mgr.get_db_engine()) - - for attachment in event.input.attachments: - data = attachment.model_dump(mode='json') if hasattr(attachment, 'model_dump') else attachment - if not isinstance(data, dict): - continue - - artifact_id = data.get('artifact_id') - artifact_type = data.get('artifact_type') or 'file' - if not artifact_id: - continue - - content, parsed_mime_type = self._decode_attachment_content(data.get('content')) - url = data.get('url') - platform_ref_id = data.get('id') - storage_key = None - storage_type = 'metadata_only' - if content is None: - if url: - storage_key = url - storage_type = 'url' - elif platform_ref_id: - storage_key = platform_ref_id - storage_type = 'platform_ref' - - metadata = { - 'input_attachment': True, - 'input_source': data.get('source') or 'platform', - } - if url: - metadata['url'] = url - if platform_ref_id: - metadata['platform_ref_id'] = platform_ref_id - - try: - await store.register_artifact( - artifact_id=artifact_id, - artifact_type=artifact_type, - source='platform', - storage_key=storage_key, - storage_type=storage_type, - mime_type=data.get('mime_type') or parsed_mime_type, - name=data.get('name'), - size_bytes=data.get('size') or (len(content) if content is not None else None), - conversation_id=event.conversation_id, - run_id=run_id, - runner_id=runner_id, - bot_id=event.bot_id, - workspace_id=event.workspace_id, - metadata=metadata, - content=content, - ) - except Exception as e: - self.ap.logger.warning( - f'Failed to register input artifact {artifact_id}: {e}' - ) + await self.journal.register_input_artifacts(event, run_id, runner_id) def _decode_attachment_content( self, content: typing.Any, ) -> tuple[bytes | None, str | None]: - """Decode base64 attachment content, including data URLs.""" - if not isinstance(content, str) or not content: - return None, None - - import base64 - import binascii - - mime_type = None - payload = content - if content.startswith('data:') and ',' in content: - header, payload = content.split(',', 1) - if ';base64' in header: - mime_type = header[5:].split(';', 1)[0] or None - - try: - return base64.b64decode(payload, validate=False), mime_type - except (binascii.Error, ValueError): - return None, mime_type + return self.journal.decode_attachment_content(content) async def _write_user_transcript( self, event: AgentEventEnvelope, event_log_id: str, ) -> None: - """Write user message to Transcript. - - Args: - event: Event envelope - event_log_id: Event log ID - """ - from .transcript_store import TranscriptStore - store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) - - # Build content - content = event.input.text if event.input else None - content_json = None - if event.input: - content_json = { - 'role': 'user', - 'content': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents] if event.input.contents else [], - } - - # Build artifact refs - artifact_refs = [] - if event.input and event.input.attachments: - for a in event.input.attachments: - artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a) - - await store.append_transcript( - transcript_id=None, # Auto-generate - event_id=event_log_id, - conversation_id=event.conversation_id, - role='user', - content=content, - content_json=content_json, - artifact_refs=artifact_refs if artifact_refs else None, - thread_id=event.thread_id, - item_type='message', - metadata={ - 'actor_type': event.actor.actor_type if event.actor else None, - 'actor_id': event.actor.actor_id if event.actor else None, - }, - ) + await self.journal.write_user_transcript(event, event_log_id) async def _handle_artifact_created( self, @@ -670,160 +266,14 @@ class AgentRunOrchestrator: run_id: str, runner_id: str, ) -> dict[str, typing.Any]: - """Handle artifact.created result - register artifact and write EventLog. - - Args: - result_dict: Raw result dict with type='artifact.created' - event: Event envelope - run_id: Current run ID - runner_id: Runner ID - - Returns: - Artifact reference dict for Transcript - - Raises: - RunnerProtocolError: On validation failures or registration errors - """ - import base64 - import uuid - - from .artifact_store import ArtifactStore - from .event_log_store import EventLogStore - - data = result_dict.get('data', {}) - - # Validate run_id matches current context - result_run_id = result_dict.get('run_id') - if result_run_id and result_run_id != run_id: - raise RunnerProtocolError( - runner_id, - f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}', - ) - - # Extract artifact fields - artifact_id = data.get('artifact_id') or str(uuid.uuid4()) - artifact_type = data.get('artifact_type') - if not artifact_type: - raise RunnerProtocolError( - runner_id, - 'artifact.created missing required field: artifact_type', - ) - - mime_type = data.get('mime_type') - name = data.get('name') - size_bytes = data.get('size_bytes') - sha256 = data.get('sha256') - metadata = data.get('metadata') - content_base64 = data.get('content_base64') - - # Decode and validate content if provided - content: bytes | None = None - if content_base64: - try: - content = base64.b64decode(content_base64, validate=True) - except Exception as e: - raise RunnerProtocolError( - runner_id, - f'artifact.created invalid base64 content: {e}', - ) - - # Validate content size - if len(content) > MAX_ARTIFACT_INLINE_BYTES: - raise RunnerProtocolError( - runner_id, - f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes', - ) - - # Register artifact via ArtifactStore - artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine()) - try: - registered_id = await artifact_store.register_artifact( - artifact_id=artifact_id, - artifact_type=artifact_type, - source='runner', - mime_type=mime_type, - name=name, - size_bytes=size_bytes, - sha256=sha256, - conversation_id=event.conversation_id, - run_id=run_id, - runner_id=runner_id, - bot_id=event.bot_id, - workspace_id=event.workspace_id, - metadata=metadata, - content=content, - ) - except Exception as e: - raise RunnerProtocolError( - runner_id, - f'artifact.created failed to register artifact: {e}', - ) - - # Write to EventLog - event_log_store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) - await event_log_store.append_event( - event_id=str(uuid.uuid4()), - event_type='artifact.created', - source='runner', - bot_id=event.bot_id, - workspace_id=event.workspace_id, - conversation_id=event.conversation_id, - thread_id=event.thread_id, - actor_type=event.actor.actor_type if event.actor else None, - actor_id=event.actor.actor_id if event.actor else None, - actor_name=event.actor.actor_name if event.actor else None, - input_summary=f'Artifact created: {artifact_type}', - input_json={ - 'artifact_id': registered_id, - 'artifact_type': artifact_type, - 'mime_type': mime_type, - 'name': name, - 'size_bytes': size_bytes, - }, - run_id=run_id, - runner_id=runner_id, - ) - - # Return artifact ref for Transcript - return { - 'artifact_id': registered_id, - 'artifact_type': artifact_type, - 'mime_type': mime_type, - 'name': name, - } + return await self.journal.handle_artifact_created(result_dict, event, run_id, runner_id) def _merge_artifact_refs( self, pending_refs: list[dict[str, typing.Any]], result_dict: dict[str, typing.Any], ) -> list[dict[str, typing.Any]]: - """Merge pending artifact refs with message's own refs, deduplicating by artifact_id. - - Args: - pending_refs: Artifact refs accumulated from artifact.created events - result_dict: Result dict that may contain message with artifact_refs - - Returns: - Merged and deduplicated list of artifact refs - """ - # Start with pending refs - merged = list(pending_refs) - seen_ids = {ref.get('artifact_id') for ref in pending_refs if ref.get('artifact_id')} - - # Extract refs from message data if present - data = result_dict.get('data', {}) - message = data.get('message', {}) - message_refs = message.get('artifact_refs', []) - - if isinstance(message_refs, list): - for ref in message_refs: - if isinstance(ref, dict): - artifact_id = ref.get('artifact_id') - if artifact_id and artifact_id not in seen_ids: - merged.append(ref) - seen_ids.add(artifact_id) - - return merged + return self.journal.merge_artifact_refs(pending_refs, result_dict) async def _write_assistant_transcript( self, @@ -833,56 +283,10 @@ class AgentRunOrchestrator: runner_id: str, artifact_refs: list[dict[str, typing.Any]] | None = None, ) -> None: - """Write assistant message to Transcript. - - Args: - result_dict: Result dict from runner - event: Original event envelope - run_id: Run ID - runner_id: Runner ID - artifact_refs: Optional artifact references to include - """ - import uuid - - from .transcript_store import TranscriptStore - store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) - - data = result_dict.get('data', {}) - message = data.get('message', {}) - - # Build content - content = None - content_json = None - - if isinstance(message.get('content'), str): - content = message['content'] - content_json = message - elif isinstance(message.get('content'), list): - # Extract text from content list - text_parts = [] - for c in message['content']: - if isinstance(c, dict) and c.get('type') == 'text': - text_parts.append(c.get('text', '')) - content = ' '.join(text_parts) if text_parts else None - content_json = message - - # Generate a unique event ID for assistant message - assistant_event_id = str(uuid.uuid4()) - - await store.append_transcript( - transcript_id=str(uuid.uuid4()), - event_id=assistant_event_id, - conversation_id=event.conversation_id, - role='assistant', - content=content, - content_json=content_json, - artifact_refs=artifact_refs, - thread_id=event.thread_id, - item_type='message', + await self.journal.write_assistant_transcript( + result_dict=result_dict, + event=event, run_id=run_id, runner_id=runner_id, - metadata={ - 'run_id': run_id, - 'runner_id': runner_id, - }, + artifact_refs=artifact_refs, ) diff --git a/src/langbot/pkg/agent/runner/query_bridge.py b/src/langbot/pkg/agent/runner/query_bridge.py new file mode 100644 index 00000000..42e4601e --- /dev/null +++ b/src/langbot/pkg/agent/runner/query_bridge.py @@ -0,0 +1,56 @@ +"""Pipeline Query bridge for AgentRunner execution.""" + +from __future__ import annotations + +import dataclasses +import typing + +from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query + +from .binding_resolver import AgentBindingResolver +from .config_migration import ConfigMigration +from .errors import RunnerNotFoundError +from .host_models import AgentBinding, AgentEventEnvelope +from .query_entry_adapter import QueryEntryAdapter + + +@dataclasses.dataclass(frozen=True) +class QueryRunPlan: + """Projected event-first execution request for a Query-backed run.""" + + event: AgentEventEnvelope + binding: AgentBinding + bound_plugins: list[str] | None + adapter_context: dict[str, typing.Any] + + +class QueryRunBridge: + """Project the current Pipeline Query entry point into Protocol v1 inputs.""" + + binding_resolver: AgentBindingResolver + + def __init__(self, binding_resolver: AgentBindingResolver): + self.binding_resolver = binding_resolver + + def build_plan(self, query: pipeline_query.Query) -> QueryRunPlan: + """Build an event-first run plan from a Pipeline Query.""" + runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) + if not runner_id: + raise RunnerNotFoundError('no runner configured') + + event = QueryEntryAdapter.query_to_event(query) + agent_config = QueryEntryAdapter.config_to_agent_config(query, runner_id) + binding = self.binding_resolver.resolve_one(event, [agent_config]) + bound_plugins = query.variables.get('_pipeline_bound_plugins') + adapter_context = QueryEntryAdapter.build_adapter_context(query, binding) + + return QueryRunPlan( + event=event, + binding=binding, + bound_plugins=bound_plugins, + adapter_context=adapter_context, + ) + + def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None: + """Resolve runner ID for telemetry/logging without full execution.""" + return ConfigMigration.resolve_runner_id(query.pipeline_config) diff --git a/src/langbot/pkg/agent/runner/run_journal.py b/src/langbot/pkg/agent/runner/run_journal.py new file mode 100644 index 00000000..de5ff118 --- /dev/null +++ b/src/langbot/pkg/agent/runner/run_journal.py @@ -0,0 +1,437 @@ +"""Run-side effects for AgentRunner executions.""" + +from __future__ import annotations + +import typing + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .errors import RunnerProtocolError +from .host_models import AgentBinding, AgentEventEnvelope +from .persistent_state_store import PersistentStateStore, get_persistent_state_store + + +# Maximum inline artifact content size (1MB) +MAX_ARTIFACT_INLINE_BYTES = 1 * 1024 * 1024 + + +class AgentRunJournal: + """Persist run events, transcript records, artifacts, and state updates.""" + + ap: app.Application + + _persistent_state_store: PersistentStateStore | None + + def __init__(self, ap: app.Application): + self.ap = ap + self._persistent_state_store = None + + async def handle_state_updated_event( + self, + result_dict: dict[str, typing.Any], + event: AgentEventEnvelope, + binding: AgentBinding, + descriptor: AgentRunnerDescriptor, + ) -> None: + """Handle state.updated result in event-first mode.""" + data = result_dict.get('data', {}) + + scope = data.get('scope') + if not scope: + raise RunnerProtocolError( + descriptor.id, + 'state.updated missing required field: scope', + ) + + key = data.get('key') + value = data.get('value') + + if not key: + raise RunnerProtocolError( + descriptor.id, + 'state.updated missing required field: key', + ) + + if self._persistent_state_store is None: + self._persistent_state_store = get_persistent_state_store( + self.ap.persistence_mgr.get_db_engine() + ) + + success, error = await self._persistent_state_store.apply_update_from_event( + event=event, + binding=binding, + descriptor=descriptor, + scope=scope, + key=key, + value=value, + logger=self.ap.logger, + ) + + if success: + self.ap.logger.debug( + f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}' + ) + elif error: + self.ap.logger.warning( + f'Runner {descriptor.id} state.updated rejected: {error}' + ) + + async def write_event_log( + self, + event: AgentEventEnvelope, + binding: AgentBinding, + run_id: str, + runner_id: str, + ) -> str: + """Write incoming event to EventLog.""" + import datetime + + from .event_log_store import EventLogStore + + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + + input_summary = None + input_json = None + if event.input: + if event.input.text: + input_summary = event.input.text[:1000] + input_json = { + 'text': event.input.text, + 'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents], + 'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments], + } + + return await store.append_event( + event_id=event.event_id, + event_type=event.event_type, + source=event.source, + bot_id=event.bot_id, + workspace_id=event.workspace_id, + conversation_id=event.conversation_id, + thread_id=event.thread_id, + actor_type=event.actor.actor_type if event.actor else None, + actor_id=event.actor.actor_id if event.actor else None, + actor_name=event.actor.actor_name if event.actor else None, + subject_type=event.subject.subject_type if event.subject else None, + subject_id=event.subject.subject_id if event.subject else None, + input_summary=input_summary, + input_json=input_json, + run_id=run_id, + runner_id=runner_id, + event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None, + ) + + async def register_input_artifacts( + self, + event: AgentEventEnvelope, + run_id: str, + runner_id: str, + ) -> None: + """Register current-event attachments referenced by AgentInput.""" + if not event.input or not event.input.attachments: + return + + from .artifact_store import ArtifactStore + + store = ArtifactStore(self.ap.persistence_mgr.get_db_engine()) + + for attachment in event.input.attachments: + data = attachment.model_dump(mode='json') if hasattr(attachment, 'model_dump') else attachment + if not isinstance(data, dict): + continue + + artifact_id = data.get('artifact_id') + artifact_type = data.get('artifact_type') or 'file' + if not artifact_id: + continue + + content, parsed_mime_type = self.decode_attachment_content(data.get('content')) + url = data.get('url') + platform_ref_id = data.get('id') + storage_key = None + storage_type = 'metadata_only' + if content is None: + if url: + storage_key = url + storage_type = 'url' + elif platform_ref_id: + storage_key = platform_ref_id + storage_type = 'platform_ref' + + metadata = { + 'input_attachment': True, + 'input_source': data.get('source') or 'platform', + } + if url: + metadata['url'] = url + if platform_ref_id: + metadata['platform_ref_id'] = platform_ref_id + + try: + await store.register_artifact( + artifact_id=artifact_id, + artifact_type=artifact_type, + source='platform', + storage_key=storage_key, + storage_type=storage_type, + mime_type=data.get('mime_type') or parsed_mime_type, + name=data.get('name'), + size_bytes=data.get('size') or (len(content) if content is not None else None), + conversation_id=event.conversation_id, + run_id=run_id, + runner_id=runner_id, + bot_id=event.bot_id, + workspace_id=event.workspace_id, + metadata=metadata, + content=content, + ) + except Exception as e: + self.ap.logger.warning( + f'Failed to register input artifact {artifact_id}: {e}' + ) + + def decode_attachment_content( + self, + content: typing.Any, + ) -> tuple[bytes | None, str | None]: + """Decode base64 attachment content, including data URLs.""" + if not isinstance(content, str) or not content: + return None, None + + import base64 + import binascii + + mime_type = None + payload = content + if content.startswith('data:') and ',' in content: + header, payload = content.split(',', 1) + if ';base64' in header: + mime_type = header[5:].split(';', 1)[0] or None + + try: + return base64.b64decode(payload, validate=False), mime_type + except (binascii.Error, ValueError): + return None, mime_type + + async def write_user_transcript( + self, + event: AgentEventEnvelope, + event_log_id: str, + ) -> None: + """Write user message to Transcript.""" + from .transcript_store import TranscriptStore + + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + content = event.input.text if event.input else None + content_json = None + if event.input: + content_json = { + 'role': 'user', + 'content': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents] if event.input.contents else [], + } + + artifact_refs = [] + if event.input and event.input.attachments: + for a in event.input.attachments: + artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a) + + await store.append_transcript( + transcript_id=None, + event_id=event_log_id, + conversation_id=event.conversation_id, + role='user', + content=content, + content_json=content_json, + artifact_refs=artifact_refs if artifact_refs else None, + thread_id=event.thread_id, + item_type='message', + metadata={ + 'actor_type': event.actor.actor_type if event.actor else None, + 'actor_id': event.actor.actor_id if event.actor else None, + }, + ) + + async def handle_artifact_created( + self, + result_dict: dict[str, typing.Any], + event: AgentEventEnvelope, + run_id: str, + runner_id: str, + ) -> dict[str, typing.Any]: + """Handle artifact.created result, register artifact, and write EventLog.""" + import base64 + import uuid + + from .artifact_store import ArtifactStore + from .event_log_store import EventLogStore + + data = result_dict.get('data', {}) + + result_run_id = result_dict.get('run_id') + if result_run_id and result_run_id != run_id: + raise RunnerProtocolError( + runner_id, + f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}', + ) + + artifact_id = data.get('artifact_id') or str(uuid.uuid4()) + artifact_type = data.get('artifact_type') + if not artifact_type: + raise RunnerProtocolError( + runner_id, + 'artifact.created missing required field: artifact_type', + ) + + mime_type = data.get('mime_type') + name = data.get('name') + size_bytes = data.get('size_bytes') + sha256 = data.get('sha256') + metadata = data.get('metadata') + content_base64 = data.get('content_base64') + + content: bytes | None = None + if content_base64: + try: + content = base64.b64decode(content_base64, validate=True) + except Exception as e: + raise RunnerProtocolError( + runner_id, + f'artifact.created invalid base64 content: {e}', + ) + + if len(content) > MAX_ARTIFACT_INLINE_BYTES: + raise RunnerProtocolError( + runner_id, + f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes', + ) + + artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine()) + try: + registered_id = await artifact_store.register_artifact( + artifact_id=artifact_id, + artifact_type=artifact_type, + source='runner', + mime_type=mime_type, + name=name, + size_bytes=size_bytes, + sha256=sha256, + conversation_id=event.conversation_id, + run_id=run_id, + runner_id=runner_id, + bot_id=event.bot_id, + workspace_id=event.workspace_id, + metadata=metadata, + content=content, + ) + except Exception as e: + raise RunnerProtocolError( + runner_id, + f'artifact.created failed to register artifact: {e}', + ) + + event_log_store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + await event_log_store.append_event( + event_id=str(uuid.uuid4()), + event_type='artifact.created', + source='runner', + bot_id=event.bot_id, + workspace_id=event.workspace_id, + conversation_id=event.conversation_id, + thread_id=event.thread_id, + actor_type=event.actor.actor_type if event.actor else None, + actor_id=event.actor.actor_id if event.actor else None, + actor_name=event.actor.actor_name if event.actor else None, + input_summary=f'Artifact created: {artifact_type}', + input_json={ + 'artifact_id': registered_id, + 'artifact_type': artifact_type, + 'mime_type': mime_type, + 'name': name, + 'size_bytes': size_bytes, + }, + run_id=run_id, + runner_id=runner_id, + ) + + return { + 'artifact_id': registered_id, + 'artifact_type': artifact_type, + 'mime_type': mime_type, + 'name': name, + } + + def merge_artifact_refs( + self, + pending_refs: list[dict[str, typing.Any]], + result_dict: dict[str, typing.Any], + ) -> list[dict[str, typing.Any]]: + """Merge pending artifact refs with a message's own refs.""" + merged = list(pending_refs) + seen_ids = {ref.get('artifact_id') for ref in pending_refs if ref.get('artifact_id')} + + data = result_dict.get('data', {}) + message = data.get('message', {}) + message_refs = message.get('artifact_refs', []) + + if isinstance(message_refs, list): + for ref in message_refs: + if isinstance(ref, dict): + artifact_id = ref.get('artifact_id') + if artifact_id and artifact_id not in seen_ids: + merged.append(ref) + seen_ids.add(artifact_id) + + return merged + + async def write_assistant_transcript( + self, + result_dict: dict[str, typing.Any], + event: AgentEventEnvelope, + run_id: str, + runner_id: str, + artifact_refs: list[dict[str, typing.Any]] | None = None, + ) -> None: + """Write assistant message to Transcript.""" + import uuid + + from .transcript_store import TranscriptStore + + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + + data = result_dict.get('data', {}) + message = data.get('message', {}) + + content = None + content_json = None + + if isinstance(message.get('content'), str): + content = message['content'] + content_json = message + elif isinstance(message.get('content'), list): + text_parts = [] + for c in message['content']: + if isinstance(c, dict) and c.get('type') == 'text': + text_parts.append(c.get('text', '')) + content = ' '.join(text_parts) if text_parts else None + content_json = message + + assistant_event_id = str(uuid.uuid4()) + + await store.append_transcript( + transcript_id=str(uuid.uuid4()), + event_id=assistant_event_id, + conversation_id=event.conversation_id, + role='assistant', + content=content, + content_json=content_json, + artifact_refs=artifact_refs, + thread_id=event.thread_id, + item_type='message', + run_id=run_id, + runner_id=runner_id, + metadata={ + 'run_id': run_id, + 'runner_id': runner_id, + }, + ) diff --git a/src/langbot/pkg/api/http/service/model.py b/src/langbot/pkg/api/http/service/model.py index 3758cbbc..8f678ef1 100644 --- a/src/langbot/pkg/api/http/service/model.py +++ b/src/langbot/pkg/api/http/service/model.py @@ -7,10 +7,7 @@ from langbot_plugin.api.entities.builtin.provider import message as provider_mes from ....core import app from ....entity.persistence import model as persistence_model -from ....entity.persistence import pipeline as persistence_pipeline from ....provider.modelmgr import requester as model_requester -from ....agent.runner.config_migration import ConfigMigration -from ....agent.runner import config_schema def _parse_provider_api_keys(provider_dict: dict) -> dict: @@ -42,40 +39,6 @@ class LLMModelsService: def __init__(self, ap: app.Application) -> None: self.ap = ap - async def _get_runner_descriptor(self, runner_id: str): - registry = getattr(self.ap, 'agent_runner_registry', None) - if registry is None: - return None - try: - return await registry.get(runner_id, bound_plugins=None) - except Exception as e: - logger = getattr(self.ap, 'logger', None) - if logger: - logger.warning(f'Failed to load AgentRunner descriptor while setting default model: {e}') - return None - - async def _auto_set_default_pipeline_llm_model(self, pipeline: persistence_pipeline.LegacyPipeline, model_uuid: str): - pipeline_config = pipeline.config - if not isinstance(pipeline_config, dict): - return - - runner_id = ConfigMigration.resolve_runner_id(pipeline_config) - if not runner_id: - return - - descriptor = await self._get_runner_descriptor(runner_id) - if descriptor is None: - return - - ai_config = pipeline_config.setdefault('ai', {}) - runner_configs = ai_config.setdefault('runner_config', {}) - runner_config = runner_configs.setdefault(runner_id, {}) - - if not config_schema.set_empty_llm_model_selection(descriptor, runner_config, model_uuid): - return - - await self.ap.pipeline_service.update_pipeline(pipeline.uuid, {'config': pipeline_config}) - async def get_llm_models(self, include_secret: bool = True) -> list[dict]: """Get all LLM models with provider info""" result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_model.LLMModel)) @@ -145,14 +108,9 @@ class LLMModelsService: self.ap.model_mgr.llm_models.append(runtime_llm_model) if auto_set_to_default_pipeline: - result = await self.ap.persistence_mgr.execute_async( - sqlalchemy.select(persistence_pipeline.LegacyPipeline).where( - persistence_pipeline.LegacyPipeline.is_default == True - ) - ) - pipeline = result.first() - if pipeline is not None: - await self._auto_set_default_pipeline_llm_model(pipeline, model_data['uuid']) + default_config_service = getattr(self.ap, 'agent_runner_default_config_service', None) + if default_config_service is not None: + await default_config_service.auto_set_default_pipeline_llm_model(model_data['uuid']) return model_data['uuid'] diff --git a/src/langbot/pkg/api/http/service/pipeline.py b/src/langbot/pkg/api/http/service/pipeline.py index fbafe6d2..dbe7c2dd 100644 --- a/src/langbot/pkg/api/http/service/pipeline.py +++ b/src/langbot/pkg/api/http/service/pipeline.py @@ -8,11 +8,6 @@ import typing from ....core import app from ....entity.persistence import pipeline as persistence_pipeline -# Prefer the official local-agent plugin when it is installed. This is not a -# built-in fallback: when no AgentRunner plugin is available, the default -# pipeline stays unbound so the UI can guide users to install a runner. -PREFERRED_DEFAULT_RUNNER_ID = 'plugin:langbot/local-agent/default' - default_stage_order = [ 'GroupRespondRuleCheckStage', # 群响应规则检查 @@ -69,10 +64,7 @@ class PipelineService: if not runners: return config - selected_runner = next( - (runner for runner in runners if runner.id == PREFERRED_DEFAULT_RUNNER_ID), - runners[0], - ) + selected_runner = runners[0] ai_config = config.setdefault('ai', {}) runner_config = ai_config.setdefault('runner', {}) runner_config['id'] = selected_runner.id @@ -113,15 +105,10 @@ class PipelineService: # Only installed/available runners should be shown config_item['options'] = runner_options - # Prefer the official local-agent plugin when installed; otherwise use the first - # discoverable runner. If no runner is available, leave the default unset so the - # UI can recommend installing an AgentRunner plugin, similar to the RAG flow. + # Use the registry order as the default order. If no runner is available, leave + # the default unset so the UI can recommend installing an AgentRunner plugin. if runner_options and 'default' not in config_item: - default_option = next( - (option for option in runner_options if option['name'] == PREFERRED_DEFAULT_RUNNER_ID), - runner_options[0], - ) - config_item['default'] = default_option['name'] + config_item['default'] = runner_options[0]['name'] # Add corresponding stage configuration for each runner for stage_config in runner_stages: diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 074c3505..b46c84fa 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -48,7 +48,7 @@ from ..survey import manager as survey_module from ..skill import manager as skill_mgr if TYPE_CHECKING: - from ..agent.runner import AgentRunnerRegistry, AgentRunOrchestrator + from ..agent.runner import AgentRunnerRegistry, AgentRunOrchestrator, AgentRunnerDefaultConfigService class Application: @@ -172,6 +172,8 @@ class Application: # Agent runner subsystem agent_runner_registry: AgentRunnerRegistry = None + agent_runner_default_config_service: AgentRunnerDefaultConfigService = None + agent_run_orchestrator: AgentRunOrchestrator = None def __init__(self): diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 1484bafe..092bed5f 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -39,7 +39,7 @@ from ...vector import mgr as vectordb_mgr from .. import taskmgr from ...telemetry import telemetry as telemetry_module from ...survey import manager as survey_module -from ...agent.runner import AgentRunnerRegistry, AgentRunOrchestrator +from ...agent.runner import AgentRunnerRegistry, AgentRunOrchestrator, AgentRunnerDefaultConfigService @stage.stage_class('BuildAppStage') @@ -199,6 +199,9 @@ class BuildAppStage(stage.BootingStage): agent_runner_registry_inst = AgentRunnerRegistry(ap) ap.agent_runner_registry = agent_runner_registry_inst + agent_runner_default_config_service_inst = AgentRunnerDefaultConfigService(ap) + ap.agent_runner_default_config_service = agent_runner_default_config_service_inst + agent_run_orchestrator_inst = AgentRunOrchestrator(ap, agent_runner_registry_inst) ap.agent_run_orchestrator = agent_run_orchestrator_inst diff --git a/src/langbot/pkg/provider/modelmgr/modelmgr.py b/src/langbot/pkg/provider/modelmgr/modelmgr.py index 976f1263..85f6c20d 100644 --- a/src/langbot/pkg/provider/modelmgr/modelmgr.py +++ b/src/langbot/pkg/provider/modelmgr/modelmgr.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import sqlalchemy import traceback @@ -54,8 +55,19 @@ class ModelManager: self.ap.logger.info('LangBot Space Models service is disabled, skipping sync.') return + sync_timeout = space_config.get('models_sync_timeout') try: - await self.sync_new_models_from_space() + if sync_timeout: + await asyncio.wait_for( + self.sync_new_models_from_space(), + timeout=float(sync_timeout), + ) + else: + await self.sync_new_models_from_space() + except asyncio.TimeoutError: + self.ap.logger.warning( + f'LangBot Space model sync timed out after {sync_timeout}s, skipping startup sync.' + ) except Exception as e: self.ap.logger.warning('Failed to sync new models from LangBot Space, model list may not be updated.') self.ap.logger.warning(f' - Error: {e}') diff --git a/tests/unit_tests/api/service/test_model_service.py b/tests/unit_tests/api/service/test_model_service.py index fb8670f7..1911e990 100644 --- a/tests/unit_tests/api/service/test_model_service.py +++ b/tests/unit_tests/api/service/test_model_service.py @@ -18,6 +18,7 @@ from unittest.mock import AsyncMock, Mock import pytest +from langbot.pkg.agent.runner.default_config import AgentRunnerDefaultConfigService from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor from langbot.pkg.api.http.service.model import ( LLMModelsService, @@ -432,6 +433,7 @@ class TestLLMModelsServiceCreateLLMModel: ap.model_mgr.load_llm_model_with_provider = AsyncMock(return_value=Mock()) ap.pipeline_service = SimpleNamespace(update_pipeline=AsyncMock()) ap.agent_runner_registry = FakeAgentRunnerRegistry() + ap.agent_runner_default_config_service = AgentRunnerDefaultConfigService(ap) pipeline = SimpleNamespace( uuid='pipeline-uuid', diff --git a/tests/unit_tests/api/test_pipeline_service_defaults.py b/tests/unit_tests/api/test_pipeline_service_defaults.py index 1cdb7af8..415a4a3d 100644 --- a/tests/unit_tests/api/test_pipeline_service_defaults.py +++ b/tests/unit_tests/api/test_pipeline_service_defaults.py @@ -37,7 +37,7 @@ def make_runner(runner_id: str, config_schema: list[dict]): @pytest.mark.asyncio -async def test_default_pipeline_config_uses_installed_local_agent_schema(): +async def test_default_pipeline_config_uses_first_installed_runner_schema(): local_agent = make_runner( 'plugin:langbot/local-agent/default', [ @@ -56,11 +56,10 @@ async def test_default_pipeline_config_uses_installed_local_agent_schema(): config = await PipelineService(ap).get_default_pipeline_config() - assert config['ai']['runner']['id'] == 'plugin:langbot/local-agent/default' + assert config['ai']['runner']['id'] == 'plugin:alice/custom-agent/default' assert config['ai']['runner_config'] == { - 'plugin:langbot/local-agent/default': { - 'model': {'primary': '', 'fallbacks': []}, - 'prompt': [{'role': 'system', 'content': 'Hello'}], + 'plugin:alice/custom-agent/default': { + 'api-key': '', }, } diff --git a/tests/unit_tests/plugin/test_handler_actions.py b/tests/unit_tests/plugin/test_handler_actions.py index 6c7480b9..67adbadb 100644 --- a/tests/unit_tests/plugin/test_handler_actions.py +++ b/tests/unit_tests/plugin/test_handler_actions.py @@ -425,6 +425,7 @@ class TestAgentRunProxyActions: try: response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM.value]({ 'run_id': run_id, + 'caller_plugin_identity': 'test/runner', 'llm_model_uuid': 'llm_001', 'messages': [{'role': 'user', 'content': 'hello'}], 'funcs': [{ @@ -489,6 +490,7 @@ class TestAgentRunProxyActions: try: stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({ 'run_id': run_id, + 'caller_plugin_identity': 'test/runner', 'llm_model_uuid': 'llm_stream_001', 'messages': [{'role': 'user', 'content': 'hello'}], 'funcs': [{ @@ -547,6 +549,7 @@ class TestAgentRunProxyActions: try: stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({ 'run_id': run_id, + 'caller_plugin_identity': 'test/runner', 'llm_model_uuid': 'llm_stream_002', 'messages': [{'role': 'user', 'content': 'hello'}], }) @@ -582,6 +585,7 @@ class TestAgentRunProxyActions: try: response = await runtime_handler.actions[PluginToRuntimeAction.CALL_TOOL.value]({ 'run_id': run_id, + 'caller_plugin_identity': 'test/runner', 'tool_name': 'test/search', 'parameters': {'q': 'langbot'}, }) @@ -628,6 +632,7 @@ class TestAgentRunProxyActions: try: response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({ 'run_id': run_id, + 'caller_plugin_identity': 'test/runner', 'rerank_model_uuid': 'rerank_001', 'query': 'hello', 'documents': ['a', 'b'], diff --git a/tests/unit_tests/provider/test_skill_tools.py b/tests/unit_tests/provider/test_skill_tools.py index 5651dfd4..d486e30a 100644 --- a/tests/unit_tests/provider/test_skill_tools.py +++ b/tests/unit_tests/provider/test_skill_tools.py @@ -424,7 +424,9 @@ class TestNativeToolLoaderSkillPaths: SimpleNamespace(query_id='q1', variables={PIPELINE_BOUND_SKILLS_KEY: ['demo']}), ) - assert result == {'ok': True, 'content': 'demo instructions'} + assert result['ok'] is True + assert result['content'] == 'demo instructions' + assert result['truncated'] is False @pytest.mark.asyncio async def test_exec_in_activated_skill_mount_rewrites_command_and_refreshes(self): @@ -453,7 +455,7 @@ class TestNativeToolLoaderSkillPaths: query, ) - assert result == {'ok': True} + assert result['ok'] is True tool_parameters = ap.box_service.execute_tool.await_args.args[0] assert tool_parameters['command'] == 'python /workspace/.skills/demo/scripts/run.py' assert tool_parameters['workdir'] == '/workspace/.skills/demo'