feat: make agent runner config schema driven

This commit is contained in:
huanghuoguoguo
2026-05-19 12:20:28 +08:00
committed by huanghuoguoguo
parent 651e28113e
commit d8d98b0838
20 changed files with 900 additions and 235 deletions
+106 -81
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import datetime
import typing
from .. import stage, entities
from langbot_plugin.api.entities.builtin.provider import message as provider_message
@@ -9,10 +10,15 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.platform.events as platform_events
from ...agent.runner.descriptor import AgentRunnerDescriptor
from ...agent.runner.config_migration import ConfigMigration
from ...agent.runner import config_schema
# Official local-agent runner ID
DEFAULT_PROMPT_CONFIG = [
{'role': 'system', 'content': 'You are a helpful assistant.'},
]
LOCAL_AGENT_RUNNER_ID = 'plugin:langbot/local-agent/default'
@@ -31,6 +37,76 @@ class PreProcessor(stage.PipelineStage):
- use_funcs
"""
async def _get_runner_descriptor(
self,
runner_id: str | None,
bound_plugins: list[str] | None,
) -> AgentRunnerDescriptor | None:
if not runner_id:
return None
registry = getattr(self.ap, 'agent_runner_registry', None)
if registry is None:
return None
try:
return await registry.get(runner_id, bound_plugins)
except Exception as e:
self.ap.logger.debug(f'Unable to load AgentRunner descriptor for {runner_id}: {e}')
return None
async def _resolve_llm_model(
self,
primary_uuid: str,
) -> typing.Any | None:
if primary_uuid in config_schema.NONE_SENTINELS:
return None
try:
return await self.ap.model_mgr.get_model_by_uuid(primary_uuid)
except ValueError:
self.ap.logger.warning(f'LLM model {primary_uuid} not found or not configured')
return None
async def _resolve_fallback_models(self, fallback_uuids: list[str]) -> list[str]:
valid_fallbacks = []
for fallback_uuid in fallback_uuids:
if fallback_uuid in config_schema.NONE_SENTINELS:
continue
try:
await self.ap.model_mgr.get_model_by_uuid(fallback_uuid)
valid_fallbacks.append(fallback_uuid)
except ValueError:
self.ap.logger.warning(f'Fallback model {fallback_uuid} not found, skipping')
return valid_fallbacks
def _runner_accepts_multimodal_input(self, descriptor: AgentRunnerDescriptor | None) -> bool:
if descriptor is None:
return True
return descriptor.capabilities.get('multimodal_input', False)
def _model_supports_vision(self, llm_model: typing.Any | None) -> bool:
if not llm_model:
return False
abilities = getattr(getattr(llm_model, 'model_entity', None), 'abilities', [])
return 'vision' in (abilities or [])
def _should_keep_image_inputs(
self,
descriptor: AgentRunnerDescriptor | None,
uses_host_models: bool,
llm_model: typing.Any | None,
) -> bool:
if not self._runner_accepts_multimodal_input(descriptor):
return False
if uses_host_models:
return self._model_supports_vision(llm_model)
return True
def _strip_images_from_history(self, query: pipeline_query.Query) -> None:
for msg in query.messages:
if isinstance(msg.content, list):
msg.content = [elem for elem in msg.content if elem.type != 'image_url']
async def process(
self,
query: pipeline_query.Query,
@@ -40,57 +116,28 @@ class PreProcessor(stage.PipelineStage):
# Resolve runner ID using ConfigMigration (supports both new and old formats)
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
# Get runner config (from new ai.runner_config or old ai.<runner-name>)
# Get runner config from ai.runner_config[runner_id].
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
query.variables = query.variables or {}
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
descriptor = await self._get_runner_descriptor(runner_id, bound_plugins)
session = await self.ap.sess_mgr.get_session(query)
# Determine if this is a local-agent runner (built-in LLM capabilities)
# Check by runner_id OR by legacy runner field for backward compatibility
is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID or (
runner_id is None and
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') == 'local-agent'
)
uses_host_models = config_schema.uses_host_models(descriptor)
uses_host_tools = config_schema.uses_host_tools(descriptor)
is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID
include_skill_authoring = is_local_agent and getattr(self.ap, 'skill_service', None) is not None
# When not local-agent, llm_model is None
llm_model = None
if is_local_agent:
# Read model config — new format is { primary: str, fallbacks: [str] },
# but handle legacy plain string for backward compatibility
model_config = runner_config.get('model', {})
if isinstance(model_config, str):
# Legacy format: plain UUID string
primary_uuid = model_config
fallback_uuids = []
else:
primary_uuid = model_config.get('primary', '')
fallback_uuids = model_config.get('fallbacks', [])
if uses_host_models:
primary_uuid, fallback_uuids = config_schema.extract_model_selection(descriptor, runner_config)
llm_model = await self._resolve_llm_model(primary_uuid)
valid_fallbacks = await self._resolve_fallback_models(fallback_uuids)
if valid_fallbacks:
query.variables['_fallback_model_uuids'] = valid_fallbacks
if primary_uuid:
try:
llm_model = await self.ap.model_mgr.get_model_by_uuid(primary_uuid)
except ValueError:
self.ap.logger.warning(f'LLM model {primary_uuid} not found or not configured')
# Resolve fallback model UUIDs
if fallback_uuids:
valid_fallbacks = []
for fb_uuid in fallback_uuids:
try:
await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
valid_fallbacks.append(fb_uuid)
except ValueError:
self.ap.logger.warning(f'Fallback model {fb_uuid} not found, skipping')
if valid_fallbacks:
query.variables['_fallback_model_uuids'] = valid_fallbacks
# Get prompt config - for local-agent, use runner_config; for others, use default prompt
prompt_config = runner_config.get('prompt', [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]) if is_local_agent else [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]
prompt_config = config_schema.extract_prompt_config(descriptor, runner_config, DEFAULT_PROMPT_CONFIG)
conversation = await self.ap.sess_mgr.get_conversation(
query,
@@ -126,15 +173,12 @@ class PreProcessor(stage.PipelineStage):
query.prompt = conversation.prompt.copy()
query.messages = conversation.messages.copy()
if is_local_agent:
if uses_host_models:
query.use_funcs = []
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid
if 'func_call' in (llm_model.model_entity.abilities or []):
# Get bound plugins and MCP servers for filtering tools
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
if uses_host_tools and 'func_call' in (llm_model.model_entity.abilities or []):
query.use_funcs = await self.ap.tool_mgr.get_all_tools(
bound_plugins,
bound_mcp_servers,
@@ -147,9 +191,7 @@ class PreProcessor(stage.PipelineStage):
# If primary model doesn't support func_call but fallback models exist,
# load tools anyway since fallback models may support them
if not query.use_funcs and query.variables.get('_fallback_model_uuids'):
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
if uses_host_tools and not query.use_funcs and query.variables.get('_fallback_model_uuids'):
query.use_funcs = await self.ap.tool_mgr.get_all_tools(
bound_plugins,
bound_mcp_servers,
@@ -179,18 +221,9 @@ class PreProcessor(stage.PipelineStage):
}
query.variables.update(variables)
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if (
is_local_agent
and llm_model
and 'vision' not in (llm_model.model_entity.abilities or [])
):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
if me.type == 'image_url':
msg.content.remove(me)
keep_image_inputs = self._should_keep_image_inputs(descriptor, uses_host_models, llm_model)
if not keep_image_inputs:
self._strip_images_from_history(query)
content_list: list[provider_message.ContentElement] = []
@@ -202,10 +235,7 @@ class PreProcessor(stage.PipelineStage):
content_list.append(provider_message.ContentElement.from_text(me.text))
plain_text += me.text
elif isinstance(me, platform_message.Image):
# Allow images for non-local-agent runners or if local-agent has vision
if not is_local_agent or (
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if keep_image_inputs:
if me.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
elif isinstance(me, platform_message.Voice):
@@ -224,9 +254,7 @@ class PreProcessor(stage.PipelineStage):
if isinstance(msg, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if not is_local_agent or (
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if keep_image_inputs:
if msg.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))
elif isinstance(msg, platform_message.File):
@@ -246,15 +274,12 @@ class PreProcessor(stage.PipelineStage):
query.user_message = provider_message.Message(role='user', content=content_list)
# Extract knowledge base UUIDs into query variables so plugins can modify them
# during PromptPreProcessing before the runner performs retrieval.
# Only for local-agent runner
kb_uuids = runner_config.get('knowledge-bases', []) if is_local_agent else []
if not kb_uuids:
old_kb_uuid = runner_config.get('knowledge-base', '') if is_local_agent else ''
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
query.variables['_knowledge_base_uuids'] = list(kb_uuids)
# Extract configured KB UUIDs into query variables so PromptPreProcessing
# plugins can still adjust the authorized retrieval set before run_agent.
query.variables['_knowledge_base_uuids'] = config_schema.extract_knowledge_base_uuids(
descriptor,
runner_config,
)
# =========== 触发事件 PromptPreProcessing