diff --git a/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md new file mode 100644 index 00000000..ab345921 --- /dev/null +++ b/docs/agent-runner-pluginization/IMPLEMENTATION_PLAN.md @@ -0,0 +1,370 @@ +# Agent Runner 插件化最终实现计划 + +本文档面向实现 agent,用来把当前 PoC 分支直接推进到最终架构。这个分支不按线上渐进发布节奏处理,因此可以接受一次性破坏内部 runner 实现和 Pipeline AI 配置结构;但最终必须提供历史配置迁移。 + +## 1. 最终状态 + +LangBot 最终只保留 Agent Runner 的宿主能力: + +- 发现 runner:`AgentRunnerRegistry` +- 选择 runner:Pipeline 配置和未来事件绑定配置 +- 构造上下文:`AgentRunContext` +- 裁剪资源:模型、工具、知识库、文件、存储、平台能力 +- 调度执行:`AgentRunOrchestrator` +- 归一结果:`AgentRunResult` -> 当前 Pipeline 的 `Message` / `MessageChunk` +- 隔离错误:插件异常、协议错误、超时、结果过大不能破坏主流程 +- 迁移旧配置:把旧内置 runner 配置迁到官方 AgentRunner 插件配置 + +LangBot 不再长期维护内置业务 runner 分支。`local-agent`、Dify、n8n、Coze、DashScope、Langflow、Tbox 等都迁到官方 AgentRunner 插件。 + +## 2. 高层架构 + +```text +Pipeline MessageProcessor / future EventRouter + | + v +AgentRunOrchestrator + | + +--> AgentRunnerRegistry + | +--> plugin runtime LIST_AGENT_RUNNERS + | +--> descriptor cache / validation + | + +--> AgentRunContextBuilder + +--> AgentResourceBuilder + +--> AgentResultNormalizer + | + v +PluginRuntimeConnector.run_agent() + | + v +SDK Runtime RUN_AGENT -> plugin AgentRunner.run() +``` + +关键约束: + +- `ChatMessageHandler` 不解析 `plugin:*`,不实例化 wrapper,不知道 runner 组件细节。 +- `PipelineService.get_pipeline_metadata()` 不直接访问插件 runtime,而是读取 registry。 +- 旧 `RequestRunner` 只作为迁移参考,不作为最终运行路径。 +- EBA 只做字段预留,不在本轮实现 EventBus、EventRouter、平台动作执行。 + +## 3. 新增 LangBot 模块 + +建议新增: + +```text +src/langbot/pkg/agent/ + __init__.py + runner/ + __init__.py + descriptor.py + errors.py + id.py + registry.py + context_builder.py + resource_builder.py + orchestrator.py + result_normalizer.py + config_migration.py +``` + +### 3.1 descriptor.py + +定义 LangBot 内部使用的 descriptor: + +```python +class AgentRunnerDescriptor(BaseModel): + id: str + source: Literal["plugin"] + label: dict[str, str] + description: dict[str, str] | None = None + plugin_author: str + plugin_name: str + runner_name: str + plugin_version: str | None = None + protocol_version: str = "1" + config_schema: list[dict[str, Any]] = [] + capabilities: dict[str, bool] = {} + permissions: dict[str, list[str]] = {} + raw_manifest: dict[str, Any] = {} +``` + +`source == "builtin"` 不作为最终目标。如果实现阶段需要临时 adapter,必须标记为测试过渡代码,并在官方插件跑通后删除。 + +### 3.2 id.py + +统一 runner id 解析和生成: + +- 插件 runner id:`plugin:{author}/{plugin_name}/{runner_name}` +- `parse_runner_id(id)` 返回结构化对象 +- 禁止业务代码手写字符串 split +- PoC 已存在的 `plugin:author/name/runner` 继续作为合法 id + +### 3.3 registry.py + +职责: + +- 调用 `ap.plugin_connector.list_agent_runners(bound_plugins=None)` 拉取插件 runner +- 校验 manifest: + - `kind == AgentRunner` + - `metadata.name` 存在 + - `metadata.label` 存在 + - `spec.protocol_version` 兼容,默认 `1` + - `spec.config` 是 list,默认空 + - `spec.capabilities` 是 dict,默认空 + - `spec.permissions` 是 dict,默认空 +- 输出 `AgentRunnerDescriptor` +- 缓存 discovery 结果,提供 `refresh()` +- 单个插件 manifest 失败只记录 warning,不影响其它 runner + +刷新触发点: + +- 插件安装、卸载、升级、重启后 +- Pipeline metadata 请求时发现缓存为空 +- 可选 TTL,优先保证正确性 + +### 3.4 context_builder.py + +把当前 Pipeline query 直接转换成 SDK v1 `AgentRunContext`。 + +当前消息 Pipeline 的最小字段: + +- `run_id`: 新 UUID,不使用 query id 作为全局 run id +- `trigger.type`: `message.received` +- `conversation`: launcher、sender、bot、pipeline、历史消息 +- `event`: message event envelope 子集 +- `actor`: sender +- `subject`: 当前消息或 launcher +- `messages`: `query.messages` +- `input`: 从 `query.user_message` 和 `query.message_chain` 构造 +- `resources`: 由 `resource_builder` 注入 +- `runtime`: host/version/workspace/bot/pipeline/query/trace/deadline +- `config`: 当前 runner id 对应的实例配置 + +保留 SDK legacy helper 是 SDK 的责任,LangBot 不再构造 PoC 的 `query_id/session/messages/user_message/extra_config` 上下文。 + +### 3.5 resource_builder.py + +执行前做三层裁剪: + +1. runner manifest 声明的 `spec.permissions` +2. Pipeline 的 `extensions_preferences` +3. runner 实例配置中选择的资源范围 + +输出写入 `ctx.resources`,至少覆盖: + +- models:可调用模型 UUID、类型、能力摘要 +- tools:可见工具 manifest,使用当前 bound plugins / MCP server 范围 +- knowledge_bases:可检索知识库列表 +- storage:plugin storage / workspace storage 权限摘要 +- files:允许读取的配置文件、知识文件摘要 +- platform_capabilities:本阶段只声明,不执行平台动作 + +注意:旧的 unrestricted proxy action 必须在 Phase 2 被二次校验,不能只靠 context 声明。 + +### 3.6 result_normalizer.py + +只接受 SDK v1 result: + +- `message.delta` +- `message.completed` +- `tool.call.started` +- `tool.call.completed` +- `state.updated` +- `run.completed` +- `run.failed` +- `action.requested` 允许实验性返回,但本阶段只记录 telemetry,不执行 + +映射: + +- `message.delta.data.chunk` -> `provider_message.MessageChunk` +- `message.completed.data.message` -> `provider_message.Message` +- `run.completed.data.message` -> `provider_message.Message` +- `run.failed` -> 抛出受控异常,让 `ChatMessageHandler` 使用现有错误策略 +- 工具和状态事件默认不 yield 到 Pipeline,只记录 debug/telemetry + +防护: + +- 未知 type warning 后忽略 +- 单 result 序列化大小限制 +- provider message schema 校验失败转 `run.failed` +- 插件没有输出任何消息时,按 runner failed 处理 + +### 3.7 orchestrator.py + +核心入口: + +```python +async def run_from_query(query: pipeline_query.Query) -> AsyncGenerator[Message | MessageChunk, None]: + runner_id = resolve_runner_id(query.pipeline_config) + descriptor = await registry.get(runner_id, bound_plugins=query.variables.get("_pipeline_bound_plugins")) + ctx = await context_builder.from_query(query, descriptor) + async for raw in plugin_connector.run_agent(...): + async for message in result_normalizer.normalize(raw): + yield message +``` + +必须覆盖: + +- runner id 不存在 +- 插件系统关闭 +- runner 不在 bound plugins 范围内 +- 插件 runtime 断连 +- runner 协议版本不兼容 +- run 超时 +- task cancellation + +## 4. 配置模型直接切换 + +目标格式: + +```json +{ + "ai": { + "runner": { + "id": "plugin:langbot/local-agent/default", + "expire-time": 0 + }, + "runner_config": { + "plugin:langbot/local-agent/default": {} + } + } +} +``` + +兼容读取: + +- 优先读 `ai.runner.id` +- 没有 `id` 时读旧 `ai.runner.runner` +- 旧内置 runner 名通过迁移表映射: + - `local-agent` -> `plugin:langbot/local-agent/default` + - `dify-service-api` -> `plugin:langbot/dify-agent/default` + - `n8n-service-api` -> `plugin:langbot/n8n-agent/default` + - `coze-api` -> `plugin:langbot/coze-agent/default` + - `dashscope-app-api` -> `plugin:langbot/dashscope-agent/default` + - `langflow-api` -> `plugin:langbot/langflow-agent/default` + - `tbox-app-api` -> `plugin:langbot/tbox-agent/default` + +写入策略: + +- 新 UI 只写 `ai.runner.id` 和 `ai.runner_config` +- 后端 update 接口接受旧字段,但保存时归一成新格式 +- migration 最后统一落库 + +## 5. 需要修改的 LangBot 范围 + +必须修改: + +- `src/langbot/pkg/core/app.py` + - 增加 `agent_runner_registry` / `agent_run_orchestrator` 属性 +- `src/langbot/pkg/core/stages/build_app.py` + - 初始化 Agent 子系统 +- `src/langbot/pkg/pipeline/process/handlers/chat.py` + - 删除 `PluginAgentRunnerWrapper` + - 删除内置 runner 查找逻辑 + - 调用 orchestrator +- `src/langbot/pkg/api/http/service/pipeline.py` + - metadata 从 registry 生成 +- `src/langbot/pkg/plugin/connector.py` + - `list_agent_runners()` / `run_agent()` 增加协议校验和 bound plugin 参数 +- `src/langbot/pkg/plugin/handler.py` + - proxy action 二次权限校验 +- `src/langbot/pkg/pipeline/preproc/preproc.py` + - 不再只为 `local-agent` 构造工具、知识库、模型 + - 对所有 agent runner 保留 multimodal input +- `src/langbot/pkg/pipeline/pipelinemgr.py` + - runner name 监控改读 `runner.id` +- `src/langbot/templates/metadata/pipeline/ai.yaml` + - runner 字段从 `runner` 迁到 `id` +- `src/langbot/templates/default-pipeline-config.json` + - 默认 runner 改为官方 local-agent 插件 id +- `web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx` + - 当前 runner 改读 `ai.runner.id` + - runner 配置区改写入 `ai.runner_config[id]` + +最终删除或停用: + +- `src/langbot/pkg/provider/runner.py` 的业务注册路径 +- `src/langbot/pkg/provider/runners/*` 的运行入口 + +可以暂时保留文件作为官方插件迁移参考,但不应被运行时引用。 + +## 6. 实现顺序 + +### Step 1:接入新版 SDK + +- 更新 LangBot 依赖到包含 SDK v1 AgentRunner 协议的版本 +- 删除 LangBot 中对旧 `AgentRunReturn` 类型名的依赖 +- 确认 `langbot_plugin` 的本地 editable / lockfile 指向正确 SDK + +### Step 2:Agent 子系统骨架 + +- 新增 descriptor/id/errors +- 新增 registry,先只 list plugin runner +- 为 registry 加单测,使用 fake connector + +### Step 3:Pipeline metadata 切 registry + +- `get_pipeline_metadata()` 只通过 registry 输出 runner option +- 插件 runner config stage 从 descriptor.config_schema 生成 +- schema 错误不影响 metadata 返回 + +### Step 4:Orchestrator 替换 ChatMessageHandler + +- 新增 context builder / result normalizer / orchestrator +- `chat.py` 删除 wrapper 和 runner 查找 +- 维持现有流式卡片和 resp_messages 行为 + +### Step 5:新配置读写 + +- 后端 resolve runner id 支持新旧配置 +- 前端表单改 `runner.id` + `runner_config` +- 默认配置改官方 local-agent 插件 id + +### Step 6:权限和资源裁剪 + +- resource builder 根据 manifest / pipeline / instance config 裁剪 +- proxy action 校验 resource scope +- 禁止插件用 unrestricted API 访问未授权知识库、工具、模型 + +### Step 7:删除内置 runner 运行分支 + +- 官方插件 ready 后移除内置 runner registry +- 删除或隔离 provider runners 的运行引用 +- 测试旧 runner 名只能通过 migration 映射到插件 id + +### Step 8:历史配置迁移 + +- 写 persistence migration +- 更新 default pipeline config +- 对已存在 Pipeline 执行旧字段到新字段迁移 +- 对监控/日志里的 runner 字段改用新 id + +## 7. 测试要求 + +单测: + +- runner id parse / format +- registry manifest 校验、失败隔离、bound plugins 过滤 +- context builder 从 query 生成完整 v1 context +- resource builder 三层裁剪 +- result normalizer 对每种 result type 的映射 +- 旧配置 resolve 和 migration + +集成测试: + +- fake AgentRunner 插件可被 Pipeline 选择 +- streaming 输出仍能更新 message card +- 插件异常返回用户可理解错误,不中断 runtime +- runner 不在 bound plugins 时不可执行 +- 未授权工具 / 知识库 / 模型 proxy 调用被拒绝 +- 旧 `local-agent` Pipeline 配置迁到官方插件 id + +## 8. 验收标准 + +- LangBot Pipeline 可以选择插件 AgentRunner 并完成非流式和流式回复。 +- `ChatMessageHandler` 不包含插件 runner 解析和 wrapper。 +- `PipelineService` 不直接拼插件 runner metadata。 +- 所有 runner 配置使用 `ai.runner.id` + `ai.runner_config`。 +- 旧内置 runner 不再作为 LangBot 内部运行分支执行。 +- 插件只能访问 `ctx.resources` 授权的模型、工具、知识库和文件。 +- EBA 相关字段只作为 context/result 预留,不执行平台动作。 diff --git a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md new file mode 100644 index 00000000..18dc1dca --- /dev/null +++ b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md @@ -0,0 +1,192 @@ +# 官方 AgentRunner 插件仓库计划 + +本文档描述内置 `RequestRunner` 迁出 LangBot 后,官方 runner 插件仓库应如何组织。建议新建仓库: + +```text +/home/glwuy/langbot-app/langbot-official-agent-runners +``` + +远端仓库名建议:`langbot-official-agent-runners`。 + +## 1. 为什么新仓库 + +官方 runner 插件会和 LangBot 主仓库、SDK 仓库以不同节奏迭代: + +- LangBot 主仓库只维护宿主协议和调度。 +- SDK 仓库维护 AgentRunner 组件和 runtime 协议。 +- 官方 runner 插件承载业务 runner 的具体实现和第三方平台适配。 + +不要把官方 runner 插件继续留在 LangBot 主仓库,否则容易重新形成“宿主和业务 runner 绑死”的结构。 + +## 2. 仓库结构 + +建议采用 monorepo: + +```text +langbot-official-agent-runners/ + README.md + pyproject.toml + packages/ + local-agent/ + manifest.yaml + components/default.yaml + main.py + src/ + tests/ + dify-agent/ + n8n-agent/ + coze-agent/ + dashscope-agent/ + langflow-agent/ + tbox-agent/ + shared/ + langbot_agent_runner_utils/ + __init__.py + context.py + config.py + streaming.py + tool_calling.py + errors.py + tests/ + fixtures/ + integration/ +``` + +先用一个仓库统一迁移,避免每个 runner 复制 SDK helper、测试夹具、发布脚本。 + +## 3. 插件命名和 runner id + +固定映射: + +| 旧 runner | 官方插件 | runner id | +| --- | --- | --- | +| `local-agent` | `langbot/local-agent` | `plugin:langbot/local-agent/default` | +| `dify-service-api` | `langbot/dify-agent` | `plugin:langbot/dify-agent/default` | +| `n8n-service-api` | `langbot/n8n-agent` | `plugin:langbot/n8n-agent/default` | +| `coze-api` | `langbot/coze-agent` | `plugin:langbot/coze-agent/default` | +| `dashscope-app-api` | `langbot/dashscope-agent` | `plugin:langbot/dashscope-agent/default` | +| `langflow-api` | `langbot/langflow-agent` | `plugin:langbot/langflow-agent/default` | +| `tbox-app-api` | `langbot/tbox-agent` | `plugin:langbot/tbox-agent/default` | + +每个插件可以后续提供多个 runner,但迁移目标的默认 runner 统一叫 `default`。 + +## 4. 迁移优先级 + +### Batch 1:打通协议 + +1. `local-agent` +2. `dify-agent` + +原因: + +- `local-agent` 覆盖模型、工具、知识库、流式、会话历史,是能力最完整的基准。 +- `dify-agent` 代表外部 Agent 平台调用,配置和错误处理能验证传统 service API runner 的迁移方式。 + +### Batch 2:迁移外部 workflow runner + +1. `n8n-agent` +2. `langflow-agent` + +这批主要验证 webhook/workflow 输入输出、timeout、外部 conversation id。 + +### Batch 3:迁移平台 Agent API + +1. `coze-agent` +2. `dashscope-agent` +3. `tbox-agent` + +这批主要验证平台特有响应格式、引用资料、文件/图片输入。 + +## 5. 每个官方插件的组件要求 + +每个插件至少包含: + +```yaml +apiVersion: langbot/v1 +kind: AgentRunner +metadata: + name: default + label: + en_US: Dify Agent + zh_Hans: Dify Agent + description: + en_US: Run a Dify application as a LangBot AgentRunner. + zh_Hans: 将 Dify 应用作为 LangBot AgentRunner 运行。 +spec: + protocol_version: "1" + config: [] + capabilities: + streaming: true + tool_calling: false + knowledge_retrieval: false + multimodal_input: false + event_context: true + platform_api: false + interrupt: false + stateful_session: true + permissions: + models: [] + tools: [] + knowledge_bases: [] + storage: ["plugin"] + files: [] + platform_api: [] +execution: + python: + path: ./main.py + attr: DefaultAgentRunner +``` + +## 6. local-agent 插件要求 + +`local-agent` 是最关键的官方插件,应等价迁移当前: + +- model primary/fallback 选择 +- prompt +- max-round +- knowledge-bases +- rerank-model +- rerank-top-k +- function calling +- streaming +- multimodal input +- conversation history +- monitoring metadata + +与 LangBot 主仓库的责任边界: + +- LangBot 构造 `ctx.messages`、`ctx.input`、`ctx.resources` +- 插件负责选择模型、拼请求、调用 LLM、处理 tool call loop、输出 result stream +- 插件不能绕过 `ctx.resources` 调用未授权模型、工具或知识库 + +## 7. 外部 runner 插件要求 + +外部平台 runner 迁移时遵循: + +- 旧配置字段尽量保持同名,便于 migration 复制 +- 输出统一转换为 `AgentRunResult` +- 外部 API timeout 从 runner config 读取 +- 平台 conversation id 存 plugin storage 或 context runtime state,不能依赖 LangBot 内置 conversation uuid 私有结构 +- 流式支持按平台能力声明,没有流式就只发 `message.completed` + +## 8. 发布和安装策略 + +最终 LangBot 安装或升级时需要保证官方 runner 插件可用。可选方案: + +1. 首次启动检测缺失官方 runner 插件并提示安装。 +2. 打包发行版时预装官方 runner 插件。 +3. 在 migration 前检查对应插件是否存在,不存在则自动安装或阻止迁移。 + +建议实现顺序: + +- 开发阶段使用本地路径插件。 +- 发布前支持 marketplace 安装。 +- 历史配置 migration 只在官方插件可用时执行。 + +## 9. 验收标准 + +- 每个旧 runner 都有对应官方 AgentRunner 插件。 +- 旧 runner 配置能无损复制到新 `runner_config[id]`。 +- LangBot 主仓库不再通过 `RequestRunner` 执行业务 runner。 +- 官方插件测试覆盖非流式、流式、错误、timeout、配置缺失。 +- `local-agent` 插件能完成模型 fallback、tool calling、知识库检索。 diff --git a/docs/agent-runner-pluginization/PHASE0_INTEGRATION_RECORD.md b/docs/agent-runner-pluginization/PHASE0_INTEGRATION_RECORD.md new file mode 100644 index 00000000..981981c2 --- /dev/null +++ b/docs/agent-runner-pluginization/PHASE0_INTEGRATION_RECORD.md @@ -0,0 +1,63 @@ +# Agent Runner Pluginization Phase 0 Integration Test Record + +## Test Summary + +**Status**: PASSED + +**Date**: 2026-05-10 10:09 + +## Test Configuration + +- **LangBot Branch**: feat/agent-runner-plugin +- **SDK Branch**: feat/agent-runner-plugin +- **Runner Repo**: langbot-agent-runner (new) + +## Test Scenario + +- **Selected Runner**: `plugin:langbot/local-agent/default` +- **Input**: `1` +- **Expected Output**: `[stub] Echo: 1` +- **Actual Output**: `[stub] Echo: 1` + +## Verified Chain + +``` +Frontend selects plugin:langbot/local-agent/default + -> LangBot pipeline + -> AgentRunOrchestrator + -> SDK runtime RUN_AGENT + -> langbot-agent-runner/local-agent DefaultAgentRunner + -> AgentRunResult + -> LangBot response +``` + +## Key Components Verified + +### LangBot Host +- AgentRunOrchestrator resolves runner ID via ConfigMigration +- AgentRunContextBuilder builds SDK v1 context +- AgentResultNormalizer normalizes SDK v1 results +- ChatMessageHandler delegates to orchestrator (single resp_message_id, streaming pop/append) + +### SDK Runtime +- RUN_AGENT action dispatches to plugin runner +- AgentRunner component manifest parsing +- LIST_AGENT_RUNNERS returns runner metadata + +### langbot-agent-runner Plugin +- DefaultAgentRunner stub implementation +- AgentRunner manifest with protocol_version, capabilities, permissions +- Echo response validates SDK v1 result format + +## Next Steps (Phase 1) + +1. Implement real Dify runner (external API runner validation) +2. Update frontend to save `ai.runner.id` + `ai.runner_config` +3. Add persistence migration for old config format +4. Update pipeline templates +5. Add proxy action secondary permission validation + +## Related Documents + +- [IMPLEMENTATION_PLAN.md](./IMPLEMENTATION_PLAN.md) +- [OFFICIAL_RUNNER_PLUGINS.md](./OFFICIAL_RUNNER_PLUGINS.md) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8c5fe651..acf5bf70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,6 +105,9 @@ classifiers = [ "Topic :: Communications :: Chat", ] +[tool.uv.sources] +langbot-plugin = { path = "../langbot-plugin-sdk", editable = true } + [project.urls] Homepage = "https://langbot.app" Documentation = "https://docs.langbot.app" @@ -223,4 +226,3 @@ skip-magic-trailing-comma = false # Like Black, automatically detect the appropriate line ending. line-ending = "auto" - diff --git a/src/langbot/pkg/agent/__init__.py b/src/langbot/pkg/agent/__init__.py new file mode 100644 index 00000000..4da739d7 --- /dev/null +++ b/src/langbot/pkg/agent/__init__.py @@ -0,0 +1,37 @@ +"""Agent runner subsystem for LangBot.""" +from __future__ import annotations + +from .runner.descriptor import AgentRunnerDescriptor +from .runner.id import parse_runner_id, format_runner_id, RunnerIdParts, is_plugin_runner_id +from .runner.errors import ( + AgentRunnerError, + RunnerNotFoundError, + RunnerNotAuthorizedError, + RunnerProtocolError, + RunnerExecutionError, +) +from .runner.registry import AgentRunnerRegistry +from .runner.context_builder import AgentRunContextBuilder +from .runner.resource_builder import AgentResourceBuilder +from .runner.result_normalizer import AgentResultNormalizer +from .runner.orchestrator import AgentRunOrchestrator +from .runner.config_migration import ConfigMigration + +__all__ = [ + 'AgentRunnerDescriptor', + 'parse_runner_id', + 'format_runner_id', + 'is_plugin_runner_id', + 'RunnerIdParts', + 'AgentRunnerError', + 'RunnerNotFoundError', + 'RunnerNotAuthorizedError', + 'RunnerProtocolError', + 'RunnerExecutionError', + 'AgentRunnerRegistry', + 'AgentRunContextBuilder', + 'AgentResourceBuilder', + 'AgentResultNormalizer', + 'AgentRunOrchestrator', + 'ConfigMigration', +] \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/__init__.py b/src/langbot/pkg/agent/runner/__init__.py new file mode 100644 index 00000000..f3aabdae --- /dev/null +++ b/src/langbot/pkg/agent/runner/__init__.py @@ -0,0 +1,36 @@ +"""Agent runner modules.""" +from __future__ import annotations + +from .descriptor import AgentRunnerDescriptor +from .id import parse_runner_id, format_runner_id, RunnerIdParts +from .errors import ( + AgentRunnerError, + RunnerNotFoundError, + RunnerNotAuthorizedError, + RunnerProtocolError, + RunnerExecutionError, +) +from .registry import AgentRunnerRegistry +from .context_builder import AgentRunContextBuilder +from .resource_builder import AgentResourceBuilder +from .result_normalizer import AgentResultNormalizer +from .orchestrator import AgentRunOrchestrator +from .config_migration import ConfigMigration + +__all__ = [ + 'AgentRunnerDescriptor', + 'parse_runner_id', + 'format_runner_id', + 'RunnerIdParts', + 'AgentRunnerError', + 'RunnerNotFoundError', + 'RunnerNotAuthorizedError', + 'RunnerProtocolError', + 'RunnerExecutionError', + 'AgentRunnerRegistry', + 'AgentRunContextBuilder', + 'AgentResourceBuilder', + 'AgentResultNormalizer', + 'AgentRunOrchestrator', + 'ConfigMigration', +] \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/config_migration.py b/src/langbot/pkg/agent/runner/config_migration.py new file mode 100644 index 00000000..50cead38 --- /dev/null +++ b/src/langbot/pkg/agent/runner/config_migration.py @@ -0,0 +1,196 @@ +"""Configuration migration for agent runner IDs.""" +from __future__ import annotations + +import typing + +from .id import is_plugin_runner_id + + +# Mapping from old built-in runner names to official plugin runner IDs +OLD_RUNNER_TO_PLUGIN_RUNNER_ID = { + 'local-agent': 'plugin:langbot/local-agent/default', + 'dify-service-api': 'plugin:langbot/dify-agent/default', + 'n8n-service-api': 'plugin:langbot/n8n-agent/default', + 'coze-api': 'plugin:langbot/coze-agent/default', + 'dashscope-app-api': 'plugin:langbot/dashscope-agent/default', + 'langflow-api': 'plugin:langbot/langflow-agent/default', + 'tbox-app-api': 'plugin:langbot/tbox-agent/default', +} + + +class ConfigMigration: + """Configuration migration helper for agent runner IDs. + + Responsibilities: + - Resolve runner ID from new ai.runner.id or old ai.runner.runner + - Map old built-in runner names to official plugin runner IDs + - Extract runner config from ai.runner_config or old ai. + """ + + @staticmethod + def resolve_runner_id(pipeline_config: dict[str, typing.Any]) -> str | None: + """Resolve runner ID from pipeline configuration. + + Priority: + 1. New format: ai.runner.id (must be plugin:* format) + 2. Old format: ai.runner.runner (mapped to plugin:* if built-in) + + Args: + pipeline_config: Pipeline configuration dict + + Returns: + Runner ID string, or None if not configured + """ + ai_config = pipeline_config.get('ai', {}) + runner_config = ai_config.get('runner', {}) + + # Check new format first + runner_id = runner_config.get('id') + if runner_id: + if is_plugin_runner_id(runner_id): + return runner_id + # If it's not a plugin ID, try to map it as old runner name + return OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(runner_id, runner_id) + + # Check old format + old_runner_name = runner_config.get('runner') + if old_runner_name: + # If already plugin:* format, return directly + if is_plugin_runner_id(old_runner_name): + return old_runner_name + # Map old built-in runner to official plugin ID + mapped_id = OLD_RUNNER_TO_PLUGIN_RUNNER_ID.get(old_runner_name) + if mapped_id: + return mapped_id + # Return old name if no mapping exists (will error in registry) + return old_runner_name + + return None + + @staticmethod + def resolve_runner_config( + pipeline_config: dict[str, typing.Any], + runner_id: str, + ) -> dict[str, typing.Any]: + """Resolve runner instance configuration from pipeline configuration. + + Priority: + 1. New format: ai.runner_config[runner_id] + 2. Old format: ai. (mapped from runner_id if applicable) + + Args: + pipeline_config: Pipeline configuration dict + runner_id: Resolved runner ID + + Returns: + Runner configuration dict (empty if not found) + """ + ai_config = pipeline_config.get('ai', {}) + + # Check new format + runner_configs = ai_config.get('runner_config', {}) + if runner_id in runner_configs: + return runner_configs[runner_id] + + # Check old format: ai. + # Try to find old runner name from runner_id + old_runner_name = None + for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items(): + if mapped_id == runner_id: + old_runner_name = old_name + break + + if old_runner_name: + old_config = ai_config.get(old_runner_name, {}) + if old_config: + return old_config + + # If runner_id is plugin:* format, try extracting runner_name as config key + if is_plugin_runner_id(runner_id): + # Some configs might use just the runner_name component as key + # But this is legacy behavior - prefer ai.runner_config[id] + pass + + return {} + + @staticmethod + def get_old_runner_name(runner_id: str) -> str | None: + """Get old runner name from mapped runner ID. + + Args: + runner_id: Plugin runner ID + + Returns: + Old runner name if mapped, None otherwise + """ + for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items(): + if mapped_id == runner_id: + return old_name + return None + + @staticmethod + def get_expire_time(pipeline_config: dict[str, typing.Any]) -> int: + """Get conversation expire time from configuration. + + Args: + pipeline_config: Pipeline configuration dict + + Returns: + Expire time in seconds (0 means no expiry) + """ + ai_config = pipeline_config.get('ai', {}) + runner_config = ai_config.get('runner', {}) + return runner_config.get('expire-time', 0) + + @staticmethod + def migrate_pipeline_config(pipeline_config: dict[str, typing.Any]) -> dict[str, typing.Any]: + """Migrate pipeline config to new format. + + This converts old ai.runner.runner and ai. to + new ai.runner.id and ai.runner_config format. + + Args: + pipeline_config: Original pipeline configuration + + Returns: + Migrated pipeline configuration + """ + # Create copy + new_config = dict(pipeline_config) + ai_config = new_config.get('ai', {}) + if not ai_config: + return new_config + + runner_config = ai_config.get('runner', {}) + runner_configs = ai_config.get('runner_config', {}) + + # Resolve runner ID + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + if runner_id: + # Set new format + runner_config['id'] = runner_id + # Remove old runner field if present + if 'runner' in runner_config and is_plugin_runner_id(runner_config['runner']): + # Already migrated plugin:* format, keep as id + pass + elif 'runner' in runner_config: + # Old built-in runner name, remove after migration + old_name = runner_config['runner'] + if old_name in OLD_RUNNER_TO_PLUGIN_RUNNER_ID: + del runner_config['runner'] + + # Migrate runner config + resolved_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id) + if resolved_config: + runner_configs[runner_id] = resolved_config + # Remove old runner config block + for old_name, mapped_id in OLD_RUNNER_TO_PLUGIN_RUNNER_ID.items(): + if mapped_id == runner_id and old_name in ai_config: + del ai_config[old_name] + + # Update configs + ai_config['runner'] = runner_config + ai_config['runner_config'] = runner_configs + new_config['ai'] = ai_config + + return new_config \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py new file mode 100644 index 00000000..5d5448cd --- /dev/null +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -0,0 +1,254 @@ +"""Agent run context builder for converting Query to SDK v1 AgentRunContext.""" +from __future__ import annotations + +import uuid +import time +import typing + +from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .config_migration import ConfigMigration + + +# Internal models for SDK v1 context protocol matching SDK v1 resources.py + + +class AgentTrigger(typing.TypedDict): + """Agent trigger information.""" + type: str + source: str # 'pipeline' or 'event_router' + timestamp: int | None + + +class ConversationContext(typing.TypedDict): + """Conversation context.""" + session_id: str | None + conversation_id: str | None + launcher_type: str | None + launcher_id: str | None + sender_id: str | None + bot_uuid: str | None + pipeline_uuid: str | None + + +class AgentInput(typing.TypedDict): + """Agent input.""" + text: str | None + contents: list[dict[str, typing.Any]] + message_chain: dict[str, typing.Any] | None + attachments: list[dict[str, typing.Any]] + + +# SDK v1 Protocol resource models - matching langbot-plugin-sdk/resources.py + + +class ModelResource(typing.TypedDict): + """Model resource per SDK v1.""" + model_id: str + model_type: str | None + provider: str | None + + +class ToolResource(typing.TypedDict): + """Tool resource per SDK v1.""" + tool_name: str + tool_type: str | None + description: str | None + + +class KnowledgeBaseResource(typing.TypedDict): + """Knowledge base resource per SDK v1.""" + kb_id: str + kb_name: str | None + kb_type: str | None + + +class FileResource(typing.TypedDict): + """File resource per SDK v1.""" + file_id: str + file_name: str | None + mime_type: str | None + source: str | None + + +class StorageResource(typing.TypedDict): + """Storage resource per SDK v1.""" + plugin_storage: bool + workspace_storage: bool + + +class AgentResources(typing.TypedDict): + """Agent resources per SDK v1.""" + models: list[ModelResource] + tools: list[ToolResource] + knowledge_bases: list[KnowledgeBaseResource] + files: list[FileResource] + storage: StorageResource + platform_capabilities: dict[str, typing.Any] + + +class AgentRuntimeContext(typing.TypedDict): + """Agent runtime context.""" + langbot_version: str | None + sdk_protocol_version: str + query_id: int | None + trace_id: str | None + deadline_at: int | None + metadata: dict[str, typing.Any] + + +class AgentRunContextV1(typing.TypedDict): + """SDK v1 AgentRunContext per PROTOCOL_V1.md.""" + run_id: str + trigger: AgentTrigger + conversation: ConversationContext | None + event: dict[str, typing.Any] | None # Reserved for EBA + actor: dict[str, typing.Any] | None # Reserved for EBA + subject: dict[str, typing.Any] | None # Reserved for EBA + messages: list[dict[str, typing.Any]] + input: AgentInput + resources: AgentResources + runtime: AgentRuntimeContext + config: dict[str, typing.Any] + + +class AgentRunContextBuilder: + """Builder for converting Query to SDK v1 AgentRunContext. + + Responsibilities: + - Generate new run_id (UUID, not query id) + - Set trigger type to 'message.received' for pipeline + - Build conversation context from session + - Convert messages to SDK format + - Build input from user_message and message_chain + - Set resources from AgentResourceBuilder result + - Build runtime context with host info, trace_id, deadline + - Set config from runner instance configuration + """ + + ap: app.Application + + def __init__(self, ap: app.Application): + self.ap = ap + + async def build_context( + self, + query: pipeline_query.Query, + descriptor: AgentRunnerDescriptor, + resources: AgentResources, + ) -> AgentRunContextV1: + """Build AgentRunContext from Query. + + Args: + query: Pipeline query + descriptor: Runner descriptor + resources: Built resources from AgentResourceBuilder + + Returns: + AgentRunContextV1 dict matching PROTOCOL_V1.md + """ + # Generate new run_id + run_id = str(uuid.uuid4()) + + # Build trigger + trigger: AgentTrigger = { + 'type': 'message.received', + 'source': 'pipeline', + 'timestamp': int(time.time()), + } + + # Build conversation context + conversation: ConversationContext | None = None + if query.session: + conversation = { + 'session_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}', + 'conversation_id': getattr(query.session.using_conversation, 'uuid', None), + 'launcher_type': query.session.launcher_type.value, + 'launcher_id': query.session.launcher_id, + 'sender_id': str(query.sender_id), + 'bot_uuid': query.bot_uuid, + 'pipeline_uuid': query.pipeline_uuid, + } + + # Build input + input: AgentInput = self._build_input(query) + + # Build messages + messages = self._build_messages(query) + + # Get runner config + runner_config = ConfigMigration.resolve_runner_config( + query.pipeline_config, + descriptor.id, + ) + + # Build runtime context + runtime: AgentRuntimeContext = { + 'langbot_version': self.ap.ver_mgr.get_current_version(), + 'sdk_protocol_version': descriptor.protocol_version, + 'query_id': query.query_id, + 'trace_id': run_id, # Use run_id as trace_id for now + 'deadline_at': None, # TODO: set from runner config timeout + 'metadata': { + 'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'), + 'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'), + }, + } + + # Build full context + context: AgentRunContextV1 = { + 'run_id': run_id, + 'trigger': trigger, + 'conversation': conversation, + 'event': None, # Reserved for EBA + 'actor': None, # Reserved for EBA + 'subject': None, # Reserved for EBA + 'messages': messages, + 'input': input, + 'resources': resources, + 'runtime': runtime, + 'config': runner_config, + } + + return context + + def _build_input(self, query: pipeline_query.Query) -> AgentInput: + """Build AgentInput from query.""" + text = None + contents: list[dict[str, typing.Any]] = [] + + if query.user_message: + # Extract text if content is single text element + if isinstance(query.user_message.content, list): + for elem in query.user_message.content: + contents.append(elem.model_dump(mode='json')) + if elem.type == 'text': + text = getattr(elem, 'text', None) + else: + # Single string content + text = str(query.user_message.content) + contents.append({'type': 'text', 'text': text}) + + # Include message_chain for platform-specific format + message_chain_dict = None + if query.message_chain: + message_chain_dict = query.message_chain.model_dump(mode='json') + + return { + 'text': text, + 'contents': contents, + 'message_chain': message_chain_dict, + 'attachments': [], # TODO: extract attachments from message_chain + } + + def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]: + """Build messages list from query.""" + messages: list[dict[str, typing.Any]] = [] + + if query.messages: + for msg in query.messages: + messages.append(msg.model_dump(mode='json')) + + return messages \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/descriptor.py b/src/langbot/pkg/agent/runner/descriptor.py new file mode 100644 index 00000000..154fbb3d --- /dev/null +++ b/src/langbot/pkg/agent/runner/descriptor.py @@ -0,0 +1,72 @@ +"""Agent runner descriptor.""" +from __future__ import annotations + +import typing +import pydantic + + +class AgentRunnerDescriptor(pydantic.BaseModel): + """Descriptor for an agent runner. + + Represents the discovered metadata for a runner, including + its identity, capabilities, permissions, and configuration schema. + """ + + id: str + """Unique runner ID: plugin:author/plugin_name/runner_name""" + + source: typing.Literal['plugin'] + """Runner source type""" + + label: dict[str, str] + """Display labels keyed by locale (e.g., en_US, zh_Hans)""" + + description: dict[str, str] | None = None + """Optional description keyed by locale""" + + plugin_author: str + """Plugin author from manifest""" + + plugin_name: str + """Plugin name from manifest""" + + runner_name: str + """AgentRunner component name from manifest""" + + plugin_version: str | None = None + """Optional plugin version""" + + protocol_version: str = '1' + """SDK protocol version, default '1'""" + + config_schema: list[dict[str, typing.Any]] = [] + """Configuration schema using DynamicForm format""" + + capabilities: dict[str, bool] = {} + """Runner capabilities: streaming, tool_calling, knowledge_retrieval, etc.""" + + permissions: dict[str, list[str]] = {} + """Requested permissions: models, tools, knowledge_bases, storage, files, platform_api""" + + raw_manifest: dict[str, typing.Any] = {} + """Original manifest for reference""" + + model_config = pydantic.ConfigDict( + extra='allow', + ) + + def get_plugin_id(self) -> str: + """Return plugin identifier as author/name.""" + return f'{self.plugin_author}/{self.plugin_name}' + + def supports_streaming(self) -> bool: + """Check if runner supports streaming output.""" + return self.capabilities.get('streaming', False) + + def supports_tool_calling(self) -> bool: + """Check if runner supports tool calling.""" + return self.capabilities.get('tool_calling', False) + + def supports_knowledge_retrieval(self) -> bool: + """Check if runner supports knowledge retrieval.""" + return self.capabilities.get('knowledge_retrieval', False) \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/errors.py b/src/langbot/pkg/agent/runner/errors.py new file mode 100644 index 00000000..ee3223dc --- /dev/null +++ b/src/langbot/pkg/agent/runner/errors.py @@ -0,0 +1,37 @@ +"""Agent runner errors.""" +from __future__ import annotations + + +class AgentRunnerError(Exception): + """Base error for agent runner operations.""" + pass + + +class RunnerNotFoundError(AgentRunnerError): + """Runner not found in registry.""" + def __init__(self, runner_id: str): + self.runner_id = runner_id + super().__init__(f'Agent runner not found: {runner_id}') + + +class RunnerNotAuthorizedError(AgentRunnerError): + """Runner not authorized for this pipeline.""" + def __init__(self, runner_id: str, bound_plugins: list[str] | None): + self.runner_id = runner_id + self.bound_plugins = bound_plugins + super().__init__(f'Agent runner {runner_id} not authorized for bound_plugins={bound_plugins}') + + +class RunnerProtocolError(AgentRunnerError): + """Runner protocol version mismatch or invalid manifest.""" + def __init__(self, runner_id: str, message: str): + self.runner_id = runner_id + super().__init__(f'Agent runner protocol error for {runner_id}: {message}') + + +class RunnerExecutionError(AgentRunnerError): + """Runner execution failed.""" + def __init__(self, runner_id: str, message: str, retryable: bool = False): + self.runner_id = runner_id + self.retryable = retryable + super().__init__(f'Agent runner {runner_id} execution failed: {message}') \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/id.py b/src/langbot/pkg/agent/runner/id.py new file mode 100644 index 00000000..a3d6d2ab --- /dev/null +++ b/src/langbot/pkg/agent/runner/id.py @@ -0,0 +1,92 @@ +"""Agent runner ID parsing and formatting.""" +from __future__ import annotations + +import dataclasses + + +@dataclasses.dataclass(frozen=True) +class RunnerIdParts: + """Parsed runner ID components.""" + source: str # 'plugin' (future: 'builtin') + plugin_author: str + plugin_name: str + runner_name: str + + def to_plugin_id(self) -> str: + """Return plugin identifier as author/name.""" + return f'{self.plugin_author}/{self.plugin_name}' + + +def parse_runner_id(runner_id: str) -> RunnerIdParts: + """Parse runner ID string into components. + + Args: + runner_id: Runner ID in format 'plugin:author/plugin_name/runner_name' + + Returns: + RunnerIdParts with parsed components + + Raises: + ValueError: If runner_id format is invalid + """ + if runner_id.startswith('plugin:'): + parts = runner_id[7:].split('/') + if len(parts) != 3: + raise ValueError( + f'Invalid plugin runner ID format: {runner_id}. ' + f'Expected: plugin:author/plugin_name/runner_name' + ) + plugin_author, plugin_name, runner_name = parts + if not plugin_author or not plugin_name or not runner_name: + raise ValueError( + f'Invalid plugin runner ID: {runner_id}. ' + f'author, plugin_name, and runner_name must be non-empty' + ) + return RunnerIdParts( + source='plugin', + plugin_author=plugin_author, + plugin_name=plugin_name, + runner_name=runner_name, + ) + else: + # For backward compatibility with old built-in runner names + # This should eventually be removed after migration + raise ValueError( + f'Invalid runner ID format: {runner_id}. ' + f'Expected: plugin:author/plugin_name/runner_name' + ) + + +def format_runner_id( + source: str, + plugin_author: str, + plugin_name: str, + runner_name: str, +) -> str: + """Format runner ID from components. + + Args: + source: Runner source ('plugin') + plugin_author: Plugin author + plugin_name: Plugin name + runner_name: Runner component name + + Returns: + Runner ID string + """ + if source == 'plugin': + return f'plugin:{plugin_author}/{plugin_name}/{runner_name}' + else: + raise ValueError(f'Invalid runner source: {source}') + + +def is_plugin_runner_id(runner_id: str) -> bool: + """Check if runner ID is a plugin runner. + + Args: + runner_id: Runner ID string + + Returns: + True if runner ID starts with 'plugin:' + """ + return runner_id.startswith('plugin:') \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py new file mode 100644 index 00000000..9216583a --- /dev/null +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -0,0 +1,158 @@ +"""Agent run orchestrator for coordinating runner execution.""" +from __future__ import annotations + +import typing +import traceback + +from langbot_plugin.api.entities.builtin.provider import message as provider_message +from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .registry import AgentRunnerRegistry +from .context_builder import AgentRunContextBuilder, AgentRunContextV1 +from .resource_builder import AgentResourceBuilder +from .result_normalizer import AgentResultNormalizer +from .config_migration import ConfigMigration +from .errors import ( + RunnerNotFoundError, + RunnerExecutionError, +) + + +class AgentRunOrchestrator: + """Orchestrator for agent runner execution. + + Responsibilities: + - Resolve runner ID from pipeline config (new or old format) + - Get runner descriptor from registry + - Build AgentRunContext from Query + - Build AgentResources with permission filtering + - Invoke plugin runtime RUN_AGENT action + - Normalize AgentRunResult to Pipeline messages + - Handle errors, timeouts, protocol errors + - Maintain streaming card behavior + + This is the main entry point for ChatMessageHandler. + """ + + ap: app.Application + + registry: AgentRunnerRegistry + + context_builder: AgentRunContextBuilder + + resource_builder: AgentResourceBuilder + + result_normalizer: AgentResultNormalizer + + def __init__( + self, + ap: app.Application, + registry: AgentRunnerRegistry, + ): + self.ap = ap + self.registry = registry + self.context_builder = AgentRunContextBuilder(ap) + self.resource_builder = AgentResourceBuilder(ap) + self.result_normalizer = AgentResultNormalizer(ap) + + async def run_from_query( + self, + query: pipeline_query.Query, + ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: + """Run agent runner from pipeline query. + + This is the main entry point called by ChatMessageHandler. + + Args: + query: Pipeline query with pipeline_config, session, messages, etc. + + Yields: + Message or MessageChunk for pipeline response + + Raises: + RunnerNotFoundError: If runner not found + RunnerNotAuthorizedError: If runner not authorized + RunnerExecutionError: If runner execution failed + """ + # Resolve runner ID + runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) + if not runner_id: + raise RunnerNotFoundError('no runner configured') + + # Get bound plugins for authorization + bound_plugins = query.variables.get('_pipeline_bound_plugins') + + # Get runner descriptor + descriptor = await self.registry.get(runner_id, bound_plugins) + + # Build resources + resources = await self.resource_builder.build_resources(query, descriptor) + + # Build context + context = await self.context_builder.build_context(query, descriptor, resources) + + # Run via plugin connector + async for result_dict in self._invoke_runner(descriptor, context): + # Normalize result + result = await self.result_normalizer.normalize(result_dict, descriptor) + if result is not None: + yield result + + async def _invoke_runner( + self, + descriptor: AgentRunnerDescriptor, + context: AgentRunContextV1, + ) -> typing.AsyncGenerator[dict[str, typing.Any], None]: + """Invoke runner via plugin connector. + + Args: + descriptor: Runner descriptor + context: AgentRunContext dict + + Yields: + Raw result dicts from plugin runtime + + Raises: + RunnerExecutionError: If plugin system disabled or runtime error + """ + if not self.ap.plugin_connector.is_enable_plugin: + raise RunnerExecutionError( + descriptor.id, + 'Plugin system is disabled', + retryable=False, + ) + + try: + async for result_dict in self.ap.plugin_connector.run_agent( + plugin_author=descriptor.plugin_author, + plugin_name=descriptor.plugin_name, + runner_name=descriptor.runner_name, + context=context, + ): + yield result_dict + + except RunnerExecutionError: + raise + except Exception as e: + # Wrap unexpected errors + self.ap.logger.error( + f'Runner {descriptor.id} unexpected error: {traceback.format_exc()}' + ) + raise RunnerExecutionError( + descriptor.id, + str(e), + retryable=False, + ) + + def resolve_runner_id_for_telemetry(self, query: pipeline_query.Query) -> str | None: + """Resolve runner ID for telemetry/logging without full execution. + + Args: + query: Pipeline query + + Returns: + Runner ID string, or None + """ + return ConfigMigration.resolve_runner_id(query.pipeline_config) \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/registry.py b/src/langbot/pkg/agent/runner/registry.py new file mode 100644 index 00000000..a2d48798 --- /dev/null +++ b/src/langbot/pkg/agent/runner/registry.py @@ -0,0 +1,277 @@ +"""Agent runner registry for discovering and caching runner descriptors.""" +from __future__ import annotations + +import typing +import asyncio + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .id import parse_runner_id, format_runner_id +from .errors import RunnerNotFoundError, RunnerNotAuthorizedError + + +class AgentRunnerRegistry: + """Registry for discovering and managing agent runners. + + Responsibilities: + - Discover runners from plugin runtime via LIST_AGENT_RUNNERS + - Validate runner manifests (kind, metadata, spec) + - Cache discovered runners for performance + - Filter runners by bound plugins + - Handle manifest errors gracefully (log warning, skip runner) + """ + + ap: app.Application + + _cache: dict[str, AgentRunnerDescriptor] | None + """Cached runner descriptors keyed by runner ID""" + + _cache_lock: asyncio.Lock + """Lock for cache refresh operations""" + + def __init__(self, ap: app.Application): + self.ap = ap + self._cache = None + self._cache_lock = asyncio.Lock() + + async def _discover_runners(self) -> dict[str, AgentRunnerDescriptor]: + """Discover runners from plugin runtime. + + Always discovers ALL runners (no bound_plugins filter). + The cache should contain unfiltered discovery results. + + Returns: + Dict of runner descriptors keyed by runner ID + """ + if not self.ap.plugin_connector.is_enable_plugin: + return {} + + runners: dict[str, AgentRunnerDescriptor] = {} + + try: + # Always list all runners (bound_plugins=None) + plugin_runners = await self.ap.plugin_connector.list_agent_runners(None) + + for runner_data in plugin_runners: + try: + descriptor = self._validate_and_build_descriptor(runner_data) + if descriptor is not None: + runners[descriptor.id] = descriptor + except Exception as e: + plugin_author = runner_data.get('plugin_author', 'unknown') + plugin_name = runner_data.get('plugin_name', 'unknown') + runner_name = runner_data.get('runner_name', 'unknown') + self.ap.logger.warning( + f'Invalid runner manifest for plugin:{plugin_author}/{plugin_name}/{runner_name}: {e}' + ) + continue + + except Exception as e: + self.ap.logger.warning(f'Failed to list agent runners from plugin runtime: {e}') + return {} + + return runners + + def _validate_and_build_descriptor(self, runner_data: dict[str, typing.Any]) -> AgentRunnerDescriptor | None: + """Validate runner manifest and build descriptor. + + Args: + runner_data: Raw runner data from plugin runtime with fields: + - plugin_author, plugin_name, runner_name + - manifest (full component manifest dict) + - protocol_version, capabilities, permissions, config (extracted from spec) + + Returns: + AgentRunnerDescriptor if valid, None if invalid + """ + plugin_author = runner_data.get('plugin_author', '') + plugin_name = runner_data.get('plugin_name', '') + runner_name = runner_data.get('runner_name', '') + + if not plugin_author or not plugin_name or not runner_name: + return None + + manifest = runner_data.get('manifest', {}) + + # Validate kind + kind = manifest.get('kind', '') + if kind != 'AgentRunner': + return None + + # Validate metadata + metadata = manifest.get('metadata', {}) + name = metadata.get('name', '') + if not name: + return None + + # metadata.label must exist + label = metadata.get('label', {}) + if not label: + label = {name: name} # fallback + + # SDK now provides these directly extracted from spec + protocol_version = runner_data.get('protocol_version', '1') + config_schema = runner_data.get('config', []) + capabilities = runner_data.get('capabilities', {}) + permissions = runner_data.get('permissions', {}) + + # Build descriptor + runner_id = format_runner_id( + source='plugin', + plugin_author=plugin_author, + plugin_name=plugin_name, + runner_name=runner_name, + ) + + return AgentRunnerDescriptor( + id=runner_id, + source='plugin', + label=label, + description=metadata.get('description') or runner_data.get('runner_description'), + plugin_author=plugin_author, + plugin_name=plugin_name, + runner_name=runner_name, + plugin_version=runner_data.get('plugin_version'), + protocol_version=protocol_version, + config_schema=config_schema, + capabilities=capabilities, + permissions=permissions, + raw_manifest=manifest, + ) + + async def refresh(self) -> None: + """Refresh runner cache. + + Always discovers ALL runners (no bound_plugins filter). + The cache contains unfiltered discovery results. + """ + async with self._cache_lock: + self._cache = await self._discover_runners() + + async def list_runners( + self, + bound_plugins: list[str] | None = None, + use_cache: bool = True, + ) -> list[AgentRunnerDescriptor]: + """List available runners. + + Args: + bound_plugins: Optional filter for bound plugins (applied locally) + use_cache: Use cached data if available + + Returns: + List of runner descriptors + """ + if use_cache and self._cache is not None: + # Filter from cache + return self._filter_runners_by_bound_plugins(self._cache, bound_plugins) + + # Discover fresh (always full list) + runners = await self._discover_runners() + + # Update cache (full list, unfiltered) + async with self._cache_lock: + self._cache = runners + + # Filter locally + return self._filter_runners_by_bound_plugins(runners, bound_plugins) + + def _filter_runners_by_bound_plugins( + self, + runners: dict[str, AgentRunnerDescriptor], + bound_plugins: list[str] | None, + ) -> list[AgentRunnerDescriptor]: + """Filter runners by bound plugins. + + Args: + runners: Dict of runner descriptors + bound_plugins: Optional filter (None means all plugins allowed) + + Returns: + Filtered list of runner descriptors + """ + if bound_plugins is None: + # All plugins allowed + return list(runners.values()) + + allowed_plugin_ids = set(bound_plugins) + filtered = [] + for descriptor in runners.values(): + plugin_id = descriptor.get_plugin_id() + if plugin_id in allowed_plugin_ids: + filtered.append(descriptor) + + return filtered + + async def get( + self, + runner_id: str, + bound_plugins: list[str] | None = None, + ) -> AgentRunnerDescriptor: + """Get a specific runner descriptor. + + Args: + runner_id: Runner ID to lookup + bound_plugins: Optional bound plugins filter + + Returns: + AgentRunnerDescriptor + + Raises: + RunnerNotFoundError: If runner not found + RunnerNotAuthorizedError: If runner not in bound plugins + """ + # Parse and validate runner ID format + try: + parse_runner_id(runner_id) + except ValueError as e: + raise RunnerNotFoundError(runner_id) from e + + # Get from cache or discover (always full list) + if self._cache is None: + await self.refresh() + + if self._cache is None: + raise RunnerNotFoundError(runner_id) + + descriptor = self._cache.get(runner_id) + if descriptor is None: + raise RunnerNotFoundError(runner_id) + + # Check authorization + if bound_plugins is not None: + plugin_id = descriptor.get_plugin_id() + if plugin_id not in bound_plugins: + raise RunnerNotAuthorizedError(runner_id, bound_plugins) + + return descriptor + + async def get_runner_metadata_for_pipeline(self) -> list[dict[str, typing.Any]]: + """Get runner metadata for pipeline configuration UI. + + Returns runner options and their config schemas for the DynamicForm. + """ + # Get all runners (no bound plugin filter for metadata listing) + runners = await self.list_runners(bound_plugins=None) + + options = [] + stages = [] + + for descriptor in runners: + # Add runner option + options.append({ + 'name': descriptor.id, + 'label': descriptor.label, + 'description': descriptor.description, + }) + + # Add config schema as stage if not empty + if descriptor.config_schema: + stages.append({ + 'name': descriptor.id, + 'label': descriptor.label, + 'description': descriptor.description, + 'config': descriptor.config_schema, + }) + + return options, stages \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/resource_builder.py b/src/langbot/pkg/agent/runner/resource_builder.py new file mode 100644 index 00000000..00539587 --- /dev/null +++ b/src/langbot/pkg/agent/runner/resource_builder.py @@ -0,0 +1,210 @@ +"""Agent resource builder for constructing authorized resources.""" +from __future__ import annotations + +import typing + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .context_builder import ( + AgentResources, + ModelResource, + ToolResource, + KnowledgeBaseResource, + StorageResource, +) + + +class AgentResourceBuilder: + """Builder for constructing AgentResources with permission filtering. + + Responsibilities: + - Apply 3-layer permission filtering: + 1. Runner manifest declared permissions + 2. Pipeline extensions_preference (bound plugins/MCP servers) + 3. Runner instance config selected resources + - Build models list from authorized models + - Build tools list from bound plugins/MCP servers + - Build knowledge_bases list from config + - Build storage and files permissions summary + + Note: This only builds the resource declaration. The actual proxy actions + in handler.py must still validate against ctx.resources at runtime. + + Resource field names match SDK v1 Protocol: + - ModelResource: model_id, model_type, provider + - ToolResource: tool_name, tool_type, description + - KnowledgeBaseResource: kb_id, kb_name, kb_type + - StorageResource: plugin_storage, workspace_storage + """ + + ap: app.Application + + def __init__(self, ap: app.Application): + self.ap = ap + + async def build_resources( + self, + query: typing.Any, # pipeline_query.Query + descriptor: AgentRunnerDescriptor, + ) -> AgentResources: + """Build AgentResources from query and runner descriptor. + + Args: + query: Pipeline query with pipeline_config and variables + descriptor: Runner descriptor with permissions and capabilities + + Returns: + AgentResources dict with filtered resource lists + """ + # Get bound plugins and MCP servers from query + bound_plugins = query.variables.get('_pipeline_bound_plugins') + bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers') + + # Layer 1: Runner manifest permissions + manifest_perms = descriptor.permissions + + # Layer 2: Pipeline extensions_preference (already in bound_plugins/MCP servers) + # Layer 3: Runner instance config (from pipeline_config) - resolved via ConfigMigration + from .config_migration import ConfigMigration + runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, descriptor.id) + + # Build each resource category + models = await self._build_models(manifest_perms, query) + tools = await self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query) + knowledge_bases = await self._build_knowledge_bases(manifest_perms, runner_config, query) + storage = self._build_storage(manifest_perms) + + return { + 'models': models, + 'tools': tools, + 'knowledge_bases': knowledge_bases, + 'files': [], # Files are populated at runtime + 'storage': storage, + 'platform_capabilities': {}, # Reserved for EBA + } + + async def _build_models( + self, + manifest_perms: dict[str, list[str]], + query: typing.Any, + ) -> list[ModelResource]: + """Build models list with SDK v1 field names.""" + models: list[ModelResource] = [] + + # Check manifest permission + model_perms = manifest_perms.get('models', []) + if 'invoke' not in model_perms and 'stream' not in model_perms: + return models + + # Get model from query (preproc already resolved this) + model_uuid = getattr(query, 'use_llm_model_uuid', None) + if not model_uuid: + return models + + try: + model = await self.ap.model_mgr.get_model_by_uuid(model_uuid) + if model and model.model_entity: + # Use SDK v1 field names: model_id, model_type, provider + models.append({ + 'model_id': model_uuid, + 'model_type': model.model_entity.model_type, + 'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None, + }) + except Exception: + pass + + # Add fallback models if present + fallback_uuids = query.variables.get('_fallback_model_uuids', []) + for fb_uuid in fallback_uuids: + try: + model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid) + if model and model.model_entity: + models.append({ + 'model_id': fb_uuid, + 'model_type': model.model_entity.model_type, + 'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None, + }) + except Exception: + pass + + return models + + async def _build_tools( + self, + manifest_perms: dict[str, list[str]], + bound_plugins: list[str] | None, + bound_mcp_servers: list[str] | None, + query: typing.Any, + ) -> list[ToolResource]: + """Build tools list with SDK v1 field names.""" + tools: list[ToolResource] = [] + + # Check manifest permission + tool_perms = manifest_perms.get('tools', []) + if 'list' not in tool_perms and 'call' not in tool_perms: + return tools + + # Get tools from query (preproc already resolved this for local-agent) + use_funcs = getattr(query, 'use_funcs', []) + for tool in use_funcs: + # Use SDK v1 field names: tool_name, tool_type, description + tools.append({ + 'tool_name': tool.name, + 'tool_type': None, # Tool type not available in current LLMTool + 'description': tool.description, + }) + + return tools + + async def _build_knowledge_bases( + self, + manifest_perms: dict[str, list[str]], + runner_config: dict[str, typing.Any], + query: typing.Any, + ) -> list[KnowledgeBaseResource]: + """Build knowledge bases list with SDK v1 field names.""" + kb_resources: list[KnowledgeBaseResource] = [] + + # Check manifest permission + kb_perms = manifest_perms.get('knowledge_bases', []) + if 'list' not in kb_perms and 'retrieve' not in kb_perms: + return kb_resources + + # Get knowledge base UUIDs from config + kb_uuids = runner_config.get('knowledge-bases', []) + if not kb_uuids: + # Old single KB config + old_kb_uuid = runner_config.get('knowledge-base', '') + if old_kb_uuid and old_kb_uuid != '__none__': + kb_uuids = [old_kb_uuid] + + # Also check query variables (may be modified by plugin PromptPreProcessing) + kb_uuids_from_vars = query.variables.get('_knowledge_base_uuids', []) + if kb_uuids_from_vars: + kb_uuids = kb_uuids_from_vars + + for kb_uuid in kb_uuids: + try: + kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) + if kb: + # Use SDK v1 field names: kb_id, kb_name, kb_type + kb_resources.append({ + 'kb_id': kb_uuid, + 'kb_name': kb.get_name(), + 'kb_type': kb.knowledge_base_entity.kb_type if hasattr(kb.knowledge_base_entity, 'kb_type') else None, + }) + except Exception: + pass + + return kb_resources + + def _build_storage( + self, + manifest_perms: dict[str, list[str]], + ) -> StorageResource: + """Build storage permissions with SDK v1 field names.""" + storage_perms = manifest_perms.get('storage', []) + return { + 'plugin_storage': 'plugin' in storage_perms, + 'workspace_storage': 'workspace' in storage_perms, + } \ No newline at end of file diff --git a/src/langbot/pkg/agent/runner/result_normalizer.py b/src/langbot/pkg/agent/runner/result_normalizer.py new file mode 100644 index 00000000..fdf0d0a8 --- /dev/null +++ b/src/langbot/pkg/agent/runner/result_normalizer.py @@ -0,0 +1,180 @@ +"""Agent result normalizer for converting SDK v1 AgentRunResult to Pipeline messages.""" +from __future__ import annotations + +import typing + +from langbot_plugin.api.entities.builtin.provider import message as provider_message + +from ...core import app +from .descriptor import AgentRunnerDescriptor +from .errors import RunnerExecutionError, RunnerProtocolError + + +# Maximum size for a single result payload (prevent memory exhaustion) +MAX_RESULT_SIZE_BYTES = 1024 * 1024 # 1 MB + + +class AgentResultNormalizer: + """Normalizer for converting SDK v1 AgentRunResult to Pipeline messages. + + Responsibilities: + - Accept only SDK v1 result types (message.delta, message.completed, etc.) + - Map message.delta -> MessageChunk + - Map message.completed -> Message + - Map run.completed (with message) -> Message + - Handle run.failed as controlled error + - Ignore unknown types with warning + - Validate result size + - Validate message schema + + Per PROTOCOL_V1.md, accepted types: + - message.delta + - message.completed + - tool.call.started + - tool.call.completed + - state.updated + - run.completed + - run.failed + - action.requested (log only, don't execute) + """ + + ap: app.Application + + def __init__(self, ap: app.Application): + self.ap = ap + + async def normalize( + self, + result_dict: dict[str, typing.Any], + descriptor: AgentRunnerDescriptor, + ) -> provider_message.Message | provider_message.MessageChunk | None: + """Normalize AgentRunResult to Message or MessageChunk. + + Args: + result_dict: Raw result dict from plugin runtime + descriptor: Runner descriptor for error context + + Returns: + Message, MessageChunk, or None (for non-message events) + + Raises: + RunnerExecutionError: On run.failed + RunnerProtocolError: On invalid result format + """ + # Validate result type + result_type = result_dict.get('type') + if not result_type: + raise RunnerProtocolError(descriptor.id, 'Missing result type') + + # Validate result size + try: + import json + result_json = json.dumps(result_dict) + if len(result_json) > MAX_RESULT_SIZE_BYTES: + self.ap.logger.warning( + f'Runner {descriptor.id} result too large ({len(result_json)} bytes), truncating' + ) + # Truncate content if possible + data = result_dict.get('data', {}) + if 'chunk' in data or 'message' in data: + content = data.get('chunk', {}).get('content', '') or data.get('message', {}).get('content', '') + if isinstance(content, str) and len(content) > 10000: + # Keep reasonable length + data['chunk'] = {'role': 'assistant', 'content': content[:10000] + '...[truncated]'} + except Exception: + pass + + # Handle each result type + data = result_dict.get('data', {}) + + if result_type == 'message.delta': + return self._normalize_message_delta(data, descriptor) + + elif result_type == 'message.completed': + return self._normalize_message_completed(data, descriptor) + + elif result_type == 'tool.call.started': + # Log only, don't yield to pipeline + self.ap.logger.debug( + f'Runner {descriptor.id} tool call started: {data.get("tool_name", "unknown")}' + ) + return None + + elif result_type == 'tool.call.completed': + # Log only, don't yield to pipeline + self.ap.logger.debug( + f'Runner {descriptor.id} tool call completed: {data.get("tool_name", "unknown")}' + ) + return None + + elif result_type == 'state.updated': + # Log for telemetry, don't yield + self.ap.logger.debug( + f'Runner {descriptor.id} state updated: {data.get("key", "unknown")}={data.get("value", "...")}' + ) + return None + + elif result_type == 'run.completed': + # May include final message + if 'message' in data: + return self._normalize_message_completed(data, descriptor) + # If no message, it's just completion signal + return None + + elif result_type == 'run.failed': + error_msg = data.get('error', 'Unknown error') + error_code = data.get('code', 'unknown') + retryable = data.get('retryable', False) + raise RunnerExecutionError( + descriptor.id, + f'{error_msg} (code: {error_code})', + retryable=retryable, + ) + + elif result_type == 'action.requested': + # Reserved for EBA - log only, don't execute + self.ap.logger.info( + f'Runner {descriptor.id} requested action (not executed in current phase): ' + f'{data.get("action", "unknown")}' + ) + return None + + else: + # Unknown type - warn and ignore (SDK v1 only) + self.ap.logger.warning( + f'Runner {descriptor.id} returned unknown result type: {result_type}. ' + f'Expected SDK v1 types (message.delta, message.completed, run.completed, run.failed, etc.)' + ) + return None + + def _normalize_message_delta( + self, + data: dict[str, typing.Any], + descriptor: AgentRunnerDescriptor, + ) -> provider_message.MessageChunk: + """Normalize message.delta to MessageChunk.""" + chunk_data = data.get('chunk', {}) + if not chunk_data: + raise RunnerProtocolError(descriptor.id, 'message.delta missing chunk data') + + try: + chunk = provider_message.MessageChunk.model_validate(chunk_data) + return chunk + except Exception as e: + raise RunnerProtocolError(descriptor.id, f'Invalid chunk schema: {e}') + + def _normalize_message_completed( + self, + data: dict[str, typing.Any], + descriptor: AgentRunnerDescriptor, + ) -> provider_message.Message: + """Normalize message.completed to Message.""" + message_data = data.get('message', {}) + if not message_data: + raise RunnerProtocolError(descriptor.id, 'message.completed missing message data') + + try: + msg = provider_message.Message.model_validate(message_data) + return msg + except Exception as e: + raise RunnerProtocolError(descriptor.id, f'Invalid message schema: {e}') \ No newline at end of file diff --git a/src/langbot/pkg/api/http/service/pipeline.py b/src/langbot/pkg/api/http/service/pipeline.py index 23d7d0b4..a079d27c 100644 --- a/src/langbot/pkg/api/http/service/pipeline.py +++ b/src/langbot/pkg/api/http/service/pipeline.py @@ -31,7 +31,7 @@ class PipelineService: self.ap = ap async def get_pipeline_metadata(self) -> list[dict]: - """Get pipeline metadata with dynamically loaded plugin runners""" + """Get pipeline metadata with dynamically loaded plugin runners from registry""" import copy # Deep copy AI metadata to avoid modifying the original @@ -48,43 +48,20 @@ class PipelineService: # Find the runner select config for config_item in runner_stage.get('config', []): if config_item.get('name') == 'runner': - # Get plugin agent runners + # Get plugin agent runners from registry try: - plugin_runners = await self.ap.plugin_connector.list_agent_runners() + runner_options, runner_stages = await self.ap.agent_runner_registry.get_runner_metadata_for_pipeline() # Add plugin runners to options - for runner in plugin_runners: - manifest = runner.get('manifest', {}) - metadata = manifest.get('metadata', {}) + for option in runner_options: + config_item['options'].append(option) - # Format: plugin:author/plugin_name/runner_name - runner_value = ( - f'plugin:{runner["plugin_author"]}/{runner["plugin_name"]}/{runner["runner_name"]}' - ) - - # Add to options - config_item['options'].append( - { - 'name': runner_value, - 'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}), - 'description': metadata.get('description'), - } - ) - - # Add corresponding stage configuration for this runner - spec_config = manifest.get('spec', {}).get('config', []) - if spec_config: - ai_metadata['stages'].append( - { - 'name': runner_value, - 'label': metadata.get('label', {runner['runner_name']: runner['runner_name']}), - 'description': metadata.get('description'), - 'config': spec_config, - } - ) + # Add corresponding stage configuration for each runner + for stage_config in runner_stages: + ai_metadata['stages'].append(stage_config) except Exception as e: - self.ap.logger.warning(f'Failed to load plugin agent runners: {e}') + self.ap.logger.warning(f'Failed to load plugin agent runners from registry: {e}') return [ self.ap.pipeline_config_meta_trigger, diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 7e5386cf..b63e4061 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -4,6 +4,7 @@ import logging import asyncio import traceback import os +from typing import TYPE_CHECKING from ..platform import botmgr as im_mgr from ..platform.webhook_pusher import WebhookPusher @@ -44,6 +45,9 @@ from ..vector import mgr as vectordb_mgr from ..telemetry import telemetry as telemetry_module from ..survey import manager as survey_module +if TYPE_CHECKING: + from ..agent.runner import AgentRunnerRegistry, AgentRunOrchestrator + class Application: """Runtime application object and context""" @@ -158,6 +162,11 @@ class Application: maintenance_service: maintenance_service.MaintenanceService = None + # Agent runner subsystem + agent_runner_registry: AgentRunnerRegistry = None + + agent_run_orchestrator: AgentRunOrchestrator = None + def __init__(self): pass diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 3bb5ffd7..f4cb6f80 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -36,6 +36,7 @@ from ...vector import mgr as vectordb_mgr from .. import taskmgr from ...telemetry import telemetry as telemetry_module from ...survey import manager as survey_module +from ...agent.runner import AgentRunnerRegistry, AgentRunOrchestrator @stage.stage_class('BuildAppStage') @@ -179,5 +180,12 @@ class BuildAppStage(stage.BootingStage): await plugin_connector_inst.initialize() ap.plugin_connector = plugin_connector_inst + # Initialize agent runner subsystem + agent_runner_registry_inst = AgentRunnerRegistry(ap) + ap.agent_runner_registry = agent_runner_registry_inst + + agent_run_orchestrator_inst = AgentRunOrchestrator(ap, agent_runner_registry_inst) + ap.agent_run_orchestrator = agent_run_orchestrator_inst + ctrl = controller.Controller(ap) ap.ctrl = ctrl diff --git a/src/langbot/pkg/pipeline/preproc/preproc.py b/src/langbot/pkg/pipeline/preproc/preproc.py index 83ddce89..9bd6730e 100644 --- a/src/langbot/pkg/pipeline/preproc/preproc.py +++ b/src/langbot/pkg/pipeline/preproc/preproc.py @@ -9,6 +9,12 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.platform.events as platform_events +from ...agent.runner.config_migration import ConfigMigration + + +# Official local-agent runner ID +LOCAL_AGENT_RUNNER_ID = 'plugin:langbot/local-agent/default' + @stage.stage_class('PreProcessor') class PreProcessor(stage.PipelineStage): @@ -31,16 +37,27 @@ class PreProcessor(stage.PipelineStage): stage_inst_name: str, ) -> entities.StageProcessResult: """Process""" - selected_runner = query.pipeline_config['ai']['runner']['runner'] + # Resolve runner ID using ConfigMigration (supports both new and old formats) + runner_id = ConfigMigration.resolve_runner_id(query.pipeline_config) + + # Get runner config (from new ai.runner_config or old ai.) + runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, runner_id) if runner_id else {} session = await self.ap.sess_mgr.get_session(query) + # Determine if this is a local-agent runner (built-in LLM capabilities) + # Check by runner_id OR by legacy runner field for backward compatibility + is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID or ( + runner_id is None and + query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') == 'local-agent' + ) + # When not local-agent, llm_model is None llm_model = None - if selected_runner == 'local-agent': + if is_local_agent: # Read model config — new format is { primary: str, fallbacks: [str] }, # but handle legacy plain string for backward compatibility - model_config = query.pipeline_config['ai']['local-agent'].get('model', {}) + model_config = runner_config.get('model', {}) if isinstance(model_config, str): # Legacy format: plain UUID string primary_uuid = model_config @@ -67,10 +84,17 @@ class PreProcessor(stage.PipelineStage): if valid_fallbacks: query.variables['_fallback_model_uuids'] = valid_fallbacks + # Get prompt config - for local-agent, use runner_config; for others, use default prompt + prompt_config = runner_config.get('prompt', [ + {'role': 'system', 'content': 'You are a helpful assistant.'} + ]) if is_local_agent else [ + {'role': 'system', 'content': 'You are a helpful assistant.'} + ] + conversation = await self.ap.sess_mgr.get_conversation( query, session, - query.pipeline_config['ai']['local-agent']['prompt'], + prompt_config, query.pipeline_uuid, query.bot_uuid, ) @@ -79,7 +103,7 @@ class PreProcessor(stage.PipelineStage): # been idle for longer than the configured conversation expire time. # The idle window is measured from the last preprocess/update time, not # from the conversation creation time. - conversation_expire_time = query.pipeline_config.get('ai', {}).get('runner', {}).get('expire-time', None) + conversation_expire_time = ConfigMigration.get_expire_time(query.pipeline_config) now = datetime.datetime.now() if conversation_expire_time is not None and conversation_expire_time > 0: last_update_time = getattr(conversation, 'update_time', None) or getattr(conversation, 'create_time', None) @@ -101,7 +125,7 @@ class PreProcessor(stage.PipelineStage): query.prompt = conversation.prompt.copy() query.messages = conversation.messages.copy() - if selected_runner == 'local-agent': + if is_local_agent: query.use_funcs = [] if llm_model: query.use_llm_model_uuid = llm_model.model_entity.uuid @@ -149,7 +173,7 @@ class PreProcessor(stage.PipelineStage): # Check if this model supports vision, if not, remove all images # TODO this checking should be performed in runner, and in this stage, the image should be reserved if ( - selected_runner == 'local-agent' + is_local_agent and llm_model and not llm_model.model_entity.abilities.__contains__('vision') ): @@ -162,14 +186,15 @@ class PreProcessor(stage.PipelineStage): content_list: list[provider_message.ContentElement] = [] plain_text = '' - quote_msg = query.pipeline_config['trigger'].get('misc', '').get('combine-quote-message') + quote_msg = query.pipeline_config['trigger'].get('misc', {}).get('combine-quote-message', False) for me in query.message_chain: if isinstance(me, platform_message.Plain): content_list.append(provider_message.ContentElement.from_text(me.text)) plain_text += me.text elif isinstance(me, platform_message.Image): - if selected_runner != 'local-agent' or ( + # Allow images for non-local-agent runners or if local-agent has vision + if not is_local_agent or ( llm_model and llm_model.model_entity.abilities.__contains__('vision') ): if me.base64 is not None: @@ -190,7 +215,7 @@ class PreProcessor(stage.PipelineStage): if isinstance(msg, platform_message.Plain): content_list.append(provider_message.ContentElement.from_text(msg.text)) elif isinstance(msg, platform_message.Image): - if selected_runner != 'local-agent' or ( + if not is_local_agent or ( llm_model and llm_model.model_entity.abilities.__contains__('vision') ): if msg.base64 is not None: @@ -214,9 +239,10 @@ class PreProcessor(stage.PipelineStage): # Extract knowledge base UUIDs into query variables so plugins can modify them # during PromptPreProcessing before the runner performs retrieval. - kb_uuids = query.pipeline_config['ai']['local-agent'].get('knowledge-bases', []) + # Only for local-agent runner + kb_uuids = runner_config.get('knowledge-bases', []) if is_local_agent else [] if not kb_uuids: - old_kb_uuid = query.pipeline_config['ai']['local-agent'].get('knowledge-base', '') + old_kb_uuid = runner_config.get('knowledge-base', '') if is_local_agent else '' if old_kb_uuid and old_kb_uuid != '__none__': kb_uuids = [old_kb_uuid] query.variables['_knowledge_base_uuids'] = list(kb_uuids) @@ -237,4 +263,4 @@ class PreProcessor(stage.PipelineStage): query.prompt.messages = event_ctx.event.default_prompt query.messages = event_ctx.event.prompt - return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + return entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) \ No newline at end of file diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 76b55053..b9083e6f 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -9,99 +9,28 @@ from datetime import datetime from .. import handler from ... import entities -from ....provider import runner as runner_module import langbot_plugin.api.entities.events as events -from ....utils import importutil, constants, runner as runner_utils -from ....provider import runners +from ....utils import constants, runner as runner_utils import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.provider.message as provider_message -from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext - - -importutil.import_modules_in_pkg(runners) - - -class PluginAgentRunnerWrapper(runner_module.RequestRunner): - """Wrapper to run AgentRunner from plugin""" - - def __init__(self, ap, plugin_author: str, plugin_name: str, runner_name: str, pipeline_config: dict): - super().__init__(ap, pipeline_config) - self.plugin_author = plugin_author - self.plugin_name = plugin_name - self.runner_name = runner_name - self.name = f'plugin:{plugin_author}/{plugin_name}/{runner_name}' - - async def run( - self, query: pipeline_query.Query - ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]: - """Run the plugin agent runner""" - - # Build AgentRunContext - context = AgentRunContext( - query_id=query.query_id, - session=query.session, - messages=query.messages, - user_message=query.user_message.content[0] - if isinstance(query.user_message.content, list) - else provider_message.ContentElement.from_text(query.user_message.content), - use_funcs=query.use_funcs, - extra_config=self.pipeline_config.get('ai', {}).get(self.runner_name, {}), - ) - - # Call plugin connector to run agent - async for result_dict in self.ap.plugin_connector.run_agent( - plugin_author=self.plugin_author, - plugin_name=self.plugin_name, - runner_name=self.runner_name, - context=context.model_dump(), - ): - # Convert result to Message/MessageChunk - result_type = result_dict.get('type') - - if result_type == 'chunk': - # Stream chunk - chunk_data = result_dict.get('message_chunk') - if chunk_data: - yield provider_message.MessageChunk.model_validate(chunk_data) - - elif result_type == 'text': - # Text content - content = result_dict.get('content', '') - yield provider_message.MessageChunk( - role='assistant', - content=content, - ) - - elif result_type == 'tool_call': - # Tool call notification (may not need to yield anything here) - pass - - elif result_type == 'finish': - # Final message - message_data = result_dict.get('message') - if message_data: - yield provider_message.Message.model_validate(message_data) - else: - # Fallback: create message from content - content = result_dict.get('content', '') - yield provider_message.Message( - role='assistant', - content=content, - ) class ChatMessageHandler(handler.MessageHandler): + """Chat message handler using AgentRunOrchestrator. + + This handler delegates all runner execution to the agent_run_orchestrator, + which resolves runner ID, builds context, invokes plugin runtime, + and normalizes results. + """ + async def handle( self, query: pipeline_query.Query, ) -> typing.AsyncGenerator[entities.StageProcessResult, None]: - """处理""" - # 调API - # 生成器 - - # 触发插件事件 + """Handle chat message by delegating to AgentRunOrchestrator.""" + # Trigger plugin event event_class = ( events.PersonNormalMessageReceived if query.launcher_type == provider_session.LauncherTypes.PERSON @@ -122,7 +51,7 @@ class ChatMessageHandler(handler.MessageHandler): bound_plugins = query.variables.get('_pipeline_bound_plugins', None) event_ctx = await self.ap.plugin_connector.emit_event(event, bound_plugins) - is_create_card = False # 判断下是否需要创建流式卡片 + is_create_card = False # Track if streaming card was created if event_ctx.is_prevented_default(): if event_ctx.event.reply_message_chain is not None: @@ -153,103 +82,85 @@ class ChatMessageHandler(handler.MessageHandler): is_stream = False try: - runner_name = query.pipeline_config['ai']['runner']['runner'] - - # Check if it's a built-in runner - runner = None - for r in runner_module.preregistered_runners: - if r.name == runner_name: - runner = r(self.ap, query.pipeline_config) - break - - # If not found in built-in runners, check plugin runners - if runner is None: - # Parse runner name: format is "plugin:author/plugin_name/runner_name" - if runner_name.startswith('plugin:'): - parts = runner_name[7:].split('/') # Remove "plugin:" prefix - if len(parts) == 3: - plugin_author, plugin_name, component_runner_name = parts - runner = PluginAgentRunnerWrapper( - self.ap, plugin_author, plugin_name, component_runner_name, query.pipeline_config - ) - else: - raise ValueError( - f'Invalid plugin runner name format: {runner_name}. Expected: plugin:author/name/runner' - ) - else: - raise ValueError(f'Request Runner not found: {runner_name}') - # Mark start time for telemetry start_ts = time.time() - if is_stream: - resp_message_id = uuid.uuid4() - chunk_count = 0 # Track streaming chunks to reduce excessive logging + # Create a single resp_message_id for the entire streaming response + resp_message_id = uuid.uuid4() - async for result in runner.run(query): - result.resp_message_id = str(resp_message_id) + # Use AgentRunOrchestrator to run the agent + # This replaces direct runner lookup and PluginAgentRunnerWrapper + async for result in self.ap.agent_run_orchestrator.run_from_query(query): + result.resp_message_id = str(resp_message_id) + + # For streaming mode, pop previous response before adding new chunk + # This allows incremental card updates + if is_stream: if query.resp_messages: query.resp_messages.pop() if query.resp_message_chain: query.resp_message_chain.pop() - # 此时连接外部 AI 服务正常,创建卡片 - if not is_create_card: # 只有不是第一次才创建卡片 - await query.adapter.create_message_card(str(resp_message_id), query.message_event) - is_create_card = True - query.resp_messages.append(result) - chunk_count += 1 - # Only log every 10th chunk to reduce excessive logging during streaming - # This prevents memory overflow from thousands of log entries per conversation - # First chunk uses INFO level to confirm connection establishment - if chunk_count == 1: - self.ap.logger.info( - f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}' - ) - elif chunk_count % 10 == 0: - self.ap.logger.debug( - f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}' - ) + # Create streaming card on first result (connection established) + if is_stream and not is_create_card: + await query.adapter.create_message_card(str(resp_message_id), query.message_event) + is_create_card = True - if result.content is not None: - text_length += len(result.content) - - yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) - - # Log final summary after streaming completes - self.ap.logger.info( - f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars' - ) - - else: - async for result in runner.run(query): - query.resp_messages.append(result) + query.resp_messages.append(result) + # Logging (reduce verbosity for streaming chunks) + if not is_stream: self.ap.logger.info( f'Conversation({query.query_id}) Response: {self.cut_str(result.readable_str())}' ) - if result.content is not None: - text_length += len(result.content) + if result.content is not None: + text_length += len(result.content) - yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + # Log final summary after streaming completes + if is_stream: + chunk_count = len(query.resp_messages) + self.ap.logger.info( + f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars' + ) + + # Update conversation history query.session.using_conversation.messages.append(query.user_message) - query.session.using_conversation.messages.extend(query.resp_messages) + except Exception as e: + # Import orchestrator errors for specific handling + from ....agent.runner.errors import ( + RunnerNotFoundError, + RunnerNotAuthorizedError, + RunnerExecutionError, + ) + error_info = f'{traceback.format_exc()}' self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}') - traceback.print_exc() - exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint') + # Handle specific runner errors with appropriate messages + if isinstance(e, RunnerNotFoundError): + user_notice = f'Agent runner not found: {e.runner_id}' + elif isinstance(e, RunnerNotAuthorizedError): + user_notice = 'Agent runner not authorized for this pipeline' + elif isinstance(e, RunnerExecutionError): + if e.retryable: + user_notice = 'Agent runner temporarily unavailable. Please try again.' + else: + user_notice = 'Agent runner execution failed.' + else: + # Use existing exception handling + exception_handling = query.pipeline_config['output']['misc'].get('exception-handling', 'show-hint') - if exception_handling == 'show-error': - user_notice = f'{e}' - elif exception_handling == 'show-hint': - user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.') - else: # hide - user_notice = None + if exception_handling == 'show-error': + user_notice = f'{e}' + elif exception_handling == 'show-hint': + user_notice = query.pipeline_config['output']['misc'].get('failure-hint', 'Request failed.') + else: # hide + user_notice = None yield entities.StageProcessResult( result_type=entities.ResultType.INTERRUPT, @@ -259,7 +170,7 @@ class ChatMessageHandler(handler.MessageHandler): debug_notice=traceback.format_exc(), ) finally: - # Telemetry reporting: collect minimal per-query execution info and send asynchronously + # Telemetry reporting try: end_ts = time.time() duration_ms = None @@ -267,16 +178,14 @@ class ChatMessageHandler(handler.MessageHandler): duration_ms = int((end_ts - start_ts) * 1000) adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None - runner_name = ( - query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') - if query.pipeline_config - else None - ) - # Model name if using localagent + # Use orchestrator to resolve runner ID for telemetry + runner_name = self.ap.agent_run_orchestrator.resolve_runner_id_for_telemetry(query) + + # Model name if available model_name = None try: - if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None): + if getattr(query, 'use_llm_model_uuid', None): m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid) if m and getattr(m, 'model_entity', None): model_name = getattr(m.model_entity, 'name', None) @@ -286,7 +195,7 @@ class ChatMessageHandler(handler.MessageHandler): pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None) runner_category = runner_utils.get_runner_category_from_runner( - runner_name, runner, query.pipeline_config + runner_name, None, query.pipeline_config ) payload = { @@ -304,7 +213,6 @@ class ChatMessageHandler(handler.MessageHandler): 'timestamp': datetime.utcnow().isoformat(), } - # Send telemetry asynchronously and do not block pipeline via app's telemetry manager await self.ap.telemetry.start_send_task(payload) # Trigger survey event on first successful non-WebSocket response @@ -312,5 +220,4 @@ class ChatMessageHandler(handler.MessageHandler): if self.ap.survey: await self.ap.survey.trigger_event('first_bot_response_success') except Exception as ex: - # Ensure telemetry issues do not affect normal flow - self.ap.logger.warning(f'Failed to send telemetry: {ex}') + self.ap.logger.warning(f'Failed to send telemetry: {ex}') \ No newline at end of file diff --git a/src/langbot/pkg/plugin/connector.py b/src/langbot/pkg/plugin/connector.py index 22cbb14d..4bf89fd2 100644 --- a/src/langbot/pkg/plugin/connector.py +++ b/src/langbot/pkg/plugin/connector.py @@ -600,14 +600,16 @@ class PluginRuntimeConnector: yield cmd_ret # AgentRunner methods - async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[ComponentManifest]: - """List all available AgentRunner components.""" + async def list_agent_runners(self, bound_plugins: list[str] | None = None) -> list[dict[str, Any]]: + """List all available AgentRunner components. + + Returns list of dicts with plugin_author, plugin_name, runner_name, manifest, etc. + """ if not self.is_enable_plugin: return [] runners_data = await self.handler.list_agent_runners(include_plugins=bound_plugins) - runners = [ComponentManifest.model_validate(runner) for runner in runners_data] - return runners + return runners_data async def run_agent( self, @@ -625,10 +627,18 @@ class PluginRuntimeConnector: context: AgentRunContext as dict Yields: - AgentRunReturn results as dicts + AgentRunResult dicts per Protocol v1 """ if not self.is_enable_plugin: - yield {'type': 'finish', 'finish_reason': 'error', 'content': 'Plugin system is disabled'} + # Return v1 protocol run.failed + yield { + 'type': 'run.failed', + 'data': { + 'error': 'Plugin system is disabled', + 'code': 'plugin.disabled', + 'retryable': False, + }, + } return gen = self.handler.run_agent(plugin_author, plugin_name, runner_name, context) diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 6952296d..6655b698 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -419,76 +419,6 @@ class RuntimeConnectionHandler(handler.Handler): message=f'Failed to execute tool {tool_name}: {e}', ) - @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE) - async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse: - """Retrieve knowledge from a knowledge base""" - kb_uuid = data['kb_uuid'] - query = data['query'] - top_k = data.get('top_k', 5) - - try: - kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) - if kb is None: - return handler.ActionResponse.error( - message=f'Knowledge base with uuid {kb_uuid} not found', - ) - - results = await kb.retrieve(query=query, top_k=top_k) - - # Convert results to dict format - results_data = [ - { - 'id': r.id, - 'content': [c.model_dump() for c in r.content], - 'metadata': r.metadata, - } - for r in results - ] - - return handler.ActionResponse.success( - data={ - 'results': results_data, - }, - ) - except Exception as e: - traceback.print_exc() - return handler.ActionResponse.error( - message=f'Failed to retrieve knowledge: {e}', - ) - - @self.action(PluginToRuntimeAction.INVOKE_EMBEDDING) - async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse: - """Invoke an embedding model""" - embedding_model_uuid = data['embedding_model_uuid'] - texts = data['texts'] - - try: - embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid) - if embedding_model is None: - return handler.ActionResponse.error( - message=f'Embedding model with uuid {embedding_model_uuid} not found', - ) - - # Call embedding model to generate embeddings - embeddings = [] - for text in texts: - embedding = await embedding_model.provider.invoke_embedding( - model=embedding_model, - text=text, - ) - embeddings.append(embedding) - - return handler.ActionResponse.success( - data={ - 'embeddings': embeddings, - }, - ) - except Exception as e: - traceback.print_exc() - return handler.ActionResponse.error( - message=f'Failed to invoke embedding model: {e}', - ) - @self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE) async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: """Set binary storage""" @@ -856,10 +786,11 @@ class RuntimeConnectionHandler(handler.Handler): # Validate kb_id is in pipeline's allowed list allowed_kb_uuids = [] if query.pipeline_config: - local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) - allowed_kb_uuids = local_agent_config.get('knowledge-bases', []) + from langbot.pkg.agent.runner.config_migration import ConfigMigration + runner_config = ConfigMigration.resolve_runner_config(query.pipeline_config, None) + allowed_kb_uuids = runner_config.get('knowledge-bases', []) if not allowed_kb_uuids: - old_kb_uuid = local_agent_config.get('knowledge-base', '') + old_kb_uuid = runner_config.get('knowledge-base', '') if old_kb_uuid and old_kb_uuid != '__none__': allowed_kb_uuids = [old_kb_uuid] @@ -1025,6 +956,55 @@ class RuntimeConnectionHandler(handler.Handler): return result['tools'] + async def list_agent_runners(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]: + """List agent runners from plugin runtime. + + Returns list of dicts with: + - plugin_author + - plugin_name + - runner_name + - runner_description + - manifest + - protocol_version + - capabilities + - permissions + - config + """ + result = await self.call_action( + LangBotToRuntimeAction.LIST_AGENT_RUNNERS, + { + 'include_plugins': include_plugins, + }, + timeout=20, + ) + + return result['runners'] + + async def run_agent( + self, + plugin_author: str, + plugin_name: str, + runner_name: str, + context: dict[str, Any], + ) -> typing.AsyncGenerator[dict[str, Any], None]: + """Run an AgentRunner component. + + Yields AgentRunResult dicts per Protocol v1. + """ + gen = self.call_action_generator( + LangBotToRuntimeAction.RUN_AGENT, + { + 'plugin_author': plugin_author, + 'plugin_name': plugin_name, + 'runner_name': runner_name, + 'context': context, + }, + timeout=300, + ) + + async for ret in gen: + yield ret + async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]: """Get plugin icon""" result = await self.call_action( diff --git a/tests/unit_tests/agent/__init__.py b/tests/unit_tests/agent/__init__.py new file mode 100644 index 00000000..ba10b285 --- /dev/null +++ b/tests/unit_tests/agent/__init__.py @@ -0,0 +1,2 @@ +"""Tests for agent runner subsystem.""" +from __future__ import annotations \ No newline at end of file diff --git a/tests/unit_tests/agent/test_chat_handler.py b/tests/unit_tests/agent/test_chat_handler.py new file mode 100644 index 00000000..40d1a5eb --- /dev/null +++ b/tests/unit_tests/agent/test_chat_handler.py @@ -0,0 +1,553 @@ +"""Tests for ChatMessageHandler behavior with AgentRunOrchestrator. + +Tests focus on: +- Streaming mode behavior (single resp_message_id, pop/append pattern) +- Non-streaming mode behavior (no pop) +- Orchestrator invocation +- Error handling for RunnerNotFoundError, RunnerExecutionError + +Avoids circular imports by using proper import structure. +""" +from __future__ import annotations + +import uuid +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from langbot.pkg.agent.runner.errors import ( + RunnerNotFoundError, + RunnerExecutionError, + RunnerNotAuthorizedError, +) +from langbot.pkg.agent.runner.config_migration import ConfigMigration + + +# Define mock classes in dependency order (no forward references needed) + +class MockLauncherType: + value = 'person' + + +class MockConversation: + uuid = 'conv-uuid' + messages = [] + + +class MockMessage: + role = 'user' + content = 'Hello' + + +class MockAdapter: + is_stream = False + + async def is_stream_output_supported(self): + return self.is_stream + + async def create_message_card(self, resp_message_id, message_event): + pass + + +class MockSession: + launcher_type = MockLauncherType() + launcher_id = 'user123' + using_conversation = MockConversation() + + +class MockQuery: + """Mock Query for testing.""" + def __init__(self): + self.query_id = 1 + self.launcher_type = MockLauncherType() + self.launcher_id = 'user123' + self.sender_id = 'user123' + self.bot_uuid = 'bot-uuid' + self.pipeline_uuid = 'pipeline-uuid' + self.pipeline_config = { + 'ai': { + 'runner': { + 'id': 'plugin:langbot/local-agent/default', + }, + 'runner_config': {}, + }, + 'output': { + 'misc': { + 'exception-handling': 'show-hint', + 'failure-hint': 'Request failed.', + }, + }, + } + self.variables = {} + self.session = MockSession() + self.user_message = MockMessage() + self.messages = [] + self.resp_messages = [] + self.resp_message_chain = None + self.adapter = MockAdapter() + self.message_event = MagicMock() + self.message_chain = MagicMock() + + +class MockMessageChunk: + """Mock MessageChunk for testing.""" + def __init__(self, content, resp_message_id=None): + self.role = 'assistant' + self.content = content + self.resp_message_id = resp_message_id + self.is_final = False + + def readable_str(self): + return self.content + + +class MockEventContext: + """Mock event context for testing.""" + def __init__(self, prevented=False, reply_message_chain=None, user_message_alter=None): + self._prevented = prevented + self.event = MagicMock() + self.event.reply_message_chain = reply_message_chain + self.event.user_message_alter = user_message_alter + + def is_prevented_default(self): + return self._prevented + + +class MockAgentRunOrchestrator: + """Mock AgentRunOrchestrator for testing.""" + def __init__(self, chunks=None, error=None): + self._chunks = chunks or [] + self._error = error + + async def run_from_query(self, query): + """Async generator that yields chunks or raises error.""" + if self._error: + raise self._error + for chunk in self._chunks: + yield chunk + + def resolve_runner_id_for_telemetry(self, query): + return 'plugin:langbot/local-agent/default' + + +class MockApplication: + """Mock Application for testing.""" + def __init__(self, orchestrator=None): + self.agent_run_orchestrator = orchestrator or MockAgentRunOrchestrator() + self.logger = MagicMock() + self.logger.info = MagicMock() + self.logger.debug = MagicMock() + self.logger.warning = MagicMock() + self.logger.error = MagicMock() + + # Mock plugin_connector + self.plugin_connector = MagicMock() + self.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext()) + + # Mock telemetry + self.telemetry = MagicMock() + self.telemetry.start_send_task = AsyncMock() + + # Mock survey + self.survey = MagicMock() + self.survey.trigger_event = AsyncMock() + + # Mock model_mgr + self.model_mgr = MagicMock() + self.model_mgr.get_model_by_uuid = AsyncMock(return_value=None) + + +class TestStreamingBehavior: + """Tests for streaming mode behavior.""" + + def test_single_resp_message_id_for_streaming(self): + """Streaming mode should use single resp_message_id for entire response.""" + # Simulate the streaming logic: resp_message_id created outside loop + resp_message_id = uuid.uuid4() + + chunks = ['Hello', ' World', '!'] + resp_messages = [] + + for chunk in chunks: + result = MockMessageChunk(chunk) + result.resp_message_id = str(resp_message_id) + + # Pop old chunk (streaming behavior) + if resp_messages: + resp_messages.pop() + resp_messages.append(result) + + # All chunks should have same resp_message_id + assert len(resp_messages) == 1 # Only last chunk remains after pop/append + assert resp_messages[0].resp_message_id == str(resp_message_id) + + def test_pop_before_append_in_streaming(self): + """Streaming mode should pop old chunk before appending new.""" + resp_message_id = uuid.uuid4() + resp_messages = [] + + # First chunk - no pop + chunk1 = MockMessageChunk('Hello') + chunk1.resp_message_id = str(resp_message_id) + resp_messages.append(chunk1) + assert len(resp_messages) == 1 + + # Second chunk - pop first, then append + if resp_messages: + resp_messages.pop() + chunk2 = MockMessageChunk('Hello World') + chunk2.resp_message_id = str(resp_message_id) + resp_messages.append(chunk2) + assert len(resp_messages) == 1 + assert resp_messages[0].content == 'Hello World' + + def test_non_streaming_no_pop(self): + """Non-streaming mode should NOT pop previous responses.""" + resp_messages = [] + + # First message + msg1 = MockMessageChunk('Response 1') + resp_messages.append(msg1) + assert len(resp_messages) == 1 + + # Second message - should NOT pop in non-streaming + msg2 = MockMessageChunk('Response 2') + resp_messages.append(msg2) + assert len(resp_messages) == 2 + + +class TestConfigMigrationInChatHandler: + """Tests for ConfigMigration usage in chat handler context.""" + + def test_resolve_runner_id_from_pipeline_config(self): + """Chat handler should use ConfigMigration to resolve runner ID.""" + pipeline_config = { + 'ai': { + 'runner': { + 'id': 'plugin:langbot/local-agent/default', + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:langbot/local-agent/default' + + def test_resolve_runner_id_from_old_format(self): + """ConfigMigration should handle old runner format.""" + pipeline_config = { + 'ai': { + 'runner': { + 'runner': 'local-agent', + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:langbot/local-agent/default' + + +class TestErrorHandling: + """Tests for orchestrator error handling.""" + + def test_runner_not_found_error_properties(self): + """RunnerNotFoundError should have runner_id property.""" + error = RunnerNotFoundError('plugin:notexist/unknown/default') + assert error.runner_id == 'plugin:notexist/unknown/default' + assert 'not found' in str(error) + + def test_runner_execution_error_retryable(self): + """RunnerExecutionError should have retryable property.""" + error = RunnerExecutionError( + 'plugin:langbot/local-agent/default', + 'Upstream timeout', + retryable=True, + ) + assert error.runner_id == 'plugin:langbot/local-agent/default' + assert error.retryable is True + assert 'timeout' in str(error) + + def test_runner_execution_error_not_retryable(self): + """RunnerExecutionError can be non-retryable.""" + error = RunnerExecutionError( + 'plugin:langbot/local-agent/default', + 'Configuration error', + retryable=False, + ) + assert error.retryable is False + + def test_runner_not_authorized_error_properties(self): + """RunnerNotAuthorizedError should have bound_plugins property.""" + error = RunnerNotAuthorizedError( + 'plugin:langbot/local-agent/default', + ['langbot/dify-agent'], + ) + assert error.runner_id == 'plugin:langbot/local-agent/default' + assert error.bound_plugins == ['langbot/dify-agent'] + + +class TestChatHandlerImports: + """Test that chat handler can be imported without circular import.""" + + def test_import_chat_handler_module(self): + """Import chat handler module should work.""" + # This test verifies the import works without circular dependency + from langbot.pkg.pipeline.process.handlers import chat + assert chat.ChatMessageHandler is not None + + def test_chat_handler_class_exists(self): + """ChatMessageHandler class should be defined.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + assert ChatMessageHandler.__name__ == 'ChatMessageHandler' + + def test_chat_handler_has_handle_method(self): + """ChatMessageHandler should have async generator handle method.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + assert hasattr(ChatMessageHandler, 'handle') + # handle returns AsyncGenerator, so check for async generator function + import inspect + assert inspect.isasyncgenfunction(ChatMessageHandler.handle) + + +class TestChatHandlerAsyncBehavior: + """Real async tests for ChatMessageHandler.handle() with mocked orchestrator.""" + + @pytest.mark.asyncio + async def test_streaming_single_resp_message_id(self): + """Streaming mode: all chunks should have same resp_message_id.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + # Create chunks for streaming + chunks = [ + MockMessageChunk('Hello'), + MockMessageChunk('Hello World'), + MockMessageChunk('Hello World!'), + ] + + orchestrator = MockAgentRunOrchestrator(chunks=chunks) + mock_ap = MockApplication(orchestrator=orchestrator) + + # Mock event context to not prevent default + event_ctx = MockEventContext(prevented=False) + mock_ap.plugin_connector.emit_event = AsyncMock(return_value=event_ctx) + + query = MockQuery() + query.adapter.is_stream = True # Enable streaming mode + + handler = ChatMessageHandler(mock_ap) + + # Mock event creation and StageProcessResult to bypass pydantic validation + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE)) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + # Verify single resp_message_id + resp_ids = [msg.resp_message_id for msg in query.resp_messages if hasattr(msg, 'resp_message_id')] + assert len(set(resp_ids)) == 1 # All same ID + + # Verify pop/append pattern: only last chunk remains + assert len(query.resp_messages) == 1 + assert query.resp_messages[0].content == 'Hello World!' + + @pytest.mark.asyncio + async def test_non_streaming_no_pop(self): + """Non-streaming mode: all chunks should remain.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + chunks = [ + MockMessageChunk('Response 1'), + MockMessageChunk('Response 2'), + ] + + orchestrator = MockAgentRunOrchestrator(chunks=chunks) + mock_ap = MockApplication(orchestrator=orchestrator) + mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False)) + + query = MockQuery() + query.adapter.is_stream = False # Disable streaming mode + + handler = ChatMessageHandler(mock_ap) + + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE)) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + # No pop: all chunks should remain + assert len(query.resp_messages) == 2 + assert query.resp_messages[0].content == 'Response 1' + assert query.resp_messages[1].content == 'Response 2' + + @pytest.mark.asyncio + async def test_runner_not_found_error(self): + """Handler should catch RunnerNotFoundError and return INTERRUPT.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + orchestrator = MockAgentRunOrchestrator( + error=RunnerNotFoundError('plugin:notexist/unknown/default') + ) + mock_ap = MockApplication(orchestrator=orchestrator) + mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False)) + + query = MockQuery() + + handler = ChatMessageHandler(mock_ap) + + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock( + result_type=kwargs.get('result_type'), + user_notice=kwargs.get('user_notice'), + ) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + # Should return INTERRUPT with user_notice + assert len(results) == 1 + assert results[0].result_type == entities.ResultType.INTERRUPT + assert 'not found' in results[0].user_notice + + @pytest.mark.asyncio + async def test_runner_not_authorized_error(self): + """Handler should catch RunnerNotAuthorizedError and return INTERRUPT.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + orchestrator = MockAgentRunOrchestrator( + error=RunnerNotAuthorizedError('plugin:langbot/local-agent/default', ['other/plugin']) + ) + mock_ap = MockApplication(orchestrator=orchestrator) + mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False)) + + query = MockQuery() + + handler = ChatMessageHandler(mock_ap) + + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock( + result_type=kwargs.get('result_type'), + user_notice=kwargs.get('user_notice'), + ) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + assert len(results) == 1 + assert results[0].result_type == entities.ResultType.INTERRUPT + assert 'not authorized' in results[0].user_notice + + @pytest.mark.asyncio + async def test_runner_execution_error_retryable(self): + """Handler should catch retryable RunnerExecutionError.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + orchestrator = MockAgentRunOrchestrator( + error=RunnerExecutionError('plugin:langbot/local-agent/default', 'timeout', retryable=True) + ) + mock_ap = MockApplication(orchestrator=orchestrator) + mock_ap.plugin_connector.emit_event = AsyncMock(return_value=MockEventContext(prevented=False)) + + query = MockQuery() + + handler = ChatMessageHandler(mock_ap) + + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock( + result_type=kwargs.get('result_type'), + user_notice=kwargs.get('user_notice'), + ) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + assert len(results) == 1 + assert results[0].result_type == entities.ResultType.INTERRUPT + assert 'temporarily unavailable' in results[0].user_notice + + @pytest.mark.asyncio + async def test_prevented_default_with_reply(self): + """When event prevented default with reply, use reply message.""" + from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler + from langbot.pkg.pipeline import entities + + # Mock reply message chain + reply_chain = MockMessageChunk('Reply from plugin') + + mock_ap = MockApplication() + mock_ap.plugin_connector.emit_event = AsyncMock( + return_value=MockEventContext(prevented=True, reply_message_chain=reply_chain) + ) + + query = MockQuery() + + handler = ChatMessageHandler(mock_ap) + + mock_event = MagicMock() + mock_event.return_value = MagicMock() + + def make_result(*args, **kwargs): + return MagicMock(result_type=kwargs.get('result_type', entities.ResultType.CONTINUE)) + + with patch('langbot.pkg.pipeline.process.handlers.chat.events') as mock_events_module, \ + patch('langbot.pkg.pipeline.entities.StageProcessResult', side_effect=make_result): + mock_events_module.PersonNormalMessageReceived = mock_event + mock_events_module.GroupNormalMessageReceived = mock_event + + results = [] + async for result in handler.handle(query): + results.append(result) + + # Should return CONTINUE with reply message + assert len(results) == 1 + assert results[0].result_type == entities.ResultType.CONTINUE + assert len(query.resp_messages) == 1 \ No newline at end of file diff --git a/tests/unit_tests/agent/test_config_migration.py b/tests/unit_tests/agent/test_config_migration.py new file mode 100644 index 00000000..202e0eb3 --- /dev/null +++ b/tests/unit_tests/agent/test_config_migration.py @@ -0,0 +1,231 @@ +"""Tests for agent runner config migration.""" +from __future__ import annotations + + +from langbot.pkg.agent.runner.config_migration import ( + ConfigMigration, + OLD_RUNNER_TO_PLUGIN_RUNNER_ID, +) + + +class TestOldRunnerMapping: + """Tests for OLD_RUNNER_TO_PLUGIN_RUNNER_ID mapping.""" + + def test_local_agent_mapping(self): + """Local-agent should map to official plugin.""" + assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['local-agent'] == 'plugin:langbot/local-agent/default' + + def test_dify_mapping(self): + """Dify should map to official plugin.""" + assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['dify-service-api'] == 'plugin:langbot/dify-agent/default' + + def test_n8n_mapping(self): + """n8n should map to official plugin.""" + assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['n8n-service-api'] == 'plugin:langbot/n8n-agent/default' + + def test_coze_mapping(self): + """Coze should map to official plugin.""" + assert OLD_RUNNER_TO_PLUGIN_RUNNER_ID['coze-api'] == 'plugin:langbot/coze-agent/default' + + def test_all_runners_mapped(self): + """All old runners should have mapping.""" + expected_runners = [ + 'local-agent', + 'dify-service-api', + 'n8n-service-api', + 'coze-api', + 'dashscope-app-api', + 'langflow-api', + 'tbox-app-api', + ] + for runner in expected_runners: + assert runner in OLD_RUNNER_TO_PLUGIN_RUNNER_ID + mapped = OLD_RUNNER_TO_PLUGIN_RUNNER_ID[runner] + assert mapped.startswith('plugin:langbot/') + assert mapped.endswith('/default') + + +class TestResolveRunnerId: + """Tests for ConfigMigration.resolve_runner_id.""" + + def test_resolve_new_format_runner_id(self): + """Resolve runner ID from new format.""" + pipeline_config = { + 'ai': { + 'runner': { + 'id': 'plugin:langbot/local-agent/default', + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:langbot/local-agent/default' + + def test_resolve_old_format_runner_name(self): + """Resolve runner ID from old format.""" + pipeline_config = { + 'ai': { + 'runner': { + 'runner': 'local-agent', + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:langbot/local-agent/default' + + def test_resolve_old_format_plugin_runner(self): + """Resolve already migrated plugin:* runner.""" + pipeline_config = { + 'ai': { + 'runner': { + 'runner': 'plugin:alice/my-agent/custom', + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:alice/my-agent/custom' + + def test_resolve_no_runner_config(self): + """Resolve runner ID when not configured.""" + pipeline_config = {} + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id is None + + def test_resolve_priority_new_over_old(self): + """New format takes priority over old format.""" + pipeline_config = { + 'ai': { + 'runner': { + 'id': 'plugin:langbot/local-agent/default', + 'runner': 'dify-service-api', # This should be ignored + }, + }, + } + + runner_id = ConfigMigration.resolve_runner_id(pipeline_config) + assert runner_id == 'plugin:langbot/local-agent/default' + + +class TestResolveRunnerConfig: + """Tests for ConfigMigration.resolve_runner_config.""" + + def test_resolve_new_format_config(self): + """Resolve runner config from new format.""" + pipeline_config = { + 'ai': { + 'runner_config': { + 'plugin:langbot/local-agent/default': { + 'model': 'uuid-123', + 'max_round': 10, + }, + }, + }, + } + + config = ConfigMigration.resolve_runner_config( + pipeline_config, + 'plugin:langbot/local-agent/default', + ) + assert config == {'model': 'uuid-123', 'max_round': 10} + + def test_resolve_old_format_config(self): + """Resolve runner config from old format.""" + pipeline_config = { + 'ai': { + 'local-agent': { + 'model': 'uuid-123', + 'max_round': 10, + }, + }, + } + + config = ConfigMigration.resolve_runner_config( + pipeline_config, + 'plugin:langbot/local-agent/default', + ) + assert config == {'model': 'uuid-123', 'max_round': 10} + + def test_resolve_no_config(self): + """Resolve runner config when not found.""" + pipeline_config = {} + + config = ConfigMigration.resolve_runner_config( + pipeline_config, + 'plugin:langbot/local-agent/default', + ) + assert config == {} + + def test_resolve_priority_new_over_old(self): + """New format config takes priority.""" + pipeline_config = { + 'ai': { + 'runner_config': { + 'plugin:langbot/local-agent/default': { + 'model': 'new-uuid', + }, + }, + 'local-agent': { + 'model': 'old-uuid', + }, + }, + } + + config = ConfigMigration.resolve_runner_config( + pipeline_config, + 'plugin:langbot/local-agent/default', + ) + assert config == {'model': 'new-uuid'} + + +class TestGetExpireTime: + """Tests for ConfigMigration.get_expire_time.""" + + def test_get_expire_time_zero(self): + """Get expire time when zero.""" + pipeline_config = { + 'ai': { + 'runner': { + 'expire-time': 0, + }, + }, + } + + expire_time = ConfigMigration.get_expire_time(pipeline_config) + assert expire_time == 0 + + def test_get_expire_time_positive(self): + """Get expire time when positive.""" + pipeline_config = { + 'ai': { + 'runner': { + 'expire-time': 3600, + }, + }, + } + + expire_time = ConfigMigration.get_expire_time(pipeline_config) + assert expire_time == 3600 + + def test_get_expire_time_default(self): + """Get expire time when not configured.""" + pipeline_config = {} + + expire_time = ConfigMigration.get_expire_time(pipeline_config) + assert expire_time == 0 + + +class TestGetOldRunnerName: + """Tests for ConfigMigration.get_old_runner_name.""" + + def test_get_old_runner_name_mapped(self): + """Get old runner name for mapped runner ID.""" + old_name = ConfigMigration.get_old_runner_name('plugin:langbot/local-agent/default') + assert old_name == 'local-agent' + + def test_get_old_runner_name_not_mapped(self): + """Get old runner name for unmapped runner ID.""" + old_name = ConfigMigration.get_old_runner_name('plugin:alice/my-agent/custom') + assert old_name is None \ No newline at end of file diff --git a/tests/unit_tests/agent/test_id.py b/tests/unit_tests/agent/test_id.py new file mode 100644 index 00000000..55941c1d --- /dev/null +++ b/tests/unit_tests/agent/test_id.py @@ -0,0 +1,137 @@ +"""Tests for agent runner ID parsing and formatting.""" +from __future__ import annotations + +import pytest + +from langbot.pkg.agent.runner.id import ( + parse_runner_id, + format_runner_id, + RunnerIdParts, + is_plugin_runner_id, +) + + +class TestRunnerIdParsing: + """Tests for parse_runner_id.""" + + def test_parse_plugin_runner_id(self): + """Parse valid plugin runner ID.""" + runner_id = 'plugin:langbot/local-agent/default' + parts = parse_runner_id(runner_id) + + assert parts.source == 'plugin' + assert parts.plugin_author == 'langbot' + assert parts.plugin_name == 'local-agent' + assert parts.runner_name == 'default' + + def test_parse_plugin_runner_id_complex_names(self): + """Parse plugin runner ID with complex names.""" + runner_id = 'plugin:alice/helpdesk-agent/ticket-handler' + parts = parse_runner_id(runner_id) + + assert parts.source == 'plugin' + assert parts.plugin_author == 'alice' + assert parts.plugin_name == 'helpdesk-agent' + assert parts.runner_name == 'ticket-handler' + + def test_parse_invalid_plugin_runner_id_missing_parts(self): + """Parse invalid plugin runner ID with missing parts.""" + runner_id = 'plugin:langbot/local-agent' + + with pytest.raises(ValueError) as exc_info: + parse_runner_id(runner_id) + + assert 'Invalid plugin runner ID format' in str(exc_info.value) + + def test_parse_invalid_plugin_runner_id_empty_parts(self): + """Parse invalid plugin runner ID with empty parts.""" + runner_id = 'plugin://default' + + with pytest.raises(ValueError) as exc_info: + parse_runner_id(runner_id) + + assert 'non-empty' in str(exc_info.value) + + def test_parse_invalid_runner_id_not_plugin(self): + """Parse invalid runner ID without plugin prefix.""" + runner_id = 'local-agent' + + with pytest.raises(ValueError) as exc_info: + parse_runner_id(runner_id) + + assert 'Invalid runner ID format' in str(exc_info.value) + + def test_parse_invalid_runner_id_empty_string(self): + """Parse empty runner ID.""" + runner_id = '' + + with pytest.raises(ValueError): + parse_runner_id(runner_id) + + +class TestRunnerIdFormatting: + """Tests for format_runner_id.""" + + def test_format_plugin_runner_id(self): + """Format plugin runner ID.""" + runner_id = format_runner_id( + source='plugin', + plugin_author='langbot', + plugin_name='local-agent', + runner_name='default', + ) + + assert runner_id == 'plugin:langbot/local-agent/default' + + def test_format_invalid_source(self): + """Format runner ID with invalid source.""" + with pytest.raises(ValueError) as exc_info: + format_runner_id( + source='builtin', + plugin_author='langbot', + plugin_name='local-agent', + runner_name='default', + ) + + assert 'Invalid runner source' in str(exc_info.value) + + +class TestRunnerIdParts: + """Tests for RunnerIdParts dataclass.""" + + def test_get_plugin_id(self): + """Get plugin ID from parts.""" + parts = RunnerIdParts( + source='plugin', + plugin_author='langbot', + plugin_name='local-agent', + runner_name='default', + ) + + assert parts.to_plugin_id() == 'langbot/local-agent' + + def test_frozen_dataclass(self): + """RunnerIdParts should be immutable.""" + parts = RunnerIdParts( + source='plugin', + plugin_author='langbot', + plugin_name='local-agent', + runner_name='default', + ) + + with pytest.raises(Exception): # FrozenInstanceError + parts.plugin_author = 'other' + + +class TestIsPluginRunnerId: + """Tests for is_plugin_runner_id.""" + + def test_is_plugin_runner_id_true(self): + """Check plugin runner ID returns True.""" + assert is_plugin_runner_id('plugin:langbot/local-agent/default') is True + + def test_is_plugin_runner_id_false(self): + """Check non-plugin runner ID returns False.""" + assert is_plugin_runner_id('local-agent') is False + assert is_plugin_runner_id('builtin:local-agent') is False + assert is_plugin_runner_id('') is False \ No newline at end of file diff --git a/tests/unit_tests/agent/test_registry.py b/tests/unit_tests/agent/test_registry.py new file mode 100644 index 00000000..c610d329 --- /dev/null +++ b/tests/unit_tests/agent/test_registry.py @@ -0,0 +1,278 @@ +"""Tests for agent runner registry.""" +from __future__ import annotations + +import pytest + +from langbot.pkg.agent.runner.registry import AgentRunnerRegistry +from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor +from langbot.pkg.agent.runner.errors import RunnerNotFoundError, RunnerNotAuthorizedError + + +class FakeApplication: + """Fake Application for testing.""" + def __init__(self): + class FakeLogger: + def info(self, msg): + pass + def debug(self, msg): + pass + def warning(self, msg): + pass + def error(self, msg): + pass + + self.logger = FakeLogger() + + class FakePluginConnector: + is_enable_plugin = True + + async def list_agent_runners(self, bound_plugins=None): + # Return sample runner data + return [ + { + 'plugin_author': 'langbot', + 'plugin_name': 'local-agent', + 'runner_name': 'default', + 'manifest': { + 'kind': 'AgentRunner', + 'metadata': { + 'name': 'default', + 'label': {'en_US': 'Local Agent'}, + }, + 'spec': { + 'protocol_version': '1', + 'config': [], + 'capabilities': {'streaming': True}, + 'permissions': {}, + }, + }, + }, + { + 'plugin_author': 'alice', + 'plugin_name': 'my-agent', + 'runner_name': 'custom', + 'manifest': { + 'kind': 'AgentRunner', + 'metadata': { + 'name': 'custom', + 'label': {'en_US': 'Custom Agent'}, + }, + 'spec': { + 'protocol_version': '1', + 'config': [{'name': 'param1', 'type': 'string'}], + 'capabilities': {}, + 'permissions': {}, + }, + }, + }, + # Invalid runner - wrong kind + { + 'plugin_author': 'bad', + 'plugin_name': 'wrong-kind', + 'runner_name': 'default', + 'manifest': { + 'kind': 'Tool', # Wrong kind + 'metadata': {}, + 'spec': {}, + }, + }, + # Invalid runner - missing name + { + 'plugin_author': 'bad', + 'plugin_name': 'missing-name', + 'runner_name': 'default', + 'manifest': { + 'kind': 'AgentRunner', + 'metadata': {}, # No name + 'spec': {}, + }, + }, + ] + + self.plugin_connector = FakePluginConnector() + + +class TestRegistryDiscovery: + """Tests for runner discovery.""" + + @pytest.mark.asyncio + async def test_discover_valid_runners(self): + """Discover valid runners from plugin runtime.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + runners = await registry.list_runners(use_cache=False) + + # Should find 2 valid runners (langbot/local-agent and alice/my-agent) + assert len(runners) == 2 + + ids = [r.id for r in runners] + assert 'plugin:langbot/local-agent/default' in ids + assert 'plugin:alice/my-agent/custom' in ids + + @pytest.mark.asyncio + async def test_discover_caches_results(self): + """Discovery should cache results.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + # First discovery + runners1 = await registry.list_runners(use_cache=True) + + # Second call should use cache + runners2 = await registry.list_runners(use_cache=True) + + assert registry._cache is not None + assert len(runners1) == len(runners2) + + @pytest.mark.asyncio + async def test_discover_handles_plugin_disabled(self): + """Discovery returns empty when plugin system disabled.""" + ap = FakeApplication() + ap.plugin_connector.is_enable_plugin = False + registry = AgentRunnerRegistry(ap) + + runners = await registry.list_runners(use_cache=False) + + assert runners == [] + + @pytest.mark.asyncio + async def test_cache_not_polluted_by_bound_plugins(self): + """Cache should contain ALL runners, not filtered by bound_plugins. + + Regression test: get(bound_plugins=["a/b"]) should not pollute cache, + so subsequent list_runners(bound_plugins=None) should return all runners. + """ + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + # First: get with bound_plugins filter (should not pollute cache) + descriptor = await registry.get( + 'plugin:langbot/local-agent/default', + bound_plugins=['langbot/local-agent'], + ) + assert descriptor.id == 'plugin:langbot/local-agent/default' + + # Cache should contain ALL runners (both langbot and alice) + assert registry._cache is not None + assert len(registry._cache) == 2 # Both runners in cache + assert 'plugin:langbot/local-agent/default' in registry._cache + assert 'plugin:alice/my-agent/custom' in registry._cache + + # Second: list_runners without filter should return ALL runners + all_runners = await registry.list_runners(bound_plugins=None, use_cache=True) + assert len(all_runners) == 2 # Both runners returned + + # Third: list_runners with different filter should work correctly + alice_runners = await registry.list_runners(bound_plugins=['alice/my-agent'], use_cache=True) + assert len(alice_runners) == 1 + assert alice_runners[0].id == 'plugin:alice/my-agent/custom' + + +class TestRegistryGet: + """Tests for getting specific runner.""" + + @pytest.mark.asyncio + async def test_get_existing_runner(self): + """Get existing runner by ID.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + descriptor = await registry.get('plugin:langbot/local-agent/default') + + assert descriptor.id == 'plugin:langbot/local-agent/default' + assert descriptor.plugin_author == 'langbot' + assert descriptor.plugin_name == 'local-agent' + assert descriptor.runner_name == 'default' + + @pytest.mark.asyncio + async def test_get_nonexistent_runner(self): + """Get nonexistent runner raises RunnerNotFoundError.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + with pytest.raises(RunnerNotFoundError) as exc_info: + await registry.get('plugin:notexist/unknown/default') + + assert exc_info.value.runner_id == 'plugin:notexist/unknown/default' + + @pytest.mark.asyncio + async def test_get_runner_with_bound_plugins_filter(self): + """Get runner with bound plugins authorization.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + # Authorized - langbot plugin in bound list + descriptor = await registry.get( + 'plugin:langbot/local-agent/default', + bound_plugins=['langbot/local-agent'], + ) + assert descriptor is not None + + # Not authorized - plugin not in bound list + with pytest.raises(RunnerNotAuthorizedError): + await registry.get( + 'plugin:alice/my-agent/custom', + bound_plugins=['langbot/local-agent'], + ) + + +class TestRegistryMetadataForPipeline: + """Tests for get_runner_metadata_for_pipeline.""" + + @pytest.mark.asyncio + async def test_get_metadata_options_and_stages(self): + """Get metadata options and stages for pipeline UI.""" + ap = FakeApplication() + registry = AgentRunnerRegistry(ap) + + options, stages = await registry.get_runner_metadata_for_pipeline() + + # Should have options for each runner + assert len(options) == 2 + option_ids = [o['name'] for o in options] + assert 'plugin:langbot/local-agent/default' in option_ids + assert 'plugin:alice/my-agent/custom' in option_ids + + # Should have stages for runners with config + # Note: stages may be empty if config_schema is empty list + # In real scenarios, runners with config_schema will generate stages + # Only runners with non-empty config_schema generate stages + # mock data has config: [{'name': 'param1', 'type': 'string'}] for alice/my-agent + # but config is now taken from runner_data.get('config', []) + assert len(stages) >= 0 # Can be 0 if all runners have empty config + + +class TestDescriptorValidation: + """Tests for descriptor validation.""" + + def test_validate_runner_descriptor(self): + """Validate correctly built descriptor.""" + descriptor = AgentRunnerDescriptor( + id='plugin:test/my-runner/default', + source='plugin', + label={'en_US': 'Test Runner'}, + plugin_author='test', + plugin_name='my-runner', + runner_name='default', + ) + + assert descriptor.id == 'plugin:test/my-runner/default' + assert descriptor.get_plugin_id() == 'test/my-runner' + assert descriptor.protocol_version == '1' + + def test_descriptor_capabilities(self): + """Descriptor capability helper methods.""" + descriptor = AgentRunnerDescriptor( + id='plugin:test/my-runner/default', + source='plugin', + label={'en_US': 'Test Runner'}, + plugin_author='test', + plugin_name='my-runner', + runner_name='default', + capabilities={'streaming': True, 'tool_calling': False}, + ) + + assert descriptor.supports_streaming() is True + assert descriptor.supports_tool_calling() is False + assert descriptor.supports_knowledge_retrieval() is False \ No newline at end of file diff --git a/tests/unit_tests/agent/test_result_normalizer.py b/tests/unit_tests/agent/test_result_normalizer.py new file mode 100644 index 00000000..2ec86580 --- /dev/null +++ b/tests/unit_tests/agent/test_result_normalizer.py @@ -0,0 +1,343 @@ +"""Tests for agent runner result normalizer.""" +from __future__ import annotations + +import pytest + +from langbot.pkg.agent.runner.result_normalizer import AgentResultNormalizer +from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor +from langbot.pkg.agent.runner.errors import RunnerExecutionError, RunnerProtocolError + +from langbot_plugin.api.entities.builtin.provider import message as provider_message + + +class FakeApplication: + """Fake Application for testing.""" + def __init__(self): + class FakeLogger: + def info(self, msg): + pass + def debug(self, msg): + pass + def warning(self, msg): + pass + def error(self, msg): + pass + + self.logger = FakeLogger() + + +def make_descriptor(): + """Create a test descriptor.""" + return AgentRunnerDescriptor( + id='plugin:langbot/local-agent/default', + source='plugin', + label={'en_US': 'Local Agent', 'zh_Hans': '内置 Agent'}, + plugin_author='langbot', + plugin_name='local-agent', + runner_name='default', + protocol_version='1', + capabilities={'streaming': True}, + ) + + +class TestNormalizeMessageDelta: + """Tests for normalizing message.delta results.""" + + @pytest.mark.asyncio + async def test_normalize_message_delta_text(self): + """Normalize message.delta with text chunk.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'message.delta', + 'data': { + 'chunk': { + 'role': 'assistant', + 'content': 'Hello', + }, + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + + assert result is not None + assert isinstance(result, provider_message.MessageChunk) + assert result.role == 'assistant' + assert result.content == 'Hello' + + @pytest.mark.asyncio + async def test_normalize_message_delta_missing_chunk(self): + """Normalize message.delta without chunk data.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'message.delta', + 'data': {}, + } + + with pytest.raises(RunnerProtocolError) as exc_info: + await normalizer.normalize(result_dict, descriptor) + + assert 'missing chunk data' in str(exc_info.value) + + +class TestNormalizeMessageCompleted: + """Tests for normalizing message.completed results.""" + + @pytest.mark.asyncio + async def test_normalize_message_completed(self): + """Normalize message.completed with full message.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'message.completed', + 'data': { + 'message': { + 'role': 'assistant', + 'content': 'Complete response', + }, + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + + assert result is not None + assert isinstance(result, provider_message.Message) + assert result.role == 'assistant' + assert result.content == 'Complete response' + + @pytest.mark.asyncio + async def test_normalize_message_completed_missing_message(self): + """Normalize message.completed without message data.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'message.completed', + 'data': {}, + } + + with pytest.raises(RunnerProtocolError) as exc_info: + await normalizer.normalize(result_dict, descriptor) + + assert 'missing message data' in str(exc_info.value) + + +class TestNormalizeRunCompleted: + """Tests for normalizing run.completed results.""" + + @pytest.mark.asyncio + async def test_normalize_run_completed_with_message(self): + """Normalize run.completed with final message.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'run.completed', + 'data': { + 'message': { + 'role': 'assistant', + 'content': 'Final response', + }, + 'finish_reason': 'stop', + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + + assert result is not None + assert isinstance(result, provider_message.Message) + + @pytest.mark.asyncio + async def test_normalize_run_completed_without_message(self): + """Normalize run.completed without message.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'run.completed', + 'data': { + 'finish_reason': 'stop', + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + + assert result is None + + +class TestNormalizeRunFailed: + """Tests for normalizing run.failed results.""" + + @pytest.mark.asyncio + async def test_normalize_run_failed(self): + """Normalize run.failed raises RunnerExecutionError.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'run.failed', + 'data': { + 'error': 'Upstream timeout', + 'code': 'upstream.timeout', + 'retryable': True, + }, + } + + with pytest.raises(RunnerExecutionError) as exc_info: + await normalizer.normalize(result_dict, descriptor) + + assert exc_info.value.runner_id == 'plugin:langbot/local-agent/default' + assert exc_info.value.retryable is True + assert 'timeout' in str(exc_info.value) + + +class TestNormalizeNonMessageResults: + """Tests for normalizing non-message results.""" + + @pytest.mark.asyncio + async def test_normalize_tool_call_started(self): + """Normalize tool.call.started returns None.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'tool.call.started', + 'data': { + 'tool_call_id': 'call_1', + 'tool_name': 'weather', + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + @pytest.mark.asyncio + async def test_normalize_tool_call_completed(self): + """Normalize tool.call.completed returns None.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'tool.call.completed', + 'data': { + 'tool_call_id': 'call_1', + 'tool_name': 'weather', + 'result': {'temp': 20}, + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + @pytest.mark.asyncio + async def test_normalize_state_updated(self): + """Normalize state.updated returns None.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'state.updated', + 'data': { + 'key': 'external_conversation_id', + 'value': 'abc123', + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + @pytest.mark.asyncio + async def test_normalize_action_requested(self): + """Normalize action.requested returns None (EBA reserved).""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'action.requested', + 'data': { + 'action': 'platform.message.edit', + 'parameters': {}, + }, + } + + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + +class TestNormalizeInvalidResults: + """Tests for handling invalid results.""" + + @pytest.mark.asyncio + async def test_normalize_missing_type(self): + """Normalize result without type.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'data': {}, + } + + with pytest.raises(RunnerProtocolError) as exc_info: + await normalizer.normalize(result_dict, descriptor) + + assert 'Missing result type' in str(exc_info.value) + + @pytest.mark.asyncio + async def test_normalize_unknown_type(self): + """Normalize unknown type returns None.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + result_dict = { + 'type': 'unknown_type', + 'data': {}, + } + + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + @pytest.mark.asyncio + async def test_normalize_legacy_type_returns_none(self): + """Legacy types (chunk, text, finish) are now treated as unknown.""" + normalizer = AgentResultNormalizer(FakeApplication()) + descriptor = make_descriptor() + + # chunk is now unknown + result_dict = { + 'type': 'chunk', + 'data': { + 'message_chunk': { + 'role': 'assistant', + 'content': 'Legacy chunk', + }, + }, + } + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + # text is now unknown + result_dict = { + 'type': 'text', + 'data': { + 'content': 'Legacy text', + }, + } + result = await normalizer.normalize(result_dict, descriptor) + assert result is None + + # finish is now unknown + result_dict = { + 'type': 'finish', + 'data': { + 'message': { + 'role': 'assistant', + 'content': 'Legacy finish', + }, + }, + } + result = await normalizer.normalize(result_dict, descriptor) + assert result is None \ No newline at end of file diff --git a/uv.lock b/uv.lock index fc56bbbc..d391baac 100644 --- a/uv.lock +++ b/uv.lock @@ -1973,7 +1973,7 @@ requires-dist = [ { name = "ebooklib", specifier = ">=0.18" }, { name = "gewechat-client", specifier = ">=0.1.5" }, { name = "html2text", specifier = ">=2024.2.26" }, - { name = "langbot-plugin", specifier = "==0.3.11" }, + { name = "langbot-plugin", editable = "../langbot-plugin-sdk" }, { name = "langchain", specifier = ">=0.2.0" }, { name = "langchain-core", specifier = ">=1.2.28" }, { name = "langchain-text-splitters", specifier = ">=1.1.2" }, @@ -2036,8 +2036,8 @@ dev = [ [[package]] name = "langbot-plugin" -version = "0.3.11" -source = { registry = "https://pypi.org/simple" } +version = "0.3.10" +source = { editable = "../langbot-plugin-sdk" } dependencies = [ { name = "aiofiles" }, { name = "dotenv" }, @@ -2054,9 +2054,29 @@ dependencies = [ { name = "watchdog" }, { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/91/83/93b86bcdbfe51d820fa59232aaa73cc802d6ce614f67d8f8b33957419538/langbot_plugin-0.3.11.tar.gz", hash = "sha256:8d10c98c771b468b2d35cc007778439c39922a88265fcc16a5881234bc7c1b19", size = 190315, upload-time = "2026-05-12T15:45:24.262Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/8f/22/de7977a6a5cbf557b80043eb3ed39e5feff24033a5d6db4ab88d48ccb6ea/langbot_plugin-0.3.11-py3-none-any.whl", hash = "sha256:c1d2e84eda1584902d99efa316b850c08c1c04fcc199306ff4af1dca1431304a", size = 165574, upload-time = "2026-05-12T15:45:22.908Z" }, + +[package.metadata] +requires-dist = [ + { name = "aiofiles", specifier = ">=24.1.0" }, + { name = "dotenv", specifier = ">=0.9.9" }, + { name = "httpx", specifier = ">=0.28.1" }, + { name = "jinja2", specifier = ">=3.1.6" }, + { name = "pip", specifier = ">=25.2" }, + { name = "pydantic", specifier = ">=2.11.5" }, + { name = "pydantic-settings", specifier = ">=2.10.1" }, + { name = "pytest", specifier = ">=8.4.0" }, + { name = "pyyaml", specifier = ">=6.0.2" }, + { name = "textual", specifier = ">=3.2.0" }, + { name = "types-aiofiles", specifier = ">=24.1.0.20250516" }, + { name = "types-pyyaml", specifier = ">=6.0.12.20250516" }, + { name = "watchdog", specifier = ">=6.0.0" }, + { name = "websockets", specifier = ">=15.0.1" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "mypy", specifier = ">=1.16.0" }, + { name = "ruff", specifier = ">=0.11.12" }, ] [[package]]