feat(agent-runner): add programmatic run create action

This commit is contained in:
huanghuoguoguo
2026-06-24 20:20:40 +08:00
parent ed3598f8ac
commit f1b3bd50fd
4 changed files with 442 additions and 5 deletions
@@ -220,6 +220,7 @@ class AgentRunContextBuilder:
binding: AgentBinding,
descriptor: AgentRunnerDescriptor,
resources: AgentResources,
run_id: str | None = None,
) -> AgentRunContextPayload:
"""Build AgentRunContext from event-first envelope.
@@ -235,8 +236,8 @@ class AgentRunContextBuilder:
Returns:
AgentRunContextPayload for the runner
"""
# Generate new run_id
run_id = str(uuid.uuid4())
# Generate new run_id unless an API caller already reserved one.
run_id = run_id or str(uuid.uuid4())
# Build trigger from event
trigger: AgentTrigger = {
@@ -68,6 +68,7 @@ class AgentRunOrchestrator:
binding: AgentBinding,
bound_plugins: list[str] | None = None,
adapter_context: dict[str, typing.Any] | None = None,
run_id: str | None = None,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run an AgentRunner from an event-first envelope."""
runner_id = binding.runner_id
@@ -84,6 +85,7 @@ class AgentRunOrchestrator:
binding=binding,
descriptor=descriptor,
resources=resources,
run_id=run_id,
)
session_query_id = None
@@ -3,12 +3,24 @@
from __future__ import annotations
from typing import Any
import asyncio
import time
import uuid
from langbot_plugin.runtime.io import handler
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from ..agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
ResourcePolicy,
StatePolicy,
)
from ..agent.runner.run_ledger_store import TERMINAL_STATUSES
from .agent_run_support import (
@@ -30,7 +42,320 @@ from .agent_run_support import (
)
def _dict_payload(value: Any, *, field_name: str) -> tuple[dict[str, Any], handler.ActionResponse | None]:
if value is None:
return {}, None
if not isinstance(value, dict):
return {}, handler.ActionResponse.error(message=f'{field_name} must be an object')
return dict(value), None
def _build_run_create_event(data: dict[str, Any], *, run_id: str) -> tuple[AgentEventEnvelope | None, handler.ActionResponse | None]:
event_payload, error = _dict_payload(data.get('event'), field_name='event')
if error:
return None, error
input_payload = event_payload.get('input', data.get('input'))
if input_payload is None:
input_payload = {
'text': data.get('text'),
'contents': [],
'attachments': [],
}
if not isinstance(input_payload, dict):
return None, handler.ActionResponse.error(message='input must be an object')
delivery_payload = event_payload.get('delivery', data.get('delivery'))
if delivery_payload is None:
delivery_payload = {
'surface': 'api',
'reply_target': None,
'supports_streaming': False,
'supports_edit': False,
'supports_reaction': False,
'platform_capabilities': {},
}
if not isinstance(delivery_payload, dict):
return None, handler.ActionResponse.error(message='delivery must be an object')
event_data = event_payload.get('data', data.get('data'))
if event_data is None:
event_data = {}
if not isinstance(event_data, dict):
return None, handler.ActionResponse.error(message='event data must be an object')
event_type = str(event_payload.get('event_type') or data.get('event_type') or 'api.invoked')
source = str(event_payload.get('source') or data.get('source') or 'api')
event_id = str(event_payload.get('event_id') or data.get('event_id') or f'{source}:{run_id}')
event_time = event_payload.get('event_time', data.get('event_time'))
if event_time is None:
event_time = int(time.time())
try:
envelope = AgentEventEnvelope(
event_id=event_id,
event_type=event_type,
event_time=int(event_time) if isinstance(event_time, (int, float, str)) else None,
source=source,
source_event_type=event_payload.get('source_event_type') or data.get('source_event_type') or event_type,
bot_id=event_payload.get('bot_id', data.get('bot_id')),
workspace_id=event_payload.get('workspace_id', data.get('workspace_id')),
conversation_id=event_payload.get('conversation_id', data.get('conversation_id')),
thread_id=event_payload.get('thread_id', data.get('thread_id')),
actor=event_payload.get('actor', data.get('actor')),
subject=event_payload.get('subject', data.get('subject')),
input=AgentInput.model_validate(input_payload),
delivery=DeliveryContext.model_validate(delivery_payload),
raw_ref=event_payload.get('raw_ref', data.get('raw_ref')),
data=event_data,
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid event payload: {exc}')
return envelope, None
def _build_run_create_binding(
data: dict[str, Any],
*,
event: AgentEventEnvelope,
run_id: str,
) -> tuple[AgentBinding | None, handler.ActionResponse | None]:
binding_payload, error = _dict_payload(data.get('binding'), field_name='binding')
if error:
return None, error
runner_id = binding_payload.get('runner_id') or data.get('runner_id')
if not runner_id:
return None, handler.ActionResponse.error(message='runner_id is required')
scope_payload = binding_payload.get('scope')
if scope_payload is None:
agent_id = binding_payload.get('agent_id') or data.get('agent_id')
workspace_id = event.workspace_id
bot_id = event.bot_id
if agent_id:
scope_payload = {'scope_type': 'agent', 'scope_id': agent_id}
elif bot_id:
scope_payload = {'scope_type': 'bot', 'scope_id': bot_id}
elif workspace_id:
scope_payload = {'scope_type': 'workspace', 'scope_id': workspace_id}
else:
scope_payload = {'scope_type': 'global', 'scope_id': None}
if not isinstance(scope_payload, dict):
return None, handler.ActionResponse.error(message='binding.scope must be an object')
runner_config_payload = binding_payload.get('runner_config', data.get('runner_config'))
if runner_config_payload is None:
runner_config_payload = {}
if not isinstance(runner_config_payload, dict):
return None, handler.ActionResponse.error(message='runner_config must be an object')
resource_policy_payload = binding_payload.get('resource_policy', data.get('resource_policy'))
if resource_policy_payload is None:
resource_policy_payload = {}
if not isinstance(resource_policy_payload, dict):
return None, handler.ActionResponse.error(message='resource_policy must be an object')
state_policy_payload = binding_payload.get('state_policy', data.get('state_policy'))
if state_policy_payload is None:
state_policy_payload = {}
if not isinstance(state_policy_payload, dict):
return None, handler.ActionResponse.error(message='state_policy must be an object')
delivery_policy_payload = binding_payload.get('delivery_policy', data.get('delivery_policy'))
if delivery_policy_payload is None:
delivery_policy_payload = {
'enable_streaming': bool(event.delivery.supports_streaming),
'enable_reply': False,
}
if not isinstance(delivery_policy_payload, dict):
return None, handler.ActionResponse.error(message='delivery_policy must be an object')
try:
binding = AgentBinding(
binding_id=str(binding_payload.get('binding_id') or data.get('binding_id') or f'api:{runner_id}:{run_id}'),
scope=BindingScope.model_validate(scope_payload),
event_types=list(binding_payload.get('event_types') or data.get('event_types') or [event.event_type]),
runner_id=str(runner_id),
runner_config=runner_config_payload,
resource_policy=ResourcePolicy.model_validate(resource_policy_payload),
state_policy=StatePolicy.model_validate(state_policy_payload),
delivery_policy=DeliveryPolicy.model_validate(delivery_policy_payload),
enabled=bool(binding_payload.get('enabled', data.get('enabled', True))),
agent_id=binding_payload.get('agent_id') or data.get('agent_id'),
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid binding payload: {exc}')
if event.event_type not in binding.event_types:
return None, handler.ActionResponse.error(
message=f'binding.event_types must include event type {event.event_type}'
)
return binding, None
async def _consume_programmatic_run(
h,
*,
run_id: str,
event: AgentEventEnvelope,
binding: AgentBinding,
bound_plugins: list[str] | None,
) -> None:
async for _result in h.ap.agent_run_orchestrator.run(
event,
binding,
bound_plugins=bound_plugins,
run_id=run_id,
):
pass
def register(h):
@h.action(_plugin_runtime_action('RUN_CREATE', 'run_create'))
async def run_create(data: dict[str, Any]) -> handler.ActionResponse:
"""Create a programmatic AgentRunner run from an explicit event and binding."""
caller_plugin_identity = data.get('caller_plugin_identity')
if not _has_agent_runner_admin_permission(
h.ap,
caller_plugin_identity,
AGENT_RUN_ADMIN_PERMISSION,
):
return handler.ActionResponse.error(message='Run create access not authorized')
orchestrator = getattr(h.ap, 'agent_run_orchestrator', None)
if orchestrator is None:
return handler.ActionResponse.error(message='AgentRunOrchestrator is not available')
run_id = str(data.get('run_id') or uuid.uuid4())
event, event_error = _build_run_create_event(data, run_id=run_id)
if event_error:
return event_error
assert event is not None
binding, binding_error = _build_run_create_binding(data, event=event, run_id=run_id)
if binding_error:
return binding_error
assert binding is not None
include_plugins = data.get('include_plugins')
if include_plugins is not None and not isinstance(include_plugins, list):
return handler.ActionResponse.error(message='include_plugins must be a list')
bound_plugins = [str(item) for item in include_plugins] if include_plugins else None
registry = getattr(h.ap, 'agent_runner_registry', None)
if registry is not None:
try:
await registry.get(binding.runner_id, bound_plugins)
except Exception as exc:
return handler.ActionResponse.error(message=f'Runner {binding.runner_id} is not available: {exc}')
async def background_run(*, raise_errors: bool = False) -> None:
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
try:
await _consume_programmatic_run(
h,
run_id=run_id,
event=event,
binding=binding,
bound_plugins=bound_plugins,
)
except Exception as exc:
h.ap.logger.error(f'RUN_CREATE background run {run_id} failed: {exc}', exc_info=True)
if await store.get_run(run_id) is None:
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
agent_id=binding.agent_id,
authorization={
'runner_id': binding.runner_id,
'binding_id': binding.binding_id,
'plugin_identity': None,
'resources': {},
'available_apis': {},
'conversation_id': event.conversation_id,
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'thread_id': event.thread_id,
},
metadata={
'event_type': event.event_type,
'source': event.source,
'run_create_error': True,
},
status='running',
)
await store.finalize_run(
run_id=run_id,
status='failed',
status_reason=str(exc),
metadata={'run_create_error': True},
)
if raise_errors:
raise
if data.get('wait_for_completion'):
try:
await background_run(raise_errors=True)
except Exception as exc:
return handler.ActionResponse.error(message=f'Run create error: {exc}')
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
run = await store.get_run(run_id)
if run is not None:
await _record_agent_runner_admin_action(
h.ap,
store,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(data=run)
else:
asyncio.create_task(background_run())
await _record_agent_runner_admin_action(
h.ap,
None,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(
data={
'run_id': run_id,
'event_id': event.event_id,
'agent_id': binding.agent_id,
'binding_id': binding.binding_id,
'runner_id': binding.runner_id,
'conversation_id': event.conversation_id,
'thread_id': event.thread_id,
'workspace_id': event.workspace_id,
'bot_id': event.bot_id,
'status': 'created',
'metadata': {
'event_type': event.event_type,
'source': event.source,
'accepted': True,
},
}
)
@h.action(_plugin_runtime_action('RUN_GET', 'run_get'))
async def run_get(data: dict[str, Any]) -> handler.ActionResponse:
"""Get one Host-owned run record visible to the current run."""
@@ -33,11 +33,12 @@ class FakeConnection:
class FakeApplication:
def __init__(self, db_engine, admin_plugins=None, runner_registry=None):
def __init__(self, db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
self.logger = MagicMock()
self.persistence_mgr = MagicMock()
self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine)
self.agent_runner_registry = runner_registry
self.agent_run_orchestrator = orchestrator
self.instance_config = SimpleNamespace(
data={
'agent_runner': {
@@ -72,16 +73,66 @@ class FakeRunnerRegistry:
self.runners = runners
self.calls = []
async def get(self, runner_id, bound_plugins=None):
self.calls.append({'runner_id': runner_id, 'bound_plugins': bound_plugins})
if isinstance(self.runners, dict):
if runner_id not in self.runners:
raise KeyError(runner_id)
return self.runners[runner_id]
for runner in self.runners:
if getattr(runner, 'id', None) == runner_id:
return runner
raise KeyError(runner_id)
async def list_runners(self, *, bound_plugins=None, use_cache=True):
self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache})
return self.runners
def _handler(db_engine, admin_plugins=None, runner_registry=None):
class FakeProgrammaticOrchestrator:
def __init__(self, db_engine):
self.db_engine = db_engine
self.calls = []
async def run(self, event, binding, bound_plugins=None, adapter_context=None, run_id=None):
self.calls.append(
{
'event': event,
'binding': binding,
'bound_plugins': bound_plugins,
'adapter_context': adapter_context,
'run_id': run_id,
}
)
store = RunLedgerStore(self.db_engine)
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
authorization={'available_apis': {}},
metadata={'event_type': event.event_type, 'source': event.source},
status='running',
)
await store.finalize_run(run_id=run_id, status='completed', status_reason='done')
if False:
yield None
def _handler(db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
async def fake_disconnect():
return True
fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins, runner_registry=runner_registry)
fake_app = FakeApplication(
db_engine,
admin_plugins=admin_plugins,
runner_registry=runner_registry,
orchestrator=orchestrator,
)
return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app)
@@ -1260,6 +1311,64 @@ async def test_runtime_register_heartbeat_and_list_actions(session_registry, db_
assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1']
@pytest.mark.asyncio
async def test_admin_run_create_starts_programmatic_run(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
runner_id = 'plugin:test/runner/default'
registry = FakeRunnerRegistry({runner_id: SimpleNamespace(id=runner_id)})
handler = _handler(
db_engine,
admin_plugins=[{'identity': 'admin/plugin', 'permissions': ['agent_run:admin']}],
runner_registry=registry,
orchestrator=orchestrator,
)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'admin/plugin',
'run_id': 'run_programmatic',
'runner_id': runner_id,
'input': {'text': 'work on issue 1', 'contents': [], 'attachments': []},
'conversation_id': 'conv_issue_board',
'event_type': 'api.invoked',
'wait_for_completion': True,
}
)
assert result.code == 0
assert result.data['run_id'] == 'run_programmatic'
assert result.data['runner_id'] == runner_id
assert result.data['status'] == 'completed'
assert len(orchestrator.calls) == 1
call = orchestrator.calls[0]
assert call['run_id'] == 'run_programmatic'
assert call['event'].event_type == 'api.invoked'
assert call['event'].source == 'api'
assert call['event'].conversation_id == 'conv_issue_board'
assert call['binding'].runner_id == runner_id
assert call['binding'].delivery_policy.enable_reply is False
@pytest.mark.asyncio
async def test_run_create_requires_admin_permission(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
handler = _handler(db_engine, orchestrator=orchestrator)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'regular/plugin',
'runner_id': 'plugin:test/runner/default',
'input': {'text': 'work'},
}
)
assert result.code != 0
assert 'not authorized' in result.message
assert orchestrator.calls == []
@pytest.mark.asyncio
async def test_run_claim_renew_and_release_actions(session_registry, db_engine):
await _register_session(