mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-14 09:46:03 +00:00
fix(agent-runner): harden state and event APIs
This commit is contained in:
@@ -12,6 +12,9 @@ from datetime import datetime
|
||||
import sqlalchemy
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine
|
||||
from sqlalchemy import select, delete, update
|
||||
from sqlalchemy.dialects.postgresql import insert as postgresql_insert
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from .descriptor import AgentRunnerDescriptor
|
||||
from .host_models import AgentEventEnvelope, AgentBinding
|
||||
@@ -87,6 +90,49 @@ class PersistentStateStore:
|
||||
|
||||
return json_str, None
|
||||
|
||||
async def _upsert_state_row(
|
||||
self,
|
||||
conn: typing.Any,
|
||||
values: dict[str, typing.Any],
|
||||
) -> None:
|
||||
"""Insert or update a state row by the logical scope/key identity."""
|
||||
update_values = {
|
||||
'value_json': values['value_json'],
|
||||
'updated_at': values['updated_at'],
|
||||
}
|
||||
constraint_columns = ['scope_key', 'state_key']
|
||||
dialect_name = self._db_engine.dialect.name
|
||||
|
||||
if dialect_name == 'sqlite':
|
||||
stmt = sqlite_insert(AgentRunnerState).values(**values)
|
||||
await conn.execute(
|
||||
stmt.on_conflict_do_update(
|
||||
index_elements=constraint_columns,
|
||||
set_=update_values,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
if dialect_name == 'postgresql':
|
||||
stmt = postgresql_insert(AgentRunnerState).values(**values)
|
||||
await conn.execute(
|
||||
stmt.on_conflict_do_update(
|
||||
index_elements=constraint_columns,
|
||||
set_=update_values,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
await conn.execute(sqlalchemy.insert(AgentRunnerState).values(**values))
|
||||
except IntegrityError:
|
||||
await conn.execute(
|
||||
update(AgentRunnerState)
|
||||
.where(AgentRunnerState.scope_key == values['scope_key'])
|
||||
.where(AgentRunnerState.state_key == values['state_key'])
|
||||
.values(**update_values)
|
||||
)
|
||||
|
||||
# ========== Async DB Operations ==========
|
||||
|
||||
async def build_snapshot_from_event(
|
||||
@@ -195,49 +241,29 @@ class PersistentStateStore:
|
||||
# Build context fields
|
||||
binding_identity = get_binding_identity(binding)
|
||||
|
||||
now = datetime.utcnow()
|
||||
async with self._db_engine.begin() as conn:
|
||||
# Check if entry exists
|
||||
result = await conn.execute(
|
||||
select(AgentRunnerState.id)
|
||||
.where(AgentRunnerState.scope_key == scope_key)
|
||||
.where(AgentRunnerState.state_key == key)
|
||||
await self._upsert_state_row(
|
||||
conn,
|
||||
{
|
||||
'runner_id': descriptor.id,
|
||||
'binding_identity': binding_identity,
|
||||
'scope': scope,
|
||||
'scope_key': scope_key,
|
||||
'state_key': key,
|
||||
'value_json': value_json,
|
||||
'bot_id': event.bot_id,
|
||||
'workspace_id': event.workspace_id,
|
||||
'conversation_id': event.conversation_id,
|
||||
'thread_id': event.thread_id,
|
||||
'actor_type': event.actor.actor_type if event.actor else None,
|
||||
'actor_id': event.actor.actor_id if event.actor else None,
|
||||
'subject_type': event.subject.subject_type if event.subject else None,
|
||||
'subject_id': event.subject.subject_id if event.subject else None,
|
||||
'created_at': now,
|
||||
'updated_at': now,
|
||||
},
|
||||
)
|
||||
existing = result.first()
|
||||
|
||||
now = datetime.utcnow()
|
||||
|
||||
if existing:
|
||||
# Update existing entry
|
||||
await conn.execute(
|
||||
update(AgentRunnerState)
|
||||
.where(AgentRunnerState.id == existing.id)
|
||||
.values(
|
||||
value_json=value_json,
|
||||
updated_at=now,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Insert new entry
|
||||
await conn.execute(
|
||||
sqlalchemy.insert(AgentRunnerState).values(
|
||||
runner_id=descriptor.id,
|
||||
binding_identity=binding_identity,
|
||||
scope=scope,
|
||||
scope_key=scope_key,
|
||||
state_key=key,
|
||||
value_json=value_json,
|
||||
bot_id=event.bot_id,
|
||||
workspace_id=event.workspace_id,
|
||||
conversation_id=event.conversation_id,
|
||||
thread_id=event.thread_id,
|
||||
actor_type=event.actor.actor_type if event.actor else None,
|
||||
actor_id=event.actor.actor_id if event.actor else None,
|
||||
subject_type=event.subject.subject_type if event.subject else None,
|
||||
subject_id=event.subject.subject_id if event.subject else None,
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
)
|
||||
|
||||
return True, None
|
||||
|
||||
@@ -293,49 +319,29 @@ class PersistentStateStore:
|
||||
|
||||
context = context or {}
|
||||
|
||||
now = datetime.utcnow()
|
||||
async with self._db_engine.begin() as conn:
|
||||
# Check if entry exists
|
||||
result = await conn.execute(
|
||||
select(AgentRunnerState.id)
|
||||
.where(AgentRunnerState.scope_key == scope_key)
|
||||
.where(AgentRunnerState.state_key == state_key)
|
||||
await self._upsert_state_row(
|
||||
conn,
|
||||
{
|
||||
'runner_id': runner_id,
|
||||
'binding_identity': binding_identity,
|
||||
'scope': scope,
|
||||
'scope_key': scope_key,
|
||||
'state_key': state_key,
|
||||
'value_json': value_json,
|
||||
'bot_id': context.get('bot_id'),
|
||||
'workspace_id': context.get('workspace_id'),
|
||||
'conversation_id': context.get('conversation_id'),
|
||||
'thread_id': context.get('thread_id'),
|
||||
'actor_type': context.get('actor_type'),
|
||||
'actor_id': context.get('actor_id'),
|
||||
'subject_type': context.get('subject_type'),
|
||||
'subject_id': context.get('subject_id'),
|
||||
'created_at': now,
|
||||
'updated_at': now,
|
||||
},
|
||||
)
|
||||
existing = result.first()
|
||||
|
||||
now = datetime.utcnow()
|
||||
|
||||
if existing:
|
||||
# Update existing entry
|
||||
await conn.execute(
|
||||
update(AgentRunnerState)
|
||||
.where(AgentRunnerState.id == existing.id)
|
||||
.values(
|
||||
value_json=value_json,
|
||||
updated_at=now,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Insert new entry
|
||||
await conn.execute(
|
||||
sqlalchemy.insert(AgentRunnerState).values(
|
||||
runner_id=runner_id,
|
||||
binding_identity=binding_identity,
|
||||
scope=scope,
|
||||
scope_key=scope_key,
|
||||
state_key=state_key,
|
||||
value_json=value_json,
|
||||
bot_id=context.get('bot_id'),
|
||||
workspace_id=context.get('workspace_id'),
|
||||
conversation_id=context.get('conversation_id'),
|
||||
thread_id=context.get('thread_id'),
|
||||
actor_type=context.get('actor_type'),
|
||||
actor_id=context.get('actor_id'),
|
||||
subject_type=context.get('subject_type'),
|
||||
subject_id=context.get('subject_id'),
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
)
|
||||
|
||||
return True, None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user