From 036affe01fd90b606bbc7bef4c745751103d6658 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Sun, 17 May 2026 23:26:52 +0800 Subject: [PATCH] feat(agent-runner): enrich plugin runner host context --- .../IMPLEMENTATION_PLAN.md | 119 ++++++--- .../OFFICIAL_RUNNER_PLUGINS.md | 75 +++--- docs/agent-runner-pluginization/README.md | 60 ++++- .../pkg/agent/runner/context_builder.py | 38 ++- .../pkg/agent/runner/resource_builder.py | 110 ++++++-- src/langbot/pkg/plugin/handler.py | 77 +++++- .../test_context_builder_params_state.py | 48 +++- .../unit_tests/agent/test_resource_builder.py | 148 +++++++++++ .../unit_tests/plugin/test_handler_actions.py | 245 ++++++++++++++++++ 9 files changed, 806 insertions(+), 114 deletions(-) create mode 100644 tests/unit_tests/agent/test_resource_builder.py diff --git a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md index b8279680..cdae3e31 100644 --- a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md +++ b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md @@ -1,6 +1,8 @@ -# Agent Runner 插件化最终实现计划 +# Agent Runner 插件化当前实现与收尾计划 -本文档面向实现 agent,用来把当前 PoC 分支直接推进到最终架构。这个分支不按线上渐进发布节奏处理,因此可以接受一次性破坏内部 runner 实现和 Pipeline AI 配置结构;但最终必须提供历史配置迁移。 +本文档面向实现 agent,用来把当前 AgentRunner 插件化实现推进到可迁移状态。 + +当前代码已经不是从零开始的 PoC。LangBot 已经具备 registry、orchestrator、context/resource builder、result normalizer 和插件 runtime action。本计划重点描述剩余工作:补齐宿主通用能力、对齐旧内置 runner 行为、完成官方 runner 插件迁移验收。 ## 1. 最终状态 @@ -18,6 +20,28 @@ LangBot 最终只保留 Agent Runner 的宿主能力: LangBot 不再长期维护内置业务 runner 分支。`local-agent`、Dify、n8n、Coze、DashScope、Langflow、Tbox 等都迁到官方 AgentRunner 插件。 +迁移期间允许旧 `RequestRunner` 文件继续存在,作为行为对齐基准和回退分析材料。它们不影响当前进度;真正的最终条件是主聊天执行路径不再依赖旧 runner。 + +## 1.1 当前状态快照 + +已完成或基本完成: + +- `AgentRunnerDescriptor`、runner id 解析、registry。 +- `AgentRunOrchestrator` 替换 `ChatMessageHandler` 内部 runner 调度。 +- `AgentRunContextBuilder`、`AgentResourceBuilder`、`AgentResultNormalizer`。 +- `ai.runner.id` + `ai.runner_config[id]` 的读取与旧配置映射。 +- AgentRunner runtime action:`LIST_AGENT_RUNNERS`、`RUN_AGENT`。 +- run-scoped proxy authorization:模型、工具、知识库、存储、文件。 + +仍需收尾: + +- `AgentRunContext` 暴露宿主处理后的有效 prompt、结构化输入和 runtime metadata。 +- AgentRun proxy action 通过 `run_id/query_id` 找回当前 Query,保留旧 runner 行为所需上下文。 +- `AgentResourceBuilder` 按 DynamicForm schema 泛化模型/rerank/知识库/文件授权。 +- 官方 `local-agent` 插件完成旧内置 local-agent parity。 +- timeout/deadline、取消、插件无输出、协议错误的端到端保护。 +- 官方 runner 插件安装/预装/迁移缺失处理。 + ## 2. 高层架构 ```text @@ -137,14 +161,29 @@ class AgentRunnerDescriptor(BaseModel): - `event`: message event envelope 子集 - `actor`: sender - `subject`: 当前消息或 launcher +- `prompt`: 宿主已处理的有效 prompt,即 `query.prompt.messages` - `messages`: `query.messages` - `input`: 从 `query.user_message` 和 `query.message_chain` 构造 +- `params`: 过滤后的公开业务变量 - `resources`: 由 `resource_builder` 注入 +- `state`: host-managed scoped state snapshot - `runtime`: host/version/workspace/bot/pipeline/query/trace/deadline - `config`: 当前 Pipeline 对该 runner id 的绑定配置,即 `ai.runner_config[runner_id]` 保留 SDK legacy helper 是 SDK 的责任,LangBot 不再构造 PoC 的 `query_id/session/messages/user_message/extra_config` 上下文。 +`prompt` 的语义必须明确:它不是静态配置 `config["prompt"]`,而是 LangBot PreProcessor 和 `PromptPreProcessing` 插件事件之后的有效 prompt。旧内置 local-agent 请求模型时使用: + +```python +query.prompt.messages + query.messages + [query.user_message] +``` + +插件化 runner 要保持行为一致,应消费: + +```python +ctx.prompt + ctx.messages + [current_user_message_from_ctx.input] +``` + ### 3.5 resource_builder.py 执行前做三层裁剪: @@ -155,14 +194,22 @@ class AgentRunnerDescriptor(BaseModel): 输出写入 `ctx.resources`,至少覆盖: -- models:可调用模型 UUID、类型、能力摘要 +- models:可调用模型 UUID、类型、能力摘要。包括 LLM、fallback LLM、rerank 等 runner config schema 中选择的模型类资源。 - tools:可见工具 manifest,使用当前 bound plugins / MCP server 范围 - knowledge_bases:可检索知识库列表 - storage:plugin storage / workspace storage 权限摘要 - files:允许读取的配置文件、知识文件摘要 - platform_capabilities:本阶段只声明,不执行平台动作 -注意:旧的 unrestricted proxy action 必须在 Phase 2 被二次校验,不能只靠 context 声明。 +注意:旧的 unrestricted proxy action 必须二次校验,不能只靠 context 声明。AgentRunner 可用资源应来自 `ctx.resources`,不是插件 runtime 的全局能力。 + +资源裁剪要尽量通用,不应只写死 local-agent: + +- `model-fallback-selector` 授权 primary/fallback LLM。 +- `llm-model-selector` 授权 LLM。 +- `rerank-model-selector` 授权 rerank 模型。 +- `knowledge-base-multi-selector` 授权知识库。 +- 后续新增 selector 时应在 resource builder 中统一扩展。 ### 3.6 result_normalizer.py @@ -293,51 +340,53 @@ async def run_from_query(query: pipeline_query.Query) -> AsyncGenerator[Message 可以暂时保留文件作为官方插件迁移参考,但不应被运行时引用。 -## 6. 实现顺序 +## 6. 收尾实现顺序 -### Step 1:接入新版 SDK +### Step 1:补齐宿主上下文 -- 更新 LangBot 依赖到包含 SDK v1 AgentRunner 协议的版本 -- 删除 LangBot 中对旧 `AgentRunReturn` 类型名的依赖 -- 确认 `langbot_plugin` 的本地 editable / lockfile 指向正确 SDK +- SDK `AgentRunContext` 增加 `prompt`,并保持向后兼容默认空列表。 +- LangBot context builder 写入 `ctx.prompt`、`ctx.input.contents`、`ctx.runtime.metadata.streaming_supported`、`ctx.runtime.metadata.remove_think`。 +- 保持 `ctx.config` 只表达静态绑定配置。 -### Step 2:Agent 子系统骨架 +### Step 2:增强宿主 AgentRun proxy action -- 新增 descriptor/id/errors -- 新增 registry,先只 list plugin runner -- 为 registry 加单测,使用 fake connector +- `invoke_llm` / `invoke_llm_stream` 通过 `run_id/query_id` 找回当前 Query。 +- 自动合并 model persisted `extra_args` 与 action-level override。 +- 自动应用 pipeline `remove-think`,并允许 action 显式 override。 +- `call_tool` 传回当前 Query,恢复旧工具调用上下文。 +- `retrieve_knowledge` 保持 `bot_uuid`、`sender_id`、`session_name` 等 settings。 +- `invoke_rerank` 使用 run-scoped model authorization。 -### Step 3:Pipeline metadata 切 registry +### Step 3:泛化资源构建 -- `get_pipeline_metadata()` 只通过 registry 输出 runner option -- 插件 runner config stage 从 descriptor.config_schema 生成 -- schema 错误不影响 metadata 返回 +- 按 manifest permissions + bound plugins/MCP + runner config schema 构造资源。 +- 支持 primary/fallback LLM、rerank model、KB selector。 +- 不把 local-agent 特例扩散到通用资源层。 -### Step 4:Orchestrator 替换 ChatMessageHandler +### Step 4:local-agent parity -- 新增 context builder / result normalizer / orchestrator -- `chat.py` 删除 wrapper 和 runner 查找 -- 维持现有流式卡片和 resp_messages 行为 +- 使用 `ctx.prompt` 而不是重新读取 `ctx.config["prompt"]`。 +- 当前 user message 从 `ctx.input.contents` 构造,保留多模态内容。 +- RAG 只替换/插入文本部分,不丢图片/文件。 +- streaming/non-streaming 默认跟随 `runtime.metadata.streaming_supported`。 +- 首轮 fallback 成功后,tool loop 固定使用 committed model。 +- tool loop 继续传可用 tools,支持多步工具调用。 +- rerank 通过授权模型资源调用。 -### Step 5:新配置读写 +### Step 5:端到端保护和测试 -- 后端 resolve runner id 支持新旧配置 -- 前端表单改 `runner.id` + `runner_config` -- 默认配置改官方 local-agent 插件 id +- 插件无输出时按 runner failed 处理。 +- timeout/deadline 覆盖 plugin runtime、模型调用和外部 runner 调用。 +- runner 协议错误转受控错误。 +- 覆盖旧 local-agent 行为 parity:普通回复、流式、工具、多步工具、KB、rerank、多模态、PromptPreProcessing。 -### Step 6:权限和资源裁剪 - -- resource builder 根据 manifest / pipeline / runner binding config 裁剪 -- proxy action 校验 resource scope -- 禁止插件用 unrestricted API 访问未授权知识库、工具、模型 - -### Step 7:删除内置 runner 运行分支 +### Step 6:官方 runner 迁移 - 官方插件 ready 后移除内置 runner registry - 删除或隔离 provider runners 的运行引用 - 测试旧 runner 名只能通过 migration 映射到插件 id -### Step 8:历史配置迁移 +### Step 7:历史配置迁移 - 写 persistence migration - 更新 default pipeline config @@ -371,6 +420,8 @@ async def run_from_query(query: pipeline_query.Query) -> AsyncGenerator[Message - `PipelineService` 不直接拼插件 runner metadata。 - 所有 runner 配置使用 `ai.runner.id` + `ai.runner_config`。 - 插件 runtime 不为每个 Pipeline 或 runner 配置创建插件实例;`runner_config` 只作为绑定配置随 `ctx.config` 传入。 -- 旧内置 runner 不再作为 LangBot 内部运行分支执行。 +- 主聊天路径不再通过旧内置 runner 执行业务 runner。迁移期间旧文件可以保留。 - 插件只能访问 `ctx.resources` 授权的模型、工具、知识库和文件。 +- 宿主 action 能为 AgentRunner 调用恢复必要 Query 语义,插件不需要拿裸 Query。 +- 官方 `local-agent` 插件对外行为与旧内置 local-agent 对齐。 - EBA 相关字段只作为 context/result 预留,不执行平台动作。 diff --git a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md index 18dc1dca..4c03317e 100644 --- a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md +++ b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md @@ -1,12 +1,12 @@ -# 官方 AgentRunner 插件仓库计划 +# 官方 AgentRunner 插件迁移计划 -本文档描述内置 `RequestRunner` 迁出 LangBot 后,官方 runner 插件仓库应如何组织。建议新建仓库: +本文档描述内置 `RequestRunner` 迁出 LangBot 后,官方 runner 插件如何组织、迁移和验收。 -```text -/home/glwuy/langbot-app/langbot-official-agent-runners -``` +当前实现已经进入过渡阶段: -远端仓库名建议:`langbot-official-agent-runners`。 +- LangBot 主聊天路径通过 `AgentRunOrchestrator` 调用插件化 `AgentRunner`。 +- 旧 `src/langbot/pkg/provider/runners/*` 仍保留,作为迁移参考和回退分析材料;在官方插件迁移完成前不要求删除。 +- 官方 runner 当前以独立插件目录/仓库推进,例如 `langbot-local-agent/` 和 `langbot-agent-runner/*-agent/`。不再要求先落地单一 monorepo。 ## 1. 为什么新仓库 @@ -16,43 +16,32 @@ - SDK 仓库维护 AgentRunner 组件和 runtime 协议。 - 官方 runner 插件承载业务 runner 的具体实现和第三方平台适配。 -不要把官方 runner 插件继续留在 LangBot 主仓库,否则容易重新形成“宿主和业务 runner 绑死”的结构。 +不要把官方 runner 插件重新绑死在 LangBot 主仓库内。允许开发期使用本地路径插件,但运行边界必须保持为: + +- LangBot 提供通用宿主能力:上下文、资源授权、模型/工具/知识库调用代理、结果归一。 +- 插件消费这些能力,实现具体 runner 行为。 +- 旧内置 runner 只作为行为对齐的基准,不作为长期运行路径。 ## 2. 仓库结构 -建议采用 monorepo: +当前推荐策略是“官方插件可独立发布,必要时共享 SDK helper”。开发期可以采用本地多目录布局: ```text -langbot-official-agent-runners/ - README.md - pyproject.toml - packages/ - local-agent/ - manifest.yaml - components/default.yaml - main.py - src/ - tests/ - dify-agent/ +langbot-app/ + langbot-local-agent/ + manifest.yaml + components/agent_runner/default.yaml + components/agent_runner/default.py + pkg/ + tests/ + langbot-agent-runner/ n8n-agent/ - coze-agent/ - dashscope-agent/ - langflow-agent/ - tbox-agent/ - shared/ - langbot_agent_runner_utils/ - __init__.py - context.py - config.py - streaming.py - tool_calling.py - errors.py - tests/ - fixtures/ - integration/ + ... ``` -先用一个仓库统一迁移,避免每个 runner 复制 SDK helper、测试夹具、发布脚本。 +后续可以把多个官方 runner 聚合进 monorepo,也可以继续独立发布。这个选择不影响协议设计;协议边界由 SDK 和 LangBot 宿主保证。 + +如果多个 runner 出现重复逻辑,优先沉淀到 SDK 或一个明确的共享 helper 包,不要把宿主私有结构泄漏给插件。 ## 3. 插件命名和 runner id @@ -155,10 +144,18 @@ execution: 与 LangBot 主仓库的责任边界: -- LangBot 构造 `ctx.messages`、`ctx.input`、`ctx.resources` +- LangBot 构造 `ctx.prompt`、`ctx.messages`、`ctx.input`、`ctx.resources` - 插件负责选择模型、拼请求、调用 LLM、处理 tool call loop、输出 result stream - 插件不能绕过 `ctx.resources` 调用未授权模型、工具或知识库 +为了保持旧内置 runner 行为,`local-agent` 插件必须优先消费宿主处理后的有效上下文: + +- `ctx.prompt`:PreProcessor 和 `PromptPreProcessing` 插件事件处理后的有效 prompt;不是静态 `ctx.config["prompt"]` 的同义词。 +- `ctx.messages`:已由宿主加载并经过 prompt preprocessing 的历史消息。 +- `ctx.input.contents`:当前结构化输入,必须保留图片、文件等多模态内容。 +- `ctx.runtime.metadata.streaming_supported`:当前 adapter 是否能消费流式输出。 +- 宿主代理 action:模型、工具、知识库、rerank 调用应通过 `run_id/query_id` 找回当前 Query,以复用旧 runner 拥有的上下文能力。 + ## 7. 外部 runner 插件要求 外部平台 runner 迁移时遵循: @@ -182,11 +179,13 @@ execution: - 开发阶段使用本地路径插件。 - 发布前支持 marketplace 安装。 - 历史配置 migration 只在官方插件可用时执行。 +- 迁移期间保留旧内置 runner 文件,直到对应官方插件通过 parity 验收。 ## 9. 验收标准 - 每个旧 runner 都有对应官方 AgentRunner 插件。 - 旧 runner 配置能无损复制到新 `runner_config[id]`。 -- LangBot 主仓库不再通过 `RequestRunner` 执行业务 runner。 +- LangBot 主聊天路径不再通过 `RequestRunner` 执行业务 runner。 - 官方插件测试覆盖非流式、流式、错误、timeout、配置缺失。 -- `local-agent` 插件能完成模型 fallback、tool calling、知识库检索。 +- `local-agent` 插件能完成模型 fallback、tool calling、知识库检索、多模态输入、prompt preprocessing 后的有效 prompt 消费、rerank。 +- 对外行为与旧内置 local-agent runner 保持一致;代码结构不需要相同。 diff --git a/docs/agent-runner-pluginization/README.md b/docs/agent-runner-pluginization/README.md index 0651af14..5f6b0b00 100644 --- a/docs/agent-runner-pluginization/README.md +++ b/docs/agent-runner-pluginization/README.md @@ -8,6 +8,22 @@ 本设计只聚焦 Agent Runner 插件化。EBA 文档中的事件体系、平台 API、事件路由只作为接口预留和未来兼容参考,不纳入本阶段实现范围。 +## 1.1 当前实现状态 + +当前实现已经不是早期 PoC: + +- LangBot 已有 `AgentRunnerRegistry`、`AgentRunOrchestrator`、`AgentRunContextBuilder`、`AgentResourceBuilder`、`AgentResultNormalizer`。 +- `ChatMessageHandler` 主路径已经委托给 orchestrator,不再直接解析插件 runner 或实例化 wrapper。 +- Pipeline metadata 已经从 registry 动态生成插件 runner 选项和配置 stage。 +- SDK 已有 Protocol v1 的 `AgentRunContext`、`AgentRunResult`、capabilities、permissions、`AgentRunAPIProxy`。 +- 旧 `RequestRunner` 文件仍保留,当前作为迁移基准和回退分析材料;最终 parity 完成后再移除或隔离。 + +当前仍在收尾的重点不是“能不能调用插件 runner”,而是: + +- 宿主侧通用能力是否足够,让插件 runner 获得旧内置 runner 隐式拥有的上下文。 +- `local-agent` 官方插件是否能在对外行为上对齐旧内置 local-agent。 +- 权限裁剪、timeout、错误隔离和端到端 parity 测试是否完整。 + ## 2. 目标与非目标 目标: @@ -26,16 +42,15 @@ - 不改变现有 Pipeline 的阶段链和私聊/群聊入口。 - 不引入插件内自定义长驻调度器;Agent Runner 仍由 LangBot 显式调用。 -## 3. 当前分支问题 +## 3. 当前实现剩余问题 -当前分支的实现可以作为 PoC,但需要调整: +以下是当前实现仍需要收敛的点: -- `AgentRunContext` 仍是 query 视角,字段包括 `query_id`、`session`、`messages`、`user_message`、`use_funcs`、`extra_config`,对非消息事件和复杂任务上下文表达不足。 -- runner 标识使用 `plugin:author/plugin_name/runner_name` 字符串拼接,缺少结构化 ID、版本、能力和权限信息。 -- LangBot 在 `PipelineService.get_pipeline_metadata()` 中直接把插件配置 schema 拼进 AI metadata,缺少缓存、失败隔离和 schema 兼容验证。 -- `ChatMessageHandler` 内部直接解析插件 runner 名称并调用 wrapper,调度逻辑和消息处理逻辑耦合。 -- SDK 的 `AgentRunner.run()` 只接受单一上下文,没有生命周期 hooks、能力声明、配置 schema 分层和运行结果协议版本。 -- 工具调用、知识检索、LLM 调用目前依赖零散 proxy action,缺少 Agent 运行期明确的 capability set。 +- `AgentRunContext` 需要持续补齐宿主处理后的有效上下文,例如有效 prompt、结构化输入、runtime metadata、params/state。 +- `AgentRunAPIProxy` 需要通过 `run_id/query_id` 保留旧 runner 隐式拥有的 Query 语义,例如工具调用上下文、知识库检索 settings、模型 extra args、remove-think。 +- `AgentResourceBuilder` 应按 manifest + Pipeline 绑定 + runner config schema 通用裁剪资源,不能只为 local-agent 写死。 +- `local-agent` 插件需要对齐旧内置 runner 的外部行为,包括 prompt preprocessing、多模态、fallback、tool loop、RAG、rerank、流式/非流式选择。 +- timeout/deadline、取消、插件无输出、结果过大等运行保护还需要更完整的端到端验证。 ## 4. 总体架构 @@ -142,9 +157,12 @@ class AgentRunContext(BaseModel): event: AgentEventContext | None = None actor: ActorContext | None = None subject: SubjectContext | None = None + prompt: list[Message] = [] messages: list[Message] = [] input: AgentInput + params: dict[str, Any] = {} resources: AgentResources + state: AgentRunState = AgentRunState() runtime: AgentRuntimeContext config: dict[str, Any] = {} ``` @@ -155,8 +173,12 @@ class AgentRunContext(BaseModel): - `conversation` 承载会话历史、launcher、sender、bot 等聊天语义。 - `event` 是未来 EBA 的预留封装,本阶段可以由 query 生成一个最小 message event。 - `actor` 表示触发者,`subject` 表示事件作用对象,例如被邀请用户、被撤回消息、被操作群组。 -- `input` 是 runner 的主输入,不再强制等同于纯文本消息。 +- `prompt` 是宿主处理后的有效 prompt。它来自 LangBot 当前 conversation prompt,并且已经过 `PromptPreProcessing` 等插件事件处理;runner 调模型时应优先使用它,而不是重新读取静态 `config["prompt"]`。 +- `messages` 是历史消息,也已经过宿主 pipeline preprocessing。 +- `input` 是 runner 的主输入,不再强制等同于纯文本消息;`input.contents` 必须保留图片、文件等结构化内容。 +- `params` 是单次运行的公开业务变量,宿主过滤内部变量和敏感变量后提供。 - `resources` 列出 LangBot 已授权给 runner 的工具、知识库、模型、文件等。 +- `state` 是宿主管理的持久 runner-scoped 状态快照。 - `runtime` 提供 host 信息、workspace/bot/pipeline 标识、trace id、deadline 等。 - `config` 是当前 Pipeline 或未来事件绑定对该 runner id 的绑定配置,替代当前 `extra_config`。 @@ -199,14 +221,22 @@ class AgentRunResult(BaseModel): Agent Runner 插件需要使用 LangBot 能力,但这些能力必须通过显式授权暴露: -- 模型:`invoke_llm`、`invoke_llm_stream`、embedding。 -- 工具:`list_tools`、`get_tool_detail`、`call_tool`。 -- 知识:`list_knowledge_bases`、`retrieve_knowledge`。 +- 模型:`invoke_llm`、`invoke_llm_stream`、rerank、后续 embedding。 +- 工具:`get_tool_detail`、`call_tool`。runner 通过 `ctx.resources.tools` 获取已授权工具列表,不暴露 unrestricted `list_tools`。 +- 知识:`retrieve_knowledge`。runner 通过 `ctx.resources.knowledge_bases` 获取已授权知识库列表,不暴露 unrestricted `list_knowledge_bases`。 - 存储:plugin storage、workspace storage。 - 文件:配置文件读取、知识文件读取。 SDK 应把这些能力按 capability 分组。LangBot 在调用 runner 前根据 runner manifest、pipeline 配置、插件绑定范围生成 `resources`,插件不能绕过资源列表调用未授权对象。 +宿主 action handler 不应只是把请求转发给 provider/tool/knowledge manager。对 AgentRunner 调用,它还需要通过 `run_id/query_id` 找回当前 Pipeline Query,并自动补齐旧内置 runner 过去直接拥有的上下文,例如: + +- provider 调用的 `query` +- model `extra_args` +- 输出设置 `remove-think` +- 工具调用需要的 Query 上下文 +- 知识库检索的 `bot_uuid`、`sender_id`、`session_name` + ## 6. LangBot 设计 ### 6.1 runner 发现 @@ -366,8 +396,9 @@ LangBot 执行前做三层裁剪: - 兼容当前 `plugin:author/name/runner` 字符串 ID。 - 兼容 `runner.runner` 配置键。 - 提供从旧 runner 配置到 `runner.id` / `runner_config` 的迁移。 -- 将所有内置 `RequestRunner` 强制迁移为内置插件或官方插件包。 -- LangBot 只保留插件 runtime、registry、orchestrator 和兼容迁移逻辑,不再维护独立的内置 runner 执行分支。 +- 将所有内置 `RequestRunner` 强制迁移为官方插件包。 +- 迁移期间旧 `RequestRunner` 文件可以保留作为 parity 基准;主聊天路径不应继续依赖它们。 +- LangBot 最终只保留插件 runtime、registry、orchestrator 和兼容迁移逻辑,不再维护独立的内置 runner 执行分支。 ### Phase 4:为 EBA 接入做预留 @@ -400,6 +431,7 @@ SDK: - 插件 runner 只能看到 LangBot 注入的工具、知识库、模型资源。 - 插件 runner 异常不会中断插件 runtime 或 Pipeline 主流程。 - 旧 Pipeline 配置和旧内置 runner 正常工作。 +- 官方 `local-agent` 插件在外部行为上对齐旧内置 local-agent:有效 prompt、历史消息、结构化输入、RAG、rerank、工具循环、模型 fallback、streaming/non-streaming。 - 文档明确区分“Agent Runner 插件化”和“未来 EBA 架构”。 ## 11. 已确认决策 diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index 3cca668e..cfe7ce85 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -122,6 +122,7 @@ class AgentRunContextV1(typing.TypedDict): actor: dict[str, typing.Any] | None subject: dict[str, typing.Any] | None messages: list[dict[str, typing.Any]] + prompt: list[dict[str, typing.Any]] input: AgentInput params: dict[str, typing.Any] resources: AgentResources @@ -221,6 +222,9 @@ class AgentRunContextBuilder: descriptor.id, ) + streaming_supported = await self._is_stream_output_supported(query) + remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) + # Build runtime context runtime: AgentRuntimeContext = { 'langbot_version': self.ap.ver_mgr.get_current_version(), @@ -231,6 +235,8 @@ class AgentRunContextBuilder: 'metadata': { 'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'), 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'), + 'streaming_supported': streaming_supported, + 'remove_think': remove_think, }, } @@ -243,6 +249,7 @@ class AgentRunContextBuilder: 'actor': self._build_actor(query), 'subject': self._build_subject(query), 'messages': messages, + 'prompt': self._build_prompt(query), 'input': input, 'params': params, 'resources': resources, @@ -256,6 +263,7 @@ class AgentRunContextBuilder: def _build_input(self, query: pipeline_query.Query) -> AgentInput: """Build AgentInput from query.""" text = None + text_parts: list[str] = [] contents: list[dict[str, typing.Any]] = [] if query.user_message: @@ -264,12 +272,17 @@ class AgentRunContextBuilder: for elem in query.user_message.content: contents.append(elem.model_dump(mode='json')) if elem.type == 'text': - text = getattr(elem, 'text', None) + elem_text = getattr(elem, 'text', None) + if elem_text: + text_parts.append(elem_text) else: # Single string content text = str(query.user_message.content) contents.append({'type': 'text', 'text': text}) + if text_parts: + text = ''.join(text_parts) + # Include message_chain for platform-specific format message_chain_dict = None if query.message_chain: @@ -473,6 +486,29 @@ class AgentRunContextBuilder: return int(time.time() + timeout_seconds) + async def _is_stream_output_supported(self, query: pipeline_query.Query) -> bool: + """Check whether the current adapter can consume streaming chunks.""" + try: + return await query.adapter.is_stream_output_supported() + except AttributeError: + return False + except Exception: + return False + + def _build_prompt(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]: + """Build effective prompt messages from query.prompt after preprocessing.""" + prompt_messages: list[dict[str, typing.Any]] = [] + + prompt = getattr(query, 'prompt', None) + messages = getattr(prompt, 'messages', None) + if not messages: + return prompt_messages + + for msg in messages: + prompt_messages.append(msg.model_dump(mode='json')) + + return prompt_messages + def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]: """Build messages list from query.""" messages: list[dict[str, typing.Any]] = [] diff --git a/src/langbot/pkg/agent/runner/resource_builder.py b/src/langbot/pkg/agent/runner/resource_builder.py index 4cf539ea..c3239c37 100644 --- a/src/langbot/pkg/agent/runner/resource_builder.py +++ b/src/langbot/pkg/agent/runner/resource_builder.py @@ -71,7 +71,7 @@ class AgentResourceBuilder: # Build each resource category in parallel models, tools, knowledge_bases = await asyncio.gather( - self._build_models(manifest_perms, query), + self._build_models(manifest_perms, runner_config, descriptor, query), self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query), self._build_knowledge_bases(manifest_perms, runner_config, query), ) @@ -89,10 +89,13 @@ class AgentResourceBuilder: async def _build_models( self, manifest_perms: dict[str, list[str]], + runner_config: dict[str, typing.Any], + descriptor: AgentRunnerDescriptor, query: typing.Any, ) -> list[ModelResource]: """Build models list with SDK v1 field names.""" models: list[ModelResource] = [] + seen_model_ids: set[str] = set() # Check manifest permission model_perms = manifest_perms.get('models', []) @@ -101,8 +104,72 @@ class AgentResourceBuilder: # Get model from query (preproc already resolved this) model_uuid = getattr(query, 'use_llm_model_uuid', None) - if not model_uuid: - return models + if model_uuid: + await self._append_llm_model_resource(models, seen_model_ids, model_uuid) + + # Add fallback models if present + fallback_uuids = query.variables.get('_fallback_model_uuids', []) + for fb_uuid in fallback_uuids: + await self._append_llm_model_resource(models, seen_model_ids, fb_uuid) + + # Add model resources referenced by the runner binding config schema. + # This makes authorization generic for AgentRunner plugins instead of + # hard-coding only local-agent's primary/fallback model path. + await self._append_config_declared_model_resources( + models=models, + seen_model_ids=seen_model_ids, + descriptor=descriptor, + runner_config=runner_config, + ) + + return models + + async def _append_config_declared_model_resources( + self, + models: list[ModelResource], + seen_model_ids: set[str], + descriptor: AgentRunnerDescriptor, + runner_config: dict[str, typing.Any], + ) -> None: + """Authorize model-like values selected through DynamicForm fields.""" + for item in descriptor.config_schema or []: + if not isinstance(item, dict): + continue + + field_name = item.get('name') + field_type = item.get('type') + if not field_name or field_name not in runner_config: + continue + + value = runner_config.get(field_name) + if field_type == 'model-fallback-selector': + if isinstance(value, str): + await self._append_llm_model_resource(models, seen_model_ids, value) + elif isinstance(value, dict): + primary = value.get('primary') + if isinstance(primary, str): + await self._append_llm_model_resource(models, seen_model_ids, primary) + fallbacks = value.get('fallbacks', []) + if isinstance(fallbacks, list): + for fallback_uuid in fallbacks: + if isinstance(fallback_uuid, str): + await self._append_llm_model_resource(models, seen_model_ids, fallback_uuid) + elif field_type == 'llm-model-selector': + if isinstance(value, str): + await self._append_llm_model_resource(models, seen_model_ids, value) + elif field_type == 'rerank-model-selector': + if isinstance(value, str): + await self._append_rerank_model_resource(models, seen_model_ids, value) + + async def _append_llm_model_resource( + self, + models: list[ModelResource], + seen_model_ids: set[str], + model_uuid: str | None, + ) -> None: + """Append an LLM model resource if it exists and has not been added.""" + if not model_uuid or model_uuid == '__none__' or model_uuid in seen_model_ids: + return try: model = await self.ap.model_mgr.get_model_by_uuid(model_uuid) @@ -112,24 +179,31 @@ class AgentResourceBuilder: 'model_type': getattr(model.model_entity, 'model_type', None), 'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None, }) + seen_model_ids.add(model_uuid) except Exception as e: - self.ap.logger.warning(f'Failed to build model resource {model_uuid}: {e}') + self.ap.logger.warning(f'Failed to build LLM model resource {model_uuid}: {e}') - # Add fallback models if present - fallback_uuids = query.variables.get('_fallback_model_uuids', []) - for fb_uuid in fallback_uuids: - try: - model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid) - if model and model.model_entity: - models.append({ - 'model_id': fb_uuid, - 'model_type': model.model_entity.model_type, - 'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None, - }) - except Exception as e: - self.ap.logger.warning(f'Failed to build fallback model resource {fb_uuid}: {e}') + async def _append_rerank_model_resource( + self, + models: list[ModelResource], + seen_model_ids: set[str], + model_uuid: str | None, + ) -> None: + """Append a rerank model resource if it exists and has not been added.""" + if not model_uuid or model_uuid == '__none__' or model_uuid in seen_model_ids: + return - return models + try: + model = await self.ap.model_mgr.get_rerank_model_by_uuid(model_uuid) + if model and model.model_entity: + models.append({ + 'model_id': model_uuid, + 'model_type': getattr(model.model_entity, 'model_type', 'rerank') or 'rerank', + 'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None, + }) + seen_model_ids.add(model_uuid) + except Exception as e: + self.ap.logger.warning(f'Failed to build rerank model resource {model_uuid}: {e}') async def _build_tools( self, diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index c65a0056..0902452c 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -154,6 +154,51 @@ async def _validate_run_authorization( return session, None +def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None: + """Return a cached pipeline Query for runtime actions when available.""" + if query_id is None: + return None + + try: + return ap.query_pool.cached_queries.get(query_id) + except Exception: + return None + + +def _resolve_action_query(data: dict[str, Any], session: Any | None, ap: app.Application) -> Any | None: + """Resolve the current Query from an AgentRunner session or action payload.""" + query_id = None + if session: + query_id = session.get('query_id') + if query_id is None: + query_id = data.get('query_id') + return _get_cached_query(ap, query_id) + + +def _resolve_remove_think(data: dict[str, Any], query: Any | None) -> bool: + """Resolve remove-think using explicit action override, then pipeline config.""" + if 'remove_think' in data: + return bool(data.get('remove_think')) + + if query and getattr(query, 'pipeline_config', None): + return bool(query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)) + + return False + + +def _merge_model_extra_args(model: Any, call_extra_args: Any) -> dict[str, Any]: + """Merge persisted model extra_args with action-level overrides.""" + merged: dict[str, Any] = {} + + model_extra_args = getattr(getattr(model, 'model_entity', None), 'extra_args', None) + if isinstance(model_extra_args, dict): + merged.update(model_extra_args) + if isinstance(call_extra_args, dict): + merged.update(call_extra_args) + + return merged + + class RuntimeConnectionHandler(handler.Handler): """Runtime connection handler""" @@ -449,6 +494,7 @@ class RuntimeConnectionHandler(handler.Handler): extra_args = data.get('extra_args', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + session = None # Permission validation for AgentRunner calls if run_id: @@ -473,13 +519,18 @@ class RuntimeConnectionHandler(handler.Handler): pass funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs] + query = _resolve_action_query(data, session, self.ap) + effective_extra_args = _merge_model_extra_args(llm_model, extra_args) + remove_think = _resolve_remove_think(data, query) + effective_funcs = funcs_obj if 'func_call' in (llm_model.model_entity.abilities or []) else [] result = await llm_model.provider.invoke_llm( - query=None, + query=query, model=llm_model, messages=messages_obj, - funcs=funcs_obj, - extra_args=extra_args, + funcs=effective_funcs, + extra_args=effective_extra_args, + remove_think=remove_think, ) return handler.ActionResponse.success( @@ -501,6 +552,7 @@ class RuntimeConnectionHandler(handler.Handler): extra_args = data.get('extra_args', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + session = None # Permission validation for AgentRunner calls if run_id: @@ -526,13 +578,18 @@ class RuntimeConnectionHandler(handler.Handler): pass funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs] + query = _resolve_action_query(data, session, self.ap) + effective_extra_args = _merge_model_extra_args(llm_model, extra_args) + remove_think = _resolve_remove_think(data, query) + effective_funcs = funcs_obj if 'func_call' in (llm_model.model_entity.abilities or []) else [] async for chunk in llm_model.provider.invoke_llm_stream( - query=None, + query=query, model=llm_model, messages=messages_obj, - funcs=funcs_obj, - extra_args=extra_args, + funcs=effective_funcs, + extra_args=effective_extra_args, + remove_think=remove_think, ): yield handler.ActionResponse.success( data={ @@ -558,6 +615,7 @@ class RuntimeConnectionHandler(handler.Handler): caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation # session_data = data['session'] # query_id = data['query_id'] + session = None # Permission validation for AgentRunner calls if run_id: @@ -571,10 +629,11 @@ class RuntimeConnectionHandler(handler.Handler): # In real implementation, you would reconstruct the full session # For now, we'll call the tool manager's execute method try: + query = _resolve_action_query(data, session, self.ap) result = await self.ap.tool_mgr.execute_func_call( name=tool_name, parameters=parameters, - query=None, # TODO: reconstruct query from session_data if needed + query=query, ) # Return both 'tool_response' (LangBotAPIProxy) and 'result' (AgentRunAPIProxy) # LangBotAPIProxy expects 'tool_response', AgentRunAPIProxy expects 'result' @@ -872,10 +931,11 @@ class RuntimeConnectionHandler(handler.Handler): query = data['query'] documents = data['documents'] top_k = data.get('top_k') + caller_plugin_identity = data.get('caller_plugin_identity') # Validate run authorization session, error = await _validate_run_authorization( - run_id, 'model', rerank_model_uuid, self.ap + run_id, 'model', rerank_model_uuid, self.ap, caller_plugin_identity ) if error: return error @@ -895,6 +955,7 @@ class RuntimeConnectionHandler(handler.Handler): model=rerank_model, query=query, documents=documents_capped, + extra_args=_merge_model_extra_args(rerank_model, data.get('extra_args', {})), ) # Sort by relevance score descending diff --git a/tests/unit_tests/agent/test_context_builder_params_state.py b/tests/unit_tests/agent/test_context_builder_params_state.py index e5ac035f..fcffac7c 100644 --- a/tests/unit_tests/agent/test_context_builder_params_state.py +++ b/tests/unit_tests/agent/test_context_builder_params_state.py @@ -66,6 +66,21 @@ class FakeMessage: self.content = content self.role = 'user' + def model_dump(self, mode='json'): + return {'role': self.role, 'content': self.content} + + +class FakePrompt: + """Fake prompt container.""" + def __init__(self, messages=None): + self.messages = messages or [] + + +class FakeAdapter: + """Fake adapter with streaming capability.""" + async def is_stream_output_supported(self): + return True + class TestBuildParams: """Tests for _build_params filtering.""" @@ -446,4 +461,35 @@ class TestBuildParamsInContext: # state should have seeded conversation_id assert 'state' in context - assert context['state']['conversation']['external.conversation_id'] == 'conv_abc' \ No newline at end of file + assert context['state']['conversation']['external.conversation_id'] == 'conv_abc' + + @pytest.mark.asyncio + async def test_context_includes_effective_prompt_and_runtime_capabilities(self): + """Context should expose host-preprocessed prompt and adapter capabilities.""" + reset_state_store() + ap = FakeApplication() + builder = AgentRunContextBuilder(ap) + descriptor = make_descriptor() + resources = make_resources() + + session = FakeSession() + query = type('Query', (), { + 'query_id': 1, + 'bot_uuid': 'bot_001', + 'pipeline_uuid': 'pipeline_001', + 'sender_id': 'user_001', + 'session': session, + 'user_message': None, + 'message_chain': None, + 'messages': [], + 'prompt': FakePrompt([FakeMessage('Effective prompt')]), + 'adapter': FakeAdapter(), + 'pipeline_config': {'output': {'misc': {'remove-think': True}}}, + 'variables': {}, + })() + + context = await builder.build_context(query, descriptor, resources) + + assert context['prompt'][0]['content'] == 'Effective prompt' + assert context['runtime']['metadata']['streaming_supported'] is True + assert context['runtime']['metadata']['remove_think'] is True diff --git a/tests/unit_tests/agent/test_resource_builder.py b/tests/unit_tests/agent/test_resource_builder.py new file mode 100644 index 00000000..e883965c --- /dev/null +++ b/tests/unit_tests/agent/test_resource_builder.py @@ -0,0 +1,148 @@ +"""Tests for AgentResourceBuilder.""" +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest + +from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor +from langbot.pkg.agent.runner.resource_builder import AgentResourceBuilder + + +RUNNER_ID = 'plugin:test/runner/default' + + +def make_descriptor( + *, + permissions: dict | None = None, + config_schema: list[dict] | None = None, +) -> AgentRunnerDescriptor: + return AgentRunnerDescriptor( + id=RUNNER_ID, + source='plugin', + label={'en_US': 'Test Runner'}, + plugin_author='test', + plugin_name='runner', + runner_name='default', + permissions=permissions or {'models': ['invoke', 'stream']}, + config_schema=config_schema or [], + ) + + +def make_model(model_type='llm', provider='test-provider'): + return SimpleNamespace( + model_entity=SimpleNamespace(model_type=model_type), + provider_entity=SimpleNamespace(name=provider), + ) + + +def make_query(runner_config: dict, *, variables: dict | None = None, use_llm_model_uuid=None): + return SimpleNamespace( + pipeline_config={ + 'ai': { + 'runner': {'id': RUNNER_ID}, + 'runner_config': {RUNNER_ID: runner_config}, + }, + }, + variables=variables or {}, + use_llm_model_uuid=use_llm_model_uuid, + ) + + +@pytest.fixture +def app(): + mock_app = Mock() + mock_app.logger = Mock() + mock_app.model_mgr = Mock() + mock_app.rag_mgr = Mock() + mock_app.rag_mgr.get_knowledge_base_by_uuid = AsyncMock(return_value=None) + return mock_app + + +@pytest.mark.asyncio +async def test_build_models_authorizes_config_declared_llm_and_rerank_models(app): + """DynamicForm model selectors should become run-scoped authorized models.""" + llm_models = { + 'primary': make_model(), + 'fallback': make_model(), + 'aux': make_model(provider='aux-provider'), + } + rerank_models = { + 'rerank': make_model(model_type='rerank', provider='rerank-provider'), + } + + async def get_model_by_uuid(model_uuid): + return llm_models.get(model_uuid) + + async def get_rerank_model_by_uuid(model_uuid): + return rerank_models.get(model_uuid) + + app.model_mgr.get_model_by_uuid = AsyncMock(side_effect=get_model_by_uuid) + app.model_mgr.get_rerank_model_by_uuid = AsyncMock(side_effect=get_rerank_model_by_uuid) + descriptor = make_descriptor( + config_schema=[ + {'name': 'model', 'type': 'model-fallback-selector'}, + {'name': 'aux-model', 'type': 'llm-model-selector'}, + {'name': 'rerank-model', 'type': 'rerank-model-selector'}, + ], + ) + query = make_query({ + 'model': {'primary': 'primary', 'fallbacks': ['fallback', 'primary']}, + 'aux-model': 'aux', + 'rerank-model': 'rerank', + }) + + resources = await AgentResourceBuilder(app).build_resources(query, descriptor) + + assert resources['models'] == [ + {'model_id': 'primary', 'model_type': 'llm', 'provider': 'test-provider'}, + {'model_id': 'fallback', 'model_type': 'llm', 'provider': 'test-provider'}, + {'model_id': 'aux', 'model_type': 'llm', 'provider': 'aux-provider'}, + {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider'}, + ] + + +@pytest.mark.asyncio +async def test_build_models_still_honors_manifest_permissions(app): + """Config-selected models should not bypass runner manifest permissions.""" + app.model_mgr.get_model_by_uuid = AsyncMock(return_value=make_model()) + app.model_mgr.get_rerank_model_by_uuid = AsyncMock(return_value=make_model(model_type='rerank')) + descriptor = make_descriptor( + permissions={'models': []}, + config_schema=[ + {'name': 'model', 'type': 'model-fallback-selector'}, + {'name': 'rerank-model', 'type': 'rerank-model-selector'}, + ], + ) + query = make_query({ + 'model': {'primary': 'primary', 'fallbacks': ['fallback']}, + 'rerank-model': 'rerank', + }) + + resources = await AgentResourceBuilder(app).build_resources(query, descriptor) + + assert resources['models'] == [] + app.model_mgr.get_model_by_uuid.assert_not_awaited() + app.model_mgr.get_rerank_model_by_uuid.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_build_models_deduplicates_query_and_config_models(app): + """A model selected by both preproc and runner config should appear once.""" + app.model_mgr.get_model_by_uuid = AsyncMock(return_value=make_model()) + app.model_mgr.get_rerank_model_by_uuid = AsyncMock(return_value=None) + descriptor = make_descriptor( + config_schema=[ + {'name': 'model', 'type': 'model-fallback-selector'}, + ], + ) + query = make_query( + {'model': {'primary': 'primary', 'fallbacks': ['fallback']}}, + variables={'_fallback_model_uuids': ['fallback']}, + use_llm_model_uuid='primary', + ) + + resources = await AgentResourceBuilder(app).build_resources(query, descriptor) + + assert [model['model_id'] for model in resources['models']] == ['primary', 'fallback'] diff --git a/tests/unit_tests/plugin/test_handler_actions.py b/tests/unit_tests/plugin/test_handler_actions.py index 81bc7570..29e6a0cb 100644 --- a/tests/unit_tests/plugin/test_handler_actions.py +++ b/tests/unit_tests/plugin/test_handler_actions.py @@ -7,6 +7,7 @@ from types import SimpleNamespace from unittest.mock import AsyncMock, Mock import pytest +from langbot_plugin.api.entities.builtin.provider import message as provider_message from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction, RuntimeToLangBotAction @@ -27,6 +28,22 @@ def compiled_params(statement): return statement.compile().params +def make_agent_resources( + models: list[dict] | None = None, + tools: list[dict] | None = None, + knowledge_bases: list[dict] | None = None, +): + """Create a minimal AgentRun resources payload for run-scoped action tests.""" + return { + 'models': models or [], + 'tools': tools or [], + 'knowledge_bases': knowledge_bases or [], + 'files': [], + 'storage': {'plugin_storage': False, 'workspace_storage': False}, + 'platform_capabilities': {}, + } + + class TestInitializePluginSettings: """Tests for initialize_plugin_settings action handler.""" @@ -349,3 +366,231 @@ class TestHandlerQueryLookup: assert response.code == 0 assert response.data == {'bot_uuid': 'test-bot-uuid'} + + +class TestAgentRunProxyActions: + """Tests for AgentRunner proxy actions that need host Query semantics.""" + + @pytest.fixture + def app(self): + mock_app = Mock() + mock_app.logger = Mock() + mock_app.query_pool = Mock() + mock_app.query_pool.cached_queries = {} + mock_app.model_mgr = Mock() + mock_app.model_mgr.get_model_by_uuid = AsyncMock() + mock_app.model_mgr.get_rerank_model_by_uuid = AsyncMock() + mock_app.tool_mgr = Mock() + mock_app.tool_mgr.execute_func_call = AsyncMock(return_value={'ok': True}) + return mock_app + + @staticmethod + def query(remove_think=True): + return SimpleNamespace( + pipeline_config={'output': {'misc': {'remove-think': remove_think}}}, + ) + + @pytest.mark.asyncio + async def test_invoke_llm_restores_query_and_model_options(self, app): + """INVOKE_LLM passes Query, model extra_args and remove-think to provider.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + + run_id = 'run_proxy_invoke_llm_options' + query = self.query(remove_think=True) + app.query_pool.cached_queries[901] = query + + registry = get_session_registry() + await registry.unregister(run_id) + await registry.register( + run_id=run_id, + runner_id='plugin:test/runner/default', + query_id=901, + plugin_identity='test/runner', + resources=make_agent_resources(models=[{'model_id': 'llm_001'}]), + ) + + provider = SimpleNamespace( + invoke_llm=AsyncMock(return_value=provider_message.Message(role='assistant', content='ok')), + ) + model = SimpleNamespace( + model_entity=SimpleNamespace( + abilities=['func_call'], + extra_args={'temperature': 0.2, 'top_p': 0.8}, + ), + provider=provider, + ) + app.model_mgr.get_model_by_uuid.return_value = model + runtime_handler = make_handler(app) + + try: + response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM.value]({ + 'run_id': run_id, + 'llm_model_uuid': 'llm_001', + 'messages': [{'role': 'user', 'content': 'hello'}], + 'funcs': [{ + 'name': 'search', + 'human_desc': 'Search', + 'description': 'Search', + 'parameters': {'type': 'object'}, + }], + 'extra_args': {'temperature': 0.7, 'presence_penalty': 0.1}, + }) + finally: + await registry.unregister(run_id) + + assert response.code == 0 + provider.invoke_llm.assert_awaited_once() + kwargs = provider.invoke_llm.await_args.kwargs + assert kwargs['query'] is query + assert kwargs['extra_args'] == { + 'temperature': 0.7, + 'top_p': 0.8, + 'presence_penalty': 0.1, + } + assert kwargs['remove_think'] is True + assert [tool.name for tool in kwargs['funcs']] == ['search'] + + @pytest.mark.asyncio + async def test_invoke_llm_stream_restores_query_and_options(self, app): + """INVOKE_LLM_STREAM applies the same host context as non-streaming calls.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + + class StreamProvider: + def __init__(self): + self.kwargs = None + + async def invoke_llm_stream(self, **kwargs): + self.kwargs = kwargs + yield provider_message.MessageChunk(role='assistant', content='hi') + + run_id = 'run_proxy_invoke_llm_stream_options' + query = self.query(remove_think=False) + app.query_pool.cached_queries[902] = query + + registry = get_session_registry() + await registry.unregister(run_id) + await registry.register( + run_id=run_id, + runner_id='plugin:test/runner/default', + query_id=902, + plugin_identity='test/runner', + resources=make_agent_resources(models=[{'model_id': 'llm_stream_001'}]), + ) + + provider = StreamProvider() + model = SimpleNamespace( + model_entity=SimpleNamespace(abilities=[], extra_args={'max_tokens': 128}), + provider=provider, + ) + app.model_mgr.get_model_by_uuid.return_value = model + runtime_handler = make_handler(app) + + responses = [] + try: + stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({ + 'run_id': run_id, + 'llm_model_uuid': 'llm_stream_001', + 'messages': [{'role': 'user', 'content': 'hello'}], + 'funcs': [{ + 'name': 'search', + 'human_desc': 'Search', + 'description': 'Search', + 'parameters': {'type': 'object'}, + }], + 'extra_args': {'max_tokens': 256}, + 'remove_think': True, + }) + async for response in stream: + responses.append(response) + finally: + await registry.unregister(run_id) + + assert [response.code for response in responses] == [0] + assert provider.kwargs['query'] is query + assert provider.kwargs['extra_args'] == {'max_tokens': 256} + assert provider.kwargs['remove_think'] is True + assert provider.kwargs['funcs'] == [] + + @pytest.mark.asyncio + async def test_call_tool_passes_current_query(self, app): + """CALL_TOOL passes the current Query back into tool execution.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + + run_id = 'run_proxy_call_tool_query' + query = self.query() + app.query_pool.cached_queries[903] = query + + registry = get_session_registry() + await registry.unregister(run_id) + await registry.register( + run_id=run_id, + runner_id='plugin:test/runner/default', + query_id=903, + plugin_identity='test/runner', + resources=make_agent_resources(tools=[{'tool_name': 'test/search'}]), + ) + + runtime_handler = make_handler(app) + + try: + response = await runtime_handler.actions[PluginToRuntimeAction.CALL_TOOL.value]({ + 'run_id': run_id, + 'tool_name': 'test/search', + 'parameters': {'q': 'langbot'}, + }) + finally: + await registry.unregister(run_id) + + assert response.code == 0 + app.tool_mgr.execute_func_call.assert_awaited_once_with( + name='test/search', + parameters={'q': 'langbot'}, + query=query, + ) + + @pytest.mark.asyncio + async def test_invoke_rerank_uses_authorized_model_and_extra_args(self, app): + """INVOKE_RERANK validates run-scoped model access and merges model extra_args.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + + run_id = 'run_proxy_rerank_options' + registry = get_session_registry() + await registry.unregister(run_id) + await registry.register( + run_id=run_id, + runner_id='plugin:test/runner/default', + query_id=904, + plugin_identity='test/runner', + resources=make_agent_resources(models=[{'model_id': 'rerank_001'}]), + ) + + provider = SimpleNamespace( + invoke_rerank=AsyncMock(return_value=[ + {'index': 0, 'relevance_score': 0.2}, + {'index': 1, 'relevance_score': 0.9}, + ]), + ) + rerank_model = SimpleNamespace( + model_entity=SimpleNamespace(extra_args={'top_n': 5, 'return_documents': False}), + provider=provider, + ) + app.model_mgr.get_rerank_model_by_uuid.return_value = rerank_model + runtime_handler = make_handler(app) + + try: + response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({ + 'run_id': run_id, + 'rerank_model_uuid': 'rerank_001', + 'query': 'hello', + 'documents': ['a', 'b'], + 'top_k': 1, + 'extra_args': {'top_n': 2}, + }) + finally: + await registry.unregister(run_id) + + assert response.code == 0 + assert response.data['results'] == [{'index': 1, 'relevance_score': 0.9}] + provider.invoke_rerank.assert_awaited_once() + kwargs = provider.invoke_rerank.await_args.kwargs + assert kwargs['extra_args'] == {'top_n': 2, 'return_documents': False}