feat(agent-runner): integrate AgentRunner Protocol v1 with plugin system

Phase 0 integration complete - verified minimal loop with local-agent stub runner.

Changes:
- Add AgentRunOrchestrator for plugin-based agent execution
- Add AgentResultNormalizer for Protocol v1 result conversion
- Add AgentRunnerDescriptor for runner ID parsing (plugin:author/name/runner)
- Update chat handler to use new orchestrator instead of direct runner lookup
- Add plugin handler methods for list_agent_runners and run_agent
- Add connector methods for AgentRunner protocol forwarding
- Update pipeline API to include runner options in metadata
- Add integration docs and implementation plan

Integration verified:
- Runner: plugin:langbot/local-agent/default
- Input: "你好"
- Output: [stub] Echo: 你好
- Date: 2026-05-10 10:09

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
huanghuoguoguo
2026-05-10 10:11:54 +08:00
parent b01294b005
commit d6b8f48e73
29 changed files with 3960 additions and 289 deletions

View File

@@ -0,0 +1,37 @@
"""Agent runner subsystem for LangBot."""
from __future__ import annotations
from .runner.descriptor import AgentRunnerDescriptor
from .runner.id import parse_runner_id, format_runner_id, RunnerIdParts, is_plugin_runner_id
from .runner.errors import (
AgentRunnerError,
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerProtocolError,
RunnerExecutionError,
)
from .runner.registry import AgentRunnerRegistry
from .runner.context_builder import AgentRunContextBuilder
from .runner.resource_builder import AgentResourceBuilder
from .runner.result_normalizer import AgentResultNormalizer
from .runner.orchestrator import AgentRunOrchestrator
from .runner.config_migration import ConfigMigration
__all__ = [
'AgentRunnerDescriptor',
'parse_runner_id',
'format_runner_id',
'is_plugin_runner_id',
'RunnerIdParts',
'AgentRunnerError',
'RunnerNotFoundError',
'RunnerNotAuthorizedError',
'RunnerProtocolError',
'RunnerExecutionError',
'AgentRunnerRegistry',
'AgentRunContextBuilder',
'AgentResourceBuilder',
'AgentResultNormalizer',
'AgentRunOrchestrator',
'ConfigMigration',
]

View File

@@ -0,0 +1,36 @@
"""Agent runner modules."""
from __future__ import annotations
from .descriptor import AgentRunnerDescriptor
from .id import parse_runner_id, format_runner_id, RunnerIdParts
from .errors import (
AgentRunnerError,
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerProtocolError,
RunnerExecutionError,
)
from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .orchestrator import AgentRunOrchestrator
from .config_migration import ConfigMigration
__all__ = [
'AgentRunnerDescriptor',
'parse_runner_id',
'format_runner_id',
'RunnerIdParts',
'AgentRunnerError',
'RunnerNotFoundError',
'RunnerNotAuthorizedError',
'RunnerProtocolError',
'RunnerExecutionError',
'AgentRunnerRegistry',
'AgentRunContextBuilder',
'AgentResourceBuilder',
'AgentResultNormalizer',
'AgentRunOrchestrator',
'ConfigMigration',
]

View File

@@ -0,0 +1,196 @@
"""Configuration migration for agent runner IDs."""
from __future__ import annotations
import typing
from .id import is_plugin_runner_id
# Mapping from old built-in runner names to official plugin runner IDs
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
'local-agent': 'plugin:langbot/local-agent/default',
'dify-service-api': 'plugin:langbot/dify-agent/default',
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
'coze-api': 'plugin:langbot/coze-agent/default',
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
'langflow-api': 'plugin:langbot/langflow-agent/default',
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
}
class ConfigMigration:
"""Configuration migration helper for agent runner IDs.
Responsibilities:
- Resolve runner ID from new ai.runner.id or old ai.runner.runner
- Map old built-in runner names to official plugin runner IDs
- Extract runner config from ai.runner_config or old ai.<runner-name>
"""
@staticmethod
def resolve_runner_id(pipeline_config: dict[str, typing.Any]) -> str | None:
"""Resolve runner ID from pipeline configuration.
Priority:
1. New format: ai.runner.id (must be plugin:* format)
2. Old format: ai.runner.runner (mapped to plugin:* if built-in)
Args:
pipeline_config: Pipeline configuration dict
Returns:
Runner ID string, or None if not configured
"""
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner', {})
# Check new format first
runner_id = runner_config.get('id')
if runner_id:
if is_plugin_runner_id(runner_id):
return runner_id
# If it's not a plugin ID, try to map it as old runner name
return OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(runner_id, runner_id)
# Check old format
old_runner_name = runner_config.get('runner')
if old_runner_name:
# If already plugin:* format, return directly
if is_plugin_runner_id(old_runner_name):
return old_runner_name
# Map old built-in runner to official plugin ID
mapped_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name)
if mapped_id:
return mapped_id
# Return old name if no mapping exists (will error in registry)
return old_runner_name
return None
@staticmethod
def resolve_runner_config(
pipeline_config: dict[str, typing.Any],
runner_id: str,
) -> dict[str, typing.Any]:
"""Resolve runner instance configuration from pipeline configuration.
Priority:
1. New format: ai.runner_config[runner_id]
2. Old format: ai.<runner-name> (mapped from runner_id if applicable)
Args:
pipeline_config: Pipeline configuration dict
runner_id: Resolved runner ID
Returns:
Runner configuration dict (empty if not found)
"""
ai_config = pipeline_config.get('ai', {})
# Check new format
runner_configs = ai_config.get('runner_config', {})
if runner_id in runner_configs:
return runner_configs[runner_id]
# Check old format: ai.<old_runner_name>
# Try to find old runner name from runner_id
old_runner_name = None
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
old_runner_name = old_name
break
if old_runner_name:
old_config = ai_config.get(old_runner_name, {})
if old_config:
return old_config
# If runner_id is plugin:* format, try extracting runner_name as config key
if is_plugin_runner_id(runner_id):
# Some configs might use just the runner_name component as key
# But this is legacy behavior - prefer ai.runner_config[id]
pass
return {}
@staticmethod
def get_old_runner_name(runner_id: str) -> str | None:
"""Get old runner name from mapped runner ID.
Args:
runner_id: Plugin runner ID
Returns:
Old runner name if mapped, None otherwise
"""
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
return old_name
return None
@staticmethod
def get_expire_time(pipeline_config: dict[str, typing.Any]) -> int:
"""Get conversation expire time from configuration.
Args:
pipeline_config: Pipeline configuration dict
Returns:
Expire time in seconds (0 means no expiry)
"""
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner', {})
return runner_config.get('expire-time', 0)
@staticmethod
def migrate_pipeline_config(pipeline_config: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""Migrate pipeline config to new format.
This converts old ai.runner.runner and ai.<runner-name> to
new ai.runner.id and ai.runner_config format.
Args:
pipeline_config: Original pipeline configuration
Returns:
Migrated pipeline configuration
"""
# Create copy
new_config = dict(pipeline_config)
ai_config = new_config.get('ai', {})
if not ai_config:
return new_config
runner_config = ai_config.get('runner', {})
runner_configs = ai_config.get('runner_config', {})
# Resolve runner ID
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
if runner_id:
# Set new format
runner_config['id'] = runner_id
# Remove old runner field if present
if 'runner' in runner_config and is_plugin_runner_id(runner_config['runner']):
# Already migrated plugin:* format, keep as id
pass
elif 'runner' in runner_config:
# Old built-in runner name, remove after migration
old_name = runner_config['runner']
if old_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
del runner_config['runner']
# Migrate runner config
resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
if resolved_config:
runner_configs[runner_id] = resolved_config
# Remove old runner config block
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id and old_name in ai_config:
del ai_config[old_name]
# Update configs
ai_config['runner'] = runner_config
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config
return new_config

View File

@@ -0,0 +1,254 @@
"""Agent run context builder for converting Query to SDK v1 AgentRunContext."""
from __future__ import annotations
import uuid
import time
import typing
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .config_migration import ConfigMigration
# Internal models for SDK v1 context protocol matching SDK v1 resources.py
class AgentTrigger(typing.TypedDict):
"""Agent trigger information."""
type: str
source: str # 'pipeline' or 'event_router'
timestamp: int | None
class ConversationContext(typing.TypedDict):
"""Conversation context."""
session_id: str | None
conversation_id: str | None
launcher_type: str | None
launcher_id: str | None
sender_id: str | None
bot_uuid: str | None
pipeline_uuid: str | None
class AgentInput(typing.TypedDict):
"""Agent input."""
text: str | None
contents: list[dict[str, typing.Any]]
message_chain: dict[str, typing.Any] | None
attachments: list[dict[str, typing.Any]]
# SDK v1 Protocol resource models - matching langbot-plugin-sdk/resources.py
class ModelResource(typing.TypedDict):
"""Model resource per SDK v1."""
model_id: str
model_type: str | None
provider: str | None
class ToolResource(typing.TypedDict):
"""Tool resource per SDK v1."""
tool_name: str
tool_type: str | None
description: str | None
class KnowledgeBaseResource(typing.TypedDict):
"""Knowledge base resource per SDK v1."""
kb_id: str
kb_name: str | None
kb_type: str | None
class FileResource(typing.TypedDict):
"""File resource per SDK v1."""
file_id: str
file_name: str | None
mime_type: str | None
source: str | None
class StorageResource(typing.TypedDict):
"""Storage resource per SDK v1."""
plugin_storage: bool
workspace_storage: bool
class AgentResources(typing.TypedDict):
"""Agent resources per SDK v1."""
models: list[ModelResource]
tools: list[ToolResource]
knowledge_bases: list[KnowledgeBaseResource]
files: list[FileResource]
storage: StorageResource
platform_capabilities: dict[str, typing.Any]
class AgentRuntimeContext(typing.TypedDict):
"""Agent runtime context."""
langbot_version: str | None
sdk_protocol_version: str
query_id: int | None
trace_id: str | None
deadline_at: int | None
metadata: dict[str, typing.Any]
class AgentRunContextV1(typing.TypedDict):
"""SDK v1 AgentRunContext per PROTOCOL_V1.md."""
run_id: str
trigger: AgentTrigger
conversation: ConversationContext | None
event: dict[str, typing.Any] | None # Reserved for EBA
actor: dict[str, typing.Any] | None # Reserved for EBA
subject: dict[str, typing.Any] | None # Reserved for EBA
messages: list[dict[str, typing.Any]]
input: AgentInput
resources: AgentResources
runtime: AgentRuntimeContext
config: dict[str, typing.Any]
class AgentRunContextBuilder:
"""Builder for converting Query to SDK v1 AgentRunContext.
Responsibilities:
- Generate new run_id (UUID, not query id)
- Set trigger type to 'message.received' for pipeline
- Build conversation context from session
- Convert messages to SDK format
- Build input from user_message and message_chain
- Set resources from AgentResourceBuilder result
- Build runtime context with host info, trace_id, deadline
- Set config from runner instance configuration
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def build_context(
self,
query: pipeline_query.Query,
descriptor: AgentRunnerDescriptor,
resources: AgentResources,
) -> AgentRunContextV1:
"""Build AgentRunContext from Query.
Args:
query: Pipeline query
descriptor: Runner descriptor
resources: Built resources from AgentResourceBuilder
Returns:
AgentRunContextV1 dict matching PROTOCOL_V1.md
"""
# Generate new run_id
run_id = str(uuid.uuid4())
# Build trigger
trigger: AgentTrigger = {
'type': 'message.received',
'source': 'pipeline',
'timestamp': int(time.time()),
}
# Build conversation context
conversation: ConversationContext | None = None
if query.session:
conversation = {
'session_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'conversation_id': getattr(query.session.using_conversation, 'uuid', None),
'launcher_type': query.session.launcher_type.value,
'launcher_id': query.session.launcher_id,
'sender_id': str(query.sender_id),
'bot_uuid': query.bot_uuid,
'pipeline_uuid': query.pipeline_uuid,
}
# Build input
input: AgentInput = self._build_input(query)
# Build messages
messages = self._build_messages(query)
# Get runner config
runner_config = ConfigMigration.resolve_runner_config(
query.pipeline_config,
descriptor.id,
)
# Build runtime context
runtime: AgentRuntimeContext = {
'langbot_version': self.ap.ver_mgr.get_current_version(),
'sdk_protocol_version': descriptor.protocol_version,
'query_id': query.query_id,
'trace_id': run_id, # Use run_id as trace_id for now
'deadline_at': None, # TODO: set from runner config timeout
'metadata': {
'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'),
'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'),
},
}
# Build full context
context: AgentRunContextV1 = {
'run_id': run_id,
'trigger': trigger,
'conversation': conversation,
'event': None, # Reserved for EBA
'actor': None, # Reserved for EBA
'subject': None, # Reserved for EBA
'messages': messages,
'input': input,
'resources': resources,
'runtime': runtime,
'config': runner_config,
}
return context
def _build_input(self, query: pipeline_query.Query) -> AgentInput:
"""Build AgentInput from query."""
text = None
contents: list[dict[str, typing.Any]] = []
if query.user_message:
# Extract text if content is single text element
if isinstance(query.user_message.content, list):
for elem in query.user_message.content:
contents.append(elem.model_dump(mode='json'))
if elem.type == 'text':
text = getattr(elem, 'text', None)
else:
# Single string content
text = str(query.user_message.content)
contents.append({'type': 'text', 'text': text})
# Include message_chain for platform-specific format
message_chain_dict = None
if query.message_chain:
message_chain_dict = query.message_chain.model_dump(mode='json')
return {
'text': text,
'contents': contents,
'message_chain': message_chain_dict,
'attachments': [], # TODO: extract attachments from message_chain
}
def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
"""Build messages list from query."""
messages: list[dict[str, typing.Any]] = []
if query.messages:
for msg in query.messages:
messages.append(msg.model_dump(mode='json'))
return messages

View File

@@ -0,0 +1,72 @@
"""Agent runner descriptor."""
from __future__ import annotations
import typing
import pydantic
class AgentRunnerDescriptor(pydantic.BaseModel):
"""Descriptor for an agent runner.
Represents the discovered metadata for a runner, including
its identity, capabilities, permissions, and configuration schema.
"""
id: str
"""Unique runner ID: plugin:author/plugin_name/runner_name"""
source: typing.Literal['plugin']
"""Runner source type"""
label: dict[str, str]
"""Display labels keyed by locale (e.g., en_US, zh_Hans)"""
description: dict[str, str] | None = None
"""Optional description keyed by locale"""
plugin_author: str
"""Plugin author from manifest"""
plugin_name: str
"""Plugin name from manifest"""
runner_name: str
"""AgentRunner component name from manifest"""
plugin_version: str | None = None
"""Optional plugin version"""
protocol_version: str = '1'
"""SDK protocol version, default '1'"""
config_schema: list[dict[str, typing.Any]] = []
"""Configuration schema using DynamicForm format"""
capabilities: dict[str, bool] = {}
"""Runner capabilities: streaming, tool_calling, knowledge_retrieval, etc."""
permissions: dict[str, list[str]] = {}
"""Requested permissions: models, tools, knowledge_bases, storage, files, platform_api"""
raw_manifest: dict[str, typing.Any] = {}
"""Original manifest for reference"""
model_config = pydantic.ConfigDict(
extra='allow',
)
def get_plugin_id(self) -> str:
"""Return plugin identifier as author/name."""
return f'{self.plugin_author}/{self.plugin_name}'
def supports_streaming(self) -> bool:
"""Check if runner supports streaming output."""
return self.capabilities.get('streaming', False)
def supports_tool_calling(self) -> bool:
"""Check if runner supports tool calling."""
return self.capabilities.get('tool_calling', False)
def supports_knowledge_retrieval(self) -> bool:
"""Check if runner supports knowledge retrieval."""
return self.capabilities.get('knowledge_retrieval', False)

View File

@@ -0,0 +1,37 @@
"""Agent runner errors."""
from __future__ import annotations
class AgentRunnerError(Exception):
"""Base error for agent runner operations."""
pass
class RunnerNotFoundError(AgentRunnerError):
"""Runner not found in registry."""
def __init__(self, runner_id: str):
self.runner_id = runner_id
super().__init__(f'Agent runner not found: {runner_id}')
class RunnerNotAuthorizedError(AgentRunnerError):
"""Runner not authorized for this pipeline."""
def __init__(self, runner_id: str, bound_plugins: list[str] | None):
self.runner_id = runner_id
self.bound_plugins = bound_plugins
super().__init__(f'Agent runner {runner_id} not authorized for bound_plugins={bound_plugins}')
class RunnerProtocolError(AgentRunnerError):
"""Runner protocol version mismatch or invalid manifest."""
def __init__(self, runner_id: str, message: str):
self.runner_id = runner_id
super().__init__(f'Agent runner protocol error for {runner_id}: {message}')
class RunnerExecutionError(AgentRunnerError):
"""Runner execution failed."""
def __init__(self, runner_id: str, message: str, retryable: bool = False):
self.runner_id = runner_id
self.retryable = retryable
super().__init__(f'Agent runner {runner_id} execution failed: {message}')

View File

@@ -0,0 +1,92 @@
"""Agent runner ID parsing and formatting."""
from __future__ import annotations
import dataclasses
@dataclasses.dataclass(frozen=True)
class RunnerIdParts:
"""Parsed runner ID components."""
source: str # 'plugin' (future: 'builtin')
plugin_author: str
plugin_name: str
runner_name: str
def to_plugin_id(self) -> str:
"""Return plugin identifier as author/name."""
return f'{self.plugin_author}/{self.plugin_name}'
def parse_runner_id(runner_id: str) -> RunnerIdParts:
"""Parse runner ID string into components.
Args:
runner_id: Runner ID in format 'plugin:author/plugin_name/runner_name'
Returns:
RunnerIdParts with parsed components
Raises:
ValueError: If runner_id format is invalid
"""
if runner_id.startswith('plugin:'):
parts = runner_id[7:].split('/')
if len(parts) != 3:
raise ValueError(
f'Invalid plugin runner ID format: {runner_id}. '
f'Expected: plugin:author/plugin_name/runner_name'
)
plugin_author, plugin_name, runner_name = parts
if not plugin_author or not plugin_name or not runner_name:
raise ValueError(
f'Invalid plugin runner ID: {runner_id}. '
f'author, plugin_name, and runner_name must be non-empty'
)
return RunnerIdParts(
source='plugin',
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
)
else:
# For backward compatibility with old built-in runner names
# This should eventually be removed after migration
raise ValueError(
f'Invalid runner ID format: {runner_id}. '
f'Expected: plugin:author/plugin_name/runner_name'
)
def format_runner_id(
source: str,
plugin_author: str,
plugin_name: str,
runner_name: str,
) -> str:
"""Format runner ID from components.
Args:
source: Runner source ('plugin')
plugin_author: Plugin author
plugin_name: Plugin name
runner_name: Runner component name
Returns:
Runner ID string
"""
if source == 'plugin':
return f'plugin:{plugin_author}/{plugin_name}/{runner_name}'
else:
raise ValueError(f'Invalid runner source: {source}')
def is_plugin_runner_id(runner_id: str) -> bool:
"""Check if runner ID is a plugin runner.
Args:
runner_id: Runner ID string
Returns:
True if runner ID starts with 'plugin:'
"""
return runner_id.startswith('plugin:')

View File

@@ -0,0 +1,158 @@
"""Agent run orchestrator for coordinating runner execution."""
from __future__ import annotations
import typing
import traceback
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder, AgentRunContextV1
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .config_migration import ConfigMigration
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
)
class AgentRunOrchestrator:
"""Orchestrator for agent runner execution.
Responsibilities:
- Resolve runner ID from pipeline config (new or old format)
- Get runner descriptor from registry
- Build AgentRunContext from Query
- Build AgentResources with permission filtering
- Invoke plugin runtime RUN_AGENT action
- Normalize AgentRunResult to Pipeline messages
- Handle errors, timeouts, protocol errors
- Maintain streaming card behavior
This is the main entry point for ChatMessageHandler.
"""
ap: app.Application
registry: AgentRunnerRegistry
context_builder: AgentRunContextBuilder
resource_builder: AgentResourceBuilder
result_normalizer: AgentResultNormalizer
def __init__(
self,
ap: app.Application,
registry: AgentRunnerRegistry,
):
self.ap = ap
self.registry = registry
self.context_builder = AgentRunContextBuilder(ap)
self.resource_builder = AgentResourceBuilder(ap)
self.result_normalizer = AgentResultNormalizer(ap)
async def run_from_query(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from pipeline query.
This is the main entry point called by ChatMessageHandler.
Args:
query: Pipeline query with pipeline_config, session, messages, etc.
Yields:
Message or MessageChunk for pipeline response
Raises:
RunnerNotFoundError: If runner not found
RunnerNotAuthorizedError: If runner not authorized
RunnerExecutionError: If runner execution failed
"""
# Resolve runner ID
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
if not runner_id:
raise RunnerNotFoundError('no runner configured')
# Get bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Get runner descriptor
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build resources
resources = await self.resource_builder.build_resources(query, descriptor)
# Build context
context = await self.context_builder.build_context(query, descriptor, resources)
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Normalize result
result = await self.result_normalizer.normalize(result_dict, descriptor)
if result is not None:
yield result
async def _invoke_runner(
self,
descriptor: AgentRunnerDescriptor,
context: AgentRunContextV1,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""Invoke runner via plugin connector.
Args:
descriptor: Runner descriptor
context: AgentRunContext dict
Yields:
Raw result dicts from plugin runtime
Raises:
RunnerExecutionError: If plugin system disabled or runtime error
"""
if not self.ap.plugin_connector.is_enable_plugin:
raise RunnerExecutionError(
descriptor.id,
'Plugin system is disabled',
retryable=False,
)
try:
async for result_dict in self.ap.plugin_connector.run_agent(
plugin_author=descriptor.plugin_author,
plugin_name=descriptor.plugin_name,
runner_name=descriptor.runner_name,
context=context,
):
yield result_dict
except RunnerExecutionError:
raise
except Exception as e:
# Wrap unexpected errors
self.ap.logger.error(
f'Runner {descriptor.id} unexpected error: {traceback.format_exc()}'
)
raise RunnerExecutionError(
descriptor.id,
str(e),
retryable=False,
)
def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None:
"""Resolve runner ID for telemetry/logging without full execution.
Args:
query: Pipeline query
Returns:
Runner ID string, or None
"""
return ConfigMigration.resolve_runner_id(query.pipeline_config)

View File

@@ -0,0 +1,277 @@
"""Agent runner registry for discovering and caching runner descriptors."""
from __future__ import annotations
import typing
import asyncio
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .id import parse_runner_id, format_runner_id
from .errors import RunnerNotFoundError, RunnerNotAuthorizedError
class AgentRunnerRegistry:
"""Registry for discovering and managing agent runners.
Responsibilities:
- Discover runners from plugin runtime via LIST_AGENT_RUNNERS
- Validate runner manifests (kind, metadata, spec)
- Cache discovered runners for performance
- Filter runners by bound plugins
- Handle manifest errors gracefully (log warning, skip runner)
"""
ap: app.Application
_cache: dict[str, AgentRunnerDescriptor] | None
"""Cached runner descriptors keyed by runner ID"""
_cache_lock: asyncio.Lock
"""Lock for cache refresh operations"""
def __init__(self, ap: app.Application):
self.ap = ap
self._cache = None
self._cache_lock = asyncio.Lock()
async def _discover_runners(self) -> dict[str, AgentRunnerDescriptor]:
"""Discover runners from plugin runtime.
Always discovers ALL runners (no bound_plugins filter).
The cache should contain unfiltered discovery results.
Returns:
Dict of runner descriptors keyed by runner ID
"""
if not self.ap.plugin_connector.is_enable_plugin:
return {}
runners: dict[str, AgentRunnerDescriptor] = {}
try:
# Always list all runners (bound_plugins=None)
plugin_runners = await self.ap.plugin_connector.list_agent_runners(None)
for runner_data in plugin_runners:
try:
descriptor = self._validate_and_build_descriptor(runner_data)
if descriptor is not None:
runners[descriptor.id] = descriptor
except Exception as e:
plugin_author = runner_data.get('plugin_author', 'unknown')
plugin_name = runner_data.get('plugin_name', 'unknown')
runner_name = runner_data.get('runner_name', 'unknown')
self.ap.logger.warning(
f'Invalid runner manifest for plugin:{plugin_author}/{plugin_name}/{runner_name}: {e}'
)
continue
except Exception as e:
self.ap.logger.warning(f'Failed to list agent runners from plugin runtime: {e}')
return {}
return runners
def _validate_and_build_descriptor(self, runner_data: dict[str, typing.Any]) -> AgentRunnerDescriptor | None:
"""Validate runner manifest and build descriptor.
Args:
runner_data: Raw runner data from plugin runtime with fields:
- plugin_author, plugin_name, runner_name
- manifest (full component manifest dict)
- protocol_version, capabilities, permissions, config (extracted from spec)
Returns:
AgentRunnerDescriptor if valid, None if invalid
"""
plugin_author = runner_data.get('plugin_author', '')
plugin_name = runner_data.get('plugin_name', '')
runner_name = runner_data.get('runner_name', '')
if not plugin_author or not plugin_name or not runner_name:
return None
manifest = runner_data.get('manifest', {})
# Validate kind
kind = manifest.get('kind', '')
if kind != 'AgentRunner':
return None
# Validate metadata
metadata = manifest.get('metadata', {})
name = metadata.get('name', '')
if not name:
return None
# metadata.label must exist
label = metadata.get('label', {})
if not label:
label = {name: name} # fallback
# SDK now provides these directly extracted from spec
protocol_version = runner_data.get('protocol_version', '1')
config_schema = runner_data.get('config', [])
capabilities = runner_data.get('capabilities', {})
permissions = runner_data.get('permissions', {})
# Build descriptor
runner_id = format_runner_id(
source='plugin',
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
)
return AgentRunnerDescriptor(
id=runner_id,
source='plugin',
label=label,
description=metadata.get('description') or runner_data.get('runner_description'),
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
plugin_version=runner_data.get('plugin_version'),
protocol_version=protocol_version,
config_schema=config_schema,
capabilities=capabilities,
permissions=permissions,
raw_manifest=manifest,
)
async def refresh(self) -> None:
"""Refresh runner cache.
Always discovers ALL runners (no bound_plugins filter).
The cache contains unfiltered discovery results.
"""
async with self._cache_lock:
self._cache = await self._discover_runners()
async def list_runners(
self,
bound_plugins: list[str] | None = None,
use_cache: bool = True,
) -> list[AgentRunnerDescriptor]:
"""List available runners.
Args:
bound_plugins: Optional filter for bound plugins (applied locally)
use_cache: Use cached data if available
Returns:
List of runner descriptors
"""
if use_cache and self._cache is not None:
# Filter from cache
return self._filter_runners_by_bound_plugins(self._cache, bound_plugins)
# Discover fresh (always full list)
runners = await self._discover_runners()
# Update cache (full list, unfiltered)
async with self._cache_lock:
self._cache = runners
# Filter locally
return self._filter_runners_by_bound_plugins(runners, bound_plugins)
def _filter_runners_by_bound_plugins(
self,
runners: dict[str, AgentRunnerDescriptor],
bound_plugins: list[str] | None,
) -> list[AgentRunnerDescriptor]:
"""Filter runners by bound plugins.
Args:
runners: Dict of runner descriptors
bound_plugins: Optional filter (None means all plugins allowed)
Returns:
Filtered list of runner descriptors
"""
if bound_plugins is None:
# All plugins allowed
return list(runners.values())
allowed_plugin_ids = set(bound_plugins)
filtered = []
for descriptor in runners.values():
plugin_id = descriptor.get_plugin_id()
if plugin_id in allowed_plugin_ids:
filtered.append(descriptor)
return filtered
async def get(
self,
runner_id: str,
bound_plugins: list[str] | None = None,
) -> AgentRunnerDescriptor:
"""Get a specific runner descriptor.
Args:
runner_id: Runner ID to lookup
bound_plugins: Optional bound plugins filter
Returns:
AgentRunnerDescriptor
Raises:
RunnerNotFoundError: If runner not found
RunnerNotAuthorizedError: If runner not in bound plugins
"""
# Parse and validate runner ID format
try:
parse_runner_id(runner_id)
except ValueError as e:
raise RunnerNotFoundError(runner_id) from e
# Get from cache or discover (always full list)
if self._cache is None:
await self.refresh()
if self._cache is None:
raise RunnerNotFoundError(runner_id)
descriptor = self._cache.get(runner_id)
if descriptor is None:
raise RunnerNotFoundError(runner_id)
# Check authorization
if bound_plugins is not None:
plugin_id = descriptor.get_plugin_id()
if plugin_id not in bound_plugins:
raise RunnerNotAuthorizedError(runner_id, bound_plugins)
return descriptor
async def get_runner_metadata_for_pipeline(self) -> list[dict[str, typing.Any]]:
"""Get runner metadata for pipeline configuration UI.
Returns runner options and their config schemas for the DynamicForm.
"""
# Get all runners (no bound plugin filter for metadata listing)
runners = await self.list_runners(bound_plugins=None)
options = []
stages = []
for descriptor in runners:
# Add runner option
options.append({
'name': descriptor.id,
'label': descriptor.label,
'description': descriptor.description,
})
# Add config schema as stage if not empty
if descriptor.config_schema:
stages.append({
'name': descriptor.id,
'label': descriptor.label,
'description': descriptor.description,
'config': descriptor.config_schema,
})
return options, stages

View File

@@ -0,0 +1,210 @@
"""Agent resource builder for constructing authorized resources."""
from __future__ import annotations
import typing
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .context_builder import (
AgentResources,
ModelResource,
ToolResource,
KnowledgeBaseResource,
StorageResource,
)
class AgentResourceBuilder:
"""Builder for constructing AgentResources with permission filtering.
Responsibilities:
- Apply 3-layer permission filtering:
1. Runner manifest declared permissions
2. Pipeline extensions_preference (bound plugins/MCP servers)
3. Runner instance config selected resources
- Build models list from authorized models
- Build tools list from bound plugins/MCP servers
- Build knowledge_bases list from config
- Build storage and files permissions summary
Note: This only builds the resource declaration. The actual proxy actions
in handler.py must still validate against ctx.resources at runtime.
Resource field names match SDK v1 Protocol:
- ModelResource: model_id, model_type, provider
- ToolResource: tool_name, tool_type, description
- KnowledgeBaseResource: kb_id, kb_name, kb_type
- StorageResource: plugin_storage, workspace_storage
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def build_resources(
self,
query: typing.Any, # pipeline_query.Query
descriptor: AgentRunnerDescriptor,
) -> AgentResources:
"""Build AgentResources from query and runner descriptor.
Args:
query: Pipeline query with pipeline_config and variables
descriptor: Runner descriptor with permissions and capabilities
Returns:
AgentResources dict with filtered resource lists
"""
# Get bound plugins and MCP servers from query
bound_plugins = query.variables.get('_pipeline_bound_plugins')
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers')
# Layer 1: Runner manifest permissions
manifest_perms = descriptor.permissions
# Layer 2: Pipeline extensions_preference (already in bound_plugins/MCP servers)
# Layer 3: Runner instance config (from pipeline_config) - resolved via ConfigMigration
from .config_migration import ConfigMigration
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, descriptor.id)
# Build each resource category
models = await self._build_models(manifest_perms, query)
tools = await self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query)
knowledge_bases = await self._build_knowledge_bases(manifest_perms, runner_config, query)
storage = self._build_storage(manifest_perms)
return {
'models': models,
'tools': tools,
'knowledge_bases': knowledge_bases,
'files': [], # Files are populated at runtime
'storage': storage,
'platform_capabilities': {}, # Reserved for EBA
}
async def _build_models(
self,
manifest_perms: dict[str, list[str]],
query: typing.Any,
) -> list[ModelResource]:
"""Build models list with SDK v1 field names."""
models: list[ModelResource] = []
# Check manifest permission
model_perms = manifest_perms.get('models', [])
if 'invoke' not in model_perms and 'stream' not in model_perms:
return models
# Get model from query (preproc already resolved this)
model_uuid = getattr(query, 'use_llm_model_uuid', None)
if not model_uuid:
return models
try:
model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
if model and model.model_entity:
# Use SDK v1 field names: model_id, model_type, provider
models.append({
'model_id': model_uuid,
'model_type': model.model_entity.model_type,
'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None,
})
except Exception:
pass
# Add fallback models if present
fallback_uuids = query.variables.get('_fallback_model_uuids', [])
for fb_uuid in fallback_uuids:
try:
model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
if model and model.model_entity:
models.append({
'model_id': fb_uuid,
'model_type': model.model_entity.model_type,
'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None,
})
except Exception:
pass
return models
async def _build_tools(
self,
manifest_perms: dict[str, list[str]],
bound_plugins: list[str] | None,
bound_mcp_servers: list[str] | None,
query: typing.Any,
) -> list[ToolResource]:
"""Build tools list with SDK v1 field names."""
tools: list[ToolResource] = []
# Check manifest permission
tool_perms = manifest_perms.get('tools', [])
if 'list' not in tool_perms and 'call' not in tool_perms:
return tools
# Get tools from query (preproc already resolved this for local-agent)
use_funcs = getattr(query, 'use_funcs', [])
for tool in use_funcs:
# Use SDK v1 field names: tool_name, tool_type, description
tools.append({
'tool_name': tool.name,
'tool_type': None, # Tool type not available in current LLMTool
'description': tool.description,
})
return tools
async def _build_knowledge_bases(
self,
manifest_perms: dict[str, list[str]],
runner_config: dict[str, typing.Any],
query: typing.Any,
) -> list[KnowledgeBaseResource]:
"""Build knowledge bases list with SDK v1 field names."""
kb_resources: list[KnowledgeBaseResource] = []
# Check manifest permission
kb_perms = manifest_perms.get('knowledge_bases', [])
if 'list' not in kb_perms and 'retrieve' not in kb_perms:
return kb_resources
# Get knowledge base UUIDs from config
kb_uuids = runner_config.get('knowledge-bases', [])
if not kb_uuids:
# Old single KB config
old_kb_uuid = runner_config.get('knowledge-base', '')
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
# Also check query variables (may be modified by plugin PromptPreProcessing)
kb_uuids_from_vars = query.variables.get('_knowledge_base_uuids', [])
if kb_uuids_from_vars:
kb_uuids = kb_uuids_from_vars
for kb_uuid in kb_uuids:
try:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if kb:
# Use SDK v1 field names: kb_id, kb_name, kb_type
kb_resources.append({
'kb_id': kb_uuid,
'kb_name': kb.get_name(),
'kb_type': kb.knowledge_base_entity.kb_type if hasattr(kb.knowledge_base_entity, 'kb_type') else None,
})
except Exception:
pass
return kb_resources
def _build_storage(
self,
manifest_perms: dict[str, list[str]],
) -> StorageResource:
"""Build storage permissions with SDK v1 field names."""
storage_perms = manifest_perms.get('storage', [])
return {
'plugin_storage': 'plugin' in storage_perms,
'workspace_storage': 'workspace' in storage_perms,
}

View File

@@ -0,0 +1,180 @@
"""Agent result normalizer for converting SDK v1 AgentRunResult to Pipeline messages."""
from __future__ import annotations
import typing
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .errors import RunnerExecutionError, RunnerProtocolError
# Maximum size for a single result payload (prevent memory exhaustion)
MAX_RESULT_SIZE_BYTES = 1024 * 1024 # 1 MB
class AgentResultNormalizer:
"""Normalizer for converting SDK v1 AgentRunResult to Pipeline messages.
Responsibilities:
- Accept only SDK v1 result types (message.delta, message.completed, etc.)
- Map message.delta -> MessageChunk
- Map message.completed -> Message
- Map run.completed (with message) -> Message
- Handle run.failed as controlled error
- Ignore unknown types with warning
- Validate result size
- Validate message schema
Per PROTOCOL_V1.md, accepted types:
- message.delta
- message.completed
- tool.call.started
- tool.call.completed
- state.updated
- run.completed
- run.failed
- action.requested (log only, don't execute)
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def normalize(
self,
result_dict: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.Message | provider_message.MessageChunk | None:
"""Normalize AgentRunResult to Message or MessageChunk.
Args:
result_dict: Raw result dict from plugin runtime
descriptor: Runner descriptor for error context
Returns:
Message, MessageChunk, or None (for non-message events)
Raises:
RunnerExecutionError: On run.failed
RunnerProtocolError: On invalid result format
"""
# Validate result type
result_type = result_dict.get('type')
if not result_type:
raise RunnerProtocolError(descriptor.id, 'Missing result type')
# Validate result size
try:
import json
result_json = json.dumps(result_dict)
if len(result_json) > MAX_RESULT_SIZE_BYTES:
self.ap.logger.warning(
f'Runner {descriptor.id} result too large ({len(result_json)} bytes), truncating'
)
# Truncate content if possible
data = result_dict.get('data', {})
if 'chunk' in data or 'message' in data:
content = data.get('chunk', {}).get('content', '') or data.get('message', {}).get('content', '')
if isinstance(content, str) and len(content) > 10000:
# Keep reasonable length
data['chunk'] = {'role': 'assistant', 'content': content[:10000] + '...[truncated]'}
except Exception:
pass
# Handle each result type
data = result_dict.get('data', {})
if result_type == 'message.delta':
return self._normalize_message_delta(data, descriptor)
elif result_type == 'message.completed':
return self._normalize_message_completed(data, descriptor)
elif result_type == 'tool.call.started':
# Log only, don't yield to pipeline
self.ap.logger.debug(
f'Runner {descriptor.id} tool call started: {data.get("tool_name", "unknown")}'
)
return None
elif result_type == 'tool.call.completed':
# Log only, don't yield to pipeline
self.ap.logger.debug(
f'Runner {descriptor.id} tool call completed: {data.get("tool_name", "unknown")}'
)
return None
elif result_type == 'state.updated':
# Log for telemetry, don't yield
self.ap.logger.debug(
f'Runner {descriptor.id} state updated: {data.get("key", "unknown")}={data.get("value", "...")}'
)
return None
elif result_type == 'run.completed':
# May include final message
if 'message' in data:
return self._normalize_message_completed(data, descriptor)
# If no message, it's just completion signal
return None
elif result_type == 'run.failed':
error_msg = data.get('error', 'Unknown error')
error_code = data.get('code', 'unknown')
retryable = data.get('retryable', False)
raise RunnerExecutionError(
descriptor.id,
f'{error_msg} (code: {error_code})',
retryable=retryable,
)
elif result_type == 'action.requested':
# Reserved for EBA - log only, don't execute
self.ap.logger.info(
f'Runner {descriptor.id} requested action (not executed in current phase): '
f'{data.get("action", "unknown")}'
)
return None
else:
# Unknown type - warn and ignore (SDK v1 only)
self.ap.logger.warning(
f'Runner {descriptor.id} returned unknown result type: {result_type}. '
f'Expected SDK v1 types (message.delta, message.completed, run.completed, run.failed, etc.)'
)
return None
def _normalize_message_delta(
self,
data: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.MessageChunk:
"""Normalize message.delta to MessageChunk."""
chunk_data = data.get('chunk', {})
if not chunk_data:
raise RunnerProtocolError(descriptor.id, 'message.delta missing chunk data')
try:
chunk = provider_message.MessageChunk.model_validate(chunk_data)
return chunk
except Exception as e:
raise RunnerProtocolError(descriptor.id, f'Invalid chunk schema: {e}')
def _normalize_message_completed(
self,
data: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.Message:
"""Normalize message.completed to Message."""
message_data = data.get('message', {})
if not message_data:
raise RunnerProtocolError(descriptor.id, 'message.completed missing message data')
try:
msg = provider_message.Message.model_validate(message_data)
return msg
except Exception as e:
raise RunnerProtocolError(descriptor.id, f'Invalid message schema: {e}')

View File

@@ -31,7 +31,7 @@ class PipelineService:
self.ap = ap
async def get_pipeline_metadata(self) -> list[dict]:
"""Get pipeline metadata with dynamically loaded plugin runners"""
"""Get pipeline metadata with dynamically loaded plugin runners from registry"""
import copy
# Deep copy AI metadata to avoid modifying the original
@@ -48,43 +48,20 @@ class PipelineService:
# Find the runner select config
for config_item in runner_stage.get('config', []):
if config_item.get('name') == 'runner':
# Get plugin agent runners
# Get plugin agent runners from registry
try:
plugin_runners = await self.ap.plugin_connector.list_agent_runners()
runner_options, runner_stages = await self.ap.agent_runner_registry.get_runner_metadata_for_pipeline()
# Add plugin runners to options
for runner in plugin_runners:
manifest = runner.get('manifest', {})
metadata = manifest.get('metadata', {})
for option in runner_options:
config_item['options'].append(option)
# Format: plugin:author/plugin_name/runner_name
runner_value = (
f'plugin:{runner["plugin_author"]}/{runner["plugin_name"]}/{runner["runner_name"]}'
)
# Add to options
config_item['options'].append(
{
'name': runner_value,
'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}),
'description': metadata.get('description'),
}
)
# Add corresponding stage configuration for this runner
spec_config = manifest.get('spec', {}).get('config', [])
if spec_config:
ai_metadata['stages'].append(
{
'name': runner_value,
'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}),
'description': metadata.get('description'),
'config': spec_config,
}
)
# Add corresponding stage configuration for each runner
for stage_config in runner_stages:
ai_metadata['stages'].append(stage_config)
except Exception as e:
self.ap.logger.warning(f'Failed to load plugin agent runners: {e}')
self.ap.logger.warning(f'Failed to load plugin agent runners from registry: {e}')
return [
self.ap.pipeline_config_meta_trigger,

View File

@@ -4,6 +4,7 @@ import logging
import asyncio
import traceback
import os
from typing import TYPE_CHECKING
from ..platform import botmgr as im_mgr
from ..platform.webhook_pusher import WebhookPusher
@@ -46,6 +47,9 @@ from ..telemetry import telemetry as telemetry_module
from ..survey import manager as survey_module
from ..skill import manager as skill_mgr
if TYPE_CHECKING:
from ..agent.runner import AgentRunnerRegistry, AgentRunOrchestrator
class Application:
"""Runtime application object and context"""
@@ -165,6 +169,11 @@ class Application:
maintenance_service: maintenance_service.MaintenanceService = None
# Agent runner subsystem
agent_runner_registry: AgentRunnerRegistry = None
agent_run_orchestrator: AgentRunOrchestrator = None
def __init__(self):
pass

View File

@@ -39,6 +39,7 @@ from ...vector import mgr as vectordb_mgr
from .. import taskmgr
from ...telemetry import telemetry as telemetry_module
from ...survey import manager as survey_module
from ...agent.runner import AgentRunnerRegistry, AgentRunOrchestrator
@stage.stage_class('BuildAppStage')
@@ -194,5 +195,12 @@ class BuildAppStage(stage.BootingStage):
await plugin_connector_inst.initialize()
ap.plugin_connector = plugin_connector_inst
# Initialize agent runner subsystem
agent_runner_registry_inst = AgentRunnerRegistry(ap)
ap.agent_runner_registry = agent_runner_registry_inst
agent_run_orchestrator_inst = AgentRunOrchestrator(ap, agent_runner_registry_inst)
ap.agent_run_orchestrator = agent_run_orchestrator_inst
ctrl = controller.Controller(ap)
ap.ctrl = ctrl

View File

@@ -9,6 +9,12 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.platform.events as platform_events
from ...agent.runner.config_migration import ConfigMigration
# Official local-agent runner ID
LOCAL_AGENT_RUNNER_ID = 'plugin:langbot/local-agent/default'
@stage.stage_class('PreProcessor')
class PreProcessor(stage.PipelineStage):
@@ -31,19 +37,28 @@ class PreProcessor(stage.PipelineStage):
stage_inst_name: str,
) -> entities.StageProcessResult:
"""Process"""
selected_runner = query.pipeline_config['ai']['runner']['runner']
include_skill_authoring = (
selected_runner == 'local-agent' and getattr(self.ap, 'skill_service', None) is not None
)
# Resolve runner ID using ConfigMigration (supports both new and old formats)
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
# Get runner config (from new ai.runner_config or old ai.<runner-name>)
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
session = await self.ap.sess_mgr.get_session(query)
# Determine if this is a local-agent runner (built-in LLM capabilities)
# Check by runner_id OR by legacy runner field for backward compatibility
is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID or (
runner_id is None and
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') == 'local-agent'
)
include_skill_authoring = is_local_agent and getattr(self.ap, 'skill_service', None) is not None
# When not local-agent, llm_model is None
llm_model = None
if selected_runner == 'local-agent':
if is_local_agent:
# Read model config — new format is { primary: str, fallbacks: [str] },
# but handle legacy plain string for backward compatibility
model_config = query.pipeline_config['ai']['local-agent'].get('model', {})
model_config = runner_config.get('model', {})
if isinstance(model_config, str):
# Legacy format: plain UUID string
primary_uuid = model_config
@@ -70,10 +85,17 @@ class PreProcessor(stage.PipelineStage):
if valid_fallbacks:
query.variables['_fallback_model_uuids'] = valid_fallbacks
# Get prompt config - for local-agent, use runner_config; for others, use default prompt
prompt_config = runner_config.get('prompt', [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]) if is_local_agent else [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]
conversation = await self.ap.sess_mgr.get_conversation(
query,
session,
query.pipeline_config['ai']['local-agent']['prompt'],
prompt_config,
query.pipeline_uuid,
query.bot_uuid,
)
@@ -82,7 +104,7 @@ class PreProcessor(stage.PipelineStage):
# been idle for longer than the configured conversation expire time.
# The idle window is measured from the last preprocess/update time, not
# from the conversation creation time.
conversation_expire_time = query.pipeline_config.get('ai', {}).get('runner', {}).get('expire-time', None)
conversation_expire_time = ConfigMigration.get_expire_time(query.pipeline_config)
now = datetime.datetime.now()
if conversation_expire_time is not None and conversation_expire_time > 0:
last_update_time = getattr(conversation, 'update_time', None) or getattr(conversation, 'create_time', None)
@@ -104,7 +126,7 @@ class PreProcessor(stage.PipelineStage):
query.prompt = conversation.prompt.copy()
query.messages = conversation.messages.copy()
if selected_runner == 'local-agent':
if is_local_agent:
query.use_funcs = []
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid
@@ -160,7 +182,7 @@ class PreProcessor(stage.PipelineStage):
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if (
selected_runner == 'local-agent'
is_local_agent
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
):
@@ -173,14 +195,15 @@ class PreProcessor(stage.PipelineStage):
content_list: list[provider_message.ContentElement] = []
plain_text = ''
quote_msg = query.pipeline_config['trigger'].get('misc', '').get('combine-quote-message')
quote_msg = query.pipeline_config['trigger'].get('misc', {}).get('combine-quote-message', False)
for me in query.message_chain:
if isinstance(me, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(me.text))
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or (
# Allow images for non-local-agent runners or if local-agent has vision
if not is_local_agent or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if me.base64 is not None:
@@ -201,7 +224,7 @@ class PreProcessor(stage.PipelineStage):
if isinstance(msg, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or (
if not is_local_agent or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if msg.base64 is not None:
@@ -225,9 +248,10 @@ class PreProcessor(stage.PipelineStage):
# Extract knowledge base UUIDs into query variables so plugins can modify them
# during PromptPreProcessing before the runner performs retrieval.
kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', [])
# Only for local-agent runner
kb_uuids = runner_config.get('knowledge-bases', []) if is_local_agent else []
if not kb_uuids:
old_kb_uuid = query.pipeline_config['ai']['local-agent'].get('knowledge-base', '')
old_kb_uuid = runner_config.get('knowledge-base', '') if is_local_agent else ''
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
query.variables['_knowledge_base_uuids'] = list(kb_uuids)
@@ -260,7 +284,7 @@ class PreProcessor(stage.PipelineStage):
# only) into the system prompt. The contributor's original PR
# relied on this injection; without it the LLM never discovers
# the skills are there and just calls native tools instead.
if selected_runner == 'local-agent' and self.ap.skill_mgr:
if is_local_agent and self.ap.skill_mgr:
pipeline_data = await self.ap.pipeline_service.get_pipeline(query.pipeline_uuid)
extensions_prefs = (pipeline_data or {}).get('extensions_preferences', {})
enable_all_skills = extensions_prefs.get('enable_all_skills', True)

View File

@@ -9,99 +9,28 @@ from datetime import datetime
from .. import handler
from ... import entities
from ....provider import runner as runner_module
import langbot_plugin.api.entities.events as events
from ....utils import importutil, constants, runner as runner_utils
from ....provider import runners
from ....utils import constants, runner as runner_utils
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
importutil.import_modules_in_pkg(runners)
class PluginAgentRunnerWrapper(runner_module.RequestRunner):
"""Wrapper to run AgentRunner from plugin"""
def __init__(self, ap, plugin_author: str, plugin_name: str, runner_name: str, pipeline_config: dict):
super().__init__(ap, pipeline_config)
self.plugin_author = plugin_author
self.plugin_name = plugin_name
self.runner_name = runner_name
self.name = f'plugin:{plugin_author}/{plugin_name}/{runner_name}'
async def run(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run the plugin agent runner"""
# Build AgentRunContext
context = AgentRunContext(
query_id=query.query_id,
session=query.session,
messages=query.messages,
user_message=query.user_message.content[0]
if isinstance(query.user_message.content, list)
else provider_message.ContentElement.from_text(query.user_message.content),
use_funcs=query.use_funcs,
extra_config=self.pipeline_config.get('ai', {}).get(self.runner_name, {}),
)
# Call plugin connector to run agent
async for result_dict in self.ap.plugin_connector.run_agent(
plugin_author=self.plugin_author,
plugin_name=self.plugin_name,
runner_name=self.runner_name,
context=context.model_dump(),
):
# Convert result to Message/MessageChunk
result_type = result_dict.get('type')
if result_type == 'chunk':
# Stream chunk
chunk_data = result_dict.get('message_chunk')
if chunk_data:
yield provider_message.MessageChunk.model_validate(chunk_data)
elif result_type == 'text':
# Text content
content = result_dict.get('content', '')
yield provider_message.MessageChunk(
role='assistant',
content=content,
)
elif result_type == 'tool_call':
# Tool call notification (may not need to yield anything here)
pass
elif result_type == 'finish':
# Final message
message_data = result_dict.get('message')
if message_data:
yield provider_message.Message.model_validate(message_data)
else:
# Fallback: create message from content
content = result_dict.get('content', '')
yield provider_message.Message(
role='assistant',
content=content,
)
class ChatMessageHandler(handler.MessageHandler):
"""Chat message handler using AgentRunOrchestrator.
This handler delegates all runner execution to the agent_run_orchestrator,
which resolves runner ID, builds context, invokes plugin runtime,
and normalizes results.
"""
async def handle(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[entities.StageProcessResult, None]:
"""处理"""
# 调API
# 生成器
# 触发插件事件
"""Handle chat message by delegating to AgentRunOrchestrator."""
# Trigger plugin event
event_class = (
events.PersonNormalMessageReceived
if query.launcher_type == provider_session.LauncherTypes.PERSON
@@ -122,7 +51,7 @@ class ChatMessageHandler(handler.MessageHandler):
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
event_ctx = await self.ap.plugin_connector.emit_event(event, bound_plugins)
is_create_card = False # 判断下是否需要创建流式卡片
is_create_card = False # Track if streaming card was created
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is not None:
@@ -153,55 +82,37 @@ class ChatMessageHandler(handler.MessageHandler):
is_stream = False
try:
runner_name = query.pipeline_config['ai']['runner']['runner']
# Check if it's a built-in runner
runner = None
for r in runner_module.preregistered_runners:
if r.name == runner_name:
runner = r(self.ap, query.pipeline_config)
break
# If not found in built-in runners, check plugin runners
if runner is None:
# Parse runner name: format is "plugin:author/plugin_name/runner_name"
if runner_name.startswith('plugin:'):
parts = runner_name[7:].split('/') # Remove "plugin:" prefix
if len(parts) == 3:
plugin_author, plugin_name, component_runner_name = parts
runner = PluginAgentRunnerWrapper(
self.ap, plugin_author, plugin_name, component_runner_name, query.pipeline_config
)
else:
raise ValueError(
f'Invalid plugin runner name format: {runner_name}. Expected: plugin:author/name/runner'
)
else:
raise ValueError(f'Request Runner not found: {runner_name}')
# Mark start time for telemetry
start_ts = time.time()
if is_stream:
resp_message_id = uuid.uuid4()
chunk_count = 0 # Track streaming chunks to reduce excessive logging
# Create a single resp_message_id for the entire streaming response
resp_message_id = uuid.uuid4()
chunk_count = 0
async for result in runner.run(query):
result.resp_message_id = str(resp_message_id)
# Use AgentRunOrchestrator to run the agent
# This replaces direct runner lookup and PluginAgentRunnerWrapper
async for result in self.ap.agent_run_orchestrator.run_from_query(query):
result.resp_message_id = str(resp_message_id)
# For streaming mode, pop previous response before adding new chunk
# This allows incremental card updates
if is_stream:
if query.resp_messages:
query.resp_messages.pop()
if query.resp_message_chain:
query.resp_message_chain.pop()
# 此时连接外部 AI 服务正常,创建卡片
if not is_create_card: # 只有不是第一次才创建卡片
# Create streaming card on first result (connection established)
if not is_create_card:
await query.adapter.create_message_card(str(resp_message_id), query.message_event)
is_create_card = True
query.resp_messages.append(result)
query.resp_messages.append(result)
if is_stream:
chunk_count += 1
# Only log every 10th chunk to reduce excessive logging during streaming
# This prevents memory overflow from thousands of log entries per conversation
# First chunk uses INFO level to confirm connection establishment
# Only log every 10th chunk to reduce excessive logging during streaming.
# First chunk uses INFO level to confirm connection establishment.
if chunk_count == 1:
summary = self.format_result_log(result)
if summary is not None:
@@ -212,46 +123,57 @@ class ChatMessageHandler(handler.MessageHandler):
self.ap.logger.debug(
f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}'
)
if result.content is not None:
text_length += len(result.content)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# Log final summary after streaming completes
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars'
)
else:
async for result in runner.run(query):
query.resp_messages.append(result)
else:
summary = self.format_result_log(result)
if summary is not None:
self.ap.logger.info(f'Conversation({query.query_id}) Response: {summary}')
if result.content is not None:
text_length += len(result.content)
if result.content is not None:
text_length += len(result.content)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# Log final summary after streaming completes
if is_stream:
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars'
)
# Update conversation history
query.session.using_conversation.messages.append(query.user_message)
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:
# Import orchestrator errors for specific handling
from ....agent.runner.errors import (
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerExecutionError,
)
error_info = f'{traceback.format_exc()}'
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}')
traceback.print_exc()
exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
# Handle specific runner errors with appropriate messages
if isinstance(e, RunnerNotFoundError):
user_notice = f'Agent runner not found: {e.runner_id}'
elif isinstance(e, RunnerNotAuthorizedError):
user_notice = 'Agent runner not authorized for this pipeline'
elif isinstance(e, RunnerExecutionError):
if e.retryable:
user_notice = 'Agent runner temporarily unavailable. Please try again.'
else:
user_notice = 'Agent runner execution failed.'
else:
# Use existing exception handling
exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
if exception_handling == 'show-error':
user_notice = f'{e}'
elif exception_handling == 'show-hint':
user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.')
else: # hide
user_notice = None
if exception_handling == 'show-error':
user_notice = f'{e}'
elif exception_handling == 'show-hint':
user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.')
else: # hide
user_notice = None
yield entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT,
@@ -261,7 +183,7 @@ class ChatMessageHandler(handler.MessageHandler):
debug_notice=traceback.format_exc(),
)
finally:
# Telemetry reporting: collect minimal per-query execution info and send asynchronously
# Telemetry reporting
try:
end_ts = time.time()
duration_ms = None
@@ -269,16 +191,14 @@ class ChatMessageHandler(handler.MessageHandler):
duration_ms = int((end_ts - start_ts) * 1000)
adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None
runner_name = (
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner')
if query.pipeline_config
else None
)
# Model name if using localagent
# Use orchestrator to resolve runner ID for telemetry
runner_name = self.ap.agent_run_orchestrator.resolve_runner_id_for_telemetry(query)
# Model name if available
model_name = None
try:
if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None):
if getattr(query, 'use_llm_model_uuid', None):
m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid)
if m and getattr(m, 'model_entity', None):
model_name = getattr(m.model_entity, 'name', None)
@@ -288,7 +208,7 @@ class ChatMessageHandler(handler.MessageHandler):
pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None)
runner_category = runner_utils.get_runner_category_from_runner(
runner_name, runner, query.pipeline_config
runner_name, None, query.pipeline_config
)
payload = {
@@ -306,7 +226,6 @@ class ChatMessageHandler(handler.MessageHandler):
'timestamp': datetime.utcnow().isoformat(),
}
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
await self.ap.telemetry.start_send_task(payload)
# Trigger survey event on first successful non-WebSocket response
@@ -314,5 +233,4 @@ class ChatMessageHandler(handler.MessageHandler):
if self.ap.survey:
await self.ap.survey.trigger_event('first_bot_response_success')
except Exception as ex:
# Ensure telemetry issues do not affect normal flow
self.ap.logger.warning(f'Failed to send telemetry: {ex}')

View File

@@ -779,14 +779,16 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
yield cmd_ret
# AgentRunner methods
async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]:
"""List all available AgentRunner components."""
async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List all available AgentRunner components.
Returns list of dicts with plugin_author, plugin_name, runner_name, manifest, etc.
"""
if not self.is_enable_plugin:
return []
runners_data = await self.handler.list_agent_runners(include_plugins=bound_plugins)
runners = [ComponentManifest.model_validate(runner) for runner in runners_data]
return runners
return runners_data
async def run_agent(
self,
@@ -804,10 +806,18 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
context: AgentRunContext as dict
Yields:
AgentRunReturn results as dicts
AgentRunResult dicts per Protocol v1
"""
if not self.is_enable_plugin:
yield {'type': 'finish', 'finish_reason': 'error', 'content': 'Plugin system is disabled'}
# Return v1 protocol run.failed
yield {
'type': 'run.failed',
'data': {
'error': 'Plugin system is disabled',
'code': 'plugin.disabled',
'retryable': False,
},
}
return
gen = self.handler.run_agent(plugin_author, plugin_name, runner_name, context)

View File

@@ -419,76 +419,6 @@ class RuntimeConnectionHandler(handler.Handler):
message=f'Failed to execute tool {tool_name}: {e}',
)
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE)
async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse:
"""Retrieve knowledge from a knowledge base"""
kb_uuid = data['kb_uuid']
query = data['query']
top_k = data.get('top_k', 5)
try:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if kb is None:
return handler.ActionResponse.error(
message=f'Knowledge base with uuid {kb_uuid} not found',
)
results = await kb.retrieve(query=query, top_k=top_k)
# Convert results to dict format
results_data = [
{
'id': r.id,
'content': [c.model_dump() for c in r.content],
'metadata': r.metadata,
}
for r in results
]
return handler.ActionResponse.success(
data={
'results': results_data,
},
)
except Exception as e:
traceback.print_exc()
return handler.ActionResponse.error(
message=f'Failed to retrieve knowledge: {e}',
)
@self.action(PluginToRuntimeAction.INVOKE_EMBEDDING)
async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse:
"""Invoke an embedding model"""
embedding_model_uuid = data['embedding_model_uuid']
texts = data['texts']
try:
embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid)
if embedding_model is None:
return handler.ActionResponse.error(
message=f'Embedding model with uuid {embedding_model_uuid} not found',
)
# Call embedding model to generate embeddings
embeddings = []
for text in texts:
embedding = await embedding_model.provider.invoke_embedding(
model=embedding_model,
text=text,
)
embeddings.append(embedding)
return handler.ActionResponse.success(
data={
'embeddings': embeddings,
},
)
except Exception as e:
traceback.print_exc()
return handler.ActionResponse.error(
message=f'Failed to invoke embedding model: {e}',
)
@self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE)
async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
"""Set binary storage"""
@@ -856,10 +786,11 @@ class RuntimeConnectionHandler(handler.Handler):
# Validate kb_id is in pipeline's allowed list
allowed_kb_uuids = []
if query.pipeline_config:
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
allowed_kb_uuids = local_agent_config.get('knowledge-bases', [])
from langbot.pkg.agent.runner.config_migration import ConfigMigration
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, None)
allowed_kb_uuids = runner_config.get('knowledge-bases', [])
if not allowed_kb_uuids:
old_kb_uuid = local_agent_config.get('knowledge-base', '')
old_kb_uuid = runner_config.get('knowledge-base', '')
if old_kb_uuid and old_kb_uuid != '__none__':
allowed_kb_uuids = [old_kb_uuid]
@@ -1035,6 +966,55 @@ class RuntimeConnectionHandler(handler.Handler):
return result['tools']
async def list_agent_runners(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List agent runners from plugin runtime.
Returns list of dicts with:
- plugin_author
- plugin_name
- runner_name
- runner_description
- manifest
- protocol_version
- capabilities
- permissions
- config
"""
result = await self.call_action(
LangBotToRuntimeAction.LIST_AGENT_RUNNERS,
{
'include_plugins': include_plugins,
},
timeout=20,
)
return result['runners']
async def run_agent(
self,
plugin_author: str,
plugin_name: str,
runner_name: str,
context: dict[str, Any],
) -> typing.AsyncGenerator[dict[str, Any], None]:
"""Run an AgentRunner component.
Yields AgentRunResult dicts per Protocol v1.
"""
gen = self.call_action_generator(
LangBotToRuntimeAction.RUN_AGENT,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
'runner_name': runner_name,
'context': context,
},
timeout=300,
)
async for ret in gen:
yield ret
async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]:
"""Get plugin icon"""
result = await self.call_action(