feat(agent-runner): integrate AgentRunner Protocol v1 with plugin system

Phase 0 integration complete - verified minimal loop with local-agent stub runner.

Changes:
- Add AgentRunOrchestrator for plugin-based agent execution
- Add AgentResultNormalizer for Protocol v1 result conversion
- Add AgentRunnerDescriptor for runner ID parsing (plugin:author/name/runner)
- Update chat handler to use new orchestrator instead of direct runner lookup
- Add plugin handler methods for list_agent_runners and run_agent
- Add connector methods for AgentRunner protocol forwarding
- Update pipeline API to include runner options in metadata
- Add integration docs and implementation plan

Integration verified:
- Runner: plugin:langbot/local-agent/default
- Input: "你好"
- Output: [stub] Echo: 你好
- Date: 2026-05-10 10:09

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
huanghuoguoguo
2026-05-10 10:11:54 +08:00
parent b7dcda8b23
commit 5aaa422250
29 changed files with 3955 additions and 298 deletions

View File

@@ -0,0 +1,370 @@
# Agent Runner 插件化最终实现计划
本文档面向实现 agent用来把当前 PoC 分支直接推进到最终架构。这个分支不按线上渐进发布节奏处理,因此可以接受一次性破坏内部 runner 实现和 Pipeline AI 配置结构;但最终必须提供历史配置迁移。
## 1. 最终状态
LangBot 最终只保留 Agent Runner 的宿主能力:
- 发现 runner`AgentRunnerRegistry`
- 选择 runnerPipeline 配置和未来事件绑定配置
- 构造上下文:`AgentRunContext`
- 裁剪资源:模型、工具、知识库、文件、存储、平台能力
- 调度执行:`AgentRunOrchestrator`
- 归一结果:`AgentRunResult` -> 当前 Pipeline 的 `Message` / `MessageChunk`
- 隔离错误:插件异常、协议错误、超时、结果过大不能破坏主流程
- 迁移旧配置:把旧内置 runner 配置迁到官方 AgentRunner 插件配置
LangBot 不再长期维护内置业务 runner 分支。`local-agent`、Dify、n8n、Coze、DashScope、Langflow、Tbox 等都迁到官方 AgentRunner 插件。
## 2. 高层架构
```text
Pipeline MessageProcessor / future EventRouter
|
v
AgentRunOrchestrator
|
+--> AgentRunnerRegistry
| +--> plugin runtime LIST_AGENT_RUNNERS
| +--> descriptor cache / validation
|
+--> AgentRunContextBuilder
+--> AgentResourceBuilder
+--> AgentResultNormalizer
|
v
PluginRuntimeConnector.run_agent()
|
v
SDK Runtime RUN_AGENT -> plugin AgentRunner.run()
```
关键约束:
- `ChatMessageHandler` 不解析 `plugin:*`,不实例化 wrapper不知道 runner 组件细节。
- `PipelineService.get_pipeline_metadata()` 不直接访问插件 runtime而是读取 registry。
-`RequestRunner` 只作为迁移参考,不作为最终运行路径。
- EBA 只做字段预留,不在本轮实现 EventBus、EventRouter、平台动作执行。
## 3. 新增 LangBot 模块
建议新增:
```text
src/langbot/pkg/agent/
__init__.py
runner/
__init__.py
descriptor.py
errors.py
id.py
registry.py
context_builder.py
resource_builder.py
orchestrator.py
result_normalizer.py
config_migration.py
```
### 3.1 descriptor.py
定义 LangBot 内部使用的 descriptor
```python
class AgentRunnerDescriptor(BaseModel):
id: str
source: Literal["plugin"]
label: dict[str, str]
description: dict[str, str] | None = None
plugin_author: str
plugin_name: str
runner_name: str
plugin_version: str | None = None
protocol_version: str = "1"
config_schema: list[dict[str, Any]] = []
capabilities: dict[str, bool] = {}
permissions: dict[str, list[str]] = {}
raw_manifest: dict[str, Any] = {}
```
`source == "builtin"` 不作为最终目标。如果实现阶段需要临时 adapter必须标记为测试过渡代码并在官方插件跑通后删除。
### 3.2 id.py
统一 runner id 解析和生成:
- 插件 runner id`plugin:{author}/{plugin_name}/{runner_name}`
- `parse_runner_id(id)` 返回结构化对象
- 禁止业务代码手写字符串 split
- PoC 已存在的 `plugin:author/name/runner` 继续作为合法 id
### 3.3 registry.py
职责:
- 调用 `ap.plugin_connector.list_agent_runners(bound_plugins=None)` 拉取插件 runner
- 校验 manifest
- `kind == AgentRunner`
- `metadata.name` 存在
- `metadata.label` 存在
- `spec.protocol_version` 兼容,默认 `1`
- `spec.config` 是 list默认空
- `spec.capabilities` 是 dict默认空
- `spec.permissions` 是 dict默认空
- 输出 `AgentRunnerDescriptor`
- 缓存 discovery 结果,提供 `refresh()`
- 单个插件 manifest 失败只记录 warning不影响其它 runner
刷新触发点:
- 插件安装、卸载、升级、重启后
- Pipeline metadata 请求时发现缓存为空
- 可选 TTL优先保证正确性
### 3.4 context_builder.py
把当前 Pipeline query 直接转换成 SDK v1 `AgentRunContext`
当前消息 Pipeline 的最小字段:
- `run_id`: 新 UUID不使用 query id 作为全局 run id
- `trigger.type`: `message.received`
- `conversation`: launcher、sender、bot、pipeline、历史消息
- `event`: message event envelope 子集
- `actor`: sender
- `subject`: 当前消息或 launcher
- `messages`: `query.messages`
- `input`: 从 `query.user_message``query.message_chain` 构造
- `resources`: 由 `resource_builder` 注入
- `runtime`: host/version/workspace/bot/pipeline/query/trace/deadline
- `config`: 当前 runner id 对应的实例配置
保留 SDK legacy helper 是 SDK 的责任LangBot 不再构造 PoC 的 `query_id/session/messages/user_message/extra_config` 上下文。
### 3.5 resource_builder.py
执行前做三层裁剪:
1. runner manifest 声明的 `spec.permissions`
2. Pipeline 的 `extensions_preferences`
3. runner 实例配置中选择的资源范围
输出写入 `ctx.resources`,至少覆盖:
- models可调用模型 UUID、类型、能力摘要
- tools可见工具 manifest使用当前 bound plugins / MCP server 范围
- knowledge_bases可检索知识库列表
- storageplugin storage / workspace storage 权限摘要
- files允许读取的配置文件、知识文件摘要
- platform_capabilities本阶段只声明不执行平台动作
注意:旧的 unrestricted proxy action 必须在 Phase 2 被二次校验,不能只靠 context 声明。
### 3.6 result_normalizer.py
只接受 SDK v1 result
- `message.delta`
- `message.completed`
- `tool.call.started`
- `tool.call.completed`
- `state.updated`
- `run.completed`
- `run.failed`
- `action.requested` 允许实验性返回,但本阶段只记录 telemetry不执行
映射:
- `message.delta.data.chunk` -> `provider_message.MessageChunk`
- `message.completed.data.message` -> `provider_message.Message`
- `run.completed.data.message` -> `provider_message.Message`
- `run.failed` -> 抛出受控异常,让 `ChatMessageHandler` 使用现有错误策略
- 工具和状态事件默认不 yield 到 Pipeline只记录 debug/telemetry
防护:
- 未知 type warning 后忽略
- 单 result 序列化大小限制
- provider message schema 校验失败转 `run.failed`
- 插件没有输出任何消息时,按 runner failed 处理
### 3.7 orchestrator.py
核心入口:
```python
async def run_from_query(query: pipeline_query.Query) -> AsyncGenerator[Message | MessageChunk, None]:
runner_id = resolve_runner_id(query.pipeline_config)
descriptor = await registry.get(runner_id, bound_plugins=query.variables.get("_pipeline_bound_plugins"))
ctx = await context_builder.from_query(query, descriptor)
async for raw in plugin_connector.run_agent(...):
async for message in result_normalizer.normalize(raw):
yield message
```
必须覆盖:
- runner id 不存在
- 插件系统关闭
- runner 不在 bound plugins 范围内
- 插件 runtime 断连
- runner 协议版本不兼容
- run 超时
- task cancellation
## 4. 配置模型直接切换
目标格式:
```json
{
"ai": {
"runner": {
"id": "plugin:langbot/local-agent/default",
"expire-time": 0
},
"runner_config": {
"plugin:langbot/local-agent/default": {}
}
}
}
```
兼容读取:
- 优先读 `ai.runner.id`
- 没有 `id` 时读旧 `ai.runner.runner`
- 旧内置 runner 名通过迁移表映射:
- `local-agent` -> `plugin:langbot/local-agent/default`
- `dify-service-api` -> `plugin:langbot/dify-agent/default`
- `n8n-service-api` -> `plugin:langbot/n8n-agent/default`
- `coze-api` -> `plugin:langbot/coze-agent/default`
- `dashscope-app-api` -> `plugin:langbot/dashscope-agent/default`
- `langflow-api` -> `plugin:langbot/langflow-agent/default`
- `tbox-app-api` -> `plugin:langbot/tbox-agent/default`
写入策略:
- 新 UI 只写 `ai.runner.id``ai.runner_config`
- 后端 update 接口接受旧字段,但保存时归一成新格式
- migration 最后统一落库
## 5. 需要修改的 LangBot 范围
必须修改:
- `src/langbot/pkg/core/app.py`
- 增加 `agent_runner_registry` / `agent_run_orchestrator` 属性
- `src/langbot/pkg/core/stages/build_app.py`
- 初始化 Agent 子系统
- `src/langbot/pkg/pipeline/process/handlers/chat.py`
- 删除 `PluginAgentRunnerWrapper`
- 删除内置 runner 查找逻辑
- 调用 orchestrator
- `src/langbot/pkg/api/http/service/pipeline.py`
- metadata 从 registry 生成
- `src/langbot/pkg/plugin/connector.py`
- `list_agent_runners()` / `run_agent()` 增加协议校验和 bound plugin 参数
- `src/langbot/pkg/plugin/handler.py`
- proxy action 二次权限校验
- `src/langbot/pkg/pipeline/preproc/preproc.py`
- 不再只为 `local-agent` 构造工具、知识库、模型
- 对所有 agent runner 保留 multimodal input
- `src/langbot/pkg/pipeline/pipelinemgr.py`
- runner name 监控改读 `runner.id`
- `src/langbot/templates/metadata/pipeline/ai.yaml`
- runner 字段从 `runner` 迁到 `id`
- `src/langbot/templates/default-pipeline-config.json`
- 默认 runner 改为官方 local-agent 插件 id
- `web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx`
- 当前 runner 改读 `ai.runner.id`
- runner 配置区改写入 `ai.runner_config[id]`
最终删除或停用:
- `src/langbot/pkg/provider/runner.py` 的业务注册路径
- `src/langbot/pkg/provider/runners/*` 的运行入口
可以暂时保留文件作为官方插件迁移参考,但不应被运行时引用。
## 6. 实现顺序
### Step 1接入新版 SDK
- 更新 LangBot 依赖到包含 SDK v1 AgentRunner 协议的版本
- 删除 LangBot 中对旧 `AgentRunReturn` 类型名的依赖
- 确认 `langbot_plugin` 的本地 editable / lockfile 指向正确 SDK
### Step 2Agent 子系统骨架
- 新增 descriptor/id/errors
- 新增 registry先只 list plugin runner
- 为 registry 加单测,使用 fake connector
### Step 3Pipeline metadata 切 registry
- `get_pipeline_metadata()` 只通过 registry 输出 runner option
- 插件 runner config stage 从 descriptor.config_schema 生成
- schema 错误不影响 metadata 返回
### Step 4Orchestrator 替换 ChatMessageHandler
- 新增 context builder / result normalizer / orchestrator
- `chat.py` 删除 wrapper 和 runner 查找
- 维持现有流式卡片和 resp_messages 行为
### Step 5新配置读写
- 后端 resolve runner id 支持新旧配置
- 前端表单改 `runner.id` + `runner_config`
- 默认配置改官方 local-agent 插件 id
### Step 6权限和资源裁剪
- resource builder 根据 manifest / pipeline / instance config 裁剪
- proxy action 校验 resource scope
- 禁止插件用 unrestricted API 访问未授权知识库、工具、模型
### Step 7删除内置 runner 运行分支
- 官方插件 ready 后移除内置 runner registry
- 删除或隔离 provider runners 的运行引用
- 测试旧 runner 名只能通过 migration 映射到插件 id
### Step 8历史配置迁移
- 写 persistence migration
- 更新 default pipeline config
- 对已存在 Pipeline 执行旧字段到新字段迁移
- 对监控/日志里的 runner 字段改用新 id
## 7. 测试要求
单测:
- runner id parse / format
- registry manifest 校验、失败隔离、bound plugins 过滤
- context builder 从 query 生成完整 v1 context
- resource builder 三层裁剪
- result normalizer 对每种 result type 的映射
- 旧配置 resolve 和 migration
集成测试:
- fake AgentRunner 插件可被 Pipeline 选择
- streaming 输出仍能更新 message card
- 插件异常返回用户可理解错误,不中断 runtime
- runner 不在 bound plugins 时不可执行
- 未授权工具 / 知识库 / 模型 proxy 调用被拒绝
-`local-agent` Pipeline 配置迁到官方插件 id
## 8. 验收标准
- LangBot Pipeline 可以选择插件 AgentRunner 并完成非流式和流式回复。
- `ChatMessageHandler` 不包含插件 runner 解析和 wrapper。
- `PipelineService` 不直接拼插件 runner metadata。
- 所有 runner 配置使用 `ai.runner.id` + `ai.runner_config`
- 旧内置 runner 不再作为 LangBot 内部运行分支执行。
- 插件只能访问 `ctx.resources` 授权的模型、工具、知识库和文件。
- EBA 相关字段只作为 context/result 预留,不执行平台动作。

View File

@@ -0,0 +1,192 @@
# 官方 AgentRunner 插件仓库计划
本文档描述内置 `RequestRunner` 迁出 LangBot 后,官方 runner 插件仓库应如何组织。建议新建仓库:
```text
/home/glwuy/langbot-app/langbot-official-agent-runners
```
远端仓库名建议:`langbot-official-agent-runners`
## 1. 为什么新仓库
官方 runner 插件会和 LangBot 主仓库、SDK 仓库以不同节奏迭代:
- LangBot 主仓库只维护宿主协议和调度。
- SDK 仓库维护 AgentRunner 组件和 runtime 协议。
- 官方 runner 插件承载业务 runner 的具体实现和第三方平台适配。
不要把官方 runner 插件继续留在 LangBot 主仓库,否则容易重新形成“宿主和业务 runner 绑死”的结构。
## 2. 仓库结构
建议采用 monorepo
```text
langbot-official-agent-runners/
README.md
pyproject.toml
packages/
local-agent/
manifest.yaml
components/default.yaml
main.py
src/
tests/
dify-agent/
n8n-agent/
coze-agent/
dashscope-agent/
langflow-agent/
tbox-agent/
shared/
langbot_agent_runner_utils/
__init__.py
context.py
config.py
streaming.py
tool_calling.py
errors.py
tests/
fixtures/
integration/
```
先用一个仓库统一迁移,避免每个 runner 复制 SDK helper、测试夹具、发布脚本。
## 3. 插件命名和 runner id
固定映射:
| 旧 runner | 官方插件 | runner id |
| --- | --- | --- |
| `local-agent` | `langbot/local-agent` | `plugin:langbot/local-agent/default` |
| `dify-service-api` | `langbot/dify-agent` | `plugin:langbot/dify-agent/default` |
| `n8n-service-api` | `langbot/n8n-agent` | `plugin:langbot/n8n-agent/default` |
| `coze-api` | `langbot/coze-agent` | `plugin:langbot/coze-agent/default` |
| `dashscope-app-api` | `langbot/dashscope-agent` | `plugin:langbot/dashscope-agent/default` |
| `langflow-api` | `langbot/langflow-agent` | `plugin:langbot/langflow-agent/default` |
| `tbox-app-api` | `langbot/tbox-agent` | `plugin:langbot/tbox-agent/default` |
每个插件可以后续提供多个 runner但迁移目标的默认 runner 统一叫 `default`
## 4. 迁移优先级
### Batch 1打通协议
1. `local-agent`
2. `dify-agent`
原因:
- `local-agent` 覆盖模型、工具、知识库、流式、会话历史,是能力最完整的基准。
- `dify-agent` 代表外部 Agent 平台调用,配置和错误处理能验证传统 service API runner 的迁移方式。
### Batch 2迁移外部 workflow runner
1. `n8n-agent`
2. `langflow-agent`
这批主要验证 webhook/workflow 输入输出、timeout、外部 conversation id。
### Batch 3迁移平台 Agent API
1. `coze-agent`
2. `dashscope-agent`
3. `tbox-agent`
这批主要验证平台特有响应格式、引用资料、文件/图片输入。
## 5. 每个官方插件的组件要求
每个插件至少包含:
```yaml
apiVersion: langbot/v1
kind: AgentRunner
metadata:
name: default
label:
en_US: Dify Agent
zh_Hans: Dify Agent
description:
en_US: Run a Dify application as a LangBot AgentRunner.
zh_Hans: 将 Dify 应用作为 LangBot AgentRunner 运行。
spec:
protocol_version: "1"
config: []
capabilities:
streaming: true
tool_calling: false
knowledge_retrieval: false
multimodal_input: false
event_context: true
platform_api: false
interrupt: false
stateful_session: true
permissions:
models: []
tools: []
knowledge_bases: []
storage: ["plugin"]
files: []
platform_api: []
execution:
python:
path: ./main.py
attr: DefaultAgentRunner
```
## 6. local-agent 插件要求
`local-agent` 是最关键的官方插件,应等价迁移当前:
- model primary/fallback 选择
- prompt
- max-round
- knowledge-bases
- rerank-model
- rerank-top-k
- function calling
- streaming
- multimodal input
- conversation history
- monitoring metadata
与 LangBot 主仓库的责任边界:
- LangBot 构造 `ctx.messages``ctx.input``ctx.resources`
- 插件负责选择模型、拼请求、调用 LLM、处理 tool call loop、输出 result stream
- 插件不能绕过 `ctx.resources` 调用未授权模型、工具或知识库
## 7. 外部 runner 插件要求
外部平台 runner 迁移时遵循:
- 旧配置字段尽量保持同名,便于 migration 复制
- 输出统一转换为 `AgentRunResult`
- 外部 API timeout 从 runner config 读取
- 平台 conversation id 存 plugin storage 或 context runtime state不能依赖 LangBot 内置 conversation uuid 私有结构
- 流式支持按平台能力声明,没有流式就只发 `message.completed`
## 8. 发布和安装策略
最终 LangBot 安装或升级时需要保证官方 runner 插件可用。可选方案:
1. 首次启动检测缺失官方 runner 插件并提示安装。
2. 打包发行版时预装官方 runner 插件。
3. 在 migration 前检查对应插件是否存在,不存在则自动安装或阻止迁移。
建议实现顺序:
- 开发阶段使用本地路径插件。
- 发布前支持 marketplace 安装。
- 历史配置 migration 只在官方插件可用时执行。
## 9. 验收标准
- 每个旧 runner 都有对应官方 AgentRunner 插件。
- 旧 runner 配置能无损复制到新 `runner_config[id]`
- LangBot 主仓库不再通过 `RequestRunner` 执行业务 runner。
- 官方插件测试覆盖非流式、流式、错误、timeout、配置缺失。
- `local-agent` 插件能完成模型 fallback、tool calling、知识库检索。

View File

@@ -0,0 +1,63 @@
# Agent Runner Pluginization Phase 0 Integration Test Record
## Test Summary
**Status**: PASSED
**Date**: 2026-05-10 10:09
## Test Configuration
- **LangBot Branch**: feat/agent-runner-plugin
- **SDK Branch**: feat/agent-runner-plugin
- **Runner Repo**: langbot-agent-runner (new)
## Test Scenario
- **Selected Runner**: `plugin:langbot/local-agent/default`
- **Input**: `1`
- **Expected Output**: `[stub] Echo: 1`
- **Actual Output**: `[stub] Echo: 1`
## Verified Chain
```
Frontend selects plugin:langbot/local-agent/default
-> LangBot pipeline
-> AgentRunOrchestrator
-> SDK runtime RUN_AGENT
-> langbot-agent-runner/local-agent DefaultAgentRunner
-> AgentRunResult
-> LangBot response
```
## Key Components Verified
### LangBot Host
- AgentRunOrchestrator resolves runner ID via ConfigMigration
- AgentRunContextBuilder builds SDK v1 context
- AgentResultNormalizer normalizes SDK v1 results
- ChatMessageHandler delegates to orchestrator (single resp_message_id, streaming pop/append)
### SDK Runtime
- RUN_AGENT action dispatches to plugin runner
- AgentRunner component manifest parsing
- LIST_AGENT_RUNNERS returns runner metadata
### langbot-agent-runner Plugin
- DefaultAgentRunner stub implementation
- AgentRunner manifest with protocol_version, capabilities, permissions
- Echo response validates SDK v1 result format
## Next Steps (Phase 1)
1. Implement real Dify runner (external API runner validation)
2. Update frontend to save `ai.runner.id` + `ai.runner_config`
3. Add persistence migration for old config format
4. Update pipeline templates
5. Add proxy action secondary permission validation
## Related Documents
- [IMPLEMENTATION_PLAN.md](./IMPLEMENTATION_PLAN.md)
- [OFFICIAL_RUNNER_PLUGINS.md](./OFFICIAL_RUNNER_PLUGINS.md)

View File

@@ -105,6 +105,9 @@ classifiers = [
"Topic :: Communications :: Chat",
]
[tool.uv.sources]
langbot-plugin = { path = "../langbot-plugin-sdk", editable = true }
[project.urls]
Homepage = "https://langbot.app"
Documentation = "https://docs.langbot.app"
@@ -223,4 +226,3 @@ skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"

View File

@@ -0,0 +1,37 @@
"""Agent runner subsystem for LangBot."""
from __future__ import annotations
from .runner.descriptor import AgentRunnerDescriptor
from .runner.id import parse_runner_id, format_runner_id, RunnerIdParts, is_plugin_runner_id
from .runner.errors import (
AgentRunnerError,
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerProtocolError,
RunnerExecutionError,
)
from .runner.registry import AgentRunnerRegistry
from .runner.context_builder import AgentRunContextBuilder
from .runner.resource_builder import AgentResourceBuilder
from .runner.result_normalizer import AgentResultNormalizer
from .runner.orchestrator import AgentRunOrchestrator
from .runner.config_migration import ConfigMigration
__all__ = [
'AgentRunnerDescriptor',
'parse_runner_id',
'format_runner_id',
'is_plugin_runner_id',
'RunnerIdParts',
'AgentRunnerError',
'RunnerNotFoundError',
'RunnerNotAuthorizedError',
'RunnerProtocolError',
'RunnerExecutionError',
'AgentRunnerRegistry',
'AgentRunContextBuilder',
'AgentResourceBuilder',
'AgentResultNormalizer',
'AgentRunOrchestrator',
'ConfigMigration',
]

View File

@@ -0,0 +1,36 @@
"""Agent runner modules."""
from __future__ import annotations
from .descriptor import AgentRunnerDescriptor
from .id import parse_runner_id, format_runner_id, RunnerIdParts
from .errors import (
AgentRunnerError,
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerProtocolError,
RunnerExecutionError,
)
from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .orchestrator import AgentRunOrchestrator
from .config_migration import ConfigMigration
__all__ = [
'AgentRunnerDescriptor',
'parse_runner_id',
'format_runner_id',
'RunnerIdParts',
'AgentRunnerError',
'RunnerNotFoundError',
'RunnerNotAuthorizedError',
'RunnerProtocolError',
'RunnerExecutionError',
'AgentRunnerRegistry',
'AgentRunContextBuilder',
'AgentResourceBuilder',
'AgentResultNormalizer',
'AgentRunOrchestrator',
'ConfigMigration',
]

View File

@@ -0,0 +1,196 @@
"""Configuration migration for agent runner IDs."""
from __future__ import annotations
import typing
from .id import is_plugin_runner_id
# Mapping from old built-in runner names to official plugin runner IDs
OLD_RUNNER_TO_PLUGIN_RUNNER_ID = {
'local-agent': 'plugin:langbot/local-agent/default',
'dify-service-api': 'plugin:langbot/dify-agent/default',
'n8n-service-api': 'plugin:langbot/n8n-agent/default',
'coze-api': 'plugin:langbot/coze-agent/default',
'dashscope-app-api': 'plugin:langbot/dashscope-agent/default',
'langflow-api': 'plugin:langbot/langflow-agent/default',
'tbox-app-api': 'plugin:langbot/tbox-agent/default',
}
class ConfigMigration:
"""Configuration migration helper for agent runner IDs.
Responsibilities:
- Resolve runner ID from new ai.runner.id or old ai.runner.runner
- Map old built-in runner names to official plugin runner IDs
- Extract runner config from ai.runner_config or old ai.<runner-name>
"""
@staticmethod
def resolve_runner_id(pipeline_config: dict[str, typing.Any]) -> str | None:
"""Resolve runner ID from pipeline configuration.
Priority:
1. New format: ai.runner.id (must be plugin:* format)
2. Old format: ai.runner.runner (mapped to plugin:* if built-in)
Args:
pipeline_config: Pipeline configuration dict
Returns:
Runner ID string, or None if not configured
"""
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner', {})
# Check new format first
runner_id = runner_config.get('id')
if runner_id:
if is_plugin_runner_id(runner_id):
return runner_id
# If it's not a plugin ID, try to map it as old runner name
return OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(runner_id, runner_id)
# Check old format
old_runner_name = runner_config.get('runner')
if old_runner_name:
# If already plugin:* format, return directly
if is_plugin_runner_id(old_runner_name):
return old_runner_name
# Map old built-in runner to official plugin ID
mapped_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name)
if mapped_id:
return mapped_id
# Return old name if no mapping exists (will error in registry)
return old_runner_name
return None
@staticmethod
def resolve_runner_config(
pipeline_config: dict[str, typing.Any],
runner_id: str,
) -> dict[str, typing.Any]:
"""Resolve runner instance configuration from pipeline configuration.
Priority:
1. New format: ai.runner_config[runner_id]
2. Old format: ai.<runner-name> (mapped from runner_id if applicable)
Args:
pipeline_config: Pipeline configuration dict
runner_id: Resolved runner ID
Returns:
Runner configuration dict (empty if not found)
"""
ai_config = pipeline_config.get('ai', {})
# Check new format
runner_configs = ai_config.get('runner_config', {})
if runner_id in runner_configs:
return runner_configs[runner_id]
# Check old format: ai.<old_runner_name>
# Try to find old runner name from runner_id
old_runner_name = None
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
old_runner_name = old_name
break
if old_runner_name:
old_config = ai_config.get(old_runner_name, {})
if old_config:
return old_config
# If runner_id is plugin:* format, try extracting runner_name as config key
if is_plugin_runner_id(runner_id):
# Some configs might use just the runner_name component as key
# But this is legacy behavior - prefer ai.runner_config[id]
pass
return {}
@staticmethod
def get_old_runner_name(runner_id: str) -> str | None:
"""Get old runner name from mapped runner ID.
Args:
runner_id: Plugin runner ID
Returns:
Old runner name if mapped, None otherwise
"""
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id:
return old_name
return None
@staticmethod
def get_expire_time(pipeline_config: dict[str, typing.Any]) -> int:
"""Get conversation expire time from configuration.
Args:
pipeline_config: Pipeline configuration dict
Returns:
Expire time in seconds (0 means no expiry)
"""
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner', {})
return runner_config.get('expire-time', 0)
@staticmethod
def migrate_pipeline_config(pipeline_config: dict[str, typing.Any]) -> dict[str, typing.Any]:
"""Migrate pipeline config to new format.
This converts old ai.runner.runner and ai.<runner-name> to
new ai.runner.id and ai.runner_config format.
Args:
pipeline_config: Original pipeline configuration
Returns:
Migrated pipeline configuration
"""
# Create copy
new_config = dict(pipeline_config)
ai_config = new_config.get('ai', {})
if not ai_config:
return new_config
runner_config = ai_config.get('runner', {})
runner_configs = ai_config.get('runner_config', {})
# Resolve runner ID
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
if runner_id:
# Set new format
runner_config['id'] = runner_id
# Remove old runner field if present
if 'runner' in runner_config and is_plugin_runner_id(runner_config['runner']):
# Already migrated plugin:* format, keep as id
pass
elif 'runner' in runner_config:
# Old built-in runner name, remove after migration
old_name = runner_config['runner']
if old_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID:
del runner_config['runner']
# Migrate runner config
resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
if resolved_config:
runner_configs[runner_id] = resolved_config
# Remove old runner config block
for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items():
if mapped_id == runner_id and old_name in ai_config:
del ai_config[old_name]
# Update configs
ai_config['runner'] = runner_config
ai_config['runner_config'] = runner_configs
new_config['ai'] = ai_config
return new_config

View File

@@ -0,0 +1,254 @@
"""Agent run context builder for converting Query to SDK v1 AgentRunContext."""
from __future__ import annotations
import uuid
import time
import typing
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .config_migration import ConfigMigration
# Internal models for SDK v1 context protocol matching SDK v1 resources.py
class AgentTrigger(typing.TypedDict):
"""Agent trigger information."""
type: str
source: str # 'pipeline' or 'event_router'
timestamp: int | None
class ConversationContext(typing.TypedDict):
"""Conversation context."""
session_id: str | None
conversation_id: str | None
launcher_type: str | None
launcher_id: str | None
sender_id: str | None
bot_uuid: str | None
pipeline_uuid: str | None
class AgentInput(typing.TypedDict):
"""Agent input."""
text: str | None
contents: list[dict[str, typing.Any]]
message_chain: dict[str, typing.Any] | None
attachments: list[dict[str, typing.Any]]
# SDK v1 Protocol resource models - matching langbot-plugin-sdk/resources.py
class ModelResource(typing.TypedDict):
"""Model resource per SDK v1."""
model_id: str
model_type: str | None
provider: str | None
class ToolResource(typing.TypedDict):
"""Tool resource per SDK v1."""
tool_name: str
tool_type: str | None
description: str | None
class KnowledgeBaseResource(typing.TypedDict):
"""Knowledge base resource per SDK v1."""
kb_id: str
kb_name: str | None
kb_type: str | None
class FileResource(typing.TypedDict):
"""File resource per SDK v1."""
file_id: str
file_name: str | None
mime_type: str | None
source: str | None
class StorageResource(typing.TypedDict):
"""Storage resource per SDK v1."""
plugin_storage: bool
workspace_storage: bool
class AgentResources(typing.TypedDict):
"""Agent resources per SDK v1."""
models: list[ModelResource]
tools: list[ToolResource]
knowledge_bases: list[KnowledgeBaseResource]
files: list[FileResource]
storage: StorageResource
platform_capabilities: dict[str, typing.Any]
class AgentRuntimeContext(typing.TypedDict):
"""Agent runtime context."""
langbot_version: str | None
sdk_protocol_version: str
query_id: int | None
trace_id: str | None
deadline_at: int | None
metadata: dict[str, typing.Any]
class AgentRunContextV1(typing.TypedDict):
"""SDK v1 AgentRunContext per PROTOCOL_V1.md."""
run_id: str
trigger: AgentTrigger
conversation: ConversationContext | None
event: dict[str, typing.Any] | None # Reserved for EBA
actor: dict[str, typing.Any] | None # Reserved for EBA
subject: dict[str, typing.Any] | None # Reserved for EBA
messages: list[dict[str, typing.Any]]
input: AgentInput
resources: AgentResources
runtime: AgentRuntimeContext
config: dict[str, typing.Any]
class AgentRunContextBuilder:
"""Builder for converting Query to SDK v1 AgentRunContext.
Responsibilities:
- Generate new run_id (UUID, not query id)
- Set trigger type to 'message.received' for pipeline
- Build conversation context from session
- Convert messages to SDK format
- Build input from user_message and message_chain
- Set resources from AgentResourceBuilder result
- Build runtime context with host info, trace_id, deadline
- Set config from runner instance configuration
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def build_context(
self,
query: pipeline_query.Query,
descriptor: AgentRunnerDescriptor,
resources: AgentResources,
) -> AgentRunContextV1:
"""Build AgentRunContext from Query.
Args:
query: Pipeline query
descriptor: Runner descriptor
resources: Built resources from AgentResourceBuilder
Returns:
AgentRunContextV1 dict matching PROTOCOL_V1.md
"""
# Generate new run_id
run_id = str(uuid.uuid4())
# Build trigger
trigger: AgentTrigger = {
'type': 'message.received',
'source': 'pipeline',
'timestamp': int(time.time()),
}
# Build conversation context
conversation: ConversationContext | None = None
if query.session:
conversation = {
'session_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'conversation_id': getattr(query.session.using_conversation, 'uuid', None),
'launcher_type': query.session.launcher_type.value,
'launcher_id': query.session.launcher_id,
'sender_id': str(query.sender_id),
'bot_uuid': query.bot_uuid,
'pipeline_uuid': query.pipeline_uuid,
}
# Build input
input: AgentInput = self._build_input(query)
# Build messages
messages = self._build_messages(query)
# Get runner config
runner_config = ConfigMigration.resolve_runner_config(
query.pipeline_config,
descriptor.id,
)
# Build runtime context
runtime: AgentRuntimeContext = {
'langbot_version': self.ap.ver_mgr.get_current_version(),
'sdk_protocol_version': descriptor.protocol_version,
'query_id': query.query_id,
'trace_id': run_id, # Use run_id as trace_id for now
'deadline_at': None, # TODO: set from runner config timeout
'metadata': {
'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'),
'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'),
},
}
# Build full context
context: AgentRunContextV1 = {
'run_id': run_id,
'trigger': trigger,
'conversation': conversation,
'event': None, # Reserved for EBA
'actor': None, # Reserved for EBA
'subject': None, # Reserved for EBA
'messages': messages,
'input': input,
'resources': resources,
'runtime': runtime,
'config': runner_config,
}
return context
def _build_input(self, query: pipeline_query.Query) -> AgentInput:
"""Build AgentInput from query."""
text = None
contents: list[dict[str, typing.Any]] = []
if query.user_message:
# Extract text if content is single text element
if isinstance(query.user_message.content, list):
for elem in query.user_message.content:
contents.append(elem.model_dump(mode='json'))
if elem.type == 'text':
text = getattr(elem, 'text', None)
else:
# Single string content
text = str(query.user_message.content)
contents.append({'type': 'text', 'text': text})
# Include message_chain for platform-specific format
message_chain_dict = None
if query.message_chain:
message_chain_dict = query.message_chain.model_dump(mode='json')
return {
'text': text,
'contents': contents,
'message_chain': message_chain_dict,
'attachments': [], # TODO: extract attachments from message_chain
}
def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
"""Build messages list from query."""
messages: list[dict[str, typing.Any]] = []
if query.messages:
for msg in query.messages:
messages.append(msg.model_dump(mode='json'))
return messages

View File

@@ -0,0 +1,72 @@
"""Agent runner descriptor."""
from __future__ import annotations
import typing
import pydantic
class AgentRunnerDescriptor(pydantic.BaseModel):
"""Descriptor for an agent runner.
Represents the discovered metadata for a runner, including
its identity, capabilities, permissions, and configuration schema.
"""
id: str
"""Unique runner ID: plugin:author/plugin_name/runner_name"""
source: typing.Literal['plugin']
"""Runner source type"""
label: dict[str, str]
"""Display labels keyed by locale (e.g., en_US, zh_Hans)"""
description: dict[str, str] | None = None
"""Optional description keyed by locale"""
plugin_author: str
"""Plugin author from manifest"""
plugin_name: str
"""Plugin name from manifest"""
runner_name: str
"""AgentRunner component name from manifest"""
plugin_version: str | None = None
"""Optional plugin version"""
protocol_version: str = '1'
"""SDK protocol version, default '1'"""
config_schema: list[dict[str, typing.Any]] = []
"""Configuration schema using DynamicForm format"""
capabilities: dict[str, bool] = {}
"""Runner capabilities: streaming, tool_calling, knowledge_retrieval, etc."""
permissions: dict[str, list[str]] = {}
"""Requested permissions: models, tools, knowledge_bases, storage, files, platform_api"""
raw_manifest: dict[str, typing.Any] = {}
"""Original manifest for reference"""
model_config = pydantic.ConfigDict(
extra='allow',
)
def get_plugin_id(self) -> str:
"""Return plugin identifier as author/name."""
return f'{self.plugin_author}/{self.plugin_name}'
def supports_streaming(self) -> bool:
"""Check if runner supports streaming output."""
return self.capabilities.get('streaming', False)
def supports_tool_calling(self) -> bool:
"""Check if runner supports tool calling."""
return self.capabilities.get('tool_calling', False)
def supports_knowledge_retrieval(self) -> bool:
"""Check if runner supports knowledge retrieval."""
return self.capabilities.get('knowledge_retrieval', False)

View File

@@ -0,0 +1,37 @@
"""Agent runner errors."""
from __future__ import annotations
class AgentRunnerError(Exception):
"""Base error for agent runner operations."""
pass
class RunnerNotFoundError(AgentRunnerError):
"""Runner not found in registry."""
def __init__(self, runner_id: str):
self.runner_id = runner_id
super().__init__(f'Agent runner not found: {runner_id}')
class RunnerNotAuthorizedError(AgentRunnerError):
"""Runner not authorized for this pipeline."""
def __init__(self, runner_id: str, bound_plugins: list[str] | None):
self.runner_id = runner_id
self.bound_plugins = bound_plugins
super().__init__(f'Agent runner {runner_id} not authorized for bound_plugins={bound_plugins}')
class RunnerProtocolError(AgentRunnerError):
"""Runner protocol version mismatch or invalid manifest."""
def __init__(self, runner_id: str, message: str):
self.runner_id = runner_id
super().__init__(f'Agent runner protocol error for {runner_id}: {message}')
class RunnerExecutionError(AgentRunnerError):
"""Runner execution failed."""
def __init__(self, runner_id: str, message: str, retryable: bool = False):
self.runner_id = runner_id
self.retryable = retryable
super().__init__(f'Agent runner {runner_id} execution failed: {message}')

View File

@@ -0,0 +1,92 @@
"""Agent runner ID parsing and formatting."""
from __future__ import annotations
import dataclasses
@dataclasses.dataclass(frozen=True)
class RunnerIdParts:
"""Parsed runner ID components."""
source: str # 'plugin' (future: 'builtin')
plugin_author: str
plugin_name: str
runner_name: str
def to_plugin_id(self) -> str:
"""Return plugin identifier as author/name."""
return f'{self.plugin_author}/{self.plugin_name}'
def parse_runner_id(runner_id: str) -> RunnerIdParts:
"""Parse runner ID string into components.
Args:
runner_id: Runner ID in format 'plugin:author/plugin_name/runner_name'
Returns:
RunnerIdParts with parsed components
Raises:
ValueError: If runner_id format is invalid
"""
if runner_id.startswith('plugin:'):
parts = runner_id[7:].split('/')
if len(parts) != 3:
raise ValueError(
f'Invalid plugin runner ID format: {runner_id}. '
f'Expected: plugin:author/plugin_name/runner_name'
)
plugin_author, plugin_name, runner_name = parts
if not plugin_author or not plugin_name or not runner_name:
raise ValueError(
f'Invalid plugin runner ID: {runner_id}. '
f'author, plugin_name, and runner_name must be non-empty'
)
return RunnerIdParts(
source='plugin',
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
)
else:
# For backward compatibility with old built-in runner names
# This should eventually be removed after migration
raise ValueError(
f'Invalid runner ID format: {runner_id}. '
f'Expected: plugin:author/plugin_name/runner_name'
)
def format_runner_id(
source: str,
plugin_author: str,
plugin_name: str,
runner_name: str,
) -> str:
"""Format runner ID from components.
Args:
source: Runner source ('plugin')
plugin_author: Plugin author
plugin_name: Plugin name
runner_name: Runner component name
Returns:
Runner ID string
"""
if source == 'plugin':
return f'plugin:{plugin_author}/{plugin_name}/{runner_name}'
else:
raise ValueError(f'Invalid runner source: {source}')
def is_plugin_runner_id(runner_id: str) -> bool:
"""Check if runner ID is a plugin runner.
Args:
runner_id: Runner ID string
Returns:
True if runner ID starts with 'plugin:'
"""
return runner_id.startswith('plugin:')

View File

@@ -0,0 +1,158 @@
"""Agent run orchestrator for coordinating runner execution."""
from __future__ import annotations
import typing
import traceback
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder, AgentRunContextV1
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .config_migration import ConfigMigration
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
)
class AgentRunOrchestrator:
"""Orchestrator for agent runner execution.
Responsibilities:
- Resolve runner ID from pipeline config (new or old format)
- Get runner descriptor from registry
- Build AgentRunContext from Query
- Build AgentResources with permission filtering
- Invoke plugin runtime RUN_AGENT action
- Normalize AgentRunResult to Pipeline messages
- Handle errors, timeouts, protocol errors
- Maintain streaming card behavior
This is the main entry point for ChatMessageHandler.
"""
ap: app.Application
registry: AgentRunnerRegistry
context_builder: AgentRunContextBuilder
resource_builder: AgentResourceBuilder
result_normalizer: AgentResultNormalizer
def __init__(
self,
ap: app.Application,
registry: AgentRunnerRegistry,
):
self.ap = ap
self.registry = registry
self.context_builder = AgentRunContextBuilder(ap)
self.resource_builder = AgentResourceBuilder(ap)
self.result_normalizer = AgentResultNormalizer(ap)
async def run_from_query(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run agent runner from pipeline query.
This is the main entry point called by ChatMessageHandler.
Args:
query: Pipeline query with pipeline_config, session, messages, etc.
Yields:
Message or MessageChunk for pipeline response
Raises:
RunnerNotFoundError: If runner not found
RunnerNotAuthorizedError: If runner not authorized
RunnerExecutionError: If runner execution failed
"""
# Resolve runner ID
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
if not runner_id:
raise RunnerNotFoundError('no runner configured')
# Get bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Get runner descriptor
descriptor = await self.registry.get(runner_id, bound_plugins)
# Build resources
resources = await self.resource_builder.build_resources(query, descriptor)
# Build context
context = await self.context_builder.build_context(query, descriptor, resources)
# Run via plugin connector
async for result_dict in self._invoke_runner(descriptor, context):
# Normalize result
result = await self.result_normalizer.normalize(result_dict, descriptor)
if result is not None:
yield result
async def _invoke_runner(
self,
descriptor: AgentRunnerDescriptor,
context: AgentRunContextV1,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""Invoke runner via plugin connector.
Args:
descriptor: Runner descriptor
context: AgentRunContext dict
Yields:
Raw result dicts from plugin runtime
Raises:
RunnerExecutionError: If plugin system disabled or runtime error
"""
if not self.ap.plugin_connector.is_enable_plugin:
raise RunnerExecutionError(
descriptor.id,
'Plugin system is disabled',
retryable=False,
)
try:
async for result_dict in self.ap.plugin_connector.run_agent(
plugin_author=descriptor.plugin_author,
plugin_name=descriptor.plugin_name,
runner_name=descriptor.runner_name,
context=context,
):
yield result_dict
except RunnerExecutionError:
raise
except Exception as e:
# Wrap unexpected errors
self.ap.logger.error(
f'Runner {descriptor.id} unexpected error: {traceback.format_exc()}'
)
raise RunnerExecutionError(
descriptor.id,
str(e),
retryable=False,
)
def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None:
"""Resolve runner ID for telemetry/logging without full execution.
Args:
query: Pipeline query
Returns:
Runner ID string, or None
"""
return ConfigMigration.resolve_runner_id(query.pipeline_config)

View File

@@ -0,0 +1,277 @@
"""Agent runner registry for discovering and caching runner descriptors."""
from __future__ import annotations
import typing
import asyncio
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .id import parse_runner_id, format_runner_id
from .errors import RunnerNotFoundError, RunnerNotAuthorizedError
class AgentRunnerRegistry:
"""Registry for discovering and managing agent runners.
Responsibilities:
- Discover runners from plugin runtime via LIST_AGENT_RUNNERS
- Validate runner manifests (kind, metadata, spec)
- Cache discovered runners for performance
- Filter runners by bound plugins
- Handle manifest errors gracefully (log warning, skip runner)
"""
ap: app.Application
_cache: dict[str, AgentRunnerDescriptor] | None
"""Cached runner descriptors keyed by runner ID"""
_cache_lock: asyncio.Lock
"""Lock for cache refresh operations"""
def __init__(self, ap: app.Application):
self.ap = ap
self._cache = None
self._cache_lock = asyncio.Lock()
async def _discover_runners(self) -> dict[str, AgentRunnerDescriptor]:
"""Discover runners from plugin runtime.
Always discovers ALL runners (no bound_plugins filter).
The cache should contain unfiltered discovery results.
Returns:
Dict of runner descriptors keyed by runner ID
"""
if not self.ap.plugin_connector.is_enable_plugin:
return {}
runners: dict[str, AgentRunnerDescriptor] = {}
try:
# Always list all runners (bound_plugins=None)
plugin_runners = await self.ap.plugin_connector.list_agent_runners(None)
for runner_data in plugin_runners:
try:
descriptor = self._validate_and_build_descriptor(runner_data)
if descriptor is not None:
runners[descriptor.id] = descriptor
except Exception as e:
plugin_author = runner_data.get('plugin_author', 'unknown')
plugin_name = runner_data.get('plugin_name', 'unknown')
runner_name = runner_data.get('runner_name', 'unknown')
self.ap.logger.warning(
f'Invalid runner manifest for plugin:{plugin_author}/{plugin_name}/{runner_name}: {e}'
)
continue
except Exception as e:
self.ap.logger.warning(f'Failed to list agent runners from plugin runtime: {e}')
return {}
return runners
def _validate_and_build_descriptor(self, runner_data: dict[str, typing.Any]) -> AgentRunnerDescriptor | None:
"""Validate runner manifest and build descriptor.
Args:
runner_data: Raw runner data from plugin runtime with fields:
- plugin_author, plugin_name, runner_name
- manifest (full component manifest dict)
- protocol_version, capabilities, permissions, config (extracted from spec)
Returns:
AgentRunnerDescriptor if valid, None if invalid
"""
plugin_author = runner_data.get('plugin_author', '')
plugin_name = runner_data.get('plugin_name', '')
runner_name = runner_data.get('runner_name', '')
if not plugin_author or not plugin_name or not runner_name:
return None
manifest = runner_data.get('manifest', {})
# Validate kind
kind = manifest.get('kind', '')
if kind != 'AgentRunner':
return None
# Validate metadata
metadata = manifest.get('metadata', {})
name = metadata.get('name', '')
if not name:
return None
# metadata.label must exist
label = metadata.get('label', {})
if not label:
label = {name: name} # fallback
# SDK now provides these directly extracted from spec
protocol_version = runner_data.get('protocol_version', '1')
config_schema = runner_data.get('config', [])
capabilities = runner_data.get('capabilities', {})
permissions = runner_data.get('permissions', {})
# Build descriptor
runner_id = format_runner_id(
source='plugin',
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
)
return AgentRunnerDescriptor(
id=runner_id,
source='plugin',
label=label,
description=metadata.get('description') or runner_data.get('runner_description'),
plugin_author=plugin_author,
plugin_name=plugin_name,
runner_name=runner_name,
plugin_version=runner_data.get('plugin_version'),
protocol_version=protocol_version,
config_schema=config_schema,
capabilities=capabilities,
permissions=permissions,
raw_manifest=manifest,
)
async def refresh(self) -> None:
"""Refresh runner cache.
Always discovers ALL runners (no bound_plugins filter).
The cache contains unfiltered discovery results.
"""
async with self._cache_lock:
self._cache = await self._discover_runners()
async def list_runners(
self,
bound_plugins: list[str] | None = None,
use_cache: bool = True,
) -> list[AgentRunnerDescriptor]:
"""List available runners.
Args:
bound_plugins: Optional filter for bound plugins (applied locally)
use_cache: Use cached data if available
Returns:
List of runner descriptors
"""
if use_cache and self._cache is not None:
# Filter from cache
return self._filter_runners_by_bound_plugins(self._cache, bound_plugins)
# Discover fresh (always full list)
runners = await self._discover_runners()
# Update cache (full list, unfiltered)
async with self._cache_lock:
self._cache = runners
# Filter locally
return self._filter_runners_by_bound_plugins(runners, bound_plugins)
def _filter_runners_by_bound_plugins(
self,
runners: dict[str, AgentRunnerDescriptor],
bound_plugins: list[str] | None,
) -> list[AgentRunnerDescriptor]:
"""Filter runners by bound plugins.
Args:
runners: Dict of runner descriptors
bound_plugins: Optional filter (None means all plugins allowed)
Returns:
Filtered list of runner descriptors
"""
if bound_plugins is None:
# All plugins allowed
return list(runners.values())
allowed_plugin_ids = set(bound_plugins)
filtered = []
for descriptor in runners.values():
plugin_id = descriptor.get_plugin_id()
if plugin_id in allowed_plugin_ids:
filtered.append(descriptor)
return filtered
async def get(
self,
runner_id: str,
bound_plugins: list[str] | None = None,
) -> AgentRunnerDescriptor:
"""Get a specific runner descriptor.
Args:
runner_id: Runner ID to lookup
bound_plugins: Optional bound plugins filter
Returns:
AgentRunnerDescriptor
Raises:
RunnerNotFoundError: If runner not found
RunnerNotAuthorizedError: If runner not in bound plugins
"""
# Parse and validate runner ID format
try:
parse_runner_id(runner_id)
except ValueError as e:
raise RunnerNotFoundError(runner_id) from e
# Get from cache or discover (always full list)
if self._cache is None:
await self.refresh()
if self._cache is None:
raise RunnerNotFoundError(runner_id)
descriptor = self._cache.get(runner_id)
if descriptor is None:
raise RunnerNotFoundError(runner_id)
# Check authorization
if bound_plugins is not None:
plugin_id = descriptor.get_plugin_id()
if plugin_id not in bound_plugins:
raise RunnerNotAuthorizedError(runner_id, bound_plugins)
return descriptor
async def get_runner_metadata_for_pipeline(self) -> list[dict[str, typing.Any]]:
"""Get runner metadata for pipeline configuration UI.
Returns runner options and their config schemas for the DynamicForm.
"""
# Get all runners (no bound plugin filter for metadata listing)
runners = await self.list_runners(bound_plugins=None)
options = []
stages = []
for descriptor in runners:
# Add runner option
options.append({
'name': descriptor.id,
'label': descriptor.label,
'description': descriptor.description,
})
# Add config schema as stage if not empty
if descriptor.config_schema:
stages.append({
'name': descriptor.id,
'label': descriptor.label,
'description': descriptor.description,
'config': descriptor.config_schema,
})
return options, stages

View File

@@ -0,0 +1,210 @@
"""Agent resource builder for constructing authorized resources."""
from __future__ import annotations
import typing
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .context_builder import (
AgentResources,
ModelResource,
ToolResource,
KnowledgeBaseResource,
StorageResource,
)
class AgentResourceBuilder:
"""Builder for constructing AgentResources with permission filtering.
Responsibilities:
- Apply 3-layer permission filtering:
1. Runner manifest declared permissions
2. Pipeline extensions_preference (bound plugins/MCP servers)
3. Runner instance config selected resources
- Build models list from authorized models
- Build tools list from bound plugins/MCP servers
- Build knowledge_bases list from config
- Build storage and files permissions summary
Note: This only builds the resource declaration. The actual proxy actions
in handler.py must still validate against ctx.resources at runtime.
Resource field names match SDK v1 Protocol:
- ModelResource: model_id, model_type, provider
- ToolResource: tool_name, tool_type, description
- KnowledgeBaseResource: kb_id, kb_name, kb_type
- StorageResource: plugin_storage, workspace_storage
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def build_resources(
self,
query: typing.Any, # pipeline_query.Query
descriptor: AgentRunnerDescriptor,
) -> AgentResources:
"""Build AgentResources from query and runner descriptor.
Args:
query: Pipeline query with pipeline_config and variables
descriptor: Runner descriptor with permissions and capabilities
Returns:
AgentResources dict with filtered resource lists
"""
# Get bound plugins and MCP servers from query
bound_plugins = query.variables.get('_pipeline_bound_plugins')
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers')
# Layer 1: Runner manifest permissions
manifest_perms = descriptor.permissions
# Layer 2: Pipeline extensions_preference (already in bound_plugins/MCP servers)
# Layer 3: Runner instance config (from pipeline_config) - resolved via ConfigMigration
from .config_migration import ConfigMigration
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, descriptor.id)
# Build each resource category
models = await self._build_models(manifest_perms, query)
tools = await self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query)
knowledge_bases = await self._build_knowledge_bases(manifest_perms, runner_config, query)
storage = self._build_storage(manifest_perms)
return {
'models': models,
'tools': tools,
'knowledge_bases': knowledge_bases,
'files': [], # Files are populated at runtime
'storage': storage,
'platform_capabilities': {}, # Reserved for EBA
}
async def _build_models(
self,
manifest_perms: dict[str, list[str]],
query: typing.Any,
) -> list[ModelResource]:
"""Build models list with SDK v1 field names."""
models: list[ModelResource] = []
# Check manifest permission
model_perms = manifest_perms.get('models', [])
if 'invoke' not in model_perms and 'stream' not in model_perms:
return models
# Get model from query (preproc already resolved this)
model_uuid = getattr(query, 'use_llm_model_uuid', None)
if not model_uuid:
return models
try:
model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
if model and model.model_entity:
# Use SDK v1 field names: model_id, model_type, provider
models.append({
'model_id': model_uuid,
'model_type': model.model_entity.model_type,
'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None,
})
except Exception:
pass
# Add fallback models if present
fallback_uuids = query.variables.get('_fallback_model_uuids', [])
for fb_uuid in fallback_uuids:
try:
model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
if model and model.model_entity:
models.append({
'model_id': fb_uuid,
'model_type': model.model_entity.model_type,
'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None,
})
except Exception:
pass
return models
async def _build_tools(
self,
manifest_perms: dict[str, list[str]],
bound_plugins: list[str] | None,
bound_mcp_servers: list[str] | None,
query: typing.Any,
) -> list[ToolResource]:
"""Build tools list with SDK v1 field names."""
tools: list[ToolResource] = []
# Check manifest permission
tool_perms = manifest_perms.get('tools', [])
if 'list' not in tool_perms and 'call' not in tool_perms:
return tools
# Get tools from query (preproc already resolved this for local-agent)
use_funcs = getattr(query, 'use_funcs', [])
for tool in use_funcs:
# Use SDK v1 field names: tool_name, tool_type, description
tools.append({
'tool_name': tool.name,
'tool_type': None, # Tool type not available in current LLMTool
'description': tool.description,
})
return tools
async def _build_knowledge_bases(
self,
manifest_perms: dict[str, list[str]],
runner_config: dict[str, typing.Any],
query: typing.Any,
) -> list[KnowledgeBaseResource]:
"""Build knowledge bases list with SDK v1 field names."""
kb_resources: list[KnowledgeBaseResource] = []
# Check manifest permission
kb_perms = manifest_perms.get('knowledge_bases', [])
if 'list' not in kb_perms and 'retrieve' not in kb_perms:
return kb_resources
# Get knowledge base UUIDs from config
kb_uuids = runner_config.get('knowledge-bases', [])
if not kb_uuids:
# Old single KB config
old_kb_uuid = runner_config.get('knowledge-base', '')
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
# Also check query variables (may be modified by plugin PromptPreProcessing)
kb_uuids_from_vars = query.variables.get('_knowledge_base_uuids', [])
if kb_uuids_from_vars:
kb_uuids = kb_uuids_from_vars
for kb_uuid in kb_uuids:
try:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if kb:
# Use SDK v1 field names: kb_id, kb_name, kb_type
kb_resources.append({
'kb_id': kb_uuid,
'kb_name': kb.get_name(),
'kb_type': kb.knowledge_base_entity.kb_type if hasattr(kb.knowledge_base_entity, 'kb_type') else None,
})
except Exception:
pass
return kb_resources
def _build_storage(
self,
manifest_perms: dict[str, list[str]],
) -> StorageResource:
"""Build storage permissions with SDK v1 field names."""
storage_perms = manifest_perms.get('storage', [])
return {
'plugin_storage': 'plugin' in storage_perms,
'workspace_storage': 'workspace' in storage_perms,
}

View File

@@ -0,0 +1,180 @@
"""Agent result normalizer for converting SDK v1 AgentRunResult to Pipeline messages."""
from __future__ import annotations
import typing
from langbot_plugin.api.entities.builtin.provider import message as provider_message
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .errors import RunnerExecutionError, RunnerProtocolError
# Maximum size for a single result payload (prevent memory exhaustion)
MAX_RESULT_SIZE_BYTES = 1024 * 1024 # 1 MB
class AgentResultNormalizer:
"""Normalizer for converting SDK v1 AgentRunResult to Pipeline messages.
Responsibilities:
- Accept only SDK v1 result types (message.delta, message.completed, etc.)
- Map message.delta -> MessageChunk
- Map message.completed -> Message
- Map run.completed (with message) -> Message
- Handle run.failed as controlled error
- Ignore unknown types with warning
- Validate result size
- Validate message schema
Per PROTOCOL_V1.md, accepted types:
- message.delta
- message.completed
- tool.call.started
- tool.call.completed
- state.updated
- run.completed
- run.failed
- action.requested (log only, don't execute)
"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def normalize(
self,
result_dict: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.Message | provider_message.MessageChunk | None:
"""Normalize AgentRunResult to Message or MessageChunk.
Args:
result_dict: Raw result dict from plugin runtime
descriptor: Runner descriptor for error context
Returns:
Message, MessageChunk, or None (for non-message events)
Raises:
RunnerExecutionError: On run.failed
RunnerProtocolError: On invalid result format
"""
# Validate result type
result_type = result_dict.get('type')
if not result_type:
raise RunnerProtocolError(descriptor.id, 'Missing result type')
# Validate result size
try:
import json
result_json = json.dumps(result_dict)
if len(result_json) > MAX_RESULT_SIZE_BYTES:
self.ap.logger.warning(
f'Runner {descriptor.id} result too large ({len(result_json)} bytes), truncating'
)
# Truncate content if possible
data = result_dict.get('data', {})
if 'chunk' in data or 'message' in data:
content = data.get('chunk', {}).get('content', '') or data.get('message', {}).get('content', '')
if isinstance(content, str) and len(content) > 10000:
# Keep reasonable length
data['chunk'] = {'role': 'assistant', 'content': content[:10000] + '...[truncated]'}
except Exception:
pass
# Handle each result type
data = result_dict.get('data', {})
if result_type == 'message.delta':
return self._normalize_message_delta(data, descriptor)
elif result_type == 'message.completed':
return self._normalize_message_completed(data, descriptor)
elif result_type == 'tool.call.started':
# Log only, don't yield to pipeline
self.ap.logger.debug(
f'Runner {descriptor.id} tool call started: {data.get("tool_name", "unknown")}'
)
return None
elif result_type == 'tool.call.completed':
# Log only, don't yield to pipeline
self.ap.logger.debug(
f'Runner {descriptor.id} tool call completed: {data.get("tool_name", "unknown")}'
)
return None
elif result_type == 'state.updated':
# Log for telemetry, don't yield
self.ap.logger.debug(
f'Runner {descriptor.id} state updated: {data.get("key", "unknown")}={data.get("value", "...")}'
)
return None
elif result_type == 'run.completed':
# May include final message
if 'message' in data:
return self._normalize_message_completed(data, descriptor)
# If no message, it's just completion signal
return None
elif result_type == 'run.failed':
error_msg = data.get('error', 'Unknown error')
error_code = data.get('code', 'unknown')
retryable = data.get('retryable', False)
raise RunnerExecutionError(
descriptor.id,
f'{error_msg} (code: {error_code})',
retryable=retryable,
)
elif result_type == 'action.requested':
# Reserved for EBA - log only, don't execute
self.ap.logger.info(
f'Runner {descriptor.id} requested action (not executed in current phase): '
f'{data.get("action", "unknown")}'
)
return None
else:
# Unknown type - warn and ignore (SDK v1 only)
self.ap.logger.warning(
f'Runner {descriptor.id} returned unknown result type: {result_type}. '
f'Expected SDK v1 types (message.delta, message.completed, run.completed, run.failed, etc.)'
)
return None
def _normalize_message_delta(
self,
data: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.MessageChunk:
"""Normalize message.delta to MessageChunk."""
chunk_data = data.get('chunk', {})
if not chunk_data:
raise RunnerProtocolError(descriptor.id, 'message.delta missing chunk data')
try:
chunk = provider_message.MessageChunk.model_validate(chunk_data)
return chunk
except Exception as e:
raise RunnerProtocolError(descriptor.id, f'Invalid chunk schema: {e}')
def _normalize_message_completed(
self,
data: dict[str, typing.Any],
descriptor: AgentRunnerDescriptor,
) -> provider_message.Message:
"""Normalize message.completed to Message."""
message_data = data.get('message', {})
if not message_data:
raise RunnerProtocolError(descriptor.id, 'message.completed missing message data')
try:
msg = provider_message.Message.model_validate(message_data)
return msg
except Exception as e:
raise RunnerProtocolError(descriptor.id, f'Invalid message schema: {e}')

View File

@@ -31,7 +31,7 @@ class PipelineService:
self.ap = ap
async def get_pipeline_metadata(self) -> list[dict]:
"""Get pipeline metadata with dynamically loaded plugin runners"""
"""Get pipeline metadata with dynamically loaded plugin runners from registry"""
import copy
# Deep copy AI metadata to avoid modifying the original
@@ -48,43 +48,20 @@ class PipelineService:
# Find the runner select config
for config_item in runner_stage.get('config', []):
if config_item.get('name') == 'runner':
# Get plugin agent runners
# Get plugin agent runners from registry
try:
plugin_runners = await self.ap.plugin_connector.list_agent_runners()
runner_options, runner_stages = await self.ap.agent_runner_registry.get_runner_metadata_for_pipeline()
# Add plugin runners to options
for runner in plugin_runners:
manifest = runner.get('manifest', {})
metadata = manifest.get('metadata', {})
for option in runner_options:
config_item['options'].append(option)
# Format: plugin:author/plugin_name/runner_name
runner_value = (
f'plugin:{runner["plugin_author"]}/{runner["plugin_name"]}/{runner["runner_name"]}'
)
# Add to options
config_item['options'].append(
{
'name': runner_value,
'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}),
'description': metadata.get('description'),
}
)
# Add corresponding stage configuration for this runner
spec_config = manifest.get('spec', {}).get('config', [])
if spec_config:
ai_metadata['stages'].append(
{
'name': runner_value,
'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}),
'description': metadata.get('description'),
'config': spec_config,
}
)
# Add corresponding stage configuration for each runner
for stage_config in runner_stages:
ai_metadata['stages'].append(stage_config)
except Exception as e:
self.ap.logger.warning(f'Failed to load plugin agent runners: {e}')
self.ap.logger.warning(f'Failed to load plugin agent runners from registry: {e}')
return [
self.ap.pipeline_config_meta_trigger,

View File

@@ -4,6 +4,7 @@ import logging
import asyncio
import traceback
import os
from typing import TYPE_CHECKING
from ..platform import botmgr as im_mgr
from ..platform.webhook_pusher import WebhookPusher
@@ -44,6 +45,9 @@ from ..vector import mgr as vectordb_mgr
from ..telemetry import telemetry as telemetry_module
from ..survey import manager as survey_module
if TYPE_CHECKING:
from ..agent.runner import AgentRunnerRegistry, AgentRunOrchestrator
class Application:
"""Runtime application object and context"""
@@ -158,6 +162,11 @@ class Application:
maintenance_service: maintenance_service.MaintenanceService = None
# Agent runner subsystem
agent_runner_registry: AgentRunnerRegistry = None
agent_run_orchestrator: AgentRunOrchestrator = None
def __init__(self):
pass

View File

@@ -36,6 +36,7 @@ from ...vector import mgr as vectordb_mgr
from .. import taskmgr
from ...telemetry import telemetry as telemetry_module
from ...survey import manager as survey_module
from ...agent.runner import AgentRunnerRegistry, AgentRunOrchestrator
@stage.stage_class('BuildAppStage')
@@ -179,5 +180,12 @@ class BuildAppStage(stage.BootingStage):
await plugin_connector_inst.initialize()
ap.plugin_connector = plugin_connector_inst
# Initialize agent runner subsystem
agent_runner_registry_inst = AgentRunnerRegistry(ap)
ap.agent_runner_registry = agent_runner_registry_inst
agent_run_orchestrator_inst = AgentRunOrchestrator(ap, agent_runner_registry_inst)
ap.agent_run_orchestrator = agent_run_orchestrator_inst
ctrl = controller.Controller(ap)
ap.ctrl = ctrl

View File

@@ -9,6 +9,12 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.platform.events as platform_events
from ...agent.runner.config_migration import ConfigMigration
# Official local-agent runner ID
LOCAL_AGENT_RUNNER_ID = 'plugin:langbot/local-agent/default'
@stage.stage_class('PreProcessor')
class PreProcessor(stage.PipelineStage):
@@ -31,16 +37,27 @@ class PreProcessor(stage.PipelineStage):
stage_inst_name: str,
) -> entities.StageProcessResult:
"""Process"""
selected_runner = query.pipeline_config['ai']['runner']['runner']
# Resolve runner ID using ConfigMigration (supports both new and old formats)
runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config)
# Get runner config (from new ai.runner_config or old ai.<runner-name>)
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {}
session = await self.ap.sess_mgr.get_session(query)
# Determine if this is a local-agent runner (built-in LLM capabilities)
# Check by runner_id OR by legacy runner field for backward compatibility
is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID or (
runner_id is None and
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') == 'local-agent'
)
# When not local-agent, llm_model is None
llm_model = None
if selected_runner == 'local-agent':
if is_local_agent:
# Read model config — new format is { primary: str, fallbacks: [str] },
# but handle legacy plain string for backward compatibility
model_config = query.pipeline_config['ai']['local-agent'].get('model', {})
model_config = runner_config.get('model', {})
if isinstance(model_config, str):
# Legacy format: plain UUID string
primary_uuid = model_config
@@ -67,10 +84,17 @@ class PreProcessor(stage.PipelineStage):
if valid_fallbacks:
query.variables['_fallback_model_uuids'] = valid_fallbacks
# Get prompt config - for local-agent, use runner_config; for others, use default prompt
prompt_config = runner_config.get('prompt', [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]) if is_local_agent else [
{'role': 'system', 'content': 'You are a helpful assistant.'}
]
conversation = await self.ap.sess_mgr.get_conversation(
query,
session,
query.pipeline_config['ai']['local-agent']['prompt'],
prompt_config,
query.pipeline_uuid,
query.bot_uuid,
)
@@ -79,7 +103,7 @@ class PreProcessor(stage.PipelineStage):
# been idle for longer than the configured conversation expire time.
# The idle window is measured from the last preprocess/update time, not
# from the conversation creation time.
conversation_expire_time = query.pipeline_config.get('ai', {}).get('runner', {}).get('expire-time', None)
conversation_expire_time = ConfigMigration.get_expire_time(query.pipeline_config)
now = datetime.datetime.now()
if conversation_expire_time is not None and conversation_expire_time > 0:
last_update_time = getattr(conversation, 'update_time', None) or getattr(conversation, 'create_time', None)
@@ -101,7 +125,7 @@ class PreProcessor(stage.PipelineStage):
query.prompt = conversation.prompt.copy()
query.messages = conversation.messages.copy()
if selected_runner == 'local-agent':
if is_local_agent:
query.use_funcs = []
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid
@@ -149,7 +173,7 @@ class PreProcessor(stage.PipelineStage):
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if (
selected_runner == 'local-agent'
is_local_agent
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
):
@@ -162,14 +186,15 @@ class PreProcessor(stage.PipelineStage):
content_list: list[provider_message.ContentElement] = []
plain_text = ''
quote_msg = query.pipeline_config['trigger'].get('misc', '').get('combine-quote-message')
quote_msg = query.pipeline_config['trigger'].get('misc', {}).get('combine-quote-message', False)
for me in query.message_chain:
if isinstance(me, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(me.text))
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or (
# Allow images for non-local-agent runners or if local-agent has vision
if not is_local_agent or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if me.base64 is not None:
@@ -190,7 +215,7 @@ class PreProcessor(stage.PipelineStage):
if isinstance(msg, platform_message.Plain):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or (
if not is_local_agent or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
):
if msg.base64 is not None:
@@ -214,9 +239,10 @@ class PreProcessor(stage.PipelineStage):
# Extract knowledge base UUIDs into query variables so plugins can modify them
# during PromptPreProcessing before the runner performs retrieval.
kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', [])
# Only for local-agent runner
kb_uuids = runner_config.get('knowledge-bases', []) if is_local_agent else []
if not kb_uuids:
old_kb_uuid = query.pipeline_config['ai']['local-agent'].get('knowledge-base', '')
old_kb_uuid = runner_config.get('knowledge-base', '') if is_local_agent else ''
if old_kb_uuid and old_kb_uuid != '__none__':
kb_uuids = [old_kb_uuid]
query.variables['_knowledge_base_uuids'] = list(kb_uuids)

View File

@@ -9,99 +9,28 @@ from datetime import datetime
from .. import handler
from ... import entities
from ....provider import runner as runner_module
import langbot_plugin.api.entities.events as events
from ....utils import importutil, constants, runner as runner_utils
from ....provider import runners
from ....utils import constants, runner as runner_utils
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
importutil.import_modules_in_pkg(runners)
class PluginAgentRunnerWrapper(runner_module.RequestRunner):
"""Wrapper to run AgentRunner from plugin"""
def __init__(self, ap, plugin_author: str, plugin_name: str, runner_name: str, pipeline_config: dict):
super().__init__(ap, pipeline_config)
self.plugin_author = plugin_author
self.plugin_name = plugin_name
self.runner_name = runner_name
self.name = f'plugin:{plugin_author}/{plugin_name}/{runner_name}'
async def run(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run the plugin agent runner"""
# Build AgentRunContext
context = AgentRunContext(
query_id=query.query_id,
session=query.session,
messages=query.messages,
user_message=query.user_message.content[0]
if isinstance(query.user_message.content, list)
else provider_message.ContentElement.from_text(query.user_message.content),
use_funcs=query.use_funcs,
extra_config=self.pipeline_config.get('ai', {}).get(self.runner_name, {}),
)
# Call plugin connector to run agent
async for result_dict in self.ap.plugin_connector.run_agent(
plugin_author=self.plugin_author,
plugin_name=self.plugin_name,
runner_name=self.runner_name,
context=context.model_dump(),
):
# Convert result to Message/MessageChunk
result_type = result_dict.get('type')
if result_type == 'chunk':
# Stream chunk
chunk_data = result_dict.get('message_chunk')
if chunk_data:
yield provider_message.MessageChunk.model_validate(chunk_data)
elif result_type == 'text':
# Text content
content = result_dict.get('content', '')
yield provider_message.MessageChunk(
role='assistant',
content=content,
)
elif result_type == 'tool_call':
# Tool call notification (may not need to yield anything here)
pass
elif result_type == 'finish':
# Final message
message_data = result_dict.get('message')
if message_data:
yield provider_message.Message.model_validate(message_data)
else:
# Fallback: create message from content
content = result_dict.get('content', '')
yield provider_message.Message(
role='assistant',
content=content,
)
class ChatMessageHandler(handler.MessageHandler):
"""Chat message handler using AgentRunOrchestrator.
This handler delegates all runner execution to the agent_run_orchestrator,
which resolves runner ID, builds context, invokes plugin runtime,
and normalizes results.
"""
async def handle(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[entities.StageProcessResult, None]:
"""处理"""
# 调API
# 生成器
# 触发插件事件
"""Handle chat message by delegating to AgentRunOrchestrator."""
# Trigger plugin event
event_class = (
events.PersonNormalMessageReceived
if query.launcher_type == provider_session.LauncherTypes.PERSON
@@ -122,7 +51,7 @@ class ChatMessageHandler(handler.MessageHandler):
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
event_ctx = await self.ap.plugin_connector.emit_event(event, bound_plugins)
is_create_card = False # 判断下是否需要创建流式卡片
is_create_card = False # Track if streaming card was created
if event_ctx.is_prevented_default():
if event_ctx.event.reply_message_chain is not None:
@@ -153,103 +82,85 @@ class ChatMessageHandler(handler.MessageHandler):
is_stream = False
try:
runner_name = query.pipeline_config['ai']['runner']['runner']
# Check if it's a built-in runner
runner = None
for r in runner_module.preregistered_runners:
if r.name == runner_name:
runner = r(self.ap, query.pipeline_config)
break
# If not found in built-in runners, check plugin runners
if runner is None:
# Parse runner name: format is "plugin:author/plugin_name/runner_name"
if runner_name.startswith('plugin:'):
parts = runner_name[7:].split('/') # Remove "plugin:" prefix
if len(parts) == 3:
plugin_author, plugin_name, component_runner_name = parts
runner = PluginAgentRunnerWrapper(
self.ap, plugin_author, plugin_name, component_runner_name, query.pipeline_config
)
else:
raise ValueError(
f'Invalid plugin runner name format: {runner_name}. Expected: plugin:author/name/runner'
)
else:
raise ValueError(f'Request Runner not found: {runner_name}')
# Mark start time for telemetry
start_ts = time.time()
if is_stream:
resp_message_id = uuid.uuid4()
chunk_count = 0 # Track streaming chunks to reduce excessive logging
# Create a single resp_message_id for the entire streaming response
resp_message_id = uuid.uuid4()
async for result in runner.run(query):
result.resp_message_id = str(resp_message_id)
# Use AgentRunOrchestrator to run the agent
# This replaces direct runner lookup and PluginAgentRunnerWrapper
async for result in self.ap.agent_run_orchestrator.run_from_query(query):
result.resp_message_id = str(resp_message_id)
# For streaming mode, pop previous response before adding new chunk
# This allows incremental card updates
if is_stream:
if query.resp_messages:
query.resp_messages.pop()
if query.resp_message_chain:
query.resp_message_chain.pop()
# 此时连接外部 AI 服务正常,创建卡片
if not is_create_card: # 只有不是第一次才创建卡片
await query.adapter.create_message_card(str(resp_message_id), query.message_event)
is_create_card = True
query.resp_messages.append(result)
chunk_count += 1
# Only log every 10th chunk to reduce excessive logging during streaming
# This prevents memory overflow from thousands of log entries per conversation
# First chunk uses INFO level to confirm connection establishment
if chunk_count == 1:
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}'
)
elif chunk_count % 10 == 0:
self.ap.logger.debug(
f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}'
)
# Create streaming card on first result (connection established)
if is_stream and not is_create_card:
await query.adapter.create_message_card(str(resp_message_id), query.message_event)
is_create_card = True
if result.content is not None:
text_length += len(result.content)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# Log final summary after streaming completes
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars'
)
else:
async for result in runner.run(query):
query.resp_messages.append(result)
query.resp_messages.append(result)
# Logging (reduce verbosity for streaming chunks)
if not is_stream:
self.ap.logger.info(
f'Conversation({query.query_id}) Response: {self.cut_str(result.readable_str())}'
)
if result.content is not None:
text_length += len(result.content)
if result.content is not None:
text_length += len(result.content)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# Log final summary after streaming completes
if is_stream:
chunk_count = len(query.resp_messages)
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars'
)
# Update conversation history
query.session.using_conversation.messages.append(query.user_message)
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:
# Import orchestrator errors for specific handling
from ....agent.runner.errors import (
RunnerNotFoundError,
RunnerNotAuthorizedError,
RunnerExecutionError,
)
error_info = f'{traceback.format_exc()}'
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}')
traceback.print_exc()
exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
# Handle specific runner errors with appropriate messages
if isinstance(e, RunnerNotFoundError):
user_notice = f'Agent runner not found: {e.runner_id}'
elif isinstance(e, RunnerNotAuthorizedError):
user_notice = 'Agent runner not authorized for this pipeline'
elif isinstance(e, RunnerExecutionError):
if e.retryable:
user_notice = 'Agent runner temporarily unavailable. Please try again.'
else:
user_notice = 'Agent runner execution failed.'
else:
# Use existing exception handling
exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint')
if exception_handling == 'show-error':
user_notice = f'{e}'
elif exception_handling == 'show-hint':
user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.')
else: # hide
user_notice = None
if exception_handling == 'show-error':
user_notice = f'{e}'
elif exception_handling == 'show-hint':
user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.')
else: # hide
user_notice = None
yield entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT,
@@ -259,7 +170,7 @@ class ChatMessageHandler(handler.MessageHandler):
debug_notice=traceback.format_exc(),
)
finally:
# Telemetry reporting: collect minimal per-query execution info and send asynchronously
# Telemetry reporting
try:
end_ts = time.time()
duration_ms = None
@@ -267,16 +178,14 @@ class ChatMessageHandler(handler.MessageHandler):
duration_ms = int((end_ts - start_ts) * 1000)
adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None
runner_name = (
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner')
if query.pipeline_config
else None
)
# Model name if using localagent
# Use orchestrator to resolve runner ID for telemetry
runner_name = self.ap.agent_run_orchestrator.resolve_runner_id_for_telemetry(query)
# Model name if available
model_name = None
try:
if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None):
if getattr(query, 'use_llm_model_uuid', None):
m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid)
if m and getattr(m, 'model_entity', None):
model_name = getattr(m.model_entity, 'name', None)
@@ -286,7 +195,7 @@ class ChatMessageHandler(handler.MessageHandler):
pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None)
runner_category = runner_utils.get_runner_category_from_runner(
runner_name, runner, query.pipeline_config
runner_name, None, query.pipeline_config
)
payload = {
@@ -304,7 +213,6 @@ class ChatMessageHandler(handler.MessageHandler):
'timestamp': datetime.utcnow().isoformat(),
}
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
await self.ap.telemetry.start_send_task(payload)
# Trigger survey event on first successful non-WebSocket response
@@ -312,5 +220,4 @@ class ChatMessageHandler(handler.MessageHandler):
if self.ap.survey:
await self.ap.survey.trigger_event('first_bot_response_success')
except Exception as ex:
# Ensure telemetry issues do not affect normal flow
self.ap.logger.warning(f'Failed to send telemetry: {ex}')

View File

@@ -600,14 +600,16 @@ class PluginRuntimeConnector:
yield cmd_ret
# AgentRunner methods
async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]:
"""List all available AgentRunner components."""
async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List all available AgentRunner components.
Returns list of dicts with plugin_author, plugin_name, runner_name, manifest, etc.
"""
if not self.is_enable_plugin:
return []
runners_data = await self.handler.list_agent_runners(include_plugins=bound_plugins)
runners = [ComponentManifest.model_validate(runner) for runner in runners_data]
return runners
return runners_data
async def run_agent(
self,
@@ -625,10 +627,18 @@ class PluginRuntimeConnector:
context: AgentRunContext as dict
Yields:
AgentRunReturn results as dicts
AgentRunResult dicts per Protocol v1
"""
if not self.is_enable_plugin:
yield {'type': 'finish', 'finish_reason': 'error', 'content': 'Plugin system is disabled'}
# Return v1 protocol run.failed
yield {
'type': 'run.failed',
'data': {
'error': 'Plugin system is disabled',
'code': 'plugin.disabled',
'retryable': False,
},
}
return
gen = self.handler.run_agent(plugin_author, plugin_name, runner_name, context)

View File

@@ -419,76 +419,6 @@ class RuntimeConnectionHandler(handler.Handler):
message=f'Failed to execute tool {tool_name}: {e}',
)
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE)
async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse:
"""Retrieve knowledge from a knowledge base"""
kb_uuid = data['kb_uuid']
query = data['query']
top_k = data.get('top_k', 5)
try:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if kb is None:
return handler.ActionResponse.error(
message=f'Knowledge base with uuid {kb_uuid} not found',
)
results = await kb.retrieve(query=query, top_k=top_k)
# Convert results to dict format
results_data = [
{
'id': r.id,
'content': [c.model_dump() for c in r.content],
'metadata': r.metadata,
}
for r in results
]
return handler.ActionResponse.success(
data={
'results': results_data,
},
)
except Exception as e:
traceback.print_exc()
return handler.ActionResponse.error(
message=f'Failed to retrieve knowledge: {e}',
)
@self.action(PluginToRuntimeAction.INVOKE_EMBEDDING)
async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse:
"""Invoke an embedding model"""
embedding_model_uuid = data['embedding_model_uuid']
texts = data['texts']
try:
embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid)
if embedding_model is None:
return handler.ActionResponse.error(
message=f'Embedding model with uuid {embedding_model_uuid} not found',
)
# Call embedding model to generate embeddings
embeddings = []
for text in texts:
embedding = await embedding_model.provider.invoke_embedding(
model=embedding_model,
text=text,
)
embeddings.append(embedding)
return handler.ActionResponse.success(
data={
'embeddings': embeddings,
},
)
except Exception as e:
traceback.print_exc()
return handler.ActionResponse.error(
message=f'Failed to invoke embedding model: {e}',
)
@self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE)
async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
"""Set binary storage"""
@@ -856,10 +786,11 @@ class RuntimeConnectionHandler(handler.Handler):
# Validate kb_id is in pipeline's allowed list
allowed_kb_uuids = []
if query.pipeline_config:
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
allowed_kb_uuids = local_agent_config.get('knowledge-bases', [])
from langbot.pkg.agent.runner.config_migration import ConfigMigration
runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, None)
allowed_kb_uuids = runner_config.get('knowledge-bases', [])
if not allowed_kb_uuids:
old_kb_uuid = local_agent_config.get('knowledge-base', '')
old_kb_uuid = runner_config.get('knowledge-base', '')
if old_kb_uuid and old_kb_uuid != '__none__':
allowed_kb_uuids = [old_kb_uuid]
@@ -1025,6 +956,55 @@ class RuntimeConnectionHandler(handler.Handler):
return result['tools']
async def list_agent_runners(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]:
"""List agent runners from plugin runtime.
Returns list of dicts with:
- plugin_author
- plugin_name
- runner_name
- runner_description
- manifest
- protocol_version
- capabilities
- permissions
- config
"""
result = await self.call_action(
LangBotToRuntimeAction.LIST_AGENT_RUNNERS,
{
'include_plugins': include_plugins,
},
timeout=20,
)
return result['runners']
async def run_agent(
self,
plugin_author: str,
plugin_name: str,
runner_name: str,
context: dict[str, Any],
) -> typing.AsyncGenerator[dict[str, Any], None]:
"""Run an AgentRunner component.
Yields AgentRunResult dicts per Protocol v1.
"""
gen = self.call_action_generator(
LangBotToRuntimeAction.RUN_AGENT,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
'runner_name': runner_name,
'context': context,
},
timeout=300,
)
async for ret in gen:
yield ret
async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]:
"""Get plugin icon"""
result = await self.call_action(

View File

@@ -0,0 +1,2 @@
"""Tests for agent runner subsystem."""
from __future__ import annotations

View File

@@ -0,0 +1,553 @@
"""Tests for ChatMessageHandler behavior with AgentRunOrchestrator.
Tests focus on:
- Streaming mode behavior (single resp_message_id, pop/append pattern)
- Non-streaming mode behavior (no pop)
- Orchestrator invocation
- Error handling for RunnerNotFoundError, RunnerExecutionError
Avoids circular imports by using proper import structure.
"""
from __future__ import annotations
import uuid
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from langbot.pkg.agent.runner.errors import (
RunnerNotFoundError,
RunnerExecutionError,
RunnerNotAuthorizedError,
)
from langbot.pkg.agent.runner.config_migration import ConfigMigration
# Define mock classes in dependency order (no forward references needed)
class MockLauncherType:
value = 'person'
class MockConversation:
uuid = 'conv-uuid'
messages = []
class MockMessage:
role = 'user'
content = 'Hello'
class MockAdapter:
is_stream = False
async def is_stream_output_supported(self):
return self.is_stream
async def create_message_card(self, resp_message_id, message_event):
pass
class MockSession:
launcher_type = MockLauncherType()
launcher_id = 'user123'
using_conversation = MockConversation()
class MockQuery:
"""Mock Query for testing."""
def __init__(self):
self.query_id = 1
self.launcher_type = MockLauncherType()
self.launcher_id = 'user123'
self.sender_id = 'user123'
self.bot_uuid = 'bot-uuid'
self.pipeline_uuid = 'pipeline-uuid'
self.pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
},
'runner_config': {},
},
'output': {
'misc': {
'exception-handling': 'show-hint',
'failure-hint': 'Request failed.',
},
},
}
self.variables = {}
self.session = MockSession()
self.user_message = MockMessage()
self.messages = []
self.resp_messages = []
self.resp_message_chain = None
self.adapter = MockAdapter()
self.message_event = MagicMock()
self.message_chain = MagicMock()
class MockMessageChunk:
"""Mock MessageChunk for testing."""
def __init__(self, content, resp_message_id=None):
self.role = 'assistant'
self.content = content
self.resp_message_id = resp_message_id
self.is_final = False
def readable_str(self):
return self.content
class MockEventContext:
"""Mock event context for testing."""
def __init__(self, prevented=False, reply_message_chain=None, user_message_alter=None):
self._prevented = prevented
self.event = MagicMock()
self.event.reply_message_chain = reply_message_chain
self.event.user_message_alter = user_message_alter
def is_prevented_default(self):
return self._prevented
class MockAgentRunOrchestrator:
"""Mock AgentRunOrchestrator for testing."""
def __init__(self, chunks=None, error=None):
self._chunks = chunks or []
self._error = error
async def run_from_query(self, query):
"""Async generator that yields chunks or raises error."""
if self._error:
raise self._error
for chunk in self._chunks:
yield chunk
def resolve_runner_id_for_telemetry(self, query):
return 'plugin:langbot/local-agent/default'
class MockApplication:
"""Mock Application for testing."""
def __init__(self, orchestrator=None):
self.agent_run_orchestrator = orchestrator or MockAgentRunOrchestrator()
self.logger = MagicMock()
self.logger.info = MagicMock()
self.logger.debug = MagicMock()
self.logger.warning = MagicMock()
self.logger.error = MagicMock()
# Mock plugin_connector
self.plugin_connector = MagicMock()
self.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext())
# Mock telemetry
self.telemetry = MagicMock()
self.telemetry.start_send_task = AsyncMock()
# Mock survey
self.survey = MagicMock()
self.survey.trigger_event = AsyncMock()
# Mock model_mgr
self.model_mgr = MagicMock()
self.model_mgr.get_model_by_uuid = AsyncMock(return_value=None)
class TestStreamingBehavior:
"""Tests for streaming mode behavior."""
def test_single_resp_message_id_for_streaming(self):
"""Streaming mode should use single resp_message_id for entire response."""
# Simulate the streaming logic: resp_message_id created outside loop
resp_message_id = uuid.uuid4()
chunks = ['Hello', ' World', '!']
resp_messages = []
for chunk in chunks:
result = MockMessageChunk(chunk)
result.resp_message_id = str(resp_message_id)
# Pop old chunk (streaming behavior)
if resp_messages:
resp_messages.pop()
resp_messages.append(result)
# All chunks should have same resp_message_id
assert len(resp_messages) == 1 # Only last chunk remains after pop/append
assert resp_messages[0].resp_message_id == str(resp_message_id)
def test_pop_before_append_in_streaming(self):
"""Streaming mode should pop old chunk before appending new."""
resp_message_id = uuid.uuid4()
resp_messages = []
# First chunk - no pop
chunk1 = MockMessageChunk('Hello')
chunk1.resp_message_id = str(resp_message_id)
resp_messages.append(chunk1)
assert len(resp_messages) == 1
# Second chunk - pop first, then append
if resp_messages:
resp_messages.pop()
chunk2 = MockMessageChunk('Hello World')
chunk2.resp_message_id = str(resp_message_id)
resp_messages.append(chunk2)
assert len(resp_messages) == 1
assert resp_messages[0].content == 'Hello World'
def test_non_streaming_no_pop(self):
"""Non-streaming mode should NOT pop previous responses."""
resp_messages = []
# First message
msg1 = MockMessageChunk('Response 1')
resp_messages.append(msg1)
assert len(resp_messages) == 1
# Second message - should NOT pop in non-streaming
msg2 = MockMessageChunk('Response 2')
resp_messages.append(msg2)
assert len(resp_messages) == 2
class TestConfigMigrationInChatHandler:
"""Tests for ConfigMigration usage in chat handler context."""
def test_resolve_runner_id_from_pipeline_config(self):
"""Chat handler should use ConfigMigration to resolve runner ID."""
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:langbot/local-agent/default'
def test_resolve_runner_id_from_old_format(self):
"""ConfigMigration should handle old runner format."""
pipeline_config = {
'ai': {
'runner': {
'runner': 'local-agent',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:langbot/local-agent/default'
class TestErrorHandling:
"""Tests for orchestrator error handling."""
def test_runner_not_found_error_properties(self):
"""RunnerNotFoundError should have runner_id property."""
error = RunnerNotFoundError('plugin:notexist/unknown/default')
assert error.runner_id == 'plugin:notexist/unknown/default'
assert 'not found' in str(error)
def test_runner_execution_error_retryable(self):
"""RunnerExecutionError should have retryable property."""
error = RunnerExecutionError(
'plugin:langbot/local-agent/default',
'Upstream timeout',
retryable=True,
)
assert error.runner_id == 'plugin:langbot/local-agent/default'
assert error.retryable is True
assert 'timeout' in str(error)
def test_runner_execution_error_not_retryable(self):
"""RunnerExecutionError can be non-retryable."""
error = RunnerExecutionError(
'plugin:langbot/local-agent/default',
'Configuration error',
retryable=False,
)
assert error.retryable is False
def test_runner_not_authorized_error_properties(self):
"""RunnerNotAuthorizedError should have bound_plugins property."""
error = RunnerNotAuthorizedError(
'plugin:langbot/local-agent/default',
['langbot/dify-agent'],
)
assert error.runner_id == 'plugin:langbot/local-agent/default'
assert error.bound_plugins == ['langbot/dify-agent']
class TestChatHandlerImports:
"""Test that chat handler can be imported without circular import."""
def test_import_chat_handler_module(self):
"""Import chat handler module should work."""
# This test verifies the import works without circular dependency
from langbot.pkg.pipeline.process.handlers import chat
assert chat.ChatMessageHandler is not None
def test_chat_handler_class_exists(self):
"""ChatMessageHandler class should be defined."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
assert ChatMessageHandler.__name__ == 'ChatMessageHandler'
def test_chat_handler_has_handle_method(self):
"""ChatMessageHandler should have async generator handle method."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
assert hasattr(ChatMessageHandler, 'handle')
# handle returns AsyncGenerator, so check for async generator function
import inspect
assert inspect.isasyncgenfunction(ChatMessageHandler.handle)
class TestChatHandlerAsyncBehavior:
"""Real async tests for ChatMessageHandler.handle() with mocked orchestrator."""
@pytest.mark.asyncio
async def test_streaming_single_resp_message_id(self):
"""Streaming mode: all chunks should have same resp_message_id."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
# Create chunks for streaming
chunks = [
MockMessageChunk('Hello'),
MockMessageChunk('Hello World'),
MockMessageChunk('Hello World!'),
]
orchestrator = MockAgentRunOrchestrator(chunks=chunks)
mock_ap = MockApplication(orchestrator=orchestrator)
# Mock event context to not prevent default
event_ctx = MockEventContext(prevented=False)
mock_ap.plugin_connector.emit_event = AsyncMock(return_value=event_ctx)
query = MockQuery()
query.adapter.is_stream = True # Enable streaming mode
handler = ChatMessageHandler(mock_ap)
# Mock event creation and StageProcessResult to bypass pydantic validation
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE))
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
# Verify single resp_message_id
resp_ids = [msg.resp_message_id for msg in query.resp_messages if hasattr(msg, 'resp_message_id')]
assert len(set(resp_ids)) == 1 # All same ID
# Verify pop/append pattern: only last chunk remains
assert len(query.resp_messages) == 1
assert query.resp_messages[0].content == 'Hello World!'
@pytest.mark.asyncio
async def test_non_streaming_no_pop(self):
"""Non-streaming mode: all chunks should remain."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
chunks = [
MockMessageChunk('Response 1'),
MockMessageChunk('Response 2'),
]
orchestrator = MockAgentRunOrchestrator(chunks=chunks)
mock_ap = MockApplication(orchestrator=orchestrator)
mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False))
query = MockQuery()
query.adapter.is_stream = False # Disable streaming mode
handler = ChatMessageHandler(mock_ap)
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE))
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
# No pop: all chunks should remain
assert len(query.resp_messages) == 2
assert query.resp_messages[0].content == 'Response 1'
assert query.resp_messages[1].content == 'Response 2'
@pytest.mark.asyncio
async def test_runner_not_found_error(self):
"""Handler should catch RunnerNotFoundError and return INTERRUPT."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
orchestrator = MockAgentRunOrchestrator(
error=RunnerNotFoundError('plugin:notexist/unknown/default')
)
mock_ap = MockApplication(orchestrator=orchestrator)
mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False))
query = MockQuery()
handler = ChatMessageHandler(mock_ap)
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(
result_type=kwargs.get('result_type'),
user_notice=kwargs.get('user_notice'),
)
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
# Should return INTERRUPT with user_notice
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert 'not found' in results[0].user_notice
@pytest.mark.asyncio
async def test_runner_not_authorized_error(self):
"""Handler should catch RunnerNotAuthorizedError and return INTERRUPT."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
orchestrator = MockAgentRunOrchestrator(
error=RunnerNotAuthorizedError('plugin:langbot/local-agent/default', ['other/plugin'])
)
mock_ap = MockApplication(orchestrator=orchestrator)
mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False))
query = MockQuery()
handler = ChatMessageHandler(mock_ap)
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(
result_type=kwargs.get('result_type'),
user_notice=kwargs.get('user_notice'),
)
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert 'not authorized' in results[0].user_notice
@pytest.mark.asyncio
async def test_runner_execution_error_retryable(self):
"""Handler should catch retryable RunnerExecutionError."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
orchestrator = MockAgentRunOrchestrator(
error=RunnerExecutionError('plugin:langbot/local-agent/default', 'timeout', retryable=True)
)
mock_ap = MockApplication(orchestrator=orchestrator)
mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False))
query = MockQuery()
handler = ChatMessageHandler(mock_ap)
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(
result_type=kwargs.get('result_type'),
user_notice=kwargs.get('user_notice'),
)
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
assert len(results) == 1
assert results[0].result_type == entities.ResultType.INTERRUPT
assert 'temporarily unavailable' in results[0].user_notice
@pytest.mark.asyncio
async def test_prevented_default_with_reply(self):
"""When event prevented default with reply, use reply message."""
from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler
from langbot.pkg.pipeline import entities
# Mock reply message chain
reply_chain = MockMessageChunk('Reply from plugin')
mock_ap = MockApplication()
mock_ap.plugin_connector.emit_event = AsyncMock(
return_value=MockEventContext(prevented=True, reply_message_chain=reply_chain)
)
query = MockQuery()
handler = ChatMessageHandler(mock_ap)
mock_event = MagicMock()
mock_event.return_value = MagicMock()
def make_result(*args, **kwargs):
return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE))
with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \
patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result):
mock_events_module.PersonNormalMessageReceived = mock_event
mock_events_module.GroupNormalMessageReceived = mock_event
results = []
async for result in handler.handle(query):
results.append(result)
# Should return CONTINUE with reply message
assert len(results) == 1
assert results[0].result_type == entities.ResultType.CONTINUE
assert len(query.resp_messages) == 1

View File

@@ -0,0 +1,231 @@
"""Tests for agent runner config migration."""
from __future__ import annotations
from langbot.pkg.agent.runner.config_migration import (
ConfigMigration,
OLD_RUNNER_TO_PLUGIN_RUNNER_ID,
)
class TestOldRunnerMapping:
"""Tests for OLD_RUNNER_TO_PLUGIN_RUNNER_ID mapping."""
def test_local_agent_mapping(self):
"""Local-agent should map to official plugin."""
assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent'] == 'plugin:langbot/local-agent/default'
def test_dify_mapping(self):
"""Dify should map to official plugin."""
assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['dify-service-api'] == 'plugin:langbot/dify-agent/default'
def test_n8n_mapping(self):
"""n8n should map to official plugin."""
assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['n8n-service-api'] == 'plugin:langbot/n8n-agent/default'
def test_coze_mapping(self):
"""Coze should map to official plugin."""
assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['coze-api'] == 'plugin:langbot/coze-agent/default'
def test_all_runners_mapped(self):
"""All old runners should have mapping."""
expected_runners = [
'local-agent',
'dify-service-api',
'n8n-service-api',
'coze-api',
'dashscope-app-api',
'langflow-api',
'tbox-app-api',
]
for runner in expected_runners:
assert runner in OLD_RUNNER_TO_PLUGIN_RUNNER_ID
mapped = OLD_RUNNER_TO_PLUGIN_RUNNER_ID[runner]
assert mapped.startswith('plugin:langbot/')
assert mapped.endswith('/default')
class TestResolveRunnerId:
"""Tests for ConfigMigration.resolve_runner_id."""
def test_resolve_new_format_runner_id(self):
"""Resolve runner ID from new format."""
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:langbot/local-agent/default'
def test_resolve_old_format_runner_name(self):
"""Resolve runner ID from old format."""
pipeline_config = {
'ai': {
'runner': {
'runner': 'local-agent',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:langbot/local-agent/default'
def test_resolve_old_format_plugin_runner(self):
"""Resolve already migrated plugin:* runner."""
pipeline_config = {
'ai': {
'runner': {
'runner': 'plugin:alice/my-agent/custom',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:alice/my-agent/custom'
def test_resolve_no_runner_config(self):
"""Resolve runner ID when not configured."""
pipeline_config = {}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id is None
def test_resolve_priority_new_over_old(self):
"""New format takes priority over old format."""
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
'runner': 'dify-service-api', # This should be ignored
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:langbot/local-agent/default'
class TestResolveRunnerConfig:
"""Tests for ConfigMigration.resolve_runner_config."""
def test_resolve_new_format_config(self):
"""Resolve runner config from new format."""
pipeline_config = {
'ai': {
'runner_config': {
'plugin:langbot/local-agent/default': {
'model': 'uuid-123',
'max_round': 10,
},
},
},
}
config = ConfigMigration.resolve_runner_config(
pipeline_config,
'plugin:langbot/local-agent/default',
)
assert config == {'model': 'uuid-123', 'max_round': 10}
def test_resolve_old_format_config(self):
"""Resolve runner config from old format."""
pipeline_config = {
'ai': {
'local-agent': {
'model': 'uuid-123',
'max_round': 10,
},
},
}
config = ConfigMigration.resolve_runner_config(
pipeline_config,
'plugin:langbot/local-agent/default',
)
assert config == {'model': 'uuid-123', 'max_round': 10}
def test_resolve_no_config(self):
"""Resolve runner config when not found."""
pipeline_config = {}
config = ConfigMigration.resolve_runner_config(
pipeline_config,
'plugin:langbot/local-agent/default',
)
assert config == {}
def test_resolve_priority_new_over_old(self):
"""New format config takes priority."""
pipeline_config = {
'ai': {
'runner_config': {
'plugin:langbot/local-agent/default': {
'model': 'new-uuid',
},
},
'local-agent': {
'model': 'old-uuid',
},
},
}
config = ConfigMigration.resolve_runner_config(
pipeline_config,
'plugin:langbot/local-agent/default',
)
assert config == {'model': 'new-uuid'}
class TestGetExpireTime:
"""Tests for ConfigMigration.get_expire_time."""
def test_get_expire_time_zero(self):
"""Get expire time when zero."""
pipeline_config = {
'ai': {
'runner': {
'expire-time': 0,
},
},
}
expire_time = ConfigMigration.get_expire_time(pipeline_config)
assert expire_time == 0
def test_get_expire_time_positive(self):
"""Get expire time when positive."""
pipeline_config = {
'ai': {
'runner': {
'expire-time': 3600,
},
},
}
expire_time = ConfigMigration.get_expire_time(pipeline_config)
assert expire_time == 3600
def test_get_expire_time_default(self):
"""Get expire time when not configured."""
pipeline_config = {}
expire_time = ConfigMigration.get_expire_time(pipeline_config)
assert expire_time == 0
class TestGetOldRunnerName:
"""Tests for ConfigMigration.get_old_runner_name."""
def test_get_old_runner_name_mapped(self):
"""Get old runner name for mapped runner ID."""
old_name = ConfigMigration.get_old_runner_name('plugin:langbot/local-agent/default')
assert old_name == 'local-agent'
def test_get_old_runner_name_not_mapped(self):
"""Get old runner name for unmapped runner ID."""
old_name = ConfigMigration.get_old_runner_name('plugin:alice/my-agent/custom')
assert old_name is None

View File

@@ -0,0 +1,137 @@
"""Tests for agent runner ID parsing and formatting."""
from __future__ import annotations
import pytest
from langbot.pkg.agent.runner.id import (
parse_runner_id,
format_runner_id,
RunnerIdParts,
is_plugin_runner_id,
)
class TestRunnerIdParsing:
"""Tests for parse_runner_id."""
def test_parse_plugin_runner_id(self):
"""Parse valid plugin runner ID."""
runner_id = 'plugin:langbot/local-agent/default'
parts = parse_runner_id(runner_id)
assert parts.source == 'plugin'
assert parts.plugin_author == 'langbot'
assert parts.plugin_name == 'local-agent'
assert parts.runner_name == 'default'
def test_parse_plugin_runner_id_complex_names(self):
"""Parse plugin runner ID with complex names."""
runner_id = 'plugin:alice/helpdesk-agent/ticket-handler'
parts = parse_runner_id(runner_id)
assert parts.source == 'plugin'
assert parts.plugin_author == 'alice'
assert parts.plugin_name == 'helpdesk-agent'
assert parts.runner_name == 'ticket-handler'
def test_parse_invalid_plugin_runner_id_missing_parts(self):
"""Parse invalid plugin runner ID with missing parts."""
runner_id = 'plugin:langbot/local-agent'
with pytest.raises(ValueError) as exc_info:
parse_runner_id(runner_id)
assert 'Invalid plugin runner ID format' in str(exc_info.value)
def test_parse_invalid_plugin_runner_id_empty_parts(self):
"""Parse invalid plugin runner ID with empty parts."""
runner_id = 'plugin://default'
with pytest.raises(ValueError) as exc_info:
parse_runner_id(runner_id)
assert 'non-empty' in str(exc_info.value)
def test_parse_invalid_runner_id_not_plugin(self):
"""Parse invalid runner ID without plugin prefix."""
runner_id = 'local-agent'
with pytest.raises(ValueError) as exc_info:
parse_runner_id(runner_id)
assert 'Invalid runner ID format' in str(exc_info.value)
def test_parse_invalid_runner_id_empty_string(self):
"""Parse empty runner ID."""
runner_id = ''
with pytest.raises(ValueError):
parse_runner_id(runner_id)
class TestRunnerIdFormatting:
"""Tests for format_runner_id."""
def test_format_plugin_runner_id(self):
"""Format plugin runner ID."""
runner_id = format_runner_id(
source='plugin',
plugin_author='langbot',
plugin_name='local-agent',
runner_name='default',
)
assert runner_id == 'plugin:langbot/local-agent/default'
def test_format_invalid_source(self):
"""Format runner ID with invalid source."""
with pytest.raises(ValueError) as exc_info:
format_runner_id(
source='builtin',
plugin_author='langbot',
plugin_name='local-agent',
runner_name='default',
)
assert 'Invalid runner source' in str(exc_info.value)
class TestRunnerIdParts:
"""Tests for RunnerIdParts dataclass."""
def test_get_plugin_id(self):
"""Get plugin ID from parts."""
parts = RunnerIdParts(
source='plugin',
plugin_author='langbot',
plugin_name='local-agent',
runner_name='default',
)
assert parts.to_plugin_id() == 'langbot/local-agent'
def test_frozen_dataclass(self):
"""RunnerIdParts should be immutable."""
parts = RunnerIdParts(
source='plugin',
plugin_author='langbot',
plugin_name='local-agent',
runner_name='default',
)
with pytest.raises(Exception): # FrozenInstanceError
parts.plugin_author = 'other'
class TestIsPluginRunnerId:
"""Tests for is_plugin_runner_id."""
def test_is_plugin_runner_id_true(self):
"""Check plugin runner ID returns True."""
assert is_plugin_runner_id('plugin:langbot/local-agent/default') is True
def test_is_plugin_runner_id_false(self):
"""Check non-plugin runner ID returns False."""
assert is_plugin_runner_id('local-agent') is False
assert is_plugin_runner_id('builtin:local-agent') is False
assert is_plugin_runner_id('') is False

View File

@@ -0,0 +1,278 @@
"""Tests for agent runner registry."""
from __future__ import annotations
import pytest
from langbot.pkg.agent.runner.registry import AgentRunnerRegistry
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.agent.runner.errors import RunnerNotFoundError, RunnerNotAuthorizedError
class FakeApplication:
"""Fake Application for testing."""
def __init__(self):
class FakeLogger:
def info(self, msg):
pass
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
self.logger = FakeLogger()
class FakePluginConnector:
is_enable_plugin = True
async def list_agent_runners(self, bound_plugins=None):
# Return sample runner data
return [
{
'plugin_author': 'langbot',
'plugin_name': 'local-agent',
'runner_name': 'default',
'manifest': {
'kind': 'AgentRunner',
'metadata': {
'name': 'default',
'label': {'en_US': 'Local Agent'},
},
'spec': {
'protocol_version': '1',
'config': [],
'capabilities': {'streaming': True},
'permissions': {},
},
},
},
{
'plugin_author': 'alice',
'plugin_name': 'my-agent',
'runner_name': 'custom',
'manifest': {
'kind': 'AgentRunner',
'metadata': {
'name': 'custom',
'label': {'en_US': 'Custom Agent'},
},
'spec': {
'protocol_version': '1',
'config': [{'name': 'param1', 'type': 'string'}],
'capabilities': {},
'permissions': {},
},
},
},
# Invalid runner - wrong kind
{
'plugin_author': 'bad',
'plugin_name': 'wrong-kind',
'runner_name': 'default',
'manifest': {
'kind': 'Tool', # Wrong kind
'metadata': {},
'spec': {},
},
},
# Invalid runner - missing name
{
'plugin_author': 'bad',
'plugin_name': 'missing-name',
'runner_name': 'default',
'manifest': {
'kind': 'AgentRunner',
'metadata': {}, # No name
'spec': {},
},
},
]
self.plugin_connector = FakePluginConnector()
class TestRegistryDiscovery:
"""Tests for runner discovery."""
@pytest.mark.asyncio
async def test_discover_valid_runners(self):
"""Discover valid runners from plugin runtime."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
runners = await registry.list_runners(use_cache=False)
# Should find 2 valid runners (langbot/local-agent and alice/my-agent)
assert len(runners) == 2
ids = [r.id for r in runners]
assert 'plugin:langbot/local-agent/default' in ids
assert 'plugin:alice/my-agent/custom' in ids
@pytest.mark.asyncio
async def test_discover_caches_results(self):
"""Discovery should cache results."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
# First discovery
runners1 = await registry.list_runners(use_cache=True)
# Second call should use cache
runners2 = await registry.list_runners(use_cache=True)
assert registry._cache is not None
assert len(runners1) == len(runners2)
@pytest.mark.asyncio
async def test_discover_handles_plugin_disabled(self):
"""Discovery returns empty when plugin system disabled."""
ap = FakeApplication()
ap.plugin_connector.is_enable_plugin = False
registry = AgentRunnerRegistry(ap)
runners = await registry.list_runners(use_cache=False)
assert runners == []
@pytest.mark.asyncio
async def test_cache_not_polluted_by_bound_plugins(self):
"""Cache should contain ALL runners, not filtered by bound_plugins.
Regression test: get(bound_plugins=["a/b"]) should not pollute cache,
so subsequent list_runners(bound_plugins=None) should return all runners.
"""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
# First: get with bound_plugins filter (should not pollute cache)
descriptor = await registry.get(
'plugin:langbot/local-agent/default',
bound_plugins=['langbot/local-agent'],
)
assert descriptor.id == 'plugin:langbot/local-agent/default'
# Cache should contain ALL runners (both langbot and alice)
assert registry._cache is not None
assert len(registry._cache) == 2 # Both runners in cache
assert 'plugin:langbot/local-agent/default' in registry._cache
assert 'plugin:alice/my-agent/custom' in registry._cache
# Second: list_runners without filter should return ALL runners
all_runners = await registry.list_runners(bound_plugins=None, use_cache=True)
assert len(all_runners) == 2 # Both runners returned
# Third: list_runners with different filter should work correctly
alice_runners = await registry.list_runners(bound_plugins=['alice/my-agent'], use_cache=True)
assert len(alice_runners) == 1
assert alice_runners[0].id == 'plugin:alice/my-agent/custom'
class TestRegistryGet:
"""Tests for getting specific runner."""
@pytest.mark.asyncio
async def test_get_existing_runner(self):
"""Get existing runner by ID."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
descriptor = await registry.get('plugin:langbot/local-agent/default')
assert descriptor.id == 'plugin:langbot/local-agent/default'
assert descriptor.plugin_author == 'langbot'
assert descriptor.plugin_name == 'local-agent'
assert descriptor.runner_name == 'default'
@pytest.mark.asyncio
async def test_get_nonexistent_runner(self):
"""Get nonexistent runner raises RunnerNotFoundError."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
with pytest.raises(RunnerNotFoundError) as exc_info:
await registry.get('plugin:notexist/unknown/default')
assert exc_info.value.runner_id == 'plugin:notexist/unknown/default'
@pytest.mark.asyncio
async def test_get_runner_with_bound_plugins_filter(self):
"""Get runner with bound plugins authorization."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
# Authorized - langbot plugin in bound list
descriptor = await registry.get(
'plugin:langbot/local-agent/default',
bound_plugins=['langbot/local-agent'],
)
assert descriptor is not None
# Not authorized - plugin not in bound list
with pytest.raises(RunnerNotAuthorizedError):
await registry.get(
'plugin:alice/my-agent/custom',
bound_plugins=['langbot/local-agent'],
)
class TestRegistryMetadataForPipeline:
"""Tests for get_runner_metadata_for_pipeline."""
@pytest.mark.asyncio
async def test_get_metadata_options_and_stages(self):
"""Get metadata options and stages for pipeline UI."""
ap = FakeApplication()
registry = AgentRunnerRegistry(ap)
options, stages = await registry.get_runner_metadata_for_pipeline()
# Should have options for each runner
assert len(options) == 2
option_ids = [o['name'] for o in options]
assert 'plugin:langbot/local-agent/default' in option_ids
assert 'plugin:alice/my-agent/custom' in option_ids
# Should have stages for runners with config
# Note: stages may be empty if config_schema is empty list
# In real scenarios, runners with config_schema will generate stages
# Only runners with non-empty config_schema generate stages
# mock data has config: [{'name': 'param1', 'type': 'string'}] for alice/my-agent
# but config is now taken from runner_data.get('config', [])
assert len(stages) >= 0 # Can be 0 if all runners have empty config
class TestDescriptorValidation:
"""Tests for descriptor validation."""
def test_validate_runner_descriptor(self):
"""Validate correctly built descriptor."""
descriptor = AgentRunnerDescriptor(
id='plugin:test/my-runner/default',
source='plugin',
label={'en_US': 'Test Runner'},
plugin_author='test',
plugin_name='my-runner',
runner_name='default',
)
assert descriptor.id == 'plugin:test/my-runner/default'
assert descriptor.get_plugin_id() == 'test/my-runner'
assert descriptor.protocol_version == '1'
def test_descriptor_capabilities(self):
"""Descriptor capability helper methods."""
descriptor = AgentRunnerDescriptor(
id='plugin:test/my-runner/default',
source='plugin',
label={'en_US': 'Test Runner'},
plugin_author='test',
plugin_name='my-runner',
runner_name='default',
capabilities={'streaming': True, 'tool_calling': False},
)
assert descriptor.supports_streaming() is True
assert descriptor.supports_tool_calling() is False
assert descriptor.supports_knowledge_retrieval() is False

View File

@@ -0,0 +1,343 @@
"""Tests for agent runner result normalizer."""
from __future__ import annotations
import pytest
from langbot.pkg.agent.runner.result_normalizer import AgentResultNormalizer
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.agent.runner.errors import RunnerExecutionError, RunnerProtocolError
from langbot_plugin.api.entities.builtin.provider import message as provider_message
class FakeApplication:
"""Fake Application for testing."""
def __init__(self):
class FakeLogger:
def info(self, msg):
pass
def debug(self, msg):
pass
def warning(self, msg):
pass
def error(self, msg):
pass
self.logger = FakeLogger()
def make_descriptor():
"""Create a test descriptor."""
return AgentRunnerDescriptor(
id='plugin:langbot/local-agent/default',
source='plugin',
label={'en_US': 'Local Agent', 'zh_Hans': '内置 Agent'},
plugin_author='langbot',
plugin_name='local-agent',
runner_name='default',
protocol_version='1',
capabilities={'streaming': True},
)
class TestNormalizeMessageDelta:
"""Tests for normalizing message.delta results."""
@pytest.mark.asyncio
async def test_normalize_message_delta_text(self):
"""Normalize message.delta with text chunk."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'message.delta',
'data': {
'chunk': {
'role': 'assistant',
'content': 'Hello',
},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is not None
assert isinstance(result, provider_message.MessageChunk)
assert result.role == 'assistant'
assert result.content == 'Hello'
@pytest.mark.asyncio
async def test_normalize_message_delta_missing_chunk(self):
"""Normalize message.delta without chunk data."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'message.delta',
'data': {},
}
with pytest.raises(RunnerProtocolError) as exc_info:
await normalizer.normalize(result_dict, descriptor)
assert 'missing chunk data' in str(exc_info.value)
class TestNormalizeMessageCompleted:
"""Tests for normalizing message.completed results."""
@pytest.mark.asyncio
async def test_normalize_message_completed(self):
"""Normalize message.completed with full message."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'message.completed',
'data': {
'message': {
'role': 'assistant',
'content': 'Complete response',
},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is not None
assert isinstance(result, provider_message.Message)
assert result.role == 'assistant'
assert result.content == 'Complete response'
@pytest.mark.asyncio
async def test_normalize_message_completed_missing_message(self):
"""Normalize message.completed without message data."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'message.completed',
'data': {},
}
with pytest.raises(RunnerProtocolError) as exc_info:
await normalizer.normalize(result_dict, descriptor)
assert 'missing message data' in str(exc_info.value)
class TestNormalizeRunCompleted:
"""Tests for normalizing run.completed results."""
@pytest.mark.asyncio
async def test_normalize_run_completed_with_message(self):
"""Normalize run.completed with final message."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'run.completed',
'data': {
'message': {
'role': 'assistant',
'content': 'Final response',
},
'finish_reason': 'stop',
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is not None
assert isinstance(result, provider_message.Message)
@pytest.mark.asyncio
async def test_normalize_run_completed_without_message(self):
"""Normalize run.completed without message."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'run.completed',
'data': {
'finish_reason': 'stop',
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
class TestNormalizeRunFailed:
"""Tests for normalizing run.failed results."""
@pytest.mark.asyncio
async def test_normalize_run_failed(self):
"""Normalize run.failed raises RunnerExecutionError."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'run.failed',
'data': {
'error': 'Upstream timeout',
'code': 'upstream.timeout',
'retryable': True,
},
}
with pytest.raises(RunnerExecutionError) as exc_info:
await normalizer.normalize(result_dict, descriptor)
assert exc_info.value.runner_id == 'plugin:langbot/local-agent/default'
assert exc_info.value.retryable is True
assert 'timeout' in str(exc_info.value)
class TestNormalizeNonMessageResults:
"""Tests for normalizing non-message results."""
@pytest.mark.asyncio
async def test_normalize_tool_call_started(self):
"""Normalize tool.call.started returns None."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'tool.call.started',
'data': {
'tool_call_id': 'call_1',
'tool_name': 'weather',
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
@pytest.mark.asyncio
async def test_normalize_tool_call_completed(self):
"""Normalize tool.call.completed returns None."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'tool.call.completed',
'data': {
'tool_call_id': 'call_1',
'tool_name': 'weather',
'result': {'temp': 20},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
@pytest.mark.asyncio
async def test_normalize_state_updated(self):
"""Normalize state.updated returns None."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'state.updated',
'data': {
'key': 'external_conversation_id',
'value': 'abc123',
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
@pytest.mark.asyncio
async def test_normalize_action_requested(self):
"""Normalize action.requested returns None (EBA reserved)."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'action.requested',
'data': {
'action': 'platform.message.edit',
'parameters': {},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
class TestNormalizeInvalidResults:
"""Tests for handling invalid results."""
@pytest.mark.asyncio
async def test_normalize_missing_type(self):
"""Normalize result without type."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'data': {},
}
with pytest.raises(RunnerProtocolError) as exc_info:
await normalizer.normalize(result_dict, descriptor)
assert 'Missing result type' in str(exc_info.value)
@pytest.mark.asyncio
async def test_normalize_unknown_type(self):
"""Normalize unknown type returns None."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
result_dict = {
'type': 'unknown_type',
'data': {},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
@pytest.mark.asyncio
async def test_normalize_legacy_type_returns_none(self):
"""Legacy types (chunk, text, finish) are now treated as unknown."""
normalizer = AgentResultNormalizer(FakeApplication())
descriptor = make_descriptor()
# chunk is now unknown
result_dict = {
'type': 'chunk',
'data': {
'message_chunk': {
'role': 'assistant',
'content': 'Legacy chunk',
},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
# text is now unknown
result_dict = {
'type': 'text',
'data': {
'content': 'Legacy text',
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None
# finish is now unknown
result_dict = {
'type': 'finish',
'data': {
'message': {
'role': 'assistant',
'content': 'Legacy finish',
},
},
}
result = await normalizer.normalize(result_dict, descriptor)
assert result is None

32
uv.lock generated
View File

@@ -1973,7 +1973,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
{ name = "langbot-plugin", specifier = "==0.3.11" },
{ name = "langbot-plugin", editable = "../langbot-plugin-sdk" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-core", specifier = ">=1.2.28" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
@@ -2036,8 +2036,8 @@ dev = [
[[package]]
name = "langbot-plugin"
version = "0.3.11"
source = { registry = "https://pypi.org/simple" }
version = "0.3.10"
source = { editable = "../langbot-plugin-sdk" }
dependencies = [
{ name = "aiofiles" },
{ name = "dotenv" },
@@ -2054,9 +2054,29 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/91/83/93b86bcdbfe51d820fa59232aaa73cc802d6ce614f67d8f8b33957419538/langbot_plugin-0.3.11.tar.gz", hash = "sha256:8d10c98c771b468b2d35cc007778439c39922a88265fcc16a5881234bc7c1b19", size = 190315, upload-time = "2026-05-12T15:45:24.262Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8f/22/de7977a6a5cbf557b80043eb3ed39e5feff24033a5d6db4ab88d48ccb6ea/langbot_plugin-0.3.11-py3-none-any.whl", hash = "sha256:c1d2e84eda1584902d99efa316b850c08c1c04fcc199306ff4af1dca1431304a", size = 165574, upload-time = "2026-05-12T15:45:22.908Z" },
[package.metadata]
requires-dist = [
{ name = "aiofiles", specifier = ">=24.1.0" },
{ name = "dotenv", specifier = ">=0.9.9" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "jinja2", specifier = ">=3.1.6" },
{ name = "pip", specifier = ">=25.2" },
{ name = "pydantic", specifier = ">=2.11.5" },
{ name = "pydantic-settings", specifier = ">=2.10.1" },
{ name = "pytest", specifier = ">=8.4.0" },
{ name = "pyyaml", specifier = ">=6.0.2" },
{ name = "textual", specifier = ">=3.2.0" },
{ name = "types-aiofiles", specifier = ">=24.1.0.20250516" },
{ name = "types-pyyaml", specifier = ">=6.0.12.20250516" },
{ name = "watchdog", specifier = ">=6.0.0" },
{ name = "websockets", specifier = ">=15.0.1" },
]
[package.metadata.requires-dev]
dev = [
{ name = "mypy", specifier = ">=1.16.0" },
{ name = "ruff", specifier = ">=0.11.12" },
]
[[package]]