feat(agent-runner): persist created artifacts

This commit is contained in:
huanghuoguoguo
2026-05-23 18:13:53 +08:00
parent 6fc93235f7
commit 92d28bfcb0
3 changed files with 1172 additions and 0 deletions

View File

@@ -24,9 +24,14 @@ from .pipeline_compat_adapter import PipelineCompatAdapter
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
RunnerProtocolError,
)
# Maximum inline artifact content size (1MB)
MAX_ARTIFACT_INLINE_BYTES = 1 * 1024 * 1024
class AgentRunOrchestrator:
"""Orchestrator for agent runner execution.
@@ -144,9 +149,25 @@ class AgentRunOrchestrator:
event_log_id=event_log_id,
)
# Track artifact refs for assistant transcript (cleared after each message.completed)
pending_artifact_refs: list[dict[str, typing.Any]] = []
try:
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Handle artifact.created first - consume before normalizer
if result_dict.get('type') == 'artifact.created':
artifact_ref = await self._handle_artifact_created(
result_dict=result_dict,
event=event,
run_id=run_id,
runner_id=descriptor.id,
)
pending_artifact_refs.append(artifact_ref)
# Pass to normalizer for logging, but don't yield to pipeline
await self.result_normalizer.normalize(result_dict, descriptor)
continue
# Handle state.updated first - consume before normalizer
if result_dict.get('type') == 'state.updated':
self._handle_state_updated_event(result_dict, event, descriptor)
@@ -156,11 +177,20 @@ class AgentRunOrchestrator:
# Handle message.completed - write to Transcript
if result_dict.get('type') == 'message.completed' and event.conversation_id:
# Merge pending artifact refs with message's own refs
merged_refs = self._merge_artifact_refs(
pending_artifact_refs,
result_dict,
)
# Clear pending refs after attaching to this message
pending_artifact_refs.clear()
await self._write_assistant_transcript(
result_dict=result_dict,
event=event,
run_id=run_id,
runner_id=descriptor.id,
artifact_refs=merged_refs if merged_refs else None,
)
# Normalize result for other types
@@ -230,6 +260,19 @@ class AgentRunOrchestrator:
try:
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Handle artifact.created - register artifact
if result_dict.get('type') == 'artifact.created':
await self._handle_artifact_created_query(
result_dict=result_dict,
query=query,
descriptor=descriptor,
run_id=run_id,
conversation_id=conversation_id,
)
# Pass to normalizer for logging, but don't yield to pipeline
await self.result_normalizer.normalize(result_dict, descriptor)
continue
# Handle state.updated first - consume before normalizer
if result_dict.get('type') == 'state.updated':
self._handle_state_updated(result_dict, query, descriptor)
@@ -417,6 +460,101 @@ class AgentRunOrchestrator:
)
# Invalid scope is already logged by state_store.apply_update
async def _handle_artifact_created_query(
self,
result_dict: dict[str, typing.Any],
query: pipeline_query.Query,
descriptor: AgentRunnerDescriptor,
run_id: str,
conversation_id: str | None,
) -> None:
"""Handle artifact.created result in Query-based flow.
Legacy Query flow only registers artifact metadata/content for compatibility.
Event log/transcript linkage is event-first only for now.
Args:
result_dict: Raw result dict with type='artifact.created'
query: Pipeline query
descriptor: Runner descriptor
run_id: Current run ID
conversation_id: Conversation ID (may be None)
Raises:
RunnerProtocolError: On validation failures or registration errors
"""
import base64
import uuid
from .artifact_store import ArtifactStore
data = result_dict.get('data', {})
# Validate run_id matches current context
result_run_id = result_dict.get('run_id')
if result_run_id and result_run_id != run_id:
raise RunnerProtocolError(
descriptor.id,
f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}',
)
# Extract artifact fields
artifact_id = data.get('artifact_id') or str(uuid.uuid4())
artifact_type = data.get('artifact_type')
if not artifact_type:
raise RunnerProtocolError(
descriptor.id,
'artifact.created missing required field: artifact_type',
)
mime_type = data.get('mime_type')
name = data.get('name')
size_bytes = data.get('size_bytes')
sha256 = data.get('sha256')
metadata = data.get('metadata')
content_base64 = data.get('content_base64')
# Decode and validate content if provided
content: bytes | None = None
if content_base64:
try:
content = base64.b64decode(content_base64, validate=True)
except Exception as e:
raise RunnerProtocolError(
descriptor.id,
f'artifact.created invalid base64 content: {e}',
)
# Validate content size
if len(content) > MAX_ARTIFACT_INLINE_BYTES:
raise RunnerProtocolError(
descriptor.id,
f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes',
)
# Register artifact via ArtifactStore
artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine())
try:
await artifact_store.register_artifact(
artifact_id=artifact_id,
artifact_type=artifact_type,
source='runner',
mime_type=mime_type,
name=name,
size_bytes=size_bytes,
sha256=sha256,
conversation_id=conversation_id,
run_id=run_id,
runner_id=descriptor.id,
metadata=metadata,
content=content,
)
except Exception as e:
raise RunnerProtocolError(
descriptor.id,
f'artifact.created failed to register artifact: {e}',
)
def _handle_state_updated_event(
self,
result_dict: dict[str, typing.Any],
@@ -552,12 +690,175 @@ class AgentRunOrchestrator:
},
)
async def _handle_artifact_created(
self,
result_dict: dict[str, typing.Any],
event: AgentEventEnvelope,
run_id: str,
runner_id: str,
) -> dict[str, typing.Any]:
"""Handle artifact.created result - register artifact and write EventLog.
Args:
result_dict: Raw result dict with type='artifact.created'
event: Event envelope
run_id: Current run ID
runner_id: Runner ID
Returns:
Artifact reference dict for Transcript
Raises:
RunnerProtocolError: On validation failures or registration errors
"""
import base64
import uuid
from .artifact_store import ArtifactStore
from .event_log_store import EventLogStore
data = result_dict.get('data', {})
# Validate run_id matches current context
result_run_id = result_dict.get('run_id')
if result_run_id and result_run_id != run_id:
raise RunnerProtocolError(
runner_id,
f'artifact.created run_id mismatch: expected {run_id}, got {result_run_id}',
)
# Extract artifact fields
artifact_id = data.get('artifact_id') or str(uuid.uuid4())
artifact_type = data.get('artifact_type')
if not artifact_type:
raise RunnerProtocolError(
runner_id,
'artifact.created missing required field: artifact_type',
)
mime_type = data.get('mime_type')
name = data.get('name')
size_bytes = data.get('size_bytes')
sha256 = data.get('sha256')
metadata = data.get('metadata')
content_base64 = data.get('content_base64')
# Decode and validate content if provided
content: bytes | None = None
if content_base64:
try:
content = base64.b64decode(content_base64, validate=True)
except Exception as e:
raise RunnerProtocolError(
runner_id,
f'artifact.created invalid base64 content: {e}',
)
# Validate content size
if len(content) > MAX_ARTIFACT_INLINE_BYTES:
raise RunnerProtocolError(
runner_id,
f'artifact.created content size {len(content)} bytes exceeds limit {MAX_ARTIFACT_INLINE_BYTES} bytes',
)
# Register artifact via ArtifactStore
artifact_store = ArtifactStore(self.ap.persistence_mgr.get_db_engine())
try:
registered_id = await artifact_store.register_artifact(
artifact_id=artifact_id,
artifact_type=artifact_type,
source='runner',
mime_type=mime_type,
name=name,
size_bytes=size_bytes,
sha256=sha256,
conversation_id=event.conversation_id,
run_id=run_id,
runner_id=runner_id,
bot_id=event.bot_id,
workspace_id=event.workspace_id,
metadata=metadata,
content=content,
)
except Exception as e:
raise RunnerProtocolError(
runner_id,
f'artifact.created failed to register artifact: {e}',
)
# Write to EventLog
event_log_store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
await event_log_store.append_event(
event_id=str(uuid.uuid4()),
event_type='artifact.created',
source='runner',
bot_id=event.bot_id,
workspace_id=event.workspace_id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
actor_type=event.actor.actor_type if event.actor else None,
actor_id=event.actor.actor_id if event.actor else None,
actor_name=event.actor.actor_name if event.actor else None,
input_summary=f'Artifact created: {artifact_type}',
input_json={
'artifact_id': registered_id,
'artifact_type': artifact_type,
'mime_type': mime_type,
'name': name,
'size_bytes': size_bytes,
},
run_id=run_id,
runner_id=runner_id,
)
# Return artifact ref for Transcript
return {
'artifact_id': registered_id,
'artifact_type': artifact_type,
'mime_type': mime_type,
'name': name,
}
def _merge_artifact_refs(
self,
pending_refs: list[dict[str, typing.Any]],
result_dict: dict[str, typing.Any],
) -> list[dict[str, typing.Any]]:
"""Merge pending artifact refs with message's own refs, deduplicating by artifact_id.
Args:
pending_refs: Artifact refs accumulated from artifact.created events
result_dict: Result dict that may contain message with artifact_refs
Returns:
Merged and deduplicated list of artifact refs
"""
# Start with pending refs
merged = list(pending_refs)
seen_ids = {ref.get('artifact_id') for ref in pending_refs if ref.get('artifact_id')}
# Extract refs from message data if present
data = result_dict.get('data', {})
message = data.get('message', {})
message_refs = message.get('artifact_refs', [])
if isinstance(message_refs, list):
for ref in message_refs:
if isinstance(ref, dict):
artifact_id = ref.get('artifact_id')
if artifact_id and artifact_id not in seen_ids:
merged.append(ref)
seen_ids.add(artifact_id)
return merged
async def _write_assistant_transcript(
self,
result_dict: dict[str, typing.Any],
event: AgentEventEnvelope,
run_id: str,
runner_id: str,
artifact_refs: list[dict[str, typing.Any]] | None = None,
) -> None:
"""Write assistant message to Transcript.
@@ -566,6 +867,7 @@ class AgentRunOrchestrator:
event: Original event envelope
run_id: Run ID
runner_id: Runner ID
artifact_refs: Optional artifact references to include
"""
import uuid
@@ -601,6 +903,7 @@ class AgentRunOrchestrator:
role='assistant',
content=content,
content_json=content_json,
artifact_refs=artifact_refs,
thread_id=event.thread_id,
item_type='message',
run_id=run_id,

View File

@@ -143,6 +143,15 @@ class AgentResultNormalizer:
)
return None
elif result_type == 'artifact.created':
# Log for telemetry, consumed by orchestrator
artifact_id = data.get('artifact_id', 'unknown')
artifact_type = data.get('artifact_type', 'unknown')
self.ap.logger.debug(
f'Runner {descriptor.id} artifact.created logged: artifact_id={artifact_id}, type={artifact_type}'
)
return None
else:
# Unknown type - warn and ignore.
self.ap.logger.warning(