From 6e982ff49d31728912a17ea818c202ba8459b3a3 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Wed, 13 May 2026 10:36:47 +0800 Subject: [PATCH] feat(plugin): implement INVOKE_RERANK handler with run-scoped authorization - Add invoke_rerank action handler in plugin handler - Validate rerank model access via run session - Cap documents at 64 for API limit - Return sorted results by relevance score --- .../pkg/agent/runner/session_registry.py | 7 +- src/langbot/pkg/plugin/handler.py | 182 ++++++-- .../templates/default-pipeline-config.json | 53 +-- tests/unit_tests/agent/conftest.py | 5 +- tests/unit_tests/agent/test_handler_auth.py | 402 +++++++++++++++++- .../unit_tests/agent/test_session_registry.py | 33 ++ .../pipeline-form/PipelineFormComponent.tsx | 5 +- 7 files changed, 586 insertions(+), 101 deletions(-) diff --git a/src/langbot/pkg/agent/runner/session_registry.py b/src/langbot/pkg/agent/runner/session_registry.py index c4ee441a..76b0f46e 100644 --- a/src/langbot/pkg/agent/runner/session_registry.py +++ b/src/langbot/pkg/agent/runner/session_registry.py @@ -81,6 +81,7 @@ class AgentRunSessionRegistry: 'model': {m.get('model_id') for m in resources.get('models', [])}, 'tool': {t.get('tool_name') for t in resources.get('tools', [])}, 'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])}, + 'file': {f.get('file_id') for f in resources.get('files', [])}, } session: AgentRunSession = { @@ -143,15 +144,15 @@ class AgentRunSessionRegistry: Args: session: AgentRunSession to check - resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage') - resource_id: Resource identifier (model_id, tool_name, kb_id) + resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage', 'file') + resource_id: Resource identifier (model_id, tool_name, kb_id, 'plugin'/'workspace', file_key) Returns: True if resource is authorized, False otherwise """ authorized_ids = session.get('_authorized_ids', {}) - if resource_type in ('model', 'tool', 'knowledge_base'): + if resource_type in ('model', 'tool', 'knowledge_base', 'file'): return resource_id in authorized_ids.get(resource_type, set()) if resource_type == 'storage': diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index e444a5c9..b21cc95c 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -46,17 +46,19 @@ async def _validate_run_authorization( resource_type: str, resource_id: str, ap: app.Application, + caller_plugin_identity: str | None = None, ) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: """Validate run_id authorization for a resource access. Common validation logic for INVOKE_LLM, INVOKE_LLM_STREAM, CALL_TOOL, - RETRIEVE_KNOWLEDGE_BASE, and RETRIEVE_KNOWLEDGE actions. + RETRIEVE_KNOWLEDGE_BASE, RETRIEVE_KNOWLEDGE, and storage/file actions. Args: run_id: The run_id to validate. - resource_type: Resource type ('model', 'tool', 'knowledge_base'). - resource_id: Resource identifier (model_uuid, tool_name, kb_id). + resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage', 'file'). + resource_id: Resource identifier (model_uuid, tool_name, kb_id, 'plugin'/'workspace', file_key). ap: Application instance for logging. + caller_plugin_identity: Optional plugin identity (author/name) of the caller for cross-plugin validation. Returns: Tuple of (session, None) if validation passes. @@ -72,6 +74,18 @@ async def _validate_run_authorization( message=f'Run session {run_id} not found or expired', ) + # Validate caller_plugin_identity matches session's plugin_identity + if caller_plugin_identity: + session_plugin_identity = session.get('plugin_identity') + if session_plugin_identity and caller_plugin_identity != session_plugin_identity: + ap.logger.warning( + f'{resource_type.upper()}: caller_plugin_identity {caller_plugin_identity} ' + f'does not match session plugin_identity {session_plugin_identity}' + ) + return None, handler.ActionResponse.error( + message=f'Plugin identity mismatch: caller {caller_plugin_identity} is not authorized for run_id {run_id}', + ) + if not session_registry.is_resource_allowed(session, resource_type, resource_id): ap.logger.warning( f'{resource_type.upper()}: {resource_id} not allowed for run_id {run_id}' @@ -377,11 +391,12 @@ class RuntimeConnectionHandler(handler.Handler): funcs = data.get('funcs', []) extra_args = data.get('extra_args', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'model', llm_model_uuid, self.ap + run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity ) if error: return error @@ -428,11 +443,12 @@ class RuntimeConnectionHandler(handler.Handler): funcs = data.get('funcs', []) extra_args = data.get('extra_args', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'model', llm_model_uuid, self.ap + run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity ) if error: yield error @@ -476,13 +492,14 @@ class RuntimeConnectionHandler(handler.Handler): # Support 'tool_parameters' (LangBotAPIProxy) and 'parameters' (AgentRunAPIProxy) parameters = data.get('tool_parameters') or data.get('parameters', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation # session_data = data['session'] # query_id = data['query_id'] # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'tool', tool_name, self.ap + run_id, 'tool', tool_name, self.ap, caller_plugin_identity ) if error: return error @@ -511,20 +528,36 @@ class RuntimeConnectionHandler(handler.Handler): ) # ================= Binary Storage Handlers ================= - # NOTE: These are low-level actions called by SDK Runtime's storage wrapper handlers. - # Permission validation is handled in SDK Runtime layer (not here): - # - plugin_storage: SDK handler auto-sets owner to caller plugin identity (inherent isolation) - # - workspace_storage: SDK handler should validate session.resources.storage.workspace_storage - # TODO: SDK storage handlers need to pass run_id and validate workspace_storage permission. - # Current risk: workspace storage access is unrestricted from AgentRunner context. + # Permission validation: + # - For AgentRunner calls (with run_id): validates storage permission via session_registry + # - For regular plugin calls (no run_id): unrestricted access (backward compatibility) + # - Plugin storage: inherent isolation via owner = plugin identity (set by SDK runtime) + # - Workspace storage: requires ctx.resources.storage.workspace_storage for AgentRunner @self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE) async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: - """Set binary storage""" + """Set binary storage + + For AgentRunner calls: validates storage permission via session_registry. + For regular plugin calls: unrestricted access (backward compatibility). + """ key = data['key'] owner_type = data['owner_type'] owner = data['owner'] value = base64.b64decode(data['value_base64']) + run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + + # Permission validation for AgentRunner calls + if run_id: + # Determine storage type from owner_type + storage_type = owner_type # 'plugin' or 'workspace' + session, error = await _validate_run_authorization( + run_id, 'storage', storage_type, self.ap, caller_plugin_identity + ) + if error: + return error + max_value_bytes = ( self.ap.instance_config.data.get('plugin', {}) .get('binary_storage', {}) @@ -574,10 +607,25 @@ class RuntimeConnectionHandler(handler.Handler): @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE) async def get_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: - """Get binary storage""" + """Get binary storage + + For AgentRunner calls: validates storage permission via session_registry. + For regular plugin calls: unrestricted access (backward compatibility). + """ key = data['key'] owner_type = data['owner_type'] owner = data['owner'] + run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + + # Permission validation for AgentRunner calls + if run_id: + storage_type = owner_type + session, error = await _validate_run_authorization( + run_id, 'storage', storage_type, self.ap, caller_plugin_identity + ) + if error: + return error result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage) @@ -600,10 +648,25 @@ class RuntimeConnectionHandler(handler.Handler): @self.action(RuntimeToLangBotAction.DELETE_BINARY_STORAGE) async def delete_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: - """Delete binary storage""" + """Delete binary storage + + For AgentRunner calls: validates storage permission via session_registry. + For regular plugin calls: unrestricted access (backward compatibility). + """ key = data['key'] owner_type = data['owner_type'] owner = data['owner'] + run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + + # Permission validation for AgentRunner calls + if run_id: + storage_type = owner_type + session, error = await _validate_run_authorization( + run_id, 'storage', storage_type, self.ap, caller_plugin_identity + ) + if error: + return error await self.ap.persistence_mgr.execute_async( sqlalchemy.delete(persistence_bstorage.BinaryStorage) @@ -618,9 +681,24 @@ class RuntimeConnectionHandler(handler.Handler): @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE_KEYS) async def get_binary_storage_keys(data: dict[str, Any]) -> handler.ActionResponse: - """Get binary storage keys""" + """Get binary storage keys + + For AgentRunner calls: validates storage permission via session_registry. + For regular plugin calls: unrestricted access (backward compatibility). + """ owner_type = data['owner_type'] owner = data['owner'] + run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + + # Permission validation for AgentRunner calls + if run_id: + storage_type = owner_type + session, error = await _validate_run_authorization( + run_id, 'storage', storage_type, self.ap, caller_plugin_identity + ) + if error: + return error result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage.key) @@ -636,8 +714,22 @@ class RuntimeConnectionHandler(handler.Handler): @self.action(PluginToRuntimeAction.GET_CONFIG_FILE) async def get_config_file(data: dict[str, Any]) -> handler.ActionResponse: - """Get a config file by file key""" + """Get a config file by file key + + For AgentRunner calls: validates file_key against session.resources.files. + For regular plugin calls: unrestricted access (backward compatibility). + """ file_key = data['file_key'] + run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation + + # Permission validation for AgentRunner calls + if run_id: + session, error = await _validate_run_authorization( + run_id, 'file', file_key, self.ap, caller_plugin_identity + ) + if error: + return error try: # Load file from storage @@ -672,6 +764,50 @@ class RuntimeConnectionHandler(handler.Handler): except Exception as e: return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid) + @self.action(PluginToRuntimeAction.INVOKE_RERANK) + async def invoke_rerank(data: dict[str, Any]) -> handler.ActionResponse: + """Invoke rerank model for agent runner with run-scoped authorization.""" + run_id = data.get('run_id') + rerank_model_uuid = data['rerank_model_uuid'] + query = data['query'] + documents = data['documents'] + top_k = data.get('top_k') + + # Validate run authorization + session, error = await _validate_run_authorization( + run_id, 'model', rerank_model_uuid, self.ap + ) + if error: + return error + + # Get rerank model + rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid) + if rerank_model is None: + return handler.ActionResponse.error( + message=f'Rerank model with uuid {rerank_model_uuid} not found', + ) + + try: + # Cap documents at 64 for API limit + documents_capped = documents[:64] + + scores = await rerank_model.provider.invoke_rerank( + model=rerank_model, + query=query, + documents=documents_capped, + ) + + # Sort by relevance score descending + scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True) + + # Apply top_k if specified + if top_k is not None: + scored = scored[:top_k] + + return handler.ActionResponse.success(data={'results': scored}) + except Exception as e: + return _make_rag_error_response(e, 'RerankError', rerank_model_uuid=rerank_model_uuid) + @self.action(PluginToRuntimeAction.VECTOR_UPSERT) async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse: collection_id = data['collection_id'] @@ -817,11 +953,12 @@ class RuntimeConnectionHandler(handler.Handler): top_k = data.get('top_k', 5) filters = data.get('filters', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'knowledge_base', kb_id, self.ap + run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity ) if error: return error @@ -891,12 +1028,6 @@ class RuntimeConnectionHandler(handler.Handler): Note: This action has dual validation paths: - AgentRunner: uses session_registry for permission check - Regular plugin: uses ConfigMigration.resolve_runner_config for pipeline-level check - - SECURITY TODO: This handler cannot verify the caller's plugin identity. - The session contains 'plugin_identity' (author/name), but we don't have access - to which plugin is making the API call. This could allow a malicious plugin to - use another plugin's run_id if it can guess/obtain it. Future improvement: - track caller plugin identity in RuntimeConnectionHandler or pass it in action data. """ query_id = data['query_id'] kb_id = data['kb_id'] @@ -904,6 +1035,7 @@ class RuntimeConnectionHandler(handler.Handler): top_k = data.get('top_k', 5) filters = data.get('filters', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls + caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( @@ -915,7 +1047,7 @@ class RuntimeConnectionHandler(handler.Handler): # Permission validation for AgentRunner calls if run_id: session, error = await _validate_run_authorization( - run_id, 'knowledge_base', kb_id, self.ap + run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity ) if error: return error diff --git a/src/langbot/templates/default-pipeline-config.json b/src/langbot/templates/default-pipeline-config.json index fe6e2842..f72e0038 100644 --- a/src/langbot/templates/default-pipeline-config.json +++ b/src/langbot/templates/default-pipeline-config.json @@ -38,56 +38,11 @@ }, "ai": { "runner": { - "runner": "local-agent", + "id": "plugin:langbot/local-agent/default", "expire-time": 0 }, - "local-agent": { - "model": { - "primary": "", - "fallbacks": [] - }, - "max-round": 10, - "prompt": [ - { - "role": "system", - "content": "You are a helpful assistant." - } - ], - "knowledge-bases": [], - "rerank-model": "", - "rerank-top-k": 5 - }, - "dify-service-api": { - "base-url": "https://api.dify.ai/v1", - "app-type": "chat", - "api-key": "your-api-key", - "timeout": 30 - }, - "dashscope-app-api": { - "app-type": "agent", - "api-key": "your-api-key", - "app-id": "your-app-id", - "references-quote": "参考资料来自:" - }, - "n8n-service-api": { - "webhook-url": "http://your-n8n-webhook-url", - "auth-type": "none", - "basic-username": "", - "basic-password": "", - "jwt-secret": "", - "jwt-algorithm": "HS256", - "header-name": "", - "header-value": "", - "timeout": 120, - "output-key": "response" - }, - "langflow-api": { - "base-url": "http://localhost:7860", - "api-key": "your-api-key", - "flow-id": "your-flow-id", - "input-type": "chat", - "output-type": "chat", - "tweaks": "{}" + "runner_config": { + "plugin:langbot/local-agent/default": {} } }, "output": { @@ -109,4 +64,4 @@ "remove-think": false } } -} +} \ No newline at end of file diff --git a/tests/unit_tests/agent/conftest.py b/tests/unit_tests/agent/conftest.py index 030f8b27..85c12478 100644 --- a/tests/unit_tests/agent/conftest.py +++ b/tests/unit_tests/agent/conftest.py @@ -9,6 +9,7 @@ def make_resources( tools: list[dict] | None = None, knowledge_bases: list[dict] | None = None, storage: dict | None = None, + files: list[dict] | None = None, ) -> dict[str, typing.Any]: """Create a minimal AgentResources dict for testing. @@ -17,6 +18,7 @@ def make_resources( tools: List of tool dicts with 'tool_name' key knowledge_bases: List of KB dicts with 'kb_id' key storage: Storage permissions dict + files: List of file dicts with 'file_id' key Returns: AgentResources dict with all required fields @@ -25,7 +27,7 @@ def make_resources( 'models': models or [], 'tools': tools or [], 'knowledge_bases': knowledge_bases or [], - 'files': [], + 'files': files or [], 'storage': storage or {'plugin_storage': False, 'workspace_storage': False}, 'platform_capabilities': {}, } @@ -59,6 +61,7 @@ def make_session( 'model': {m.get('model_id') for m in res.get('models', [])}, 'tool': {t.get('tool_name') for t in res.get('tools', [])}, 'knowledge_base': {kb.get('kb_id') for kb in res.get('knowledge_bases', [])}, + 'file': {f.get('file_id') for f in res.get('files', [])}, } return { diff --git a/tests/unit_tests/agent/test_handler_auth.py b/tests/unit_tests/agent/test_handler_auth.py index 33986ad4..d1be33a1 100644 --- a/tests/unit_tests/agent/test_handler_auth.py +++ b/tests/unit_tests/agent/test_handler_auth.py @@ -1485,29 +1485,31 @@ class TestStorageResourcePermissionHelper: class TestFilesResourcePermission: """Tests for session_registry.is_resource_allowed for files resource type. - Note: The current implementation does not have 'files' resource type - in is_resource_allowed. This test class documents the expected behavior - when files resource type is implemented. + Phase 6: 'files' resource type is now implemented in is_resource_allowed. """ - def test_files_resource_type_not_implemented(self): - """Currently, 'files' resource type returns False in is_resource_allowed.""" - registry = AgentRunSessionRegistry() + @pytest.mark.asyncio + async def test_files_resource_type_now_implemented(self): + """'files' resource type is now implemented in is_resource_allowed.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(files=[{'file_id': 'file_001'}]) - session = { - 'run_id': 'test', - 'runner_id': 'test', - 'query_id': 1, - 'plugin_identity': 'test', - 'resources': { - 'files': [{'file_id': 'file_001'}], - }, - 'status': {'started_at': 0, 'last_activity_at': 0}, - } + await registry.register( + run_id='run_files_implemented', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) - # 'files' resource type is not implemented in is_resource_allowed - # It returns False for unknown resource types - assert registry.is_resource_allowed(session, 'files', 'file_001') is False + session = await registry.get('run_files_implemented') + + # 'files' resource type is now implemented + assert registry.is_resource_allowed(session, 'file', 'file_001') is True + assert registry.is_resource_allowed(session, 'file', 'file_999') is False + + await registry.unregister('run_files_implemented') class TestRealActionHandlerSimulation: @@ -1604,7 +1606,7 @@ class TestRealActionHandlerSimulation: # Try to validate with non-existent run_id session, error = await _validate_run_authorization( - 'run_nonexistent_session_sim', + 'run_nonexistent_session_flow', 'model', 'model_001', mock_ap @@ -1614,4 +1616,362 @@ class TestRealActionHandlerSimulation: assert session is None assert error is not None assert 'not found' in error.message.lower() - assert mock_ap.logger.warning.called \ No newline at end of file + assert mock_ap.logger.warning.called + + +class TestStoragePermissionValidation: + """Tests for Host-side storage permission validation via _validate_run_authorization. + + Phase 6: Storage actions (SET/GET/DELETE_BINARY_STORAGE) now validate + storage permissions via _validate_run_authorization when run_id is present. + """ + + @pytest.mark.asyncio + async def test_plugin_storage_allowed_when_permitted(self): + """_validate_run_authorization allows 'plugin' storage when permitted.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(storage={'plugin_storage': True, 'workspace_storage': False}) + + await registry.register( + run_id='run_plugin_storage_auth', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + session, error = await _validate_run_authorization( + 'run_plugin_storage_auth', + 'storage', + 'plugin', + mock_ap + ) + + assert session is not None + assert error is None + + await registry.unregister('run_plugin_storage_auth') + + @pytest.mark.asyncio + async def test_plugin_storage_denied_when_not_permitted(self): + """_validate_run_authorization denies 'plugin' storage when not permitted.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False}) + + await registry.register( + run_id='run_plugin_storage_denied', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + mock_ap.logger.warning = MagicMock() + + session, error = await _validate_run_authorization( + 'run_plugin_storage_denied', + 'storage', + 'plugin', + mock_ap + ) + + assert session is None + assert error is not None + assert 'not authorized' in error.message.lower() + + await registry.unregister('run_plugin_storage_denied') + + @pytest.mark.asyncio + async def test_workspace_storage_allowed_when_permitted(self): + """_validate_run_authorization allows 'workspace' storage when permitted.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': True}) + + await registry.register( + run_id='run_workspace_storage_auth', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + session, error = await _validate_run_authorization( + 'run_workspace_storage_auth', + 'storage', + 'workspace', + mock_ap + ) + + assert session is not None + assert error is None + + await registry.unregister('run_workspace_storage_auth') + + @pytest.mark.asyncio + async def test_workspace_storage_denied_when_not_permitted(self): + """_validate_run_authorization denies 'workspace' storage when not permitted.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False}) + + await registry.register( + run_id='run_workspace_storage_denied', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + mock_ap.logger.warning = MagicMock() + + session, error = await _validate_run_authorization( + 'run_workspace_storage_denied', + 'storage', + 'workspace', + mock_ap + ) + + assert session is None + assert error is not None + assert 'not authorized' in error.message.lower() + + await registry.unregister('run_workspace_storage_denied') + + +class TestFilePermissionValidation: + """Tests for Host-side file permission validation via _validate_run_authorization. + + Phase 6: GET_CONFIG_FILE action now validates file permissions + via _validate_run_authorization when run_id is present. + """ + + @pytest.mark.asyncio + async def test_file_allowed_when_in_resources(self): + """_validate_run_authorization allows file when in resources.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(files=[{'file_id': 'file_001'}]) + + await registry.register( + run_id='run_file_auth', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + session, error = await _validate_run_authorization( + 'run_file_auth', + 'file', + 'file_001', + mock_ap + ) + + assert session is not None + assert error is None + + await registry.unregister('run_file_auth') + + @pytest.mark.asyncio + async def test_file_denied_when_not_in_resources(self): + """_validate_run_authorization denies file when not in resources.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(files=[{'file_id': 'file_001'}]) + + await registry.register( + run_id='run_file_denied', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + mock_ap.logger.warning = MagicMock() + + session, error = await _validate_run_authorization( + 'run_file_denied', + 'file', + 'file_999', # Not in resources + mock_ap + ) + + assert session is None + assert error is not None + assert 'not authorized' in error.message.lower() + + await registry.unregister('run_file_denied') + + +class TestCallerPluginIdentityValidation: + """Tests for caller_plugin_identity cross-plugin validation. + + Phase 6: _validate_run_authorization now validates that the caller plugin + identity matches the session's plugin_identity, preventing cross-plugin + unauthorized access if one plugin tries to use another's run_id. + """ + + @pytest.mark.asyncio + async def test_same_plugin_identity_allowed(self): + """_validate_run_authorization allows when caller matches session.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(models=[{'model_id': 'model_001'}]) + + await registry.register( + run_id='run_identity_match', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', # Session owner + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + session, error = await _validate_run_authorization( + 'run_identity_match', + 'model', + 'model_001', + mock_ap, + caller_plugin_identity='test/runner', # Caller is same plugin + ) + + assert session is not None + assert error is None + + await registry.unregister('run_identity_match') + + @pytest.mark.asyncio + async def test_different_plugin_identity_denied(self): + """_validate_run_authorization denies when caller differs from session.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(models=[{'model_id': 'model_001'}]) + + await registry.register( + run_id='run_identity_mismatch', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', # Session owner + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + mock_ap.logger.warning = MagicMock() + + session, error = await _validate_run_authorization( + 'run_identity_mismatch', + 'model', + 'model_001', + mock_ap, + caller_plugin_identity='other/plugin', # Different plugin trying to use run_id + ) + + assert session is None + assert error is not None + assert 'mismatch' in error.message.lower() + + await registry.unregister('run_identity_mismatch') + + @pytest.mark.asyncio + async def test_no_caller_identity_allowed(self): + """_validate_run_authorization allows when caller_plugin_identity not provided.""" + # Backward compatibility: if caller_plugin_identity is None, skip identity check + from langbot.pkg.agent.runner.session_registry import get_session_registry + registry = get_session_registry() + resources = make_resources(models=[{'model_id': 'model_001'}]) + + await registry.register( + run_id='run_no_caller_identity', + runner_id='plugin:test/runner/default', + query_id=1, + plugin_identity='test/runner', + resources=resources, + ) + + from langbot.pkg.plugin.handler import _validate_run_authorization + + mock_ap = MagicMock() + mock_ap.logger = MagicMock() + + # caller_plugin_identity not provided (None) + session, error = await _validate_run_authorization( + 'run_no_caller_identity', + 'model', + 'model_001', + mock_ap, + caller_plugin_identity=None, # Not provided + ) + + # Should pass (backward compat) + assert session is not None + assert error is None + + await registry.unregister('run_no_caller_identity') + + +class TestBackwardCompatStorageNoRunId: + """Tests for backward compatibility: storage actions without run_id. + + Regular plugins (non-AgentRunner) don't have run_id and should + have unrestricted access to storage APIs. + """ + + def test_storage_no_run_id_skips_validation(self): + """Storage actions without run_id skip Host-side validation.""" + # Handler.py: if run_id: ...validation... + # When run_id is None, validation is skipped + run_id = None + + # Simulate handler logic + if run_id: + raise AssertionError('Should not execute validation') + + # Storage access unrestricted for regular plugins + assert run_id is None + + def test_file_no_run_id_skips_validation(self): + """GET_CONFIG_FILE without run_id skips Host-side validation.""" + run_id = None + + if run_id: + raise AssertionError('Should not execute validation') + + # File access unrestricted for regular plugins + assert run_id is None \ No newline at end of file diff --git a/tests/unit_tests/agent/test_session_registry.py b/tests/unit_tests/agent/test_session_registry.py index 04ce3016..415c2188 100644 --- a/tests/unit_tests/agent/test_session_registry.py +++ b/tests/unit_tests/agent/test_session_registry.py @@ -108,6 +108,7 @@ class TestSessionRegistryBasic: 'model': set(), 'tool': set(), 'knowledge_base': set(), + 'file': set(), }, } @@ -169,6 +170,7 @@ class TestSessionRegistryBasic: 'model': set(), 'tool': set(), 'knowledge_base': set(), + 'file': set(), }, } new_session: AgentRunSession = { @@ -185,6 +187,7 @@ class TestSessionRegistryBasic: 'model': set(), 'tool': set(), 'knowledge_base': set(), + 'file': set(), }, } @@ -328,6 +331,36 @@ class TestIsResourceAllowed: assert registry.is_resource_allowed(session, 'unknown_type', 'something') is False + def test_file_allowed(self): + """File in resources should be allowed.""" + registry = AgentRunSessionRegistry() + resources = make_resources( + files=[ + {'file_id': 'file_001'}, + {'file_id': 'file_002'}, + ] + ) + session = make_session(resources=resources) + + assert registry.is_resource_allowed(session, 'file', 'file_001') is True + assert registry.is_resource_allowed(session, 'file', 'file_002') is True + + def test_file_not_allowed(self): + """File not in resources should be denied.""" + registry = AgentRunSessionRegistry() + resources = make_resources(files=[{'file_id': 'file_001'}]) + session = make_session(resources=resources) + + assert registry.is_resource_allowed(session, 'file', 'file_999') is False + + def test_file_empty_resources(self): + """Empty files list should deny all.""" + registry = AgentRunSessionRegistry() + resources = make_resources(files=[]) + session = make_session(resources=resources) + + assert registry.is_resource_allowed(session, 'file', 'file_001') is False + def test_missing_resources_field(self): """Missing resources field should not raise.""" registry = AgentRunSessionRegistry() diff --git a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx index 7e89e5bf..a40ef67c 100644 --- a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx +++ b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx @@ -374,8 +374,9 @@ export default function PipelineFormComponent({ return null; } - // For n8n runner config, use N8nAuthFormComponent for form linkage - if (stage.name === 'n8n-service-api' || stage.name === 'plugin:langbot/n8n-agent/default') { + // For old n8n built-in runner config, use N8nAuthFormComponent for form linkage + // New plugin:langbot/n8n-agent/default follows normal plugin runner path below + if (stage.name === 'n8n-service-api') { return (