From b0a23150294c02f0b40e561286bd0b91cae54238 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com> Date: Fri, 12 Jun 2026 00:28:03 +0800 Subject: [PATCH] feat(agent-runner): audit steering injection --- .../agent-runner-pluginization/PROTOCOL_V1.md | 4 ++ .../RUN_STEERING_AND_CHECKPOINT.md | 56 +++++++++---------- src/langbot/pkg/agent/runner/orchestrator.py | 9 +++ src/langbot/pkg/agent/runner/run_journal.py | 2 + src/langbot/pkg/plugin/handler.py | 44 +++++++++++++++ .../agent/test_event_log_transcript.py | 35 ++++++++++++ .../unit_tests/agent/test_session_registry.py | 45 +++++++++++++++ 7 files changed, 166 insertions(+), 29 deletions(-) diff --git a/docs/agent-runner-pluginization/PROTOCOL_V1.md b/docs/agent-runner-pluginization/PROTOCOL_V1.md index 7789a586..d657c95d 100644 --- a/docs/agent-runner-pluginization/PROTOCOL_V1.md +++ b/docs/agent-runner-pluginization/PROTOCOL_V1.md @@ -506,6 +506,10 @@ await api.get_file(file_key) await api.get_langbot_version() ``` +`steering_pull(mode="all")` 是推荐默认:Host 按 claim 顺序返回全部 pending steering 输入并清空对应队列。`mode="one-at-a-time"` 仅用于 runner 主动节流,每次返回一条。Host 不合并多条用户消息;runner 负责在 turn 边界决定模型侧格式。 + +Steering 审计使用 EventLog 而不是 Transcript schema 扩展:被 active run 吸收的原始 `message.received` 事件保留原事件类型,并在 `metadata.steering` 标记 `status="queued"`、`trigger_behavior="absorbed_into_active_run"`、`claimed_by_run_id`、`claimed_runner_id`、`claimed_at`。Runner 成功 pull 后,Host 追加 `steering.injected` EventLog 记录,`metadata.steering.status="injected"` 并引用 `source_event_id`。Transcript 继续只表示会话事实,不承担 dispatch 行为标记。 + `state` 与 `storage` 的建议边界:`state` 放小型 JSON(conversation / actor / subject / runner),`storage` 放 blob 或较大数据(插件私有数据、workspace 数据、checkpoint)。 Compaction checkpoint 的推荐 state 约定: diff --git a/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md b/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md index c593f49e..322c489d 100644 --- a/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md +++ b/docs/agent-runner-pluginization/RUN_STEERING_AND_CHECKPOINT.md @@ -1,10 +1,10 @@ -# Run Steering 与 Compaction Checkpoint(Future Design Note) +# Run Steering 与 Compaction Checkpoint(Design Note) -本文档描述两项尚未落地的 Host 能力缺口:**运行中消息注入(steering / follow-up)**和 +本文档记录两项 Host/runner 协作能力:**运行中消息注入(steering / follow-up)**和 **压缩摘要持久化(compaction checkpoint)**。两者来自官方 local-agent 对照 Pi agent harness(`pi-mono/packages/agent`,下称 pi-agent-core)的差距分析: local-agent 已移植 Pi 的事件生命周期、并行工具语义、hook 扩展点和压缩预算模型, -但这两项无法由 runner 单方面闭环,需要 Host 协议或授权配合。 +这两项需要 Host 协议、授权与 runner turn 边界协同才能闭环。 > 本文是设计备忘,不是 schema 事实源。涉及的数据结构最终落到 > [PROTOCOL_V1.md](./PROTOCOL_V1.md);上下文边界语义以 @@ -49,15 +49,18 @@ pi-agent-core 区分两个队列,注入时机都在 turn 边界,不打断进 `should_stop_after_turn` 已预留了对应的注入点。 - **能力协商**:runner manifest 声明 `steering` capability(参照 PROTOCOL_V1 §4.3); 未声明的 runner 保持现状(新消息按现有规则另起 run)。 -- **回执**:被 steering 消费的事件需要可审计的归属记录(event 被哪个 run_id 认领、 - 是否最终注入成功),形式可以是新的 result type 或 EventLog 记录,落协议时定。 +- **回执**:被 steering 消费的事件通过 EventLog 审计。原始 `message.received` + 记录在 `metadata.steering` 标记 queued/absorbed 与 `claimed_by_run_id`; + runner 成功 pull 后,Host 追加 `steering.injected` 记录并引用源事件。 + Transcript 继续只表示会话事实,不扩展 dispatch 行为字段。 -需要新增的协议面(最终定义归 PROTOCOL_V1): +已落地的协议面(最终定义归 PROTOCOL_V1): 1. `ContextAccess.available_apis` 增加 steering pull 能力位。 -2. `AgentRunAPIProxy` 增加 steering 拉取 action(含 one-at-a-time / all 语义参数)。 -3. dispatch 层的"认领"规则:什么事件类型可被 steering 吸收、超时未拉取如何回退 - (建议:run 结束或 deadline 到期时,未消费的排队事件按普通事件重新触发 run)。 +2. `AgentRunAPIProxy` 增加 steering 拉取 action:默认 `mode=all`,Host 保序返回全部 + pending 输入;`one-at-a-time` 仅作为 runner 主动节流选项。 +3. dispatch 层的"认领"规则:`message.received` 可被同 conversation 的 active run + 吸收,原事件写 EventLog / Transcript,dispatch 行为写入 EventLog metadata。 ### 1.4 边界 @@ -84,16 +87,15 @@ pi-agent-core 把 compaction 条目持久化进 session tree:摘要带 ### 2.2 现状盘点 -协议面基本已备齐,缺的是消费约定和授权: +协议面和主消费路径已具备: - State / Storage API 已定义(PROTOCOL_V1 §8 "State / Storage"), 且 AGENT_CONTEXT_PROTOCOL 已点名 `summary.checkpoint` 是 state 的预期用法。 -- `ContextAccess.available_apis.state` 默认 `false`(PROTOCOL_V1 §5.8); - Host 尚未对 local-agent binding 默认开启。 -- local-agent 侧完全未消费:不读不写 checkpoint(其 README "Current Boundary" - 已声明这是预期的未来工作)。 +- Host 会根据 binding state policy 暴露 `ContextAccess.available_apis.state`。 +- local-agent 会在 state API 可用时读取/写入 `runner.compaction.checkpoint`; + 缺失、schema 不匹配、conversation 不匹配或游标失败时回退尾部历史拉取。 - LLM 生成摘要**不依赖**本项 Host 能力——runner 用已授权的 `invoke_llm` - 即可生成,可以先行实现;本项只解决"存下来、下次复用"。 + 即可生成;checkpoint 只解决"存下来、下次复用"。 ### 2.3 设计方向 @@ -129,24 +131,20 @@ pi-agent-core 把 compaction 条目持久化进 session tree:摘要带 | 项 | 归属 | 依赖 | | --- | --- | --- | -| steering queue、事件认领、超时回退 | LangBot Host(dispatch / binding 层) | 无 | -| steering pull API + capability 位 | PROTOCOL_V1 + SDK proxy | 上一项 | -| turn 边界拉取与注入 | langbot-local-agent(hooks 已预留) | 上两项 | -| local-agent 对 state API 的 checkpoint 读写 | langbot-local-agent | Host 开启 `available_apis.state` | -| checkpoint key / 内容 / 失效约定 | 本文档 → PROTOCOL_V1 | 无 | -| LLM 压缩摘要生成 | langbot-local-agent | 无(`invoke_llm` 已可用) | +| steering queue、事件认领、基础审计 | LangBot Host(dispatch / binding 层) | 已落地 | +| steering pull API + capability 位 | PROTOCOL_V1 + SDK proxy | 已落地 | +| turn 边界拉取与注入 | langbot-local-agent | 已落地 | +| local-agent 对 state API 的 checkpoint 读写 | langbot-local-agent | 已落地 | +| checkpoint key / 内容 / 失效约定 | PROTOCOL_V1 + local-agent README | 已落地 | +| LLM 压缩摘要生成 | langbot-local-agent | 已落地(`invoke_llm`,失败回退确定性摘要) | | usage / context-window metadata 透传 | LangBot Host(model 层) | LiteLLM model-info | -建议顺序:checkpoint 先行(协议面现成,改动集中在授权和 runner 消费), -steering 后行(需要新协议面和 dispatch 行为变更)。 +剩余工作应优先补 usage / context-window metadata。streaming delivery 衔接依赖 +`ctx.delivery` 编辑/追加语义,不建议在协议能力缺失时硬编码。 ## 4. 开放问题 -- steering 注入的消息在 Transcript 中如何与普通消息区分(审计需要区分 - "作为新 run 触发"与"被在途 run 吸收")。 -- 多条排队消息的合并语义由谁定:Host 全量递给 runner,还是支持 - one-at-a-time 协商;建议 Host 全量递、runner 自行决定消费节奏。 - streaming delivery 下 steering 注入后,前序 turn 已流出的内容与新 turn 输出在 IM 消息编辑面的衔接(涉及 `ctx.delivery` 能力,待 delivery 演进定)。 -- checkpoint 是否需要 Host 侧主动失效通知(如会话清空时删除对应 state key), - 还是仅靠 runner 读取时校验 `covers_until`。 +- checkpoint 是否需要 Host 侧主动失效通知(如会话清空时删除对应 state key)。 + 当前实现靠 runner 读取时校验并回退,功能不阻塞。 diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 2646be4e..27b05d11 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -235,6 +235,15 @@ class AgentRunOrchestrator: binding=binding, run_id=target_run_id, runner_id=descriptor.id, + metadata={ + 'steering': { + 'status': 'queued', + 'trigger_behavior': 'absorbed_into_active_run', + 'claimed_by_run_id': target_run_id, + 'claimed_runner_id': descriptor.id, + 'claimed_at': steering_item.get('claimed_at'), + }, + }, ) await self.journal.register_input_artifacts( event=event, diff --git a/src/langbot/pkg/agent/runner/run_journal.py b/src/langbot/pkg/agent/runner/run_journal.py index de5ff118..61b83ee0 100644 --- a/src/langbot/pkg/agent/runner/run_journal.py +++ b/src/langbot/pkg/agent/runner/run_journal.py @@ -82,6 +82,7 @@ class AgentRunJournal: binding: AgentBinding, run_id: str, runner_id: str, + metadata: dict[str, typing.Any] | None = None, ) -> str: """Write incoming event to EventLog.""" import datetime @@ -119,6 +120,7 @@ class AgentRunJournal: run_id=run_id, runner_id=runner_id, event_time=datetime.datetime.fromtimestamp(event.event_time) if event.event_time else None, + metadata=metadata, ) async def register_input_artifacts( diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index a39716a4..5b45bf79 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -1755,6 +1755,50 @@ class RuntimeConnectionHandler(handler.Handler): mode=str(mode or 'all'), limit=limit, ) + if items: + try: + from ..agent.runner.event_log_store import EventLogStore + + store = EventLogStore(self.ap.persistence_mgr.get_db_engine()) + for item in items: + event = item.get('event') if isinstance(item, dict) else None + conversation = item.get('conversation') if isinstance(item, dict) else None + actor = item.get('actor') if isinstance(item, dict) else None + subject = item.get('subject') if isinstance(item, dict) else None + if not isinstance(event, dict): + continue + await store.append_event( + event_id=None, + event_type='steering.injected', + source='agent_runner', + bot_id=conversation.get('bot_id') if isinstance(conversation, dict) else None, + workspace_id=conversation.get('workspace_id') if isinstance(conversation, dict) else None, + conversation_id=conversation.get('conversation_id') if isinstance(conversation, dict) else None, + thread_id=conversation.get('thread_id') if isinstance(conversation, dict) else None, + actor_type=actor.get('actor_type') if isinstance(actor, dict) else None, + actor_id=actor.get('actor_id') if isinstance(actor, dict) else None, + actor_name=actor.get('actor_name') if isinstance(actor, dict) else None, + subject_type=subject.get('subject_type') if isinstance(subject, dict) else None, + subject_id=subject.get('subject_id') if isinstance(subject, dict) else None, + input_summary=f"steering injected from {event.get('event_id')}", + run_id=run_id, + runner_id=session.get('runner_id') if isinstance(session, dict) else None, + metadata={ + 'steering': { + 'status': 'injected', + 'source_event_id': event.get('event_id'), + 'claimed_by_run_id': item.get('claimed_run_id') if isinstance(item, dict) else run_id, + 'claimed_runner_id': item.get('runner_id') if isinstance(item, dict) else None, + 'claimed_at': item.get('claimed_at') if isinstance(item, dict) else None, + 'pull_mode': str(mode or 'all'), + }, + }, + ) + except Exception as exc: + self.ap.logger.warning( + f'Failed to write steering injection audit for run {run_id}: {exc}', + exc_info=True, + ) return handler.ActionResponse.success(data={'items': items}) # ================= Artifact APIs ================= diff --git a/tests/unit_tests/agent/test_event_log_transcript.py b/tests/unit_tests/agent/test_event_log_transcript.py index fa24e4b6..7fe1f0bc 100644 --- a/tests/unit_tests/agent/test_event_log_transcript.py +++ b/tests/unit_tests/agent/test_event_log_transcript.py @@ -95,6 +95,41 @@ class TestEventLogStore: ) assert event_id == "evt_1" + stored_event = mock_session.add.call_args.args[0] + assert stored_event.metadata_json is None + + @pytest.mark.asyncio + async def test_append_event_stores_metadata_json(self, mock_db_engine): + """EventLog metadata records steering dispatch/audit facts.""" + from unittest.mock import AsyncMock, MagicMock, patch + + store = EventLogStore(mock_db_engine) + + mock_session = AsyncMock() + mock_session.add = MagicMock() + mock_session.commit = AsyncMock() + + with patch.object(store, '_session_factory') as mock_factory: + mock_factory.return_value.__aenter__.return_value = mock_session + + event_id = await store.append_event( + event_id="evt_steering", + event_type="message.received", + source="platform", + run_id="run_1", + runner_id="plugin:test/plugin/runner", + metadata={ + "steering": { + "status": "queued", + "claimed_by_run_id": "run_1", + } + }, + ) + + assert event_id == "evt_steering" + stored_event = mock_session.add.call_args.args[0] + assert '"status": "queued"' in stored_event.metadata_json + assert '"claimed_by_run_id": "run_1"' in stored_event.metadata_json @pytest.mark.asyncio async def test_append_event_truncates_input_summary(self, mock_db_engine): diff --git a/tests/unit_tests/agent/test_session_registry.py b/tests/unit_tests/agent/test_session_registry.py index d6ff4f08..a926e281 100644 --- a/tests/unit_tests/agent/test_session_registry.py +++ b/tests/unit_tests/agent/test_session_registry.py @@ -213,6 +213,51 @@ class TestSessionRegistryBasic: assert await registry.get('old_run') is None assert await registry.get('new_run') is not None + @pytest.mark.asyncio + async def test_pull_steering_all_preserves_queue_order(self): + """Default all-mode steering returns every queued item in FIFO order.""" + registry = AgentRunSessionRegistry() + await registry.register( + run_id='run_steering', + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + resources=make_resources(), + conversation_id='conv_1', + available_apis={'steering_pull': True}, + ) + + await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_1'}, 'input': {'text': 'first'}}) + await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_2'}, 'input': {'text': 'second'}}) + await registry.enqueue_steering('run_steering', {'event': {'event_id': 'event_3'}, 'input': {'text': 'third'}}) + + items = await registry.pull_steering('run_steering', mode='all') + assert [item['event']['event_id'] for item in items] == ['event_1', 'event_2', 'event_3'] + assert await registry.pull_steering('run_steering', mode='all') == [] + + @pytest.mark.asyncio + async def test_pull_steering_one_at_a_time_leaves_remaining_items(self): + """one-at-a-time is an explicit runner-side throttling mode.""" + registry = AgentRunSessionRegistry() + await registry.register( + run_id='run_steering_one', + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + resources=make_resources(), + conversation_id='conv_1', + available_apis={'steering_pull': True}, + ) + + await registry.enqueue_steering('run_steering_one', {'event': {'event_id': 'event_1'}}) + await registry.enqueue_steering('run_steering_one', {'event': {'event_id': 'event_2'}}) + + first = await registry.pull_steering('run_steering_one', mode='one-at-a-time') + second = await registry.pull_steering('run_steering_one', mode='one-at-a-time') + + assert [item['event']['event_id'] for item in first] == ['event_1'] + assert [item['event']['event_id'] for item in second] == ['event_2'] + class TestIsResourceAllowed: """Tests for is_resource_allowed validation."""