From aa4fdd11448d7bcdfa4248237148a840256d3fa8 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com> Date: Mon, 15 Jun 2026 18:55:11 +0800 Subject: [PATCH] feat(agent-runner): add host admin permissions --- src/langbot/pkg/plugin/handler.py | 222 +++++++-- src/langbot/templates/config.yaml | 13 + .../agent/test_run_ledger_api_auth.py | 439 +++++++++++++++++- 3 files changed, 642 insertions(+), 32 deletions(-) diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 2e8a4700..77904757 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -37,10 +37,82 @@ class _RuntimeActionName: self.value = value +AGENT_RUN_ADMIN_PERMISSION = 'agent_run:admin' +RUNTIME_ADMIN_PERMISSION = 'runtime:admin' +AGENT_RUNNER_ADMIN_PERMISSION = 'agent_runner:admin' + + def _plugin_runtime_action(name: str, value: str) -> Any: return getattr(PluginToRuntimeAction, name, _RuntimeActionName(value)) +def _normalize_permission_set(value: Any) -> set[str]: + if isinstance(value, str): + return {permission.strip() for permission in value.split(',') if permission.strip()} + if isinstance(value, list): + return {str(item).strip() for item in value if str(item).strip()} + if isinstance(value, dict): + return {str(item).strip() for item, enabled in value.items() if enabled and str(item).strip()} + return set() + + +def _iter_agent_runner_admin_plugin_configs(ap: app.Application) -> list[dict[str, Any]]: + instance_config = getattr(ap, 'instance_config', None) + config_data = getattr(instance_config, 'data', {}) if instance_config is not None else {} + if not isinstance(config_data, dict): + return [] + agent_runner_config = config_data.get('agent_runner', {}) + if not isinstance(agent_runner_config, dict): + return [] + raw_admin_plugins = agent_runner_config.get('admin_plugins', []) + if isinstance(raw_admin_plugins, dict): + items: list[dict[str, Any]] = [] + for identity, entry in raw_admin_plugins.items(): + if isinstance(entry, dict): + merged = dict(entry) + merged.setdefault('identity', identity) + items.append(merged) + else: + items.append({'identity': identity, 'permissions': entry}) + return items + if isinstance(raw_admin_plugins, list): + return [item for item in raw_admin_plugins if isinstance(item, dict)] + return [] + + +def _agent_runner_admin_permissions(ap: app.Application, plugin_identity: str | None) -> set[str]: + if not isinstance(plugin_identity, str) or not plugin_identity.strip(): + return set() + normalized_identity = plugin_identity.strip() + permissions: set[str] = set() + for entry in _iter_agent_runner_admin_plugin_configs(ap): + if entry.get('enabled', True) is False: + continue + identity = entry.get('identity') or entry.get('plugin_identity') or entry.get('plugin') or entry.get('id') + if identity != normalized_identity: + continue + permissions.update(_normalize_permission_set(entry.get('permissions'))) + permissions.update(_normalize_permission_set(entry.get('scopes'))) + return permissions + + +def _has_agent_runner_admin_permission( + ap: app.Application, + plugin_identity: str | None, + permission: str, +) -> bool: + permissions = _agent_runner_admin_permissions(ap, plugin_identity) + if not permissions: + return False + domain = permission.split(':', 1)[0] + return bool( + permission in permissions + or f'{domain}:*' in permissions + or AGENT_RUNNER_ADMIN_PERMISSION in permissions + or '*' in permissions + ) + + def _deadline_seconds_from_payload(data: dict[str, Any], default: int = 60) -> int: deadline_at = data.get('heartbeat_deadline_at') if deadline_at is not None: @@ -256,8 +328,24 @@ async def _validate_agent_run_session( api_name: str, api_capability: str | None = None, allow_persistent_authorization: bool = False, + admin_permission: str | None = None, ) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: """Validate an AgentRunner pull API run session and run-scoped API access.""" + if not run_id and admin_permission and _has_agent_runner_admin_permission( + ap, + caller_plugin_identity, + admin_permission, + ): + return { + 'run_id': run_id, + 'runner_id': None, + 'query_id': None, + 'plugin_identity': caller_plugin_identity, + 'authorization': {}, + 'status': {}, + 'steering_queue': [], + }, None + session_registry = get_session_registry() session = await session_registry.get(run_id) if not session: @@ -281,7 +369,12 @@ async def _validate_agent_run_session( if api_capability: available_apis = _get_run_authorization(session).get('available_apis', {}) - if not available_apis.get(api_capability, False): + has_admin_permission = bool(admin_permission) and _has_agent_runner_admin_permission( + ap, + caller_plugin_identity, + admin_permission, + ) + if not available_apis.get(api_capability, False) and not has_admin_permission: return None, handler.ActionResponse.error(message=f'{api_name} access not authorized') return session, None @@ -1930,8 +2023,13 @@ class RuntimeConnectionHandler(handler.Handler): run_id = data.get('run_id') target_run_id = data.get('target_run_id') or run_id caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -1943,6 +2041,7 @@ class RuntimeConnectionHandler(handler.Handler): 'Run get', api_capability='run_get', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error @@ -1955,7 +2054,7 @@ class RuntimeConnectionHandler(handler.Handler): run = await store.get_run(str(target_run_id)) if not run: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not _run_matches_run_scope(session, run): + if not is_admin and not _run_matches_run_scope(session, run): return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run') return handler.ActionResponse.success(data=run) except Exception as e: @@ -1971,10 +2070,16 @@ class RuntimeConnectionHandler(handler.Handler): before_cursor = data.get('before_cursor') limit = data.get('limit', 50) caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') + scope_filters: dict[str, Any] = {} session, error = await _validate_agent_run_session( run_id, caller_plugin_identity, @@ -1982,19 +2087,22 @@ class RuntimeConnectionHandler(handler.Handler): 'Run list', api_capability='run_list', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error - conversation_id, scope_error = _resolve_run_conversation( - session, - conversation_id, - 'Run list', - ) - if scope_error: - return scope_error + if not is_admin: + conversation_id, scope_error = _resolve_run_conversation( + session, + conversation_id, + 'Run list', + ) + if scope_error: + return scope_error + scope_filters = _run_scope_filters(session) - if not conversation_id: + if not is_admin and not conversation_id: return handler.ActionResponse.success( data={ 'items': [], @@ -2021,7 +2129,7 @@ class RuntimeConnectionHandler(handler.Handler): statuses=[str(status) for status in statuses] if statuses else None, before_id=before_id, limit=limit, - **_run_scope_filters(session), + **scope_filters, ) return handler.ActionResponse.success( data={ @@ -2045,8 +2153,13 @@ class RuntimeConnectionHandler(handler.Handler): limit = data.get('limit', 50) direction = data.get('direction', 'forward') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2058,6 +2171,7 @@ class RuntimeConnectionHandler(handler.Handler): 'Run events page', api_capability='run_events_page', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error @@ -2076,7 +2190,7 @@ class RuntimeConnectionHandler(handler.Handler): run = await store.get_run(str(target_run_id)) if not run: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not _run_matches_run_scope(session, run): + if not is_admin and not _run_matches_run_scope(session, run): return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run') items, next_cursor, prev_cursor, has_more = await store.page_run_events( @@ -2104,8 +2218,13 @@ class RuntimeConnectionHandler(handler.Handler): run_id = data.get('run_id') target_run_id = data.get('target_run_id') or run_id caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2117,6 +2236,7 @@ class RuntimeConnectionHandler(handler.Handler): 'Run cancel', api_capability='run_cancel', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error @@ -2129,7 +2249,7 @@ class RuntimeConnectionHandler(handler.Handler): run = await store.get_run(str(target_run_id)) if not run: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not _run_matches_run_scope(session, run): + if not is_admin and not _run_matches_run_scope(session, run): return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run') updated = await store.request_cancel( @@ -2150,8 +2270,13 @@ class RuntimeConnectionHandler(handler.Handler): target_run_id = data.get('target_run_id') or run_id caller_plugin_identity = data.get('caller_plugin_identity') result = data.get('result') if isinstance(data.get('result'), dict) else {} + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2177,6 +2302,7 @@ class RuntimeConnectionHandler(handler.Handler): 'Run append result', api_capability='run_append_result', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error @@ -2189,7 +2315,7 @@ class RuntimeConnectionHandler(handler.Handler): run = await store.get_run(str(target_run_id)) if not run: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not _run_matches_run_scope(session, run): + if not is_admin and not _run_matches_run_scope(session, run): return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run') event = await store.append_event( @@ -2214,8 +2340,13 @@ class RuntimeConnectionHandler(handler.Handler): target_run_id = data.get('target_run_id') or run_id caller_plugin_identity = data.get('caller_plugin_identity') status = data.get('status') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + AGENT_RUN_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2229,6 +2360,7 @@ class RuntimeConnectionHandler(handler.Handler): 'Run finalize', api_capability='run_finalize', allow_persistent_authorization=True, + admin_permission=AGENT_RUN_ADMIN_PERMISSION, ) if error: return error @@ -2241,7 +2373,7 @@ class RuntimeConnectionHandler(handler.Handler): run = await store.get_run(str(target_run_id)) if not run: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') - if not _run_matches_run_scope(session, run): + if not is_admin and not _run_matches_run_scope(session, run): return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run') updated = await store.finalize_run( @@ -2265,8 +2397,13 @@ class RuntimeConnectionHandler(handler.Handler): run_id = data.get('run_id') runtime_id = data.get('runtime_id') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not runtime_id: return handler.ActionResponse.error(message='runtime_id is required') @@ -2277,6 +2414,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Runtime register', api_capability='runtime_register', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error @@ -2308,8 +2446,13 @@ class RuntimeConnectionHandler(handler.Handler): run_id = data.get('run_id') runtime_id = data.get('runtime_id') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not runtime_id: return handler.ActionResponse.error(message='runtime_id is required') @@ -2320,6 +2463,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Runtime heartbeat', api_capability='runtime_heartbeat', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error @@ -2349,8 +2493,13 @@ class RuntimeConnectionHandler(handler.Handler): """List Host-owned runtime registry records.""" run_id = data.get('run_id') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') _session, error = await _validate_agent_run_session( @@ -2359,6 +2508,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Runtime list', api_capability='runtime_list', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error @@ -2401,8 +2551,13 @@ class RuntimeConnectionHandler(handler.Handler): run_id = data.get('run_id') runtime_id = data.get('runtime_id') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not runtime_id: return handler.ActionResponse.error(message='runtime_id is required') @@ -2413,6 +2568,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Run claim', api_capability='run_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error @@ -2447,8 +2603,13 @@ class RuntimeConnectionHandler(handler.Handler): runtime_id = data.get('runtime_id') claim_token = data.get('claim_token') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2463,6 +2624,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Run renew claim', api_capability='run_renew_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error @@ -2495,8 +2657,13 @@ class RuntimeConnectionHandler(handler.Handler): runtime_id = data.get('runtime_id') claim_token = data.get('claim_token') caller_plugin_identity = data.get('caller_plugin_identity') + is_admin = _has_agent_runner_admin_permission( + self.ap, + caller_plugin_identity, + RUNTIME_ADMIN_PERMISSION, + ) - if not run_id: + if not is_admin and not run_id: return handler.ActionResponse.error(message='run_id is required') if not target_run_id: return handler.ActionResponse.error(message='target_run_id is required') @@ -2511,6 +2678,7 @@ class RuntimeConnectionHandler(handler.Handler): self.ap, 'Run release claim', api_capability='run_release_claim', + admin_permission=RUNTIME_ADMIN_PERMISSION, ) if error: return error diff --git a/src/langbot/templates/config.yaml b/src/langbot/templates/config.yaml index c0002e8f..1e6768d8 100644 --- a/src/langbot/templates/config.yaml +++ b/src/langbot/templates/config.yaml @@ -107,6 +107,19 @@ plugin: binary_storage: # Max bytes for a single plugin binary storage value max_value_bytes: 10485760 +agent_runner: + # Host-level admin permissions for trusted control plugins. These plugins + # can use existing plugin action handlers to inspect or manage AgentRunner + # infrastructure across runner/plugin boundaries. Keep empty unless you + # fully trust the plugin identity. + # + # Example: + # admin_plugins: + # - identity: langbot/agent-runner-control + # permissions: + # - agent_run:admin + # - runtime:admin + admin_plugins: [] monitoring: auto_cleanup: # Enable automatic cleanup of expired monitoring records diff --git a/tests/unit_tests/agent/test_run_ledger_api_auth.py b/tests/unit_tests/agent/test_run_ledger_api_auth.py index ebe42767..67f5b367 100644 --- a/tests/unit_tests/agent/test_run_ledger_api_auth.py +++ b/tests/unit_tests/agent/test_run_ledger_api_auth.py @@ -2,6 +2,7 @@ from __future__ import annotations +from types import SimpleNamespace from unittest.mock import MagicMock import pytest @@ -28,10 +29,17 @@ class FakeConnection: class FakeApplication: - def __init__(self, db_engine): + def __init__(self, db_engine, admin_plugins=None): self.logger = MagicMock() self.persistence_mgr = MagicMock() self.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine) + self.instance_config = SimpleNamespace( + data={ + 'agent_runner': { + 'admin_plugins': admin_plugins or [], + } + } + ) @pytest.fixture @@ -54,11 +62,11 @@ async def db_engine(): await engine.dispose() -def _handler(db_engine): +def _handler(db_engine, admin_plugins=None): async def fake_disconnect(): return True - fake_app = FakeApplication(db_engine) + fake_app = FakeApplication(db_engine, admin_plugins=admin_plugins) return RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) @@ -92,6 +100,7 @@ async def _create_run( workspace_id='workspace_1', thread_id=None, plugin_identity='test/runner', + runner_id='plugin:test/runner/default', available_apis=None, ): store = RunLedgerStore(db_engine) @@ -99,13 +108,13 @@ async def _create_run( run_id=run_id, event_id='evt_1', binding_id='binding_1', - runner_id='plugin:test/runner/default', + runner_id=runner_id, conversation_id=conversation_id, bot_id=bot_id, workspace_id=workspace_id, thread_id=thread_id, authorization={ - 'runner_id': 'plugin:test/runner/default', + 'runner_id': runner_id, 'binding_id': 'binding_1', 'plugin_identity': plugin_identity, 'resources': make_resources(), @@ -281,6 +290,426 @@ async def test_persistent_authorization_does_not_reopen_artifact_api(db_engine): assert 'not found or expired' in result.message.lower() +@pytest.mark.asyncio +async def test_agent_run_admin_can_list_all_runs_with_own_run_session(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await _create_run(db_engine) + await _create_run( + db_engine, + run_id='run_other', + conversation_id='conv_other', + bot_id='bot_other', + workspace_id='workspace_other', + plugin_identity='other/runner', + runner_id='plugin:other/runner/default', + available_apis={'run_list': True}, + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_list = handler.actions[PluginToRuntimeAction.RUN_LIST.value] + + result = await run_list( + { + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + 'statuses': ['running'], + } + ) + + assert result.code == 0 + page = RunPage.model_validate(result.data) + assert [run.run_id for run in page.items] == ['run_other', 'run_1'] + + +@pytest.mark.asyncio +async def test_agent_run_admin_permission_string_allows_without_run_id(db_engine): + await _create_run(db_engine) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': 'agent_run:admin', + } + ], + ) + run_list = handler.actions[PluginToRuntimeAction.RUN_LIST.value] + + result = await run_list( + { + 'caller_plugin_identity': 'test/runner', + } + ) + + assert result.code == 0 + page = RunPage.model_validate(result.data) + assert [run.run_id for run in page.items] == ['run_1'] + + +@pytest.mark.asyncio +async def test_agent_run_admin_can_get_and_page_cross_scope_with_own_run_session(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await _create_run( + db_engine, + run_id='run_other', + conversation_id='conv_other', + bot_id='bot_other', + workspace_id='workspace_other', + plugin_identity='other/runner', + runner_id='plugin:other/runner/default', + available_apis={'run_get': True, 'run_events_page': True}, + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_get = handler.actions[PluginToRuntimeAction.RUN_GET.value] + run_events_page = handler.actions[PluginToRuntimeAction.RUN_EVENTS_PAGE.value] + + run_result = await run_get( + { + 'run_id': 'run_1', + 'target_run_id': 'run_other', + 'caller_plugin_identity': 'test/runner', + } + ) + events_result = await run_events_page( + { + 'run_id': 'run_1', + 'target_run_id': 'run_other', + 'caller_plugin_identity': 'test/runner', + } + ) + + assert run_result.code == 0 + assert AgentRun.model_validate(run_result.data).run_id == 'run_other' + assert events_result.code == 0 + page = RunEventPage.model_validate(events_result.data) + assert [event.type for event in page.items] == ['message.completed'] + + +@pytest.mark.asyncio +async def test_agent_run_admin_can_get_and_page_cross_scope_without_run_id(db_engine): + await _create_run( + db_engine, + run_id='run_other', + conversation_id='conv_other', + bot_id='bot_other', + workspace_id='workspace_other', + plugin_identity='other/runner', + runner_id='plugin:other/runner/default', + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'langbot/control', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_get = handler.actions[PluginToRuntimeAction.RUN_GET.value] + run_events_page = handler.actions[PluginToRuntimeAction.RUN_EVENTS_PAGE.value] + + run_result = await run_get( + { + 'target_run_id': 'run_other', + 'caller_plugin_identity': 'langbot/control', + } + ) + events_result = await run_events_page( + { + 'target_run_id': 'run_other', + 'caller_plugin_identity': 'langbot/control', + } + ) + + assert run_result.code == 0 + assert AgentRun.model_validate(run_result.data).run_id == 'run_other' + assert events_result.code == 0 + page = RunEventPage.model_validate(events_result.data) + assert [event.type for event in page.items] == ['message.completed'] + + +@pytest.mark.asyncio +async def test_unconfigured_plugin_cannot_use_admin_run_actions_without_run_id(db_engine): + await _create_run(db_engine) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'langbot/control', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_list = handler.actions[PluginToRuntimeAction.RUN_LIST.value] + + result = await run_list( + { + 'caller_plugin_identity': 'test/runner', + } + ) + + assert result.code != 0 + assert 'run_id is required' in result.message.lower() + + +@pytest.mark.asyncio +async def test_agent_run_admin_can_cancel_cross_scope_with_own_run_session(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await _create_run( + db_engine, + run_id='run_other', + conversation_id='conv_other', + bot_id='bot_other', + workspace_id='workspace_other', + plugin_identity='other/runner', + runner_id='plugin:other/runner/default', + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_cancel = handler.actions[PluginToRuntimeAction.RUN_CANCEL.value] + + result = await run_cancel( + { + 'run_id': 'run_1', + 'target_run_id': 'run_other', + 'caller_plugin_identity': 'test/runner', + 'reason': 'admin requested', + } + ) + + assert result.code == 0 + run = AgentRun.model_validate(result.data) + assert run.run_id == 'run_other' + assert run.cancel_requested_at is not None + assert run.status_reason == 'admin requested' + + +@pytest.mark.asyncio +async def test_configured_admin_identity_cannot_be_spoofed_with_other_run_session(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await _create_run(db_engine) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'langbot/control', + 'permissions': ['agent_run:admin'], + } + ], + ) + run_get = handler.actions[PluginToRuntimeAction.RUN_GET.value] + + result = await run_get( + { + 'run_id': 'run_1', + 'target_run_id': 'run_1', + 'caller_plugin_identity': 'langbot/control', + } + ) + + assert result.code != 0 + assert 'mismatch' in result.message.lower() + + +@pytest.mark.asyncio +async def test_agent_run_admin_permission_does_not_grant_runtime_admin(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['agent_run:admin'], + } + ], + ) + runtime_list = handler.actions[PluginToRuntimeAction.RUNTIME_LIST.value] + + result = await runtime_list( + { + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + } + ) + + assert result.code != 0 + assert 'not authorized' in result.message.lower() + + +@pytest.mark.asyncio +async def test_runtime_admin_can_register_list_and_claim_with_own_run_session(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await RunLedgerStore(db_engine).create_run( + run_id='queued_run', + event_id='evt_queued', + binding_id='binding_1', + runner_id='plugin:other/runner/default', + conversation_id='conv_1', + bot_id='bot_1', + workspace_id='workspace_1', + status='queued', + queue_name='default', + priority=5, + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['runtime:admin'], + } + ], + ) + runtime_register = handler.actions[PluginToRuntimeAction.RUNTIME_REGISTER.value] + runtime_list = handler.actions[PluginToRuntimeAction.RUNTIME_LIST.value] + run_claim = handler.actions[PluginToRuntimeAction.RUN_CLAIM.value] + + registered = await runtime_register( + { + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + 'runtime_id': 'runtime_1', + 'display_name': 'Runtime 1', + 'labels': {'region': 'test'}, + } + ) + page = await runtime_list( + { + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + 'statuses': ['online'], + 'labels': {'region': 'test'}, + } + ) + claimed = await run_claim( + { + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + 'runtime_id': 'runtime_1', + 'queue_name': 'default', + 'runner_ids': ['plugin:other/runner/default'], + } + ) + + assert registered.code == 0 + assert registered.data['runtime_id'] == 'runtime_1' + assert page.code == 0 + assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1'] + assert claimed.code == 0 + assert claimed.data['run_id'] == 'queued_run' + assert claimed.data['claimed_by_runtime_id'] == 'runtime_1' + + +@pytest.mark.asyncio +async def test_runtime_admin_can_register_list_and_claim_without_run_id(db_engine): + await RunLedgerStore(db_engine).create_run( + run_id='queued_run', + event_id='evt_queued', + binding_id='binding_1', + runner_id='plugin:other/runner/default', + conversation_id='conv_1', + bot_id='bot_1', + workspace_id='workspace_1', + status='queued', + queue_name='default', + priority=5, + ) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'langbot/control', + 'permissions': ['runtime:admin'], + } + ], + ) + runtime_register = handler.actions[PluginToRuntimeAction.RUNTIME_REGISTER.value] + runtime_list = handler.actions[PluginToRuntimeAction.RUNTIME_LIST.value] + run_claim = handler.actions[PluginToRuntimeAction.RUN_CLAIM.value] + + registered = await runtime_register( + { + 'caller_plugin_identity': 'langbot/control', + 'runtime_id': 'runtime_1', + 'display_name': 'Runtime 1', + 'labels': {'region': 'test'}, + } + ) + page = await runtime_list( + { + 'caller_plugin_identity': 'langbot/control', + 'statuses': ['online'], + 'labels': {'region': 'test'}, + } + ) + claimed = await run_claim( + { + 'caller_plugin_identity': 'langbot/control', + 'runtime_id': 'runtime_1', + 'queue_name': 'default', + 'runner_ids': ['plugin:other/runner/default'], + } + ) + + assert registered.code == 0 + assert registered.data['runtime_id'] == 'runtime_1' + assert page.code == 0 + assert [item['runtime_id'] for item in page.data['items']] == ['runtime_1'] + assert claimed.code == 0 + assert claimed.data['run_id'] == 'queued_run' + assert claimed.data['claimed_by_runtime_id'] == 'runtime_1' + + +@pytest.mark.asyncio +async def test_disabled_admin_plugin_entry_does_not_grant_access(session_registry, db_engine): + await _register_session(session_registry, available_apis={}) + await _create_run(db_engine) + handler = _handler( + db_engine, + admin_plugins=[ + { + 'identity': 'test/runner', + 'permissions': ['agent_run:admin', 'runtime:admin'], + 'enabled': False, + } + ], + ) + run_get = handler.actions[PluginToRuntimeAction.RUN_GET.value] + + result = await run_get( + { + 'run_id': 'run_1', + 'target_run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + } + ) + + assert result.code != 0 + assert 'not authorized' in result.message.lower() + + @pytest.mark.asyncio async def test_run_cancel_basic_path(session_registry, db_engine): await _register_session(session_registry, available_apis={'run_cancel': True})