From c7d4885bfc182ff1656f06f6f9cf73325f2cda82 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com> Date: Mon, 22 Jun 2026 13:08:34 +0800 Subject: [PATCH] refactor(plugin): split agent-runner action handlers out of handler.py Extract the AgentRunner Protocol v1 host-side surface from the giant RuntimeConnectionHandler.__init__ into sibling modules using a registration- function pattern (behavior-preserving; @h.action == @self.action): - agent_run_support.py: shared constants + authorization/scope/projection helpers - agent_pull_actions.py: register(h) for history/event pull APIs - agent_runner_actions.py: register(h) for run/runtime/stats/claim lifecycle - agent_state_actions.py: register(h) for steering/state APIs __init__ now calls the three register(self) functions. handler.py keeps the pre-existing plugin/llm/vector/knowledge handlers, get_prompt/call_tool/ get_tool_detail (coupled to retained helpers), shared helpers, and outbound methods; it re-imports _validate_agent_run_session so external imports keep working. handler.py: 4066 -> 1871 lines. test_state_api_auth.py: repoint get_session_registry patch targets to agent_run_support (the lookup moved modules). 385 agent unit tests pass; ruff clean. --- src/langbot/pkg/plugin/agent_pull_actions.py | 293 +++ src/langbot/pkg/plugin/agent_run_support.py | 488 ++++ .../pkg/plugin/agent_runner_actions.py | 1195 +++++++++ src/langbot/pkg/plugin/agent_state_actions.py | 316 +++ src/langbot/pkg/plugin/handler.py | 2209 +---------------- tests/unit_tests/agent/test_state_api_auth.py | 20 +- 6 files changed, 2309 insertions(+), 2212 deletions(-) create mode 100644 src/langbot/pkg/plugin/agent_pull_actions.py create mode 100644 src/langbot/pkg/plugin/agent_run_support.py create mode 100644 src/langbot/pkg/plugin/agent_runner_actions.py create mode 100644 src/langbot/pkg/plugin/agent_state_actions.py diff --git a/src/langbot/pkg/plugin/agent_pull_actions.py b/src/langbot/pkg/plugin/agent_pull_actions.py new file mode 100644 index 000000000..e1e3a1714 --- /dev/null +++ b/src/langbot/pkg/plugin/agent_pull_actions.py @@ -0,0 +1,293 @@ +"""Agent-runner pull actions (history / event).""" + +from __future__ import annotations + +from typing import Any + + +from langbot_plugin.runtime.io import handler +from langbot_plugin.entities.io.actions.enums import ( + PluginToRuntimeAction, +) + + +from .agent_run_support import ( + _get_run_authorization, + _validate_agent_run_session, + _resolve_run_conversation, + _run_scope_filters, + _event_matches_run_scope, + _project_event_record_for_api, +) + + +def register(h): + @h.action(PluginToRuntimeAction.HISTORY_PAGE) + async def history_page(data: dict[str, Any]) -> handler.ActionResponse: + """Page through transcript history for a conversation. + + Requires run_id authorization. Only allows access to current run's conversation. + """ + run_id = data.get('run_id') + conversation_id = data.get('conversation_id') + before_cursor = data.get('before_cursor') + after_cursor = data.get('after_cursor') + limit = data.get('limit', 50) + direction = data.get('direction', 'backward') + include_attachments = data.get('include_attachments', False) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'History page', + api_capability='history_page', + ) + if error: + return error + + conversation_id, scope_error = _resolve_run_conversation( + session, + conversation_id, + 'History page', + ) + if scope_error: + return scope_error + + if not conversation_id: + return handler.ActionResponse.success( + data={ + 'items': [], + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + } + ) + + # Parse cursors + before_seq = int(before_cursor) if before_cursor else None + after_seq = int(after_cursor) if after_cursor else None + + # Query transcript + from ..agent.runner.transcript_store import TranscriptStore + + store = TranscriptStore(h.ap.persistence_mgr.get_db_engine()) + + try: + items, next_seq, prev_seq, has_more = await store.page_transcript( + conversation_id=conversation_id, + before_seq=before_seq, + after_seq=after_seq, + limit=limit, + direction=direction, + include_attachments=include_attachments, + **_run_scope_filters(session), + ) + + return handler.ActionResponse.success( + data={ + 'items': items, + 'next_cursor': str(next_seq) if next_seq else None, + 'prev_cursor': str(prev_seq) if prev_seq else None, + 'has_more': has_more, + } + ) + except Exception as e: + h.ap.logger.error(f'HISTORY_PAGE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'History page error: {e}') + + @h.action(PluginToRuntimeAction.HISTORY_SEARCH) + async def history_search(data: dict[str, Any]) -> handler.ActionResponse: + """Search transcript history. + + Requires run_id authorization. Only searches current run's conversation. + Basic implementation using LIKE filtering. + """ + run_id = data.get('run_id') + query_text = data.get('query', '') + filters = data.get('filters') or {} + top_k = data.get('top_k', 10) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'History search', + api_capability='history_search', + ) + if error: + return error + + requested_conversation_id = filters.get('conversation_id') + conversation_id, scope_error = _resolve_run_conversation( + session, + requested_conversation_id, + 'History search', + ) + if scope_error: + return scope_error + + if not conversation_id: + return handler.ActionResponse.success( + data={ + 'items': [], + 'total_count': 0, + 'query': query_text, + } + ) + + # Search transcript + from ..agent.runner.transcript_store import TranscriptStore + + store = TranscriptStore(h.ap.persistence_mgr.get_db_engine()) + + try: + safe_filters = {k: v for k, v in filters.items() if k != 'conversation_id'} + items = await store.search_transcript( + conversation_id=conversation_id, + query_text=query_text, + filters=safe_filters, + top_k=top_k, + **_run_scope_filters(session), + ) + + return handler.ActionResponse.success( + data={ + 'items': items, + 'total_count': len(items), + 'query': query_text, + } + ) + except Exception as e: + h.ap.logger.error(f'HISTORY_SEARCH error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'History search error: {e}') + + @h.action(PluginToRuntimeAction.EVENT_GET) + async def event_get(data: dict[str, Any]) -> handler.ActionResponse: + """Get a single event record by ID. + + Requires run_id authorization. Only allows access to events in current run's conversation. + """ + run_id = data.get('run_id') + event_id = data.get('event_id') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not event_id: + return handler.ActionResponse.error(message='event_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Event get', + api_capability='event_get', + ) + if error: + return error + + # Get event + from ..agent.runner.event_log_store import EventLogStore + + store = EventLogStore(h.ap.persistence_mgr.get_db_engine()) + + try: + event = await store.get_event(event_id) + if not event: + return handler.ActionResponse.error(message=f'Event {event_id} not found') + + # Validate event is in the same conversation as the run, or was created by the same run. + session_conversation_id = _get_run_authorization(session).get('conversation_id') + event_run_id = event.get('run_id') + if event_run_id and event_run_id == run_id: + return handler.ActionResponse.success(data=_project_event_record_for_api(event)) + if not session_conversation_id or not _event_matches_run_scope(session, event): + return handler.ActionResponse.error(message=f'Event {event_id} is not accessible by this run') + + return handler.ActionResponse.success(data=_project_event_record_for_api(event)) + except Exception as e: + h.ap.logger.error(f'EVENT_GET error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Event get error: {e}') + + @h.action(PluginToRuntimeAction.EVENT_PAGE) + async def event_page(data: dict[str, Any]) -> handler.ActionResponse: + """Page through event records. + + Requires run_id authorization. Only allows access to current run's conversation. + """ + run_id = data.get('run_id') + conversation_id = data.get('conversation_id') + event_types = data.get('event_types') + before_cursor = data.get('before_cursor') + limit = data.get('limit', 50) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Event page', + api_capability='event_page', + ) + if error: + return error + + conversation_id, scope_error = _resolve_run_conversation( + session, + conversation_id, + 'Event page', + ) + if scope_error: + return scope_error + + if not conversation_id: + return handler.ActionResponse.success( + data={ + 'items': [], + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + } + ) + + # Parse cursor + before_seq = int(before_cursor) if before_cursor else None + + # Query events + from ..agent.runner.event_log_store import EventLogStore + + store = EventLogStore(h.ap.persistence_mgr.get_db_engine()) + + try: + items, next_seq, has_more = await store.page_events( + conversation_id=conversation_id, + event_types=event_types, + before_seq=before_seq, + limit=limit, + **_run_scope_filters(session), + ) + + return handler.ActionResponse.success( + data={ + 'items': [_project_event_record_for_api(item) for item in items], + 'next_cursor': str(next_seq) if next_seq else None, + 'prev_cursor': None, + 'has_more': has_more, + } + ) + except Exception as e: + h.ap.logger.error(f'EVENT_PAGE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Event page error: {e}') diff --git a/src/langbot/pkg/plugin/agent_run_support.py b/src/langbot/pkg/plugin/agent_run_support.py new file mode 100644 index 000000000..d19a08b55 --- /dev/null +++ b/src/langbot/pkg/plugin/agent_run_support.py @@ -0,0 +1,488 @@ +"""Agent-runner protocol support: shared constants and authorization/scope/projection helpers extracted from handler.py.""" + +from __future__ import annotations + +from typing import Any, Union +import json +import time + +import sqlalchemy + +from langbot_plugin.runtime.io import handler +from langbot_plugin.entities.io.actions.enums import ( + PluginToRuntimeAction, +) + + +from ..core import app +from ..agent.runner.session_registry import get_session_registry +from ..agent.runner.result_normalizer import MAX_RESULT_SIZE_BYTES, STRICT_RESULT_PAYLOADS + + +class _RuntimeActionName: + def __init__(self, value: str): + self.value = value + + +AGENT_RUN_ADMIN_PERMISSION = 'agent_run:admin' +RUNTIME_ADMIN_PERMISSION = 'runtime:admin' +AGENT_RUNNER_ADMIN_PERMISSION = 'agent_runner:admin' +LEDGER_ONLY_SIDE_EFFECTING_RESULT_TYPES = { + 'message.delta', + 'message.completed', + 'state.updated', + 'run.completed', + 'run.failed', +} + + +def _plugin_runtime_action(name: str, value: str) -> Any: + return getattr(PluginToRuntimeAction, name, _RuntimeActionName(value)) + + +def _normalize_permission_set(value: Any) -> set[str]: + if isinstance(value, str): + return {permission.strip() for permission in value.split(',') if permission.strip()} + if isinstance(value, list): + return {str(item).strip() for item in value if str(item).strip()} + if isinstance(value, dict): + return {str(item).strip() for item, enabled in value.items() if enabled and str(item).strip()} + return set() + + +def _iter_agent_runner_admin_plugin_configs(ap: app.Application) -> list[dict[str, Any]]: + instance_config = getattr(ap, 'instance_config', None) + config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {} + if not isinstance(config_data, dict): + return [] + agent_runner_config = config_data.get('agent_runner', {}) + if not isinstance(agent_runner_config, dict): + return [] + raw_admin_plugins = agent_runner_config.get('admin_plugins', []) + if isinstance(raw_admin_plugins, dict): + items: list[dict[str, Any]] = [] + for identity, entry in raw_admin_plugins.items(): + if isinstance(entry, dict): + merged = dict(entry) + merged.setdefault('identity', identity) + items.append(merged) + else: + items.append({'identity': identity, 'permissions': entry}) + return items + if isinstance(raw_admin_plugins, list): + return [item for item in raw_admin_plugins if isinstance(item, dict)] + return [] + + +def _agent_runner_admin_permissions(ap: app.Application, plugin_identity: str | None) -> set[str]: + if not isinstance(plugin_identity, str) or not plugin_identity.strip(): + return set() + normalized_identity = plugin_identity.strip() + permissions: set[str] = set() + for entry in _iter_agent_runner_admin_plugin_configs(ap): + if entry.get('enabled', True) is False: + continue + identity = entry.get('identity') or entry.get('plugin_identity') or entry.get('plugin') or entry.get('id') + if identity != normalized_identity: + continue + permissions.update(_normalize_permission_set(entry.get('permissions'))) + permissions.update(_normalize_permission_set(entry.get('scopes'))) + return permissions + + +def _has_agent_runner_admin_permission( + ap: app.Application, + plugin_identity: str | None, + permission: str, +) -> bool: + permissions = _agent_runner_admin_permissions(ap, plugin_identity) + if not permissions: + return False + domain = permission.split(':', 1)[0] + return bool( + permission in permissions + or f'{domain}:*' in permissions + or AGENT_RUNNER_ADMIN_PERMISSION in permissions + or '*' in permissions + ) + + +def _deadline_seconds_from_payload(data: dict[str, Any], default: int = 60) -> int: + deadline_at = data.get('heartbeat_deadline_at') + if deadline_at is not None: + try: + return max(int(float(deadline_at) - time.time()), 1) + except (TypeError, ValueError): + pass + try: + return max(int(data.get('heartbeat_ttl_seconds') or default), 1) + except (TypeError, ValueError): + return default + + +def _get_run_authorization(session: dict[str, Any]) -> dict[str, Any]: + """Return the run-scoped authorization snapshot.""" + return session['authorization'] + + +def _run_matches_run_scope(session: dict[str, Any], run: dict[str, Any]) -> bool: + authorization = _get_run_authorization(session) + session_run_id = session.get('run_id') + if run.get('run_id') == session_run_id: + return True + session_runner_id = session.get('runner_id') or authorization.get('runner_id') + if not session_runner_id or run.get('runner_id') != session_runner_id: + return False + if not authorization.get('conversation_id'): + return False + if run.get('conversation_id') != authorization.get('conversation_id'): + return False + if authorization.get('bot_id') is not None and authorization.get('bot_id') != run.get('bot_id'): + return False + if authorization.get('workspace_id') is not None and authorization.get('workspace_id') != run.get('workspace_id'): + return False + if authorization.get('thread_id') != run.get('thread_id'): + return False + return True + + +def _authorize_target_run( + session: dict[str, Any], + run: dict[str, Any], +) -> handler.ActionResponse | None: + """Authorize non-admin target-run access against scope and runner owner.""" + if _run_matches_run_scope(session, run): + return None + return handler.ActionResponse.error(message=f'Run {run.get("run_id")} is not accessible by this run') + + +def _validate_ledger_only_result_payload( + *, + ap: app.Application, + runner_id: str | None, + event_type: str, + data: dict[str, Any], +) -> str | None: + """Validate result payloads that can be safely stored without side effects.""" + try: + result_json = json.dumps({'type': event_type, 'data': data}) + except (TypeError, ValueError) as exc: + return f'event data must be JSON serializable: {exc}' + if len(result_json) > MAX_RESULT_SIZE_BYTES: + return f'event payload exceeds {MAX_RESULT_SIZE_BYTES} bytes' + + payload_model = STRICT_RESULT_PAYLOADS.get(event_type) + if payload_model is None: + return f'unknown result type: {event_type}' + try: + payload_model.model_validate(data) + except Exception as exc: + return f'invalid {event_type} payload: {exc}' + + if event_type in LEDGER_ONLY_SIDE_EFFECTING_RESULT_TYPES: + if runner_id: + ap.logger.warning( + f'Runner {runner_id} attempted ledger-only append for side-effecting result type {event_type}' + ) + return f'{event_type} must be emitted through the canonical runner result path' + return None + + +async def _require_runtime_write_ownership( + *, + store: Any, + session: dict[str, Any], + run: dict[str, Any], + data: dict[str, Any], + api_name: str, +) -> handler.ActionResponse | None: + """Require current-run ownership or an active runtime claim for run writes.""" + if run.get('run_id') == session.get('run_id') and run.get('status') != 'claimed': + return None + + runtime_id = data.get('runtime_id') + claim_token = data.get('claim_token') + if not runtime_id or not claim_token: + return handler.ActionResponse.error( + message=f'{api_name} requires active claim ownership for target run {run.get("run_id")}' + ) + + if not await store.validate_active_claim( + run_id=str(run.get('run_id')), + runtime_id=str(runtime_id), + claim_token=str(claim_token), + ): + return handler.ActionResponse.error( + message=f'{api_name} claim ownership is not active for target run {run.get("run_id")}' + ) + + return None + + +def _resolve_state_scope( + session: dict[str, Any], + scope: str, +) -> tuple[dict[str, Any] | None, str | None, handler.ActionResponse | None]: + """Resolve state policy/context for an authorized run scope.""" + authorization = _get_run_authorization(session) + state_policy = authorization['state_policy'] + + if not state_policy.get('enable_state', True): + return None, None, handler.ActionResponse.error(message='State access is disabled by binding policy') + + state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) + if scope not in state_scopes: + return None, None, handler.ActionResponse.error(message=f'Scope "{scope}" is not enabled by binding policy') + + state_context = authorization['state_context'] + scope_key = state_context.get('scope_keys', {}).get(scope) + if not scope_key: + return None, None, handler.ActionResponse.error(message=f'Scope key not available for scope "{scope}"') + + return state_context, scope_key, None + + +async def _validate_agent_run_session( + run_id: str, + caller_plugin_identity: str | None, + ap: app.Application, + api_name: str, + api_capability: str | None = None, + allow_persistent_authorization: bool = False, + admin_permission: str | None = None, +) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: + """Validate an AgentRunner pull API run session and run-scoped API access.""" + if ( + not run_id + and admin_permission + and _has_agent_runner_admin_permission( + ap, + caller_plugin_identity, + admin_permission, + ) + ): + return { + 'run_id': run_id, + 'runner_id': None, + 'query_id': None, + 'plugin_identity': caller_plugin_identity, + 'authorization': {}, + 'status': {}, + 'steering_queue': [], + }, None + + session_registry = get_session_registry() + session = await session_registry.get(run_id) + if not session: + if allow_persistent_authorization: + session = await _load_persistent_agent_run_session(run_id, ap, api_name) + if not session: + return None, handler.ActionResponse.error(message=f'Run session {run_id} not found or expired') + + session_plugin_identity = session.get('plugin_identity') + if not isinstance(session_plugin_identity, str) or not session_plugin_identity.strip(): + ap.logger.warning(f'{api_name}: run_id {run_id} has no plugin_identity') + return None, handler.ActionResponse.error(message=f'Run session {run_id} has no plugin_identity') + if not caller_plugin_identity: + return None, handler.ActionResponse.error(message=f'caller_plugin_identity is required for run_id {run_id}') + if caller_plugin_identity != session_plugin_identity: + ap.logger.warning( + f'{api_name}: caller_plugin_identity {caller_plugin_identity} ' + f'does not match session plugin_identity {session_plugin_identity}' + ) + return None, handler.ActionResponse.error(message=f'Plugin identity mismatch for run_id {run_id}') + + if api_capability: + available_apis = _get_run_authorization(session).get('available_apis', {}) + has_admin_permission = bool(admin_permission) and _has_agent_runner_admin_permission( + ap, + caller_plugin_identity, + admin_permission, + ) + if not available_apis.get(api_capability, False) and not has_admin_permission: + return None, handler.ActionResponse.error(message=f'{api_name} access not authorized') + + return session, None + + +async def _load_persistent_agent_run_session( + run_id: str, + ap: app.Application, + api_name: str, +) -> dict[str, Any] | None: + """Load an expired run session from the AgentRun authorization snapshot.""" + try: + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.orm import sessionmaker + + from ..entity.persistence.agent_run import AgentRun + + engine = ap.persistence_mgr.get_db_engine() + session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async with session_factory() as db_session: + result = await db_session.execute(sqlalchemy.select(AgentRun).where(AgentRun.run_id == run_id)) + run = result.scalars().first() + except Exception as e: + ap.logger.error(f'{api_name}: failed to load persistent authorization for run_id {run_id}: {e}', exc_info=True) + return None + + if run is None: + return None + + try: + authorization = json.loads(run.authorization_json) if run.authorization_json else {} + except (TypeError, ValueError) as e: + ap.logger.warning(f'{api_name}: run_id {run_id} has invalid authorization_json: {e}') + return None + + if not isinstance(authorization, dict): + ap.logger.warning(f'{api_name}: run_id {run_id} authorization_json is not an object') + return None + + return { + 'run_id': run.run_id, + 'runner_id': authorization.get('runner_id') or run.runner_id, + 'query_id': None, + 'plugin_identity': authorization.get('plugin_identity'), + 'authorization': authorization, + 'status': {}, + 'steering_queue': [], + } + + +def _resolve_run_conversation( + session: dict[str, Any], + requested_conversation_id: str | None, + api_name: str, +) -> tuple[str | None, handler.ActionResponse | None]: + """Resolve and enforce current-run conversation scope.""" + session_conversation_id = _get_run_authorization(session).get('conversation_id') + + if requested_conversation_id: + if not session_conversation_id: + return None, handler.ActionResponse.error(message=f'{api_name} is not available without a run conversation') + if requested_conversation_id != session_conversation_id: + return None, handler.ActionResponse.error( + message=f'Conversation {requested_conversation_id} is not accessible by this run' + ) + return requested_conversation_id, None + + return session_conversation_id, None + + +def _run_scope_filters(session: dict[str, Any]) -> dict[str, Any]: + authorization = _get_run_authorization(session) + return { + 'bot_id': authorization.get('bot_id'), + 'workspace_id': authorization.get('workspace_id'), + 'thread_id': authorization.get('thread_id'), + 'strict_thread': True, + } + + +def _run_ledger_scope_filters(session: dict[str, Any]) -> dict[str, Any]: + authorization = _get_run_authorization(session) + filters = _run_scope_filters(session) + filters['runner_id'] = session.get('runner_id') or authorization.get('runner_id') + return filters + + +def _event_matches_run_scope(session: dict[str, Any], event: dict[str, Any]) -> bool: + authorization = _get_run_authorization(session) + if authorization.get('conversation_id') != event.get('conversation_id'): + return False + if authorization.get('bot_id') is not None and authorization.get('bot_id') != event.get('bot_id'): + return False + if authorization.get('workspace_id') is not None and authorization.get('workspace_id') != event.get('workspace_id'): + return False + if authorization.get('thread_id') != event.get('thread_id'): + return False + return True + + +def _project_event_record_for_api(event: dict[str, Any]) -> dict[str, Any]: + """Project EventLogStore rows onto the SDK AgentEventRecord DTO.""" + seq = event.get('seq') or event.get('id') + return { + 'event_id': event.get('event_id'), + 'event_type': event.get('event_type'), + 'event_time': event.get('event_time'), + 'source': event.get('source'), + 'bot_id': event.get('bot_id'), + 'workspace_id': event.get('workspace_id'), + 'conversation_id': event.get('conversation_id'), + 'thread_id': event.get('thread_id'), + 'actor_type': event.get('actor_type'), + 'actor_id': event.get('actor_id'), + 'actor_name': event.get('actor_name'), + 'subject_type': event.get('subject_type'), + 'subject_id': event.get('subject_id'), + 'input_summary': event.get('input_summary'), + 'input_ref': event.get('input_ref'), + 'raw_ref': event.get('raw_ref'), + 'seq': seq, + 'cursor': event.get('cursor') or (str(seq) if seq is not None else None), + 'created_at': event.get('created_at'), + 'metadata': event.get('metadata') or {}, + } + + +def _project_runner_descriptor_for_api(descriptor: Any) -> dict[str, Any]: + """Project an AgentRunnerDescriptor-like object onto a JSON dict.""" + if isinstance(descriptor, dict): + return dict(descriptor) + if hasattr(descriptor, 'model_dump'): + return descriptor.model_dump(mode='json') + return { + 'id': getattr(descriptor, 'id', None), + 'source': getattr(descriptor, 'source', None), + 'label': getattr(descriptor, 'label', {}), + 'description': getattr(descriptor, 'description', None), + 'plugin_author': getattr(descriptor, 'plugin_author', None), + 'plugin_name': getattr(descriptor, 'plugin_name', None), + 'runner_name': getattr(descriptor, 'runner_name', None), + 'plugin_version': getattr(descriptor, 'plugin_version', None), + 'config_schema': getattr(descriptor, 'config_schema', []), + 'capabilities': getattr(descriptor, 'capabilities', {}), + 'permissions': getattr(descriptor, 'permissions', {}), + 'raw_manifest': getattr(descriptor, 'raw_manifest', {}), + } + + +async def _record_agent_runner_admin_action( + ap: app.Application, + store: Any, + *, + action: str, + caller_plugin_identity: str | None, + permission: str, + durable_run_id: str | None = None, + target_runtime_id: str | None = None, + detail: dict[str, Any] | None = None, +) -> None: + """Record a small audit trail for privileged AgentRunner operations.""" + audit_data: dict[str, Any] = { + 'action': action, + 'caller_plugin_identity': caller_plugin_identity, + 'permission': permission, + } + if durable_run_id: + audit_data['target_run_id'] = durable_run_id + if target_runtime_id: + audit_data['target_runtime_id'] = target_runtime_id + if detail: + audit_data['detail'] = detail + + ap.logger.info('Agent runner admin action: %s', audit_data) + if not durable_run_id or store is None or not hasattr(store, 'append_audit_event'): + return + + try: + await store.append_audit_event( + run_id=str(durable_run_id), + event_type=f'admin.{action}', + data=audit_data, + metadata={'permission': permission}, + ) + except Exception as exc: + ap.logger.warning(f'Failed to record AgentRunner admin audit event: {exc}', exc_info=True) diff --git a/src/langbot/pkg/plugin/agent_runner_actions.py b/src/langbot/pkg/plugin/agent_runner_actions.py new file mode 100644 index 000000000..40e72e194 --- /dev/null +++ b/src/langbot/pkg/plugin/agent_runner_actions.py @@ -0,0 +1,1195 @@ +"""Agent-runner run / runtime / stats / claim actions.""" + +from __future__ import annotations + +from typing import Any +import time + + +from langbot_plugin.runtime.io import handler + + +from ..agent.runner.run_ledger_store import TERMINAL_STATUSES + +from .agent_run_support import ( + AGENT_RUN_ADMIN_PERMISSION, + RUNTIME_ADMIN_PERMISSION, + _plugin_runtime_action, + _has_agent_runner_admin_permission, + _deadline_seconds_from_payload, + _get_run_authorization, + _authorize_target_run, + _validate_ledger_only_result_payload, + _require_runtime_write_ownership, + _validate_agent_run_session, + _resolve_run_conversation, + _run_scope_filters, + _run_ledger_scope_filters, + _project_runner_descriptor_for_api, + _record_agent_runner_admin_action, +) + + +def register(h): + @h.action(_plugin_runtime_action('RUN_GET', 'run_get')) + async def run_get(data: dict[str, Any]) -> handler.ActionResponse: + """Get one Host-owned run record visible to the current run.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') or run_id + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run get', + api_capability='run_get', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + run = await store.get_run(str(target_run_id)) + if not run: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, run) + if auth_error: + return auth_error + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_get', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={'target_run_id': str(target_run_id)}, + ) + return handler.ActionResponse.success(data=run) + except Exception as e: + h.ap.logger.error(f'RUN_GET error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run get error: {e}') + + @h.action(_plugin_runtime_action('RUN_LIST', 'run_list')) + async def run_list(data: dict[str, Any]) -> handler.ActionResponse: + """List Host-owned runs visible to the current run conversation.""" + run_id = data.get('run_id') + conversation_id = data.get('conversation_id') + statuses = data.get('statuses') + before_cursor = data.get('before_cursor') + limit = data.get('limit', 50) + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + + scope_filters: dict[str, Any] = {} + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run list', + api_capability='run_list', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + if not is_admin: + conversation_id, scope_error = _resolve_run_conversation( + session, + conversation_id, + 'Run list', + ) + if scope_error: + return scope_error + scope_filters = _run_ledger_scope_filters(session) + + if not is_admin and not conversation_id: + return handler.ActionResponse.success( + data={ + 'items': [], + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + 'total_count': 0, + } + ) + + if statuses is not None and not isinstance(statuses, list): + return handler.ActionResponse.error(message='statuses must be a list') + try: + before_id = int(before_cursor) if before_cursor else None + except (TypeError, ValueError): + return handler.ActionResponse.error(message='before_cursor must be an integer cursor') + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + items, next_cursor, has_more, total_count = await store.list_runs( + conversation_id=conversation_id, + statuses=[str(status) for status in statuses] if statuses else None, + before_id=before_id, + limit=limit, + **scope_filters, + ) + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_list', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={ + 'statuses': [str(status) for status in statuses] if statuses else None, + 'limit': limit, + }, + ) + return handler.ActionResponse.success( + data={ + 'items': items, + 'next_cursor': str(next_cursor) if next_cursor else None, + 'prev_cursor': None, + 'has_more': has_more, + 'total_count': total_count, + } + ) + except Exception as e: + h.ap.logger.error(f'RUN_LIST error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run list error: {e}') + + @h.action(_plugin_runtime_action('RUNNER_LIST', 'runner_list')) + async def runner_list(data: dict[str, Any]) -> handler.ActionResponse: + """List Host-discovered AgentRunner descriptors.""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin: + return handler.ActionResponse.error(message='Runner list access not authorized') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runner list', + api_capability='runner_list', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + include_plugins = data.get('include_plugins') + if include_plugins is not None and not isinstance(include_plugins, list): + return handler.ActionResponse.error(message='include_plugins must be a list') + + registry = getattr(h.ap, 'agent_runner_registry', None) + if registry is None: + return handler.ActionResponse.success(data={'items': []}) + + try: + runners = await registry.list_runners( + bound_plugins=[str(item) for item in include_plugins] if include_plugins else None, + use_cache=bool(data.get('use_cache', True)), + ) + items = [_project_runner_descriptor_for_api(item) for item in runners] + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + None, + action='runner_list', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={ + 'include_plugins': [str(item) for item in include_plugins] if include_plugins else None, + 'count': len(items), + }, + ) + return handler.ActionResponse.success(data={'items': items}) + except Exception as e: + h.ap.logger.error(f'RUNNER_LIST error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runner list error: {e}') + + @h.action(_plugin_runtime_action('RUN_EVENTS_PAGE', 'run_events_page')) + async def run_events_page(data: dict[str, Any]) -> handler.ActionResponse: + """Page result events for one Host-owned run visible to current run.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') or run_id + before_cursor = data.get('before_cursor') + after_cursor = data.get('after_cursor') + limit = data.get('limit', 50) + direction = data.get('direction', 'forward') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run events page', + api_capability='run_events_page', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + try: + before_sequence = int(before_cursor) if before_cursor else None + after_sequence = int(after_cursor) if after_cursor else None + except (TypeError, ValueError): + return handler.ActionResponse.error(message='run event cursors must be integer sequences') + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + run = await store.get_run(str(target_run_id)) + if not run: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, run) + if auth_error: + return auth_error + + items, next_cursor, prev_cursor, has_more = await store.page_run_events( + run_id=str(target_run_id), + before_sequence=before_sequence, + after_sequence=after_sequence, + limit=limit, + direction=str(direction or 'forward'), + ) + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_events_page', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={'target_run_id': str(target_run_id), 'limit': limit}, + ) + return handler.ActionResponse.success( + data={ + 'items': items, + 'next_cursor': str(next_cursor) if next_cursor else None, + 'prev_cursor': str(prev_cursor) if prev_cursor else None, + 'has_more': has_more, + } + ) + except Exception as e: + h.ap.logger.error(f'RUN_EVENTS_PAGE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run events page error: {e}') + + @h.action(_plugin_runtime_action('RUN_CANCEL', 'run_cancel')) + async def run_cancel(data: dict[str, Any]) -> handler.ActionResponse: + """Request cancellation for one Host-owned run visible to the current run.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') or run_id + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run cancel', + api_capability='run_cancel', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + run = await store.get_run(str(target_run_id)) + if not run: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, run) + if auth_error: + return auth_error + + updated = await store.request_cancel( + run_id=str(target_run_id), + status_reason=data.get('status_reason') or data.get('reason'), + ) + if not updated: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_cancel', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'status_reason': data.get('status_reason') or data.get('reason')}, + ) + return handler.ActionResponse.success(data=updated) + except Exception as e: + h.ap.logger.error(f'RUN_CANCEL error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run cancel error: {e}') + + @h.action(_plugin_runtime_action('RUN_APPEND_RESULT', 'run_append_result')) + async def run_append_result(data: dict[str, Any]) -> handler.ActionResponse: + """Append one result event for a Host-owned run visible to the current run.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') or run_id + caller_plugin_identity = data.get('caller_plugin_identity') + result = data.get('result') if isinstance(data.get('result'), dict) else {} + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + + try: + sequence = int(data.get('sequence') or result.get('sequence')) + except (TypeError, ValueError): + return handler.ActionResponse.error(message='sequence is required and must be an integer') + + event_type = data.get('event_type') or data.get('type') or result.get('type') + if not event_type: + return handler.ActionResponse.error(message='event_type is required') + + event_data = data.get('data') if isinstance(data.get('data'), dict) else result.get('data') + usage = data.get('usage') if isinstance(data.get('usage'), dict) else result.get('usage') + metadata = data.get('metadata') if isinstance(data.get('metadata'), dict) else None + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run append result', + api_capability='run_append_result', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + run = await store.get_run(str(target_run_id)) + if not run: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, run) + if auth_error: + return auth_error + if run.get('status') in TERMINAL_STATUSES: + return handler.ActionResponse.error( + message=f'Run append result is not allowed for terminal run {target_run_id}' + ) + claim_error = await _require_runtime_write_ownership( + store=store, + session=session, + run=run, + data=data, + api_name='Run append result', + ) + if claim_error: + return claim_error + + event_payload = event_data if isinstance(event_data, dict) else {} + payload_error = _validate_ledger_only_result_payload( + ap=h.ap, + runner_id=run.get('runner_id'), + event_type=str(event_type), + data=event_payload, + ) + if payload_error: + return handler.ActionResponse.error(message=payload_error) + + event = await store.append_event( + run_id=str(target_run_id), + sequence=sequence, + event_type=str(event_type), + data=event_payload, + usage=usage if isinstance(usage, dict) else None, + source=str(data.get('source') or result.get('source') or 'runner'), + metadata=metadata, + ) + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_append_result', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'event_type': str(event_type), 'sequence': sequence}, + ) + return handler.ActionResponse.success(data=event) + except Exception as e: + h.ap.logger.error(f'RUN_APPEND_RESULT error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run append result error: {e}') + + @h.action(_plugin_runtime_action('RUN_FINALIZE', 'run_finalize')) + async def run_finalize(data: dict[str, Any]) -> handler.ActionResponse: + """Finalize one Host-owned run visible to the current run.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') or run_id + caller_plugin_identity = data.get('caller_plugin_identity') + status = data.get('status') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + if not status: + return handler.ActionResponse.error(message='status is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run finalize', + api_capability='run_finalize', + allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + run = await store.get_run(str(target_run_id)) + if not run: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, run) + if auth_error: + return auth_error + claim_error = await _require_runtime_write_ownership( + store=store, + session=session, + run=run, + data=data, + api_name='Run finalize', + ) + if claim_error: + return claim_error + + updated = await store.finalize_run( + run_id=str(target_run_id), + status=str(status), + status_reason=data.get('status_reason') or data.get('reason'), + usage=data.get('usage') if isinstance(data.get('usage'), dict) else None, + cost=data.get('cost') if isinstance(data.get('cost'), dict) else None, + metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else None, + ) + if not updated: + return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_finalize', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'status': str(status)}, + ) + return handler.ActionResponse.success(data=updated) + except Exception as e: + h.ap.logger.error(f'RUN_FINALIZE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run finalize error: {e}') + + @h.action(_plugin_runtime_action('RUNTIME_REGISTER', 'runtime_register')) + async def runtime_register(data: dict[str, Any]) -> handler.ActionResponse: + """Register or update one Host-owned runtime registry record.""" + run_id = data.get('run_id') + runtime_id = data.get('runtime_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not runtime_id: + return handler.ActionResponse.error(message='runtime_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runtime register', + api_capability='runtime_register', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + runtime = await store.register_runtime( + runtime_id=str(runtime_id), + status=str(data.get('status') or 'online'), + display_name=data.get('display_name'), + endpoint=data.get('endpoint'), + version=data.get('version'), + capabilities=data.get('capabilities') if isinstance(data.get('capabilities'), dict) else {}, + labels=data.get('labels') if isinstance(data.get('labels'), dict) else {}, + metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else {}, + heartbeat_deadline_seconds=_deadline_seconds_from_payload(data), + ) + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='runtime_register', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + target_runtime_id=str(runtime_id), + detail={'status': runtime.get('status')}, + ) + return handler.ActionResponse.success(data=runtime) + except Exception as e: + h.ap.logger.error(f'RUNTIME_REGISTER error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runtime register error: {e}') + + @h.action(_plugin_runtime_action('RUNTIME_HEARTBEAT', 'runtime_heartbeat')) + async def runtime_heartbeat(data: dict[str, Any]) -> handler.ActionResponse: + """Refresh one Host-owned runtime heartbeat.""" + run_id = data.get('run_id') + runtime_id = data.get('runtime_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not runtime_id: + return handler.ActionResponse.error(message='runtime_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runtime heartbeat', + api_capability='runtime_heartbeat', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + runtime = await store.heartbeat_runtime( + runtime_id=str(runtime_id), + status=str(data.get('status') or 'online'), + capabilities=data.get('capabilities') if isinstance(data.get('capabilities'), dict) else None, + labels=data.get('labels') if isinstance(data.get('labels'), dict) else None, + metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else None, + heartbeat_deadline_seconds=_deadline_seconds_from_payload(data), + ) + if runtime is None: + return handler.ActionResponse.error(message=f'Runtime {runtime_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='runtime_heartbeat', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + target_runtime_id=str(runtime_id), + detail={'status': runtime.get('status')}, + ) + return handler.ActionResponse.success(data=runtime) + except Exception as e: + h.ap.logger.error(f'RUNTIME_HEARTBEAT error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runtime heartbeat error: {e}') + + @h.action(_plugin_runtime_action('RUNTIME_LIST', 'runtime_list')) + async def runtime_list(data: dict[str, Any]) -> handler.ActionResponse: + """List Host-owned runtime registry records.""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + + _session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runtime list', + api_capability='runtime_list', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + statuses = data.get('statuses') + if statuses is not None and not isinstance(statuses, list): + return handler.ActionResponse.error(message='statuses must be a list') + labels = data.get('labels') if isinstance(data.get('labels'), dict) else {} + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + runtimes, total_count = await store.list_runtimes( + statuses=[str(status) for status in statuses] if statuses else None, + labels=labels, + limit=data.get('limit', 50), + ) + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='runtime_list', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + detail={ + 'statuses': [str(status) for status in statuses] if statuses else None, + 'limit': data.get('limit', 50), + }, + ) + return handler.ActionResponse.success( + data={ + 'items': runtimes, + 'next_cursor': None, + 'prev_cursor': None, + 'has_more': False, + 'total_count': total_count, + } + ) + except Exception as e: + h.ap.logger.error(f'RUNTIME_LIST error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runtime list error: {e}') + + @h.action(_plugin_runtime_action('RUNTIME_RECONCILE', 'runtime_reconcile')) + async def runtime_reconcile(data: dict[str, Any]) -> handler.ActionResponse: + """Reconcile stale runtime heartbeats and expired claim leases.""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin: + return handler.ActionResponse.error(message='Runtime reconcile access not authorized') + + _session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runtime reconcile', + api_capability='runtime_reconcile', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + stale_after_seconds = data.get('stale_after_seconds') + if stale_after_seconds is not None: + try: + stale_after_seconds = max(float(stale_after_seconds), 0) + except (TypeError, ValueError): + return handler.ActionResponse.error(message='stale_after_seconds must be a number') + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + stale_runtimes = await store.mark_stale_runtimes( + stale_after_seconds=stale_after_seconds, + ) + released_claims = await store.release_expired_claims() + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='runtime_reconcile', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + detail={ + 'stale_count': len(stale_runtimes), + 'released_claim_count': len(released_claims), + }, + ) + return handler.ActionResponse.success( + data={ + 'stale_runtimes': stale_runtimes, + 'released_claims': released_claims, + 'stale_count': len(stale_runtimes), + 'released_claim_count': len(released_claims), + } + ) + except Exception as e: + h.ap.logger.error(f'RUNTIME_RECONCILE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runtime reconcile error: {e}') + + @h.action(_plugin_runtime_action('RUN_STATS', 'run_stats')) + async def run_stats(data: dict[str, Any]) -> handler.ActionResponse: + """Get run statistics within a time window (admin-only).""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin: + return handler.ActionResponse.error(message='Run stats access not authorized') + + _session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run stats', + api_capability='run_stats', + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + end_time = data.get('end_time') or int(time.time()) + start_time = data.get('start_time') or (end_time - 3600) # Default: 1 hour + runner_id = data.get('runner_id') + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + stats = await store.get_run_stats( + start_time=start_time, + end_time=end_time, + runner_id=runner_id, + ) + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_stats', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={ + 'start_time': start_time, + 'end_time': end_time, + 'runner_id': runner_id, + }, + ) + return handler.ActionResponse.success(data=stats) + except Exception as e: + h.ap.logger.error(f'RUN_STATS error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run stats error: {e}') + + @h.action(_plugin_runtime_action('RUNTIME_STATS', 'runtime_stats')) + async def runtime_stats(data: dict[str, Any]) -> handler.ActionResponse: + """Get runtime registry statistics (admin-only).""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin: + return handler.ActionResponse.error(message='Runtime stats access not authorized') + + _session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runtime stats', + api_capability='runtime_stats', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + stats = await store.get_runtime_stats() + await _record_agent_runner_admin_action( + h.ap, + store, + action='runtime_stats', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + detail={}, + ) + return handler.ActionResponse.success(data=stats) + except Exception as e: + h.ap.logger.error(f'RUNTIME_STATS error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runtime stats error: {e}') + + @h.action(_plugin_runtime_action('RUNNER_STATS', 'runner_stats')) + async def runner_stats(data: dict[str, Any]) -> handler.ActionResponse: + """Get runner-aggregated statistics (admin-only).""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) + + if not is_admin: + return handler.ActionResponse.error(message='Runner stats access not authorized') + + _session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Runner stats', + api_capability='runner_stats', + admin_permission=AGENT_RUN_ADMIN_PERMISSION, + ) + if error: + return error + + end_time = data.get('end_time') or int(time.time()) + start_time = data.get('start_time') or (end_time - 3600) # Default: 1 hour + limit = min(int(data.get('limit', 50)), 100) + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + stats = await store.get_runner_stats( + start_time=start_time, + end_time=end_time, + limit=limit, + ) + await _record_agent_runner_admin_action( + h.ap, + store, + action='runner_stats', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={ + 'start_time': start_time, + 'end_time': end_time, + 'limit': limit, + }, + ) + return handler.ActionResponse.success(data={'items': stats, 'total_count': len(stats), 'has_more': False}) + except Exception as e: + h.ap.logger.error(f'RUNNER_STATS error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Runner stats error: {e}') + + @h.action(_plugin_runtime_action('RUN_CLAIM', 'run_claim')) + async def run_claim(data: dict[str, Any]) -> handler.ActionResponse: + """Claim one queued run for a runtime lease.""" + run_id = data.get('run_id') + runtime_id = data.get('runtime_id') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not runtime_id: + return handler.ActionResponse.error(message='runtime_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run claim', + api_capability='run_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + runner_ids = data.get('runner_ids') + if runner_ids is not None and not isinstance(runner_ids, list): + return handler.ActionResponse.error(message='runner_ids must be a list') + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + scope_filters: dict[str, Any] = {} + if not is_admin: + authorization = _get_run_authorization(session) + session_runner_id = session.get('runner_id') or authorization.get('runner_id') + if not session_runner_id: + return handler.ActionResponse.error(message='Run claim is not available without a runner_id') + if runner_ids and any(str(item) != session_runner_id for item in runner_ids): + return handler.ActionResponse.error(message='Run claim runner_ids are not accessible by this run') + runner_ids = [session_runner_id] + scope_filters = { + 'conversation_id': authorization.get('conversation_id'), + **_run_scope_filters(session), + } + run = await store.claim_next_run( + runtime_id=str(runtime_id), + queue_name=data.get('queue_name'), + lease_seconds=data.get('lease_seconds', 60), + runner_ids=[str(item) for item in runner_ids] if runner_ids else None, + **scope_filters, + ) + if run is None: + return handler.ActionResponse.error(message='No queued run available') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_claim', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + durable_run_id=str(run.get('run_id')), + target_runtime_id=str(runtime_id), + detail={ + 'queue_name': data.get('queue_name'), + 'runner_ids': [str(item) for item in runner_ids] if runner_ids else None, + }, + ) + return handler.ActionResponse.success(data=run) + except Exception as e: + h.ap.logger.error(f'RUN_CLAIM error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run claim error: {e}') + + @h.action(_plugin_runtime_action('RUN_RENEW_CLAIM', 'run_renew_claim')) + async def run_renew_claim(data: dict[str, Any]) -> handler.ActionResponse: + """Renew one run claim lease.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') + runtime_id = data.get('runtime_id') + claim_token = data.get('claim_token') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + if not runtime_id: + return handler.ActionResponse.error(message='runtime_id is required') + if not claim_token: + return handler.ActionResponse.error(message='claim_token is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run renew claim', + api_capability='run_renew_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + current = await store.get_run(str(target_run_id)) + if not current or current.get('claimed_by_runtime_id') != runtime_id: + return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, current) + if auth_error: + return auth_error + run = await store.renew_claim( + run_id=str(target_run_id), + claim_token=str(claim_token), + runtime_id=str(runtime_id), + lease_seconds=data.get('lease_seconds', 60), + ) + if run is None: + return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_renew_claim', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + target_runtime_id=str(runtime_id), + detail={'lease_seconds': data.get('lease_seconds', 60)}, + ) + return handler.ActionResponse.success(data=run) + except Exception as e: + h.ap.logger.error(f'RUN_RENEW_CLAIM error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run renew claim error: {e}') + + @h.action(_plugin_runtime_action('RUN_RELEASE_CLAIM', 'run_release_claim')) + async def run_release_claim(data: dict[str, Any]) -> handler.ActionResponse: + """Release one run claim lease.""" + run_id = data.get('run_id') + target_run_id = data.get('target_run_id') + runtime_id = data.get('runtime_id') + claim_token = data.get('claim_token') + caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + h.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) + + if not is_admin and not run_id: + return handler.ActionResponse.error(message='run_id is required') + if not target_run_id: + return handler.ActionResponse.error(message='target_run_id is required') + if not runtime_id: + return handler.ActionResponse.error(message='runtime_id is required') + if not claim_token: + return handler.ActionResponse.error(message='claim_token is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Run release claim', + api_capability='run_release_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, + ) + if error: + return error + + from ..agent.runner.run_ledger_store import RunLedgerStore + + store = RunLedgerStore(h.ap.persistence_mgr.get_db_engine()) + + try: + current = await store.get_run(str(target_run_id)) + if not current or current.get('claimed_by_runtime_id') != runtime_id: + return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') + if not is_admin: + auth_error = _authorize_target_run(session, current) + if auth_error: + return auth_error + release_status = str(data.get('status') or 'queued') + if release_status in TERMINAL_STATUSES: + return handler.ActionResponse.error( + message='Run release claim cannot finalize a run; use run_finalize' + ) + run = await store.release_claim( + run_id=str(target_run_id), + claim_token=str(claim_token), + runtime_id=str(runtime_id), + status=str(data.get('status') or 'queued'), + status_reason=data.get('status_reason') or data.get('reason'), + ) + if run is None: + return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + h.ap, + store, + action='run_release_claim', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + target_runtime_id=str(runtime_id), + detail={ + 'status': str(data.get('status') or 'queued'), + 'status_reason': data.get('status_reason') or data.get('reason'), + }, + ) + return handler.ActionResponse.success(data=run) + except Exception as e: + h.ap.logger.error(f'RUN_RELEASE_CLAIM error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'Run release claim error: {e}') diff --git a/src/langbot/pkg/plugin/agent_state_actions.py b/src/langbot/pkg/plugin/agent_state_actions.py new file mode 100644 index 000000000..2479ad426 --- /dev/null +++ b/src/langbot/pkg/plugin/agent_state_actions.py @@ -0,0 +1,316 @@ +"""Agent-runner steering / state actions.""" + +from __future__ import annotations + +from typing import Any + + +from langbot_plugin.runtime.io import handler +from langbot_plugin.entities.io.actions.enums import ( + PluginToRuntimeAction, +) + + +from ..agent.runner.session_registry import get_session_registry + +from .agent_run_support import ( + _resolve_state_scope, + _validate_agent_run_session, +) + + +def register(h): + @h.action(PluginToRuntimeAction.STEERING_PULL) + async def steering_pull(data: dict[str, Any]) -> handler.ActionResponse: + """Pull pending steering/follow-up inputs for the current run.""" + run_id = data.get('run_id') + mode = data.get('mode', 'all') + limit = data.get('limit') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if limit is not None: + try: + limit = int(limit) + except (TypeError, ValueError): + return handler.ActionResponse.error(message='limit must be an integer') + if limit <= 0: + return handler.ActionResponse.error(message='limit must be > 0') + limit = min(limit, 100) + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'Steering pull', + api_capability='steering_pull', + ) + if error: + return error + + session_registry = get_session_registry() + items = await session_registry.pull_steering( + run_id, + mode=str(mode or 'all'), + limit=limit, + ) + if items: + try: + from ..agent.runner.event_log_store import EventLogStore + + store = EventLogStore(h.ap.persistence_mgr.get_db_engine()) + for item in items: + event = item.get('event') if isinstance(item, dict) else None + conversation = item.get('conversation') if isinstance(item, dict) else None + actor = item.get('actor') if isinstance(item, dict) else None + subject = item.get('subject') if isinstance(item, dict) else None + if not isinstance(event, dict): + continue + await store.append_event( + event_id=None, + event_type='steering.injected', + source='agent_runner', + bot_id=conversation.get('bot_id') if isinstance(conversation, dict) else None, + workspace_id=conversation.get('workspace_id') if isinstance(conversation, dict) else None, + conversation_id=conversation.get('conversation_id') if isinstance(conversation, dict) else None, + thread_id=conversation.get('thread_id') if isinstance(conversation, dict) else None, + actor_type=actor.get('actor_type') if isinstance(actor, dict) else None, + actor_id=actor.get('actor_id') if isinstance(actor, dict) else None, + actor_name=actor.get('actor_name') if isinstance(actor, dict) else None, + subject_type=subject.get('subject_type') if isinstance(subject, dict) else None, + subject_id=subject.get('subject_id') if isinstance(subject, dict) else None, + input_summary=f'steering injected from {event.get("event_id")}', + run_id=run_id, + runner_id=session.get('runner_id') if isinstance(session, dict) else None, + metadata={ + 'steering': { + 'status': 'injected', + 'source_event_id': event.get('event_id'), + 'claimed_by_run_id': item.get('claimed_run_id') if isinstance(item, dict) else run_id, + 'claimed_runner_id': item.get('runner_id') if isinstance(item, dict) else None, + 'claimed_at': item.get('claimed_at') if isinstance(item, dict) else None, + 'pull_mode': str(mode or 'all'), + }, + }, + ) + except Exception as exc: + h.ap.logger.warning( + f'Failed to write steering injection audit for run {run_id}: {exc}', + exc_info=True, + ) + return handler.ActionResponse.success(data={'items': items}) + + # ================= State APIs (run-scoped, policy-enforced) ================= + + @h.action(PluginToRuntimeAction.STATE_GET) + async def state_get(data: dict[str, Any]) -> handler.ActionResponse: + """Get a state value from host-owned state store. + + Requires run_id authorization and scope enabled by state_policy. + """ + run_id = data.get('run_id') + scope = data.get('scope') + key = data.get('key') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not scope: + return handler.ActionResponse.error(message='scope is required') + + if not key: + return handler.ActionResponse.error(message='key is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'State get', + api_capability='state', + ) + if error: + return error + + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error + + # Get state from persistent store + from ..agent.runner.persistent_state_store import get_persistent_state_store + + store = get_persistent_state_store(h.ap.persistence_mgr.get_db_engine()) + + try: + value = await store.state_get(scope_key, key) + return handler.ActionResponse.success(data={'value': value}) + except Exception as e: + h.ap.logger.error(f'STATE_GET error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'State get error: {e}') + + @h.action(PluginToRuntimeAction.STATE_SET) + async def state_set(data: dict[str, Any]) -> handler.ActionResponse: + """Set a state value in host-owned state store. + + Requires run_id authorization and scope enabled by state_policy. + Value must be JSON-serializable and size-limited. + """ + run_id = data.get('run_id') + scope = data.get('scope') + key = data.get('key') + value = data.get('value') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not scope: + return handler.ActionResponse.error(message='scope is required') + + if not key: + return handler.ActionResponse.error(message='key is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'State set', + api_capability='state', + ) + if error: + return error + + state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error + + # Get additional context for DB insert + runner_id = session.get('runner_id', '') + binding_identity = state_context.get('binding_identity', 'unknown') + + # Set state in persistent store + from ..agent.runner.persistent_state_store import get_persistent_state_store + + store = get_persistent_state_store(h.ap.persistence_mgr.get_db_engine()) + + try: + success, error = await store.state_set( + scope_key=scope_key, + state_key=key, + value=value, + runner_id=runner_id, + binding_identity=binding_identity, + scope=scope, + context=state_context, + logger=h.ap.logger, + ) + + if not success: + return handler.ActionResponse.error(message=error or 'Failed to set state') + + return handler.ActionResponse.success(data={'success': True}) + except Exception as e: + h.ap.logger.error(f'STATE_SET error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'State set error: {e}') + + @h.action(PluginToRuntimeAction.STATE_DELETE) + async def state_delete(data: dict[str, Any]) -> handler.ActionResponse: + """Delete a state value from host-owned state store. + + Requires run_id authorization and scope enabled by state_policy. + """ + run_id = data.get('run_id') + scope = data.get('scope') + key = data.get('key') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not scope: + return handler.ActionResponse.error(message='scope is required') + + if not key: + return handler.ActionResponse.error(message='key is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'State delete', + api_capability='state', + ) + if error: + return error + + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error + + # Delete state from persistent store + from ..agent.runner.persistent_state_store import get_persistent_state_store + + store = get_persistent_state_store(h.ap.persistence_mgr.get_db_engine()) + + try: + deleted = await store.state_delete(scope_key, key) + return handler.ActionResponse.success(data={'success': deleted}) + except Exception as e: + h.ap.logger.error(f'STATE_DELETE error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'State delete error: {e}') + + @h.action(PluginToRuntimeAction.STATE_LIST) + async def state_list(data: dict[str, Any]) -> handler.ActionResponse: + """List state keys in a scope. + + Requires run_id authorization and scope enabled by state_policy. + """ + run_id = data.get('run_id') + scope = data.get('scope') + prefix = data.get('prefix') + limit = data.get('limit', 100) + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + if not scope: + return handler.ActionResponse.error(message='scope is required') + + # Validate limit + if not isinstance(limit, int) or limit <= 0: + limit = 100 + limit = min(limit, 100) # Cap at 100 + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + h.ap, + 'State list', + api_capability='state', + ) + if error: + return error + + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error + + # List state keys from persistent store + from ..agent.runner.persistent_state_store import get_persistent_state_store + + store = get_persistent_state_store(h.ap.persistence_mgr.get_db_engine()) + + try: + keys, has_more = await store.state_list(scope_key, prefix, limit) + return handler.ActionResponse.success( + data={ + 'keys': keys, + 'has_more': has_more, + } + ) + except Exception as e: + h.ap.logger.error(f'STATE_LIST error: {e}', exc_info=True) + return handler.ActionResponse.error(message=f'State list error: {e}') diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 8c7c55bbe..6fd859386 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -3,8 +3,6 @@ from __future__ import annotations import typing from typing import Any, Union import base64 -import json -import time import traceback import sqlalchemy @@ -30,109 +28,12 @@ from ..utils import constants from ..agent.runner.session_registry import get_session_registry from ..agent.runner.config_migration import ConfigMigration from ..agent.runner import config_schema -from ..agent.runner.result_normalizer import MAX_RESULT_SIZE_BYTES, STRICT_RESULT_PAYLOADS -from ..agent.runner.run_ledger_store import TERMINAL_STATUSES -class _RuntimeActionName: - def __init__(self, value: str): - self.value = value - - -AGENT_RUN_ADMIN_PERMISSION = 'agent_run:admin' -RUNTIME_ADMIN_PERMISSION = 'runtime:admin' -AGENT_RUNNER_ADMIN_PERMISSION = 'agent_runner:admin' -LEDGER_ONLY_SIDE_EFFECTING_RESULT_TYPES = { - 'message.delta', - 'message.completed', - 'state.updated', - 'run.completed', - 'run.failed', -} - - -def _plugin_runtime_action(name: str, value: str) -> Any: - return getattr(PluginToRuntimeAction, name, _RuntimeActionName(value)) - - -def _normalize_permission_set(value: Any) -> set[str]: - if isinstance(value, str): - return {permission.strip() for permission in value.split(',') if permission.strip()} - if isinstance(value, list): - return {str(item).strip() for item in value if str(item).strip()} - if isinstance(value, dict): - return {str(item).strip() for item, enabled in value.items() if enabled and str(item).strip()} - return set() - - -def _iter_agent_runner_admin_plugin_configs(ap: app.Application) -> list[dict[str, Any]]: - instance_config = getattr(ap, 'instance_config', None) - config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {} - if not isinstance(config_data, dict): - return [] - agent_runner_config = config_data.get('agent_runner', {}) - if not isinstance(agent_runner_config, dict): - return [] - raw_admin_plugins = agent_runner_config.get('admin_plugins', []) - if isinstance(raw_admin_plugins, dict): - items: list[dict[str, Any]] = [] - for identity, entry in raw_admin_plugins.items(): - if isinstance(entry, dict): - merged = dict(entry) - merged.setdefault('identity', identity) - items.append(merged) - else: - items.append({'identity': identity, 'permissions': entry}) - return items - if isinstance(raw_admin_plugins, list): - return [item for item in raw_admin_plugins if isinstance(item, dict)] - return [] - - -def _agent_runner_admin_permissions(ap: app.Application, plugin_identity: str | None) -> set[str]: - if not isinstance(plugin_identity, str) or not plugin_identity.strip(): - return set() - normalized_identity = plugin_identity.strip() - permissions: set[str] = set() - for entry in _iter_agent_runner_admin_plugin_configs(ap): - if entry.get('enabled', True) is False: - continue - identity = entry.get('identity') or entry.get('plugin_identity') or entry.get('plugin') or entry.get('id') - if identity != normalized_identity: - continue - permissions.update(_normalize_permission_set(entry.get('permissions'))) - permissions.update(_normalize_permission_set(entry.get('scopes'))) - return permissions - - -def _has_agent_runner_admin_permission( - ap: app.Application, - plugin_identity: str | None, - permission: str, -) -> bool: - permissions = _agent_runner_admin_permissions(ap, plugin_identity) - if not permissions: - return False - domain = permission.split(':', 1)[0] - return bool( - permission in permissions - or f'{domain}:*' in permissions - or AGENT_RUNNER_ADMIN_PERMISSION in permissions - or '*' in permissions - ) - - -def _deadline_seconds_from_payload(data: dict[str, Any], default: int = 60) -> int: - deadline_at = data.get('heartbeat_deadline_at') - if deadline_at is not None: - try: - return max(int(float(deadline_at) - time.time()), 1) - except (TypeError, ValueError): - pass - try: - return max(int(data.get('heartbeat_ttl_seconds') or default), 1) - except (TypeError, ValueError): - return default +from . import agent_pull_actions, agent_runner_actions, agent_state_actions +from .agent_run_support import ( + _validate_agent_run_session, +) def _make_rag_error_response(error: Exception, error_type: str, **extra_context) -> handler.ActionResponse: @@ -221,370 +122,6 @@ def _build_tool_detail(tool: Any, requested_tool_name: str | None = None) -> dic } -def _get_run_authorization(session: dict[str, Any]) -> dict[str, Any]: - """Return the run-scoped authorization snapshot.""" - return session['authorization'] - - -def _run_matches_run_scope(session: dict[str, Any], run: dict[str, Any]) -> bool: - authorization = _get_run_authorization(session) - session_run_id = session.get('run_id') - if run.get('run_id') == session_run_id: - return True - session_runner_id = session.get('runner_id') or authorization.get('runner_id') - if not session_runner_id or run.get('runner_id') != session_runner_id: - return False - if not authorization.get('conversation_id'): - return False - if run.get('conversation_id') != authorization.get('conversation_id'): - return False - if authorization.get('bot_id') is not None and authorization.get('bot_id') != run.get('bot_id'): - return False - if authorization.get('workspace_id') is not None and authorization.get('workspace_id') != run.get('workspace_id'): - return False - if authorization.get('thread_id') != run.get('thread_id'): - return False - return True - - -def _authorize_target_run( - session: dict[str, Any], - run: dict[str, Any], -) -> handler.ActionResponse | None: - """Authorize non-admin target-run access against scope and runner owner.""" - if _run_matches_run_scope(session, run): - return None - return handler.ActionResponse.error(message=f'Run {run.get("run_id")} is not accessible by this run') - - -def _validate_ledger_only_result_payload( - *, - ap: app.Application, - runner_id: str | None, - event_type: str, - data: dict[str, Any], -) -> str | None: - """Validate result payloads that can be safely stored without side effects.""" - try: - result_json = json.dumps({'type': event_type, 'data': data}) - except (TypeError, ValueError) as exc: - return f'event data must be JSON serializable: {exc}' - if len(result_json) > MAX_RESULT_SIZE_BYTES: - return f'event payload exceeds {MAX_RESULT_SIZE_BYTES} bytes' - - payload_model = STRICT_RESULT_PAYLOADS.get(event_type) - if payload_model is None: - return f'unknown result type: {event_type}' - try: - payload_model.model_validate(data) - except Exception as exc: - return f'invalid {event_type} payload: {exc}' - - if event_type in LEDGER_ONLY_SIDE_EFFECTING_RESULT_TYPES: - if runner_id: - ap.logger.warning( - f'Runner {runner_id} attempted ledger-only append for side-effecting result type {event_type}' - ) - return f'{event_type} must be emitted through the canonical runner result path' - return None - - -async def _require_runtime_write_ownership( - *, - store: Any, - session: dict[str, Any], - run: dict[str, Any], - data: dict[str, Any], - api_name: str, -) -> handler.ActionResponse | None: - """Require current-run ownership or an active runtime claim for run writes.""" - if run.get('run_id') == session.get('run_id') and run.get('status') != 'claimed': - return None - - runtime_id = data.get('runtime_id') - claim_token = data.get('claim_token') - if not runtime_id or not claim_token: - return handler.ActionResponse.error( - message=f'{api_name} requires active claim ownership for target run {run.get("run_id")}' - ) - - if not await store.validate_active_claim( - run_id=str(run.get('run_id')), - runtime_id=str(runtime_id), - claim_token=str(claim_token), - ): - return handler.ActionResponse.error( - message=f'{api_name} claim ownership is not active for target run {run.get("run_id")}' - ) - - return None - - -def _resolve_state_scope( - session: dict[str, Any], - scope: str, -) -> tuple[dict[str, Any] | None, str | None, handler.ActionResponse | None]: - """Resolve state policy/context for an authorized run scope.""" - authorization = _get_run_authorization(session) - state_policy = authorization['state_policy'] - - if not state_policy.get('enable_state', True): - return None, None, handler.ActionResponse.error(message='State access is disabled by binding policy') - - state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) - if scope not in state_scopes: - return None, None, handler.ActionResponse.error(message=f'Scope "{scope}" is not enabled by binding policy') - - state_context = authorization['state_context'] - scope_key = state_context.get('scope_keys', {}).get(scope) - if not scope_key: - return None, None, handler.ActionResponse.error(message=f'Scope key not available for scope "{scope}"') - - return state_context, scope_key, None - - -async def _validate_agent_run_session( - run_id: str, - caller_plugin_identity: str | None, - ap: app.Application, - api_name: str, - api_capability: str | None = None, - allow_persistent_authorization: bool = False, - admin_permission: str | None = None, -) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: - """Validate an AgentRunner pull API run session and run-scoped API access.""" - if not run_id and admin_permission and _has_agent_runner_admin_permission( - ap, - caller_plugin_identity, - admin_permission, - ): - return { - 'run_id': run_id, - 'runner_id': None, - 'query_id': None, - 'plugin_identity': caller_plugin_identity, - 'authorization': {}, - 'status': {}, - 'steering_queue': [], - }, None - - session_registry = get_session_registry() - session = await session_registry.get(run_id) - if not session: - if allow_persistent_authorization: - session = await _load_persistent_agent_run_session(run_id, ap, api_name) - if not session: - return None, handler.ActionResponse.error(message=f'Run session {run_id} not found or expired') - - session_plugin_identity = session.get('plugin_identity') - if not isinstance(session_plugin_identity, str) or not session_plugin_identity.strip(): - ap.logger.warning(f'{api_name}: run_id {run_id} has no plugin_identity') - return None, handler.ActionResponse.error(message=f'Run session {run_id} has no plugin_identity') - if not caller_plugin_identity: - return None, handler.ActionResponse.error(message=f'caller_plugin_identity is required for run_id {run_id}') - if caller_plugin_identity != session_plugin_identity: - ap.logger.warning( - f'{api_name}: caller_plugin_identity {caller_plugin_identity} ' - f'does not match session plugin_identity {session_plugin_identity}' - ) - return None, handler.ActionResponse.error(message=f'Plugin identity mismatch for run_id {run_id}') - - if api_capability: - available_apis = _get_run_authorization(session).get('available_apis', {}) - has_admin_permission = bool(admin_permission) and _has_agent_runner_admin_permission( - ap, - caller_plugin_identity, - admin_permission, - ) - if not available_apis.get(api_capability, False) and not has_admin_permission: - return None, handler.ActionResponse.error(message=f'{api_name} access not authorized') - - return session, None - - -async def _load_persistent_agent_run_session( - run_id: str, - ap: app.Application, - api_name: str, -) -> dict[str, Any] | None: - """Load an expired run session from the AgentRun authorization snapshot.""" - try: - from sqlalchemy.ext.asyncio import AsyncSession - from sqlalchemy.orm import sessionmaker - - from ..entity.persistence.agent_run import AgentRun - - engine = ap.persistence_mgr.get_db_engine() - session_factory = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) - async with session_factory() as db_session: - result = await db_session.execute(sqlalchemy.select(AgentRun).where(AgentRun.run_id == run_id)) - run = result.scalars().first() - except Exception as e: - ap.logger.error(f'{api_name}: failed to load persistent authorization for run_id {run_id}: {e}', exc_info=True) - return None - - if run is None: - return None - - try: - authorization = json.loads(run.authorization_json) if run.authorization_json else {} - except (TypeError, ValueError) as e: - ap.logger.warning(f'{api_name}: run_id {run_id} has invalid authorization_json: {e}') - return None - - if not isinstance(authorization, dict): - ap.logger.warning(f'{api_name}: run_id {run_id} authorization_json is not an object') - return None - - return { - 'run_id': run.run_id, - 'runner_id': authorization.get('runner_id') or run.runner_id, - 'query_id': None, - 'plugin_identity': authorization.get('plugin_identity'), - 'authorization': authorization, - 'status': {}, - 'steering_queue': [], - } - - -def _resolve_run_conversation( - session: dict[str, Any], - requested_conversation_id: str | None, - api_name: str, -) -> tuple[str | None, handler.ActionResponse | None]: - """Resolve and enforce current-run conversation scope.""" - session_conversation_id = _get_run_authorization(session).get('conversation_id') - - if requested_conversation_id: - if not session_conversation_id: - return None, handler.ActionResponse.error(message=f'{api_name} is not available without a run conversation') - if requested_conversation_id != session_conversation_id: - return None, handler.ActionResponse.error( - message=f'Conversation {requested_conversation_id} is not accessible by this run' - ) - return requested_conversation_id, None - - return session_conversation_id, None - - -def _run_scope_filters(session: dict[str, Any]) -> dict[str, Any]: - authorization = _get_run_authorization(session) - return { - 'bot_id': authorization.get('bot_id'), - 'workspace_id': authorization.get('workspace_id'), - 'thread_id': authorization.get('thread_id'), - 'strict_thread': True, - } - - -def _run_ledger_scope_filters(session: dict[str, Any]) -> dict[str, Any]: - authorization = _get_run_authorization(session) - filters = _run_scope_filters(session) - filters['runner_id'] = session.get('runner_id') or authorization.get('runner_id') - return filters - - -def _event_matches_run_scope(session: dict[str, Any], event: dict[str, Any]) -> bool: - authorization = _get_run_authorization(session) - if authorization.get('conversation_id') != event.get('conversation_id'): - return False - if authorization.get('bot_id') is not None and authorization.get('bot_id') != event.get('bot_id'): - return False - if authorization.get('workspace_id') is not None and authorization.get('workspace_id') != event.get('workspace_id'): - return False - if authorization.get('thread_id') != event.get('thread_id'): - return False - return True - - -def _project_event_record_for_api(event: dict[str, Any]) -> dict[str, Any]: - """Project EventLogStore rows onto the SDK AgentEventRecord DTO.""" - seq = event.get('seq') or event.get('id') - return { - 'event_id': event.get('event_id'), - 'event_type': event.get('event_type'), - 'event_time': event.get('event_time'), - 'source': event.get('source'), - 'bot_id': event.get('bot_id'), - 'workspace_id': event.get('workspace_id'), - 'conversation_id': event.get('conversation_id'), - 'thread_id': event.get('thread_id'), - 'actor_type': event.get('actor_type'), - 'actor_id': event.get('actor_id'), - 'actor_name': event.get('actor_name'), - 'subject_type': event.get('subject_type'), - 'subject_id': event.get('subject_id'), - 'input_summary': event.get('input_summary'), - 'input_ref': event.get('input_ref'), - 'raw_ref': event.get('raw_ref'), - 'seq': seq, - 'cursor': event.get('cursor') or (str(seq) if seq is not None else None), - 'created_at': event.get('created_at'), - 'metadata': event.get('metadata') or {}, - } - - -def _project_runner_descriptor_for_api(descriptor: Any) -> dict[str, Any]: - """Project an AgentRunnerDescriptor-like object onto a JSON dict.""" - if isinstance(descriptor, dict): - return dict(descriptor) - if hasattr(descriptor, 'model_dump'): - return descriptor.model_dump(mode='json') - return { - 'id': getattr(descriptor, 'id', None), - 'source': getattr(descriptor, 'source', None), - 'label': getattr(descriptor, 'label', {}), - 'description': getattr(descriptor, 'description', None), - 'plugin_author': getattr(descriptor, 'plugin_author', None), - 'plugin_name': getattr(descriptor, 'plugin_name', None), - 'runner_name': getattr(descriptor, 'runner_name', None), - 'plugin_version': getattr(descriptor, 'plugin_version', None), - 'config_schema': getattr(descriptor, 'config_schema', []), - 'capabilities': getattr(descriptor, 'capabilities', {}), - 'permissions': getattr(descriptor, 'permissions', {}), - 'raw_manifest': getattr(descriptor, 'raw_manifest', {}), - } - - -async def _record_agent_runner_admin_action( - ap: app.Application, - store: Any, - *, - action: str, - caller_plugin_identity: str | None, - permission: str, - durable_run_id: str | None = None, - target_runtime_id: str | None = None, - detail: dict[str, Any] | None = None, -) -> None: - """Record a small audit trail for privileged AgentRunner operations.""" - audit_data: dict[str, Any] = { - 'action': action, - 'caller_plugin_identity': caller_plugin_identity, - 'permission': permission, - } - if durable_run_id: - audit_data['target_run_id'] = durable_run_id - if target_runtime_id: - audit_data['target_runtime_id'] = target_runtime_id - if detail: - audit_data['detail'] = detail - - ap.logger.info('Agent runner admin action: %s', audit_data) - if not durable_run_id or store is None or not hasattr(store, 'append_audit_event'): - return - - try: - await store.append_audit_event( - run_id=str(durable_run_id), - event_type=f'admin.{action}', - data=audit_data, - metadata={'permission': permission}, - ) - except Exception as exc: - ap.logger.warning(f'Failed to record AgentRunner admin audit event: {exc}', exc_info=True) - - def _normalize_uuid_list(values: Any) -> list[str]: """Normalize a user/config supplied UUID list while preserving order.""" if not isinstance(values, list): @@ -1820,1741 +1357,9 @@ class RuntimeConnectionHandler(handler.Handler): } ) - @self.action(PluginToRuntimeAction.HISTORY_PAGE) - async def history_page(data: dict[str, Any]) -> handler.ActionResponse: - """Page through transcript history for a conversation. - - Requires run_id authorization. Only allows access to current run's conversation. - """ - run_id = data.get('run_id') - conversation_id = data.get('conversation_id') - before_cursor = data.get('before_cursor') - after_cursor = data.get('after_cursor') - limit = data.get('limit', 50) - direction = data.get('direction', 'backward') - include_attachments = data.get('include_attachments', False) - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'History page', - api_capability='history_page', - ) - if error: - return error - - conversation_id, scope_error = _resolve_run_conversation( - session, - conversation_id, - 'History page', - ) - if scope_error: - return scope_error - - if not conversation_id: - return handler.ActionResponse.success( - data={ - 'items': [], - 'next_cursor': None, - 'prev_cursor': None, - 'has_more': False, - } - ) - - # Parse cursors - before_seq = int(before_cursor) if before_cursor else None - after_seq = int(after_cursor) if after_cursor else None - - # Query transcript - from ..agent.runner.transcript_store import TranscriptStore - - store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) - - try: - items, next_seq, prev_seq, has_more = await store.page_transcript( - conversation_id=conversation_id, - before_seq=before_seq, - after_seq=after_seq, - limit=limit, - direction=direction, - include_attachments=include_attachments, - **_run_scope_filters(session), - ) - - return handler.ActionResponse.success( - data={ - 'items': items, - 'next_cursor': str(next_seq) if next_seq else None, - 'prev_cursor': str(prev_seq) if prev_seq else None, - 'has_more': has_more, - } - ) - except Exception as e: - self.ap.logger.error(f'HISTORY_PAGE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'History page error: {e}') - - @self.action(PluginToRuntimeAction.HISTORY_SEARCH) - async def history_search(data: dict[str, Any]) -> handler.ActionResponse: - """Search transcript history. - - Requires run_id authorization. Only searches current run's conversation. - Basic implementation using LIKE filtering. - """ - run_id = data.get('run_id') - query_text = data.get('query', '') - filters = data.get('filters') or {} - top_k = data.get('top_k', 10) - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'History search', - api_capability='history_search', - ) - if error: - return error - - requested_conversation_id = filters.get('conversation_id') - conversation_id, scope_error = _resolve_run_conversation( - session, - requested_conversation_id, - 'History search', - ) - if scope_error: - return scope_error - - if not conversation_id: - return handler.ActionResponse.success( - data={ - 'items': [], - 'total_count': 0, - 'query': query_text, - } - ) - - # Search transcript - from ..agent.runner.transcript_store import TranscriptStore - - store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) - - try: - safe_filters = {k: v for k, v in filters.items() if k != 'conversation_id'} - items = await store.search_transcript( - conversation_id=conversation_id, - query_text=query_text, - filters=safe_filters, - top_k=top_k, - **_run_scope_filters(session), - ) - - return handler.ActionResponse.success( - data={ - 'items': items, - 'total_count': len(items), - 'query': query_text, - } - ) - except Exception as e: - self.ap.logger.error(f'HISTORY_SEARCH error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'History search error: {e}') - - @self.action(PluginToRuntimeAction.EVENT_GET) - async def event_get(data: dict[str, Any]) -> handler.ActionResponse: - """Get a single event record by ID. - - Requires run_id authorization. Only allows access to events in current run's conversation. - """ - run_id = data.get('run_id') - event_id = data.get('event_id') - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if not event_id: - return handler.ActionResponse.error(message='event_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Event get', - api_capability='event_get', - ) - if error: - return error - - # Get event - from ..agent.runner.event_log_store import EventLogStore - - store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) - - try: - event = await store.get_event(event_id) - if not event: - return handler.ActionResponse.error(message=f'Event {event_id} not found') - - # Validate event is in the same conversation as the run, or was created by the same run. - session_conversation_id = _get_run_authorization(session).get('conversation_id') - event_run_id = event.get('run_id') - if event_run_id and event_run_id == run_id: - return handler.ActionResponse.success(data=_project_event_record_for_api(event)) - if not session_conversation_id or not _event_matches_run_scope(session, event): - return handler.ActionResponse.error(message=f'Event {event_id} is not accessible by this run') - - return handler.ActionResponse.success(data=_project_event_record_for_api(event)) - except Exception as e: - self.ap.logger.error(f'EVENT_GET error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Event get error: {e}') - - @self.action(PluginToRuntimeAction.EVENT_PAGE) - async def event_page(data: dict[str, Any]) -> handler.ActionResponse: - """Page through event records. - - Requires run_id authorization. Only allows access to current run's conversation. - """ - run_id = data.get('run_id') - conversation_id = data.get('conversation_id') - event_types = data.get('event_types') - before_cursor = data.get('before_cursor') - limit = data.get('limit', 50) - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Event page', - api_capability='event_page', - ) - if error: - return error - - conversation_id, scope_error = _resolve_run_conversation( - session, - conversation_id, - 'Event page', - ) - if scope_error: - return scope_error - - if not conversation_id: - return handler.ActionResponse.success( - data={ - 'items': [], - 'next_cursor': None, - 'prev_cursor': None, - 'has_more': False, - } - ) - - # Parse cursor - before_seq = int(before_cursor) if before_cursor else None - - # Query events - from ..agent.runner.event_log_store import EventLogStore - - store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) - - try: - items, next_seq, has_more = await store.page_events( - conversation_id=conversation_id, - event_types=event_types, - before_seq=before_seq, - limit=limit, - **_run_scope_filters(session), - ) - - return handler.ActionResponse.success( - data={ - 'items': [_project_event_record_for_api(item) for item in items], - 'next_cursor': str(next_seq) if next_seq else None, - 'prev_cursor': None, - 'has_more': has_more, - } - ) - except Exception as e: - self.ap.logger.error(f'EVENT_PAGE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Event page error: {e}') - - @self.action(_plugin_runtime_action('RUN_GET', 'run_get')) - async def run_get(data: dict[str, Any]) -> handler.ActionResponse: - """Get one Host-owned run record visible to the current run.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') or run_id - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run get', - api_capability='run_get', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - run = await store.get_run(str(target_run_id)) - if not run: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, run) - if auth_error: - return auth_error - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_get', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={'target_run_id': str(target_run_id)}, - ) - return handler.ActionResponse.success(data=run) - except Exception as e: - self.ap.logger.error(f'RUN_GET error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run get error: {e}') - - @self.action(_plugin_runtime_action('RUN_LIST', 'run_list')) - async def run_list(data: dict[str, Any]) -> handler.ActionResponse: - """List Host-owned runs visible to the current run conversation.""" - run_id = data.get('run_id') - conversation_id = data.get('conversation_id') - statuses = data.get('statuses') - before_cursor = data.get('before_cursor') - limit = data.get('limit', 50) - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - - scope_filters: dict[str, Any] = {} - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run list', - api_capability='run_list', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - if not is_admin: - conversation_id, scope_error = _resolve_run_conversation( - session, - conversation_id, - 'Run list', - ) - if scope_error: - return scope_error - scope_filters = _run_ledger_scope_filters(session) - - if not is_admin and not conversation_id: - return handler.ActionResponse.success( - data={ - 'items': [], - 'next_cursor': None, - 'prev_cursor': None, - 'has_more': False, - 'total_count': 0, - } - ) - - if statuses is not None and not isinstance(statuses, list): - return handler.ActionResponse.error(message='statuses must be a list') - try: - before_id = int(before_cursor) if before_cursor else None - except (TypeError, ValueError): - return handler.ActionResponse.error(message='before_cursor must be an integer cursor') - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - items, next_cursor, has_more, total_count = await store.list_runs( - conversation_id=conversation_id, - statuses=[str(status) for status in statuses] if statuses else None, - before_id=before_id, - limit=limit, - **scope_filters, - ) - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_list', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={ - 'statuses': [str(status) for status in statuses] if statuses else None, - 'limit': limit, - }, - ) - return handler.ActionResponse.success( - data={ - 'items': items, - 'next_cursor': str(next_cursor) if next_cursor else None, - 'prev_cursor': None, - 'has_more': has_more, - 'total_count': total_count, - } - ) - except Exception as e: - self.ap.logger.error(f'RUN_LIST error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run list error: {e}') - - @self.action(_plugin_runtime_action('RUNNER_LIST', 'runner_list')) - async def runner_list(data: dict[str, Any]) -> handler.ActionResponse: - """List Host-discovered AgentRunner descriptors.""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin: - return handler.ActionResponse.error(message='Runner list access not authorized') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runner list', - api_capability='runner_list', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - include_plugins = data.get('include_plugins') - if include_plugins is not None and not isinstance(include_plugins, list): - return handler.ActionResponse.error(message='include_plugins must be a list') - - registry = getattr(self.ap, 'agent_runner_registry', None) - if registry is None: - return handler.ActionResponse.success(data={'items': []}) - - try: - runners = await registry.list_runners( - bound_plugins=[str(item) for item in include_plugins] if include_plugins else None, - use_cache=bool(data.get('use_cache', True)), - ) - items = [_project_runner_descriptor_for_api(item) for item in runners] - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - None, - action='runner_list', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={ - 'include_plugins': [str(item) for item in include_plugins] - if include_plugins - else None, - 'count': len(items), - }, - ) - return handler.ActionResponse.success(data={'items': items}) - except Exception as e: - self.ap.logger.error(f'RUNNER_LIST error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runner list error: {e}') - - @self.action(_plugin_runtime_action('RUN_EVENTS_PAGE', 'run_events_page')) - async def run_events_page(data: dict[str, Any]) -> handler.ActionResponse: - """Page result events for one Host-owned run visible to current run.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') or run_id - before_cursor = data.get('before_cursor') - after_cursor = data.get('after_cursor') - limit = data.get('limit', 50) - direction = data.get('direction', 'forward') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run events page', - api_capability='run_events_page', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - try: - before_sequence = int(before_cursor) if before_cursor else None - after_sequence = int(after_cursor) if after_cursor else None - except (TypeError, ValueError): - return handler.ActionResponse.error(message='run event cursors must be integer sequences') - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - run = await store.get_run(str(target_run_id)) - if not run: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, run) - if auth_error: - return auth_error - - items, next_cursor, prev_cursor, has_more = await store.page_run_events( - run_id=str(target_run_id), - before_sequence=before_sequence, - after_sequence=after_sequence, - limit=limit, - direction=str(direction or 'forward'), - ) - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_events_page', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={'target_run_id': str(target_run_id), 'limit': limit}, - ) - return handler.ActionResponse.success( - data={ - 'items': items, - 'next_cursor': str(next_cursor) if next_cursor else None, - 'prev_cursor': str(prev_cursor) if prev_cursor else None, - 'has_more': has_more, - } - ) - except Exception as e: - self.ap.logger.error(f'RUN_EVENTS_PAGE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run events page error: {e}') - - @self.action(_plugin_runtime_action('RUN_CANCEL', 'run_cancel')) - async def run_cancel(data: dict[str, Any]) -> handler.ActionResponse: - """Request cancellation for one Host-owned run visible to the current run.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') or run_id - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run cancel', - api_capability='run_cancel', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - run = await store.get_run(str(target_run_id)) - if not run: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, run) - if auth_error: - return auth_error - - updated = await store.request_cancel( - run_id=str(target_run_id), - status_reason=data.get('status_reason') or data.get('reason'), - ) - if not updated: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_cancel', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - durable_run_id=str(target_run_id), - detail={'status_reason': data.get('status_reason') or data.get('reason')}, - ) - return handler.ActionResponse.success(data=updated) - except Exception as e: - self.ap.logger.error(f'RUN_CANCEL error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run cancel error: {e}') - - @self.action(_plugin_runtime_action('RUN_APPEND_RESULT', 'run_append_result')) - async def run_append_result(data: dict[str, Any]) -> handler.ActionResponse: - """Append one result event for a Host-owned run visible to the current run.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') or run_id - caller_plugin_identity = data.get('caller_plugin_identity') - result = data.get('result') if isinstance(data.get('result'), dict) else {} - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - - try: - sequence = int(data.get('sequence') or result.get('sequence')) - except (TypeError, ValueError): - return handler.ActionResponse.error(message='sequence is required and must be an integer') - - event_type = data.get('event_type') or data.get('type') or result.get('type') - if not event_type: - return handler.ActionResponse.error(message='event_type is required') - - event_data = data.get('data') if isinstance(data.get('data'), dict) else result.get('data') - usage = data.get('usage') if isinstance(data.get('usage'), dict) else result.get('usage') - metadata = data.get('metadata') if isinstance(data.get('metadata'), dict) else None - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run append result', - api_capability='run_append_result', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - run = await store.get_run(str(target_run_id)) - if not run: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, run) - if auth_error: - return auth_error - if run.get('status') in TERMINAL_STATUSES: - return handler.ActionResponse.error( - message=f'Run append result is not allowed for terminal run {target_run_id}' - ) - claim_error = await _require_runtime_write_ownership( - store=store, - session=session, - run=run, - data=data, - api_name='Run append result', - ) - if claim_error: - return claim_error - - event_payload = event_data if isinstance(event_data, dict) else {} - payload_error = _validate_ledger_only_result_payload( - ap=self.ap, - runner_id=run.get('runner_id'), - event_type=str(event_type), - data=event_payload, - ) - if payload_error: - return handler.ActionResponse.error(message=payload_error) - - event = await store.append_event( - run_id=str(target_run_id), - sequence=sequence, - event_type=str(event_type), - data=event_payload, - usage=usage if isinstance(usage, dict) else None, - source=str(data.get('source') or result.get('source') or 'runner'), - metadata=metadata, - ) - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_append_result', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - durable_run_id=str(target_run_id), - detail={'event_type': str(event_type), 'sequence': sequence}, - ) - return handler.ActionResponse.success(data=event) - except Exception as e: - self.ap.logger.error(f'RUN_APPEND_RESULT error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run append result error: {e}') - - @self.action(_plugin_runtime_action('RUN_FINALIZE', 'run_finalize')) - async def run_finalize(data: dict[str, Any]) -> handler.ActionResponse: - """Finalize one Host-owned run visible to the current run.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') or run_id - caller_plugin_identity = data.get('caller_plugin_identity') - status = data.get('status') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - if not status: - return handler.ActionResponse.error(message='status is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run finalize', - api_capability='run_finalize', - allow_persistent_authorization=True, - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - run = await store.get_run(str(target_run_id)) - if not run: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, run) - if auth_error: - return auth_error - claim_error = await _require_runtime_write_ownership( - store=store, - session=session, - run=run, - data=data, - api_name='Run finalize', - ) - if claim_error: - return claim_error - - updated = await store.finalize_run( - run_id=str(target_run_id), - status=str(status), - status_reason=data.get('status_reason') or data.get('reason'), - usage=data.get('usage') if isinstance(data.get('usage'), dict) else None, - cost=data.get('cost') if isinstance(data.get('cost'), dict) else None, - metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else None, - ) - if not updated: - return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_finalize', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - durable_run_id=str(target_run_id), - detail={'status': str(status)}, - ) - return handler.ActionResponse.success(data=updated) - except Exception as e: - self.ap.logger.error(f'RUN_FINALIZE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run finalize error: {e}') - - @self.action(_plugin_runtime_action('RUNTIME_REGISTER', 'runtime_register')) - async def runtime_register(data: dict[str, Any]) -> handler.ActionResponse: - """Register or update one Host-owned runtime registry record.""" - run_id = data.get('run_id') - runtime_id = data.get('runtime_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not runtime_id: - return handler.ActionResponse.error(message='runtime_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runtime register', - api_capability='runtime_register', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - runtime = await store.register_runtime( - runtime_id=str(runtime_id), - status=str(data.get('status') or 'online'), - display_name=data.get('display_name'), - endpoint=data.get('endpoint'), - version=data.get('version'), - capabilities=data.get('capabilities') if isinstance(data.get('capabilities'), dict) else {}, - labels=data.get('labels') if isinstance(data.get('labels'), dict) else {}, - metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else {}, - heartbeat_deadline_seconds=_deadline_seconds_from_payload(data), - ) - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='runtime_register', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - target_runtime_id=str(runtime_id), - detail={'status': runtime.get('status')}, - ) - return handler.ActionResponse.success(data=runtime) - except Exception as e: - self.ap.logger.error(f'RUNTIME_REGISTER error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runtime register error: {e}') - - @self.action(_plugin_runtime_action('RUNTIME_HEARTBEAT', 'runtime_heartbeat')) - async def runtime_heartbeat(data: dict[str, Any]) -> handler.ActionResponse: - """Refresh one Host-owned runtime heartbeat.""" - run_id = data.get('run_id') - runtime_id = data.get('runtime_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not runtime_id: - return handler.ActionResponse.error(message='runtime_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runtime heartbeat', - api_capability='runtime_heartbeat', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - runtime = await store.heartbeat_runtime( - runtime_id=str(runtime_id), - status=str(data.get('status') or 'online'), - capabilities=data.get('capabilities') if isinstance(data.get('capabilities'), dict) else None, - labels=data.get('labels') if isinstance(data.get('labels'), dict) else None, - metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else None, - heartbeat_deadline_seconds=_deadline_seconds_from_payload(data), - ) - if runtime is None: - return handler.ActionResponse.error(message=f'Runtime {runtime_id} not found') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='runtime_heartbeat', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - target_runtime_id=str(runtime_id), - detail={'status': runtime.get('status')}, - ) - return handler.ActionResponse.success(data=runtime) - except Exception as e: - self.ap.logger.error(f'RUNTIME_HEARTBEAT error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runtime heartbeat error: {e}') - - @self.action(_plugin_runtime_action('RUNTIME_LIST', 'runtime_list')) - async def runtime_list(data: dict[str, Any]) -> handler.ActionResponse: - """List Host-owned runtime registry records.""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - - _session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runtime list', - api_capability='runtime_list', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - statuses = data.get('statuses') - if statuses is not None and not isinstance(statuses, list): - return handler.ActionResponse.error(message='statuses must be a list') - labels = data.get('labels') if isinstance(data.get('labels'), dict) else {} - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - runtimes, total_count = await store.list_runtimes( - statuses=[str(status) for status in statuses] if statuses else None, - labels=labels, - limit=data.get('limit', 50), - ) - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='runtime_list', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - detail={ - 'statuses': [str(status) for status in statuses] if statuses else None, - 'limit': data.get('limit', 50), - }, - ) - return handler.ActionResponse.success( - data={ - 'items': runtimes, - 'next_cursor': None, - 'prev_cursor': None, - 'has_more': False, - 'total_count': total_count, - } - ) - except Exception as e: - self.ap.logger.error(f'RUNTIME_LIST error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runtime list error: {e}') - - @self.action(_plugin_runtime_action('RUNTIME_RECONCILE', 'runtime_reconcile')) - async def runtime_reconcile(data: dict[str, Any]) -> handler.ActionResponse: - """Reconcile stale runtime heartbeats and expired claim leases.""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin: - return handler.ActionResponse.error(message='Runtime reconcile access not authorized') - - _session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runtime reconcile', - api_capability='runtime_reconcile', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - stale_after_seconds = data.get('stale_after_seconds') - if stale_after_seconds is not None: - try: - stale_after_seconds = max(float(stale_after_seconds), 0) - except (TypeError, ValueError): - return handler.ActionResponse.error(message='stale_after_seconds must be a number') - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - stale_runtimes = await store.mark_stale_runtimes( - stale_after_seconds=stale_after_seconds, - ) - released_claims = await store.release_expired_claims() - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='runtime_reconcile', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - detail={ - 'stale_count': len(stale_runtimes), - 'released_claim_count': len(released_claims), - }, - ) - return handler.ActionResponse.success( - data={ - 'stale_runtimes': stale_runtimes, - 'released_claims': released_claims, - 'stale_count': len(stale_runtimes), - 'released_claim_count': len(released_claims), - } - ) - except Exception as e: - self.ap.logger.error(f'RUNTIME_RECONCILE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runtime reconcile error: {e}') - - @self.action(_plugin_runtime_action('RUN_STATS', 'run_stats')) - async def run_stats(data: dict[str, Any]) -> handler.ActionResponse: - """Get run statistics within a time window (admin-only).""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin: - return handler.ActionResponse.error(message='Run stats access not authorized') - - _session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run stats', - api_capability='run_stats', - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - import time - end_time = data.get('end_time') or int(time.time()) - start_time = data.get('start_time') or (end_time - 3600) # Default: 1 hour - runner_id = data.get('runner_id') - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - stats = await store.get_run_stats( - start_time=start_time, - end_time=end_time, - runner_id=runner_id, - ) - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_stats', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={ - 'start_time': start_time, - 'end_time': end_time, - 'runner_id': runner_id, - }, - ) - return handler.ActionResponse.success(data=stats) - except Exception as e: - self.ap.logger.error(f'RUN_STATS error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run stats error: {e}') - - @self.action(_plugin_runtime_action('RUNTIME_STATS', 'runtime_stats')) - async def runtime_stats(data: dict[str, Any]) -> handler.ActionResponse: - """Get runtime registry statistics (admin-only).""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin: - return handler.ActionResponse.error(message='Runtime stats access not authorized') - - _session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runtime stats', - api_capability='runtime_stats', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - stats = await store.get_runtime_stats() - await _record_agent_runner_admin_action( - self.ap, - store, - action='runtime_stats', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - detail={}, - ) - return handler.ActionResponse.success(data=stats) - except Exception as e: - self.ap.logger.error(f'RUNTIME_STATS error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runtime stats error: {e}') - - @self.action(_plugin_runtime_action('RUNNER_STATS', 'runner_stats')) - async def runner_stats(data: dict[str, Any]) -> handler.ActionResponse: - """Get runner-aggregated statistics (admin-only).""" - run_id = data.get('run_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - AGENT_RUN_ADMIN_PERMISSION, - ) - - if not is_admin: - return handler.ActionResponse.error(message='Runner stats access not authorized') - - _session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Runner stats', - api_capability='runner_stats', - admin_permission=AGENT_RUN_ADMIN_PERMISSION, - ) - if error: - return error - - import time - end_time = data.get('end_time') or int(time.time()) - start_time = data.get('start_time') or (end_time - 3600) # Default: 1 hour - limit = min(int(data.get('limit', 50)), 100) - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - stats = await store.get_runner_stats( - start_time=start_time, - end_time=end_time, - limit=limit, - ) - await _record_agent_runner_admin_action( - self.ap, - store, - action='runner_stats', - caller_plugin_identity=caller_plugin_identity, - permission=AGENT_RUN_ADMIN_PERMISSION, - detail={ - 'start_time': start_time, - 'end_time': end_time, - 'limit': limit, - }, - ) - return handler.ActionResponse.success(data={'items': stats, 'total_count': len(stats), 'has_more': False}) - except Exception as e: - self.ap.logger.error(f'RUNNER_STATS error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Runner stats error: {e}') - - @self.action(_plugin_runtime_action('RUN_CLAIM', 'run_claim')) - async def run_claim(data: dict[str, Any]) -> handler.ActionResponse: - """Claim one queued run for a runtime lease.""" - run_id = data.get('run_id') - runtime_id = data.get('runtime_id') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not runtime_id: - return handler.ActionResponse.error(message='runtime_id is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run claim', - api_capability='run_claim', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - runner_ids = data.get('runner_ids') - if runner_ids is not None and not isinstance(runner_ids, list): - return handler.ActionResponse.error(message='runner_ids must be a list') - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - scope_filters: dict[str, Any] = {} - if not is_admin: - authorization = _get_run_authorization(session) - session_runner_id = session.get('runner_id') or authorization.get('runner_id') - if not session_runner_id: - return handler.ActionResponse.error(message='Run claim is not available without a runner_id') - if runner_ids and any(str(item) != session_runner_id for item in runner_ids): - return handler.ActionResponse.error(message='Run claim runner_ids are not accessible by this run') - runner_ids = [session_runner_id] - scope_filters = { - 'conversation_id': authorization.get('conversation_id'), - **_run_scope_filters(session), - } - run = await store.claim_next_run( - runtime_id=str(runtime_id), - queue_name=data.get('queue_name'), - lease_seconds=data.get('lease_seconds', 60), - runner_ids=[str(item) for item in runner_ids] if runner_ids else None, - **scope_filters, - ) - if run is None: - return handler.ActionResponse.error(message='No queued run available') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_claim', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - durable_run_id=str(run.get('run_id')), - target_runtime_id=str(runtime_id), - detail={ - 'queue_name': data.get('queue_name'), - 'runner_ids': [str(item) for item in runner_ids] if runner_ids else None, - }, - ) - return handler.ActionResponse.success(data=run) - except Exception as e: - self.ap.logger.error(f'RUN_CLAIM error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run claim error: {e}') - - @self.action(_plugin_runtime_action('RUN_RENEW_CLAIM', 'run_renew_claim')) - async def run_renew_claim(data: dict[str, Any]) -> handler.ActionResponse: - """Renew one run claim lease.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') - runtime_id = data.get('runtime_id') - claim_token = data.get('claim_token') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - if not runtime_id: - return handler.ActionResponse.error(message='runtime_id is required') - if not claim_token: - return handler.ActionResponse.error(message='claim_token is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run renew claim', - api_capability='run_renew_claim', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - current = await store.get_run(str(target_run_id)) - if not current or current.get('claimed_by_runtime_id') != runtime_id: - return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, current) - if auth_error: - return auth_error - run = await store.renew_claim( - run_id=str(target_run_id), - claim_token=str(claim_token), - runtime_id=str(runtime_id), - lease_seconds=data.get('lease_seconds', 60), - ) - if run is None: - return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_renew_claim', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - durable_run_id=str(target_run_id), - target_runtime_id=str(runtime_id), - detail={'lease_seconds': data.get('lease_seconds', 60)}, - ) - return handler.ActionResponse.success(data=run) - except Exception as e: - self.ap.logger.error(f'RUN_RENEW_CLAIM error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run renew claim error: {e}') - - @self.action(_plugin_runtime_action('RUN_RELEASE_CLAIM', 'run_release_claim')) - async def run_release_claim(data: dict[str, Any]) -> handler.ActionResponse: - """Release one run claim lease.""" - run_id = data.get('run_id') - target_run_id = data.get('target_run_id') - runtime_id = data.get('runtime_id') - claim_token = data.get('claim_token') - caller_plugin_identity = data.get('caller_plugin_identity') - is_admin = _has_agent_runner_admin_permission( - self.ap, - caller_plugin_identity, - RUNTIME_ADMIN_PERMISSION, - ) - - if not is_admin and not run_id: - return handler.ActionResponse.error(message='run_id is required') - if not target_run_id: - return handler.ActionResponse.error(message='target_run_id is required') - if not runtime_id: - return handler.ActionResponse.error(message='runtime_id is required') - if not claim_token: - return handler.ActionResponse.error(message='claim_token is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Run release claim', - api_capability='run_release_claim', - admin_permission=RUNTIME_ADMIN_PERMISSION, - ) - if error: - return error - - from ..agent.runner.run_ledger_store import RunLedgerStore - - store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine()) - - try: - current = await store.get_run(str(target_run_id)) - if not current or current.get('claimed_by_runtime_id') != runtime_id: - return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') - if not is_admin: - auth_error = _authorize_target_run(session, current) - if auth_error: - return auth_error - release_status = str(data.get('status') or 'queued') - if release_status in TERMINAL_STATUSES: - return handler.ActionResponse.error( - message='Run release claim cannot finalize a run; use run_finalize' - ) - run = await store.release_claim( - run_id=str(target_run_id), - claim_token=str(claim_token), - runtime_id=str(runtime_id), - status=str(data.get('status') or 'queued'), - status_reason=data.get('status_reason') or data.get('reason'), - ) - if run is None: - return handler.ActionResponse.error(message=f'Run claim {target_run_id} not found') - if is_admin: - await _record_agent_runner_admin_action( - self.ap, - store, - action='run_release_claim', - caller_plugin_identity=caller_plugin_identity, - permission=RUNTIME_ADMIN_PERMISSION, - durable_run_id=str(target_run_id), - target_runtime_id=str(runtime_id), - detail={ - 'status': str(data.get('status') or 'queued'), - 'status_reason': data.get('status_reason') or data.get('reason'), - }, - ) - return handler.ActionResponse.success(data=run) - except Exception as e: - self.ap.logger.error(f'RUN_RELEASE_CLAIM error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'Run release claim error: {e}') - - @self.action(PluginToRuntimeAction.STEERING_PULL) - async def steering_pull(data: dict[str, Any]) -> handler.ActionResponse: - """Pull pending steering/follow-up inputs for the current run.""" - run_id = data.get('run_id') - mode = data.get('mode', 'all') - limit = data.get('limit') - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if limit is not None: - try: - limit = int(limit) - except (TypeError, ValueError): - return handler.ActionResponse.error(message='limit must be an integer') - if limit <= 0: - return handler.ActionResponse.error(message='limit must be > 0') - limit = min(limit, 100) - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'Steering pull', - api_capability='steering_pull', - ) - if error: - return error - - session_registry = get_session_registry() - items = await session_registry.pull_steering( - run_id, - mode=str(mode or 'all'), - limit=limit, - ) - if items: - try: - from ..agent.runner.event_log_store import EventLogStore - - store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) - for item in items: - event = item.get('event') if isinstance(item, dict) else None - conversation = item.get('conversation') if isinstance(item, dict) else None - actor = item.get('actor') if isinstance(item, dict) else None - subject = item.get('subject') if isinstance(item, dict) else None - if not isinstance(event, dict): - continue - await store.append_event( - event_id=None, - event_type='steering.injected', - source='agent_runner', - bot_id=conversation.get('bot_id') if isinstance(conversation, dict) else None, - workspace_id=conversation.get('workspace_id') if isinstance(conversation, dict) else None, - conversation_id=conversation.get('conversation_id') - if isinstance(conversation, dict) - else None, - thread_id=conversation.get('thread_id') if isinstance(conversation, dict) else None, - actor_type=actor.get('actor_type') if isinstance(actor, dict) else None, - actor_id=actor.get('actor_id') if isinstance(actor, dict) else None, - actor_name=actor.get('actor_name') if isinstance(actor, dict) else None, - subject_type=subject.get('subject_type') if isinstance(subject, dict) else None, - subject_id=subject.get('subject_id') if isinstance(subject, dict) else None, - input_summary=f'steering injected from {event.get("event_id")}', - run_id=run_id, - runner_id=session.get('runner_id') if isinstance(session, dict) else None, - metadata={ - 'steering': { - 'status': 'injected', - 'source_event_id': event.get('event_id'), - 'claimed_by_run_id': item.get('claimed_run_id') - if isinstance(item, dict) - else run_id, - 'claimed_runner_id': item.get('runner_id') if isinstance(item, dict) else None, - 'claimed_at': item.get('claimed_at') if isinstance(item, dict) else None, - 'pull_mode': str(mode or 'all'), - }, - }, - ) - except Exception as exc: - self.ap.logger.warning( - f'Failed to write steering injection audit for run {run_id}: {exc}', - exc_info=True, - ) - return handler.ActionResponse.success(data={'items': items}) - - # ================= State APIs (run-scoped, policy-enforced) ================= - - @self.action(PluginToRuntimeAction.STATE_GET) - async def state_get(data: dict[str, Any]) -> handler.ActionResponse: - """Get a state value from host-owned state store. - - Requires run_id authorization and scope enabled by state_policy. - """ - run_id = data.get('run_id') - scope = data.get('scope') - key = data.get('key') - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if not scope: - return handler.ActionResponse.error(message='scope is required') - - if not key: - return handler.ActionResponse.error(message='key is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'State get', - api_capability='state', - ) - if error: - return error - - _state_context, scope_key, state_error = _resolve_state_scope(session, scope) - if state_error: - return state_error - - # Get state from persistent store - from ..agent.runner.persistent_state_store import get_persistent_state_store - - store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine()) - - try: - value = await store.state_get(scope_key, key) - return handler.ActionResponse.success(data={'value': value}) - except Exception as e: - self.ap.logger.error(f'STATE_GET error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'State get error: {e}') - - @self.action(PluginToRuntimeAction.STATE_SET) - async def state_set(data: dict[str, Any]) -> handler.ActionResponse: - """Set a state value in host-owned state store. - - Requires run_id authorization and scope enabled by state_policy. - Value must be JSON-serializable and size-limited. - """ - run_id = data.get('run_id') - scope = data.get('scope') - key = data.get('key') - value = data.get('value') - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if not scope: - return handler.ActionResponse.error(message='scope is required') - - if not key: - return handler.ActionResponse.error(message='key is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'State set', - api_capability='state', - ) - if error: - return error - - state_context, scope_key, state_error = _resolve_state_scope(session, scope) - if state_error: - return state_error - - # Get additional context for DB insert - runner_id = session.get('runner_id', '') - binding_identity = state_context.get('binding_identity', 'unknown') - - # Set state in persistent store - from ..agent.runner.persistent_state_store import get_persistent_state_store - - store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine()) - - try: - success, error = await store.state_set( - scope_key=scope_key, - state_key=key, - value=value, - runner_id=runner_id, - binding_identity=binding_identity, - scope=scope, - context=state_context, - logger=self.ap.logger, - ) - - if not success: - return handler.ActionResponse.error(message=error or 'Failed to set state') - - return handler.ActionResponse.success(data={'success': True}) - except Exception as e: - self.ap.logger.error(f'STATE_SET error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'State set error: {e}') - - @self.action(PluginToRuntimeAction.STATE_DELETE) - async def state_delete(data: dict[str, Any]) -> handler.ActionResponse: - """Delete a state value from host-owned state store. - - Requires run_id authorization and scope enabled by state_policy. - """ - run_id = data.get('run_id') - scope = data.get('scope') - key = data.get('key') - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if not scope: - return handler.ActionResponse.error(message='scope is required') - - if not key: - return handler.ActionResponse.error(message='key is required') - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'State delete', - api_capability='state', - ) - if error: - return error - - _state_context, scope_key, state_error = _resolve_state_scope(session, scope) - if state_error: - return state_error - - # Delete state from persistent store - from ..agent.runner.persistent_state_store import get_persistent_state_store - - store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine()) - - try: - deleted = await store.state_delete(scope_key, key) - return handler.ActionResponse.success(data={'success': deleted}) - except Exception as e: - self.ap.logger.error(f'STATE_DELETE error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'State delete error: {e}') - - @self.action(PluginToRuntimeAction.STATE_LIST) - async def state_list(data: dict[str, Any]) -> handler.ActionResponse: - """List state keys in a scope. - - Requires run_id authorization and scope enabled by state_policy. - """ - run_id = data.get('run_id') - scope = data.get('scope') - prefix = data.get('prefix') - limit = data.get('limit', 100) - caller_plugin_identity = data.get('caller_plugin_identity') - - if not run_id: - return handler.ActionResponse.error(message='run_id is required') - - if not scope: - return handler.ActionResponse.error(message='scope is required') - - # Validate limit - if not isinstance(limit, int) or limit <= 0: - limit = 100 - limit = min(limit, 100) # Cap at 100 - - session, error = await _validate_agent_run_session( - run_id, - caller_plugin_identity, - self.ap, - 'State list', - api_capability='state', - ) - if error: - return error - - _state_context, scope_key, state_error = _resolve_state_scope(session, scope) - if state_error: - return state_error - - # List state keys from persistent store - from ..agent.runner.persistent_state_store import get_persistent_state_store - - store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine()) - - try: - keys, has_more = await store.state_list(scope_key, prefix, limit) - return handler.ActionResponse.success( - data={ - 'keys': keys, - 'has_more': has_more, - } - ) - except Exception as e: - self.ap.logger.error(f'STATE_LIST error: {e}', exc_info=True) - return handler.ActionResponse.error(message=f'State list error: {e}') + agent_pull_actions.register(self) + agent_runner_actions.register(self) + agent_state_actions.register(self) @self.action(CommonAction.PING) async def ping(data: dict[str, Any]) -> handler.ActionResponse: diff --git a/tests/unit_tests/agent/test_state_api_auth.py b/tests/unit_tests/agent/test_state_api_auth.py index 5e1300f2c..315bdfb7b 100644 --- a/tests/unit_tests/agent/test_state_api_auth.py +++ b/tests/unit_tests/agent/test_state_api_auth.py @@ -90,7 +90,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) # Get the STATE_GET action handler (actions dict is keyed by action value string) @@ -111,7 +111,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -146,7 +146,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -182,7 +182,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -219,7 +219,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -255,7 +255,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -292,7 +292,7 @@ class TestStateAPIHandlerAuthorization: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -340,7 +340,7 @@ class TestStateAPIFullFlowWithRealDB: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) # Verify session has correct state_context @@ -446,7 +446,7 @@ class TestStateHandlerReadsFromAuthorizationSnapshot: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] @@ -490,7 +490,7 @@ class TestStateHandlerReadsFromAuthorizationSnapshot: async def fake_disconnect(): return True - with patch('langbot.pkg.plugin.handler.get_session_registry', return_value=session_registry): + with patch('langbot.pkg.plugin.agent_run_support.get_session_registry', return_value=session_registry): handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_set_handler = handler.actions[PluginToRuntimeAction.STATE_SET.value]