diff --git a/docs/agent-runner-pluginization/EVENT_BASED_AGENT.md b/docs/agent-runner-pluginization/EVENT_BASED_AGENT.md index d86683b9..31b1945b 100644 --- a/docs/agent-runner-pluginization/EVENT_BASED_AGENT.md +++ b/docs/agent-runner-pluginization/EVENT_BASED_AGENT.md @@ -41,9 +41,13 @@ ## 4. Event Envelope 与 Binding - 入口事件用 `AgentEventEnvelope`(HOST_SDK §4.1)承载;顶层字段使用 LangBot 稳定协议名,平台原始事件名和原始 payload 放 `metadata` / `raw_ref`。 -- 触发关系用 `AgentBinding`(HOST_SDK §4.2)表达。EBA 阶段 binding 通过 `event_types`、`scope`、`filters` 决定哪些事件触发哪个 runner。 +- 触发关系用 `AgentBinding`(HOST_SDK §4.2)表达。EBA 阶段 binding 通过 `event_types`、`scope`、`filters` 决定哪些事件触发当前 bot / channel 绑定的 Agent。 -Binding scope 示例:workspace 全局、bot 级、platform channel 级、conversation / group / thread 级、user / actor 级。旧 Pipeline 可迁移为 `message.received` 的 binding source,但不是唯一 binding source。 +目标产品语义:一个 bot / IM channel 在同一时间只绑定一个负责 agentic +处理的 Agent;一个 Agent 可以被多个 bot / channel 复用。因此 EBA 主线按 +single-agent dispatch 设计,不做默认 fan-out。 + +Binding scope 示例:workspace 全局、bot 级、platform channel 级、conversation / group / thread 级、user / actor 级。旧 Pipeline 可迁移为 `message.received` 的临时 binding source,但目标持久配置应是 Agent,不是 Pipeline。 Event Source 可包括:`platform_adapter`(飞书、QQ、微信、Telegram 等)、`webui`、`http_api`、`scheduler`、`system`。EventRouter 不应写死平台 adapter 的类名。 @@ -53,7 +57,7 @@ Event Source 可包括:`platform_adapter`(飞书、QQ、微信、Telegram Platform Adapter / WebUI / API -> Event Gateway normalize payload -> EventLog append raw event - -> EventRouter resolve bindings + -> EventRouter resolve one effective AgentBinding -> AgentRunOrchestrator.run(event, binding) -> AgentRunContextBuilder.build(event, binding) -> PluginRuntimeConnector.run_agent() @@ -63,6 +67,10 @@ Platform Adapter / WebUI / API 约束:必须复用现有 orchestrator,不能为 EBA 单独实现另一套 plugin runner 调用协议;非消息事件不能绕过 resource authorization;delivery 和 platform action 走统一权限模型;外部 harness runner 也通过同一套 envelope/binding/context/result 协议接入,不为 Claude Code / Codex / Kimi 单独发明队列协议。 +若未来产品需要 observer agent、多个 agent 并行处理同一事件、或多 runner +裁决,应另行设计 fan-out 合并、delivery 冲突、state 写入冲突、platform +action 审批和 audit 语义。当前 EBA 预留不隐含这些能力。 + ## 6. 平台动作执行 EBA 后 `action.requested`(PROTOCOL_V1 §7.2,当前仅 telemetry 不执行)将用于请求 host 执行平台动作: diff --git a/docs/agent-runner-pluginization/HOST_SDK_INFRASTRUCTURE.md b/docs/agent-runner-pluginization/HOST_SDK_INFRASTRUCTURE.md index bd90c914..5150e3d6 100644 --- a/docs/agent-runner-pluginization/HOST_SDK_INFRASTRUCTURE.md +++ b/docs/agent-runner-pluginization/HOST_SDK_INFRASTRUCTURE.md @@ -11,7 +11,7 @@ LangBot 要转为 agent host,而不是内置 runner 容器: - 接收 IM、WebUI、API 和未来 EventRouter 产生的事件。 -- 根据事件、bot、workspace、scope 解析应该调用的 agent binding。 +- 根据事件、bot、workspace、scope 解析应该调用的 Agent / agent binding。 - 发现、校验和调用插件提供的 AgentRunner。 - 为每次 run 提供受限资源、状态、存储、上下文引用和生命周期控制。 - 接收 AgentRunner 返回的事件流,投递到 IM、WebUI 或其他 output surface。 @@ -53,7 +53,12 @@ AgentRunResult stream Delivery / Renderer / Platform API ``` -当前 Pipeline 只应接入在 Query entry adapter 位置:它可以继续产生 `message.received`,但不应再拥有 runner 选择、上下文裁剪和业务 agent 执行的核心语义。EventGateway 由外部 event branch 实现。 +目标产品模型中,Agent 替代 Pipeline 承载 agent 配置:bot / IM +channel 绑定一个 Agent,一个 Agent 可以被多个 bot / channel 复用。 +当前 Pipeline 只应接入在 Query entry adapter 位置:它可以继续产生 +`message.received` 并投影出临时 `AgentBinding`,但不应再拥有 runner +选择、上下文裁剪和业务 agent 执行的核心语义。EventGateway 由外部 event +branch 实现。 ## 4. LangBot 侧能力 @@ -87,7 +92,11 @@ class AgentEventEnvelope(BaseModel): ### 4.2 AgentBinding -`AgentBinding` 是"什么事件调用哪个 runner、带什么绑定配置"的持久配置,是 Host 内部模型(不暴露给 SDK),替代长期依赖 Pipeline runner config 的角色。 +`AgentBinding` 是"什么事件调用哪个 AgentRunner、带什么 Agent 配置"的 +Host 内部运行投影(不暴露给 SDK)。产品层的持久对象应是 Agent: +Agent 携带 runner id、runner config、resource/state/delivery policy,并可被 +多个 bot / channel 复用。`AgentBinding` 是 EventRouter / 当前 +QueryEntryAdapter 在一次运行前解析出的有效绑定。 ```python class AgentBinding(BaseModel): @@ -103,16 +112,25 @@ class AgentBinding(BaseModel): delivery_policy: DeliveryPolicy ``` -**当前 adapter source**:`QueryEntryAdapter.config_to_binding(query, runner_id)` 从 current config 生成临时 `AgentBinding`。Pipeline 当前作为一种 binding source(AI runner config → binding、extension preference → resource_policy、output settings → delivery_policy),但新设计不再把这些字段命名为 Pipeline 专属概念。 +一个 bot / IM channel 在同一时间只应解析出一个负责 agentic 处理的 +AgentBinding。若未来需要 observer / fan-out / 多 agent 裁决,必须另行定义 +delivery、state、platform action 和 result 合并语义;当前 v1/EBA 主线不隐式支持。 + +**当前 adapter source**:`QueryEntryAdapter.config_to_agent_config(query, runner_id)` +先把 current config 投影为迁移期 `AgentConfig`,再由 +`AgentBindingResolver.resolve_one(event, [agent_config])` 解析出唯一 +`AgentBinding`。Pipeline 当前只是迁移期 Agent config source(AI runner config +→ runner_config、extension preference → resource_policy、output settings → +delivery_policy),但新设计不再把这些字段命名为 Pipeline 专属概念。 ### 4.3 AgentRunnerRegistry -Registry 收集 runner descriptor(来自插件 runtime、可能的 host adapter runner、开发期本地插件): +Registry 收集 runner descriptor(来自插件 runtime、开发期本地插件): ```python class AgentRunnerDescriptor(BaseModel): id: str - source: Literal["plugin", "host_adapter"] + source: Literal["plugin"] label: I18nObject description: I18nObject | None = None protocol_version: str = "1" @@ -124,6 +142,10 @@ class AgentRunnerDescriptor(BaseModel): 职责:调用 `plugin_connector.list_agent_runners()` 拉取 runner、校验 manifest(`kind == AgentRunner`、`metadata.name/label` 存在、`protocol_version` 兼容、`spec.*` 类型正确)、输出 descriptor、缓存 discovery 结果并提供 `refresh()`。单个插件 manifest 失败只记 warning,不影响其它 runner。`plugin:author/name/runner` 是稳定 id 格式;多个 binding 指向同一 runner id 时**不创建多个插件实例**。 +Host 内置 runner / adapter 不能作为 `AgentRunnerDescriptor.source` 绕过插件 +runtime、`run_id`、`ctx.resources` 和 `AgentRunAPIProxy` 权限链。若需要 +开发期调试 adapter,应放在 Host 内部测试入口,不进入可选 runner 列表。 + 刷新触发点:插件安装/卸载/升级/重启后;Pipeline metadata 请求时发现缓存为空;可选 TTL(优先保证正确性)。 ### 4.4 AgentRunOrchestrator @@ -154,7 +176,14 @@ LangBot 在每次 run 前生成 `ctx.resources`(PROTOCOL_V1 §6),来自三 2. binding / resource policy 允许的资源范围。 3. 当前 event / actor / bot / workspace 的实际权限。 -运行期每个 proxy action 必须再次通过 `run_id` 校验。SDK 侧本地校验只用于开发体验,host 侧校验才是安全边界。 +这次裁剪结果必须冻结为 run-scoped authorization snapshot,并由 +`AgentRunSessionRegistry` 按 `run_id` 保存。`ctx.resources` 是投影给 runner +看的同一份授权结果;运行期每个 proxy action 只依据该 snapshot 校验 active +run session、caller plugin identity、resource id、scope、payload size、rate +limit 和 deadline。Handler 不应重新执行三层裁剪,否则 build-time 与 runtime +授权逻辑会漂移。 + +SDK 侧本地校验只用于开发体验,host 侧 run authorization snapshot 才是安全边界。 资源裁剪应通用,不写死 local-agent。selector 与资源的映射示例:`model-fallback-selector` → primary/fallback LLM、`llm-model-selector` → LLM、`rerank-model-selector` → rerank 模型、`knowledge-base-multi-selector` → 知识库;新增 selector 时在 resource builder 中统一扩展。 diff --git a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md index 9569c6ba..ec437ce8 100644 --- a/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md +++ b/docs/agent-runner-pluginization/OFFICIAL_RUNNER_PLUGINS.md @@ -111,7 +111,11 @@ Claude Code、Codex、Kimi Code 这类 runner 不一定通过 LangBot 的模型/ ## 7. Claude Code / Codex runner 当前形态 -`claude-code-agent` 与 `codex-agent` 是最小可运行 MVP,用来证明外部 harness runner 可以接入同一套 AgentRunner 协议。本地 smoke 验收记录见 [PROGRESS.md](./PROGRESS.md) 与 [PHASE1_QA_ACCEPTANCE_MATRIX.md](./PHASE1_QA_ACCEPTANCE_MATRIX.md)。 +`claude-code-agent` 与 `codex-agent` 是最小可运行 MVP / dev path,用来证明外部 harness runner 可以接入同一套 AgentRunner 协议。本地 smoke 验收记录见 [PROGRESS.md](./PROGRESS.md) 与 [PHASE1_QA_ACCEPTANCE_MATRIX.md](./PHASE1_QA_ACCEPTANCE_MATRIX.md)。 + +MVP 含义:已验证 event-first context、resource projection、result stream 和 +基础 resume state 可以跑通;不表示 Docker 生产部署、发布级执行隔离、 +workspace lifecycle、secret projection、团队级 audit 或 runtime sidecar 已完成。 ### 7.1 Claude Code runner @@ -127,7 +131,7 @@ Claude Code、Codex、Kimi Code 这类 runner 不一定通过 LangBot 的模型/ ### 7.3 当前限制 -不是发布级安全边界实现;默认只做本地 CLI 调用,不实现完整执行隔离或 workspace 生命周期;不实现 issue-centric 队列、复杂 workflow engine 或长期任务调度;Codex 仅验证协议形态,不代表 Codex 发布级能力或 Kimi runner 已完成。runtime 管控面方向见 [RUNTIME_CONTROL_PLANE_V2.md](./RUNTIME_CONTROL_PLANE_V2.md)。 +不是发布级安全边界实现;默认只做本地 CLI 调用,不实现完整执行隔离或 workspace 生命周期;不实现 issue-centric 队列、复杂 workflow engine 或长期任务调度;Docker 环境只能访问容器内 CLI 和凭据;Codex 仅验证协议形态,不代表 Codex 发布级能力或 Kimi runner 已完成。runtime 管控面方向见 [RUNTIME_CONTROL_PLANE_V2.md](./RUNTIME_CONTROL_PLANE_V2.md)。 ## 8. 发布和安装策略 diff --git a/docs/agent-runner-pluginization/PROTOCOL_V1.md b/docs/agent-runner-pluginization/PROTOCOL_V1.md index c2cb004c..a6f7a2eb 100644 --- a/docs/agent-runner-pluginization/PROTOCOL_V1.md +++ b/docs/agent-runner-pluginization/PROTOCOL_V1.md @@ -33,7 +33,11 @@ Protocol v1 **不定义**: | AgentRunAPIProxy | AgentRunner 访问 Host 能力的受限 API。 | | AgentBinding | Host 内部的事件到 runner 绑定配置,不直接暴露给 SDK(见 HOST_SDK §4.2)。 | -`AgentBinding` 只影响 Host 构造出的 `ctx.config`、`ctx.resources`、`ctx.context` 和 `ctx.delivery`。SDK 不需要知道 binding 的持久化形态。 +产品层的 `Agent` 替代旧 Pipeline 承载 agent 配置:bot / IM channel +绑定一个 Agent,一个 Agent 可以被多个 bot / channel 复用。Host 内部的 +`AgentBinding` 是一次事件运行前解析出的有效绑定,只影响 Host 构造出的 +`ctx.config`、`ctx.resources`、`ctx.context` 和 `ctx.delivery`。SDK 不需要知道 +Agent / binding 的持久化形态。 外部 harness runner(Claude Code、Codex、Kimi Code 等)也是 `AgentRunner`:它们消费 event-first `AgentRunContext`、返回 `AgentRunResult`,并通过 Host 授权的 state/storage/artifact API 保存跨轮次指针。它们内部可以继续使用自己的 session、tool loop、MCP、上下文压缩和权限模型。 @@ -492,7 +496,9 @@ Host 不负责业务编排:不拼接全量历史、不替 runner 做 prompt as ## 12. Pipeline Adapter 边界 -Pipeline 是当前入口 adapter,不是协议中心。Query entry adapter 负责: +Pipeline 是当前入口 adapter,不是协议中心。目标产品模型中 Agent 会替代 +Pipeline 承载 runner config、resource policy 和 delivery policy;当前 Query +entry adapter 只是迁移桥。它负责: - 从 `Query` 构造 `AgentEventContext` 和临时 `AgentBinding`(见 HOST_SDK §4.2)。 - 从当前 Agent/runner config 构造 `ctx.config`。 @@ -501,15 +507,25 @@ Pipeline 是当前入口 adapter,不是协议中心。Query entry adapter 负 约束: - adapter **不**定义历史窗口、prompt 组装或 agentic context 策略。 -- preprocessing / hook 后的有效指令不通过 `ctx.adapter.extra` 主动推送;后续应通过 Host prompt/instruction pull API 暴露(占位见 HOST_SDK §4.8)。 +- `ctx.adapter.extra` 只允许承载一次性、JSON-safe、入口相关的非核心元数据,例如 `params`;不得承载 `prompt`、history window、RAG 结果、tool schema 或授权资源。 +- 静态绑定 prompt 属于 `ctx.config.prompt`。preprocessing / hook 后的动态有效指令不通过 `ctx.adapter.extra` 主动推送;后续如需要保留这类能力,应通过 Host prompt/instruction pull API 暴露(占位见 HOST_SDK §4.8)。 - 新 runner 不应长期依赖 `adapter`,应只依赖 event-first context 和 Host API。 -## 13. 开放问题 +## 13. 已确认约束 + +- v1 / EBA 主线是 `one event -> one AgentBinding -> one run_id -> one runner`。 +- 一个 bot / IM channel 在同一时间只绑定一个负责 agentic 处理的 Agent;一个 Agent 可以被多个 bot / channel 复用。 +- 如果配置层出现多个匹配 AgentBinding,BindingResolver 必须按明确规则选出一个或拒绝配置,不应默认 fan-out。 +- observer agent、多 runner fan-out、并行裁决、result 合并等能力需要单独设计 delivery、state、platform action 和 audit 语义,不属于当前 v1 契约。 +- `AgentRunnerDescriptor.source` 只允许 `plugin`;Host 内置 adapter 不能作为 runner source 绕过插件/runtime/proxy 权限链。 +- `ctx.resources` 与 proxy action 校验必须来自同一个 run authorization snapshot;runtime handler 不应重新执行资源裁剪。 +- 外部 harness runner 当前是 MVP / dev path,证明协议可接入,不代表发布级安全边界或 Docker 生产可用性完成。 + +## 14. 开放问题 - `AgentBinding` 是否需要进入 SDK 文档作为只读诊断信息,还是完全 Host 内部。 - `TranscriptItem` 的最小字段集如何定义。 - ArtifactStore 是否复用现有 BinaryStorage backend,还是引入独立实体。 - State 与 Storage 的边界是否需要更强类型。 - `platform_api` action 的审批模型如何表达。 -- 多 runner 并发处理同一 event 时,result delivery 的冲突策略如何定义。 - Host 侧 scoped MCP / skill / workspace projection 是否需要从 runner config 上移为一等 resource projection API。 diff --git a/docs/agent-runner-pluginization/README.md b/docs/agent-runner-pluginization/README.md index 1bafa49a..7f46e27f 100644 --- a/docs/agent-runner-pluginization/README.md +++ b/docs/agent-runner-pluginization/README.md @@ -13,7 +13,8 @@ **本分支目标:AgentRunner 外化 / 插件化基础设施** -本分支只做 LangBot 作为 Agent Host 的基础能力建设: +本分支只做 LangBot 作为 Agent Host 的基础能力建设,为后续用 `Agent` +替代 Pipeline 承载 agent 配置打底: - LangBot 与 SDK 的稳定协议合同(Protocol v1) - Host-side `AgentEventEnvelope` / `AgentBinding` 模型 @@ -35,6 +36,22 @@ EventGateway 在本文档中描述为 **future integration point**,由外部 event branch 提供。本分支只定义 host-side envelope/binding models 和 `run(event, binding)` orchestrator 入口。 +## 目标产品模型 + +未来产品层应把 `Agent` 理解为 Pipeline 的替代物:原先 bot 绑定 +Pipeline,Pipeline 携带 agent/provider/RAG/tool 等配置;后续应改为 bot 或 +IM channel 绑定一个 Agent,Agent 携带 runner id、runner config、 +resource/state/delivery policy 等 agent 配置。 + +约束: + +- 一个 bot / IM channel 在同一时间只绑定一个负责 agentic 处理的 Agent。 +- 一个 Agent 可以被多个 bot / channel 复用,类似旧 Pipeline 可被多个 bot 共享。 +- Agent 配置是运行绑定配置,不是插件实例状态;多个 Agent 指向同一 + AgentRunner 时不创建多个插件实例。 +- 当前 Pipeline path 只是迁移期入口 adapter:它把旧 Pipeline 配置投影为临时 + `AgentBinding`,不代表目标架构仍由 Pipeline 承载 agent 语义。 + ## 当前状态 **当前 Pipeline 是入口 adapter,不再是 agent runner 设计核心。** @@ -65,7 +82,7 @@ EventGateway 在本文档中描述为 **future integration point**,由外部 e - LangBot 与 SDK 的稳定协议合同 - runner manifest / descriptor / registry -- agent binding 与配置解析 +- Agent / binding 配置解析 - run orchestration 和生命周期管理 - resource authorization 与 `run_id` 级权限校验 - host-owned state / storage / event log / transcript / artifact 能力 @@ -87,6 +104,10 @@ Host 不定义通用历史窗口字段或策略;runner 通过 Host pull API 消息只是事件的一种。后续 `message.received`、`message.recalled`、`group.member_joined`、`friend.request_received` 等事件都应能通过统一事件 envelope 触发 AgentRunner。 +EBA 主线按单 Agent 调度设计:EventRouter 对一个 bot / channel / scope +解析出一个有效 AgentBinding,再调用一次 `AgentRunOrchestrator.run(event, +binding)`。多 agent fan-out、observer agent 或并行裁决不属于当前目标语义。 + **本分支不实现 EBA 完整能力,只预留:** - event-first envelope (`AgentEventEnvelope`) - AgentBinding model @@ -116,9 +137,11 @@ Host 不定义通用历史窗口字段或策略;runner 通过 Host pull API - 一个插件可以声明多个 `AgentRunner` 组件,每个组件独立暴露 manifest、配置 schema、能力和权限。 - 插件本身按单实例、无状态执行单元理解;不同绑定不创建多个插件实例。 -- 绑定只保存 runner id 和绑定配置,不代表插件实例状态。 +- Agent / binding 只保存 runner id 和绑定配置,不代表插件实例状态。 +- bot / IM channel 绑定一个 Agent;Agent 可被多个 bot / channel 复用。 - LangBot 可以提供 host-owned state / storage 能力,让 runner 把状态寄宿在 LangBot;但这应该是授权能力,不是强制要求。 - 官方 runner 插件是协议消费者,不是协议设计的优先约束。 - Pipeline 是当前入口 adapter,不是未来架构中心。 +- Event dispatch 主线是 one event -> one AgentBinding -> one run_id -> one runner。 - EventGateway 是 future integration point,由外部 event branch 提供。 - Runtime control plane 是 v2 Host capability layer,不阻塞当前 AgentRunner v1 主线;agent 管控面插件应构建在该 Host 能力层之上。 diff --git a/src/langbot/pkg/agent/runner/__init__.py b/src/langbot/pkg/agent/runner/__init__.py index 986320c9..c9937f16 100644 --- a/src/langbot/pkg/agent/runner/__init__.py +++ b/src/langbot/pkg/agent/runner/__init__.py @@ -16,7 +16,13 @@ from .resource_builder import AgentResourceBuilder from .result_normalizer import AgentResultNormalizer from .orchestrator import AgentRunOrchestrator from .config_migration import ConfigMigration -from .session_registry import AgentRunSessionRegistry, AgentRunSession, get_session_registry +from .binding_resolver import AgentBindingResolver, AgentBindingResolutionError +from .session_registry import ( + AgentRunSessionRegistry, + AgentRunSession, + RunAuthorizationSnapshot, + get_session_registry, +) from .events import ( MESSAGE_RECEIVED, MESSAGE_RECALLED, @@ -41,8 +47,11 @@ __all__ = [ 'AgentResultNormalizer', 'AgentRunOrchestrator', 'ConfigMigration', + 'AgentBindingResolver', + 'AgentBindingResolutionError', 'AgentRunSessionRegistry', 'AgentRunSession', + 'RunAuthorizationSnapshot', 'get_session_registry', 'MESSAGE_RECEIVED', 'MESSAGE_RECALLED', diff --git a/src/langbot/pkg/agent/runner/binding_resolver.py b/src/langbot/pkg/agent/runner/binding_resolver.py new file mode 100644 index 00000000..6d05ba76 --- /dev/null +++ b/src/langbot/pkg/agent/runner/binding_resolver.py @@ -0,0 +1,63 @@ +"""Resolve host events to one effective Agent binding.""" + +from __future__ import annotations + +from .host_models import AgentConfig, AgentBinding, AgentEventEnvelope, BindingScope + + +class AgentBindingResolutionError(Exception): + """Raised when an event cannot resolve to exactly one Agent binding.""" + + +class AgentBindingResolver: + """Resolve an event to a single AgentBinding. + + The target product model is one bot / IM channel -> one Agent. Fan-out, + observer agents, or multi-runner arbitration require separate delivery and + state semantics and are intentionally not hidden in this resolver. + """ + + def resolve_one( + self, + event: AgentEventEnvelope, + agents: list[AgentConfig], + ) -> AgentBinding: + """Resolve exactly one enabled Agent for the event.""" + matches = [ + agent + for agent in agents + if agent.enabled and event.event_type in agent.event_types + ] + + if not matches: + raise AgentBindingResolutionError( + f'No Agent binding matches event_type={event.event_type}' + ) + + if len(matches) > 1: + agent_ids = ', '.join(agent.agent_id or '' for agent in matches) + raise AgentBindingResolutionError( + f'Multiple Agent bindings match event_type={event.event_type}: {agent_ids}' + ) + + return self._to_binding(matches[0]) + + def _to_binding(self, agent: AgentConfig) -> AgentBinding: + """Project product-level Agent config into the run-time binding model.""" + scope = BindingScope( + scope_type='agent', + scope_id=agent.agent_id, + ) + + return AgentBinding( + binding_id=f"agent_{agent.agent_id or 'default'}_{agent.runner_id}", + scope=scope, + event_types=list(agent.event_types), + runner_id=agent.runner_id, + runner_config=agent.runner_config, + resource_policy=agent.resource_policy, + state_policy=agent.state_policy, + delivery_policy=agent.delivery_policy, + enabled=agent.enabled, + agent_id=agent.agent_id, + ) diff --git a/src/langbot/pkg/agent/runner/host_models.py b/src/langbot/pkg/agent/runner/host_models.py index b98ff365..f4462c6d 100644 --- a/src/langbot/pkg/agent/runner/host_models.py +++ b/src/langbot/pkg/agent/runner/host_models.py @@ -133,6 +133,42 @@ class DeliveryPolicy(pydantic.BaseModel): """Maximum message size.""" +class AgentConfig(pydantic.BaseModel): + """Host-side Agent configuration. + + Product-level Agent is the target replacement for Pipeline-owned agent + config. Current Pipeline entry paths can project their config into this + model during migration. + """ + + agent_id: str | None = None + """Host-side Agent/config identifier.""" + + runner_id: str + """Runner ID to invoke.""" + + runner_config: dict[str, typing.Any] = pydantic.Field(default_factory=dict) + """Agent/runner binding configuration.""" + + resource_policy: ResourcePolicy = pydantic.Field(default_factory=ResourcePolicy) + """Resource policy for this Agent.""" + + state_policy: StatePolicy = pydantic.Field(default_factory=StatePolicy) + """State policy for this Agent.""" + + delivery_policy: DeliveryPolicy = pydantic.Field(default_factory=DeliveryPolicy) + """Delivery policy for this Agent.""" + + event_types: list[str] = pydantic.Field(default_factory=lambda: ["message.received"]) + """Event types this Agent handles.""" + + enabled: bool = True + """Whether this Agent can be selected by a binding resolver.""" + + metadata: dict[str, typing.Any] = pydantic.Field(default_factory=dict) + """Non-protocol diagnostic metadata, such as legacy config source.""" + + class AgentBinding(pydantic.BaseModel): """Binding configuration for mapping events to runners. diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index b38e5054..97092452 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -21,6 +21,7 @@ from .session_registry import get_session_registry, AgentRunSessionRegistry from .config_migration import ConfigMigration from .host_models import AgentEventEnvelope, AgentBinding from .query_entry_adapter import QueryEntryAdapter +from .binding_resolver import AgentBindingResolver from .state_scope import build_state_context from .errors import ( RunnerNotFoundError, @@ -61,6 +62,8 @@ class AgentRunOrchestrator: result_normalizer: AgentResultNormalizer + binding_resolver: AgentBindingResolver + # Cached singleton references (set in __init__) _session_registry: AgentRunSessionRegistry _persistent_state_store: PersistentStateStore | None @@ -75,6 +78,7 @@ class AgentRunOrchestrator: self.context_builder = AgentRunContextBuilder(ap) self.resource_builder = AgentResourceBuilder(ap) self.result_normalizer = AgentResultNormalizer(ap) + self.binding_resolver = AgentBindingResolver() # Cache singleton references to avoid per-request getter calls self._session_registry = get_session_registry() self._persistent_state_store = None # Lazy init on first use @@ -258,8 +262,10 @@ class AgentRunOrchestrator: # Convert Query to event-first envelope event = QueryEntryAdapter.query_to_event(query) - # Convert current config to binding - binding = QueryEntryAdapter.config_to_binding(query, runner_id) + # Project legacy Pipeline config into target Agent config, then resolve + # exactly one effective binding for this event. + agent_config = QueryEntryAdapter.config_to_agent_config(query, runner_id) + binding = self.binding_resolver.resolve_one(event, [agent_config]) # Extract bound plugins for authorization bound_plugins = query.variables.get('_pipeline_bound_plugins') diff --git a/src/langbot/pkg/agent/runner/query_entry_adapter.py b/src/langbot/pkg/agent/runner/query_entry_adapter.py index dd559ddc..9591de72 100644 --- a/src/langbot/pkg/agent/runner/query_entry_adapter.py +++ b/src/langbot/pkg/agent/runner/query_entry_adapter.py @@ -21,9 +21,8 @@ from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext from .host_models import ( + AgentConfig, AgentEventEnvelope, - AgentBinding, - BindingScope, ResourcePolicy, StatePolicy, DeliveryPolicy, @@ -36,7 +35,7 @@ class QueryEntryAdapter: This adapter is responsible for: - Converting Query to AgentEventEnvelope - - Converting current Agent/runner config to temporary AgentBinding + - Projecting current Pipeline config to temporary AgentConfig - Putting Query-only fields into adapter context """ @@ -97,30 +96,17 @@ class QueryEntryAdapter: ) @classmethod - def config_to_binding( + def config_to_agent_config( cls, query: pipeline_query.Query, runner_id: str, - ) -> AgentBinding: - """Convert current config container to temporary AgentBinding. - - Args: - query: Current entry query - runner_id: Resolved runner ID - - Returns: - AgentBinding for this run - """ + ) -> AgentConfig: + """Project the current Pipeline config container into target Agent config.""" pipeline_config = query.pipeline_config or {} ai_config = pipeline_config.get('ai', {}) runner_config = ai_config.get('runner_config', {}).get(runner_id, {}) agent_id = getattr(query, 'pipeline_uuid', None) - scope = BindingScope( - scope_type="agent", - scope_id=agent_id, - ) - # Build resource policy from current config resource_policy = ResourcePolicy( allowed_model_uuids=cls._extract_allowed_models(query), @@ -140,17 +126,16 @@ class QueryEntryAdapter: enable_reply=True, ) - return AgentBinding( - binding_id=f"agent_{agent_id or 'default'}_{runner_id}", - scope=scope, - event_types=[runner_events.MESSAGE_RECEIVED], + return AgentConfig( + agent_id=agent_id, runner_id=runner_id, runner_config=runner_config, resource_policy=resource_policy, state_policy=state_policy, delivery_policy=delivery_policy, + event_types=[runner_events.MESSAGE_RECEIVED], enabled=True, - agent_id=agent_id, + metadata={'source': 'pipeline_adapter'}, ) @classmethod diff --git a/src/langbot/pkg/agent/runner/session_registry.py b/src/langbot/pkg/agent/runner/session_registry.py index 65d3ebc0..cdfc4f50 100644 --- a/src/langbot/pkg/agent/runner/session_registry.py +++ b/src/langbot/pkg/agent/runner/session_registry.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio +import copy import typing import time import threading @@ -15,6 +16,22 @@ class AgentRunSessionStatus(typing.TypedDict): last_activity_at: int +class RunAuthorizationSnapshot(typing.TypedDict): + """Frozen authorization data for one active run. + + ResourceBuilder creates the authorized resource list once before runner + execution. Runtime proxy handlers must validate against this run-scoped + snapshot instead of recomputing resource policy. + """ + + resources: AgentResources + permissions: dict[str, list[str]] + conversation_id: str | None + state_policy: dict[str, typing.Any] + state_context: dict[str, typing.Any] + authorized_ids: dict[str, set[str]] + + class AgentRunSession(typing.TypedDict): """Session for an active agent runner execution. @@ -25,25 +42,15 @@ class AgentRunSession(typing.TypedDict): runner_id: Runner descriptor ID (plugin:author/name/runner) query_id: Host entry query ID, only present for query-based adapters plugin_identity: Plugin identifier (author/name) of the runner - conversation_id: Conversation ID for history/event access - resources: Authorized resources for this run (from AgentResources) - permissions: Runner permissions from descriptor (artifacts, history, events, etc.) - state_policy: State policy from binding (enable_state, state_scopes) - state_context: Context for state API (scope_keys, binding_identity, etc.) + authorization: Run-scoped authorization snapshot; runtime auth truth status: Session status tracking - _authorized_ids: Pre-computed authorized resource IDs for O(1) lookup """ run_id: str runner_id: str query_id: int | None plugin_identity: str # author/name - conversation_id: str | None - resources: AgentResources - permissions: dict[str, list[str]] - state_policy: dict[str, typing.Any] # {enable_state: bool, state_scopes: list} - state_context: dict[str, typing.Any] # {scope_keys: dict, binding_identity: str, ...} + authorization: RunAuthorizationSnapshot status: AgentRunSessionStatus - _authorized_ids: dict[str, set[str]] # Pre-computed sets for O(1) lookup class AgentRunSessionRegistry: @@ -82,7 +89,7 @@ class AgentRunSessionRegistry: Args: run_id: Unique run identifier runner_id: Runner descriptor ID - query_id: Host entry query ID, only present for query-based adapters + query_id: Host entry query ID, only present for query-based adapters plugin_identity: Plugin identifier (author/name) resources: Authorized resources for this run conversation_id: Conversation ID for history/event access @@ -102,36 +109,40 @@ class AgentRunSessionRegistry: # Normalize state_context to empty dict if None state_context = state_context or {} - # Pre-compute authorized resource IDs for O(1) lookup - authorized_ids: dict[str, set[str]] = { - 'model': {m.get('model_id') for m in resources.get('models', [])}, - 'tool': {t.get('tool_name') for t in resources.get('tools', [])}, - 'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])}, - 'file': {f.get('file_id') for f in resources.get('files', [])}, + resources_snapshot = copy.deepcopy(resources) + authorization: RunAuthorizationSnapshot = { + 'resources': resources_snapshot, + 'permissions': copy.deepcopy(permissions), + 'conversation_id': conversation_id, + 'state_policy': copy.deepcopy(state_policy), + 'state_context': copy.deepcopy(state_context), + 'authorized_ids': self._build_authorized_ids(resources_snapshot), } - # NOTE: state_policy and state_context are stored at session top-level, - # NOT in resources. Resources should only contain resource authorization info. session: AgentRunSession = { 'run_id': run_id, 'runner_id': runner_id, 'query_id': query_id, 'plugin_identity': plugin_identity, - 'conversation_id': conversation_id, - 'resources': resources, # Original AgentResources, no state metadata mixed in - 'permissions': permissions, - 'state_policy': state_policy, - 'state_context': state_context, + 'authorization': authorization, 'status': { 'started_at': now, 'last_activity_at': now, }, - '_authorized_ids': authorized_ids, } async with self._lock: self._sessions[run_id] = session + def _build_authorized_ids(self, resources: AgentResources) -> dict[str, set[str]]: + """Pre-compute authorized resource IDs for O(1) lookup.""" + return { + 'model': {m.get('model_id') for m in resources.get('models', [])}, + 'tool': {t.get('tool_name') for t in resources.get('tools', [])}, + 'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])}, + 'file': {f.get('file_id') for f in resources.get('files', [])}, + } + async def unregister(self, run_id: str) -> None: """Unregister an agent run session. @@ -182,13 +193,15 @@ class AgentRunSessionRegistry: Returns: True if resource is authorized, False otherwise """ - authorized_ids = session.get('_authorized_ids', {}) + authorization = session['authorization'] + authorized_ids = authorization['authorized_ids'] + resources = authorization['resources'] if resource_type in ('model', 'tool', 'knowledge_base', 'file'): return resource_id in authorized_ids.get(resource_type, set()) if resource_type == 'storage': - storage = session['resources'].get('storage', {}) + storage = resources.get('storage', {}) if resource_id == 'plugin': return storage.get('plugin_storage', False) elif resource_id == 'workspace': diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 1bf8f16c..f9cfe5f4 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -116,16 +116,17 @@ def _validate_artifact_access( Without an explicit scope field, we enforce strict access control. Args: - session: AgentRunSession dict with run_id, conversation_id, permissions + session: AgentRunSession dict with run_id and authorization snapshot artifact_metadata: Artifact metadata dict with conversation_id, run_id operation: Operation name for error messages ('metadata' or 'read') Returns: Tuple of (is_allowed, error_message). If is_allowed is False, error_message contains reason. """ + authorization = session['authorization'] artifact_conversation_id = artifact_metadata.get('conversation_id') artifact_run_id = artifact_metadata.get('run_id') - session_conversation_id = session.get('conversation_id') + session_conversation_id = authorization.get('conversation_id') session_run_id = session.get('run_id') # Rule 1: Created by this run (allows cross-conversation access for self-created artifacts) @@ -141,6 +142,40 @@ def _validate_artifact_access( return False, f'Artifact {operation} access denied: artifact not in session conversation and not created by this run' +def _get_run_authorization(session: dict[str, Any]) -> dict[str, Any]: + """Return the run-scoped authorization snapshot.""" + return session['authorization'] + + +def _resolve_state_scope( + session: dict[str, Any], + scope: str, +) -> tuple[dict[str, Any] | None, str | None, handler.ActionResponse | None]: + """Resolve state policy/context for an authorized run scope.""" + authorization = _get_run_authorization(session) + state_policy = authorization['state_policy'] + + if not state_policy.get('enable_state', True): + return None, None, handler.ActionResponse.error( + message='State access is disabled by binding policy' + ) + + state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) + if scope not in state_scopes: + return None, None, handler.ActionResponse.error( + message=f'Scope "{scope}" is not enabled by binding policy' + ) + + state_context = authorization['state_context'] + scope_key = state_context.get('scope_keys', {}).get(scope) + if not scope_key: + return None, None, handler.ActionResponse.error( + message=f'Scope key not available for scope "{scope}"' + ) + + return state_context, scope_key, None + + async def _validate_agent_run_session( run_id: str, caller_plugin_identity: str | None, @@ -173,7 +208,7 @@ async def _validate_agent_run_session( ) if permission_group and permission_operation: - permissions = session.get('permissions', {}) + permissions = _get_run_authorization(session)['permissions'] allowed_operations = permissions.get(permission_group, []) if permission_operation not in allowed_operations: return None, handler.ActionResponse.error( @@ -189,7 +224,7 @@ def _resolve_run_conversation( api_name: str, ) -> tuple[str | None, handler.ActionResponse | None]: """Resolve and enforce current-run conversation scope.""" - session_conversation_id = session.get('conversation_id') + session_conversation_id = _get_run_authorization(session).get('conversation_id') if requested_conversation_id: if not session_conversation_id: @@ -1572,7 +1607,7 @@ class RuntimeConnectionHandler(handler.Handler): ) # Validate event is in the same conversation as the run, or was created by the same run. - session_conversation_id = session.get('conversation_id') + session_conversation_id = _get_run_authorization(session).get('conversation_id') event_run_id = event.get('run_id') if event_run_id and event_run_id == run_id: return handler.ActionResponse.success(data=event) @@ -1813,53 +1848,18 @@ class RuntimeConnectionHandler(handler.Handler): if not key: return handler.ActionResponse.error(message='key is required') - # Validate run session - session_registry = get_session_registry() - session = await session_registry.get(run_id) - if not session: - return handler.ActionResponse.error( - message=f'Run session {run_id} not found or expired' - ) + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + self.ap, + 'State get', + ) + if error: + return error - # Validate caller plugin identity (strict: required when session has plugin_identity) - session_plugin_identity = session.get('plugin_identity') - if session_plugin_identity: - if not caller_plugin_identity: - return handler.ActionResponse.error( - message=f'caller_plugin_identity is required for run_id {run_id}' - ) - if caller_plugin_identity != session_plugin_identity: - return handler.ActionResponse.error( - message=f'Plugin identity mismatch for run_id {run_id}' - ) - - # Get state policy from session (stored in state_policy field, not in resources) - state_policy = session.get('state_policy', {}) - if not state_policy: - # Default state policy - state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']} - - # Check if state is enabled - if not state_policy.get('enable_state', True): - return handler.ActionResponse.error( - message='State access is disabled by binding policy' - ) - - # Check if scope is enabled - state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) - if scope not in state_scopes: - return handler.ActionResponse.error( - message=f'Scope "{scope}" is not enabled by binding policy' - ) - - # Build scope key using state_context from session (stored in state_context field, not in resources) - state_context = session.get('state_context', {}) - scope_key = state_context.get('scope_keys', {}).get(scope) - - if not scope_key: - return handler.ActionResponse.error( - message=f'Scope key not available for scope "{scope}"' - ) + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error # Get state from persistent store from ..agent.runner.persistent_state_store import get_persistent_state_store @@ -1894,52 +1894,18 @@ class RuntimeConnectionHandler(handler.Handler): if not key: return handler.ActionResponse.error(message='key is required') - # Validate run session - session_registry = get_session_registry() - session = await session_registry.get(run_id) - if not session: - return handler.ActionResponse.error( - message=f'Run session {run_id} not found or expired' - ) + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + self.ap, + 'State set', + ) + if error: + return error - # Validate caller plugin identity (strict: required when session has plugin_identity) - session_plugin_identity = session.get('plugin_identity') - if session_plugin_identity: - if not caller_plugin_identity: - return handler.ActionResponse.error( - message=f'caller_plugin_identity is required for run_id {run_id}' - ) - if caller_plugin_identity != session_plugin_identity: - return handler.ActionResponse.error( - message=f'Plugin identity mismatch for run_id {run_id}' - ) - - # Get state policy from session (stored in state_policy field, not in resources) - state_policy = session.get('state_policy', {}) - if not state_policy: - state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']} - - # Check if state is enabled - if not state_policy.get('enable_state', True): - return handler.ActionResponse.error( - message='State access is disabled by binding policy' - ) - - # Check if scope is enabled - state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) - if scope not in state_scopes: - return handler.ActionResponse.error( - message=f'Scope "{scope}" is not enabled by binding policy' - ) - - # Build scope key using state_context from session (stored in state_context field, not in resources) - state_context = session.get('state_context', {}) - scope_key = state_context.get('scope_keys', {}).get(scope) - - if not scope_key: - return handler.ActionResponse.error( - message=f'Scope key not available for scope "{scope}"' - ) + state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error # Get additional context for DB insert runner_id = session.get('runner_id', '') @@ -1989,52 +1955,18 @@ class RuntimeConnectionHandler(handler.Handler): if not key: return handler.ActionResponse.error(message='key is required') - # Validate run session - session_registry = get_session_registry() - session = await session_registry.get(run_id) - if not session: - return handler.ActionResponse.error( - message=f'Run session {run_id} not found or expired' - ) + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + self.ap, + 'State delete', + ) + if error: + return error - # Validate caller plugin identity (strict: required when session has plugin_identity) - session_plugin_identity = session.get('plugin_identity') - if session_plugin_identity: - if not caller_plugin_identity: - return handler.ActionResponse.error( - message=f'caller_plugin_identity is required for run_id {run_id}' - ) - if caller_plugin_identity != session_plugin_identity: - return handler.ActionResponse.error( - message=f'Plugin identity mismatch for run_id {run_id}' - ) - - # Get state policy from session (stored in state_policy field, not in resources) - state_policy = session.get('state_policy', {}) - if not state_policy: - state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']} - - # Check if state is enabled - if not state_policy.get('enable_state', True): - return handler.ActionResponse.error( - message='State access is disabled by binding policy' - ) - - # Check if scope is enabled - state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) - if scope not in state_scopes: - return handler.ActionResponse.error( - message=f'Scope "{scope}" is not enabled by binding policy' - ) - - # Build scope key using state_context from session (stored in state_context field, not in resources) - state_context = session.get('state_context', {}) - scope_key = state_context.get('scope_keys', {}).get(scope) - - if not scope_key: - return handler.ActionResponse.error( - message=f'Scope key not available for scope "{scope}"' - ) + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error # Delete state from persistent store from ..agent.runner.persistent_state_store import get_persistent_state_store @@ -2070,52 +2002,18 @@ class RuntimeConnectionHandler(handler.Handler): limit = 100 limit = min(limit, 100) # Cap at 100 - # Validate run session - session_registry = get_session_registry() - session = await session_registry.get(run_id) - if not session: - return handler.ActionResponse.error( - message=f'Run session {run_id} not found or expired' - ) + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + self.ap, + 'State list', + ) + if error: + return error - # Validate caller plugin identity (strict: required when session has plugin_identity) - session_plugin_identity = session.get('plugin_identity') - if session_plugin_identity: - if not caller_plugin_identity: - return handler.ActionResponse.error( - message=f'caller_plugin_identity is required for run_id {run_id}' - ) - if caller_plugin_identity != session_plugin_identity: - return handler.ActionResponse.error( - message=f'Plugin identity mismatch for run_id {run_id}' - ) - - # Get state policy from session (stored in state_policy field, not in resources) - state_policy = session.get('state_policy', {}) - if not state_policy: - state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']} - - # Check if state is enabled - if not state_policy.get('enable_state', True): - return handler.ActionResponse.error( - message='State access is disabled by binding policy' - ) - - # Check if scope is enabled - state_scopes = state_policy.get('state_scopes', ['conversation', 'actor']) - if scope not in state_scopes: - return handler.ActionResponse.error( - message=f'Scope "{scope}" is not enabled by binding policy' - ) - - # Build scope key using state_context from session (stored in state_context field, not in resources) - state_context = session.get('state_context', {}) - scope_key = state_context.get('scope_keys', {}).get(scope) - - if not scope_key: - return handler.ActionResponse.error( - message=f'Scope key not available for scope "{scope}"' - ) + _state_context, scope_key, state_error = _resolve_state_scope(session, scope) + if state_error: + return state_error # List state keys from persistent store from ..agent.runner.persistent_state_store import get_persistent_state_store diff --git a/tests/unit_tests/agent/conftest.py b/tests/unit_tests/agent/conftest.py index e4c9e19e..f25631e0 100644 --- a/tests/unit_tests/agent/conftest.py +++ b/tests/unit_tests/agent/conftest.py @@ -39,6 +39,10 @@ def make_session( query_id: int | None = 1, plugin_identity: str = 'test/test-runner', resources: dict | None = None, + conversation_id: str | None = None, + permissions: dict[str, list[str]] | None = None, + state_policy: dict[str, typing.Any] | None = None, + state_context: dict[str, typing.Any] | None = None, ) -> dict[str, typing.Any]: """Create a minimal AgentRunSession dict for testing. @@ -50,13 +54,19 @@ def make_session( resources: AgentResources dict (uses make_resources() default if None) Returns: - AgentRunSession dict with all required fields including pre-computed _authorized_ids + AgentRunSession dict with run-scoped authorization snapshot """ import time now = int(time.time()) - res = resources or make_resources() + res = resources if resources is not None else make_resources() + perms = permissions if permissions is not None else {} + policy = ( + state_policy + if state_policy is not None + else {'enable_state': True, 'state_scopes': ['conversation', 'actor']} + ) + context = state_context if state_context is not None else {} - # Pre-compute authorized IDs for O(1) lookup (matching production behavior) authorized_ids: dict[str, set[str]] = { 'model': {m.get('model_id') for m in res.get('models', [])}, 'tool': {t.get('tool_name') for t in res.get('tools', [])}, @@ -69,10 +79,16 @@ def make_session( 'runner_id': runner_id, 'query_id': query_id, 'plugin_identity': plugin_identity, - 'resources': res, + 'authorization': { + 'resources': res, + 'permissions': perms, + 'conversation_id': conversation_id, + 'state_policy': policy, + 'state_context': context, + 'authorized_ids': authorized_ids, + }, 'status': { 'started_at': now, 'last_activity_at': now, }, - '_authorized_ids': authorized_ids, } diff --git a/tests/unit_tests/agent/test_artifact_store.py b/tests/unit_tests/agent/test_artifact_store.py index 1b5607f6..8d599362 100644 --- a/tests/unit_tests/agent/test_artifact_store.py +++ b/tests/unit_tests/agent/test_artifact_store.py @@ -12,6 +12,7 @@ from langbot.pkg.agent.runner.session_registry import ( AgentRunSessionRegistry, get_session_registry, ) +from .conftest import make_session class TestArtifactStore: @@ -210,6 +211,13 @@ class TestArtifactAuthorization: class TestArtifactAccessValidation: """Test _validate_artifact_access authorization rules.""" + def _make_session(self, conversation_id: str | None): + return make_session( + run_id="run_001", + conversation_id=conversation_id, + permissions={"artifacts": ["metadata", "read"]}, + ) + def _call_validate(self, session, metadata, operation="metadata"): """Helper to call the validation function.""" from langbot.pkg.plugin.handler import _validate_artifact_access @@ -217,11 +225,7 @@ class TestArtifactAccessValidation: def test_global_artifact_denied_by_default(self): """Artifacts without conversation_id are denied by default (no global access).""" - session = { - "run_id": "run_001", - "conversation_id": "conv_001", - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session("conv_001") metadata = { "artifact_id": "art_global", "conversation_id": None, # No conversation scope @@ -234,11 +238,7 @@ class TestArtifactAccessValidation: def test_own_run_artifact_allowed(self): """Artifacts created by same run are allowed (even cross-conversation).""" - session = { - "run_id": "run_001", - "conversation_id": "conv_001", - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session("conv_001") metadata = { "artifact_id": "art_001", "conversation_id": "conv_other", # Different conversation @@ -251,11 +251,7 @@ class TestArtifactAccessValidation: def test_same_conversation_allowed(self): """Artifacts in same conversation are allowed.""" - session = { - "run_id": "run_001", - "conversation_id": "conv_001", - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session("conv_001") metadata = { "artifact_id": "art_001", "conversation_id": "conv_001", # Same as session @@ -268,11 +264,7 @@ class TestArtifactAccessValidation: def test_different_conversation_and_run_denied(self): """Artifacts in different conversation and different run are denied.""" - session = { - "run_id": "run_001", - "conversation_id": "conv_001", - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session("conv_001") metadata = { "artifact_id": "art_001", "conversation_id": "conv_other", # Different conversation @@ -285,11 +277,7 @@ class TestArtifactAccessValidation: def test_session_without_conversation_denied_for_conversation_artifact(self): """Session without conversation_id cannot access conversation-scoped artifacts.""" - session = { - "run_id": "run_001", - "conversation_id": None, # No conversation - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session(None) metadata = { "artifact_id": "art_001", "conversation_id": "conv_001", # Has conversation @@ -301,11 +289,7 @@ class TestArtifactAccessValidation: def test_session_without_conversation_allowed_for_own_artifact(self): """Session without conversation can access artifacts it created.""" - session = { - "run_id": "run_001", - "conversation_id": None, # No conversation - "permissions": {"artifacts": ["metadata", "read"]}, - } + session = self._make_session(None) metadata = { "artifact_id": "art_001", "conversation_id": "conv_001", # Has conversation @@ -431,9 +415,10 @@ class TestSessionRegistryPermissions: session = await session_registry.get("run_001") assert session is not None - assert session["permissions"]["artifacts"] == ["metadata", "read"] - assert session["permissions"]["history"] == ["page"] - assert session["permissions"]["events"] == ["get"] + permissions = session["authorization"]["permissions"] + assert permissions["artifacts"] == ["metadata", "read"] + assert permissions["history"] == ["page"] + assert permissions["events"] == ["get"] @pytest.mark.asyncio async def test_register_with_empty_permissions(self, session_registry): @@ -457,7 +442,7 @@ class TestSessionRegistryPermissions: session = await session_registry.get("run_002") assert session is not None - assert session["permissions"] == {} + assert session["authorization"]["permissions"] == {} class TestArtifactStoreRealSQLite: diff --git a/tests/unit_tests/agent/test_chat_handler.py b/tests/unit_tests/agent/test_chat_handler.py index 77b4f6aa..602296a5 100644 --- a/tests/unit_tests/agent/test_chat_handler.py +++ b/tests/unit_tests/agent/test_chat_handler.py @@ -97,6 +97,7 @@ class MockMessageChunk: self.role = 'assistant' self.content = content self.resp_message_id = resp_message_id + self.tool_calls = [] self.is_final = False def readable_str(self): diff --git a/tests/unit_tests/agent/test_event_first_protocol.py b/tests/unit_tests/agent/test_event_first_protocol.py index b93b158b..e3364d4a 100644 --- a/tests/unit_tests/agent/test_event_first_protocol.py +++ b/tests/unit_tests/agent/test_event_first_protocol.py @@ -2,7 +2,7 @@ Tests cover: 1. Query -> AgentEventEnvelope conversion -2. Current config -> AgentBinding conversion +2. Current config -> AgentConfig projection and single-binding resolution 3. AgentRunContext not inlining full history by default 4. LangBot Host not defining context-window controls 5. Event-first run() entry point @@ -32,6 +32,10 @@ from langbot_plugin.api.entities.builtin.agent_runner.permissions import ( # Import LangBot host models from langbot.pkg.agent.runner.query_entry_adapter import QueryEntryAdapter +from langbot.pkg.agent.runner.binding_resolver import ( + AgentBindingResolver, + AgentBindingResolutionError, +) class TestQueryToEventEnvelope: @@ -127,27 +131,40 @@ class TestQueryToEventEnvelope: assert second.event_id != first.event_id -class TestQueryConfigToBinding: - """Test current config -> AgentBinding conversion.""" +class TestQueryConfigToAgentConfig: + """Test current config projection and single-Agent binding resolution.""" - def test_config_to_binding_runner_id(self, mock_query): - """Test binding runner_id extraction.""" - binding = QueryEntryAdapter.config_to_binding( + def test_config_to_agent_config_runner_id(self, mock_query): + """Test AgentConfig runner_id extraction.""" + agent_config = QueryEntryAdapter.config_to_agent_config( mock_query, "plugin:author/plugin/runner" ) - assert binding.runner_id == "plugin:author/plugin/runner" + assert agent_config.runner_id == "plugin:author/plugin/runner" - def test_config_to_binding_scope(self, mock_query): - """Test binding scope extraction.""" - binding = QueryEntryAdapter.config_to_binding( + def test_resolver_projects_agent_scope(self, mock_query): + """Test binding scope projection through the resolver.""" + event = QueryEntryAdapter.query_to_event(mock_query) + agent_config = QueryEntryAdapter.config_to_agent_config( mock_query, "plugin:test/plugin/runner" ) + binding = AgentBindingResolver().resolve_one(event, [agent_config]) assert binding.scope.scope_type == "agent" assert binding.scope.scope_id == mock_query.pipeline_uuid assert binding.agent_id == mock_query.pipeline_uuid + def test_resolver_rejects_multiple_matching_agents(self, mock_query): + """Event dispatch is single-Agent in v1.""" + event = QueryEntryAdapter.query_to_event(mock_query) + first = QueryEntryAdapter.config_to_agent_config( + mock_query, "plugin:test/plugin/runner" + ) + second = first.model_copy(update={"agent_id": "agent_2"}) + + with pytest.raises(AgentBindingResolutionError): + AgentBindingResolver().resolve_one(event, [first, second]) + class TestAgentRunContextProtocolV1: """Test AgentRunContext Protocol v1 behavior.""" diff --git a/tests/unit_tests/agent/test_event_log_transcript.py b/tests/unit_tests/agent/test_event_log_transcript.py index 3425c4c1..814144ec 100644 --- a/tests/unit_tests/agent/test_event_log_transcript.py +++ b/tests/unit_tests/agent/test_event_log_transcript.py @@ -313,7 +313,7 @@ class TestHistoryPageAuthorization: session = await session_registry.get("run_1") assert session is not None - assert session["conversation_id"] == "conv_1" + assert session["authorization"]["conversation_id"] == "conv_1" # Cleanup await session_registry.unregister("run_1") diff --git a/tests/unit_tests/agent/test_handler_auth.py b/tests/unit_tests/agent/test_handler_auth.py index 6a81b4b9..7adfb328 100644 --- a/tests/unit_tests/agent/test_handler_auth.py +++ b/tests/unit_tests/agent/test_handler_auth.py @@ -22,7 +22,7 @@ from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry from langbot.pkg.plugin.handler import _build_tool_detail, _get_pipeline_knowledge_base_uuids # Import shared test fixtures from conftest.py -from .conftest import make_resources +from .conftest import make_resources, make_session class MockModel: @@ -1152,15 +1152,7 @@ class TestResourceTypeValidation: registry = AgentRunSessionRegistry() resources = make_resources() - # Create session manually for this test - session = { - 'run_id': 'test', - 'runner_id': 'test', - 'query_id': 1, - 'plugin_identity': 'test', - 'resources': resources, - 'status': {'started_at': 0, 'last_activity_at': 0}, - } + session = make_session(resources=resources) # Unknown resource type should return False assert registry.is_resource_allowed(session, 'unknown_type', 'any_id') is False @@ -1487,15 +1479,7 @@ class TestStorageResourcePermissionHelper: """is_resource_allowed handles missing storage field gracefully.""" registry = AgentRunSessionRegistry() - # Create session without storage field - session = { - 'run_id': 'test', - 'runner_id': 'test', - 'query_id': 1, - 'plugin_identity': 'test', - 'resources': {}, # No storage field - 'status': {'started_at': 0, 'last_activity_at': 0}, - } + session = make_session(resources={}) # Should return False for both storage types assert registry.is_resource_allowed(session, 'storage', 'plugin') is False diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index aae1515e..cf3deaf6 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -13,6 +13,7 @@ from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor from langbot.pkg.agent.runner.errors import RunnerExecutionError from langbot.pkg.agent.runner.orchestrator import AgentRunOrchestrator from langbot.pkg.agent.runner.query_entry_adapter import QueryEntryAdapter +from langbot.pkg.agent.runner.binding_resolver import AgentBindingResolver from langbot.pkg.agent.runner.session_registry import get_session_registry from langbot.pkg.agent.runner.persistent_state_store import reset_persistent_state_store from langbot_plugin.api.entities.builtin.platform import entities as platform_entities @@ -327,7 +328,7 @@ async def test_orchestrator_runs_fake_plugin_with_authorized_context(clean_agent session_during_run = plugin_connector.sessions_during_run[0] assert session_during_run is not None assert session_during_run["plugin_identity"] == "langbot/local-agent" - assert session_during_run["_authorized_ids"]["tool"] == {"langbot/test-tool/search"} + assert session_during_run["authorization"]["authorized_ids"]["tool"] == {"langbot/test-tool/search"} assert await get_session_registry().get(context["run_id"]) is None @@ -758,7 +759,8 @@ class TestQueryEntryAdapterHostCapabilities: # Note: We need to rebuild the event and binding to query the store from langbot.pkg.agent.runner.query_entry_adapter import QueryEntryAdapter event = QueryEntryAdapter.query_to_event(query) - binding = QueryEntryAdapter.config_to_binding(query, RUNNER_ID) + agent_config = QueryEntryAdapter.config_to_agent_config(query, RUNNER_ID) + binding = AgentBindingResolver().resolve_one(event, [agent_config]) snapshot = await persistent_store.build_snapshot_from_event(event, binding, descriptor) assert snapshot["conversation"]["external.test_key"] == "test_value" diff --git a/tests/unit_tests/agent/test_resource_builder.py b/tests/unit_tests/agent/test_resource_builder.py index 2486af06..55c8746f 100644 --- a/tests/unit_tests/agent/test_resource_builder.py +++ b/tests/unit_tests/agent/test_resource_builder.py @@ -7,6 +7,7 @@ from unittest.mock import AsyncMock, Mock import pytest from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor +from langbot.pkg.agent.runner.binding_resolver import AgentBindingResolver from langbot.pkg.agent.runner.query_entry_adapter import QueryEntryAdapter from langbot.pkg.agent.runner.resource_builder import AgentResourceBuilder @@ -48,6 +49,15 @@ def make_query( use_funcs: list | None = None, ): return SimpleNamespace( + query_id=1, + bot_uuid='bot_001', + launcher_type='person', + launcher_id='launcher_001', + sender_id='sender_001', + message_event=None, + message_chain=None, + user_message=None, + session=None, pipeline_config={ 'ai': { 'runner': {'id': RUNNER_ID}, @@ -62,9 +72,11 @@ def make_query( async def build_resources(app, query, descriptor): - binding = QueryEntryAdapter.config_to_binding(query, descriptor.id) + event = QueryEntryAdapter.query_to_event(query) + agent_config = QueryEntryAdapter.config_to_agent_config(query, descriptor.id) + binding = AgentBindingResolver().resolve_one(event, [agent_config]) return await AgentResourceBuilder(app).build_resources_from_binding( - event=Mock(), + event=event, binding=binding, descriptor=descriptor, ) diff --git a/tests/unit_tests/agent/test_session_registry.py b/tests/unit_tests/agent/test_session_registry.py index c47205f8..c021e655 100644 --- a/tests/unit_tests/agent/test_session_registry.py +++ b/tests/unit_tests/agent/test_session_registry.py @@ -41,8 +41,43 @@ class TestSessionRegistryBasic: assert result['runner_id'] == 'plugin:test/my-runner/default' assert result['query_id'] == 1 assert result['plugin_identity'] == 'test/my-runner' - assert len(result['resources']['models']) == 1 - assert result['resources']['models'][0]['model_id'] == 'model_001' + auth_resources = result['authorization']['resources'] + assert len(auth_resources['models']) == 1 + assert auth_resources['models'][0]['model_id'] == 'model_001' + assert 'resources' not in result + assert 'permissions' not in result + assert '_authorized_ids' not in result + + @pytest.mark.asyncio + async def test_register_freezes_authorization_snapshot(self): + """Register should freeze authorization data for the run.""" + registry = AgentRunSessionRegistry() + resources = make_resources( + models=[{'model_id': 'model_001'}], + storage={'plugin_storage': True, 'workspace_storage': False}, + ) + + await registry.register( + run_id='run_snapshot', + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + resources=resources, + permissions={'models': ['invoke']}, + conversation_id='conv_001', + ) + + resources['models'].append({'model_id': 'model_late'}) + resources['storage']['workspace_storage'] = True + + session = await registry.get('run_snapshot') + assert session is not None + authorization = session['authorization'] + assert authorization['conversation_id'] == 'conv_001' + assert authorization['permissions'] == {'models': ['invoke']} + assert registry.is_resource_allowed(session, 'model', 'model_001') is True + assert registry.is_resource_allowed(session, 'model', 'model_late') is False + assert registry.is_resource_allowed(session, 'storage', 'workspace') is False @pytest.mark.asyncio async def test_get_nonexistent_session(self): @@ -91,23 +126,15 @@ class TestSessionRegistryBasic: # Create session with manually set old timestamp now = int(time.time()) - res = make_resources() - old_session: AgentRunSession = { - 'run_id': run_id, - 'runner_id': 'plugin:test/my-runner/default', - 'query_id': 1, - 'plugin_identity': 'test/my-runner', - 'resources': res, - 'status': { - 'started_at': now - 100, # 100 seconds ago - 'last_activity_at': now - 100, # 100 seconds ago - }, - '_authorized_ids': { - 'model': set(), - 'tool': set(), - 'knowledge_base': set(), - 'file': set(), - }, + old_session: AgentRunSession = make_session( + run_id=run_id, + runner_id='plugin:test/my-runner/default', + query_id=1, + plugin_identity='test/my-runner', + ) + old_session['status'] = { + 'started_at': now - 100, + 'last_activity_at': now - 100, } async with registry._lock: @@ -153,40 +180,25 @@ class TestSessionRegistryBasic: # Create sessions with manually set old timestamp now = int(time.time()) - res = make_resources() - old_session: AgentRunSession = { - 'run_id': 'old_run', - 'runner_id': 'plugin:test/runner/default', - 'query_id': 1, - 'plugin_identity': 'test/runner', - 'resources': res, - 'status': { - 'started_at': now - 7200, # 2 hours ago - 'last_activity_at': now - 7200, # 2 hours ago - }, - '_authorized_ids': { - 'model': set(), - 'tool': set(), - 'knowledge_base': set(), - 'file': set(), - }, + old_session: AgentRunSession = make_session( + run_id='old_run', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + ) + old_session['status'] = { + 'started_at': now - 7200, + 'last_activity_at': now - 7200, } - new_session: AgentRunSession = { - 'run_id': 'new_run', - 'runner_id': 'plugin:test/runner/default', - 'query_id': 2, - 'plugin_identity': 'test/runner', - 'resources': res, - 'status': { - 'started_at': now, - 'last_activity_at': now, - }, - '_authorized_ids': { - 'model': set(), - 'tool': set(), - 'knowledge_base': set(), - 'file': set(), - }, + new_session: AgentRunSession = make_session( + run_id='new_run', + runner_id='plugin:test/runner/default', + query_id=2, + plugin_identity='test/runner', + ) + new_session['status'] = { + 'started_at': now, + 'last_activity_at': now, } async with registry._lock: diff --git a/tests/unit_tests/agent/test_state_api_auth.py b/tests/unit_tests/agent/test_state_api_auth.py index 8f91f404..324f9f2c 100644 --- a/tests/unit_tests/agent/test_state_api_auth.py +++ b/tests/unit_tests/agent/test_state_api_auth.py @@ -342,7 +342,7 @@ class TestStateAPIFullFlowWithRealDB: # Verify session has correct state_context session = await session_registry.get('run_full_flow') assert session is not None - state_ctx = session.get('state_context') + state_ctx = session['authorization']['state_context'] assert state_ctx is not None, f"state_context is None. Session keys: {list(session.keys())}" assert 'scope_keys' in state_ctx, f"scope_keys not in state_context: {state_ctx}" assert 'conversation' in state_ctx['scope_keys'], f"conversation not in scope_keys: {state_ctx['scope_keys']}" @@ -412,31 +412,31 @@ class TestStateAPIFullFlowWithRealDB: await session_registry.unregister('run_full_flow') -class TestStateHandlerReadsFromSessionTopLevel: - """Tests verifying handlers read state_policy/state_context from session top-level, not resources.""" +class TestStateHandlerReadsFromAuthorizationSnapshot: + """Tests verifying handlers read state_policy/state_context from authorization snapshot.""" @pytest.mark.asyncio - async def test_state_handler_reads_state_policy_from_session_top_level(self, session_registry, db_engine, persistent_store): - """Handler reads state_policy from session['state_policy'], not session['resources']['state_policy'].""" + async def test_state_handler_reads_state_policy_from_authorization(self, session_registry, db_engine, persistent_store): + """Handler reads state_policy from session['authorization'], not resources.""" fake_app = FakeApplication(db_engine) fake_app.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine) - # Register with explicit state_policy at top level + # Register with explicit state_policy in the authorization snapshot await session_registry.register( run_id='run_policy_top_level', runner_id='plugin:test/runner/default', query_id=1, plugin_identity='test/runner', resources=make_resources(), - state_policy={'enable_state': False, 'state_scopes': []}, # Disabled at top level + state_policy={'enable_state': False, 'state_scopes': []}, state_context={'scope_keys': {}, 'binding_identity': 'binding_1'}, ) # Verify resources does NOT contain state_policy session = await session_registry.get('run_policy_top_level') assert session is not None - assert 'state_policy' not in session.get('resources', {}), \ - "resources should NOT contain state_policy" + resources = session['authorization']['resources'] + assert 'state_policy' not in resources, "resources should NOT contain state_policy" async def fake_disconnect(): return True @@ -445,7 +445,7 @@ class TestStateHandlerReadsFromSessionTopLevel: handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_get_handler = handler.actions[PluginToRuntimeAction.STATE_GET.value] - # Should fail because enable_state=False in session['state_policy'] + # Should fail because enable_state=False in authorization.state_policy result = await state_get_handler({ 'run_id': 'run_policy_top_level', 'scope': 'conversation', @@ -459,12 +459,12 @@ class TestStateHandlerReadsFromSessionTopLevel: await session_registry.unregister('run_policy_top_level') @pytest.mark.asyncio - async def test_state_handler_reads_state_context_from_session_top_level(self, session_registry, db_engine, persistent_store): - """Handler reads state_context from session['state_context'], not session['resources']['state_context'].""" + async def test_state_handler_reads_state_context_from_authorization(self, session_registry, db_engine, persistent_store): + """Handler reads state_context from session['authorization'], not resources.""" fake_app = FakeApplication(db_engine) fake_app.persistence_mgr.get_db_engine = MagicMock(return_value=db_engine) - # Register with explicit state_context at top level + # Register with explicit state_context in the authorization snapshot await session_registry.register( run_id='run_context_top_level', runner_id='plugin:test/runner/default', @@ -478,8 +478,8 @@ class TestStateHandlerReadsFromSessionTopLevel: # Verify resources does NOT contain state_context session = await session_registry.get('run_context_top_level') assert session is not None - assert 'state_context' not in session.get('resources', {}), \ - "resources should NOT contain state_context" + resources = session['authorization']['resources'] + assert 'state_context' not in resources, "resources should NOT contain state_context" async def fake_disconnect(): return True @@ -488,7 +488,7 @@ class TestStateHandlerReadsFromSessionTopLevel: handler = RuntimeConnectionHandler(FakeConnection(), fake_disconnect, fake_app) state_set_handler = handler.actions[PluginToRuntimeAction.STATE_SET.value] - # Should use scope_key from session['state_context']['scope_keys']['conversation'] + # Should use scope_key from authorization.state_context.scope_keys.conversation result = await state_set_handler({ 'run_id': 'run_context_top_level', 'scope': 'conversation', @@ -508,7 +508,7 @@ class TestResourcesDoesNotContainStateMetadata: @pytest.mark.asyncio async def test_resources_clean_after_register(self, session_registry): - """After register(), resources should not contain state_policy or state_context.""" + """After register(), only authorization contains resources and state metadata.""" resources = make_resources() await session_registry.register( @@ -524,15 +524,15 @@ class TestResourcesDoesNotContainStateMetadata: session = await session_registry.get('run_resources_clean') assert session is not None - # Verify resources is clean - session_resources = session.get('resources', {}) + # Verify resources is nested under authorization and is clean. + assert 'resources' not in session + session_resources = session['authorization']['resources'] assert 'state_policy' not in session_resources, \ - "session['resources'] should NOT contain state_policy" + "authorization['resources'] should NOT contain state_policy" assert 'state_context' not in session_resources, \ - "session['resources'] should NOT contain state_context" + "authorization['resources'] should NOT contain state_context" - # Verify state metadata is at top level - assert 'state_policy' in session - assert 'state_context' in session + assert 'state_policy' in session['authorization'] + assert 'state_context' in session['authorization'] await session_registry.unregister('run_resources_clean')