refactor(agent-runner): simplify event-first entry path

This commit is contained in:
huanghuoguoguo
2026-06-03 17:33:47 +08:00
parent f2153f736c
commit d0169e2888
32 changed files with 743 additions and 2653 deletions

View File

@@ -1,44 +1,25 @@
"""Configuration migration for agent runner IDs."""
"""Helpers for the current AgentRunner config shape."""
from __future__ import annotations
import typing
from .id import is_plugin_runner_id
# Mapping from old built-in runner names to official plugin runner IDs
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
'local-agent': 'plugin:langbot/local-agent/default',
'dify-service-api': 'plugin:langbot/dify-agent/default',
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
'coze-api': 'plugin:langbot/coze-agent/default',
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
'langflow-api': 'plugin:langbot/langflow-agent/default',
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
}
class ConfigMigration:
"""Configuration migration helper for agent runner IDs.
"""Configuration helper for agent runner IDs.
Responsibilities:
- Resolve runner ID from new ai.runner.id or old ai.runner.runner
- Map old built-in runner names to official plugin runner IDs
- Resolve runner ID from ai.runner.id
- Extract current Agent/runner config from ai.runner_config
- Migrate old ai.<runner-name> blocks into ai.runner_config
- Keep the current config container shape stable on save
"""
@staticmethod
def resolve_runner_id(pipeline_config: dict[str, typing.Any]) -> str | None:
"""Resolve runner ID from pipeline configuration.
Priority:
1. New format: ai.runner.id (must be plugin:* format)
2. Old format: ai.runner.runner (mapped to plugin:* if built-in)
"""Resolve runner ID from current configuration.
Args:
pipeline_config: Pipeline configuration dict
pipeline_config: Current configuration container
Returns:
Runner ID string, or None if not configured
@@ -46,26 +27,9 @@ class ConfigMigration:
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner', {})
# Check new format first
runner_id = runner_config.get('id')
if runner_id:
if is_plugin_runner_id(runner_id):
return runner_id
# If it's not a plugin ID, try to map it as old runner name
return OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(runner_id, runner_id)
# Check old format
old_runner_name = runner_config.get('runner')
if old_runner_name:
# If already plugin:* format, return directly
if is_plugin_runner_id(old_runner_name):
return old_runner_name
# Map old built-in runner to official plugin ID
mapped_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name)
if mapped_id:
return mapped_id
# Return old name if no mapping exists (will error in registry)
return old_runner_name
return runner_id
return None
@@ -74,14 +38,10 @@ class ConfigMigration:
pipeline_config: dict[str, typing.Any],
runner_id: str,
) -> dict[str, typing.Any]:
"""Resolve Agent/runner configuration from pipeline configuration.
Runtime code should only read the migrated format. Legacy
ai.<runner-name> blocks are handled by migration helpers, not by the
hot path.
"""Resolve Agent/runner configuration from the current container.
Args:
pipeline_config: Pipeline configuration dict
pipeline_config: Current configuration container
runner_id: Resolved runner ID
Returns:
@@ -89,79 +49,18 @@ class ConfigMigration:
"""
ai_config = pipeline_config.get('ai', {})
# Check new format
runner_configs = ai_config.get('runner_config', {})
if runner_id in runner_configs:
return runner_configs[runner_id]
return {}
@staticmethod
def resolve_legacy_runner_config(
pipeline_config: dict[str, typing.Any],
runner_id: str,
) -> dict[str, typing.Any]:
"""Resolve old ai.<runner-name> config for migration only."""
ai_config = pipeline_config.get('ai', {})
# Try to find old runner name from runner_id
old_runner_name = None
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
old_runner_name = old_name
break
if old_runner_name:
old_config = ai_config.get(old_runner_name, {})
if old_config:
return ConfigMigration.normalize_runner_config_for_migration(runner_id, old_config)
return {}
@staticmethod
def normalize_runner_config_for_migration(
runner_id: str,
runner_config: dict[str, typing.Any],
) -> dict[str, typing.Any]:
"""Normalize released legacy runner config before storing Agent/runner config.
Runtime code should not carry aliases. This helper is intentionally used
only by config migration so AgentRunner implementations can consume the
current manifest-defined field names.
"""
normalized = dict(runner_config)
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
legacy_kb = normalized.pop('knowledge-base', None)
if 'knowledge-bases' not in normalized:
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
normalized['knowledge-bases'] = [legacy_kb]
elif legacy_kb is not None:
normalized['knowledge-bases'] = []
return normalized
@staticmethod
def get_old_runner_name(runner_id: str) -> str | None:
"""Get old runner name from mapped runner ID.
Args:
runner_id: Plugin runner ID
Returns:
Old runner name if mapped, None otherwise
"""
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
return old_name
return None
@staticmethod
def get_expire_time(pipeline_config: dict[str, typing.Any]) -> int:
"""Get conversation expire time from configuration.
Args:
pipeline_config: Pipeline configuration dict
pipeline_config: Current configuration container
Returns:
Expire time in seconds (0 means no expiry)
@@ -172,54 +71,23 @@ class ConfigMigration:
@staticmethod
def migrate_pipeline_config(pipeline_config: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""Migrate pipeline config to new format.
This converts old ai.runner.runner and ai.<runner-name> to
new ai.runner.id and ai.runner_config format.
"""Normalize the current config container before saving.
Args:
pipeline_config: Original pipeline configuration
pipeline_config: Original configuration
Returns:
Migrated pipeline configuration
Configuration with explicit ai.runner and ai.runner_config containers
"""
# Create copy
new_config = dict(pipeline_config)
ai_config = new_config.get('ai', {})
if not ai_config:
if 'ai' not in new_config:
return new_config
runner_config = ai_config.get('runner', {})
runner_configs = ai_config.get('runner_config', {})
ai_config = dict(new_config.get('ai', {}))
# Resolve runner ID
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
if runner_id:
# Set new format
runner_config['id'] = runner_id
# Remove old runner field if present
if 'runner' in runner_config and is_plugin_runner_id(runner_config['runner']):
# Already migrated plugin:* format, keep as id
pass
elif 'runner' in runner_config:
# Old built-in runner name, remove after migration
old_name = runner_config['runner']
if old_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
del runner_config['runner']
runner_config = dict(ai_config.get('runner', {}))
runner_configs = dict(ai_config.get('runner_config', {}))
# Migrate runner config
resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
if not resolved_config:
resolved_config = ConfigMigration.resolve_legacy_runner_config(pipeline_config, runner_id)
if resolved_config:
resolved_config = ConfigMigration.normalize_runner_config_for_migration(runner_id, resolved_config)
runner_configs[runner_id] = resolved_config
# Remove old runner config block
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id and old_name in ai_config:
del ai_config[old_name]
# Update configs
ai_config['runner'] = runner_config
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config

View File

@@ -116,7 +116,6 @@ class AgentRuntimeContext(typing.TypedDict):
langbot_version: str | None
sdk_protocol_version: str
query_id: int | None
trace_id: str | None
deadline_at: float | None
metadata: dict[str, typing.Any]
@@ -128,8 +127,8 @@ class AgentRunContextPayload(typing.TypedDict):
Protocol v1 structure - matches SDK AgentRunContext.
Note: The 'config' field contains the current Agent/runner config
from ai.runner_config[runner_id] while Pipeline remains the temporary
configuration container. It is not plugin instance config.
from ai.runner_config[runner_id] while the current Query entry remains
a temporary configuration container. It is not plugin instance config.
"""
run_id: str
@@ -145,7 +144,6 @@ class AgentRunContextPayload(typing.TypedDict):
state: AgentRunState
runtime: AgentRuntimeContext
config: dict[str, typing.Any] # Agent/runner config from ai.runner_config[runner_id]
bootstrap: dict[str, typing.Any] | None # Optional bootstrap context
adapter: dict[str, typing.Any] | None # Entry adapter context
metadata: dict[str, typing.Any] # Additional metadata
@@ -162,7 +160,7 @@ class AgentRunContextBuilder:
- Build runtime context with host info, trace_id, deadline
- Set config from current Agent/runner configuration.
Pipeline Query adaptation belongs to PipelineAdapter, not this builder.
Query adaptation belongs to QueryEntryAdapter, not this builder.
"""
ap: app.Application
@@ -266,7 +264,6 @@ class AgentRunContextBuilder:
runtime: AgentRuntimeContext = {
'langbot_version': self.ap.ver_mgr.get_current_version(),
'sdk_protocol_version': descriptor.protocol_version,
'query_id': None, # No query_id in event-first mode
'trace_id': run_id,
'deadline_at': self._build_deadline_from_binding(binding),
'metadata': {
@@ -293,7 +290,6 @@ class AgentRunContextBuilder:
# Build adapter context (empty for event-first)
adapter_context = {
'query_id': None,
'extra': {},
}
@@ -312,7 +308,6 @@ class AgentRunContextBuilder:
'state': state,
'runtime': runtime,
'config': binding.runner_config,
'bootstrap': None,
'adapter': adapter_context,
'metadata': {}, # Additional metadata
}

View File

@@ -20,7 +20,7 @@ from .persistent_state_store import get_persistent_state_store, PersistentStateS
from .session_registry import get_session_registry, AgentRunSessionRegistry
from .config_migration import ConfigMigration
from .host_models import AgentEventEnvelope, AgentBinding
from .pipeline_adapter import PipelineAdapter
from .query_entry_adapter import QueryEntryAdapter
from .state_scope import build_state_context
from .errors import (
RunnerNotFoundError,
@@ -37,7 +37,7 @@ class AgentRunOrchestrator:
"""Orchestrator for agent runner execution.
Responsibilities:
- Resolve runner ID from pipeline config (new or old format)
- Resolve runner ID from current Agent/runner config
- Get runner descriptor from registry
- Provision AgentRunContext envelope from Query
- Build AgentResources with permission filtering
@@ -48,7 +48,7 @@ class AgentRunOrchestrator:
Entry points:
- run(event, binding): Main entry for event-first Protocol v1
- run_from_query(query): Pipeline adapter wrapper
- run_from_query(query): current Query entry adapter wrapper
"""
ap: app.Application
@@ -125,28 +125,24 @@ class AgentRunOrchestrator:
resources=resources,
)
session_query_id = None
# Merge adapter context if provided
if adapter_context:
session_query_id = adapter_context.get('query_id')
# Merge params into adapter.extra
if 'params' in adapter_context:
context['adapter']['extra']['params'] = adapter_context['params']
# Merge prompt into adapter.extra for transitional adapter consumers.
if 'prompt' in adapter_context:
context['adapter']['extra']['prompt'] = adapter_context['prompt']
# Set query_id if provided
if adapter_context.get('query_id'):
context['runtime']['query_id'] = adapter_context['query_id']
# Build state context for State API handlers
state_context = build_state_context(event, binding, descriptor)
# Register session for proxy action permission validation
run_id = context['run_id']
query_id = context['runtime'].get('query_id') # May be None for pure event-first mode
await self._session_registry.register(
run_id=run_id,
runner_id=descriptor.id,
query_id=query_id,
query_id=session_query_id,
plugin_identity=descriptor.get_plugin_id(),
resources=resources,
permissions=descriptor.permissions or {},
@@ -238,7 +234,7 @@ class AgentRunOrchestrator:
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from pipeline query.
This is the Pipeline adapter wrapper for the Query-based flow.
This is the Query entry adapter wrapper for the query-based flow.
It delegates to the event-first run(event, binding) method.
For the new event-first Protocol v1, use run(event, binding) instead.
@@ -260,16 +256,16 @@ class AgentRunOrchestrator:
raise RunnerNotFoundError('no runner configured')
# Convert Query to event-first envelope
event = PipelineAdapter.query_to_event(query)
event = QueryEntryAdapter.query_to_event(query)
# Convert Pipeline config to binding
binding = PipelineAdapter.pipeline_config_to_binding(query, runner_id)
# Convert current config to binding
binding = QueryEntryAdapter.config_to_binding(query, runner_id)
# Extract bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Build adapter context for Pipeline-specific fields
adapter_context = PipelineAdapter.build_adapter_context(query, binding)
# Build adapter context for Query-specific fields
adapter_context = QueryEntryAdapter.build_adapter_context(query, binding)
# Delegate to event-first run()
async for result in self.run(

View File

@@ -1,7 +1,7 @@
"""Pipeline adapter for converting Query to event-first envelope.
"""Query entry adapter for converting Query to event-first envelope.
This adapter bridges the Query/Pipeline entry point with the event-first
Protocol v1 architecture.
This adapter bridges the current Query entry point with the event-first
Protocol v1 architecture without exposing Query internals to runners.
"""
from __future__ import annotations
@@ -31,12 +31,12 @@ from .host_models import (
from . import events as runner_events
class PipelineAdapter:
"""Adapter for converting Pipeline Query to event-first envelope.
class QueryEntryAdapter:
"""Adapter for converting Query to event-first envelope.
This adapter is responsible for:
- Converting Query to AgentEventEnvelope
- Converting Pipeline config to temporary AgentBinding
- Converting current Agent/runner config to temporary AgentBinding
- Putting Query-only fields into adapter context
"""
@@ -49,10 +49,10 @@ class PipelineAdapter:
cls,
query: pipeline_query.Query,
) -> AgentEventEnvelope:
"""Convert Pipeline Query to AgentEventEnvelope.
"""Convert Query to AgentEventEnvelope.
Args:
query: Pipeline query
query: Current entry query
Returns:
AgentEventEnvelope for event-first processing
@@ -82,7 +82,7 @@ class PipelineAdapter:
event_id=event.event_id or str(query.query_id),
event_type=event.event_type or runner_events.MESSAGE_RECEIVED,
event_time=event.event_time,
source="pipeline_adapter",
source="host_adapter",
source_event_type=event.source_event_type,
bot_id=query.bot_uuid,
workspace_id=None, # Not available in Query
@@ -97,15 +97,15 @@ class PipelineAdapter:
)
@classmethod
def pipeline_config_to_binding(
def config_to_binding(
cls,
query: pipeline_query.Query,
runner_id: str,
) -> AgentBinding:
"""Convert Pipeline config to temporary AgentBinding.
"""Convert current config container to temporary AgentBinding.
Args:
query: Pipeline query
query: Current entry query
runner_id: Resolved runner ID
Returns:
@@ -121,7 +121,7 @@ class PipelineAdapter:
scope_id=agent_id,
)
# Build resource policy from pipeline config
# Build resource policy from current config
resource_policy = ResourcePolicy(
allowed_model_uuids=cls._extract_allowed_models(query),
allowed_tool_names=cls._extract_allowed_tools(query),
@@ -159,10 +159,9 @@ class PipelineAdapter:
query: pipeline_query.Query,
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build Query-derived fields for the Pipeline adapter entry."""
"""Build Query-derived fields for the current entry adapter."""
return {
'params': cls.build_params(query),
'prompt': cls.build_prompt(query),
'query_id': getattr(query, 'query_id', None),
}
@@ -187,15 +186,6 @@ class PipelineAdapter:
return params
@classmethod
def build_prompt(cls, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
"""Build effective prompt messages from Pipeline preprocessing output."""
prompt = getattr(query, 'prompt', None)
messages = getattr(prompt, 'messages', None)
if not messages:
return []
return [cls._dump_message(msg) for msg in messages]
@classmethod
def is_json_serializable(cls, value: typing.Any) -> bool:
"""Return whether a value can safely cross the adapter boundary as JSON."""
@@ -210,18 +200,6 @@ class PipelineAdapter:
)
return False
@staticmethod
def _dump_message(message: typing.Any) -> dict[str, typing.Any]:
"""Serialize a provider message-like object."""
if hasattr(message, 'model_dump'):
return message.model_dump(mode='json')
if isinstance(message, dict):
return message
return {
'role': getattr(message, 'role', None),
'content': getattr(message, 'content', None),
}
# Private helper methods
@classmethod
@@ -262,7 +240,7 @@ class PipelineAdapter:
event_id=cls._build_scoped_event_id(query, source_event_id, event_time),
event_type=runner_events.MESSAGE_RECEIVED,
event_time=event_time,
source="pipeline_adapter",
source="host_adapter",
source_event_type=source_event_type,
data=event_data,
)
@@ -278,7 +256,7 @@ class PipelineAdapter:
launcher_type = getattr(query, 'launcher_type', None)
launcher_type_value = getattr(launcher_type, 'value', launcher_type) if launcher_type is not None else None
scope_parts = [
'pipeline_adapter',
'host_adapter',
getattr(query, 'pipeline_uuid', None),
getattr(query, 'bot_uuid', None),
launcher_type_value,
@@ -289,7 +267,7 @@ class PipelineAdapter:
]
scoped = '|'.join('' if part is None else str(part) for part in scope_parts)
digest = hashlib.sha256(scoped.encode('utf-8')).hexdigest()[:32]
return f'pipeline:{digest}'
return f'host:{digest}'
@classmethod
def _build_conversation_context(

View File

@@ -23,7 +23,7 @@ class AgentRunSession(typing.TypedDict):
Fields:
run_id: Unique run identifier (UUID from AgentRunContext)
runner_id: Runner descriptor ID (plugin:author/name/runner)
query_id: Pipeline query ID
query_id: Host entry query ID, only present for query-based adapters
plugin_identity: Plugin identifier (author/name) of the runner
conversation_id: Conversation ID for history/event access
resources: Authorized resources for this run (from AgentResources)
@@ -82,7 +82,7 @@ class AgentRunSessionRegistry:
Args:
run_id: Unique run identifier
runner_id: Runner descriptor ID
query_id: Pipeline query ID
query_id: Host entry query ID, only present for query-based adapters
plugin_identity: Plugin identifier (author/name)
resources: Authorized resources for this run
conversation_id: Conversation ID for history/event access
@@ -247,4 +247,4 @@ def get_session_registry() -> AgentRunSessionRegistry:
with _global_registry_lock:
if _global_registry is None:
_global_registry = AgentRunSessionRegistry()
return _global_registry
return _global_registry

View File

@@ -1,4 +1,4 @@
"""Migrate pipeline config to new runner format
"""Normalize AgentRunner config containers
Revision ID: 0004_migrate_runner_config
Revises: 0003_add_rerank_models
@@ -14,101 +14,23 @@ down_revision = '0003_add_rerank_models'
branch_labels = None
depends_on = None
# Mapping from old built-in runner names to official plugin runner IDs
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
'local-agent': 'plugin:langbot/local-agent/default',
'dify-service-api': 'plugin:langbot/dify-agent/default',
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
'coze-api': 'plugin:langbot/coze-agent/default',
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
'langflow-api': 'plugin:langbot/langflow-agent/default',
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
}
def is_plugin_runner_id(runner_id: str) -> bool:
"""Check if runner ID is in plugin:* format."""
return runner_id.startswith('plugin:')
def normalize_runner_config_for_migration(runner_id: str, runner_config: dict) -> dict:
"""Normalize released legacy runner fields before storing binding config."""
normalized = dict(runner_config)
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
legacy_kb = normalized.pop('knowledge-base', None)
if 'knowledge-bases' not in normalized:
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
normalized['knowledge-bases'] = [legacy_kb]
elif legacy_kb is not None:
normalized['knowledge-bases'] = []
return normalized
def migrate_pipeline_config(config: dict) -> dict:
"""Migrate pipeline config to new format."""
"""Keep current AgentRunner config containers explicit."""
new_config = dict(config)
ai_config = new_config.get('ai', {})
if not ai_config:
if 'ai' not in new_config:
return new_config
runner_config = ai_config.get('runner', {})
runner_configs = ai_config.get('runner_config', {})
ai_config = dict(new_config.get('ai', {}))
# Check for new format first
runner_id = runner_config.get('id')
if runner_id and is_plugin_runner_id(runner_id):
if runner_id in runner_configs:
runner_configs[runner_id] = normalize_runner_config_for_migration(
runner_id,
runner_configs[runner_id],
)
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config
return new_config
# Check for old format
old_runner_name = runner_config.get('runner')
if old_runner_name:
# Map to new runner ID
if is_plugin_runner_id(old_runner_name):
runner_id = old_runner_name
else:
runner_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name, old_runner_name)
# Set new format
runner_config['id'] = runner_id
# Remove old runner field if it's a mapped built-in runner
if old_runner_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
del runner_config['runner']
# Migrate runner-specific config and remove old config blocks
if old_runner_name in ai_config:
old_runner_config = ai_config[old_runner_name]
if old_runner_config:
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, old_runner_config)
# Remove old config block after migration
del ai_config[old_runner_name]
# Also check if runner_id has config under other old name formats
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id and old_name in ai_config:
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, ai_config[old_name])
# Remove old config block after migration
del ai_config[old_name]
# Update configs
ai_config['runner'] = runner_config
ai_config['runner_config'] = runner_configs
ai_config['runner'] = dict(ai_config.get('runner', {}))
ai_config['runner_config'] = dict(ai_config.get('runner_config', {}))
new_config['ai'] = ai_config
return new_config
def upgrade() -> None:
"""Migrate existing pipeline configs to new runner format."""
"""Normalize existing pipeline config containers."""
conn = op.get_bind()
inspector = sa.inspect(conn)

View File

@@ -11,12 +11,19 @@ from .. import handler
from ... import entities
import langbot_plugin.api.entities.events as events
from ....agent.runner.config_migration import ConfigMigration
from ....agent.runner import config_schema
from ....utils import constants, runner as runner_utils
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
DEFAULT_PROMPT_CONFIG = [
{'role': 'system', 'content': 'You are a helpful assistant.'},
]
class ChatMessageHandler(handler.MessageHandler):
"""Chat message handler using AgentRunOrchestrator.
@@ -140,8 +147,9 @@ class ChatMessageHandler(handler.MessageHandler):
)
# Update conversation history
query.session.using_conversation.messages.append(query.user_message)
query.session.using_conversation.messages.extend(query.resp_messages)
conversation = await self._ensure_conversation_for_history(query)
conversation.messages.append(query.user_message)
conversation.messages.extend(query.resp_messages)
except Exception as e:
# Import orchestrator errors for specific handling
@@ -234,3 +242,69 @@ class ChatMessageHandler(handler.MessageHandler):
await self.ap.survey.trigger_event('first_bot_response_success')
except Exception as ex:
self.ap.logger.warning(f'Failed to send telemetry: {ex}')
async def _ensure_conversation_for_history(
self,
query: pipeline_query.Query,
) -> provider_session.Conversation:
session = getattr(query, 'session', None)
conversation = getattr(session, 'using_conversation', None)
if conversation is not None:
return conversation
if session is None or getattr(self.ap, 'sess_mgr', None) is None:
raise RuntimeError('Conversation is not available for history update')
prompt_config = await self._build_history_prompt_config(query)
conversation = await self.ap.sess_mgr.get_conversation(
query,
session,
prompt_config,
query.pipeline_uuid,
query.bot_uuid,
)
if conversation is None:
raise RuntimeError('Conversation manager did not return a conversation')
if getattr(session, 'using_conversation', None) is None:
session.using_conversation = conversation
return conversation
async def _build_history_prompt_config(
self,
query: pipeline_query.Query,
) -> list[dict[str, typing.Any]]:
prompt_messages = getattr(getattr(query, 'prompt', None), 'messages', None)
if prompt_messages:
prompt_config = []
for message in prompt_messages:
if hasattr(message, 'model_dump'):
prompt_config.append(message.model_dump(mode='python'))
elif isinstance(message, dict):
prompt_config.append(message)
if prompt_config:
return prompt_config
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
descriptor = await self._get_runner_descriptor(runner_id, bound_plugins)
return config_schema.extract_prompt_config(descriptor, runner_config, DEFAULT_PROMPT_CONFIG)
async def _get_runner_descriptor(
self,
runner_id: str | None,
bound_plugins: list[str] | None,
) -> typing.Any | None:
if not runner_id:
return None
registry = getattr(self.ap, 'agent_runner_registry', None)
if registry is None:
return None
try:
return await registry.get(runner_id, bound_plugins)
except Exception as e:
self.ap.logger.debug(f'Unable to load AgentRunner descriptor for {runner_id}: {e}')
return None

View File

@@ -239,7 +239,7 @@ async def _get_pipeline_knowledge_base_uuids(ap: app.Application, query: Any) ->
try:
descriptor = await registry.get(runner_id, bound_plugins)
except Exception as e:
ap.logger.warning(f'Failed to load AgentRunner descriptor for pipeline knowledge-base scope: {e}')
ap.logger.warning(f'Failed to load AgentRunner descriptor for knowledge-base scope: {e}')
return []
return config_schema.extract_knowledge_base_uuids(descriptor, runner_config)
@@ -302,7 +302,7 @@ async def _validate_run_authorization(
def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None:
"""Return a cached pipeline Query for runtime actions when available."""
"""Return a cached Query for query-based runtime actions when available."""
if query_id is None:
return None
@@ -313,7 +313,7 @@ def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None:
def _resolve_action_query(data: dict[str, Any], session: Any | None, ap: app.Application) -> Any | None:
"""Resolve the current Query from an AgentRunner session or action payload."""
"""Resolve the current Query from internal run state or query-based action payload."""
query_id = None
if session:
query_id = session.get('query_id')
@@ -762,8 +762,6 @@ class RuntimeConnectionHandler(handler.Handler):
parameters = data.get('tool_parameters') or data.get('parameters', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# session_data = data['session']
# query_id = data['query_id']
session = None
# Permission validation for AgentRunner calls
@@ -1322,7 +1320,7 @@ class RuntimeConnectionHandler(handler.Handler):
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE)
async def retrieve_knowledge_base(data: dict[str, Any]) -> handler.ActionResponse:
"""Retrieve documents from a knowledge base within the pipeline's scope.
"""Retrieve documents from a knowledge base within the current run or query scope.
For AgentRunner calls: requires run_id and validates kb_id against session.resources.knowledge_bases.
For regular plugin calls: no run_id, validates against pipeline's configured knowledge bases.
@@ -1331,20 +1329,14 @@ class RuntimeConnectionHandler(handler.Handler):
- AgentRunner: uses session_registry for permission check
- Regular plugin: uses ConfigMigration.resolve_runner_config for pipeline-level check
"""
query_id = data['query_id']
kb_id = data['kb_id']
query_text = data['query_text']
top_k = data.get('top_k', 5)
filters = data.get('filters') or {}
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
if query_id not in self.ap.query_pool.cached_queries:
return handler.ActionResponse.error(
message=f'Query with query_id {query_id} not found',
)
query = self.ap.query_pool.cached_queries[query_id]
session = None
query = None
# Permission validation for AgentRunner calls
if run_id:
@@ -1353,7 +1345,16 @@ class RuntimeConnectionHandler(handler.Handler):
)
if error:
return error
query = _resolve_action_query(data, session, self.ap)
else:
query_id = data['query_id']
if query_id not in self.ap.query_pool.cached_queries:
return handler.ActionResponse.error(
message=f'Query with query_id {query_id} not found',
)
query = self.ap.query_pool.cached_queries[query_id]
# Regular plugin call: validate against the runner binding's
# schema-defined KB selectors or the preprocessed query scope.
allowed_kb_uuids = await _get_pipeline_knowledge_base_uuids(self.ap, query)
@@ -1370,16 +1371,22 @@ class RuntimeConnectionHandler(handler.Handler):
)
try:
session_name = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
settings: dict[str, Any] = {
'top_k': top_k,
'filters': filters,
}
if query is not None:
session_name = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
settings.update(
{
'session_name': session_name,
'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id),
}
)
entries = await kb.retrieve(
query_text,
settings={
'top_k': top_k,
'filters': filters,
'session_name': session_name,
'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id),
},
settings=settings,
)
results = [entry.model_dump(mode='json') for entry in entries]
return handler.ActionResponse.success(data={'results': results})

View File

@@ -540,7 +540,7 @@ class MCPLoader(loader.ToolLoader):
return function
return None
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
"""执行工具调用"""
for session in self.sessions.values():
for function in session.get_tools():

View File

@@ -45,7 +45,12 @@ class PluginToolLoader(loader.ToolLoader):
return tool
return None
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
if query is None:
raise ValueError(
f'Plugin tool {name} requires a query-based host context. '
'Use MCP tools or provide a Host tool implementation that is run-scoped.'
)
try:
return await self.ap.plugin_connector.call_tool(
name, parameters, session=query.session, query_id=query.query_id

View File

@@ -117,7 +117,9 @@ class ToolManager:
return tools
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
"""执行函数调用"""
if await self.native_tool_loader.has_tool(name):
return await self.native_tool_loader.invoke_tool(name, parameters, query)
if await self.plugin_tool_loader.has_tool(name):