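diff --git a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md
index 6da645ab..df62d60b 100644
--- a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md
+++ b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md
@@ -70,6 +70,7 @@ SDK Runtime RUN_AGENT -> plugin AgentRunner.run()
 - `ChatMessageHandler` does not parse `plugin:*`, does not instantiate the wrapper, and knows nothing about runner component details.
 - `PipelineService.get_pipeline_metadata()` does not access the plugin runtime directly; it reads from the registry.
 - The old `RequestRunner` serves only as a migration reference, not as the final execution path.
+- `AgentRunOrchestrator` is the run orchestration layer on the LangBot side: it handles runner binding resolution, resource authorization, context envelope provisioning, run scope registration, plugin invocation, and result normalization; it does not decide the Agent's final prompt/window/compression strategy.
 - Plugins are stateless execution units: multiple Pipelines can bind the same runner id, each keeping its own `ai.runner_config[id]`; at runtime LangBot only places the current binding config into `ctx.config` and forwards it to the same plugin runner.
 - Creating multiple plugin instances per Pipeline or per runner config is prohibited. State that must persist across requests has to go through explicitly authorized plugin storage / workspace storage / external services; it must not be stored implicitly in per-pipeline plugin objects.
 - EBA only reserves fields; EventBus, EventRouter, and platform action execution are not implemented in this round.
@@ -151,7 +152,7 @@ class AgentRunnerDescriptor(BaseModel):

 ### 3.4 context_builder.py

-Converts the current Pipeline query directly into an SDK v1 `AgentRunContext`.
+Converts the current Pipeline query into an SDK v1 `AgentRunContext` envelope. This step assembles protocol fields, snapshots Host-owned state, mounts authorized resources, and provisions the default working window; it is not responsible for the Agent's final prompt assembly or long-term memory/compression strategy.

 Minimum fields for the current message Pipeline:

@@ -162,7 +163,8 @@ class AgentRunnerDescriptor(BaseModel):
 - `actor`: sender
 - `subject`: the current message or launcher
 - `prompt`: the effective prompt already processed by the host, i.e. `query.prompt.messages`
-- `messages`: `query.messages`
+- `messages`: the history window produced after `query.messages` goes through AgentRunner context packaging. On the pluginized AgentRunner path, Pipeline `msgtrun` no longer truncates it
+- `runtime.metadata.context_packaging`: metadata about the history window the Host actually delivered for this run, e.g. source, policy, delivered message count, completeness; it can be extended later with a cursor and a host-side history API
 - `input`: built from `query.user_message` and `query.message_chain`
 - `params`: filtered public business variables
 - `resources`: injected by `resource_builder`
@@ -184,6 +186,99 @@ query.prompt.messages + query.messages + [query.user_message]
 ctx.prompt + ctx.messages + [current_user_message_from_ctx.input]
 ```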
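
+At this stage, do not optimize the trimming algorithm, and do not push new compression or token-budget trimming back into a Pipeline stage.
+The pluginized AgentRunner path should skip the destructive truncation in Pipeline `msgtrun`, and then
+`AgentContextPackager` applies the same legacy max-round user-round rule at the AgentRunner boundary.
+SDK v1 currently has no top-level context packaging field, so for now LangBot places this run's packaging
+metadata in `ctx.runtime.metadata.context_packaging`. It describes what was actually delivered; it is not a long-term policy control plane on the LangBot side.
+Once LiteLLM is integrated, the real context window, token budget, and summarization strategy can be wired into this boundary.
+
+### 3.4.1 Agentic context plan
+
+This round only lands the `legacy_max_round` working window in `AgentContextPackager`; the old trimming algorithm is unchanged.
+The `ConversationStore` / `EventLog`, `ContextCompressor`, and host history API described below remain design placeholders.
+The goal is for the Pipeline to gradually degrade into a legacy entry point while the AgentRunner layer takes ownership of context packaging.
+
+The recommended end state splits this into four host-side services:
+
+```text
+ConversationStore / EventLog
+  -> durable append-only raw messages, events, tool results, artifact refs
+ConversationProjection
+  -> converts events into agent-readable conversation history
+AgentContextPackager
+  -> builds the bounded working context for one run
+ContextCompressor
+  -> creates and updates summaries/checkpoints when thresholds are exceeded
+```
+
+Key principles:
+
+- The complete history belongs to the LangBot host, not to the plugin instance. Plugins remain singleton/stateless.
+- `ctx.messages` is the working context window, not a complete conversation dump.
+- The full history must not be copied/serialized to the plugin runtime on every turn; otherwise long conversations incur O(n) cost and bloated cross-process payloads.
+- The old `max-round` user-round rule can first be moved into `AgentContextPackager` as the `legacy_max_round` policy.
+- After LiteLLM is integrated, `AgentContextPackager` can read the model context window and upgrade to a token budget policy.
+- `ContextCompressor` produces derived summaries/checkpoints; it must not overwrite or delete raw history.
+- Restart recovery relies on the persistent store and summary checkpoints, not on the in-process conversation list in `SessionManager`.
+
+`AgentRunContext` could later add:
+
+```python
+context_request: AgentContextRequest | None
+context_packaging: ContextPackagingMetadata
+```
+
+Suggested semantics:
+
+- `context_request.mode`: the mode requested by the AgentRunner manifest / binding config: `legacy_max_round`, `token_budget`, `summary_hybrid`, or `external_session`
+- `context_request.budget`: preferences such as model window, reserved output tokens, and tool/RAG budgets
+- `context_packaging.policy`: the packaging policy the Host actually applied for this run
+- `context_packaging.delivered_count`: the number of history messages delivered in this run
+- `context_packaging.source_total_count`: the number of raw history messages visible to the packager
+- `context_packaging.messages_complete`: whether this window already contains the complete history
+- `context_packaging.cursor_before`: a cursor for reading earlier history through the host API in the future
+
+Restricted APIs needed in the future:
+
+```python
+api.get_conversation_messages(cursor: str | None, limit: int) -> HistoryPage
+api.get_context_summary(scope: str = "conversation") -> ContextSummary | None
+api.request_context_compaction(policy: dict) -> CompactionResult
+```
+
+These APIs must be bound to `run_id`, runner id, actor/subject scope, and resource permissions; the Host needs to limit
+page size, total bytes, deadline, and the accessible conversations.
+
+### 3.4.2 Large artifacts and tool collaboration
+
+Large files, multimodal inputs, and tool outputs should not be inlined into `ctx.messages` or tool results. Going forward, collaborate uniformly through
+artifact/resource refs:
+
+- message/content carries only small text and necessary summaries.
+- Large files, images, audio, and long tool outputs return `artifact_id`, `mime_type`, `size`, `digest`,
+  `summary`, `expires_at`, `permissions`.
+- `/tmp` may only serve as temporary staging for a single run, for short-lived reads and writes by plugins or tools; it is not a durable store,
+  and cannot be relied on for restart recovery.
+- box/object storage is the target location for long-lived artifacts. The box capability has not been merged into the current branch, so this round only reserves it in the docs and does not implement the API.
+- When passing large results between tools, pass an artifact ref rather than the full blob. When the Agent needs to read it, go through a restricted proxy.
+
+Suggested future APIs:
+
+```python
+api.get_artifact_metadata(artifact_id: str) -> ArtifactMetadata
+api.open_artifact_stream(artifact_id: str) -> AsyncIterator[bytes]
+api.read_artifact_range(artifact_id: str, offset: int, length: int) -> bytes
+api.create_temp_artifact(name: str, content_type: str, ttl_seconds: int) -> ArtifactWriter
+```
+
+Security constraints:
+
+- The Host verifies that the artifact belongs to the current run, conversation, actor/subject scope, or authorized resources.
+- By default, plugins may not read arbitrary local paths directly, including arbitrary paths under `/tmp`.
+- Temporary files should have a TTL and a cleanup mechanism; box artifacts should have a retention policy.
+- Before multimodal files reach the model, the runner/context packager decides whether to pass a reference, a summary, a thumbnail, or the actual bytes.
+
 ### 3.5 resource_builder.py

 Three layers of trimming are applied before execution:
@@ -219,6 +314,20 @@ ctx.prompt + ctx.messages + [current_user_message_from_ctx.input]
 EventRouter -> AgentRunOrchestrator.run_from_event(event_request)
 ```

+Once EBA lands, `ConversationStore` should not store only chat messages; it should instead be generated as a projection of the `EventLog`:
+
+```text
+Platform Adapter
+  -> EventLog append raw event
+  -> ConversationProjection update message/history view when applicable
+  -> EventRouter resolve binding
+  -> AgentRunOrchestrator.run_from_event(event_request)
+  -> AgentContextPackager build working context from projection + state + artifacts
+```
+
+This way, message events, tool events, group member events, and friend request events can share the same run/session/state/resource
+boundary; non-message events also do not need to be disguised as a user text message.
+
 `event_request` must contain at least:

 - `event_type`: a stable protocol name, e.g. `message.recalled`, `group.member_joined`, `friend.request_received`
diff --git a/docs/agent-runner-pluginization/README.md b/docs/agent-runner-pluginization/README.md
index e8678fa9..1286c4a3 100644
--- a/docs/agent-runner-pluginization/README.md
+++ b/docs/agent-runner-pluginization/README.md
@@ -64,7 +64,7 @@ AgentRunnerRegistry
   | discovers built-in runners and plugin runners
   v
 AgentRunOrchestrator
-  | builds context, validates permissions, invokes runner
+  | resolves binding, provisions context/resources/state, invokes runner
   v
 Built-in RequestRunner adapter / Plugin AgentRunner component
   |
@@ -100,12 +100,13 @@ class AgentRunnerDescriptor(BaseModel):
 Responsibilities:

 - Select the runner based on the pipeline config.
-- Convert the current query or future event input into an `AgentRunRequest`.
-- Inject summaries of available tools, models, knowledge bases, sessions, permissions, and platform capabilities.
+- Orchestrate `ContextBuilder` / `ResourceBuilder` to produce the SDK `AgentRunContext` envelope and the authorized resources.
+- Register this run's `run_id` / runner / resource scope so that `AgentRunAPIProxy` can later perform permission checks.
 - Handle timeouts, exceptions, streaming responses, cancellation, interruption, and telemetry uniformly.
 - Convert the `AgentRunResult` returned by the plugin back into `Message` / `MessageChunk` that the current Pipeline can consume.

 The plugin wrapper currently in LangBot's `ChatMessageHandler` should be pushed down into the orchestrator so that the message handler does not need to know plugin runner details.
+Here "context" means the protocol envelope, run identity, resources, state snapshot, and default working window provided by the Host, not the Agent's final prompt assembly or long-term memory strategy. How the final model context is compressed, summarized, and recalled should be declared as a policy by the AgentRunner and enforced at the AgentRunner boundary; LangBot is responsible for providing restricted infrastructure and guardrails.

 ## 5. SDK design

@@ -159,6 +160,8 @@ class AgentRunContext(BaseModel):
     subject: SubjectContext | None = None
     prompt: list[Message] = []
     messages: list[Message] = []
+    context_request: AgentContextRequest | None = None
+    context_packaging: ContextPackagingMetadata = ContextPackagingMetadata()
     input: AgentInput
     params: dict[str, Any] = {}
     resources: AgentResources
@@ -174,7 +177,9 @@ class AgentRunContext(BaseModel):
 - `event` is a reserved wrapper for the future EBA; at this stage a minimal message event can be generated from the query.
 - `actor` is the party that triggered the event, and `subject` is the object the event acts on, e.g. an invited user, a recalled message, or a group being operated on.
 - `prompt` is the effective prompt after host processing. It comes from LangBot's current conversation prompt and has already been processed by plugin events such as `PromptPreProcessing`; when calling the model, the runner should prefer it over re-reading the static `config["prompt"]`.
-- `messages` is the message history, which has also gone through host pipeline preprocessing.
+- `messages` is the message history, which has also gone through host pipeline preprocessing. On the pluginized AgentRunner path it is no longer truncated by Pipeline `msgtrun`; instead it is trimmed with legacy max-round semantics at the AgentRunner context packaging boundary.
+- `context_request` is the context preference that a future AgentRunner manifest / binding config will express, e.g. token budget, summary hybrid, external session; it is not a policy switch that LangBot sets unilaterally.
+- `context_packaging` describes the history window the Host actually delivered for this run, e.g. the policy used, the source, the number of delivered messages, whether completeness is confirmed, and a future cursor. At this stage it only annotates the AgentRunner legacy window.
 - `input` is the runner's primary input and is no longer forced to equal a plain-text message; `input.contents` must preserve structured content such as images and files.
 - `params` holds the public business variables for a single run, provided after the host filters out internal and sensitive variables.
 - `resources` lists the tools, knowledge bases, models, files, etc. that LangBot has authorized for the runner.
@@ -190,6 +195,55 @@ ctx.conversation.to_legacy_session()
 ctx.to_legacy_query_context()
 ```

+The current code leaves the SDK v1 schema unchanged; for now, what the Host actually delivered is sent as
+`ctx.runtime.metadata.context_packaging`; it is a packaging receipt, not a long-term policy control plane on the LangBot side.
+
+### 5.2.1 Direction for agentic context and file collaboration
+
+This section mainly records follow-up design. This round has already moved the legacy `max-round` working window into
+`AgentContextPackager`; LangBot's complete conversation history still comes mainly from the in-process `Conversation.messages`,
+and in the long run a persistent store and a compression mechanism are still needed.
+
+The long-term direction should distinguish three kinds of data:
+
+- `ConversationStore` / `EventLog`: LangBot durably stores the complete raw messages, events, tool calls, and result references as the source of truth for auditing, replay, recompression, and history retrieval.
+- `working context`: the controlled context window that each `AgentRunner.run()` receives. It should not be the full text of the complete history; it should be assembled by `AgentContextPackager`, e.g. from the effective prompt, compressed summaries, the most recent rounds, relevant history fragments, RAG/tool context, and the current input.
+- `context state`: cross-round state such as compressed summaries, `last_compacted_seq`, external conversation ids, and user preferences. It is persisted by host-owned state or authorized storage and must not live in plugin instance memory.
+
+So do not stuff the complete history into the plugin runner. The right boundary is that the LangBot host keeps the complete history and
+the AgentRunner boundary delivers a default safe window; if the runner needs more history, it should go through the restricted
+`AgentRunAPIProxy` to request fragments by cursor/page size. This avoids O(n) copying and cross-process
+serialization on every turn, and keeps the plugin runtime from receiving an ever-growing context.
+
+Context compression should be implemented later, after LiteLLM is integrated and the model context window is available. The suggested strategy is:
+
+- Before each run, estimate the tokens of `prompt + summary + recent turns + tool/RAG context + current input`.
+- When the threshold is exceeded, compress the older part of the history window to produce a summary/checkpoint.
+- Raw messages are not deleted; the summary is derived memory that can be recomputed and audited.
+- The next round continues with `summary + recent turns + relevant recalled history`.
+- After a restart, restore the working context from the persisted `ConversationStore/EventLog` and summary checkpoints rather than relying on the in-process window.
+
+Large files, multimodal content, and tool outputs should not be inlined into `ctx.messages`. The suggestion is to unify them later as artifact/resource
+references:
+
+- Small text can go directly into message/content; for large files, images, audio, and tool output files, the context carries only
+  `artifact_id`, `mime_type`, `size`, `digest`, a summary, and access permissions.
+- `/tmp` is only suitable as local temporary staging for a single run; it cannot serve as the source of truth after a restart.
+- Files that are reusable long-term or shared across tools should go to box/object storage. The box capability has not been merged into the current branch yet,
+  so this stage only reserves the protocol and does not implement storage or retrieval.
+- AgentRunner reads artifacts through restricted APIs, e.g. the future `get_artifact_metadata()`,
+  `open_artifact_stream()`, and `read_artifact_range()`. The Host must validate run_id, runner permissions,
+  file size, MIME type, expiration time, and accessible scope.
+- When a tool returns a large result, it should likewise return an artifact ref plus a summary instead of stuffing the full result back into the message history.
+
+Once EBA is integrated, the complete source of truth is better modeled as `EventLog + Projection`:
+
+- `EventLog` stores raw events such as `message.received`, `tool.call.completed`, `message.recalled`,
+  and `group.member_joined`.
+- `ConversationProjection` projects conversation-related events into agent-readable history.
+- Non-message events do not have to be disguised as user messages; they can carry `actor`, `subject`, and `event_data`, and then
+  `AgentContextPackager` decides whether to include them in the working context.
+
 ### 5.3 Return protocol

 It is recommended to normalize the current `AgentRunReturn.type` into an event stream:
diff --git a/src/langbot/pkg/agent/runner/__init__.py b/src/langbot/pkg/agent/runner/__init__.py
index 986320c9..40ee1dab 100644
--- a/src/langbot/pkg/agent/runner/__init__.py
+++ b/src/langbot/pkg/agent/runner/__init__.py
@@ -12,6 +12,7 @@ from .errors import (
 )
 from .registry import AgentRunnerRegistry
 from .context_builder import AgentRunContextBuilder
+from .context_packager import AgentContextPackager
 from .resource_builder import AgentResourceBuilder
 from .result_normalizer import AgentResultNormalizer
 from .orchestrator import AgentRunOrchestrator
@@ -37,6 +38,7 @@ __all__ = [
     'RunnerExecutionError',
     'AgentRunnerRegistry',
     'AgentRunContextBuilder',
+    'AgentContextPackager',
     'AgentResourceBuilder',
     'AgentResultNormalizer',
     'AgentRunOrchestrator',
diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py
index a43aac66..3ee8c65d 100644
--- a/src/langbot/pkg/agent/runner/context_builder.py
+++ b/src/langbot/pkg/agent/runner/context_builder.py
@@ -1,4 +1,4 @@
-"""Agent run context builder for converting Query to AgentRunContext."""
+"""Agent run context builder for provisioning AgentRunContext envelopes."""
 from __future__ import annotations

 import uuid
@@ -11,6 +11,7 @@ from langbot_plugin.api.entities.builtin.platform import message as platform_mes
 from ...core import app
 from .descriptor import AgentRunnerDescriptor
 from .config_migration import ConfigMigration
+from .context_packager import AgentContextPackager
 from .state_store import get_state_store
 from . import events as runner_events

@@ -136,13 +137,13 @@ class AgentRunContextPayload(typing.TypedDict):


 class AgentRunContextBuilder:
-    """Builder for converting Query to AgentRunContext.
+    """Builder for provisioning AgentRunContext from a Pipeline Query.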

     Responsibilities:
     - Generate new run_id (UUID, not query id)
     - Set trigger type to 'message.received' for pipeline
     - Build conversation context from session
-    - Convert messages to SDK format
+    - Package and convert messages to SDK format
     - Build input from user_message and message_chain
     - Build params from query.variables with filtering
     - Build state snapshot from state_store
@@ -165,6 +166,7 @@ class AgentRunContextBuilder:

     def __init__(self, ap: app.Application):
         self.ap = ap
+        self.context_packager = AgentContextPackager()

     async def build_context(
         self,
@@ -172,7 +174,7 @@ class AgentRunContextBuilder:
         descriptor: AgentRunnerDescriptor,
         resources: AgentResources,
     ) -> AgentRunContextPayload:
-        """Build AgentRunContext from Query.
+        """Build AgentRunContext envelope from Query.

         Args:
             query: Pipeline query
@@ -205,19 +207,6 @@ class AgentRunContextBuilder:
             'pipeline_uuid': query.pipeline_uuid,
         }

-        # Build input
-        input: AgentInput = self._build_input(query)
-
-        # Build messages
-        messages = self._build_messages(query)
-
-        # Build params from query.variables with filtering
-        params = self._build_params(query)
-
-        # Build state snapshot from state_store
-        state_store = get_state_store()
-        state: AgentRunState = state_store.build_snapshot(query, descriptor)
-
         # Get runner binding config from ai.runner_config[runner_id]
         # This is Pipeline's configuration for this specific runner binding,
         # passed through AgentRunContext.config to the runner
@@ -226,6 +215,20 @@ class AgentRunContextBuilder:
             descriptor.id,
         )

+        # Build input
+        input: AgentInput = self._build_input(query)
+
+        # Build bounded working context window for the runner.
+        packaged_context = self.context_packager.package_messages(query, runner_config)
+        messages = self._build_messages(packaged_context.messages)
+
+        # Build params from query.variables with filtering
+        params = self._build_params(query)
+
+        # Build state snapshot from state_store
+        state_store = get_state_store()
+        state: AgentRunState = state_store.build_snapshot(query, descriptor)
+
         streaming_supported = await self._is_stream_output_supported(query)
         remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)

@@ -241,6 +244,10 @@ class AgentRunContextBuilder:
                 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'),
                 'streaming_supported': streaming_supported,
                 'remove_think': remove_think,
+                'context_packaging': {
+                    'policy': packaged_context.policy,
+                    'history': packaged_context.history,
+                },
             },
         }

@@ -526,13 +533,12 @@ class AgentRunContextBuilder:

         return prompt_messages

-    def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
-        """Build messages list from query."""
+    def _build_messages(self, source_messages: list[typing.Any]) -> list[dict[str, typing.Any]]:
+        """Build messages list from packaged source messages."""
         messages: list[dict[str, typing.Any]] = []

-        if query.messages:
-            for msg in query.messages:
-                messages.append(msg.model_dump(mode='json'))
+        for msg in source_messages:
+            messages.append(msg.model_dump(mode='json'))

         return messages

diff --git a/src/langbot/pkg/agent/runner/context_packager.py b/src/langbot/pkg/agent/runner/context_packager.py
new file mode 100644
index 00000000..5f0d2c60
--- /dev/null
+++ b/src/langbot/pkg/agent/runner/context_packager.py
@@ -0,0 +1,79 @@
+"""Agent context packaging helpers."""
+from __future__ import annotations
+
+import dataclasses
+import typing
+
+from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
+
+
+DEFAULT_LEGACY_MAX_ROUND = 10
+
+
+@dataclasses.dataclass(frozen=True)
+class ContextPackagingResult:
+    """Packaged working context for one AgentRunner run."""
+
+    messages: list[typing.Any]
+    policy: dict[str, typing.Any]
+    history: dict[str, typing.Any]
+
+
+def get_legacy_max_round(runner_config: dict[str, typing.Any]) -> typing.Any:
+    """Return the configured legacy max-round value.
+
+    Keep the existing config semantics intact: callers are expected to pass the
+    already-resolved runner binding config, and invalid values fail the same way
+    the old truncator failed when comparing them with an integer round count.
+    """
+    return runner_config.get('max-round', DEFAULT_LEGACY_MAX_ROUND)
+
+
+def select_legacy_max_round_messages(
+    messages: list[typing.Any] | None,
+    max_round: typing.Any,
+) -> list[typing.Any]:
+    """Select the same message window as the legacy round truncator."""
+    if not messages:
+        return []
+
+    temp_messages: list[typing.Any] = []
+    current_round = 0
+
+    for msg in messages[::-1]:
+        if current_round < max_round:
+            temp_messages.append(msg)
+            if getattr(msg, 'role', None) == 'user':
+                current_round += 1
+        else:
+            break
+
+    return temp_messages[::-1]
+
+
+class AgentContextPackager:
+    """Build the bounded working context for AgentRunner execution."""
+
+    def package_messages(
+        self,
+        query: pipeline_query.Query,
+        runner_config: dict[str, typing.Any],
+    ) -> ContextPackagingResult:
+        """Package query messages using the current legacy max-round policy."""
+        source_messages = query.messages or []
+        max_round = get_legacy_max_round(runner_config)
+        packaged_messages = select_legacy_max_round_messages(source_messages, max_round)
+
+        return ContextPackagingResult(
+            messages=packaged_messages,
+            policy={
+                'mode': 'legacy_max_round',
+                'max_round': max_round,
+            },
+            history={
+                'source': 'query.messages',
+                'source_total_count': len(source_messages),
+                'delivered_count': len(packaged_messages),
+                'messages_complete': len(packaged_messages) == len(source_messages),
+            },
+        )
diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py
index 2e8f6d25..c3f522d9 100644
--- a/src/langbot/pkg/agent/runner/orchestrator.py
+++ b/src/langbot/pkg/agent/runner/orchestrator.py
@@ -31,7 +31,7 @@ class AgentRunOrchestrator:
     Responsibilities:
     - Resolve runner ID from pipeline config (new or old format)
     - Get runner descriptor from registry
-    - Build AgentRunContext from Query
+    - Provision AgentRunContext envelope from Query
     - Build AgentResources with permission filtering
     - Invoke plugin runtime RUN_AGENT action
     - Normalize AgentRunResult to Pipeline messages
diff --git a/src/langbot/pkg/pipeline/msgtrun/msgtrun.py b/src/langbot/pkg/pipeline/msgtrun/msgtrun.py
index 00a9bfbf..af8eb0e6 100644
--- a/src/langbot/pkg/pipeline/msgtrun/msgtrun.py
+++ b/src/langbot/pkg/pipeline/msgtrun/msgtrun.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 from .. import stage, entities
 from . import truncator
 from ...utils import importutil
+from ...agent.runner.config_migration import ConfigMigration
 import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query

 from . import truncators
@@ -30,6 +31,9 @@ class ConversationMessageTruncator(stage.PipelineStage):

     async def process(self, query: pipeline_query.Query, stage_inst_name: str) -> entities.StageProcessResult:
         """Process"""
+        if ConfigMigration.resolve_runner_id(query.pipeline_config):
+            return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
+
         query = await self.trun.truncate(query)

         return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
diff --git a/src/langbot/pkg/pipeline/msgtrun/truncators/round.py b/src/langbot/pkg/pipeline/msgtrun/truncators/round.py
index f339f341..57c58a24 100644
--- a/src/langbot/pkg/pipeline/msgtrun/truncators/round.py
+++ b/src/langbot/pkg/pipeline/msgtrun/truncators/round.py
@@ -3,6 +3,10 @@ from __future__ import annotations
 from .. import truncator
 import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
 from ....agent.runner.config_migration import ConfigMigration
+from ....agent.runner.context_packager import (
+    get_legacy_max_round,
+    select_legacy_max_round_messages,
+)


 @truncator.truncator_class('round')
@@ -11,25 +15,15 @@ class RoundTruncator(truncator.Truncator):

     async def truncate(self, query: pipeline_query.Query) -> pipeline_query.Query:
         """Truncate"""
-        # max-round remains a pipeline-side trimming knob until token-budget
-        # based compaction replaces this stage.
         runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
-        runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
-        max_round = runner_config.get('max-round', 10)
+        if runner_id:
+            runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id)
+        else:
+            runner_config = query.pipeline_config.get('msg-truncate', {}).get('round', {})

-        temp_messages = []
-
-        current_round = 0
-
-        # Traverse from back to front
-        for msg in query.messages[::-1]:
-            if current_round < max_round:
-                temp_messages.append(msg)
-                if msg.role == 'user':
-                    current_round += 1
-            else:
-                break
-
-        query.messages = temp_messages[::-1]
+        query.messages = select_legacy_max_round_messages(
+            query.messages,
+            get_legacy_max_round(runner_config),
+        )

         return query
diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py
index f74fa098..122824a5 100644
--- a/tests/unit_tests/agent/test_orchestrator_integration.py
+++ b/tests/unit_tests/agent/test_orchestrator_integration.py
@@ -309,6 +309,62 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context():
     assert await get_session_registry().get(context["run_id"]) is None


+@pytest.mark.asyncio
+async def test_orchestrator_packages_legacy_max_round_without_mutating_query():
+    descriptor = make_descriptor()
+    plugin_connector = FakePluginConnector(
+        results=[
+            {
+                "type": "message.completed",
+                "data": {"message": {"role": "assistant", "content": "fake response"}},
+            }
+        ]
+    )
+    ap = FakeApplication(plugin_connector)
+    orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor))
+    query = make_query()
+    query.pipeline_config["ai"]["runner_config"][RUNNER_ID]["max-round"] = 2
+    query.messages = [
+        provider_message.Message(role="user", content="message 1"),
+        provider_message.Message(role="assistant", content="response 1"),
+        provider_message.Message(role="user", content="message 2"),
+        provider_message.Message(role="assistant", content="response 2"),
+        provider_message.Message(role="user", content="message 3"),
+        provider_message.Message(role="assistant", content="response 3"),
+    ]
+
+    messages = [message async for message in orchestrator.run_from_query(query)]
+
+    assert len(messages) == 1
+    context = plugin_connector.contexts[0]
+    assert [message["content"] for message in context["messages"]] == [
+        "message 2",
+        "response 2",
+        "message 3",
+        "response 3",
+    ]
+    assert [message.content for message in query.messages] == [
+        "message 1",
+        "response 1",
+        "message 2",
+        "response 2",
+        "message 3",
+        "response 3",
+    ]
+    assert context["runtime"]["metadata"]["context_packaging"] == {
+        "policy": {
+            "mode": "legacy_max_round",
+            "max_round": 2,
+        },
+        "history": {
+            "source": "query.messages",
+            "source_total_count": 6,
+            "delivered_count": 4,
+            "messages_complete": False,
+        },
+    }
+
+
 @pytest.mark.asyncio
 async def test_orchestrator_streams_fake_plugin_deltas():
     descriptor = make_descriptor()
diff --git a/tests/unit_tests/pipeline/test_msgtrun.py b/tests/unit_tests/pipeline/test_msgtrun.py
index 1fe44ba4..04308058 100644
--- a/tests/unit_tests/pipeline/test_msgtrun.py
+++ b/tests/unit_tests/pipeline/test_msgtrun.py
@@ -48,6 +48,18 @@ def get_round_truncator_module():

 def make_truncate_config(max_round: int = 5):
     """Create a pipeline config with max-round setting."""
+    return {
+        'msg-truncate': {
+            'method': 'round',
+            'round': {
+                'max-round': max_round,
+            },
+        },
+    }
+
+
+def make_agent_runner_config(max_round: int = 5):
+    """Create an AgentRunner pipeline config with max-round binding config."""
     return {
         'ai': {
             'runner': {'id': RUNNER_ID},
@@ -137,6 +149,36 @@ class TestRoundTruncatorProcess:
         # All messages should be preserved
         assert len(result.new_query.messages) == 5

+    @pytest.mark.asyncio
+    async def test_agent_runner_path_skips_pipeline_truncation(self):
+        """AgentRunner path should leave query.messages intact at pipeline stage."""
+        msgtrun = get_msgtrun_module()
+        entities = get_entities_module()
+
+        app = FakeApp()
+        stage = msgtrun.ConversationMessageTruncator(app)
+
+        pipeline_config = make_agent_runner_config(max_round=1)
+
+        await stage.initialize(pipeline_config)
+
+        query = text_query("current")
+        query.pipeline_config = pipeline_config
+        query.messages = [
+            provider_message.Message(role='user', content='old1'),
+            provider_message.Message(role='assistant', content='old1_resp'),
+            provider_message.Message(role='user', content='current'),
+        ]
+
+        result = await stage.process(query, 'ConversationMessageTruncator')
+
+        assert result.result_type == entities.ResultType.CONTINUE
+        assert [(msg.role, msg.content) for msg in result.new_query.messages] == [
+            ('user', 'old1'),
+            ('assistant', 'old1_resp'),
+            ('user', 'current'),
+        ]
+
     @pytest.mark.asyncio
     async def test_truncate_exceeds_limit(self):
         """Messages exceeding max-round should be truncated precisely.