feat(agent-runner): audit steering injection

This commit is contained in:
huanghuoguoguo
2026-06-12 00:28:03 +08:00
parent 6c186661e6
commit 14c9a3a8c6
7 changed files with 166 additions and 29 deletions

View File

@@ -506,6 +506,10 @@ await api.get_file(file_key)
await api.get_langbot_version()
```
`steering_pull(mode="all")` 是推荐默认Host 按 claim 顺序返回全部 pending steering 输入并清空对应队列。`mode="one-at-a-time"` 仅用于 runner 主动节流每次返回一条。Host 不合并多条用户消息runner 负责在 turn 边界决定模型侧格式。
Steering 审计使用 EventLog 而不是 Transcript schema 扩展:被 active run 吸收的原始 `message.received` 事件保留原事件类型,并在 `metadata.steering` 标记 `status="queued"``trigger_behavior="absorbed_into_active_run"``claimed_by_run_id``claimed_runner_id``claimed_at`。Runner 成功 pull 后Host 追加 `steering.injected` EventLog 记录,`metadata.steering.status="injected"` 并引用 `source_event_id`。Transcript 继续只表示会话事实,不承担 dispatch 行为标记。
`state``storage` 的建议边界:`state` 放小型 JSONconversation / actor / subject / runner`storage` 放 blob 或较大数据插件私有数据、workspace 数据、checkpoint
Compaction checkpoint 的推荐 state 约定:

View File

@@ -1,10 +1,10 @@
# Run Steering 与 Compaction CheckpointFuture Design Note
# Run Steering 与 Compaction CheckpointDesign Note
本文档描述两项尚未落地的 Host 能力缺口**运行中消息注入steering / follow-up**和
本文档记录两项 Host/runner 协作能力**运行中消息注入steering / follow-up**和
**压缩摘要持久化compaction checkpoint**。两者来自官方 local-agent 对照
Pi agent harness`pi-mono/packages/agent`,下称 pi-agent-core的差距分析
local-agent 已移植 Pi 的事件生命周期、并行工具语义、hook 扩展点和压缩预算模型,
这两项无法由 runner 单方面闭环,需要 Host 协议授权配合
这两项需要 Host 协议授权与 runner turn 边界协同才能闭环
> 本文是设计备忘,不是 schema 事实源。涉及的数据结构最终落到
> [PROTOCOL_V1.md](./PROTOCOL_V1.md);上下文边界语义以
@@ -49,15 +49,18 @@ pi-agent-core 区分两个队列,注入时机都在 turn 边界,不打断进
`should_stop_after_turn` 已预留了对应的注入点。
- **能力协商**runner manifest 声明 `steering` capability参照 PROTOCOL_V1 §4.3
未声明的 runner 保持现状(新消息按现有规则另起 run
- **回执**:被 steering 消费的事件需要可审计的归属记录event 被哪个 run_id 认领、
是否最终注入成功),形式可以是新的 result type 或 EventLog 记录,落协议时定。
- **回执**:被 steering 消费的事件通过 EventLog 审计。原始 `message.received`
记录在 `metadata.steering` 标记 queued/absorbed 与 `claimed_by_run_id`
runner 成功 pull 后Host 追加 `steering.injected` 记录并引用源事件。
Transcript 继续只表示会话事实,不扩展 dispatch 行为字段。
需要新增的协议面(最终定义归 PROTOCOL_V1
已落地的协议面(最终定义归 PROTOCOL_V1
1. `ContextAccess.available_apis` 增加 steering pull 能力位。
2. `AgentRunAPIProxy` 增加 steering 拉取 action(含 one-at-a-time / all 语义参数)。
3. dispatch 层的"认领"规则:什么事件类型可被 steering 吸收、超时未拉取如何回退
建议run 结束或 deadline 到期时,未消费的排队事件按普通事件重新触发 run)。
2. `AgentRunAPIProxy` 增加 steering 拉取 action:默认 `mode=all`Host 保序返回全部
pending 输入;`one-at-a-time` 仅作为 runner 主动节流选项。
3. dispatch 层的"认领"规则:`message.received` 可被同 conversation 的 active run
吸收,原事件写 EventLog / Transcriptdispatch 行为写入 EventLog metadata。
### 1.4 边界
@@ -84,16 +87,15 @@ pi-agent-core 把 compaction 条目持久化进 session tree摘要带
### 2.2 现状盘点
协议面基本已备齐,缺的是消费约定和授权
协议面和主消费路径已具备
- State / Storage API 已定义PROTOCOL_V1 §8 "State / Storage"
且 AGENT_CONTEXT_PROTOCOL 已点名 `summary.checkpoint` 是 state 的预期用法。
- `ContextAccess.available_apis.state` 默认 `false`PROTOCOL_V1 §5.8
Host 尚未对 local-agent binding 默认开启。
- local-agent 侧完全未消费:不读不写 checkpoint其 README "Current Boundary"
已声明这是预期的未来工作)。
- Host 会根据 binding state policy 暴露 `ContextAccess.available_apis.state`
- local-agent 会在 state API 可用时读取/写入 `runner.compaction.checkpoint`
缺失、schema 不匹配、conversation 不匹配或游标失败时回退尾部历史拉取。
- LLM 生成摘要**不依赖**本项 Host 能力——runner 用已授权的 `invoke_llm`
即可生成,可以先行实现;本项只解决"存下来、下次复用"。
即可生成checkpoint 只解决"存下来、下次复用"。
### 2.3 设计方向
@@ -129,24 +131,20 @@ pi-agent-core 把 compaction 条目持久化进 session tree摘要带
| 项 | 归属 | 依赖 |
| --- | --- | --- |
| steering queue、事件认领、超时回退 | LangBot Hostdispatch / binding 层) | |
| steering pull API + capability 位 | PROTOCOL_V1 + SDK proxy | 上一项 |
| turn 边界拉取与注入 | langbot-local-agenthooks 已预留) | 上两项 |
| local-agent 对 state API 的 checkpoint 读写 | langbot-local-agent | Host 开启 `available_apis.state` |
| checkpoint key / 内容 / 失效约定 | 本文档 → PROTOCOL_V1 | 无 |
| LLM 压缩摘要生成 | langbot-local-agent | `invoke_llm` 已可用 |
| steering queue、事件认领、基础审计 | LangBot Hostdispatch / binding 层) | 已落地 |
| steering pull API + capability 位 | PROTOCOL_V1 + SDK proxy | 已落地 |
| turn 边界拉取与注入 | langbot-local-agent | 已落地 |
| local-agent 对 state API 的 checkpoint 读写 | langbot-local-agent | 已落地 |
| checkpoint key / 内容 / 失效约定 | PROTOCOL_V1 + local-agent README | 已落地 |
| LLM 压缩摘要生成 | langbot-local-agent | 已落地`invoke_llm`,失败回退确定性摘要 |
| usage / context-window metadata 透传 | LangBot Hostmodel 层) | LiteLLM model-info |
建议顺序checkpoint 先行(协议面现成,改动集中在授权和 runner 消费),
steering 后行(需要新协议面和 dispatch 行为变更)
剩余工作应优先补 usage / context-window metadata。streaming delivery 衔接依赖
`ctx.delivery` 编辑/追加语义,不建议在协议能力缺失时硬编码
## 4. 开放问题
- steering 注入的消息在 Transcript 中如何与普通消息区分(审计需要区分
"作为新 run 触发"与"被在途 run 吸收")。
- 多条排队消息的合并语义由谁定Host 全量递给 runner还是支持
one-at-a-time 协商;建议 Host 全量递、runner 自行决定消费节奏。
- streaming delivery 下 steering 注入后,前序 turn 已流出的内容与新 turn
输出在 IM 消息编辑面的衔接(涉及 `ctx.delivery` 能力,待 delivery 演进定)。
- checkpoint 是否需要 Host 侧主动失效通知(如会话清空时删除对应 state key
还是仅靠 runner 读取时校验 `covers_until`
- checkpoint 是否需要 Host 侧主动失效通知(如会话清空时删除对应 state key
当前实现靠 runner 读取时校验并回退,功能不阻塞

View File

@@ -235,6 +235,15 @@ class AgentRunOrchestrator:
binding=binding,
run_id=target_run_id,
runner_id=descriptor.id,
metadata={
'steering': {
'status': 'queued',
'trigger_behavior': 'absorbed_into_active_run',
'claimed_by_run_id': target_run_id,
'claimed_runner_id': descriptor.id,
'claimed_at': steering_item.get('claimed_at'),
},
},
)
await self.journal.register_input_artifacts(
event=event,

View File

@@ -82,6 +82,7 @@ class AgentRunJournal:
binding: AgentBinding,
run_id: str,
runner_id: str,
metadata: dict[str, typing.Any] | None = None,
) -> str:
"""Write incoming event to EventLog."""
import datetime
@@ -119,6 +120,7 @@ class AgentRunJournal:
run_id=run_id,
runner_id=runner_id,
event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None,
metadata=metadata,
)
async def register_input_artifacts(

View File

@@ -1755,6 +1755,50 @@ class RuntimeConnectionHandler(handler.Handler):
mode=str(mode or 'all'),
limit=limit,
)
if items:
try:
from ..agent.runner.event_log_store import EventLogStore
store = EventLogStore(self.ap.persistence_mgr.get_db_engine())
for item in items:
event = item.get('event') if isinstance(item, dict) else None
conversation = item.get('conversation') if isinstance(item, dict) else None
actor = item.get('actor') if isinstance(item, dict) else None
subject = item.get('subject') if isinstance(item, dict) else None
if not isinstance(event, dict):
continue
await store.append_event(
event_id=None,
event_type='steering.injected',
source='agent_runner',
bot_id=conversation.get('bot_id') if isinstance(conversation, dict) else None,
workspace_id=conversation.get('workspace_id') if isinstance(conversation, dict) else None,
conversation_id=conversation.get('conversation_id') if isinstance(conversation, dict) else None,
thread_id=conversation.get('thread_id') if isinstance(conversation, dict) else None,
actor_type=actor.get('actor_type') if isinstance(actor, dict) else None,
actor_id=actor.get('actor_id') if isinstance(actor, dict) else None,
actor_name=actor.get('actor_name') if isinstance(actor, dict) else None,
subject_type=subject.get('subject_type') if isinstance(subject, dict) else None,
subject_id=subject.get('subject_id') if isinstance(subject, dict) else None,
input_summary=f"steering injected from {event.get('event_id')}",
run_id=run_id,
runner_id=session.get('runner_id') if isinstance(session, dict) else None,
metadata={
'steering': {
'status': 'injected',
'source_event_id': event.get('event_id'),
'claimed_by_run_id': item.get('claimed_run_id') if isinstance(item, dict) else run_id,
'claimed_runner_id': item.get('runner_id') if isinstance(item, dict) else None,
'claimed_at': item.get('claimed_at') if isinstance(item, dict) else None,
'pull_mode': str(mode or 'all'),
},
},
)
except Exception as exc:
self.ap.logger.warning(
f'Failed to write steering injection audit for run {run_id}: {exc}',
exc_info=True,
)
return handler.ActionResponse.success(data={'items': items})
# ================= Artifact APIs =================

View File

@@ -95,6 +95,41 @@ class TestEventLogStore:
)
assert event_id == "evt_1"
stored_event = mock_session.add.call_args.args[0]
assert stored_event.metadata_json is None
@pytest.mark.asyncio
async def test_append_event_stores_metadata_json(self, mock_db_engine):
"""EventLog metadata records steering dispatch/audit facts."""
from unittest.mock import AsyncMock, MagicMock, patch
store = EventLogStore(mock_db_engine)
mock_session = AsyncMock()
mock_session.add = MagicMock()
mock_session.commit = AsyncMock()
with patch.object(store, '_session_factory') as mock_factory:
mock_factory.return_value.__aenter__.return_value = mock_session
event_id = await store.append_event(
event_id="evt_steering",
event_type="message.received",
source="platform",
run_id="run_1",
runner_id="plugin:test/plugin/runner",
metadata={
"steering": {
"status": "queued",
"claimed_by_run_id": "run_1",
}
},
)
assert event_id == "evt_steering"
stored_event = mock_session.add.call_args.args[0]
assert '"status": "queued"' in stored_event.metadata_json
assert '"claimed_by_run_id": "run_1"' in stored_event.metadata_json
@pytest.mark.asyncio
async def test_append_event_truncates_input_summary(self, mock_db_engine):

View File

@@ -213,6 +213,51 @@ class TestSessionRegistryBasic:
assert await registry.get('old_run') is None
assert await registry.get('new_run') is not None
@pytest.mark.asyncio
async def test_pull_steering_all_preserves_queue_order(self):
"""Default all-mode steering returns every queued item in FIFO order."""
registry = AgentRunSessionRegistry()
await registry.register(
run_id='run_steering',
runner_id='plugin:test/my-runner/default',
query_id=1,
plugin_identity='test/my-runner',
resources=make_resources(),
conversation_id='conv_1',
available_apis={'steering_pull': True},
)
await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_1'}, 'input': {'text': 'first'}})
await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_2'}, 'input': {'text': 'second'}})
await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_3'}, 'input': {'text': 'third'}})
items = await registry.pull_steering('run_steering', mode='all')
assert [item['event']['event_id'] for item in items] == ['event_1', 'event_2', 'event_3']
assert await registry.pull_steering('run_steering', mode='all') == []
@pytest.mark.asyncio
async def test_pull_steering_one_at_a_time_leaves_remaining_items(self):
"""one-at-a-time is an explicit runner-side throttling mode."""
registry = AgentRunSessionRegistry()
await registry.register(
run_id='run_steering_one',
runner_id='plugin:test/my-runner/default',
query_id=1,
plugin_identity='test/my-runner',
resources=make_resources(),
conversation_id='conv_1',
available_apis={'steering_pull': True},
)
await registry.enqueue_steering('run_steering_one', {'event': {'event_id': 'event_1'}})
await registry.enqueue_steering('run_steering_one', {'event': {'event_id': 'event_2'}})
first = await registry.pull_steering('run_steering_one', mode='one-at-a-time')
second = await registry.pull_steering('run_steering_one', mode='one-at-a-time')
assert [item['event']['event_id'] for item in first] == ['event_1']
assert [item['event']['event_id'] for item in second] == ['event_2']
class TestIsResourceAllowed:
"""Tests for is_resource_allowed validation."""