From 4e68a93df728c3e73e7cd229a65d1a9e7c1e5dc7 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Sat, 23 May 2026 19:45:57 +0800 Subject: [PATCH] feat(agent-runner): scope event-first state by binding --- .../pkg/agent/runner/context_builder.py | 11 +- src/langbot/pkg/agent/runner/orchestrator.py | 23 +- src/langbot/pkg/agent/runner/state_store.py | 320 +++++++++ tests/unit_tests/agent/test_state_store.py | 669 +++++++++++++++++- 4 files changed, 1008 insertions(+), 15 deletions(-) diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index 14fb609c..f2957427 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -261,14 +261,9 @@ class AgentRunContextBuilder: # Populate with actual values from stores context_access = await self._build_context_access(event, descriptor) - # Build state snapshot (for event-first, we need event-based scope key) - # For now, return empty state - will be implemented with state_store migration - state: AgentRunState = { - 'conversation': {}, - 'actor': {}, - 'subject': {}, - 'runner': {}, - } + # Build state snapshot from event context + state_store = get_state_store() + state: AgentRunState = state_store.build_snapshot_from_event(event, binding, descriptor) # Build runtime context runtime: AgentRuntimeContext = { diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 13358f16..160d4538 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -170,7 +170,7 @@ class AgentRunOrchestrator: # Handle state.updated first - consume before normalizer if result_dict.get('type') == 'state.updated': - self._handle_state_updated_event(result_dict, event, descriptor) + self._handle_state_updated_event(result_dict, event, binding, descriptor) # Pass to normalizer for logging, but don't yield to pipeline await self.result_normalizer.normalize(result_dict, descriptor) continue @@ -559,6 +559,7 @@ class AgentRunOrchestrator: self, result_dict: dict[str, typing.Any], event: AgentEventEnvelope, + binding: AgentBinding, descriptor: AgentRunnerDescriptor, ) -> None: """Handle state.updated result in event-first mode. @@ -566,6 +567,7 @@ class AgentRunOrchestrator: Args: result_dict: Raw result dict with type='state.updated' event: Event envelope + binding: Agent binding configuration descriptor: Runner descriptor """ data = result_dict.get('data', {}) @@ -584,13 +586,22 @@ class AgentRunOrchestrator: return # Apply update to state store using event context - # Note: state_store needs to support event-based scope key calculation - # For now, we log and skip actual persistence in event-first mode - # This will be implemented when state_store is migrated to support events - self.ap.logger.debug( - f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}, value={value}' + success = self._state_store.apply_update_from_event( + event=event, + binding=binding, + descriptor=descriptor, + scope=scope, + key=key, + value=value, + logger=self.ap.logger, ) + if success: + self.ap.logger.debug( + f'Runner {descriptor.id} state.updated (event mode): scope={scope}, key={key}' + ) + # Invalid scope or missing identity is already logged by apply_update_from_event + async def _write_event_log( self, event: AgentEventEnvelope, diff --git a/src/langbot/pkg/agent/runner/state_store.py b/src/langbot/pkg/agent/runner/state_store.py index e1a7f6e2..3eb13d11 100644 --- a/src/langbot/pkg/agent/runner/state_store.py +++ b/src/langbot/pkg/agent/runner/state_store.py @@ -7,6 +7,7 @@ import threading from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query from .descriptor import AgentRunnerDescriptor +from .host_models import AgentEventEnvelope # Valid state scopes for agent runner state updates. @@ -277,6 +278,325 @@ class RunnerScopedStateStore: with self._lock: self._store.clear() + # ========== Event-first Protocol v1 methods ========== + + def _get_binding_identity( + self, + binding: "AgentBinding", + ) -> str: + """Get stable binding identity for scope key. + + Uses binding_id if available, falls back to scope_type + scope_id. + """ + if binding.binding_id: + return binding.binding_id + + # Fallback to scope identity + scope = binding.scope + if scope.scope_type and scope.scope_id: + return f"{scope.scope_type}:{scope.scope_id}" + + # Last resort - should not happen in production + return "unknown_binding" + + def _make_conversation_scope_key_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> str | None: + """Build conversation scope identity key from event and binding. + + Scope key structure: runner_id + binding_id + conversation_id + This ensures state is isolated per binding and per conversation. + + Returns None if conversation_id is missing. + """ + if not event.conversation_id: + return None + + binding_identity = self._get_binding_identity(binding) + + parts = [ + descriptor.id, + binding_identity, + event.conversation_id, + ] + + # Include thread_id if present for thread-scoped state + if event.thread_id: + parts.append(event.thread_id) + + return f'conversation:{":".join(parts)}' + + def _make_actor_scope_key_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> str | None: + """Build actor scope identity key from event and binding. + + Scope key structure: runner_id + binding_id + actor_type + actor_id + This ensures state is isolated per binding and per actor. + + Returns None if actor_id is missing. + """ + if not event.actor or not event.actor.actor_id: + return None + + binding_identity = self._get_binding_identity(binding) + + parts = [ + descriptor.id, + binding_identity, + event.actor.actor_type or 'user', + event.actor.actor_id, + ] + + return f'actor:{":".join(parts)}' + + def _make_subject_scope_key_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> str | None: + """Build subject scope identity key from event and binding. + + Scope key structure: runner_id + binding_id + subject_type + subject_id + This ensures state is isolated per binding and per subject. + + Returns None if subject_id is missing. + """ + if not event.subject or not event.subject.subject_id: + return None + + binding_identity = self._get_binding_identity(binding) + + parts = [ + descriptor.id, + binding_identity, + event.subject.subject_type or 'unknown', + event.subject.subject_id, + ] + + return f'subject:{":".join(parts)}' + + def _make_runner_scope_key_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> str: + """Build runner scope identity key from event and binding. + + Scope key structure: runner_id + binding_id + This ensures state is isolated per binding (not shared across bindings). + """ + binding_identity = self._get_binding_identity(binding) + + parts = [ + descriptor.id, + binding_identity, + ] + + return f'runner:{":".join(parts)}' + + def _get_scope_key_from_event( + self, + scope: str, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> str | None: + """Get the storage key for a given scope from event and binding. + + Returns None if required identity is missing for the scope. + """ + if scope == 'conversation': + return self._make_conversation_scope_key_from_event(event, binding, descriptor) + elif scope == 'actor': + return self._make_actor_scope_key_from_event(event, binding, descriptor) + elif scope == 'subject': + return self._make_subject_scope_key_from_event(event, binding, descriptor) + elif scope == 'runner': + return self._make_runner_scope_key_from_event(event, binding, descriptor) + else: + return None + + def _check_scope_enabled( + self, + scope: str, + binding: "AgentBinding", + ) -> bool: + """Check if a scope is enabled by binding's state_policy. + + Args: + scope: Scope to check + binding: Agent binding with state_policy + + Returns: + True if scope is enabled, False otherwise + """ + state_policy = binding.state_policy + + # Check if state is disabled entirely + if not state_policy.enable_state: + return False + + # Check if scope is in enabled scopes + return scope in state_policy.state_scopes + + def build_snapshot_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + ) -> dict[str, dict[str, typing.Any]]: + """Build state snapshot for all scopes from event and binding. + + Respects binding.state_policy: + - If enable_state is False, returns all empty scopes. + - If a scope is not in state_scopes, returns empty dict for that scope. + + Args: + event: Event envelope + binding: Agent binding configuration + descriptor: Runner descriptor + + Returns: + Dict with 4 scope keys, each containing scope state dict. + Scopes without required identity or disabled by policy will have empty dict. + """ + state_policy = binding.state_policy + + # If state is disabled, return all empty scopes + if not state_policy.enable_state: + return { + 'conversation': {}, + 'actor': {}, + 'subject': {}, + 'runner': {}, + } + + snapshot: dict[str, dict[str, typing.Any]] = { + 'conversation': {}, + 'actor': {}, + 'subject': {}, + 'runner': {}, + } + + with self._lock: + for scope in VALID_STATE_SCOPES: + # Check if scope is enabled by policy + if not self._check_scope_enabled(scope, binding): + continue + + scope_key = self._get_scope_key_from_event(scope, event, binding, descriptor) + if scope_key: + scope_state = self._store.get(scope_key, {}) + snapshot[scope] = dict(scope_state) # Copy to avoid mutation + + # Seed external.conversation_id from event.conversation_id if not already set + # Only if conversation scope is enabled + if self._check_scope_enabled('conversation', binding) and event.conversation_id: + if 'external.conversation_id' not in snapshot['conversation']: + snapshot['conversation']['external.conversation_id'] = event.conversation_id + + return snapshot + + def apply_update_from_event( + self, + event: AgentEventEnvelope, + binding: "AgentBinding", + descriptor: AgentRunnerDescriptor, + scope: str, + key: str, + value: typing.Any, + logger: typing.Any = None, + ) -> bool: + """Apply a state update to the store from event and binding context. + + Respects binding.state_policy: + - If enable_state is False, rejects the update. + - If scope is not in state_scopes, rejects the update. + + Args: + event: Event envelope + binding: Agent binding configuration + descriptor: Runner descriptor + scope: State scope (conversation, actor, subject, runner) + key: State key (should use namespace prefix like external.*) + value: State value (must be JSON-serializable) + logger: Optional logger for warnings + + Returns: + True if update applied successfully, False if invalid scope, + missing identity, or disabled by policy + """ + state_policy = binding.state_policy + + # Check if state is disabled entirely + if not state_policy.enable_state: + if logger: + logger.warning( + f'Runner {descriptor.id} state.updated rejected: ' + f'state is disabled by binding policy' + ) + return False + + # Validate scope + if scope not in VALID_STATE_SCOPES: + if logger: + logger.warning( + f'Runner {descriptor.id} state.updated with invalid scope: {scope}. ' + f'Valid scopes: {", ".join(VALID_STATE_SCOPES)}' + ) + return False + + # Check if scope is enabled by policy + if not self._check_scope_enabled(scope, binding): + if logger: + logger.warning( + f'Runner {descriptor.id} state.updated rejected for scope "{scope}": ' + f'scope not enabled by binding policy. Enabled scopes: {state_policy.state_scopes}' + ) + return False + + # Map legacy key names + if key in LEGACY_KEY_MAPPING: + mapped_key = LEGACY_KEY_MAPPING[key] + if logger: + logger.debug( + f'Runner {descriptor.id} state.updated legacy key "{key}" mapped to "{mapped_key}"' + ) + key = mapped_key + + # Get scope key from event and binding + scope_key = self._get_scope_key_from_event(scope, event, binding, descriptor) + if scope_key is None: + if logger: + logger.warning( + f'Runner {descriptor.id} state.updated for scope "{scope}" ' + f'requires missing identity (conversation_id, actor_id, or subject_id). ' + f'Skipping update.' + ) + return False + + # Apply update to store + with self._lock: + if scope_key not in self._store: + self._store[scope_key] = {} + self._store[scope_key][key] = value + + if logger: + logger.debug( + f'Runner {descriptor.id} state.updated: scope={scope}, key={key}' + ) + + return True + # Global singleton state store _state_store: RunnerScopedStateStore | None = None diff --git a/tests/unit_tests/agent/test_state_store.py b/tests/unit_tests/agent/test_state_store.py index 97089536..af85a9a6 100644 --- a/tests/unit_tests/agent/test_state_store.py +++ b/tests/unit_tests/agent/test_state_store.py @@ -9,6 +9,7 @@ from langbot.pkg.agent.runner.state_store import ( LEGACY_KEY_MAPPING, ) from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor +from langbot.pkg.agent.runner.host_models import AgentBinding, BindingScope, StatePolicy def make_descriptor(runner_id: str = 'plugin:test/my-runner/default') -> AgentRunnerDescriptor: @@ -68,6 +69,18 @@ class FakeLogger: self.warnings.append(msg) +class FakeBinding: + """Fake binding for testing event-first state.""" + def __init__( + self, + binding_id: str = 'binding_001', + state_policy: StatePolicy | None = None, + ): + self.binding_id = binding_id + self.scope = BindingScope(scope_type='pipeline', scope_id='pipeline_001') + self.state_policy = state_policy or StatePolicy() + + class TestStateStoreBuildSnapshot: """Tests for build_snapshot.""" @@ -470,4 +483,658 @@ class TestConstants: def test_legacy_key_mapping(self): """LEGACY_KEY_MAPPING should map conversation_id.""" - assert LEGACY_KEY_MAPPING == {'conversation_id': 'external.conversation_id'} \ No newline at end of file + assert LEGACY_KEY_MAPPING == {'conversation_id': 'external.conversation_id'} + + +# ========== Event-first Protocol v1 tests ========== + + +class FakeActorContext: + """Fake actor context for event testing.""" + def __init__(self, actor_type: str = 'user', actor_id: str = 'user_123', actor_name: str = 'Test User'): + self.actor_type = actor_type + self.actor_id = actor_id + self.actor_name = actor_name + + +class FakeSubjectContext: + """Fake subject context for event testing.""" + def __init__(self, subject_type: str = 'message', subject_id: str = 'msg_001', data: dict = None): + self.subject_type = subject_type + self.subject_id = subject_id + self.data = data or {} + + +class FakeAgentInput: + """Fake agent input for event testing.""" + def __init__(self, text: str = 'Hello'): + self.text = text + self.contents = [] + self.message_chain = None + self.attachments = [] + + +class FakeDeliveryContext: + """Fake delivery context for event testing.""" + def __init__(self): + self.surface = 'platform' + self.reply_target = None + self.supports_streaming = True + self.supports_edit = False + self.supports_reaction = False + self.max_message_size = None + self.platform_capabilities = {} + + +class FakeEventEnvelope: + """Fake event envelope for testing event-first state.""" + def __init__( + self, + event_id: str = 'evt_001', + event_type: str = 'message.received', + conversation_id: str = 'conv_001', + actor: FakeActorContext | None = None, + subject: FakeSubjectContext | None = None, + bot_id: str = 'bot_001', + workspace_id: str = 'ws_001', + ): + self.event_id = event_id + self.event_type = event_type + self.event_time = 1700000000 + self.source = 'platform' + self.bot_id = bot_id + self.workspace_id = workspace_id + self.conversation_id = conversation_id + self.thread_id = None + self.actor = actor or FakeActorContext() + self.subject = subject + self.input = FakeAgentInput() + self.delivery = FakeDeliveryContext() + self.raw_ref = None + + +class TestStateStoreEventFirstBuildSnapshot: + """Tests for build_snapshot_from_event.""" + + def test_build_snapshot_returns_four_scopes(self): + """Snapshot from event should have all four scope keys.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope() + binding = FakeBinding() + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert 'conversation' in snapshot + assert 'actor' in snapshot + assert 'subject' in snapshot + assert 'runner' in snapshot + + def test_build_snapshot_seeds_conversation_id(self): + """Snapshot should seed external.conversation_id from event.conversation_id.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_test') + binding = FakeBinding() + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert snapshot['conversation']['external.conversation_id'] == 'conv_test' + + def test_build_snapshot_without_conversation_id(self): + """Snapshot without conversation_id should have empty conversation scope.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id=None) + binding = FakeBinding() + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert snapshot['conversation'] == {} + + def test_build_snapshot_without_actor(self): + """Snapshot without actor should have empty actor scope.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(actor=None) + binding = FakeBinding() + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert snapshot['actor'] == {} + + def test_build_snapshot_without_subject(self): + """Snapshot without subject should have empty subject scope.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(subject=None) + binding = FakeBinding() + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert snapshot['subject'] == {} + + def test_build_snapshot_returns_stored_values(self): + """Snapshot should return previously stored values via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001', actor=FakeActorContext(actor_id='user_001')) + # Use binding with all scopes enabled + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + # Store values using event-first methods + store.apply_update_from_event(event, binding, descriptor, 'conversation', 'memory.summary', 'Summary', logger) + store.apply_update_from_event(event, binding, descriptor, 'actor', 'preferred_language', 'en', logger) + store.apply_update_from_event(event, binding, descriptor, 'runner', 'cache_version', 'v1', logger) + + # Build snapshot + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + + assert snapshot['conversation']['memory.summary'] == 'Summary' + assert snapshot['actor']['preferred_language'] == 'en' + assert snapshot['runner']['cache_version'] == 'v1' + + def test_build_snapshot_isolation_by_runner_id(self): + """Different runner IDs should have isolated state in event-first mode.""" + store = RunnerScopedStateStore() + descriptor1 = make_descriptor('plugin:test/runner-a/default') + descriptor2 = make_descriptor('plugin:test/runner-b/default') + event = FakeEventEnvelope(conversation_id='conv_001') + binding = FakeBinding() + logger = FakeLogger() + + # Store for runner-a + store.apply_update_from_event(event, binding, descriptor1, 'conversation', 'key', 'value_a', logger) + + # Build snapshot for runner-b + snapshot_b = store.build_snapshot_from_event(event, binding, descriptor2) + + # runner-b should not see runner-a's state (only external.conversation_id seeded) + assert snapshot_b['conversation'] == {'external.conversation_id': 'conv_001'} + + +class TestStateStoreEventFirstApplyUpdate: + """Tests for apply_update_from_event.""" + + def test_apply_update_conversation_scope(self): + """Apply update to conversation scope via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001') + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'memory.summary', 'Summary', logger + ) + + assert result is True + assert len(logger.warnings) == 0 + + def test_apply_update_actor_scope(self): + """Apply update to actor scope via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(actor=FakeActorContext(actor_id='user_001')) + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'actor', 'preferred_language', 'en', logger + ) + + assert result is True + assert len(logger.warnings) == 0 + + def test_apply_update_subject_scope(self): + """Apply update to subject scope via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(subject=FakeSubjectContext(subject_id='msg_001')) + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'subject', 'group_topic', 'general', logger + ) + + assert result is True + assert len(logger.warnings) == 0 + + def test_apply_update_runner_scope(self): + """Apply update to runner scope via event (always works).""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope() # No special identity needed + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'runner', 'cache_version', 'v2', logger + ) + + assert result is True + assert len(logger.warnings) == 0 + + def test_apply_update_invalid_scope(self): + """Invalid scope should return False and log warning.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope() + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'invalid_scope', 'key', 'value', logger + ) + + assert result is False + assert len(logger.warnings) == 1 + assert 'invalid scope' in logger.warnings[0] + + def test_apply_update_conversation_missing_conversation_id(self): + """Conversation scope without conversation_id should return False.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id=None) + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'key', 'value', logger + ) + + assert result is False + assert len(logger.warnings) == 1 + assert 'missing identity' in logger.warnings[0] + + def test_apply_update_actor_missing_actor_id(self): + """Actor scope without actor_id should return False.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(actor=FakeActorContext(actor_id=None)) + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'actor', 'key', 'value', logger + ) + + assert result is False + assert len(logger.warnings) == 1 + assert 'missing identity' in logger.warnings[0] + + def test_apply_update_subject_missing_subject_id(self): + """Subject scope without subject_id should return False.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(subject=FakeSubjectContext(subject_id=None)) + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'subject', 'key', 'value', logger + ) + + assert result is False + assert len(logger.warnings) == 1 + assert 'missing identity' in logger.warnings[0] + + def test_apply_update_legacy_key_mapping(self): + """Legacy key conversation_id should be mapped to external.conversation_id.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001') + binding = FakeBinding() + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'conversation_id', 'conv_old', logger + ) + + assert result is True + assert 'mapped to' in logger.debugs[0] + + # Check mapped key is stored + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['conversation']['external.conversation_id'] == 'conv_old' + + +class TestStateStoreEventFirstScopeIsolation: + """Tests for scope isolation in event-first mode.""" + + def test_conversation_scope_isolated_by_conversation_id(self): + """Conversation scope should be isolated by conversation_id.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + binding = FakeBinding() + event1 = FakeEventEnvelope(conversation_id='conv_001') + event2 = FakeEventEnvelope(conversation_id='conv_002') + logger = FakeLogger() + + # Store for conv_001 + store.apply_update_from_event(event1, binding, descriptor, 'conversation', 'key', 'value1', logger) + + # conv_002 should not see conv_001's state + snapshot2 = store.build_snapshot_from_event(event2, binding, descriptor) + assert snapshot2['conversation'] == {'external.conversation_id': 'conv_002'} + + # conv_001 should see its own state + snapshot1 = store.build_snapshot_from_event(event1, binding, descriptor) + assert snapshot1['conversation']['key'] == 'value1' + + def test_actor_scope_isolated_by_actor_id(self): + """Actor scope should be isolated by actor_type + actor_id.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + binding = FakeBinding() + event1 = FakeEventEnvelope(actor=FakeActorContext(actor_type='user', actor_id='user_001')) + event2 = FakeEventEnvelope(actor=FakeActorContext(actor_type='user', actor_id='user_002')) + logger = FakeLogger() + + # Store for user_001 + store.apply_update_from_event(event1, binding, descriptor, 'actor', 'preferred_language', 'en', logger) + + # user_002 should not see user_001's state + snapshot2 = store.build_snapshot_from_event(event2, binding, descriptor) + assert snapshot2['actor'] == {} + + # user_001 should see its own state + snapshot1 = store.build_snapshot_from_event(event1, binding, descriptor) + assert snapshot1['actor']['preferred_language'] == 'en' + + def test_subject_scope_isolated_by_subject_id(self): + """Subject scope should be isolated by subject_type + subject_id.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + event1 = FakeEventEnvelope(subject=FakeSubjectContext(subject_type='message', subject_id='msg_001')) + event2 = FakeEventEnvelope(subject=FakeSubjectContext(subject_type='message', subject_id='msg_002')) + logger = FakeLogger() + + # Store for msg_001 + store.apply_update_from_event(event1, binding, descriptor, 'subject', 'key', 'value1', logger) + + # msg_002 should not see msg_001's state + snapshot2 = store.build_snapshot_from_event(event2, binding, descriptor) + assert snapshot2['subject'] == {} + + # msg_001 should see its own state + snapshot1 = store.build_snapshot_from_event(event1, binding, descriptor) + assert snapshot1['subject']['key'] == 'value1' + + def test_runner_scope_shared_within_runner(self): + """Runner scope should be shared within same runner across all events.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + event1 = FakeEventEnvelope(conversation_id='conv_001') + event2 = FakeEventEnvelope(conversation_id='conv_002') + logger = FakeLogger() + + # Store for event1's runner scope + store.apply_update_from_event(event1, binding, descriptor, 'runner', 'cache_version', 'v1', logger) + + # event2 should see the same runner state + snapshot2 = store.build_snapshot_from_event(event2, binding, descriptor) + assert snapshot2['runner']['cache_version'] == 'v1' + + +class TestStateStoreEventFirstRoundTrip: + """Tests for state round trip: store -> read via event-first.""" + + def test_state_round_trip_conversation(self): + """State stored via event should be readable via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001') + binding = FakeBinding() + logger = FakeLogger() + + # Store + store.apply_update_from_event(event, binding, descriptor, 'conversation', 'memory.summary', 'Summary', logger) + + # Read + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['conversation']['memory.summary'] == 'Summary' + + def test_state_round_trip_actor(self): + """Actor state stored via event should be readable via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(actor=FakeActorContext(actor_id='user_001')) + binding = FakeBinding() + logger = FakeLogger() + + # Store + store.apply_update_from_event(event, binding, descriptor, 'actor', 'preferred_language', 'zh', logger) + + # Read + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['actor']['preferred_language'] == 'zh' + + def test_state_round_trip_subject(self): + """Subject state stored via event should be readable via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(subject=FakeSubjectContext(subject_id='msg_001')) + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + # Store + store.apply_update_from_event(event, binding, descriptor, 'subject', 'group_topic', 'tech', logger) + + # Read + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['subject']['group_topic'] == 'tech' + + def test_state_round_trip_runner(self): + """Runner state stored via event should be readable via event.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope() + binding = FakeBinding(state_policy=StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner'])) + logger = FakeLogger() + + # Store + store.apply_update_from_event(event, binding, descriptor, 'runner', 'cache_version', 'v2', logger) + + # Read + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['runner']['cache_version'] == 'v2' + + +class TestStateStoreBindingIsolation: + """Tests for binding isolation in event-first state.""" + + def test_conversation_state_isolated_by_binding_id(self): + """Same runner, same conversation_id, different binding_id: conversation state isolated.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() # Same runner + event = FakeEventEnvelope(conversation_id='conv_001') + binding_a = FakeBinding(binding_id='binding_a') + binding_b = FakeBinding(binding_id='binding_b') + logger = FakeLogger() + + # Store for binding_a + store.apply_update_from_event(event, binding_a, descriptor, 'conversation', 'key', 'value_a', logger) + + # binding_b should not see binding_a's state + snapshot_b = store.build_snapshot_from_event(event, binding_b, descriptor) + assert snapshot_b['conversation'] == {'external.conversation_id': 'conv_001'} + + # binding_a should see its own state + snapshot_a = store.build_snapshot_from_event(event, binding_a, descriptor) + assert snapshot_a['conversation']['key'] == 'value_a' + + def test_runner_state_isolated_by_binding_id(self): + """Same runner, different binding_id: runner state isolated.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() # Same runner + event = FakeEventEnvelope() + policy = StatePolicy(state_scopes=['conversation', 'actor', 'subject', 'runner']) + binding_a = FakeBinding(binding_id='binding_a', state_policy=policy) + binding_b = FakeBinding(binding_id='binding_b', state_policy=policy) + logger = FakeLogger() + + # Store for binding_a + store.apply_update_from_event(event, binding_a, descriptor, 'runner', 'cache_version', 'v1', logger) + + # binding_b should not see binding_a's runner state + snapshot_b = store.build_snapshot_from_event(event, binding_b, descriptor) + assert snapshot_b['runner'] == {} + + # binding_a should see its own state + snapshot_a = store.build_snapshot_from_event(event, binding_a, descriptor) + assert snapshot_a['runner']['cache_version'] == 'v1' + + def test_actor_state_isolated_by_binding_id(self): + """Same runner, same actor_id, different binding_id: actor state isolated.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(actor=FakeActorContext(actor_id='user_001')) + binding_a = FakeBinding(binding_id='binding_a') + binding_b = FakeBinding(binding_id='binding_b') + logger = FakeLogger() + + # Store for binding_a + store.apply_update_from_event(event, binding_a, descriptor, 'actor', 'preferred_language', 'en', logger) + + # binding_b should not see binding_a's state + snapshot_b = store.build_snapshot_from_event(event, binding_b, descriptor) + assert snapshot_b['actor'] == {} + + # binding_a should see its own state + snapshot_a = store.build_snapshot_from_event(event, binding_a, descriptor) + assert snapshot_a['actor']['preferred_language'] == 'en' + + +class TestStateStorePolicyEnforcement: + """Tests for state policy enforcement.""" + + def test_enable_state_false_returns_empty_snapshot(self): + """enable_state=False should return all empty scopes.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001') + policy = StatePolicy(enable_state=False) + binding = FakeBinding(state_policy=policy) + logger = FakeLogger() + + # Even if state exists, snapshot should be empty + store.apply_update_from_event( + event, FakeBinding(), descriptor, 'conversation', 'key', 'value', logger + ) + + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['conversation'] == {} + assert snapshot['actor'] == {} + assert snapshot['subject'] == {} + assert snapshot['runner'] == {} + + def test_enable_state_false_rejects_update(self): + """enable_state=False should reject state updates.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope(conversation_id='conv_001') + policy = StatePolicy(enable_state=False) + binding = FakeBinding(state_policy=policy) + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'key', 'value', logger + ) + + assert result is False + assert len(logger.warnings) == 1 + assert 'disabled' in logger.warnings[0] + + def test_state_scopes_restricts_enabled_scopes(self): + """state_scopes should restrict which scopes are enabled.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope( + conversation_id='conv_001', + actor=FakeActorContext(actor_id='user_001'), + ) + # Only allow conversation scope + policy = StatePolicy(state_scopes=['conversation']) + binding = FakeBinding(state_policy=policy) + logger = FakeLogger() + + # Conversation update should work + result_conv = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'key', 'value_conv', logger + ) + assert result_conv is True + + # Actor update should be rejected + result_actor = store.apply_update_from_event( + event, binding, descriptor, 'actor', 'key', 'value_actor', logger + ) + assert result_actor is False + assert any('not enabled' in w for w in logger.warnings) + + def test_state_scopes_restricts_snapshot(self): + """state_scopes should restrict which scopes appear in snapshot.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope( + conversation_id='conv_001', + actor=FakeActorContext(actor_id='user_001'), + ) + # Only allow conversation scope + policy = StatePolicy(state_scopes=['conversation']) + binding = FakeBinding(state_policy=policy) + logger = FakeLogger() + + # Store values for all scopes using a binding with all scopes enabled + full_binding = FakeBinding() + store.apply_update_from_event(event, full_binding, descriptor, 'conversation', 'conv_key', 'conv_val', logger) + store.apply_update_from_event(event, full_binding, descriptor, 'actor', 'actor_key', 'actor_val', logger) + + # Snapshot with restricted binding should only have conversation + snapshot = store.build_snapshot_from_event(event, binding, descriptor) + assert snapshot['conversation']['conv_key'] == 'conv_val' + assert snapshot['actor'] == {} # Not enabled by policy + + def test_default_state_scopes_conversation_and_actor(self): + """Default state_scopes should be conversation and actor only.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope( + conversation_id='conv_001', + subject=FakeSubjectContext(subject_id='msg_001'), + ) + binding = FakeBinding() # Uses default policy + logger = FakeLogger() + + # Conversation should work (in default scopes) + result_conv = store.apply_update_from_event( + event, binding, descriptor, 'conversation', 'key', 'value', logger + ) + assert result_conv is True + + # Subject should be rejected (not in default scopes) + result_subject = store.apply_update_from_event( + event, binding, descriptor, 'subject', 'key', 'value', logger + ) + assert result_subject is False + + def test_runner_scope_restricted_by_policy(self): + """Runner scope should be restricted by state_scopes.""" + store = RunnerScopedStateStore() + descriptor = make_descriptor() + event = FakeEventEnvelope() + # Only allow conversation scope + policy = StatePolicy(state_scopes=['conversation']) + binding = FakeBinding(state_policy=policy) + logger = FakeLogger() + + result = store.apply_update_from_event( + event, binding, descriptor, 'runner', 'key', 'value', logger + ) + + assert result is False + assert any('not enabled' in w for w in logger.warnings) \ No newline at end of file