fix: enforce agent run API permissions

This commit is contained in:
huanghuoguoguo
2026-05-30 20:14:06 +08:00
parent bbe7666642
commit 93cd852061
12 changed files with 522 additions and 166 deletions

View File

@@ -141,6 +141,70 @@ def _validate_artifact_access(
return False, f'Artifact {operation} access denied: artifact not in session conversation and not created by this run'
async def _validate_agent_run_session(
run_id: str,
caller_plugin_identity: str | None,
ap: app.Application,
api_name: str,
permission_group: str | None = None,
permission_operation: str | None = None,
) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]:
"""Validate an AgentRunner pull API run session and optional manifest permission."""
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return None, handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return None, handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
ap.logger.warning(
f'{api_name}: caller_plugin_identity {caller_plugin_identity} '
f'does not match session plugin_identity {session_plugin_identity}'
)
return None, handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
if permission_group and permission_operation:
permissions = session.get('permissions', {})
allowed_operations = permissions.get(permission_group, [])
if permission_operation not in allowed_operations:
return None, handler.ActionResponse.error(
message=f'{api_name} access not authorized'
)
return session, None
def _resolve_run_conversation(
session: dict[str, Any],
requested_conversation_id: str | None,
api_name: str,
) -> tuple[str | None, handler.ActionResponse | None]:
"""Resolve and enforce current-run conversation scope."""
session_conversation_id = session.get('conversation_id')
if requested_conversation_id:
if not session_conversation_id:
return None, handler.ActionResponse.error(
message=f'{api_name} is not available without a run conversation'
)
if requested_conversation_id != session_conversation_id:
return None, handler.ActionResponse.error(
message=f'Conversation {requested_conversation_id} is not accessible by this run'
)
return requested_conversation_id, None
return session_conversation_id, None
def _normalize_uuid_list(values: Any) -> list[str]:
"""Normalize a user/config supplied UUID list while preserving order."""
if not isinstance(values, list):
@@ -1197,7 +1261,7 @@ class RuntimeConnectionHandler(handler.Handler):
kb_id = data['kb_id']
query_text = data['query_text']
top_k = data.get('top_k', 5)
filters = data.get('filters', {})
filters = data.get('filters') or {}
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
@@ -1271,7 +1335,7 @@ class RuntimeConnectionHandler(handler.Handler):
kb_id = data['kb_id']
query_text = data['query_text']
top_k = data.get('top_k', 5)
filters = data.get('filters', {})
filters = data.get('filters') or {}
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
@@ -1342,29 +1406,24 @@ class RuntimeConnectionHandler(handler.Handler):
if not run_id:
return handler.ActionResponse.error(message='run_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'History page',
permission_group='history',
permission_operation='page',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get conversation from session if not provided
if not conversation_id:
conversation_id = session.get('conversation_id')
conversation_id, scope_error = _resolve_run_conversation(
session,
conversation_id,
'History page',
)
if scope_error:
return scope_error
if not conversation_id:
return handler.ActionResponse.success(data={
@@ -1411,35 +1470,32 @@ class RuntimeConnectionHandler(handler.Handler):
"""
run_id = data.get('run_id')
query_text = data.get('query', '')
filters = data.get('filters', {})
filters = data.get('filters') or {}
top_k = data.get('top_k', 10)
caller_plugin_identity = data.get('caller_plugin_identity')
if not run_id:
return handler.ActionResponse.error(message='run_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'History search',
permission_group='history',
permission_operation='search',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get conversation from session or filters
conversation_id = filters.get('conversation_id') or session.get('conversation_id')
requested_conversation_id = filters.get('conversation_id')
conversation_id, scope_error = _resolve_run_conversation(
session,
requested_conversation_id,
'History search',
)
if scope_error:
return scope_error
if not conversation_id:
return handler.ActionResponse.success(data={
@@ -1453,10 +1509,11 @@ class RuntimeConnectionHandler(handler.Handler):
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
try:
safe_filters = {k: v for k, v in filters.items() if k != 'conversation_id'}
items = await store.search_transcript(
conversation_id=conversation_id,
query_text=query_text,
filters=filters,
filters=safe_filters,
top_k=top_k,
)
@@ -1485,25 +1542,16 @@ class RuntimeConnectionHandler(handler.Handler):
if not event_id:
return handler.ActionResponse.error(message='event_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'Event get',
permission_group='events',
permission_operation='get',
)
if error:
return error
# Get event
from ..agent.runner.event_log_store import EventLogStore
@@ -1516,9 +1564,12 @@ class RuntimeConnectionHandler(handler.Handler):
message=f'Event {event_id} not found'
)
# Validate event is in the same conversation as the run
# Validate event is in the same conversation as the run, or was created by the same run.
session_conversation_id = session.get('conversation_id')
if session_conversation_id and event.get('conversation_id') != session_conversation_id:
event_run_id = event.get('run_id')
if event_run_id and event_run_id == run_id:
return handler.ActionResponse.success(data=event)
if not session_conversation_id or event.get('conversation_id') != session_conversation_id:
return handler.ActionResponse.error(
message=f'Event {event_id} is not accessible by this run'
)
@@ -1544,29 +1595,24 @@ class RuntimeConnectionHandler(handler.Handler):
if not run_id:
return handler.ActionResponse.error(message='run_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'Event page',
permission_group='events',
permission_operation='page',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get conversation from session if not provided
if not conversation_id:
conversation_id = session.get('conversation_id')
conversation_id, scope_error = _resolve_run_conversation(
session,
conversation_id,
'Event page',
)
if scope_error:
return scope_error
if not conversation_id:
return handler.ActionResponse.success(data={
@@ -1620,33 +1666,16 @@ class RuntimeConnectionHandler(handler.Handler):
if not artifact_id:
return handler.ActionResponse.error(message='artifact_id is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Check artifact permission from session.permissions (from descriptor.permissions)
permissions = session.get('permissions', {})
artifact_permissions = permissions.get('artifacts', [])
if 'metadata' not in artifact_permissions:
return handler.ActionResponse.error(
message='Artifact metadata access not authorized'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'Artifact metadata',
permission_group='artifacts',
permission_operation='metadata',
)
if error:
return error
# Get artifact metadata
from ..agent.runner.artifact_store import ArtifactStore
@@ -1708,33 +1737,16 @@ class RuntimeConnectionHandler(handler.Handler):
if limit <= 0:
return handler.ActionResponse.error(message='limit must be > 0')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Check artifact permission from session.permissions (from descriptor.permissions)
permissions = session.get('permissions', {})
artifact_permissions = permissions.get('artifacts', [])
if 'read' not in artifact_permissions:
return handler.ActionResponse.error(
message='Artifact read access not authorized'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'Artifact read',
permission_group='artifacts',
permission_operation='read',
)
if error:
return error
# Get artifact metadata first to validate access
from ..agent.runner.artifact_store import ArtifactStore