feat(agent-runner): normalize binding config boundaries

This commit is contained in:
huanghuoguoguo
2026-06-02 15:40:57 +08:00
parent 93cd852061
commit 4d4ccfabd5
8 changed files with 185 additions and 108 deletions

View File

@@ -1,4 +1,5 @@
"""Configuration migration for agent runner IDs."""
from __future__ import annotations
import typing
@@ -113,10 +114,33 @@ class ConfigMigration:
if old_runner_name:
old_config = ai_config.get(old_runner_name, {})
if old_config:
return old_config
return ConfigMigration.normalize_runner_config_for_migration(runner_id, old_config)
return {}
@staticmethod
def normalize_runner_config_for_migration(
runner_id: str,
runner_config: dict[str, typing.Any],
) -> dict[str, typing.Any]:
"""Normalize released legacy runner config before storing binding config.
Runtime code should not carry aliases. This helper is intentionally used
only by config migration so AgentRunner implementations can consume the
current manifest-defined field names.
"""
normalized = dict(runner_config)
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
legacy_kb = normalized.pop('knowledge-base', None)
if 'knowledge-bases' not in normalized:
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
normalized['knowledge-bases'] = [legacy_kb]
elif legacy_kb is not None:
normalized['knowledge-bases'] = []
return normalized
@staticmethod
def get_old_runner_name(runner_id: str) -> str | None:
"""Get old runner name from mapped runner ID.
@@ -188,6 +212,7 @@ class ConfigMigration:
if not resolved_config:
resolved_config = ConfigMigration.resolve_legacy_runner_config(pipeline_config, runner_id)
if resolved_config:
resolved_config = ConfigMigration.normalize_runner_config_for_migration(runner_id, resolved_config)
runner_configs[runner_id] = resolved_config
# Remove old runner config block
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():

View File

@@ -1,4 +1,5 @@
"""Agent run context builder for provisioning AgentRunContext envelopes."""
from __future__ import annotations
import uuid
@@ -19,6 +20,7 @@ DEFAULT_RUNNER_TIMEOUT_SECONDS = 300
class AgentTrigger(typing.TypedDict):
"""Agent trigger information."""
type: str
source: str # 'pipeline' or 'event_router'
timestamp: int | None
@@ -26,6 +28,7 @@ class AgentTrigger(typing.TypedDict):
class ConversationContext(typing.TypedDict):
"""Conversation context."""
conversation_id: str | None
thread_id: str | None
launcher_type: str | None
@@ -39,6 +42,7 @@ class ConversationContext(typing.TypedDict):
class AgentInput(typing.TypedDict):
"""Agent input."""
text: str | None
contents: list[dict[str, typing.Any]]
message_chain: dict[str, typing.Any] | None
@@ -47,6 +51,7 @@ class AgentInput(typing.TypedDict):
class AgentRunState(typing.TypedDict):
"""Agent run state with 4 scopes."""
conversation: dict[str, typing.Any]
actor: dict[str, typing.Any]
subject: dict[str, typing.Any]
@@ -58,6 +63,7 @@ class AgentRunState(typing.TypedDict):
class ModelResource(typing.TypedDict):
"""Model resource payload."""
model_id: str
model_type: str | None
provider: str | None
@@ -65,6 +71,7 @@ class ModelResource(typing.TypedDict):
class ToolResource(typing.TypedDict):
"""Tool resource payload."""
tool_name: str
tool_type: str | None
description: str | None
@@ -72,6 +79,7 @@ class ToolResource(typing.TypedDict):
class KnowledgeBaseResource(typing.TypedDict):
"""Knowledge base resource payload."""
kb_id: str
kb_name: str | None
kb_type: str | None
@@ -79,6 +87,7 @@ class KnowledgeBaseResource(typing.TypedDict):
class FileResource(typing.TypedDict):
"""File resource payload."""
file_id: str
file_name: str | None
mime_type: str | None
@@ -87,12 +96,14 @@ class FileResource(typing.TypedDict):
class StorageResource(typing.TypedDict):
"""Storage resource payload."""
plugin_storage: bool
workspace_storage: bool
class AgentResources(typing.TypedDict):
"""Agent resources payload."""
models: list[ModelResource]
tools: list[ToolResource]
knowledge_bases: list[KnowledgeBaseResource]
@@ -103,6 +114,7 @@ class AgentResources(typing.TypedDict):
class AgentRuntimeContext(typing.TypedDict):
"""Agent runtime context."""
langbot_version: str | None
sdk_protocol_version: str
query_id: int | None
@@ -119,6 +131,7 @@ class AgentRunContextPayload(typing.TypedDict):
Note: The 'config' field contains the binding config from ai.runner_config[runner_id],
which is Pipeline's configuration for this specific runner binding (not plugin instance config).
"""
run_id: str
trigger: AgentTrigger
conversation: ConversationContext | None
@@ -237,7 +250,9 @@ class AgentRunContextBuilder:
'text': event.input.text,
'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents],
'message_chain': event.input.message_chain,
'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments],
'attachments': [
a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments
],
}
# Build context access (no history inlined by default for Protocol v1)
@@ -245,9 +260,7 @@ class AgentRunContextBuilder:
context_access = await self._build_context_access(event, descriptor, binding)
# Build state snapshot from persistent state store (event-first Protocol v1)
persistent_state_store = get_persistent_state_store(
self.ap.persistence_mgr.get_db_engine()
)
persistent_state_store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine())
state: AgentRunState = await persistent_state_store.build_snapshot_from_event(event, binding, descriptor)
# Build runtime context
@@ -261,6 +274,10 @@ class AgentRunContextBuilder:
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'streaming_supported': event.delivery.supports_streaming,
'model_context_window_tokens': None,
# TODO(model-info): populate model_context_window_tokens after
# LiteLLM/model metadata lands. Runners fall back to their
# binding config until Host can provide the real window.
},
}
@@ -375,6 +392,7 @@ class AgentRunContextBuilder:
if conversation_id:
try:
from .transcript_store import TranscriptStore
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
latest_cursor = await store.get_latest_cursor(conversation_id)
@@ -406,5 +424,6 @@ class AgentRunContextBuilder:
'artifact_read': artifact_read_enabled,
'state': state_enabled,
'storage': True,
'prompt_get': False,
},
}

View File

@@ -31,6 +31,21 @@ def is_plugin_runner_id(runner_id: str) -> bool:
return runner_id.startswith('plugin:')
def normalize_runner_config_for_migration(runner_id: str, runner_config: dict) -> dict:
"""Normalize released legacy runner fields before storing binding config."""
normalized = dict(runner_config)
if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']:
legacy_kb = normalized.pop('knowledge-base', None)
if 'knowledge-bases' not in normalized:
if isinstance(legacy_kb, str) and legacy_kb and legacy_kb not in {'__none__', '__none'}:
normalized['knowledge-bases'] = [legacy_kb]
elif legacy_kb is not None:
normalized['knowledge-bases'] = []
return normalized
def migrate_pipeline_config(config: dict) -> dict:
"""Migrate pipeline config to new format."""
new_config = dict(config)
@@ -44,7 +59,13 @@ def migrate_pipeline_config(config: dict) -> dict:
# Check for new format first
runner_id = runner_config.get('id')
if runner_id and is_plugin_runner_id(runner_id):
# Already in new format, no need to migrate
if runner_id in runner_configs:
runner_configs[runner_id] = normalize_runner_config_for_migration(
runner_id,
runner_configs[runner_id],
)
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config
return new_config
# Check for old format
@@ -67,14 +88,14 @@ def migrate_pipeline_config(config: dict) -> dict:
if old_runner_name in ai_config:
old_runner_config = ai_config[old_runner_name]
if old_runner_config:
runner_configs[runner_id] = old_runner_config
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, old_runner_config)
# Remove old config block after migration
del ai_config[old_runner_name]
# Also check if runner_id has config under other old name formats
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id and old_name in ai_config:
runner_configs[runner_id] = ai_config[old_name]
runner_configs[runner_id] = normalize_runner_config_for_migration(runner_id, ai_config[old_name])
# Remove old config block after migration
del ai_config[old_name]
@@ -111,7 +132,7 @@ def upgrade() -> None:
if json.dumps(config, sort_keys=True) != json.dumps(migrated_config, sort_keys=True):
conn.execute(
sa.text('UPDATE pipelines SET config = :config WHERE uuid = :uuid'),
{'config': json.dumps(migrated_config), 'uuid': pipeline_uuid}
{'config': json.dumps(migrated_config), 'uuid': pipeline_uuid},
)
except Exception:
# Skip invalid configs
@@ -121,4 +142,4 @@ def upgrade() -> None:
def downgrade() -> None:
"""Downgrade is not supported for data migration."""
# No downgrade - keep configs in new format
pass
pass