Compare commits

..

2 Commits

Author SHA1 Message Date
huanghuoguoguo d02e1a14a3 test(agent): cover agent service event bindings 2026-06-25 00:04:59 +08:00
huanghuoguoguo cd6a39d3a2 test(agent): cover pluginized agent runner runtime 2026-06-24 20:46:32 +08:00
13 changed files with 1169 additions and 454 deletions
@@ -220,7 +220,6 @@ class AgentRunContextBuilder:
binding: AgentBinding, binding: AgentBinding,
descriptor: AgentRunnerDescriptor, descriptor: AgentRunnerDescriptor,
resources: AgentResources, resources: AgentResources,
run_id: str | None = None,
) -> AgentRunContextPayload: ) -> AgentRunContextPayload:
"""Build AgentRunContext from event-first envelope. """Build AgentRunContext from event-first envelope.
@@ -236,8 +235,8 @@ class AgentRunContextBuilder:
Returns: Returns:
AgentRunContextPayload for the runner AgentRunContextPayload for the runner
""" """
# Generate new run_id unless an API caller already reserved one. # Generate new run_id
run_id = run_id or str(uuid.uuid4()) run_id = str(uuid.uuid4())
# Build trigger from event # Build trigger from event
trigger: AgentTrigger = { trigger: AgentTrigger = {
@@ -68,7 +68,6 @@ class AgentRunOrchestrator:
binding: AgentBinding, binding: AgentBinding,
bound_plugins: list[str] | None = None, bound_plugins: list[str] | None = None,
adapter_context: dict[str, typing.Any] | None = None, adapter_context: dict[str, typing.Any] | None = None,
run_id: str | None = None,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run an AgentRunner from an event-first envelope.""" """Run an AgentRunner from an event-first envelope."""
runner_id = binding.runner_id runner_id = binding.runner_id
@@ -85,7 +84,6 @@ class AgentRunOrchestrator:
binding=binding, binding=binding,
descriptor=descriptor, descriptor=descriptor,
resources=resources, resources=resources,
run_id=run_id,
) )
session_query_id = None session_query_id = None
+4 -2
View File
@@ -146,8 +146,10 @@ class AgentRunnerRegistry:
Returns: Returns:
List of runner descriptors List of runner descriptors
""" """
if use_cache and self._cache is not None: if use_cache and self._cache is not None and self._cache:
# Filter from cache # Filter from cache. Do not treat an empty cache as final because the
# plugin runtime may still be launching installed plugins when the
# first metadata request arrives.
return self._filter_runners_by_bound_plugins(self._cache, bound_plugins) return self._filter_runners_by_bound_plugins(self._cache, bound_plugins)
# Discover fresh (always full list) # Discover fresh (always full list)
@@ -10,7 +10,7 @@ ssereadtimeout) live in ``extra_args`` and are left untouched — the
auto-detecting remote transport consumes them regardless. auto-detecting remote transport consumes them regardless.
Revision ID: 0006_normalize_mcp_remote_mode Revision ID: 0006_normalize_mcp_remote_mode
Revises: 0005_add_llm_context_length Revises: 8d3a1f2c4b6e
Create Date: 2026-06-21 Create Date: 2026-06-21
""" """
@@ -18,7 +18,7 @@ import sqlalchemy as sa
from alembic import op from alembic import op
revision = '0006_normalize_mcp_remote_mode' revision = '0006_normalize_mcp_remote_mode'
down_revision = '0005_add_llm_context_length' down_revision = '8d3a1f2c4b6e'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@@ -3,24 +3,12 @@
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any
import asyncio
import time import time
import uuid
from langbot_plugin.runtime.io import handler from langbot_plugin.runtime.io import handler
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from ..agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
ResourcePolicy,
StatePolicy,
)
from ..agent.runner.run_ledger_store import TERMINAL_STATUSES from ..agent.runner.run_ledger_store import TERMINAL_STATUSES
from .agent_run_support import ( from .agent_run_support import (
@@ -42,320 +30,7 @@ from .agent_run_support import (
) )
def _dict_payload(value: Any, *, field_name: str) -> tuple[dict[str, Any], handler.ActionResponse | None]:
if value is None:
return {}, None
if not isinstance(value, dict):
return {}, handler.ActionResponse.error(message=f'{field_name} must be an object')
return dict(value), None
def _build_run_create_event(data: dict[str, Any], *, run_id: str) -> tuple[AgentEventEnvelope | None, handler.ActionResponse | None]:
event_payload, error = _dict_payload(data.get('event'), field_name='event')
if error:
return None, error
input_payload = event_payload.get('input', data.get('input'))
if input_payload is None:
input_payload = {
'text': data.get('text'),
'contents': [],
'attachments': [],
}
if not isinstance(input_payload, dict):
return None, handler.ActionResponse.error(message='input must be an object')
delivery_payload = event_payload.get('delivery', data.get('delivery'))
if delivery_payload is None:
delivery_payload = {
'surface': 'api',
'reply_target': None,
'supports_streaming': False,
'supports_edit': False,
'supports_reaction': False,
'platform_capabilities': {},
}
if not isinstance(delivery_payload, dict):
return None, handler.ActionResponse.error(message='delivery must be an object')
event_data = event_payload.get('data', data.get('data'))
if event_data is None:
event_data = {}
if not isinstance(event_data, dict):
return None, handler.ActionResponse.error(message='event data must be an object')
event_type = str(event_payload.get('event_type') or data.get('event_type') or 'api.invoked')
source = str(event_payload.get('source') or data.get('source') or 'api')
event_id = str(event_payload.get('event_id') or data.get('event_id') or f'{source}:{run_id}')
event_time = event_payload.get('event_time', data.get('event_time'))
if event_time is None:
event_time = int(time.time())
try:
envelope = AgentEventEnvelope(
event_id=event_id,
event_type=event_type,
event_time=int(event_time) if isinstance(event_time, (int, float, str)) else None,
source=source,
source_event_type=event_payload.get('source_event_type') or data.get('source_event_type') or event_type,
bot_id=event_payload.get('bot_id', data.get('bot_id')),
workspace_id=event_payload.get('workspace_id', data.get('workspace_id')),
conversation_id=event_payload.get('conversation_id', data.get('conversation_id')),
thread_id=event_payload.get('thread_id', data.get('thread_id')),
actor=event_payload.get('actor', data.get('actor')),
subject=event_payload.get('subject', data.get('subject')),
input=AgentInput.model_validate(input_payload),
delivery=DeliveryContext.model_validate(delivery_payload),
raw_ref=event_payload.get('raw_ref', data.get('raw_ref')),
data=event_data,
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid event payload: {exc}')
return envelope, None
def _build_run_create_binding(
data: dict[str, Any],
*,
event: AgentEventEnvelope,
run_id: str,
) -> tuple[AgentBinding | None, handler.ActionResponse | None]:
binding_payload, error = _dict_payload(data.get('binding'), field_name='binding')
if error:
return None, error
runner_id = binding_payload.get('runner_id') or data.get('runner_id')
if not runner_id:
return None, handler.ActionResponse.error(message='runner_id is required')
scope_payload = binding_payload.get('scope')
if scope_payload is None:
agent_id = binding_payload.get('agent_id') or data.get('agent_id')
workspace_id = event.workspace_id
bot_id = event.bot_id
if agent_id:
scope_payload = {'scope_type': 'agent', 'scope_id': agent_id}
elif bot_id:
scope_payload = {'scope_type': 'bot', 'scope_id': bot_id}
elif workspace_id:
scope_payload = {'scope_type': 'workspace', 'scope_id': workspace_id}
else:
scope_payload = {'scope_type': 'global', 'scope_id': None}
if not isinstance(scope_payload, dict):
return None, handler.ActionResponse.error(message='binding.scope must be an object')
runner_config_payload = binding_payload.get('runner_config', data.get('runner_config'))
if runner_config_payload is None:
runner_config_payload = {}
if not isinstance(runner_config_payload, dict):
return None, handler.ActionResponse.error(message='runner_config must be an object')
resource_policy_payload = binding_payload.get('resource_policy', data.get('resource_policy'))
if resource_policy_payload is None:
resource_policy_payload = {}
if not isinstance(resource_policy_payload, dict):
return None, handler.ActionResponse.error(message='resource_policy must be an object')
state_policy_payload = binding_payload.get('state_policy', data.get('state_policy'))
if state_policy_payload is None:
state_policy_payload = {}
if not isinstance(state_policy_payload, dict):
return None, handler.ActionResponse.error(message='state_policy must be an object')
delivery_policy_payload = binding_payload.get('delivery_policy', data.get('delivery_policy'))
if delivery_policy_payload is None:
delivery_policy_payload = {
'enable_streaming': bool(event.delivery.supports_streaming),
'enable_reply': False,
}
if not isinstance(delivery_policy_payload, dict):
return None, handler.ActionResponse.error(message='delivery_policy must be an object')
try:
binding = AgentBinding(
binding_id=str(binding_payload.get('binding_id') or data.get('binding_id') or f'api:{runner_id}:{run_id}'),
scope=BindingScope.model_validate(scope_payload),
event_types=list(binding_payload.get('event_types') or data.get('event_types') or [event.event_type]),
runner_id=str(runner_id),
runner_config=runner_config_payload,
resource_policy=ResourcePolicy.model_validate(resource_policy_payload),
state_policy=StatePolicy.model_validate(state_policy_payload),
delivery_policy=DeliveryPolicy.model_validate(delivery_policy_payload),
enabled=bool(binding_payload.get('enabled', data.get('enabled', True))),
agent_id=binding_payload.get('agent_id') or data.get('agent_id'),
)
except Exception as exc:
return None, handler.ActionResponse.error(message=f'invalid binding payload: {exc}')
if event.event_type not in binding.event_types:
return None, handler.ActionResponse.error(
message=f'binding.event_types must include event type {event.event_type}'
)
return binding, None
async def _consume_programmatic_run(
h,
*,
run_id: str,
event: AgentEventEnvelope,
binding: AgentBinding,
bound_plugins: list[str] | None,
) -> None:
async for _result in h.ap.agent_run_orchestrator.run(
event,
binding,
bound_plugins=bound_plugins,
run_id=run_id,
):
pass
def register(h): def register(h):
@h.action(_plugin_runtime_action('RUN_CREATE', 'run_create'))
async def run_create(data: dict[str, Any]) -> handler.ActionResponse:
"""Create a programmatic AgentRunner run from an explicit event and binding."""
caller_plugin_identity = data.get('caller_plugin_identity')
if not _has_agent_runner_admin_permission(
h.ap,
caller_plugin_identity,
AGENT_RUN_ADMIN_PERMISSION,
):
return handler.ActionResponse.error(message='Run create access not authorized')
orchestrator = getattr(h.ap, 'agent_run_orchestrator', None)
if orchestrator is None:
return handler.ActionResponse.error(message='AgentRunOrchestrator is not available')
run_id = str(data.get('run_id') or uuid.uuid4())
event, event_error = _build_run_create_event(data, run_id=run_id)
if event_error:
return event_error
assert event is not None
binding, binding_error = _build_run_create_binding(data, event=event, run_id=run_id)
if binding_error:
return binding_error
assert binding is not None
include_plugins = data.get('include_plugins')
if include_plugins is not None and not isinstance(include_plugins, list):
return handler.ActionResponse.error(message='include_plugins must be a list')
bound_plugins = [str(item) for item in include_plugins] if include_plugins else None
registry = getattr(h.ap, 'agent_runner_registry', None)
if registry is not None:
try:
await registry.get(binding.runner_id, bound_plugins)
except Exception as exc:
return handler.ActionResponse.error(message=f'Runner {binding.runner_id} is not available: {exc}')
async def background_run(*, raise_errors: bool = False) -> None:
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
try:
await _consume_programmatic_run(
h,
run_id=run_id,
event=event,
binding=binding,
bound_plugins=bound_plugins,
)
except Exception as exc:
h.ap.logger.error(f'RUN_CREATE background run {run_id} failed: {exc}', exc_info=True)
if await store.get_run(run_id) is None:
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
agent_id=binding.agent_id,
authorization={
'runner_id': binding.runner_id,
'binding_id': binding.binding_id,
'plugin_identity': None,
'resources': {},
'available_apis': {},
'conversation_id': event.conversation_id,
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'thread_id': event.thread_id,
},
metadata={
'event_type': event.event_type,
'source': event.source,
'run_create_error': True,
},
status='running',
)
await store.finalize_run(
run_id=run_id,
status='failed',
status_reason=str(exc),
metadata={'run_create_error': True},
)
if raise_errors:
raise
if data.get('wait_for_completion'):
try:
await background_run(raise_errors=True)
except Exception as exc:
return handler.ActionResponse.error(message=f'Run create error: {exc}')
from ..agent.runner.run_ledger_store import RunLedgerStore
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
run = await store.get_run(run_id)
if run is not None:
await _record_agent_runner_admin_action(
h.ap,
store,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(data=run)
else:
asyncio.create_task(background_run())
await _record_agent_runner_admin_action(
h.ap,
None,
action='run_create',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=run_id,
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
)
return handler.ActionResponse.success(
data={
'run_id': run_id,
'event_id': event.event_id,
'agent_id': binding.agent_id,
'binding_id': binding.binding_id,
'runner_id': binding.runner_id,
'conversation_id': event.conversation_id,
'thread_id': event.thread_id,
'workspace_id': event.workspace_id,
'bot_id': event.bot_id,
'status': 'created',
'metadata': {
'event_type': event.event_type,
'source': event.source,
'accepted': True,
},
}
)
@h.action(_plugin_runtime_action('RUN_GET', 'run_get')) @h.action(_plugin_runtime_action('RUN_GET', 'run_get'))
async def run_get(data: dict[str, Any]) -> handler.ActionResponse: async def run_get(data: dict[str, Any]) -> handler.ActionResponse:
"""Get one Host-owned run record visible to the current run.""" """Get one Host-owned run record visible to the current run."""
+1 -1
View File
@@ -61,7 +61,7 @@ def langbot_process(e2e_config_path, e2e_port, e2e_tmpdir):
project_root=project_root, project_root=project_root,
work_dir=e2e_tmpdir, # Run in tmpdir where data/config.yaml exists work_dir=e2e_tmpdir, # Run in tmpdir where data/config.yaml exists
port=e2e_port, port=e2e_port,
timeout=60, # Longer timeout for first startup timeout=180, # Longer timeout for first startup
collect_coverage=collect_coverage, collect_coverage=collect_coverage,
) )
@@ -0,0 +1,473 @@
"""E2E tests for pluginized AgentRunner execution.
This module starts the real LangBot backend with the plugin system enabled and
loads a deterministic AgentRunner plugin through the real SDK Plugin Runtime.
"""
from __future__ import annotations
import shutil
import socket
import sqlite3
import subprocess
import tempfile
import textwrap
import time
from pathlib import Path
import httpx
import pytest
from tests.e2e.utils.config_factory import create_minimal_config, create_test_directories
from tests.e2e.utils.process_manager import LangBotProcess, find_project_root
pytestmark = pytest.mark.e2e
QA_RUNNER_ID = 'plugin:e2e/agent-runner-qa/default'
QA_PLUGIN_DIRNAME = 'e2e__agent-runner-qa'
@pytest.fixture(scope='session')
def agent_runner_e2e_port():
"""Port for the AgentRunner plugin-runtime E2E process."""
return 15310
@pytest.fixture(scope='session')
def agent_runner_e2e_tmpdir():
"""Create temporary directory for AgentRunner E2E testing."""
tmpdir = Path(tempfile.mkdtemp(prefix='langbot_agent_runner_e2e_'))
yield tmpdir
shutil.rmtree(tmpdir, ignore_errors=True)
def _write_qa_agent_runner_plugin(plugin_root: Path) -> None:
"""Write a deterministic AgentRunner plugin used by this E2E."""
runner_dir = plugin_root / 'components' / 'agent_runner'
runner_dir.mkdir(parents=True, exist_ok=True)
(plugin_root / 'assets').mkdir(parents=True, exist_ok=True)
(plugin_root / 'assets' / 'icon.svg').write_text(
'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1 1"></svg>',
encoding='utf-8',
)
(plugin_root / 'manifest.yaml').write_text(
textwrap.dedent(
"""
apiVersion: langbot/v1
kind: Plugin
metadata:
author: e2e
name: agent-runner-qa
version: 0.1.0
label:
en_US: AgentRunner QA
zh_Hans: AgentRunner QA
description:
en_US: Deterministic AgentRunner E2E probe.
zh_Hans: 确定性的 AgentRunner E2E 探针。
icon: assets/icon.svg
spec:
version: 0.1.0
config: []
components:
AgentRunner:
fromDirs:
- path: components/agent_runner/
pages: []
execution:
python:
path: main.py
attr: AgentRunnerQAPlugin
"""
).strip()
+ '\n',
encoding='utf-8',
)
(plugin_root / 'main.py').write_text(
textwrap.dedent(
"""
from __future__ import annotations
from langbot_plugin.api.definition.plugin import BasePlugin
class AgentRunnerQAPlugin(BasePlugin):
async def initialize(self) -> None:
pass
"""
).strip()
+ '\n',
encoding='utf-8',
)
(runner_dir / 'default.yaml').write_text(
textwrap.dedent(
"""
apiVersion: langbot/v1
kind: AgentRunner
metadata:
name: default
label:
en_US: QA Echo Runner
zh_Hans: QA Echo Runner
description:
en_US: Echoes input and exercises run-scoped state APIs.
zh_Hans: 回显输入并验证运行级状态 API。
spec:
config: []
capabilities:
streaming: false
permissions: {}
execution:
python:
path: default.py
attr: DefaultAgentRunner
"""
).strip()
+ '\n',
encoding='utf-8',
)
(runner_dir / 'default.py').write_text(
textwrap.dedent(
"""
from __future__ import annotations
from typing import AsyncGenerator
from langbot_plugin.api.definition.components.agent_runner.runner import AgentRunner
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
from langbot_plugin.api.entities.builtin.agent_runner.result import AgentRunResult
from langbot_plugin.api.entities.builtin.provider.message import Message
class DefaultAgentRunner(AgentRunner):
async def run(self, ctx: AgentRunContext) -> AsyncGenerator[AgentRunResult, None]:
text = ctx.input.to_text()
yield AgentRunResult.message_completed(
ctx.run_id,
Message(role='assistant', content=f'e2e echo: {text}'),
)
yield AgentRunResult.state_updated(
ctx.run_id,
'e2e.echo_count',
{'count': 1},
scope='conversation',
)
yield AgentRunResult.run_completed(ctx.run_id, finish_reason='stop')
"""
).strip()
+ '\n',
encoding='utf-8',
)
def _free_port() -> int:
"""Reserve a currently-free localhost TCP port for this E2E process."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('127.0.0.1', 0))
return int(sock.getsockname()[1])
@pytest.fixture(scope='session')
def agent_runner_runtime_ports():
"""Control/debug ports for the standalone plugin runtime."""
control_port = _free_port()
debug_port = _free_port()
while debug_port == control_port:
debug_port = _free_port()
return control_port, debug_port
@pytest.fixture(scope='session')
def agent_runner_e2e_config_path(agent_runner_e2e_tmpdir, agent_runner_e2e_port, agent_runner_runtime_ports):
"""Create a plugin-enabled config and deterministic AgentRunner fixture."""
config_path = create_minimal_config(agent_runner_e2e_tmpdir, port=agent_runner_e2e_port)
create_test_directories(agent_runner_e2e_tmpdir)
import yaml
with open(config_path, encoding='utf-8') as f:
config = yaml.safe_load(f)
config['api']['global_api_key'] = 'e2e-agent-runner-key'
runtime_control_port, _runtime_debug_port = agent_runner_runtime_ports
config['plugin']['enable'] = True
config['plugin']['runtime_ws_url'] = f'ws://127.0.0.1:{runtime_control_port}/control/ws'
config['plugin']['enable_marketplace'] = False
config['box']['enabled'] = False
config['system']['jwt']['secret'] = 'e2e-agent-runner-secret-key'
with open(config_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(config, f, default_flow_style=False)
_write_qa_agent_runner_plugin(agent_runner_e2e_tmpdir / 'data' / 'plugins' / QA_PLUGIN_DIRNAME)
return config_path
@pytest.fixture(scope='session')
def agent_runner_runtime_process(agent_runner_e2e_tmpdir, agent_runner_runtime_ports):
"""Start the real SDK plugin runtime over WebSocket."""
control_port, debug_port = agent_runner_runtime_ports
stdout_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stdout.log'
stderr_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stderr.log'
stdout_file = open(stdout_path, 'wb')
stderr_file = open(stderr_path, 'wb')
proc = subprocess.Popen(
[
str(find_project_root() / '.venv' / 'bin' / 'python'),
'-m',
'langbot_plugin.cli.__init__',
'rt',
'--ws-control-port',
str(control_port),
'--ws-debug-port',
str(debug_port),
],
cwd=agent_runner_e2e_tmpdir,
stdout=stdout_file,
stderr=stderr_file,
start_new_session=True,
)
yield proc
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
stdout_file.close()
stderr_file.close()
@pytest.fixture(scope='session')
def agent_runner_langbot_process(
agent_runner_e2e_config_path,
agent_runner_e2e_port,
agent_runner_e2e_tmpdir,
agent_runner_runtime_process,
):
"""Start real LangBot with plugin runtime enabled."""
project_root = find_project_root()
proc = LangBotProcess(
project_root=project_root,
work_dir=agent_runner_e2e_tmpdir,
port=agent_runner_e2e_port,
timeout=180,
debug=True,
cli_args=['--standalone-runtime'],
)
success = proc.start()
if not success:
stdout, stderr = proc.get_logs()
pytest.fail(f'LangBot failed to start with AgentRunner plugin runtime:\nstdout: {stdout}\nstderr: {stderr}')
yield proc
proc.stop()
@pytest.fixture
def agent_runner_client(agent_runner_e2e_port, agent_runner_langbot_process):
"""HTTP client for the AgentRunner E2E backend."""
with httpx.Client(
base_url=f'http://127.0.0.1:{agent_runner_e2e_port}',
timeout=90.0,
trust_env=False,
) as client:
yield client
def _init_and_auth(client: httpx.Client) -> str:
"""Initialize the test admin user and return a bearer token."""
init_resp = client.post('/api/v1/user/init', json={'user': 'admin', 'password': 'admin'})
assert init_resp.status_code == 200
assert init_resp.json()['code'] in [0, 1]
auth_resp = client.post('/api/v1/user/auth', json={'user': 'admin', 'password': 'admin'})
assert auth_resp.status_code == 200
payload = auth_resp.json()
assert payload['code'] == 0
return payload['data']['token']
def test_plugin_runtime_discovers_agent_runner(agent_runner_client, agent_runner_langbot_process):
"""Pipeline metadata should include the real runtime-discovered QA runner."""
token = _init_and_auth(agent_runner_client)
start = time.time()
while time.time() - start < 60:
response = agent_runner_client.get(
'/api/v1/pipelines/_/metadata',
headers={'Authorization': f'Bearer {token}'},
)
assert response.status_code == 200
data = response.json()
assert data['code'] == 0
metadata_groups = data['data']['configs']
ai_metadata = next(group for group in metadata_groups if group.get('name') == 'ai')
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
option_names = {option['name'] for option in runner_select['options']}
if QA_RUNNER_ID in option_names:
return
time.sleep(2)
assert QA_RUNNER_ID in option_names
def test_host_orchestrator_runs_agent_runner_and_records_ledger(
agent_runner_e2e_config_path,
agent_runner_e2e_tmpdir,
agent_runner_runtime_process,
):
"""The Host orchestrator should run the pluginized runner and persist run side effects."""
import asyncio
import os
from langbot.pkg.agent.runner.host_models import (
AgentBinding,
AgentEventEnvelope,
BindingScope,
DeliveryPolicy,
StatePolicy,
)
from langbot.pkg.core import boot
from langbot.pkg.utils import platform as platform_utils
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
async def _run_probe():
previous_cwd = Path.cwd()
previous_standalone_runtime = platform_utils.standalone_runtime
os.chdir(agent_runner_e2e_tmpdir)
platform_utils.standalone_runtime = True
ap = None
try:
ap = await boot.make_app(asyncio.get_running_loop())
for _ in range(60):
handler = getattr(ap.plugin_connector, 'handler', None)
if handler is not None:
await handler.ping()
break
await asyncio.sleep(1)
else:
raise AssertionError('Plugin runtime did not connect')
for _ in range(60):
runners = await ap.agent_runner_registry.list_runners(use_cache=False)
if any(runner.id == QA_RUNNER_ID for runner in runners):
break
await asyncio.sleep(1)
else:
raise AssertionError(f'{QA_RUNNER_ID} was not discovered')
event = AgentEventEnvelope(
event_id='e2e-orchestrator-event-001',
event_type='message.received',
source='api',
conversation_id='e2e-conversation',
thread_id='e2e-thread',
actor=ActorContext(actor_type='user', actor_id='user-001', actor_name='E2E User'),
subject=SubjectContext(subject_type='chat', subject_id='chat-001'),
input=AgentInput(text='hello from orchestrator e2e'),
delivery=DeliveryContext(surface='e2e'),
)
binding = AgentBinding(
binding_id='e2e-binding',
scope=BindingScope(scope_type='global'),
runner_id=QA_RUNNER_ID,
state_policy=StatePolicy(enable_state=True, state_scopes=['conversation']),
delivery_policy=DeliveryPolicy(enable_streaming=False, enable_reply=True),
)
return [message async for message in ap.agent_run_orchestrator.run(event, binding)]
finally:
if ap is not None:
ap.dispose()
platform_utils.standalone_runtime = previous_standalone_runtime
os.chdir(previous_cwd)
messages = asyncio.run(_run_probe())
assert len(messages) == 1
assert messages[0].role == 'assistant'
assert messages[0].content == 'e2e echo: hello from orchestrator e2e'
db_path = agent_runner_e2e_tmpdir / 'data' / 'langbot.db'
conn = sqlite3.connect(str(db_path))
try:
run_row = conn.execute(
"SELECT status, runner_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001'"
).fetchone()
assert run_row == ('completed', QA_RUNNER_ID)
event_types = {
row[0]
for row in conn.execute(
"SELECT type FROM agent_run_event WHERE run_id = (SELECT run_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001')"
).fetchall()
}
assert {'state.updated', 'message.completed', 'run.completed'}.issubset(event_types)
state_row = conn.execute(
"SELECT value_json FROM agent_runner_state WHERE state_key = 'e2e.echo_count'"
).fetchone()
assert state_row is not None
assert '"count": 1' in state_row[0]
finally:
conn.close()
def test_pluginized_agent_runner_executes_through_runtime(agent_runner_client, agent_runner_langbot_process):
"""The Host debug surface should invoke the QA runner through the real Plugin Runtime."""
token = _init_and_auth(agent_runner_client)
start = time.time()
while time.time() - start < 60:
metadata_response = agent_runner_client.get(
'/api/v1/pipelines/_/metadata',
headers={'Authorization': f'Bearer {token}'},
)
assert metadata_response.status_code == 200
metadata = metadata_response.json()['data']['configs']
ai_metadata = next(group for group in metadata if group.get('name') == 'ai')
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
if QA_RUNNER_ID in {option['name'] for option in runner_select['options']}:
break
time.sleep(2)
else:
pytest.fail(f'{QA_RUNNER_ID} was not discovered before run_agent')
response = agent_runner_client.post(
'/api/v1/system/debug/plugin/action',
headers={'Authorization': f'Bearer {token}'},
json={
'action': 'run_agent',
'timeout': 60,
'data': {
'plugin_author': 'e2e',
'plugin_name': 'agent-runner-qa',
'runner_name': 'default',
'context': {
'run_id': 'e2e-run-001',
'trigger': {'type': 'message.received'},
'event': {
'event_id': 'e2e-event-001',
'event_type': 'message.received',
'source': 'api',
},
'input': {'text': 'hello from real e2e'},
'delivery': {'surface': 'e2e'},
'resources': {},
'runtime': {},
},
},
},
)
assert response.status_code == 200
payload = response.json()
assert payload['code'] == 0
result = payload['data']
assert result['type'] == 'message.completed', result
assert result['data']['message']['role'] == 'assistant'
assert result['data']['message']['content'] == 'e2e echo: hello from real e2e'
+25
View File
@@ -141,6 +141,27 @@ def create_minimal_config(tmpdir: Path, port: int = 15300) -> Path:
'delete_batch_size': 1000, 'delete_batch_size': 1000,
}, },
}, },
'box': {
'enabled': False,
'backend': 'local',
'runtime': {
'endpoint': '',
},
'local': {
'profile': 'default',
'image': '',
'host_root': str(tmpdir / 'box'),
'default_workspace': '',
'skills_root': 'skills',
'allowed_mount_roots': [str(tmpdir / 'box'), '/tmp'],
'workspace_quota_mb': None,
},
'e2b': {
'api_key': '',
'api_url': '',
'template': '',
},
},
'space': { 'space': {
'url': 'https://space.langbot.app', 'url': 'https://space.langbot.app',
'models_gateway_api_url': 'https://api.langbot.cloud/v1', 'models_gateway_api_url': 'https://api.langbot.cloud/v1',
@@ -168,8 +189,12 @@ def create_test_directories(tmpdir: Path) -> dict[str, Path]:
"""Create necessary directories for LangBot testing.""" """Create necessary directories for LangBot testing."""
directories = { directories = {
'data': tmpdir / 'data', 'data': tmpdir / 'data',
'labels': tmpdir / 'data' / 'labels',
'metadata': tmpdir / 'data' / 'metadata',
'logs': tmpdir / 'logs', 'logs': tmpdir / 'logs',
'data_logs': tmpdir / 'data' / 'logs',
'storage': tmpdir / 'storage', 'storage': tmpdir / 'storage',
'box': tmpdir / 'box',
'chroma': tmpdir / 'chroma', 'chroma': tmpdir / 'chroma',
} }
+40 -7
View File
@@ -9,6 +9,7 @@ import subprocess
import time import time
import signal import signal
import os import os
import sys
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
import logging import logging
@@ -26,13 +27,21 @@ class LangBotProcess:
port: int = 15300, port: int = 15300,
timeout: int = 30, timeout: int = 30,
collect_coverage: bool = True, collect_coverage: bool = True,
debug: bool = False,
cli_args: list[str] | None = None,
): ):
self.project_root = project_root self.project_root = project_root
self.work_dir = work_dir # Directory containing data/config.yaml self.work_dir = work_dir # Directory containing data/config.yaml
self.port = port self.port = port
self.timeout = timeout self.timeout = timeout
self.collect_coverage = collect_coverage self.collect_coverage = collect_coverage
self.debug = debug
self.cli_args = cli_args or []
self.process: Optional[subprocess.Popen] = None self.process: Optional[subprocess.Popen] = None
self._stdout_file = None
self._stderr_file = None
self._stdout_path = self.work_dir / 'langbot.stdout.log'
self._stderr_path = self.work_dir / 'langbot.stderr.log'
self._stdout_data: bytes = b'' self._stdout_data: bytes = b''
self._stderr_data: bytes = b'' self._stderr_data: bytes = b''
self._coverage_file: Optional[Path] = None self._coverage_file: Optional[Path] = None
@@ -63,13 +72,14 @@ class LangBotProcess:
# Disable telemetry # Disable telemetry
env['SPACE__DISABLE_TELEMETRY'] = 'true' env['SPACE__DISABLE_TELEMETRY'] = 'true'
env['SPACE__DISABLE_MODELS_SERVICE'] = 'true' env['SPACE__DISABLE_MODELS_SERVICE'] = 'true'
if self.debug:
env['DEBUG'] = 'true'
# Build command # Build command
if self.collect_coverage: if self.collect_coverage:
# Use coverage.py to collect coverage data # Use coverage.py to collect coverage data
# Set COVERAGE_PROCESS_START to enable coverage in subprocess # Set COVERAGE_PROCESS_START to enable coverage in subprocess
self._coverage_file = self.work_dir / '.coverage.e2e' self._coverage_file = self.work_dir / '.coverage.e2e'
env['COVERAGE_PROCESS_START'] = str(self.project_root / '.coveragerc')
env['COVERAGE_FILE'] = str(self._coverage_file) env['COVERAGE_FILE'] = str(self._coverage_file)
# Create .coveragerc for subprocess # Create .coveragerc for subprocess
@@ -88,27 +98,33 @@ precision = 2
coveragerc_path = self.work_dir / '.coveragerc' coveragerc_path = self.work_dir / '.coveragerc'
with open(coveragerc_path, 'w') as f: with open(coveragerc_path, 'w') as f:
f.write(coveragerc_content) f.write(coveragerc_content)
env['COVERAGE_PROCESS_START'] = str(coveragerc_path)
cmd = [ cmd = [
sys.executable,
'-m',
'coverage', 'coverage',
'run', 'run',
'--rcfile=' + str(coveragerc_path), '--rcfile=' + str(coveragerc_path),
'-m', '-m',
'langbot', 'langbot',
*self.cli_args,
] ]
else: else:
cmd = ['uv', 'run', 'python', '-m', 'langbot'] cmd = [sys.executable, '-m', 'langbot', *self.cli_args]
logger.info(f'Starting LangBot in: {self.work_dir}') logger.info(f'Starting LangBot in: {self.work_dir}')
logger.info(f'Command: {cmd}') logger.info(f'Command: {cmd}')
# Start process (run in work_dir so it finds data/config.yaml) # Start process (run in work_dir so it finds data/config.yaml)
self._stdout_file = open(self._stdout_path, 'wb')
self._stderr_file = open(self._stderr_path, 'wb')
self.process = subprocess.Popen( self.process = subprocess.Popen(
cmd, cmd,
cwd=self.work_dir, cwd=self.work_dir,
env=env, env=env,
stdout=subprocess.PIPE, stdout=self._stdout_file,
stderr=subprocess.PIPE, stderr=self._stderr_file,
preexec_fn=os.setsid if os.name != 'nt' else None, preexec_fn=os.setsid if os.name != 'nt' else None,
) )
@@ -117,7 +133,14 @@ precision = 2
while time.time() - start_time < self.timeout: while time.time() - start_time < self.timeout:
# Check if process died # Check if process died
if self.process.poll() is not None: if self.process.poll() is not None:
self._stdout_data, self._stderr_data = self.process.communicate() if self._stdout_file:
self._stdout_file.close()
self._stdout_file = None
if self._stderr_file:
self._stderr_file.close()
self._stderr_file = None
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
logger.error(f'LangBot process died: {self._stderr_data.decode()}') logger.error(f'LangBot process died: {self._stderr_data.decode()}')
return False return False
@@ -170,8 +193,14 @@ precision = 2
self.process.wait() self.process.wait()
# Collect output for debugging # Collect output for debugging
if self.process.stdout or self.process.stderr: if self._stdout_file:
self._stdout_data, self._stderr_data = self.process.communicate() self._stdout_file.close()
self._stdout_file = None
if self._stderr_file:
self._stderr_file.close()
self._stderr_file = None
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
self.process = None self.process = None
@@ -183,6 +212,10 @@ precision = 2
"""Get stdout and stderr logs.""" """Get stdout and stderr logs."""
stdout = self._stdout_data.decode('utf-8', errors='replace') stdout = self._stdout_data.decode('utf-8', errors='replace')
stderr = self._stderr_data.decode('utf-8', errors='replace') stderr = self._stderr_data.decode('utf-8', errors='replace')
if not stdout and self._stdout_path.exists():
stdout = self._stdout_path.read_text(encoding='utf-8', errors='replace')
if not stderr and self._stderr_path.exists():
stderr = self._stderr_path.read_text(encoding='utf-8', errors='replace')
return stdout, stderr return stdout, stderr
def get_coverage_file(self) -> Optional[Path]: def get_coverage_file(self) -> Optional[Path]:
@@ -33,12 +33,11 @@ class FakeConnection:
class FakeApplication: class FakeApplication:
def __init__(self, db_engine, admin_plugins=None, runner_registry=None, orchestrator=None): def __init__(self, db_engine, admin_plugins=None, runner_registry=None):
self.logger = MagicMock() self.logger = MagicMock()
self.persistence_mgr = MagicMock() self.persistence_mgr = MagicMock()
self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine) self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine)
self.agent_runner_registry = runner_registry self.agent_runner_registry = runner_registry
self.agent_run_orchestrator = orchestrator
self.instance_config = SimpleNamespace( self.instance_config = SimpleNamespace(
data={ data={
'agent_runner': { 'agent_runner': {
@@ -73,66 +72,16 @@ class FakeRunnerRegistry:
self.runners = runners self.runners = runners
self.calls = [] self.calls = []
async def get(self, runner_id, bound_plugins=None):
self.calls.append({'runner_id': runner_id, 'bound_plugins': bound_plugins})
if isinstance(self.runners, dict):
if runner_id not in self.runners:
raise KeyError(runner_id)
return self.runners[runner_id]
for runner in self.runners:
if getattr(runner, 'id', None) == runner_id:
return runner
raise KeyError(runner_id)
async def list_runners(self, *, bound_plugins=None, use_cache=True): async def list_runners(self, *, bound_plugins=None, use_cache=True):
self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache}) self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache})
return self.runners return self.runners
class FakeProgrammaticOrchestrator: def _handler(db_engine, admin_plugins=None, runner_registry=None):
def __init__(self, db_engine):
self.db_engine = db_engine
self.calls = []
async def run(self, event, binding, bound_plugins=None, adapter_context=None, run_id=None):
self.calls.append(
{
'event': event,
'binding': binding,
'bound_plugins': bound_plugins,
'adapter_context': adapter_context,
'run_id': run_id,
}
)
store = RunLedgerStore(self.db_engine)
await store.create_run(
run_id=run_id,
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=binding.runner_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
authorization={'available_apis': {}},
metadata={'event_type': event.event_type, 'source': event.source},
status='running',
)
await store.finalize_run(run_id=run_id, status='completed', status_reason='done')
if False:
yield None
def _handler(db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
async def fake_disconnect(): async def fake_disconnect():
return True return True
fake_app = FakeApplication( fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins, runner_registry=runner_registry)
db_engine,
admin_plugins=admin_plugins,
runner_registry=runner_registry,
orchestrator=orchestrator,
)
return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app)
@@ -1311,64 +1260,6 @@ async def test_runtime_register_heartbeat_and_list_actions(session_registry, db_
assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1'] assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1']
@pytest.mark.asyncio
async def test_admin_run_create_starts_programmatic_run(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
runner_id = 'plugin:test/runner/default'
registry = FakeRunnerRegistry({runner_id: SimpleNamespace(id=runner_id)})
handler = _handler(
db_engine,
admin_plugins=[{'identity': 'admin/plugin', 'permissions': ['agent_run:admin']}],
runner_registry=registry,
orchestrator=orchestrator,
)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'admin/plugin',
'run_id': 'run_programmatic',
'runner_id': runner_id,
'input': {'text': 'work on issue 1', 'contents': [], 'attachments': []},
'conversation_id': 'conv_issue_board',
'event_type': 'api.invoked',
'wait_for_completion': True,
}
)
assert result.code == 0
assert result.data['run_id'] == 'run_programmatic'
assert result.data['runner_id'] == runner_id
assert result.data['status'] == 'completed'
assert len(orchestrator.calls) == 1
call = orchestrator.calls[0]
assert call['run_id'] == 'run_programmatic'
assert call['event'].event_type == 'api.invoked'
assert call['event'].source == 'api'
assert call['event'].conversation_id == 'conv_issue_board'
assert call['binding'].runner_id == runner_id
assert call['binding'].delivery_policy.enable_reply is False
@pytest.mark.asyncio
async def test_run_create_requires_admin_permission(db_engine):
orchestrator = FakeProgrammaticOrchestrator(db_engine)
handler = _handler(db_engine, orchestrator=orchestrator)
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
result = await run_create(
{
'caller_plugin_identity': 'regular/plugin',
'runner_id': 'plugin:test/runner/default',
'input': {'text': 'work'},
}
)
assert result.code != 0
assert 'not authorized' in result.message
assert orchestrator.calls == []
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_run_claim_renew_and_release_actions(session_registry, db_engine): async def test_run_claim_renew_and_release_actions(session_registry, db_engine):
await _register_session( await _register_session(
@@ -0,0 +1,309 @@
from __future__ import annotations
import datetime as dt
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.api.http.service.agent import (
AGENT_DEFAULT_EVENT_PATTERNS,
AGENT_KIND_AGENT,
AGENT_KIND_PIPELINE,
PIPELINE_EVENT_PATTERNS,
AgentService,
)
pytestmark = pytest.mark.asyncio
def _result(items: list | None = None, first_item=None):
result = Mock()
result.all = Mock(return_value=items or [])
result.first = Mock(return_value=first_item)
return result
def _agent_row(
agent_uuid: str = 'agent-1',
name: str = 'Test Agent',
updated_at: dt.datetime | str | None = None,
config: dict | None = None,
supported_event_patterns: list[str] | None = None,
):
return SimpleNamespace(
uuid=agent_uuid,
name=name,
description='Agent description',
emoji='A',
kind=AGENT_KIND_AGENT,
component_ref='plugin:test/runner/default',
config=config or {
'runner': {'id': 'plugin:test/runner/default', 'expire-time': 0},
'runner_config': {'plugin:test/runner/default': {'temperature': 0.2}},
},
enabled=True,
supported_event_patterns=supported_event_patterns or ['*'],
created_at=dt.datetime(2026, 1, 1, 9, 0, 0),
updated_at=updated_at or dt.datetime(2026, 1, 1, 10, 0, 0),
)
def _serialize_agent(model_cls, entity, masked_columns=None):
return {
'uuid': entity.uuid,
'name': entity.name,
'description': entity.description,
'emoji': entity.emoji,
'kind': entity.kind,
'component_ref': entity.component_ref,
'config': entity.config,
'enabled': entity.enabled,
'supported_event_patterns': entity.supported_event_patterns,
'created_at': entity.created_at,
'updated_at': entity.updated_at,
}
def _compiled_params(statement):
return statement.compile().params
def _compiled_update_values(statement):
return {
key: value
for key, value in statement.compile().params.items()
if not key.startswith('uuid_')
}
def _make_app():
app = SimpleNamespace()
app.persistence_mgr = SimpleNamespace(
execute_async=AsyncMock(),
serialize_model=Mock(side_effect=_serialize_agent),
)
app.pipeline_service = SimpleNamespace(
get_pipeline_metadata=AsyncMock(return_value=[]),
get_pipelines=AsyncMock(return_value=[]),
get_pipeline=AsyncMock(return_value=None),
create_pipeline=AsyncMock(),
update_pipeline=AsyncMock(),
delete_pipeline=AsyncMock(),
_get_default_values_from_schema=Mock(return_value={}),
)
app.agent_runner_registry = None
app.logger = Mock()
return app
class TestAgentServiceMetadata:
async def test_get_agent_metadata_exposes_runner_config_and_kind_capabilities(self):
app = _make_app()
ai_metadata = {'name': 'ai', 'stages': [{'name': 'runner'}]}
app.pipeline_service.get_pipeline_metadata = AsyncMock(
return_value=[{'name': 'trigger'}, ai_metadata, {'name': 'output'}]
)
metadata = await AgentService(app).get_agent_metadata()
assert metadata['runner_config'] == ai_metadata
assert metadata['kinds'] == [
{
'name': AGENT_KIND_AGENT,
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
'message_only': False,
},
{
'name': AGENT_KIND_PIPELINE,
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
},
]
class TestAgentServiceListAndLookup:
async def test_get_agents_merges_agents_and_pipelines_without_leaking_config(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(
return_value=_result(
items=[
_agent_row(
agent_uuid='agent-1',
updated_at=dt.datetime(2026, 1, 1, 10, 0, 0),
supported_event_patterns=['platform.member.*'],
)
]
)
)
app.pipeline_service.get_pipelines = AsyncMock(
return_value=[
{
'uuid': 'pipeline-1',
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
'created_at': '2026-01-01T08:00:00',
'updated_at': '2026-01-01T11:00:00',
}
]
)
agents = await AgentService(app).get_agents(sort_by='updated_at', sort_order='DESC')
assert [agent['uuid'] for agent in agents] == ['pipeline-1', 'agent-1']
assert agents[0]['kind'] == AGENT_KIND_PIPELINE
assert agents[0]['component_ref'] == 'pipeline'
assert agents[0]['capability'] == {
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
'message_only': True,
}
assert agents[1]['kind'] == AGENT_KIND_AGENT
assert agents[1]['capability'] == {
'supported_event_patterns': ['platform.member.*'],
'message_only': False,
}
assert all('config' not in agent for agent in agents)
async def test_get_agent_returns_agent_with_config_before_pipeline_fallback(self):
app = _make_app()
agent = _agent_row(agent_uuid='agent-1')
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=agent))
result = await AgentService(app).get_agent('agent-1')
assert result['uuid'] == 'agent-1'
assert result['kind'] == AGENT_KIND_AGENT
assert result['config'] == agent.config
app.pipeline_service.get_pipeline.assert_not_awaited()
async def test_get_agent_falls_back_to_pipeline_product_item_with_config(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
app.pipeline_service.get_pipeline = AsyncMock(
return_value={
'uuid': 'pipeline-1',
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
'created_at': '2026-01-01T08:00:00',
'updated_at': '2026-01-01T11:00:00',
}
)
result = await AgentService(app).get_agent('pipeline-1')
assert result['kind'] == AGENT_KIND_PIPELINE
assert result['enabled'] is True
assert result['config'] == {'ai': {'runner': {'id': 'pipeline-runner'}}}
assert result['capability']['message_only'] is True
class TestAgentServiceCreateUpdateDelete:
async def test_create_agent_uses_default_runner_config_from_registry(self):
app = _make_app()
runner = SimpleNamespace(
id='plugin:langbot/local-agent/default',
config_schema=[
{'name': 'model', 'default': 'gpt-4.1'},
{'name': 'temperature', 'default': 0.2},
{'name': 'no-default'},
],
)
app.agent_runner_registry = SimpleNamespace(list_runners=AsyncMock(return_value=[runner]))
app.pipeline_service._get_default_values_from_schema = Mock(
return_value={'model': 'gpt-4.1', 'temperature': 0.2}
)
app.persistence_mgr.execute_async = AsyncMock(return_value=Mock())
result = await AgentService(app).create_agent(
{
'name': 'Support Agent',
'description': 'Handles support events',
'emoji': 'S',
}
)
insert_values = _compiled_params(app.persistence_mgr.execute_async.await_args.args[0])
assert result['kind'] == AGENT_KIND_AGENT
assert result['uuid'] == insert_values['uuid']
assert insert_values['name'] == 'Support Agent'
assert insert_values['component_ref'] == runner.id
assert insert_values['config'] == {
'runner': {'id': runner.id, 'expire-time': 0},
'runner_config': {runner.id: {'model': 'gpt-4.1', 'temperature': 0.2}},
}
assert insert_values['enabled'] is True
assert insert_values['supported_event_patterns'] == AGENT_DEFAULT_EVENT_PATTERNS
app.pipeline_service._get_default_values_from_schema.assert_called_once_with(runner.config_schema)
async def test_update_agent_protects_immutable_fields_and_recalculates_component_ref(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_result(first_item=_agent_row(agent_uuid='agent-1')),
Mock(),
]
)
new_config = {
'runner': {'id': 'plugin:test/new-runner/default', 'expire-time': 0},
'runner_config': {'plugin:test/new-runner/default': {'timeout': 30}},
}
await AgentService(app).update_agent(
'agent-1',
{
'uuid': 'caller-owned-uuid',
'kind': AGENT_KIND_PIPELINE,
'created_at': '2020-01-01T00:00:00',
'updated_at': '2020-01-01T00:00:00',
'capability': {'message_only': True},
'name': 'Updated Agent',
'config': new_config,
'supported_event_patterns': [],
},
)
update_values = _compiled_update_values(app.persistence_mgr.execute_async.await_args_list[1].args[0])
assert update_values == {
'name': 'Updated Agent',
'config': new_config,
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
'component_ref': 'plugin:test/new-runner/default',
}
async def test_pipeline_kind_create_update_delete_delegate_to_pipeline_service(self):
app = _make_app()
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
app.pipeline_service.create_pipeline = AsyncMock(return_value='pipeline-created')
app.pipeline_service.get_pipeline = AsyncMock(return_value={'uuid': 'pipeline-1'})
service = AgentService(app)
created = await service.create_agent(
{
'kind': AGENT_KIND_PIPELINE,
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
}
)
await service.update_agent('pipeline-1', {'name': 'Updated Pipeline'})
await service.delete_agent('pipeline-1')
assert created == {'uuid': 'pipeline-created', 'kind': AGENT_KIND_PIPELINE}
app.pipeline_service.create_pipeline.assert_awaited_once_with(
{
'name': 'Pipeline Agent',
'description': 'Legacy pipeline',
'emoji': 'P',
'config': {},
}
)
app.pipeline_service.update_pipeline.assert_awaited_once_with(
'pipeline-1',
{'name': 'Updated Pipeline'},
)
app.pipeline_service.delete_pipeline.assert_awaited_once_with('pipeline-1')
@@ -52,6 +52,15 @@ def _create_mock_result(items: list = None, first_item=None):
return result return result
def _compiled_update_values(statement):
"""Return update values without SQLAlchemy WHERE bind params."""
return {
key: value
for key, value in statement.compile().params.items()
if not key.startswith('uuid_')
}
def _create_mock_discover(adapter_webhook_flags: dict[str, bool] = None): def _create_mock_discover(adapter_webhook_flags: dict[str, bool] = None):
"""Create mock ComponentDiscoveryEngine exposing MessagePlatformAdapter manifests. """Create mock ComponentDiscoveryEngine exposing MessagePlatformAdapter manifests.
@@ -531,6 +540,201 @@ class TestBotServiceUpdateBot:
assert update_params['use_pipeline_name'] == 'Updated Pipeline' assert update_params['use_pipeline_name'] == 'Updated Pipeline'
class TestBotServiceEventBindings:
"""Tests for EBA event binding validation and persistence."""
async def test_normalize_event_bindings_validates_targets_and_preserves_order(self):
"""Valid bindings are normalized with stable order and target data."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace()
ap.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_create_mock_result(first_item=SimpleNamespace(uuid='pipeline-1')),
_create_mock_result(
first_item=SimpleNamespace(
uuid='agent-1',
supported_event_patterns=['platform.member.*'],
)
),
]
)
service = BotService(ap)
normalized = await service._normalize_event_bindings(
[
{
'event_pattern': ' message.received ',
'target_type': 'pipeline',
'target_uuid': ' pipeline-1 ',
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
'priority': '5',
'enabled': False,
'description': 'Route message events',
},
{
'id': 'agent-binding',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'priority': 7,
},
{
'event_pattern': 'platform.member.left',
'target_type': 'discard',
'target_uuid': 'ignored-target',
},
]
)
uuid.UUID(normalized[0]['id'])
assert normalized == [
{
'id': normalized[0]['id'],
'event_pattern': 'message.received',
'target_type': 'pipeline',
'target_uuid': 'pipeline-1',
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
'priority': 5,
'enabled': False,
'description': 'Route message events',
'order': 0,
},
{
'id': 'agent-binding',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'filters': [],
'priority': 7,
'enabled': True,
'description': '',
'order': 1,
},
{
'id': normalized[2]['id'],
'event_pattern': 'platform.member.left',
'target_type': 'discard',
'target_uuid': '',
'filters': [],
'priority': 0,
'enabled': True,
'description': '',
'order': 2,
},
]
async def test_normalize_event_bindings_rejects_pipeline_for_non_message_event(self):
"""Pipeline targets are limited to message events."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace(execute_async=AsyncMock())
service = BotService(ap)
with pytest.raises(ValueError, match='Pipeline can only be bound to message events'):
await service._normalize_event_bindings(
[
{
'event_pattern': 'platform.member.joined',
'target_type': 'pipeline',
'target_uuid': 'pipeline-1',
}
]
)
ap.persistence_mgr.execute_async.assert_not_awaited()
@pytest.mark.parametrize(
('agent', 'error'),
[
(None, 'Agent not found'),
(
SimpleNamespace(uuid='agent-1', supported_event_patterns=['message.*']),
'Agent does not support this event pattern',
),
],
)
async def test_normalize_event_bindings_rejects_invalid_agent_target(self, agent, error):
"""Agent targets must exist and support the requested event pattern."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace(
execute_async=AsyncMock(return_value=_create_mock_result(first_item=agent))
)
service = BotService(ap)
with pytest.raises(ValueError, match=error):
await service._normalize_event_bindings(
[
{
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
}
]
)
async def test_update_bot_persists_normalized_event_bindings_and_reloads_runtime_bot(self):
"""update_bot stores normalized bindings before reloading the runtime bot."""
ap = SimpleNamespace()
ap.persistence_mgr = SimpleNamespace()
ap.persistence_mgr.execute_async = AsyncMock(
side_effect=[
_create_mock_result(
first_item=SimpleNamespace(
uuid='agent-1',
supported_event_patterns=['platform.member.*'],
)
),
Mock(),
]
)
runtime_bot = SimpleNamespace(enable=True, run=AsyncMock())
loaded_bot = {'uuid': 'bot-1', 'name': 'Bot with bindings'}
ap.platform_mgr = SimpleNamespace(
remove_bot=AsyncMock(),
load_bot=AsyncMock(return_value=runtime_bot),
)
bot_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='bot-1'))
other_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='other-bot'))
ap.sess_mgr = SimpleNamespace(session_list=[bot_session, other_session])
service = BotService(ap)
service.get_bot = AsyncMock(return_value=loaded_bot)
await service.update_bot(
'bot-1',
{
'event_bindings': [
{
'id': 'binding-1',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'priority': '9',
}
]
},
)
update_values = _compiled_update_values(ap.persistence_mgr.execute_async.await_args_list[1].args[0])
assert update_values['event_bindings'] == [
{
'id': 'binding-1',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-1',
'filters': [],
'priority': 9,
'enabled': True,
'description': '',
'order': 0,
}
]
ap.platform_mgr.remove_bot.assert_awaited_once_with('bot-1')
service.get_bot.assert_awaited_once_with('bot-1')
ap.platform_mgr.load_bot.assert_awaited_once_with(loaded_bot)
runtime_bot.run.assert_awaited_once()
assert bot_session.using_conversation is None
assert other_session.using_conversation.bot_uuid == 'other-bot'
class TestBotServiceDeleteBot: class TestBotServiceDeleteBot:
"""Tests for delete_bot method.""" """Tests for delete_bot method."""
@@ -2,6 +2,7 @@
RuntimeBot.resolve_pipeline_uuid and _match_operator unit tests RuntimeBot.resolve_pipeline_uuid and _match_operator unit tests
""" """
from types import SimpleNamespace
from unittest.mock import Mock from unittest.mock import Mock
@@ -278,3 +279,108 @@ class TestResolvePipelineUuid:
uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'normal message') uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'normal message')
assert uuid == 'default-uuid' assert uuid == 'default-uuid'
assert routed is False assert routed is False
class TestEBAEventBindings:
"""Test RuntimeBot EBA event binding helpers."""
@staticmethod
def _make_bot(bindings):
from langbot.pkg.platform.botmgr import RuntimeBot
bot = object.__new__(RuntimeBot)
bot.bot_entity = SimpleNamespace(event_bindings=bindings)
return bot
def test_resolve_eba_event_binding_uses_enabled_pattern_filters_priority_and_order(self):
"""The selected binding is the first matching highest-priority binding."""
bot = self._make_bot(
[
{
'id': 'disabled',
'enabled': False,
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-disabled',
'priority': 100,
'order': 0,
},
{
'id': 'wrong-room',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-wrong-room',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-2'}],
'priority': 50,
'order': 1,
},
{
'id': 'first-high',
'event_pattern': 'platform.member.joined',
'target_type': 'agent',
'target_uuid': 'agent-first',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
'priority': 10,
'order': 2,
},
{
'id': 'second-high',
'event_pattern': 'platform.member.*',
'target_type': 'agent',
'target_uuid': 'agent-second',
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
'priority': 10,
'order': 3,
},
{
'id': 'fallback',
'event_pattern': '*',
'target_type': 'discard',
'priority': 1,
'order': 4,
},
]
)
selected = bot._resolve_eba_event_binding(
{'room': {'id': 'room-1'}},
'platform.member.joined',
)
assert selected['id'] == 'first-high'
assert selected['target_uuid'] == 'agent-first'
def test_agent_product_to_binding_projects_runner_config_and_policies(self):
"""Agent products become bot-scoped runner bindings for EBA dispatch."""
from langbot.pkg.platform.botmgr import RuntimeBot
binding = RuntimeBot._agent_product_to_binding(
{
'uuid': 'agent-1',
'component_ref': 'plugin:test/fallback/default',
'config': {
'runner': {'id': 'plugin:test/runner/default'},
'runner_config': {
'plugin:test/runner/default': {
'temperature': 0.2,
'max_tokens': 1000,
}
},
},
},
{'id': 'binding-1'},
'platform.member.joined',
'bot-1',
)
assert binding is not None
assert binding.binding_id == 'bot:bot-1:binding-1'
assert binding.scope.scope_type == 'bot'
assert binding.scope.scope_id == 'bot-1'
assert binding.event_types == ['platform.member.joined']
assert binding.runner_id == 'plugin:test/runner/default'
assert binding.runner_config == {'temperature': 0.2, 'max_tokens': 1000}
assert binding.delivery_policy.enable_streaming is False
assert binding.delivery_policy.enable_reply is True
assert binding.state_policy.state_scopes == ['conversation', 'actor', 'subject', 'runner']
assert binding.agent_id == 'agent-1'