From d0383e146e68e49e1d962b9b01cd506c3295bd5f Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Tue, 2 Jun 2026 17:01:45 +0800 Subject: [PATCH] refactor(agent-runner): remove host context windowing --- .../AGENT_CONTEXT_PROTOCOL.md | 25 +- .../IMPLEMENTATION_PLAN.md | 42 +- .../OFFICIAL_RUNNER_PLUGINS.md | 7 +- .../agent-runner-pluginization/PROTOCOL_V1.md | 18 +- .../pkg/agent/runner/config_migration.py | 3 + .../pkg/agent/runner/context_builder.py | 4 +- .../pkg/agent/runner/event_log_store.py | 1 - src/langbot/pkg/agent/runner/host_models.py | 5 - src/langbot/pkg/agent/runner/orchestrator.py | 10 - .../pkg/agent/runner/pipeline_adapter.py | 46 --- src/langbot/pkg/api/http/service/pipeline.py | 1 - .../core/migrations/m009_msg_truncator_cfg.py | 22 -- .../migrations/dbm001_migrate_v3_config.py | 3 - src/langbot/pkg/pipeline/msgtrun/__init__.py | 0 src/langbot/pkg/pipeline/msgtrun/msgtrun.py | 39 -- .../pkg/pipeline/msgtrun/round_policy.py | 34 -- src/langbot/pkg/pipeline/msgtrun/truncator.py | 56 --- .../pipeline/msgtrun/truncators/__init__.py | 0 .../pkg/pipeline/msgtrun/truncators/round.py | 29 -- src/langbot/pkg/pipeline/pipelinemgr.py | 5 +- src/langbot/templates/legacy/pipeline.json | 8 +- .../unit_tests/agent/test_config_migration.py | 10 +- .../agent/test_config_migration_full.py | 23 +- .../agent/test_event_first_protocol.py | 83 +--- .../agent/test_orchestrator_integration.py | 51 +-- tests/unit_tests/pipeline/test_msgtrun.py | 369 ------------------ 26 files changed, 79 insertions(+), 815 deletions(-) delete mode 100644 src/langbot/pkg/core/migrations/m009_msg_truncator_cfg.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/__init__.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/msgtrun.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/round_policy.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/truncator.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/truncators/__init__.py delete mode 100644 src/langbot/pkg/pipeline/msgtrun/truncators/round.py delete mode 100644 tests/unit_tests/pipeline/test_msgtrun.py diff --git a/docs/agent-runner-pluginization/AGENT_CONTEXT_PROTOCOL.md b/docs/agent-runner-pluginization/AGENT_CONTEXT_PROTOCOL.md index da68b2bf..41531284 100644 --- a/docs/agent-runner-pluginization/AGENT_CONTEXT_PROTOCOL.md +++ b/docs/agent-runner-pluginization/AGENT_CONTEXT_PROTOCOL.md @@ -14,7 +14,7 @@ - ✅ `AgentRunAPIProxy.state` — get/set/delete API - ✅ EventLog / Transcript / ArtifactStore — host 事实源 - ✅ PersistentStateStore — 持久化状态存储 -- ✅ `max-round` 已从协议实体中移除;如某 runner 仍需要类似历史窗口参数,应作为 runner binding config 由插件 manifest 暴露,而不是 Host / Pipeline 协议字段 +- ✅ `max-round` / host-side history window 已从 LangBot Host/Pipeline 语义中移除;如某 runner 仍需要类似参数,应由该 runner 自己解释配置 - ✅ 外部 harness context projection 已用 Claude Code runner 做 MVP 验证:context 文件、skill 投影、MCP 配置和 host-owned resume state ## 1. 设计原则 @@ -41,7 +41,7 @@ 如果某个 runner 仍需要“最多读取多少轮历史”这样的策略参数,应由该 runner 在自己的 manifest/config schema 中声明,并作为 binding config 存到 `ctx.config` / `runner_config`。Host 只提供 history pull API、cursor、hard cap 和权限边界;runner 自己决定是否读取、读取多少、如何截断和压缩。 -当前 official local-agent 方向是通过 Host history API 拉取 transcript,并由 runner 自己管理模型上下文。它不依赖 Pipeline adapter 下发的 `max-round` / bootstrap 窗口。 +当前 official local-agent 方向是通过 Host history API 拉取 transcript,并由 runner 自己管理模型上下文。它不依赖 Pipeline adapter 下发历史窗口。 新协议不应该问“LangBot 每轮裁几轮历史给 agent”,而应该问: @@ -58,7 +58,7 @@ - `Transcript`: Host 从 EventLog 投影出的对话视图,用于 UI、审计和按需历史读取。 - `Working context`: Agent 本轮实际送进模型或 runtime 的上下文,由 AgentRunner 决定。 -LangBot 可以为简单 runner 提供 bootstrap window,但这只是 convenience,不是主架构。 +LangBot 不再提供 host-side bootstrap window。简单 runner 如果需要历史窗口,应在 runner 内部通过 Host history API 拉取并裁剪。 ## 2. Event 到来时传什么 @@ -117,22 +117,11 @@ class AgentRunContext(BaseModel): 这些会破坏跨进程序列化成本、泄露范围、KV cache 稳定性,也会迫使 host 替 agent 做 context 策略。 -### 2.3 可选 bootstrap +### 2.3 不提供 Host Bootstrap Window -根据 runner manifest 可以提供可选 bootstrap: +`AgentRunContext.bootstrap` 可以作为协议里的可选扩展字段保留,但 LangBot Host 默认不填历史窗口,也不通过 Pipeline 配置决定窗口大小。 -```yaml -context: - bootstrap: none | current_event | recent_tail | summary_tail - max_inline_events: 0 - max_inline_bytes: 0 -``` - -建议默认: - -- 自管 runtime:`bootstrap: current_event` -- 简单 HTTP runner:`bootstrap: recent_tail` -- runner 如果需要 `recent_tail` 策略,应通过自己的 binding config 声明窗口大小;Host 不把 `max-round` 作为通用协议字段扩展。 +如果 runner 需要类似 `recent_tail` 的策略,它应在自己的 manifest/config schema 中声明参数,并在 runner 内部通过 `history_page` / `history_search` 读取、裁剪和压缩历史。Host 只负责权限、分页、hard cap 和事实源。 ## 3. ContextAccess @@ -335,7 +324,7 @@ LangBot core 不应内置官方 agent 的业务流程: **已完成(当前分支)**: -- ✅ `max-round` 不再是协议字段;类似历史窗口策略属于 runner binding config,而不是 Host / Pipeline 通用语义 +- ✅ `max-round` 不再是协议字段,也不再是 Host / Pipeline 通用语义 - ✅ 新 runner 默认不收到历史窗口 - ✅ `AgentRunContext` 增加 `context` / cursor / access capabilities - ✅ `AgentRunAPIProxy` 增加 history / events / artifacts / state API diff --git a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md index 2d605ef3..daee03a3 100644 --- a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md +++ b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md @@ -156,7 +156,7 @@ class AgentRunnerDescriptor(BaseModel): ### 3.4 context_builder.py / pipeline_adapter.py -`context_builder.py` 只负责从 `AgentEventEnvelope + AgentBinding` 构造 SDK v1 `AgentRunContext`。Pipeline Query 的读取、参数过滤、prompt 提取和 `max-round` bootstrap 映射都属于 `PipelineAdapter`,不再放进 context builder。 +`context_builder.py` 只负责从 `AgentEventEnvelope + AgentBinding` 构造 SDK v1 `AgentRunContext`。Pipeline Query 的读取、参数过滤和 prompt 提取属于 `PipelineAdapter`,但 PipelineAdapter 不再做历史窗口裁剪或 bootstrap 打包。 当前消息 Pipeline 进入 agent runner 的路径: @@ -183,7 +183,7 @@ Protocol v1 context 的稳定字段: - `state`: `PersistentStateStore` 读取的 host-managed scoped state snapshot - `runtime`: host/version/workspace/bot/query/trace/deadline - `config`: 当前 binding 对该 runner id 的配置,即 `runner_config` -- `bootstrap`: 可选小窗口,不是完整历史 +- `bootstrap`: 可选扩展字段;LangBot Host 默认不填历史窗口 - `adapter`: Pipeline 或其它入口 adapter 的元数据 Pipeline adapter 的 `prompt` 和公开业务变量不进入顶层协议字段: @@ -191,58 +191,36 @@ Pipeline adapter 的 `prompt` 和公开业务变量不进入顶层协议字段 - filtered params -> `ctx.adapter.extra["params"]` - legacy/effective prompt 可以暂存到 `ctx.adapter.extra["prompt"]`,但 official runner 不应把它当作行为契约 -- `max-round` working window 可以保留在 Pipeline adapter 兼容层,但 official - `local-agent` 不消费该 bootstrap/window -- packaging 元数据 -> `ctx.runtime.metadata.context_packaging` +- LangBot Host 不生成 `bootstrap.messages`、`adapter_messages` 或 context packaging 元数据 现阶段不要把新的压缩或 token-budget 裁剪塞回 Pipeline stage。Pipeline 只负责入口适配;完整历史和长期上下文由 EventLog / Transcript / pull APIs / future ContextCompressor 支撑。 ### 3.4.1 Agentic context plan -本轮只在 `PipelineAdapter` 中保留 `max-round` working window,不改变 user-round 选择规则。 EventLog / Transcript / Host pull APIs 已落地,`ContextCompressor` 仍是设计预留。 目标是让 Pipeline 逐步退化为入口 adapter,让 AgentRunner 层拥有上下文打包职责。 -建议最终拆成四个 host-side 服务: +建议 Host 保持三类事实源和受限 API: ```text ConversationStore / EventLog -> durable append-only raw messages, events, tool results, artifact refs ConversationProjection -> converts events into agent-readable conversation history -PipelineAdapter bootstrap policy - -> builds the bounded working context for one run ContextCompressor - -> creates and updates summaries/checkpoints when thresholds are exceeded + -> future optional service for summaries/checkpoints, requested and consumed by runners ``` 关键原则: - 完整历史属于 LangBot host,不属于插件实例。插件仍是 singleton/stateless。 -- `ctx.bootstrap.messages` 是 optional working context window,不是完整 conversation dump。 +- `ctx.bootstrap.messages` 不是 Host 默认下发的 working context。 - 每轮不能全量复制/序列化完整历史给插件 runtime;否则长会话会产生 O(n) 成本和跨进程 payload 膨胀。 -- `max-round` 的 user-round 规则只属于 Pipeline adapter 的 bootstrap 策略。 -- LiteLLM 接入后,context packaging 应升级为 token budget / summary / pull API 协作策略。 +- `max-round` 或类似窗口规则不属于 LangBot Host / Pipeline 语义。 +- LiteLLM 接入后,模型窗口元信息应作为 resource/runtime metadata 暴露给 runner,由 runner 决定预算和压缩策略。 - `ContextCompressor` 生成的是派生 summary/checkpoint,不能覆盖或删除 raw history。 - 重启恢复依赖持久化 store 和 summary checkpoint,不依赖 `SessionManager` 里的进程内 conversation list。 -后续 `AgentRunContext` 可增加: - -```python -context_request: AgentContextRequest | None -context_packaging: ContextPackagingMetadata -``` - -建议语义: - -- `context_request.mode`: AgentRunner manifest / binding config 请求的 `max_round`、`token_budget`、`summary_hybrid`、`external_session` -- `context_request.budget`: 模型窗口、预留输出 token、工具/RAG 预算等偏好 -- `context_packaging.policy`: Host 本次实际采用的打包策略 -- `context_packaging.delivered_count`: 本次下发的历史消息数 -- `context_packaging.source_total_count`: packager 可见的原始历史消息数 -- `context_packaging.messages_complete`: 本窗口是否已经包含完整历史 -- `context_packaging.cursor_before`: 未来通过 host API 读取更早历史的 cursor - 未来需要的受限 API: ```python @@ -256,7 +234,7 @@ page size、总字节数、deadline 和可访问 conversation。 ### 3.4.2 Large artifacts and tool collaboration -大文件、多模态输入和工具产物不要内联进 bootstrap messages 或 tool result。后续统一用 +大文件、多模态输入和工具产物不要内联进 prompt、bootstrap 或 tool result。后续统一用 artifact/resource ref 协作: - message/content 里只放小文本和必要摘要。 @@ -512,7 +490,7 @@ async def run_from_query(query: pipeline_query.Query) -> AsyncGenerator[Message ### Step 4:local-agent parity - 使用静态绑定配置 `ctx.config["prompt"]`,不读取 `ctx.adapter.extra["prompt"]`。 -- 通过 Host history API 拉取 transcript,不读取 `ctx.bootstrap.messages` 或 `ctx.adapter.adapter_messages`。 +- 通过 Host history API 拉取 transcript,不读取 `ctx.bootstrap.messages` 或 adapter window 字段。 - 当前 user message 从 `ctx.input.contents` 构造,保留多模态内容。 - RAG 只替换/插入文本部分,不丢图片/文件。 - streaming/non-streaming 默认跟随 `runtime.metadata.streaming_supported`。 diff --git a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md index 33872d7e..3541c247 100644 --- a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md +++ b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md @@ -183,10 +183,9 @@ LangBot core 不应为了 local-agent 保留业务编排逻辑。local-agent 的 - `ctx.runtime.metadata.streaming_supported`:当前 adapter 是否能消费流式输出。 - 宿主代理 action:模型、工具、知识库、rerank 调用必须通过 `run_id` 校验资源权限。 -`local-agent` 不应消费 Pipeline adapter 生成的 `max-round` / `bootstrap` -窗口,也不应读取 `ctx.adapter.extra.prompt`。它应从绑定配置读取静态 -`prompt`,并通过 Host history API 拉取 transcript。Pipeline adapter 可以继续为旧入口 -保留 `max-round` 兼容逻辑,但这不是 official local-agent 的行为契约。 +`local-agent` 不应消费 Pipeline adapter 生成的历史窗口,也不应读取 +`ctx.adapter.extra.prompt`。它应从绑定配置读取静态 `prompt`,并通过 Host +history API 拉取 transcript。Pipeline adapter 不保留 Host-side window 兼容逻辑。 建议 local-agent manifest 使用 hybrid 或 self-managed context: diff --git a/docs/agent-runner-pluginization/PROTOCOL_V1.md b/docs/agent-runner-pluginization/PROTOCOL_V1.md index d5437554..82abbc7b 100644 --- a/docs/agent-runner-pluginization/PROTOCOL_V1.md +++ b/docs/agent-runner-pluginization/PROTOCOL_V1.md @@ -11,7 +11,7 @@ - ✅ Host 支持 `run_id` session authorization - ✅ Host 能从当前 Pipeline 入口生成 event-first context - ✅ `messages` 降级为 optional bootstrap -- ✅ `max-round` 不出现在协议实体中;类似历史窗口参数若存在,应来自 runner manifest/config schema,并作为 binding config 进入 `ctx.config` +- ✅ `max-round` 不出现在协议实体中,也不属于 Host / Pipeline 语义;类似参数若存在,由 runner 自己解释 `ctx.config` - ✅ Proxy 覆盖 model、tool、knowledge、state/storage - ✅ History / Event / Artifact / State API 已落地 - ✅ EventLog / Transcript / ArtifactStore / PersistentStateStore 已落地 @@ -142,13 +142,13 @@ class AgentRunnerContextPolicy(BaseModel): wants_static_context_refs: bool = True ``` -Host 使用该声明决定是否给 runner inline bootstrap history。默认原则: +Host 不使用该声明给 runner inline 历史窗口。默认原则: - Host 不得默认 inline 全量历史。 -- Host 默认只 inline 当前 event / input 和 context handles。 +- Host 只 inline 当前 event / input 和 context handles。 - Runner 拥有 working context assembly。 - Runner 可在授权后通过 Host history / event / artifact / state APIs 拉取更多上下文。 -- `max-round` 不属于 Protocol v1 字段,也不属于 Pipeline / Host 通用语义。 +- `max-round` 或类似窗口参数不属于 Protocol v1 字段,也不属于 Pipeline / Host 通用语义;如果某个 runner 需要,应由 runner 自己解释 `ctx.config`。 ## 4. Run 协议 @@ -193,7 +193,7 @@ class AgentRunContext(BaseModel): - `event` 是必选字段,Protocol v1 是 event-first。 - `input` 表示当前事件的主输入,不等于历史消息。 -- `bootstrap` 是可选字段,不是完整 history。 +- `bootstrap` 是可选字段;LangBot Host 默认不填历史窗口。 - `adapter` 只放 Pipeline adapter 字段,runner 不应依赖它做长期能力。 - `config` 是 Host binding config,不是插件实例状态。 @@ -342,10 +342,10 @@ class BootstrapContext(BaseModel): 约束: -- `bootstrap.messages` 是 host convenience,不是协议核心。 -- 自管 context runner 默认应收到空 bootstrap 或只收到当前 event。 +- `bootstrap.messages` 不是 LangBot Host 的默认行为。 +- 自管 context runner 默认应收到空 bootstrap。 - Host 不应为了”帮 agent 更聪明”而自动拼接完整 transcript。 -- 类似历史窗口策略应由具体 runner 的 binding config 表达;new/official runners 不应依赖 Pipeline adapter 下发的 bootstrap window。 +- 类似历史窗口策略应由具体 runner 自己解释 binding config,并通过 Host history API 拉取历史;new/official runners 不应依赖 Pipeline adapter 下发历史窗口。 ### 4.10 RuntimeContext @@ -685,7 +685,7 @@ Protocol v1 已在当前分支完成: - ✅ Host 支持 `run_id` session authorization - ✅ Host 能从当前 Pipeline 入口生成 event-first context - ✅ `messages` 降级为 optional bootstrap -- ✅ `max-round` 不出现在协议实体中;类似参数属于具体 runner binding config +- ✅ `max-round` 不出现在协议实体中,也不属于 Host / Pipeline 语义 - ✅ Proxy 至少覆盖 model、tool、knowledge、state/storage - ✅ History / event / artifact API 已落地 - ✅ EventLog / Transcript / ArtifactStore / PersistentStateStore 已落地 diff --git a/src/langbot/pkg/agent/runner/config_migration.py b/src/langbot/pkg/agent/runner/config_migration.py index 3d007644..a9c8065e 100644 --- a/src/langbot/pkg/agent/runner/config_migration.py +++ b/src/langbot/pkg/agent/runner/config_migration.py @@ -114,6 +114,9 @@ class ConfigMigration: if old_runner_name: old_config = ai_config.get(old_runner_name, {}) if old_config: + old_config = dict(old_config) + if runner_id == OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent']: + old_config.pop('max-round', None) return ConfigMigration.normalize_runner_config_for_migration(runner_id, old_config) return {} diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index d6fb4179..f918315b 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -296,8 +296,6 @@ class AgentRunContextBuilder: adapter_context = { 'query_id': None, 'pipeline_uuid': binding.pipeline_uuid, - 'max_round': binding.max_round, # For reference only - 'adapter_messages': [], 'extra': {}, } @@ -316,7 +314,7 @@ class AgentRunContextBuilder: 'state': state, 'runtime': runtime, 'config': binding.runner_config, - 'bootstrap': None, # Optional - no messages inlined by default + 'bootstrap': None, 'adapter': adapter_context, 'metadata': {}, # Additional metadata } diff --git a/src/langbot/pkg/agent/runner/event_log_store.py b/src/langbot/pkg/agent/runner/event_log_store.py index 0b693b19..134d07df 100644 --- a/src/langbot/pkg/agent/runner/event_log_store.py +++ b/src/langbot/pkg/agent/runner/event_log_store.py @@ -11,7 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession from sqlalchemy.orm import sessionmaker from ...entity.persistence.event_log import EventLog -from ...entity.persistence.transcript import Transcript class EventLogStore: diff --git a/src/langbot/pkg/agent/runner/host_models.py b/src/langbot/pkg/agent/runner/host_models.py index ffa96604..96e578ff 100644 --- a/src/langbot/pkg/agent/runner/host_models.py +++ b/src/langbot/pkg/agent/runner/host_models.py @@ -8,8 +8,6 @@ import typing import pydantic from langbot_plugin.api.entities.builtin.agent_runner.event import ( - AgentEventContext, - ConversationContext, ActorContext, SubjectContext, RawEventRef, @@ -172,6 +170,3 @@ class AgentBinding(pydantic.BaseModel): # Fields for Pipeline adapter pipeline_uuid: str | None = None """Pipeline UUID (for Pipeline adapter).""" - - max_round: int | None = None - """max-round (for Pipeline adapter bootstrap, not Protocol v1).""" diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 90dafbf1..f479d4cc 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -133,16 +133,6 @@ class AgentRunOrchestrator: # Merge prompt into adapter.extra for Pipeline adapter consumers. if 'prompt' in adapter_context: context['adapter']['extra']['prompt'] = adapter_context['prompt'] - # Merge bootstrap if provided - if adapter_context.get('bootstrap'): - context['bootstrap'] = adapter_context['bootstrap'] - # Also expose the bootstrap window through adapter metadata. - bootstrap_messages = adapter_context['bootstrap'].get('messages') - if bootstrap_messages: - context['adapter']['adapter_messages'] = bootstrap_messages - # Merge runtime metadata if provided - if adapter_context.get('runtime_metadata'): - context['runtime']['metadata'].update(adapter_context['runtime_metadata']) # Set query_id if provided if adapter_context.get('query_id'): context['runtime']['query_id'] = adapter_context['query_id'] diff --git a/src/langbot/pkg/agent/runner/pipeline_adapter.py b/src/langbot/pkg/agent/runner/pipeline_adapter.py index ca0d67ae..6a5df3d5 100644 --- a/src/langbot/pkg/agent/runner/pipeline_adapter.py +++ b/src/langbot/pkg/agent/runner/pipeline_adapter.py @@ -29,7 +29,6 @@ from .host_models import ( DeliveryPolicy, ) from . import events as runner_events -from ...pipeline.msgtrun.round_policy import select_max_round_messages class PipelineAdapter: @@ -38,7 +37,6 @@ class PipelineAdapter: This adapter is responsible for: - Converting Query to AgentEventEnvelope - Converting Pipeline config to temporary AgentBinding - - Handling max-round as bootstrap policy - Putting Query-only fields into adapter context """ @@ -118,10 +116,6 @@ class PipelineAdapter: runner_config = ai_config.get('runner_config', {}).get(runner_id, {}) pipeline_uuid = getattr(query, 'pipeline_uuid', None) - # Extract max_round for adapter (used in bootstrap, not Protocol v1) - # Note: config uses 'max-round' with hyphen, not 'max_round' with underscore - max_round = runner_config.get('max-round') or ai_config.get('max-round') - # Build scope scope = BindingScope( scope_type="pipeline", @@ -158,45 +152,8 @@ class PipelineAdapter: delivery_policy=delivery_policy, enabled=True, pipeline_uuid=pipeline_uuid, - max_round=max_round, ) - @classmethod - def build_bootstrap_context( - cls, - query: pipeline_query.Query, - binding: AgentBinding, - ) -> tuple[dict[str, typing.Any] | None, dict[str, typing.Any]]: - """Build bootstrap messages and runtime metadata for Pipeline max-round.""" - max_round = binding.max_round - source_messages = query.messages or [] - if not max_round or max_round <= 0 or not source_messages: - return None, {} - - packaged_messages = select_max_round_messages(source_messages, max_round) - bootstrap_messages = [cls._dump_message(msg) for msg in packaged_messages] - bootstrap = { - "messages": bootstrap_messages, - "summary": None, - "artifacts": [], - "metadata": {}, - } - runtime_metadata = { - 'context_packaging': { - 'policy': { - 'mode': 'max_round', - 'max_round': max_round, - }, - 'history': { - 'source': 'query.messages', - 'source_total_count': len(source_messages), - 'delivered_count': len(packaged_messages), - 'messages_complete': len(packaged_messages) == len(source_messages), - }, - }, - } - return bootstrap, runtime_metadata - @classmethod def build_adapter_context( cls, @@ -204,13 +161,10 @@ class PipelineAdapter: binding: AgentBinding, ) -> dict[str, typing.Any]: """Build Query-derived fields for the Pipeline adapter entry.""" - bootstrap, runtime_metadata = cls.build_bootstrap_context(query, binding) return { 'params': cls.build_params(query), 'prompt': cls.build_prompt(query), - 'bootstrap': bootstrap, 'query_id': getattr(query, 'query_id', None), - 'runtime_metadata': runtime_metadata, } @classmethod diff --git a/src/langbot/pkg/api/http/service/pipeline.py b/src/langbot/pkg/api/http/service/pipeline.py index d17c59bc..fbafe6d2 100644 --- a/src/langbot/pkg/api/http/service/pipeline.py +++ b/src/langbot/pkg/api/http/service/pipeline.py @@ -19,7 +19,6 @@ default_stage_order = [ 'BanSessionCheckStage', # 封禁会话检查 'PreContentFilterStage', # 内容过滤前置阶段 'PreProcessor', # 预处理器 - 'ConversationMessageTruncator', # 会话消息截断器 'RequireRateLimitOccupancy', # 请求速率限制占用 'MessageProcessor', # 处理器 'ReleaseRateLimitOccupancy', # 释放速率限制占用 diff --git a/src/langbot/pkg/core/migrations/m009_msg_truncator_cfg.py b/src/langbot/pkg/core/migrations/m009_msg_truncator_cfg.py deleted file mode 100644 index 066af126..00000000 --- a/src/langbot/pkg/core/migrations/m009_msg_truncator_cfg.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -from .. import migration - - -@migration.migration_class('msg-truncator-cfg-migration', 9) -class MsgTruncatorConfigMigration(migration.Migration): - """迁移""" - - async def need_migrate(self) -> bool: - """判断当前环境是否需要运行此迁移""" - return 'msg-truncate' not in self.ap.pipeline_cfg.data - - async def run(self): - """执行迁移""" - - self.ap.pipeline_cfg.data['msg-truncate'] = { - 'method': 'round', - 'round': {'max-round': 10}, - } - - await self.ap.pipeline_cfg.dump_config() diff --git a/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py b/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py index 55e63fff..0eddacf6 100644 --- a/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py +++ b/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py @@ -118,9 +118,6 @@ class DBMigrateV3Config(migration.DBMigration): 'runner': self.ap.provider_cfg.data['runner'], } pipeline_config['ai']['local-agent']['model'] = model_uuid - pipeline_config['ai']['local-agent']['max-round'] = self.ap.pipeline_cfg.data['msg-truncate']['round'][ - 'max-round' - ] pipeline_config['ai']['local-agent']['prompt'] = [ { diff --git a/src/langbot/pkg/pipeline/msgtrun/__init__.py b/src/langbot/pkg/pipeline/msgtrun/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/langbot/pkg/pipeline/msgtrun/msgtrun.py b/src/langbot/pkg/pipeline/msgtrun/msgtrun.py deleted file mode 100644 index af8eb0e6..00000000 --- a/src/langbot/pkg/pipeline/msgtrun/msgtrun.py +++ /dev/null @@ -1,39 +0,0 @@ -from __future__ import annotations - -from .. import stage, entities -from . import truncator -from ...utils import importutil -from ...agent.runner.config_migration import ConfigMigration -import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -from . import truncators - -importutil.import_modules_in_pkg(truncators) - - -@stage.stage_class('ConversationMessageTruncator') -class ConversationMessageTruncator(stage.PipelineStage): - """Conversation message truncator - - Used to truncate the conversation message chain to adapt to the LLM message length limit. - """ - - trun: truncator.Truncator - - async def initialize(self, pipeline_config: dict): - use_method = 'round' - - for trun in truncator.preregistered_truncators: - if trun.name == use_method: - self.trun = trun(self.ap) - break - else: - raise ValueError(f'Unknown truncator: {use_method}') - - async def process(self, query: pipeline_query.Query, stage_inst_name: str) -> entities.StageProcessResult: - """处理""" - if ConfigMigration.resolve_runner_id(query.pipeline_config): - return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) - - query = await self.trun.truncate(query) - - return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) diff --git a/src/langbot/pkg/pipeline/msgtrun/round_policy.py b/src/langbot/pkg/pipeline/msgtrun/round_policy.py deleted file mode 100644 index 659ab13e..00000000 --- a/src/langbot/pkg/pipeline/msgtrun/round_policy.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Shared max-round message window helpers for Pipeline behavior.""" -from __future__ import annotations - -import typing - - -DEFAULT_MAX_ROUND = 10 - - -def get_max_round(config: dict[str, typing.Any]) -> typing.Any: - """Return the configured Pipeline max-round value.""" - return config.get('max-round', DEFAULT_MAX_ROUND) - - -def select_max_round_messages( - messages: list[typing.Any] | None, - max_round: typing.Any, -) -> list[typing.Any]: - """Select a bounded recent message window by user-round count.""" - if not messages: - return [] - - temp_messages: list[typing.Any] = [] - current_round = 0 - - for msg in messages[::-1]: - if current_round < max_round: - temp_messages.append(msg) - if getattr(msg, 'role', None) == 'user': - current_round += 1 - else: - break - - return temp_messages[::-1] diff --git a/src/langbot/pkg/pipeline/msgtrun/truncator.py b/src/langbot/pkg/pipeline/msgtrun/truncator.py deleted file mode 100644 index 180982d3..00000000 --- a/src/langbot/pkg/pipeline/msgtrun/truncator.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations - -import typing -import abc - -from ...core import app -import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query - -preregistered_truncators: list[typing.Type[Truncator]] = [] - - -def truncator_class( - name: str, -) -> typing.Callable[[typing.Type[Truncator]], typing.Type[Truncator]]: - """截断器类装饰器 - - Args: - name (str): 截断器名称 - - Returns: - typing.Callable[[typing.Type[Truncator]], typing.Type[Truncator]]: 装饰器 - """ - - def decorator(cls: typing.Type[Truncator]) -> typing.Type[Truncator]: - assert issubclass(cls, Truncator) - - cls.name = name - - preregistered_truncators.append(cls) - - return cls - - return decorator - - -class Truncator(abc.ABC): - """消息截断器基类""" - - name: str - - ap: app.Application - - def __init__(self, ap: app.Application): - self.ap = ap - - async def initialize(self): - pass - - @abc.abstractmethod - async def truncate(self, query: pipeline_query.Query) -> pipeline_query.Query: - """截断 - - 一般只需要操作query.messages,也可以扩展操作query.prompt, query.user_message。 - 请勿操作其他字段。 - """ - pass diff --git a/src/langbot/pkg/pipeline/msgtrun/truncators/__init__.py b/src/langbot/pkg/pipeline/msgtrun/truncators/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/langbot/pkg/pipeline/msgtrun/truncators/round.py b/src/langbot/pkg/pipeline/msgtrun/truncators/round.py deleted file mode 100644 index 78a55df1..00000000 --- a/src/langbot/pkg/pipeline/msgtrun/truncators/round.py +++ /dev/null @@ -1,29 +0,0 @@ -from __future__ import annotations - -from .. import truncator -import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -from ....agent.runner.config_migration import ConfigMigration -from ..round_policy import ( - get_max_round, - select_max_round_messages, -) - - -@truncator.truncator_class('round') -class RoundTruncator(truncator.Truncator): - """Truncate the conversation message chain to adapt to the LLM message length limit.""" - - async def truncate(self, query: pipeline_query.Query) -> pipeline_query.Query: - """截断""" - runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) - if runner_id: - runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) - else: - runner_config = query.pipeline_config.get('msg-truncate', {}).get('round', {}) - - query.messages = select_max_round_messages( - query.messages, - get_max_round(runner_config), - ) - - return query diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 1426fe3d..27c12269 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -28,7 +28,6 @@ from . import ( wrapper, preproc, ratelimit, - msgtrun, ) importutil.import_modules_in_pkgs( @@ -42,7 +41,6 @@ importutil.import_modules_in_pkgs( wrapper, preproc, ratelimit, - msgtrun, ] ) @@ -438,6 +436,9 @@ class PipelineManager: # initialize stage containers according to pipeline_entity.stages stage_containers: list[StageInstContainer] = [] for stage_name in pipeline_entity.stages: + if stage_name not in self.stage_dict: + self.ap.logger.warning(f'Pipeline stage {stage_name} is not registered; skipping') + continue stage_containers.append(StageInstContainer(inst_name=stage_name, inst=self.stage_dict[stage_name](self.ap))) for stage_container in stage_containers: diff --git a/src/langbot/templates/legacy/pipeline.json b/src/langbot/templates/legacy/pipeline.json index eb57f023..5b8fc9c9 100644 --- a/src/langbot/templates/legacy/pipeline.json +++ b/src/langbot/templates/legacy/pipeline.json @@ -34,11 +34,5 @@ "limit": 60 } } - }, - "msg-truncate": { - "method": "round", - "round": { - "max-round": 10 - } } -} \ No newline at end of file +} diff --git a/tests/unit_tests/agent/test_config_migration.py b/tests/unit_tests/agent/test_config_migration.py index c1e3b560..4dd7805f 100644 --- a/tests/unit_tests/agent/test_config_migration.py +++ b/tests/unit_tests/agent/test_config_migration.py @@ -120,7 +120,7 @@ class TestResolveRunnerConfig: 'runner_config': { 'plugin:langbot/local-agent/default': { 'model': 'uuid-123', - 'max_round': 10, + 'custom_option': 10, }, }, }, @@ -130,7 +130,7 @@ class TestResolveRunnerConfig: pipeline_config, 'plugin:langbot/local-agent/default', ) - assert config == {'model': 'uuid-123', 'max_round': 10} + assert config == {'model': 'uuid-123', 'custom_option': 10} def test_resolve_old_format_config(self): """Runtime config resolver should not read old format.""" @@ -138,7 +138,7 @@ class TestResolveRunnerConfig: 'ai': { 'local-agent': { 'model': 'uuid-123', - 'max_round': 10, + 'custom_option': 10, }, }, } @@ -155,7 +155,7 @@ class TestResolveRunnerConfig: 'ai': { 'local-agent': { 'model': 'uuid-123', - 'max_round': 10, + 'custom_option': 10, 'knowledge-base': 'kb-123', }, }, @@ -165,7 +165,7 @@ class TestResolveRunnerConfig: pipeline_config, 'plugin:langbot/local-agent/default', ) - assert config == {'model': 'uuid-123', 'max_round': 10, 'knowledge-bases': ['kb-123']} + assert config == {'model': 'uuid-123', 'custom_option': 10, 'knowledge-bases': ['kb-123']} assert 'knowledge-base' not in config def test_resolve_no_config(self): diff --git a/tests/unit_tests/agent/test_config_migration_full.py b/tests/unit_tests/agent/test_config_migration_full.py index a55af073..37303a0f 100644 --- a/tests/unit_tests/agent/test_config_migration_full.py +++ b/tests/unit_tests/agent/test_config_migration_full.py @@ -20,7 +20,6 @@ class TestMigratePipelineConfig: }, 'local-agent': { 'model': {'primary': 'model-uuid', 'fallbacks': []}, - 'max-round': 10, 'knowledge-base': 'kb-uuid', 'prompt': [{'role': 'system', 'content': 'Hello'}], }, @@ -35,9 +34,9 @@ class TestMigratePipelineConfig: # Config should be in runner_config assert 'plugin:langbot/local-agent/default' in migrated['ai']['runner_config'] - assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default']['max-round'] == 10 assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default']['knowledge-bases'] == ['kb-uuid'] assert 'knowledge-base' not in migrated['ai']['runner_config']['plugin:langbot/local-agent/default'] + assert 'max-round' not in migrated['ai']['runner_config']['plugin:langbot/local-agent/default'] # Expire-time preserved assert migrated['ai']['runner']['expire-time'] == 0 @@ -76,7 +75,7 @@ class TestMigratePipelineConfig: 'runner_config': { 'plugin:langbot/local-agent/default': { 'model': {'primary': '', 'fallbacks': []}, - 'max-round': 10, + 'custom-option': 10, }, }, }, @@ -86,7 +85,7 @@ class TestMigratePipelineConfig: # Should remain unchanged assert migrated['ai']['runner']['id'] == 'plugin:langbot/local-agent/default' - assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default']['max-round'] == 10 + assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default']['custom-option'] == 10 def test_new_format_local_agent_config_normalizes_legacy_kb_key(self): """Migration should normalize legacy KB aliases before runtime.""" @@ -260,18 +259,18 @@ class TestResolveRunnerConfig: config = { 'ai': { 'runner_config': { - 'plugin:langbot/local-agent/default': {'max-round': 20}, + 'plugin:langbot/local-agent/default': {'custom-option': 20}, }, }, } runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default') - assert runner_config['max-round'] == 20 + assert runner_config['custom-option'] == 20 def test_resolve_old_format_config(self): """resolve_runner_config should not read old ai.local-agent at runtime.""" config = { 'ai': { - 'local-agent': {'max-round': 15}, + 'local-agent': {'max-round': 15, 'custom-option': 20}, }, } runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default') @@ -281,21 +280,21 @@ class TestResolveRunnerConfig: """resolve_legacy_runner_config should read old ai.local-agent for migration.""" config = { 'ai': { - 'local-agent': {'max-round': 15}, + 'local-agent': {'max-round': 15, 'custom-option': 20}, }, } runner_config = ConfigMigration.resolve_legacy_runner_config(config, 'plugin:langbot/local-agent/default') - assert runner_config['max-round'] == 15 + assert runner_config == {'custom-option': 20} def test_resolve_new_format_priority(self): """New format runner_config should take priority.""" config = { 'ai': { 'runner_config': { - 'plugin:langbot/local-agent/default': {'max-round': 25}, + 'plugin:langbot/local-agent/default': {'custom-option': 25}, }, - 'local-agent': {'max-round': 10}, # Old, should be ignored + 'local-agent': {'max-round': 10, 'custom-option': 10}, # Old, should be ignored }, } runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default') - assert runner_config['max-round'] == 25 + assert runner_config['custom-option'] == 25 diff --git a/tests/unit_tests/agent/test_event_first_protocol.py b/tests/unit_tests/agent/test_event_first_protocol.py index a58c1848..8f311429 100644 --- a/tests/unit_tests/agent/test_event_first_protocol.py +++ b/tests/unit_tests/agent/test_event_first_protocol.py @@ -4,7 +4,7 @@ Tests cover: 1. Pipeline Query -> AgentEventEnvelope conversion 2. Pipeline config -> AgentBinding conversion 3. AgentRunContext not inlining full history by default -4. Pipeline max-round only affecting bootstrap/adapter context +4. LangBot Host not defining context-window controls 5. Event-first run() entry point """ from __future__ import annotations @@ -147,23 +147,13 @@ class TestPipelineConfigToBinding: assert binding.scope.scope_type == "pipeline" assert binding.scope.scope_id == mock_query.pipeline_uuid - def test_config_to_binding_max_round(self, mock_query_with_max_round): - """Test max_round extraction for Pipeline adapter.""" - binding = PipelineAdapter.pipeline_config_to_binding( - mock_query_with_max_round, "plugin:test/plugin/runner" - ) - - # max_round should be captured but NOT in Protocol v1 entities - assert binding.max_round == 10 - - def test_config_to_binding_no_max_round(self, mock_query): - """Test binding without max_round.""" + def test_config_to_binding_does_not_add_host_context_window(self, mock_query): + """Pipeline binding should not define Host-side context window controls.""" binding = PipelineAdapter.pipeline_config_to_binding( mock_query, "plugin:test/plugin/runner" ) - # max_round may be None - assert binding.max_round is None + assert not hasattr(binding, "max_round") class TestAgentRunContextProtocolV1: @@ -248,60 +238,23 @@ class TestAgentRunContextProtocolV1: assert ctx.bootstrap is None or isinstance(ctx.bootstrap.messages, list) -class TestMaxRoundNotInProtocol: - """Test that Pipeline max-round only affects adapter context, not Protocol v1.""" +class TestHostContextWindowNotInProtocol: + """Test that Host-side context window controls are not in Protocol v1.""" - def test_max_round_not_in_sdk_context(self): - """Test max-round is not a field in SDK AgentRunContext.""" - # AgentRunContext should not have max_round field + def test_context_window_not_in_sdk_context(self): + """AgentRunContext should not expose Host-side window controls.""" ctx_fields = AgentRunContext.model_fields.keys() assert "max_round" not in ctx_fields assert "maxRound" not in ctx_fields - def test_max_round_in_adapter_context(self): - """Test max_round is in adapter context, not main context.""" - trigger = AgentTrigger(type="message.received") - event = AgentEventContext( - event_id="evt_1", - event_type="message.received", - source="platform", - ) - input = AgentInput(text="Hello") - from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources - from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext - from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext - from langbot_plugin.api.entities.builtin.agent_runner.context import AdapterContext - - adapter = AdapterContext(max_round=10) - - ctx = AgentRunContext( - run_id="run_1", - trigger=trigger, - event=event, - input=input, - delivery=DeliveryContext(surface="platform"), - resources=AgentResources(), - runtime=AgentRuntimeContext(), - adapter=adapter, - ) - - # max_round is in adapter context, not main context - assert ctx.adapter is not None - assert ctx.adapter.max_round == 10 - - def test_binding_max_round_for_adapter_only(self, mock_query_with_max_round): - """Test max_round in binding is for adapter use, not Protocol v1.""" + def test_binding_has_no_context_window_field(self, mock_query): + """Pipeline adapter should not attach context window policy to binding.""" binding = PipelineAdapter.pipeline_config_to_binding( - mock_query_with_max_round, "plugin:test/plugin/runner" + mock_query, "plugin:test/plugin/runner" ) - # max_round is in binding (Host-internal) for Pipeline adapter - assert binding.max_round == 10 - - # But SDK entities don't have it - ctx_fields = AgentRunContext.model_fields.keys() - assert "max_round" not in ctx_fields + assert not hasattr(binding, "max_round") class TestSDKCapabilitiesProtocolV1: @@ -416,18 +369,6 @@ def mock_query(): return query -@pytest.fixture -def mock_query_with_max_round(mock_query): - """Create a mock Query with max_round configuration.""" - mock_query.pipeline_config = { - "ai": { - "runner": "plugin:test/plugin/runner", - "max-round": 10, - } - } - return mock_query - - @pytest.fixture def mock_query_no_session(): """Create a mock Query without session.""" diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index c1470280..97227a95 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio import datetime import types -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock import pytest from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine @@ -332,8 +332,8 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(clean_agent @pytest.mark.asyncio -async def test_orchestrator_packages_max_round_without_mutating_query(clean_agent_state): - """Test that max-round is packaged without mutating original query.""" +async def test_orchestrator_does_not_package_query_messages_into_context(clean_agent_state): + """Host should not build an agent working-context window from query.messages.""" db_engine = clean_agent_state descriptor = make_descriptor() plugin_connector = FakePluginConnector( @@ -347,7 +347,7 @@ async def test_orchestrator_packages_max_round_without_mutating_query(clean_agen ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() - query.pipeline_config["ai"]["runner_config"][RUNNER_ID]["max-round"] = 2 + query.pipeline_config["ai"]["runner_config"][RUNNER_ID]["agent-window"] = 2 query.messages = [ provider_message.Message(role="user", content="message 1"), provider_message.Message(role="assistant", content="response 1"), @@ -361,21 +361,10 @@ async def test_orchestrator_packages_max_round_without_mutating_query(clean_agen assert len(messages) == 1 context = plugin_connector.contexts[0] - # Protocol v1: messages are in bootstrap.messages - assert context["bootstrap"] is not None - assert [message["content"] for message in context["bootstrap"]["messages"]] == [ - "message 2", - "response 2", - "message 3", - "response 3", - ] - # Also exposed in adapter.adapter_messages for runners that consume adapter bootstrap. - assert [message["content"] for message in context["adapter"]["adapter_messages"]] == [ - "message 2", - "response 2", - "message 3", - "response 3", - ] + assert context["config"]["agent-window"] == 2 + assert context["bootstrap"] is None + assert "adapter_messages" not in context["adapter"] + assert "context_packaging" not in context["runtime"]["metadata"] assert [message.content for message in query.messages] == [ "message 1", "response 1", @@ -384,18 +373,6 @@ async def test_orchestrator_packages_max_round_without_mutating_query(clean_agen "message 3", "response 3", ] - assert context["runtime"]["metadata"]["context_packaging"] == { - "policy": { - "mode": "max_round", - "max_round": 2, - }, - "history": { - "source": "query.messages", - "source_total_count": 6, - "delivered_count": 4, - "messages_complete": False, - }, - } @pytest.mark.asyncio @@ -493,7 +470,7 @@ async def test_orchestrator_enforces_total_runner_deadline(clean_agent_state): assert exc_info.value.retryable is True assert "runner.timeout" in str(exc_info.value) - assert await get_session_registry().get(plugin_connector.contexts[0]["run_id"]) is None + assert await get_session_registry().list_active_runs() == [] class TestPipelineCompatibilityQueryIdInSession: @@ -610,7 +587,7 @@ class TestPipelineAdapterPromptAndParams: ], ) - messages = [message async for message in orchestrator.run_from_query(query)] + _messages = [message async for message in orchestrator.run_from_query(query)] context = plugin_connector.contexts[0] # Prompt should be in adapter.extra @@ -641,7 +618,7 @@ class TestPipelineAdapterPromptAndParams: "another_param": 123, } - messages = [message async for message in orchestrator.run_from_query(query)] + _messages = [message async for message in orchestrator.run_from_query(query)] context = plugin_connector.contexts[0] assert context["adapter"]["extra"]["params"] == { @@ -671,7 +648,7 @@ class TestPipelineAdapterPromptAndParams: "_pipeline_bound_plugins": ["plugin1"], } - messages = [message async for message in orchestrator.run_from_query(query)] + _messages = [message async for message in orchestrator.run_from_query(query)] context = plugin_connector.contexts[0] params = context["adapter"]["extra"]["params"] @@ -703,7 +680,7 @@ class TestPipelineAdapterPromptAndParams: "credential": "secret000", } - messages = [message async for message in orchestrator.run_from_query(query)] + _messages = [message async for message in orchestrator.run_from_query(query)] context = plugin_connector.contexts[0] params = context["adapter"]["extra"]["params"] @@ -735,7 +712,7 @@ class TestPipelineAdapterPromptAndParams: "a_lambda": lambda x: x, # function is not JSON-serializable } - messages = [message async for message in orchestrator.run_from_query(query)] + _messages = [message async for message in orchestrator.run_from_query(query)] context = plugin_connector.contexts[0] params = context["adapter"]["extra"]["params"] diff --git a/tests/unit_tests/pipeline/test_msgtrun.py b/tests/unit_tests/pipeline/test_msgtrun.py deleted file mode 100644 index 04308058..00000000 --- a/tests/unit_tests/pipeline/test_msgtrun.py +++ /dev/null @@ -1,369 +0,0 @@ -""" -Unit tests for ConversationMessageTruncator (msgtrun) pipeline stage. - -Tests cover: -- Normal truncation behavior based on max-round -- Boundary length handling -- Empty message handling -- Multi-message chain truncation -""" - -from __future__ import annotations - -import pytest -from importlib import import_module - -from tests.factories import ( - FakeApp, - text_query, -) - -import langbot_plugin.api.entities.builtin.provider.message as provider_message - - -RUNNER_ID = 'plugin:langbot/local-agent/default' - - -def get_msgtrun_module(): - """Lazy import to avoid circular import issues.""" - # Import pipelinemgr first to trigger stage registration - import_module('langbot.pkg.pipeline.pipelinemgr') - return import_module('langbot.pkg.pipeline.msgtrun.msgtrun') - - -def get_truncator_module(): - """Lazy import for truncator base.""" - return import_module('langbot.pkg.pipeline.msgtrun.truncator') - - -def get_entities_module(): - """Lazy import for pipeline entities.""" - return import_module('langbot.pkg.pipeline.entities') - - -def get_round_truncator_module(): - """Lazy import for round truncator.""" - return import_module('langbot.pkg.pipeline.msgtrun.truncators.round') - - -def make_truncate_config(max_round: int = 5): - """Create a pipeline config with max-round setting.""" - return { - 'msg-truncate': { - 'method': 'round', - 'round': { - 'max-round': max_round, - }, - }, - } - - -def make_agent_runner_config(max_round: int = 5): - """Create an AgentRunner pipeline config with max-round binding config.""" - return { - 'ai': { - 'runner': {'id': RUNNER_ID}, - 'runner_config': { - RUNNER_ID: { - 'max-round': max_round, - }, - }, - } - } - - -class TestConversationMessageTruncatorInit: - """Tests for ConversationMessageTruncator initialization.""" - - @pytest.mark.asyncio - async def test_initialize_round_truncator(self): - """Initialize should select 'round' truncator by default.""" - msgtrun = get_msgtrun_module() - truncator = get_truncator_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config() - - await stage.initialize(pipeline_config) - - assert stage.trun is not None - assert isinstance(stage.trun, truncator.Truncator) - - @pytest.mark.asyncio - async def test_initialize_unknown_truncator_raises(self): - """Initialize with unknown truncator method should raise ValueError.""" - msgtrun = get_msgtrun_module() - truncator = get_truncator_module() - - # Save original preregistered_truncators - original_truncators = truncator.preregistered_truncators.copy() - - try: - # Clear registered truncators to simulate unknown method - truncator.preregistered_truncators = [] - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config() - - with pytest.raises(ValueError, match='Unknown truncator'): - await stage.initialize(pipeline_config) - finally: - # Restore original truncators - truncator.preregistered_truncators = original_truncators - - -class TestRoundTruncatorProcess: - """Tests for RoundTruncator truncation behavior.""" - - @pytest.mark.asyncio - async def test_truncate_within_limit(self): - """Messages within max-round limit should not be truncated.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config(max_round=5) - - await stage.initialize(pipeline_config) - - # Create query with 3 messages (within limit) - query = text_query("current message") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='message 1'), - provider_message.Message(role='assistant', content='response 1'), - provider_message.Message(role='user', content='message 2'), - provider_message.Message(role='assistant', content='response 2'), - provider_message.Message(role='user', content='current message'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - # All messages should be preserved - assert len(result.new_query.messages) == 5 - - @pytest.mark.asyncio - async def test_agent_runner_path_skips_pipeline_truncation(self): - """AgentRunner path should leave query.messages intact at pipeline stage.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_agent_runner_config(max_round=1) - - await stage.initialize(pipeline_config) - - query = text_query("current") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='old1'), - provider_message.Message(role='assistant', content='old1_resp'), - provider_message.Message(role='user', content='current'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - assert [(msg.role, msg.content) for msg in result.new_query.messages] == [ - ('user', 'old1'), - ('assistant', 'old1_resp'), - ('user', 'current'), - ] - - @pytest.mark.asyncio - async def test_truncate_exceeds_limit(self): - """Messages exceeding max-round should be truncated precisely. - - Algorithm: traverse backwards, collect while current_round < max_round, count user messages as rounds. - For max_round=2 with 7 messages (u1, a1, u2, a2, u3, a3, u_current): - - Iterate: u_current(r=0<2, collect, r=1), a3(r=1<2, collect), u3(r=1<2, collect, r=2) - - a2: r=2 not < 2 → break - - Collected reverse: [u_current, a3, u3] - - Reversed: [u3, a3, u_current] = 3 messages - """ - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config(max_round=2) # Only keep 2 rounds - - await stage.initialize(pipeline_config) - - # Create query with many messages exceeding limit - # 7 messages = 3 full rounds + 1 current user - query = text_query("current message") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='message 1'), - provider_message.Message(role='assistant', content='response 1'), - provider_message.Message(role='user', content='message 2'), - provider_message.Message(role='assistant', content='response 2'), - provider_message.Message(role='user', content='message 3'), - provider_message.Message(role='assistant', content='response 3'), - provider_message.Message(role='user', content='current message'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - # Should keep exactly 3 messages: message3, response3, current message - messages = result.new_query.messages - assert len(messages) == 3 - - # Verify exact message content - assert messages[0].role == 'user' - assert messages[0].content == 'message 3' - assert messages[1].role == 'assistant' - assert messages[1].content == 'response 3' - assert messages[2].role == 'user' - assert messages[2].content == 'current message' - - @pytest.mark.asyncio - async def test_truncate_empty_messages(self): - """Empty messages list should return empty list.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config() - - await stage.initialize(pipeline_config) - - query = text_query("hello") - query.pipeline_config = pipeline_config - query.messages = [] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - assert len(result.new_query.messages) == 0 - - @pytest.mark.asyncio - async def test_truncate_single_message(self): - """Single message should be preserved.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config() - - await stage.initialize(pipeline_config) - - query = text_query("hello") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='hello'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - assert len(result.new_query.messages) == 1 - - @pytest.mark.asyncio - async def test_truncate_preserves_order(self): - """Truncation should preserve message order.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config(max_round=2) - - await stage.initialize(pipeline_config) - - query = text_query("current") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='user1'), - provider_message.Message(role='assistant', content='asst1'), - provider_message.Message(role='user', content='user2'), - provider_message.Message(role='assistant', content='asst2'), - provider_message.Message(role='user', content='user3'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - - messages = result.new_query.messages - assert [(msg.role, msg.content) for msg in messages] == [ - ('user', 'user2'), - ('assistant', 'asst2'), - ('user', 'user3'), - ] - - @pytest.mark.asyncio - async def test_truncate_max_round_one(self): - """max-round=1 should only keep last user message.""" - msgtrun = get_msgtrun_module() - entities = get_entities_module() - - app = FakeApp() - stage = msgtrun.ConversationMessageTruncator(app) - - pipeline_config = make_truncate_config(max_round=1) - - await stage.initialize(pipeline_config) - - query = text_query("current") - query.pipeline_config = pipeline_config - query.messages = [ - provider_message.Message(role='user', content='old1'), - provider_message.Message(role='assistant', content='old1_resp'), - provider_message.Message(role='user', content='current'), - ] - - result = await stage.process(query, 'ConversationMessageTruncator') - - assert result.result_type == entities.ResultType.CONTINUE - messages = result.new_query.messages - assert [(msg.role, msg.content) for msg in messages] == [('user', 'current')] - - -class TestRoundTruncatorDirect: - """Direct tests for RoundTruncator class.""" - - @pytest.mark.asyncio - async def test_round_truncator_direct_process(self): - """Test RoundTruncator truncate method directly.""" - truncator_mod = get_truncator_module() - - app = FakeApp() - - # Get the RoundTruncator class from preregistered - for trun_cls in truncator_mod.preregistered_truncators: - if trun_cls.name == 'round': - trun = trun_cls(app) - break - - query = text_query("hello") - query.pipeline_config = make_truncate_config(max_round=3) - query.messages = [ - provider_message.Message(role='user', content='m1'), - provider_message.Message(role='assistant', content='r1'), - provider_message.Message(role='user', content='m2'), - provider_message.Message(role='assistant', content='r2'), - provider_message.Message(role='user', content='hello'), - ] - - result = await trun.truncate(query) - - assert result is not None - assert hasattr(result, 'messages')