mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-04 21:06:03 +00:00
feat(agent-runner): scope event-first state by binding
This commit is contained in:
@@ -261,14 +261,9 @@ class AgentRunContextBuilder:
|
||||
# Populate with actual values from stores
|
||||
context_access = await self._build_context_access(event, descriptor)
|
||||
|
||||
# Build state snapshot (for event-first, we need event-based scope key)
|
||||
# For now, return empty state - will be implemented with state_store migration
|
||||
state: AgentRunState = {
|
||||
'conversation': {},
|
||||
'actor': {},
|
||||
'subject': {},
|
||||
'runner': {},
|
||||
}
|
||||
# Build state snapshot from event context
|
||||
state_store = get_state_store()
|
||||
state: AgentRunState = state_store.build_snapshot_from_event(event, binding, descriptor)
|
||||
|
||||
# Build runtime context
|
||||
runtime: AgentRuntimeContext = {
|
||||
|
||||
@@ -170,7 +170,7 @@ class AgentRunOrchestrator:
|
||||
|
||||
# Handle state.updated first - consume before normalizer
|
||||
if result_dict.get('type') == 'state.updated':
|
||||
self._handle_state_updated_event(result_dict, event, descriptor)
|
||||
self._handle_state_updated_event(result_dict, event, binding, descriptor)
|
||||
# Pass to normalizer for logging, but don't yield to pipeline
|
||||
await self.result_normalizer.normalize(result_dict, descriptor)
|
||||
continue
|
||||
@@ -559,6 +559,7 @@ class AgentRunOrchestrator:
|
||||
self,
|
||||
result_dict: dict[str, typing.Any],
|
||||
event: AgentEventEnvelope,
|
||||
binding: AgentBinding,
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> None:
|
||||
"""Handle state.updated result in event-first mode.
|
||||
@@ -566,6 +567,7 @@ class AgentRunOrchestrator:
|
||||
Args:
|
||||
result_dict: Raw result dict with type='state.updated'
|
||||
event: Event envelope
|
||||
binding: Agent binding configuration
|
||||
descriptor: Runner descriptor
|
||||
"""
|
||||
data = result_dict.get('data', {})
|
||||
@@ -584,13 +586,22 @@ class AgentRunOrchestrator:
|
||||
return
|
||||
|
||||
# Apply update to state store using event context
|
||||
# Note: state_store needs to support event-based scope key calculation
|
||||
# For now, we log and skip actual persistence in event-first mode
|
||||
# This will be implemented when state_store is migrated to support events
|
||||
self.ap.logger.debug(
|
||||
f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}, value={value}'
|
||||
success = self._state_store.apply_update_from_event(
|
||||
event=event,
|
||||
binding=binding,
|
||||
descriptor=descriptor,
|
||||
scope=scope,
|
||||
key=key,
|
||||
value=value,
|
||||
logger=self.ap.logger,
|
||||
)
|
||||
|
||||
if success:
|
||||
self.ap.logger.debug(
|
||||
f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}'
|
||||
)
|
||||
# Invalid scope or missing identity is already logged by apply_update_from_event
|
||||
|
||||
async def _write_event_log(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
|
||||
@@ -7,6 +7,7 @@ import threading
|
||||
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
|
||||
|
||||
from .descriptor import AgentRunnerDescriptor
|
||||
from .host_models import AgentEventEnvelope
|
||||
|
||||
|
||||
# Valid state scopes for agent runner state updates.
|
||||
@@ -277,6 +278,325 @@ class RunnerScopedStateStore:
|
||||
with self._lock:
|
||||
self._store.clear()
|
||||
|
||||
# ========== Event-first Protocol v1 methods ==========
|
||||
|
||||
def _get_binding_identity(
|
||||
self,
|
||||
binding: "AgentBinding",
|
||||
) -> str:
|
||||
"""Get stable binding identity for scope key.
|
||||
|
||||
Uses binding_id if available, falls back to scope_type + scope_id.
|
||||
"""
|
||||
if binding.binding_id:
|
||||
return binding.binding_id
|
||||
|
||||
# Fallback to scope identity
|
||||
scope = binding.scope
|
||||
if scope.scope_type and scope.scope_id:
|
||||
return f"{scope.scope_type}:{scope.scope_id}"
|
||||
|
||||
# Last resort - should not happen in production
|
||||
return "unknown_binding"
|
||||
|
||||
def _make_conversation_scope_key_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> str | None:
|
||||
"""Build conversation scope identity key from event and binding.
|
||||
|
||||
Scope key structure: runner_id + binding_id + conversation_id
|
||||
This ensures state is isolated per binding and per conversation.
|
||||
|
||||
Returns None if conversation_id is missing.
|
||||
"""
|
||||
if not event.conversation_id:
|
||||
return None
|
||||
|
||||
binding_identity = self._get_binding_identity(binding)
|
||||
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
event.conversation_id,
|
||||
]
|
||||
|
||||
# Include thread_id if present for thread-scoped state
|
||||
if event.thread_id:
|
||||
parts.append(event.thread_id)
|
||||
|
||||
return f'conversation:{":".join(parts)}'
|
||||
|
||||
def _make_actor_scope_key_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> str | None:
|
||||
"""Build actor scope identity key from event and binding.
|
||||
|
||||
Scope key structure: runner_id + binding_id + actor_type + actor_id
|
||||
This ensures state is isolated per binding and per actor.
|
||||
|
||||
Returns None if actor_id is missing.
|
||||
"""
|
||||
if not event.actor or not event.actor.actor_id:
|
||||
return None
|
||||
|
||||
binding_identity = self._get_binding_identity(binding)
|
||||
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
event.actor.actor_type or 'user',
|
||||
event.actor.actor_id,
|
||||
]
|
||||
|
||||
return f'actor:{":".join(parts)}'
|
||||
|
||||
def _make_subject_scope_key_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> str | None:
|
||||
"""Build subject scope identity key from event and binding.
|
||||
|
||||
Scope key structure: runner_id + binding_id + subject_type + subject_id
|
||||
This ensures state is isolated per binding and per subject.
|
||||
|
||||
Returns None if subject_id is missing.
|
||||
"""
|
||||
if not event.subject or not event.subject.subject_id:
|
||||
return None
|
||||
|
||||
binding_identity = self._get_binding_identity(binding)
|
||||
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
event.subject.subject_type or 'unknown',
|
||||
event.subject.subject_id,
|
||||
]
|
||||
|
||||
return f'subject:{":".join(parts)}'
|
||||
|
||||
def _make_runner_scope_key_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> str:
|
||||
"""Build runner scope identity key from event and binding.
|
||||
|
||||
Scope key structure: runner_id + binding_id
|
||||
This ensures state is isolated per binding (not shared across bindings).
|
||||
"""
|
||||
binding_identity = self._get_binding_identity(binding)
|
||||
|
||||
parts = [
|
||||
descriptor.id,
|
||||
binding_identity,
|
||||
]
|
||||
|
||||
return f'runner:{":".join(parts)}'
|
||||
|
||||
def _get_scope_key_from_event(
|
||||
self,
|
||||
scope: str,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> str | None:
|
||||
"""Get the storage key for a given scope from event and binding.
|
||||
|
||||
Returns None if required identity is missing for the scope.
|
||||
"""
|
||||
if scope == 'conversation':
|
||||
return self._make_conversation_scope_key_from_event(event, binding, descriptor)
|
||||
elif scope == 'actor':
|
||||
return self._make_actor_scope_key_from_event(event, binding, descriptor)
|
||||
elif scope == 'subject':
|
||||
return self._make_subject_scope_key_from_event(event, binding, descriptor)
|
||||
elif scope == 'runner':
|
||||
return self._make_runner_scope_key_from_event(event, binding, descriptor)
|
||||
else:
|
||||
return None
|
||||
|
||||
def _check_scope_enabled(
|
||||
self,
|
||||
scope: str,
|
||||
binding: "AgentBinding",
|
||||
) -> bool:
|
||||
"""Check if a scope is enabled by binding's state_policy.
|
||||
|
||||
Args:
|
||||
scope: Scope to check
|
||||
binding: Agent binding with state_policy
|
||||
|
||||
Returns:
|
||||
True if scope is enabled, False otherwise
|
||||
"""
|
||||
state_policy = binding.state_policy
|
||||
|
||||
# Check if state is disabled entirely
|
||||
if not state_policy.enable_state:
|
||||
return False
|
||||
|
||||
# Check if scope is in enabled scopes
|
||||
return scope in state_policy.state_scopes
|
||||
|
||||
def build_snapshot_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
) -> dict[str, dict[str, typing.Any]]:
|
||||
"""Build state snapshot for all scopes from event and binding.
|
||||
|
||||
Respects binding.state_policy:
|
||||
- If enable_state is False, returns all empty scopes.
|
||||
- If a scope is not in state_scopes, returns empty dict for that scope.
|
||||
|
||||
Args:
|
||||
event: Event envelope
|
||||
binding: Agent binding configuration
|
||||
descriptor: Runner descriptor
|
||||
|
||||
Returns:
|
||||
Dict with 4 scope keys, each containing scope state dict.
|
||||
Scopes without required identity or disabled by policy will have empty dict.
|
||||
"""
|
||||
state_policy = binding.state_policy
|
||||
|
||||
# If state is disabled, return all empty scopes
|
||||
if not state_policy.enable_state:
|
||||
return {
|
||||
'conversation': {},
|
||||
'actor': {},
|
||||
'subject': {},
|
||||
'runner': {},
|
||||
}
|
||||
|
||||
snapshot: dict[str, dict[str, typing.Any]] = {
|
||||
'conversation': {},
|
||||
'actor': {},
|
||||
'subject': {},
|
||||
'runner': {},
|
||||
}
|
||||
|
||||
with self._lock:
|
||||
for scope in VALID_STATE_SCOPES:
|
||||
# Check if scope is enabled by policy
|
||||
if not self._check_scope_enabled(scope, binding):
|
||||
continue
|
||||
|
||||
scope_key = self._get_scope_key_from_event(scope, event, binding, descriptor)
|
||||
if scope_key:
|
||||
scope_state = self._store.get(scope_key, {})
|
||||
snapshot[scope] = dict(scope_state) # Copy to avoid mutation
|
||||
|
||||
# Seed external.conversation_id from event.conversation_id if not already set
|
||||
# Only if conversation scope is enabled
|
||||
if self._check_scope_enabled('conversation', binding) and event.conversation_id:
|
||||
if 'external.conversation_id' not in snapshot['conversation']:
|
||||
snapshot['conversation']['external.conversation_id'] = event.conversation_id
|
||||
|
||||
return snapshot
|
||||
|
||||
def apply_update_from_event(
|
||||
self,
|
||||
event: AgentEventEnvelope,
|
||||
binding: "AgentBinding",
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
scope: str,
|
||||
key: str,
|
||||
value: typing.Any,
|
||||
logger: typing.Any = None,
|
||||
) -> bool:
|
||||
"""Apply a state update to the store from event and binding context.
|
||||
|
||||
Respects binding.state_policy:
|
||||
- If enable_state is False, rejects the update.
|
||||
- If scope is not in state_scopes, rejects the update.
|
||||
|
||||
Args:
|
||||
event: Event envelope
|
||||
binding: Agent binding configuration
|
||||
descriptor: Runner descriptor
|
||||
scope: State scope (conversation, actor, subject, runner)
|
||||
key: State key (should use namespace prefix like external.*)
|
||||
value: State value (must be JSON-serializable)
|
||||
logger: Optional logger for warnings
|
||||
|
||||
Returns:
|
||||
True if update applied successfully, False if invalid scope,
|
||||
missing identity, or disabled by policy
|
||||
"""
|
||||
state_policy = binding.state_policy
|
||||
|
||||
# Check if state is disabled entirely
|
||||
if not state_policy.enable_state:
|
||||
if logger:
|
||||
logger.warning(
|
||||
f'Runner {descriptor.id} state.updated rejected: '
|
||||
f'state is disabled by binding policy'
|
||||
)
|
||||
return False
|
||||
|
||||
# Validate scope
|
||||
if scope not in VALID_STATE_SCOPES:
|
||||
if logger:
|
||||
logger.warning(
|
||||
f'Runner {descriptor.id} state.updated with invalid scope: {scope}. '
|
||||
f'Valid scopes: {", ".join(VALID_STATE_SCOPES)}'
|
||||
)
|
||||
return False
|
||||
|
||||
# Check if scope is enabled by policy
|
||||
if not self._check_scope_enabled(scope, binding):
|
||||
if logger:
|
||||
logger.warning(
|
||||
f'Runner {descriptor.id} state.updated rejected for scope "{scope}": '
|
||||
f'scope not enabled by binding policy. Enabled scopes: {state_policy.state_scopes}'
|
||||
)
|
||||
return False
|
||||
|
||||
# Map legacy key names
|
||||
if key in LEGACY_KEY_MAPPING:
|
||||
mapped_key = LEGACY_KEY_MAPPING[key]
|
||||
if logger:
|
||||
logger.debug(
|
||||
f'Runner {descriptor.id} state.updated legacy key "{key}" mapped to "{mapped_key}"'
|
||||
)
|
||||
key = mapped_key
|
||||
|
||||
# Get scope key from event and binding
|
||||
scope_key = self._get_scope_key_from_event(scope, event, binding, descriptor)
|
||||
if scope_key is None:
|
||||
if logger:
|
||||
logger.warning(
|
||||
f'Runner {descriptor.id} state.updated for scope "{scope}" '
|
||||
f'requires missing identity (conversation_id, actor_id, or subject_id). '
|
||||
f'Skipping update.'
|
||||
)
|
||||
return False
|
||||
|
||||
# Apply update to store
|
||||
with self._lock:
|
||||
if scope_key not in self._store:
|
||||
self._store[scope_key] = {}
|
||||
self._store[scope_key][key] = value
|
||||
|
||||
if logger:
|
||||
logger.debug(
|
||||
f'Runner {descriptor.id} state.updated: scope={scope}, key={key}'
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# Global singleton state store
|
||||
_state_store: RunnerScopedStateStore | None = None
|
||||
|
||||
Reference in New Issue
Block a user