feat: make agent runner config schema driven

This commit is contained in:
huanghuoguoguo
2026-05-19 12:20:28 +08:00
parent f4f91c43b5
commit be8d30894a
20 changed files with 901 additions and 236 deletions

View File

@@ -24,7 +24,8 @@ class ConfigMigration:
Responsibilities:
- Resolve runner ID from new ai.runner.id or old ai.runner.runner
- Map old built-in runner names to official plugin runner IDs
- Extract runner config from ai.runner_config or old ai.<runner-name>
- Extract runtime runner config from ai.runner_config
- Migrate old ai.<runner-name> blocks into ai.runner_config
"""
@staticmethod
@@ -74,9 +75,9 @@ class ConfigMigration:
) -> dict[str, typing.Any]:
"""Resolve runner binding configuration from pipeline configuration.
Priority:
1. New format: ai.runner_config[runner_id]
2. Old format: ai.<runner-name> (mapped from runner_id if applicable)
Runtime code should only read the migrated format. Legacy
ai.<runner-name> blocks are handled by migration helpers, not by the
hot path.
Args:
pipeline_config: Pipeline configuration dict
@@ -92,7 +93,16 @@ class ConfigMigration:
if runner_id in runner_configs:
return runner_configs[runner_id]
# Check old format: ai.<old_runner_name>
return {}
@staticmethod
def resolve_legacy_runner_config(
pipeline_config: dict[str, typing.Any],
runner_id: str,
) -> dict[str, typing.Any]:
"""Resolve old ai.<runner-name> config for migration only."""
ai_config = pipeline_config.get('ai', {})
# Try to find old runner name from runner_id
old_runner_name = None
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
@@ -105,12 +115,6 @@ class ConfigMigration:
if old_config:
return old_config
# If runner_id is plugin:* format, try extracting runner_name as config key
if is_plugin_runner_id(runner_id):
# Some configs might use just the runner_name component as key
# But this is legacy behavior - prefer ai.runner_config[id]
pass
return {}
@staticmethod
@@ -181,6 +185,8 @@ class ConfigMigration:
# Migrate runner config
resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
if not resolved_config:
resolved_config = ConfigMigration.resolve_legacy_runner_config(pipeline_config, runner_id)
if resolved_config:
runner_configs[runner_id] = resolved_config
# Remove old runner config block
@@ -193,4 +199,4 @@ class ConfigMigration:
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config
return new_config
return new_config

View File

@@ -0,0 +1,208 @@
"""Helpers for interpreting AgentRunner DynamicForm configuration."""
from __future__ import annotations
import typing
from .descriptor import AgentRunnerDescriptor
LLM_MODEL_SELECTOR_TYPES = {'model-fallback-selector', 'llm-model-selector'}
KB_SELECTOR_TYPES = {'knowledge-base-multi-selector'}
PROMPT_EDITOR_TYPES = {'prompt-editor'}
NONE_SENTINELS = {'', '__none__', '__none'}
def iter_schema_items(
descriptor: AgentRunnerDescriptor | None,
field_types: set[str],
) -> typing.Iterator[dict[str, typing.Any]]:
"""Yield descriptor config schema items whose type is in field_types."""
if descriptor is None:
return
for item in descriptor.config_schema or []:
if not isinstance(item, dict):
continue
if item.get('type') in field_types:
yield item
def has_permission(
descriptor: AgentRunnerDescriptor | None,
name: str,
actions: set[str],
) -> bool:
"""Return whether a runner descriptor requests one of the given actions."""
if descriptor is None:
return False
configured_actions = descriptor.permissions.get(name, [])
return any(action in configured_actions for action in actions)
def uses_host_models(descriptor: AgentRunnerDescriptor | None) -> bool:
"""Return whether LangBot should resolve model resources for this runner."""
return (
has_permission(descriptor, 'models', {'invoke', 'stream', 'list'})
and any(True for _ in iter_schema_items(descriptor, LLM_MODEL_SELECTOR_TYPES))
)
def uses_host_tools(descriptor: AgentRunnerDescriptor | None) -> bool:
"""Return whether LangBot should expose tool resources to this runner."""
return (
descriptor is not None
and descriptor.supports_tool_calling()
and has_permission(descriptor, 'tools', {'list', 'detail', 'call'})
)
def uses_host_knowledge_bases(descriptor: AgentRunnerDescriptor | None) -> bool:
"""Return whether LangBot should expose knowledge-base resources to this runner."""
return (
descriptor is not None
and descriptor.supports_knowledge_retrieval()
and has_permission(descriptor, 'knowledge_bases', {'list', 'retrieve'})
)
def extract_prompt_config(
descriptor: AgentRunnerDescriptor | None,
runner_config: dict[str, typing.Any],
default_prompt: list[dict[str, typing.Any]],
) -> list[dict[str, typing.Any]]:
"""Extract the prompt-editor value selected by the runner schema."""
for item in iter_schema_items(descriptor, PROMPT_EDITOR_TYPES):
field_name = item.get('name')
if field_name and field_name in runner_config:
configured_prompt = runner_config[field_name]
if isinstance(configured_prompt, list):
return configured_prompt
default_value = item.get('default')
if isinstance(default_value, list):
return default_value
return default_prompt
def extract_model_selection(
descriptor: AgentRunnerDescriptor | None,
runner_config: dict[str, typing.Any],
) -> tuple[str, list[str]]:
"""Extract primary/fallback LLM selections from schema-defined fields."""
primary_uuid = ''
fallback_uuids: list[str] = []
for item in iter_schema_items(descriptor, LLM_MODEL_SELECTOR_TYPES):
field_name = item.get('name')
if not field_name:
continue
value = runner_config.get(field_name, item.get('default'))
if item.get('type') == 'model-fallback-selector':
if isinstance(value, str):
primary_uuid = value
elif isinstance(value, dict):
primary_uuid = value.get('primary') or ''
fallbacks = value.get('fallbacks', [])
if isinstance(fallbacks, list):
fallback_uuids = [fallback for fallback in fallbacks if isinstance(fallback, str)]
break
if item.get('type') == 'llm-model-selector' and isinstance(value, str):
primary_uuid = value
break
return primary_uuid, fallback_uuids
def extract_knowledge_base_uuids(
descriptor: AgentRunnerDescriptor | None,
runner_config: dict[str, typing.Any],
) -> list[str]:
"""Extract configured knowledge-base UUIDs from schema-defined fields."""
if not uses_host_knowledge_bases(descriptor):
return []
kb_uuids: list[str] = []
for item in iter_schema_items(descriptor, KB_SELECTOR_TYPES):
field_name = item.get('name')
if not field_name:
continue
value = runner_config.get(field_name, item.get('default', []))
if isinstance(value, list):
kb_uuids.extend(
kb_uuid for kb_uuid in value if isinstance(kb_uuid, str) and kb_uuid not in NONE_SENTINELS
)
return list(dict.fromkeys(kb_uuids))
def iter_config_model_refs(
descriptor: AgentRunnerDescriptor,
runner_config: dict[str, typing.Any],
) -> typing.Iterator[tuple[str, str]]:
"""Yield model references declared by schema-defined model selector fields."""
for item in descriptor.config_schema or []:
if not isinstance(item, dict):
continue
field_name = item.get('name')
field_type = item.get('type')
if not field_name or field_name not in runner_config:
continue
value = runner_config.get(field_name)
if field_type == 'model-fallback-selector':
if isinstance(value, str) and value not in NONE_SENTINELS:
yield 'llm', value
elif isinstance(value, dict):
primary = value.get('primary')
if isinstance(primary, str) and primary not in NONE_SENTINELS:
yield 'llm', primary
fallbacks = value.get('fallbacks', [])
if isinstance(fallbacks, list):
for fallback_uuid in fallbacks:
if isinstance(fallback_uuid, str) and fallback_uuid not in NONE_SENTINELS:
yield 'llm', fallback_uuid
elif field_type == 'llm-model-selector':
if isinstance(value, str) and value not in NONE_SENTINELS:
yield 'llm', value
elif field_type == 'rerank-model-selector':
if isinstance(value, str) and value not in NONE_SENTINELS:
yield 'rerank', value
def set_empty_llm_model_selection(
descriptor: AgentRunnerDescriptor,
runner_config: dict[str, typing.Any],
model_uuid: str,
) -> bool:
"""Set the first empty schema-defined LLM selector to model_uuid."""
for item in iter_schema_items(descriptor, LLM_MODEL_SELECTOR_TYPES):
field_name = item.get('name')
field_type = item.get('type')
if not field_name:
continue
value = runner_config.get(field_name, item.get('default'))
if field_type == 'model-fallback-selector':
if isinstance(value, dict):
primary = value.get('primary') or ''
if primary not in NONE_SENTINELS:
return False
fallbacks = value.get('fallbacks', [])
runner_config[field_name] = {
'primary': model_uuid,
'fallbacks': fallbacks if isinstance(fallbacks, list) else [],
}
return True
if isinstance(value, str) and value not in NONE_SENTINELS:
return False
runner_config[field_name] = {'primary': model_uuid, 'fallbacks': []}
return True
if field_type == 'llm-model-selector':
if isinstance(value, str) and value not in NONE_SENTINELS:
return False
runner_config[field_name] = model_uuid
return True
return False

View File

@@ -15,6 +15,9 @@ from .state_store import get_state_store
from . import events as runner_events
DEFAULT_RUNNER_TIMEOUT_SECONDS = 300
# Internal models for the agent runner context protocol.
@@ -106,7 +109,7 @@ class AgentRuntimeContext(typing.TypedDict):
sdk_protocol_version: str
query_id: int | None
trace_id: str | None
deadline_at: int | None
deadline_at: float | None
metadata: dict[str, typing.Any]
@@ -480,9 +483,13 @@ class AgentRunContextBuilder:
},
}
def _build_deadline(self, runner_config: dict[str, typing.Any]) -> int | None:
"""Build deadline timestamp from runner timeout config if present."""
timeout = runner_config.get('timeout')
def _build_deadline(self, runner_config: dict[str, typing.Any]) -> float | None:
"""Build deadline timestamp from runner timeout config.
A missing timeout uses the host default. Explicit null, zero, or negative
values disable the total run deadline for advanced deployments.
"""
timeout = runner_config.get('timeout', DEFAULT_RUNNER_TIMEOUT_SECONDS)
if timeout is None:
return None
@@ -494,7 +501,7 @@ class AgentRunContextBuilder:
if timeout_seconds <= 0:
return None
return int(time.time() + timeout_seconds)
return time.time() + timeout_seconds
async def _is_stream_output_supported(self, query: pipeline_query.Query) -> bool:
"""Check whether the current adapter can consume streaming chunks."""

View File

@@ -3,9 +3,12 @@ from __future__ import annotations
import typing
import traceback
import asyncio
import time
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from langbot_plugin.entities.io.errors import ActionCallTimeoutError
from ...core import app
from .descriptor import AgentRunnerDescriptor
@@ -155,14 +158,32 @@ class AgentRunOrchestrator:
)
try:
async for result_dict in self.ap.plugin_connector.run_agent(
gen = self.ap.plugin_connector.run_agent(
plugin_author=descriptor.plugin_author,
plugin_name=descriptor.plugin_name,
runner_name=descriptor.runner_name,
context=context,
):
)
while True:
try:
result_dict = await self._next_with_deadline(gen, descriptor, context)
except StopAsyncIteration:
break
yield result_dict
except asyncio.TimeoutError as e:
raise RunnerExecutionError(
descriptor.id,
'Runner timed out (code: runner.timeout)',
retryable=True,
) from e
except ActionCallTimeoutError as e:
raise RunnerExecutionError(
descriptor.id,
f'{e} (code: runner.timeout)',
retryable=True,
) from e
except RunnerExecutionError:
raise
except Exception as e:
@@ -176,6 +197,57 @@ class AgentRunOrchestrator:
retryable=False,
)
async def _next_with_deadline(
self,
gen: typing.AsyncGenerator[dict[str, typing.Any], None],
descriptor: AgentRunnerDescriptor,
context: AgentRunContextPayload,
) -> dict[str, typing.Any]:
"""Read the next runner result while enforcing the run deadline."""
remaining = self._remaining_deadline_seconds(context)
if remaining is not None and remaining <= 0:
await self._close_generator(gen, descriptor)
raise asyncio.TimeoutError
try:
if remaining is None:
return await anext(gen)
return await asyncio.wait_for(anext(gen), timeout=remaining)
except StopAsyncIteration:
if self._is_deadline_exhausted(context):
raise asyncio.TimeoutError
raise
except asyncio.TimeoutError:
await self._close_generator(gen, descriptor)
raise
def _remaining_deadline_seconds(
self,
context: AgentRunContextPayload,
) -> float | None:
runtime = context.get('runtime') or {}
deadline_at = runtime.get('deadline_at')
if deadline_at is None:
return None
try:
return float(deadline_at) - time.time()
except (TypeError, ValueError):
return None
def _is_deadline_exhausted(self, context: AgentRunContextPayload) -> bool:
remaining = self._remaining_deadline_seconds(context)
return remaining is not None and remaining <= 0
async def _close_generator(
self,
gen: typing.AsyncGenerator[dict[str, typing.Any], None],
descriptor: AgentRunnerDescriptor,
) -> None:
try:
await gen.aclose()
except Exception as e:
self.ap.logger.warning(f'Failed to close timed-out runner {descriptor.id}: {e}')
def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None:
"""Resolve runner ID for telemetry/logging without full execution.

View File

@@ -13,6 +13,7 @@ from .context_builder import (
KnowledgeBaseResource,
StorageResource,
)
from . import config_schema
class AgentResourceBuilder:
@@ -73,7 +74,7 @@ class AgentResourceBuilder:
models, tools, knowledge_bases = await asyncio.gather(
self._build_models(manifest_perms, runner_config, descriptor, query),
self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query),
self._build_knowledge_bases(manifest_perms, runner_config, query),
self._build_knowledge_bases(manifest_perms, runner_config, descriptor, query),
)
storage = self._build_storage(manifest_perms)
@@ -132,34 +133,11 @@ class AgentResourceBuilder:
runner_config: dict[str, typing.Any],
) -> None:
"""Authorize model-like values selected through DynamicForm fields."""
for item in descriptor.config_schema or []:
if not isinstance(item, dict):
continue
field_name = item.get('name')
field_type = item.get('type')
if not field_name or field_name not in runner_config:
continue
value = runner_config.get(field_name)
if field_type == 'model-fallback-selector':
if isinstance(value, str):
await self._append_llm_model_resource(models, seen_model_ids, value)
elif isinstance(value, dict):
primary = value.get('primary')
if isinstance(primary, str):
await self._append_llm_model_resource(models, seen_model_ids, primary)
fallbacks = value.get('fallbacks', [])
if isinstance(fallbacks, list):
for fallback_uuid in fallbacks:
if isinstance(fallback_uuid, str):
await self._append_llm_model_resource(models, seen_model_ids, fallback_uuid)
elif field_type == 'llm-model-selector':
if isinstance(value, str):
await self._append_llm_model_resource(models, seen_model_ids, value)
elif field_type == 'rerank-model-selector':
if isinstance(value, str):
await self._append_rerank_model_resource(models, seen_model_ids, value)
for model_type, model_uuid in config_schema.iter_config_model_refs(descriptor, runner_config):
if model_type == 'llm':
await self._append_llm_model_resource(models, seen_model_ids, model_uuid)
elif model_type == 'rerank':
await self._append_rerank_model_resource(models, seen_model_ids, model_uuid)
async def _append_llm_model_resource(
self,
@@ -236,6 +214,7 @@ class AgentResourceBuilder:
self,
manifest_perms: dict[str, list[str]],
runner_config: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
query: typing.Any,
) -> list[KnowledgeBaseResource]:
"""Build knowledge bases list with plugin SDK field names."""
@@ -246,13 +225,8 @@ class AgentResourceBuilder:
if 'list' not in kb_perms and 'retrieve' not in kb_perms:
return kb_resources
# Get knowledge base UUIDs from config
kb_uuids = runner_config.get('knowledge-bases', [])
if not kb_uuids:
# Old single KB config
old_kb_uuid = runner_config.get('knowledge-base', '')
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
# Get knowledge base UUIDs from schema-defined config fields.
kb_uuids = config_schema.extract_knowledge_base_uuids(descriptor, runner_config)
# Also check query variables (may be modified by plugin PromptPreProcessing)
kb_uuids_from_vars = query.variables.get('_knowledge_base_uuids', [])