Fix agent runner steering and lifecycle hardening

huanghuoguoguo
2026-06-12 11:58:09 +08:00
parent 8da5fecfbf
commit 9f95c6bd0d
17 changed files with 547 additions and 28 deletions

@@ -145,6 +145,7 @@ bin/lbs case list
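 | LA-06 | Multimodal | Send an image input. | `ctx.input.contents` retains the image; vision-capable models process it normally, and models without vision support fail in a controlled way. |
 | LA-07 | fallback / errors | Simulate a primary model failure or a runner exception. | The fallback or `run.failed` behavior is controlled; subsequent requests are unaffected. |
 | LA-08 | No-output guard | Test a runner that completes without producing a message. | No blank successful reply is produced; the case is handled as a controlled failure or a clearly identified defect. |
+| LA-09 | steering / messages appended mid-run | Use a runner that supports steering. The first message triggers a long run; before the run ends, append a second message in the same conversation. | The second message is claimed by the active run and no concurrent run is started; the runner sees the appended input through `steering_pull`; the EventLog shows `queued` -> `steering.injected`, or a `steering.dropped` terminal state if the input is never consumed; subsequent ordinary messages are still processed. |

 Scenarios such as Rerank, remove-think, and file input are retested only when the current change touches them directly; they are not mandatory in every round.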
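@@ -195,7 +196,10 @@ External-service runners such as Dify, n8n, Coze, DashScope, Langflow, and Tbox are not treated as
 | Reusing a run_id after the run ends is rejected | session registry unregister test. |
 | Plugin identity mismatch is rejected | `caller_plugin_identity` mismatch test. |
 | A run_id bound to a plugin identity is rejected when the caller identity is omitted | `_validate_run_authorization(..., caller_plugin_identity=None)` returns an error. |
+| A forged plugin identity on an unregistered Runtime connection is stripped | SDK runtime forwarding test: when a request carries its own `caller_plugin_identity`, an unregistered connection must `pop` it before forwarding, and a registered connection must overwrite it with the real plugin identity. |
 | Out-of-scope storage/state access is rejected | state/storage proxy unit tests. |
+| steering claim exceptions do not kill the consumer loop | controller unit test: an invalid runner / registry exception only sends the current message back to the normal session-slot path; the message consumer loop keeps running. |
+| An unconsumed steering queue reaches a terminal state | session registry / orchestrator unit tests: the queue is bounded; on run unregister, items that were never pulled get a `steering.dropped` audit record. |

 If these unit tests fail, a normal WebUI reply cannot be used as a substitute.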

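@@ -47,7 +47,8 @@ Persisted form of Agent / binding.
 - New optional fields remain backward compatible.
 - Removing a field or changing the semantics of an existing field must happen before the SDK is released; after release, a new explicit compatibility scheme is required.
-- Result stream evolution: the Host **must ignore unknown result types and log a warning** (unless the type explicitly requires strict validation). Adding a result type does not bump the major version.
+- Result stream evolution: the Host **must ignore unknown result types and log a warning** (unless the type explicitly requires strict validation). The SDK envelope accepts unknown inbound `type` strings; the runner side may forward the original string or ignore it. Adding a result type does not bump the major version.
+- SDK inbound context entities are deliberately lenient so that they stay compatible with non-core fields attached by the Host; manifests, result payloads, page/result return values, and the error model are strict, and unknown fields are forbidden by default. The security boundary still lives in the Host; SDK validation only improves the developer experience.

 ## 4. Discovery protocol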
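The result-stream rule in the hunk above (ignore unknown result types, but log a warning) can be sketched as follows. This is a minimal illustration, not the Host implementation: `KNOWN_RESULT_TYPES`, `should_process_result`, and `filter_results` are hypothetical names, and the set lists only types that appear elsewhere in this commit.

```python
import logging
from typing import Any

logger = logging.getLogger("host.result_stream")

# Illustrative subset only (assumption); the authoritative list lives in PROTOCOL_V1.
KNOWN_RESULT_TYPES = frozenset(
    {"message.delta", "state.updated", "run.completed", "run.failed"}
)


def should_process_result(result: dict[str, Any]) -> bool:
    """Return True for known result types; warn and return False otherwise."""
    result_type = result.get("type")
    if result_type in KNOWN_RESULT_TYPES:
        return True
    logger.warning(
        "Ignoring unknown result type %r for run %r",
        result_type,
        result.get("run_id"),
    )
    return False


def filter_results(results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep only the results the Host knows how to handle, preserving order."""
    return [result for result in results if should_process_result(result)]
```

Because an unknown type is skipped rather than rejected, a runner built against a newer SDK can emit an extra result type without failing the run on an older Host.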
@@ -65,11 +66,15 @@ class AgentRunnerDiscovery(BaseModel):
     runner_name: str
     runner_description: I18nObject | None = None
     manifest: AgentRunnerManifest
+    capabilities: AgentRunnerCapabilities  # compatibility alias of manifest.capabilities
+    permissions: AgentRunnerPermissions  # compatibility alias of manifest.permissions
     config: list[DynamicFormItemSchema] = []
 ```

 `manifest` is the SDK typed `AgentRunnerManifest`, parsed and validated by the Runtime from the plugin component manifest before it is returned. `plugin_author` / `plugin_name` / `runner_name` are kept as transport addressing fields; the Host uses them to generate a stable runner id and validates that `manifest.id` equals `plugin:author/name/runner`. When a single runner manifest fails to parse, the Runtime/Host logs a warning and skips that runner; runner discovery for the same plugin and for other plugins is unaffected.
+
+The top-level `capabilities` / `permissions` fields are redundant aliases kept for older discovery consumers; new code must treat `manifest.capabilities` / `manifest.permissions` as authoritative.

 ### 4.2 AgentRunnerManifest

 The manifest here is the typed runner manifest that the Runtime returns to the Host:
@@ -247,8 +252,10 @@ class ConversationContext(BaseModel):
     thread_id: str | None = None
     launcher_type: str | None = None
     launcher_id: str | None = None
+    sender_id: str | None = None
     bot_id: str | None = None
     workspace_id: str | None = None
+    session_id: str | None = None

 class ActorContext(BaseModel):
     actor_type: str
@@ -335,7 +342,7 @@ class ContextAPICapabilities(BaseModel):
 ```python
 class AgentRuntimeContext(BaseModel):
     langbot_version: str | None = None
-    trace_id: str
+    trace_id: str | None = None
     deadline_at: float | None = None
     metadata: dict[str, Any] = {}
 ```
@@ -395,7 +402,7 @@ ResultType = Literal[
 class AgentRunResult(BaseModel):
     run_id: str
-    type: AgentRunResultType
+    type: AgentRunResultType | str
     data: dict[str, Any] = {}
     sequence: int | None = None
     timestamp: int | None = None
@@ -508,7 +515,7 @@ await api.get_langbot_version()
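 `steering_pull(mode="all")` is the recommended default: the Host returns all pending steering inputs in claim order and clears the corresponding queue. `mode="one-at-a-time"` is only for runners that throttle on their own and returns one item per call. The Host does not merge multiple user messages; the runner decides the model-side format at turn boundaries.

-Steering auditing uses the EventLog rather than a Transcript schema extension: the original `message.received` event absorbed by the active run keeps its event type, and `metadata.steering` records `status="queued"`, `trigger_behavior="absorbed_into_active_run"`, `claimed_by_run_id`, `claimed_runner_id`, and `claimed_at`. After the runner pulls successfully, the Host appends a `steering.injected` EventLog record with `metadata.steering.status="injected"` that references `source_event_id`. The Transcript continues to represent only conversation facts and carries no dispatch behavior markers.
+Steering auditing uses the EventLog rather than a Transcript schema extension: the original `message.received` event absorbed by the active run keeps its event type, and `metadata.steering` records `status="queued"`, `trigger_behavior="absorbed_into_active_run"`, `claimed_by_run_id`, `claimed_runner_id`, and `claimed_at`. After the runner pulls successfully, the Host appends a `steering.injected` EventLog record with `metadata.steering.status="injected"` that references `source_event_id`. If claimed steering inputs are still unpulled when the run ends, the Host appends a `steering.dropped` EventLog record with `metadata.steering.status="dropped"` that references `source_event_id`; this does not delete the user message fact, it only records the dispatch terminal state. The Transcript continues to represent only conversation facts and carries no dispatch behavior markers.

 Suggested boundary between `state` and `storage`: `state` holds small JSON (conversation / actor / subject / runner); `storage` holds blobs or larger data (plugin-private data, workspace data, checkpoints).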
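The two `steering_pull` modes described in the hunk above can be illustrated with a small in-memory queue. This is a sketch under assumptions: `pull_steering` here is a hypothetical free function over a `collections.deque`, not the session registry method, and it models only ordering and queue clearing.

```python
from collections import deque
from typing import Any


def pull_steering(
    queue: deque[dict[str, Any]], mode: str = "all"
) -> list[dict[str, Any]]:
    """Pop pending steering inputs in claim (FIFO) order."""
    if mode == "all":
        # Return everything that is pending and leave the queue empty.
        items = list(queue)
        queue.clear()
        return items
    if mode == "one-at-a-time":
        # Runner-side throttling: hand out at most one item per call.
        return [queue.popleft()] if queue else []
    raise ValueError(f"unsupported steering pull mode: {mode}")
```

Messages are returned as separate items in both modes, which matches the rule that the Host does not merge user messages and leaves model-side formatting to the runner.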
@@ -650,6 +657,8 @@ class AgentAPIError(BaseModel):
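 | `invalid_argument` | Invalid argument. |
 | `runtime_error` | Host or downstream capability error. |

+The SDK runner-facing proxy raises `AgentAPIException` when the Host returns a structured error or a malformed response; its `error` field is an `AgentAPIError`. When a legacy transport returns only a string error, the SDK wraps it as `host.action_error`, so that runners no longer depend on a bare `KeyError` or on string matching.
+
 Runner failures use `run.failed`:

 ```json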

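@@ -82,7 +82,7 @@ EventGateway / EventRouter are described in this document as **external EBA branch integr
 | [EVENT_BASED_AGENT.md](./EVENT_BASED_AGENT.md) | EBA integration boundary: the event model, event sources, trigger bindings, and how non-message events reuse AgentRunner scheduling; the full EventGateway / EventRouter is integrated through the external EBA branch. |
 | [RUNTIME_CONTROL_PLANE_V2.md](./RUNTIME_CONTROL_PLANE_V2.md) | Agent Platform v2 / runtime control plane decisions: phase one prioritizes making `AgentRun` / `AgentRunEvent` / run control the Host source of truth; full runtime registry / daemon control is an optional later phase. **Marked as a future design note**. |
 | [OFFICIAL_RUNNER_PLUGINS.md](./OFFICIAL_RUNNER_PLUGINS.md) | Migration of the official runner plugins, including local-agent and external runners. It is a downstream rollout plan, not a prerequisite for the design of LangBot's base capabilities. |
-| [RUN_STEERING_AND_CHECKPOINT.md](./RUN_STEERING_AND_CHECKPOINT.md) | Design for the Host capability gaps in mid-run message injection (steering / follow-up) and persisted compaction summaries (compaction checkpoint), derived from a gap analysis of local-agent against the Pi agent harness. **Marked as a future design note**. |
+| [RUN_STEERING_AND_CHECKPOINT.md](./RUN_STEERING_AND_CHECKPOINT.md) | Design and rollout status record for mid-run message injection (steering / follow-up) and persisted compaction summaries (compaction checkpoint); the schema is still defined by PROTOCOL_V1. |
 | [STATUS.md](./STATUS.md) | Current implementation status, known gaps between spec and implementation, runner acceptance status, and high-value historical records. |
 | [AGENT_RUNNER_QA_GUIDE.md](./AGENT_RUNNER_QA_GUIDE.md) | Agent Runner QA guide: keeps the highest-value test paths and guides the agent through the next round of WebUI / runner smoke verification. |
 | [SECURITY_HARDENING.md](./SECURITY_HARDENING.md) | Follow-up release gates for release-grade security hardening: path isolation, permission boundaries, secrets, resource quotas, MCP / skill projection, and auditing. |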

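@@ -52,6 +52,8 @@ pi-agent-core distinguishes two queues; both inject at turn boundaries, without interrupting the in-progress
 - **Receipts**: events consumed by steering are audited through the EventLog. The original `message.received`
   record is marked queued/absorbed with `claimed_by_run_id` in `metadata.steering`;
   after the runner pulls successfully, the Host appends a `steering.injected` record that references the source event.
+  For claimed inputs still unpulled when the run ends, the Host appends a `steering.dropped` record as the
+  dispatch terminal state; the original Transcript fact is not deleted.
   The Transcript continues to represent only conversation facts and is not extended with dispatch behavior fields.

 Protocol surface already landed (the final definition belongs to PROTOCOL_V1):
@@ -61,6 +63,8 @@ pi-agent-core distinguishes two queues; both inject at turn boundaries, without interrupting the in-progress
    pending inputs; `one-at-a-time` is only an option for runners that throttle on their own.
 3. The dispatch-layer "claim" rule: a `message.received` event can be absorbed by the active run
    of the same conversation; the original event is written to EventLog / Transcript, and the dispatch behavior is written to EventLog metadata.
+4. The Host enforces an in-memory cap on each run's steering queue; when the queue is full it stops claiming new messages, and they return to
+   the normal dispatch path, so an active run cannot absorb same-conversation input without bound.

 ### 1.4 Boundaries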
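Rule 4 in the hunk above (a full steering queue stops claiming, and the message falls back to normal dispatch) can be sketched as below. `try_claim` and `route_message` are hypothetical helpers written for this note; only the limit of 100 and the deep copy on enqueue are taken from the `enqueue_steering` change in this commit.

```python
import copy
from typing import Any

QUEUE_LIMIT = 100  # same value as MAX_STEERING_QUEUE_ITEMS in this commit


def try_claim(
    queue: list[dict[str, Any]], item: dict[str, Any], limit: int = QUEUE_LIMIT
) -> bool:
    """Append the item to the steering queue unless the cap is reached."""
    if len(queue) >= limit:
        return False
    # Copy so later mutation of the caller's dict cannot alter the queued input.
    queue.append(copy.deepcopy(item))
    return True


def route_message(
    queue: list[dict[str, Any]], item: dict[str, Any], limit: int = QUEUE_LIMIT
) -> str:
    """Decide where a same-conversation message goes while a run is active."""
    return "steering" if try_claim(queue, item, limit) else "normal_dispatch"
```

A rejected claim is not an error: the message simply takes the path it would have taken if no run were active.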
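@@ -131,7 +135,7 @@ pi-agent-core persists compaction entries into the session tree; the summary carries
 | Item | Owner | Dependency |
 | --- | --- | --- |
-| steering queue, event claiming, basic auditing | LangBot Host (dispatch / binding layer) | Landed |
+| steering queue, event claiming, basic auditing | LangBot Host (dispatch / binding layer) | Landed, including the queue cap and the dropped terminal state for unconsumed inputs |
 | steering pull API + capability bit | PROTOCOL_V1 + SDK proxy | Landed |
 | Pulling and injecting at turn boundaries | langbot-local-agent | Landed |
 | local-agent checkpoint reads/writes through the state API | langbot-local-agent | Landed |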

@@ -2,7 +2,7 @@
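 This document is the source of truth for status under `docs/agent-runner-pluginization/`. The protocol schema is still defined by [PROTOCOL_V1.md](./PROTOCOL_V1.md); test steps are defined by [AGENT_RUNNER_QA_GUIDE.md](./AGENT_RUNNER_QA_GUIDE.md); security release gates are defined by [SECURITY_HARDENING.md](./SECURITY_HARDENING.md).

-Status snapshot date: 2026-06-10
+Status snapshot date: 2026-06-12

 ## Implementation status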
@@ -17,12 +17,15 @@
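 | Official runner manifests | Done | `local-agent`, LiteLLM Agent Platform, and the external-service runners have re-declared the LangBot resource permissions that actually take effect. |
 | Runtime Control Plane v2 | Future | Phase one is designed as a Host-owned Run Ledger; runtime registry / heartbeat / daemon claim are optional later phases. |
 | Full release security gate | Future | self-host / container opt-in can proceed; managed/default external harness must complete the SECURITY_HARDENING full gate. |
+| Steering control path | Done | claim exceptions no longer escape the consumer loop; the queue is bounded; claimed inputs that were never pulled get a `steering.dropped` audit terminal state when the run ends. |
+| SDK v1 contract closure | Done | The SDK provides `AgentAPIError` / `AgentAPIException`, typed `SteeringPullResult`, tolerant parsing of unknown result types, result `sequence` injection, and cancellation propagation. |

 ## Known gaps between spec and implementation

 - `action.requested` is still only a telemetry / reserved surface; the platform action executor does not run on this branch.
 - The full EventGateway / EventRouter implementation is integrated through the external EBA branch; this branch only provides the event-first host envelope / binding / run entry points.
 - The long-term type boundary between state and storage can still be narrowed; the current contract only requires JSON-safe state and a controlled storage API.
+- The artifact read path now checks `expires_at`, and EventLog / Transcript / Artifact expose explicit cleanup primitives; long-term retention defaults, TTL scheduling, and large-payload deduplication remain operational follow-ups that should be completed before Runtime Control Plane Phase 1.
 - The native shell / filesystem / CLI / MCP permissions of an external harness are not constrained by manifest permissions; manifest permissions only constrain access to resources held by LangBot.
 - OS/process/network quotas, workspace GC, and full audit/admin control for managed/cloud/default external harnesses remain release gates, not capabilities completed in Protocol v1.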

@@ -212,6 +212,8 @@ class ArtifactStore:
             row = result.scalars().first()
             if row is None:
                 return None
+            if self._is_expired(row):
+                return None
             return self._row_to_public_dict(row)

     async def _get_internal_record(
@@ -234,7 +236,10 @@ class ArtifactStore:
                     AgentArtifact.artifact_id == artifact_id
                 )
             )
-            return result.scalars().first()
+            record = result.scalars().first()
+            if record is not None and self._is_expired(record):
+                return None
+            return record

     async def read_artifact(
         self,
@@ -321,6 +326,51 @@ class ArtifactStore:
             'has_more': False,
         }

+    async def cleanup_expired_artifacts(
+        self,
+        *,
+        now: datetime.datetime | None = None,
+    ) -> int:
+        """Delete expired artifact metadata and Host-owned binary blobs.
+
+        Returns the number of artifact metadata rows removed. External/file
+        storage references are only dereferenced from LangBot metadata; their
+        backing lifecycle remains owned by the storage provider.
+        """
+        if now is None:
+            now = datetime.datetime.utcnow()
+
+        async with self._session_factory() as session:
+            result = await session.execute(
+                sqlalchemy.select(AgentArtifact).where(
+                    AgentArtifact.expires_at.is_not(None),
+                    AgentArtifact.expires_at <= now,
+                )
+            )
+            expired = result.scalars().all()
+            if not expired:
+                return 0
+
+            binary_storage_keys = [
+                artifact.storage_key
+                for artifact in expired
+                if artifact.storage_type == 'binary_storage' and artifact.storage_key
+            ]
+            if binary_storage_keys:
+                await session.execute(
+                    sqlalchemy.delete(BinaryStorage).where(
+                        BinaryStorage.unique_key.in_(binary_storage_keys)
+                    )
+                )
+
+            await session.execute(
+                sqlalchemy.delete(AgentArtifact).where(
+                    AgentArtifact.id.in_([artifact.id for artifact in expired])
+                )
+            )
+            await session.commit()
+            return len(expired)
+
     async def _read_binary_storage(self, key: str) -> bytes | None:
         """Read content from BinaryStorage.
@@ -407,6 +457,17 @@ class ArtifactStore:
             metadata.pop(_FILE_ARTIFACT_METADATA_KEY, None)
         return metadata

+    @staticmethod
+    def _is_expired(
+        row: AgentArtifact,
+        now: datetime.datetime | None = None,
+    ) -> bool:
+        if row.expires_at is None:
+            return False
+        if now is None:
+            now = datetime.datetime.utcnow()
+        return row.expires_at <= now
+
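A note on the expiry check added above: `datetime.datetime.utcnow()` is deprecated as of Python 3.12. The sketch below shows the same inclusive boundary (`expires_at <= now`) with timezone-aware values. It is not part of this commit; `is_expired` is a hypothetical standalone helper, and treating naive timestamps as UTC is an assumption about how `expires_at` is stored.

```python
from __future__ import annotations

import datetime

UTC = datetime.timezone.utc


def _as_utc(value: datetime.datetime) -> datetime.datetime:
    """Treat naive datetimes as UTC (assumption about the stored values)."""
    return value if value.tzinfo is not None else value.replace(tzinfo=UTC)


def is_expired(
    expires_at: datetime.datetime | None,
    now: datetime.datetime | None = None,
) -> bool:
    """Return True once `expires_at` has been reached; None never expires."""
    if expires_at is None:
        return False
    if now is None:
        now = datetime.datetime.now(UTC)
    # Inclusive comparison, matching `row.expires_at <= now` in the diff.
    return _as_utc(expires_at) <= _as_utc(now)
```

Passing `now` explicitly keeps a read-path check and a cleanup pass on the same instant, which is how `test_cleanup_expired_artifacts_deletes_binary_storage` in this commit pins the cleanup time.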
     def _row_to_public_dict(self, row: AgentArtifact) -> dict[str, typing.Any]:
         """Convert an AgentArtifact row to public dict.

@@ -228,6 +228,18 @@ class EventLogStore:
             count = result.scalar()
             return count > 0

+    async def cleanup_events_older_than(
+        self,
+        before: datetime.datetime,
+    ) -> int:
+        """Delete EventLog rows created before the supplied timestamp."""
+        async with self._session_factory() as session:
+            result = await session.execute(
+                sqlalchemy.delete(EventLog).where(EventLog.created_at < before)
+            )
+            await session.commit()
+            return result.rowcount or 0
+
     def _row_to_dict(self, row: EventLog) -> dict[str, typing.Any]:
         """Convert an EventLog row to dict."""
         return {
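The cleanup primitive above takes an absolute `before` timestamp and leaves the retention policy to the caller. A caller-side sketch, assuming a retention window expressed in days and naive UTC timestamps in `created_at` (both are assumptions; `retention_cutoff` and `is_removed` are hypothetical helpers):

```python
from __future__ import annotations

import datetime


def retention_cutoff(
    retention_days: int,
    now: datetime.datetime | None = None,
) -> datetime.datetime:
    """Compute the `before` argument for a cleanup call."""
    if retention_days < 0:
        raise ValueError("retention_days must be non-negative")
    if now is None:
        # Naive UTC, to line up with the naive timestamps used in the tests.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return now - datetime.timedelta(days=retention_days)


def is_removed(created_at: datetime.datetime, cutoff: datetime.datetime) -> bool:
    """Mirror the strict `created_at < before` filter of the cleanup methods."""
    return created_at < cutoff
```

The comparison is strict, so a row created exactly at the cutoff is kept, whereas the artifact expiry check in this commit is inclusive (`expires_at <= now`).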

@@ -134,9 +134,39 @@ class AgentRunOrchestrator:
         )
         pending_artifact_refs: list[dict[str, typing.Any]] = []
+        seen_sequences: set[int] = set()
+        last_sequence = 0

         try:
             async for result_dict in self.invoker.invoke(descriptor, context):
+                sequence = result_dict.get('sequence')
+                if sequence is not None:
+                    try:
+                        sequence_int = int(sequence)
+                    except (TypeError, ValueError):
+                        self.ap.logger.warning(
+                            f'Runner {descriptor.id} returned invalid result sequence: {sequence}'
+                        )
+                    else:
+                        if sequence_int in seen_sequences:
+                            self.ap.logger.warning(
+                                f'Runner {descriptor.id} returned duplicate result sequence '
+                                f'{sequence_int} for run {run_id}; dropping duplicate'
+                            )
+                            continue
+                        if sequence_int <= 0:
+                            self.ap.logger.warning(
+                                f'Runner {descriptor.id} returned non-positive result sequence '
+                                f'{sequence_int} for run {run_id}'
+                            )
+                        elif last_sequence and sequence_int != last_sequence + 1:
+                            self.ap.logger.warning(
+                                f'Runner {descriptor.id} result sequence gap or out-of-order '
+                                f'for run {run_id}: previous={last_sequence}, current={sequence_int}'
+                            )
+                        seen_sequences.add(sequence_int)
+                        last_sequence = max(last_sequence, sequence_int)
+
                 result_type = result_dict.get('type')
                 if result_type and not self.result_normalizer.validate_payload(
                     result_type,
@@ -180,7 +210,20 @@ class AgentRunOrchestrator:
                 if result is not None:
                     yield result
         finally:
-            await self._session_registry.unregister(run_id)
+            session = await self._session_registry.unregister(run_id)
+            pending_steering = session.get('steering_queue', []) if session else []
+            if pending_steering:
+                try:
+                    await self.journal.write_steering_dropped_audits(
+                        pending_steering,
+                        run_id,
+                        descriptor.id,
+                    )
+                except Exception as exc:
+                    self.ap.logger.warning(
+                        f'Failed to write dropped steering audit for run {run_id}: {exc}',
+                        exc_info=True,
+                    )

     async def run_from_query(
         self,
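The sequence handling added to the result loop in this file is easier to follow as a pure function. The sketch below restates the same branches; `check_sequence` and its verdict strings are hypothetical names introduced for illustration, and the orchestrator itself logs warnings inline instead of returning verdicts.

```python
from typing import Any


def check_sequence(sequence: Any, seen: set[int], last: int) -> tuple[str, int]:
    """Classify one result `sequence` value and return (verdict, new_last).

    Only "duplicate" means the result is dropped. "invalid", "non_positive"
    and "gap_or_out_of_order" correspond to a warning while the result is
    still processed.
    """
    if sequence is None:
        return "missing", last
    try:
        value = int(sequence)
    except (TypeError, ValueError):
        return "invalid", last
    if value in seen:
        return "duplicate", last
    if value <= 0:
        verdict = "non_positive"
    elif last and value != last + 1:
        verdict = "gap_or_out_of_order"
    else:
        verdict = "ok"
    seen.add(value)
    return verdict, max(last, value)
```

Because `last` starts at 0 and the gap check is skipped while it is falsy, the first sequence a runner sends is accepted as the baseline even if it is not 1.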

@@ -386,6 +386,76 @@ class AgentRunJournal:
         return merged

+    async def write_steering_dropped_audits(
+        self,
+        items: list[dict[str, typing.Any]],
+        run_id: str,
+        runner_id: str,
+        *,
+        reason: str = 'run_ended',
+    ) -> None:
+        """Write terminal audit events for steering items left unconsumed."""
+        if not items:
+            return
+
+        import datetime
+        import uuid
+
+        from .event_log_store import EventLogStore
+
+        store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
+        for item in items:
+            event = item.get('event') if isinstance(item.get('event'), dict) else {}
+            input_data = item.get('input') if isinstance(item.get('input'), dict) else {}
+            conversation = item.get('conversation') if isinstance(item.get('conversation'), dict) else {}
+            actor = item.get('actor') if isinstance(item.get('actor'), dict) else {}
+            subject = item.get('subject') if isinstance(item.get('subject'), dict) else {}
+            text = input_data.get('text')
+            input_summary = text[:1000] if isinstance(text, str) and text else 'Unconsumed steering input dropped'
+
+            event_time = None
+            raw_event_time = event.get('event_time')
+            if raw_event_time:
+                try:
+                    event_time = datetime.datetime.fromtimestamp(raw_event_time)
+                except (TypeError, ValueError, OSError):
+                    event_time = None
+
+            await store.append_event(
+                event_id=str(uuid.uuid4()),
+                event_type='steering.dropped',
+                source='host',
+                bot_id=conversation.get('bot_id'),
+                workspace_id=conversation.get('workspace_id'),
+                conversation_id=conversation.get('conversation_id'),
+                thread_id=conversation.get('thread_id'),
+                actor_type=actor.get('actor_type'),
+                actor_id=actor.get('actor_id'),
+                actor_name=actor.get('actor_name'),
+                subject_type=subject.get('subject_type'),
+                subject_id=subject.get('subject_id'),
+                input_summary=input_summary,
+                input_json={
+                    'text': text,
+                    'contents': input_data.get('contents') or [],
+                    'attachments': input_data.get('attachments') or [],
+                },
+                run_id=run_id,
+                runner_id=runner_id,
+                event_time=event_time,
+                metadata={
+                    'steering': {
+                        'status': 'dropped',
+                        'reason': reason,
+                        'original_event_id': event.get('event_id'),
+                        'claimed_run_id': item.get('claimed_run_id'),
+                        'claimed_runner_id': item.get('runner_id'),
+                        'claimed_at': item.get('claimed_at'),
+                    },
+                },
+            )
+
     async def write_assistant_transcript(
         self,
         result_dict: dict[str, typing.Any],

@@ -10,6 +10,9 @@ import threading
 from .context_builder import AgentResources


+MAX_STEERING_QUEUE_ITEMS = 100
+
+
 class AgentRunSessionStatus(typing.TypedDict):
     """Status tracking for agent run session."""
     started_at: int
@@ -148,15 +151,18 @@ class AgentRunSessionRegistry:
             'file': {f.get('file_id') for f in resources.get('files', [])},
         }

-    async def unregister(self, run_id: str) -> None:
+    async def unregister(self, run_id: str) -> AgentRunSession | None:
         """Unregister an agent run session.

         Args:
             run_id: Unique run identifier
+
+        Returns:
+            The removed session, if one existed. Callers can inspect any
+            pending in-memory queues before they are discarded.
         """
         async with self._lock:
-            if run_id in self._sessions:
-                del self._sessions[run_id]
+            return self._sessions.pop(run_id, None)

     async def get(self, run_id: str) -> AgentRunSession | None:
         """Get session by run_id.
@@ -215,6 +221,8 @@ class AgentRunSessionRegistry:
             session = self._sessions.get(run_id)
             if session is None:
                 return False
+            if len(session['steering_queue']) >= MAX_STEERING_QUEUE_ITEMS:
+                return False
             session['steering_queue'].append(copy.deepcopy(item))
             session['status']['last_activity_at'] = int(time.time())
             return True

@@ -276,6 +276,18 @@ class TranscriptStore:
             count = result.scalar()
             return count > 0

+    async def cleanup_transcripts_older_than(
+        self,
+        before: datetime.datetime,
+    ) -> int:
+        """Delete Transcript rows created before the supplied timestamp."""
+        async with self._session_factory() as session:
+            result = await session.execute(
+                sqlalchemy.delete(Transcript).where(Transcript.created_at < before)
+            )
+            await session.commit()
+            return result.rowcount or 0
+
     async def _get_next_seq(self, conversation_id: str) -> int:
         """Fallback next sequence number for stores that cannot expose autoincrement IDs."""
         async with self._session_factory() as session:

@@ -31,22 +31,29 @@ class Controller:
         semaphore; otherwise the active run can finish before the query reaches
         ChatMessageHandler.try_claim_steering_from_query.
         """
-        pipeline_uuid = query.pipeline_uuid
-        if not pipeline_uuid:
+        try:
+            pipeline_uuid = query.pipeline_uuid
+            if not pipeline_uuid:
+                return False
+
+            pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
+            if not pipeline:
+                return False
+
+            session = await self.ap.sess_mgr.get_session(query)
+            query.session = session
+            query.pipeline_config = pipeline.pipeline_entity.config
+            query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins
+            query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers
+            return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query)
+        except Exception as exc:
+            self.ap.logger.warning(
+                f'Failed to claim query {query.query_id} as steering input: {exc}',
+                exc_info=True,
+            )
             return False

-        pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
-        if not pipeline:
-            return False
-
-        session = await self.ap.sess_mgr.get_session(query)
-        query.session = session
-        query.pipeline_config = pipeline.pipeline_entity.config
-        query.variables['_pipeline_bound_plugins'] = pipeline.bound_plugins
-        query.variables['_pipeline_bound_mcp_servers'] = pipeline.bound_mcp_servers
-        return await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query)

     async def consumer(self):
         """Event processing loop"""
         try:
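The point of wrapping the claim attempt in `try`/`except` is that one failing claim must not stop the consumer from handling the next message. A self-contained sketch of that pattern, with hypothetical names (`guarded_claim`, `consume`) and plain strings standing in for query objects:

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("controller.sketch")

ClaimFn = Callable[[str], Awaitable[bool]]


async def guarded_claim(claim: ClaimFn, query: str) -> bool:
    """Try to claim a query as steering input; any failure counts as 'not claimed'."""
    try:
        return bool(await claim(query))
    except Exception:
        logger.warning(
            "Failed to claim query %r as steering input", query, exc_info=True
        )
        return False


async def consume(queries: list[str], claim: ClaimFn) -> list[str]:
    """Route each query; a failed claim falls back to the normal path."""
    routes = []
    for query in queries:
        claimed = await guarded_claim(claim, query)
        routes.append("steering" if claimed else "normal")
    return routes
```

Returning `False` on failure reuses the path that already exists for "no active run to steer", so the fallback needs no extra branch in the loop.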

View File

@@ -566,6 +566,55 @@ class TestArtifactStoreRealSQLite:
         assert result["has_more"] is True
         assert result["length"] == 100

+    @pytest.mark.asyncio
+    async def test_expired_artifact_is_not_readable_before_cleanup(self, db_engine):
+        """Expired artifacts are hidden even before a cleanup job deletes rows."""
+        store = ArtifactStore(db_engine)
+        await store.register_artifact(
+            artifact_id="art_expired_hidden",
+            artifact_type="file",
+            source="runner",
+            content=b"expired",
+            expires_at=datetime.datetime.utcnow() - datetime.timedelta(seconds=1),
+        )
+
+        assert await store.get_metadata("art_expired_hidden") is None
+        assert await store.read_artifact("art_expired_hidden") is None
+
+    @pytest.mark.asyncio
+    async def test_cleanup_expired_artifacts_deletes_binary_storage(self, db_engine):
+        """Expired artifacts and their Host-owned binary blobs are removed."""
+        from sqlalchemy import select
+        from langbot.pkg.entity.persistence.bstorage import BinaryStorage
+
+        store = ArtifactStore(db_engine)
+        now = datetime.datetime.utcnow()
+        await store.register_artifact(
+            artifact_id="art_expired",
+            artifact_type="file",
+            source="runner",
+            content=b"expired",
+            expires_at=now - datetime.timedelta(seconds=1),
+        )
+        await store.register_artifact(
+            artifact_id="art_fresh",
+            artifact_type="file",
+            source="runner",
+            content=b"fresh",
+            expires_at=now + datetime.timedelta(days=1),
+        )
+
+        removed = await store.cleanup_expired_artifacts(now=now)
+
+        assert removed == 1
+        assert await store.get_metadata("art_expired") is None
+        assert await store.get_metadata("art_fresh") is not None
+        async with store._session_factory() as session:
+            result = await session.execute(
+                select(BinaryStorage).where(BinaryStorage.unique_key == "artifact:art_expired")
+            )
+            assert result.scalars().first() is None
+
     @pytest.mark.asyncio
     async def test_file_artifact_range_read_and_public_metadata(self, db_engine, tmp_path):
         """File-backed artifacts read ranges without exposing host paths."""

@@ -1,6 +1,8 @@
"""Tests for EventLog, Transcript, and history/event APIs."""
 from __future__ import annotations

+import datetime
+
 import pytest

 from langbot.pkg.agent.runner.host_models import (
@@ -505,6 +507,45 @@ class TestEventLogStoreRealSQLite:
         assert cursor is not None
         assert int(cursor) > 0

+    @pytest.mark.asyncio
+    async def test_cleanup_events_older_than(self, db_engine):
+        """EventLog cleanup removes only rows older than the cutoff."""
+        import sqlalchemy
+        from langbot.pkg.entity.persistence.event_log import EventLog
+
+        store = EventLogStore(db_engine)
+        cutoff = datetime.datetime.utcnow()
+        await store.append_event(
+            event_id="evt_cleanup_old",
+            event_type="message.received",
+            source="platform",
+            conversation_id="conv_cleanup",
+        )
+        await store.append_event(
+            event_id="evt_cleanup_new",
+            event_type="message.received",
+            source="platform",
+            conversation_id="conv_cleanup",
+        )
+        async with store._session_factory() as session:
+            await session.execute(
+                sqlalchemy.update(EventLog)
+                .where(EventLog.event_id == "evt_cleanup_old")
+                .values(created_at=cutoff - datetime.timedelta(days=2))
+            )
+            await session.execute(
+                sqlalchemy.update(EventLog)
+                .where(EventLog.event_id == "evt_cleanup_new")
+                .values(created_at=cutoff + datetime.timedelta(days=2))
+            )
+            await session.commit()
+
+        removed = await store.cleanup_events_older_than(cutoff)
+
+        assert removed == 1
+        assert await store.get_event("evt_cleanup_old") is None
+        assert await store.get_event("evt_cleanup_new") is not None
+

 class TestTranscriptStoreRealSQLite:
     """Test TranscriptStore with real SQLite database."""
@@ -637,6 +678,47 @@ class TestTranscriptStoreRealSQLite:
         assert cursor is not None
         assert int(cursor) > 0

+    @pytest.mark.asyncio
+    async def test_cleanup_transcripts_older_than(self, db_engine):
+        """Transcript cleanup removes only rows older than the cutoff."""
+        import sqlalchemy
+        from langbot.pkg.entity.persistence.transcript import Transcript
+
+        store = TranscriptStore(db_engine)
+        cutoff = datetime.datetime.utcnow()
+        await store.append_transcript(
+            transcript_id="trans_cleanup_old",
+            event_id="evt_cleanup_old",
+            conversation_id="conv_cleanup",
+            role="user",
+            content="old",
+        )
+        await store.append_transcript(
+            transcript_id="trans_cleanup_new",
+            event_id="evt_cleanup_new",
+            conversation_id="conv_cleanup",
+            role="assistant",
+            content="new",
+        )
+        async with store._session_factory() as session:
+            await session.execute(
+                sqlalchemy.update(Transcript)
+                .where(Transcript.transcript_id == "trans_cleanup_old")
+                .values(created_at=cutoff - datetime.timedelta(days=2))
+            )
+            await session.execute(
+                sqlalchemy.update(Transcript)
+                .where(Transcript.transcript_id == "trans_cleanup_new")
+                .values(created_at=cutoff + datetime.timedelta(days=2))
+            )
+            await session.commit()
+
+        removed = await store.cleanup_transcripts_older_than(cutoff)
+        items, _, _, _ = await store.page_transcript("conv_cleanup", limit=10)
+
+        assert removed == 1
+        assert [item["content"] for item in items] == ["new"]
+

 # Fixtures
 @pytest.fixture

@@ -28,14 +28,17 @@ RUNNER_ID = "plugin:langbot/local-agent/default"
 class FakeLogger:
+    def __init__(self):
+        self.warnings: list[str] = []
+
     def debug(self, msg):
         pass

     def info(self, msg):
         pass

-    def warning(self, msg):
-        pass
+    def warning(self, msg, *args, **kwargs):
+        self.warnings.append(str(msg))

     def error(self, msg):
         pass
@@ -424,6 +427,41 @@ async def test_orchestrator_streams_fake_plugin_deltas(clean_agent_state):
     assert [chunk.content for chunk in chunks] == ["hel", "hello"]


+@pytest.mark.asyncio
+async def test_orchestrator_drops_duplicate_result_sequence(clean_agent_state):
+    """Duplicate runner result sequences are idempotently ignored."""
+    db_engine = clean_agent_state
+    descriptor = make_descriptor()
+    plugin_connector = FakePluginConnector(
+        results=[
+            {
+                "type": "message.delta",
+                "sequence": 1,
+                "data": {"chunk": {"role": "assistant", "content": "first"}},
+            },
+            {
+                "type": "message.delta",
+                "sequence": 1,
+                "data": {"chunk": {"role": "assistant", "content": "duplicate"}},
+            },
+            {
+                "type": "message.delta",
+                "sequence": 3,
+                "data": {"chunk": {"role": "assistant", "content": "after-gap"}},
+            },
+            {"type": "run.completed", "sequence": 4, "data": {"finish_reason": "stop"}},
+        ]
+    )
+    ap = FakeApplication(plugin_connector, db_engine)
+    orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor))
+
+    chunks = [message async for message in orchestrator.run_from_query(make_query())]
+
+    assert [chunk.content for chunk in chunks] == ["first", "after-gap"]
+    assert any("duplicate result sequence 1" in warning for warning in ap.logger.warnings)
+    assert any("result sequence gap or out-of-order" in warning for warning in ap.logger.warnings)
+
+
 @pytest.mark.asyncio
 async def test_orchestrator_applies_state_updates_and_suppresses_protocol_event(clean_agent_state):
     """Test that state.updated events are applied and not yielded to pipeline."""

@@ -8,6 +8,7 @@ import time
 from langbot.pkg.agent.runner.session_registry import (
     AgentRunSessionRegistry,
     AgentRunSession,
+    MAX_STEERING_QUEUE_ITEMS,
     get_session_registry,
 )
@@ -258,6 +259,59 @@ class TestSessionRegistryBasic:
         assert [item['event']['event_id'] for item in first] == ['event_1']
         assert [item['event']['event_id'] for item in second] == ['event_2']

+    @pytest.mark.asyncio
+    async def test_enqueue_steering_rejects_when_queue_is_full(self):
+        """A full steering queue does not claim more queries."""
+        registry = AgentRunSessionRegistry()
+        await registry.register(
+            run_id='run_steering_full',
+            runner_id='plugin:test/my-runner/default',
+            query_id=1,
+            plugin_identity='test/my-runner',
+            resources=make_resources(),
+            conversation_id='conv_1',
+            available_apis={'steering_pull': True},
+        )
+
+        for index in range(MAX_STEERING_QUEUE_ITEMS):
+            assert await registry.enqueue_steering(
+                'run_steering_full',
+                {'event': {'event_id': f'event_{index}'}},
+            )
+
+        assert not await registry.enqueue_steering(
+            'run_steering_full',
+            {'event': {'event_id': 'overflow'}},
+        )
+
+        items = await registry.pull_steering('run_steering_full', mode='all')
+        assert len(items) == MAX_STEERING_QUEUE_ITEMS
+        assert all(item['event']['event_id'] != 'overflow' for item in items)
+
+    @pytest.mark.asyncio
+    async def test_unregister_returns_pending_steering_queue(self):
+        """Unregister returns the removed session so callers can audit pending steering."""
+        registry = AgentRunSessionRegistry()
+        await registry.register(
+            run_id='run_steering_unregister',
+            runner_id='plugin:test/my-runner/default',
+            query_id=1,
+            plugin_identity='test/my-runner',
+            resources=make_resources(),
+            conversation_id='conv_1',
+            available_apis={'steering_pull': True},
+        )
+        await registry.enqueue_steering(
+            'run_steering_unregister',
+            {'event': {'event_id': 'event_pending'}},
+        )
+
+        session = await registry.unregister('run_steering_unregister')
+
+        assert session is not None
+        assert session['steering_queue'][0]['event']['event_id'] == 'event_pending'
+        assert await registry.get('run_steering_unregister') is None
+

 class TestIsResourceAllowed:
     """Tests for is_resource_allowed validation."""

@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from langbot.pkg.agent.runner.errors import RunnerNotFoundError
+from langbot.pkg.pipeline.controller import Controller
+
+
+def make_app():
+    app = SimpleNamespace()
+    app.instance_config = SimpleNamespace(data={'concurrency': {'pipeline': 10}})
+    app.logger = MagicMock()
+    app.pipeline_mgr = SimpleNamespace()
+    app.pipeline_mgr.get_pipeline_by_uuid = AsyncMock()
+    app.sess_mgr = SimpleNamespace()
+    app.sess_mgr.get_session = AsyncMock(return_value=SimpleNamespace())
+    app.agent_run_orchestrator = SimpleNamespace()
+    app.agent_run_orchestrator.try_claim_steering_from_query = AsyncMock()
+    return app
+
+
+def make_pipeline():
+    return SimpleNamespace(
+        pipeline_entity=SimpleNamespace(config={'ai': {'runner': {'id': 'plugin:test/runner/default'}}}),
+        bound_plugins=['test/runner'],
+        bound_mcp_servers=[],
+    )
+
+
+@pytest.mark.asyncio
+async def test_try_claim_steering_returns_false_when_runner_lookup_fails():
+    app = make_app()
+    app.pipeline_mgr.get_pipeline_by_uuid.return_value = make_pipeline()
+    app.agent_run_orchestrator.try_claim_steering_from_query.side_effect = RunnerNotFoundError(
+        'plugin:missing/runner/default'
+    )
+    controller = Controller(app)
+    query = SimpleNamespace(query_id=1, pipeline_uuid='pipeline-001', variables={})
+
+    claimed = await controller._try_claim_steering_before_session_slot(query)
+
+    assert claimed is False
+    app.logger.warning.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_try_claim_steering_sets_pipeline_context_before_claiming():
+    app = make_app()
+    pipeline = make_pipeline()
+    app.pipeline_mgr.get_pipeline_by_uuid.return_value = pipeline
+    app.agent_run_orchestrator.try_claim_steering_from_query.return_value = True
+    controller = Controller(app)
+    query = SimpleNamespace(query_id=2, pipeline_uuid='pipeline-002', variables={})
+
+    claimed = await controller._try_claim_steering_before_session_slot(query)
+
+    assert claimed is True
+    assert query.pipeline_config is pipeline.pipeline_entity.config
+    assert query.variables['_pipeline_bound_plugins'] == ['test/runner']
+    app.agent_run_orchestrator.try_claim_steering_from_query.assert_awaited_once_with(query)