feat(agent-runner): route pipeline runs through event-first flow

- run_from_query() now delegates to run(event, binding) instead of maintaining
  a separate legacy execution path
- Pipeline Query is converted to AgentEventEnvelope via PipelineCompatAdapter
- Pipeline config is converted to AgentBinding with StatePolicy
- bound_plugins authorization preserved from Pipeline
- Legacy compatibility fields preserved:
  - query_id → context.runtime.query_id → session registry
  - prompt → context.compatibility.extra.prompt (not top-level)
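  - params → context.compatibility.extra.params (filtered by
    context_builder._build_params: internal vars, sensitive patterns,
    permission vars and non-JSON values are excluded)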
  - max-round → history packaged by AgentContextPackager is written to
    bootstrap.messages and mirrored to compatibility.legacy_messages
- Pipeline path gains event-first host capabilities:
  - EventLog and Transcript writing
  - ArtifactStore registration
  - PersistentStateStore for state.updated
- Removed legacy handlers:
  - _handle_artifact_created_query() (replaced by _handle_artifact_created)
  - _handle_state_updated() (replaced by _handle_state_updated_event)

This change unifies the execution path while preserving backward compatibility
for Pipeline-based runners. EventGateway itself is not implemented in this
branch; only the event-first entry point, run(event, binding), that it will
call is provided.
This commit is contained in:
huanghuoguoguo
2026-05-23 22:26:15 +08:00
parent fa6b40a82b
commit a0d15ea054
4 changed files with 578 additions and 419 deletions

View File

@@ -85,6 +85,8 @@ class AgentRunOrchestrator:
self,
event: AgentEventEnvelope,
binding: AgentBinding,
bound_plugins: list[str] | None = None,
compatibility_context: dict[str, typing.Any] | None = None,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from event-first envelope.
@@ -94,6 +96,8 @@ class AgentRunOrchestrator:
Args:
event: Event envelope from event gateway
binding: Agent binding configuration
bound_plugins: Optional list of bound plugin identities for authorization
compatibility_context: Optional compatibility context from Pipeline adapter
Yields:
Message or MessageChunk for pipeline response
@@ -106,8 +110,6 @@ class AgentRunOrchestrator:
runner_id = binding.runner_id
# Get runner descriptor
# TODO: Get bound plugins from binding when fully migrated
bound_plugins = None # Will be resolved from binding.scope in future
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build resources from binding
@@ -125,15 +127,38 @@ class AgentRunOrchestrator:
resources=resources,
)
# Merge compatibility context if provided (for Pipeline compatibility)
if compatibility_context:
# Merge params into compatibility.extra
if 'params' in compatibility_context:
context['compatibility']['extra']['params'] = compatibility_context['params']
# Merge prompt into compatibility.extra (for legacy runners)
if 'prompt' in compatibility_context:
context['compatibility']['extra']['prompt'] = compatibility_context['prompt']
# Merge bootstrap if provided
if compatibility_context.get('bootstrap'):
context['bootstrap'] = compatibility_context['bootstrap']
# Also set legacy_messages for legacy runners
bootstrap_messages = compatibility_context['bootstrap'].get('messages')
if bootstrap_messages:
context['compatibility']['legacy_messages'] = bootstrap_messages
# Merge runtime metadata if provided
if compatibility_context.get('runtime_metadata'):
context['runtime']['metadata'].update(compatibility_context['runtime_metadata'])
# Set query_id if provided
if compatibility_context.get('query_id'):
context['runtime']['query_id'] = compatibility_context['query_id']
# Build state context for State API handlers
state_context = self._build_state_context(event, binding, descriptor)
# Register session for proxy action permission validation
run_id = context['run_id']
query_id = context['runtime'].get('query_id') # May be None for pure event-first mode
await self._session_registry.register(
run_id=run_id,
runner_id=descriptor.id,
query_id=None, # No query_id in event-first mode
query_id=query_id,
plugin_identity=descriptor.get_plugin_id(),
resources=resources,
permissions=descriptor.permissions or {},
@@ -219,7 +244,7 @@ class AgentRunOrchestrator:
"""Run agent runner from pipeline query.
This is a compatibility wrapper for the legacy Query-based flow.
It preserves existing behavior for params, messages, state, etc.
It delegates to the event-first run(event, binding) method.
For the new event-first Protocol v1, use run(event, binding) instead.
@@ -234,70 +259,98 @@ class AgentRunOrchestrator:
RunnerNotAuthorizedError: If runner not authorized
RunnerExecutionError: If runner execution failed
"""
# Resolve runner ID
# Resolve runner ID using ConfigMigration
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
if not runner_id:
raise RunnerNotFoundError('no runner configured')
# Get bound plugins for authorization
# Convert Query to event-first envelope
event = PipelineCompatAdapter.query_to_event(query)
# Convert Pipeline config to binding
binding = PipelineCompatAdapter.pipeline_config_to_binding(query, runner_id)
# Extract bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Get runner descriptor
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build compatibility context for Pipeline-specific fields
compatibility_context = await self._build_compatibility_context(query, binding)
# Build resources (using legacy Query-based method)
resources = await self.resource_builder.build_resources(query, descriptor)
# Delegate to event-first run()
async for result in self.run(
event,
binding,
bound_plugins=bound_plugins,
compatibility_context=compatibility_context,
):
yield result
# Build context (using legacy Query-based method with params, state, messages)
context = await self.context_builder.build_context(query, descriptor, resources)
async def _build_compatibility_context(
self,
query: pipeline_query.Query,
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build compatibility context for Pipeline Query-based flow.
# Get conversation_id from context
conversation_id = None
if context.get('conversation'):
conversation_id = context['conversation'].get('conversation_id')
This extracts legacy fields from Query that aren't available in
the event-first flow:
- params (from query.variables)
- bootstrap messages (for max-round)
- query_id
- prompt messages
# Register session for proxy action permission validation
run_id = context['run_id']
await self._session_registry.register(
run_id=run_id,
runner_id=descriptor.id,
query_id=query.query_id,
plugin_identity=descriptor.get_plugin_id(),
resources=resources,
permissions=descriptor.permissions or {},
conversation_id=conversation_id,
)
Args:
query: Pipeline query
binding: Agent binding with max_round
try:
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Handle artifact.created - register artifact
if result_dict.get('type') == 'artifact.created':
await self._handle_artifact_created_query(
result_dict=result_dict,
query=query,
descriptor=descriptor,
run_id=run_id,
conversation_id=conversation_id,
)
# Pass to normalizer for logging, but don't yield to pipeline
await self.result_normalizer.normalize(result_dict, descriptor)
continue
Returns:
Compatibility context dict
"""
from .context_packager import AgentContextPackager
# Handle state.updated first - consume before normalizer
if result_dict.get('type') == 'state.updated':
self._handle_state_updated(result_dict, query, descriptor)
# Pass to normalizer for logging, but don't yield to pipeline
await self.result_normalizer.normalize(result_dict, descriptor)
continue
# Use context_builder's _build_params for proper filtering
# (excludes internal vars, sensitive patterns, permission vars, non-JSON values)
params = self.context_builder._build_params(query)
# Normalize result for other types
result = await self.result_normalizer.normalize(result_dict, descriptor)
if result is not None:
yield result
finally:
# Unregister session after run completes (success or error)
await self._session_registry.unregister(run_id)
# Build prompt from query.prompt.messages (for legacy compatibility)
prompt = self.context_builder._build_prompt(query)
# Build bootstrap context for legacy max-round
bootstrap = None
runtime_metadata = {}
max_round = binding.max_round
if max_round and max_round > 0 and query.messages:
# Package messages using context_packager
runner_config = binding.runner_config or {}
context_packager = AgentContextPackager()
packaged_context = context_packager.package_messages(query, runner_config)
# Build messages list
legacy_messages = []
for msg in packaged_context.messages:
legacy_messages.append(msg.model_dump(mode='json'))
bootstrap = {
'messages': legacy_messages,
'summary': None,
'artifacts': [],
'metadata': {},
}
# Build runtime metadata for context_packaging
runtime_metadata['context_packaging'] = {
'policy': packaged_context.policy,
'history': packaged_context.history,
}
return {
'params': params,
'prompt': prompt,
'bootstrap': bootstrap,
'query_id': query.query_id,
'runtime_metadata': runtime_metadata,
}
async def _invoke_runner(
self,
@@ -425,147 +478,6 @@ class AgentRunOrchestrator:
"""
return ConfigMigration.resolve_runner_id(query.pipeline_config)
def _handle_state_updated(
    self,
    result_dict: dict[str, typing.Any],
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> None:
    """Apply a runner-emitted ``state.updated`` result to the state store.

    Legacy handler for the Query-based flow.

    Args:
        result_dict: Raw result dict with type='state.updated'. The update
            itself (``scope``, ``key``, ``value``) is read from its
            ``'data'`` entry.
        query: Pipeline query, forwarded to the state store.
        descriptor: Runner descriptor, forwarded to the state store and
            used to identify the runner in log messages.
    """
    payload = result_dict.get('data', {})

    # Scope falls back to 'conversation' for backward compatibility.
    scope = payload.get('scope', 'conversation')
    key = payload.get('key')
    value = payload.get('value')

    logger = self.ap.logger

    # Guard: an update without a key cannot be stored.
    if not key:
        logger.warning(
            f'Runner {descriptor.id} state.updated missing key, ignoring'
        )
        return

    applied = self._state_store.apply_update(
        query=query,
        descriptor=descriptor,
        scope=scope,
        key=key,
        value=value,
        logger=logger,
    )
    if not applied:
        # Nothing to log here: the original notes that an invalid scope is
        # already reported by state_store.apply_update.
        return

    logger.debug(
        f'Runner {descriptor.id} state.updated: scope={scope}, key={key}, value={value}'
    )
async def _handle_artifact_created_query(
    self,
    result_dict: dict[str, typing.Any],
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
    run_id: str,
    conversation_id: str | None,
) -> None:
    """Handle artifact.created result in Query-based flow.

    Legacy Query flow only registers artifact metadata/content for compatibility.
    Event log/transcript linkage is event-first only for now.

    Args:
        result_dict: Raw result dict with type='artifact.created'. Artifact
            fields are read from its ``'data'`` entry.
        query: Pipeline query (not read by this handler; kept for interface
            compatibility).
        descriptor: Runner descriptor
        run_id: Current run ID
        conversation_id: Conversation ID (may be None)

    Raises:
        RunnerProtocolError: On validation failures or registration errors.
            When it wraps a lower-level error, that error is attached as
            ``__cause__``.
    """
    import base64
    import uuid

    from .artifact_store import ArtifactStore

    data = result_dict.get('data', {})

    # Validate run_id matches current context (only when the result carries one)
    result_run_id = result_dict.get('run_id')
    if result_run_id and result_run_id != run_id:
        raise RunnerProtocolError(
            descriptor.id,
            f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}',
        )

    # Extract artifact fields; a missing artifact_id is replaced by a fresh UUID
    artifact_id = data.get('artifact_id') or str(uuid.uuid4())
    artifact_type = data.get('artifact_type')
    if not artifact_type:
        raise RunnerProtocolError(
            descriptor.id,
            'artifact.created missing required field: artifact_type',
        )

    mime_type = data.get('mime_type')
    name = data.get('name')
    size_bytes = data.get('size_bytes')
    sha256 = data.get('sha256')
    metadata = data.get('metadata')
    content_base64 = data.get('content_base64')

    # Decode and validate content if provided
    content: bytes | None = None
    if content_base64:
        try:
            content = base64.b64decode(content_base64, validate=True)
        except (ValueError, TypeError) as e:
            # binascii.Error (bad alphabet/padding) is a ValueError subclass;
            # TypeError covers a payload that is not str/bytes-like.
            raise RunnerProtocolError(
                descriptor.id,
                f'artifact.created invalid base64 content: {e}',
            ) from e

        # Validate content size
        if len(content) > MAX_ARTIFACT_INLINE_BYTES:
            raise RunnerProtocolError(
                descriptor.id,
                f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes',
            )

    # Register artifact via ArtifactStore
    artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine())
    try:
        await artifact_store.register_artifact(
            artifact_id=artifact_id,
            artifact_type=artifact_type,
            source='runner',
            mime_type=mime_type,
            name=name,
            size_bytes=size_bytes,
            sha256=sha256,
            conversation_id=conversation_id,
            run_id=run_id,
            runner_id=descriptor.id,
            metadata=metadata,
            content=content,
        )
    except Exception as e:
        # Boundary handler: any store failure is surfaced as a protocol error,
        # with the original exception chained for debugging.
        raise RunnerProtocolError(
            descriptor.id,
            f'artifact.created failed to register artifact: {e}',
        ) from e
async def _handle_state_updated_event(
self,
result_dict: dict[str, typing.Any],
@@ -778,6 +690,7 @@ class AgentRunOrchestrator:
artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a)
await store.append_transcript(
transcript_id=None, # Auto-generate
event_id=event_log_id,
conversation_id=event.conversation_id,
role='user',

View File

@@ -112,7 +112,8 @@ class PipelineCompatAdapter:
runner_config = ai_config.get('runner_config', {}).get(runner_id, {})
# Extract max_round for compatibility (used in bootstrap, not Protocol v1)
max_round = runner_config.get('max_round') or ai_config.get('max-round')
# Note: config uses 'max-round' with hyphen, not 'max_round' with underscore
max_round = runner_config.get('max-round') or ai_config.get('max-round')
# Build scope
scope = BindingScope(
@@ -560,7 +561,17 @@ class PipelineCompatAdapter:
if not use_funcs:
return None
try:
return [func.get('name') for func in use_funcs if isinstance(func, dict) and func.get('name')]
tool_names = []
for func in use_funcs:
if isinstance(func, dict):
name = func.get('name')
elif hasattr(func, 'name'):
name = func.name
else:
continue
if name:
tool_names.append(name)
return tool_names if tool_names else None
except (TypeError, AttributeError):
return None