mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-19 12:04:21 +00:00
413 lines
15 KiB
Python
413 lines
15 KiB
Python
"""Run-side effects for AgentRunner executions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import typing
|
|
|
|
from ...core import app
|
|
from .descriptor import AgentRunnerDescriptor
|
|
from .errors import RunnerProtocolError
|
|
from .host_models import AgentBinding, AgentEventEnvelope
|
|
from .persistent_state_store import PersistentStateStore, get_persistent_state_store
|
|
from .run_ledger_store import RunLedgerStore
|
|
|
|
|
|
class AgentRunJournal:
|
|
"""Persist run events, transcript records, and state updates."""
|
|
|
|
ap: app.Application
|
|
|
|
_persistent_state_store: PersistentStateStore | None
|
|
_run_ledger_store: RunLedgerStore | None
|
|
|
|
def __init__(self, ap: app.Application):
|
|
self.ap = ap
|
|
self._persistent_state_store = None
|
|
self._run_ledger_store = None
|
|
|
|
def _get_run_ledger_store(self) -> RunLedgerStore:
|
|
if self._run_ledger_store is None:
|
|
self._run_ledger_store = RunLedgerStore(self.ap.persistence_mgr.get_db_engine())
|
|
return self._run_ledger_store
|
|
|
|
@staticmethod
|
|
def _to_plain_dict(value: typing.Any) -> dict[str, typing.Any]:
|
|
if hasattr(value, 'model_dump'):
|
|
value = value.model_dump(mode='json')
|
|
if isinstance(value, dict):
|
|
return dict(value)
|
|
return {}
|
|
|
|
@classmethod
|
|
def _sanitize_content_item(cls, value: typing.Any) -> typing.Any:
|
|
item = cls._to_plain_dict(value)
|
|
if not item:
|
|
return value
|
|
item_type = item.get('type')
|
|
if item_type == 'image_base64' and item.get('image_base64'):
|
|
item['image_base64'] = None
|
|
item['content_redacted'] = True
|
|
elif item_type == 'file_base64' and item.get('file_base64'):
|
|
item['file_base64'] = None
|
|
item['content_redacted'] = True
|
|
return item
|
|
|
|
@classmethod
|
|
def _sanitize_attachment_ref(cls, value: typing.Any) -> dict[str, typing.Any]:
|
|
item = cls._to_plain_dict(value)
|
|
if item.get('content'):
|
|
item['content'] = None
|
|
item['content_redacted'] = True
|
|
return item
|
|
|
|
@classmethod
|
|
def _sanitize_contents(cls, contents: typing.Iterable[typing.Any]) -> list[typing.Any]:
|
|
return [cls._sanitize_content_item(content) for content in contents]
|
|
|
|
@classmethod
|
|
def _sanitize_attachments(cls, attachments: typing.Iterable[typing.Any]) -> list[dict[str, typing.Any]]:
|
|
return [cls._sanitize_attachment_ref(attachment) for attachment in attachments]
|
|
|
|
async def create_run(
|
|
self,
|
|
*,
|
|
event: AgentEventEnvelope,
|
|
binding: AgentBinding,
|
|
descriptor: AgentRunnerDescriptor,
|
|
context: dict[str, typing.Any],
|
|
authorization: dict[str, typing.Any],
|
|
) -> dict[str, typing.Any]:
|
|
"""Create the Host-owned run ledger record."""
|
|
runtime = context.get('runtime') if isinstance(context, dict) else {}
|
|
return await self._get_run_ledger_store().create_run(
|
|
run_id=context['run_id'],
|
|
event_id=event.event_id,
|
|
binding_id=binding.binding_id,
|
|
runner_id=descriptor.id,
|
|
conversation_id=event.conversation_id,
|
|
thread_id=event.thread_id,
|
|
workspace_id=event.workspace_id,
|
|
bot_id=event.bot_id,
|
|
deadline_at=runtime.get('deadline_at') if isinstance(runtime, dict) else None,
|
|
authorization=authorization,
|
|
metadata={
|
|
'event_type': event.event_type,
|
|
'source': event.source,
|
|
},
|
|
)
|
|
|
|
async def append_run_result(
|
|
self,
|
|
*,
|
|
result_dict: dict[str, typing.Any],
|
|
run_id: str,
|
|
sequence: int,
|
|
source: str = 'runner',
|
|
metadata: dict[str, typing.Any] | None = None,
|
|
) -> dict[str, typing.Any]:
|
|
"""Persist one AgentRunResult in the run ledger."""
|
|
usage = result_dict.get('usage')
|
|
if hasattr(usage, 'model_dump'):
|
|
usage = usage.model_dump(mode='json')
|
|
return await self._get_run_ledger_store().append_event(
|
|
run_id=run_id,
|
|
sequence=sequence,
|
|
event_type=str(result_dict.get('type') or 'unknown'),
|
|
data=result_dict.get('data') if isinstance(result_dict.get('data'), dict) else {},
|
|
usage=usage if isinstance(usage, dict) else None,
|
|
source=source,
|
|
metadata=metadata,
|
|
)
|
|
|
|
async def finalize_run(
|
|
self,
|
|
*,
|
|
run_id: str,
|
|
status: str,
|
|
status_reason: str | None = None,
|
|
usage: dict[str, typing.Any] | None = None,
|
|
metadata: dict[str, typing.Any] | None = None,
|
|
) -> dict[str, typing.Any] | None:
|
|
"""Finalize or update the Host-owned run ledger record."""
|
|
return await self._get_run_ledger_store().finalize_run(
|
|
run_id=run_id,
|
|
status=status,
|
|
status_reason=status_reason,
|
|
usage=usage,
|
|
metadata=metadata,
|
|
)
|
|
|
|
async def get_run(self, run_id: str) -> dict[str, typing.Any] | None:
|
|
"""Return the persisted run ledger record."""
|
|
return await self._get_run_ledger_store().get_run(run_id)
|
|
|
|
async def handle_state_updated_event(
|
|
self,
|
|
result_dict: dict[str, typing.Any],
|
|
event: AgentEventEnvelope,
|
|
binding: AgentBinding,
|
|
descriptor: AgentRunnerDescriptor,
|
|
run_id: str | None = None,
|
|
) -> None:
|
|
"""Handle state.updated result in event-first mode."""
|
|
data = result_dict.get('data', {})
|
|
|
|
result_run_id = result_dict.get('run_id')
|
|
if run_id and result_run_id and result_run_id != run_id:
|
|
raise RunnerProtocolError(
|
|
descriptor.id,
|
|
f'state.updated run_id mismatch: expected {run_id}, got {result_run_id}',
|
|
)
|
|
|
|
scope = data.get('scope')
|
|
if not scope:
|
|
raise RunnerProtocolError(
|
|
descriptor.id,
|
|
'state.updated missing required field: scope',
|
|
)
|
|
|
|
key = data.get('key')
|
|
value = data.get('value')
|
|
|
|
if not key:
|
|
raise RunnerProtocolError(
|
|
descriptor.id,
|
|
'state.updated missing required field: key',
|
|
)
|
|
|
|
if self._persistent_state_store is None:
|
|
self._persistent_state_store = get_persistent_state_store(self.ap.persistence_mgr.get_db_engine())
|
|
|
|
success, error = await self._persistent_state_store.apply_update_from_event(
|
|
event=event,
|
|
binding=binding,
|
|
descriptor=descriptor,
|
|
scope=scope,
|
|
key=key,
|
|
value=value,
|
|
logger=self.ap.logger,
|
|
)
|
|
|
|
if success:
|
|
self.ap.logger.debug(f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}')
|
|
elif error:
|
|
self.ap.logger.warning(f'Runner {descriptor.id} state.updated rejected: {error}')
|
|
|
|
async def write_event_log(
|
|
self,
|
|
event: AgentEventEnvelope,
|
|
binding: AgentBinding,
|
|
run_id: str,
|
|
runner_id: str,
|
|
metadata: dict[str, typing.Any] | None = None,
|
|
) -> str:
|
|
"""Write incoming event to EventLog."""
|
|
import datetime
|
|
|
|
from .event_log_store import EventLogStore
|
|
|
|
store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
|
|
|
|
input_summary = None
|
|
input_json = None
|
|
if event.input:
|
|
if event.input.text:
|
|
input_summary = event.input.text[:1000]
|
|
input_json = {
|
|
'text': event.input.text,
|
|
'contents': self._sanitize_contents(event.input.contents),
|
|
'attachments': self._sanitize_attachments(event.input.attachments),
|
|
}
|
|
|
|
return await store.append_event(
|
|
event_id=event.event_id,
|
|
event_type=event.event_type,
|
|
source=event.source,
|
|
bot_id=event.bot_id,
|
|
workspace_id=event.workspace_id,
|
|
conversation_id=event.conversation_id,
|
|
thread_id=event.thread_id,
|
|
actor_type=event.actor.actor_type if event.actor else None,
|
|
actor_id=event.actor.actor_id if event.actor else None,
|
|
actor_name=event.actor.actor_name if event.actor else None,
|
|
subject_type=event.subject.subject_type if event.subject else None,
|
|
subject_id=event.subject.subject_id if event.subject else None,
|
|
input_summary=input_summary,
|
|
input_json=input_json,
|
|
run_id=run_id,
|
|
runner_id=runner_id,
|
|
event_time=(
|
|
datetime.datetime.fromtimestamp(event.event_time, datetime.timezone.utc) if event.event_time else None
|
|
),
|
|
metadata=metadata,
|
|
)
|
|
|
|
async def write_user_transcript(
|
|
self,
|
|
event: AgentEventEnvelope,
|
|
event_log_id: str,
|
|
) -> None:
|
|
"""Write user message to Transcript."""
|
|
from .transcript_store import TranscriptStore
|
|
|
|
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
|
|
|
|
content = event.input.text if event.input else None
|
|
content_json = None
|
|
if event.input:
|
|
content_json = {
|
|
'role': 'user',
|
|
'content': self._sanitize_contents(event.input.contents) if event.input.contents else [],
|
|
}
|
|
|
|
attachment_refs = []
|
|
if event.input and event.input.attachments:
|
|
for a in event.input.attachments:
|
|
attachment_refs.append(self._sanitize_attachment_ref(a))
|
|
|
|
await store.append_transcript(
|
|
transcript_id=None,
|
|
event_id=event_log_id,
|
|
conversation_id=event.conversation_id,
|
|
role='user',
|
|
bot_id=event.bot_id,
|
|
workspace_id=event.workspace_id,
|
|
content=content,
|
|
content_json=content_json,
|
|
attachment_refs=attachment_refs if attachment_refs else None,
|
|
thread_id=event.thread_id,
|
|
item_type='message',
|
|
metadata={
|
|
'actor_type': event.actor.actor_type if event.actor else None,
|
|
'actor_id': event.actor.actor_id if event.actor else None,
|
|
},
|
|
)
|
|
|
|
async def write_steering_dropped_audits(
|
|
self,
|
|
items: list[dict[str, typing.Any]],
|
|
run_id: str,
|
|
runner_id: str,
|
|
*,
|
|
reason: str = 'run_ended',
|
|
) -> None:
|
|
"""Write terminal audit events for steering items left unconsumed."""
|
|
if not items:
|
|
return
|
|
|
|
import datetime
|
|
import uuid
|
|
|
|
from .event_log_store import EventLogStore
|
|
|
|
store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
|
|
|
|
for item in items:
|
|
event = item.get('event') if isinstance(item.get('event'), dict) else {}
|
|
input_data = item.get('input') if isinstance(item.get('input'), dict) else {}
|
|
conversation = item.get('conversation') if isinstance(item.get('conversation'), dict) else {}
|
|
actor = item.get('actor') if isinstance(item.get('actor'), dict) else {}
|
|
subject = item.get('subject') if isinstance(item.get('subject'), dict) else {}
|
|
|
|
text = input_data.get('text')
|
|
input_summary = text[:1000] if isinstance(text, str) and text else 'Unconsumed steering input dropped'
|
|
event_time = None
|
|
raw_event_time = event.get('event_time')
|
|
if raw_event_time:
|
|
try:
|
|
event_time = datetime.datetime.fromtimestamp(
|
|
raw_event_time,
|
|
datetime.timezone.utc,
|
|
)
|
|
except (TypeError, ValueError, OSError):
|
|
event_time = None
|
|
|
|
await store.append_event(
|
|
event_id=str(uuid.uuid4()),
|
|
event_type='steering.dropped',
|
|
source='host',
|
|
bot_id=conversation.get('bot_id'),
|
|
workspace_id=conversation.get('workspace_id'),
|
|
conversation_id=conversation.get('conversation_id'),
|
|
thread_id=conversation.get('thread_id'),
|
|
actor_type=actor.get('actor_type'),
|
|
actor_id=actor.get('actor_id'),
|
|
actor_name=actor.get('actor_name'),
|
|
subject_type=subject.get('subject_type'),
|
|
subject_id=subject.get('subject_id'),
|
|
input_summary=input_summary,
|
|
input_json={
|
|
'text': text,
|
|
'contents': self._sanitize_contents(input_data.get('contents') or []),
|
|
'attachments': self._sanitize_attachments(input_data.get('attachments') or []),
|
|
},
|
|
run_id=run_id,
|
|
runner_id=runner_id,
|
|
event_time=event_time,
|
|
metadata={
|
|
'steering': {
|
|
'status': 'dropped',
|
|
'reason': reason,
|
|
'original_event_id': event.get('event_id'),
|
|
'claimed_run_id': item.get('claimed_run_id'),
|
|
'claimed_runner_id': item.get('runner_id'),
|
|
'claimed_at': item.get('claimed_at'),
|
|
},
|
|
},
|
|
)
|
|
|
|
async def write_assistant_transcript(
|
|
self,
|
|
result_dict: dict[str, typing.Any],
|
|
event: AgentEventEnvelope,
|
|
run_id: str,
|
|
runner_id: str,
|
|
) -> None:
|
|
"""Write assistant message to Transcript."""
|
|
import uuid
|
|
|
|
from .transcript_store import TranscriptStore
|
|
|
|
store = TranscriptStore(self.ap.persistence_mgr.get_db_engine())
|
|
|
|
data = result_dict.get('data', {})
|
|
message = data.get('message', {})
|
|
|
|
content = None
|
|
content_json = None
|
|
|
|
if isinstance(message.get('content'), str):
|
|
content = message['content']
|
|
content_json = message
|
|
elif isinstance(message.get('content'), list):
|
|
text_parts = []
|
|
for c in message['content']:
|
|
if isinstance(c, dict) and c.get('type') == 'text':
|
|
text_parts.append(c.get('text', ''))
|
|
content = ' '.join(text_parts) if text_parts else None
|
|
content_json = {
|
|
**message,
|
|
'content': self._sanitize_contents(message['content']),
|
|
}
|
|
|
|
assistant_event_id = str(uuid.uuid4())
|
|
|
|
await store.append_transcript(
|
|
transcript_id=str(uuid.uuid4()),
|
|
event_id=assistant_event_id,
|
|
conversation_id=event.conversation_id,
|
|
role='assistant',
|
|
bot_id=event.bot_id,
|
|
workspace_id=event.workspace_id,
|
|
content=content,
|
|
content_json=content_json,
|
|
thread_id=event.thread_id,
|
|
item_type='message',
|
|
run_id=run_id,
|
|
runner_id=runner_id,
|
|
metadata={
|
|
'run_id': run_id,
|
|
'runner_id': runner_id,
|
|
},
|
|
)
|