refactor(agent-runner): make agent binding and auth snapshot explicit

This commit is contained in:
huanghuoguoguo
2026-06-03 18:45:27 +08:00
parent a850127893
commit 08c51118c5
22 changed files with 530 additions and 411 deletions

View File

@@ -16,7 +16,13 @@ from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .orchestrator import AgentRunOrchestrator
from .config_migration import ConfigMigration
from .session_registry import AgentRunSessionRegistry, AgentRunSession, get_session_registry
from .binding_resolver import AgentBindingResolver, AgentBindingResolutionError
from .session_registry import (
AgentRunSessionRegistry,
AgentRunSession,
RunAuthorizationSnapshot,
get_session_registry,
)
from .events import (
MESSAGE_RECEIVED,
MESSAGE_RECALLED,
@@ -41,8 +47,11 @@ __all__ = [
'AgentResultNormalizer',
'AgentRunOrchestrator',
'ConfigMigration',
'AgentBindingResolver',
'AgentBindingResolutionError',
'AgentRunSessionRegistry',
'AgentRunSession',
'RunAuthorizationSnapshot',
'get_session_registry',
'MESSAGE_RECEIVED',
'MESSAGE_RECALLED',

View File

@@ -0,0 +1,63 @@
"""Resolve host events to one effective Agent binding."""
from __future__ import annotations
from .host_models import AgentConfig, AgentBinding, AgentEventEnvelope, BindingScope
class AgentBindingResolutionError(Exception):
"""Raised when an event cannot resolve to exactly one Agent binding."""
class AgentBindingResolver:
"""Resolve an event to a single AgentBinding.
The target product model is one bot / IM channel -> one Agent. Fan-out,
observer agents, or multi-runner arbitration require separate delivery and
state semantics and are intentionally not hidden in this resolver.
"""
def resolve_one(
self,
event: AgentEventEnvelope,
agents: list[AgentConfig],
) -> AgentBinding:
"""Resolve exactly one enabled Agent for the event."""
matches = [
agent
for agent in agents
if agent.enabled and event.event_type in agent.event_types
]
if not matches:
raise AgentBindingResolutionError(
f'No Agent binding matches event_type={event.event_type}'
)
if len(matches) > 1:
agent_ids = ', '.join(agent.agent_id or '<anonymous>' for agent in matches)
raise AgentBindingResolutionError(
f'Multiple Agent bindings match event_type={event.event_type}: {agent_ids}'
)
return self._to_binding(matches[0])
def _to_binding(self, agent: AgentConfig) -> AgentBinding:
"""Project product-level Agent config into the run-time binding model."""
scope = BindingScope(
scope_type='agent',
scope_id=agent.agent_id,
)
return AgentBinding(
binding_id=f"agent_{agent.agent_id or 'default'}_{agent.runner_id}",
scope=scope,
event_types=list(agent.event_types),
runner_id=agent.runner_id,
runner_config=agent.runner_config,
resource_policy=agent.resource_policy,
state_policy=agent.state_policy,
delivery_policy=agent.delivery_policy,
enabled=agent.enabled,
agent_id=agent.agent_id,
)

View File

@@ -133,6 +133,42 @@ class DeliveryPolicy(pydantic.BaseModel):
"""Maximum message size."""
class AgentConfig(pydantic.BaseModel):
"""Host-side Agent configuration.
Product-level Agent is the target replacement for Pipeline-owned agent
config. Current Pipeline entry paths can project their config into this
model during migration.
"""
agent_id: str | None = None
"""Host-side Agent/config identifier."""
runner_id: str
"""Runner ID to invoke."""
runner_config: dict[str, typing.Any] = pydantic.Field(default_factory=dict)
"""Agent/runner binding configuration."""
resource_policy: ResourcePolicy = pydantic.Field(default_factory=ResourcePolicy)
"""Resource policy for this Agent."""
state_policy: StatePolicy = pydantic.Field(default_factory=StatePolicy)
"""State policy for this Agent."""
delivery_policy: DeliveryPolicy = pydantic.Field(default_factory=DeliveryPolicy)
"""Delivery policy for this Agent."""
event_types: list[str] = pydantic.Field(default_factory=lambda: ["message.received"])
"""Event types this Agent handles."""
enabled: bool = True
"""Whether this Agent can be selected by a binding resolver."""
metadata: dict[str, typing.Any] = pydantic.Field(default_factory=dict)
"""Non-protocol diagnostic metadata, such as legacy config source."""
class AgentBinding(pydantic.BaseModel):
"""Binding configuration for mapping events to runners.

View File

@@ -21,6 +21,7 @@ from .session_registry import get_session_registry, AgentRunSessionRegistry
from .config_migration import ConfigMigration
from .host_models import AgentEventEnvelope, AgentBinding
from .query_entry_adapter import QueryEntryAdapter
from .binding_resolver import AgentBindingResolver
from .state_scope import build_state_context
from .errors import (
RunnerNotFoundError,
@@ -61,6 +62,8 @@ class AgentRunOrchestrator:
result_normalizer: AgentResultNormalizer
binding_resolver: AgentBindingResolver
# Cached singleton references (set in __init__)
_session_registry: AgentRunSessionRegistry
_persistent_state_store: PersistentStateStore | None
@@ -75,6 +78,7 @@ class AgentRunOrchestrator:
self.context_builder = AgentRunContextBuilder(ap)
self.resource_builder = AgentResourceBuilder(ap)
self.result_normalizer = AgentResultNormalizer(ap)
self.binding_resolver = AgentBindingResolver()
# Cache singleton references to avoid per-request getter calls
self._session_registry = get_session_registry()
self._persistent_state_store = None # Lazy init on first use
@@ -258,8 +262,10 @@ class AgentRunOrchestrator:
# Convert Query to event-first envelope
event = QueryEntryAdapter.query_to_event(query)
# Convert current config to binding
binding = QueryEntryAdapter.config_to_binding(query, runner_id)
# Project legacy Pipeline config into target Agent config, then resolve
# exactly one effective binding for this event.
agent_config = QueryEntryAdapter.config_to_agent_config(query, runner_id)
binding = self.binding_resolver.resolve_one(event, [agent_config])
# Extract bound plugins for authorization
bound_plugins = query.variables.get('_pipeline_bound_plugins')

View File

@@ -21,9 +21,8 @@ from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from .host_models import (
AgentConfig,
AgentEventEnvelope,
AgentBinding,
BindingScope,
ResourcePolicy,
StatePolicy,
DeliveryPolicy,
@@ -36,7 +35,7 @@ class QueryEntryAdapter:
This adapter is responsible for:
- Converting Query to AgentEventEnvelope
- Converting current Agent/runner config to temporary AgentBinding
- Projecting current Pipeline config to temporary AgentConfig
- Putting Query-only fields into adapter context
"""
@@ -97,30 +96,17 @@ class QueryEntryAdapter:
)
@classmethod
def config_to_binding(
def config_to_agent_config(
cls,
query: pipeline_query.Query,
runner_id: str,
) -> AgentBinding:
"""Convert current config container to temporary AgentBinding.
Args:
query: Current entry query
runner_id: Resolved runner ID
Returns:
AgentBinding for this run
"""
) -> AgentConfig:
"""Project the current Pipeline config container into target Agent config."""
pipeline_config = query.pipeline_config or {}
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner_config', {}).get(runner_id, {})
agent_id = getattr(query, 'pipeline_uuid', None)
scope = BindingScope(
scope_type="agent",
scope_id=agent_id,
)
# Build resource policy from current config
resource_policy = ResourcePolicy(
allowed_model_uuids=cls._extract_allowed_models(query),
@@ -140,17 +126,16 @@ class QueryEntryAdapter:
enable_reply=True,
)
return AgentBinding(
binding_id=f"agent_{agent_id or 'default'}_{runner_id}",
scope=scope,
event_types=[runner_events.MESSAGE_RECEIVED],
return AgentConfig(
agent_id=agent_id,
runner_id=runner_id,
runner_config=runner_config,
resource_policy=resource_policy,
state_policy=state_policy,
delivery_policy=delivery_policy,
event_types=[runner_events.MESSAGE_RECEIVED],
enabled=True,
agent_id=agent_id,
metadata={'source': 'pipeline_adapter'},
)
@classmethod

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import copy
import typing
import time
import threading
@@ -15,6 +16,22 @@ class AgentRunSessionStatus(typing.TypedDict):
last_activity_at: int
class RunAuthorizationSnapshot(typing.TypedDict):
"""Frozen authorization data for one active run.
ResourceBuilder creates the authorized resource list once before runner
execution. Runtime proxy handlers must validate against this run-scoped
snapshot instead of recomputing resource policy.
"""
resources: AgentResources
permissions: dict[str, list[str]]
conversation_id: str | None
state_policy: dict[str, typing.Any]
state_context: dict[str, typing.Any]
authorized_ids: dict[str, set[str]]
class AgentRunSession(typing.TypedDict):
"""Session for an active agent runner execution.
@@ -25,25 +42,15 @@ class AgentRunSession(typing.TypedDict):
runner_id: Runner descriptor ID (plugin:author/name/runner)
query_id: Host entry query ID, only present for query-based adapters
plugin_identity: Plugin identifier (author/name) of the runner
conversation_id: Conversation ID for history/event access
resources: Authorized resources for this run (from AgentResources)
permissions: Runner permissions from descriptor (artifacts, history, events, etc.)
state_policy: State policy from binding (enable_state, state_scopes)
state_context: Context for state API (scope_keys, binding_identity, etc.)
authorization: Run-scoped authorization snapshot; runtime auth truth
status: Session status tracking
_authorized_ids: Pre-computed authorized resource IDs for O(1) lookup
"""
run_id: str
runner_id: str
query_id: int | None
plugin_identity: str # author/name
conversation_id: str | None
resources: AgentResources
permissions: dict[str, list[str]]
state_policy: dict[str, typing.Any] # {enable_state: bool, state_scopes: list}
state_context: dict[str, typing.Any] # {scope_keys: dict, binding_identity: str, ...}
authorization: RunAuthorizationSnapshot
status: AgentRunSessionStatus
_authorized_ids: dict[str, set[str]] # Pre-computed sets for O(1) lookup
class AgentRunSessionRegistry:
@@ -82,7 +89,7 @@ class AgentRunSessionRegistry:
Args:
run_id: Unique run identifier
runner_id: Runner descriptor ID
query_id: Host entry query ID, only present for query-based adapters
query_id: Host entry query ID, only present for query-based adapters
plugin_identity: Plugin identifier (author/name)
resources: Authorized resources for this run
conversation_id: Conversation ID for history/event access
@@ -102,36 +109,40 @@ class AgentRunSessionRegistry:
# Normalize state_context to empty dict if None
state_context = state_context or {}
# Pre-compute authorized resource IDs for O(1) lookup
authorized_ids: dict[str, set[str]] = {
'model': {m.get('model_id') for m in resources.get('models', [])},
'tool': {t.get('tool_name') for t in resources.get('tools', [])},
'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])},
'file': {f.get('file_id') for f in resources.get('files', [])},
resources_snapshot = copy.deepcopy(resources)
authorization: RunAuthorizationSnapshot = {
'resources': resources_snapshot,
'permissions': copy.deepcopy(permissions),
'conversation_id': conversation_id,
'state_policy': copy.deepcopy(state_policy),
'state_context': copy.deepcopy(state_context),
'authorized_ids': self._build_authorized_ids(resources_snapshot),
}
# NOTE: state_policy and state_context are stored at session top-level,
# NOT in resources. Resources should only contain resource authorization info.
session: AgentRunSession = {
'run_id': run_id,
'runner_id': runner_id,
'query_id': query_id,
'plugin_identity': plugin_identity,
'conversation_id': conversation_id,
'resources': resources, # Original AgentResources, no state metadata mixed in
'permissions': permissions,
'state_policy': state_policy,
'state_context': state_context,
'authorization': authorization,
'status': {
'started_at': now,
'last_activity_at': now,
},
'_authorized_ids': authorized_ids,
}
async with self._lock:
self._sessions[run_id] = session
def _build_authorized_ids(self, resources: AgentResources) -> dict[str, set[str]]:
"""Pre-compute authorized resource IDs for O(1) lookup."""
return {
'model': {m.get('model_id') for m in resources.get('models', [])},
'tool': {t.get('tool_name') for t in resources.get('tools', [])},
'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])},
'file': {f.get('file_id') for f in resources.get('files', [])},
}
async def unregister(self, run_id: str) -> None:
"""Unregister an agent run session.
@@ -182,13 +193,15 @@ class AgentRunSessionRegistry:
Returns:
True if resource is authorized, False otherwise
"""
authorized_ids = session.get('_authorized_ids', {})
authorization = session['authorization']
authorized_ids = authorization['authorized_ids']
resources = authorization['resources']
if resource_type in ('model', 'tool', 'knowledge_base', 'file'):
return resource_id in authorized_ids.get(resource_type, set())
if resource_type == 'storage':
storage = session['resources'].get('storage', {})
storage = resources.get('storage', {})
if resource_id == 'plugin':
return storage.get('plugin_storage', False)
elif resource_id == 'workspace':

View File

@@ -116,16 +116,17 @@ def _validate_artifact_access(
Without an explicit scope field, we enforce strict access control.
Args:
session: AgentRunSession dict with run_id, conversation_id, permissions
session: AgentRunSession dict with run_id and authorization snapshot
artifact_metadata: Artifact metadata dict with conversation_id, run_id
operation: Operation name for error messages ('metadata' or 'read')
Returns:
Tuple of (is_allowed, error_message). If is_allowed is False, error_message contains reason.
"""
authorization = session['authorization']
artifact_conversation_id = artifact_metadata.get('conversation_id')
artifact_run_id = artifact_metadata.get('run_id')
session_conversation_id = session.get('conversation_id')
session_conversation_id = authorization.get('conversation_id')
session_run_id = session.get('run_id')
# Rule 1: Created by this run (allows cross-conversation access for self-created artifacts)
@@ -141,6 +142,40 @@ def _validate_artifact_access(
return False, f'Artifact {operation} access denied: artifact not in session conversation and not created by this run'
def _get_run_authorization(session: dict[str, Any]) -> dict[str, Any]:
"""Return the run-scoped authorization snapshot."""
return session['authorization']
def _resolve_state_scope(
session: dict[str, Any],
scope: str,
) -> tuple[dict[str, Any] | None, str | None, handler.ActionResponse | None]:
"""Resolve state policy/context for an authorized run scope."""
authorization = _get_run_authorization(session)
state_policy = authorization['state_policy']
if not state_policy.get('enable_state', True):
return None, None, handler.ActionResponse.error(
message='State access is disabled by binding policy'
)
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if scope not in state_scopes:
return None, None, handler.ActionResponse.error(
message=f'Scope "{scope}" is not enabled by binding policy'
)
state_context = authorization['state_context']
scope_key = state_context.get('scope_keys', {}).get(scope)
if not scope_key:
return None, None, handler.ActionResponse.error(
message=f'Scope key not available for scope "{scope}"'
)
return state_context, scope_key, None
async def _validate_agent_run_session(
run_id: str,
caller_plugin_identity: str | None,
@@ -173,7 +208,7 @@ async def _validate_agent_run_session(
)
if permission_group and permission_operation:
permissions = session.get('permissions', {})
permissions = _get_run_authorization(session)['permissions']
allowed_operations = permissions.get(permission_group, [])
if permission_operation not in allowed_operations:
return None, handler.ActionResponse.error(
@@ -189,7 +224,7 @@ def _resolve_run_conversation(
api_name: str,
) -> tuple[str | None, handler.ActionResponse | None]:
"""Resolve and enforce current-run conversation scope."""
session_conversation_id = session.get('conversation_id')
session_conversation_id = _get_run_authorization(session).get('conversation_id')
if requested_conversation_id:
if not session_conversation_id:
@@ -1572,7 +1607,7 @@ class RuntimeConnectionHandler(handler.Handler):
)
# Validate event is in the same conversation as the run, or was created by the same run.
session_conversation_id = session.get('conversation_id')
session_conversation_id = _get_run_authorization(session).get('conversation_id')
event_run_id = event.get('run_id')
if event_run_id and event_run_id == run_id:
return handler.ActionResponse.success(data=event)
@@ -1813,53 +1848,18 @@ class RuntimeConnectionHandler(handler.Handler):
if not key:
return handler.ActionResponse.error(message='key is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'State get',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get state policy from session (stored in state_policy field, not in resources)
state_policy = session.get('state_policy', {})
if not state_policy:
# Default state policy
state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']}
# Check if state is enabled
if not state_policy.get('enable_state', True):
return handler.ActionResponse.error(
message='State access is disabled by binding policy'
)
# Check if scope is enabled
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if scope not in state_scopes:
return handler.ActionResponse.error(
message=f'Scope "{scope}" is not enabled by binding policy'
)
# Build scope key using state_context from session (stored in state_context field, not in resources)
state_context = session.get('state_context', {})
scope_key = state_context.get('scope_keys', {}).get(scope)
if not scope_key:
return handler.ActionResponse.error(
message=f'Scope key not available for scope "{scope}"'
)
_state_context, scope_key, state_error = _resolve_state_scope(session, scope)
if state_error:
return state_error
# Get state from persistent store
from ..agent.runner.persistent_state_store import get_persistent_state_store
@@ -1894,52 +1894,18 @@ class RuntimeConnectionHandler(handler.Handler):
if not key:
return handler.ActionResponse.error(message='key is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'State set',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get state policy from session (stored in state_policy field, not in resources)
state_policy = session.get('state_policy', {})
if not state_policy:
state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']}
# Check if state is enabled
if not state_policy.get('enable_state', True):
return handler.ActionResponse.error(
message='State access is disabled by binding policy'
)
# Check if scope is enabled
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if scope not in state_scopes:
return handler.ActionResponse.error(
message=f'Scope "{scope}" is not enabled by binding policy'
)
# Build scope key using state_context from session (stored in state_context field, not in resources)
state_context = session.get('state_context', {})
scope_key = state_context.get('scope_keys', {}).get(scope)
if not scope_key:
return handler.ActionResponse.error(
message=f'Scope key not available for scope "{scope}"'
)
state_context, scope_key, state_error = _resolve_state_scope(session, scope)
if state_error:
return state_error
# Get additional context for DB insert
runner_id = session.get('runner_id', '')
@@ -1989,52 +1955,18 @@ class RuntimeConnectionHandler(handler.Handler):
if not key:
return handler.ActionResponse.error(message='key is required')
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'State delete',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get state policy from session (stored in state_policy field, not in resources)
state_policy = session.get('state_policy', {})
if not state_policy:
state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']}
# Check if state is enabled
if not state_policy.get('enable_state', True):
return handler.ActionResponse.error(
message='State access is disabled by binding policy'
)
# Check if scope is enabled
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if scope not in state_scopes:
return handler.ActionResponse.error(
message=f'Scope "{scope}" is not enabled by binding policy'
)
# Build scope key using state_context from session (stored in state_context field, not in resources)
state_context = session.get('state_context', {})
scope_key = state_context.get('scope_keys', {}).get(scope)
if not scope_key:
return handler.ActionResponse.error(
message=f'Scope key not available for scope "{scope}"'
)
_state_context, scope_key, state_error = _resolve_state_scope(session, scope)
if state_error:
return state_error
# Delete state from persistent store
from ..agent.runner.persistent_state_store import get_persistent_state_store
@@ -2070,52 +2002,18 @@ class RuntimeConnectionHandler(handler.Handler):
limit = 100
limit = min(limit, 100) # Cap at 100
# Validate run session
session_registry = get_session_registry()
session = await session_registry.get(run_id)
if not session:
return handler.ActionResponse.error(
message=f'Run session {run_id} not found or expired'
)
session, error = await _validate_agent_run_session(
run_id,
caller_plugin_identity,
self.ap,
'State list',
)
if error:
return error
# Validate caller plugin identity (strict: required when session has plugin_identity)
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity:
if not caller_plugin_identity:
return handler.ActionResponse.error(
message=f'caller_plugin_identity is required for run_id {run_id}'
)
if caller_plugin_identity != session_plugin_identity:
return handler.ActionResponse.error(
message=f'Plugin identity mismatch for run_id {run_id}'
)
# Get state policy from session (stored in state_policy field, not in resources)
state_policy = session.get('state_policy', {})
if not state_policy:
state_policy = {'enable_state': True, 'state_scopes': ['conversation', 'actor']}
# Check if state is enabled
if not state_policy.get('enable_state', True):
return handler.ActionResponse.error(
message='State access is disabled by binding policy'
)
# Check if scope is enabled
state_scopes = state_policy.get('state_scopes', ['conversation', 'actor'])
if scope not in state_scopes:
return handler.ActionResponse.error(
message=f'Scope "{scope}" is not enabled by binding policy'
)
# Build scope key using state_context from session (stored in state_context field, not in resources)
state_context = session.get('state_context', {})
scope_key = state_context.get('scope_keys', {}).get(scope)
if not scope_key:
return handler.ActionResponse.error(
message=f'Scope key not available for scope "{scope}"'
)
_state_context, scope_key, state_error = _resolve_state_scope(session, scope)
if state_error:
return state_error
# List state keys from persistent store
from ..agent.runner.persistent_state_store import get_persistent_state_store