mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-24 22:44:23 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d02e1a14a3 | |||
| cd6a39d3a2 |
@@ -220,7 +220,6 @@ class AgentRunContextBuilder:
|
||||
binding: AgentBinding,
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
resources: AgentResources,
|
||||
run_id: str | None = None,
|
||||
) -> AgentRunContextPayload:
|
||||
"""Build AgentRunContext from event-first envelope.
|
||||
|
||||
@@ -236,8 +235,8 @@ class AgentRunContextBuilder:
|
||||
Returns:
|
||||
AgentRunContextPayload for the runner
|
||||
"""
|
||||
# Generate new run_id unless an API caller already reserved one.
|
||||
run_id = run_id or str(uuid.uuid4())
|
||||
# Generate new run_id
|
||||
run_id = str(uuid.uuid4())
|
||||
|
||||
# Build trigger from event
|
||||
trigger: AgentTrigger = {
|
||||
|
||||
@@ -68,7 +68,6 @@ class AgentRunOrchestrator:
|
||||
binding: AgentBinding,
|
||||
bound_plugins: list[str] | None = None,
|
||||
adapter_context: dict[str, typing.Any] | None = None,
|
||||
run_id: str | None = None,
|
||||
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
|
||||
"""Run an AgentRunner from an event-first envelope."""
|
||||
runner_id = binding.runner_id
|
||||
@@ -85,7 +84,6 @@ class AgentRunOrchestrator:
|
||||
binding=binding,
|
||||
descriptor=descriptor,
|
||||
resources=resources,
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
session_query_id = None
|
||||
|
||||
@@ -146,8 +146,10 @@ class AgentRunnerRegistry:
|
||||
Returns:
|
||||
List of runner descriptors
|
||||
"""
|
||||
if use_cache and self._cache is not None:
|
||||
# Filter from cache
|
||||
if use_cache and self._cache is not None and self._cache:
|
||||
# Filter from cache. Do not treat an empty cache as final because the
|
||||
# plugin runtime may still be launching installed plugins when the
|
||||
# first metadata request arrives.
|
||||
return self._filter_runners_by_bound_plugins(self._cache, bound_plugins)
|
||||
|
||||
# Discover fresh (always full list)
|
||||
|
||||
@@ -10,7 +10,7 @@ ssereadtimeout) live in ``extra_args`` and are left untouched — the
|
||||
auto-detecting remote transport consumes them regardless.
|
||||
|
||||
Revision ID: 0006_normalize_mcp_remote_mode
|
||||
Revises: 0005_add_llm_context_length
|
||||
Revises: 8d3a1f2c4b6e
|
||||
Create Date: 2026-06-21
|
||||
"""
|
||||
|
||||
@@ -18,7 +18,7 @@ import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
revision = '0006_normalize_mcp_remote_mode'
|
||||
down_revision = '0005_add_llm_context_length'
|
||||
down_revision = '8d3a1f2c4b6e'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
@@ -3,24 +3,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
import asyncio
|
||||
import time
|
||||
import uuid
|
||||
|
||||
|
||||
from langbot_plugin.runtime.io import handler
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
|
||||
|
||||
|
||||
from ..agent.runner.host_models import (
|
||||
AgentBinding,
|
||||
AgentEventEnvelope,
|
||||
BindingScope,
|
||||
DeliveryPolicy,
|
||||
ResourcePolicy,
|
||||
StatePolicy,
|
||||
)
|
||||
from ..agent.runner.run_ledger_store import TERMINAL_STATUSES
|
||||
|
||||
from .agent_run_support import (
|
||||
@@ -42,320 +30,7 @@ from .agent_run_support import (
|
||||
)
|
||||
|
||||
|
||||
def _dict_payload(value: Any, *, field_name: str) -> tuple[dict[str, Any], handler.ActionResponse | None]:
|
||||
if value is None:
|
||||
return {}, None
|
||||
if not isinstance(value, dict):
|
||||
return {}, handler.ActionResponse.error(message=f'{field_name} must be an object')
|
||||
return dict(value), None
|
||||
|
||||
|
||||
def _build_run_create_event(data: dict[str, Any], *, run_id: str) -> tuple[AgentEventEnvelope | None, handler.ActionResponse | None]:
|
||||
event_payload, error = _dict_payload(data.get('event'), field_name='event')
|
||||
if error:
|
||||
return None, error
|
||||
|
||||
input_payload = event_payload.get('input', data.get('input'))
|
||||
if input_payload is None:
|
||||
input_payload = {
|
||||
'text': data.get('text'),
|
||||
'contents': [],
|
||||
'attachments': [],
|
||||
}
|
||||
if not isinstance(input_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='input must be an object')
|
||||
|
||||
delivery_payload = event_payload.get('delivery', data.get('delivery'))
|
||||
if delivery_payload is None:
|
||||
delivery_payload = {
|
||||
'surface': 'api',
|
||||
'reply_target': None,
|
||||
'supports_streaming': False,
|
||||
'supports_edit': False,
|
||||
'supports_reaction': False,
|
||||
'platform_capabilities': {},
|
||||
}
|
||||
if not isinstance(delivery_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='delivery must be an object')
|
||||
|
||||
event_data = event_payload.get('data', data.get('data'))
|
||||
if event_data is None:
|
||||
event_data = {}
|
||||
if not isinstance(event_data, dict):
|
||||
return None, handler.ActionResponse.error(message='event data must be an object')
|
||||
|
||||
event_type = str(event_payload.get('event_type') or data.get('event_type') or 'api.invoked')
|
||||
source = str(event_payload.get('source') or data.get('source') or 'api')
|
||||
event_id = str(event_payload.get('event_id') or data.get('event_id') or f'{source}:{run_id}')
|
||||
event_time = event_payload.get('event_time', data.get('event_time'))
|
||||
if event_time is None:
|
||||
event_time = int(time.time())
|
||||
|
||||
try:
|
||||
envelope = AgentEventEnvelope(
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
event_time=int(event_time) if isinstance(event_time, (int, float, str)) else None,
|
||||
source=source,
|
||||
source_event_type=event_payload.get('source_event_type') or data.get('source_event_type') or event_type,
|
||||
bot_id=event_payload.get('bot_id', data.get('bot_id')),
|
||||
workspace_id=event_payload.get('workspace_id', data.get('workspace_id')),
|
||||
conversation_id=event_payload.get('conversation_id', data.get('conversation_id')),
|
||||
thread_id=event_payload.get('thread_id', data.get('thread_id')),
|
||||
actor=event_payload.get('actor', data.get('actor')),
|
||||
subject=event_payload.get('subject', data.get('subject')),
|
||||
input=AgentInput.model_validate(input_payload),
|
||||
delivery=DeliveryContext.model_validate(delivery_payload),
|
||||
raw_ref=event_payload.get('raw_ref', data.get('raw_ref')),
|
||||
data=event_data,
|
||||
)
|
||||
except Exception as exc:
|
||||
return None, handler.ActionResponse.error(message=f'invalid event payload: {exc}')
|
||||
|
||||
return envelope, None
|
||||
|
||||
|
||||
def _build_run_create_binding(
|
||||
data: dict[str, Any],
|
||||
*,
|
||||
event: AgentEventEnvelope,
|
||||
run_id: str,
|
||||
) -> tuple[AgentBinding | None, handler.ActionResponse | None]:
|
||||
binding_payload, error = _dict_payload(data.get('binding'), field_name='binding')
|
||||
if error:
|
||||
return None, error
|
||||
|
||||
runner_id = binding_payload.get('runner_id') or data.get('runner_id')
|
||||
if not runner_id:
|
||||
return None, handler.ActionResponse.error(message='runner_id is required')
|
||||
|
||||
scope_payload = binding_payload.get('scope')
|
||||
if scope_payload is None:
|
||||
agent_id = binding_payload.get('agent_id') or data.get('agent_id')
|
||||
workspace_id = event.workspace_id
|
||||
bot_id = event.bot_id
|
||||
if agent_id:
|
||||
scope_payload = {'scope_type': 'agent', 'scope_id': agent_id}
|
||||
elif bot_id:
|
||||
scope_payload = {'scope_type': 'bot', 'scope_id': bot_id}
|
||||
elif workspace_id:
|
||||
scope_payload = {'scope_type': 'workspace', 'scope_id': workspace_id}
|
||||
else:
|
||||
scope_payload = {'scope_type': 'global', 'scope_id': None}
|
||||
if not isinstance(scope_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='binding.scope must be an object')
|
||||
|
||||
runner_config_payload = binding_payload.get('runner_config', data.get('runner_config'))
|
||||
if runner_config_payload is None:
|
||||
runner_config_payload = {}
|
||||
if not isinstance(runner_config_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='runner_config must be an object')
|
||||
|
||||
resource_policy_payload = binding_payload.get('resource_policy', data.get('resource_policy'))
|
||||
if resource_policy_payload is None:
|
||||
resource_policy_payload = {}
|
||||
if not isinstance(resource_policy_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='resource_policy must be an object')
|
||||
|
||||
state_policy_payload = binding_payload.get('state_policy', data.get('state_policy'))
|
||||
if state_policy_payload is None:
|
||||
state_policy_payload = {}
|
||||
if not isinstance(state_policy_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='state_policy must be an object')
|
||||
|
||||
delivery_policy_payload = binding_payload.get('delivery_policy', data.get('delivery_policy'))
|
||||
if delivery_policy_payload is None:
|
||||
delivery_policy_payload = {
|
||||
'enable_streaming': bool(event.delivery.supports_streaming),
|
||||
'enable_reply': False,
|
||||
}
|
||||
if not isinstance(delivery_policy_payload, dict):
|
||||
return None, handler.ActionResponse.error(message='delivery_policy must be an object')
|
||||
|
||||
try:
|
||||
binding = AgentBinding(
|
||||
binding_id=str(binding_payload.get('binding_id') or data.get('binding_id') or f'api:{runner_id}:{run_id}'),
|
||||
scope=BindingScope.model_validate(scope_payload),
|
||||
event_types=list(binding_payload.get('event_types') or data.get('event_types') or [event.event_type]),
|
||||
runner_id=str(runner_id),
|
||||
runner_config=runner_config_payload,
|
||||
resource_policy=ResourcePolicy.model_validate(resource_policy_payload),
|
||||
state_policy=StatePolicy.model_validate(state_policy_payload),
|
||||
delivery_policy=DeliveryPolicy.model_validate(delivery_policy_payload),
|
||||
enabled=bool(binding_payload.get('enabled', data.get('enabled', True))),
|
||||
agent_id=binding_payload.get('agent_id') or data.get('agent_id'),
|
||||
)
|
||||
except Exception as exc:
|
||||
return None, handler.ActionResponse.error(message=f'invalid binding payload: {exc}')
|
||||
|
||||
if event.event_type not in binding.event_types:
|
||||
return None, handler.ActionResponse.error(
|
||||
message=f'binding.event_types must include event type {event.event_type}'
|
||||
)
|
||||
|
||||
return binding, None
|
||||
|
||||
|
||||
async def _consume_programmatic_run(
|
||||
h,
|
||||
*,
|
||||
run_id: str,
|
||||
event: AgentEventEnvelope,
|
||||
binding: AgentBinding,
|
||||
bound_plugins: list[str] | None,
|
||||
) -> None:
|
||||
async for _result in h.ap.agent_run_orchestrator.run(
|
||||
event,
|
||||
binding,
|
||||
bound_plugins=bound_plugins,
|
||||
run_id=run_id,
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
def register(h):
|
||||
@h.action(_plugin_runtime_action('RUN_CREATE', 'run_create'))
|
||||
async def run_create(data: dict[str, Any]) -> handler.ActionResponse:
|
||||
"""Create a programmatic AgentRunner run from an explicit event and binding."""
|
||||
caller_plugin_identity = data.get('caller_plugin_identity')
|
||||
if not _has_agent_runner_admin_permission(
|
||||
h.ap,
|
||||
caller_plugin_identity,
|
||||
AGENT_RUN_ADMIN_PERMISSION,
|
||||
):
|
||||
return handler.ActionResponse.error(message='Run create access not authorized')
|
||||
|
||||
orchestrator = getattr(h.ap, 'agent_run_orchestrator', None)
|
||||
if orchestrator is None:
|
||||
return handler.ActionResponse.error(message='AgentRunOrchestrator is not available')
|
||||
|
||||
run_id = str(data.get('run_id') or uuid.uuid4())
|
||||
event, event_error = _build_run_create_event(data, run_id=run_id)
|
||||
if event_error:
|
||||
return event_error
|
||||
assert event is not None
|
||||
|
||||
binding, binding_error = _build_run_create_binding(data, event=event, run_id=run_id)
|
||||
if binding_error:
|
||||
return binding_error
|
||||
assert binding is not None
|
||||
|
||||
include_plugins = data.get('include_plugins')
|
||||
if include_plugins is not None and not isinstance(include_plugins, list):
|
||||
return handler.ActionResponse.error(message='include_plugins must be a list')
|
||||
bound_plugins = [str(item) for item in include_plugins] if include_plugins else None
|
||||
|
||||
registry = getattr(h.ap, 'agent_runner_registry', None)
|
||||
if registry is not None:
|
||||
try:
|
||||
await registry.get(binding.runner_id, bound_plugins)
|
||||
except Exception as exc:
|
||||
return handler.ActionResponse.error(message=f'Runner {binding.runner_id} is not available: {exc}')
|
||||
|
||||
async def background_run(*, raise_errors: bool = False) -> None:
|
||||
from ..agent.runner.run_ledger_store import RunLedgerStore
|
||||
|
||||
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
|
||||
try:
|
||||
await _consume_programmatic_run(
|
||||
h,
|
||||
run_id=run_id,
|
||||
event=event,
|
||||
binding=binding,
|
||||
bound_plugins=bound_plugins,
|
||||
)
|
||||
except Exception as exc:
|
||||
h.ap.logger.error(f'RUN_CREATE background run {run_id} failed: {exc}', exc_info=True)
|
||||
if await store.get_run(run_id) is None:
|
||||
await store.create_run(
|
||||
run_id=run_id,
|
||||
event_id=event.event_id,
|
||||
binding_id=binding.binding_id,
|
||||
runner_id=binding.runner_id,
|
||||
conversation_id=event.conversation_id,
|
||||
thread_id=event.thread_id,
|
||||
workspace_id=event.workspace_id,
|
||||
bot_id=event.bot_id,
|
||||
agent_id=binding.agent_id,
|
||||
authorization={
|
||||
'runner_id': binding.runner_id,
|
||||
'binding_id': binding.binding_id,
|
||||
'plugin_identity': None,
|
||||
'resources': {},
|
||||
'available_apis': {},
|
||||
'conversation_id': event.conversation_id,
|
||||
'bot_id': event.bot_id,
|
||||
'workspace_id': event.workspace_id,
|
||||
'thread_id': event.thread_id,
|
||||
},
|
||||
metadata={
|
||||
'event_type': event.event_type,
|
||||
'source': event.source,
|
||||
'run_create_error': True,
|
||||
},
|
||||
status='running',
|
||||
)
|
||||
await store.finalize_run(
|
||||
run_id=run_id,
|
||||
status='failed',
|
||||
status_reason=str(exc),
|
||||
metadata={'run_create_error': True},
|
||||
)
|
||||
if raise_errors:
|
||||
raise
|
||||
|
||||
if data.get('wait_for_completion'):
|
||||
try:
|
||||
await background_run(raise_errors=True)
|
||||
except Exception as exc:
|
||||
return handler.ActionResponse.error(message=f'Run create error: {exc}')
|
||||
from ..agent.runner.run_ledger_store import RunLedgerStore
|
||||
|
||||
store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine())
|
||||
run = await store.get_run(run_id)
|
||||
if run is not None:
|
||||
await _record_agent_runner_admin_action(
|
||||
h.ap,
|
||||
store,
|
||||
action='run_create',
|
||||
caller_plugin_identity=caller_plugin_identity,
|
||||
permission=AGENT_RUN_ADMIN_PERMISSION,
|
||||
durable_run_id=run_id,
|
||||
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
|
||||
)
|
||||
return handler.ActionResponse.success(data=run)
|
||||
else:
|
||||
asyncio.create_task(background_run())
|
||||
|
||||
await _record_agent_runner_admin_action(
|
||||
h.ap,
|
||||
None,
|
||||
action='run_create',
|
||||
caller_plugin_identity=caller_plugin_identity,
|
||||
permission=AGENT_RUN_ADMIN_PERMISSION,
|
||||
durable_run_id=run_id,
|
||||
detail={'runner_id': binding.runner_id, 'event_type': event.event_type},
|
||||
)
|
||||
return handler.ActionResponse.success(
|
||||
data={
|
||||
'run_id': run_id,
|
||||
'event_id': event.event_id,
|
||||
'agent_id': binding.agent_id,
|
||||
'binding_id': binding.binding_id,
|
||||
'runner_id': binding.runner_id,
|
||||
'conversation_id': event.conversation_id,
|
||||
'thread_id': event.thread_id,
|
||||
'workspace_id': event.workspace_id,
|
||||
'bot_id': event.bot_id,
|
||||
'status': 'created',
|
||||
'metadata': {
|
||||
'event_type': event.event_type,
|
||||
'source': event.source,
|
||||
'accepted': True,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@h.action(_plugin_runtime_action('RUN_GET', 'run_get'))
|
||||
async def run_get(data: dict[str, Any]) -> handler.ActionResponse:
|
||||
"""Get one Host-owned run record visible to the current run."""
|
||||
|
||||
@@ -61,7 +61,7 @@ def langbot_process(e2e_config_path, e2e_port, e2e_tmpdir):
|
||||
project_root=project_root,
|
||||
work_dir=e2e_tmpdir, # Run in tmpdir where data/config.yaml exists
|
||||
port=e2e_port,
|
||||
timeout=60, # Longer timeout for first startup
|
||||
timeout=180, # Longer timeout for first startup
|
||||
collect_coverage=collect_coverage,
|
||||
)
|
||||
|
||||
|
||||
@@ -0,0 +1,473 @@
|
||||
"""E2E tests for pluginized AgentRunner execution.
|
||||
|
||||
This module starts the real LangBot backend with the plugin system enabled and
|
||||
loads a deterministic AgentRunner plugin through the real SDK Plugin Runtime.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
import socket
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import tempfile
|
||||
import textwrap
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from tests.e2e.utils.config_factory import create_minimal_config, create_test_directories
|
||||
from tests.e2e.utils.process_manager import LangBotProcess, find_project_root
|
||||
|
||||
pytestmark = pytest.mark.e2e
|
||||
|
||||
|
||||
QA_RUNNER_ID = 'plugin:e2e/agent-runner-qa/default'
|
||||
QA_PLUGIN_DIRNAME = 'e2e__agent-runner-qa'
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_e2e_port():
|
||||
"""Port for the AgentRunner plugin-runtime E2E process."""
|
||||
return 15310
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_e2e_tmpdir():
|
||||
"""Create temporary directory for AgentRunner E2E testing."""
|
||||
tmpdir = Path(tempfile.mkdtemp(prefix='langbot_agent_runner_e2e_'))
|
||||
yield tmpdir
|
||||
shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
|
||||
|
||||
def _write_qa_agent_runner_plugin(plugin_root: Path) -> None:
|
||||
"""Write a deterministic AgentRunner plugin used by this E2E."""
|
||||
runner_dir = plugin_root / 'components' / 'agent_runner'
|
||||
runner_dir.mkdir(parents=True, exist_ok=True)
|
||||
(plugin_root / 'assets').mkdir(parents=True, exist_ok=True)
|
||||
(plugin_root / 'assets' / 'icon.svg').write_text(
|
||||
'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1 1"></svg>',
|
||||
encoding='utf-8',
|
||||
)
|
||||
(plugin_root / 'manifest.yaml').write_text(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
apiVersion: langbot/v1
|
||||
kind: Plugin
|
||||
metadata:
|
||||
author: e2e
|
||||
name: agent-runner-qa
|
||||
version: 0.1.0
|
||||
label:
|
||||
en_US: AgentRunner QA
|
||||
zh_Hans: AgentRunner QA
|
||||
description:
|
||||
en_US: Deterministic AgentRunner E2E probe.
|
||||
zh_Hans: 确定性的 AgentRunner E2E 探针。
|
||||
icon: assets/icon.svg
|
||||
spec:
|
||||
version: 0.1.0
|
||||
config: []
|
||||
components:
|
||||
AgentRunner:
|
||||
fromDirs:
|
||||
- path: components/agent_runner/
|
||||
pages: []
|
||||
execution:
|
||||
python:
|
||||
path: main.py
|
||||
attr: AgentRunnerQAPlugin
|
||||
"""
|
||||
).strip()
|
||||
+ '\n',
|
||||
encoding='utf-8',
|
||||
)
|
||||
(plugin_root / 'main.py').write_text(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from langbot_plugin.api.definition.plugin import BasePlugin
|
||||
|
||||
|
||||
class AgentRunnerQAPlugin(BasePlugin):
|
||||
async def initialize(self) -> None:
|
||||
pass
|
||||
"""
|
||||
).strip()
|
||||
+ '\n',
|
||||
encoding='utf-8',
|
||||
)
|
||||
(runner_dir / 'default.yaml').write_text(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
apiVersion: langbot/v1
|
||||
kind: AgentRunner
|
||||
metadata:
|
||||
name: default
|
||||
label:
|
||||
en_US: QA Echo Runner
|
||||
zh_Hans: QA Echo Runner
|
||||
description:
|
||||
en_US: Echoes input and exercises run-scoped state APIs.
|
||||
zh_Hans: 回显输入并验证运行级状态 API。
|
||||
spec:
|
||||
config: []
|
||||
capabilities:
|
||||
streaming: false
|
||||
permissions: {}
|
||||
execution:
|
||||
python:
|
||||
path: default.py
|
||||
attr: DefaultAgentRunner
|
||||
"""
|
||||
).strip()
|
||||
+ '\n',
|
||||
encoding='utf-8',
|
||||
)
|
||||
(runner_dir / 'default.py').write_text(
|
||||
textwrap.dedent(
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from langbot_plugin.api.definition.components.agent_runner.runner import AgentRunner
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.result import AgentRunResult
|
||||
from langbot_plugin.api.entities.builtin.provider.message import Message
|
||||
|
||||
|
||||
class DefaultAgentRunner(AgentRunner):
|
||||
async def run(self, ctx: AgentRunContext) -> AsyncGenerator[AgentRunResult, None]:
|
||||
text = ctx.input.to_text()
|
||||
yield AgentRunResult.message_completed(
|
||||
ctx.run_id,
|
||||
Message(role='assistant', content=f'e2e echo: {text}'),
|
||||
)
|
||||
yield AgentRunResult.state_updated(
|
||||
ctx.run_id,
|
||||
'e2e.echo_count',
|
||||
{'count': 1},
|
||||
scope='conversation',
|
||||
)
|
||||
yield AgentRunResult.run_completed(ctx.run_id, finish_reason='stop')
|
||||
"""
|
||||
).strip()
|
||||
+ '\n',
|
||||
encoding='utf-8',
|
||||
)
|
||||
|
||||
|
||||
def _free_port() -> int:
|
||||
"""Reserve a currently-free localhost TCP port for this E2E process."""
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.bind(('127.0.0.1', 0))
|
||||
return int(sock.getsockname()[1])
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_runtime_ports():
|
||||
"""Control/debug ports for the standalone plugin runtime."""
|
||||
control_port = _free_port()
|
||||
debug_port = _free_port()
|
||||
while debug_port == control_port:
|
||||
debug_port = _free_port()
|
||||
return control_port, debug_port
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_e2e_config_path(agent_runner_e2e_tmpdir, agent_runner_e2e_port, agent_runner_runtime_ports):
|
||||
"""Create a plugin-enabled config and deterministic AgentRunner fixture."""
|
||||
config_path = create_minimal_config(agent_runner_e2e_tmpdir, port=agent_runner_e2e_port)
|
||||
create_test_directories(agent_runner_e2e_tmpdir)
|
||||
|
||||
import yaml
|
||||
|
||||
with open(config_path, encoding='utf-8') as f:
|
||||
config = yaml.safe_load(f)
|
||||
config['api']['global_api_key'] = 'e2e-agent-runner-key'
|
||||
runtime_control_port, _runtime_debug_port = agent_runner_runtime_ports
|
||||
config['plugin']['enable'] = True
|
||||
config['plugin']['runtime_ws_url'] = f'ws://127.0.0.1:{runtime_control_port}/control/ws'
|
||||
config['plugin']['enable_marketplace'] = False
|
||||
config['box']['enabled'] = False
|
||||
config['system']['jwt']['secret'] = 'e2e-agent-runner-secret-key'
|
||||
with open(config_path, 'w', encoding='utf-8') as f:
|
||||
yaml.safe_dump(config, f, default_flow_style=False)
|
||||
|
||||
_write_qa_agent_runner_plugin(agent_runner_e2e_tmpdir / 'data' / 'plugins' / QA_PLUGIN_DIRNAME)
|
||||
return config_path
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_runtime_process(agent_runner_e2e_tmpdir, agent_runner_runtime_ports):
|
||||
"""Start the real SDK plugin runtime over WebSocket."""
|
||||
control_port, debug_port = agent_runner_runtime_ports
|
||||
stdout_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stdout.log'
|
||||
stderr_path = agent_runner_e2e_tmpdir / 'plugin-runtime.stderr.log'
|
||||
stdout_file = open(stdout_path, 'wb')
|
||||
stderr_file = open(stderr_path, 'wb')
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
str(find_project_root() / '.venv' / 'bin' / 'python'),
|
||||
'-m',
|
||||
'langbot_plugin.cli.__init__',
|
||||
'rt',
|
||||
'--ws-control-port',
|
||||
str(control_port),
|
||||
'--ws-debug-port',
|
||||
str(debug_port),
|
||||
],
|
||||
cwd=agent_runner_e2e_tmpdir,
|
||||
stdout=stdout_file,
|
||||
stderr=stderr_file,
|
||||
start_new_session=True,
|
||||
)
|
||||
yield proc
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
stdout_file.close()
|
||||
stderr_file.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def agent_runner_langbot_process(
|
||||
agent_runner_e2e_config_path,
|
||||
agent_runner_e2e_port,
|
||||
agent_runner_e2e_tmpdir,
|
||||
agent_runner_runtime_process,
|
||||
):
|
||||
"""Start real LangBot with plugin runtime enabled."""
|
||||
project_root = find_project_root()
|
||||
proc = LangBotProcess(
|
||||
project_root=project_root,
|
||||
work_dir=agent_runner_e2e_tmpdir,
|
||||
port=agent_runner_e2e_port,
|
||||
timeout=180,
|
||||
debug=True,
|
||||
cli_args=['--standalone-runtime'],
|
||||
)
|
||||
|
||||
success = proc.start()
|
||||
if not success:
|
||||
stdout, stderr = proc.get_logs()
|
||||
pytest.fail(f'LangBot failed to start with AgentRunner plugin runtime:\nstdout: {stdout}\nstderr: {stderr}')
|
||||
|
||||
yield proc
|
||||
|
||||
proc.stop()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def agent_runner_client(agent_runner_e2e_port, agent_runner_langbot_process):
|
||||
"""HTTP client for the AgentRunner E2E backend."""
|
||||
with httpx.Client(
|
||||
base_url=f'http://127.0.0.1:{agent_runner_e2e_port}',
|
||||
timeout=90.0,
|
||||
trust_env=False,
|
||||
) as client:
|
||||
yield client
|
||||
|
||||
|
||||
def _init_and_auth(client: httpx.Client) -> str:
|
||||
"""Initialize the test admin user and return a bearer token."""
|
||||
init_resp = client.post('/api/v1/user/init', json={'user': 'admin', 'password': 'admin'})
|
||||
assert init_resp.status_code == 200
|
||||
assert init_resp.json()['code'] in [0, 1]
|
||||
|
||||
auth_resp = client.post('/api/v1/user/auth', json={'user': 'admin', 'password': 'admin'})
|
||||
assert auth_resp.status_code == 200
|
||||
payload = auth_resp.json()
|
||||
assert payload['code'] == 0
|
||||
return payload['data']['token']
|
||||
|
||||
|
||||
def test_plugin_runtime_discovers_agent_runner(agent_runner_client, agent_runner_langbot_process):
|
||||
"""Pipeline metadata should include the real runtime-discovered QA runner."""
|
||||
token = _init_and_auth(agent_runner_client)
|
||||
start = time.time()
|
||||
while time.time() - start < 60:
|
||||
response = agent_runner_client.get(
|
||||
'/api/v1/pipelines/_/metadata',
|
||||
headers={'Authorization': f'Bearer {token}'},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data['code'] == 0
|
||||
metadata_groups = data['data']['configs']
|
||||
ai_metadata = next(group for group in metadata_groups if group.get('name') == 'ai')
|
||||
|
||||
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
|
||||
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
|
||||
option_names = {option['name'] for option in runner_select['options']}
|
||||
if QA_RUNNER_ID in option_names:
|
||||
return
|
||||
time.sleep(2)
|
||||
|
||||
assert QA_RUNNER_ID in option_names
|
||||
|
||||
def test_host_orchestrator_runs_agent_runner_and_records_ledger(
|
||||
agent_runner_e2e_config_path,
|
||||
agent_runner_e2e_tmpdir,
|
||||
agent_runner_runtime_process,
|
||||
):
|
||||
"""The Host orchestrator should run the pluginized runner and persist run side effects."""
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from langbot.pkg.agent.runner.host_models import (
|
||||
AgentBinding,
|
||||
AgentEventEnvelope,
|
||||
BindingScope,
|
||||
DeliveryPolicy,
|
||||
StatePolicy,
|
||||
)
|
||||
from langbot.pkg.core import boot
|
||||
from langbot.pkg.utils import platform as platform_utils
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
|
||||
|
||||
async def _run_probe():
|
||||
previous_cwd = Path.cwd()
|
||||
previous_standalone_runtime = platform_utils.standalone_runtime
|
||||
os.chdir(agent_runner_e2e_tmpdir)
|
||||
platform_utils.standalone_runtime = True
|
||||
ap = None
|
||||
try:
|
||||
ap = await boot.make_app(asyncio.get_running_loop())
|
||||
for _ in range(60):
|
||||
handler = getattr(ap.plugin_connector, 'handler', None)
|
||||
if handler is not None:
|
||||
await handler.ping()
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
raise AssertionError('Plugin runtime did not connect')
|
||||
|
||||
for _ in range(60):
|
||||
runners = await ap.agent_runner_registry.list_runners(use_cache=False)
|
||||
if any(runner.id == QA_RUNNER_ID for runner in runners):
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
raise AssertionError(f'{QA_RUNNER_ID} was not discovered')
|
||||
|
||||
event = AgentEventEnvelope(
|
||||
event_id='e2e-orchestrator-event-001',
|
||||
event_type='message.received',
|
||||
source='api',
|
||||
conversation_id='e2e-conversation',
|
||||
thread_id='e2e-thread',
|
||||
actor=ActorContext(actor_type='user', actor_id='user-001', actor_name='E2E User'),
|
||||
subject=SubjectContext(subject_type='chat', subject_id='chat-001'),
|
||||
input=AgentInput(text='hello from orchestrator e2e'),
|
||||
delivery=DeliveryContext(surface='e2e'),
|
||||
)
|
||||
binding = AgentBinding(
|
||||
binding_id='e2e-binding',
|
||||
scope=BindingScope(scope_type='global'),
|
||||
runner_id=QA_RUNNER_ID,
|
||||
state_policy=StatePolicy(enable_state=True, state_scopes=['conversation']),
|
||||
delivery_policy=DeliveryPolicy(enable_streaming=False, enable_reply=True),
|
||||
)
|
||||
return [message async for message in ap.agent_run_orchestrator.run(event, binding)]
|
||||
finally:
|
||||
if ap is not None:
|
||||
ap.dispose()
|
||||
platform_utils.standalone_runtime = previous_standalone_runtime
|
||||
os.chdir(previous_cwd)
|
||||
|
||||
messages = asyncio.run(_run_probe())
|
||||
|
||||
assert len(messages) == 1
|
||||
assert messages[0].role == 'assistant'
|
||||
assert messages[0].content == 'e2e echo: hello from orchestrator e2e'
|
||||
|
||||
db_path = agent_runner_e2e_tmpdir / 'data' / 'langbot.db'
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
try:
|
||||
run_row = conn.execute(
|
||||
"SELECT status, runner_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001'"
|
||||
).fetchone()
|
||||
assert run_row == ('completed', QA_RUNNER_ID)
|
||||
|
||||
event_types = {
|
||||
row[0]
|
||||
for row in conn.execute(
|
||||
"SELECT type FROM agent_run_event WHERE run_id = (SELECT run_id FROM agent_run WHERE event_id = 'e2e-orchestrator-event-001')"
|
||||
).fetchall()
|
||||
}
|
||||
assert {'state.updated', 'message.completed', 'run.completed'}.issubset(event_types)
|
||||
|
||||
state_row = conn.execute(
|
||||
"SELECT value_json FROM agent_runner_state WHERE state_key = 'e2e.echo_count'"
|
||||
).fetchone()
|
||||
assert state_row is not None
|
||||
assert '"count": 1' in state_row[0]
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_pluginized_agent_runner_executes_through_runtime(agent_runner_client, agent_runner_langbot_process):
|
||||
"""The Host debug surface should invoke the QA runner through the real Plugin Runtime."""
|
||||
token = _init_and_auth(agent_runner_client)
|
||||
start = time.time()
|
||||
while time.time() - start < 60:
|
||||
metadata_response = agent_runner_client.get(
|
||||
'/api/v1/pipelines/_/metadata',
|
||||
headers={'Authorization': f'Bearer {token}'},
|
||||
)
|
||||
assert metadata_response.status_code == 200
|
||||
metadata = metadata_response.json()['data']['configs']
|
||||
ai_metadata = next(group for group in metadata if group.get('name') == 'ai')
|
||||
runner_stage = next(stage for stage in ai_metadata['stages'] if stage['name'] == 'runner')
|
||||
runner_select = next(item for item in runner_stage['config'] if item['name'] == 'id')
|
||||
if QA_RUNNER_ID in {option['name'] for option in runner_select['options']}:
|
||||
break
|
||||
time.sleep(2)
|
||||
else:
|
||||
pytest.fail(f'{QA_RUNNER_ID} was not discovered before run_agent')
|
||||
|
||||
response = agent_runner_client.post(
|
||||
'/api/v1/system/debug/plugin/action',
|
||||
headers={'Authorization': f'Bearer {token}'},
|
||||
json={
|
||||
'action': 'run_agent',
|
||||
'timeout': 60,
|
||||
'data': {
|
||||
'plugin_author': 'e2e',
|
||||
'plugin_name': 'agent-runner-qa',
|
||||
'runner_name': 'default',
|
||||
'context': {
|
||||
'run_id': 'e2e-run-001',
|
||||
'trigger': {'type': 'message.received'},
|
||||
'event': {
|
||||
'event_id': 'e2e-event-001',
|
||||
'event_type': 'message.received',
|
||||
'source': 'api',
|
||||
},
|
||||
'input': {'text': 'hello from real e2e'},
|
||||
'delivery': {'surface': 'e2e'},
|
||||
'resources': {},
|
||||
'runtime': {},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
payload = response.json()
|
||||
assert payload['code'] == 0
|
||||
result = payload['data']
|
||||
assert result['type'] == 'message.completed', result
|
||||
assert result['data']['message']['role'] == 'assistant'
|
||||
assert result['data']['message']['content'] == 'e2e echo: hello from real e2e'
|
||||
@@ -141,6 +141,27 @@ def create_minimal_config(tmpdir: Path, port: int = 15300) -> Path:
|
||||
'delete_batch_size': 1000,
|
||||
},
|
||||
},
|
||||
'box': {
|
||||
'enabled': False,
|
||||
'backend': 'local',
|
||||
'runtime': {
|
||||
'endpoint': '',
|
||||
},
|
||||
'local': {
|
||||
'profile': 'default',
|
||||
'image': '',
|
||||
'host_root': str(tmpdir / 'box'),
|
||||
'default_workspace': '',
|
||||
'skills_root': 'skills',
|
||||
'allowed_mount_roots': [str(tmpdir / 'box'), '/tmp'],
|
||||
'workspace_quota_mb': None,
|
||||
},
|
||||
'e2b': {
|
||||
'api_key': '',
|
||||
'api_url': '',
|
||||
'template': '',
|
||||
},
|
||||
},
|
||||
'space': {
|
||||
'url': 'https://space.langbot.app',
|
||||
'models_gateway_api_url': 'https://api.langbot.cloud/v1',
|
||||
@@ -168,8 +189,12 @@ def create_test_directories(tmpdir: Path) -> dict[str, Path]:
|
||||
"""Create necessary directories for LangBot testing."""
|
||||
directories = {
|
||||
'data': tmpdir / 'data',
|
||||
'labels': tmpdir / 'data' / 'labels',
|
||||
'metadata': tmpdir / 'data' / 'metadata',
|
||||
'logs': tmpdir / 'logs',
|
||||
'data_logs': tmpdir / 'data' / 'logs',
|
||||
'storage': tmpdir / 'storage',
|
||||
'box': tmpdir / 'box',
|
||||
'chroma': tmpdir / 'chroma',
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import subprocess
|
||||
import time
|
||||
import signal
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
import logging
|
||||
@@ -26,13 +27,21 @@ class LangBotProcess:
|
||||
port: int = 15300,
|
||||
timeout: int = 30,
|
||||
collect_coverage: bool = True,
|
||||
debug: bool = False,
|
||||
cli_args: list[str] | None = None,
|
||||
):
|
||||
self.project_root = project_root
|
||||
self.work_dir = work_dir # Directory containing data/config.yaml
|
||||
self.port = port
|
||||
self.timeout = timeout
|
||||
self.collect_coverage = collect_coverage
|
||||
self.debug = debug
|
||||
self.cli_args = cli_args or []
|
||||
self.process: Optional[subprocess.Popen] = None
|
||||
self._stdout_file = None
|
||||
self._stderr_file = None
|
||||
self._stdout_path = self.work_dir / 'langbot.stdout.log'
|
||||
self._stderr_path = self.work_dir / 'langbot.stderr.log'
|
||||
self._stdout_data: bytes = b''
|
||||
self._stderr_data: bytes = b''
|
||||
self._coverage_file: Optional[Path] = None
|
||||
@@ -63,13 +72,14 @@ class LangBotProcess:
|
||||
# Disable telemetry
|
||||
env['SPACE__DISABLE_TELEMETRY'] = 'true'
|
||||
env['SPACE__DISABLE_MODELS_SERVICE'] = 'true'
|
||||
if self.debug:
|
||||
env['DEBUG'] = 'true'
|
||||
|
||||
# Build command
|
||||
if self.collect_coverage:
|
||||
# Use coverage.py to collect coverage data
|
||||
# Set COVERAGE_PROCESS_START to enable coverage in subprocess
|
||||
self._coverage_file = self.work_dir / '.coverage.e2e'
|
||||
env['COVERAGE_PROCESS_START'] = str(self.project_root / '.coveragerc')
|
||||
env['COVERAGE_FILE'] = str(self._coverage_file)
|
||||
|
||||
# Create .coveragerc for subprocess
|
||||
@@ -88,27 +98,33 @@ precision = 2
|
||||
coveragerc_path = self.work_dir / '.coveragerc'
|
||||
with open(coveragerc_path, 'w') as f:
|
||||
f.write(coveragerc_content)
|
||||
env['COVERAGE_PROCESS_START'] = str(coveragerc_path)
|
||||
|
||||
cmd = [
|
||||
sys.executable,
|
||||
'-m',
|
||||
'coverage',
|
||||
'run',
|
||||
'--rcfile=' + str(coveragerc_path),
|
||||
'-m',
|
||||
'langbot',
|
||||
*self.cli_args,
|
||||
]
|
||||
else:
|
||||
cmd = ['uv', 'run', 'python', '-m', 'langbot']
|
||||
cmd = [sys.executable, '-m', 'langbot', *self.cli_args]
|
||||
|
||||
logger.info(f'Starting LangBot in: {self.work_dir}')
|
||||
logger.info(f'Command: {cmd}')
|
||||
|
||||
# Start process (run in work_dir so it finds data/config.yaml)
|
||||
self._stdout_file = open(self._stdout_path, 'wb')
|
||||
self._stderr_file = open(self._stderr_path, 'wb')
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
cwd=self.work_dir,
|
||||
env=env,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
stdout=self._stdout_file,
|
||||
stderr=self._stderr_file,
|
||||
preexec_fn=os.setsid if os.name != 'nt' else None,
|
||||
)
|
||||
|
||||
@@ -117,7 +133,14 @@ precision = 2
|
||||
while time.time() - start_time < self.timeout:
|
||||
# Check if process died
|
||||
if self.process.poll() is not None:
|
||||
self._stdout_data, self._stderr_data = self.process.communicate()
|
||||
if self._stdout_file:
|
||||
self._stdout_file.close()
|
||||
self._stdout_file = None
|
||||
if self._stderr_file:
|
||||
self._stderr_file.close()
|
||||
self._stderr_file = None
|
||||
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
|
||||
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
|
||||
logger.error(f'LangBot process died: {self._stderr_data.decode()}')
|
||||
return False
|
||||
|
||||
@@ -170,8 +193,14 @@ precision = 2
|
||||
self.process.wait()
|
||||
|
||||
# Collect output for debugging
|
||||
if self.process.stdout or self.process.stderr:
|
||||
self._stdout_data, self._stderr_data = self.process.communicate()
|
||||
if self._stdout_file:
|
||||
self._stdout_file.close()
|
||||
self._stdout_file = None
|
||||
if self._stderr_file:
|
||||
self._stderr_file.close()
|
||||
self._stderr_file = None
|
||||
self._stdout_data = self._stdout_path.read_bytes() if self._stdout_path.exists() else b''
|
||||
self._stderr_data = self._stderr_path.read_bytes() if self._stderr_path.exists() else b''
|
||||
|
||||
self.process = None
|
||||
|
||||
@@ -183,6 +212,10 @@ precision = 2
|
||||
"""Get stdout and stderr logs."""
|
||||
stdout = self._stdout_data.decode('utf-8', errors='replace')
|
||||
stderr = self._stderr_data.decode('utf-8', errors='replace')
|
||||
if not stdout and self._stdout_path.exists():
|
||||
stdout = self._stdout_path.read_text(encoding='utf-8', errors='replace')
|
||||
if not stderr and self._stderr_path.exists():
|
||||
stderr = self._stderr_path.read_text(encoding='utf-8', errors='replace')
|
||||
return stdout, stderr
|
||||
|
||||
def get_coverage_file(self) -> Optional[Path]:
|
||||
|
||||
@@ -33,12 +33,11 @@ class FakeConnection:
|
||||
|
||||
|
||||
class FakeApplication:
|
||||
def __init__(self, db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
|
||||
def __init__(self, db_engine, admin_plugins=None, runner_registry=None):
|
||||
self.logger = MagicMock()
|
||||
self.persistence_mgr = MagicMock()
|
||||
self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine)
|
||||
self.agent_runner_registry = runner_registry
|
||||
self.agent_run_orchestrator = orchestrator
|
||||
self.instance_config = SimpleNamespace(
|
||||
data={
|
||||
'agent_runner': {
|
||||
@@ -73,66 +72,16 @@ class FakeRunnerRegistry:
|
||||
self.runners = runners
|
||||
self.calls = []
|
||||
|
||||
async def get(self, runner_id, bound_plugins=None):
|
||||
self.calls.append({'runner_id': runner_id, 'bound_plugins': bound_plugins})
|
||||
if isinstance(self.runners, dict):
|
||||
if runner_id not in self.runners:
|
||||
raise KeyError(runner_id)
|
||||
return self.runners[runner_id]
|
||||
for runner in self.runners:
|
||||
if getattr(runner, 'id', None) == runner_id:
|
||||
return runner
|
||||
raise KeyError(runner_id)
|
||||
|
||||
async def list_runners(self, *, bound_plugins=None, use_cache=True):
|
||||
self.calls.append({'bound_plugins': bound_plugins, 'use_cache': use_cache})
|
||||
return self.runners
|
||||
|
||||
|
||||
class FakeProgrammaticOrchestrator:
|
||||
def __init__(self, db_engine):
|
||||
self.db_engine = db_engine
|
||||
self.calls = []
|
||||
|
||||
async def run(self, event, binding, bound_plugins=None, adapter_context=None, run_id=None):
|
||||
self.calls.append(
|
||||
{
|
||||
'event': event,
|
||||
'binding': binding,
|
||||
'bound_plugins': bound_plugins,
|
||||
'adapter_context': adapter_context,
|
||||
'run_id': run_id,
|
||||
}
|
||||
)
|
||||
store = RunLedgerStore(self.db_engine)
|
||||
await store.create_run(
|
||||
run_id=run_id,
|
||||
event_id=event.event_id,
|
||||
binding_id=binding.binding_id,
|
||||
runner_id=binding.runner_id,
|
||||
conversation_id=event.conversation_id,
|
||||
thread_id=event.thread_id,
|
||||
workspace_id=event.workspace_id,
|
||||
bot_id=event.bot_id,
|
||||
authorization={'available_apis': {}},
|
||||
metadata={'event_type': event.event_type, 'source': event.source},
|
||||
status='running',
|
||||
)
|
||||
await store.finalize_run(run_id=run_id, status='completed', status_reason='done')
|
||||
if False:
|
||||
yield None
|
||||
|
||||
|
||||
def _handler(db_engine, admin_plugins=None, runner_registry=None, orchestrator=None):
|
||||
def _handler(db_engine, admin_plugins=None, runner_registry=None):
|
||||
async def fake_disconnect():
|
||||
return True
|
||||
|
||||
fake_app = FakeApplication(
|
||||
db_engine,
|
||||
admin_plugins=admin_plugins,
|
||||
runner_registry=runner_registry,
|
||||
orchestrator=orchestrator,
|
||||
)
|
||||
fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins, runner_registry=runner_registry)
|
||||
return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app)
|
||||
|
||||
|
||||
@@ -1311,64 +1260,6 @@ async def test_runtime_register_heartbeat_and_list_actions(session_registry, db_
|
||||
assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1']
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_admin_run_create_starts_programmatic_run(db_engine):
|
||||
orchestrator = FakeProgrammaticOrchestrator(db_engine)
|
||||
runner_id = 'plugin:test/runner/default'
|
||||
registry = FakeRunnerRegistry({runner_id: SimpleNamespace(id=runner_id)})
|
||||
handler = _handler(
|
||||
db_engine,
|
||||
admin_plugins=[{'identity': 'admin/plugin', 'permissions': ['agent_run:admin']}],
|
||||
runner_registry=registry,
|
||||
orchestrator=orchestrator,
|
||||
)
|
||||
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
|
||||
|
||||
result = await run_create(
|
||||
{
|
||||
'caller_plugin_identity': 'admin/plugin',
|
||||
'run_id': 'run_programmatic',
|
||||
'runner_id': runner_id,
|
||||
'input': {'text': 'work on issue 1', 'contents': [], 'attachments': []},
|
||||
'conversation_id': 'conv_issue_board',
|
||||
'event_type': 'api.invoked',
|
||||
'wait_for_completion': True,
|
||||
}
|
||||
)
|
||||
|
||||
assert result.code == 0
|
||||
assert result.data['run_id'] == 'run_programmatic'
|
||||
assert result.data['runner_id'] == runner_id
|
||||
assert result.data['status'] == 'completed'
|
||||
assert len(orchestrator.calls) == 1
|
||||
call = orchestrator.calls[0]
|
||||
assert call['run_id'] == 'run_programmatic'
|
||||
assert call['event'].event_type == 'api.invoked'
|
||||
assert call['event'].source == 'api'
|
||||
assert call['event'].conversation_id == 'conv_issue_board'
|
||||
assert call['binding'].runner_id == runner_id
|
||||
assert call['binding'].delivery_policy.enable_reply is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_create_requires_admin_permission(db_engine):
|
||||
orchestrator = FakeProgrammaticOrchestrator(db_engine)
|
||||
handler = _handler(db_engine, orchestrator=orchestrator)
|
||||
run_create = handler.actions[PluginToRuntimeAction.RUN_CREATE.value]
|
||||
|
||||
result = await run_create(
|
||||
{
|
||||
'caller_plugin_identity': 'regular/plugin',
|
||||
'runner_id': 'plugin:test/runner/default',
|
||||
'input': {'text': 'work'},
|
||||
}
|
||||
)
|
||||
|
||||
assert result.code != 0
|
||||
assert 'not authorized' in result.message
|
||||
assert orchestrator.calls == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_claim_renew_and_release_actions(session_registry, db_engine):
|
||||
await _register_session(
|
||||
|
||||
@@ -0,0 +1,309 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime as dt
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from langbot.pkg.api.http.service.agent import (
|
||||
AGENT_DEFAULT_EVENT_PATTERNS,
|
||||
AGENT_KIND_AGENT,
|
||||
AGENT_KIND_PIPELINE,
|
||||
PIPELINE_EVENT_PATTERNS,
|
||||
AgentService,
|
||||
)
|
||||
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
def _result(items: list | None = None, first_item=None):
|
||||
result = Mock()
|
||||
result.all = Mock(return_value=items or [])
|
||||
result.first = Mock(return_value=first_item)
|
||||
return result
|
||||
|
||||
|
||||
def _agent_row(
|
||||
agent_uuid: str = 'agent-1',
|
||||
name: str = 'Test Agent',
|
||||
updated_at: dt.datetime | str | None = None,
|
||||
config: dict | None = None,
|
||||
supported_event_patterns: list[str] | None = None,
|
||||
):
|
||||
return SimpleNamespace(
|
||||
uuid=agent_uuid,
|
||||
name=name,
|
||||
description='Agent description',
|
||||
emoji='A',
|
||||
kind=AGENT_KIND_AGENT,
|
||||
component_ref='plugin:test/runner/default',
|
||||
config=config or {
|
||||
'runner': {'id': 'plugin:test/runner/default', 'expire-time': 0},
|
||||
'runner_config': {'plugin:test/runner/default': {'temperature': 0.2}},
|
||||
},
|
||||
enabled=True,
|
||||
supported_event_patterns=supported_event_patterns or ['*'],
|
||||
created_at=dt.datetime(2026, 1, 1, 9, 0, 0),
|
||||
updated_at=updated_at or dt.datetime(2026, 1, 1, 10, 0, 0),
|
||||
)
|
||||
|
||||
|
||||
def _serialize_agent(model_cls, entity, masked_columns=None):
|
||||
return {
|
||||
'uuid': entity.uuid,
|
||||
'name': entity.name,
|
||||
'description': entity.description,
|
||||
'emoji': entity.emoji,
|
||||
'kind': entity.kind,
|
||||
'component_ref': entity.component_ref,
|
||||
'config': entity.config,
|
||||
'enabled': entity.enabled,
|
||||
'supported_event_patterns': entity.supported_event_patterns,
|
||||
'created_at': entity.created_at,
|
||||
'updated_at': entity.updated_at,
|
||||
}
|
||||
|
||||
|
||||
def _compiled_params(statement):
|
||||
return statement.compile().params
|
||||
|
||||
|
||||
def _compiled_update_values(statement):
|
||||
return {
|
||||
key: value
|
||||
for key, value in statement.compile().params.items()
|
||||
if not key.startswith('uuid_')
|
||||
}
|
||||
|
||||
|
||||
def _make_app():
|
||||
app = SimpleNamespace()
|
||||
app.persistence_mgr = SimpleNamespace(
|
||||
execute_async=AsyncMock(),
|
||||
serialize_model=Mock(side_effect=_serialize_agent),
|
||||
)
|
||||
app.pipeline_service = SimpleNamespace(
|
||||
get_pipeline_metadata=AsyncMock(return_value=[]),
|
||||
get_pipelines=AsyncMock(return_value=[]),
|
||||
get_pipeline=AsyncMock(return_value=None),
|
||||
create_pipeline=AsyncMock(),
|
||||
update_pipeline=AsyncMock(),
|
||||
delete_pipeline=AsyncMock(),
|
||||
_get_default_values_from_schema=Mock(return_value={}),
|
||||
)
|
||||
app.agent_runner_registry = None
|
||||
app.logger = Mock()
|
||||
return app
|
||||
|
||||
|
||||
class TestAgentServiceMetadata:
|
||||
async def test_get_agent_metadata_exposes_runner_config_and_kind_capabilities(self):
|
||||
app = _make_app()
|
||||
ai_metadata = {'name': 'ai', 'stages': [{'name': 'runner'}]}
|
||||
app.pipeline_service.get_pipeline_metadata = AsyncMock(
|
||||
return_value=[{'name': 'trigger'}, ai_metadata, {'name': 'output'}]
|
||||
)
|
||||
|
||||
metadata = await AgentService(app).get_agent_metadata()
|
||||
|
||||
assert metadata['runner_config'] == ai_metadata
|
||||
assert metadata['kinds'] == [
|
||||
{
|
||||
'name': AGENT_KIND_AGENT,
|
||||
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
|
||||
'message_only': False,
|
||||
},
|
||||
{
|
||||
'name': AGENT_KIND_PIPELINE,
|
||||
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
|
||||
'message_only': True,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class TestAgentServiceListAndLookup:
|
||||
async def test_get_agents_merges_agents_and_pipelines_without_leaking_config(self):
|
||||
app = _make_app()
|
||||
app.persistence_mgr.execute_async = AsyncMock(
|
||||
return_value=_result(
|
||||
items=[
|
||||
_agent_row(
|
||||
agent_uuid='agent-1',
|
||||
updated_at=dt.datetime(2026, 1, 1, 10, 0, 0),
|
||||
supported_event_patterns=['platform.member.*'],
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
app.pipeline_service.get_pipelines = AsyncMock(
|
||||
return_value=[
|
||||
{
|
||||
'uuid': 'pipeline-1',
|
||||
'name': 'Pipeline Agent',
|
||||
'description': 'Legacy pipeline',
|
||||
'emoji': 'P',
|
||||
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
|
||||
'created_at': '2026-01-01T08:00:00',
|
||||
'updated_at': '2026-01-01T11:00:00',
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
agents = await AgentService(app).get_agents(sort_by='updated_at', sort_order='DESC')
|
||||
|
||||
assert [agent['uuid'] for agent in agents] == ['pipeline-1', 'agent-1']
|
||||
assert agents[0]['kind'] == AGENT_KIND_PIPELINE
|
||||
assert agents[0]['component_ref'] == 'pipeline'
|
||||
assert agents[0]['capability'] == {
|
||||
'supported_event_patterns': PIPELINE_EVENT_PATTERNS,
|
||||
'message_only': True,
|
||||
}
|
||||
assert agents[1]['kind'] == AGENT_KIND_AGENT
|
||||
assert agents[1]['capability'] == {
|
||||
'supported_event_patterns': ['platform.member.*'],
|
||||
'message_only': False,
|
||||
}
|
||||
assert all('config' not in agent for agent in agents)
|
||||
|
||||
async def test_get_agent_returns_agent_with_config_before_pipeline_fallback(self):
|
||||
app = _make_app()
|
||||
agent = _agent_row(agent_uuid='agent-1')
|
||||
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=agent))
|
||||
|
||||
result = await AgentService(app).get_agent('agent-1')
|
||||
|
||||
assert result['uuid'] == 'agent-1'
|
||||
assert result['kind'] == AGENT_KIND_AGENT
|
||||
assert result['config'] == agent.config
|
||||
app.pipeline_service.get_pipeline.assert_not_awaited()
|
||||
|
||||
async def test_get_agent_falls_back_to_pipeline_product_item_with_config(self):
|
||||
app = _make_app()
|
||||
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
|
||||
app.pipeline_service.get_pipeline = AsyncMock(
|
||||
return_value={
|
||||
'uuid': 'pipeline-1',
|
||||
'name': 'Pipeline Agent',
|
||||
'description': 'Legacy pipeline',
|
||||
'emoji': 'P',
|
||||
'config': {'ai': {'runner': {'id': 'pipeline-runner'}}},
|
||||
'created_at': '2026-01-01T08:00:00',
|
||||
'updated_at': '2026-01-01T11:00:00',
|
||||
}
|
||||
)
|
||||
|
||||
result = await AgentService(app).get_agent('pipeline-1')
|
||||
|
||||
assert result['kind'] == AGENT_KIND_PIPELINE
|
||||
assert result['enabled'] is True
|
||||
assert result['config'] == {'ai': {'runner': {'id': 'pipeline-runner'}}}
|
||||
assert result['capability']['message_only'] is True
|
||||
|
||||
|
||||
class TestAgentServiceCreateUpdateDelete:
|
||||
async def test_create_agent_uses_default_runner_config_from_registry(self):
|
||||
app = _make_app()
|
||||
runner = SimpleNamespace(
|
||||
id='plugin:langbot/local-agent/default',
|
||||
config_schema=[
|
||||
{'name': 'model', 'default': 'gpt-4.1'},
|
||||
{'name': 'temperature', 'default': 0.2},
|
||||
{'name': 'no-default'},
|
||||
],
|
||||
)
|
||||
app.agent_runner_registry = SimpleNamespace(list_runners=AsyncMock(return_value=[runner]))
|
||||
app.pipeline_service._get_default_values_from_schema = Mock(
|
||||
return_value={'model': 'gpt-4.1', 'temperature': 0.2}
|
||||
)
|
||||
app.persistence_mgr.execute_async = AsyncMock(return_value=Mock())
|
||||
|
||||
result = await AgentService(app).create_agent(
|
||||
{
|
||||
'name': 'Support Agent',
|
||||
'description': 'Handles support events',
|
||||
'emoji': 'S',
|
||||
}
|
||||
)
|
||||
|
||||
insert_values = _compiled_params(app.persistence_mgr.execute_async.await_args.args[0])
|
||||
assert result['kind'] == AGENT_KIND_AGENT
|
||||
assert result['uuid'] == insert_values['uuid']
|
||||
assert insert_values['name'] == 'Support Agent'
|
||||
assert insert_values['component_ref'] == runner.id
|
||||
assert insert_values['config'] == {
|
||||
'runner': {'id': runner.id, 'expire-time': 0},
|
||||
'runner_config': {runner.id: {'model': 'gpt-4.1', 'temperature': 0.2}},
|
||||
}
|
||||
assert insert_values['enabled'] is True
|
||||
assert insert_values['supported_event_patterns'] == AGENT_DEFAULT_EVENT_PATTERNS
|
||||
app.pipeline_service._get_default_values_from_schema.assert_called_once_with(runner.config_schema)
|
||||
|
||||
async def test_update_agent_protects_immutable_fields_and_recalculates_component_ref(self):
|
||||
app = _make_app()
|
||||
app.persistence_mgr.execute_async = AsyncMock(
|
||||
side_effect=[
|
||||
_result(first_item=_agent_row(agent_uuid='agent-1')),
|
||||
Mock(),
|
||||
]
|
||||
)
|
||||
new_config = {
|
||||
'runner': {'id': 'plugin:test/new-runner/default', 'expire-time': 0},
|
||||
'runner_config': {'plugin:test/new-runner/default': {'timeout': 30}},
|
||||
}
|
||||
|
||||
await AgentService(app).update_agent(
|
||||
'agent-1',
|
||||
{
|
||||
'uuid': 'caller-owned-uuid',
|
||||
'kind': AGENT_KIND_PIPELINE,
|
||||
'created_at': '2020-01-01T00:00:00',
|
||||
'updated_at': '2020-01-01T00:00:00',
|
||||
'capability': {'message_only': True},
|
||||
'name': 'Updated Agent',
|
||||
'config': new_config,
|
||||
'supported_event_patterns': [],
|
||||
},
|
||||
)
|
||||
|
||||
update_values = _compiled_update_values(app.persistence_mgr.execute_async.await_args_list[1].args[0])
|
||||
assert update_values == {
|
||||
'name': 'Updated Agent',
|
||||
'config': new_config,
|
||||
'supported_event_patterns': AGENT_DEFAULT_EVENT_PATTERNS,
|
||||
'component_ref': 'plugin:test/new-runner/default',
|
||||
}
|
||||
|
||||
async def test_pipeline_kind_create_update_delete_delegate_to_pipeline_service(self):
|
||||
app = _make_app()
|
||||
app.persistence_mgr.execute_async = AsyncMock(return_value=_result(first_item=None))
|
||||
app.pipeline_service.create_pipeline = AsyncMock(return_value='pipeline-created')
|
||||
app.pipeline_service.get_pipeline = AsyncMock(return_value={'uuid': 'pipeline-1'})
|
||||
service = AgentService(app)
|
||||
|
||||
created = await service.create_agent(
|
||||
{
|
||||
'kind': AGENT_KIND_PIPELINE,
|
||||
'name': 'Pipeline Agent',
|
||||
'description': 'Legacy pipeline',
|
||||
'emoji': 'P',
|
||||
}
|
||||
)
|
||||
await service.update_agent('pipeline-1', {'name': 'Updated Pipeline'})
|
||||
await service.delete_agent('pipeline-1')
|
||||
|
||||
assert created == {'uuid': 'pipeline-created', 'kind': AGENT_KIND_PIPELINE}
|
||||
app.pipeline_service.create_pipeline.assert_awaited_once_with(
|
||||
{
|
||||
'name': 'Pipeline Agent',
|
||||
'description': 'Legacy pipeline',
|
||||
'emoji': 'P',
|
||||
'config': {},
|
||||
}
|
||||
)
|
||||
app.pipeline_service.update_pipeline.assert_awaited_once_with(
|
||||
'pipeline-1',
|
||||
{'name': 'Updated Pipeline'},
|
||||
)
|
||||
app.pipeline_service.delete_pipeline.assert_awaited_once_with('pipeline-1')
|
||||
@@ -52,6 +52,15 @@ def _create_mock_result(items: list = None, first_item=None):
|
||||
return result
|
||||
|
||||
|
||||
def _compiled_update_values(statement):
|
||||
"""Return update values without SQLAlchemy WHERE bind params."""
|
||||
return {
|
||||
key: value
|
||||
for key, value in statement.compile().params.items()
|
||||
if not key.startswith('uuid_')
|
||||
}
|
||||
|
||||
|
||||
def _create_mock_discover(adapter_webhook_flags: dict[str, bool] = None):
|
||||
"""Create mock ComponentDiscoveryEngine exposing MessagePlatformAdapter manifests.
|
||||
|
||||
@@ -531,6 +540,201 @@ class TestBotServiceUpdateBot:
|
||||
assert update_params['use_pipeline_name'] == 'Updated Pipeline'
|
||||
|
||||
|
||||
class TestBotServiceEventBindings:
|
||||
"""Tests for EBA event binding validation and persistence."""
|
||||
|
||||
async def test_normalize_event_bindings_validates_targets_and_preserves_order(self):
|
||||
"""Valid bindings are normalized with stable order and target data."""
|
||||
ap = SimpleNamespace()
|
||||
ap.persistence_mgr = SimpleNamespace()
|
||||
ap.persistence_mgr.execute_async = AsyncMock(
|
||||
side_effect=[
|
||||
_create_mock_result(first_item=SimpleNamespace(uuid='pipeline-1')),
|
||||
_create_mock_result(
|
||||
first_item=SimpleNamespace(
|
||||
uuid='agent-1',
|
||||
supported_event_patterns=['platform.member.*'],
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
service = BotService(ap)
|
||||
|
||||
normalized = await service._normalize_event_bindings(
|
||||
[
|
||||
{
|
||||
'event_pattern': ' message.received ',
|
||||
'target_type': 'pipeline',
|
||||
'target_uuid': ' pipeline-1 ',
|
||||
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
|
||||
'priority': '5',
|
||||
'enabled': False,
|
||||
'description': 'Route message events',
|
||||
},
|
||||
{
|
||||
'id': 'agent-binding',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-1',
|
||||
'priority': 7,
|
||||
},
|
||||
{
|
||||
'event_pattern': 'platform.member.left',
|
||||
'target_type': 'discard',
|
||||
'target_uuid': 'ignored-target',
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
uuid.UUID(normalized[0]['id'])
|
||||
assert normalized == [
|
||||
{
|
||||
'id': normalized[0]['id'],
|
||||
'event_pattern': 'message.received',
|
||||
'target_type': 'pipeline',
|
||||
'target_uuid': 'pipeline-1',
|
||||
'filters': [{'field': 'sender.id', 'operator': 'eq', 'value': '1000'}],
|
||||
'priority': 5,
|
||||
'enabled': False,
|
||||
'description': 'Route message events',
|
||||
'order': 0,
|
||||
},
|
||||
{
|
||||
'id': 'agent-binding',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-1',
|
||||
'filters': [],
|
||||
'priority': 7,
|
||||
'enabled': True,
|
||||
'description': '',
|
||||
'order': 1,
|
||||
},
|
||||
{
|
||||
'id': normalized[2]['id'],
|
||||
'event_pattern': 'platform.member.left',
|
||||
'target_type': 'discard',
|
||||
'target_uuid': '',
|
||||
'filters': [],
|
||||
'priority': 0,
|
||||
'enabled': True,
|
||||
'description': '',
|
||||
'order': 2,
|
||||
},
|
||||
]
|
||||
|
||||
async def test_normalize_event_bindings_rejects_pipeline_for_non_message_event(self):
|
||||
"""Pipeline targets are limited to message events."""
|
||||
ap = SimpleNamespace()
|
||||
ap.persistence_mgr = SimpleNamespace(execute_async=AsyncMock())
|
||||
service = BotService(ap)
|
||||
|
||||
with pytest.raises(ValueError, match='Pipeline can only be bound to message events'):
|
||||
await service._normalize_event_bindings(
|
||||
[
|
||||
{
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'pipeline',
|
||||
'target_uuid': 'pipeline-1',
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
ap.persistence_mgr.execute_async.assert_not_awaited()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
('agent', 'error'),
|
||||
[
|
||||
(None, 'Agent not found'),
|
||||
(
|
||||
SimpleNamespace(uuid='agent-1', supported_event_patterns=['message.*']),
|
||||
'Agent does not support this event pattern',
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_normalize_event_bindings_rejects_invalid_agent_target(self, agent, error):
|
||||
"""Agent targets must exist and support the requested event pattern."""
|
||||
ap = SimpleNamespace()
|
||||
ap.persistence_mgr = SimpleNamespace(
|
||||
execute_async=AsyncMock(return_value=_create_mock_result(first_item=agent))
|
||||
)
|
||||
service = BotService(ap)
|
||||
|
||||
with pytest.raises(ValueError, match=error):
|
||||
await service._normalize_event_bindings(
|
||||
[
|
||||
{
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-1',
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
async def test_update_bot_persists_normalized_event_bindings_and_reloads_runtime_bot(self):
|
||||
"""update_bot stores normalized bindings before reloading the runtime bot."""
|
||||
ap = SimpleNamespace()
|
||||
ap.persistence_mgr = SimpleNamespace()
|
||||
ap.persistence_mgr.execute_async = AsyncMock(
|
||||
side_effect=[
|
||||
_create_mock_result(
|
||||
first_item=SimpleNamespace(
|
||||
uuid='agent-1',
|
||||
supported_event_patterns=['platform.member.*'],
|
||||
)
|
||||
),
|
||||
Mock(),
|
||||
]
|
||||
)
|
||||
runtime_bot = SimpleNamespace(enable=True, run=AsyncMock())
|
||||
loaded_bot = {'uuid': 'bot-1', 'name': 'Bot with bindings'}
|
||||
ap.platform_mgr = SimpleNamespace(
|
||||
remove_bot=AsyncMock(),
|
||||
load_bot=AsyncMock(return_value=runtime_bot),
|
||||
)
|
||||
bot_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='bot-1'))
|
||||
other_session = SimpleNamespace(using_conversation=SimpleNamespace(bot_uuid='other-bot'))
|
||||
ap.sess_mgr = SimpleNamespace(session_list=[bot_session, other_session])
|
||||
service = BotService(ap)
|
||||
service.get_bot = AsyncMock(return_value=loaded_bot)
|
||||
|
||||
await service.update_bot(
|
||||
'bot-1',
|
||||
{
|
||||
'event_bindings': [
|
||||
{
|
||||
'id': 'binding-1',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-1',
|
||||
'priority': '9',
|
||||
}
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
update_values = _compiled_update_values(ap.persistence_mgr.execute_async.await_args_list[1].args[0])
|
||||
assert update_values['event_bindings'] == [
|
||||
{
|
||||
'id': 'binding-1',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-1',
|
||||
'filters': [],
|
||||
'priority': 9,
|
||||
'enabled': True,
|
||||
'description': '',
|
||||
'order': 0,
|
||||
}
|
||||
]
|
||||
ap.platform_mgr.remove_bot.assert_awaited_once_with('bot-1')
|
||||
service.get_bot.assert_awaited_once_with('bot-1')
|
||||
ap.platform_mgr.load_bot.assert_awaited_once_with(loaded_bot)
|
||||
runtime_bot.run.assert_awaited_once()
|
||||
assert bot_session.using_conversation is None
|
||||
assert other_session.using_conversation.bot_uuid == 'other-bot'
|
||||
|
||||
|
||||
class TestBotServiceDeleteBot:
|
||||
"""Tests for delete_bot method."""
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
RuntimeBot.resolve_pipeline_uuid and _match_operator unit tests
|
||||
"""
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import Mock
|
||||
|
||||
|
||||
@@ -278,3 +279,108 @@ class TestResolvePipelineUuid:
|
||||
uuid, routed = bot.resolve_pipeline_uuid('person', '123', 'normal message')
|
||||
assert uuid == 'default-uuid'
|
||||
assert routed is False
|
||||
|
||||
|
||||
class TestEBAEventBindings:
|
||||
"""Test RuntimeBot EBA event binding helpers."""
|
||||
|
||||
@staticmethod
|
||||
def _make_bot(bindings):
|
||||
from langbot.pkg.platform.botmgr import RuntimeBot
|
||||
|
||||
bot = object.__new__(RuntimeBot)
|
||||
bot.bot_entity = SimpleNamespace(event_bindings=bindings)
|
||||
return bot
|
||||
|
||||
def test_resolve_eba_event_binding_uses_enabled_pattern_filters_priority_and_order(self):
|
||||
"""The selected binding is the first matching highest-priority binding."""
|
||||
bot = self._make_bot(
|
||||
[
|
||||
{
|
||||
'id': 'disabled',
|
||||
'enabled': False,
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-disabled',
|
||||
'priority': 100,
|
||||
'order': 0,
|
||||
},
|
||||
{
|
||||
'id': 'wrong-room',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-wrong-room',
|
||||
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-2'}],
|
||||
'priority': 50,
|
||||
'order': 1,
|
||||
},
|
||||
{
|
||||
'id': 'first-high',
|
||||
'event_pattern': 'platform.member.joined',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-first',
|
||||
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
|
||||
'priority': 10,
|
||||
'order': 2,
|
||||
},
|
||||
{
|
||||
'id': 'second-high',
|
||||
'event_pattern': 'platform.member.*',
|
||||
'target_type': 'agent',
|
||||
'target_uuid': 'agent-second',
|
||||
'filters': [{'field': 'room.id', 'operator': 'eq', 'value': 'room-1'}],
|
||||
'priority': 10,
|
||||
'order': 3,
|
||||
},
|
||||
{
|
||||
'id': 'fallback',
|
||||
'event_pattern': '*',
|
||||
'target_type': 'discard',
|
||||
'priority': 1,
|
||||
'order': 4,
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
selected = bot._resolve_eba_event_binding(
|
||||
{'room': {'id': 'room-1'}},
|
||||
'platform.member.joined',
|
||||
)
|
||||
|
||||
assert selected['id'] == 'first-high'
|
||||
assert selected['target_uuid'] == 'agent-first'
|
||||
|
||||
def test_agent_product_to_binding_projects_runner_config_and_policies(self):
|
||||
"""Agent products become bot-scoped runner bindings for EBA dispatch."""
|
||||
from langbot.pkg.platform.botmgr import RuntimeBot
|
||||
|
||||
binding = RuntimeBot._agent_product_to_binding(
|
||||
{
|
||||
'uuid': 'agent-1',
|
||||
'component_ref': 'plugin:test/fallback/default',
|
||||
'config': {
|
||||
'runner': {'id': 'plugin:test/runner/default'},
|
||||
'runner_config': {
|
||||
'plugin:test/runner/default': {
|
||||
'temperature': 0.2,
|
||||
'max_tokens': 1000,
|
||||
}
|
||||
},
|
||||
},
|
||||
},
|
||||
{'id': 'binding-1'},
|
||||
'platform.member.joined',
|
||||
'bot-1',
|
||||
)
|
||||
|
||||
assert binding is not None
|
||||
assert binding.binding_id == 'bot:bot-1:binding-1'
|
||||
assert binding.scope.scope_type == 'bot'
|
||||
assert binding.scope.scope_id == 'bot-1'
|
||||
assert binding.event_types == ['platform.member.joined']
|
||||
assert binding.runner_id == 'plugin:test/runner/default'
|
||||
assert binding.runner_config == {'temperature': 0.2, 'max_tokens': 1000}
|
||||
assert binding.delivery_policy.enable_streaming is False
|
||||
assert binding.delivery_policy.enable_reply is True
|
||||
assert binding.state_policy.state_scopes == ['conversation', 'actor', 'subject', 'runner']
|
||||
assert binding.agent_id == 'agent-1'
|
||||
|
||||
Reference in New Issue
Block a user