feat(agent-runner): add artifact store pull APIs

This commit is contained in:
huanghuoguoguo
2026-05-23 17:29:18 +08:00
parent 0a3bafae4b
commit 201c805802
12 changed files with 1728 additions and 170 deletions

View File

@@ -100,6 +100,47 @@ def _build_tool_detail(tool: Any, requested_tool_name: str | None = None) -> dic
}
def _validate_artifact_access(
session: dict[str, Any],
artifact_metadata: dict[str, Any],
operation: str,
) -> tuple[bool, str | None]:
"""Validate artifact access for a run session.
Authorization rules (evaluated in order, first match wins):
1. Artifact run_id matches session run_id → ALLOW (created by this run)
2. Artifact has conversation_id AND matches session conversation_id → ALLOW (same conversation)
3. Otherwise → DENY
Note: Artifacts without conversation_id are NOT globally accessible by default.
Without an explicit scope field, we enforce strict access control.
Args:
session: AgentRunSession dict with run_id, conversation_id, permissions
artifact_metadata: Artifact metadata dict with conversation_id, run_id
operation: Operation name for error messages ('metadata' or 'read')
Returns:
Tuple of (is_allowed, error_message). If is_allowed is False, error_message contains reason.
"""
artifact_conversation_id = artifact_metadata.get('conversation_id')
artifact_run_id = artifact_metadata.get('run_id')
session_conversation_id = session.get('conversation_id')
session_run_id = session.get('run_id')
# Rule 1: Created by this run (allows cross-conversation access for self-created artifacts)
if artifact_run_id and artifact_run_id == session_run_id:
return True, None
# Rule 2: Same conversation (requires artifact to have conversation_id)
if artifact_conversation_id and session_conversation_id:
if artifact_conversation_id == session_conversation_id:
return True, None
# Rule 3: Deny - no matching authorization rule
return False, f'Artifact {operation} access denied: artifact not in session conversation and not created by this run'
def _normalize_uuid_list(values: Any) -> list[str]:
"""Normalize a user/config supplied UUID list while preserving order."""
if not isinstance(values, list):
@@ -1542,6 +1583,169 @@ class RuntimeConnectionHandler(handler.Handler):
self.ap.logger.error(f'EVENT_PAGE error: {e}', exc_info=True)
return handler.ActionResponse.error(message=f'Event page error: {e}')
# ================= Artifact APIs =================
@self.action(PluginToRuntimeAction.ARTIFACT_METADATA)
async def artifact_metadata(data: dict[str, Any]) -> handler.ActionResponse:
"""Get artifact metadata.
Requires run_id authorization. Only allows access to artifacts
in current run's conversation or created by current run.
"""
run_id = data.get('run_id')
artifact_id = data.get('artifact_id')
caller_plugin_identity = data.get('caller_plugin_identity')
if not run_id:
return handler.ActionResponse.error(message='run_id is required')
if not artifact_id:
return handler.ActionResponse.error(message='artifact_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
# Validate caller plugin identity
if caller_plugin_identity:
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity and caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Check artifact permission from session.permissions (from descriptor.permissions)
permissions = session.get('permissions', {})
artifact_permissions = permissions.get('artifacts', [])
if 'metadata' not in artifact_permissions:
return handler.ActionResponse.error(
message='Artifact metadata access not authorized'
)
# Get artifact metadata
from ..agent.runner.artifact_store import ArtifactStore
store = ArtifactStore(self.ap.persistence_mgr.get_db_engine())
try:
metadata = await store.get_metadata(artifact_id)
if not metadata:
return handler.ActionResponse.error(
message=f'Artifact {artifact_id} not found'
)
# Validate artifact access scope
is_allowed, error_msg = _validate_artifact_access(session, metadata, 'metadata')
if not is_allowed:
return handler.ActionResponse.error(message=error_msg)
return handler.ActionResponse.success(data=metadata)
except Exception as e:
self.ap.logger.error(f'ARTIFACT_METADATA error: {e}', exc_info=True)
return handler.ActionResponse.error(message=f'Artifact metadata error: {e}')
@self.action(PluginToRuntimeAction.ARTIFACT_READ)
async def artifact_read(data: dict[str, Any]) -> handler.ActionResponse:
"""Read artifact content.
Requires run_id authorization. Only allows access to artifacts
in current run's conversation or created by current run.
Supports range reads with offset/limit.
"""
run_id = data.get('run_id')
artifact_id = data.get('artifact_id')
caller_plugin_identity = data.get('caller_plugin_identity')
if not run_id:
return handler.ActionResponse.error(message='run_id is required')
if not artifact_id:
return handler.ActionResponse.error(message='artifact_id is required')
# Validate and parse offset
offset = data.get('offset', 0)
if not isinstance(offset, int):
try:
offset = int(offset)
except (TypeError, ValueError):
return handler.ActionResponse.error(message='offset must be an integer')
if offset < 0:
return handler.ActionResponse.error(message='offset must be >= 0')
# Validate and parse limit if provided
limit = data.get('limit')
if limit is not None:
if not isinstance(limit, int):
try:
limit = int(limit)
except (TypeError, ValueError):
return handler.ActionResponse.error(message='limit must be an integer')
if limit <= 0:
return handler.ActionResponse.error(message='limit must be > 0')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
# Validate caller plugin identity
if caller_plugin_identity:
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity and caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Check artifact permission from session.permissions (from descriptor.permissions)
permissions = session.get('permissions', {})
artifact_permissions = permissions.get('artifacts', [])
if 'read' not in artifact_permissions:
return handler.ActionResponse.error(
message='Artifact read access not authorized'
)
# Get artifact metadata first to validate access
from ..agent.runner.artifact_store import ArtifactStore
store = ArtifactStore(self.ap.persistence_mgr.get_db_engine())
try:
metadata = await store.get_metadata(artifact_id)
if not metadata:
return handler.ActionResponse.error(
message=f'Artifact {artifact_id} not found'
)
# Validate artifact access scope
is_allowed, error_msg = _validate_artifact_access(session, metadata, 'read')
if not is_allowed:
return handler.ActionResponse.error(message=error_msg)
# Read artifact content (validates offset/limit internally)
result = await store.read_artifact(
artifact_id=artifact_id,
offset=offset,
limit=limit,
)
if not result:
return handler.ActionResponse.error(
message=f'Failed to read artifact {artifact_id}'
)
return handler.ActionResponse.success(data=result)
except ValueError as e:
# Offset/limit validation error
return handler.ActionResponse.error(message=str(e))
except Exception as e:
self.ap.logger.error(f'ARTIFACT_READ error: {e}', exc_info=True)
return handler.ActionResponse.error(message=f'Artifact read error: {e}')
@self.action(CommonAction.PING)
async def ping(data: dict[str, Any]) -> handler.ActionResponse:
"""Ping"""