From 897a708a13eace25ab7f1c4157e8f191af457f0c Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:41:20 +0800 Subject: [PATCH] Fix agent runner host migration and runtime guards Migrates legacy runner blocks into plugin runner configs, preserves run-scoped history boundaries, enforces operation/file authorization, and sanitizes inline attachment persistence. Also fixes plugin runner form dirty handling and adds regression coverage. --- .../pkg/agent/runner/config_migration.py | 74 +++++++++++++ src/langbot/pkg/agent/runner/config_schema.py | 38 +++++++ .../pkg/agent/runner/context_builder.py | 4 + .../pkg/agent/runner/event_log_store.py | 41 ++++++- src/langbot/pkg/agent/runner/orchestrator.py | 23 +++- .../agent/runner/persistent_state_store.py | 4 +- .../pkg/agent/runner/resource_builder.py | 39 ++++++- src/langbot/pkg/agent/runner/run_journal.py | 67 ++++++++++-- .../pkg/agent/runner/session_registry.py | 75 ++++++++++++- src/langbot/pkg/agent/runner/state_scope.py | 63 +++++++---- .../pkg/agent/runner/transcript_store.py | 58 +++++++++- .../entity/persistence/agent_runner_state.py | 2 +- .../pkg/entity/persistence/transcript.py | 7 ++ .../versions/0004_migrate_runner_config.py | 34 +++--- ...a81_add_event_log_and_transcript_tables.py | 24 +++++ ..._add_agent_runner_state_table_for_host_.py | 4 +- ...2c1d9e4f30_add_transcript_scope_columns.py | 78 ++++++++++++++ .../migrations/dbm001_migrate_v3_config.py | 23 ++-- src/langbot/pkg/pipeline/pipelinemgr.py | 6 +- .../pkg/pipeline/process/handlers/chat.py | 7 +- src/langbot/pkg/plugin/handler.py | 53 ++++++--- tests/unit_tests/agent/conftest.py | 28 +++++ tests/unit_tests/agent/test_chat_handler.py | 7 +- .../unit_tests/agent/test_config_migration.py | 21 ++-- .../agent/test_config_migration_full.py | 18 ++-- tests/unit_tests/agent/test_handler_auth.py | 47 +++++++- .../agent/test_history_event_api_auth.py | 101 ++++++++++++++++++ .../agent/test_orchestrator_integration.py | 61 +++++++++++ .../unit_tests/agent/test_resource_builder.py | 56 ++++++++-- .../unit_tests/agent/test_session_registry.py | 26 +++++ tests/unit_tests/agent/test_state_api_auth.py | 8 ++ tests/unit_tests/agent/test_state_store.py | 22 ++-- .../pipeline-form/PipelineFormComponent.tsx | 39 ++++--- 33 files changed, 1017 insertions(+), 141 deletions(-) create mode 100644 src/langbot/pkg/persistence/alembic/versions/7b2c1d9e4f30_add_transcript_scope_columns.py diff --git a/src/langbot/pkg/agent/runner/config_migration.py b/src/langbot/pkg/agent/runner/config_migration.py index c92b1912..6f8250ab 100644 --- a/src/langbot/pkg/agent/runner/config_migration.py +++ b/src/langbot/pkg/agent/runner/config_migration.py @@ -5,11 +5,23 @@ from __future__ import annotations import typing +LEGACY_RUNNER_ID_MAP: dict[str, str] = { + 'local-agent': 'plugin:langbot/local-agent/default', + 'dify-service-api': 'plugin:langbot/dify-agent/default', + 'n8n-service-api': 'plugin:langbot/n8n-agent/default', + 'coze-api': 'plugin:langbot/coze-agent/default', + 'dashscope-app-api': 'plugin:langbot/dashscope-agent/default', + 'langflow-api': 'plugin:langbot/langflow-agent/default', + 'tbox-app-api': 'plugin:langbot/tbox-agent/default', +} + + class ConfigMigration: """Configuration helper for agent runner IDs. Responsibilities: - Resolve runner ID from ai.runner.id + - Migrate legacy ai.runner.runner + ai. blocks - Extract current Agent/runner config from ai.runner_config - Keep the current config container shape stable on save """ @@ -31,6 +43,10 @@ class ConfigMigration: if runner_id: return runner_id + legacy_runner = runner_config.get('runner') + if isinstance(legacy_runner, str): + return LEGACY_RUNNER_ID_MAP.get(legacy_runner) + return None @staticmethod @@ -53,6 +69,13 @@ class ConfigMigration: if runner_id in runner_configs: return runner_configs[runner_id] + legacy_runner = ConfigMigration._legacy_runner_name_for_id(runner_id) + if legacy_runner and isinstance(ai_config.get(legacy_runner), dict): + return ConfigMigration._normalize_legacy_runner_config( + legacy_runner, + ai_config[legacy_runner], + ) + return {} @staticmethod @@ -88,8 +111,59 @@ class ConfigMigration: runner_config = dict(ai_config.get('runner', {})) runner_configs = dict(ai_config.get('runner_config', {})) + legacy_runner = runner_config.get('runner') + mapped_runner_id = None + if isinstance(legacy_runner, str): + mapped_runner_id = LEGACY_RUNNER_ID_MAP.get(legacy_runner) + + if mapped_runner_id and not runner_config.get('id'): + runner_config = { + key: value + for key, value in runner_config.items() + if key != 'runner' + } + runner_config['id'] = mapped_runner_id + + if mapped_runner_id and mapped_runner_id not in runner_configs: + legacy_config = ai_config.get(legacy_runner) + if isinstance(legacy_config, dict): + runner_configs[mapped_runner_id] = ConfigMigration._normalize_legacy_runner_config( + legacy_runner, + legacy_config, + ) + ai_config['runner'] = runner_config ai_config['runner_config'] = runner_configs + if mapped_runner_id and legacy_runner in ai_config: + ai_config.pop(legacy_runner, None) new_config['ai'] = ai_config return new_config + + @staticmethod + def _legacy_runner_name_for_id(runner_id: str) -> str | None: + for legacy_runner, mapped_runner_id in LEGACY_RUNNER_ID_MAP.items(): + if mapped_runner_id == runner_id: + return legacy_runner + return None + + @staticmethod + def _normalize_legacy_runner_config( + legacy_runner: str, + legacy_config: dict[str, typing.Any], + ) -> dict[str, typing.Any]: + """Normalize legacy runner config blocks to current plugin schema quirks.""" + normalized = dict(legacy_config) + + if legacy_runner == 'local-agent': + model = normalized.get('model') + if isinstance(model, str): + normalized['model'] = { + 'primary': model, + 'fallbacks': [], + } + knowledge_base = normalized.pop('knowledge-base', None) + if 'knowledge-bases' not in normalized and isinstance(knowledge_base, str): + normalized['knowledge-bases'] = [] if knowledge_base in {'', '__none__', '__none'} else [knowledge_base] + + return normalized diff --git a/src/langbot/pkg/agent/runner/config_schema.py b/src/langbot/pkg/agent/runner/config_schema.py index da1a3d12..6ee98107 100644 --- a/src/langbot/pkg/agent/runner/config_schema.py +++ b/src/langbot/pkg/agent/runner/config_schema.py @@ -9,6 +9,7 @@ from .descriptor import AgentRunnerDescriptor LLM_MODEL_SELECTOR_TYPES = {'model-fallback-selector', 'llm-model-selector'} KB_SELECTOR_TYPES = {'knowledge-base-multi-selector'} PROMPT_EDITOR_TYPES = {'prompt-editor'} +FILE_SELECTOR_TYPES = {'file', 'array[file]'} NONE_SENTINELS = {'', '__none__', '__none'} @@ -119,6 +120,43 @@ def extract_knowledge_base_uuids( return list(dict.fromkeys(kb_uuids)) +def extract_config_file_resources( + descriptor: AgentRunnerDescriptor | None, + runner_config: dict[str, typing.Any], +) -> list[dict[str, typing.Any]]: + """Extract uploaded config file resources from schema-defined file fields.""" + files: list[dict[str, typing.Any]] = [] + + def append_file(value: typing.Any) -> None: + if not isinstance(value, dict): + return + file_key = value.get('file_key') or value.get('file_id') + if not isinstance(file_key, str) or file_key in NONE_SENTINELS: + return + files.append({ + 'file_id': file_key, + 'file_name': value.get('file_name') or value.get('name'), + 'mime_type': value.get('mime_type') or value.get('mimetype'), + 'source': 'config', + }) + + for item in iter_schema_items(descriptor, FILE_SELECTOR_TYPES): + field_name = item.get('name') + if not field_name: + continue + value = runner_config.get(field_name, item.get('default')) + if item.get('type') == 'file': + append_file(value) + elif isinstance(value, list): + for entry in value: + append_file(entry) + + deduped: dict[str, dict[str, typing.Any]] = {} + for file_resource in files: + deduped.setdefault(file_resource['file_id'], file_resource) + return list(deduped.values()) + + def iter_config_model_refs( descriptor: AgentRunnerDescriptor, runner_config: dict[str, typing.Any], diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index bf1d0485..03dbca18 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -65,6 +65,7 @@ class ModelResource(typing.TypedDict): model_id: str model_type: str | None provider: str | None + operations: list[str] class ToolResource(typing.TypedDict): @@ -73,6 +74,7 @@ class ToolResource(typing.TypedDict): tool_name: str tool_type: str | None description: str | None + operations: list[str] class KnowledgeBaseResource(typing.TypedDict): @@ -81,6 +83,7 @@ class KnowledgeBaseResource(typing.TypedDict): kb_id: str kb_name: str | None kb_type: str | None + operations: list[str] class SkillResource(typing.TypedDict): @@ -98,6 +101,7 @@ class FileResource(typing.TypedDict): file_name: str | None mime_type: str | None source: str | None + operations: list[str] class StorageResource(typing.TypedDict): diff --git a/src/langbot/pkg/agent/runner/event_log_store.py b/src/langbot/pkg/agent/runner/event_log_store.py index 7e146c66..d8c5b4e3 100644 --- a/src/langbot/pkg/agent/runner/event_log_store.py +++ b/src/langbot/pkg/agent/runner/event_log_store.py @@ -141,6 +141,10 @@ class EventLogStore: event_types: list[str] | None = None, before_seq: int | None = None, limit: int = 50, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, + strict_thread: bool = False, ) -> tuple[list[dict[str, typing.Any]], int | None, bool]: """Page through event records. @@ -149,6 +153,10 @@ class EventLogStore: event_types: Filter by event types before_seq: Get events before this sequence number limit: Maximum items to return (capped at 100) + bot_id: Optional bot scope filter + workspace_id: Optional workspace scope filter + thread_id: Optional thread scope filter + strict_thread: When true, require thread_id equality including NULL Returns: Tuple of (items, next_seq, has_more) @@ -160,6 +168,7 @@ class EventLogStore: if conversation_id is not None: query = query.where(EventLog.conversation_id == conversation_id) + query = self._apply_scope_filters(query, bot_id, workspace_id, thread_id, strict_thread) if event_types: query = query.where(EventLog.event_type.in_(event_types)) @@ -206,6 +215,10 @@ class EventLogStore: self, conversation_id: str, seq: int, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, + strict_thread: bool = False, ) -> bool: """Check if there are events before a sequence number. @@ -217,17 +230,35 @@ class EventLogStore: True if there are events before """ async with self._session_factory() as session: - result = await session.execute( + query = ( sqlalchemy.select(sqlalchemy.func.count()) .select_from(EventLog) - .where( - EventLog.conversation_id == conversation_id, - EventLog.id < seq, - ) + .where(EventLog.conversation_id == conversation_id, EventLog.id < seq) ) + query = self._apply_scope_filters(query, bot_id, workspace_id, thread_id, strict_thread) + result = await session.execute(query) count = result.scalar() return count > 0 + def _apply_scope_filters( + self, + query: typing.Any, + bot_id: str | None, + workspace_id: str | None, + thread_id: str | None, + strict_thread: bool, + ) -> typing.Any: + if bot_id is not None: + query = query.where(EventLog.bot_id == bot_id) + if workspace_id is not None: + query = query.where(EventLog.workspace_id == workspace_id) + if strict_thread: + if thread_id is None: + query = query.where(EventLog.thread_id.is_(None)) + else: + query = query.where(EventLog.thread_id == thread_id) + return query + async def cleanup_events_older_than( self, before: datetime.datetime, diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 0be102f8..7cd35b7e 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -109,6 +109,9 @@ class AgentRunOrchestrator: resources=resources, available_apis=context.get('context', {}).get('available_apis'), conversation_id=event.conversation_id, + bot_id=event.bot_id, + workspace_id=event.workspace_id, + thread_id=event.thread_id, state_policy={ 'enable_state': binding.state_policy.enable_state, 'state_scopes': list(binding.state_policy.state_scopes), @@ -136,6 +139,7 @@ class AgentRunOrchestrator: pending_artifact_refs: list[dict[str, typing.Any]] = [] seen_sequences: set[int] = set() last_sequence = 0 + assistant_transcript_written = False try: async for result_dict in self.invoker.invoke(descriptor, context): @@ -187,11 +191,25 @@ class AgentRunOrchestrator: continue if result_type == 'state.updated': - await self.journal.handle_state_updated_event(result_dict, event, binding, descriptor) + await self.journal.handle_state_updated_event( + result_dict, + event, + binding, + descriptor, + run_id=run_id, + ) await self.result_normalizer.normalize(result_dict, descriptor) continue - if result_type == 'message.completed' and event.conversation_id: + has_completed_message = ( + result_type == 'message.completed' + or ( + result_type == 'run.completed' + and isinstance(result_dict.get('data'), dict) + and bool(result_dict['data'].get('message')) + ) + ) + if has_completed_message and event.conversation_id and not assistant_transcript_written: merged_refs = self.journal.merge_artifact_refs( pending_artifact_refs, result_dict, @@ -205,6 +223,7 @@ class AgentRunOrchestrator: runner_id=descriptor.id, artifact_refs=merged_refs if merged_refs else None, ) + assistant_transcript_written = True result = await self.result_normalizer.normalize(result_dict, descriptor) if result is not None: diff --git a/src/langbot/pkg/agent/runner/persistent_state_store.py b/src/langbot/pkg/agent/runner/persistent_state_store.py index ba9df2d3..e5c2ad56 100644 --- a/src/langbot/pkg/agent/runner/persistent_state_store.py +++ b/src/langbot/pkg/agent/runner/persistent_state_store.py @@ -361,10 +361,8 @@ class PersistentStateStore: delete(AgentRunnerState) .where(AgentRunnerState.scope_key == scope_key) .where(AgentRunnerState.state_key == state_key) - .returning(AgentRunnerState.id) ) - deleted = result.first() - return deleted is not None + return (result.rowcount or 0) > 0 async def state_list( self, diff --git a/src/langbot/pkg/agent/runner/resource_builder.py b/src/langbot/pkg/agent/runner/resource_builder.py index 4a2b6119..02f27017 100644 --- a/src/langbot/pkg/agent/runner/resource_builder.py +++ b/src/langbot/pkg/agent/runner/resource_builder.py @@ -11,6 +11,7 @@ from .context_builder import ( ToolResource, KnowledgeBaseResource, SkillResource, + FileResource, StorageResource, ) from . import config_schema @@ -79,13 +80,14 @@ class AgentResourceBuilder: resource_policy, descriptor ) storage = self._build_storage_from_binding(manifest_perms, binding) + files = self._build_files_from_binding(manifest_perms, descriptor, runner_config) return { 'models': models, 'tools': tools, 'knowledge_bases': knowledge_bases, 'skills': skills, - 'files': [], # Files are populated at runtime + 'files': files, 'storage': storage, 'platform_capabilities': {}, # Reserved for EBA } @@ -104,6 +106,7 @@ class AgentResourceBuilder: model_perms = set(manifest_perms.models) include_llm = bool({'invoke', 'stream'} & model_perms) include_rerank = 'rerank' in model_perms + llm_operations = [operation for operation in ('invoke', 'stream') if operation in model_perms] if not include_llm and not include_rerank: return models @@ -118,12 +121,13 @@ class AgentResourceBuilder: runner_config=runner_config, include_llm=include_llm, include_rerank=include_rerank, + llm_operations=llm_operations, ) # Add explicitly allowed models if allowed_uuids and include_llm: for model_uuid in allowed_uuids: - await self._append_llm_model_resource(models, seen_model_ids, model_uuid) + await self._append_llm_model_resource(models, seen_model_ids, model_uuid, llm_operations) return models @@ -144,6 +148,7 @@ class AgentResourceBuilder: # Get tool names from resource policy allowed_names = resource_policy.allowed_tool_names + tool_operations = [operation for operation in ('detail', 'call') if operation in tool_perms] if allowed_names: for tool_name in allowed_names: @@ -151,6 +156,7 @@ class AgentResourceBuilder: 'tool_name': tool_name, 'tool_type': None, 'description': None, + 'operations': tool_operations, }) return tools @@ -167,6 +173,7 @@ class AgentResourceBuilder: kb_perms = set(manifest_perms.knowledge_bases) if not ({'list', 'retrieve'} & kb_perms): return kb_resources + kb_operations = [operation for operation in ('list', 'retrieve') if operation in kb_perms] if not config_schema.uses_host_knowledge_bases(descriptor): return kb_resources @@ -187,6 +194,7 @@ class AgentResourceBuilder: 'kb_id': kb_uuid, 'kb_name': kb.get_name(), 'kb_type': kb.knowledge_base_entity.kb_type if hasattr(kb.knowledge_base_entity, 'kb_type') else None, + 'operations': kb_operations, }) except Exception as e: self.ap.logger.warning(f'Failed to build knowledge base resource {kb_uuid}: {e}') @@ -223,6 +231,27 @@ class AgentResourceBuilder: }) return skills + def _build_files_from_binding( + self, + manifest_perms: typing.Any, + descriptor: AgentRunnerDescriptor, + runner_config: dict[str, typing.Any], + ) -> list[FileResource]: + """Build config/knowledge file resources selected in runner config.""" + file_perms = set(manifest_perms.files) + operations = [operation for operation in ('config', 'knowledge') if operation in file_perms] + if not operations: + return [] + + files: list[FileResource] = [] + if 'config' in file_perms: + for file_resource in config_schema.extract_config_file_resources(descriptor, runner_config): + files.append({ + **file_resource, + 'operations': ['config'], + }) + return files + def _build_storage_from_binding( self, manifest_perms: typing.Any, @@ -245,11 +274,12 @@ class AgentResourceBuilder: runner_config: dict[str, typing.Any], include_llm: bool, include_rerank: bool, + llm_operations: list[str], ) -> None: """Authorize model-like values selected through DynamicForm fields.""" for model_type, model_uuid in config_schema.iter_config_model_refs(descriptor, runner_config): if model_type == 'llm' and include_llm: - await self._append_llm_model_resource(models, seen_model_ids, model_uuid) + await self._append_llm_model_resource(models, seen_model_ids, model_uuid, llm_operations) elif model_type == 'rerank' and include_rerank: await self._append_rerank_model_resource(models, seen_model_ids, model_uuid) @@ -258,6 +288,7 @@ class AgentResourceBuilder: models: list[ModelResource], seen_model_ids: set[str], model_uuid: str | None, + operations: list[str], ) -> None: """Append an LLM model resource if it exists and has not been added.""" if not model_uuid or model_uuid == '__none__' or model_uuid in seen_model_ids: @@ -270,6 +301,7 @@ class AgentResourceBuilder: 'model_id': model_uuid, 'model_type': getattr(model.model_entity, 'model_type', None), 'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None, + 'operations': operations, }) seen_model_ids.add(model_uuid) except Exception as e: @@ -292,6 +324,7 @@ class AgentResourceBuilder: 'model_id': model_uuid, 'model_type': getattr(model.model_entity, 'model_type', 'rerank') or 'rerank', 'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None, + 'operations': ['rerank'], }) seen_model_ids.add(model_uuid) except Exception as e: diff --git a/src/langbot/pkg/agent/runner/run_journal.py b/src/langbot/pkg/agent/runner/run_journal.py index 30405f98..588d6dd5 100644 --- a/src/langbot/pkg/agent/runner/run_journal.py +++ b/src/langbot/pkg/agent/runner/run_journal.py @@ -26,16 +26,62 @@ class AgentRunJournal: self.ap = ap self._persistent_state_store = None + @staticmethod + def _to_plain_dict(value: typing.Any) -> dict[str, typing.Any]: + if hasattr(value, 'model_dump'): + value = value.model_dump(mode='json') + if isinstance(value, dict): + return dict(value) + return {} + + @classmethod + def _sanitize_content_item(cls, value: typing.Any) -> typing.Any: + item = cls._to_plain_dict(value) + if not item: + return value + item_type = item.get('type') + if item_type == 'image_base64' and item.get('image_base64'): + item['image_base64'] = None + item['content_redacted'] = True + elif item_type == 'file_base64' and item.get('file_base64'): + item['file_base64'] = None + item['content_redacted'] = True + return item + + @classmethod + def _sanitize_attachment_ref(cls, value: typing.Any) -> dict[str, typing.Any]: + item = cls._to_plain_dict(value) + if item.get('content'): + item['content'] = None + item['content_redacted'] = True + return item + + @classmethod + def _sanitize_contents(cls, contents: typing.Iterable[typing.Any]) -> list[typing.Any]: + return [cls._sanitize_content_item(content) for content in contents] + + @classmethod + def _sanitize_attachments(cls, attachments: typing.Iterable[typing.Any]) -> list[dict[str, typing.Any]]: + return [cls._sanitize_attachment_ref(attachment) for attachment in attachments] + async def handle_state_updated_event( self, result_dict: dict[str, typing.Any], event: AgentEventEnvelope, binding: AgentBinding, descriptor: AgentRunnerDescriptor, + run_id: str | None = None, ) -> None: """Handle state.updated result in event-first mode.""" data = result_dict.get('data', {}) + result_run_id = result_dict.get('run_id') + if run_id and result_run_id and result_run_id != run_id: + raise RunnerProtocolError( + descriptor.id, + f'state.updated run_id mismatch: expected {run_id}, got {result_run_id}', + ) + scope = data.get('scope') if not scope: raise RunnerProtocolError( @@ -98,8 +144,8 @@ class AgentRunJournal: input_summary = event.input.text[:1000] input_json = { 'text': event.input.text, - 'contents': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents], - 'attachments': [a.model_dump(mode='json') if hasattr(a, 'model_dump') else a for a in event.input.attachments], + 'contents': self._sanitize_contents(event.input.contents), + 'attachments': self._sanitize_attachments(event.input.attachments), } return await store.append_event( @@ -230,19 +276,21 @@ class AgentRunJournal: if event.input: content_json = { 'role': 'user', - 'content': [c.model_dump(mode='json') if hasattr(c, 'model_dump') else c for c in event.input.contents] if event.input.contents else [], + 'content': self._sanitize_contents(event.input.contents) if event.input.contents else [], } artifact_refs = [] if event.input and event.input.attachments: for a in event.input.attachments: - artifact_refs.append(a.model_dump(mode='json') if hasattr(a, 'model_dump') else a) + artifact_refs.append(self._sanitize_attachment_ref(a)) await store.append_transcript( transcript_id=None, event_id=event_log_id, conversation_id=event.conversation_id, role='user', + bot_id=event.bot_id, + workspace_id=event.workspace_id, content=content, content_json=content_json, artifact_refs=artifact_refs if artifact_refs else None, @@ -438,8 +486,8 @@ class AgentRunJournal: input_summary=input_summary, input_json={ 'text': text, - 'contents': input_data.get('contents') or [], - 'attachments': input_data.get('attachments') or [], + 'contents': self._sanitize_contents(input_data.get('contents') or []), + 'attachments': self._sanitize_attachments(input_data.get('attachments') or []), }, run_id=run_id, runner_id=runner_id, @@ -486,7 +534,10 @@ class AgentRunJournal: if isinstance(c, dict) and c.get('type') == 'text': text_parts.append(c.get('text', '')) content = ' '.join(text_parts) if text_parts else None - content_json = message + content_json = { + **message, + 'content': self._sanitize_contents(message['content']), + } assistant_event_id = str(uuid.uuid4()) @@ -495,6 +546,8 @@ class AgentRunJournal: event_id=assistant_event_id, conversation_id=event.conversation_id, role='assistant', + bot_id=event.bot_id, + workspace_id=event.workspace_id, content=content, content_json=content_json, artifact_refs=artifact_refs, diff --git a/src/langbot/pkg/agent/runner/session_registry.py b/src/langbot/pkg/agent/runner/session_registry.py index 85bce3b4..dd879a8c 100644 --- a/src/langbot/pkg/agent/runner/session_registry.py +++ b/src/langbot/pkg/agent/runner/session_registry.py @@ -12,6 +12,14 @@ from .context_builder import AgentResources MAX_STEERING_QUEUE_ITEMS = 100 +DEFAULT_RESOURCE_OPERATIONS: dict[str, set[str]] = { + 'model': {'invoke', 'stream', 'rerank'}, + 'tool': {'detail', 'call'}, + 'knowledge_base': {'list', 'retrieve'}, + 'file': {'config', 'knowledge'}, + 'skill': {'activate'}, +} + class AgentRunSessionStatus(typing.TypedDict): """Status tracking for agent run session.""" @@ -30,9 +38,13 @@ class RunAuthorizationSnapshot(typing.TypedDict): resources: AgentResources available_apis: dict[str, bool] conversation_id: str | None + bot_id: str | None + workspace_id: str | None + thread_id: str | None state_policy: dict[str, typing.Any] state_context: dict[str, typing.Any] authorized_ids: dict[str, set[str]] + authorized_operations: dict[str, dict[str, set[str]]] SteeringQueueItem = dict[str, typing.Any] @@ -87,6 +99,9 @@ class AgentRunSessionRegistry: plugin_identity: str, resources: AgentResources, conversation_id: str | None = None, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, available_apis: dict[str, bool] | None = None, state_policy: dict[str, typing.Any] | None = None, state_context: dict[str, typing.Any] | None = None, @@ -100,6 +115,9 @@ class AgentRunSessionRegistry: plugin_identity: Plugin identifier (author/name) resources: Authorized resources for this run conversation_id: Conversation ID for history/event access + bot_id: Bot UUID for history/event access + workspace_id: Workspace ID for history/event access + thread_id: Thread ID for history/event access available_apis: Run-scoped pull APIs exposed in AgentRunContext state_policy: State policy from binding (enable_state, state_scopes) state_context: Context for state API (scope_keys, binding_identity, etc.) @@ -120,9 +138,13 @@ class AgentRunSessionRegistry: 'resources': resources_snapshot, 'available_apis': available_apis, 'conversation_id': conversation_id, + 'bot_id': bot_id, + 'workspace_id': workspace_id, + 'thread_id': thread_id, 'state_policy': copy.deepcopy(state_policy), 'state_context': copy.deepcopy(state_context), 'authorized_ids': self._build_authorized_ids(resources_snapshot), + 'authorized_operations': self._build_authorized_operations(resources_snapshot), } session: AgentRunSession = { @@ -151,6 +173,47 @@ class AgentRunSessionRegistry: 'file': {f.get('file_id') for f in resources.get('files', [])}, } + def _build_authorized_operations( + self, + resources: AgentResources, + ) -> dict[str, dict[str, set[str]]]: + """Pre-compute resource operations for runtime action validation.""" + return { + 'model': { + m.get('model_id'): self._resource_operations('model', m) + for m in resources.get('models', []) + if m.get('model_id') + }, + 'tool': { + t.get('tool_name'): self._resource_operations('tool', t) + for t in resources.get('tools', []) + if t.get('tool_name') + }, + 'knowledge_base': { + kb.get('kb_id'): self._resource_operations('knowledge_base', kb) + for kb in resources.get('knowledge_bases', []) + if kb.get('kb_id') + }, + 'skill': { + s.get('skill_name'): self._resource_operations('skill', s) + for s in resources.get('skills', []) + if s.get('skill_name') + }, + 'file': { + f.get('file_id'): self._resource_operations('file', f) + for f in resources.get('files', []) + if f.get('file_id') + }, + } + + @staticmethod + def _resource_operations(resource_type: str, resource: dict[str, typing.Any]) -> set[str]: + """Return explicit operations or the compatibility default for old resources.""" + operations = resource.get('operations') + if isinstance(operations, list) and operations: + return {str(operation) for operation in operations} + return set(DEFAULT_RESOURCE_OPERATIONS.get(resource_type, set())) + async def unregister(self, run_id: str) -> AgentRunSession | None: """Unregister an agent run session. @@ -263,6 +326,7 @@ class AgentRunSessionRegistry: session: AgentRunSession, resource_type: str, resource_id: str, + operation: str | None = None, ) -> bool: """Check if resource access is allowed for this session. @@ -272,6 +336,7 @@ class AgentRunSessionRegistry: session: AgentRunSession to check resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage', 'file') resource_id: Resource identifier (model_id, tool_name, kb_id, 'plugin'/'workspace', file_key) + operation: Optional operation to check within the authorized resource Returns: True if resource is authorized, False otherwise @@ -281,7 +346,15 @@ class AgentRunSessionRegistry: resources = authorization['resources'] if resource_type in ('model', 'tool', 'knowledge_base', 'skill', 'file'): - return resource_id in authorized_ids.get(resource_type, set()) + if resource_id not in authorized_ids.get(resource_type, set()): + return False + if operation is None: + return True + operation_map = authorization.get('authorized_operations', {}) + operations = operation_map.get(resource_type, {}).get(resource_id) + if not operations: + operations = DEFAULT_RESOURCE_OPERATIONS.get(resource_type, set()) + return operation in operations if resource_type == 'storage': storage = resources.get('storage', {}) diff --git a/src/langbot/pkg/agent/runner/state_scope.py b/src/langbot/pkg/agent/runner/state_scope.py index 0493ff9f..f2a32bb1 100644 --- a/src/langbot/pkg/agent/runner/state_scope.py +++ b/src/langbot/pkg/agent/runner/state_scope.py @@ -1,6 +1,8 @@ """State scope key helpers for AgentRunner host-owned state.""" from __future__ import annotations +import hashlib +import json import typing from .descriptor import AgentRunnerDescriptor @@ -31,6 +33,30 @@ def get_binding_identity(binding: AgentBinding) -> str: return 'unknown_binding' +def _scope_hash(scope: str, parts: dict[str, typing.Any]) -> str: + """Encode state scope dimensions without separator ambiguity.""" + payload = { + 'version': 2, + 'scope': scope, + **parts, + } + raw = json.dumps(payload, sort_keys=True, separators=(',', ':'), ensure_ascii=False) + return f'{scope}:v2:{hashlib.sha256(raw.encode("utf-8")).hexdigest()}' + + +def _base_scope_parts( + event: AgentEventEnvelope, + binding: AgentBinding, + descriptor: AgentRunnerDescriptor, +) -> dict[str, typing.Any]: + return { + 'runner_id': descriptor.id, + 'binding_identity': get_binding_identity(binding), + 'bot_id': event.bot_id, + 'workspace_id': event.workspace_id, + } + + def build_state_scope_key( scope: str, event: AgentEventEnvelope, @@ -41,40 +67,37 @@ def build_state_scope_key( Returns None when the event lacks the identity required by that scope. """ - binding_identity = get_binding_identity(binding) + base_parts = _base_scope_parts(event, binding, descriptor) if scope == 'conversation': if not event.conversation_id: return None - parts = [descriptor.id, binding_identity, event.conversation_id] - if event.thread_id: - parts.append(event.thread_id) - return f'conversation:{":".join(parts)}' + return _scope_hash(scope, { + **base_parts, + 'conversation_id': event.conversation_id, + 'thread_id': event.thread_id, + }) if scope == 'actor': if not event.actor or not event.actor.actor_id: return None - parts = [ - descriptor.id, - binding_identity, - event.actor.actor_type or 'user', - event.actor.actor_id, - ] - return f'actor:{":".join(parts)}' + return _scope_hash(scope, { + **base_parts, + 'actor_type': event.actor.actor_type or 'user', + 'actor_id': event.actor.actor_id, + }) if scope == 'subject': if not event.subject or not event.subject.subject_id: return None - parts = [ - descriptor.id, - binding_identity, - event.subject.subject_type or 'unknown', - event.subject.subject_id, - ] - return f'subject:{":".join(parts)}' + return _scope_hash(scope, { + **base_parts, + 'subject_type': event.subject.subject_type or 'unknown', + 'subject_id': event.subject.subject_id, + }) if scope == 'runner': - return f'runner:{descriptor.id}:{binding_identity}' + return _scope_hash(scope, base_parts) return None diff --git a/src/langbot/pkg/agent/runner/transcript_store.py b/src/langbot/pkg/agent/runner/transcript_store.py index a26436b6..0a91f9d8 100644 --- a/src/langbot/pkg/agent/runner/transcript_store.py +++ b/src/langbot/pkg/agent/runner/transcript_store.py @@ -39,6 +39,8 @@ class TranscriptStore: event_id: str, conversation_id: str, role: str, + bot_id: str | None = None, + workspace_id: str | None = None, content: str | None = None, content_json: dict[str, typing.Any] | None = None, artifact_refs: list[dict[str, typing.Any]] | None = None, @@ -55,6 +57,8 @@ class TranscriptStore: event_id: Source event ID conversation_id: Conversation ID role: Message role (user, assistant, system, tool) + bot_id: Bot UUID scope + workspace_id: Workspace scope content: Text content content_json: Full structured content artifact_refs: Artifact references @@ -78,6 +82,8 @@ class TranscriptStore: item = Transcript( transcript_id=transcript_id, event_id=event_id, + bot_id=bot_id, + workspace_id=workspace_id, conversation_id=conversation_id, thread_id=thread_id, role=role, @@ -106,6 +112,10 @@ class TranscriptStore: limit: int = 50, direction: str = "backward", include_artifacts: bool = False, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, + strict_thread: bool = False, ) -> tuple[list[dict[str, typing.Any]], int | None, int | None, bool]: """Page through transcript items. @@ -116,6 +126,10 @@ class TranscriptStore: limit: Maximum items to return (capped at 100) direction: 'backward' (older) or 'forward' (newer) include_artifacts: Include artifact refs + bot_id: Optional bot scope filter + workspace_id: Optional workspace scope filter + thread_id: Optional thread scope filter + strict_thread: When true, require thread_id equality including NULL Returns: Tuple of (items, next_seq, prev_seq, has_more) @@ -126,6 +140,7 @@ class TranscriptStore: query = sqlalchemy.select(Transcript).where( Transcript.conversation_id == conversation_id ) + query = self._apply_scope_filters(query, bot_id, workspace_id, thread_id, strict_thread) if direction == "backward" and before_seq is not None: query = query.where(Transcript.seq < before_seq) @@ -168,6 +183,10 @@ class TranscriptStore: query_text: str, filters: dict[str, typing.Any] | None = None, top_k: int = 10, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, + strict_thread: bool = False, ) -> list[dict[str, typing.Any]]: """Search transcript items. @@ -178,6 +197,10 @@ class TranscriptStore: query_text: Search query filters: Optional filters top_k: Maximum results + bot_id: Optional bot scope filter + workspace_id: Optional workspace scope filter + thread_id: Optional thread scope filter + strict_thread: When true, require thread_id equality including NULL Returns: List of matching items @@ -187,6 +210,7 @@ class TranscriptStore: Transcript.conversation_id == conversation_id, Transcript.content.ilike(f"%{query_text}%"), ) + query = self._apply_scope_filters(query, bot_id, workspace_id, thread_id, strict_thread) # Apply additional filters if filters: @@ -254,6 +278,10 @@ class TranscriptStore: self, conversation_id: str, seq: int, + bot_id: str | None = None, + workspace_id: str | None = None, + thread_id: str | None = None, + strict_thread: bool = False, ) -> bool: """Check if there is history before a sequence number. @@ -265,17 +293,35 @@ class TranscriptStore: True if there are items before """ async with self._session_factory() as session: - result = await session.execute( + query = ( sqlalchemy.select(sqlalchemy.func.count()) .select_from(Transcript) - .where( - Transcript.conversation_id == conversation_id, - Transcript.seq < seq, - ) + .where(Transcript.conversation_id == conversation_id, Transcript.seq < seq) ) + query = self._apply_scope_filters(query, bot_id, workspace_id, thread_id, strict_thread) + result = await session.execute(query) count = result.scalar() return count > 0 + def _apply_scope_filters( + self, + query: typing.Any, + bot_id: str | None, + workspace_id: str | None, + thread_id: str | None, + strict_thread: bool, + ) -> typing.Any: + if bot_id is not None: + query = query.where(Transcript.bot_id == bot_id) + if workspace_id is not None: + query = query.where(Transcript.workspace_id == workspace_id) + if strict_thread: + if thread_id is None: + query = query.where(Transcript.thread_id.is_(None)) + else: + query = query.where(Transcript.thread_id == thread_id) + return query + async def cleanup_transcripts_older_than( self, before: datetime.datetime, @@ -307,6 +353,8 @@ class TranscriptStore: result = { 'transcript_id': row.transcript_id, 'event_id': row.event_id, + 'bot_id': row.bot_id, + 'workspace_id': row.workspace_id, 'conversation_id': row.conversation_id, 'thread_id': row.thread_id, 'role': row.role, diff --git a/src/langbot/pkg/entity/persistence/agent_runner_state.py b/src/langbot/pkg/entity/persistence/agent_runner_state.py index 9d91c1d3..93f297f4 100644 --- a/src/langbot/pkg/entity/persistence/agent_runner_state.py +++ b/src/langbot/pkg/entity/persistence/agent_runner_state.py @@ -39,7 +39,7 @@ class AgentRunnerState(Base): scope = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) """State scope: 'conversation', 'actor', 'subject', or 'runner'.""" - scope_key = sqlalchemy.Column(sqlalchemy.String(512), nullable=False, index=True) + scope_key = sqlalchemy.Column(sqlalchemy.String(512), nullable=False) """Full scope key for unique lookup (includes all identity parts).""" state_key = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) diff --git a/src/langbot/pkg/entity/persistence/transcript.py b/src/langbot/pkg/entity/persistence/transcript.py index da0a894c..0c8f4737 100644 --- a/src/langbot/pkg/entity/persistence/transcript.py +++ b/src/langbot/pkg/entity/persistence/transcript.py @@ -25,6 +25,12 @@ class Transcript(Base): event_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) """Reference to the source event in EventLog.""" + bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True) + """Bot UUID this item belongs to.""" + + workspace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) + """Workspace this item belongs to.""" + conversation_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True) """Conversation this item belongs to.""" @@ -69,4 +75,5 @@ class Transcript(Base): __table_args__ = ( sqlalchemy.Index('ix_transcript_conversation_seq', 'conversation_id', 'seq'), sqlalchemy.Index('ix_transcript_conversation_created', 'conversation_id', 'created_at'), + sqlalchemy.Index('ix_transcript_scope_seq', 'bot_id', 'workspace_id', 'conversation_id', 'thread_id', 'seq'), ) diff --git a/src/langbot/pkg/persistence/alembic/versions/0004_migrate_runner_config.py b/src/langbot/pkg/persistence/alembic/versions/0004_migrate_runner_config.py index 5e97e3d3..40f393b9 100644 --- a/src/langbot/pkg/persistence/alembic/versions/0004_migrate_runner_config.py +++ b/src/langbot/pkg/persistence/alembic/versions/0004_migrate_runner_config.py @@ -9,24 +9,24 @@ import json import sqlalchemy as sa from alembic import op +from langbot.pkg.agent.runner.config_migration import ConfigMigration + revision = '0004_migrate_runner_config' down_revision = '0003_add_rerank_models' branch_labels = None depends_on = None def migrate_pipeline_config(config: dict) -> dict: - """Keep current AgentRunner config containers explicit.""" - new_config = dict(config) - if 'ai' not in new_config: - return new_config + """Migrate persisted pipeline config to the AgentRunner plugin shape.""" + return ConfigMigration.migrate_pipeline_config(config) - ai_config = dict(new_config.get('ai', {})) - ai_config['runner'] = dict(ai_config.get('runner', {})) - ai_config['runner_config'] = dict(ai_config.get('runner_config', {})) - new_config['ai'] = ai_config - - return new_config +def _load_config(config_value): + if isinstance(config_value, dict): + return config_value + if isinstance(config_value, str): + return json.loads(config_value) + return None def upgrade() -> None: @@ -34,12 +34,14 @@ def upgrade() -> None: conn = op.get_bind() inspector = sa.inspect(conn) - # Check if pipelines table exists (may not exist in fresh install) - if 'pipelines' not in inspector.get_table_names(): + table_name = 'legacy_pipelines' + + # Check if pipeline table exists (may not exist in fresh install) + if table_name not in inspector.get_table_names(): return # Get all pipelines - result = conn.execute(sa.text('SELECT uuid, config FROM pipelines')) + result = conn.execute(sa.text(f'SELECT uuid, config FROM {table_name}')) pipelines = result.fetchall() for pipeline_uuid, config_json in pipelines: @@ -47,13 +49,15 @@ def upgrade() -> None: continue try: - config = json.loads(config_json) + config = _load_config(config_json) + if not isinstance(config, dict): + continue migrated_config = migrate_pipeline_config(config) # Only update if config changed if json.dumps(config, sort_keys=True) != json.dumps(migrated_config, sort_keys=True): conn.execute( - sa.text('UPDATE pipelines SET config = :config WHERE uuid = :uuid'), + sa.text(f'UPDATE {table_name} SET config = :config WHERE uuid = :uuid'), {'config': json.dumps(migrated_config), 'uuid': pipeline_uuid}, ) except Exception: diff --git a/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py index 79644352..c9d3d8fe 100644 --- a/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py +++ b/src/langbot/pkg/persistence/alembic/versions/58846a8d7a81_add_event_log_and_transcript_tables.py @@ -22,6 +22,17 @@ def _index_exists(table_name: str, index_name: str) -> bool: return index_name in {index['name'] for index in sa.inspect(op.get_bind()).get_indexes(table_name)} +def _column_exists(table_name: str, column_name: str) -> bool: + return column_name in {column['name'] for column in sa.inspect(op.get_bind()).get_columns(table_name)} + + +def _add_column_if_missing(table_name: str, column: sa.Column) -> None: + if not _table_exists(table_name) or _column_exists(table_name, column.name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.add_column(column) + + def _create_index_if_missing(table_name: str, index_name: str, columns: list[str], *, unique: bool = False) -> None: if not _table_exists(table_name) or _index_exists(table_name, index_name): return @@ -78,6 +89,8 @@ def upgrade() -> None: sa.Column('id', sa.Integer(), primary_key=True, autoincrement=True), sa.Column('transcript_id', sa.String(255), nullable=False, unique=True), sa.Column('event_id', sa.String(255), nullable=False), + sa.Column('bot_id', sa.String(255), nullable=True), + sa.Column('workspace_id', sa.String(255), nullable=True), sa.Column('conversation_id', sa.String(255), nullable=False), sa.Column('thread_id', sa.String(255), nullable=True), sa.Column('role', sa.String(50), nullable=False), @@ -91,22 +104,33 @@ def upgrade() -> None: sa.Column('created_at', sa.DateTime(), nullable=False, server_default=sa.text('(CURRENT_TIMESTAMP)')), sa.Column('metadata_json', sa.Text(), nullable=True), ) + else: + _add_column_if_missing('transcript', sa.Column('bot_id', sa.String(255), nullable=True)) + _add_column_if_missing('transcript', sa.Column('workspace_id', sa.String(255), nullable=True)) # Create indexes for transcript _create_index_if_missing('transcript', 'ix_transcript_transcript_id', ['transcript_id'], unique=True) _create_index_if_missing('transcript', 'ix_transcript_event_id', ['event_id']) + _create_index_if_missing('transcript', 'ix_transcript_bot_id', ['bot_id']) _create_index_if_missing('transcript', 'ix_transcript_conversation_id', ['conversation_id']) _create_index_if_missing('transcript', 'ix_transcript_conversation_seq', ['conversation_id', 'seq']) _create_index_if_missing('transcript', 'ix_transcript_conversation_created', ['conversation_id', 'created_at']) + _create_index_if_missing( + 'transcript', + 'ix_transcript_scope_seq', + ['bot_id', 'workspace_id', 'conversation_id', 'thread_id', 'seq'], + ) _create_index_if_missing('transcript', 'ix_transcript_run_id', ['run_id']) def downgrade() -> None: # Drop transcript table _drop_index_if_exists('transcript', 'ix_transcript_run_id') + _drop_index_if_exists('transcript', 'ix_transcript_scope_seq') _drop_index_if_exists('transcript', 'ix_transcript_conversation_created') _drop_index_if_exists('transcript', 'ix_transcript_conversation_seq') _drop_index_if_exists('transcript', 'ix_transcript_conversation_id') + _drop_index_if_exists('transcript', 'ix_transcript_bot_id') _drop_index_if_exists('transcript', 'ix_transcript_event_id') _drop_index_if_exists('transcript', 'ix_transcript_transcript_id') diff --git a/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py b/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py index dfe36c06..ddca2050 100644 --- a/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py +++ b/src/langbot/pkg/persistence/alembic/versions/6dfd3dd7f0c7_add_agent_runner_state_table_for_host_.py @@ -73,14 +73,14 @@ def upgrade() -> None: ) _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_runner_id', ['runner_id']) _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope', ['scope']) - _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope_key', ['scope_key']) + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope_key_lookup', ['scope_key']) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope_key') + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope_key_lookup') _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope') _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_runner_id') _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_runner_binding') diff --git a/src/langbot/pkg/persistence/alembic/versions/7b2c1d9e4f30_add_transcript_scope_columns.py b/src/langbot/pkg/persistence/alembic/versions/7b2c1d9e4f30_add_transcript_scope_columns.py new file mode 100644 index 00000000..99da93c0 --- /dev/null +++ b/src/langbot/pkg/persistence/alembic/versions/7b2c1d9e4f30_add_transcript_scope_columns.py @@ -0,0 +1,78 @@ +"""add transcript scope columns + +Revision ID: 7b2c1d9e4f30 +Revises: 6dfd3dd7f0c7 +Create Date: 2026-06-12 +""" +from alembic import op +import sqlalchemy as sa + + +revision = '7b2c1d9e4f30' +down_revision = '6dfd3dd7f0c7' +branch_labels = None +depends_on = None + + +def _table_exists(table_name: str) -> bool: + return table_name in sa.inspect(op.get_bind()).get_table_names() + + +def _column_exists(table_name: str, column_name: str) -> bool: + return column_name in {column['name'] for column in sa.inspect(op.get_bind()).get_columns(table_name)} + + +def _index_exists(table_name: str, index_name: str) -> bool: + return index_name in {index['name'] for index in sa.inspect(op.get_bind()).get_indexes(table_name)} + + +def _add_column_if_missing(table_name: str, column: sa.Column) -> None: + if not _table_exists(table_name) or _column_exists(table_name, column.name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.add_column(column) + + +def _create_index_if_missing(table_name: str, index_name: str, columns: list[str]) -> None: + if not _table_exists(table_name) or _index_exists(table_name, index_name): + return + existing_columns = {column['name'] for column in sa.inspect(op.get_bind()).get_columns(table_name)} + if not set(columns).issubset(existing_columns): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.create_index(index_name, columns) + + +def _drop_index_if_exists(table_name: str, index_name: str) -> None: + if not _table_exists(table_name) or not _index_exists(table_name, index_name): + return + with op.batch_alter_table(table_name, schema=None) as batch_op: + batch_op.drop_index(index_name) + + +def upgrade() -> None: + _add_column_if_missing('transcript', sa.Column('bot_id', sa.String(255), nullable=True)) + _add_column_if_missing('transcript', sa.Column('workspace_id', sa.String(255), nullable=True)) + _create_index_if_missing('transcript', 'ix_transcript_bot_id', ['bot_id']) + _create_index_if_missing( + 'transcript', + 'ix_transcript_scope_seq', + ['bot_id', 'workspace_id', 'conversation_id', 'thread_id', 'seq'], + ) + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope_key') + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope_key_lookup', ['scope_key']) + + +def downgrade() -> None: + _drop_index_if_exists('agent_runner_state', 'ix_agent_runner_state_scope_key_lookup') + _create_index_if_missing('agent_runner_state', 'ix_agent_runner_state_scope_key', ['scope_key']) + _drop_index_if_exists('transcript', 'ix_transcript_scope_seq') + _drop_index_if_exists('transcript', 'ix_transcript_bot_id') + if not _table_exists('transcript'): + return + existing_columns = {column['name'] for column in sa.inspect(op.get_bind()).get_columns('transcript')} + with op.batch_alter_table('transcript', schema=None) as batch_op: + if 'workspace_id' in existing_columns: + batch_op.drop_column('workspace_id') + if 'bot_id' in existing_columns: + batch_op.drop_column('bot_id') diff --git a/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py b/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py index 0eddacf6..19558362 100644 --- a/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py +++ b/src/langbot/pkg/persistence/migrations/dbm001_migrate_v3_config.py @@ -11,6 +11,7 @@ from ...entity.persistence import ( pipeline as persistence_pipeline, bot as persistence_bot, ) +from ...agent.runner.config_migration import LEGACY_RUNNER_ID_MAP @migration.migration_class(1) @@ -114,18 +115,28 @@ class DBMigrateV3Config(migration.DBMigration): pipeline_config = default_pipeline['config'] # ai - pipeline_config['ai']['runner'] = { - 'runner': self.ap.provider_cfg.data['runner'], + ai_config = pipeline_config.setdefault('ai', {}) + runner_name = self.ap.provider_cfg.data['runner'] + runner_id = LEGACY_RUNNER_ID_MAP.get(runner_name, '') + ai_config['runner'] = { + 'id': runner_id, } - pipeline_config['ai']['local-agent']['model'] = model_uuid + runner_configs = ai_config.setdefault('runner_config', {}) - pipeline_config['ai']['local-agent']['prompt'] = [ + local_agent_runner_id = LEGACY_RUNNER_ID_MAP['local-agent'] + local_agent_config = runner_configs.setdefault(local_agent_runner_id, {}) + local_agent_config['model'] = { + 'primary': model_uuid, + 'fallbacks': [], + } + + local_agent_config['prompt'] = [ { 'role': 'system', 'content': self.ap.provider_cfg.data['prompt']['default'], } ] - pipeline_config['ai']['dify-service-api'] = { + runner_configs[LEGACY_RUNNER_ID_MAP['dify-service-api']] = { 'base-url': self.ap.provider_cfg.data['dify-service-api']['base-url'], 'app-type': self.ap.provider_cfg.data['dify-service-api']['app-type'], 'api-key': self.ap.provider_cfg.data['dify-service-api'][ @@ -136,7 +147,7 @@ class DBMigrateV3Config(migration.DBMigration): self.ap.provider_cfg.data['dify-service-api']['app-type'] ]['timeout'], } - pipeline_config['ai']['dashscope-app-api'] = { + runner_configs[LEGACY_RUNNER_ID_MAP['dashscope-app-api']] = { 'app-type': self.ap.provider_cfg.data['dashscope-app-api']['app-type'], 'api-key': self.ap.provider_cfg.data['dashscope-app-api']['api-key'], 'references_quote': self.ap.provider_cfg.data['dashscope-app-api'][ diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index 27c12269..b494acc2 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -276,8 +276,10 @@ class RuntimePipeline: # Get runner name from pipeline config runner_name = None - if query.pipeline_config and 'ai' in query.pipeline_config and 'runner' in query.pipeline_config['ai']: - runner_name = query.pipeline_config['ai']['runner'].get('runner') + if query.pipeline_config: + from ..agent.runner.config_migration import ConfigMigration + + runner_name = ConfigMigration.resolve_runner_id(query.pipeline_config) # Record query start and store message_id message_id = '' diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index d386e5b5..7a3f755f 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -88,7 +88,12 @@ class ChatMessageHandler(handler.MessageHandler): # Mark start time for telemetry start_ts = time.time() - if await self.ap.agent_run_orchestrator.try_claim_steering_from_query(query): + try_claim_steering = getattr( + self.ap.agent_run_orchestrator, + 'try_claim_steering_from_query', + None, + ) + if try_claim_steering and await try_claim_steering(query): yield entities.StageProcessResult(result_type=entities.ResultType.INTERRUPT, new_query=query) return diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 5b45bf79..ea0e6dad 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -241,6 +241,29 @@ def _resolve_run_conversation( return session_conversation_id, None +def _run_scope_filters(session: dict[str, Any]) -> dict[str, Any]: + authorization = _get_run_authorization(session) + return { + 'bot_id': authorization.get('bot_id'), + 'workspace_id': authorization.get('workspace_id'), + 'thread_id': authorization.get('thread_id'), + 'strict_thread': True, + } + + +def _event_matches_run_scope(session: dict[str, Any], event: dict[str, Any]) -> bool: + authorization = _get_run_authorization(session) + if authorization.get('conversation_id') != event.get('conversation_id'): + return False + if authorization.get('bot_id') is not None and authorization.get('bot_id') != event.get('bot_id'): + return False + if authorization.get('workspace_id') is not None and authorization.get('workspace_id') != event.get('workspace_id'): + return False + if authorization.get('thread_id') != event.get('thread_id'): + return False + return True + + def _project_event_record_for_api(event: dict[str, Any]) -> dict[str, Any]: """Project EventLogStore rows onto the SDK AgentEventRecord DTO.""" seq = event.get('seq') or event.get('id') @@ -314,6 +337,7 @@ async def _validate_run_authorization( resource_id: str, ap: app.Application, caller_plugin_identity: str | None = None, + operation: str | None = None, ) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: """Validate run_id authorization for a resource access. @@ -327,6 +351,7 @@ async def _validate_run_authorization( ap: Application instance for logging. caller_plugin_identity: Plugin identity (author/name) of the caller. Required when the run session is bound to a plugin identity. + operation: Optional resource operation required by the runtime action. Returns: Tuple of (session, None) if validation passes. @@ -357,12 +382,13 @@ async def _validate_run_authorization( message=f'Plugin identity mismatch: caller {caller_plugin_identity} is not authorized for run_id {run_id}', ) - if not session_registry.is_resource_allowed(session, resource_type, resource_id): + if not session_registry.is_resource_allowed(session, resource_type, resource_id, operation): ap.logger.warning( - f'{resource_type.upper()}: {resource_id} not allowed for run_id {run_id}' + f'{resource_type.upper()}: {resource_id} operation {operation or "*"} not allowed for run_id {run_id}' ) + operation_suffix = f' for operation {operation}' if operation else '' return None, handler.ActionResponse.error( - message=f'{resource_type} {resource_id} is not authorized for this agent run', + message=f'{resource_type} {resource_id} is not authorized{operation_suffix} for this agent run', ) return session, None @@ -716,7 +742,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity + run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity, operation='invoke' ) if error: return error @@ -774,7 +800,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity + run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity, operation='stream' ) if error: yield error @@ -841,7 +867,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'tool', tool_name, self.ap, caller_plugin_identity + run_id, 'tool', tool_name, self.ap, caller_plugin_identity, operation='call' ) if error: return error @@ -881,7 +907,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'tool', tool_name, self.ap, caller_plugin_identity + run_id, 'tool', tool_name, self.ap, caller_plugin_identity, operation='detail' ) if error: return error @@ -1101,7 +1127,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'file', file_key, self.ap, caller_plugin_identity + run_id, 'file', file_key, self.ap, caller_plugin_identity, operation='config' ) if error: return error @@ -1151,7 +1177,7 @@ class RuntimeConnectionHandler(handler.Handler): # Validate run authorization session, error = await _validate_run_authorization( - run_id, 'model', rerank_model_uuid, self.ap, caller_plugin_identity + run_id, 'model', rerank_model_uuid, self.ap, caller_plugin_identity, operation='rerank' ) if error: return error @@ -1335,7 +1361,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity + run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity, operation='retrieve' ) if error: return error @@ -1410,7 +1436,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity + run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity, operation='retrieve' ) if error: return error @@ -1524,6 +1550,7 @@ class RuntimeConnectionHandler(handler.Handler): limit=limit, direction=direction, include_artifacts=include_artifacts, + **_run_scope_filters(session), ) return handler.ActionResponse.success(data={ @@ -1589,6 +1616,7 @@ class RuntimeConnectionHandler(handler.Handler): query_text=query_text, filters=safe_filters, top_k=top_k, + **_run_scope_filters(session), ) return handler.ActionResponse.success(data={ @@ -1642,7 +1670,7 @@ class RuntimeConnectionHandler(handler.Handler): event_run_id = event.get('run_id') if event_run_id and event_run_id == run_id: return handler.ActionResponse.success(data=_project_event_record_for_api(event)) - if not session_conversation_id or event.get('conversation_id') != session_conversation_id: + if not session_conversation_id or not _event_matches_run_scope(session, event): return handler.ActionResponse.error( message=f'Event {event_id} is not accessible by this run' ) @@ -1707,6 +1735,7 @@ class RuntimeConnectionHandler(handler.Handler): event_types=event_types, before_seq=before_seq, limit=limit, + **_run_scope_filters(session), ) return handler.ActionResponse.success(data={ diff --git a/tests/unit_tests/agent/conftest.py b/tests/unit_tests/agent/conftest.py index e937f5db..d5a14a0f 100644 --- a/tests/unit_tests/agent/conftest.py +++ b/tests/unit_tests/agent/conftest.py @@ -77,6 +77,33 @@ def make_session( 'skill': {s.get('skill_name') for s in res.get('skills', [])}, 'file': {f.get('file_id') for f in res.get('files', [])}, } + authorized_operations: dict[str, dict[str, set[str]]] = { + 'model': { + m.get('model_id'): set(m.get('operations') or ['invoke', 'stream', 'rerank']) + for m in res.get('models', []) + if m.get('model_id') + }, + 'tool': { + t.get('tool_name'): set(t.get('operations') or ['detail', 'call']) + for t in res.get('tools', []) + if t.get('tool_name') + }, + 'knowledge_base': { + kb.get('kb_id'): set(kb.get('operations') or ['list', 'retrieve']) + for kb in res.get('knowledge_bases', []) + if kb.get('kb_id') + }, + 'skill': { + s.get('skill_name'): set(s.get('operations') or ['activate']) + for s in res.get('skills', []) + if s.get('skill_name') + }, + 'file': { + f.get('file_id'): set(f.get('operations') or ['config', 'knowledge']) + for f in res.get('files', []) + if f.get('file_id') + }, + } return { 'run_id': run_id, @@ -90,6 +117,7 @@ def make_session( 'state_policy': policy, 'state_context': context, 'authorized_ids': authorized_ids, + 'authorized_operations': authorized_operations, }, 'status': { 'started_at': now, diff --git a/tests/unit_tests/agent/test_chat_handler.py b/tests/unit_tests/agent/test_chat_handler.py index c3f62550..88e3b4f2 100644 --- a/tests/unit_tests/agent/test_chat_handler.py +++ b/tests/unit_tests/agent/test_chat_handler.py @@ -129,6 +129,9 @@ class MockAgentRunOrchestrator: for chunk in self._chunks: yield chunk + async def try_claim_steering_from_query(self, query): + return False + def resolve_runner_id_for_telemetry(self, query): return 'plugin:langbot/local-agent/default' @@ -240,7 +243,7 @@ class TestConfigMigrationInChatHandler: assert runner_id == 'plugin:langbot/local-agent/default' def test_resolve_runner_id_from_old_format(self): - """ConfigMigration should not resolve removed runner aliases.""" + """ConfigMigration resolves old runner aliases for compatibility.""" pipeline_config = { 'ai': { 'runner': { @@ -250,7 +253,7 @@ class TestConfigMigrationInChatHandler: } runner_id = ConfigMigration.resolve_runner_id(pipeline_config) - assert runner_id is None + assert runner_id == 'plugin:langbot/local-agent/default' class TestErrorHandling: diff --git a/tests/unit_tests/agent/test_config_migration.py b/tests/unit_tests/agent/test_config_migration.py index 24393a84..576bdfca 100644 --- a/tests/unit_tests/agent/test_config_migration.py +++ b/tests/unit_tests/agent/test_config_migration.py @@ -20,7 +20,7 @@ class TestResolveRunnerId: runner_id = ConfigMigration.resolve_runner_id(pipeline_config) assert runner_id == 'plugin:langbot/local-agent/default' - def test_does_not_resolve_old_runner_field(self): + def test_resolves_old_runner_field(self): pipeline_config = { 'ai': { 'runner': { @@ -30,7 +30,7 @@ class TestResolveRunnerId: } runner_id = ConfigMigration.resolve_runner_id(pipeline_config) - assert runner_id is None + assert runner_id == 'plugin:langbot/local-agent/default' def test_resolve_no_runner_config(self): runner_id = ConfigMigration.resolve_runner_id({}) @@ -58,7 +58,7 @@ class TestResolveRunnerConfig: ) assert config == {'model': 'uuid-123', 'custom_option': 10} - def test_does_not_read_old_runner_block(self): + def test_reads_old_runner_block(self): pipeline_config = { 'ai': { 'local-agent': { @@ -71,7 +71,7 @@ class TestResolveRunnerConfig: pipeline_config, 'plugin:langbot/local-agent/default', ) - assert config == {} + assert config == {'model': {'primary': 'uuid-123', 'fallbacks': []}} def test_resolve_no_config(self): config = ConfigMigration.resolve_runner_config( @@ -138,15 +138,20 @@ class TestNormalizePipelineConfig: assert migrated['ai']['runner']['id'] == 'plugin:test/my-runner/default' assert migrated['ai']['runner_config']['plugin:test/my-runner/default']['custom-option'] == 20 - def test_does_not_migrate_old_runner_blocks(self): + def test_migrates_old_runner_blocks(self): config = { 'ai': { 'runner': {'runner': 'local-agent'}, - 'local-agent': {'model': 'old-model'}, + 'local-agent': {'model': 'old-model', 'knowledge-base': 'kb_1'}, }, } migrated = ConfigMigration.migrate_pipeline_config(config) - assert 'id' not in migrated['ai']['runner'] - assert migrated['ai']['local-agent'] == {'model': 'old-model'} + assert migrated['ai']['runner']['id'] == 'plugin:langbot/local-agent/default' + assert 'runner' not in migrated['ai']['runner'] + assert 'local-agent' not in migrated['ai'] + assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default'] == { + 'model': {'primary': 'old-model', 'fallbacks': []}, + 'knowledge-bases': ['kb_1'], + } diff --git a/tests/unit_tests/agent/test_config_migration_full.py b/tests/unit_tests/agent/test_config_migration_full.py index cb3519bb..ecff0ff5 100644 --- a/tests/unit_tests/agent/test_config_migration_full.py +++ b/tests/unit_tests/agent/test_config_migration_full.py @@ -31,7 +31,7 @@ class TestMigratePipelineConfig: assert migrated['ai']['runner']['id'] == 'plugin:langbot/local-agent/default' assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default']['custom-option'] == 10 - def test_old_runner_field_is_not_mapped(self): + def test_old_runner_field_is_mapped(self): config = { 'ai': { 'runner': { @@ -47,11 +47,13 @@ class TestMigratePipelineConfig: migrated = ConfigMigration.migrate_pipeline_config(config) assert migrated['ai']['runner'] == { - 'runner': 'local-agent', 'expire-time': 3600, + 'id': 'plugin:langbot/local-agent/default', } - assert migrated['ai']['runner_config'] == {} - assert migrated['ai']['local-agent'] == {'model': 'old-model'} + assert migrated['ai']['runner_config']['plugin:langbot/local-agent/default'] == { + 'model': {'primary': 'old-model', 'fallbacks': []}, + } + assert 'local-agent' not in migrated['ai'] def test_empty_config_is_unchanged(self): config = {} @@ -95,14 +97,14 @@ class TestResolveRunnerId: runner_id = ConfigMigration.resolve_runner_id(config) assert runner_id == 'plugin:test/my-runner/default' - def test_old_runner_field_is_ignored(self): + def test_old_runner_field_is_mapped(self): config = { 'ai': { 'runner': {'runner': 'local-agent'}, }, } runner_id = ConfigMigration.resolve_runner_id(config) - assert runner_id is None + assert runner_id == 'plugin:langbot/local-agent/default' class TestResolveRunnerConfig: @@ -119,11 +121,11 @@ class TestResolveRunnerConfig: runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default') assert runner_config['custom-option'] == 20 - def test_old_runner_block_is_ignored(self): + def test_old_runner_block_is_read(self): config = { 'ai': { 'local-agent': {'custom-option': 20}, }, } runner_config = ConfigMigration.resolve_runner_config(config, 'plugin:langbot/local-agent/default') - assert runner_config == {} + assert runner_config == {'custom-option': 20} diff --git a/tests/unit_tests/agent/test_handler_auth.py b/tests/unit_tests/agent/test_handler_auth.py index 30a7ff84..d8cfc482 100644 --- a/tests/unit_tests/agent/test_handler_auth.py +++ b/tests/unit_tests/agent/test_handler_auth.py @@ -576,8 +576,8 @@ class TestRETRIEVEKNOWLEDGEBASEBugFix: assert 'kb_custom' in allowed_kbs - def test_retrieve_kb_ignores_old_runner_format(self): - """Old runner format is not resolved by current AgentRunner helpers.""" + def test_retrieve_kb_reads_old_runner_format(self): + """Old runner format is resolved for migration compatibility.""" from langbot.pkg.agent.runner.config_migration import ConfigMigration pipeline_config = { @@ -585,11 +585,16 @@ class TestRETRIEVEKNOWLEDGEBASEBugFix: 'runner': { 'runner': 'local-agent', }, + 'local-agent': { + 'knowledge-bases': ['kb_legacy'], + }, }, } runner_id = ConfigMigration.resolve_runner_id(pipeline_config) - assert runner_id is None + runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id) + assert runner_id == 'plugin:langbot/local-agent/default' + assert runner_config.get('knowledge-bases') == ['kb_legacy'] class TestHandlerActionAuthorization: @@ -1870,6 +1875,42 @@ class TestFilePermissionValidation: await registry.unregister('run_file_denied') +class TestOperationPermissionValidation: + """Tests operation-level Host-side run authorization.""" + + @pytest.mark.asyncio + async def test_model_operation_denied_when_resource_only_allows_invoke(self): + from langbot.pkg.agent.runner.session_registry import get_session_registry + from langbot.pkg.plugin.handler import _validate_run_authorization + + registry = get_session_registry() + await registry.register( + run_id='run_model_operation_denied', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=make_resources(models=[{'model_id': 'model_001', 'operations': ['invoke']}]), + ) + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + session, error = await _validate_run_authorization( + 'run_model_operation_denied', + 'model', + 'model_001', + mock_ap, + caller_plugin_identity='test/runner', + operation='stream', + ) + + assert session is None + assert error is not None + assert 'operation stream' in error.message + + await registry.unregister('run_model_operation_denied') + + class TestCallerPluginIdentityValidation: """Tests for caller_plugin_identity cross-plugin validation. diff --git a/tests/unit_tests/agent/test_history_event_api_auth.py b/tests/unit_tests/agent/test_history_event_api_auth.py index 542dad76..ab5392d3 100644 --- a/tests/unit_tests/agent/test_history_event_api_auth.py +++ b/tests/unit_tests/agent/test_history_event_api_auth.py @@ -64,6 +64,9 @@ async def _register_session( *, run_id='run_1', conversation_id='conv_1', + bot_id=None, + workspace_id=None, + thread_id=None, available_apis=None, ): await session_registry.register( @@ -73,6 +76,9 @@ async def _register_session( plugin_identity='test/runner', resources=make_resources(), conversation_id=conversation_id, + bot_id=bot_id, + workspace_id=workspace_id, + thread_id=thread_id, available_apis=available_apis or {}, ) @@ -220,3 +226,98 @@ async def test_event_page_returns_sdk_page_projection(session_registry, db_engin assert 'input_json' not in item assert 'run_id' not in item assert 'runner_id' not in item + + +@pytest.mark.asyncio +async def test_history_page_filters_run_scope_thread_and_bot(session_registry, db_engine): + from langbot.pkg.agent.runner.transcript_store import TranscriptStore + + await _register_session( + session_registry, + bot_id='bot_1', + thread_id='thread_1', + available_apis={'history_page': True}, + ) + store = TranscriptStore(db_engine) + await store.append_transcript( + transcript_id='tr_visible', + event_id='evt_visible', + conversation_id='conv_1', + role='user', + bot_id='bot_1', + thread_id='thread_1', + content='visible', + ) + await store.append_transcript( + transcript_id='tr_other_bot', + event_id='evt_other_bot', + conversation_id='conv_1', + role='user', + bot_id='bot_2', + thread_id='thread_1', + content='hidden bot', + ) + await store.append_transcript( + transcript_id='tr_other_thread', + event_id='evt_other_thread', + conversation_id='conv_1', + role='user', + bot_id='bot_1', + thread_id='thread_2', + content='hidden thread', + ) + handler = _handler(db_engine, session_registry) + history_page = handler.actions[PluginToRuntimeAction.HISTORY_PAGE.value] + + result = await history_page({ + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + }) + + assert result.code == 0 + assert [item['content'] for item in result.data['items']] == ['visible'] + + +@pytest.mark.asyncio +async def test_event_page_filters_run_scope_thread_and_bot(session_registry, db_engine): + await _register_session( + session_registry, + bot_id='bot_1', + thread_id='thread_1', + available_apis={'event_page': True}, + ) + store = EventLogStore(db_engine) + await store.append_event( + event_id='evt_visible', + event_type='message.received', + source='platform', + bot_id='bot_1', + conversation_id='conv_1', + thread_id='thread_1', + ) + await store.append_event( + event_id='evt_other_bot', + event_type='message.received', + source='platform', + bot_id='bot_2', + conversation_id='conv_1', + thread_id='thread_1', + ) + await store.append_event( + event_id='evt_other_thread', + event_type='message.received', + source='platform', + bot_id='bot_1', + conversation_id='conv_1', + thread_id='thread_2', + ) + handler = _handler(db_engine, session_registry) + event_page = handler.actions[PluginToRuntimeAction.EVENT_PAGE.value] + + result = await event_page({ + 'run_id': 'run_1', + 'caller_plugin_identity': 'test/runner', + }) + + assert result.code == 0 + assert [item['event_id'] for item in result.data['items']] == ['evt_visible'] diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index 389fec91..569bb81b 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -427,6 +427,37 @@ async def test_orchestrator_streams_fake_plugin_deltas(clean_agent_state): assert [chunk.content for chunk in chunks] == ["hel", "hello"] +@pytest.mark.asyncio +async def test_orchestrator_persists_run_completed_message_transcript(clean_agent_state): + """run.completed(message=...) should be treated as the final assistant transcript.""" + from langbot.pkg.agent.runner.transcript_store import TranscriptStore + + db_engine = clean_agent_state + descriptor = make_descriptor() + plugin_connector = FakePluginConnector( + results=[ + { + "type": "run.completed", + "data": { + "finish_reason": "stop", + "message": {"role": "assistant", "content": "final response"}, + }, + }, + ] + ) + orchestrator = AgentRunOrchestrator(FakeApplication(plugin_connector, db_engine), FakeRegistry(descriptor)) + query = make_query() + + messages = [message async for message in orchestrator.run_from_query(query)] + + assert [message.content for message in messages] == ["final response"] + transcript_store = TranscriptStore(db_engine) + transcripts, _, _, _ = await transcript_store.page_transcript(query.session.using_conversation.uuid, limit=10) + assistant_items = [item for item in transcripts if item["role"] == "assistant"] + assert len(assistant_items) == 1 + assert assistant_items[0]["content"] == "final response" + + @pytest.mark.asyncio async def test_orchestrator_drops_duplicate_result_sequence(clean_agent_state): """Duplicate runner result sequences are idempotently ignored.""" @@ -560,6 +591,14 @@ class TestQueryEntrySessionQueryId: ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() + query.user_message = provider_message.Message( + role="user", + content=[ + provider_message.ContentElement.from_text("hello"), + provider_message.ContentElement.from_image_base64("data:image/png;base64,aGVsbG8="), + provider_message.ContentElement.from_file_base64("data:text/plain;base64,aGVsbG8=", "hello.txt"), + ], + ) messages = [message async for message in orchestrator.run_from_query(query)] @@ -815,6 +854,13 @@ class TestQueryEntryAdapterHostCapabilities: ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() + query.user_message = provider_message.Message( + role="user", + content=[ + provider_message.ContentElement.from_text("hello"), + provider_message.ContentElement.from_image_base64("data:image/png;base64,aGVsbG8="), + ], + ) messages = [message async for message in orchestrator.run_from_query(query)] @@ -896,6 +942,13 @@ class TestQueryEntryAdapterHostCapabilities: ap = FakeApplication(plugin_connector, db_engine) orchestrator = AgentRunOrchestrator(ap, FakeRegistry(descriptor)) query = make_query() + query.user_message = provider_message.Message( + role="user", + content=[ + provider_message.ContentElement.from_text("hello"), + provider_message.ContentElement.from_image_base64("data:image/png;base64,aGVsbG8="), + ], + ) messages = [message async for message in orchestrator.run_from_query(query)] @@ -910,18 +963,26 @@ class TestQueryEntryAdapterHostCapabilities: assert len(event_logs) >= 1 # First event should be the incoming message.received assert event_logs[0]["event_type"] == "message.received" + assert event_logs[0]["input_json"]["contents"][1]["image_base64"] is None + assert event_logs[0]["input_json"]["contents"][1]["content_redacted"] is True + assert "aGVsbG8=" not in str(event_logs[0]["input_json"]) # Check Transcript has user and assistant messages transcript_store = TranscriptStore(db_engine) transcripts, _, _, _ = await transcript_store.page_transcript( conversation_id=query.session.using_conversation.uuid, limit=10, + include_artifacts=True, ) assert len(transcripts) >= 2 # Find user and assistant messages roles = [t["role"] for t in transcripts] assert "user" in roles assert "assistant" in roles + user_item = next(t for t in transcripts if t["role"] == "user") + assert user_item["content_json"]["content"][1]["image_base64"] is None + assert user_item["artifact_refs"][0]["content"] is None + assert "aGVsbG8=" not in str(user_item) @pytest.mark.asyncio async def test_artifact_created_via_event_first_path(self, clean_agent_state): diff --git a/tests/unit_tests/agent/test_resource_builder.py b/tests/unit_tests/agent/test_resource_builder.py index 6843438f..bf5453b8 100644 --- a/tests/unit_tests/agent/test_resource_builder.py +++ b/tests/unit_tests/agent/test_resource_builder.py @@ -138,10 +138,10 @@ async def test_build_models_authorizes_config_declared_llm_and_rerank_models(app resources = await build_resources(app, query, descriptor) assert resources['models'] == [ - {'model_id': 'primary', 'model_type': 'llm', 'provider': 'test-provider'}, - {'model_id': 'fallback', 'model_type': 'llm', 'provider': 'test-provider'}, - {'model_id': 'aux', 'model_type': 'llm', 'provider': 'aux-provider'}, - {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider'}, + {'model_id': 'primary', 'model_type': 'llm', 'provider': 'test-provider', 'operations': ['invoke', 'stream']}, + {'model_id': 'fallback', 'model_type': 'llm', 'provider': 'test-provider', 'operations': ['invoke', 'stream']}, + {'model_id': 'aux', 'model_type': 'llm', 'provider': 'aux-provider', 'operations': ['invoke', 'stream']}, + {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider', 'operations': ['rerank']}, ] @@ -188,8 +188,8 @@ async def test_build_models_authorizes_rerank_and_llm_refs_from_config(app): resources = await build_resources(app, query, descriptor) assert resources['models'] == [ - {'model_id': 'llm', 'model_type': 'llm', 'provider': 'test-provider'}, - {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider'}, + {'model_id': 'llm', 'model_type': 'llm', 'provider': 'test-provider', 'operations': ['invoke', 'stream']}, + {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider', 'operations': ['rerank']}, ] @@ -218,7 +218,7 @@ async def test_build_models_manifest_permission_narrows_binding(app): resources = await build_resources(app, query, descriptor) assert resources['models'] == [ - {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider'}, + {'model_id': 'rerank', 'model_type': 'rerank', 'provider': 'rerank-provider', 'operations': ['rerank']}, ] @@ -264,11 +264,13 @@ async def test_build_tools_authorizes_query_declared_tools(app): 'tool_name': 'qa_plugin_echo', 'tool_type': None, 'description': None, + 'operations': ['detail', 'call'], }, { 'tool_name': 'qa_mcp_echo', 'tool_type': None, 'description': None, + 'operations': ['detail', 'call'], }, ] @@ -320,8 +322,8 @@ async def test_build_knowledge_bases_unions_config_and_policy_grants(app): resources = await build_resources(app, query, descriptor) assert resources['knowledge_bases'] == [ - {'kb_id': 'kb_config', 'kb_name': 'name-kb_config', 'kb_type': 'default'}, - {'kb_id': 'kb_policy', 'kb_name': 'name-kb_policy', 'kb_type': 'default'}, + {'kb_id': 'kb_config', 'kb_name': 'name-kb_config', 'kb_type': 'default', 'operations': ['list', 'retrieve']}, + {'kb_id': 'kb_policy', 'kb_name': 'name-kb_policy', 'kb_type': 'default', 'operations': ['list', 'retrieve']}, ] @@ -347,6 +349,42 @@ async def test_build_knowledge_bases_manifest_permission_denies_binding_kbs(app) assert resources['knowledge_bases'] == [] +@pytest.mark.asyncio +async def test_build_files_authorizes_config_declared_file_fields(app): + descriptor = make_descriptor( + config_schema=[ + {'name': 'avatar', 'type': 'file'}, + {'name': 'references', 'type': 'array[file]'}, + ], + ) + query = make_query({ + 'avatar': {'file_key': 'plugin_config_avatar.png', 'mimetype': 'image/png'}, + 'references': [ + {'file_key': 'plugin_config_doc.txt', 'mimetype': 'text/plain', 'file_name': 'doc.txt'}, + {'file_key': 'plugin_config_doc.txt', 'mimetype': 'text/plain', 'file_name': 'doc.txt'}, + ], + }) + + resources = await build_resources(app, query, descriptor) + + assert resources['files'] == [ + { + 'file_id': 'plugin_config_avatar.png', + 'file_name': None, + 'mime_type': 'image/png', + 'source': 'config', + 'operations': ['config'], + }, + { + 'file_id': 'plugin_config_doc.txt', + 'file_name': 'doc.txt', + 'mime_type': 'text/plain', + 'source': 'config', + 'operations': ['config'], + }, + ] + + @pytest.mark.asyncio async def test_build_storage_intersects_manifest_and_binding_policy(app): descriptor = make_descriptor( diff --git a/tests/unit_tests/agent/test_session_registry.py b/tests/unit_tests/agent/test_session_registry.py index 37f0d15d..a6d7d7f8 100644 --- a/tests/unit_tests/agent/test_session_registry.py +++ b/tests/unit_tests/agent/test_session_registry.py @@ -330,6 +330,19 @@ class TestIsResourceAllowed: assert registry.is_resource_allowed(session, 'model', 'model_001') is True assert registry.is_resource_allowed(session, 'model', 'model_002') is True + def test_model_operation_denied(self): + """Model resources should enforce operation-level grants.""" + registry = AgentRunSessionRegistry() + resources = make_resources( + models=[ + {'model_id': 'model_001', 'operations': ['invoke']}, + ] + ) + session = make_session(resources=resources) + + assert registry.is_resource_allowed(session, 'model', 'model_001', 'invoke') is True + assert registry.is_resource_allowed(session, 'model', 'model_001', 'stream') is False + def test_model_not_allowed(self): """Model not in resources should be denied.""" registry = AgentRunSessionRegistry() @@ -360,6 +373,19 @@ class TestIsResourceAllowed: assert registry.is_resource_allowed(session, 'tool', 'web_search') is True assert registry.is_resource_allowed(session, 'tool', 'code_exec') is True + def test_tool_operation_denied(self): + """Tool resources should enforce detail/call grants.""" + registry = AgentRunSessionRegistry() + resources = make_resources( + tools=[ + {'tool_name': 'web_search', 'operations': ['detail']}, + ] + ) + session = make_session(resources=resources) + + assert registry.is_resource_allowed(session, 'tool', 'web_search', 'detail') is True + assert registry.is_resource_allowed(session, 'tool', 'web_search', 'call') is False + def test_tool_not_allowed(self): """Tool not in resources should be denied.""" registry = AgentRunSessionRegistry() diff --git a/tests/unit_tests/agent/test_state_api_auth.py b/tests/unit_tests/agent/test_state_api_auth.py index 4b92e070..5e1300f2 100644 --- a/tests/unit_tests/agent/test_state_api_auth.py +++ b/tests/unit_tests/agent/test_state_api_auth.py @@ -138,6 +138,7 @@ class TestStateAPIHandlerAuthorization: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation']}, state_context={'scope_keys': {'conversation': 'conv_key'}, 'binding_identity': 'binding_1'}, ) @@ -173,6 +174,7 @@ class TestStateAPIHandlerAuthorization: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation']}, state_context={'scope_keys': {'conversation': 'conv_key'}, 'binding_identity': 'binding_1'}, ) @@ -209,6 +211,7 @@ class TestStateAPIHandlerAuthorization: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': False, 'state_scopes': []}, state_context={'scope_keys': {}, 'binding_identity': 'binding_1'}, ) @@ -244,6 +247,7 @@ class TestStateAPIHandlerAuthorization: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation']}, state_context={'scope_keys': {'conversation': 'conv_key', 'actor': 'actor_key'}, 'binding_identity': 'binding_1'}, ) @@ -280,6 +284,7 @@ class TestStateAPIHandlerAuthorization: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation']}, state_context={'scope_keys': {}, 'binding_identity': 'binding_1'}, # No scope_keys ) @@ -320,6 +325,7 @@ class TestStateAPIFullFlowWithRealDB: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation', 'runner']}, state_context={ 'scope_keys': { @@ -426,6 +432,7 @@ class TestStateHandlerReadsFromAuthorizationSnapshot: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': False, 'state_scopes': []}, state_context={'scope_keys': {}, 'binding_identity': 'binding_1'}, ) @@ -469,6 +476,7 @@ class TestStateHandlerReadsFromAuthorizationSnapshot: query_id=1, plugin_identity='test/runner', resources=make_resources(), + available_apis={'state': True}, state_policy={'enable_state': True, 'state_scopes': ['conversation']}, state_context={'scope_keys': {'conversation': 'conv_key_xyz'}, 'binding_identity': 'binding_xyz'}, ) diff --git a/tests/unit_tests/agent/test_state_store.py b/tests/unit_tests/agent/test_state_store.py index 68d6a2d5..668f4021 100644 --- a/tests/unit_tests/agent/test_state_store.py +++ b/tests/unit_tests/agent/test_state_store.py @@ -119,18 +119,16 @@ class TestStateScopeHelpers: thread_id='thread_001', ) - assert build_state_scope_key('conversation', event, binding, descriptor) == ( - 'conversation:plugin:test/my-runner/default:binding_a:conv_001:thread_001' - ) - assert build_state_scope_key('actor', event, binding, descriptor) == ( - 'actor:plugin:test/my-runner/default:binding_a:user:user_001' - ) - assert build_state_scope_key('subject', event, binding, descriptor) == ( - 'subject:plugin:test/my-runner/default:binding_a:message:msg_001' - ) - assert build_state_scope_key('runner', event, binding, descriptor) == ( - 'runner:plugin:test/my-runner/default:binding_a' - ) + keys = { + scope: build_state_scope_key(scope, event, binding, descriptor) + for scope in VALID_STATE_SCOPES + } + + assert keys['conversation'].startswith('conversation:v2:') + assert keys['actor'].startswith('actor:v2:') + assert keys['subject'].startswith('subject:v2:') + assert keys['runner'].startswith('runner:v2:') + assert len(set(keys.values())) == len(keys) def test_scope_key_missing_identity_returns_none(self): descriptor = make_descriptor() diff --git a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx index 6177c96f..9c119474 100644 --- a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx +++ b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx @@ -344,6 +344,26 @@ export default function PipelineFormComponent({ } } + function handleRunnerConfigEmit(stageName: string, values: object) { + const stageKey = `ai.runner_config.${stageName}`; + const isFirstEmission = !initializedStagesRef.current.has(stageKey); + + const currentRunnerConfigs = + (form.getValues('ai.runner_config') as Record) || {}; + form.setValue('ai.runner_config', { + ...currentRunnerConfigs, + [stageName]: values, + }); + + if (isFirstEmission) { + initializedStagesRef.current.add(stageKey); + const currentSnapshot = JSON.stringify(form.getValues()); + if (savedSnapshotRef.current === '' || !hasUnsavedChangesRef.current) { + savedSnapshotRef.current = currentSnapshot; + } + } + } + function renderDynamicForms( stage: PipelineConfigStage, formName: keyof FormValues, @@ -404,6 +424,10 @@ export default function PipelineFormComponent({ const isPluginRunner = currentRunner && currentRunner.startsWith('plugin:'); + const stageSystemContext = + stage.name === 'plugin:langbot/local-agent/default' + ? { box_available: boxAvailable } + : undefined; if (isPluginRunner) { const runnerConfigs = (form.watch('ai.runner_config') as any) || {}; const stageInitialValues = runnerConfigs[stage.name] || {}; @@ -429,20 +453,7 @@ export default function PipelineFormComponent({ itemConfigList={stage.config} initialValues={effectiveInitialValues} onSubmit={(values) => { - // Store in ai.runner_config[stage.name] - - const currentRunnerConfigs = - (form.getValues('ai.runner_config') as any) || {}; - form.setValue('ai.runner_config', { - ...currentRunnerConfigs, - [stage.name]: values, - }); - // Mark as initialized - const stageKey = `ai.runner_config.${stage.name}`; - if (!initializedStagesRef.current.has(stageKey)) { - initializedStagesRef.current.add(stageKey); - savedSnapshotRef.current = JSON.stringify(form.getValues()); - } + handleRunnerConfigEmit(stage.name, values); }} systemContext={stageSystemContext} />