Compare commits

..

2 Commits

Author SHA1 Message Date
huanghuoguoguo d02e1a14a3 test(agent): cover agent service event bindings 2026-06-25 00:04:59 +08:00
huanghuoguoguo cd6a39d3a2 test(agent): cover pluginized agent runner runtime 2026-06-24 20:46:32 +08:00
13 changed files with 1169 additions and 454 deletions
@@ -220,7 +220,6 @@ class AgentRunContextBuilder:
binding: AgentBinding,
descriptor: AgentRunnerDescriptor,
resources: AgentResources,
run_id: str | None = None,
) -> AgentRunContextPayload:
"""Build AgentRunContext from event-first envelope.
@@ -236,8 +235,8 @@ class AgentRunContextBuilder:
Returns:
AgentRunContextPayload for the runner
"""
# Generate new run_id unless an API caller already reserved one.
run_id = run_id or str(uuid.uuid4())
# Generate new run_id
run_id = str(uuid.uuid4())
# Build trigger from event
trigger: AgentTrigger = {
@@ -68,7 +68,6 @@ class AgentRunOrchestrator:
binding: AgentBinding,
bound_plugins: list[str] | None = None,
adapter_context: dict[str, typing.Any] | None = None,
run_id: str | None = None,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run an AgentRunner from an event-first envelope."""
runner_id = binding.runner_id
@@ -85,7 +84,6 @@ class AgentRunOrchestrator:
binding=binding,
descriptor=descriptor,
resources=resources,
run_id=run_id,
)
session_query_id = None
+4 -2
View File
@@ -146,8 +146,10 @@ class AgentRunnerRegistry:
Returns:
List of runner descriptors
"""
if use_cache and self._cache is not None:
# Filter from cache
if use_cache and self._cache is not None and self._cache:
# Filter from cache. Do not treat an empty cache as final because the
# plugin runtime may still be launching installed plugins when the
# first metadata request arrives.
return self._filter_runners_by_bound_plugins(self._cache, bound_plugins)
# Discover fresh (always full list)
@@ -10,7 +10,7 @@ ssereadtimeout) live in ``extra_args`` and are left untouched — the
auto-detecting remote transport consumes them regardless.
Revision ID: 0006_normalize_mcp_remote_mode
Revises: 0005_add_llm_context_length
Revises: 8d3a1f2c4b6e
Create Date: 2026-06-21
"""
@@ -18,7 +18,7 @@ import sqlalchemy as sa
from alembic import op
revision = '0006_normalize_mcp_remote_mode'
down_revision = '0005_add_llm_context_length'
down_revision = '8d3a1f2c4b6e'
branch_labels = None
depends_on = None
@@ -3,24 +3,12 @@
from __future__ import annotations
from typing import Any
import asyncio
import time
import uuid
from langbot_plugin.runtime.io import handler
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from ..agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
ResourcePolicy,
StatePolicy,
)
from ..agent.runner.run_ledger_store import TERMINAL_STATUSES
from .agent_run_support import (
@@ -42,320 +30,7 @@ from .agent_run_support import (
)
def _dict_payload(value: Any, *, field_name: str) -> tuple[dict[str, Any], handler.ActionResponse | None]:
if value is None:
return {}, None
if not isinstance(value, dict):
return {}, handler.ActionResponse.error(message=f'{field_name} must be an object')
return dict(value), None
def _build_run_create_event(data: dict[str, Any], *, run_id: str) -> tuple[AgentEventEnvelope | None, handler.ActionResponse | None]:
event_payload, error = _dict_payload(data.get('event'), field_name='event')
if error:
return None, error
input_payload = event_payload.get('input', data.get('input'))
if input_payload is None:
input_payload = {
'text': data.get('text'),
'contents': [],
'attachments': [],
}
if not isinstance(input_payload, dict):
return None, handler.ActionResponse.error(message='input must be an object')
delivery_payload = event_payload.get('delivery', data.get('delivery'))
if delivery_payload is None:
delivery_payload = {
'surface': 'api',
'reply_target': None,
'supports_streaming': False,
'supports_edit': False,
'supports_reaction': False,
'platform_capabilities': {},
}
if not isinstance(delivery_payload, dict):
return None, handler.ActionResponse.error(message='delivery must be an object')
event_data = event_payload.get('data', data.get('data'))
if event_data is None:
event_data = {}
if not isinstance(event_data, dict):
return None, handler.ActionResponse.error(message='event data must be an object')
event_type = str(event_payload.get('event_type') or data.get('event_type') or 'api.invoked')
source = str(event_payload.get('source') or data.get('source') or 'api')
event_id = str(event_payload.get('event_id') or data.get('event_id') or f'{source}:{run_id}')
event_time = event_payload.get('event_time', data.get('event_time'))
if event_time is None:
event_time = int(time.time())
try:
envelope = AgentEventEnvelope(
event_id=event_id,
event_type=event_type,
event_time=int(event_time) if isinstance(event_time, (int, float, str)) else None,
source=source,
source_event_type=event_payload.get('source_event_type') or data.get('source_event_type') or event_type,
bot_id=event_payload.get('bot_id', data.get('bot_id')),
workspace_id=event_payload.get('workspace_id', data.get('workspace_id')),
conversation_id=event_payload.get('conversation_id', data.get('conversation_id')),
thread_id=event_payload.get('thread_id', data.get('thread_id')),
actor=event_payload.get('actor', data.get('actor')),
subject=event_payload.get('subject', data.get('subject')),
input=AgentInput.model_validate(input_payload),
delivery=DeliveryContext.model_validate(delivery_payload),
raw_ref=event_payload.get('raw_ref', data.get('raw_ref')),
data=event_data,
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid event payload: {exc}')
return envelope, None
def _build_run_create_binding(
data: dict[str, Any],
*,
event: AgentEventEnvelope,
run_id: str,
) -> tuple[AgentBinding | None, handler.ActionResponse | None]:
binding_payload, error = _dict_payload(data.get('binding'), field_name='binding')
if error:
return None, error
runner_id = binding_payload.get('runner_id') or data.get('runner_id')
if not runner_id:
return None, handler.ActionResponse.error(message='runner_id is required')
scope_payload = binding_payload.get('scope')
if scope_payload is None:
agent_id = binding_payload.get('agent_id') or data.get('agent_id')
workspace_id = event.workspace_id
bot_id = event.bot_id
if agent_id:
scope_payload = {'scope_type': 'agent', 'scope_id': agent_id}
elif bot_id:
scope_payload = {'scope_type': 'bot', 'scope_id': bot_id}
elif workspace_id:
scope_payload = {'scope_type': 'workspace', 'scope_id': workspace_id}
else:
scope_payload = {'scope_type': 'global', 'scope_id': None}
if not isinstance(scope_payload, dict):
return None, handler.ActionResponse.error(message='binding.scope must be an object')
runner_config_payload = binding_payload.get('runner_config', data.get('runner_config'))
if runner_config_payload is None:
runner_config_payload = {}
if not isinstance(runner_config_payload, dict):
return None, handler.ActionResponse.error(message='runner_config must be an object')
resource_policy_payload = binding_payload.get('resource_policy', data.get('resource_policy'))
if resource_policy_payload is None:
resource_policy_payload = {}
if not isinstance(resource_policy_payload, dict):
return None, handler.ActionResponse.error(message='resource_policy must be an object')
state_policy_payload = binding_payload.get('state_policy', data.get('state_policy'))
if state_policy_payload is None:
state_policy_payload = {}
if not isinstance(state_policy_payload, dict):
return None, handler.ActionResponse.error(message='state_policy must be an object')
delivery_policy_payload = binding_payload.get('delivery_policy', data.get('delivery_policy'))
if delivery_policy_payload is None:
delivery_policy_payload = {
'enable_streaming': bool(event.delivery.supports_streaming),
'enable_reply': False,
}
if not isinstance(delivery_policy_payload, dict):
return None, handler.ActionResponse.error(message='delivery_policy must be an object')
try:
binding = AgentBinding(
binding_id=str(binding_payload.get('binding_id') or data.get('binding_id') or f'api:{runner_id}:{run_id}'),
scope=BindingScope.model_validate(scope_payload),
event_types=list(binding_payload.get('event_types') or data.get('event_types') or [event.event_type]),
runner_id=str(runner_id),
runner_config=runner_config_payload,
resource_policy=ResourcePolicy.model_validate(resource_policy_payload),
state_policy=StatePolicy.model_validate(state_policy_payload),
delivery_policy=DeliveryPolicy.model_validate(delivery_policy_payload),
enabled=bool(binding_payload.get('enabled', data.get('enabled', True))),
agent_id=binding_payload.get('agent_id') or data.get('agent_id'),
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid binding payload: {exc}')
if event.event_type not in binding.event_types:
return None, handler.ActionResponse.error(
message=f'binding.event_types must include event type {event.event_type}'
)
return binding, None
async def _consume_programmatic_run(
h,
*,
run_id: str,
event: AgentEventEnvelope,
binding: AgentBinding,
bound_plugins: list[str] | None,
) -> None:
async for _result in h.ap.agent_run_orchestrator.run(
event,
binding,
bound_plugins=bound_plugins,
run_id=run_id,
):
pass
def register(h):
@h.action(_plugin_runtime_action('RUN_CREATE', 'run_create'))
async def run_create(data: dict[str, Any]) -> handler.ActionResponse:
"""Create a programmatic AgentRunner run from an explicit event and binding."""
caller_plugin_identity = data.get('caller_plugin_identity')
if not _has_agent_runner_admin_permission(
h.ap,
caller_plugin_identity,
AGENT_RUN_ADMIN_PERMISSION,
):
return handler.ActionResponse.error(message='Run create access not authorized')
orchestrator = getattr(h.ap, 'agent_run_orchestrator', None)
if orchestrator is None:
return handler.ActionResponse.error(message='AgentRunOrchestrator is not available')
run_id = str(data.get('run_id') or uuid.uuid4())
event, event_error = _build_run_create_event(data, run_id=run_id)
if event_error:
return event_error
assert event is not None
binding, binding_error = _build_run_create_binding(data, event=event, run_id=run_id)
if binding_error:
return binding_error
assert binding is not None
include_plugins = data.get('include_plugins')
if include_plugins is not None and not isinstance(include_plugins, list):
return handler.ActionResponse.error(message='include_plugins must be a list')
bound_plugins = [str(item) for item in include_plugins] if include_plugins else None
registry = getattr(h.ap, 'agent_runner_registry', None)
if registry is not None:
try:
await registry.get(binding.runner_id, bound_plugins)
except Exception as exc:
return handler.ActionResponse.error(message=f'Runner {binding.runner_id} is not available: {exc}')
async def background_run(*, raise_errors: bool = False) -> None:
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
try:
await _consume_programmatic_run(
h,
run_id=run_id,
event=event,
binding=binding,
bound_plugins=bound_plugins,
)
except Exception as exc:
h.ap.logger.error(f'RUN_CREATE background run {run_id} failed: {exc}', exc_info=True)
if await store.get_run(run_id) is None:
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
agent_id=binding.agent_id,
authorization={
'runner_id': binding.runner_id,
'binding_id': binding.binding_id,
'plugin_identity': None,
'resources': {},
'available_apis': {},
'conversation_id': event.conversation_id,
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'thread_id': event.thread_id,
},
metadata={
'event_type': event.event_type,
'source': event.source,
'run_create_error': True,
},
status='running',
)
await store.finalize_run(
run_id=run_id,
status='failed',
status_reason=str(exc),
metadata={'run_create_error': True},
)
if raise_errors:
raise
if data.get('wait_for_completion'):
try:
await background_run(raise_errors=True)
except Exception as exc:
return handler.ActionResponse.error(message=f'Run create error: {exc}')
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
run = await store.get_run(run_id)
if run is not None:
await _record_agent_runner_admin_action(
h.ap,
store,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(data=run)
else:
asyncio.create_task(background_run())
await _record_agent_runner_admin_action(
h.ap,
None,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(
data={
'run_id': run_id,
'event_id': event.event_id,
'agent_id': binding.agent_id,
'binding_id': binding.binding_id,
'runner_id': binding.runner_id,
'conversation_id': event.conversation_id,
'thread_id': event.thread_id,
'workspace_id': event.workspace_id,
'bot_id': event.bot_id,
'status': 'created',
'metadata': {
'event_type': event.event_type,
'source': event.source,
'accepted': True,
},
}
)
@h.action(_plugin_runtime_action('RUN_GET', 'run_get'))
async def run_get(data: dict[str, Any]) -> handler.ActionResponse:
"""Get one Host-owned run record visible to the current run."""
+1 -1
View File
@@ -61,7 +61,7 @@ def langbot_process(e2e_config_path, e2e_port, e2e_tmpdir):
project_root=project_root,
work_dir=e2e_tmpdir, # Run in tmpdir where data/config.yaml exists
port=e2e_port,
timeout=60, # Longer timeout for first startup
timeout=180, # Longer timeout for first startup
collect_coverage=collect_coverage,
)
@@ -0,0 +1,473 @@
"""E2E tests for pluginized AgentRunner execution.
This module starts the real LangBot backend with the plugin system enabled and
loads a deterministic AgentRunner plugin through the real SDK Plugin Runtime.
"""
from __future__ import annotations
import shutil
import socket
import sqlite3
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path
import httpx
import pytest
from tests.e2e.utils.config_factory import create_minimal_config, create_test_directories
from tests.e2e.utils.process_manager import LangBotProcess, find_project_root
pytestmark = pytest.mark.e2e
QA_RUNNER_ID = 'plugin:e2e/agent-runner-qa/default'
QA_PLUGIN_DIRNAME = 'e2e__agent-runner-qa'
@pytest.fixture(scope='session')
def agent_runner_e2e_port():
"""Port for the AgentRunner plugin-runtime E2E process."""
return 15310
@pytest.fixture(scope='session')
def agent_runner_e2e_tmpdir():
"""Create temporary directory for AgentRunner E2E testing."""
tmpdir = Path(tempfile.mkdtemp(prefix='langbot_agent_runner_e2e_'))
yield tmpdir
shutil.rmtree(tmpdir, ignore_errors=True)
def _write_qa_agent_runner_plugin(plugin_root: Path) -> None:
"""Write a deterministic AgentRunner plugin used by this E2E."""
runner_dir = plugin_root / 'components' / 'agent_runner'
runner_dir.mkdir(parents=True, exist_ok=True)
(plugin_root / 'assets').mkdir(parents=True, exist_ok=True)
(plugin_root / 'assets' / 'icon.svg').write_text(
'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1 1"></svg>',
encoding='utf-8',
)
(plugin_root / 'manifest.yaml').write_text(
textwrap.dedent(
"""
apiVersion: langbot/v1
kind: Plugin
metadata:
author: e2e
name: agent-runner-qa
version: 0.1.0
label:
en_US: AgentRunner QA
zh_Hans: AgentRunner QA
description:
en_US: Deterministic AgentRunner E2E probe.
zh_Hans: 确定性的 AgentRunner E2E 探针。
icon: assets/icon.svg
spec:
version: 0.1.0
config: []
components:
AgentRunner:
fromDirs:
- path: components/agent_runner/
pages: []
execution:
python:
path: main.py
attr: AgentRunnerQAPlugin
"""
).strip()
+ '\n',
encoding='utf-8',
)
(plugin_root / 'main.py').write_text(
textwrap.dedent(
"""
from __future__ import annotations
from langbot_plugin.api.definition.plugin import BasePlugin
class AgentRunnerQAPlugin(BasePlugin):
async def initialize(self) -> None:
pass
"""
).strip()
+ '\n',
encoding='utf-8',
)
(runner_dir / 'default.yaml').write_text(
textwrap.dedent(
"""
apiVersion: langbot/v1
kind: AgentRunner
metadata:
name: default
label:
en_US: QA Echo Runner
zh_Hans: QA Echo Runner
description:
en_US: Echoes input and exercises run-scoped state APIs.
zh_Hans: 回显输入并验证运行级状态 API。
spec:
config: []
capabilities:
streaming: false
permissions: {}
execution:
python:
path: default.py
attr: DefaultAgentRunner
"""
).strip()
+ '\n',
encoding='utf-8',
)
(runner_dir / 'default.py').write_text(
textwrap.dedent(
"""
from __future__ import annotations
from typing import AsyncGenerator
from langbot_plugin.api.definition.components.agent_runner.runner import AgentRunner
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
from langbot_plugin.api.entities.builtin.agent_runner.result import AgentRunResult
from langbot_plugin.api.entities.builtin.provider.message import Message
class DefaultAgentRunner(AgentRunner):
async def run(self, ctx: AgentRunContext) -> AsyncGenerator[AgentRunResult, None]:
text = ctx.input.to_text()
yield AgentRunResult.message_completed(
ctx.run_id,
Message(role='assistant', content=f'e2e echo: {text}'),
)
yield AgentRunResult.state_updated(
ctx.run_id,
'e2e.echo_count',
{'count': 1},
scope='conversation',
)
yield AgentRunResult.run_completed(ctx.run_id, finish_reason='stop')
"""
).strip()
+ '\n',
encoding='utf-8',
)
def _free_port() -> int:
"""Reserve a currently-free localhost TCP port for this E2E process."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('127.0.0.1', 0))
return int(sock.getsockname()[1])
@pytest.fixture(scope='session')
def agent_runner_runtime_ports():
"""Control/debug ports for the standalone plugin runtime."""
control_port = _free_port()
debug_port = _free_port()
while debug_port == control_port:
debug_port = _free_port()
return control_port, debug_port
@pytest.fixture(scope='session')
def agent_runner_e2e_config_path(agent_runner_e2e_tmpdir, agent_runner_e2e_port, agent_runner_runtime_ports):
"""Create a plugin-enabled config and deterministic AgentRunner fixture."""
config_path = create_minimal_config(agent_runner_e2e_tmpdir, port=agent_runner_e2e_port)
create_test_directories(agent_runner_e2e_tmpdir)
import yaml
with open(config_path, encoding='utf-8') as f:
config = yaml.safe_load(f)
config['api']['global_api_key'] = 'e2e-agent-runner-key'
runtime_control_port, _runtime_debug_port = agent_runner_runtime_ports
config['plugin']['enable'] = True
config['plugin']['runtime_ws_url'] = f'ws://127.0.0.1:{runtime_control_port}/control/ws'
config['plugin']['enable_marketplace'] = False
config['box']['enabled'] = False
config['system']['jwt']['secret'] = 'e2e-agent-runner-secret-key'
with open(config_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(config, f, default_flow_style=False)
_write_qa_agent_runner_plugin(agent_runner_e2e_tmpdir / 'data' / 'plugins' / QA_PLUGIN_DIRNAME)
return config_path
@pytest.fixture(scope='session')
def agent_runner_runtime_process(agent_runner_e2e_tmpdir, agent_runner_runtime_ports):
"""Start the real SDK plugin runtime over WebSocket."""
control_port, debug_port = agent_runner_runtime_ports
stdout_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stdout.log'
stderr_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stderr.log'
stdout_file = open(stdout_path, 'wb')
stderr_file = open(stderr_path, 'wb')
proc = subprocess.Popen(
[
str(find_project_root() / '.venv' / 'bin' / 'python'),
'-m',
'langbot_plugin.cli.__init__',
'rt',
'--ws-control-port',
str(control_port),
'--ws-debug-port',
str(debug_port),
],
cwd=agent_runner_e2e_tmpdir,
stdout=stdout_file,
stderr=stderr_file,
start_new_session=True,
)
yield proc
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
stdout_file.close()
stderr_file.close()
@pytest.fixture(scope='session')
def agent_runner_langbot_process(
agent_runner_e2e_config_path,
agent_runner_e2e_port,
agent_runner_e2e_tmpdir,
agent_runner_runtime_process,
):
"""Start real LangBot with plugin runtime enabled."""
project_root = find_project_root()
proc = LangBotProcess(
project_root=project_root,
work_dir=agent_runner_e2e_tmpdir,
port=agent_runner_e2e_port,
timeout=180,
debug=True,
cli_args=['--standalone-runtime'],
)
success = proc.start()
if not success:
stdout, stderr = proc.get_logs()
pytest.fail(f'LangBot failed to start with AgentRunner plugin runtime:\nstdout: {stdout}\nstderr: {stderr}')
yield proc
proc.stop()
@pytest.fixture
def agent_runner_client(agent_runner_e2e_port, agent_runner_langbot_process):
"""HTTP client for the AgentRunner E2E backend."""
with httpx.Client(
base_url=f'http://127.0.0.1:{agent_runner_e2e_port}',
timeout=90.0,
trust_env=False,
) as client:
yield client
def _init_and_auth(client: httpx.Client) -> str:
"""Initialize the test admin user and return a bearer token."""
init_resp = client.post('/api/v1/user/init', json={'user': 'admin', 'password': 'admin'})
assert init_resp.status_code == 200
assert init_resp.json()['code'] in [0, 1]
auth_resp = client.post('/api/v1/user/auth', json={'user': 'admin', 'password': 'admin'})
assert auth_resp.status_code == 200
payload = auth_resp.json()
assert payload['code'] == 0
return payload['data']['token']
def test_plugin_runtime_discovers_agent_runner(agent_runner_client, agent_runner_langbot_process):
"""Pipeline metadata should include the real runtime-discovered QA runner."""
token = _init_and_auth(agent_runner_client)
start = time.time()
while time.time() - start < 60:
response = agent_runner_client.get(
'/api/v1/pipelines/_/metadata',
headers={'Authorization': f'Bearer {token}'},
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
metadata_groups = data['data']['configs']
ai_metadata = next(group for group in metadata_groups if group.get('name') == 'ai')
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
option_names = {option['name'] for option in runner_select['options']}
if QA_RUNNER_ID in option_names:
return
time.sleep(2)
assert QA_RUNNER_ID in option_names
def test_host_orchestrator_runs_agent_runner_and_records_ledger(
agent_runner_e2e_config_path,
agent_runner_e2e_tmpdir,
agent_runner_runtime_process,
):
"""The Host orchestrator should run the pluginized runner and persist run side effects."""
import asyncio
import os
from langbot.pkg.agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
StatePolicy,
)
from langbot.pkg.core import boot
from langbot.pkg.utils import platform as platform_utils
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
async def _run_probe():
previous_cwd = Path.cwd()
previous_standalone_runtime = platform_utils.standalone_runtime
os.chdir(agent_runner_e2e_tmpdir)
platform_utils.standalone_runtime = True
ap = None
try:
ap = await boot.make_app(asyncio.get_running_loop())
for _ in range(60):
handler = getattr(ap.plugin_connector, 'handler', None)
if handler is not None:
await handler.ping()
break
await asyncio.sleep(1)
else:
raise AssertionError('Plugin runtime did not connect')
for _ in range(60):
runners = await ap.agent_runner_registry.list_runners(use_cache=False)
if any(runner.id == QA_RUNNER_ID for runner in runners):
break
await asyncio.sleep(1)
else:
raise AssertionError(f'{QA_RUNNER_ID} was not discovered')
event = AgentEventEnvelope(
event_id='e2e-orchestrator-event-001',
event_type='message.received',
source='api',
conversation_id='e2e-conversation',
thread_id='e2e-thread',
actor=ActorContext(actor_type='user', actor_id='user-001', actor_name='E2E User'),
subject=SubjectContext(subject_type='chat', subject_id='chat-001'),
input=AgentInput(text='hello from orchestrator e2e'),
delivery=DeliveryContext(surface='e2e'),
)
binding = AgentBinding(
binding_id='e2e-binding',
scope=BindingScope(scope_type='global'),
runner_id=QA_RUNNER_ID,
state_policy=StatePolicy(enable_state=True, state_scopes=['conversation']),
delivery_policy=DeliveryPolicy(enable_streaming=False, enable_reply=True),
)
return [message async for message in ap.agent_run_orchestrator.run(event, binding)]
finally:
if ap is not None:
ap.dispose()
platform_utils.standalone_runtime = previous_standalone_runtime
os.chdir(previous_cwd)
messages = asyncio.run(_run_probe())
assert len(messages) == 1
assert messages[0].role == 'assistant'
assert messages[0].content == 'e2e echo: hello from orchestrator e2e'
db_path = agent_runner_e2e_tmpdir / 'data' / 'langbot.db'
conn = sqlite3.connect(str(db_path))
try:
run_row = conn.execute(
"SELECT status, runner_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001'"
).fetchone()
assert run_row == ('completed', QA_RUNNER_ID)
event_types = {
row[0]
for row in conn.execute(
"SELECT type FROM agent_run_event WHERE run_id = (SELECT run_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001')"
).fetchall()
}
assert {'state.updated', 'message.completed', 'run.completed'}.issubset(event_types)
state_row = conn.execute(
"SELECT value_json FROM agent_runner_state WHERE state_key = 'e2e.echo_count'"
).fetchone()
assert state_row is not None
assert '"count": 1' in state_row[0]
finally:
conn.close()
def test_pluginized_agent_runner_executes_through_runtime(agent_runner_client, agent_runner_langbot_process):
"""The Host debug surface should invoke the QA runner through the real Plugin Runtime."""
token = _init_and_auth(agent_runner_client)
start = time.time()
while time.time() - start < 60:
metadata_response = agent_runner_client.get(
'/api/v1/pipelines/_/metadata',
headers={'Authorization': f'Bearer {token}'},
)
assert metadata_response.status_code == 200
metadata = metadata_response.json()['data']['configs']
ai_metadata = next(group for group in metadata if group.get('name') == 'ai')
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
if QA_RUNNER_ID in {option['name'] for option in runner_select['options']}:
break
time.sleep(2)
else:
pytest.fail(f'{QA_RUNNER_ID} was not discovered before run_agent')
response = agent_runner_client.post(
'/api/v1/system/debug/plugin/action',
headers={'Authorization': f'Bearer {token}'},
json={
'action': 'run_agent',
'timeout': 60,
'data': {
'plugin_author': 'e2e',
'plugin_name': 'agent-runner-qa',
'runner_name': 'default',
'context': {
'run_id': 'e2e-run-001',
'trigger': {'type': 'message.received'},
'event': {
'event_id': 'e2e-event-001',
'event_type': 'message.received',
'source': 'api',
},
'input': {'text': 'hello from real e2e'},
'delivery': {'surface': 'e2e'},
'resources': {},
'runtime': {},
},
},
},
)
assert response.status_code == 200
payload = response.json()
assert payload['code'] == 0
result = payload['data']
assert result['type'] == 'message.completed', result
assert result['data']['message']['role'] == 'assistant'
assert result['data']['message']['content'] == 'e2e echo: hello from real e2e'
+25
View File
@@ -141,6 +141,27 @@ def create_minimal_config(tmpdir: Path, port: int = 15300) -> Path:
'delete_batch_size': 1000,
},
},
'box': {
'enabled': False,
'backend': 'local',
'runtime': {
'endpoint': '',
},
'local': {
'profile': 'default',
'image': '',
'host_root': str(tmpdir / 'box'),
'default_workspace': '',
'skills_root': 'skills',
'allowed_mount_roots': [str(tmpdir / 'box'), '/tmp'],
'workspace_quota_mb': None,
},
'e2b': {
'api_key': '',
'api_url': '',
'template': '',
},
},
'space': {
'url': 'https://space.langbot.app',
'models_gateway_api_url': 'https://api.langbot.cloud/v1',
@@ -168,8 +189,12 @@ def create_test_directories(tmpdir: Path) -> dict[str, Path]:
"""Create necessary directories for LangBot testing."""
directories = {
'data': tmpdir / 'data',
'labels': tmpdir / 'data' / 'labels',
'metadata': tmpdir / 'data' / 'metadata',
'logs': tmpdir / 'logs',
'data_logs': tmpdir / 'data' / 'logs',
'storage': tmpdir / 'storage',
'box': tmpdir / 'box',
'chroma': tmpdir / 'chroma',
}
+40 -7
View File
@@ -9,6 +9,7 @@ import subprocess
import time
import signal
import os
import sys
from pathlib import Path
from typing import Optional
import logging
@@ -26,13 +27,21 @@ class LangBotProcess:
port: int = 15300,
timeout: int = 30,
collect_coverage: bool = True,
debug: bool = False,
cli_args: list[str] | None = None,
):
self.project_root = project_root
self.work_dir = work_dir # Directory containing data/config.yaml
self.port = port
self.timeout = timeout
self.collect_coverage = collect_coverage
self.debug = debug
self.cli_args = cli_args or []
self.process: Optional[subprocess.Popen] = None
self._stdout_file = None
self._stderr_file = None
self._stdout_path = self.work_dir / 'langbot.stdout.log'
self._stderr_path = self.work_dir / 'langbot.stderr.log'
self._stdout_data: bytes = b''
self._stderr_data: bytes = b''
self._coverage_file: Optional[Path] = None
@@ -63,13 +72,14 @@ class LangBotProcess:
# Disable telemetry
env['SPACE__DISABLE_TELEMETRY'] = 'true'
env['SPACE__DISABLE_MODELS_SERVICE'] = 'true'
if self.debug:
env['DEBUG'] = 'true'
# Build command
if self.collect_coverage:
# Use coverage.py to collect coverage data
# Set COVERAGE_PROCESS_START to enable coverage in subprocess
self._coverage_file = self.work_dir / '.coverage.e2e'
env['COVERAGE_PROCESS_START'] = str(self.project_root / '.coveragerc')
env['COVERAGE_FILE'] = str(self._coverage_file)
# Create .coveragerc for subprocess
@@ -88,27 +98,33 @@ precision = 2
coveragerc_path = self.work_dir / '.coveragerc'
with open(coveragerc_path, 'w') as f:
f.write(coveragerc_content)
env['COVERAGE_PROCESS_START'] = str(coveragerc_path)
cmd = [
sys.executable,
'-m',
'coverage',
'run',
'--rcfile=' + str(coveragerc_path),
'-m',
'langbot',
*self.cli_args,
]
else:
cmd = ['uv', 'run', 'python', '-m', 'langbot']
cmd = [sys.executable, '-m', 'langbot', *self.cli_args]
logger.info(f'Starting LangBot in: {self.work_dir}')
logger.info(f'Command: {cmd}')
# Start process (run in work_dir so it finds data/config.yaml)
self._stdout_file = open(self._stdout_path, 'wb')
self._stderr_file = open(self._stderr_path, 'wb')
self.process = subprocess.Popen(
cmd,
cwd=self.work_dir,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=self._stdout_file,
stderr=self._stderr_file,
preexec_fn=os.setsid if os.name != 'nt' else None,
)
@@ -117,7 +133,14 @@ precision = 2
while time.time() - start_time < self.timeout:
# Check if process died
if self.process.poll() is not None:
self._stdout_data, self._stderr_data = self.process.communicate()
if self._stdout_file:
self._stdout_file.close()
self._stdout_file = None
if self._stderr_file:
self._stderr_file.close()
self._stderr_file = None
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
logger.error(f'LangBot process died: {self._stderr_data.decode()}')
return False
@@ -170,8 +193,14 @@ precision = 2
self.process.wait()
# Collect output for debugging
if self.process.stdout or self.process.stderr:
self._stdout_data, self._stderr_data = self.process.communicate()
if self._stdout_file:
self._stdout_file.close()
self._stdout_file = None
if self._stderr_file:
self._stderr_file.close()
self._stderr_file = None
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
self.process = None
@@ -183,6 +212,10 @@ precision = 2
"""Get stdout and stderr logs."""
stdout = self._stdout_data.decode('utf-8', errors='replace')
stderr = self._stderr_data.decode('utf-8', errors='replace')
if not stdout and self._stdout_path.exists():
stdout = self._stdout_path.read_text(encoding='utf-8', errors='replace')
if not stderr and self._stderr_path.exists():
stderr = self._stderr_path.read_text(encoding='utf-8', errors='replace')
return stdout, stderr
def get_coverage_file(self) -> Optional[Path]:
@@ -33,12 +33,11 @@ class FakeConnection:
class FakeApplication:
def __init__(self, db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
def __init__(self, db_engine, admin_plugins=None, runner_registry=None):
self.logger = MagicMock()
self.persistence_mgr = MagicMock()
self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine)
self.agent_runner_registry = runner_registry
self.agent_run_orchestrator = orchestrator
self.instance_config = SimpleNamespace(
data={
'agent_runner': {
@@ -73,66 +72,16 @@ class FakeRunnerRegistry:
self.runners = runners
self.calls = []
async def get(self, runner_id, bound_plugins=None):
self.calls.append({'runner_id': runner_id, 'bound_plugins': bound_plugins})
if isinstance(self.runners, dict):
if runner_id not in self.runners:
raise KeyError(runner_id)
return self.runners[runner_id]
for runner in self.runners:
if getattr(runner, 'id', None) == runner_id:
return runner
raise KeyError(runner_id)
async def list_runners(self, *, bound_plugins=None, use_cache=True):
self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache})
return self.runners
class FakeProgrammaticOrchestrator:
def __init__(self, db_engine):
self.db_engine = db_engine
self.calls = []
async def run(self, event, binding, bound_plugins=None, adapter_context=None, run_id=None):
self.calls.append(
{
'event': event,
'binding': binding,
'bound_plugins': bound_plugins,
'adapter_context': adapter_context,
'run_id': run_id,
}
)
store = RunLedgerStore(self.db_engine)
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
authorization={'available_apis': {}},
metadata={'event_type': event.event_type, 'source': event.source},
status='running',
)
await store.finalize_run(run_id=run_id, status='completed', status_reason='done')
if False:
yield None
def _handler(db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
def _handler(db_engine, admin_plugins=None, runner_registry=None):
async def fake_disconnect():
return True
fake_app = FakeApplication(
db_engine,
admin_plugins=admin_plugins,
runner_registry=runner_registry,
orchestrator=orchestrator,
)
fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins, runner_registry=runner_registry)
return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app)
@@ -1311,64 +1260,6 @@ async def test_runtime_register_heartbeat_and_list_actions(session_registry, db_
assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1']
@pytest.mark.asyncio
async def test_admin_run_create_starts_programmatic_run(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
runner_id = 'plugin:test/runner/default'
registry = FakeRunnerRegistry({runner_id: SimpleNamespace(id=runner_id)})
handler = _handler(
db_engine,
admin_plugins=[{'identity': 'admin/plugin', 'permissions': ['agent_run:admin']}],
runner_registry=registry,
orchestrator=orchestrator,
)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'admin/plugin',
'run_id': 'run_programmatic',
'runner_id': runner_id,
'input': {'text': 'work on issue 1', 'contents': [], 'attachments': []},
'conversation_id': 'conv_issue_board',
'event_type': 'api.invoked',
'wait_for_completion': True,
}
)
assert result.code == 0
assert result.data['run_id'] == 'run_programmatic'
assert result.data['runner_id'] == runner_id
assert result.data['status'] == 'completed'
assert len(orchestrator.calls) == 1
call = orchestrator.calls[0]
assert call['run_id'] == 'run_programmatic'
assert call['event'].event_type == 'api.invoked'
assert call['event'].source == 'api'
assert call['event'].conversation_id == 'conv_issue_board'
assert call['binding'].runner_id == runner_id
assert call['binding'].delivery_policy.enable_reply is False
@pytest.mark.asyncio
async def test_run_create_requires_admin_permission(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
handler = _handler(db_engine, orchestrator=orchestrator)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'regular/plugin',
'runner_id': 'plugin:test/runner/default',
'input': {'text': 'work'},
}
)
assert result.code != 0
assert 'not authorized' in result.message
assert orchestrator.calls == []
@pytest.mark.asyncio
async def test_run_claim_renew_and_release_actions(session_registry, db_engine):
await _register_session(
@@ -0,0 +1,309 @@
from __future__ import annotations
import datetime as dt
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.api.http.service.agent import (
AGENT_DEFAULT_EVENT_PATTERNS,
AGENT_KIND_AGENT,
AGENT_KIND_PIPELINE,
PIPELINE_EVENT_PATTERNS,
AgentService,
)
pytestmark = pytest.mark.asyncio
def _result(items: list | None = None, first_item=None):
result = Mock()
result.all = Mock(return_value=items or [])
result.first = Mock(return_value=first_item)
return result
def _agent_row(
agent_uuid: str = 'agent-1',
name: str = 'Test Agent',
updated_at: dt.datetime | str | None = None,
config: dict | None = None,
supported_event_patterns: list[str] | None = None,
):
return SimpleNamespace(
uuid=agent_uuid,
name=name,
description='Agent description',
emoji='A',
kind=AGENT_KIND_AGENT,
component_ref='plugin:test/runner/default',
config=config or {
'runner': {'id': 'plugin:test/runner/default', 'expire-time': 0},
'runner_config': {'plugin:test/runner/default': {'temperature': 0.2}},
},
enabled=True,
supported_event_patterns=supported_event_patterns or ['*'],
created_at=dt.datetime(2026, 1, 1, 9, 0, 0),
updated_at=updated_at or dt.datetime(2026, 1, 1, 10, 0, 0),
)
def _serialize_agent(model_cls, entity, masked_columns=None):
return {
'uuid': entity.uuid,
'name': entity.name,
'description': entity.description,
'emoji': entity.emoji,
'kind': entity.kind,
'component_ref': entity.component_ref,
'config': entity.config,
'enabled': entity.enabled,
'supported_event_patterns': entity.supported_event_patterns,
'created_at': entity.created_at,
'updated_at': entity.updated_at,
}
def _compiled_params(statement):
return statement.compile().params
def _compiled_update_values(statement):
return {
key: value
for key, value in statement.compile().params.items()
if not key.startswith('uuid_')
}
def _make_app():
app = SimpleNamespace()
app.persistence_mgr = SimpleNamespace(
execute_async=AsyncMock(),
serialize_model=Mock(side_effect=_serialize_agent),
)
app.pipeline_service = SimpleNamespace(
get_pipeline_metadata=AsyncMock(return_value=[]),
get_pipelines=AsyncMock(return_value=[]),
get_pipeline=AsyncMock(return_value=None),
create_pipeline=AsyncMock(),
update_pipeline=AsyncMock(),
delete_pipeline=AsyncMock(),
_get_default_values_from_schema=Mock(return_value={}),
)
app.agent_runner_registry = None
app.logger = Mock()
return app
class TestAgentServiceMetadata:
async def test_get_agent_metadata_exposes_runner_config_and_kind_capabilities(self):
app = _make_app()
ai_metadata = {'name': 'ai', 'stages': [{'name': 'runner'}]}
app.pipeline_service.get_pipeline_metadata = AsyncMock(
return_value=[{'name': 'trigger'}, ai_metadata, {'name': 'output'}]
)
metadata = await AgentService(app).get_agent_metadata()
assert metadata['runner_config'] == ai_metadata
assert metadata['kinds'] == [
{
'name': AGENT_KIND_AGENT,
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
'message_only': False,
},
{
'name': AGENT_KIND_PIPELINE,
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
},
]
class TestAgentServiceListAndLookup:
async def test_get_agents_merges_agents_and_pipelines_without_leaking_config(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(
return_value=_result(
items=[
_agent_row(
agent_uuid='agent-1',
updated_at=dt.datetime(2026, 1, 1, 10, 0, 0),
supported_event_patterns=['platform.member.*'],
)
]
)
)
app.pipeline_service.get_pipelines = AsyncMock(
return_value=[
{
'uuid': 'pipeline-1',
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
'created_at': '2026-01-01T08:00:00',
'updated_at': '2026-01-01T11:00:00',
}
]
)
agents = await AgentService(app).get_agents(sort_by='updated_at', sort_order='DESC')
assert [agent['uuid'] for agent in agents] == ['pipeline-1', 'agent-1']
assert agents[0]['kind'] == AGENT_KIND_PIPELINE
assert agents[0]['component_ref'] == 'pipeline'
assert agents[0]['capability'] == {
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
}
assert agents[1]['kind'] == AGENT_KIND_AGENT
assert agents[1]['capability'] == {
'supported_event_patterns': ['platform.member.*'],
'message_only': False,
}
assert all('config' not in agent for agent in agents)
async def test_get_agent_returns_agent_with_config_before_pipeline_fallback(self):
app = _make_app()
agent = _agent_row(agent_uuid='agent-1')
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=agent))
result = await AgentService(app).get_agent('agent-1')
assert result['uuid'] == 'agent-1'
assert result['kind'] == AGENT_KIND_AGENT
assert result['config'] == agent.config
app.pipeline_service.get_pipeline.assert_not_awaited()
async def test_get_agent_falls_back_to_pipeline_product_item_with_config(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
app.pipeline_service.get_pipeline = AsyncMock(
return_value={
'uuid': 'pipeline-1',
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
'created_at': '2026-01-01T08:00:00',
'updated_at': '2026-01-01T11:00:00',
}
)
result = await AgentService(app).get_agent('pipeline-1')
assert result['kind'] == AGENT_KIND_PIPELINE
assert result['enabled'] is True
assert result['config'] == {'ai': {'runner': {'id': 'pipeline-runner'}}}
assert result['capability']['message_only'] is True
class TestAgentServiceCreateUpdateDelete:
async def test_create_agent_uses_default_runner_config_from_registry(self):
app = _make_app()
runner = SimpleNamespace(
id='plugin:langbot/local-agent/default',
config_schema=[
{'name': 'model', 'default': 'gpt-4.1'},
{'name': 'temperature', 'default': 0.2},
{'name': 'no-default'},
],
)
app.agent_runner_registry = SimpleNamespace(list_runners=AsyncMock(return_value=[runner]))
app.pipeline_service._get_default_values_from_schema = Mock(
return_value={'model': 'gpt-4.1', 'temperature': 0.2}
)
app.persistence_mgr.execute_async = AsyncMock(return_value=Mock())
result = await AgentService(app).create_agent(
{
'name': 'Support Agent',
'description': 'Handles support events',
'emoji': 'S',
}
)
insert_values = _compiled_params(app.persistence_mgr.execute_async.await_args.args[0])
assert result['kind'] == AGENT_KIND_AGENT
assert result['uuid'] == insert_values['uuid']
assert insert_values['name'] == 'Support Agent'
assert insert_values['component_ref'] == runner.id
assert insert_values['config'] == {
'runner': {'id': runner.id, 'expire-time': 0},
'runner_config': {runner.id: {'model': 'gpt-4.1', 'temperature': 0.2}},
}
assert insert_values['enabled'] is True
assert insert_values['supported_event_patterns'] == AGENT_DEFAULT_EVENT_PATTERNS
app.pipeline_service._get_default_values_from_schema.assert_called_once_with(runner.config_schema)
async def test_update_agent_protects_immutable_fields_and_recalculates_component_ref(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_result(first_item=_agent_row(agent_uuid='agent-1')),
Mock(),
]
)
new_config = {
'runner': {'id': 'plugin:test/new-runner/default', 'expire-time': 0},
'runner_config': {'plugin:test/new-runner/default': {'timeout': 30}},
}
await AgentService(app).update_agent(
'agent-1',
{
'uuid': 'caller-owned-uuid',
'kind': AGENT_KIND_PIPELINE,
'created_at': '2020-01-01T00:00:00',
'updated_at': '2020-01-01T00:00:00',
'capability': {'message_only': True},
'name': 'Updated Agent',
'config': new_config,
'supported_event_patterns': [],
},
)
update_values = _compiled_update_values(app.persistence_mgr.execute_async.await_args_list[1].args[0])
assert update_values == {
'name': 'Updated Agent',
'config': new_config,
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
'component_ref': 'plugin:test/new-runner/default',
}
async def test_pipeline_kind_create_update_delete_delegate_to_pipeline_service(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
app.pipeline_service.create_pipeline = AsyncMock(return_value='pipeline-created')
app.pipeline_service.get_pipeline = AsyncMock(return_value={'uuid': 'pipeline-1'})
service = AgentService(app)
created = await service.create_agent(
{
'kind': AGENT_KIND_PIPELINE,
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
}
)
await service.update_agent('pipeline-1', {'name': 'Updated Pipeline'})
await service.delete_agent('pipeline-1')
assert created == {'uuid': 'pipeline-created', 'kind': AGENT_KIND_PIPELINE}
app.pipeline_service.create_pipeline.assert_awaited_once_with(
{
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {},
}
)
app.pipeline_service.update_pipeline.assert_awaited_once_with(
'pipeline-1',
{'name': 'Updated Pipeline'},
)
app.pipeline_service.delete_pipeline.assert_awaited_once_with('pipeline-1')
@@ -52,6 +52,15 @@ def _create_mock_result(items: list = None, first_item=None):
return result
def _compiled_update_values(statement):
"""Return update values without SQLAlchemy WHERE bind params."""
return {
key: value
for key, value in statement.compile().params.items()
if not key.startswith('uuid_')
}
def _create_mock_discover(adapter_webhook_flags: dict[str, bool] = None):
"""Create mock ComponentDiscoveryEngine exposing MessagePlatformAdapter manifests.
@@ -531,6 +540,201 @@ class TestBotServiceUpdateBot:
assert update_params['use_pipeline_name'] == 'Updated Pipeline'
class TestBotServiceEventBindings:
"""Tests for EBA event binding validation and persistence."""
async def test_normalize_event_bindings_validates_targets_and_preserves_order(self):
"""Valid bindings are normalized with stable order and target data."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace()
ap.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_create_mock_result(first_item=SimpleNamespace(uuid='pipeline-1')),
_create_mock_result(
first_item=SimpleNamespace(
uuid='agent-1',
supported_event_patterns=['platform.member.*'],
)
),
]
)
service = BotService(ap)
normalized = await service._normalize_event_bindings(
[
{
'event_pattern': ' message.received ',
'target_type': 'pipeline',
'target_uuid': ' pipeline-1 ',
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
'priority': '5',
'enabled': False,
'description': 'Route message events',
},
{
'id': 'agent-binding',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'priority': 7,
},
{
'event_pattern': 'platform.member.left',
'target_type': 'discard',
'target_uuid': 'ignored-target',
},
]
)
uuid.UUID(normalized[0]['id'])
assert normalized == [
{
'id': normalized[0]['id'],
'event_pattern': 'message.received',
'target_type': 'pipeline',
'target_uuid': 'pipeline-1',
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
'priority': 5,
'enabled': False,
'description': 'Route message events',
'order': 0,
},
{
'id': 'agent-binding',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'filters': [],
'priority': 7,
'enabled': True,
'description': '',
'order': 1,
},
{
'id': normalized[2]['id'],
'event_pattern': 'platform.member.left',
'target_type': 'discard',
'target_uuid': '',
'filters': [],
'priority': 0,
'enabled': True,
'description': '',
'order': 2,
},
]
async def test_normalize_event_bindings_rejects_pipeline_for_non_message_event(self):
"""Pipeline targets are limited to message events."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace(execute_async=AsyncMock())
service = BotService(ap)
with pytest.raises(ValueError, match='Pipeline can only be bound to message events'):
await service._normalize_event_bindings(
[
{
'event_pattern': 'platform.member.joined',
'target_type': 'pipeline',
'target_uuid': 'pipeline-1',
}
]
)
ap.persistence_mgr.execute_async.assert_not_awaited()
@pytest.mark.parametrize(
('agent', 'error'),
[
(None, 'Agent not found'),
(
SimpleNamespace(uuid='agent-1', supported_event_patterns=['message.*']),
'Agent does not support this event pattern',
),
],
)
async def test_normalize_event_bindings_rejects_invalid_agent_target(self, agent, error):
"""Agent targets must exist and support the requested event pattern."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace(
execute_async=AsyncMock(return_value=_create_mock_result(first_item=agent))
)
service = BotService(ap)
with pytest.raises(ValueError, match=error):
await service._normalize_event_bindings(
[
{
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
}
]
)
async def test_update_bot_persists_normalized_event_bindings_and_reloads_runtime_bot(self):
"""update_bot stores normalized bindings before reloading the runtime bot."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace()
ap.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_create_mock_result(
first_item=SimpleNamespace(
uuid='agent-1',
supported_event_patterns=['platform.member.*'],
)
),
Mock(),
]
)
runtime_bot = SimpleNamespace(enable=True, run=AsyncMock())
loaded_bot = {'uuid': 'bot-1', 'name': 'Bot with bindings'}
ap.platform_mgr = SimpleNamespace(
remove_bot=AsyncMock(),
load_bot=AsyncMock(return_value=runtime_bot),
)
bot_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='bot-1'))
other_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='other-bot'))
ap.sess_mgr = SimpleNamespace(session_list=[bot_session, other_session])
service = BotService(ap)
service.get_bot = AsyncMock(return_value=loaded_bot)
await service.update_bot(
'bot-1',
{
'event_bindings': [
{
'id': 'binding-1',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'priority': '9',
}
]
},
)
update_values = _compiled_update_values(ap.persistence_mgr.execute_async.await_args_list[1].args[0])
assert update_values['event_bindings'] == [
{
'id': 'binding-1',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'filters': [],
'priority': 9,
'enabled': True,
'description': '',
'order': 0,
}
]
ap.platform_mgr.remove_bot.assert_awaited_once_with('bot-1')
service.get_bot.assert_awaited_once_with('bot-1')
ap.platform_mgr.load_bot.assert_awaited_once_with(loaded_bot)
runtime_bot.run.assert_awaited_once()
assert bot_session.using_conversation is None
assert other_session.using_conversation.bot_uuid == 'other-bot'
class TestBotServiceDeleteBot:
"""Tests for delete_bot method."""
@@ -2,6 +2,7 @@
RuntimeBot.resolve_pipeline_uuid and _match_operator unit tests
"""
from types import SimpleNamespace
from unittest.mock import Mock
@@ -278,3 +279,108 @@ class TestResolvePipelineUuid:
uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'normal message')
assert uuid == 'default-uuid'
assert routed is False
class TestEBAEventBindings:
"""Test RuntimeBot EBA event binding helpers."""
@staticmethod
def _make_bot(bindings):
from langbot.pkg.platform.botmgr import RuntimeBot
bot = object.__new__(RuntimeBot)
bot.bot_entity = SimpleNamespace(event_bindings=bindings)
return bot
def test_resolve_eba_event_binding_uses_enabled_pattern_filters_priority_and_order(self):
"""The selected binding is the first matching highest-priority binding."""
bot = self._make_bot(
[
{
'id': 'disabled',
'enabled': False,
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-disabled',
'priority': 100,
'order': 0,
},
{
'id': 'wrong-room',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-wrong-room',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-2'}],
'priority': 50,
'order': 1,
},
{
'id': 'first-high',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-first',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
'priority': 10,
'order': 2,
},
{
'id': 'second-high',
'event_pattern': 'platform.member.*',
'target_type': 'agent',
'target_uuid': 'agent-second',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
'priority': 10,
'order': 3,
},
{
'id': 'fallback',
'event_pattern': '*',
'target_type': 'discard',
'priority': 1,
'order': 4,
},
]
)
selected = bot._resolve_eba_event_binding(
{'room': {'id': 'room-1'}},
'platform.member.joined',
)
assert selected['id'] == 'first-high'
assert selected['target_uuid'] == 'agent-first'
def test_agent_product_to_binding_projects_runner_config_and_policies(self):
"""Agent products become bot-scoped runner bindings for EBA dispatch."""
from langbot.pkg.platform.botmgr import RuntimeBot
binding = RuntimeBot._agent_product_to_binding(
{
'uuid': 'agent-1',
'component_ref': 'plugin:test/fallback/default',
'config': {
'runner': {'id': 'plugin:test/runner/default'},
'runner_config': {
'plugin:test/runner/default': {
'temperature': 0.2,
'max_tokens': 1000,
}
},
},
},
{'id': 'binding-1'},
'platform.member.joined',
'bot-1',
)
assert binding is not None
assert binding.binding_id == 'bot:bot-1:binding-1'
assert binding.scope.scope_type == 'bot'
assert binding.scope.scope_id == 'bot-1'
assert binding.event_types == ['platform.member.joined']
assert binding.runner_id == 'plugin:test/runner/default'
assert binding.runner_config == {'temperature': 0.2, 'max_tokens': 1000}
assert binding.delivery_policy.enable_streaming is False
assert binding.delivery_policy.enable_reply is True
assert binding.state_policy.state_scopes == ['conversation', 'actor', 'subject', 'runner']
assert binding.agent_id == 'agent-1'