diff --git a/docs/agent-runner-pluginization/AGENT_RUNNER_QA_GUIDE.md b/docs/agent-runner-pluginization/AGENT_RUNNER_QA_GUIDE.md index 106cf27e..2aa51702 100644 --- a/docs/agent-runner-pluginization/AGENT_RUNNER_QA_GUIDE.md +++ b/docs/agent-runner-pluginization/AGENT_RUNNER_QA_GUIDE.md @@ -145,6 +145,7 @@ bin/lbs case list | LA-06 | 多模态 | 发送图片输入。 | `ctx.input.contents` 保留图片;支持视觉模型时正常处理,不支持时受控失败。 | | LA-07 | fallback / 错误 | 模拟 primary 模型失败或 runner 抛错。 | fallback 或 `run.failed` 行为受控;后续请求不受影响。 | | LA-08 | 无输出保护 | 测试 runner 完成但不产出消息。 | 不产生空白成功回复;按受控失败或明确缺陷处理。 | +| LA-09 | steering / 运行中追加消息 | 使用支持 steering 的 runner,第一条消息触发长 run;run 未结束时在同 conversation 追加第二条消息。 | 第二条消息被 active run claim,不启动并发 run;runner 通过 `steering_pull` 看到追加输入;EventLog 有 `queued` -> `steering.injected`,若未消费则有 `steering.dropped` 终态;后续普通消息仍可处理。 | Rerank、remove-think、文件输入等场景只在本次改动直接涉及时补测,不作为每轮必跑项。 @@ -195,7 +196,10 @@ Dify、n8n、Coze、DashScope、Langflow、Tbox 等外部服务 runner 不作为 | run_id 结束后复用被拒绝 | session registry 注销测试。 | | 插件身份不匹配被拒绝 | `caller_plugin_identity` mismatch 测试。 | | 绑定插件身份的 run_id 省略 caller identity 被拒绝 | `_validate_run_authorization(..., caller_plugin_identity=None)` 返回错误。 | +| 未注册 Runtime 连接伪造插件身份被剥离 | SDK runtime forwarding 测试:请求自带 `caller_plugin_identity` 时,未注册连接转发前必须 `pop`,已注册连接必须覆盖为真实插件身份。 | | storage/state scope 越权被拒绝 | state/storage proxy 单测。 | +| steering claim 异常不杀 consumer loop | controller 单测:无效 runner / registry 异常只让当前消息回到普通 session 槽位路径,消息消费循环继续。 | +| steering queue 未消费有终态 | session registry / orchestrator 单测:队列有上限;run unregister 时未 pull 项写 `steering.dropped` 审计。 | 如果这些单测失败,不能用 WebUI 正常回复替代。 diff --git a/docs/agent-runner-pluginization/PROTOCOL_V1.md b/docs/agent-runner-pluginization/PROTOCOL_V1.md index 6725da08..f0e55eb5 100644 --- a/docs/agent-runner-pluginization/PROTOCOL_V1.md +++ b/docs/agent-runner-pluginization/PROTOCOL_V1.md @@ -47,7 +47,8 @@ Agent / binding 的持久化形态。 - 新增可选字段保持向后兼容。 - 删除字段或改变既有字段语义,需要在 SDK 发布前完成;发布后应走新的显式兼容方案。 -- 结果流演进:Host **必须忽略未知 result type 并记录 
warning**(除非该 type 明确要求强校验)。新增 result type 不提升大版本。 +- 结果流演进:Host **必须忽略未知 result type 并记录 warning**(除非该 type 明确要求强校验)。SDK envelope 接收入站未知 `type` 字符串,runner 侧可按原字符串转发或忽略;新增 result type 不提升大版本。 +- SDK 入站 context 类实体偏宽松,用于兼容 Host 附加的非核心字段;manifest、result payload、page/result 返回与错误模型偏严格,未知字段默认禁止。安全边界仍在 Host,SDK 校验只提升开发体验。 ## 4. Discovery 协议 @@ -65,11 +66,15 @@ class AgentRunnerDiscovery(BaseModel): runner_name: str runner_description: I18nObject | None = None manifest: AgentRunnerManifest + capabilities: AgentRunnerCapabilities # compatibility alias of manifest.capabilities + permissions: AgentRunnerPermissions # compatibility alias of manifest.permissions config: list[DynamicFormItemSchema] = [] ``` `manifest` 是 SDK typed `AgentRunnerManifest`,由 Runtime 从插件组件 manifest 解析并校验后返回。`plugin_author` / `plugin_name` / `runner_name` 保留为 transport 寻址字段;Host 以它们生成稳定 runner id,并把 `manifest.id` 校验为 `plugin:author/name/runner`。单个 runner manifest 解析失败时 Runtime/Host 记录 warning 并跳过该 runner,不影响同一插件或其它插件的 runner discovery。 +`capabilities` / `permissions` 顶层字段是兼容旧 discovery 消费方的冗余别名;新代码必须以 `manifest.capabilities` / `manifest.permissions` 为准。 + ### 4.2 AgentRunnerManifest 这里的 manifest 指 Runtime 返回给 Host 的 typed runner manifest: @@ -247,8 +252,10 @@ class ConversationContext(BaseModel): thread_id: str | None = None launcher_type: str | None = None launcher_id: str | None = None + sender_id: str | None = None bot_id: str | None = None workspace_id: str | None = None + session_id: str | None = None class ActorContext(BaseModel): actor_type: str @@ -335,7 +342,7 @@ class ContextAPICapabilities(BaseModel): ```python class AgentRuntimeContext(BaseModel): langbot_version: str | None = None - trace_id: str + trace_id: str | None = None deadline_at: float | None = None metadata: dict[str, Any] = {} ``` @@ -395,7 +402,7 @@ ResultType = Literal[ class AgentRunResult(BaseModel): run_id: str - type: AgentRunResultType + type: AgentRunResultType | str data: dict[str, Any] = {} sequence: int | None = 
None timestamp: int | None = None @@ -508,7 +515,7 @@ await api.get_langbot_version() `steering_pull(mode="all")` 是推荐默认:Host 按 claim 顺序返回全部 pending steering 输入并清空对应队列。`mode="one-at-a-time"` 仅用于 runner 主动节流,每次返回一条。Host 不合并多条用户消息;runner 负责在 turn 边界决定模型侧格式。 -Steering 审计使用 EventLog 而不是 Transcript schema 扩展:被 active run 吸收的原始 `message.received` 事件保留原事件类型,并在 `metadata.steering` 标记 `status="queued"`、`trigger_behavior="absorbed_into_active_run"`、`claimed_by_run_id`、`claimed_runner_id`、`claimed_at`。Runner 成功 pull 后,Host 追加 `steering.injected` EventLog 记录,`metadata.steering.status="injected"` 并引用 `source_event_id`。Transcript 继续只表示会话事实,不承担 dispatch 行为标记。 +Steering 审计使用 EventLog 而不是 Transcript schema 扩展:被 active run 吸收的原始 `message.received` 事件保留原事件类型,并在 `metadata.steering` 标记 `status="queued"`、`trigger_behavior="absorbed_into_active_run"`、`claimed_by_run_id`、`claimed_runner_id`、`claimed_at`。Runner 成功 pull 后,Host 追加 `steering.injected` EventLog 记录,`metadata.steering.status="injected"` 并引用 `source_event_id`。若 run 结束时仍有已 claim 但未 pull 的 steering 输入,Host 追加 `steering.dropped` EventLog 记录,`metadata.steering.status="dropped"` 并引用 `source_event_id`;这不是用户消息事实的删除,只是 dispatch 终态。Transcript 继续只表示会话事实,不承担 dispatch 行为标记。 `state` 与 `storage` 的建议边界:`state` 放小型 JSON(conversation / actor / subject / runner),`storage` 放 blob 或较大数据(插件私有数据、workspace 数据、checkpoint)。 @@ -650,6 +657,8 @@ class AgentAPIError(BaseModel): | `invalid_argument` | 参数错误。 | | `runtime_error` | Host 或下游能力错误。 | +SDK runner-facing proxy 在 Host 返回结构化错误或畸形响应时抛出 `AgentAPIException`,其中 `error` 字段为 `AgentAPIError`。Legacy transport 只返回字符串错误时,SDK 使用 `host.action_error` 包装,避免 runner 继续依赖裸 `KeyError` 或字符串匹配。 + Runner 失败使用 `run.failed`: ```json diff --git a/docs/agent-runner-pluginization/README.md b/docs/agent-runner-pluginization/README.md index 2bee0a2f..e8051b81 100644 --- a/docs/agent-runner-pluginization/README.md +++ b/docs/agent-runner-pluginization/README.md @@ -82,7 +82,7 @@ EventGateway / EventRouter 在本文档中描述为 **external EBA 
branch integr | [EVENT_BASED_AGENT.md](./EVENT_BASED_AGENT.md) | EBA 接入边界:事件模型、事件来源、触发绑定、非消息事件如何复用 AgentRunner 调度;完整 EventGateway / EventRouter 由外部 EBA 分支联调。 | | [RUNTIME_CONTROL_PLANE_V2.md](./RUNTIME_CONTROL_PLANE_V2.md) | Agent Platform v2 / runtime 管控面决策:第一阶段优先把 `AgentRun` / `AgentRunEvent` / run control 做成 Host 事实源;完整 runtime registry / daemon 管控是后续可选阶段。**标注为 future design note**。 | | [OFFICIAL_RUNNER_PLUGINS.md](./OFFICIAL_RUNNER_PLUGINS.md) | 官方 runner 插件迁移,包括 local-agent 和外部 runner。它是下游落地计划,不是 LangBot 基础能力设计的前置约束。 | -| [RUN_STEERING_AND_CHECKPOINT.md](./RUN_STEERING_AND_CHECKPOINT.md) | 运行中消息注入(steering / follow-up)与压缩摘要持久化(compaction checkpoint)的 Host 能力缺口设计:来自 local-agent 对照 Pi agent harness 的差距分析。**标注为 future design note**。 | +| [RUN_STEERING_AND_CHECKPOINT.md](./RUN_STEERING_AND_CHECKPOINT.md) | 运行中消息注入(steering / follow-up)与压缩摘要持久化(compaction checkpoint)的设计与落地状态记录;schema 仍以 PROTOCOL_V1 为准。 | | [STATUS.md](./STATUS.md) | 当前实现状态、spec 与实现已知差距、runner 验收状态和历史高价值记录。 | | [AGENT_RUNNER_QA_GUIDE.md](./AGENT_RUNNER_QA_GUIDE.md) | Agent Runner QA 指南:保留最高价值测试路径,指导 agent 开展下一轮 WebUI / runner smoke 验证。 | | [SECURITY_HARDENING.md](./SECURITY_HARDENING.md) | 安全发布级 hardening 的后续发布门槛:路径隔离、权限边界、secret、资源配额、MCP / skill 投影和审计。 | diff --git a/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md b/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md index 322c489d..cc0b1113 100644 --- a/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md +++ b/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md @@ -52,6 +52,8 @@ pi-agent-core 区分两个队列,注入时机都在 turn 边界,不打断进 - **回执**:被 steering 消费的事件通过 EventLog 审计。原始 `message.received` 记录在 `metadata.steering` 标记 queued/absorbed 与 `claimed_by_run_id`; runner 成功 pull 后,Host 追加 `steering.injected` 记录并引用源事件。 + run 结束时仍未被 pull 的已 claim 输入,Host 追加 `steering.dropped` 记录作为 + dispatch 终态;原始 Transcript 事实不删除。 Transcript 继续只表示会话事实,不扩展 dispatch 行为字段。 已落地的协议面(最终定义归 PROTOCOL_V1): @@ -61,6 +63,8 @@ pi-agent-core 
区分两个队列,注入时机都在 turn 边界,不打断进 pending 输入;`one-at-a-time` 仅作为 runner 主动节流选项。 3. dispatch 层的"认领"规则:`message.received` 可被同 conversation 的 active run 吸收,原事件写 EventLog / Transcript,dispatch 行为写入 EventLog metadata。 +4. Host 对单 run steering queue 设置内存上限,队列满时不再 claim 新消息,消息回到 + 正常 dispatch 路径,避免 active run 无限吞入同会话输入。 ### 1.4 边界 @@ -131,7 +135,7 @@ pi-agent-core 把 compaction 条目持久化进 session tree:摘要带 | 项 | 归属 | 依赖 | | --- | --- | --- | -| steering queue、事件认领、基础审计 | LangBot Host(dispatch / binding 层) | 已落地 | +| steering queue、事件认领、基础审计 | LangBot Host(dispatch / binding 层) | 已落地,含队列上限与未消费 dropped 终态 | | steering pull API + capability 位 | PROTOCOL_V1 + SDK proxy | 已落地 | | turn 边界拉取与注入 | langbot-local-agent | 已落地 | | local-agent 对 state API 的 checkpoint 读写 | langbot-local-agent | 已落地 | diff --git a/docs/agent-runner-pluginization/STATUS.md b/docs/agent-runner-pluginization/STATUS.md index 234812d6..6c1a0944 100644 --- a/docs/agent-runner-pluginization/STATUS.md +++ b/docs/agent-runner-pluginization/STATUS.md @@ -2,7 +2,7 @@ 本文档是 `docs/agent-runner-pluginization/` 的状态事实源。协议 schema 仍以 [PROTOCOL_V1.md](./PROTOCOL_V1.md) 为准;测试步骤以 [AGENT_RUNNER_QA_GUIDE.md](./AGENT_RUNNER_QA_GUIDE.md) 为准;安全发布门槛以 [SECURITY_HARDENING.md](./SECURITY_HARDENING.md) 为准。 -状态快照日期:2026-06-10。 +状态快照日期:2026-06-12。 ## 实现状态 @@ -17,12 +17,15 @@ | Official runner manifests | Done | `local-agent`、LiteLLM Agent Platform、外部服务 runner 已重新声明真实生效的 LangBot resource permissions。 | | Runtime Control Plane v2 | Future | 第一阶段设计为 Host-owned Run Ledger;runtime registry / heartbeat / daemon claim 是后续可选阶段。 | | Full release security gate | Future | self-host / container opt-in 可继续;managed/default external harness 需完成 SECURITY_HARDENING full gate。 | +| Steering control path | Done | claim 异常不再逃逸 consumer loop;queue 有上限;未 pull 的 claimed 输入在 run 结束时写 `steering.dropped` 审计终态。 | +| SDK v1 contract closure | Done | SDK 提供 `AgentAPIError` / `AgentAPIException`、typed `SteeringPullResult`、未知 result type 宽容解析、result `sequence` 注入与取消传播。 | ## Spec 
与实现已知差距 - `action.requested` 仍只作为 telemetry / reserved surface;platform action executor 不在本分支执行。 - EventGateway / EventRouter 完整实现由外部 EBA 分支联调;本分支只提供 event-first host envelope / binding / run 入口。 - State 与 storage 的长期类型边界仍可继续收窄;当前合同只要求 JSON-safe state 与受控 storage API。 +- Artifact 读取路径已检查 `expires_at`,EventLog / Transcript / Artifact 已提供显式 cleanup primitive;长期 retention 默认值、TTL 调度接入和大 payload 去重仍是运维收尾项,应在 Runtime Control Plane Phase 1 前补齐。 - External harness 的 native shell / filesystem / CLI / MCP 权限不受 manifest permissions 约束;manifest permissions 只约束 LangBot 持有的资源访问。 - Managed/cloud/default external harness 的 OS/process/network quota、workspace GC、完整 audit/admin control 仍是发布门槛,不是 Protocol v1 已完成能力。 diff --git a/src/langbot/pkg/agent/runner/artifact_store.py b/src/langbot/pkg/agent/runner/artifact_store.py index 888e8c0e..0ed2e9d6 100644 --- a/src/langbot/pkg/agent/runner/artifact_store.py +++ b/src/langbot/pkg/agent/runner/artifact_store.py @@ -212,6 +212,8 @@ class ArtifactStore: row = result.scalars().first() if row is None: return None + if self._is_expired(row): + return None return self._row_to_public_dict(row) async def _get_internal_record( @@ -234,7 +236,10 @@ class ArtifactStore: AgentArtifact.artifact_id == artifact_id ) ) - return result.scalars().first() + record = result.scalars().first() + if record is not None and self._is_expired(record): + return None + return record async def read_artifact( self, @@ -321,6 +326,51 @@ class ArtifactStore: 'has_more': False, } + async def cleanup_expired_artifacts( + self, + *, + now: datetime.datetime | None = None, + ) -> int: + """Delete expired artifact metadata and Host-owned binary blobs. + + Returns the number of artifact metadata rows removed. External/file + storage references are only dereferenced from LangBot metadata; their + backing lifecycle remains owned by the storage provider. 
+ """ + if now is None: + now = datetime.datetime.utcnow() + + async with self._session_factory() as session: + result = await session.execute( + sqlalchemy.select(AgentArtifact).where( + AgentArtifact.expires_at.is_not(None), + AgentArtifact.expires_at <= now, + ) + ) + expired = result.scalars().all() + if not expired: + return 0 + + binary_storage_keys = [ + artifact.storage_key + for artifact in expired + if artifact.storage_type == 'binary_storage' and artifact.storage_key + ] + if binary_storage_keys: + await session.execute( + sqlalchemy.delete(BinaryStorage).where( + BinaryStorage.unique_key.in_(binary_storage_keys) + ) + ) + + await session.execute( + sqlalchemy.delete(AgentArtifact).where( + AgentArtifact.id.in_([artifact.id for artifact in expired]) + ) + ) + await session.commit() + return len(expired) + async def _read_binary_storage(self, key: str) -> bytes | None: """Read content from BinaryStorage. @@ -407,6 +457,17 @@ class ArtifactStore: metadata.pop(_FILE_ARTIFACT_METADATA_KEY, None) return metadata + @staticmethod + def _is_expired( + row: AgentArtifact, + now: datetime.datetime | None = None, + ) -> bool: + if row.expires_at is None: + return False + if now is None: + now = datetime.datetime.utcnow() + return row.expires_at <= now + def _row_to_public_dict(self, row: AgentArtifact) -> dict[str, typing.Any]: """Convert an AgentArtifact row to public dict. 
diff --git a/src/langbot/pkg/agent/runner/event_log_store.py b/src/langbot/pkg/agent/runner/event_log_store.py index 134d07df..7e146c66 100644 --- a/src/langbot/pkg/agent/runner/event_log_store.py +++ b/src/langbot/pkg/agent/runner/event_log_store.py @@ -228,6 +228,18 @@ class EventLogStore: count = result.scalar() return count > 0 + async def cleanup_events_older_than( + self, + before: datetime.datetime, + ) -> int: + """Delete EventLog rows created before the supplied timestamp.""" + async with self._session_factory() as session: + result = await session.execute( + sqlalchemy.delete(EventLog).where(EventLog.created_at < before) + ) + await session.commit() + return result.rowcount or 0 + def _row_to_dict(self, row: EventLog) -> dict[str, typing.Any]: """Convert an EventLog row to dict.""" return { diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 27b05d11..0be102f8 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -134,9 +134,39 @@ class AgentRunOrchestrator: ) pending_artifact_refs: list[dict[str, typing.Any]] = [] + seen_sequences: set[int] = set() + last_sequence = 0 try: async for result_dict in self.invoker.invoke(descriptor, context): + sequence = result_dict.get('sequence') + if sequence is not None: + try: + sequence_int = int(sequence) + except (TypeError, ValueError): + self.ap.logger.warning( + f'Runner {descriptor.id} returned invalid result sequence: {sequence}' + ) + else: + if sequence_int in seen_sequences: + self.ap.logger.warning( + f'Runner {descriptor.id} returned duplicate result sequence ' + f'{sequence_int} for run {run_id}; dropping duplicate' + ) + continue + if sequence_int <= 0: + self.ap.logger.warning( + f'Runner {descriptor.id} returned non-positive result sequence ' + f'{sequence_int} for run {run_id}' + ) + elif last_sequence and sequence_int != last_sequence + 1: + self.ap.logger.warning( + f'Runner 
{descriptor.id} result sequence gap or out-of-order ' + f'for run {run_id}: previous={last_sequence}, current={sequence_int}' + ) + seen_sequences.add(sequence_int) + last_sequence = max(last_sequence, sequence_int) + result_type = result_dict.get('type') if result_type and not self.result_normalizer.validate_payload( result_type, @@ -180,7 +210,20 @@ class AgentRunOrchestrator: if result is not None: yield result finally: - await self._session_registry.unregister(run_id) + session = await self._session_registry.unregister(run_id) + pending_steering = session.get('steering_queue', []) if session else [] + if pending_steering: + try: + await self.journal.write_steering_dropped_audits( + pending_steering, + run_id, + descriptor.id, + ) + except Exception as exc: + self.ap.logger.warning( + f'Failed to write dropped steering audit for run {run_id}: {exc}', + exc_info=True, + ) async def run_from_query( self, diff --git a/src/langbot/pkg/agent/runner/run_journal.py b/src/langbot/pkg/agent/runner/run_journal.py index 61b83ee0..30405f98 100644 --- a/src/langbot/pkg/agent/runner/run_journal.py +++ b/src/langbot/pkg/agent/runner/run_journal.py @@ -386,6 +386,76 @@ class AgentRunJournal: return merged + async def write_steering_dropped_audits( + self, + items: list[dict[str, typing.Any]], + run_id: str, + runner_id: str, + *, + reason: str = 'run_ended', + ) -> None: + """Write terminal audit events for steering items left unconsumed.""" + if not items: + return + + import datetime + import uuid + + from .event_log_store import EventLogStore + + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + + for item in items: + event = item.get('event') if isinstance(item.get('event'), dict) else {} + input_data = item.get('input') if isinstance(item.get('input'), dict) else {} + conversation = item.get('conversation') if isinstance(item.get('conversation'), dict) else {} + actor = item.get('actor') if isinstance(item.get('actor'), dict) else {} + subject = 
item.get('subject') if isinstance(item.get('subject'), dict) else {} + + text = input_data.get('text') + input_summary = text[:1000] if isinstance(text, str) and text else 'Unconsumed steering input dropped' + event_time = None + raw_event_time = event.get('event_time') + if raw_event_time: + try: + event_time = datetime.datetime.fromtimestamp(raw_event_time) + except (TypeError, ValueError, OSError): + event_time = None + + await store.append_event( + event_id=str(uuid.uuid4()), + event_type='steering.dropped', + source='host', + bot_id=conversation.get('bot_id'), + workspace_id=conversation.get('workspace_id'), + conversation_id=conversation.get('conversation_id'), + thread_id=conversation.get('thread_id'), + actor_type=actor.get('actor_type'), + actor_id=actor.get('actor_id'), + actor_name=actor.get('actor_name'), + subject_type=subject.get('subject_type'), + subject_id=subject.get('subject_id'), + input_summary=input_summary, + input_json={ + 'text': text, + 'contents': input_data.get('contents') or [], + 'attachments': input_data.get('attachments') or [], + }, + run_id=run_id, + runner_id=runner_id, + event_time=event_time, + metadata={ + 'steering': { + 'status': 'dropped', + 'reason': reason, + 'source_event_id': event.get('event_id'), + 'claimed_run_id': item.get('claimed_run_id'), + 'claimed_runner_id': item.get('runner_id'), + 'claimed_at': item.get('claimed_at'), + }, + }, + ) + async def write_assistant_transcript( self, result_dict: dict[str, typing.Any], diff --git a/src/langbot/pkg/agent/runner/session_registry.py b/src/langbot/pkg/agent/runner/session_registry.py index 289f777b..85bce3b4 100644 --- a/src/langbot/pkg/agent/runner/session_registry.py +++ b/src/langbot/pkg/agent/runner/session_registry.py @@ -10,6 +10,9 @@ import threading from .context_builder import AgentResources +MAX_STEERING_QUEUE_ITEMS = 100 + + class AgentRunSessionStatus(typing.TypedDict): """Status tracking for agent run session.""" started_at: int @@ -148,15 +151,18 @@ 
class AgentRunSessionRegistry: 'file': {f.get('file_id') for f in resources.get('files', [])}, } - async def unregister(self, run_id: str) -> None: + async def unregister(self, run_id: str) -> AgentRunSession | None: """Unregister an agent run session. Args: run_id: Unique run identifier + + Returns: + The removed session, if one existed. Callers can inspect any + pending in-memory queues before they are discarded. """ async with self._lock: - if run_id in self._sessions: - del self._sessions[run_id] + return self._sessions.pop(run_id, None) async def get(self, run_id: str) -> AgentRunSession | None: """Get session by run_id. @@ -215,6 +221,8 @@ class AgentRunSessionRegistry: session = self._sessions.get(run_id) if session is None: return False + if len(session['steering_queue']) >= MAX_STEERING_QUEUE_ITEMS: + return False session['steering_queue'].append(copy.deepcopy(item)) session['status']['last_activity_at'] = int(time.time()) return True diff --git a/src/langbot/pkg/agent/runner/transcript_store.py b/src/langbot/pkg/agent/runner/transcript_store.py index b1d7487b..a26436b6 100644 --- a/src/langbot/pkg/agent/runner/transcript_store.py +++ b/src/langbot/pkg/agent/runner/transcript_store.py @@ -276,6 +276,18 @@ class TranscriptStore: count = result.scalar() return count > 0 + async def cleanup_transcripts_older_than( + self, + before: datetime.datetime, + ) -> int: + """Delete Transcript rows created before the supplied timestamp.""" + async with self._session_factory() as session: + result = await session.execute( + sqlalchemy.delete(Transcript).where(Transcript.created_at < before) + ) + await session.commit() + return result.rowcount or 0 + async def _get_next_seq(self, conversation_id: str) -> int: """Fallback next sequence number for stores that cannot expose autoincrement IDs.""" async with self._session_factory() as session: diff --git a/src/langbot/pkg/pipeline/controller.py b/src/langbot/pkg/pipeline/controller.py index ed5a520f..15af3d21 100644 --- 
a/src/langbot/pkg/pipeline/controller.py +++ b/src/langbot/pkg/pipeline/controller.py @@ -31,22 +31,29 @@ class Controller: semaphore; otherwise the active run can finish before the query reaches ChatMessageHandler.try_claim_steering_from_query. """ - pipeline_uuid = query.pipeline_uuid - if not pipeline_uuid: + try: + pipeline_uuid = query.pipeline_uuid + if not pipeline_uuid: + return False + + pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid) + if not pipeline: + return False + + session = await self.ap.sess_mgr.get_session(query) + query.session = session + query.pipeline_config = pipeline.pipeline_entity.config + query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins + query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers + + return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query) + except Exception as exc: + self.ap.logger.warning( + f'Failed to claim query {query.query_id} as steering input: {exc}', + exc_info=True, + ) return False - pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid) - if not pipeline: - return False - - session = await self.ap.sess_mgr.get_session(query) - query.session = session - query.pipeline_config = pipeline.pipeline_entity.config - query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins - query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers - - return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query) - async def consumer(self): """事件处理循环""" try: diff --git a/tests/unit_tests/agent/test_artifact_store.py b/tests/unit_tests/agent/test_artifact_store.py index 64cad193..55e4fa28 100644 --- a/tests/unit_tests/agent/test_artifact_store.py +++ b/tests/unit_tests/agent/test_artifact_store.py @@ -566,6 +566,55 @@ class TestArtifactStoreRealSQLite: assert result["has_more"] is True assert result["length"] == 100 + @pytest.mark.asyncio + async def 
test_expired_artifact_is_not_readable_before_cleanup(self, db_engine): + """Expired artifacts are hidden even before a cleanup job deletes rows.""" + store = ArtifactStore(db_engine) + await store.register_artifact( + artifact_id="art_expired_hidden", + artifact_type="file", + source="runner", + content=b"expired", + expires_at=datetime.datetime.utcnow() - datetime.timedelta(seconds=1), + ) + + assert await store.get_metadata("art_expired_hidden") is None + assert await store.read_artifact("art_expired_hidden") is None + + @pytest.mark.asyncio + async def test_cleanup_expired_artifacts_deletes_binary_storage(self, db_engine): + """Expired artifacts and their Host-owned binary blobs are removed.""" + from sqlalchemy import select + from langbot.pkg.entity.persistence.bstorage import BinaryStorage + + store = ArtifactStore(db_engine) + now = datetime.datetime.utcnow() + await store.register_artifact( + artifact_id="art_expired", + artifact_type="file", + source="runner", + content=b"expired", + expires_at=now - datetime.timedelta(seconds=1), + ) + await store.register_artifact( + artifact_id="art_fresh", + artifact_type="file", + source="runner", + content=b"fresh", + expires_at=now + datetime.timedelta(days=1), + ) + + removed = await store.cleanup_expired_artifacts(now=now) + + assert removed == 1 + assert await store.get_metadata("art_expired") is None + assert await store.get_metadata("art_fresh") is not None + async with store._session_factory() as session: + result = await session.execute( + select(BinaryStorage).where(BinaryStorage.unique_key == "artifact:art_expired") + ) + assert result.scalars().first() is None + @pytest.mark.asyncio async def test_file_artifact_range_read_and_public_metadata(self, db_engine, tmp_path): """File-backed artifacts read ranges without exposing host paths.""" diff --git a/tests/unit_tests/agent/test_event_log_transcript.py b/tests/unit_tests/agent/test_event_log_transcript.py index 7fe1f0bc..0d46ed54 100644 --- 
a/tests/unit_tests/agent/test_event_log_transcript.py +++ b/tests/unit_tests/agent/test_event_log_transcript.py @@ -1,6 +1,8 @@ """Tests for EventLog, Transcript, and history/event APIs.""" from __future__ import annotations +import datetime + import pytest from langbot.pkg.agent.runner.host_models import ( @@ -505,6 +507,45 @@ class TestEventLogStoreRealSQLite: assert cursor is not None assert int(cursor) > 0 + @pytest.mark.asyncio + async def test_cleanup_events_older_than(self, db_engine): + """EventLog cleanup removes only rows older than the cutoff.""" + import sqlalchemy + from langbot.pkg.entity.persistence.event_log import EventLog + + store = EventLogStore(db_engine) + cutoff = datetime.datetime.utcnow() + await store.append_event( + event_id="evt_cleanup_old", + event_type="message.received", + source="platform", + conversation_id="conv_cleanup", + ) + await store.append_event( + event_id="evt_cleanup_new", + event_type="message.received", + source="platform", + conversation_id="conv_cleanup", + ) + async with store._session_factory() as session: + await session.execute( + sqlalchemy.update(EventLog) + .where(EventLog.event_id == "evt_cleanup_old") + .values(created_at=cutoff - datetime.timedelta(days=2)) + ) + await session.execute( + sqlalchemy.update(EventLog) + .where(EventLog.event_id == "evt_cleanup_new") + .values(created_at=cutoff + datetime.timedelta(days=2)) + ) + await session.commit() + + removed = await store.cleanup_events_older_than(cutoff) + + assert removed == 1 + assert await store.get_event("evt_cleanup_old") is None + assert await store.get_event("evt_cleanup_new") is not None + class TestTranscriptStoreRealSQLite: """Test TranscriptStore with real SQLite database.""" @@ -637,6 +678,47 @@ class TestTranscriptStoreRealSQLite: assert cursor is not None assert int(cursor) > 0 + @pytest.mark.asyncio + async def test_cleanup_transcripts_older_than(self, db_engine): + """Transcript cleanup removes only rows older than the cutoff.""" + import 
sqlalchemy + from langbot.pkg.entity.persistence.transcript import Transcript + + store = TranscriptStore(db_engine) + cutoff = datetime.datetime.utcnow() + await store.append_transcript( + transcript_id="trans_cleanup_old", + event_id="evt_cleanup_old", + conversation_id="conv_cleanup", + role="user", + content="old", + ) + await store.append_transcript( + transcript_id="trans_cleanup_new", + event_id="evt_cleanup_new", + conversation_id="conv_cleanup", + role="assistant", + content="new", + ) + async with store._session_factory() as session: + await session.execute( + sqlalchemy.update(Transcript) + .where(Transcript.transcript_id == "trans_cleanup_old") + .values(created_at=cutoff - datetime.timedelta(days=2)) + ) + await session.execute( + sqlalchemy.update(Transcript) + .where(Transcript.transcript_id == "trans_cleanup_new") + .values(created_at=cutoff + datetime.timedelta(days=2)) + ) + await session.commit() + + removed = await store.cleanup_transcripts_older_than(cutoff) + items, _, _, _ = await store.page_transcript("conv_cleanup", limit=10) + + assert removed == 1 + assert [item["content"] for item in items] == ["new"] + # Fixtures @pytest.fixture diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index ecd92eac..389fec91 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -28,14 +28,17 @@ RUNNER_ID = "plugin:langbot/local-agent/default" class FakeLogger: + def __init__(self): + self.warnings: list[str] = [] + def debug(self, msg): pass def info(self, msg): pass - def warning(self, msg): - pass + def warning(self, msg, *args, **kwargs): + self.warnings.append(str(msg)) def error(self, msg): pass @@ -424,6 +427,41 @@ async def test_orchestrator_streams_fake_plugin_deltas(clean_agent_state): assert [chunk.content for chunk in chunks] == ["hel", "hello"] +@pytest.mark.asyncio +async def 
test_orchestrator_drops_duplicate_result_sequence(clean_agent_state): + """Duplicate runner result sequences are idempotently ignored.""" + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "message.delta", + "sequence": 1, + "data": {"chunk": {"role": "assistant", "content": "first"}}, + }, + { + "type": "message.delta", + "sequence": 1, + "data": {"chunk": {"role": "assistant", "content": "duplicate"}}, + }, + { + "type": "message.delta", + "sequence": 3, + "data": {"chunk": {"role": "assistant", "content": "after-gap"}}, + }, + {"type": "run.completed", "sequence": 4, "data": {"finish_reason": "stop"}}, + ] + ) + ap = FakeApplication(plugin_connector, db_engine) + orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) + + chunks = [message async for message in orchestrator.run_from_query(make_query())] + + assert [chunk.content for chunk in chunks] == ["first", "after-gap"] + assert any("duplicate result sequence 1" in warning for warning in ap.logger.warnings) + assert any("result sequence gap or out-of-order" in warning for warning in ap.logger.warnings) + + @pytest.mark.asyncio async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event(clean_agent_state): """Test that state.updated events are applied and not yielded to pipeline.""" diff --git a/tests/unit_tests/agent/test_session_registry.py b/tests/unit_tests/agent/test_session_registry.py index a926e281..37f0d15d 100644 --- a/tests/unit_tests/agent/test_session_registry.py +++ b/tests/unit_tests/agent/test_session_registry.py @@ -8,6 +8,7 @@ import time from langbot.pkg.agent.runner.session_registry import ( AgentRunSessionRegistry, AgentRunSession, + MAX_STEERING_QUEUE_ITEMS, get_session_registry, ) @@ -258,6 +259,59 @@ class TestSessionRegistryBasic: assert [item['event']['event_id'] for item in first] == ['event_1'] assert [item['event']['event_id'] for item in second] == ['event_2'] + 
@pytest.mark.asyncio + async def test_enqueue_steering_rejects_when_queue_is_full(self): + """A full steering queue does not claim more queries.""" + registry = AgentRunSessionRegistry() + await registry.register( + run_id='run_steering_full', + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + resources=make_resources(), + conversation_id='conv_1', + available_apis={'steering_pull': True}, + ) + + for index in range(MAX_STEERING_QUEUE_ITEMS): + assert await registry.enqueue_steering( + 'run_steering_full', + {'event': {'event_id': f'event_{index}'}}, + ) + + assert not await registry.enqueue_steering( + 'run_steering_full', + {'event': {'event_id': 'overflow'}}, + ) + + items = await registry.pull_steering('run_steering_full', mode='all') + assert len(items) == MAX_STEERING_QUEUE_ITEMS + assert all(item['event']['event_id'] != 'overflow' for item in items) + + @pytest.mark.asyncio + async def test_unregister_returns_pending_steering_queue(self): + """Unregister returns the removed session so callers can audit pending steering.""" + registry = AgentRunSessionRegistry() + await registry.register( + run_id='run_steering_unregister', + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + resources=make_resources(), + conversation_id='conv_1', + available_apis={'steering_pull': True}, + ) + await registry.enqueue_steering( + 'run_steering_unregister', + {'event': {'event_id': 'event_pending'}}, + ) + + session = await registry.unregister('run_steering_unregister') + + assert session is not None + assert session['steering_queue'][0]['event']['event_id'] == 'event_pending' + assert await registry.get('run_steering_unregister') is None + class TestIsResourceAllowed: """Tests for is_resource_allowed validation.""" diff --git a/tests/unit_tests/pipeline/test_controller.py b/tests/unit_tests/pipeline/test_controller.py new file mode 100644 index 00000000..8c5fc6d8 --- /dev/null +++ 
b/tests/unit_tests/pipeline/test_controller.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from langbot.pkg.agent.runner.errors import RunnerNotFoundError +from langbot.pkg.pipeline.controller import Controller + + +def make_app(): + app = SimpleNamespace() + app.instance_config = SimpleNamespace(data={'concurrency': {'pipeline': 10}}) + app.logger = MagicMock() + app.pipeline_mgr = SimpleNamespace() + app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock() + app.sess_mgr = SimpleNamespace() + app.sess_mgr.get_session = AsyncMock(return_value=SimpleNamespace()) + app.agent_run_orchestrator = SimpleNamespace() + app.agent_run_orchestrator.try_claim_steering_from_query = AsyncMock() + return app + + +def make_pipeline(): + return SimpleNamespace( + pipeline_entity=SimpleNamespace(config={'ai': {'runner': {'id': 'plugin:test/runner/default'}}}), + bound_plugins=['test/runner'], + bound_mcp_servers=[], + ) + + +@pytest.mark.asyncio +async def test_try_claim_steering_returns_false_when_runner_lookup_fails(): + app = make_app() + app.pipeline_mgr.get_pipeline_by_uuid.return_value = make_pipeline() + app.agent_run_orchestrator.try_claim_steering_from_query.side_effect = RunnerNotFoundError( + 'plugin:missing/runner/default' + ) + controller = Controller(app) + query = SimpleNamespace(query_id=1, pipeline_uuid='pipeline-001', variables={}) + + claimed = await controller._try_claim_steering_before_session_slot(query) + + assert claimed is False + app.logger.warning.assert_called_once() + + +@pytest.mark.asyncio +async def test_try_claim_steering_sets_pipeline_context_before_claiming(): + app = make_app() + pipeline = make_pipeline() + app.pipeline_mgr.get_pipeline_by_uuid.return_value = pipeline + app.agent_run_orchestrator.try_claim_steering_from_query.return_value = True + controller = Controller(app) + query = SimpleNamespace(query_id=2, 
pipeline_uuid='pipeline-002', variables={}) + + claimed = await controller._try_claim_steering_before_session_slot(query) + + assert claimed is True + assert query.pipeline_config is pipeline.pipeline_entity.config + assert query.variables['_pipeline_bound_plugins'] == ['test/runner'] + app.agent_run_orchestrator.try_claim_steering_from_query.assert_awaited_once_with(query)
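
Review note: the result `sequence` handling added in `orchestrator.py` can be read in isolation as the sketch below. It is a minimal standalone approximation, assuming results arrive as plain dicts; `check_sequences` is a hypothetical helper written for illustration and is not part of LangBot, and it collects warning strings in a list where the orchestrator calls `self.ap.logger.warning`.

```python
from typing import Any


def check_sequences(
    results: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[str]]:
    """Hypothetical helper mirroring the orchestrator's sequence checks.

    Duplicates are dropped; invalid, non-positive, and gapped sequences
    only produce a warning and the result is still kept.
    """
    kept: list[dict[str, Any]] = []
    warnings: list[str] = []
    seen: set[int] = set()
    last = 0
    for result in results:
        sequence = result.get("sequence")
        if sequence is not None:
            try:
                seq = int(sequence)
            except (TypeError, ValueError):
                warnings.append(f"invalid result sequence: {sequence!r}")
            else:
                if seq in seen:
                    # Same sequence seen before: skip the result entirely.
                    warnings.append(f"duplicate result sequence {seq}")
                    continue
                if seq <= 0:
                    warnings.append(f"non-positive result sequence {seq}")
                elif last and seq != last + 1:
                    warnings.append(
                        f"result sequence gap or out-of-order: "
                        f"previous={last}, current={seq}"
                    )
                seen.add(seq)
                last = max(last, seq)
        kept.append(result)
    return kept, warnings
```

Fed the sequences 1, 1, 3, 4 used by `test_orchestrator_drops_duplicate_result_sequence`, the sketch keeps three results and records one duplicate warning and one gap warning, which matches what that test asserts on the real orchestrator.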