feat(agent-runner): align protocol adapter terminology

This commit is contained in:
huanghuoguoguo
2026-05-24 09:13:15 +08:00
parent 32e42f04ea
commit cc911cc413
26 changed files with 471 additions and 342 deletions

View File

@@ -21,7 +21,7 @@ from .persistent_state_store import get_persistent_state_store, PersistentStateS
from .session_registry import get_session_registry, AgentRunSessionRegistry
from .config_migration import ConfigMigration
from .host_models import AgentEventEnvelope, AgentBinding
from .pipeline_compat_adapter import PipelineCompatAdapter
from .pipeline_adapter import PipelineAdapter
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
@@ -48,7 +48,7 @@ class AgentRunOrchestrator:
Entry points:
- run(event, binding): Main entry for event-first Protocol v1
- run_from_query(query): Compatibility wrapper for Pipeline
- run_from_query(query): Pipeline adapter wrapper
"""
ap: app.Application
@@ -86,7 +86,7 @@ class AgentRunOrchestrator:
event: AgentEventEnvelope,
binding: AgentBinding,
bound_plugins: list[str] | None = None,
compatibility_context: dict[str, typing.Any] | None = None,
adapter_context: dict[str, typing.Any] | None = None,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from event-first envelope.
@@ -97,7 +97,7 @@ class AgentRunOrchestrator:
event: Event envelope from event gateway
binding: Agent binding configuration
bound_plugins: Optional list of bound plugin identities for authorization
compatibility_context: Optional compatibility context from Pipeline adapter
adapter_context: Optional adapter context from Pipeline adapter
Yields:
Message or MessageChunk for pipeline response
@@ -127,27 +127,27 @@ class AgentRunOrchestrator:
resources=resources,
)
# Merge compatibility context if provided (for Pipeline compatibility)
if compatibility_context:
# Merge params into compatibility.extra
if 'params' in compatibility_context:
context['compatibility']['extra']['params'] = compatibility_context['params']
# Merge prompt into compatibility.extra (for legacy runners)
if 'prompt' in compatibility_context:
context['compatibility']['extra']['prompt'] = compatibility_context['prompt']
# Merge adapter context if provided (for Pipeline adapter)
if adapter_context:
# Merge params into adapter.extra
if 'params' in adapter_context:
context['adapter']['extra']['params'] = adapter_context['params']
# Merge prompt into adapter.extra (for transition runners)
if 'prompt' in adapter_context:
context['adapter']['extra']['prompt'] = adapter_context['prompt']
# Merge bootstrap if provided
if compatibility_context.get('bootstrap'):
context['bootstrap'] = compatibility_context['bootstrap']
# Also set legacy_messages for legacy runners
bootstrap_messages = compatibility_context['bootstrap'].get('messages')
if adapter_context.get('bootstrap'):
context['bootstrap'] = adapter_context['bootstrap']
# Also set adapter_messages for transition runners
bootstrap_messages = adapter_context['bootstrap'].get('messages')
if bootstrap_messages:
context['compatibility']['legacy_messages'] = bootstrap_messages
context['adapter']['adapter_messages'] = bootstrap_messages
# Merge runtime metadata if provided
if compatibility_context.get('runtime_metadata'):
context['runtime']['metadata'].update(compatibility_context['runtime_metadata'])
if adapter_context.get('runtime_metadata'):
context['runtime']['metadata'].update(adapter_context['runtime_metadata'])
# Set query_id if provided
if compatibility_context.get('query_id'):
context['runtime']['query_id'] = compatibility_context['query_id']
if adapter_context.get('query_id'):
context['runtime']['query_id'] = adapter_context['query_id']
# Build state context for State API handlers
state_context = self._build_state_context(event, binding, descriptor)
@@ -243,7 +243,7 @@ class AgentRunOrchestrator:
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from pipeline query.
This is a compatibility wrapper for the legacy Query-based flow.
This is the Pipeline adapter wrapper for the Query-based flow.
It delegates to the event-first run(event, binding) method.
For the new event-first Protocol v1, use run(event, binding) instead.
@@ -265,34 +265,34 @@ class AgentRunOrchestrator:
raise RunnerNotFoundError('no runner configured')
# Convert Query to event-first envelope
event = PipelineCompatAdapter.query_to_event(query)
event = PipelineAdapter.query_to_event(query)
# Convert Pipeline config to binding
binding = PipelineCompatAdapter.pipeline_config_to_binding(query, runner_id)
binding = PipelineAdapter.pipeline_config_to_binding(query, runner_id)
# Extract bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Build compatibility context for Pipeline-specific fields
compatibility_context = await self._build_compatibility_context(query, binding)
# Build adapter context for Pipeline-specific fields
adapter_context = await self._build_adapter_context(query, binding)
# Delegate to event-first run()
async for result in self.run(
event,
binding,
bound_plugins=bound_plugins,
compatibility_context=compatibility_context,
adapter_context=adapter_context,
):
yield result
async def _build_compatibility_context(
async def _build_adapter_context(
self,
query: pipeline_query.Query,
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build compatibility context for Pipeline Query-based flow.
"""Build adapter context for Pipeline Query-based flow.
This extracts legacy fields from Query that aren't available in
This extracts adapter-specific fields from Query that aren't available in
the event-first flow:
- params (from query.variables)
- bootstrap messages (for max-round)
@@ -304,7 +304,7 @@ class AgentRunOrchestrator:
binding: Agent binding with max_round
Returns:
Compatibility context dict
Adapter context dict
"""
from .context_packager import AgentContextPackager
@@ -312,10 +312,10 @@ class AgentRunOrchestrator:
# (excludes internal vars, sensitive patterns, permission vars, non-JSON values)
params = self.context_builder._build_params(query)
# Build prompt from query.prompt.messages (for legacy compatibility)
# Build prompt from query.prompt.messages (for transition runners)
prompt = self.context_builder._build_prompt(query)
# Build bootstrap context for legacy max-round
# Build bootstrap context for max-round
bootstrap = None
runtime_metadata = {}
max_round = binding.max_round
@@ -327,12 +327,12 @@ class AgentRunOrchestrator:
packaged_context = context_packager.package_messages(query, runner_config)
# Build messages list
legacy_messages = []
adapter_messages = []
for msg in packaged_context.messages:
legacy_messages.append(msg.model_dump(mode='json'))
adapter_messages.append(msg.model_dump(mode='json'))
bootstrap = {
'messages': legacy_messages,
'messages': adapter_messages,
'summary': None,
'artifacts': [],
'metadata': {},
@@ -497,7 +497,7 @@ class AgentRunOrchestrator:
"""
data = result_dict.get('data', {})
# Extract scope (default to 'conversation' for backward compat)
# Extract scope (default to conversation when omitted by the runner)
scope = data.get('scope', 'conversation')
# Extract key and value