mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-13 01:06:03 +00:00
Fix agent runner steering and lifecycle hardening
This commit is contained in:
@@ -212,6 +212,8 @@ class ArtifactStore:
|
||||
row = result.scalars().first()
|
||||
if row is None:
|
||||
return None
|
||||
if self._is_expired(row):
|
||||
return None
|
||||
return self._row_to_public_dict(row)
|
||||
|
||||
async def _get_internal_record(
|
||||
@@ -234,7 +236,10 @@ class ArtifactStore:
|
||||
AgentArtifact.artifact_id == artifact_id
|
||||
)
|
||||
)
|
||||
return result.scalars().first()
|
||||
record = result.scalars().first()
|
||||
if record is not None and self._is_expired(record):
|
||||
return None
|
||||
return record
|
||||
|
||||
async def read_artifact(
|
||||
self,
|
||||
@@ -321,6 +326,51 @@ class ArtifactStore:
|
||||
'has_more': False,
|
||||
}
|
||||
|
||||
async def cleanup_expired_artifacts(
|
||||
self,
|
||||
*,
|
||||
now: datetime.datetime | None = None,
|
||||
) -> int:
|
||||
"""Delete expired artifact metadata and Host-owned binary blobs.
|
||||
|
||||
Returns the number of artifact metadata rows removed. External/file
|
||||
storage references are only dereferenced from LangBot metadata; their
|
||||
backing lifecycle remains owned by the storage provider.
|
||||
"""
|
||||
if now is None:
|
||||
now = datetime.datetime.utcnow()
|
||||
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.select(AgentArtifact).where(
|
||||
AgentArtifact.expires_at.is_not(None),
|
||||
AgentArtifact.expires_at <= now,
|
||||
)
|
||||
)
|
||||
expired = result.scalars().all()
|
||||
if not expired:
|
||||
return 0
|
||||
|
||||
binary_storage_keys = [
|
||||
artifact.storage_key
|
||||
for artifact in expired
|
||||
if artifact.storage_type == 'binary_storage' and artifact.storage_key
|
||||
]
|
||||
if binary_storage_keys:
|
||||
await session.execute(
|
||||
sqlalchemy.delete(BinaryStorage).where(
|
||||
BinaryStorage.unique_key.in_(binary_storage_keys)
|
||||
)
|
||||
)
|
||||
|
||||
await session.execute(
|
||||
sqlalchemy.delete(AgentArtifact).where(
|
||||
AgentArtifact.id.in_([artifact.id for artifact in expired])
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
return len(expired)
|
||||
|
||||
async def _read_binary_storage(self, key: str) -> bytes | None:
|
||||
"""Read content from BinaryStorage.
|
||||
|
||||
@@ -407,6 +457,17 @@ class ArtifactStore:
|
||||
metadata.pop(_FILE_ARTIFACT_METADATA_KEY, None)
|
||||
return metadata
|
||||
|
||||
@staticmethod
|
||||
def _is_expired(
|
||||
row: AgentArtifact,
|
||||
now: datetime.datetime | None = None,
|
||||
) -> bool:
|
||||
if row.expires_at is None:
|
||||
return False
|
||||
if now is None:
|
||||
now = datetime.datetime.utcnow()
|
||||
return row.expires_at <= now
|
||||
|
||||
def _row_to_public_dict(self, row: AgentArtifact) -> dict[str, typing.Any]:
|
||||
"""Convert an AgentArtifact row to public dict.
|
||||
|
||||
|
||||
@@ -228,6 +228,18 @@ class EventLogStore:
|
||||
count = result.scalar()
|
||||
return count > 0
|
||||
|
||||
async def cleanup_events_older_than(
|
||||
self,
|
||||
before: datetime.datetime,
|
||||
) -> int:
|
||||
"""Delete EventLog rows created before the supplied timestamp."""
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.delete(EventLog).where(EventLog.created_at < before)
|
||||
)
|
||||
await session.commit()
|
||||
return result.rowcount or 0
|
||||
|
||||
def _row_to_dict(self, row: EventLog) -> dict[str, typing.Any]:
|
||||
"""Convert an EventLog row to dict."""
|
||||
return {
|
||||
|
||||
@@ -134,9 +134,39 @@ class AgentRunOrchestrator:
|
||||
)
|
||||
|
||||
pending_artifact_refs: list[dict[str, typing.Any]] = []
|
||||
seen_sequences: set[int] = set()
|
||||
last_sequence = 0
|
||||
|
||||
try:
|
||||
async for result_dict in self.invoker.invoke(descriptor, context):
|
||||
sequence = result_dict.get('sequence')
|
||||
if sequence is not None:
|
||||
try:
|
||||
sequence_int = int(sequence)
|
||||
except (TypeError, ValueError):
|
||||
self.ap.logger.warning(
|
||||
f'Runner {descriptor.id} returned invalid result sequence: {sequence}'
|
||||
)
|
||||
else:
|
||||
if sequence_int in seen_sequences:
|
||||
self.ap.logger.warning(
|
||||
f'Runner {descriptor.id} returned duplicate result sequence '
|
||||
f'{sequence_int} for run {run_id}; dropping duplicate'
|
||||
)
|
||||
continue
|
||||
if sequence_int <= 0:
|
||||
self.ap.logger.warning(
|
||||
f'Runner {descriptor.id} returned non-positive result sequence '
|
||||
f'{sequence_int} for run {run_id}'
|
||||
)
|
||||
elif last_sequence and sequence_int != last_sequence + 1:
|
||||
self.ap.logger.warning(
|
||||
f'Runner {descriptor.id} result sequence gap or out-of-order '
|
||||
f'for run {run_id}: previous={last_sequence}, current={sequence_int}'
|
||||
)
|
||||
seen_sequences.add(sequence_int)
|
||||
last_sequence = max(last_sequence, sequence_int)
|
||||
|
||||
result_type = result_dict.get('type')
|
||||
if result_type and not self.result_normalizer.validate_payload(
|
||||
result_type,
|
||||
@@ -180,7 +210,20 @@ class AgentRunOrchestrator:
|
||||
if result is not None:
|
||||
yield result
|
||||
finally:
|
||||
await self._session_registry.unregister(run_id)
|
||||
session = await self._session_registry.unregister(run_id)
|
||||
pending_steering = session.get('steering_queue', []) if session else []
|
||||
if pending_steering:
|
||||
try:
|
||||
await self.journal.write_steering_dropped_audits(
|
||||
pending_steering,
|
||||
run_id,
|
||||
descriptor.id,
|
||||
)
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(
|
||||
f'Failed to write dropped steering audit for run {run_id}: {exc}',
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
async def run_from_query(
|
||||
self,
|
||||
|
||||
@@ -386,6 +386,76 @@ class AgentRunJournal:
|
||||
|
||||
return merged
|
||||
|
||||
async def write_steering_dropped_audits(
|
||||
self,
|
||||
items: list[dict[str, typing.Any]],
|
||||
run_id: str,
|
||||
runner_id: str,
|
||||
*,
|
||||
reason: str = 'run_ended',
|
||||
) -> None:
|
||||
"""Write terminal audit events for steering items left unconsumed."""
|
||||
if not items:
|
||||
return
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from .event_log_store import EventLogStore
|
||||
|
||||
store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
|
||||
|
||||
for item in items:
|
||||
event = item.get('event') if isinstance(item.get('event'), dict) else {}
|
||||
input_data = item.get('input') if isinstance(item.get('input'), dict) else {}
|
||||
conversation = item.get('conversation') if isinstance(item.get('conversation'), dict) else {}
|
||||
actor = item.get('actor') if isinstance(item.get('actor'), dict) else {}
|
||||
subject = item.get('subject') if isinstance(item.get('subject'), dict) else {}
|
||||
|
||||
text = input_data.get('text')
|
||||
input_summary = text[:1000] if isinstance(text, str) and text else 'Unconsumed steering input dropped'
|
||||
event_time = None
|
||||
raw_event_time = event.get('event_time')
|
||||
if raw_event_time:
|
||||
try:
|
||||
event_time = datetime.datetime.fromtimestamp(raw_event_time)
|
||||
except (TypeError, ValueError, OSError):
|
||||
event_time = None
|
||||
|
||||
await store.append_event(
|
||||
event_id=str(uuid.uuid4()),
|
||||
event_type='steering.dropped',
|
||||
source='host',
|
||||
bot_id=conversation.get('bot_id'),
|
||||
workspace_id=conversation.get('workspace_id'),
|
||||
conversation_id=conversation.get('conversation_id'),
|
||||
thread_id=conversation.get('thread_id'),
|
||||
actor_type=actor.get('actor_type'),
|
||||
actor_id=actor.get('actor_id'),
|
||||
actor_name=actor.get('actor_name'),
|
||||
subject_type=subject.get('subject_type'),
|
||||
subject_id=subject.get('subject_id'),
|
||||
input_summary=input_summary,
|
||||
input_json={
|
||||
'text': text,
|
||||
'contents': input_data.get('contents') or [],
|
||||
'attachments': input_data.get('attachments') or [],
|
||||
},
|
||||
run_id=run_id,
|
||||
runner_id=runner_id,
|
||||
event_time=event_time,
|
||||
metadata={
|
||||
'steering': {
|
||||
'status': 'dropped',
|
||||
'reason': reason,
|
||||
'original_event_id': event.get('event_id'),
|
||||
'claimed_run_id': item.get('claimed_run_id'),
|
||||
'claimed_runner_id': item.get('runner_id'),
|
||||
'claimed_at': item.get('claimed_at'),
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def write_assistant_transcript(
|
||||
self,
|
||||
result_dict: dict[str, typing.Any],
|
||||
|
||||
@@ -10,6 +10,9 @@ import threading
|
||||
from .context_builder import AgentResources
|
||||
|
||||
|
||||
MAX_STEERING_QUEUE_ITEMS = 100
|
||||
|
||||
|
||||
class AgentRunSessionStatus(typing.TypedDict):
|
||||
"""Status tracking for agent run session."""
|
||||
started_at: int
|
||||
@@ -148,15 +151,18 @@ class AgentRunSessionRegistry:
|
||||
'file': {f.get('file_id') for f in resources.get('files', [])},
|
||||
}
|
||||
|
||||
async def unregister(self, run_id: str) -> None:
|
||||
async def unregister(self, run_id: str) -> AgentRunSession | None:
|
||||
"""Unregister an agent run session.
|
||||
|
||||
Args:
|
||||
run_id: Unique run identifier
|
||||
|
||||
Returns:
|
||||
The removed session, if one existed. Callers can inspect any
|
||||
pending in-memory queues before they are discarded.
|
||||
"""
|
||||
async with self._lock:
|
||||
if run_id in self._sessions:
|
||||
del self._sessions[run_id]
|
||||
return self._sessions.pop(run_id, None)
|
||||
|
||||
async def get(self, run_id: str) -> AgentRunSession | None:
|
||||
"""Get session by run_id.
|
||||
@@ -215,6 +221,8 @@ class AgentRunSessionRegistry:
|
||||
session = self._sessions.get(run_id)
|
||||
if session is None:
|
||||
return False
|
||||
if len(session['steering_queue']) >= MAX_STEERING_QUEUE_ITEMS:
|
||||
return False
|
||||
session['steering_queue'].append(copy.deepcopy(item))
|
||||
session['status']['last_activity_at'] = int(time.time())
|
||||
return True
|
||||
|
||||
@@ -276,6 +276,18 @@ class TranscriptStore:
|
||||
count = result.scalar()
|
||||
return count > 0
|
||||
|
||||
async def cleanup_transcripts_older_than(
|
||||
self,
|
||||
before: datetime.datetime,
|
||||
) -> int:
|
||||
"""Delete Transcript rows created before the supplied timestamp."""
|
||||
async with self._session_factory() as session:
|
||||
result = await session.execute(
|
||||
sqlalchemy.delete(Transcript).where(Transcript.created_at < before)
|
||||
)
|
||||
await session.commit()
|
||||
return result.rowcount or 0
|
||||
|
||||
async def _get_next_seq(self, conversation_id: str) -> int:
|
||||
"""Fallback next sequence number for stores that cannot expose autoincrement IDs."""
|
||||
async with self._session_factory() as session:
|
||||
|
||||
@@ -31,22 +31,29 @@ class Controller:
|
||||
semaphore; otherwise the active run can finish before the query reaches
|
||||
ChatMessageHandler.try_claim_steering_from_query.
|
||||
"""
|
||||
pipeline_uuid = query.pipeline_uuid
|
||||
if not pipeline_uuid:
|
||||
try:
|
||||
pipeline_uuid = query.pipeline_uuid
|
||||
if not pipeline_uuid:
|
||||
return False
|
||||
|
||||
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
|
||||
if not pipeline:
|
||||
return False
|
||||
|
||||
session = await self.ap.sess_mgr.get_session(query)
|
||||
query.session = session
|
||||
query.pipeline_config = pipeline.pipeline_entity.config
|
||||
query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins
|
||||
query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers
|
||||
|
||||
return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query)
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(
|
||||
f'Failed to claim query {query.query_id} as steering input: {exc}',
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
|
||||
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
|
||||
if not pipeline:
|
||||
return False
|
||||
|
||||
session = await self.ap.sess_mgr.get_session(query)
|
||||
query.session = session
|
||||
query.pipeline_config = pipeline.pipeline_entity.config
|
||||
query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins
|
||||
query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers
|
||||
|
||||
return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query)
|
||||
|
||||
async def consumer(self):
|
||||
"""事件处理循环"""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user