feat(agent-runner): add host run ledger primitives

This commit is contained in:
huanghuoguoguo
2026-06-15 18:09:05 +08:00
parent d073134c83
commit 9aa643b55f
12 changed files with 3088 additions and 425 deletions
+88 -15
View File
@@ -9,6 +9,7 @@ from .descriptor import AgentRunnerDescriptor
from .errors import RunnerProtocolError
from .host_models import AgentBinding, AgentEventEnvelope
from .persistent_state_store import PersistentStateStore, get_persistent_state_store
from .run_ledger_store import RunLedgerStore
# Maximum inline artifact content size (1MB)
@@ -21,10 +22,17 @@ class AgentRunJournal:
ap: app.Application
_persistent_state_store: PersistentStateStore | None
_run_ledger_store: RunLedgerStore | None
def __init__(self, ap: app.Application):
self.ap = ap
self._persistent_state_store = None
self._run_ledger_store = None
def _get_run_ledger_store(self) -> RunLedgerStore:
if self._run_ledger_store is None:
self._run_ledger_store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
return self._run_ledger_store
@staticmethod
def _to_plain_dict(value: typing.Any) -> dict[str, typing.Any]:
@@ -64,6 +72,81 @@ class AgentRunJournal:
def _sanitize_attachments(cls, attachments: typing.Iterable[typing.Any]) -> list[dict[str, typing.Any]]:
return [cls._sanitize_attachment_ref(attachment) for attachment in attachments]
async def create_run(
self,
*,
event: AgentEventEnvelope,
binding: AgentBinding,
descriptor: AgentRunnerDescriptor,
context: dict[str, typing.Any],
authorization: dict[str, typing.Any],
) -> dict[str, typing.Any]:
"""Create the Host-owned run ledger record."""
runtime = context.get('runtime') if isinstance(context, dict) else {}
return await self._get_run_ledger_store().create_run(
run_id=context['run_id'],
event_id=event.event_id,
binding_id=binding.binding_id,
runner_id=descriptor.id,
conversation_id=event.conversation_id,
thread_id=event.thread_id,
workspace_id=event.workspace_id,
bot_id=event.bot_id,
deadline_at=runtime.get('deadline_at') if isinstance(runtime, dict) else None,
authorization=authorization,
metadata={
'event_type': event.event_type,
'source': event.source,
},
)
async def append_run_result(
self,
*,
result_dict: dict[str, typing.Any],
run_id: str,
sequence: int,
source: str = 'runner',
artifact_refs: list[dict[str, typing.Any]] | None = None,
metadata: dict[str, typing.Any] | None = None,
) -> dict[str, typing.Any]:
"""Persist one AgentRunResult in the run ledger."""
usage = result_dict.get('usage')
if hasattr(usage, 'model_dump'):
usage = usage.model_dump(mode='json')
return await self._get_run_ledger_store().append_event(
run_id=run_id,
sequence=sequence,
event_type=str(result_dict.get('type') or 'unknown'),
data=result_dict.get('data') if isinstance(result_dict.get('data'), dict) else {},
usage=usage if isinstance(usage, dict) else None,
source=source,
artifact_refs=artifact_refs,
metadata=metadata,
)
async def finalize_run(
self,
*,
run_id: str,
status: str,
status_reason: str | None = None,
usage: dict[str, typing.Any] | None = None,
metadata: dict[str, typing.Any] | None = None,
) -> dict[str, typing.Any] | None:
"""Finalize or update the Host-owned run ledger record."""
return await self._get_run_ledger_store().finalize_run(
run_id=run_id,
status=status,
status_reason=status_reason,
usage=usage,
metadata=metadata,
)
async def get_run(self, run_id: str) -> dict[str, typing.Any] | None:
"""Return the persisted run ledger record."""
return await self._get_run_ledger_store().get_run(run_id)
async def handle_state_updated_event(
self,
result_dict: dict[str, typing.Any],
@@ -99,9 +182,7 @@ class AgentRunJournal:
)
if self._persistent_state_store is None:
self._persistent_state_store = get_persistent_state_store(
self.ap.persistence_mgr.get_db_engine()
)
self._persistent_state_store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine())
success, error = await self._persistent_state_store.apply_update_from_event(
event=event,
@@ -114,13 +195,9 @@ class AgentRunJournal:
)
if success:
self.ap.logger.debug(
f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}'
)
self.ap.logger.debug(f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}')
elif error:
self.ap.logger.warning(
f'Runner {descriptor.id} state.updated rejected: {error}'
)
self.ap.logger.warning(f'Runner {descriptor.id} state.updated rejected: {error}')
async def write_event_log(
self,
@@ -166,9 +243,7 @@ class AgentRunJournal:
run_id=run_id,
runner_id=runner_id,
event_time=(
datetime.datetime.fromtimestamp(event.event_time, datetime.timezone.utc)
if event.event_time
else None
datetime.datetime.fromtimestamp(event.event_time, datetime.timezone.utc) if event.event_time else None
),
metadata=metadata,
)
@@ -239,9 +314,7 @@ class AgentRunJournal:
content=content,
)
except Exception as e:
self.ap.logger.warning(
f'Failed to register input artifact {artifact_id}: {e}'
)
self.ap.logger.warning(f'Failed to register input artifact {artifact_id}: {e}')
def decode_attachment_content(
self,