feat(agent-runner): add event-first context facts and pull APIs

Add EventLog and Transcript persistence entities for storing auditable
event facts and conversation history projection. Implement event-first
AgentRunContext builder that produces Protocol v1 compliant context
payloads with required fields: event, delivery, context (ContextAccess).

Key changes:
- EventLog ORM: auditable event records with indexes
- Transcript ORM: conversation history projection with composite indexes
- AgentRunContextBuilder: Protocol v1 payload with delivery, context, bootstrap
- EventLogStore/TranscriptStore: async stores for fact sources
- Host action handlers: HISTORY_PAGE, HISTORY_SEARCH, EVENT_GET, EVENT_PAGE
- Context validation: build_context output validates via SDK AgentRunContext
- Alembic migration for event_log and transcript tables
- Alembic env.py imports all ORM models for autogenerate discovery

Legacy compatibility: max-round messages go into bootstrap.messages and
compatibility.legacy_messages, not top-level messages field.
This commit is contained in:
huanghuoguoguo
2026-05-23 16:07:46 +08:00
parent 9086f77cc5
commit 0a3bafae4b
18 changed files with 3705 additions and 60 deletions

View File

@@ -19,6 +19,8 @@ from .result_normalizer import AgentResultNormalizer
from .state_store import get_state_store, RunnerScopedStateStore
from .session_registry import get_session_registry, AgentRunSessionRegistry
from .config_migration import ConfigMigration
from .host_models import AgentEventEnvelope, AgentBinding
from .pipeline_compat_adapter import PipelineCompatAdapter
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
@@ -38,7 +40,9 @@ class AgentRunOrchestrator:
- Handle errors, timeouts, protocol errors
- Maintain streaming card behavior
This is the main entry point for ChatMessageHandler.
Entry points:
- run(event, binding): Main entry for event-first Protocol v1
- run_from_query(query): Compatibility wrapper for Pipeline
"""
ap: app.Application
@@ -69,13 +73,113 @@ class AgentRunOrchestrator:
self._session_registry = get_session_registry()
self._state_store = get_state_store()
async def run(
self,
event: AgentEventEnvelope,
binding: AgentBinding,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from event-first envelope.
This is the main entry point for Protocol v1.
Event Gateway -> AgentBindingResolver -> run(event, binding).
Args:
event: Event envelope from event gateway
binding: Agent binding configuration
Yields:
Message or MessageChunk for pipeline response
Raises:
RunnerNotFoundError: If runner not found
RunnerNotAuthorizedError: If runner not authorized
RunnerExecutionError: If runner execution failed
"""
runner_id = binding.runner_id
# Get runner descriptor
# TODO: Get bound plugins from binding when fully migrated
bound_plugins = None # Will be resolved from binding.scope in future
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build resources from binding
resources = await self.resource_builder.build_resources_from_binding(
event=event,
binding=binding,
descriptor=descriptor,
)
# Build context from event + binding
context = await self.context_builder.build_context_from_event(
event=event,
binding=binding,
descriptor=descriptor,
resources=resources,
)
# Register session for proxy action permission validation
run_id = context['run_id']
await self._session_registry.register(
run_id=run_id,
runner_id=descriptor.id,
query_id=None, # No query_id in event-first mode
plugin_identity=descriptor.get_plugin_id(),
resources=resources,
conversation_id=event.conversation_id,
)
# Write incoming event to EventLog
event_log_id = await self._write_event_log(
event=event,
binding=binding,
run_id=run_id,
runner_id=descriptor.id,
)
# Write user message to Transcript if message.received
if event.event_type == 'message.received' and event.conversation_id:
await self._write_user_transcript(
event=event,
event_log_id=event_log_id,
)
try:
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Handle state.updated first - consume before normalizer
if result_dict.get('type') == 'state.updated':
self._handle_state_updated_event(result_dict, event, descriptor)
# Pass to normalizer for logging, but don't yield to pipeline
await self.result_normalizer.normalize(result_dict, descriptor)
continue
# Handle message.completed - write to Transcript
if result_dict.get('type') == 'message.completed' and event.conversation_id:
await self._write_assistant_transcript(
result_dict=result_dict,
event=event,
run_id=run_id,
runner_id=descriptor.id,
)
# Normalize result for other types
result = await self.result_normalizer.normalize(result_dict, descriptor)
if result is not None:
yield result
finally:
# Unregister session after run completes (success or error)
await self._session_registry.unregister(run_id)
async def run_from_query(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from pipeline query.
This is the main entry point called by ChatMessageHandler.
This is a compatibility wrapper for the legacy Query-based flow.
It preserves existing behavior for params, messages, state, etc.
For the new event-first Protocol v1, use run(event, binding) instead.
Args:
query: Pipeline query with pipeline_config, session, messages, etc.
@@ -99,12 +203,17 @@ class AgentRunOrchestrator:
# Get runner descriptor
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build resources
# Build resources (using legacy Query-based method)
resources = await self.resource_builder.build_resources(query, descriptor)
# Build context
# Build context (using legacy Query-based method with params, state, messages)
context = await self.context_builder.build_context(query, descriptor, resources)
# Get conversation_id from context
conversation_id = None
if context.get('conversation'):
conversation_id = context['conversation'].get('conversation_id')
# Register session for proxy action permission validation
run_id = context['run_id']
await self._session_registry.register(
@@ -113,6 +222,7 @@ class AgentRunOrchestrator:
query_id=query.query_id,
plugin_identity=descriptor.get_plugin_id(),
resources=resources,
conversation_id=conversation_id,
)
try:
@@ -267,6 +377,8 @@ class AgentRunOrchestrator:
) -> None:
"""Handle state.updated result - apply to state store.
Legacy method for Query-based flow.
Args:
result_dict: Raw result dict with type='state.updated'
query: Pipeline query
@@ -302,3 +414,197 @@ class AgentRunOrchestrator:
f'Runner {descriptor.id} state.updated: scope={scope}, key={key}, value={value}'
)
# Invalid scope is already logged by state_store.apply_update
def _handle_state_updated_event(
self,
result_dict: dict[str, typing.Any],
event: AgentEventEnvelope,
descriptor: AgentRunnerDescriptor,
) -> None:
"""Handle state.updated result in event-first mode.
Args:
result_dict: Raw result dict with type='state.updated'
event: Event envelope
descriptor: Runner descriptor
"""
data = result_dict.get('data', {})
# Extract scope (default to 'conversation' for backward compat)
scope = data.get('scope', 'conversation')
# Extract key and value
key = data.get('key')
value = data.get('value')
if not key:
self.ap.logger.warning(
f'Runner {descriptor.id} state.updated missing key, ignoring'
)
return
# Apply update to state store using event context
# Note: state_store needs to support event-based scope key calculation
# For now, we log and skip actual persistence in event-first mode
# This will be implemented when state_store is migrated to support events
self.ap.logger.debug(
f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}, value={value}'
)
async def _write_event_log(
self,
event: AgentEventEnvelope,
binding: AgentBinding,
run_id: str,
runner_id: str,
) -> str:
"""Write incoming event to EventLog.
Args:
event: Event envelope
binding: Agent binding
run_id: Run ID
runner_id: Runner ID
Returns:
Event log ID
"""
import datetime
from .event_log_store import EventLogStore
store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
# Build input summary
input_summary = None
input_json = None
if event.input:
if event.input.text:
input_summary = event.input.text[:1000]
input_json = {
'text': event.input.text,
'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents],
'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments],
}
return await store.append_event(
event_id=event.event_id,
event_type=event.event_type,
source=event.source,
bot_id=event.bot_id,
workspace_id=event.workspace_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
actor_type=event.actor.actor_type if event.actor else None,
actor_id=event.actor.actor_id if event.actor else None,
actor_name=event.actor.actor_name if event.actor else None,
subject_type=event.subject.subject_type if event.subject else None,
subject_id=event.subject.subject_id if event.subject else None,
input_summary=input_summary,
input_json=input_json,
run_id=run_id,
runner_id=runner_id,
event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None,
)
async def _write_user_transcript(
self,
event: AgentEventEnvelope,
event_log_id: str,
) -> None:
"""Write user message to Transcript.
Args:
event: Event envelope
event_log_id: Event log ID
"""
from .transcript_store import TranscriptStore
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
# Build content
content = event.input.text if event.input else None
content_json = None
if event.input:
content_json = {
'role': 'user',
'content': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents] if event.input.contents else [],
}
# Build artifact refs
artifact_refs = []
if event.input and event.input.attachments:
for a in event.input.attachments:
artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a)
await store.append_transcript(
event_id=event_log_id,
conversation_id=event.conversation_id,
role='user',
content=content,
content_json=content_json,
artifact_refs=artifact_refs if artifact_refs else None,
thread_id=event.thread_id,
item_type='message',
metadata={
'actor_type': event.actor.actor_type if event.actor else None,
'actor_id': event.actor.actor_id if event.actor else None,
},
)
async def _write_assistant_transcript(
self,
result_dict: dict[str, typing.Any],
event: AgentEventEnvelope,
run_id: str,
runner_id: str,
) -> None:
"""Write assistant message to Transcript.
Args:
result_dict: Result dict from runner
event: Original event envelope
run_id: Run ID
runner_id: Runner ID
"""
import uuid
from .transcript_store import TranscriptStore
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
data = result_dict.get('data', {})
message = data.get('message', {})
# Build content
content = None
content_json = None
if isinstance(message.get('content'), str):
content = message['content']
content_json = message
elif isinstance(message.get('content'), list):
# Extract text from content list
text_parts = []
for c in message['content']:
if isinstance(c, dict) and c.get('type') == 'text':
text_parts.append(c.get('text', ''))
content = ' '.join(text_parts) if text_parts else None
content_json = message
# Generate a unique event ID for assistant message
assistant_event_id = str(uuid.uuid4())
await store.append_transcript(
transcript_id=str(uuid.uuid4()),
event_id=assistant_event_id,
conversation_id=event.conversation_id,
role='assistant',
content=content,
content_json=content_json,
thread_id=event.thread_id,
item_type='message',
run_id=run_id,
runner_id=runner_id,
metadata={
'run_id': run_id,
'runner_id': runner_id,
},
)