feat: support dynamic agent runner defaults

This commit is contained in:
huanghuoguoguo
2026-05-16 09:35:40 +08:00
parent 7bc211d582
commit 711f12d71f
16 changed files with 981 additions and 117 deletions

View File

@@ -1,4 +1,5 @@
"""Tests for pipeline config migration to new runner format."""
from __future__ import annotations
import json
@@ -149,7 +150,7 @@ class TestDefaultPipelineConfig:
"""Tests for default-pipeline-config.json format."""
def test_default_config_is_new_format(self):
"""Default pipeline config should use new format."""
"""Default pipeline template should use the new runner config shape."""
from langbot.pkg.utils import paths as path_utils
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json')
@@ -160,27 +161,25 @@ class TestDefaultPipelineConfig:
assert 'ai' in config
assert 'runner' in config['ai']
assert 'id' in config['ai']['runner']
assert config['ai']['runner']['id'] == 'plugin:langbot/local-agent/default'
assert config['ai']['runner']['id'] == ''
# Should have runner_config with local-agent default
# Plugin runner selection and config defaults are rendered at creation
# time from installed AgentRunner metadata.
assert 'runner_config' in config['ai']
assert 'plugin:langbot/local-agent/default' in config['ai']['runner_config']
assert config['ai']['runner_config'] == {}
# Should NOT have old local-agent key
assert 'local-agent' not in config['ai']
def test_default_config_has_model_config(self):
"""Default config should have model config in runner_config."""
def test_default_config_does_not_hardcode_plugin_schema(self):
"""Default template should not duplicate plugin-provided config schema."""
from langbot.pkg.utils import paths as path_utils
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json')
with open(template_path, 'r', encoding='utf-8') as f:
config = json.load(f)
runner_config = config['ai']['runner_config']['plugin:langbot/local-agent/default']
assert 'model' in runner_config
assert 'max-round' in runner_config
assert 'prompt' in runner_config
assert config['ai']['runner_config'] == {}
class TestResolveRunnerIdBackwardCompat:
@@ -242,9 +241,7 @@ class TestResolveRunnerConfigBackwardCompat:
},
},
}
runner_config = ConfigMigration.resolve_runner_config(
config, 'plugin:langbot/local-agent/default'
)
runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
assert runner_config['max-round'] == 20
def test_resolve_old_format_config(self):
@@ -254,9 +251,7 @@ class TestResolveRunnerConfigBackwardCompat:
'local-agent': {'max-round': 15},
},
}
runner_config = ConfigMigration.resolve_runner_config(
config, 'plugin:langbot/local-agent/default'
)
runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
assert runner_config['max-round'] == 15
def test_resolve_new_format_priority(self):
@@ -269,7 +264,5 @@ class TestResolveRunnerConfigBackwardCompat:
'local-agent': {'max-round': 10}, # Old, should be ignored
},
}
runner_config = ConfigMigration.resolve_runner_config(
config, 'plugin:langbot/local-agent/default'
)
assert runner_config['max-round'] == 25
runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
assert runner_config['max-round'] == 25

View File

@@ -13,9 +13,11 @@ Authorization paths:
from __future__ import annotations
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import types
from unittest.mock import AsyncMock, MagicMock
from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry
from langbot.pkg.plugin.handler import _build_tool_detail
# Import shared test fixtures from conftest.py
from .conftest import make_resources
@@ -114,10 +116,6 @@ class MockDisconnectCallback:
return True
# Import ActionResponse for checking responses
from langbot_plugin.runtime.io import handler
class TestInvokeLLMAuthorization:
"""Tests for INVOKE_LLM authorization."""
@@ -238,6 +236,33 @@ class TestInvokeLLMStreamAuthorization:
assert run_id is None
def test_build_tool_detail_normalizes_plugin_component_manifest():
"""GET_TOOL_DETAIL returns a uniform schema for ordinary plugin Tool manifests."""
manifest_tool = types.SimpleNamespace(
metadata=types.SimpleNamespace(
name='search',
label={'en_US': 'Search'},
description={'en_US': 'Search public data'},
),
spec={
'llm_prompt': 'Search test data',
'parameters': {
'type': 'object',
'properties': {'q': {'type': 'string'}},
},
},
)
detail = _build_tool_detail(manifest_tool, requested_tool_name='author/plugin/search')
assert detail['name'] == 'author/plugin/search'
assert detail['description'] == 'Search test data'
assert detail['human_desc'] == 'Search test data'
assert detail['parameters']['properties']['q']['type'] == 'string'
assert detail['label'] == {'en_US': 'Search'}
assert detail['spec'] == manifest_tool.spec
class TestCallToolAuthorization:
"""Tests for CALL_TOOL authorization."""
@@ -559,8 +584,6 @@ class TestHandlerActionAuthorization:
@pytest.mark.asyncio
async def test_invoke_llm_handler_authorized_path(self):
"""INVOKE_LLM handler: authorized when model in resources."""
from langbot_plugin.runtime.io import handler as io_handler
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
@@ -822,8 +845,6 @@ class TestSDKAgentRunAPIProxyFieldConsistency:
"""RETRIEVE_KNOWLEDGE_BASE: SDK fields match Host handler."""
# SDK agent_run_api.py lines 178-183
sdk_fields = ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']
# Host handler.py lines 863-867
host_fields = ['query_id', 'kb_id', 'query_text', 'top_k', 'filters', 'run_id']
# Note: query_id is from query context, not SDK proxy
for field in ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']:
@@ -934,6 +955,7 @@ class TestSessionExpiryAndCleanup:
# Check session status
started_at = session['status']['started_at']
last_activity = session['status']['last_activity_at']
assert last_activity >= started_at
# Session should be valid initially
current_time = int(time.time())
@@ -964,6 +986,7 @@ class TestSessionExpiryAndCleanup:
# Note: This won't actually cleanup because session is just created
# We need to manually test cleanup logic
cleaned = await registry.cleanup_stale_sessions(max_age_seconds=0)
assert isinstance(cleaned, int)
# Session should still exist (it was just created)
# With max_age=0, sessions with last_activity > 0 seconds ago would be cleaned
@@ -1974,4 +1997,4 @@ class TestBackwardCompatStorageNoRunId:
raise AssertionError('Should not execute validation')
# File access unrestricted for regular plugins
assert run_id is None
assert run_id is None

View File

@@ -0,0 +1,368 @@
"""Integration-style tests for AgentRunOrchestrator with a fake plugin runner."""
from __future__ import annotations
import datetime
import types
from unittest.mock import AsyncMock
import pytest
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.agent.runner.errors import RunnerExecutionError
from langbot.pkg.agent.runner.context_builder import AgentRunContextBuilder
from langbot.pkg.agent.runner.orchestrator import AgentRunOrchestrator
from langbot.pkg.agent.runner.session_registry import get_session_registry
from langbot.pkg.agent.runner.state_store import get_state_store, reset_state_store
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from langbot_plugin.api.entities.builtin.provider import session as provider_session
from langbot_plugin.api.entities.builtin.resource import tool as resource_tool
RUNNER_ID = "plugin:langbot/local-agent/default"
class FakeLogger:
def debug(self, msg):
pass
def info(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
class FakeVersionManager:
def get_current_version(self):
return "test-version"
class FakeModel:
def __init__(self, model_type: str = "chat"):
self.model_entity = types.SimpleNamespace(model_type=model_type)
self.provider_entity = types.SimpleNamespace(name="fake-provider")
class FakeKnowledgeBase:
def __init__(self, kb_id: str):
self.kb_id = kb_id
self.knowledge_base_entity = types.SimpleNamespace(kb_type="fake")
def get_name(self):
return f"KB {self.kb_id}"
class FakePluginConnector:
is_enable_plugin = True
def __init__(self, results=None, error: Exception | None = None):
self.results = results or []
self.error = error
self.calls: list[dict] = []
self.contexts: list[dict] = []
self.sessions_during_run: list[dict | None] = []
async def run_agent(self, plugin_author, plugin_name, runner_name, context):
self.calls.append(
{
"plugin_author": plugin_author,
"plugin_name": plugin_name,
"runner_name": runner_name,
}
)
self.contexts.append(context)
self.sessions_during_run.append(await get_session_registry().get(context["run_id"]))
if self.error:
raise self.error
for result in self.results:
yield result
class FakeRegistry:
def __init__(self, descriptor: AgentRunnerDescriptor):
self.descriptor = descriptor
self.calls: list[dict] = []
async def get(self, runner_id, bound_plugins=None):
self.calls.append({"runner_id": runner_id, "bound_plugins": bound_plugins})
assert runner_id == self.descriptor.id
return self.descriptor
class FakeApplication:
def __init__(self, plugin_connector: FakePluginConnector):
self.logger = FakeLogger()
self.ver_mgr = FakeVersionManager()
self.plugin_connector = plugin_connector
self.model_mgr = types.SimpleNamespace(
get_model_by_uuid=AsyncMock(return_value=FakeModel())
)
self.rag_mgr = types.SimpleNamespace(
get_knowledge_base_by_uuid=AsyncMock(return_value=FakeKnowledgeBase("kb_001"))
)
class FakeConversation:
uuid = "conv_existing"
create_time = datetime.datetime(2026, 5, 15, 12, 0, 0)
def make_descriptor() -> AgentRunnerDescriptor:
return AgentRunnerDescriptor(
id=RUNNER_ID,
source="plugin",
label={"en_US": "Local Agent"},
plugin_author="langbot",
plugin_name="local-agent",
runner_name="default",
protocol_version="1",
capabilities={"streaming": True, "tool_calling": True},
permissions={
"models": ["invoke", "stream"],
"tools": ["list", "detail", "call"],
"knowledge_bases": ["list", "retrieve"],
"storage": ["plugin"],
"files": [],
},
)
def make_query():
async def fake_func(**kwargs):
return kwargs
message_chain = platform_message.MessageChain(
[
platform_message.Source(
id="msg_001",
time=datetime.datetime(2026, 5, 15, 12, 0, 0),
),
platform_message.Plain(text="hello"),
platform_message.File(name="spec.txt", url="https://example.com/spec.txt"),
]
)
sender = platform_entities.Friend(id="user_001", nickname="Alice", remark=None)
message_event = platform_events.FriendMessage(sender=sender, message_chain=message_chain, time=1_784_098_800.0)
session = types.SimpleNamespace(
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id="user_001",
sender_id="user_001",
using_conversation=FakeConversation(),
)
return types.SimpleNamespace(
query_id=1001,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id="user_001",
sender_id="user_001",
message_event=message_event,
message_chain=message_chain,
bot_uuid="bot_001",
pipeline_uuid="pipeline_001",
pipeline_config={
"ai": {
"runner": {"id": RUNNER_ID},
"runner_config": {
RUNNER_ID: {
"model": {"primary": "model_primary", "fallbacks": ["model_fallback"]},
"knowledge-bases": ["kb_001"],
"timeout": 30,
},
},
},
},
session=session,
messages=[],
user_message=provider_message.Message(
role="user",
content=[
provider_message.ContentElement.from_text("hello"),
provider_message.ContentElement.from_file_url("https://example.com/spec.txt", "spec.txt"),
],
),
variables={
"_pipeline_bound_plugins": ["langbot/local-agent"],
"_fallback_model_uuids": ["model_fallback"],
"public_param": "visible",
},
use_llm_model_uuid="model_primary",
use_funcs=[
resource_tool.LLMTool(
name="langbot/test-tool/search",
human_desc="Search",
description="Search test data",
parameters={"type": "object", "properties": {"q": {"type": "string"}}},
func=fake_func,
)
],
)
def test_context_builder_includes_consumable_base64_attachments():
builder = AgentRunContextBuilder(ap=types.SimpleNamespace())
query = make_query()
query.user_message = provider_message.Message(
role="user",
content=[
provider_message.ContentElement.from_text("see attached"),
provider_message.ContentElement.from_image_base64("data:image/png;base64,aGVsbG8="),
provider_message.ContentElement.from_file_base64("data:text/plain;base64,aGVsbG8=", "hello.txt"),
],
)
query.message_chain = platform_message.MessageChain(
[platform_message.Image(base64="data:image/jpeg;base64,aGVsbG8=")]
)
input_data = builder._build_input(query)
attachments = input_data["attachments"]
image_attachment = next(item for item in attachments if item["type"] == "image" and item["source"] == "base64")
file_attachment = next(item for item in attachments if item["type"] == "file" and item["source"] == "base64")
chain_attachment = next(item for item in attachments if item["source"] == "message_chain")
assert image_attachment["content"] == "data:image/png;base64,aGVsbG8="
assert image_attachment["content_type"] == "image/png"
assert file_attachment["content"] == "data:text/plain;base64,aGVsbG8="
assert file_attachment["content_type"] == "text/plain"
assert file_attachment["name"] == "hello.txt"
assert chain_attachment["content"] == "data:image/jpeg;base64,aGVsbG8="
assert chain_attachment["content_type"] == "image/jpeg"
@pytest.fixture(autouse=True)
async def clean_agent_state():
reset_state_store()
registry = get_session_registry()
for session in await registry.list_active_runs():
await registry.unregister(session["run_id"])
yield
for session in await registry.list_active_runs():
await registry.unregister(session["run_id"])
reset_state_store()
@pytest.mark.asyncio
async def test_orchestrator_runs_fake_plugin_with_authorized_context():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "message.completed",
"data": {"message": {"role": "assistant", "content": "fake response"}},
}
]
)
ap = FakeApplication(plugin_connector)
orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor))
query = make_query()
messages = [message async for message in orchestrator.run_from_query(query)]
assert len(messages) == 1
assert messages[0].content == "fake response"
assert plugin_connector.calls == [
{
"plugin_author": "langbot",
"plugin_name": "local-agent",
"runner_name": "default",
}
]
context = plugin_connector.contexts[0]
assert context["config"]["timeout"] == 30
assert context["runtime"]["deadline_at"] is not None
assert context["params"] == {"public_param": "visible"}
assert context["event"]["event_type"] == "FriendMessage"
assert context["actor"]["actor_id"] == "user_001"
assert context["actor"]["actor_name"] == "Alice"
assert context["subject"]["subject_id"] == "msg_001"
assert context["input"]["attachments"]
resources = context["resources"]
assert {m["model_id"] for m in resources["models"]} == {"model_primary", "model_fallback"}
assert resources["tools"][0]["tool_name"] == "langbot/test-tool/search"
assert resources["knowledge_bases"][0]["kb_id"] == "kb_001"
assert resources["storage"]["plugin_storage"] is True
session_during_run = plugin_connector.sessions_during_run[0]
assert session_during_run is not None
assert session_during_run["plugin_identity"] == "langbot/local-agent"
assert session_during_run["_authorized_ids"]["tool"] == {"langbot/test-tool/search"}
assert await get_session_registry().get(context["run_id"]) is None
@pytest.mark.asyncio
async def test_orchestrator_streams_fake_plugin_deltas():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{"type": "message.delta", "data": {"chunk": {"role": "assistant", "content": "hel"}}},
{"type": "message.delta", "data": {"chunk": {"role": "assistant", "content": "hello"}}},
{"type": "run.completed", "data": {"finish_reason": "stop"}},
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
chunks = [message async for message in orchestrator.run_from_query(make_query())]
assert [chunk.content for chunk in chunks] == ["hel", "hello"]
@pytest.mark.asyncio
async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "state.updated",
"data": {
"scope": "conversation",
"key": "external.conversation_id",
"value": "external_conv_123",
},
},
{
"type": "message.completed",
"data": {"message": {"role": "assistant", "content": "state saved"}},
},
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
query = make_query()
messages = [message async for message in orchestrator.run_from_query(query)]
assert [message.content for message in messages] == ["state saved"]
assert query.session.using_conversation.uuid == "external_conv_123"
snapshot = get_state_store().build_snapshot(query, descriptor)
assert snapshot["conversation"]["external.conversation_id"] == "external_conv_123"
@pytest.mark.asyncio
async def test_orchestrator_unregisters_session_after_runner_failure():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "run.failed",
"data": {"error": "boom", "code": "fake.error", "retryable": False},
}
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
with pytest.raises(RunnerExecutionError):
[message async for message in orchestrator.run_from_query(make_query())]
context = plugin_connector.contexts[0]
assert plugin_connector.sessions_during_run[0] is not None
assert await get_session_registry().get(context["run_id"]) is None

View File

@@ -1,4 +1,5 @@
"""Tests for agent runner registry."""
from __future__ import annotations
import pytest
@@ -10,14 +11,18 @@ from langbot.pkg.agent.runner.errors import RunnerNotFoundError, RunnerNotAuthor
class FakeApplication:
"""Fake Application for testing."""
def __init__(self):
class FakeLogger:
def info(self, msg):
pass
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
@@ -234,13 +239,11 @@ class TestRegistryMetadataForPipeline:
assert 'plugin:langbot/local-agent/default' in option_ids
assert 'plugin:alice/my-agent/custom' in option_ids
# Should have stages for runners with config
# Note: stages may be empty if config_schema is empty list
# In real scenarios, runners with config_schema will generate stages
# Only runners with non-empty config_schema generate stages
# mock data has config: [{'name': 'param1', 'type': 'string'}] for alice/my-agent
# but config is now taken from runner_data.get('config', [])
assert len(stages) >= 0 # Can be 0 if all runners have empty config
# Should fall back to manifest.spec.config when runtime does not return
# extracted config at top level.
assert len(stages) == 1
assert stages[0]['name'] == 'plugin:alice/my-agent/custom'
assert stages[0]['config'] == [{'name': 'param1', 'type': 'string'}]
class TestDescriptorValidation:
@@ -275,4 +278,4 @@ class TestDescriptorValidation:
assert descriptor.supports_streaming() is True
assert descriptor.supports_tool_calling() is False
assert descriptor.supports_knowledge_retrieval() is False
assert descriptor.supports_knowledge_retrieval() is False

View File

@@ -0,0 +1,80 @@
"""Tests for dynamic default pipeline config rendering."""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.api.http.service.pipeline import PipelineService
class FakeLogger:
def warning(self, msg):
pass
class FakeRegistry:
def __init__(self, runners):
self.runners = runners
async def list_runners(self, bound_plugins=None):
return self.runners
def make_runner(runner_id: str, config_schema: list[dict]):
parts = runner_id.removeprefix('plugin:').split('/')
return AgentRunnerDescriptor(
id=runner_id,
source='plugin',
label={'en_US': runner_id},
plugin_author=parts[0],
plugin_name=parts[1],
runner_name=parts[2],
config_schema=config_schema,
)
@pytest.mark.asyncio
async def test_default_pipeline_config_uses_installed_local_agent_schema():
local_agent = make_runner(
'plugin:langbot/local-agent/default',
[
{'name': 'model', 'type': 'model-fallback-selector', 'default': {'primary': '', 'fallbacks': []}},
{'name': 'max-round', 'type': 'integer', 'default': 10},
{'name': 'prompt', 'type': 'prompt-editor', 'default': [{'role': 'system', 'content': 'Hello'}]},
],
)
custom_agent = make_runner(
'plugin:alice/custom-agent/default',
[{'name': 'api-key', 'type': 'string', 'default': ''}],
)
ap = SimpleNamespace(
logger=FakeLogger(),
agent_runner_registry=FakeRegistry([custom_agent, local_agent]),
)
config = await PipelineService(ap).get_default_pipeline_config()
assert config['ai']['runner']['id'] == 'plugin:langbot/local-agent/default'
assert config['ai']['runner_config'] == {
'plugin:langbot/local-agent/default': {
'model': {'primary': '', 'fallbacks': []},
'max-round': 10,
'prompt': [{'role': 'system', 'content': 'Hello'}],
},
}
@pytest.mark.asyncio
async def test_default_pipeline_config_stays_neutral_without_installed_runners():
ap = SimpleNamespace(
logger=FakeLogger(),
agent_runner_registry=FakeRegistry([]),
)
config = await PipelineService(ap).get_default_pipeline_config()
assert config['ai']['runner']['id'] == ''
assert config['ai']['runner_config'] == {}

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
@@ -158,6 +159,28 @@ async def test_openai_embedding_call_overrides_placeholder_api_key():
assert usage_info == {'prompt_tokens': 3, 'total_tokens': 3}
@pytest.mark.asyncio
async def test_model_manager_initialize_skips_space_sync_after_timeout():
ap = SimpleNamespace()
ap.discover = SimpleNamespace(get_components_by_kind=Mock(return_value=[]))
ap.instance_config = SimpleNamespace(data={'space': {'models_sync_timeout': 0.01}})
ap.logger = Mock()
mgr = ModelManager(ap)
mgr.load_models_from_db = AsyncMock()
async def slow_sync():
await asyncio.sleep(1)
mgr.sync_new_models_from_space = AsyncMock(side_effect=slow_sync)
await mgr.initialize()
mgr.load_models_from_db.assert_awaited_once()
mgr.sync_new_models_from_space.assert_awaited_once()
ap.logger.warning.assert_any_call('LangBot Space model sync timed out after 0.01s, skipping startup sync.')
@pytest.mark.asyncio
async def test_updated_llm_model_is_immediately_usable_by_local_agent_pipeline():
from langbot.pkg.api.http.service.model import LLMModelsService