refactor(agent-runner): tighten protocol v1 runtime boundaries

This commit is contained in:
huanghuoguoguo
2026-05-25 10:34:16 +08:00
parent 90dffa7cd8
commit 9330a684fe
26 changed files with 548 additions and 3291 deletions

View File

@@ -12,7 +12,6 @@ from .errors import (
)
from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder
from .context_packager import AgentContextPackager
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .orchestrator import AgentRunOrchestrator
@@ -38,7 +37,6 @@ __all__ = [
'RunnerExecutionError',
'AgentRunnerRegistry',
'AgentRunContextBuilder',
'AgentContextPackager',
'AgentResourceBuilder',
'AgentResultNormalizer',
'AgentRunOrchestrator',

View File

@@ -5,16 +5,9 @@ import uuid
import time
import typing
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from ...core import app
from .descriptor import AgentRunnerDescriptor
from .config_migration import ConfigMigration
from .context_packager import AgentContextPackager
from .state_store import get_state_store
from .persistent_state_store import get_persistent_state_store
from . import events as runner_events
from .host_models import AgentEventEnvelope, AgentBinding
@@ -33,12 +26,14 @@ class AgentTrigger(typing.TypedDict):
class ConversationContext(typing.TypedDict):
"""Conversation context."""
session_id: str | None
conversation_id: str | None
thread_id: str | None
launcher_type: str | None
launcher_id: str | None
sender_id: str | None
bot_uuid: str | None
bot_id: str | None
workspace_id: str | None
session_id: str | None
pipeline_uuid: str | None
@@ -145,36 +140,22 @@ class AgentRunContextPayload(typing.TypedDict):
class AgentRunContextBuilder:
"""Builder for provisioning AgentRunContext.
Two entry points:
- build_context_from_event(event, binding): Event-first Protocol v1
- build_context(query, descriptor, resources): Pipeline adapter Query-based entry
Responsibilities:
- Generate new run_id (UUID, not query id)
- Set trigger type based on source
- Build conversation context from session/event
- Build input from user_message/event
- Build params with filtering
- Build state snapshot from state_store
- Set trigger type based on event source
- Build conversation context from event
- Build input from event
- Build state snapshot from PersistentStateStore
- Build runtime context with host info, trace_id, deadline
- Set config from runner binding configuration
- Set config from runner binding configuration.
Pipeline Query adaptation belongs to PipelineAdapter, not this builder.
"""
ap: app.Application
# Params filtering rules
# Exclude variables starting with underscore (internal)
INTERNAL_PREFIX = '_'
# Exclude variables with sensitive naming patterns
SENSITIVE_PATTERNS = ('secret', 'token', 'key', 'password', 'credential', 'api_key', 'apikey')
# Exclude permission/control variables
PERMISSION_VARS = ('_pipeline_bound_plugins', '_authorized', '_permission')
def __init__(self, ap: app.Application):
self.ap = ap
self.context_packager = AgentContextPackager()
async def build_context_from_event(
self,
@@ -217,7 +198,8 @@ class AgentRunContextBuilder:
'launcher_type': None, # Will be filled from actor/subject if needed
'launcher_id': None,
'sender_id': event.actor.actor_id if event.actor else None,
'bot_uuid': event.bot_id,
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'pipeline_uuid': binding.pipeline_uuid, # Pipeline adapter field
}
@@ -227,8 +209,9 @@ class AgentRunContextBuilder:
'event_type': event.event_type,
'event_time': event.event_time,
'source': event.source,
'source_event_type': None,
'data': {},
'source_event_type': event.source_event_type,
'raw_ref': event.raw_ref.model_dump(mode='json') if event.raw_ref else None,
'data': event.data,
}
# Build actor context
@@ -323,427 +306,6 @@ class AgentRunContextBuilder:
return context
async def build_context(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
    resources: AgentResources,
) -> AgentRunContextPayload:
    """Build an AgentRunContext payload directly from a Pipeline Query.

    This is the Pipeline adapter entry point. History is never inlined at
    the top level of the payload: when a positive max-round is configured,
    the recent message window is delivered through ``bootstrap.messages``
    and mirrored into ``adapter.adapter_messages``.

    Args:
        query: Pipeline query that triggered the run.
        descriptor: Descriptor of the runner being invoked.
        resources: Resources already built for this run.

    Returns:
        The assembled payload for the plugin runner.
    """
    # Resolve the runner-specific configuration for this pipeline.
    runner_id = descriptor.id
    runner_config = ConfigMigration.resolve_runner_config(
        query.pipeline_config,
        runner_id,
    )

    # Resolve max-round for the bootstrap window. The config key is
    # 'max-round' (hyphen). Fall back to the pipeline-level 'ai' section
    # when the runner config does not set it.
    max_round = runner_config.get('max-round')
    if max_round is None:
        ai_config = query.pipeline_config.get('ai', {}) if query.pipeline_config else {}
        max_round = ai_config.get('max-round')

    trigger: AgentTrigger = {
        'type': runner_events.MESSAGE_RECEIVED,
        'source': 'pipeline',
        'timestamp': int(time.time()),
    }

    # Conversation context is only available when the query has a session.
    conversation: ConversationContext | None = None
    session = getattr(query, 'session', None)
    if session:
        # Enum-like launcher types expose ``.value``; anything else is used as-is.
        # The session_id prefix defaults to '' when the attribute is missing,
        # while the standalone launcher_type field defaults to None.
        launcher_for_id = getattr(session, 'launcher_type', '')
        launcher_for_id = getattr(launcher_for_id, 'value', launcher_for_id)
        launcher_type = getattr(session, 'launcher_type', None)
        launcher_type = getattr(launcher_type, 'value', launcher_type)
        sender_id = getattr(query, 'sender_id', None)
        conversation = {
            'session_id': f'{launcher_for_id}_{getattr(session, "launcher_id", "")}',
            'conversation_id': getattr(getattr(session, 'using_conversation', None), 'uuid', None),
            'launcher_type': launcher_type,
            'launcher_id': getattr(session, 'launcher_id', None),
            'sender_id': str(sender_id) if sender_id else None,
            'bot_uuid': getattr(query, 'bot_uuid', None),
            'pipeline_uuid': getattr(query, 'pipeline_uuid', None),
        }

    # Named agent_input so the builtin ``input`` is not shadowed.
    agent_input: AgentInput = self._build_input(query)

    # Filtered view of query.variables.
    params = self._build_params(query)

    # State snapshot comes from the state store.
    state_store = get_state_store()
    state: AgentRunState = state_store.build_snapshot(query, descriptor)

    streaming_supported = await self._is_stream_output_supported(query)
    remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False) if query.pipeline_config else False

    # A fresh UUID identifies the run; it doubles as trace_id for now.
    run_id = str(uuid.uuid4())
    runtime: AgentRuntimeContext = {
        'langbot_version': self.ap.ver_mgr.get_current_version(),
        'sdk_protocol_version': descriptor.protocol_version,
        'query_id': query.query_id,
        'trace_id': run_id,
        'deadline_at': self._build_deadline(runner_config),
        'metadata': {
            'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown') if query.variables else 'Unknown',
            'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown') if query.variables else 'Unknown',
            'streaming_supported': streaming_supported,
            'remove_think': remove_think,
        },
    }

    # Delivery capabilities as seen from the query adapter.
    delivery_context = {
        'surface': 'pipeline',
        'reply_target': None,
        'supports_streaming': streaming_supported,
        'supports_edit': False,
        'supports_reaction': False,
        'max_message_size': None,
        'platform_capabilities': {},
    }

    # Context access for the direct Query path: only storage is advertised.
    context_access = {
        'conversation_id': conversation.get('conversation_id') if conversation else None,
        'thread_id': None,
        'latest_cursor': None,
        'event_seq': None,
        'transcript_seq': None,
        'has_history_before': False,
        'inline_policy': {
            'mode': 'current_event',
            'delivered_count': 0,
            'source_total_count': None,
            'messages_complete': False,
            'reason': 'pipeline_adapter',
        },
        'available_apis': {
            'history_page': False,
            'history_search': False,
            'event_get': False,
            'event_page': False,
            'artifact_metadata': False,
            'artifact_read': False,
            'state': False,
            'storage': True,
        },
    }

    # Pipeline-specific fields travel in the adapter section.
    adapter_context = {
        'query_id': query.query_id,
        'pipeline_uuid': getattr(query, 'pipeline_uuid', None),
        'max_round': max_round,  # For reference only
        'adapter_messages': [],  # Filled below when max_round is set
        'extra': {
            'params': params,
            'prompt': self._build_prompt(query),
        },
    }

    bootstrap_context = None

    # Bootstrap messages are only packaged for a positive max-round and go
    # into bootstrap.messages, NOT into top-level messages.
    if max_round and max_round > 0:
        # The packager re-reads 'max-round' from the config it receives and
        # uses its own default when the key is absent. Pass the value
        # resolved above (it may come from the 'ai' section) so the
        # delivered window and the reported policy match adapter.max_round.
        packaging_config = {**runner_config, 'max-round': max_round}
        packaged_context = self.context_packager.package_messages(query, packaging_config)
        adapter_messages = self._build_messages(packaged_context.messages)

        bootstrap_context = {
            'messages': adapter_messages,
            'summary': None,
            'artifacts': [],
            'metadata': {},
        }

        # Mirror the same window for transition runners.
        adapter_context['adapter_messages'] = adapter_messages

        runtime['metadata']['context_packaging'] = {
            'policy': packaged_context.policy,
            'history': packaged_context.history,
        }

    # Full Protocol v1 payload.
    context: AgentRunContextPayload = {
        'run_id': run_id,
        'trigger': trigger,
        'conversation': conversation,
        'event': self._build_event(query),  # REQUIRED
        'actor': self._build_actor(query),
        'subject': self._build_subject(query),
        'input': agent_input,
        'delivery': delivery_context,  # REQUIRED
        'resources': resources,
        'context': context_access,  # ContextAccess - REQUIRED
        'state': state,
        'runtime': runtime,
        'config': runner_config,
        'bootstrap': bootstrap_context,  # Optional bootstrap
        'adapter': adapter_context,  # Pipeline adapter context
        'metadata': {},  # Additional metadata
    }

    return context
def _build_input(self, query: pipeline_query.Query) -> AgentInput:
"""Build AgentInput from query."""
text = None
text_parts: list[str] = []
contents: list[dict[str, typing.Any]] = []
if query.user_message:
# Extract text if content is single text element
if isinstance(query.user_message.content, list):
for elem in query.user_message.content:
contents.append(elem.model_dump(mode='json'))
if elem.type == 'text':
elem_text = getattr(elem, 'text', None)
if elem_text:
text_parts.append(elem_text)
else:
# Single string content
text = str(query.user_message.content)
contents.append({'type': 'text', 'text': text})
if text_parts:
text = ''.join(text_parts)
# Include message_chain for platform-specific format
message_chain_dict = None
if query.message_chain:
message_chain_dict = query.message_chain.model_dump(mode='json')
return {
'text': text,
'contents': contents,
'message_chain': message_chain_dict,
'attachments': self._build_attachments(query, contents),
}
def _build_attachments(
self,
query: pipeline_query.Query,
contents: list[dict[str, typing.Any]],
) -> list[dict[str, typing.Any]]:
"""Extract runner-consumable attachment data from input contents."""
attachments: list[dict[str, typing.Any]] = []
for elem in contents:
elem_type = elem.get('type')
if elem_type == 'image_url':
image_url = elem.get('image_url') or {}
attachments.append(
{
'type': 'image',
'source': 'url',
'url': image_url.get('url') if isinstance(image_url, dict) else str(image_url),
}
)
elif elem_type == 'image_base64':
image_base64 = elem.get('image_base64')
attachments.append(
{
'type': 'image',
'source': 'base64',
'content': image_base64,
'content_type': self._infer_base64_content_type(image_base64, 'image/jpeg'),
'name': 'image',
'has_content': bool(image_base64),
}
)
elif elem_type == 'file_url':
attachments.append(
{
'type': 'file',
'source': 'url',
'url': elem.get('file_url'),
'name': elem.get('file_name'),
}
)
elif elem_type == 'file_base64':
file_base64 = elem.get('file_base64')
attachments.append(
{
'type': 'file',
'source': 'base64',
'name': elem.get('file_name'),
'content': file_base64,
'content_type': self._infer_base64_content_type(file_base64, 'application/octet-stream'),
'has_content': bool(file_base64),
}
)
message_chain = getattr(query, 'message_chain', None)
if message_chain:
for component in message_chain:
if isinstance(component, platform_message.Image):
attachments.append(
{
'type': 'image',
'source': 'message_chain',
'id': component.image_id or None,
'url': component.url or None,
'path': str(component.path) if component.path else None,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'image/jpeg'),
'name': 'image',
'has_content': bool(component.base64),
}
)
elif isinstance(component, platform_message.File):
attachments.append(
{
'type': 'file',
'source': 'message_chain',
'id': component.id or None,
'name': component.name or None,
'size': component.size or 0,
'url': component.url or None,
'path': component.path or None,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'application/octet-stream'),
'has_content': bool(component.base64),
}
)
elif isinstance(component, platform_message.Voice):
attachments.append(
{
'type': 'voice',
'source': 'message_chain',
'id': component.voice_id or None,
'url': component.url or None,
'path': component.path or None,
'duration': component.length or 0,
'content': component.base64 or None,
'content_type': self._infer_base64_content_type(component.base64, 'audio/mpeg'),
'name': 'voice',
'has_content': bool(component.base64),
}
)
return attachments
def _infer_base64_content_type(self, value: typing.Any, default: str) -> str:
"""Infer MIME type from a data URL base64 value."""
if not isinstance(value, str):
return default
if value.startswith('data:') and ';base64,' in value:
return value[5:value.find(';base64,')] or default
return default
def _build_event(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
    """Derive a minimal event envelope from the message query.

    The top-level ``event_type`` is always the stable MESSAGE_RECEIVED
    protocol name. The source event's own type is kept inside ``event_data``
    under ``source_event_type``.
    """
    source_event = getattr(query, 'message_event', None)

    payload: dict[str, typing.Any] = {}
    if source_event and hasattr(source_event, 'model_dump'):
        try:
            payload = source_event.model_dump(mode='json')
        except TypeError:
            # Retry without ``mode`` (presumably for dumpers that reject it).
            payload = source_event.model_dump()
        except Exception:
            # Best effort: an event that cannot be dumped yields no data.
            payload = {}
    payload.pop('source_platform_object', None)

    if source_event:
        original_type = getattr(source_event, 'type', None)
        occurred_at = getattr(source_event, 'time', None)
    else:
        original_type = None
        occurred_at = None
    if original_type:
        payload.setdefault('source_event_type', original_type)

    # A message_id of -1 is treated as "no id"; query_id is the fallback.
    chain = getattr(query, 'message_chain', None)
    message_id = getattr(chain, 'message_id', None)
    if message_id == -1:
        message_id = None

    if isinstance(occurred_at, (int, float)):
        timestamp = int(occurred_at)
    else:
        timestamp = None

    return {
        'event_type': runner_events.MESSAGE_RECEIVED,
        'event_id': str(message_id or getattr(query, 'query_id', '')),
        'event_timestamp': timestamp,
        'event_data': payload,
    }
def _build_actor(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build actor context for the sender that triggered the run."""
message_event = getattr(query, 'message_event', None)
sender = getattr(message_event, 'sender', None) if message_event else None
actor_id = getattr(sender, 'id', None) or getattr(query, 'sender_id', None)
actor_name = sender.get_name() if sender and hasattr(sender, 'get_name') else None
return {
'actor_type': 'user',
'actor_id': str(actor_id) if actor_id is not None else None,
'actor_name': actor_name,
}
def _build_subject(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build subject context for the current message."""
message_chain = getattr(query, 'message_chain', None)
message_id = getattr(message_chain, 'message_id', None)
if message_id == -1:
message_id = None
launcher_type = getattr(query, 'launcher_type', None)
launcher_type_value = getattr(launcher_type, 'value', launcher_type)
return {
'subject_type': 'message',
'subject_id': str(message_id or getattr(query, 'query_id', '')),
'subject_data': {
'launcher_type': launcher_type_value,
'launcher_id': getattr(query, 'launcher_id', None),
'sender_id': str(getattr(query, 'sender_id', '')),
'bot_uuid': getattr(query, 'bot_uuid', None),
'pipeline_uuid': getattr(query, 'pipeline_uuid', None),
},
}
def _build_deadline(self, runner_config: dict[str, typing.Any]) -> float | None:
    """Turn the runner's ``timeout`` setting into an absolute deadline.

    When ``timeout`` is absent the host default applies. An explicit null,
    a value that cannot be read as a number, or a value of zero or below
    means there is no overall run deadline (``None``).
    """
    configured = runner_config.get('timeout', DEFAULT_RUNNER_TIMEOUT_SECONDS)
    if configured is None:
        return None

    try:
        seconds = float(configured)
    except (TypeError, ValueError):
        return None

    return None if seconds <= 0 else time.time() + seconds
def _build_deadline_from_binding(self, binding: AgentBinding) -> float | None:
"""Build deadline timestamp from binding timeout config.
@@ -767,106 +329,6 @@ class AgentRunContextBuilder:
return time.time() + timeout_seconds
async def _is_stream_output_supported(self, query: pipeline_query.Query) -> bool:
"""Check whether the current adapter can consume streaming chunks."""
try:
return await query.adapter.is_stream_output_supported()
except AttributeError:
return False
except Exception:
return False
def _build_prompt(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
"""Build effective prompt messages from query.prompt after preprocessing."""
prompt_messages: list[dict[str, typing.Any]] = []
prompt = getattr(query, 'prompt', None)
messages = getattr(prompt, 'messages', None)
if not messages:
return prompt_messages
for msg in messages:
prompt_messages.append(msg.model_dump(mode='json'))
return prompt_messages
def _build_messages(self, source_messages: list[typing.Any]) -> list[dict[str, typing.Any]]:
"""Build messages list from packaged source messages."""
messages: list[dict[str, typing.Any]] = []
for msg in source_messages:
messages.append(msg.model_dump(mode='json'))
return messages
def _build_params(self, query: pipeline_query.Query) -> dict[str, typing.Any]:
"""Build params from query.variables with filtering.
Filtering rules:
1. Exclude variables starting with underscore (internal)
2. Exclude variables with sensitive naming patterns (secret, token, key, password)
3. Exclude permission/control variables
4. Keep only JSON-serializable values
Args:
query: Pipeline query
Returns:
Filtered params dict
"""
params: dict[str, typing.Any] = {}
if not query.variables:
return params
for key, value in query.variables.items():
# Filter internal variables (starting with underscore)
if key.startswith(self.INTERNAL_PREFIX):
continue
# Filter sensitive naming patterns
key_lower = key.lower()
if any(pattern in key_lower for pattern in self.SENSITIVE_PATTERNS):
continue
# Filter permission variables
if any(key == perm_var or key.startswith(perm_var) for perm_var in self.PERMISSION_VARS):
continue
# Keep only JSON-serializable values
if self._is_json_serializable(value):
params[key] = value
return params
def _is_json_serializable(self, value: typing.Any) -> bool:
"""Check if value is JSON-serializable.
Note: set is NOT JSON-serializable. json.dumps({"x": {1}}) fails.
Only list and tuple are allowed as collection types.
Args:
value: Value to check
Returns:
True if JSON-serializable, False otherwise
"""
if value is None:
return True
if isinstance(value, (str, int, float, bool)):
return True
# Only allow list and tuple, NOT set (set is not JSON-serializable)
if isinstance(value, (list, tuple)):
return all(self._is_json_serializable(item) for item in value)
if isinstance(value, dict):
return all(
isinstance(k, str) and self._is_json_serializable(v)
for k, v in value.items()
)
# Pydantic models and other complex types are not directly serializable
# as params (they may have internal structure not meant for runners)
return False
async def _build_context_access(
self,
event: AgentEventEnvelope,
@@ -899,8 +361,7 @@ class AgentRunContextBuilder:
artifact_metadata_enabled = 'metadata' in artifact_permissions
artifact_read_enabled = 'read' in artifact_permissions
# Determine state API availability based on binding state_policy (event-first mode)
# Direct Query context builder does not expose persistent state API.
# Determine state API availability based on binding state_policy.
state_enabled = False
if binding is not None:
state_policy = binding.state_policy

View File

@@ -1,74 +0,0 @@
"""Agent context packaging helpers."""
from __future__ import annotations
import dataclasses
import typing
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
DEFAULT_MAX_ROUND = 10
@dataclasses.dataclass(frozen=True)
class ContextPackagingResult:
    """Packaged working context for one AgentRunner run.

    Frozen: fields cannot be reassigned after construction.
    """
    # Messages selected for delivery to the runner.
    messages: list[typing.Any]
    # Packaging policy that produced the selection (package_messages fills
    # in 'mode' and 'max_round').
    policy: dict[str, typing.Any]
    # Bookkeeping about the source history (package_messages fills in
    # 'source', 'source_total_count', 'delivered_count', 'messages_complete').
    history: dict[str, typing.Any]
def get_max_round(runner_config: dict[str, typing.Any]) -> typing.Any:
    """Look up the Pipeline adapter 'max-round' setting.

    Returns the configured value unchanged when the key is present, even if
    it is None, and DEFAULT_MAX_ROUND when the key is absent.
    """
    if 'max-round' in runner_config:
        return runner_config['max-round']
    return DEFAULT_MAX_ROUND
def select_max_round_messages(
    messages: list[typing.Any] | None,
    max_round: typing.Any,
) -> list[typing.Any]:
    """Keep the most recent messages spanning at most ``max_round`` user rounds.

    Messages are scanned from newest to oldest. Each message whose ``role``
    is 'user' completes one round, and scanning stops once ``max_round``
    rounds have been collected. The result is in chronological order.
    """
    if not messages:
        return []

    window: list[typing.Any] = []
    rounds_seen = 0
    for message in reversed(messages):
        if not rounds_seen < max_round:
            break
        window.append(message)
        if getattr(message, 'role', None) == 'user':
            rounds_seen += 1

    # Collected newest-first; restore chronological order.
    window.reverse()
    return window
class AgentContextPackager:
    """Produces the bounded working context handed to an AgentRunner."""

    def package_messages(
        self,
        query: pipeline_query.Query,
        runner_config: dict[str, typing.Any],
    ) -> ContextPackagingResult:
        """Apply the Pipeline adapter max-round policy to ``query.messages``.

        The result carries the selected window, the policy that was applied,
        and counts describing how much of the source history was delivered.
        """
        available = query.messages or []
        round_limit = get_max_round(runner_config)
        window = select_max_round_messages(available, round_limit)

        total_count = len(available)
        delivered_count = len(window)

        policy = {
            'mode': 'max_round',
            'max_round': round_limit,
        }
        history = {
            'source': 'query.messages',
            'source_total_count': total_count,
            'delivered_count': delivered_count,
            'messages_complete': delivered_count == total_count,
        }
        return ContextPackagingResult(messages=window, policy=policy, history=history)

View File

@@ -37,6 +37,9 @@ class AgentEventEnvelope(pydantic.BaseModel):
source: str
"""Event source (platform, webui, api, scheduler, system)."""
source_event_type: str | None = None
"""Original source event type, when available."""
bot_id: str | None = None
"""Bot UUID handling this event."""
@@ -64,6 +67,9 @@ class AgentEventEnvelope(pydantic.BaseModel):
raw_ref: RawEventRef | None = None
"""Reference to raw event payload."""
data: dict[str, typing.Any] = pydantic.Field(default_factory=dict)
"""Small structured event payload. Large payloads should be referenced via raw_ref/artifacts."""
# Binding scope types
class BindingScope(pydantic.BaseModel):

View File

@@ -16,12 +16,12 @@ from .registry import AgentRunnerRegistry
from .context_builder import AgentRunContextBuilder, AgentRunContextPayload
from .resource_builder import AgentResourceBuilder
from .result_normalizer import AgentResultNormalizer
from .state_store import get_state_store, RunnerScopedStateStore
from .persistent_state_store import get_persistent_state_store, PersistentStateStore
from .session_registry import get_session_registry, AgentRunSessionRegistry
from .config_migration import ConfigMigration
from .host_models import AgentEventEnvelope, AgentBinding
from .pipeline_adapter import PipelineAdapter
from .state_scope import build_state_context
from .errors import (
RunnerNotFoundError,
RunnerExecutionError,
@@ -63,7 +63,6 @@ class AgentRunOrchestrator:
# Cached singleton references (set in __init__)
_session_registry: AgentRunSessionRegistry
_state_store: RunnerScopedStateStore
_persistent_state_store: PersistentStateStore | None
def __init__(
@@ -78,7 +77,6 @@ class AgentRunOrchestrator:
self.result_normalizer = AgentResultNormalizer(ap)
# Cache singleton references to avoid per-request getter calls
self._session_registry = get_session_registry()
self._state_store = get_state_store()
self._persistent_state_store = None # Lazy init on first use
async def run(
@@ -132,13 +130,13 @@ class AgentRunOrchestrator:
# Merge params into adapter.extra
if 'params' in adapter_context:
context['adapter']['extra']['params'] = adapter_context['params']
# Merge prompt into adapter.extra (for transition runners)
# Merge prompt into adapter.extra for Pipeline adapter consumers.
if 'prompt' in adapter_context:
context['adapter']['extra']['prompt'] = adapter_context['prompt']
# Merge bootstrap if provided
if adapter_context.get('bootstrap'):
context['bootstrap'] = adapter_context['bootstrap']
# Also set adapter_messages for transition runners
# Also expose the bootstrap window through adapter metadata.
bootstrap_messages = adapter_context['bootstrap'].get('messages')
if bootstrap_messages:
context['adapter']['adapter_messages'] = bootstrap_messages
@@ -150,7 +148,7 @@ class AgentRunOrchestrator:
context['runtime']['query_id'] = adapter_context['query_id']
# Build state context for State API handlers
state_context = self._build_state_context(event, binding, descriptor)
state_context = build_state_context(event, binding, descriptor)
# Register session for proxy action permission validation
run_id = context['run_id']
@@ -274,7 +272,7 @@ class AgentRunOrchestrator:
bound_plugins = query.variables.get('_pipeline_bound_plugins')
# Build adapter context for Pipeline-specific fields
adapter_context = await self._build_adapter_context(query, binding)
adapter_context = PipelineAdapter.build_adapter_context(query, binding)
# Delegate to event-first run()
async for result in self.run(
@@ -285,73 +283,6 @@ class AgentRunOrchestrator:
):
yield result
async def _build_adapter_context(
    self,
    query: pipeline_query.Query,
    binding: AgentBinding,
) -> dict[str, typing.Any]:
    """Build adapter context for the Pipeline Query-based flow.

    Collects the Query-only fields that the event-first flow cannot supply:
    - params (filtered from query.variables)
    - prompt messages
    - bootstrap messages (only for a positive max-round with history)
    - query_id

    Args:
        query: Pipeline query
        binding: Agent binding; its ``max_round`` gates and sizes the bootstrap window

    Returns:
        Dict with 'params', 'prompt', 'bootstrap', 'query_id' and 'runtime_metadata'
    """
    from .context_packager import AgentContextPackager

    # Reuse the context builder's filtering and prompt serialization.
    params = self.context_builder._build_params(query)
    prompt = self.context_builder._build_prompt(query)

    bootstrap = None
    runtime_metadata: dict[str, typing.Any] = {}

    max_round = binding.max_round
    if max_round and max_round > 0 and query.messages:
        # The packager re-reads 'max-round' from the config it receives and
        # uses its own default when the key is absent. Pass the binding's
        # value explicitly so the packaged window and the reported policy
        # match the value that gated this branch.
        runner_config = {**(binding.runner_config or {}), 'max-round': max_round}
        packaged_context = AgentContextPackager().package_messages(query, runner_config)

        adapter_messages = [
            msg.model_dump(mode='json') for msg in packaged_context.messages
        ]

        bootstrap = {
            'messages': adapter_messages,
            'summary': None,
            'artifacts': [],
            'metadata': {},
        }

        # Record how the window was selected.
        runtime_metadata['context_packaging'] = {
            'policy': packaged_context.policy,
            'history': packaged_context.history,
        }

    return {
        'params': params,
        'prompt': prompt,
        'bootstrap': bootstrap,
        'query_id': query.query_id,
        'runtime_metadata': runtime_metadata,
    }
async def _invoke_runner(
self,
descriptor: AgentRunnerDescriptor,
@@ -497,18 +428,22 @@ class AgentRunOrchestrator:
"""
data = result_dict.get('data', {})
# Extract scope (default to conversation when omitted by the runner)
scope = data.get('scope', 'conversation')
scope = data.get('scope')
if not scope:
raise RunnerProtocolError(
descriptor.id,
'state.updated missing required field: scope',
)
# Extract key and value
key = data.get('key')
value = data.get('value')
if not key:
self.ap.logger.warning(
f'Runner {descriptor.id} state.updated missing key, ignoring'
raise RunnerProtocolError(
descriptor.id,
'state.updated missing required field: key',
)
return
# Lazy init persistent state store
if self._persistent_state_store is None:
@@ -536,75 +471,6 @@ class AgentRunOrchestrator:
f'Runner {descriptor.id} state.updated rejected: {error}'
)
def _build_state_context(
self,
event: AgentEventEnvelope,
binding: AgentBinding,
descriptor: AgentRunnerDescriptor,
) -> dict[str, typing.Any]:
"""Build state context for State API handlers.
Returns context with:
- scope_keys: Dict mapping scope name to scope_key
- binding_identity: Binding identity for state isolation
- Additional context fields for DB insert
"""
# Get binding identity
binding_identity = binding.binding_id
if not binding_identity:
scope = binding.scope
if scope.scope_type and scope.scope_id:
binding_identity = f"{scope.scope_type}:{scope.scope_id}"
else:
binding_identity = "unknown_binding"
# Build scope keys for each scope
scope_keys: dict[str, str] = {}
# Conversation scope
if event.conversation_id:
parts = [descriptor.id, binding_identity, event.conversation_id]
if event.thread_id:
parts.append(event.thread_id)
scope_keys['conversation'] = f'conversation:{":".join(parts)}'
# Actor scope
if event.actor and event.actor.actor_id:
parts = [
descriptor.id,
binding_identity,
event.actor.actor_type or 'user',
event.actor.actor_id,
]
scope_keys['actor'] = f'actor:{":".join(parts)}'
# Subject scope
if event.subject and event.subject.subject_id:
parts = [
descriptor.id,
binding_identity,
event.subject.subject_type or 'unknown',
event.subject.subject_id,
]
scope_keys['subject'] = f'subject:{":".join(parts)}'
# Runner scope (always available)
parts = [descriptor.id, binding_identity]
scope_keys['runner'] = f'runner:{":".join(parts)}'
return {
'scope_keys': scope_keys,
'binding_identity': binding_identity,
'bot_id': event.bot_id,
'workspace_id': event.workspace_id,
'conversation_id': event.conversation_id,
'thread_id': event.thread_id,
'actor_type': event.actor.actor_type if event.actor else None,
'actor_id': event.actor.actor_id if event.actor else None,
'subject_type': event.subject.subject_type if event.subject else None,
'subject_id': event.subject.subject_id if event.subject else None,
}
async def _write_event_log(
self,
event: AgentEventEnvelope,

View File

@@ -6,7 +6,6 @@ from __future__ import annotations
import typing
import json
import asyncio
import threading
from datetime import datetime
@@ -14,21 +13,17 @@ import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy import select, delete, update
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from .descriptor import AgentRunnerDescriptor
from .host_models import AgentEventEnvelope, AgentBinding
from .state_scope import (
VALID_STATE_SCOPES,
build_state_scope_key,
get_binding_identity,
normalize_state_key,
)
from ...entity.persistence.agent_runner_state import AgentRunnerState
# Valid state scopes for agent runner state updates.
VALID_STATE_SCOPES = ('conversation', 'actor', 'subject', 'runner')
# External-facing key aliases accepted from runners.
STATE_KEY_ALIASES = {
'conversation_id': 'external.conversation_id',
}
# Maximum value_json size (256KB)
MAX_VALUE_JSON_BYTES = 256 * 1024
@@ -52,89 +47,6 @@ class PersistentStateStore:
def __init__(self, db_engine: AsyncEngine):
    """Keep a reference to the async database engine passed in by the caller."""
    self._db_engine = db_engine
# ========== Scope Key Building (shared with in-memory store) ==========
def _get_binding_identity(self, binding: AgentBinding) -> str:
    """Resolve the identity string used to namespace state for a binding.

    Preference order: the binding's own ``binding_id`` when truthy;
    otherwise ``"<scope_type>:<scope_id>"`` when both scope fields are
    truthy; otherwise the literal ``"unknown_binding"``.
    """
    explicit_id = binding.binding_id
    if explicit_id:
        return explicit_id

    bound_scope = binding.scope
    if not (bound_scope.scope_type and bound_scope.scope_id):
        return "unknown_binding"
    return f"{bound_scope.scope_type}:{bound_scope.scope_id}"
def _make_conversation_scope_key(
    self,
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the conversation-scope storage key.

    The key is ``conversation:<runner id>:<binding identity>:<conversation id>``
    with ``:<thread id>`` appended when the event carries a truthy
    ``thread_id``.

    Returns:
        The key, or None when the event has no ``conversation_id``.
    """
    conversation_id = event.conversation_id
    if not conversation_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = [descriptor.id, identity, conversation_id]
    if event.thread_id:
        segments.append(event.thread_id)
    return 'conversation:' + ':'.join(segments)
def _make_actor_scope_key(
    self,
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the actor-scope storage key.

    The key is ``actor:<runner id>:<binding identity>:<actor type>:<actor id>``;
    a falsy ``actor_type`` is replaced by ``'user'``.

    Returns:
        The key, or None when the event has no actor or the actor has no
        ``actor_id``.
    """
    actor = event.actor
    if not actor or not actor.actor_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = (descriptor.id, identity, actor.actor_type or 'user', actor.actor_id)
    return 'actor:' + ':'.join(segments)
def _make_subject_scope_key(
    self,
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the subject-scope storage key.

    The key is
    ``subject:<runner id>:<binding identity>:<subject type>:<subject id>``;
    a falsy ``subject_type`` is replaced by ``'unknown'``.

    Returns:
        The key, or None when the event has no subject or the subject has
        no ``subject_id``.
    """
    subject = event.subject
    if not subject or not subject.subject_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = (descriptor.id, identity, subject.subject_type or 'unknown', subject.subject_id)
    return 'subject:' + ':'.join(segments)
def _make_runner_scope_key(
    self,
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the runner-scope storage key.

    The key is ``runner:<runner id>:<binding identity>``. ``event`` is
    accepted for signature parity with the other scope builders and is
    not read.
    """
    identity = self._get_binding_identity(binding)
    return 'runner:' + ':'.join((descriptor.id, identity))
def _get_scope_key(
self,
scope: str,
@@ -143,15 +55,7 @@ class PersistentStateStore:
descriptor: AgentRunnerDescriptor,
) -> str | None:
"""Get scope key for given scope."""
if scope == 'conversation':
return self._make_conversation_scope_key(event, binding, descriptor)
elif scope == 'actor':
return self._make_actor_scope_key(event, binding, descriptor)
elif scope == 'subject':
return self._make_subject_scope_key(event, binding, descriptor)
elif scope == 'runner':
return self._make_runner_scope_key(event, binding, descriptor)
return None
return build_state_scope_key(scope, event, binding, descriptor)
def _check_scope_enabled(self, scope: str, binding: AgentBinding) -> bool:
"""Check if scope is enabled by binding's state_policy."""
@@ -276,8 +180,7 @@ class PersistentStateStore:
return False, f'Scope "{scope}" not enabled by binding policy'
# Map accepted key aliases
if key in STATE_KEY_ALIASES:
key = STATE_KEY_ALIASES[key]
key = normalize_state_key(key)
# Get scope key
scope_key = self._get_scope_key(scope, event, binding, descriptor)
@@ -290,7 +193,7 @@ class PersistentStateStore:
return False, error
# Build context fields
binding_identity = self._get_binding_identity(binding)
binding_identity = get_binding_identity(binding)
async with self._db_engine.begin() as conn:
# Check if entry exists

View File

@@ -30,6 +30,7 @@ from .host_models import (
DeliveryPolicy,
)
from . import events as runner_events
from ...pipeline.msgtrun.round_policy import select_max_round_messages
class PipelineAdapter:
@@ -42,6 +43,10 @@ class PipelineAdapter:
- Putting Query-only fields into adapter context
"""
INTERNAL_PREFIX = '_'
SENSITIVE_PATTERNS = ('secret', 'token', 'key', 'password', 'credential', 'api_key', 'apikey')
PERMISSION_VARS = ('_pipeline_bound_plugins', '_authorized', '_permission')
@classmethod
def query_to_event(
cls,
@@ -81,6 +86,7 @@ class PipelineAdapter:
event_type=event.event_type or runner_events.MESSAGE_RECEIVED,
event_time=event.event_time,
source="pipeline_adapter",
source_event_type=event.source_event_type,
bot_id=query.bot_uuid,
workspace_id=None, # Not available in Query
conversation_id=conversation.conversation_id,
@@ -90,6 +96,7 @@ class PipelineAdapter:
input=input,
delivery=delivery,
raw_ref=raw_ref,
data=event.data,
)
@classmethod
@@ -110,6 +117,7 @@ class PipelineAdapter:
pipeline_config = query.pipeline_config or {}
ai_config = pipeline_config.get('ai', {})
runner_config = ai_config.get('runner_config', {}).get(runner_id, {})
pipeline_uuid = getattr(query, 'pipeline_uuid', None)
# Extract max_round for adapter (used in bootstrap, not Protocol v1)
# Note: config uses 'max-round' with hyphen, not 'max_round' with underscore
@@ -118,7 +126,7 @@ class PipelineAdapter:
# Build scope
scope = BindingScope(
scope_type="pipeline",
scope_id=query.pipeline_uuid,
scope_id=pipeline_uuid,
)
# Build resource policy from pipeline config
@@ -141,7 +149,7 @@ class PipelineAdapter:
)
return AgentBinding(
binding_id=f"pipeline_{query.pipeline_uuid or 'default'}_{runner_id}",
binding_id=f"pipeline_{pipeline_uuid or 'default'}_{runner_id}",
scope=scope,
event_types=[runner_events.MESSAGE_RECEIVED],
runner_id=runner_id,
@@ -150,80 +158,116 @@ class PipelineAdapter:
state_policy=state_policy,
delivery_policy=delivery_policy,
enabled=True,
pipeline_uuid=query.pipeline_uuid,
pipeline_uuid=pipeline_uuid,
max_round=max_round,
)
@classmethod
def build_bootstrap_from_binding(
def build_bootstrap_context(
cls,
query: pipeline_query.Query,
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build bootstrap context from binding for max-round.
This method handles the max-round -> bootstrap conversion.
max-round is NOT part of Protocol v1, only used by Pipeline adapter.
Args:
query: Pipeline query
binding: Agent binding with max_round
Returns:
Bootstrap context data
"""
) -> tuple[dict[str, typing.Any] | None, dict[str, typing.Any]]:
"""Build bootstrap messages and runtime metadata for Pipeline max-round."""
max_round = binding.max_round
source_messages = query.messages or []
if not max_round or max_round <= 0 or not source_messages:
return None, {}
# If no max_round or self_managed_context, return empty bootstrap
if max_round is None or max_round <= 0:
return {
"messages": [],
"summary": None,
"artifacts": [],
"metadata": {
"policy": "self_managed",
"max_round": None,
},
}
# max-round packaging (will be handled by context_packager)
return {
"messages": [], # Will be filled by context_packager
packaged_messages = select_max_round_messages(source_messages, max_round)
bootstrap_messages = [cls._dump_message(msg) for msg in packaged_messages]
bootstrap = {
"messages": bootstrap_messages,
"summary": None,
"artifacts": [],
"metadata": {
"policy": "max_round",
"max_round": max_round,
"metadata": {},
}
runtime_metadata = {
'context_packaging': {
'policy': {
'mode': 'max_round',
'max_round': max_round,
},
'history': {
'source': 'query.messages',
'source_total_count': len(source_messages),
'delivered_count': len(packaged_messages),
'messages_complete': len(packaged_messages) == len(source_messages),
},
},
}
return bootstrap, runtime_metadata
@classmethod
def build_adapter_context(
cls,
query: pipeline_query.Query,
binding: AgentBinding,
) -> dict[str, typing.Any]:
"""Build adapter context for Pipeline adapter fields.
These fields are for transition purposes only.
Runners should NOT depend on them for long-term capabilities.
Args:
query: Pipeline query
Returns:
Adapter context data
"""
"""Build Query-derived fields for the Pipeline adapter entry."""
bootstrap, runtime_metadata = cls.build_bootstrap_context(query, binding)
return {
"query_id": query.query_id,
"pipeline_uuid": query.pipeline_uuid,
"max_round": None, # Moved to binding, not here
"adapter_messages": [], # Will be filled by context_packager
"extra": {
"bot_uuid": query.bot_uuid,
"sender_id": str(query.sender_id) if query.sender_id else None,
"launcher_type": query.launcher_type.value if query.launcher_type else None,
"launcher_id": query.launcher_id,
},
'params': cls.build_params(query),
'prompt': cls.build_prompt(query),
'bootstrap': bootstrap,
'query_id': getattr(query, 'query_id', None),
'runtime_metadata': runtime_metadata,
}
@classmethod
def build_params(cls, query: pipeline_query.Query) -> dict[str, typing.Any]:
    """Build adapter params from Pipeline variables with host filtering.

    A variable is forwarded only when every condition holds:

    - its key is a string (non-string keys are skipped instead of
      crashing on the prefix checks below);
    - the key does not start with ``cls.INTERNAL_PREFIX``;
    - the key does not start with any entry of ``cls.PERMISSION_VARS``;
    - the lower-cased key contains none of ``cls.SENSITIVE_PATTERNS``;
    - the value passes ``cls.is_json_serializable``.

    Args:
        query: Pipeline query; its ``variables`` mapping is read via
            ``getattr`` so a missing or empty attribute yields ``{}``.

    Returns:
        A new dict holding the variables that passed the filter.
    """
    variables = getattr(query, 'variables', None)
    if not variables:
        return {}

    # str.startswith accepts a tuple of prefixes; one call covers them all.
    permission_prefixes = tuple(cls.PERMISSION_VARS)

    params: dict[str, typing.Any] = {}
    for key, value in variables.items():
        if not isinstance(key, str):
            continue
        if key.startswith(cls.INTERNAL_PREFIX) or key.startswith(permission_prefixes):
            continue

        lowered = key.lower()
        if any(pattern in lowered for pattern in cls.SENSITIVE_PATTERNS):
            continue

        if cls.is_json_serializable(value):
            params[key] = value

    return params
@classmethod
def build_prompt(cls, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
    """Serialize ``query.prompt.messages`` into a list of dicts.

    Both ``prompt`` and ``messages`` are read with ``getattr`` defaults,
    so a query without either yields an empty list. Each message is
    converted with ``cls._dump_message``.
    """
    prompt_messages = getattr(getattr(query, 'prompt', None), 'messages', None)
    if prompt_messages:
        return list(map(cls._dump_message, prompt_messages))
    return []
@classmethod
def is_json_serializable(cls, value: typing.Any) -> bool:
    """Report whether ``value`` is built only from JSON-compatible types.

    Accepted: ``None``, ``str``, ``int``, ``float``, ``bool``; lists and
    tuples whose items are all accepted; dicts whose keys are all ``str``
    and whose values are all accepted. Anything else is rejected.
    """
    if isinstance(value, dict):
        for name, member in value.items():
            if not isinstance(name, str):
                return False
            if not cls.is_json_serializable(member):
                return False
        return True

    if isinstance(value, (list, tuple)):
        for member in value:
            if not cls.is_json_serializable(member):
                return False
        return True

    return value is None or isinstance(value, (str, int, float, bool))
@staticmethod
def _dump_message(message: typing.Any) -> dict[str, typing.Any]:
    """Turn a message-like object into a plain dict.

    - Objects exposing ``model_dump`` are dumped with ``mode='json'``.
    - Dicts are returned as-is (the same object, not a copy).
    - Anything else is reduced to its ``role`` and ``content``
      attributes, each defaulting to None when absent.
    """
    if not hasattr(message, 'model_dump'):
        if isinstance(message, dict):
            return message
        role = getattr(message, 'role', None)
        content = getattr(message, 'content', None)
        return {'role': role, 'content': content}
    return message.model_dump(mode='json')
# Private helper methods
@@ -519,10 +563,11 @@ class PipelineAdapter:
query: pipeline_query.Query,
) -> DeliveryContext:
"""Build DeliveryContext from Query."""
message_chain = getattr(query, 'message_chain', None)
return DeliveryContext(
surface="platform",
reply_target={
"message_id": getattr(query.message_chain, 'message_id', None),
"message_id": getattr(message_chain, 'message_id', None),
},
supports_streaming=True,
supports_edit=False,
@@ -545,10 +590,17 @@ class PipelineAdapter:
query: pipeline_query.Query,
) -> list[str] | None:
"""Extract allowed model UUIDs from query."""
model_uuids: list[str] = []
model_uuid = getattr(query, 'use_llm_model_uuid', None)
if model_uuid:
return [model_uuid]
return None
model_uuids.append(model_uuid)
variables = getattr(query, 'variables', None) or {}
for fallback_uuid in variables.get('_fallback_model_uuids', []) or []:
if fallback_uuid and fallback_uuid not in model_uuids:
model_uuids.append(fallback_uuid)
return model_uuids or None
@classmethod
def _extract_allowed_tools(

View File

@@ -1,7 +1,6 @@
"""Agent resource builder for constructing authorized resources."""
from __future__ import annotations
import asyncio
import typing
from ...core import app
@@ -30,10 +29,6 @@ class AgentResourceBuilder:
- Build knowledge_bases list from config
- Build storage and files permissions summary
Entry points:
- build_resources_from_binding(event, binding, descriptor): Event-first Protocol v1
- build_resources(query, descriptor): Pipeline adapter Query-based
Note: This only builds the resource declaration. The actual proxy actions
in handler.py must still validate against ctx.resources at runtime.
@@ -209,89 +204,6 @@ class AgentResourceBuilder:
'workspace_storage': 'workspace' in storage_perms and resource_policy.allow_workspace_storage,
}
async def build_resources(
    self,
    query: typing.Any,  # pipeline_query.Query
    descriptor: AgentRunnerDescriptor,
) -> AgentResources:
    """Assemble the AgentResources declaration for a Query-based run.

    Inputs are combined from three places: the runner manifest
    permissions (``descriptor.permissions``), the plugin / MCP server
    bindings stored in ``query.variables``, and the runner instance
    config resolved from ``query.pipeline_config`` through
    ``ConfigMigration.resolve_runner_config``.

    Args:
        query: Pipeline query providing ``variables`` and ``pipeline_config``.
        descriptor: Runner descriptor providing ``id`` and ``permissions``.

    Returns:
        Dict with ``models``, ``tools``, ``knowledge_bases``, ``files``
        (always an empty list here), ``storage`` and
        ``platform_capabilities`` (always an empty dict here).
    """
    plugins = query.variables.get('_pipeline_bound_plugins')
    mcp_servers = query.variables.get('_pipeline_bound_mcp_servers')
    permissions = descriptor.permissions

    from .config_migration import ConfigMigration

    instance_config = ConfigMigration.resolve_runner_config(query.pipeline_config, descriptor.id)

    # The three async builders are awaited together via asyncio.gather.
    model_list, tool_list, kb_list = await asyncio.gather(
        self._build_models(permissions, instance_config, descriptor, query),
        self._build_tools(permissions, plugins, mcp_servers, query),
        self._build_knowledge_bases(permissions, instance_config, descriptor, query),
    )

    return {
        'models': model_list,
        'tools': tool_list,
        'knowledge_bases': kb_list,
        'files': [],  # Files are populated at runtime
        'storage': self._build_storage(permissions),
        'platform_capabilities': {},  # Reserved for EBA
    }
async def _build_models(
    self,
    manifest_perms: dict[str, list[str]],
    runner_config: dict[str, typing.Any],
    descriptor: AgentRunnerDescriptor,
    query: typing.Any,
) -> list[ModelResource]:
    """Build models list with plugin SDK field names.

    Models are collected in this order, de-duplicated through a shared
    ``seen_model_ids`` set that is handed to the append helpers:

    1. the primary model in ``query.use_llm_model_uuid`` (if truthy);
    2. each entry of ``query.variables['_fallback_model_uuids']``;
    3. models referenced by the runner config, added by
       ``_append_config_declared_model_resources``.

    Args:
        manifest_perms: Runner manifest permissions; ``'models'`` must
            contain ``'invoke'`` or ``'stream'`` for any model to be listed.
        runner_config: Resolved runner instance config.
        descriptor: Runner descriptor.
        query: Pipeline query.

    Returns:
        The collected model resources; an empty list when the manifest
        grants neither ``invoke`` nor ``stream``.
    """
    models: list[ModelResource] = []
    seen_model_ids: set[str] = set()

    # Check manifest permission
    model_perms = manifest_perms.get('models', [])
    if 'invoke' not in model_perms and 'stream' not in model_perms:
        return models

    # Get model from query (preproc already resolved this)
    model_uuid = getattr(query, 'use_llm_model_uuid', None)
    if model_uuid:
        await self._append_llm_model_resource(models, seen_model_ids, model_uuid)

    # Add fallback models if present. ``or []`` also covers the key being
    # present with a None value, which a plain ``.get(key, [])`` does not.
    fallback_uuids = query.variables.get('_fallback_model_uuids') or []
    for fb_uuid in fallback_uuids:
        await self._append_llm_model_resource(models, seen_model_ids, fb_uuid)

    # Add model resources referenced by the runner binding config schema.
    # This makes authorization generic for AgentRunner plugins instead of
    # hard-coding only local-agent's primary/fallback model path.
    await self._append_config_declared_model_resources(
        models=models,
        seen_model_ids=seen_model_ids,
        descriptor=descriptor,
        runner_config=runner_config,
    )

    return models
async def _append_config_declared_model_resources(
self,
models: list[ModelResource],
@@ -349,79 +261,3 @@ class AgentResourceBuilder:
seen_model_ids.add(model_uuid)
except Exception as e:
self.ap.logger.warning(f'Failed to build rerank model resource {model_uuid}: {e}')
async def _build_tools(
    self,
    manifest_perms: dict[str, list[str]],
    bound_plugins: list[str] | None,
    bound_mcp_servers: list[str] | None,
    query: typing.Any,
) -> list[ToolResource]:
    """List the tools on ``query.use_funcs`` using plugin SDK field names.

    Returns an empty list unless the manifest's ``'tools'`` permission
    contains ``'list'`` or ``'call'``. ``bound_plugins`` and
    ``bound_mcp_servers`` are accepted but not read here.

    Each entry has ``tool_name`` and ``description`` copied from the
    tool object and ``tool_type`` set to None.
    """
    granted = manifest_perms.get('tools', [])
    if not ('list' in granted or 'call' in granted):
        return []

    return [
        {
            'tool_name': func.name,
            'tool_type': None,  # Tool type not available in current LLMTool
            'description': func.description,
        }
        for func in getattr(query, 'use_funcs', [])
    ]
async def _build_knowledge_bases(
    self,
    manifest_perms: dict[str, list[str]],
    runner_config: dict[str, typing.Any],
    descriptor: AgentRunnerDescriptor,
    query: typing.Any,
) -> list[KnowledgeBaseResource]:
    """List authorized knowledge bases using plugin SDK field names.

    Returns an empty list unless the manifest's ``'knowledge_bases'``
    permission contains ``'list'`` or ``'retrieve'``.

    The UUIDs come from the schema-defined config fields, unless
    ``query.variables['_knowledge_base_uuids']`` is truthy, in which case
    that list replaces them entirely. A UUID whose lookup raises is
    logged as a warning and skipped; a lookup returning a falsy value is
    skipped silently.
    """
    granted = manifest_perms.get('knowledge_bases', [])
    if not ('list' in granted or 'retrieve' in granted):
        return []

    kb_uuids = config_schema.extract_knowledge_base_uuids(descriptor, runner_config)

    # Query variables (possibly set by plugin PromptPreProcessing) win
    # over the config-declared list when non-empty.
    override_uuids = query.variables.get('_knowledge_base_uuids', [])
    if override_uuids:
        kb_uuids = override_uuids

    collected: list[KnowledgeBaseResource] = []
    for kb_uuid in kb_uuids:
        try:
            kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
            if not kb:
                continue
            kb_name = kb.get_name()
            kb_type = getattr(kb.knowledge_base_entity, 'kb_type', None)
            collected.append({
                'kb_id': kb_uuid,
                'kb_name': kb_name,
                'kb_type': kb_type,
            })
        except Exception as e:
            self.ap.logger.warning(f'Failed to build knowledge base resource {kb_uuid}: {e}')

    return collected
def _build_storage(
    self,
    manifest_perms: dict[str, list[str]],
) -> StorageResource:
    """Translate the manifest's ``'storage'`` permission list into flags.

    Returns:
        Dict with ``plugin_storage`` (True when ``'plugin'`` is listed)
        and ``workspace_storage`` (True when ``'workspace'`` is listed).
    """
    granted = manifest_perms.get('storage', [])
    plugin_allowed = 'plugin' in granted
    workspace_allowed = 'workspace' in granted
    return {
        'plugin_storage': plugin_allowed,
        'workspace_storage': workspace_allowed,
    }

View File

@@ -109,8 +109,8 @@ class AgentResultNormalizer:
elif result_type == 'state.updated':
# Log for telemetry, don't yield to pipeline
# Orchestrator already handles the actual state_store.apply_update
scope = data.get('scope', 'conversation') # Default for backward compat
# Orchestrator already handles the actual PersistentStateStore update.
scope = data.get('scope', 'unknown')
key = data.get('key', 'unknown')
value_repr = repr(data.get('value', '...'))[:100] # Truncate for log
self.ap.logger.debug(

View File

@@ -0,0 +1,113 @@
"""State scope key helpers for AgentRunner host-owned state."""
from __future__ import annotations
import typing
from .descriptor import AgentRunnerDescriptor
from .host_models import AgentBinding, AgentEventEnvelope
VALID_STATE_SCOPES = ('conversation', 'actor', 'subject', 'runner')
STATE_KEY_ALIASES = {
'conversation_id': 'external.conversation_id',
}
def normalize_state_key(key: str) -> str:
    """Translate a public key alias into its protocol state key.

    Keys that have no entry in ``STATE_KEY_ALIASES`` are returned
    unchanged.
    """
    try:
        return STATE_KEY_ALIASES[key]
    except KeyError:
        return key
def get_binding_identity(binding: AgentBinding) -> str:
    """Resolve the binding identity used to isolate state.

    Preference order: a truthy ``binding.binding_id``; otherwise
    ``'<scope_type>:<scope_id>'`` when both scope fields are truthy;
    otherwise the literal ``'unknown_binding'``.
    """
    explicit_id = binding.binding_id
    if explicit_id:
        return explicit_id

    bound_scope = binding.scope
    if not (bound_scope.scope_type and bound_scope.scope_id):
        return 'unknown_binding'
    return f'{bound_scope.scope_type}:{bound_scope.scope_id}'
def build_state_scope_key(
    scope: str,
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the storage key for a single state scope.

    Every key starts with ``<scope>:<runner id>:<binding identity>`` and
    then carries the scope's own identity:

    - ``conversation``: conversation id, plus thread id when truthy
    - ``actor``: actor type (``'user'`` when falsy) and actor id
    - ``subject``: subject type (``'unknown'`` when falsy) and subject id
    - ``runner``: nothing further

    Returns:
        The key, or None when ``scope`` is not one of the four names
        above or the event lacks the identity that scope needs.
    """
    identity = get_binding_identity(binding)

    if scope == 'runner':
        return f'runner:{descriptor.id}:{identity}'

    if scope == 'conversation':
        conversation_id = event.conversation_id
        if not conversation_id:
            return None
        segments = [descriptor.id, identity, conversation_id]
        if event.thread_id:
            segments.append(event.thread_id)
        return 'conversation:' + ':'.join(segments)

    if scope == 'actor':
        actor = event.actor
        if not actor or not actor.actor_id:
            return None
        segments = [descriptor.id, identity, actor.actor_type or 'user', actor.actor_id]
        return 'actor:' + ':'.join(segments)

    if scope == 'subject':
        subject = event.subject
        if not subject or not subject.subject_id:
            return None
        segments = [descriptor.id, identity, subject.subject_type or 'unknown', subject.subject_id]
        return 'subject:' + ':'.join(segments)

    return None
def build_state_scope_keys(
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> dict[str, str]:
    """Collect the scope keys that can be built for this event and binding.

    Each name in ``VALID_STATE_SCOPES`` is passed to
    ``build_state_scope_key``; scopes for which that returns a falsy key
    are left out of the result.
    """
    candidates = (
        (scope, build_state_scope_key(scope, event, binding, descriptor))
        for scope in VALID_STATE_SCOPES
    )
    return {scope: scope_key for scope, scope_key in candidates if scope_key}
def build_state_context(
    event: AgentEventEnvelope,
    binding: AgentBinding,
    descriptor: AgentRunnerDescriptor,
) -> dict[str, typing.Any]:
    """Assemble the State API context dict for a run session.

    The dict carries the available scope keys, the binding identity, and
    the event's bot / workspace / conversation / thread ids. Actor and
    subject type/id are copied when the event has an actor or subject
    and are None otherwise.
    """
    context: dict[str, typing.Any] = {
        'scope_keys': build_state_scope_keys(event, binding, descriptor),
        'binding_identity': get_binding_identity(binding),
        'bot_id': event.bot_id,
        'workspace_id': event.workspace_id,
        'conversation_id': event.conversation_id,
        'thread_id': event.thread_id,
    }

    actor = event.actor
    context['actor_type'] = actor.actor_type if actor else None
    context['actor_id'] = actor.actor_id if actor else None

    subject = event.subject
    context['subject_type'] = subject.subject_type if subject else None
    context['subject_id'] = subject.subject_id if subject else None

    return context

View File

@@ -1,618 +0,0 @@
"""Runner scoped state store for managing AgentRunner state across runs."""
from __future__ import annotations
import typing
import threading
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from .descriptor import AgentRunnerDescriptor
from .host_models import AgentEventEnvelope
# Valid state scopes for agent runner state updates.
VALID_STATE_SCOPES = ('conversation', 'actor', 'subject', 'runner')
# External-facing key aliases accepted from runners.
STATE_KEY_ALIASES = {
'conversation_id': 'external.conversation_id',
}
class RunnerScopedStateStore:
"""In-memory scoped state store for AgentRunner protocol state.
IMPORTANT: This is HOST-OWNED protocol state, NOT plugin instance state.
Key Design Principles:
1. Host-owned: State is owned and managed by LangBot host, not by the plugin.
The plugin can only read/write through agent runner state updates.
2. Scope keys based on stable host identity: Uses host-controlled identifiers
(runner_id, bot_uuid, pipeline_uuid, launcher_type, launcher_id) rather
than external/unstable identifiers like external conversation id.
3. External conversation id is a VALUE: The runner can update external.conversation_id
in state, which syncs to conversation.uuid. The scope key remains stable,
preventing state loss when conversation identity changes.
State scopes:
- conversation: runner_id + bot_uuid + pipeline_uuid + launcher_type + launcher_id + conversation identity
- actor: runner_id + bot_uuid + sender_id
- subject: runner_id + bot_uuid + launcher_type + launcher_id
- runner: runner_id + pipeline_uuid
This ensures different runners don't share state and same runner
has appropriate isolation per scope.
Note: This is an in-memory store. State only persists within the
current process lifetime. For production use, a persistent storage
backend should be implemented.
"""
def __init__(self):
    """Create an empty in-memory store.

    ``_store`` maps a scope key to that scope's key/value dict.
    ``_lock`` is the mutex taken around accesses to ``_store``.
    """
    # Plain dict; concurrent access is serialized through ``_lock``.
    self._lock = threading.Lock()
    self._store: dict[str, dict[str, typing.Any]] = {}
def _make_conversation_scope_key(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the conversation-scope key from host-owned Query identity.

    Segments, in order: runner id, bot uuid (``'unknown_bot'`` when
    falsy), pipeline uuid (``'unknown_pipeline'`` when falsy). When the
    query has a session, the launcher type value and launcher id follow,
    and then the active conversation's ``create_time`` as a string if
    that attribute exists and is truthy.

    The conversation's external uuid is deliberately not part of the
    key: it is stored as a state value, so the key does not change when
    a runner updates ``external.conversation_id``.
    """
    segments = [
        descriptor.id,
        query.bot_uuid or 'unknown_bot',
        query.pipeline_uuid or 'unknown_pipeline',
    ]

    session = query.session
    if session:
        segments.extend((session.launcher_type.value, session.launcher_id))
        # create_time, when available, distinguishes conversations within
        # the same launcher; otherwise the launcher identity stands alone.
        created_at = getattr(session.using_conversation, 'create_time', None)
        if created_at:
            segments.append(str(created_at))

    return 'conversation:' + ':'.join(segments)
def _make_actor_scope_key(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the actor-scope key: runner id, bot uuid, sender id.

    A falsy bot uuid becomes ``'unknown_bot'``; a falsy sender id becomes
    ``'unknown_sender'``, otherwise it is converted with ``str``.
    """
    bot_segment = query.bot_uuid or 'unknown_bot'
    if query.sender_id:
        sender_segment = str(query.sender_id)
    else:
        sender_segment = 'unknown_sender'
    return 'actor:' + ':'.join((descriptor.id, bot_segment, sender_segment))
def _make_subject_scope_key(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the subject-scope key: runner id, bot uuid, launcher.

    A falsy bot uuid becomes ``'unknown_bot'``. The launcher type value
    and launcher id are appended only when the query has a session.
    """
    segments = [descriptor.id, query.bot_uuid or 'unknown_bot']
    session = query.session
    if session:
        segments.extend((session.launcher_type.value, session.launcher_id))
    return 'subject:' + ':'.join(segments)
def _make_runner_scope_key(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the runner-scope key: runner id and pipeline uuid.

    A falsy pipeline uuid becomes ``'unknown_pipeline'``.
    """
    pipeline_segment = query.pipeline_uuid or 'unknown_pipeline'
    return 'runner:' + ':'.join((descriptor.id, pipeline_segment))
def _get_scope_key(
    self,
    scope: str,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Return the storage key for ``scope`` by delegating to its builder.

    Raises:
        ValueError: if ``scope`` is not ``'conversation'``, ``'actor'``,
            ``'subject'`` or ``'runner'``.
    """
    if scope == 'conversation':
        return self._make_conversation_scope_key(query, descriptor)
    if scope == 'actor':
        return self._make_actor_scope_key(query, descriptor)
    if scope == 'subject':
        return self._make_subject_scope_key(query, descriptor)
    if scope == 'runner':
        return self._make_runner_scope_key(query, descriptor)
    raise ValueError(f'Invalid scope: {scope}')
def build_snapshot(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> dict[str, dict[str, typing.Any]]:
    """Copy the stored state of every scope into a fresh snapshot.

    Args:
        query: Pipeline query used to derive each scope key.
        descriptor: Runner descriptor used to derive each scope key.

    Returns:
        Dict keyed by ``conversation``, ``actor``, ``subject`` and
        ``runner``. Each value is a shallow copy of that scope's stored
        dict (empty when nothing is stored). If the query's session has
        an active conversation with a truthy ``uuid`` and the
        conversation scope has no ``external.conversation_id`` yet, that
        uuid is seeded into the snapshot only (the store is not written).
    """
    snapshot: dict[str, dict[str, typing.Any]] = {
        name: {} for name in ('conversation', 'actor', 'subject', 'runner')
    }

    with self._lock:
        for scope in VALID_STATE_SCOPES:
            stored = self._store.get(self._get_scope_key(scope, query, descriptor), {})
            # Copy so callers cannot mutate the store through the snapshot.
            snapshot[scope] = dict(stored)

    session = query.session
    if session and session.using_conversation:
        conv_uuid = getattr(session.using_conversation, 'uuid', None)
        if conv_uuid:
            snapshot['conversation'].setdefault('external.conversation_id', conv_uuid)

    return snapshot
def apply_update(
    self,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
    scope: str,
    key: str,
    value: typing.Any,
    logger: typing.Any = None,
) -> bool:
    """Write one state value into the store.

    Args:
        query: Pipeline query used to derive the scope key.
        descriptor: Runner descriptor used to derive the scope key.
        scope: One of the names in ``VALID_STATE_SCOPES``.
        key: State key; aliases in ``STATE_KEY_ALIASES`` are mapped first.
        value: Value to store under ``key``.
        logger: Optional logger for the warning / debug messages.

    Returns:
        False when ``scope`` is not valid (nothing is written), True
        otherwise.

    Side effects:
        Updates the in-memory store. When the write is to
        ``external.conversation_id`` in the conversation scope and the
        query's session has an active conversation, that conversation's
        ``uuid`` attribute is set to ``value``.
    """
    if scope not in VALID_STATE_SCOPES:
        if logger:
            logger.warning(
                f'Runner {descriptor.id} state.updated with invalid scope: {scope}. '
                f'Valid scopes: {", ".join(VALID_STATE_SCOPES)}'
            )
        return False

    # Runners may send public alias names; store under the protocol key.
    if key in STATE_KEY_ALIASES:
        mapped_key = STATE_KEY_ALIASES[key]
        if logger:
            logger.debug(
                f'Runner {descriptor.id} state.updated key alias "{key}" mapped to "{mapped_key}"'
            )
        key = mapped_key

    with self._lock:
        scope_key = self._get_scope_key(scope, query, descriptor)
        self._store.setdefault(scope_key, {})[key] = value

    is_conversation_id_write = scope == 'conversation' and key == 'external.conversation_id'
    if is_conversation_id_write and query.session and query.session.using_conversation:
        # Keep the active conversation UUID aligned with runner-owned state.
        query.session.using_conversation.uuid = value
        if logger:
            logger.debug(
                f'Synced external.conversation_id "{value}" to conversation.uuid'
            )

    return True
def clear_scope(
    self,
    scope: str,
    query: pipeline_query.Query,
    descriptor: AgentRunnerDescriptor,
) -> None:
    """Drop everything stored under one scope; a no-op if nothing is stored.

    Args:
        scope: State scope whose entry is removed.
        query: Pipeline query used to derive the scope key.
        descriptor: Runner descriptor used to derive the scope key.
    """
    with self._lock:
        self._store.pop(self._get_scope_key(scope, query, descriptor), None)
def clear_all(self) -> None:
    """Empty the whole store while holding the lock (testing / reset aid)."""
    with self._lock:
        self._store.clear()
# ========== Event-first Protocol v1 methods ==========
def _get_binding_identity(
    self,
    binding: "AgentBinding",
) -> str:
    """Resolve the binding identity used inside event-based scope keys.

    Candidates are tried in order: a truthy ``binding_id``; then
    ``"<scope_type>:<scope_id>"`` when both scope fields are truthy;
    finally the placeholder ``"unknown_binding"``.
    """
    if not binding.binding_id:
        scope = binding.scope
        if scope.scope_type and scope.scope_id:
            return f"{scope.scope_type}:{scope.scope_id}"
        # Neither an id nor a complete scope is available.
        return "unknown_binding"
    return binding.binding_id
def _make_conversation_scope_key_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the conversation-scope key from an event and its binding.

    Key layout: ``conversation:<runner id>:<binding identity>:<conversation id>``
    with ``:<thread id>`` appended when the event has a truthy
    ``thread_id``.

    Returns:
        The key, or None when ``event.conversation_id`` is falsy.
    """
    conversation_id = event.conversation_id
    if not conversation_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = [descriptor.id, identity, conversation_id]
    if event.thread_id:
        segments.append(event.thread_id)
    return 'conversation:' + ':'.join(segments)
def _make_actor_scope_key_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the actor-scope key from an event and its binding.

    Key layout: ``actor:<runner id>:<binding identity>:<actor type>:<actor id>``;
    a falsy ``actor_type`` is replaced by ``'user'``.

    Returns:
        The key, or None when the event has no actor or no ``actor_id``.
    """
    actor = event.actor
    if not actor or not actor.actor_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = (descriptor.id, identity, actor.actor_type or 'user', actor.actor_id)
    return 'actor:' + ':'.join(segments)
def _make_subject_scope_key_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Compose the subject-scope key from an event and its binding.

    Key layout:
    ``subject:<runner id>:<binding identity>:<subject type>:<subject id>``;
    a falsy ``subject_type`` is replaced by ``'unknown'``.

    Returns:
        The key, or None when the event has no subject or no
        ``subject_id``.
    """
    subject = event.subject
    if not subject or not subject.subject_id:
        return None

    identity = self._get_binding_identity(binding)
    segments = (descriptor.id, identity, subject.subject_type or 'unknown', subject.subject_id)
    return 'subject:' + ':'.join(segments)
def _make_runner_scope_key_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> str:
    """Compose the runner-scope key from a binding.

    Key layout: ``runner:<runner id>:<binding identity>``, so the state
    is separate per binding. ``event`` is accepted for signature parity
    with the other builders and is not read.
    """
    identity = self._get_binding_identity(binding)
    return 'runner:' + ':'.join((descriptor.id, identity))
def _get_scope_key_from_event(
    self,
    scope: str,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> str | None:
    """Resolve the storage key for ``scope`` by delegating to its builder.

    Args:
        scope: One of 'conversation', 'actor', 'subject' or 'runner'.
        event: Event envelope forwarded to the scope's key builder.
        binding: Agent binding forwarded to the scope's key builder.
        descriptor: Runner descriptor forwarded to the scope's key builder.

    Returns:
        Whatever the matching builder returns (which may be None when the
        identity required by that scope is missing), or None for an
        unrecognised scope name.
    """
    # (scope name, builder method name) pairs, checked in this order.
    builders = (
        ('conversation', '_make_conversation_scope_key_from_event'),
        ('actor', '_make_actor_scope_key_from_event'),
        ('subject', '_make_subject_scope_key_from_event'),
        ('runner', '_make_runner_scope_key_from_event'),
    )
    for scope_name, method_name in builders:
        if scope == scope_name:
            return getattr(self, method_name)(event, binding, descriptor)
    return None
def _check_scope_enabled(
    self,
    scope: str,
    binding: "AgentBinding",
) -> bool:
    """Tell whether the binding's state policy permits ``scope``.

    Args:
        scope: Scope name to look up.
        binding: Agent binding whose ``state_policy`` is consulted.

    Returns:
        True only when ``state_policy.enable_state`` is truthy and
        ``scope`` is listed in ``state_policy.state_scopes``.
    """
    policy = binding.state_policy
    # The master switch wins: state_scopes is not consulted when state is off.
    return bool(policy.enable_state) and scope in policy.state_scopes
def build_snapshot_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
) -> dict[str, dict[str, typing.Any]]:
    """Collect the stored state of every scope for one event and binding.

    ``binding.state_policy`` decides what is exposed:
    - When ``enable_state`` is False every scope comes back empty.
    - A scope missing from ``state_scopes`` comes back empty.

    Args:
        event: Event envelope
        binding: Agent binding configuration
        descriptor: Runner descriptor

    Returns:
        Dict keyed by 'conversation', 'actor', 'subject' and 'runner'.
        A scope that is disabled by policy, or whose identity cannot be
        derived from the event, maps to an empty dict.
    """
    snapshot: dict[str, dict[str, typing.Any]] = {
        scope_name: {} for scope_name in ('conversation', 'actor', 'subject', 'runner')
    }

    # State switched off entirely: hand back the empty skeleton.
    if not binding.state_policy.enable_state:
        return snapshot

    with self._lock:
        for scope in VALID_STATE_SCOPES:
            if not self._check_scope_enabled(scope, binding):
                continue
            scope_key = self._get_scope_key_from_event(scope, event, binding, descriptor)
            if scope_key:
                # Shallow copy: the snapshot's top-level keys are detached
                # from the store, nested values are still shared objects.
                snapshot[scope] = dict(self._store.get(scope_key, {}))

    # Default external.conversation_id to the event's conversation id, but
    # only for an enabled conversation scope and never over a stored value.
    if self._check_scope_enabled('conversation', binding) and event.conversation_id:
        snapshot['conversation'].setdefault('external.conversation_id', event.conversation_id)

    return snapshot
def apply_update_from_event(
    self,
    event: AgentEventEnvelope,
    binding: "AgentBinding",
    descriptor: AgentRunnerDescriptor,
    scope: str,
    key: str,
    value: typing.Any,
    logger: typing.Any = None,
) -> bool:
    """Store one state value for a scope, subject to the binding's policy.

    ``binding.state_policy`` is enforced before anything is written:
    - ``enable_state`` False rejects the update.
    - A scope missing from ``state_scopes`` rejects the update.

    Args:
        event: Event envelope
        binding: Agent binding configuration
        descriptor: Runner descriptor
        scope: State scope (conversation, actor, subject, runner)
        key: State key (should use namespace prefix like external.*);
            keys found in STATE_KEY_ALIASES are replaced by their mapped key
        value: State value (must be JSON-serializable; not checked here)
        logger: Optional logger; rejections are logged as warnings,
            alias mapping and successful writes as debug

    Returns:
        True when the value was stored. False when state is disabled,
        the scope is invalid or not enabled, or the identity needed to
        build the scope key is missing.
    """
    policy = binding.state_policy

    def reject(make_message: typing.Callable[[], str]) -> bool:
        # The message is built lazily so nothing is formatted without a logger.
        if logger:
            logger.warning(make_message())
        return False

    # Master switch: no state at all for this binding.
    if not policy.enable_state:
        return reject(
            lambda: f'Runner {descriptor.id} state.updated rejected: state is disabled by binding policy'
        )

    # Unknown scope names are refused before the policy lookup.
    if scope not in VALID_STATE_SCOPES:
        return reject(
            lambda: (
                f'Runner {descriptor.id} state.updated with invalid scope: {scope}. '
                f'Valid scopes: {", ".join(VALID_STATE_SCOPES)}'
            )
        )

    # Known scope, but this binding has not opted into it.
    if not self._check_scope_enabled(scope, binding):
        return reject(
            lambda: (
                f'Runner {descriptor.id} state.updated rejected for scope "{scope}": '
                f'scope not enabled by binding policy. Enabled scopes: {policy.state_scopes}'
            )
        )

    # Translate accepted alias keys to the key that is actually stored.
    if key in STATE_KEY_ALIASES:
        target_key = STATE_KEY_ALIASES[key]
        if logger:
            logger.debug(
                f'Runner {descriptor.id} state.updated key alias "{key}" mapped to "{target_key}"'
            )
        key = target_key

    storage_key = self._get_scope_key_from_event(scope, event, binding, descriptor)
    if storage_key is None:
        return reject(
            lambda: (
                f'Runner {descriptor.id} state.updated for scope "{scope}" '
                f'requires missing identity (conversation_id, actor_id, or subject_id). '
                f'Skipping update.'
            )
        )

    # Create the scope's bucket on first write, then set the value.
    with self._lock:
        self._store.setdefault(storage_key, {})[key] = value

    if logger:
        logger.debug(
            f'Runner {descriptor.id} state.updated: scope={scope}, key={key}'
        )
    return True
# Global singleton state store
# Holds the shared RunnerScopedStateStore instance; None until
# get_state_store() creates it, and set back to None by reset_state_store().
_state_store: RunnerScopedStateStore | None = None
# Held by get_state_store() and reset_state_store() while they read or
# rebind _state_store.
_state_store_lock = threading.Lock()
def get_state_store() -> RunnerScopedStateStore:
    """Return the shared state store, creating it on first use.

    The lookup and the creation both happen while holding
    ``_state_store_lock``.

    Returns:
        The module-wide RunnerScopedStateStore instance.
    """
    global _state_store
    with _state_store_lock:
        store = _state_store
        if store is None:
            # First caller since start-up or since the last reset.
            store = _state_store = RunnerScopedStateStore()
    return store
def reset_state_store() -> None:
    """Drop the shared state store instance (for testing).

    Sets ``_state_store`` back to None while holding
    ``_state_store_lock``; the next get_state_store() call then creates
    a new store.
    """
    global _state_store
    with _state_store_lock:
        _state_store = None

View File

@@ -21,8 +21,7 @@ class AgentRunnerState(Base):
- subject: runner_id + binding_id + subject_type + subject_id
- runner: runner_id + binding_id
This table persists state across runs, replacing the in-memory
RunnerScopedStateStore._store dict.
This table is the production store for AgentRunner state.
"""
__tablename__ = 'agent_runner_state'

View File

@@ -0,0 +1,34 @@
"""Shared max-round message window helpers for Pipeline behavior."""
from __future__ import annotations
import typing
# Value returned by get_max_round() when a config has no 'max-round' entry.
DEFAULT_MAX_ROUND = 10


def get_max_round(config: dict[str, typing.Any]) -> typing.Any:
    """Look up the Pipeline max-round setting in ``config``.

    Args:
        config: Mapping that may contain a 'max-round' entry.

    Returns:
        The value stored under 'max-round', passed through without
        validation or conversion, or DEFAULT_MAX_ROUND when the key is
        absent.
    """
    configured = config.get('max-round', DEFAULT_MAX_ROUND)
    return configured
def select_max_round_messages(
    messages: list[typing.Any] | None,
    max_round: typing.Any,
) -> list[typing.Any]:
    """Keep the most recent messages covering at most ``max_round`` user rounds.

    The messages are walked from newest to oldest. Each one is kept until
    ``max_round`` messages whose ``role`` attribute equals 'user' have
    been collected; the user message that completes the final round is
    itself kept. Messages without a ``role`` attribute never count as a
    round.

    Args:
        messages: Messages in chronological order; None or empty yields [].
        max_round: Upper bound on counted user messages, compared with
            ``<`` against an integer counter.

    Returns:
        A new list holding the selected messages in their original order.
    """
    if not messages:
        return []

    window: list[typing.Any] = []
    rounds_seen = 0
    for message in reversed(messages):
        # Stop at the first message that lies beyond the allowed rounds.
        if not (rounds_seen < max_round):
            break
        window.append(message)
        if getattr(message, 'role', None) == 'user':
            rounds_seen += 1

    # Collected newest-first; flip back to chronological order.
    window.reverse()
    return window

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from .. import truncator
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from ....agent.runner.config_migration import ConfigMigration
from ....agent.runner.context_packager import (
from ..round_policy import (
get_max_round,
select_max_round_messages,
)