diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index 7da30b40f..e313dc841 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -220,6 +220,7 @@ class AgentRunContextBuilder: binding: AgentBinding, descriptor: AgentRunnerDescriptor, resources: AgentResources, + run_id: str | None = None, ) -> AgentRunContextPayload: """Build AgentRunContext from event-first envelope. @@ -235,8 +236,8 @@ class AgentRunContextBuilder: Returns: AgentRunContextPayload for the runner """ - # Generate new run_id - run_id = str(uuid.uuid4()) + # Generate new run_id unless an API caller already reserved one. + run_id = run_id or str(uuid.uuid4()) # Build trigger from event trigger: AgentTrigger = { diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 008fc810a..eef117c30 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -68,6 +68,7 @@ class AgentRunOrchestrator: binding: AgentBinding, bound_plugins: list[str] | None = None, adapter_context: dict[str, typing.Any] | None = None, + run_id: str | None = None, ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: """Run an AgentRunner from an event-first envelope.""" runner_id = binding.runner_id @@ -84,6 +85,7 @@ class AgentRunOrchestrator: binding=binding, descriptor=descriptor, resources=resources, + run_id=run_id, ) session_query_id = None diff --git a/src/langbot/pkg/plugin/agent_runner_actions.py b/src/langbot/pkg/plugin/agent_runner_actions.py index 40e72e194..0b21c55a9 100644 --- a/src/langbot/pkg/plugin/agent_runner_actions.py +++ b/src/langbot/pkg/plugin/agent_runner_actions.py @@ -3,12 +3,24 @@ from __future__ import annotations from typing import Any +import asyncio import time +import uuid from langbot_plugin.runtime.io import handler +from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext +from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput +from ..agent.runner.host_models import ( + AgentBinding, + AgentEventEnvelope, + BindingScope, + DeliveryPolicy, + ResourcePolicy, + StatePolicy, +) from ..agent.runner.run_ledger_store import TERMINAL_STATUSES from .agent_run_support import ( @@ -30,7 +42,320 @@ from .agent_run_support import ( ) +def _dict_payload(value: Any, *, field_name: str) -> tuple[dict[str, Any], handler.ActionResponse | None]: + if value is None: + return {}, None + if not isinstance(value, dict): + return {}, handler.ActionResponse.error(message=f'{field_name} must be an object') + return dict(value), None + + +def _build_run_create_event(data: dict[str, Any], *, run_id: str) -> tuple[AgentEventEnvelope | None, handler.ActionResponse | None]: + event_payload, error = _dict_payload(data.get('event'), field_name='event') + if error: + return None, error + + input_payload = event_payload.get('input', data.get('input')) + if input_payload is None: + input_payload = { + 'text': data.get('text'), + 'contents': [], + 'attachments': [], + } + if not isinstance(input_payload, dict): + return None, handler.ActionResponse.error(message='input must be an object') + + delivery_payload = event_payload.get('delivery', data.get('delivery')) + if delivery_payload is None: + delivery_payload = { + 'surface': 'api', + 'reply_target': None, + 'supports_streaming': False, + 'supports_edit': False, + 'supports_reaction': False, + 'platform_capabilities': {}, + } + if not isinstance(delivery_payload, dict): + return None, handler.ActionResponse.error(message='delivery must be an object') + + event_data = event_payload.get('data', data.get('data')) + if event_data is None: + event_data = {} + if not isinstance(event_data, dict): + return None, handler.ActionResponse.error(message='event data must be an object') + + event_type = str(event_payload.get('event_type') or data.get('event_type') or 'api.invoked') + source = str(event_payload.get('source') or data.get('source') or 'api') + event_id = str(event_payload.get('event_id') or data.get('event_id') or f'{source}:{run_id}') + event_time = event_payload.get('event_time', data.get('event_time')) + if event_time is None: + event_time = int(time.time()) + + try: + envelope = AgentEventEnvelope( + event_id=event_id, + event_type=event_type, + event_time=int(event_time) if isinstance(event_time, (int, float, str)) else None, + source=source, + source_event_type=event_payload.get('source_event_type') or data.get('source_event_type') or event_type, + bot_id=event_payload.get('bot_id', data.get('bot_id')), + workspace_id=event_payload.get('workspace_id', data.get('workspace_id')), + conversation_id=event_payload.get('conversation_id', data.get('conversation_id')), + thread_id=event_payload.get('thread_id', data.get('thread_id')), + actor=event_payload.get('actor', data.get('actor')), + subject=event_payload.get('subject', data.get('subject')), + input=AgentInput.model_validate(input_payload), + delivery=DeliveryContext.model_validate(delivery_payload), + raw_ref=event_payload.get('raw_ref', data.get('raw_ref')), + data=event_data, + ) + except Exception as exc: + return None, handler.ActionResponse.error(message=f'invalid event payload: {exc}') + + return envelope, None + + +def _build_run_create_binding( + data: dict[str, Any], + *, + event: AgentEventEnvelope, + run_id: str, +) -> tuple[AgentBinding | None, handler.ActionResponse | None]: + binding_payload, error = _dict_payload(data.get('binding'), field_name='binding') + if error: + return None, error + + runner_id = binding_payload.get('runner_id') or data.get('runner_id') + if not runner_id: + return None, handler.ActionResponse.error(message='runner_id is required') + + scope_payload = binding_payload.get('scope') + if scope_payload is None: + agent_id = binding_payload.get('agent_id') or data.get('agent_id') + workspace_id = event.workspace_id + bot_id = event.bot_id + if agent_id: + scope_payload = {'scope_type': 'agent', 'scope_id': agent_id} + elif bot_id: + scope_payload = {'scope_type': 'bot', 'scope_id': bot_id} + elif workspace_id: + scope_payload = {'scope_type': 'workspace', 'scope_id': workspace_id} + else: + scope_payload = {'scope_type': 'global', 'scope_id': None} + if not isinstance(scope_payload, dict): + return None, handler.ActionResponse.error(message='binding.scope must be an object') + + runner_config_payload = binding_payload.get('runner_config', data.get('runner_config')) + if runner_config_payload is None: + runner_config_payload = {} + if not isinstance(runner_config_payload, dict): + return None, handler.ActionResponse.error(message='runner_config must be an object') + + resource_policy_payload = binding_payload.get('resource_policy', data.get('resource_policy')) + if resource_policy_payload is None: + resource_policy_payload = {} + if not isinstance(resource_policy_payload, dict): + return None, handler.ActionResponse.error(message='resource_policy must be an object') + + state_policy_payload = binding_payload.get('state_policy', data.get('state_policy')) + if state_policy_payload is None: + state_policy_payload = {} + if not isinstance(state_policy_payload, dict): + return None, handler.ActionResponse.error(message='state_policy must be an object') + + delivery_policy_payload = binding_payload.get('delivery_policy', data.get('delivery_policy')) + if delivery_policy_payload is None: + delivery_policy_payload = { + 'enable_streaming': bool(event.delivery.supports_streaming), + 'enable_reply': False, + } + if not isinstance(delivery_policy_payload, dict): + return None, handler.ActionResponse.error(message='delivery_policy must be an object') + + try: + binding = AgentBinding( + binding_id=str(binding_payload.get('binding_id') or data.get('binding_id') or f'api:{runner_id}:{run_id}'), + scope=BindingScope.model_validate(scope_payload), + event_types=list(binding_payload.get('event_types') or data.get('event_types') or [event.event_type]), + runner_id=str(runner_id), + runner_config=runner_config_payload, + resource_policy=ResourcePolicy.model_validate(resource_policy_payload), + state_policy=StatePolicy.model_validate(state_policy_payload), + delivery_policy=DeliveryPolicy.model_validate(delivery_policy_payload), + enabled=bool(binding_payload.get('enabled', data.get('enabled', True))), + agent_id=binding_payload.get('agent_id') or data.get('agent_id'), + ) + except Exception as exc: + return None, handler.ActionResponse.error(message=f'invalid binding payload: {exc}') + + if event.event_type not in binding.event_types: + return None, handler.ActionResponse.error( + message=f'binding.event_types must include event type {event.event_type}' + ) + + return binding, None + + +async def _consume_programmatic_run( + h, + *, + run_id: str, + event: AgentEventEnvelope, + binding: AgentBinding, + bound_plugins: list[str] | None, +) -> None: + async for _result in h.ap.agent_run_orchestrator.run( + event, + binding, + bound_plugins=bound_plugins, + run_id=run_id, + ): + pass + + def register(h): + @h.action(_plugin_runtime_action('RUN_CREATE', 'run_create')) + async def run_create(data: dict[str, Any]) -> handler.ActionResponse: + """Create a programmatic AgentRunner run from an explicit event and binding.""" + caller_plugin_identity = data.get('caller_plugin_identity') + if not _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ): + return handler.ActionResponse.error(message='Run create access not authorized') + + orchestrator = getattr(h.ap, 'agent_run_orchestrator', None) + if orchestrator is None: + return handler.ActionResponse.error(message='AgentRunOrchestrator is not available') + + run_id = str(data.get('run_id') or uuid.uuid4()) + event, event_error = _build_run_create_event(data, run_id=run_id) + if event_error: + return event_error + assert event is not None + + binding, binding_error = _build_run_create_binding(data, event=event, run_id=run_id) + if binding_error: + return binding_error + assert binding is not None + + include_plugins = data.get('include_plugins') + if include_plugins is not None and not isinstance(include_plugins, list): + return handler.ActionResponse.error(message='include_plugins must be a list') + bound_plugins = [str(item) for item in include_plugins] if include_plugins else None + + registry = getattr(h.ap, 'agent_runner_registry', None) + if registry is not None: + try: + await registry.get(binding.runner_id, bound_plugins) + except Exception as exc: + return handler.ActionResponse.error(message=f'Runner {binding.runner_id} is not available: {exc}') + + async def background_run(*, raise_errors: bool = False) -> None: + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + try: + await _consume_programmatic_run( + h, + run_id=run_id, + event=event, + binding=binding, + bound_plugins=bound_plugins, + ) + except Exception as exc: + h.ap.logger.error(f'RUN_CREATE background run {run_id} failed: {exc}', exc_info=True) + if await store.get_run(run_id) is None: + await store.create_run( + run_id=run_id, + event_id=event.event_id, + binding_id=binding.binding_id, + runner_id=binding.runner_id, + conversation_id=event.conversation_id, + thread_id=event.thread_id, + workspace_id=event.workspace_id, + bot_id=event.bot_id, + agent_id=binding.agent_id, + authorization={ + 'runner_id': binding.runner_id, + 'binding_id': binding.binding_id, + 'plugin_identity': None, + 'resources': {}, + 'available_apis': {}, + 'conversation_id': event.conversation_id, + 'bot_id': event.bot_id, + 'workspace_id': event.workspace_id, + 'thread_id': event.thread_id, + }, + metadata={ + 'event_type': event.event_type, + 'source': event.source, + 'run_create_error': True, + }, + status='running', + ) + await store.finalize_run( + run_id=run_id, + status='failed', + status_reason=str(exc), + metadata={'run_create_error': True}, + ) + if raise_errors: + raise + + if data.get('wait_for_completion'): + try: + await background_run(raise_errors=True) + except Exception as exc: + return handler.ActionResponse.error(message=f'Run create error: {exc}') + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + run = await store.get_run(run_id) + if run is not None: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_create', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=run_id, + detail={'runner_id': binding.runner_id, 'event_type': event.event_type}, + ) + return handler.ActionResponse.success(data=run) + else: + asyncio.create_task(background_run()) + + await _record_agent_runner_admin_action( + h.ap, + None, + action='run_create', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=run_id, + detail={'runner_id': binding.runner_id, 'event_type': event.event_type}, + ) + return handler.ActionResponse.success( + data={ + 'run_id': run_id, + 'event_id': event.event_id, + 'agent_id': binding.agent_id, + 'binding_id': binding.binding_id, + 'runner_id': binding.runner_id, + 'conversation_id': event.conversation_id, + 'thread_id': event.thread_id, + 'workspace_id': event.workspace_id, + 'bot_id': event.bot_id, + 'status': 'created', + 'metadata': { + 'event_type': event.event_type, + 'source': event.source, + 'accepted': True, + }, + } + ) + @h.action(_plugin_runtime_action('RUN_GET', 'run_get')) async def run_get(data: dict[str, Any]) -> handler.ActionResponse: """Get one Host-owned run record visible to the current run.""" diff --git a/tests/unit_tests/agent/test_run_ledger_api_auth.py b/tests/unit_tests/agent/test_run_ledger_api_auth.py index d71ce4215..f49e7e282 100644 --- a/tests/unit_tests/agent/test_run_ledger_api_auth.py +++ b/tests/unit_tests/agent/test_run_ledger_api_auth.py @@ -33,11 +33,12 @@ class FakeConnection: class FakeApplication: - def __init__(self, db_engine, admin_plugins=None, runner_registry=None): + def __init__(self, db_engine, admin_plugins=None, runner_registry=None, orchestrator=None): self.logger = MagicMock() self.persistence_mgr = MagicMock() self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine) self.agent_runner_registry = runner_registry + self.agent_run_orchestrator = orchestrator self.instance_config = SimpleNamespace( data={ 'agent_runner': { @@ -72,16 +73,66 @@ class FakeRunnerRegistry: self.runners = runners self.calls = [] + async def get(self, runner_id, bound_plugins=None): + self.calls.append({'runner_id': runner_id, 'bound_plugins': bound_plugins}) + if isinstance(self.runners, dict): + if runner_id not in self.runners: + raise KeyError(runner_id) + return self.runners[runner_id] + for runner in self.runners: + if getattr(runner, 'id', None) == runner_id: + return runner + raise KeyError(runner_id) + async def list_runners(self, *, bound_plugins=None, use_cache=True): self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache}) return self.runners -def _handler(db_engine, admin_plugins=None, runner_registry=None): +class FakeProgrammaticOrchestrator: + def __init__(self, db_engine): + self.db_engine = db_engine + self.calls = [] + + async def run(self, event, binding, bound_plugins=None, adapter_context=None, run_id=None): + self.calls.append( + { + 'event': event, + 'binding': binding, + 'bound_plugins': bound_plugins, + 'adapter_context': adapter_context, + 'run_id': run_id, + } + ) + store = RunLedgerStore(self.db_engine) + await store.create_run( + run_id=run_id, + event_id=event.event_id, + binding_id=binding.binding_id, + runner_id=binding.runner_id, + conversation_id=event.conversation_id, + thread_id=event.thread_id, + workspace_id=event.workspace_id, + bot_id=event.bot_id, + authorization={'available_apis': {}}, + metadata={'event_type': event.event_type, 'source': event.source}, + status='running', + ) + await store.finalize_run(run_id=run_id, status='completed', status_reason='done') + if False: + yield None + + +def _handler(db_engine, admin_plugins=None, runner_registry=None, orchestrator=None): async def fake_disconnect(): return True - fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins, runner_registry=runner_registry) + fake_app = FakeApplication( + db_engine, + admin_plugins=admin_plugins, + runner_registry=runner_registry, + orchestrator=orchestrator, + ) return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) @@ -1260,6 +1311,64 @@ async def test_runtime_register_heartbeat_and_list_actions(session_registry, db_ assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1'] +@pytest.mark.asyncio +async def test_admin_run_create_starts_programmatic_run(db_engine): + orchestrator = FakeProgrammaticOrchestrator(db_engine) + runner_id = 'plugin:test/runner/default' + registry = FakeRunnerRegistry({runner_id: SimpleNamespace(id=runner_id)}) + handler = _handler( + db_engine, + admin_plugins=[{'identity': 'admin/plugin', 'permissions': ['agent_run:admin']}], + runner_registry=registry, + orchestrator=orchestrator, + ) + run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value] + + result = await run_create( + { + 'caller_plugin_identity': 'admin/plugin', + 'run_id': 'run_programmatic', + 'runner_id': runner_id, + 'input': {'text': 'work on issue 1', 'contents': [], 'attachments': []}, + 'conversation_id': 'conv_issue_board', + 'event_type': 'api.invoked', + 'wait_for_completion': True, + } + ) + + assert result.code == 0 + assert result.data['run_id'] == 'run_programmatic' + assert result.data['runner_id'] == runner_id + assert result.data['status'] == 'completed' + assert len(orchestrator.calls) == 1 + call = orchestrator.calls[0] + assert call['run_id'] == 'run_programmatic' + assert call['event'].event_type == 'api.invoked' + assert call['event'].source == 'api' + assert call['event'].conversation_id == 'conv_issue_board' + assert call['binding'].runner_id == runner_id + assert call['binding'].delivery_policy.enable_reply is False + + +@pytest.mark.asyncio +async def test_run_create_requires_admin_permission(db_engine): + orchestrator = FakeProgrammaticOrchestrator(db_engine) + handler = _handler(db_engine, orchestrator=orchestrator) + run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value] + + result = await run_create( + { + 'caller_plugin_identity': 'regular/plugin', + 'runner_id': 'plugin:test/runner/default', + 'input': {'text': 'work'}, + } + ) + + assert result.code != 0 + assert 'not authorized' in result.message + assert orchestrator.calls == [] + + @pytest.mark.asyncio async def test_run_claim_renew_and_release_actions(session_registry, db_engine): await _register_session(