feat: support dynamic agent runner defaults

This commit is contained in:
huanghuoguoguo
2026-05-16 09:35:40 +08:00
committed by huanghuoguoguo
parent 3baf899c20
commit 6d0e6dcc63
16 changed files with 1051 additions and 117 deletions
+6 -5
View File
@@ -4,7 +4,7 @@
## 总体进度 ## 总体进度
**当前阶段**: Phase 3 进行中 **当前阶段**: Phase 3 已完成,Phase 4 预留/部分上下文字段已填充
| Phase | 描述 | 状态 | | Phase | 描述 | 状态 |
|-------|------|------| |-------|------|------|
@@ -12,7 +12,7 @@
| Phase 1 | 核心架构(Registry、Orchestrator、上下文模型) | ✅ 完成 | | Phase 1 | 核心架构(Registry、Orchestrator、上下文模型) | ✅ 完成 |
| Phase 2 | 权限、能力声明、资源注入 | ✅ 完成 | | Phase 2 | 权限、能力声明、资源注入 | ✅ 完成 |
| Phase 3 | 内置 runner 迁移到插件 | ✅ 完成(7/7) | | Phase 3 | 内置 runner 迁移到插件 | ✅ 完成(7/7) |
| Phase 4 | EBA 事件支持 | 🔲 未开始 | | Phase 4 | EBA 事件支持 | 🔲 未开始message event/actor/subject 上下文已预填充) |
--- ---
@@ -49,7 +49,8 @@
### 官方插件 ### 官方插件
> 插件仓库:`/home/glwuy/langbot-app/langbot-agent-runner/` (monorepo) > 外部服务插件仓库:`/home/glwuy/langbot-app/langbot-agent-runner/`
> 本地 Local Agent 插件仓库:`/home/glwuy/langbot-app/langbot-local-agent/`
| 插件 | 状态 | 备注 | | 插件 | 状态 | 备注 |
|------|------|------| |------|------|------|
@@ -69,11 +70,11 @@
### 高优先级 ### 高优先级
- [ ] 工具详情 API — 需要在 SDK 添加 GET_TOOL_DETAIL action 并在 AgentRunAPIProxy 中暴露 - [x] 工具详情 API — SDK `GET_TOOL_DETAIL` action`AgentRunAPIProxy.get_tool_detail()` 与 Host 侧授权校验已接通
### 低优先级 / 未来 ### 低优先级 / 未来
- [ ] EBA 完整集成 — event context 未在 context builder 中填充 - [ ] EBA 完整集成 — message event/actor/subject 上下文已填充,完整事件路由与非消息事件仍待实现
- [ ] 平台 API 动作执行 — `action.requested` 结果类型存在但未执行 - [ ] 平台 API 动作执行 — `action.requested` 结果类型存在但未执行
--- ---
+201 -9
View File
@@ -6,6 +6,7 @@ import time
import typing import typing
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from ...core import app from ...core import app
from .descriptor import AgentRunnerDescriptor from .descriptor import AgentRunnerDescriptor
@@ -117,9 +118,9 @@ class AgentRunContextV1(typing.TypedDict):
run_id: str run_id: str
trigger: AgentTrigger trigger: AgentTrigger
conversation: ConversationContext | None conversation: ConversationContext | None
event: dict[str, typing.Any] | None # Reserved for EBA event: dict[str, typing.Any] | None
actor: dict[str, typing.Any] | None # Reserved for EBA actor: dict[str, typing.Any] | None
subject: dict[str, typing.Any] | None # Reserved for EBA subject: dict[str, typing.Any] | None
messages: list[dict[str, typing.Any]] messages: list[dict[str, typing.Any]]
input: AgentInput input: AgentInput
params: dict[str, typing.Any] params: dict[str, typing.Any]
@@ -226,7 +227,7 @@ class AgentRunContextBuilder:
'sdk_protocol_version': descriptor.protocol_version, 'sdk_protocol_version': descriptor.protocol_version,
'query_id': query.query_id, 'query_id': query.query_id,
'trace_id': run_id, # Use run_id as trace_id for now 'trace_id': run_id, # Use run_id as trace_id for now
'deadline_at': None, # TODO: set from runner config timeout 'deadline_at': self._build_deadline(runner_config),
'metadata': { 'metadata': {
'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'), 'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'),
'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'), 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'),
@@ -238,9 +239,9 @@ class AgentRunContextBuilder:
'run_id': run_id, 'run_id': run_id,
'trigger': trigger, 'trigger': trigger,
'conversation': conversation, 'conversation': conversation,
'event': None, # Reserved for EBA 'event': self._build_event(query),
'actor': None, # Reserved for EBA 'actor': self._build_actor(query),
'subject': None, # Reserved for EBA 'subject': self._build_subject(query),
'messages': messages, 'messages': messages,
'input': input, 'input': input,
'params': params, 'params': params,
@@ -278,9 +279,200 @@ class AgentRunContextBuilder:
'text': text, 'text': text,
'contents': contents, 'contents': contents,
'message_chain': message_chain_dict, 'message_chain': message_chain_dict,
'attachments': [], # TODO: extract attachments from message_chain 'attachments': self._build_attachments(query, contents),
} }
def _build_attachments(
self,
query: pipeline_query.Query,
contents: list[dict[str, typing.Any]],
) -> list[dict[str, typing.Any]]:
"""Extract runner-consumable attachment data from input contents."""
attachments: list[dict[str, typing.Any]] = []
for elem in contents:
elem_type = elem.get('type')
if elem_type == 'image_url':
image_url = elem.get('image_url') or {}
attachments.append(
{
'type': 'image',
'source': 'url',
'url': image_url.get('url') if isinstance(image_url, dict) else str(image_url),
}
)
elif elem_type == 'image_base64':
image_base64 = elem.get('image_base64')
attachments.append(
{
'type': 'image',
'source': 'base64',
'content': image_base64,
'content_type': self._infer_base64_content_type(image_base64, 'image/jpeg'),
'name': 'image',
'has_content': bool(image_base64),
}
)
elif elem_type == 'file_url':
attachments.append(
{
'type': 'file',
'source': 'url',
'url': elem.get('file_url'),
'name': elem.get('file_name'),
}
)
elif elem_type == 'file_base64':
file_base64 = elem.get('file_base64')
attachments.append(
{
'type': 'file',
'source': 'base64',
'name': elem.get('file_name'),
'content': file_base64,
'content_type': self._infer_base64_content_type(file_base64, 'application/octet-stream'),
'has_content': bool(file_base64),
}
)
message_chain = getattr(query, 'message_chain', None)
if message_chain:
for component in message_chain:
if isinstance(component, platform_message.Image):
attachments.append(
{
'type': 'image',
'source': 'message_chain',
'id': component.image_id or None,
'url': component.url or None,
'path': str(component.path) if component.path else None,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'image/jpeg'),
'name': 'image',
'has_content': bool(component.base64),
}
)
elif isinstance(component, platform_message.File):
attachments.append(
{
'type': 'file',
'source': 'message_chain',
'id': component.id or None,
'name': component.name or None,
'size': component.size or 0,
'url': component.url or None,
'path': component.path or None,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'application/octet-stream'),
'has_content': bool(component.base64),
}
)
elif isinstance(component, platform_message.Voice):
attachments.append(
{
'type': 'voice',
'source': 'message_chain',
'id': component.voice_id or None,
'url': component.url or None,
'path': component.path or None,
'duration': component.length or 0,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'audio/mpeg'),
'name': 'voice',
'has_content': bool(component.base64),
}
)
return attachments
def _infer_base64_content_type(self, value: typing.Any, default: str) -> str:
"""Infer MIME type from a data URL base64 value."""
if not isinstance(value, str):
return default
if value.startswith('data:') and ';base64,' in value:
return value[5:value.find(';base64,')] or default
return default
def _build_event(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build a minimal event envelope from the platform message event."""
message_event = getattr(query, 'message_event', None)
event_data: dict[str, typing.Any] = {}
if message_event and hasattr(message_event, 'model_dump'):
try:
event_data = message_event.model_dump(mode='json')
except TypeError:
event_data = message_event.model_dump()
except Exception:
event_data = {}
event_data.pop('source_platform_object', None)
message_chain = getattr(query, 'message_chain', None)
message_id = getattr(message_chain, 'message_id', None)
if message_id == -1:
message_id = None
event_time = getattr(message_event, 'time', None) if message_event else None
event_timestamp = int(event_time) if isinstance(event_time, (int, float)) else None
return {
'event_type': getattr(message_event, 'type', None) or 'message.received',
'event_id': str(message_id or getattr(query, 'query_id', '')),
'event_timestamp': event_timestamp,
'event_data': event_data,
}
def _build_actor(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build actor context for the sender that triggered the run."""
message_event = getattr(query, 'message_event', None)
sender = getattr(message_event, 'sender', None) if message_event else None
actor_id = getattr(sender, 'id', None) or getattr(query, 'sender_id', None)
actor_name = sender.get_name() if sender and hasattr(sender, 'get_name') else None
return {
'actor_type': 'user',
'actor_id': str(actor_id) if actor_id is not None else None,
'actor_name': actor_name,
}
def _build_subject(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build subject context for the current message."""
message_chain = getattr(query, 'message_chain', None)
message_id = getattr(message_chain, 'message_id', None)
if message_id == -1:
message_id = None
launcher_type = getattr(query, 'launcher_type', None)
launcher_type_value = getattr(launcher_type, 'value', launcher_type)
return {
'subject_type': 'message',
'subject_id': str(message_id or getattr(query, 'query_id', '')),
'subject_data': {
'launcher_type': launcher_type_value,
'launcher_id': getattr(query, 'launcher_id', None),
'sender_id': str(getattr(query, 'sender_id', '')),
'bot_uuid': getattr(query, 'bot_uuid', None),
'pipeline_uuid': getattr(query, 'pipeline_uuid', None),
},
}
def _build_deadline(self, runner_config: dict[str, typing.Any]) -> int | None:
"""Build deadline timestamp from runner timeout config if present."""
timeout = runner_config.get('timeout')
if timeout is None:
return None
try:
timeout_seconds = float(timeout)
except (TypeError, ValueError):
return None
if timeout_seconds <= 0:
return None
return int(time.time() + timeout_seconds)
def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]: def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
"""Build messages list from query.""" """Build messages list from query."""
messages: list[dict[str, typing.Any]] = [] messages: list[dict[str, typing.Any]] = []
@@ -357,4 +549,4 @@ class AgentRunContextBuilder:
) )
# Pydantic models and other complex types are not directly serializable # Pydantic models and other complex types are not directly serializable
# as params (they may have internal structure not meant for runners) # as params (they may have internal structure not meant for runners)
return False return False
+25 -17
View File
@@ -1,4 +1,5 @@
"""Agent runner registry for discovering and caching runner descriptors.""" """Agent runner registry for discovering and caching runner descriptors."""
from __future__ import annotations from __future__ import annotations
import typing import typing
@@ -109,11 +110,14 @@ class AgentRunnerRegistry:
if not label: if not label:
label = {name: name} # fallback label = {name: name} # fallback
# SDK now provides these directly extracted from spec spec = manifest.get('spec', {})
protocol_version = runner_data.get('protocol_version', '1')
config_schema = runner_data.get('config', []) # SDK now provides these directly extracted from spec. Fall back to
capabilities = runner_data.get('capabilities', {}) # manifest.spec for older runtimes/tests that return the raw manifest.
permissions = runner_data.get('permissions', {}) protocol_version = runner_data.get('protocol_version') or spec.get('protocol_version', '1')
config_schema = runner_data.get('config') or spec.get('config', [])
capabilities = runner_data.get('capabilities') or spec.get('capabilities', {})
permissions = runner_data.get('permissions') or spec.get('permissions', {})
# Build descriptor # Build descriptor
runner_id = format_runner_id( runner_id = format_runner_id(
@@ -259,19 +263,23 @@ class AgentRunnerRegistry:
for descriptor in runners: for descriptor in runners:
# Add runner option # Add runner option
options.append({ options.append(
'name': descriptor.id, {
'label': descriptor.label,
'description': descriptor.description,
})
# Add config schema as stage if not empty
if descriptor.config_schema:
stages.append({
'name': descriptor.id, 'name': descriptor.id,
'label': descriptor.label, 'label': descriptor.label,
'description': descriptor.description, 'description': descriptor.description,
'config': descriptor.config_schema, }
}) )
return options, stages # Add config schema as stage if not empty
if descriptor.config_schema:
stages.append(
{
'name': descriptor.id,
'label': descriptor.label,
'description': descriptor.description,
'config': descriptor.config_schema,
}
)
return options, stages
+54 -8
View File
@@ -3,10 +3,13 @@ from __future__ import annotations
import uuid import uuid
import json import json
import sqlalchemy import sqlalchemy
import typing
from ....core import app from ....core import app
from ....entity.persistence import pipeline as persistence_pipeline from ....entity.persistence import pipeline as persistence_pipeline
DEFAULT_RUNNER_ID = 'plugin:langbot/local-agent/default'
default_stage_order = [ default_stage_order = [
'GroupRespondRuleCheckStage', # 群响应规则检查 'GroupRespondRuleCheckStage', # 群响应规则检查
@@ -30,6 +33,46 @@ class PipelineService:
def __init__(self, ap: app.Application) -> None: def __init__(self, ap: app.Application) -> None:
self.ap = ap self.ap = ap
def _get_default_values_from_schema(self, config_schema: list[dict[str, typing.Any]]) -> dict[str, typing.Any]:
"""Build runner config defaults from a DynamicForm schema."""
defaults: dict[str, typing.Any] = {}
for item in config_schema:
name = item.get('name')
if not name:
continue
if 'default' in item:
defaults[name] = item['default']
return defaults
async def get_default_pipeline_config(self) -> dict[str, typing.Any]:
"""Get the default pipeline config, rendering runner defaults from installed plugins."""
from ....utils import paths as path_utils
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json')
with open(template_path, 'r', encoding='utf-8') as f:
config = json.load(f)
try:
runners = await self.ap.agent_runner_registry.list_runners(bound_plugins=None)
except Exception as e:
self.ap.logger.warning(f'Failed to load plugin agent runners for default pipeline config: {e}')
return config
if not runners:
return config
selected_runner = next((runner for runner in runners if runner.id == DEFAULT_RUNNER_ID), runners[0])
ai_config = config.setdefault('ai', {})
runner_config = ai_config.setdefault('runner', {})
runner_config['id'] = selected_runner.id
runner_config.setdefault('expire-time', 0)
ai_config['runner_config'] = {
selected_runner.id: self._get_default_values_from_schema(selected_runner.config_schema),
}
return config
async def get_pipeline_metadata(self) -> list[dict]: async def get_pipeline_metadata(self) -> list[dict]:
"""Get pipeline metadata with dynamically loaded plugin runners from registry""" """Get pipeline metadata with dynamically loaded plugin runners from registry"""
import copy import copy
@@ -50,15 +93,22 @@ class PipelineService:
if config_item.get('name') == 'id': if config_item.get('name') == 'id':
# Get plugin agent runners from registry # Get plugin agent runners from registry
try: try:
runner_options, runner_stages = await self.ap.agent_runner_registry.get_runner_metadata_for_pipeline() (
runner_options,
runner_stages,
) = await self.ap.agent_runner_registry.get_runner_metadata_for_pipeline()
# Replace options entirely with registry options # Replace options entirely with registry options
# Only installed/available runners should be shown # Only installed/available runners should be shown
config_item['options'] = runner_options config_item['options'] = runner_options
# Set default to first available runner if not specified # Set default to the official local-agent when installed, otherwise first available runner.
if runner_options and 'default' not in config_item: if runner_options and 'default' not in config_item:
config_item['default'] = runner_options[0]['name'] default_option = next(
(option for option in runner_options if option['name'] == DEFAULT_RUNNER_ID),
runner_options[0],
)
config_item['default'] = default_option['name']
# Add corresponding stage configuration for each runner # Add corresponding stage configuration for each runner
for stage_config in runner_stages: for stage_config in runner_stages:
@@ -113,8 +163,6 @@ class PipelineService:
return self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline) return self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline)
async def create_pipeline(self, pipeline_data: dict, default: bool = False) -> str: async def create_pipeline(self, pipeline_data: dict, default: bool = False) -> str:
from ....utils import paths as path_utils
# Check limitation # Check limitation
limitation = self.ap.instance_config.data.get('system', {}).get('limitation', {}) limitation = self.ap.instance_config.data.get('system', {}).get('limitation', {})
max_pipelines = limitation.get('max_pipelines', -1) max_pipelines = limitation.get('max_pipelines', -1)
@@ -128,9 +176,7 @@ class PipelineService:
pipeline_data['stages'] = default_stage_order.copy() pipeline_data['stages'] = default_stage_order.copy()
pipeline_data['is_default'] = default pipeline_data['is_default'] = default
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json') pipeline_data['config'] = await self.get_default_pipeline_config()
with open(template_path, 'r', encoding='utf-8') as f:
pipeline_data['config'] = json.load(f)
# Ensure extensions_preferences is set with enable_all_plugins and enable_all_mcp_servers=True by default # Ensure extensions_preferences is set with enable_all_plugins and enable_all_mcp_servers=True by default
if 'extensions_preferences' not in pipeline_data: if 'extensions_preferences' not in pipeline_data:
+14
View File
@@ -187,6 +187,15 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
async def initialize_plugins(self): async def initialize_plugins(self):
pass pass
async def _refresh_agent_runner_registry(self) -> None:
registry = getattr(self.ap, 'agent_runner_registry', None)
if registry is None:
return
try:
await registry.refresh()
except Exception as e:
self.ap.logger.warning(f'Failed to refresh agent runner registry: {e}')
async def ping_plugin_runtime(self): async def ping_plugin_runtime(self):
if not hasattr(self, 'handler'): if not hasattr(self, 'handler'):
raise PluginRuntimeNotConnectedError('Plugin runtime is not connected') raise PluginRuntimeNotConnectedError('Plugin runtime is not connected')
@@ -550,6 +559,7 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
task_context.metadata.update(metadata) task_context.metadata.update(metadata)
await self._wait_for_installed_plugin_ready(plugin_author, plugin_name, task_context) await self._wait_for_installed_plugin_ready(plugin_author, plugin_name, task_context)
await self._refresh_agent_runner_registry()
async def upgrade_plugin( async def upgrade_plugin(
self, self,
@@ -568,6 +578,8 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
if task_context is not None: if task_context is not None:
task_context.trace(trace) task_context.trace(trace)
await self._refresh_agent_runner_registry()
async def delete_plugin( async def delete_plugin(
self, self,
plugin_author: str, plugin_author: str,
@@ -592,6 +604,8 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
task_context.trace('Cleaning up plugin configuration and storage...') task_context.trace('Cleaning up plugin configuration and storage...')
await self.handler.cleanup_plugin_data(plugin_author, plugin_name) await self.handler.cleanup_plugin_data(plugin_author, plugin_name)
await self._refresh_agent_runner_registry()
async def list_plugins(self, component_kinds: list[str] | None = None) -> list[dict[str, Any]]: async def list_plugins(self, component_kinds: list[str] | None = None) -> list[dict[str, Any]]:
"""List plugins, optionally filtered by component kinds. """List plugins, optionally filtered by component kinds.
+69 -10
View File
@@ -41,6 +41,63 @@ def _make_rag_error_response(error: Exception, error_type: str, **extra_context)
return handler.ActionResponse.error(message=message) return handler.ActionResponse.error(message=message)
def _i18n_to_dict(value: Any) -> dict[str, Any]:
"""Convert SDK i18n values to plain dictionaries."""
if value is None:
return {}
if isinstance(value, dict):
return value
if hasattr(value, 'to_dict'):
return value.to_dict()
if hasattr(value, 'model_dump'):
return value.model_dump()
return {'en_US': str(value)}
def _i18n_to_text(value: Any) -> str:
"""Return a stable human-readable text from SDK i18n values."""
data = _i18n_to_dict(value)
for key in ('en_US', 'zh_Hans', 'zh_Hant'):
text = data.get(key)
if text:
return str(text)
for text in data.values():
if text:
return str(text)
return ''
def _build_tool_detail(tool: Any, requested_tool_name: str | None = None) -> dict[str, Any]:
"""Normalize LLMTool and plugin ComponentManifest objects for tool detail APIs."""
if hasattr(tool, 'metadata') and hasattr(tool, 'spec'):
metadata = tool.metadata
spec = tool.spec or {}
description = spec.get('llm_prompt') or _i18n_to_text(getattr(metadata, 'description', None))
parameters = spec.get('parameters') or {}
return {
'name': requested_tool_name or getattr(metadata, 'name', ''),
'label': _i18n_to_dict(getattr(metadata, 'label', None)),
'description': description,
'human_desc': description,
'parameters': parameters,
'spec': spec,
}
name = getattr(tool, 'name', requested_tool_name or '')
description = getattr(tool, 'description', None) or getattr(tool, 'human_desc', '') or ''
parameters = getattr(tool, 'parameters', None) or {}
return {
'name': name,
'label': {},
'description': description,
'human_desc': getattr(tool, 'human_desc', description) or description,
'parameters': parameters,
'spec': {'parameters': parameters},
}
async def _validate_run_authorization( async def _validate_run_authorization(
run_id: str, run_id: str,
resource_type: str, resource_type: str,
@@ -462,7 +519,13 @@ class RuntimeConnectionHandler(handler.Handler):
return return
messages_obj = [provider_message.Message.model_validate(message) for message in messages] messages_obj = [provider_message.Message.model_validate(message) for message in messages]
funcs_obj = [resource_tool.LLMTool.model_validate(func) for func in funcs]
# The func field is excluded during model_dump() in plugin side
# but required by LLMTool validation on Host.
async def _placeholder_func(**kwargs):
pass
funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs]
async for chunk in llm_model.provider.invoke_llm_stream( async for chunk in llm_model.provider.invoke_llm_stream(
query=None, query=None,
@@ -538,30 +601,26 @@ class RuntimeConnectionHandler(handler.Handler):
""" """
tool_name = data['tool_name'] tool_name = data['tool_name']
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'tool', tool_name, self.ap run_id, 'tool', tool_name, self.ap, caller_plugin_identity
) )
if error: if error:
return error return error
try: try:
tool = self.ap.tool_mgr.get_tool_by_name(tool_name) tool = await self.ap.tool_mgr.get_tool_by_name(tool_name)
if tool is None: if tool is None:
return handler.ActionResponse.error( return handler.ActionResponse.error(
message=f'Tool {tool_name} not found', message=f'Tool {tool_name} not found',
) )
# Build tool detail for LLM function calling tool_detail = _build_tool_detail(tool, requested_tool_name=tool_name)
tool_detail = {
'name': tool.name,
'description': tool.description or '',
'parameters': tool.parameters or {},
}
return handler.ActionResponse.success(data=tool_detail) return handler.ActionResponse.success(data={'tool': tool_detail})
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
return handler.ActionResponse.error( return handler.ActionResponse.error(
+27 -1
View File
@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import sqlalchemy import sqlalchemy
import traceback import traceback
@@ -85,7 +86,21 @@ class ModelManager:
return return
try: try:
await self.sync_new_models_from_space() sync_timeout = float(space_config.get('models_sync_timeout', 10))
except (TypeError, ValueError):
sync_timeout = 10
try:
self.ap.logger.info('Syncing new models from LangBot Space...')
if sync_timeout > 0:
await asyncio.wait_for(self.sync_new_models_from_space(), timeout=sync_timeout)
else:
await self.sync_new_models_from_space()
self.ap.logger.info('LangBot Space model sync completed.')
except asyncio.TimeoutError:
self.ap.logger.warning(
f'LangBot Space model sync timed out after {sync_timeout:g}s, skipping startup sync.'
)
except Exception as e: except Exception as e:
self.ap.logger.warning('Failed to sync new models from LangBot Space, model list may not be updated.') self.ap.logger.warning('Failed to sync new models from LangBot Space, model list may not be updated.')
self.ap.logger.warning(f' - Error: {e}') self.ap.logger.warning(f' - Error: {e}')
@@ -103,6 +118,9 @@ class ModelManager:
) )
for provider in providers_result.all(): for provider in providers_result.all():
try: try:
self.ap.logger.info(
f'Loading model provider {provider.uuid} ({provider.name}, requester={provider.requester})...'
)
runtime_provider = await self.load_provider(provider) runtime_provider = await self.load_provider(provider)
self.provider_dict[provider.uuid] = runtime_provider self.provider_dict[provider.uuid] = runtime_provider
except provider_errors.RequesterNotFoundError as e: except provider_errors.RequesterNotFoundError as e:
@@ -157,6 +175,14 @@ class ModelManager:
except Exception as e: except Exception as e:
self.ap.logger.error(f'Failed to load model {rerank_model.uuid}: {e}\n{traceback.format_exc()}') self.ap.logger.error(f'Failed to load model {rerank_model.uuid}: {e}\n{traceback.format_exc()}')
self.ap.logger.info(
'Loaded models from db: '
f'{len(self.provider_dict)} providers, '
f'{len(self.llm_models)} llm models, '
f'{len(self.embedding_models)} embedding models, '
f'{len(self.rerank_models)} rerank models.'
)
async def sync_new_models_from_space(self): async def sync_new_models_from_space(self):
"""Sync models from Space""" """Sync models from Space"""
space_model_provider = await self.ap.persistence_mgr.execute_async( space_model_provider = await self.ap.persistence_mgr.execute_async(
+2
View File
@@ -150,4 +150,6 @@ space:
# OAuth authorization page URL (user will be redirected here) # OAuth authorization page URL (user will be redirected here)
oauth_authorize_url: 'https://space.langbot.app/auth/authorize' oauth_authorize_url: 'https://space.langbot.app/auth/authorize'
disable_models_service: false disable_models_service: false
# Max seconds to wait for startup model-list sync. Set to 0 to disable the timeout.
models_sync_timeout: 10
disable_telemetry: false disable_telemetry: false
@@ -38,12 +38,10 @@
}, },
"ai": { "ai": {
"runner": { "runner": {
"id": "plugin:langbot/local-agent/default", "id": "",
"expire-time": 0 "expire-time": 0
}, },
"runner_config": { "runner_config": {}
"plugin:langbot/local-agent/default": {}
}
}, },
"output": { "output": {
"long-text-processing": { "long-text-processing": {
@@ -64,4 +62,4 @@
"remove-think": false "remove-think": false
} }
} }
} }
@@ -1,4 +1,5 @@
"""Tests for pipeline config migration to new runner format.""" """Tests for pipeline config migration to new runner format."""
from __future__ import annotations from __future__ import annotations
import json import json
@@ -149,7 +150,7 @@ class TestDefaultPipelineConfig:
"""Tests for default-pipeline-config.json format.""" """Tests for default-pipeline-config.json format."""
def test_default_config_is_new_format(self): def test_default_config_is_new_format(self):
"""Default pipeline config should use new format.""" """Default pipeline template should use the new runner config shape."""
from langbot.pkg.utils import paths as path_utils from langbot.pkg.utils import paths as path_utils
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json') template_path = path_utils.get_resource_path('templates/default-pipeline-config.json')
@@ -160,27 +161,25 @@ class TestDefaultPipelineConfig:
assert 'ai' in config assert 'ai' in config
assert 'runner' in config['ai'] assert 'runner' in config['ai']
assert 'id' in config['ai']['runner'] assert 'id' in config['ai']['runner']
assert config['ai']['runner']['id'] == 'plugin:langbot/local-agent/default' assert config['ai']['runner']['id'] == ''
# Should have runner_config with local-agent default # Plugin runner selection and config defaults are rendered at creation
# time from installed AgentRunner metadata.
assert 'runner_config' in config['ai'] assert 'runner_config' in config['ai']
assert 'plugin:langbot/local-agent/default' in config['ai']['runner_config'] assert config['ai']['runner_config'] == {}
# Should NOT have old local-agent key # Should NOT have old local-agent key
assert 'local-agent' not in config['ai'] assert 'local-agent' not in config['ai']
def test_default_config_has_model_config(self): def test_default_config_does_not_hardcode_plugin_schema(self):
"""Default config should have model config in runner_config.""" """Default template should not duplicate plugin-provided config schema."""
from langbot.pkg.utils import paths as path_utils from langbot.pkg.utils import paths as path_utils
template_path = path_utils.get_resource_path('templates/default-pipeline-config.json') template_path = path_utils.get_resource_path('templates/default-pipeline-config.json')
with open(template_path, 'r', encoding='utf-8') as f: with open(template_path, 'r', encoding='utf-8') as f:
config = json.load(f) config = json.load(f)
runner_config = config['ai']['runner_config']['plugin:langbot/local-agent/default'] assert config['ai']['runner_config'] == {}
assert 'model' in runner_config
assert 'max-round' in runner_config
assert 'prompt' in runner_config
class TestResolveRunnerIdBackwardCompat: class TestResolveRunnerIdBackwardCompat:
@@ -242,9 +241,7 @@ class TestResolveRunnerConfigBackwardCompat:
}, },
}, },
} }
runner_config = ConfigMigration.resolve_runner_config( runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
config, 'plugin:langbot/local-agent/default'
)
assert runner_config['max-round'] == 20 assert runner_config['max-round'] == 20
def test_resolve_old_format_config(self): def test_resolve_old_format_config(self):
@@ -254,9 +251,7 @@ class TestResolveRunnerConfigBackwardCompat:
'local-agent': {'max-round': 15}, 'local-agent': {'max-round': 15},
}, },
} }
runner_config = ConfigMigration.resolve_runner_config( runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
config, 'plugin:langbot/local-agent/default'
)
assert runner_config['max-round'] == 15 assert runner_config['max-round'] == 15
def test_resolve_new_format_priority(self): def test_resolve_new_format_priority(self):
@@ -269,7 +264,5 @@ class TestResolveRunnerConfigBackwardCompat:
'local-agent': {'max-round': 10}, # Old, should be ignored 'local-agent': {'max-round': 10}, # Old, should be ignored
}, },
} }
runner_config = ConfigMigration.resolve_runner_config( runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default')
config, 'plugin:langbot/local-agent/default' assert runner_config['max-round'] == 25
)
assert runner_config['max-round'] == 25
+33 -10
View File
@@ -13,9 +13,11 @@ Authorization paths:
from __future__ import annotations from __future__ import annotations
import pytest import pytest
from unittest.mock import AsyncMock, MagicMock, patch import types
from unittest.mock import AsyncMock, MagicMock
from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry
from langbot.pkg.plugin.handler import _build_tool_detail
# Import shared test fixtures from conftest.py # Import shared test fixtures from conftest.py
from .conftest import make_resources from .conftest import make_resources
@@ -114,10 +116,6 @@ class MockDisconnectCallback:
return True return True
# Import ActionResponse for checking responses
from langbot_plugin.runtime.io import handler
class TestInvokeLLMAuthorization: class TestInvokeLLMAuthorization:
"""Tests for INVOKE_LLM authorization.""" """Tests for INVOKE_LLM authorization."""
@@ -238,6 +236,33 @@ class TestInvokeLLMStreamAuthorization:
assert run_id is None assert run_id is None
def test_build_tool_detail_normalizes_plugin_component_manifest():
"""GET_TOOL_DETAIL returns a uniform schema for ordinary plugin Tool manifests."""
manifest_tool = types.SimpleNamespace(
metadata=types.SimpleNamespace(
name='search',
label={'en_US': 'Search'},
description={'en_US': 'Search public data'},
),
spec={
'llm_prompt': 'Search test data',
'parameters': {
'type': 'object',
'properties': {'q': {'type': 'string'}},
},
},
)
detail = _build_tool_detail(manifest_tool, requested_tool_name='author/plugin/search')
assert detail['name'] == 'author/plugin/search'
assert detail['description'] == 'Search test data'
assert detail['human_desc'] == 'Search test data'
assert detail['parameters']['properties']['q']['type'] == 'string'
assert detail['label'] == {'en_US': 'Search'}
assert detail['spec'] == manifest_tool.spec
class TestCallToolAuthorization: class TestCallToolAuthorization:
"""Tests for CALL_TOOL authorization.""" """Tests for CALL_TOOL authorization."""
@@ -559,8 +584,6 @@ class TestHandlerActionAuthorization:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_invoke_llm_handler_authorized_path(self): async def test_invoke_llm_handler_authorized_path(self):
"""INVOKE_LLM handler: authorized when model in resources.""" """INVOKE_LLM handler: authorized when model in resources."""
from langbot_plugin.runtime.io import handler as io_handler
registry = AgentRunSessionRegistry() registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}]) resources = make_resources(models=[{'model_id': 'model_001'}])
@@ -822,8 +845,6 @@ class TestSDKAgentRunAPIProxyFieldConsistency:
"""RETRIEVE_KNOWLEDGE_BASE: SDK fields match Host handler.""" """RETRIEVE_KNOWLEDGE_BASE: SDK fields match Host handler."""
# SDK agent_run_api.py lines 178-183 # SDK agent_run_api.py lines 178-183
sdk_fields = ['run_id', 'kb_id', 'query_text', 'top_k', 'filters'] sdk_fields = ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']
# Host handler.py lines 863-867
host_fields = ['query_id', 'kb_id', 'query_text', 'top_k', 'filters', 'run_id']
# Note: query_id is from query context, not SDK proxy # Note: query_id is from query context, not SDK proxy
for field in ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']: for field in ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']:
@@ -934,6 +955,7 @@ class TestSessionExpiryAndCleanup:
# Check session status # Check session status
started_at = session['status']['started_at'] started_at = session['status']['started_at']
last_activity = session['status']['last_activity_at'] last_activity = session['status']['last_activity_at']
assert last_activity >= started_at
# Session should be valid initially # Session should be valid initially
current_time = int(time.time()) current_time = int(time.time())
@@ -964,6 +986,7 @@ class TestSessionExpiryAndCleanup:
# Note: This won't actually cleanup because session is just created # Note: This won't actually cleanup because session is just created
# We need to manually test cleanup logic # We need to manually test cleanup logic
cleaned = await registry.cleanup_stale_sessions(max_age_seconds=0) cleaned = await registry.cleanup_stale_sessions(max_age_seconds=0)
assert isinstance(cleaned, int)
# Session should still exist (it was just created) # Session should still exist (it was just created)
# With max_age=0, sessions with last_activity > 0 seconds ago would be cleaned # With max_age=0, sessions with last_activity > 0 seconds ago would be cleaned
@@ -1974,4 +1997,4 @@ class TestBackwardCompatStorageNoRunId:
raise AssertionError('Should not execute validation') raise AssertionError('Should not execute validation')
# File access unrestricted for regular plugins # File access unrestricted for regular plugins
assert run_id is None assert run_id is None
@@ -0,0 +1,368 @@
"""Integration-style tests for AgentRunOrchestrator with a fake plugin runner."""
from __future__ import annotations
import datetime
import types
from unittest.mock import AsyncMock
import pytest
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.agent.runner.errors import RunnerExecutionError
from langbot.pkg.agent.runner.context_builder import AgentRunContextBuilder
from langbot.pkg.agent.runner.orchestrator import AgentRunOrchestrator
from langbot.pkg.agent.runner.session_registry import get_session_registry
from langbot.pkg.agent.runner.state_store import get_state_store, reset_state_store
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from langbot_plugin.api.entities.builtin.provider import session as provider_session
from langbot_plugin.api.entities.builtin.resource import tool as resource_tool
RUNNER_ID = "plugin:langbot/local-agent/default"
class FakeLogger:
def debug(self, msg):
pass
def info(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
class FakeVersionManager:
def get_current_version(self):
return "test-version"
class FakeModel:
def __init__(self, model_type: str = "chat"):
self.model_entity = types.SimpleNamespace(model_type=model_type)
self.provider_entity = types.SimpleNamespace(name="fake-provider")
class FakeKnowledgeBase:
def __init__(self, kb_id: str):
self.kb_id = kb_id
self.knowledge_base_entity = types.SimpleNamespace(kb_type="fake")
def get_name(self):
return f"KB {self.kb_id}"
class FakePluginConnector:
is_enable_plugin = True
def __init__(self, results=None, error: Exception | None = None):
self.results = results or []
self.error = error
self.calls: list[dict] = []
self.contexts: list[dict] = []
self.sessions_during_run: list[dict | None] = []
async def run_agent(self, plugin_author, plugin_name, runner_name, context):
self.calls.append(
{
"plugin_author": plugin_author,
"plugin_name": plugin_name,
"runner_name": runner_name,
}
)
self.contexts.append(context)
self.sessions_during_run.append(await get_session_registry().get(context["run_id"]))
if self.error:
raise self.error
for result in self.results:
yield result
class FakeRegistry:
def __init__(self, descriptor: AgentRunnerDescriptor):
self.descriptor = descriptor
self.calls: list[dict] = []
async def get(self, runner_id, bound_plugins=None):
self.calls.append({"runner_id": runner_id, "bound_plugins": bound_plugins})
assert runner_id == self.descriptor.id
return self.descriptor
class FakeApplication:
def __init__(self, plugin_connector: FakePluginConnector):
self.logger = FakeLogger()
self.ver_mgr = FakeVersionManager()
self.plugin_connector = plugin_connector
self.model_mgr = types.SimpleNamespace(
get_model_by_uuid=AsyncMock(return_value=FakeModel())
)
self.rag_mgr = types.SimpleNamespace(
get_knowledge_base_by_uuid=AsyncMock(return_value=FakeKnowledgeBase("kb_001"))
)
class FakeConversation:
uuid = "conv_existing"
create_time = datetime.datetime(2026, 5, 15, 12, 0, 0)
def make_descriptor() -> AgentRunnerDescriptor:
return AgentRunnerDescriptor(
id=RUNNER_ID,
source="plugin",
label={"en_US": "Local Agent"},
plugin_author="langbot",
plugin_name="local-agent",
runner_name="default",
protocol_version="1",
capabilities={"streaming": True, "tool_calling": True},
permissions={
"models": ["invoke", "stream"],
"tools": ["list", "detail", "call"],
"knowledge_bases": ["list", "retrieve"],
"storage": ["plugin"],
"files": [],
},
)
def make_query():
async def fake_func(**kwargs):
return kwargs
message_chain = platform_message.MessageChain(
[
platform_message.Source(
id="msg_001",
time=datetime.datetime(2026, 5, 15, 12, 0, 0),
),
platform_message.Plain(text="hello"),
platform_message.File(name="spec.txt", url="https://example.com/spec.txt"),
]
)
sender = platform_entities.Friend(id="user_001", nickname="Alice", remark=None)
message_event = platform_events.FriendMessage(sender=sender, message_chain=message_chain, time=1_784_098_800.0)
session = types.SimpleNamespace(
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id="user_001",
sender_id="user_001",
using_conversation=FakeConversation(),
)
return types.SimpleNamespace(
query_id=1001,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id="user_001",
sender_id="user_001",
message_event=message_event,
message_chain=message_chain,
bot_uuid="bot_001",
pipeline_uuid="pipeline_001",
pipeline_config={
"ai": {
"runner": {"id": RUNNER_ID},
"runner_config": {
RUNNER_ID: {
"model": {"primary": "model_primary", "fallbacks": ["model_fallback"]},
"knowledge-bases": ["kb_001"],
"timeout": 30,
},
},
},
},
session=session,
messages=[],
user_message=provider_message.Message(
role="user",
content=[
provider_message.ContentElement.from_text("hello"),
provider_message.ContentElement.from_file_url("https://example.com/spec.txt", "spec.txt"),
],
),
variables={
"_pipeline_bound_plugins": ["langbot/local-agent"],
"_fallback_model_uuids": ["model_fallback"],
"public_param": "visible",
},
use_llm_model_uuid="model_primary",
use_funcs=[
resource_tool.LLMTool(
name="langbot/test-tool/search",
human_desc="Search",
description="Search test data",
parameters={"type": "object", "properties": {"q": {"type": "string"}}},
func=fake_func,
)
],
)
def test_context_builder_includes_consumable_base64_attachments():
builder = AgentRunContextBuilder(ap=types.SimpleNamespace())
query = make_query()
query.user_message = provider_message.Message(
role="user",
content=[
provider_message.ContentElement.from_text("see attached"),
provider_message.ContentElement.from_image_base64("data:image/png;base64,aGVsbG8="),
provider_message.ContentElement.from_file_base64("data:text/plain;base64,aGVsbG8=", "hello.txt"),
],
)
query.message_chain = platform_message.MessageChain(
[platform_message.Image(base64="data:image/jpeg;base64,aGVsbG8=")]
)
input_data = builder._build_input(query)
attachments = input_data["attachments"]
image_attachment = next(item for item in attachments if item["type"] == "image" and item["source"] == "base64")
file_attachment = next(item for item in attachments if item["type"] == "file" and item["source"] == "base64")
chain_attachment = next(item for item in attachments if item["source"] == "message_chain")
assert image_attachment["content"] == "data:image/png;base64,aGVsbG8="
assert image_attachment["content_type"] == "image/png"
assert file_attachment["content"] == "data:text/plain;base64,aGVsbG8="
assert file_attachment["content_type"] == "text/plain"
assert file_attachment["name"] == "hello.txt"
assert chain_attachment["content"] == "data:image/jpeg;base64,aGVsbG8="
assert chain_attachment["content_type"] == "image/jpeg"
@pytest.fixture(autouse=True)
async def clean_agent_state():
reset_state_store()
registry = get_session_registry()
for session in await registry.list_active_runs():
await registry.unregister(session["run_id"])
yield
for session in await registry.list_active_runs():
await registry.unregister(session["run_id"])
reset_state_store()
@pytest.mark.asyncio
async def test_orchestrator_runs_fake_plugin_with_authorized_context():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "message.completed",
"data": {"message": {"role": "assistant", "content": "fake response"}},
}
]
)
ap = FakeApplication(plugin_connector)
orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor))
query = make_query()
messages = [message async for message in orchestrator.run_from_query(query)]
assert len(messages) == 1
assert messages[0].content == "fake response"
assert plugin_connector.calls == [
{
"plugin_author": "langbot",
"plugin_name": "local-agent",
"runner_name": "default",
}
]
context = plugin_connector.contexts[0]
assert context["config"]["timeout"] == 30
assert context["runtime"]["deadline_at"] is not None
assert context["params"] == {"public_param": "visible"}
assert context["event"]["event_type"] == "FriendMessage"
assert context["actor"]["actor_id"] == "user_001"
assert context["actor"]["actor_name"] == "Alice"
assert context["subject"]["subject_id"] == "msg_001"
assert context["input"]["attachments"]
resources = context["resources"]
assert {m["model_id"] for m in resources["models"]} == {"model_primary", "model_fallback"}
assert resources["tools"][0]["tool_name"] == "langbot/test-tool/search"
assert resources["knowledge_bases"][0]["kb_id"] == "kb_001"
assert resources["storage"]["plugin_storage"] is True
session_during_run = plugin_connector.sessions_during_run[0]
assert session_during_run is not None
assert session_during_run["plugin_identity"] == "langbot/local-agent"
assert session_during_run["_authorized_ids"]["tool"] == {"langbot/test-tool/search"}
assert await get_session_registry().get(context["run_id"]) is None
@pytest.mark.asyncio
async def test_orchestrator_streams_fake_plugin_deltas():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{"type": "message.delta", "data": {"chunk": {"role": "assistant", "content": "hel"}}},
{"type": "message.delta", "data": {"chunk": {"role": "assistant", "content": "hello"}}},
{"type": "run.completed", "data": {"finish_reason": "stop"}},
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
chunks = [message async for message in orchestrator.run_from_query(make_query())]
assert [chunk.content for chunk in chunks] == ["hel", "hello"]
@pytest.mark.asyncio
async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "state.updated",
"data": {
"scope": "conversation",
"key": "external.conversation_id",
"value": "external_conv_123",
},
},
{
"type": "message.completed",
"data": {"message": {"role": "assistant", "content": "state saved"}},
},
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
query = make_query()
messages = [message async for message in orchestrator.run_from_query(query)]
assert [message.content for message in messages] == ["state saved"]
assert query.session.using_conversation.uuid == "external_conv_123"
snapshot = get_state_store().build_snapshot(query, descriptor)
assert snapshot["conversation"]["external.conversation_id"] == "external_conv_123"
@pytest.mark.asyncio
async def test_orchestrator_unregisters_session_after_runner_failure():
descriptor = make_descriptor()
plugin_connector = FakePluginConnector(
results=[
{
"type": "run.failed",
"data": {"error": "boom", "code": "fake.error", "retryable": False},
}
]
)
orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector), FakeRegistry(descriptor))
with pytest.raises(RunnerExecutionError):
[message async for message in orchestrator.run_from_query(make_query())]
context = plugin_connector.contexts[0]
assert plugin_connector.sessions_during_run[0] is not None
assert await get_session_registry().get(context["run_id"]) is None
+11 -8
View File
@@ -1,4 +1,5 @@
"""Tests for agent runner registry.""" """Tests for agent runner registry."""
from __future__ import annotations from __future__ import annotations
import pytest import pytest
@@ -10,14 +11,18 @@ from langbot.pkg.agent.runner.errors import RunnerNotFoundError, RunnerNotAuthor
class FakeApplication: class FakeApplication:
"""Fake Application for testing.""" """Fake Application for testing."""
def __init__(self): def __init__(self):
class FakeLogger: class FakeLogger:
def info(self, msg): def info(self, msg):
pass pass
def debug(self, msg): def debug(self, msg):
pass pass
def warning(self, msg): def warning(self, msg):
pass pass
def error(self, msg): def error(self, msg):
pass pass
@@ -234,13 +239,11 @@ class TestRegistryMetadataForPipeline:
assert 'plugin:langbot/local-agent/default' in option_ids assert 'plugin:langbot/local-agent/default' in option_ids
assert 'plugin:alice/my-agent/custom' in option_ids assert 'plugin:alice/my-agent/custom' in option_ids
# Should have stages for runners with config # Should fall back to manifest.spec.config when runtime does not return
# Note: stages may be empty if config_schema is empty list # extracted config at top level.
# In real scenarios, runners with config_schema will generate stages assert len(stages) == 1
# Only runners with non-empty config_schema generate stages assert stages[0]['name'] == 'plugin:alice/my-agent/custom'
# mock data has config: [{'name': 'param1', 'type': 'string'}] for alice/my-agent assert stages[0]['config'] == [{'name': 'param1', 'type': 'string'}]
# but config is now taken from runner_data.get('config', [])
assert len(stages) >= 0 # Can be 0 if all runners have empty config
class TestDescriptorValidation: class TestDescriptorValidation:
@@ -275,4 +278,4 @@ class TestDescriptorValidation:
assert descriptor.supports_streaming() is True assert descriptor.supports_streaming() is True
assert descriptor.supports_tool_calling() is False assert descriptor.supports_tool_calling() is False
assert descriptor.supports_knowledge_retrieval() is False assert descriptor.supports_knowledge_retrieval() is False
@@ -0,0 +1,80 @@
"""Tests for dynamic default pipeline config rendering."""
from __future__ import annotations
from types import SimpleNamespace
import pytest
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.api.http.service.pipeline import PipelineService
class FakeLogger:
def warning(self, msg):
pass
class FakeRegistry:
def __init__(self, runners):
self.runners = runners
async def list_runners(self, bound_plugins=None):
return self.runners
def make_runner(runner_id: str, config_schema: list[dict]):
parts = runner_id.removeprefix('plugin:').split('/')
return AgentRunnerDescriptor(
id=runner_id,
source='plugin',
label={'en_US': runner_id},
plugin_author=parts[0],
plugin_name=parts[1],
runner_name=parts[2],
config_schema=config_schema,
)
@pytest.mark.asyncio
async def test_default_pipeline_config_uses_installed_local_agent_schema():
local_agent = make_runner(
'plugin:langbot/local-agent/default',
[
{'name': 'model', 'type': 'model-fallback-selector', 'default': {'primary': '', 'fallbacks': []}},
{'name': 'max-round', 'type': 'integer', 'default': 10},
{'name': 'prompt', 'type': 'prompt-editor', 'default': [{'role': 'system', 'content': 'Hello'}]},
],
)
custom_agent = make_runner(
'plugin:alice/custom-agent/default',
[{'name': 'api-key', 'type': 'string', 'default': ''}],
)
ap = SimpleNamespace(
logger=FakeLogger(),
agent_runner_registry=FakeRegistry([custom_agent, local_agent]),
)
config = await PipelineService(ap).get_default_pipeline_config()
assert config['ai']['runner']['id'] == 'plugin:langbot/local-agent/default'
assert config['ai']['runner_config'] == {
'plugin:langbot/local-agent/default': {
'model': {'primary': '', 'fallbacks': []},
'max-round': 10,
'prompt': [{'role': 'system', 'content': 'Hello'}],
},
}
@pytest.mark.asyncio
async def test_default_pipeline_config_stays_neutral_without_installed_runners():
ap = SimpleNamespace(
logger=FakeLogger(),
agent_runner_registry=FakeRegistry([]),
)
config = await PipelineService(ap).get_default_pipeline_config()
assert config['ai']['runner']['id'] == ''
assert config['ai']['runner_config'] == {}
@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock from unittest.mock import AsyncMock, Mock
@@ -16,6 +17,8 @@ from langbot.pkg.entity.persistence import model as persistence_model
from langbot.pkg.pipeline.preproc.preproc import PreProcessor from langbot.pkg.pipeline.preproc.preproc import PreProcessor
from langbot.pkg.provider.modelmgr import requester from langbot.pkg.provider.modelmgr import requester
from langbot.pkg.provider.modelmgr.modelmgr import ModelManager from langbot.pkg.provider.modelmgr.modelmgr import ModelManager
from langbot.pkg.provider.modelmgr.requesters.chatcmpl import OpenAIChatCompletions
from langbot.pkg.provider.modelmgr.requesters.modelscopechatcmpl import ModelScopeChatCompletions
from langbot.pkg.provider.modelmgr.token import TokenManager from langbot.pkg.provider.modelmgr.token import TokenManager
from langbot.pkg.provider.runners.localagent import LocalAgentRunner from langbot.pkg.provider.runners.localagent import LocalAgentRunner
@@ -88,6 +91,96 @@ def test_token_manager_next_token_ignores_empty_token_list():
assert token_mgr.using_token_index == 0 assert token_mgr.using_token_index == 0
@pytest.mark.asyncio
async def test_openai_requester_initialize_uses_placeholder_api_key(monkeypatch):
captured_kwargs = {}
def fake_client(**kwargs):
captured_kwargs.update(kwargs)
return SimpleNamespace(**kwargs)
monkeypatch.setattr('langbot.pkg.provider.modelmgr.requesters.chatcmpl.openai.AsyncClient', fake_client)
monkeypatch.setattr('langbot.pkg.provider.modelmgr.requesters.chatcmpl.httpx.AsyncClient', fake_client)
requester_inst = OpenAIChatCompletions(ap=SimpleNamespace(), config={})
await requester_inst.initialize()
assert captured_kwargs['api_key'] == OpenAIChatCompletions.init_api_key
@pytest.mark.asyncio
async def test_modelscope_requester_initialize_uses_placeholder_api_key(monkeypatch):
captured_kwargs = {}
def fake_client(**kwargs):
captured_kwargs.update(kwargs)
return SimpleNamespace(**kwargs)
monkeypatch.setattr('langbot.pkg.provider.modelmgr.requesters.modelscopechatcmpl.openai.AsyncClient', fake_client)
monkeypatch.setattr('langbot.pkg.provider.modelmgr.requesters.modelscopechatcmpl.httpx.AsyncClient', fake_client)
requester_inst = ModelScopeChatCompletions(ap=SimpleNamespace(), config={})
await requester_inst.initialize()
assert captured_kwargs['api_key'] == ModelScopeChatCompletions.init_api_key
@pytest.mark.asyncio
async def test_openai_embedding_call_overrides_placeholder_api_key():
captured_request = {}
async def fake_create(**kwargs):
captured_request['api_key'] = fake_client.api_key
captured_request['kwargs'] = kwargs
return SimpleNamespace(
data=[SimpleNamespace(embedding=[0.1, 0.2])],
usage=SimpleNamespace(prompt_tokens=3, total_tokens=3),
)
fake_client = SimpleNamespace(
api_key=OpenAIChatCompletions.init_api_key,
embeddings=SimpleNamespace(create=fake_create),
)
requester_inst = OpenAIChatCompletions(ap=SimpleNamespace(), config={})
requester_inst.client = fake_client
embeddings, usage_info = await requester_inst.invoke_embedding(
model=requester.RuntimeEmbeddingModel(
model_entity=SimpleNamespace(name='text-embedding-3-small', extra_args={}),
provider=SimpleNamespace(token_mgr=TokenManager('provider-uuid', [' runtime-key ', '', 'runtime-key'])),
),
input_text=['hello'],
)
assert captured_request['api_key'] == 'runtime-key'
assert captured_request['kwargs']['model'] == 'text-embedding-3-small'
assert embeddings == [[0.1, 0.2]]
assert usage_info == {'prompt_tokens': 3, 'total_tokens': 3}
@pytest.mark.asyncio
async def test_model_manager_initialize_skips_space_sync_after_timeout():
ap = SimpleNamespace()
ap.discover = SimpleNamespace(get_components_by_kind=Mock(return_value=[]))
ap.instance_config = SimpleNamespace(data={'space': {'models_sync_timeout': 0.01}})
ap.logger = Mock()
mgr = ModelManager(ap)
mgr.load_models_from_db = AsyncMock()
async def slow_sync():
await asyncio.sleep(1)
mgr.sync_new_models_from_space = AsyncMock(side_effect=slow_sync)
await mgr.initialize()
mgr.load_models_from_db.assert_awaited_once()
mgr.sync_new_models_from_space.assert_awaited_once()
ap.logger.warning.assert_any_call('LangBot Space model sync timed out after 0.01s, skipping startup sync.')
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_updated_llm_model_is_immediately_usable_by_local_agent_pipeline(): async def test_updated_llm_model_is_immediately_usable_by_local_agent_pipeline():
from langbot.pkg.api.http.service.model import LLMModelsService from langbot.pkg.api.http.service.model import LLMModelsService
+52 -24
View File
@@ -202,7 +202,9 @@ export default function WizardPage() {
const runnerOptions = useMemo(() => { const runnerOptions = useMemo(() => {
if (!runnerStage) return []; if (!runnerStage) return [];
const runnerField = runnerStage.config.find((c) => c.name === 'runner'); const runnerField =
runnerStage.config.find((c) => c.name === 'id') ??
runnerStage.config.find((c) => c.name === 'runner');
return runnerField?.options ?? []; return runnerField?.options ?? [];
}, [runnerStage]); }, [runnerStage]);
@@ -257,9 +259,11 @@ export default function WizardPage() {
const handleSelectRunner = useCallback( const handleSelectRunner = useCallback(
(runner: string) => { (runner: string) => {
setSelectedRunner(runner); setSelectedRunner(runner);
const configStage = aiConfigTab?.stages.find((s) => s.name === runner);
setRunnerConfig(configStage ? getDefaultValues(configStage.config) : {});
saveProgress({ step: 2, selected_runner: runner }); saveProgress({ step: 2, selected_runner: runner });
}, },
[saveProgress], [aiConfigTab, saveProgress],
); );
// ---- Navigation helpers ---- // ---- Navigation helpers ----
@@ -427,14 +431,36 @@ export default function WizardPage() {
// (includes trigger, safety, ai, output sections). // (includes trigger, safety, ai, output sections).
// Then merge only the AI section with the wizard's runner config. // Then merge only the AI section with the wizard's runner config.
const createdPipeline = await httpClient.getPipeline(pipelineResp.uuid); const createdPipeline = await httpClient.getPipeline(pipelineResp.uuid);
const fullConfig = createdPipeline.pipeline.config; const fullConfig = createdPipeline.pipeline.config as unknown as Record<
string,
unknown
>;
const fullAiConfig =
fullConfig.ai && typeof fullConfig.ai === 'object'
? (fullConfig.ai as Record<string, unknown>)
: {};
const existingRunner =
fullAiConfig.runner && typeof fullAiConfig.runner === 'object'
? (fullAiConfig.runner as Record<string, unknown>)
: {};
const existingRunnerConfigs =
fullAiConfig.runner_config &&
typeof fullAiConfig.runner_config === 'object'
? (fullAiConfig.runner_config as Record<string, unknown>)
: {};
const mergedConfig = { const mergedConfig = {
...fullConfig, ...fullConfig,
ai: { ai: {
...fullConfig.ai, ...fullAiConfig,
runner: { runner: selectedRunner }, runner: {
[selectedRunner]: runnerConfig, ...existingRunner,
id: selectedRunner,
},
runner_config: {
...existingRunnerConfigs,
[selectedRunner]: runnerConfig,
},
}, },
}; };
@@ -1113,26 +1139,28 @@ function StepAIEngine({
})} })}
{/* Space promotion banner */} {/* Space promotion banner */}
{selected === 'local-agent' && isLocalAccount && ( {(selected === 'local-agent' ||
<div className="animate-in fade-in slide-in-from-left-2 duration-300"> selected === 'plugin:langbot/local-agent/default') &&
<div className="relative rounded-lg p-[2px] bg-gradient-to-r from-purple-500 via-pink-500 to-orange-500"> isLocalAccount && (
<div className="rounded-[calc(0.5rem-2px)] bg-background p-3 flex flex-col items-center gap-2 text-center"> <div className="animate-in fade-in slide-in-from-left-2 duration-300">
<Sparkles className="w-6 h-6 text-purple-500 shrink-0" /> <div className="relative rounded-lg p-[2px] bg-gradient-to-r from-purple-500 via-pink-500 to-orange-500">
<p className="text-xs font-medium"> <div className="rounded-[calc(0.5rem-2px)] bg-background p-3 flex flex-col items-center gap-2 text-center">
{t('wizard.spaceBanner.message')} <Sparkles className="w-6 h-6 text-purple-500 shrink-0" />
</p> <p className="text-xs font-medium">
<Button {t('wizard.spaceBanner.message')}
variant="outline" </p>
size="sm" <Button
onClick={onSpaceAuth} variant="outline"
className="w-full" size="sm"
> onClick={onSpaceAuth}
{t('wizard.spaceBanner.action')} className="w-full"
</Button> >
{t('wizard.spaceBanner.action')}
</Button>
</div>
</div> </div>
</div> </div>
</div> )}
)}
</div> </div>
</div> </div>