mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-18 03:34:20 +00:00
feat(agent-runner): add artifact store pull APIs
This commit is contained in:
committed by
huanghuoguoguo
parent
a31f910f10
commit
bec11e5a18
@@ -7,7 +7,8 @@ import typing
|
||||
import uuid
|
||||
|
||||
import sqlalchemy
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from ...entity.persistence.event_log import EventLog
|
||||
from ...entity.persistence.transcript import Transcript
|
||||
@@ -27,6 +28,9 @@ class EventLogStore:
|
||||
|
||||
def __init__(self, engine: AsyncEngine):
|
||||
self.engine = engine
|
||||
self._session_factory = sessionmaker(
|
||||
engine, class_=AsyncSession, expire_on_commit=False
|
||||
)
|
||||
|
||||
async def append_event(
|
||||
self,
|
||||
@@ -83,32 +87,31 @@ class EventLogStore:
|
||||
if input_summary and len(input_summary) > self.MAX_INPUT_SUMMARY_LENGTH:
|
||||
input_summary = input_summary[:self.MAX_INPUT_SUMMARY_LENGTH - 3] + "..."
|
||||
|
||||
async with self.engine.connect() as conn:
|
||||
await conn.execute(
|
||||
sqlalchemy.insert(EventLog).values(
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
event_time=event_time,
|
||||
source=source,
|
||||
bot_id=bot_id,
|
||||
workspace_id=workspace_id,
|
||||
conversation_id=conversation_id,
|
||||
thread_id=thread_id,
|
||||
actor_type=actor_type,
|
||||
actor_id=actor_id,
|
||||
actor_name=actor_name,
|
||||
subject_type=subject_type,
|
||||
subject_id=subject_id,
|
||||
input_summary=input_summary,
|
||||
input_json=json.dumps(input_json) if input_json else None,
|
||||
raw_ref=raw_ref,
|
||||
run_id=run_id,
|
||||
runner_id=runner_id,
|
||||
metadata_json=json.dumps(metadata) if metadata else None,
|
||||
created_at=datetime.datetime.utcnow(),
|
||||
)
|
||||
async with self._session_factory() as session:
|
||||
event = EventLog(
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
event_time=event_time,
|
||||
source=source,
|
||||
bot_id=bot_id,
|
||||
workspace_id=workspace_id,
|
||||
conversation_id=conversation_id,
|
||||
thread_id=thread_id,
|
||||
actor_type=actor_type,
|
||||
actor_id=actor_id,
|
||||
actor_name=actor_name,
|
||||
subject_type=subject_type,
|
||||
subject_id=subject_id,
|
||||
input_summary=input_summary,
|
||||
input_json=json.dumps(input_json) if input_json else None,
|
||||
raw_ref=raw_ref,
|
||||
run_id=run_id,
|
||||
runner_id=runner_id,
|
||||
metadata_json=json.dumps(metadata) if metadata else None,
|
||||
created_at=datetime.datetime.utcnow(),
|
||||
)
|
||||
await conn.commit()
|
||||
session.add(event)
|
||||
await session.commit()
|
||||
|
||||
return event_id
|
||||
|
||||
@@ -124,14 +127,14 @@ class EventLogStore:
|
||||
Returns:
|
||||
Event record as dict, or None if not found
|
||||
"""
|
||||
async with self.engine.connect() as conn:
|
||||
result = await conn.execute(
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.select(EventLog).where(EventLog.event_id == event_id)
|
||||
)
|
||||
row = result.fetchone()
|
||||
row = result.scalars().first()
|
||||
if row is None:
|
||||
return None
|
||||
return self._row_to_dict(row[0])
|
||||
return self._row_to_dict(row)
|
||||
|
||||
async def page_events(
|
||||
self,
|
||||
@@ -153,7 +156,7 @@ class EventLogStore:
|
||||
"""
|
||||
limit = min(limit, 100) # Hard cap
|
||||
|
||||
async with self.engine.connect() as conn:
|
||||
async with self._session_factory() as session:
|
||||
query = sqlalchemy.select(EventLog)
|
||||
|
||||
if conversation_id is not None:
|
||||
@@ -167,10 +170,10 @@ class EventLogStore:
|
||||
|
||||
query = query.order_by(EventLog.id.desc()).limit(limit + 1)
|
||||
|
||||
result = await conn.execute(query)
|
||||
rows = result.fetchall()
|
||||
result = await session.execute(query)
|
||||
rows = result.scalars().all()
|
||||
|
||||
items = [self._row_to_dict(row[0]) for row in rows[:limit]]
|
||||
items = [self._row_to_dict(row) for row in rows[:limit]]
|
||||
has_more = len(rows) > limit
|
||||
next_seq = items[-1]['id'] if items and has_more else None
|
||||
|
||||
@@ -188,17 +191,17 @@ class EventLogStore:
|
||||
Returns:
|
||||
Cursor string (seq number), or None if no events
|
||||
"""
|
||||
async with self.engine.connect() as conn:
|
||||
result = await conn.execute(
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.select(EventLog.id)
|
||||
.where(EventLog.conversation_id == conversation_id)
|
||||
.order_by(EventLog.id.desc())
|
||||
.limit(1)
|
||||
)
|
||||
row = result.fetchone()
|
||||
row = result.scalars().first()
|
||||
if row is None:
|
||||
return None
|
||||
return str(row[0])
|
||||
return str(row)
|
||||
|
||||
async def has_events_before(
|
||||
self,
|
||||
@@ -214,8 +217,8 @@ class EventLogStore:
|
||||
Returns:
|
||||
True if there are events before
|
||||
"""
|
||||
async with self.engine.connect() as conn:
|
||||
result = await conn.execute(
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.select(sqlalchemy.func.count())
|
||||
.select_from(EventLog)
|
||||
.where(
|
||||
|
||||
Reference in New Issue
Block a user