feat(agent-runner): add admin reconcile primitives

This commit is contained in:
huanghuoguoguo
2026-06-15 19:42:33 +08:00
parent aa4fdd1144
commit e9dd7f423d
4 changed files with 601 additions and 5 deletions
@@ -211,6 +211,44 @@ class RunLedgerStore:
await session.commit()
return self._run_to_dict(run)
async def release_expired_claims(
    self,
    *,
    now: datetime.datetime | None = None,
    status: str = 'queued',
    status_reason: str = 'claim lease expired',
    limit: int = 100,
) -> list[dict[str, typing.Any]]:
    """Give up the claim on runs whose claim lease deadline has passed.

    Rows are picked when their status is ``'claimed'`` and their
    ``claim_lease_expires_at`` is set and not later than ``now``. They are
    processed oldest-lease first (ties broken by id).

    Args:
        now: Reference time; defaults to ``_utc_now()``. A naive value is
            tagged as UTC.
        status: Status written onto every released run.
        status_reason: Reason written onto every released run.
        limit: Maximum number of runs to release; clamped to 1..500.

    Returns:
        The released runs, each projected through ``_run_to_dict``.
    """
    as_of = now or _utc_now()
    if as_of.tzinfo is None:
        as_of = as_of.replace(tzinfo=UTC)
    batch_size = min(max(int(limit), 1), 500)

    expired_query = (
        sqlalchemy.select(AgentRun)
        .where(
            AgentRun.status == 'claimed',
            AgentRun.claim_lease_expires_at.is_not(None),
            AgentRun.claim_lease_expires_at <= as_of,
        )
        .order_by(AgentRun.claim_lease_expires_at.asc(), AgentRun.id.asc())
        .limit(batch_size)
    )

    async with self._session_factory() as session:
        expired_runs = (await session.execute(expired_query)).scalars().all()
        for expired in expired_runs:
            expired.status = status
            expired.status_reason = status_reason
            # Drop every trace of the previous claim so the run can be
            # picked up again.
            expired.claimed_by_runtime_id = None
            expired.claim_token = None
            expired.claim_lease_expires_at = None
            expired.updated_at = as_of
            if status in TERMINAL_STATUSES:
                # Keep an already recorded finish time; only fill a gap.
                expired.finished_at = expired.finished_at or as_of
        await session.commit()
        return [self._run_to_dict(expired) for expired in expired_runs]
async def append_event(
self,
*,
@@ -254,6 +292,41 @@ class RunLedgerStore:
await session.commit()
return self._event_to_dict(row)
async def append_audit_event(
    self,
    *,
    run_id: str,
    event_type: str,
    data: dict[str, typing.Any] | None = None,
    metadata: dict[str, typing.Any] | None = None,
) -> dict[str, typing.Any] | None:
    """Store a ``source='host'`` audit event at the tail of a run's events.

    The new event receives the run's current highest sequence number plus
    one (``1`` when the run has no events yet).

    Args:
        run_id: Run the event is attached to.
        event_type: Value stored as the event ``type``.
        data: Event payload; an empty dict is stored when omitted.
        metadata: Event metadata; an empty dict is stored when omitted.

    Returns:
        The stored event projected through ``_event_to_dict``, or ``None``
        when no run row exists for ``run_id``.
    """
    async with self._session_factory() as session:
        if await self._get_run_row(session, run_id) is None:
            return None

        highest_sequence_query = sqlalchemy.select(
            sqlalchemy.func.max(AgentRunEvent.sequence)
        ).where(
            AgentRunEvent.run_id == run_id,
        )
        highest_sequence = (await session.execute(highest_sequence_query)).scalar_one_or_none()

        audit_row = AgentRunEvent(
            run_id=run_id,
            # MAX() yields NULL for a run without events, hence the "or 0".
            sequence=int(highest_sequence or 0) + 1,
            type=event_type,
            data_json=_json_dumps(data or {}),
            usage_json=None,
            created_at=_utc_now(),
            source='host',
            artifact_refs_json=_json_dumps([]),
            metadata_json=_json_dumps(metadata or {}),
        )
        session.add(audit_row)
        await session.commit()
        return self._event_to_dict(audit_row)
async def finalize_run(
self,
*,
@@ -410,16 +483,34 @@ class RunLedgerStore:
*,
now: datetime.datetime | None = None,
stale_status: str = 'stale',
stale_after_seconds: int | float | None = None,
) -> list[dict[str, typing.Any]]:
"""Mark runtimes stale when their heartbeat deadline has passed."""
current_time = now or _utc_now()
if current_time.tzinfo is None:
current_time = current_time.replace(tzinfo=UTC)
stale_conditions: list[typing.Any] = [
sqlalchemy.and_(
AgentRuntime.heartbeat_deadline_at.is_not(None),
AgentRuntime.heartbeat_deadline_at < current_time,
)
]
if stale_after_seconds is not None:
try:
stale_after_delta = datetime.timedelta(seconds=max(float(stale_after_seconds), 0))
except (TypeError, ValueError):
stale_after_delta = None
if stale_after_delta is not None:
stale_conditions.append(
sqlalchemy.and_(
AgentRuntime.last_heartbeat_at.is_not(None),
AgentRuntime.last_heartbeat_at < current_time - stale_after_delta,
)
)
async with self._session_factory() as session:
result = await session.execute(
sqlalchemy.select(AgentRuntime).where(
AgentRuntime.heartbeat_deadline_at.is_not(None),
AgentRuntime.heartbeat_deadline_at < current_time,
sqlalchemy.or_(*stale_conditions),
AgentRuntime.status != stale_status,
)
)
+277
View File
@@ -495,6 +495,67 @@ def _project_event_record_for_api(event: dict[str, Any]) -> dict[str, Any]:
}
def _project_runner_descriptor_for_api(descriptor: Any) -> dict[str, Any]:
    """Turn a runner descriptor of any supported shape into a plain dict.

    Three input shapes are handled, checked in this order:

    * a ``dict`` - returned as a shallow copy;
    * an object exposing ``model_dump`` - the result of
      ``model_dump(mode='json')`` is returned;
    * anything else - a fixed set of attributes is read with ``getattr``,
      using ``None``, ``{}`` or ``[]`` for attributes that are missing.
    """
    if isinstance(descriptor, dict):
        return dict(descriptor)
    if hasattr(descriptor, 'model_dump'):
        return descriptor.model_dump(mode='json')

    # (attribute name, value used when the attribute is absent). The table is
    # rebuilt per call so the mutable defaults are never shared.
    attribute_defaults = (
        ('id', None),
        ('source', None),
        ('label', {}),
        ('description', None),
        ('plugin_author', None),
        ('plugin_name', None),
        ('runner_name', None),
        ('plugin_version', None),
        ('config_schema', []),
        ('capabilities', {}),
        ('permissions', {}),
        ('raw_manifest', {}),
    )
    return {
        attribute: getattr(descriptor, attribute, missing_value)
        for attribute, missing_value in attribute_defaults
    }
async def _record_agent_runner_admin_action(
    ap: app.Application,
    store: Any,
    *,
    action: str,
    caller_plugin_identity: str | None,
    permission: str,
    durable_run_id: str | None = None,
    target_runtime_id: str | None = None,
    detail: dict[str, Any] | None = None,
) -> None:
    """Leave an audit trail for a privileged AgentRunner operation.

    The action is always written to ``ap.logger`` at info level. It is also
    appended to the run's event log as ``admin.<action>``, but only when a
    ``durable_run_id`` is given and ``store`` offers ``append_audit_event``.
    A failure while appending is logged as a warning and never raised.
    """
    payload: dict[str, Any] = {
        'action': action,
        'caller_plugin_identity': caller_plugin_identity,
        'permission': permission,
    }
    # Optional fields are only reported when they carry a truthy value.
    optional_fields = (
        ('target_run_id', durable_run_id),
        ('target_runtime_id', target_runtime_id),
        ('detail', detail),
    )
    payload.update({key: value for key, value in optional_fields if value})

    ap.logger.info('Agent runner admin action: %s', payload)

    can_persist = bool(durable_run_id) and store is not None and hasattr(store, 'append_audit_event')
    if not can_persist:
        return

    try:
        await store.append_audit_event(
            run_id=str(durable_run_id),
            event_type=f'admin.{action}',
            data=payload,
            metadata={'permission': permission},
        )
    except Exception as exc:
        # Best effort: auditing must not break the admin action itself.
        ap.logger.warning(f'Failed to record AgentRunner admin audit event: {exc}', exc_info=True)
def _normalize_uuid_list(values: Any) -> list[str]:
"""Normalize a user/config supplied UUID list while preserving order."""
if not isinstance(values, list):
@@ -2056,6 +2117,15 @@ class RuntimeConnectionHandler(handler.Handler):
return handler.ActionResponse.error(message=f'Run {target_run_id} not found')
if not is_admin and not _run_matches_run_scope(session, run):
return handler.ActionResponse.error(message=f'Run {target_run_id} is not accessible by this run')
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_get',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
detail={'target_run_id': str(target_run_id)},
)
return handler.ActionResponse.success(data=run)
except Exception as e:
self.ap.logger.error(f'RUN_GET error: {e}', exc_info=True)
@@ -2131,6 +2201,18 @@ class RuntimeConnectionHandler(handler.Handler):
limit=limit,
**scope_filters,
)
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_list',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
detail={
'statuses': [str(status) for status in statuses] if statuses else None,
'limit': limit,
},
)
return handler.ActionResponse.success(
data={
'items': items,
@@ -2143,6 +2225,65 @@ class RuntimeConnectionHandler(handler.Handler):
self.ap.logger.error(f'RUN_LIST error: {e}', exc_info=True)
return handler.ActionResponse.error(message=f'Run list error: {e}')
@self.action(_plugin_runtime_action('RUNNER_LIST', 'runner_list'))
async def runner_list(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the AgentRunner descriptors known to the Host registry.

    Only callers holding ``AGENT_RUN_ADMIN_PERMISSION`` are served. The
    optional ``include_plugins`` list is forwarded to the registry as
    ``bound_plugins`` and ``use_cache`` (default true) is passed through.
    Each successful listing is written to the admin audit log.
    """
    caller_plugin_identity = data.get('caller_plugin_identity')
    if not _has_agent_runner_admin_permission(
        self.ap,
        caller_plugin_identity,
        AGENT_RUN_ADMIN_PERMISSION,
    ):
        return handler.ActionResponse.error(message='Runner list access not authorized')

    _session, error = await _validate_agent_run_session(
        data.get('run_id'),
        caller_plugin_identity,
        self.ap,
        'Runner list',
        api_capability='runner_list',
        allow_persistent_authorization=True,
        admin_permission=AGENT_RUN_ADMIN_PERMISSION,
    )
    if error:
        return error

    include_plugins = data.get('include_plugins')
    if not (include_plugins is None or isinstance(include_plugins, list)):
        return handler.ActionResponse.error(message='include_plugins must be a list')

    def _requested_plugins() -> list[str] | None:
        # A missing or empty filter is reported as None.
        if not include_plugins:
            return None
        return [str(item) for item in include_plugins]

    registry = getattr(self.ap, 'agent_runner_registry', None)
    if registry is None:
        return handler.ActionResponse.success(data={'items': []})

    try:
        runners = await registry.list_runners(
            bound_plugins=_requested_plugins(),
            use_cache=bool(data.get('use_cache', True)),
        )
        items = [_project_runner_descriptor_for_api(runner) for runner in runners]
        # The caller is known to be an admin here (non-admins returned
        # above), so the listing is always audited. No store is passed, so
        # the audit record goes to the log only.
        await _record_agent_runner_admin_action(
            self.ap,
            None,
            action='runner_list',
            caller_plugin_identity=caller_plugin_identity,
            permission=AGENT_RUN_ADMIN_PERMISSION,
            detail={
                'include_plugins': _requested_plugins(),
                'count': len(items),
            },
        )
        return handler.ActionResponse.success(data={'items': items})
    except Exception as e:
        self.ap.logger.error(f'RUNNER_LIST error: {e}', exc_info=True)
        return handler.ActionResponse.error(message=f'Runner list error: {e}')
@self.action(_plugin_runtime_action('RUN_EVENTS_PAGE', 'run_events_page'))
async def run_events_page(data: dict[str, Any]) -> handler.ActionResponse:
"""Page result events for one Host-owned run visible to current run."""
@@ -2200,6 +2341,15 @@ class RuntimeConnectionHandler(handler.Handler):
limit=limit,
direction=str(direction or 'forward'),
)
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_events_page',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
detail={'target_run_id': str(target_run_id), 'limit': limit},
)
return handler.ActionResponse.success(
data={
'items': items,
@@ -2258,6 +2408,16 @@ class RuntimeConnectionHandler(handler.Handler):
)
if not updated:
return handler.ActionResponse.error(message=f'Run {target_run_id} not found')
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_cancel',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=str(target_run_id),
detail={'status_reason': data.get('status_reason') or data.get('reason')},
)
return handler.ActionResponse.success(data=updated)
except Exception as e:
self.ap.logger.error(f'RUN_CANCEL error: {e}', exc_info=True)
@@ -2328,6 +2488,16 @@ class RuntimeConnectionHandler(handler.Handler):
artifact_refs=artifact_refs,
metadata=metadata,
)
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_append_result',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=str(target_run_id),
detail={'event_type': str(event_type), 'sequence': sequence},
)
return handler.ActionResponse.success(data=event)
except Exception as e:
self.ap.logger.error(f'RUN_APPEND_RESULT error: {e}', exc_info=True)
@@ -2386,6 +2556,16 @@ class RuntimeConnectionHandler(handler.Handler):
)
if not updated:
return handler.ActionResponse.error(message=f'Run {target_run_id} not found')
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='run_finalize',
caller_plugin_identity=caller_plugin_identity,
permission=AGENT_RUN_ADMIN_PERMISSION,
durable_run_id=str(target_run_id),
detail={'status': str(status)},
)
return handler.ActionResponse.success(data=updated)
except Exception as e:
self.ap.logger.error(f'RUN_FINALIZE error: {e}', exc_info=True)
@@ -2435,6 +2615,16 @@ class RuntimeConnectionHandler(handler.Handler):
metadata=data.get('metadata') if isinstance(data.get('metadata'), dict) else {},
heartbeat_deadline_seconds=_deadline_seconds_from_payload(data),
)
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='runtime_register',
caller_plugin_identity=caller_plugin_identity,
permission=RUNTIME_ADMIN_PERMISSION,
target_runtime_id=str(runtime_id),
detail={'status': runtime.get('status')},
)
return handler.ActionResponse.success(data=runtime)
except Exception as e:
self.ap.logger.error(f'RUNTIME_REGISTER error: {e}', exc_info=True)
@@ -2483,6 +2673,16 @@ class RuntimeConnectionHandler(handler.Handler):
)
if runtime is None:
return handler.ActionResponse.error(message=f'Runtime {runtime_id} not found')
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='runtime_heartbeat',
caller_plugin_identity=caller_plugin_identity,
permission=RUNTIME_ADMIN_PERMISSION,
target_runtime_id=str(runtime_id),
detail={'status': runtime.get('status')},
)
return handler.ActionResponse.success(data=runtime)
except Exception as e:
self.ap.logger.error(f'RUNTIME_HEARTBEAT error: {e}', exc_info=True)
@@ -2533,6 +2733,18 @@ class RuntimeConnectionHandler(handler.Handler):
for runtime in runtimes
if all(runtime.get('labels', {}).get(key) == value for key, value in labels.items())
]
if is_admin:
await _record_agent_runner_admin_action(
self.ap,
store,
action='runtime_list',
caller_plugin_identity=caller_plugin_identity,
permission=RUNTIME_ADMIN_PERMISSION,
detail={
'statuses': [str(status) for status in statuses] if statuses else None,
'limit': data.get('limit', 50),
},
)
return handler.ActionResponse.success(
data={
'items': runtimes,
@@ -2545,6 +2757,71 @@ class RuntimeConnectionHandler(handler.Handler):
self.ap.logger.error(f'RUNTIME_LIST error: {e}', exc_info=True)
return handler.ActionResponse.error(message=f'Runtime list error: {e}')
@self.action(_plugin_runtime_action('RUNTIME_RECONCILE', 'runtime_reconcile'))
async def runtime_reconcile(data: dict[str, Any]) -> handler.ActionResponse:
    """Reconcile stale runtime heartbeats and expired claim leases.

    Only callers holding ``RUNTIME_ADMIN_PERMISSION`` are served. The
    handler marks stale runtimes via ``mark_stale_runtimes`` and then
    releases expired claims via ``release_expired_claims``.

    Payload keys read:
        run_id, caller_plugin_identity: used for authorization.
        stale_after_seconds: optional threshold forwarded to
            ``mark_stale_runtimes``. Must convert to a finite float;
            negative values are clamped to 0.

    Returns:
        A success response with ``stale_runtimes``, ``released_claims`` and
        their counts, or an error response when authorization, validation
        or the store calls fail.
    """
    import math

    run_id = data.get('run_id')
    caller_plugin_identity = data.get('caller_plugin_identity')
    is_admin = _has_agent_runner_admin_permission(
        self.ap,
        caller_plugin_identity,
        RUNTIME_ADMIN_PERMISSION,
    )
    if not is_admin:
        return handler.ActionResponse.error(message='Runtime reconcile access not authorized')

    _session, error = await _validate_agent_run_session(
        run_id,
        caller_plugin_identity,
        self.ap,
        'Runtime reconcile',
        api_capability='runtime_reconcile',
        admin_permission=RUNTIME_ADMIN_PERMISSION,
    )
    if error:
        return error

    stale_after_seconds = data.get('stale_after_seconds')
    if stale_after_seconds is not None:
        try:
            stale_after_seconds = float(stale_after_seconds)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: integers too large for a float. This check runs
            # outside the try block below, so it must not leak exceptions.
            return handler.ActionResponse.error(message='stale_after_seconds must be a number')
        if not math.isfinite(stale_after_seconds):
            # NaN/inf cannot be turned into a timedelta; reject them here
            # instead of letting the threshold be dropped or fail later.
            return handler.ActionResponse.error(message='stale_after_seconds must be a finite number')
        stale_after_seconds = max(stale_after_seconds, 0)

    from ..agent.runner.run_ledger_store import RunLedgerStore

    store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
    try:
        stale_runtimes = await store.mark_stale_runtimes(
            stale_after_seconds=stale_after_seconds,
        )
        released_claims = await store.release_expired_claims()
        # The caller is known to be an admin here (non-admins returned
        # above). No durable_run_id is passed, so the audit helper only
        # writes to the log.
        await _record_agent_runner_admin_action(
            self.ap,
            store,
            action='runtime_reconcile',
            caller_plugin_identity=caller_plugin_identity,
            permission=RUNTIME_ADMIN_PERMISSION,
            detail={
                'stale_count': len(stale_runtimes),
                'released_claim_count': len(released_claims),
            },
        )
        return handler.ActionResponse.success(
            data={
                'stale_runtimes': stale_runtimes,
                'released_claims': released_claims,
                'stale_count': len(stale_runtimes),
                'released_claim_count': len(released_claims),
            }
        )
    except Exception as e:
        self.ap.logger.error(f'RUNTIME_RECONCILE error: {e}', exc_info=True)
        return handler.ActionResponse.error(message=f'Runtime reconcile error: {e}')
@self.action(_plugin_runtime_action('RUN_CLAIM', 'run_claim'))
async def run_claim(data: dict[str, Any]) -> handler.ActionResponse:
"""Claim one queued run for a runtime lease."""