mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-14 17:56:03 +00:00
refactor(agent-runner): tighten protocol v1 runtime boundaries
This commit is contained in:
committed by
huanghuoguoguo
parent
f9e07df539
commit
2fd2c6aadc
@@ -16,12 +16,12 @@ from .registry import AgentRunnerRegistry
|
||||
from .context_builder import AgentRunContextBuilder, AgentRunContextPayload
|
||||
from .resource_builder import AgentResourceBuilder
|
||||
from .result_normalizer import AgentResultNormalizer
|
||||
from .state_store import get_state_store, RunnerScopedStateStore
|
||||
from .persistent_state_store import get_persistent_state_store, PersistentStateStore
|
||||
from .session_registry import get_session_registry, AgentRunSessionRegistry
|
||||
from .config_migration import ConfigMigration
|
||||
from .host_models import AgentEventEnvelope, AgentBinding
|
||||
from .pipeline_adapter import PipelineAdapter
|
||||
from .state_scope import build_state_context
|
||||
from .errors import (
|
||||
RunnerNotFoundError,
|
||||
RunnerExecutionError,
|
||||
@@ -63,7 +63,6 @@ class AgentRunOrchestrator:
|
||||
|
||||
# Cached singleton references (set in __init__)
|
||||
_session_registry: AgentRunSessionRegistry
|
||||
_state_store: RunnerScopedStateStore
|
||||
_persistent_state_store: PersistentStateStore | None
|
||||
|
||||
def __init__(
|
||||
@@ -78,7 +77,6 @@ class AgentRunOrchestrator:
|
||||
self.result_normalizer = AgentResultNormalizer(ap)
|
||||
# Cache singleton references to avoid per-request getter calls
|
||||
self._session_registry = get_session_registry()
|
||||
self._state_store = get_state_store()
|
||||
self._persistent_state_store = None # Lazy init on first use
|
||||
|
||||
async def run(
|
||||
@@ -132,13 +130,13 @@ class AgentRunOrchestrator:
|
||||
# Merge params into adapter.extra
|
||||
if 'params' in adapter_context:
|
||||
context['adapter']['extra']['params'] = adapter_context['params']
|
||||
# Merge prompt into adapter.extra (for transition runners)
|
||||
# Merge prompt into adapter.extra for Pipeline adapter consumers.
|
||||
if 'prompt' in adapter_context:
|
||||
context['adapter']['extra']['prompt'] = adapter_context['prompt']
|
||||
# Merge bootstrap if provided
|
||||
if adapter_context.get('bootstrap'):
|
||||
context['bootstrap'] = adapter_context['bootstrap']
|
||||
# Also set adapter_messages for transition runners
|
||||
# Also expose the bootstrap window through adapter metadata.
|
||||
bootstrap_messages = adapter_context['bootstrap'].get('messages')
|
||||
if bootstrap_messages:
|
||||
context['adapter']['adapter_messages'] = bootstrap_messages
|
||||
@@ -150,7 +148,7 @@ class AgentRunOrchestrator:
|
||||
context['runtime']['query_id'] = adapter_context['query_id']
|
||||
|
||||
# Build state context for State API handlers
|
||||
state_context = self._build_state_context(event, binding, descriptor)
|
||||
state_context = build_state_context(event, binding, descriptor)
|
||||
|
||||
# Register session for proxy action permission validation
|
||||
run_id = context['run_id']
|
||||
@@ -274,7 +272,7 @@ class AgentRunOrchestrator:
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins')
|
||||
|
||||
# Build adapter context for Pipeline-specific fields
|
||||
adapter_context = await self._build_adapter_context(query, binding)
|
||||
adapter_context = PipelineAdapter.build_adapter_context(query, binding)
|
||||
|
||||
# Delegate to event-first run()
|
||||
async for result in self.run(
|
||||
@@ -285,73 +283,6 @@ class AgentRunOrchestrator:
|
||||
):
|
||||
yield result
|
||||
|
||||
async def _build_adapter_context(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
binding: AgentBinding,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Build adapter context for Pipeline Query-based flow.
|
||||
|
||||
This extracts adapter-specific fields from Query that aren't available in
|
||||
the event-first flow:
|
||||
- params (from query.variables)
|
||||
- bootstrap messages (for max-round)
|
||||
- query_id
|
||||
- prompt messages
|
||||
|
||||
Args:
|
||||
query: Pipeline query
|
||||
binding: Agent binding with max_round
|
||||
|
||||
Returns:
|
||||
Adapter context dict
|
||||
"""
|
||||
from .context_packager import AgentContextPackager
|
||||
|
||||
# Use context_builder's _build_params for proper filtering
|
||||
# (excludes internal vars, sensitive patterns, permission vars, non-JSON values)
|
||||
params = self.context_builder._build_params(query)
|
||||
|
||||
# Build prompt from query.prompt.messages (for transition runners)
|
||||
prompt = self.context_builder._build_prompt(query)
|
||||
|
||||
# Build bootstrap context for max-round
|
||||
bootstrap = None
|
||||
runtime_metadata = {}
|
||||
max_round = binding.max_round
|
||||
|
||||
if max_round and max_round > 0 and query.messages:
|
||||
# Package messages using context_packager
|
||||
runner_config = binding.runner_config or {}
|
||||
context_packager = AgentContextPackager()
|
||||
packaged_context = context_packager.package_messages(query, runner_config)
|
||||
|
||||
# Build messages list
|
||||
adapter_messages = []
|
||||
for msg in packaged_context.messages:
|
||||
adapter_messages.append(msg.model_dump(mode='json'))
|
||||
|
||||
bootstrap = {
|
||||
'messages': adapter_messages,
|
||||
'summary': None,
|
||||
'artifacts': [],
|
||||
'metadata': {},
|
||||
}
|
||||
|
||||
# Build runtime metadata for context_packaging
|
||||
runtime_metadata['context_packaging'] = {
|
||||
'policy': packaged_context.policy,
|
||||
'history': packaged_context.history,
|
||||
}
|
||||
|
||||
return {
|
||||
'params': params,
|
||||
'prompt': prompt,
|
||||
'bootstrap': bootstrap,
|
||||
'query_id': query.query_id,
|
||||
'runtime_metadata': runtime_metadata,
|
||||
}
|
||||
|
||||
async def _invoke_runner(
|
||||
self,
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
@@ -497,18 +428,22 @@ class AgentRunOrchestrator:
|
||||
"""
|
||||
data = result_dict.get('data', {})
|
||||
|
||||
# Extract scope (default to conversation when omitted by the runner)
|
||||
scope = data.get('scope', 'conversation')
|
||||
scope = data.get('scope')
|
||||
if not scope:
|
||||
raise RunnerProtocolError(
|
||||
descriptor.id,
|
||||
'state.updated missing required field: scope',
|
||||
)
|
||||
|
||||
# Extract key and value
|
||||
key = data.get('key')
|
||||
value = data.get('value')
|
||||
|
||||
if not key:
|
||||
self.ap.logger.warning(
|
||||
f'Runner {descriptor.id} state.updated missing key, ignoring'
|
||||
raise RunnerProtocolError(
|
||||
descriptor.id,
|
||||
'state.updated missing required field: key',
|
||||
)
|
||||
return
|
||||
|
||||
# Lazy init persistent state store
|
||||
if self._persistent_state_store is None:
|
||||
@@ -536,75 +471,6 @@ class AgentRunOrchestrator:
|
||||
f'Runner {descriptor.id} state.updated rejected: {error}'
|
||||
)
|
||||
|
||||
def _build_state_context(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: AgentBinding,
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> dict[str, typing.Any]:
|
||||
"""Build state context for State API handlers.
|
||||
|
||||
Returns context with:
|
||||
- scope_keys: Dict mapping scope name to scope_key
|
||||
- binding_identity: Binding identity for state isolation
|
||||
- Additional context fields for DB insert
|
||||
"""
|
||||
# Get binding identity
|
||||
binding_identity = binding.binding_id
|
||||
if not binding_identity:
|
||||
scope = binding.scope
|
||||
if scope.scope_type and scope.scope_id:
|
||||
binding_identity = f"{scope.scope_type}:{scope.scope_id}"
|
||||
else:
|
||||
binding_identity = "unknown_binding"
|
||||
|
||||
# Build scope keys for each scope
|
||||
scope_keys: dict[str, str] = {}
|
||||
|
||||
# Conversation scope
|
||||
if event.conversation_id:
|
||||
parts = [descriptor.id, binding_identity, event.conversation_id]
|
||||
if event.thread_id:
|
||||
parts.append(event.thread_id)
|
||||
scope_keys['conversation'] = f'conversation:{":".join(parts)}'
|
||||
|
||||
# Actor scope
|
||||
if event.actor and event.actor.actor_id:
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
event.actor.actor_type or 'user',
|
||||
event.actor.actor_id,
|
||||
]
|
||||
scope_keys['actor'] = f'actor:{":".join(parts)}'
|
||||
|
||||
# Subject scope
|
||||
if event.subject and event.subject.subject_id:
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
event.subject.subject_type or 'unknown',
|
||||
event.subject.subject_id,
|
||||
]
|
||||
scope_keys['subject'] = f'subject:{":".join(parts)}'
|
||||
|
||||
# Runner scope (always available)
|
||||
parts = [descriptor.id, binding_identity]
|
||||
scope_keys['runner'] = f'runner:{":".join(parts)}'
|
||||
|
||||
return {
|
||||
'scope_keys': scope_keys,
|
||||
'binding_identity': binding_identity,
|
||||
'bot_id': event.bot_id,
|
||||
'workspace_id': event.workspace_id,
|
||||
'conversation_id': event.conversation_id,
|
||||
'thread_id': event.thread_id,
|
||||
'actor_type': event.actor.actor_type if event.actor else None,
|
||||
'actor_id': event.actor.actor_id if event.actor else None,
|
||||
'subject_type': event.subject.subject_type if event.subject else None,
|
||||
'subject_id': event.subject.subject_id if event.subject else None,
|
||||
}
|
||||
|
||||
async def _write_event_log(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
|
||||
Reference in New Issue
Block a user