async def release_expired_claims(
    self,
    *,
    now: datetime.datetime | None = None,
    status: str = 'queued',
    status_reason: str = 'claim lease expired',
    limit: int = 100,
) -> list[dict[str, typing.Any]]:
    """Give up claims whose lease deadline is at or before ``now``.

    Args:
        now: Reference time; defaults to ``_utc_now()``. A naive value is
            tagged as UTC before it is compared.
        status: Status written onto every released run.
        status_reason: Reason text written onto every released run.
        limit: Maximum number of runs handled per call, clamped to 1..500.

    Returns:
        The released runs, converted with ``_run_to_dict``, ordered by lease
        expiry and then by row id.
    """
    reference_time = now or _utc_now()
    if reference_time.tzinfo is None:
        reference_time = reference_time.replace(tzinfo=UTC)
    batch_size = min(max(int(limit), 1), 500)

    lapsed_lease = (
        AgentRun.status == 'claimed',
        AgentRun.claim_lease_expires_at.is_not(None),
        AgentRun.claim_lease_expires_at <= reference_time,
    )
    query = (
        sqlalchemy.select(AgentRun)
        .where(*lapsed_lease)
        .order_by(AgentRun.claim_lease_expires_at.asc(), AgentRun.id.asc())
        .limit(batch_size)
    )

    async with self._session_factory() as session:
        # NOTE(review): rows are read and then mutated without an explicit
        # lock; confirm nothing else can renew a lease in between.
        expired_runs = (await session.execute(query)).scalars().all()
        for expired in expired_runs:
            expired.status = status
            expired.status_reason = status_reason
            # Drop every trace of the previous claimant.
            expired.claimed_by_runtime_id = None
            expired.claim_token = None
            expired.claim_lease_expires_at = None
            expired.updated_at = reference_time
            if status in TERMINAL_STATUSES:
                # Keep an existing finish time; only fill in a missing one.
                expired.finished_at = expired.finished_at or reference_time
        await session.commit()
        return [self._run_to_dict(expired) for expired in expired_runs]
async def append_audit_event(
    self,
    *,
    run_id: str,
    event_type: str,
    data: dict[str, typing.Any] | None = None,
    metadata: dict[str, typing.Any] | None = None,
) -> dict[str, typing.Any] | None:
    """Store a ``source='host'`` event one past the run's highest sequence.

    Args:
        run_id: Run the event belongs to.
        event_type: Value stored in the event's ``type`` column.
        data: Event payload; a falsy value is stored as ``{}``.
        metadata: Event metadata; a falsy value is stored as ``{}``.

    Returns:
        The new event converted with ``_event_to_dict``, or ``None`` when
        ``_get_run_row`` finds no run for ``run_id``.
    """
    async with self._session_factory() as session:
        if await self._get_run_row(session, run_id) is None:
            return None

        # NOTE(review): the sequence is read and then inserted in two steps;
        # confirm how concurrent appends to the same run are expected to behave.
        highest_sequence_query = sqlalchemy.select(
            sqlalchemy.func.max(AgentRunEvent.sequence)
        ).where(AgentRunEvent.run_id == run_id)
        highest_sequence = (await session.execute(highest_sequence_query)).scalar_one_or_none()

        audit_row = AgentRunEvent(
            run_id=run_id,
            # A run without events yields NULL, so numbering starts at 1.
            sequence=int(highest_sequence or 0) + 1,
            type=event_type,
            data_json=_json_dumps(data or {}),
            usage_json=None,
            created_at=_utc_now(),
            source='host',
            artifact_refs_json=_json_dumps([]),
            metadata_json=_json_dumps(metadata or {}),
        )
        session.add(audit_row)
        await session.commit()
        return self._event_to_dict(audit_row)
def _project_runner_descriptor_for_api(descriptor: Any) -> dict[str, Any]:
    """Turn a runner descriptor into a plain dict for an API response.

    A ``dict`` input is shallow-copied. An object exposing ``model_dump`` is
    dumped with ``mode='json'``. Anything else is read attribute by attribute,
    with a fresh default for every attribute that is missing.
    """
    if isinstance(descriptor, dict):
        return dict(descriptor)
    if hasattr(descriptor, 'model_dump'):
        return descriptor.model_dump(mode='json')

    # (attribute name, factory for the fallback value; None means fall back to None)
    attribute_fallbacks = (
        ('id', None),
        ('source', None),
        ('label', dict),
        ('description', None),
        ('plugin_author', None),
        ('plugin_name', None),
        ('runner_name', None),
        ('plugin_version', None),
        ('config_schema', list),
        ('capabilities', dict),
        ('permissions', dict),
        ('raw_manifest', dict),
    )
    projected: dict[str, Any] = {}
    for attribute, make_fallback in attribute_fallbacks:
        fallback = None if make_fallback is None else make_fallback()
        projected[attribute] = getattr(descriptor, attribute, fallback)
    return projected
async def _record_agent_runner_admin_action(
    ap: app.Application,
    store: Any,
    *,
    action: str,
    caller_plugin_identity: str | None,
    permission: str,
    durable_run_id: str | None = None,
    target_runtime_id: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Log a privileged AgentRunner operation and, when possible, persist it.

    The action is always written to ``ap.logger`` at info level. An audit
    event is additionally appended through ``store.append_audit_event`` only
    when ``durable_run_id`` is truthy and ``store`` provides that method.
    A failure while appending is logged as a warning and never raised.

    Args:
        ap: Application object; only its ``logger`` is used here.
        store: Object that may expose ``append_audit_event``; may be ``None``.
        action: Short action name; the stored event type is ``admin.<action>``.
        caller_plugin_identity: Identity of the plugin that made the call.
        permission: Permission the call was authorised under.
        durable_run_id: Run the audit event is attached to, if any.
        target_runtime_id: Runtime the action was aimed at, if any.
        detail: Extra context recorded under the ``detail`` key.
    """
    audit_data: dict[str, Any] = dict(
        action=action,
        caller_plugin_identity=caller_plugin_identity,
        permission=permission,
    )
    optional_fields = (
        ('target_run_id', durable_run_id),
        ('target_runtime_id', target_runtime_id),
        ('detail', detail),
    )
    # Falsy optional values are left out of the record entirely.
    audit_data.update({key: value for key, value in optional_fields if value})

    ap.logger.info('Agent runner admin action: %s', audit_data)

    can_persist = bool(durable_run_id) and store is not None and hasattr(store, 'append_audit_event')
    if not can_persist:
        return

    try:
        await store.append_audit_event(
            run_id=str(durable_run_id),
            event_type=f'admin.{action}',
            data=audit_data,
            metadata={'permission': permission},
        )
    except Exception as audit_error:
        # Best effort: a failed audit write must not fail the admin action itself.
        ap.logger.warning(f'Failed to record AgentRunner admin audit event: {audit_error}', exc_info=True)
@self.action(_plugin_runtime_action('RUNNER_LIST', 'runner_list'))
async def runner_list(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the runner descriptors known to the Host's runner registry.

    Only callers holding ``AGENT_RUN_ADMIN_PERMISSION`` get past the first
    check. ``include_plugins`` (optional list) narrows the listing and
    ``use_cache`` (default ``True``) is forwarded to the registry.
    """
    caller_plugin_identity = data.get('caller_plugin_identity')
    run_id = data.get('run_id')

    authorized = _has_agent_runner_admin_permission(
        self.ap,
        caller_plugin_identity,
        AGENT_RUN_ADMIN_PERMISSION,
    )
    if not authorized:
        return handler.ActionResponse.error(message='Runner list access not authorized')

    _session, error = await _validate_agent_run_session(
        run_id,
        caller_plugin_identity,
        self.ap,
        'Runner list',
        api_capability='runner_list',
        allow_persistent_authorization=True,
        admin_permission=AGENT_RUN_ADMIN_PERMISSION,
    )
    if error:
        return error

    include_plugins = data.get('include_plugins')
    if not (include_plugins is None or isinstance(include_plugins, list)):
        return handler.ActionResponse.error(message='include_plugins must be a list')

    registry = getattr(self.ap, 'agent_runner_registry', None)
    if registry is None:
        # No registry on the application: answer with an empty listing.
        return handler.ActionResponse.success(data={'items': []})

    def _plugin_filter() -> list[str] | None:
        # An absent or empty list means "no filter".
        if not include_plugins:
            return None
        return [str(plugin) for plugin in include_plugins]

    try:
        descriptors = await registry.list_runners(
            bound_plugins=_plugin_filter(),
            use_cache=bool(data.get('use_cache', True)),
        )
        items = [_project_runner_descriptor_for_api(descriptor) for descriptor in descriptors]
        # The caller is known to be an admin here (non-admins returned above).
        # No store is passed, so the action is logged but not persisted.
        await _record_agent_runner_admin_action(
            self.ap,
            None,
            action='runner_list',
            caller_plugin_identity=caller_plugin_identity,
            permission=AGENT_RUN_ADMIN_PERMISSION,
            detail={
                'include_plugins': _plugin_filter(),
                'count': len(items),
            },
        )
        return handler.ActionResponse.success(data={'items': items})
    except Exception as e:
        self.ap.logger.error(f'RUNNER_LIST error: {e}', exc_info=True)
        return handler.ActionResponse.error(message=f'Runner list error: {e}')
events for one Host-owned run visible to current run.""" @@ -2200,6 +2341,15 @@ class RuntimeConnectionHandler(handler.Handler): limit=limit, direction=str(direction or 'forward'), ) + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='run_events_page', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + detail={'target_run_id': str(target_run_id), 'limit': limit}, + ) return handler.ActionResponse.success( data={ 'items': items, @@ -2258,6 +2408,16 @@ class RuntimeConnectionHandler(handler.Handler): ) if not updated: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='run_cancel', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'status_reason': data.get('status_reason') or data.get('reason')}, + ) return handler.ActionResponse.success(data=updated) except Exception as e: self.ap.logger.error(f'RUN_CANCEL error: {e}', exc_info=True) @@ -2328,6 +2488,16 @@ class RuntimeConnectionHandler(handler.Handler): artifact_refs=artifact_refs, metadata=metadata, ) + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='run_append_result', + caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'event_type': str(event_type), 'sequence': sequence}, + ) return handler.ActionResponse.success(data=event) except Exception as e: self.ap.logger.error(f'RUN_APPEND_RESULT error: {e}', exc_info=True) @@ -2386,6 +2556,16 @@ class RuntimeConnectionHandler(handler.Handler): ) if not updated: return handler.ActionResponse.error(message=f'Run {target_run_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='run_finalize', + 
caller_plugin_identity=caller_plugin_identity, + permission=AGENT_RUN_ADMIN_PERMISSION, + durable_run_id=str(target_run_id), + detail={'status': str(status)}, + ) return handler.ActionResponse.success(data=updated) except Exception as e: self.ap.logger.error(f'RUN_FINALIZE error: {e}', exc_info=True) @@ -2435,6 +2615,16 @@ class RuntimeConnectionHandler(handler.Handler): metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else {}, heartbeat_deadline_seconds=_deadline_seconds_from_payload(data), ) + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='runtime_register', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + target_runtime_id=str(runtime_id), + detail={'status': runtime.get('status')}, + ) return handler.ActionResponse.success(data=runtime) except Exception as e: self.ap.logger.error(f'RUNTIME_REGISTER error: {e}', exc_info=True) @@ -2483,6 +2673,16 @@ class RuntimeConnectionHandler(handler.Handler): ) if runtime is None: return handler.ActionResponse.error(message=f'Runtime {runtime_id} not found') + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='runtime_heartbeat', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + target_runtime_id=str(runtime_id), + detail={'status': runtime.get('status')}, + ) return handler.ActionResponse.success(data=runtime) except Exception as e: self.ap.logger.error(f'RUNTIME_HEARTBEAT error: {e}', exc_info=True) @@ -2533,6 +2733,18 @@ class RuntimeConnectionHandler(handler.Handler): for runtime in runtimes if all(runtime.get('labels', {}).get(key) == value for key, value in labels.items()) ] + if is_admin: + await _record_agent_runner_admin_action( + self.ap, + store, + action='runtime_list', + caller_plugin_identity=caller_plugin_identity, + permission=RUNTIME_ADMIN_PERMISSION, + detail={ + 'statuses': [str(status) for status in statuses] if statuses else 
@self.action(_plugin_runtime_action('RUNTIME_RECONCILE', 'runtime_reconcile'))
async def runtime_reconcile(data: dict[str, Any]) -> handler.ActionResponse:
    """Reconcile stale runtime heartbeats and expired claim leases.

    Only callers holding ``RUNTIME_ADMIN_PERMISSION`` are served.

    Payload keys read from ``data``:
        run_id: Forwarded to the session validation.
        caller_plugin_identity: Identity checked for the admin permission.
        stale_after_seconds: Optional number; negative values are clamped to
            0. Values that are not numeric, or that cannot be represented as
            a ``datetime.timedelta`` (NaN, infinity, out-of-range magnitudes),
            are rejected up front.

    Returns:
        A success response carrying ``stale_runtimes``, ``released_claims``
        and their counts, or an error response.
    """
    # Function-scope import so this block does not depend on how the
    # surrounding module happens to import ``datetime``.
    import datetime as _dt

    run_id = data.get('run_id')
    caller_plugin_identity = data.get('caller_plugin_identity')
    is_admin = _has_agent_runner_admin_permission(
        self.ap,
        caller_plugin_identity,
        RUNTIME_ADMIN_PERMISSION,
    )

    if not is_admin:
        return handler.ActionResponse.error(message='Runtime reconcile access not authorized')

    _session, error = await _validate_agent_run_session(
        run_id,
        caller_plugin_identity,
        self.ap,
        'Runtime reconcile',
        api_capability='runtime_reconcile',
        admin_permission=RUNTIME_ADMIN_PERMISSION,
    )
    if error:
        return error

    stale_after_seconds = data.get('stale_after_seconds')
    if stale_after_seconds is not None:
        try:
            # OverflowError covers integers too large to convert to float,
            # which would otherwise escape this handler unhandled.
            stale_after_seconds = max(float(stale_after_seconds), 0)
        except (TypeError, ValueError, OverflowError):
            return handler.ActionResponse.error(message='stale_after_seconds must be a number')
        try:
            # The store turns this value into a timedelta. NaN raises
            # ValueError there and infinity/huge values raise OverflowError,
            # so reject them here with a clear validation error instead of
            # letting them be ignored or surface as a generic failure.
            _dt.timedelta(seconds=stale_after_seconds)
        except (ValueError, OverflowError):
            return handler.ActionResponse.error(
                message='stale_after_seconds must be a finite number of seconds within the supported range'
            )

    from ..agent.runner.run_ledger_store import RunLedgerStore

    store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())

    try:
        stale_runtimes = await store.mark_stale_runtimes(
            stale_after_seconds=stale_after_seconds,
        )
        released_claims = await store.release_expired_claims()
        # The caller is known to be an admin here (non-admins returned above).
        # No run id is supplied, so the helper only logs the action.
        await _record_agent_runner_admin_action(
            self.ap,
            store,
            action='runtime_reconcile',
            caller_plugin_identity=caller_plugin_identity,
            permission=RUNTIME_ADMIN_PERMISSION,
            detail={
                'stale_count': len(stale_runtimes),
                'released_claim_count': len(released_claims),
            },
        )
        return handler.ActionResponse.success(
            data={
                'stale_runtimes': stale_runtimes,
                'released_claims': released_claims,
                'stale_count': len(stale_runtimes),
                'released_claim_count': len(released_claims),
            }
        )
    except Exception as e:
        self.ap.logger.error(f'RUNTIME_RECONCILE error: {e}', exc_info=True)
        return handler.ActionResponse.error(message=f'Runtime reconcile error: {e}')
class FakeRunnerRegistry:
    """Test double for the runner registry that records each listing call."""

    def __init__(self, runners):
        # Keyword arguments of every ``list_runners`` call, in call order.
        self.calls = []
        # Canned value handed back by ``list_runners``.
        self.runners = runners

    async def list_runners(self, *, bound_plugins=None, use_cache=True):
        """Remember the arguments and return the canned runner list."""
        recorded_call = dict(bound_plugins=bound_plugins, use_cache=use_cache)
        self.calls.append(recorded_call)
        return self.runners
@pytest.mark.asyncio
async def test_runtime_admin_can_reconcile_without_run_id(db_engine):
    """A ``runtime:admin`` plugin can call ``runtime_reconcile`` with no ``run_id``.

    Sets up one runtime holding one claimed run, backdates both the claim
    lease and the runtime heartbeat, then checks the action marks the runtime
    stale and puts the run back in the queue.
    """
    store = RunLedgerStore(db_engine)
    await store.register_runtime(
        runtime_id='runtime_stale',
        display_name='Runtime Stale',
        heartbeat_deadline_seconds=60,
    )
    await store.create_run(
        run_id='claimed_run',
        event_id='evt_claimed',
        binding_id='binding_1',
        runner_id='plugin:other/runner/default',
        status='queued',
        queue_name='default',
    )
    claim = await store.claim_next_run(runtime_id='runtime_stale', queue_name='default', lease_seconds=60)
    assert claim is not None

    session_factory = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    # One second in the past: enough for both deadlines to count as passed.
    expired_at = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=1)
    async with session_factory() as session:
        # Rewrite the deadlines directly in the database instead of waiting
        # for the 60-second lease and heartbeat windows to elapse.
        await session.execute(
            sqlalchemy.update(agent_run_model.AgentRun)
            .where(agent_run_model.AgentRun.run_id == 'claimed_run')
            .values(claim_lease_expires_at=expired_at)
        )
        await session.execute(
            sqlalchemy.update(agent_run_model.AgentRuntime)
            .where(agent_run_model.AgentRuntime.runtime_id == 'runtime_stale')
            .values(
                last_heartbeat_at=expired_at,
                heartbeat_deadline_at=expired_at,
            )
        )
        await session.commit()

    handler = _handler(
        db_engine,
        admin_plugins=[
            {
                'identity': 'langbot/control',
                'permissions': ['runtime:admin'],
            }
        ],
    )
    runtime_reconcile = handler.actions['runtime_reconcile']

    # Note: the payload carries no 'run_id'.
    result = await runtime_reconcile({'caller_plugin_identity': 'langbot/control'})

    assert result.code == 0
    assert result.data['stale_count'] == 1
    assert result.data['released_claim_count'] == 1
    assert result.data['stale_runtimes'][0]['runtime_id'] == 'runtime_stale'
    assert result.data['released_claims'][0]['run_id'] == 'claimed_run'
    # The response must match what was actually persisted.
    assert (await store.get_runtime('runtime_stale'))['status'] == 'stale'
    released_run = await store.get_run('claimed_run')
    assert released_run is not None
    assert released_run['status'] == 'queued'
    assert released_run['claimed_by_runtime_id'] is None
    assert released_run['claim_token'] is None
@pytest.mark.asyncio
async def test_release_expired_claims_requeues_runs(store, db_engine):
    """``release_expired_claims`` requeues only the run whose lease has lapsed."""
    await store.create_run(
        run_id='run-expired-release',
        event_id='evt-3',
        binding_id='binding-1',
        runner_id='runner-a',
        status='queued',
        queue_name='default',
    )
    await store.create_run(
        run_id='run-active-claim',
        event_id='evt-4',
        binding_id='binding-1',
        runner_id='runner-a',
        status='queued',
        queue_name='default',
    )
    # Two runtimes each claim one run from the same queue with a 60-second
    # lease (presumably in creation order; the assertions below rely on it).
    expired_claim = await store.claim_next_run(runtime_id='runtime-a', queue_name='default', lease_seconds=60)
    active_claim = await store.claim_next_run(runtime_id='runtime-b', queue_name='default', lease_seconds=60)
    assert expired_claim is not None
    assert active_claim is not None

    session_factory = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    async with session_factory() as session:
        # Backdate only the first run's lease by one second.
        await session.execute(
            sqlalchemy.update(AgentRun)
            .where(AgentRun.run_id == 'run-expired-release')
            .values(claim_lease_expires_at=datetime.datetime.now(UTC) - datetime.timedelta(seconds=1))
        )
        await session.commit()

    released = await store.release_expired_claims()

    assert [run['run_id'] for run in released] == ['run-expired-release']
    assert released[0]['status'] == 'queued'
    assert released[0]['status_reason'] == 'claim lease expired'
    assert released[0]['claimed_by_runtime_id'] is None
    assert released[0]['claim_token'] is None
    assert released[0]['claim_lease_expires_at'] is None

    # The run whose lease is still in the future must be left untouched.
    active = await store.get_run('run-active-claim')
    assert active is not None
    assert active['status'] == 'claimed'
    assert active['claim_token'] == active_claim['claim_token']
@pytest.mark.asyncio
async def test_append_audit_event_uses_next_sequence(store):
    """An audit event gets the sequence right after the run's last event."""
    await store.create_run(
        run_id='run-audit',
        event_id='evt-5',
        binding_id='binding-1',
        runner_id='runner-a',
    )
    # Existing event at sequence 1, so the audit event should land on 2.
    await store.append_event(
        run_id='run-audit',
        sequence=1,
        event_type='message.completed',
        data={'ok': True},
    )

    event = await store.append_audit_event(
        run_id='run-audit',
        event_type='admin.run_cancel',
        data={'action': 'run_cancel'},
        metadata={'permission': 'agent_run:admin'},
    )

    assert event is not None
    assert event['sequence'] == 2
    assert event['type'] == 'admin.run_cancel'
    assert event['source'] == 'host'
    assert event['data'] == {'action': 'run_cancel'}
    assert event['metadata'] == {'permission': 'agent_run:admin'}
    # An unknown run id yields None rather than an exception.
    assert await store.append_audit_event(run_id='missing', event_type='admin.missing') is None