mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-05 05:16:03 +00:00
refactor(agent-runner): simplify event-first entry path
This commit is contained in:
@@ -1,44 +1,25 @@
|
||||
"""Configuration migration for agent runner IDs."""
|
||||
"""Helpers for the current AgentRunner config shape."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from .id import is_plugin_runner_id
|
||||
|
||||
|
||||
# Mapping from old built-in runner names to official plugin runner IDs
|
||||
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
|
||||
'local-agent': 'plugin:langbot/local-agent/default',
|
||||
'dify-service-api': 'plugin:langbot/dify-agent/default',
|
||||
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
|
||||
'coze-api': 'plugin:langbot/coze-agent/default',
|
||||
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
|
||||
'langflow-api': 'plugin:langbot/langflow-agent/default',
|
||||
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
|
||||
}
|
||||
|
||||
|
||||
class ConfigMigration:
|
||||
"""Configuration migration helper for agent runner IDs.
|
||||
"""Configuration helper for agent runner IDs.
|
||||
|
||||
Responsibilities:
|
||||
- Resolve runner ID from new ai.runner.id or old ai.runner.runner
|
||||
- Map old built-in runner names to official plugin runner IDs
|
||||
- Resolve runner ID from ai.runner.id
|
||||
- Extract current Agent/runner config from ai.runner_config
|
||||
- Migrate old ai.<runner-name> blocks into ai.runner_config
|
||||
- Keep the current config container shape stable on save
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def resolve_runner_id(pipeline_config: dict[str, typing.Any]) -> str | None:
|
||||
"""Resolve runner ID from pipeline configuration.
|
||||
|
||||
Priority:
|
||||
1. New format: ai.runner.id (must be plugin:* format)
|
||||
2. Old format: ai.runner.runner (mapped to plugin:* if built-in)
|
||||
"""Resolve runner ID from current configuration.
|
||||
|
||||
Args:
|
||||
pipeline_config: Pipeline configuration dict
|
||||
pipeline_config: Current configuration container
|
||||
|
||||
Returns:
|
||||
Runner ID string, or None if not configured
|
||||
@@ -46,26 +27,9 @@ class ConfigMigration:
|
||||
ai_config = pipeline_config.get('ai', {})
|
||||
runner_config = ai_config.get('runner', {})
|
||||
|
||||
# Check new format first
|
||||
runner_id = runner_config.get('id')
|
||||
if runner_id:
|
||||
if is_plugin_runner_id(runner_id):
|
||||
return runner_id
|
||||
# If it's not a plugin ID, try to map it as old runner name
|
||||
return OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(runner_id, runner_id)
|
||||
|
||||
# Check old format
|
||||
old_runner_name = runner_config.get('runner')
|
||||
if old_runner_name:
|
||||
# If already plugin:* format, return directly
|
||||
if is_plugin_runner_id(old_runner_name):
|
||||
return old_runner_name
|
||||
# Map old built-in runner to official plugin ID
|
||||
mapped_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name)
|
||||
if mapped_id:
|
||||
return mapped_id
|
||||
# Return old name if no mapping exists (will error in registry)
|
||||
return old_runner_name
|
||||
return runner_id
|
||||
|
||||
return None
|
||||
|
||||
@@ -74,14 +38,10 @@ class ConfigMigration:
|
||||
pipeline_config: dict[str, typing.Any],
|
||||
runner_id: str,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Resolve Agent/runner configuration from pipeline configuration.
|
||||
|
||||
Runtime code should only read the migrated format. Legacy
|
||||
ai.<runner-name> blocks are handled by migration helpers, not by the
|
||||
hot path.
|
||||
"""Resolve Agent/runner configuration from the current container.
|
||||
|
||||
Args:
|
||||
pipeline_config: Pipeline configuration dict
|
||||
pipeline_config: Current configuration container
|
||||
runner_id: Resolved runner ID
|
||||
|
||||
Returns:
|
||||
@@ -89,79 +49,18 @@ class ConfigMigration:
|
||||
"""
|
||||
ai_config = pipeline_config.get('ai', {})
|
||||
|
||||
# Check new format
|
||||
runner_configs = ai_config.get('runner_config', {})
|
||||
if runner_id in runner_configs:
|
||||
return runner_configs[runner_id]
|
||||
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def resolve_legacy_runner_config(
|
||||
pipeline_config: dict[str, typing.Any],
|
||||
runner_id: str,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Resolve old ai.<runner-name> config for migration only."""
|
||||
ai_config = pipeline_config.get('ai', {})
|
||||
|
||||
# Try to find old runner name from runner_id
|
||||
old_runner_name = None
|
||||
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
|
||||
if mapped_id == runner_id:
|
||||
old_runner_name = old_name
|
||||
break
|
||||
|
||||
if old_runner_name:
|
||||
old_config = ai_config.get(old_runner_name, {})
|
||||
if old_config:
|
||||
return ConfigMigration.normalize_runner_config_for_migration(runner_id, old_config)
|
||||
|
||||
return {}
|
||||
|
||||
@staticmethod
|
||||
def normalize_runner_config_for_migration(
|
||||
runner_id: str,
|
||||
runner_config: dict[str, typing.Any],
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Normalize released legacy runner config before storing Agent/runner config.
|
||||
|
||||
Runtime code should not carry aliases. This helper is intentionally used
|
||||
only by config migration so AgentRunner implementations can consume the
|
||||
current manifest-defined field names.
|
||||
"""
|
||||
normalized = dict(runner_config)
|
||||
|
||||
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
|
||||
legacy_kb = normalized.pop('knowledge-base', None)
|
||||
if 'knowledge-bases' not in normalized:
|
||||
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
|
||||
normalized['knowledge-bases'] = [legacy_kb]
|
||||
elif legacy_kb is not None:
|
||||
normalized['knowledge-bases'] = []
|
||||
|
||||
return normalized
|
||||
|
||||
@staticmethod
|
||||
def get_old_runner_name(runner_id: str) -> str | None:
|
||||
"""Get old runner name from mapped runner ID.
|
||||
|
||||
Args:
|
||||
runner_id: Plugin runner ID
|
||||
|
||||
Returns:
|
||||
Old runner name if mapped, None otherwise
|
||||
"""
|
||||
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
|
||||
if mapped_id == runner_id:
|
||||
return old_name
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_expire_time(pipeline_config: dict[str, typing.Any]) -> int:
|
||||
"""Get conversation expire time from configuration.
|
||||
|
||||
Args:
|
||||
pipeline_config: Pipeline configuration dict
|
||||
pipeline_config: Current configuration container
|
||||
|
||||
Returns:
|
||||
Expire time in seconds (0 means no expiry)
|
||||
@@ -172,54 +71,23 @@ class ConfigMigration:
|
||||
|
||||
@staticmethod
|
||||
def migrate_pipeline_config(pipeline_config: dict[str, typing.Any]) -> dict[str, typing.Any]:
|
||||
"""Migrate pipeline config to new format.
|
||||
|
||||
This converts old ai.runner.runner and ai.<runner-name> to
|
||||
new ai.runner.id and ai.runner_config format.
|
||||
"""Normalize the current config container before saving.
|
||||
|
||||
Args:
|
||||
pipeline_config: Original pipeline configuration
|
||||
pipeline_config: Original configuration
|
||||
|
||||
Returns:
|
||||
Migrated pipeline configuration
|
||||
Configuration with explicit ai.runner and ai.runner_config containers
|
||||
"""
|
||||
# Create copy
|
||||
new_config = dict(pipeline_config)
|
||||
ai_config = new_config.get('ai', {})
|
||||
if not ai_config:
|
||||
if 'ai' not in new_config:
|
||||
return new_config
|
||||
|
||||
runner_config = ai_config.get('runner', {})
|
||||
runner_configs = ai_config.get('runner_config', {})
|
||||
ai_config = dict(new_config.get('ai', {}))
|
||||
|
||||
# Resolve runner ID
|
||||
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
|
||||
if runner_id:
|
||||
# Set new format
|
||||
runner_config['id'] = runner_id
|
||||
# Remove old runner field if present
|
||||
if 'runner' in runner_config and is_plugin_runner_id(runner_config['runner']):
|
||||
# Already migrated plugin:* format, keep as id
|
||||
pass
|
||||
elif 'runner' in runner_config:
|
||||
# Old built-in runner name, remove after migration
|
||||
old_name = runner_config['runner']
|
||||
if old_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
|
||||
del runner_config['runner']
|
||||
runner_config = dict(ai_config.get('runner', {}))
|
||||
runner_configs = dict(ai_config.get('runner_config', {}))
|
||||
|
||||
# Migrate runner config
|
||||
resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
|
||||
if not resolved_config:
|
||||
resolved_config = ConfigMigration.resolve_legacy_runner_config(pipeline_config, runner_id)
|
||||
if resolved_config:
|
||||
resolved_config = ConfigMigration.normalize_runner_config_for_migration(runner_id, resolved_config)
|
||||
runner_configs[runner_id] = resolved_config
|
||||
# Remove old runner config block
|
||||
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
|
||||
if mapped_id == runner_id and old_name in ai_config:
|
||||
del ai_config[old_name]
|
||||
|
||||
# Update configs
|
||||
ai_config['runner'] = runner_config
|
||||
ai_config['runner_config'] = runner_configs
|
||||
new_config['ai'] = ai_config
|
||||
|
||||
@@ -116,7 +116,6 @@ class AgentRuntimeContext(typing.TypedDict):
|
||||
|
||||
langbot_version: str | None
|
||||
sdk_protocol_version: str
|
||||
query_id: int | None
|
||||
trace_id: str | None
|
||||
deadline_at: float | None
|
||||
metadata: dict[str, typing.Any]
|
||||
@@ -128,8 +127,8 @@ class AgentRunContextPayload(typing.TypedDict):
|
||||
Protocol v1 structure - matches SDK AgentRunContext.
|
||||
|
||||
Note: The 'config' field contains the current Agent/runner config
|
||||
from ai.runner_config[runner_id] while Pipeline remains the temporary
|
||||
configuration container. It is not plugin instance config.
|
||||
from ai.runner_config[runner_id] while the current Query entry remains
|
||||
a temporary configuration container. It is not plugin instance config.
|
||||
"""
|
||||
|
||||
run_id: str
|
||||
@@ -145,7 +144,6 @@ class AgentRunContextPayload(typing.TypedDict):
|
||||
state: AgentRunState
|
||||
runtime: AgentRuntimeContext
|
||||
config: dict[str, typing.Any] # Agent/runner config from ai.runner_config[runner_id]
|
||||
bootstrap: dict[str, typing.Any] | None # Optional bootstrap context
|
||||
adapter: dict[str, typing.Any] | None # Entry adapter context
|
||||
metadata: dict[str, typing.Any] # Additional metadata
|
||||
|
||||
@@ -162,7 +160,7 @@ class AgentRunContextBuilder:
|
||||
- Build runtime context with host info, trace_id, deadline
|
||||
- Set config from current Agent/runner configuration.
|
||||
|
||||
Pipeline Query adaptation belongs to PipelineAdapter, not this builder.
|
||||
Query adaptation belongs to QueryEntryAdapter, not this builder.
|
||||
"""
|
||||
|
||||
ap: app.Application
|
||||
@@ -266,7 +264,6 @@ class AgentRunContextBuilder:
|
||||
runtime: AgentRuntimeContext = {
|
||||
'langbot_version': self.ap.ver_mgr.get_current_version(),
|
||||
'sdk_protocol_version': descriptor.protocol_version,
|
||||
'query_id': None, # No query_id in event-first mode
|
||||
'trace_id': run_id,
|
||||
'deadline_at': self._build_deadline_from_binding(binding),
|
||||
'metadata': {
|
||||
@@ -293,7 +290,6 @@ class AgentRunContextBuilder:
|
||||
|
||||
# Build adapter context (empty for event-first)
|
||||
adapter_context = {
|
||||
'query_id': None,
|
||||
'extra': {},
|
||||
}
|
||||
|
||||
@@ -312,7 +308,6 @@ class AgentRunContextBuilder:
|
||||
'state': state,
|
||||
'runtime': runtime,
|
||||
'config': binding.runner_config,
|
||||
'bootstrap': None,
|
||||
'adapter': adapter_context,
|
||||
'metadata': {}, # Additional metadata
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ from .persistent_state_store import get_persistent_state_store, PersistentStateS
|
||||
from .session_registry import get_session_registry, AgentRunSessionRegistry
|
||||
from .config_migration import ConfigMigration
|
||||
from .host_models import AgentEventEnvelope, AgentBinding
|
||||
from .pipeline_adapter import PipelineAdapter
|
||||
from .query_entry_adapter import QueryEntryAdapter
|
||||
from .state_scope import build_state_context
|
||||
from .errors import (
|
||||
RunnerNotFoundError,
|
||||
@@ -37,7 +37,7 @@ class AgentRunOrchestrator:
|
||||
"""Orchestrator for agent runner execution.
|
||||
|
||||
Responsibilities:
|
||||
- Resolve runner ID from pipeline config (new or old format)
|
||||
- Resolve runner ID from current Agent/runner config
|
||||
- Get runner descriptor from registry
|
||||
- Provision AgentRunContext envelope from Query
|
||||
- Build AgentResources with permission filtering
|
||||
@@ -48,7 +48,7 @@ class AgentRunOrchestrator:
|
||||
|
||||
Entry points:
|
||||
- run(event, binding): Main entry for event-first Protocol v1
|
||||
- run_from_query(query): Pipeline adapter wrapper
|
||||
- run_from_query(query): current Query entry adapter wrapper
|
||||
"""
|
||||
|
||||
ap: app.Application
|
||||
@@ -125,28 +125,24 @@ class AgentRunOrchestrator:
|
||||
resources=resources,
|
||||
)
|
||||
|
||||
session_query_id = None
|
||||
|
||||
# Merge adapter context if provided
|
||||
if adapter_context:
|
||||
session_query_id = adapter_context.get('query_id')
|
||||
# Merge params into adapter.extra
|
||||
if 'params' in adapter_context:
|
||||
context['adapter']['extra']['params'] = adapter_context['params']
|
||||
# Merge prompt into adapter.extra for transitional adapter consumers.
|
||||
if 'prompt' in adapter_context:
|
||||
context['adapter']['extra']['prompt'] = adapter_context['prompt']
|
||||
# Set query_id if provided
|
||||
if adapter_context.get('query_id'):
|
||||
context['runtime']['query_id'] = adapter_context['query_id']
|
||||
|
||||
# Build state context for State API handlers
|
||||
state_context = build_state_context(event, binding, descriptor)
|
||||
|
||||
# Register session for proxy action permission validation
|
||||
run_id = context['run_id']
|
||||
query_id = context['runtime'].get('query_id') # May be None for pure event-first mode
|
||||
await self._session_registry.register(
|
||||
run_id=run_id,
|
||||
runner_id=descriptor.id,
|
||||
query_id=query_id,
|
||||
query_id=session_query_id,
|
||||
plugin_identity=descriptor.get_plugin_id(),
|
||||
resources=resources,
|
||||
permissions=descriptor.permissions or {},
|
||||
@@ -238,7 +234,7 @@ class AgentRunOrchestrator:
|
||||
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
|
||||
"""Run agent runner from pipeline query.
|
||||
|
||||
This is the Pipeline adapter wrapper for the Query-based flow.
|
||||
This is the Query entry adapter wrapper for the query-based flow.
|
||||
It delegates to the event-first run(event, binding) method.
|
||||
|
||||
For the new event-first Protocol v1, use run(event, binding) instead.
|
||||
@@ -260,16 +256,16 @@ class AgentRunOrchestrator:
|
||||
raise RunnerNotFoundError('no runner configured')
|
||||
|
||||
# Convert Query to event-first envelope
|
||||
event = PipelineAdapter.query_to_event(query)
|
||||
event = QueryEntryAdapter.query_to_event(query)
|
||||
|
||||
# Convert Pipeline config to binding
|
||||
binding = PipelineAdapter.pipeline_config_to_binding(query, runner_id)
|
||||
# Convert current config to binding
|
||||
binding = QueryEntryAdapter.config_to_binding(query, runner_id)
|
||||
|
||||
# Extract bound plugins for authorization
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins')
|
||||
|
||||
# Build adapter context for Pipeline-specific fields
|
||||
adapter_context = PipelineAdapter.build_adapter_context(query, binding)
|
||||
# Build adapter context for Query-specific fields
|
||||
adapter_context = QueryEntryAdapter.build_adapter_context(query, binding)
|
||||
|
||||
# Delegate to event-first run()
|
||||
async for result in self.run(
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Pipeline adapter for converting Query to event-first envelope.
|
||||
"""Query entry adapter for converting Query to event-first envelope.
|
||||
|
||||
This adapter bridges the Query/Pipeline entry point with the event-first
|
||||
Protocol v1 architecture.
|
||||
This adapter bridges the current Query entry point with the event-first
|
||||
Protocol v1 architecture without exposing Query internals to runners.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -31,12 +31,12 @@ from .host_models import (
|
||||
from . import events as runner_events
|
||||
|
||||
|
||||
class PipelineAdapter:
|
||||
"""Adapter for converting Pipeline Query to event-first envelope.
|
||||
class QueryEntryAdapter:
|
||||
"""Adapter for converting Query to event-first envelope.
|
||||
|
||||
This adapter is responsible for:
|
||||
- Converting Query to AgentEventEnvelope
|
||||
- Converting Pipeline config to temporary AgentBinding
|
||||
- Converting current Agent/runner config to temporary AgentBinding
|
||||
- Putting Query-only fields into adapter context
|
||||
"""
|
||||
|
||||
@@ -49,10 +49,10 @@ class PipelineAdapter:
|
||||
cls,
|
||||
query: pipeline_query.Query,
|
||||
) -> AgentEventEnvelope:
|
||||
"""Convert Pipeline Query to AgentEventEnvelope.
|
||||
"""Convert Query to AgentEventEnvelope.
|
||||
|
||||
Args:
|
||||
query: Pipeline query
|
||||
query: Current entry query
|
||||
|
||||
Returns:
|
||||
AgentEventEnvelope for event-first processing
|
||||
@@ -82,7 +82,7 @@ class PipelineAdapter:
|
||||
event_id=event.event_id or str(query.query_id),
|
||||
event_type=event.event_type or runner_events.MESSAGE_RECEIVED,
|
||||
event_time=event.event_time,
|
||||
source="pipeline_adapter",
|
||||
source="host_adapter",
|
||||
source_event_type=event.source_event_type,
|
||||
bot_id=query.bot_uuid,
|
||||
workspace_id=None, # Not available in Query
|
||||
@@ -97,15 +97,15 @@ class PipelineAdapter:
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def pipeline_config_to_binding(
|
||||
def config_to_binding(
|
||||
cls,
|
||||
query: pipeline_query.Query,
|
||||
runner_id: str,
|
||||
) -> AgentBinding:
|
||||
"""Convert Pipeline config to temporary AgentBinding.
|
||||
"""Convert current config container to temporary AgentBinding.
|
||||
|
||||
Args:
|
||||
query: Pipeline query
|
||||
query: Current entry query
|
||||
runner_id: Resolved runner ID
|
||||
|
||||
Returns:
|
||||
@@ -121,7 +121,7 @@ class PipelineAdapter:
|
||||
scope_id=agent_id,
|
||||
)
|
||||
|
||||
# Build resource policy from pipeline config
|
||||
# Build resource policy from current config
|
||||
resource_policy = ResourcePolicy(
|
||||
allowed_model_uuids=cls._extract_allowed_models(query),
|
||||
allowed_tool_names=cls._extract_allowed_tools(query),
|
||||
@@ -159,10 +159,9 @@ class PipelineAdapter:
|
||||
query: pipeline_query.Query,
|
||||
binding: AgentBinding,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Build Query-derived fields for the Pipeline adapter entry."""
|
||||
"""Build Query-derived fields for the current entry adapter."""
|
||||
return {
|
||||
'params': cls.build_params(query),
|
||||
'prompt': cls.build_prompt(query),
|
||||
'query_id': getattr(query, 'query_id', None),
|
||||
}
|
||||
|
||||
@@ -187,15 +186,6 @@ class PipelineAdapter:
|
||||
|
||||
return params
|
||||
|
||||
@classmethod
|
||||
def build_prompt(cls, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
|
||||
"""Build effective prompt messages from Pipeline preprocessing output."""
|
||||
prompt = getattr(query, 'prompt', None)
|
||||
messages = getattr(prompt, 'messages', None)
|
||||
if not messages:
|
||||
return []
|
||||
return [cls._dump_message(msg) for msg in messages]
|
||||
|
||||
@classmethod
|
||||
def is_json_serializable(cls, value: typing.Any) -> bool:
|
||||
"""Return whether a value can safely cross the adapter boundary as JSON."""
|
||||
@@ -210,18 +200,6 @@ class PipelineAdapter:
|
||||
)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _dump_message(message: typing.Any) -> dict[str, typing.Any]:
|
||||
"""Serialize a provider message-like object."""
|
||||
if hasattr(message, 'model_dump'):
|
||||
return message.model_dump(mode='json')
|
||||
if isinstance(message, dict):
|
||||
return message
|
||||
return {
|
||||
'role': getattr(message, 'role', None),
|
||||
'content': getattr(message, 'content', None),
|
||||
}
|
||||
|
||||
# Private helper methods
|
||||
|
||||
@classmethod
|
||||
@@ -262,7 +240,7 @@ class PipelineAdapter:
|
||||
event_id=cls._build_scoped_event_id(query, source_event_id, event_time),
|
||||
event_type=runner_events.MESSAGE_RECEIVED,
|
||||
event_time=event_time,
|
||||
source="pipeline_adapter",
|
||||
source="host_adapter",
|
||||
source_event_type=source_event_type,
|
||||
data=event_data,
|
||||
)
|
||||
@@ -278,7 +256,7 @@ class PipelineAdapter:
|
||||
launcher_type = getattr(query, 'launcher_type', None)
|
||||
launcher_type_value = getattr(launcher_type, 'value', launcher_type) if launcher_type is not None else None
|
||||
scope_parts = [
|
||||
'pipeline_adapter',
|
||||
'host_adapter',
|
||||
getattr(query, 'pipeline_uuid', None),
|
||||
getattr(query, 'bot_uuid', None),
|
||||
launcher_type_value,
|
||||
@@ -289,7 +267,7 @@ class PipelineAdapter:
|
||||
]
|
||||
scoped = '|'.join('' if part is None else str(part) for part in scope_parts)
|
||||
digest = hashlib.sha256(scoped.encode('utf-8')).hexdigest()[:32]
|
||||
return f'pipeline:{digest}'
|
||||
return f'host:{digest}'
|
||||
|
||||
@classmethod
|
||||
def _build_conversation_context(
|
||||
@@ -23,7 +23,7 @@ class AgentRunSession(typing.TypedDict):
|
||||
Fields:
|
||||
run_id: Unique run identifier (UUID from AgentRunContext)
|
||||
runner_id: Runner descriptor ID (plugin:author/name/runner)
|
||||
query_id: Pipeline query ID
|
||||
query_id: Host entry query ID, only present for query-based adapters
|
||||
plugin_identity: Plugin identifier (author/name) of the runner
|
||||
conversation_id: Conversation ID for history/event access
|
||||
resources: Authorized resources for this run (from AgentResources)
|
||||
@@ -82,7 +82,7 @@ class AgentRunSessionRegistry:
|
||||
Args:
|
||||
run_id: Unique run identifier
|
||||
runner_id: Runner descriptor ID
|
||||
query_id: Pipeline query ID
|
||||
query_id: Host entry query ID, only present for query-based adapters
|
||||
plugin_identity: Plugin identifier (author/name)
|
||||
resources: Authorized resources for this run
|
||||
conversation_id: Conversation ID for history/event access
|
||||
@@ -247,4 +247,4 @@ def get_session_registry() -> AgentRunSessionRegistry:
|
||||
with _global_registry_lock:
|
||||
if _global_registry is None:
|
||||
_global_registry = AgentRunSessionRegistry()
|
||||
return _global_registry
|
||||
return _global_registry
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""Migrate pipeline config to new runner format
|
||||
"""Normalize AgentRunner config containers
|
||||
|
||||
Revision ID: 0004_migrate_runner_config
|
||||
Revises: 0003_add_rerank_models
|
||||
@@ -14,101 +14,23 @@ down_revision = '0003_add_rerank_models'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
# Mapping from old built-in runner names to official plugin runner IDs
|
||||
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
|
||||
'local-agent': 'plugin:langbot/local-agent/default',
|
||||
'dify-service-api': 'plugin:langbot/dify-agent/default',
|
||||
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
|
||||
'coze-api': 'plugin:langbot/coze-agent/default',
|
||||
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
|
||||
'langflow-api': 'plugin:langbot/langflow-agent/default',
|
||||
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
|
||||
}
|
||||
|
||||
|
||||
def is_plugin_runner_id(runner_id: str) -> bool:
|
||||
"""Check if runner ID is in plugin:* format."""
|
||||
return runner_id.startswith('plugin:')
|
||||
|
||||
|
||||
def normalize_runner_config_for_migration(runner_id: str, runner_config: dict) -> dict:
|
||||
"""Normalize released legacy runner fields before storing binding config."""
|
||||
normalized = dict(runner_config)
|
||||
|
||||
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
|
||||
legacy_kb = normalized.pop('knowledge-base', None)
|
||||
if 'knowledge-bases' not in normalized:
|
||||
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
|
||||
normalized['knowledge-bases'] = [legacy_kb]
|
||||
elif legacy_kb is not None:
|
||||
normalized['knowledge-bases'] = []
|
||||
|
||||
return normalized
|
||||
|
||||
|
||||
def migrate_pipeline_config(config: dict) -> dict:
|
||||
"""Migrate pipeline config to new format."""
|
||||
"""Keep current AgentRunner config containers explicit."""
|
||||
new_config = dict(config)
|
||||
ai_config = new_config.get('ai', {})
|
||||
if not ai_config:
|
||||
if 'ai' not in new_config:
|
||||
return new_config
|
||||
|
||||
runner_config = ai_config.get('runner', {})
|
||||
runner_configs = ai_config.get('runner_config', {})
|
||||
ai_config = dict(new_config.get('ai', {}))
|
||||
|
||||
# Check for new format first
|
||||
runner_id = runner_config.get('id')
|
||||
if runner_id and is_plugin_runner_id(runner_id):
|
||||
if runner_id in runner_configs:
|
||||
runner_configs[runner_id] = normalize_runner_config_for_migration(
|
||||
runner_id,
|
||||
runner_configs[runner_id],
|
||||
)
|
||||
ai_config['runner_config'] = runner_configs
|
||||
new_config['ai'] = ai_config
|
||||
return new_config
|
||||
|
||||
# Check for old format
|
||||
old_runner_name = runner_config.get('runner')
|
||||
if old_runner_name:
|
||||
# Map to new runner ID
|
||||
if is_plugin_runner_id(old_runner_name):
|
||||
runner_id = old_runner_name
|
||||
else:
|
||||
runner_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name, old_runner_name)
|
||||
|
||||
# Set new format
|
||||
runner_config['id'] = runner_id
|
||||
|
||||
# Remove old runner field if it's a mapped built-in runner
|
||||
if old_runner_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
|
||||
del runner_config['runner']
|
||||
|
||||
# Migrate runner-specific config and remove old config blocks
|
||||
if old_runner_name in ai_config:
|
||||
old_runner_config = ai_config[old_runner_name]
|
||||
if old_runner_config:
|
||||
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, old_runner_config)
|
||||
# Remove old config block after migration
|
||||
del ai_config[old_runner_name]
|
||||
|
||||
# Also check if runner_id has config under other old name formats
|
||||
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
|
||||
if mapped_id == runner_id and old_name in ai_config:
|
||||
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, ai_config[old_name])
|
||||
# Remove old config block after migration
|
||||
del ai_config[old_name]
|
||||
|
||||
# Update configs
|
||||
ai_config['runner'] = runner_config
|
||||
ai_config['runner_config'] = runner_configs
|
||||
ai_config['runner'] = dict(ai_config.get('runner', {}))
|
||||
ai_config['runner_config'] = dict(ai_config.get('runner_config', {}))
|
||||
new_config['ai'] = ai_config
|
||||
|
||||
return new_config
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Migrate existing pipeline configs to new runner format."""
|
||||
"""Normalize existing pipeline config containers."""
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
|
||||
|
||||
@@ -11,12 +11,19 @@ from .. import handler
|
||||
from ... import entities
|
||||
|
||||
import langbot_plugin.api.entities.events as events
|
||||
from ....agent.runner.config_migration import ConfigMigration
|
||||
from ....agent.runner import config_schema
|
||||
from ....utils import constants, runner as runner_utils
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
DEFAULT_PROMPT_CONFIG = [
|
||||
{'role': 'system', 'content': 'You are a helpful assistant.'},
|
||||
]
|
||||
|
||||
|
||||
class ChatMessageHandler(handler.MessageHandler):
|
||||
"""Chat message handler using AgentRunOrchestrator.
|
||||
|
||||
@@ -140,8 +147,9 @@ class ChatMessageHandler(handler.MessageHandler):
|
||||
)
|
||||
|
||||
# Update conversation history
|
||||
query.session.using_conversation.messages.append(query.user_message)
|
||||
query.session.using_conversation.messages.extend(query.resp_messages)
|
||||
conversation = await self._ensure_conversation_for_history(query)
|
||||
conversation.messages.append(query.user_message)
|
||||
conversation.messages.extend(query.resp_messages)
|
||||
|
||||
except Exception as e:
|
||||
# Import orchestrator errors for specific handling
|
||||
@@ -234,3 +242,69 @@ class ChatMessageHandler(handler.MessageHandler):
|
||||
await self.ap.survey.trigger_event('first_bot_response_success')
|
||||
except Exception as ex:
|
||||
self.ap.logger.warning(f'Failed to send telemetry: {ex}')
|
||||
|
||||
async def _ensure_conversation_for_history(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> provider_session.Conversation:
|
||||
session = getattr(query, 'session', None)
|
||||
conversation = getattr(session, 'using_conversation', None)
|
||||
if conversation is not None:
|
||||
return conversation
|
||||
|
||||
if session is None or getattr(self.ap, 'sess_mgr', None) is None:
|
||||
raise RuntimeError('Conversation is not available for history update')
|
||||
|
||||
prompt_config = await self._build_history_prompt_config(query)
|
||||
conversation = await self.ap.sess_mgr.get_conversation(
|
||||
query,
|
||||
session,
|
||||
prompt_config,
|
||||
query.pipeline_uuid,
|
||||
query.bot_uuid,
|
||||
)
|
||||
if conversation is None:
|
||||
raise RuntimeError('Conversation manager did not return a conversation')
|
||||
|
||||
if getattr(session, 'using_conversation', None) is None:
|
||||
session.using_conversation = conversation
|
||||
return conversation
|
||||
|
||||
async def _build_history_prompt_config(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> list[dict[str, typing.Any]]:
|
||||
prompt_messages = getattr(getattr(query, 'prompt', None), 'messages', None)
|
||||
if prompt_messages:
|
||||
prompt_config = []
|
||||
for message in prompt_messages:
|
||||
if hasattr(message, 'model_dump'):
|
||||
prompt_config.append(message.model_dump(mode='python'))
|
||||
elif isinstance(message, dict):
|
||||
prompt_config.append(message)
|
||||
if prompt_config:
|
||||
return prompt_config
|
||||
|
||||
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
|
||||
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||
descriptor = await self._get_runner_descriptor(runner_id, bound_plugins)
|
||||
return config_schema.extract_prompt_config(descriptor, runner_config, DEFAULT_PROMPT_CONFIG)
|
||||
|
||||
async def _get_runner_descriptor(
|
||||
self,
|
||||
runner_id: str | None,
|
||||
bound_plugins: list[str] | None,
|
||||
) -> typing.Any | None:
|
||||
if not runner_id:
|
||||
return None
|
||||
|
||||
registry = getattr(self.ap, 'agent_runner_registry', None)
|
||||
if registry is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
return await registry.get(runner_id, bound_plugins)
|
||||
except Exception as e:
|
||||
self.ap.logger.debug(f'Unable to load AgentRunner descriptor for {runner_id}: {e}')
|
||||
return None
|
||||
|
||||
@@ -239,7 +239,7 @@ async def _get_pipeline_knowledge_base_uuids(ap: app.Application, query: Any) ->
|
||||
try:
|
||||
descriptor = await registry.get(runner_id, bound_plugins)
|
||||
except Exception as e:
|
||||
ap.logger.warning(f'Failed to load AgentRunner descriptor for pipeline knowledge-base scope: {e}')
|
||||
ap.logger.warning(f'Failed to load AgentRunner descriptor for knowledge-base scope: {e}')
|
||||
return []
|
||||
|
||||
return config_schema.extract_knowledge_base_uuids(descriptor, runner_config)
|
||||
@@ -302,7 +302,7 @@ async def _validate_run_authorization(
|
||||
|
||||
|
||||
def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None:
|
||||
"""Return a cached pipeline Query for runtime actions when available."""
|
||||
"""Return a cached Query for query-based runtime actions when available."""
|
||||
if query_id is None:
|
||||
return None
|
||||
|
||||
@@ -313,7 +313,7 @@ def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None:
|
||||
|
||||
|
||||
def _resolve_action_query(data: dict[str, Any], session: Any | None, ap: app.Application) -> Any | None:
|
||||
"""Resolve the current Query from an AgentRunner session or action payload."""
|
||||
"""Resolve the current Query from internal run state or query-based action payload."""
|
||||
query_id = None
|
||||
if session:
|
||||
query_id = session.get('query_id')
|
||||
@@ -762,8 +762,6 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
parameters = data.get('tool_parameters') or data.get('parameters', {})
|
||||
run_id = data.get('run_id') # Optional: present for AgentRunner calls
|
||||
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
|
||||
# session_data = data['session']
|
||||
# query_id = data['query_id']
|
||||
session = None
|
||||
|
||||
# Permission validation for AgentRunner calls
|
||||
@@ -1322,7 +1320,7 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
|
||||
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE)
|
||||
async def retrieve_knowledge_base(data: dict[str, Any]) -> handler.ActionResponse:
|
||||
"""Retrieve documents from a knowledge base within the pipeline's scope.
|
||||
"""Retrieve documents from a knowledge base within the current run or query scope.
|
||||
|
||||
For AgentRunner calls: requires run_id and validates kb_id against session.resources.knowledge_bases.
|
||||
For regular plugin calls: no run_id, validates against pipeline's configured knowledge bases.
|
||||
@@ -1331,20 +1329,14 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
- AgentRunner: uses session_registry for permission check
|
||||
- Regular plugin: uses ConfigMigration.resolve_runner_config for pipeline-level check
|
||||
"""
|
||||
query_id = data['query_id']
|
||||
kb_id = data['kb_id']
|
||||
query_text = data['query_text']
|
||||
top_k = data.get('top_k', 5)
|
||||
filters = data.get('filters') or {}
|
||||
run_id = data.get('run_id') # Optional: present for AgentRunner calls
|
||||
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
|
||||
|
||||
if query_id not in self.ap.query_pool.cached_queries:
|
||||
return handler.ActionResponse.error(
|
||||
message=f'Query with query_id {query_id} not found',
|
||||
)
|
||||
|
||||
query = self.ap.query_pool.cached_queries[query_id]
|
||||
session = None
|
||||
query = None
|
||||
|
||||
# Permission validation for AgentRunner calls
|
||||
if run_id:
|
||||
@@ -1353,7 +1345,16 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
)
|
||||
if error:
|
||||
return error
|
||||
query = _resolve_action_query(data, session, self.ap)
|
||||
else:
|
||||
query_id = data['query_id']
|
||||
if query_id not in self.ap.query_pool.cached_queries:
|
||||
return handler.ActionResponse.error(
|
||||
message=f'Query with query_id {query_id} not found',
|
||||
)
|
||||
|
||||
query = self.ap.query_pool.cached_queries[query_id]
|
||||
|
||||
# Regular plugin call: validate against the runner binding's
|
||||
# schema-defined KB selectors or the preprocessed query scope.
|
||||
allowed_kb_uuids = await _get_pipeline_knowledge_base_uuids(self.ap, query)
|
||||
@@ -1370,16 +1371,22 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
)
|
||||
|
||||
try:
|
||||
session_name = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
|
||||
settings: dict[str, Any] = {
|
||||
'top_k': top_k,
|
||||
'filters': filters,
|
||||
}
|
||||
if query is not None:
|
||||
session_name = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
|
||||
settings.update(
|
||||
{
|
||||
'session_name': session_name,
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
}
|
||||
)
|
||||
entries = await kb.retrieve(
|
||||
query_text,
|
||||
settings={
|
||||
'top_k': top_k,
|
||||
'filters': filters,
|
||||
'session_name': session_name,
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
},
|
||||
settings=settings,
|
||||
)
|
||||
results = [entry.model_dump(mode='json') for entry in entries]
|
||||
return handler.ActionResponse.success(data={'results': results})
|
||||
|
||||
@@ -540,7 +540,7 @@ class MCPLoader(loader.ToolLoader):
|
||||
return function
|
||||
return None
|
||||
|
||||
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
|
||||
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
|
||||
"""执行工具调用"""
|
||||
for session in self.sessions.values():
|
||||
for function in session.get_tools():
|
||||
|
||||
@@ -45,7 +45,12 @@ class PluginToolLoader(loader.ToolLoader):
|
||||
return tool
|
||||
return None
|
||||
|
||||
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
|
||||
async def invoke_tool(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
|
||||
if query is None:
|
||||
raise ValueError(
|
||||
f'Plugin tool {name} requires a query-based host context. '
|
||||
'Use MCP tools or provide a Host tool implementation that is run-scoped.'
|
||||
)
|
||||
try:
|
||||
return await self.ap.plugin_connector.call_tool(
|
||||
name, parameters, session=query.session, query_id=query.query_id
|
||||
|
||||
@@ -117,7 +117,9 @@ class ToolManager:
|
||||
|
||||
return tools
|
||||
|
||||
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
|
||||
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query | None) -> typing.Any:
|
||||
"""执行函数调用"""
|
||||
|
||||
if await self.native_tool_loader.has_tool(name):
|
||||
return await self.native_tool_loader.invoke_tool(name, parameters, query)
|
||||
if await self.plugin_tool_loader.has_tool(name):
|
||||
|
||||
Reference in New Issue
Block a user