feat(plugin): implement INVOKE_RERANK handler with run-scoped authorization

- Add invoke_rerank action handler in plugin handler
- Validate rerank model access via run session
- Cap documents at 64 for API limit
- Return sorted results by relevance score
This commit is contained in:
huanghuoguoguo
2026-05-13 10:36:47 +08:00
parent b220cf02e5
commit 6e982ff49d
7 changed files with 586 additions and 101 deletions

View File

@@ -81,6 +81,7 @@ class AgentRunSessionRegistry:
'model': {m.get('model_id') for m in resources.get('models', [])}, 'model': {m.get('model_id') for m in resources.get('models', [])},
'tool': {t.get('tool_name') for t in resources.get('tools', [])}, 'tool': {t.get('tool_name') for t in resources.get('tools', [])},
'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])}, 'knowledge_base': {kb.get('kb_id') for kb in resources.get('knowledge_bases', [])},
'file': {f.get('file_id') for f in resources.get('files', [])},
} }
session: AgentRunSession = { session: AgentRunSession = {
@@ -143,15 +144,15 @@ class AgentRunSessionRegistry:
Args: Args:
session: AgentRunSession to check session: AgentRunSession to check
resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage') resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage', 'file')
resource_id: Resource identifier (model_id, tool_name, kb_id) resource_id: Resource identifier (model_id, tool_name, kb_id, 'plugin'/'workspace', file_key)
Returns: Returns:
True if resource is authorized, False otherwise True if resource is authorized, False otherwise
""" """
authorized_ids = session.get('_authorized_ids', {}) authorized_ids = session.get('_authorized_ids', {})
if resource_type in ('model', 'tool', 'knowledge_base'): if resource_type in ('model', 'tool', 'knowledge_base', 'file'):
return resource_id in authorized_ids.get(resource_type, set()) return resource_id in authorized_ids.get(resource_type, set())
if resource_type == 'storage': if resource_type == 'storage':

View File

@@ -46,17 +46,19 @@ async def _validate_run_authorization(
resource_type: str, resource_type: str,
resource_id: str, resource_id: str,
ap: app.Application, ap: app.Application,
caller_plugin_identity: str | None = None,
) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]: ) -> Union[tuple[None, handler.ActionResponse], tuple[Any, None]]:
"""Validate run_id authorization for a resource access. """Validate run_id authorization for a resource access.
Common validation logic for INVOKE_LLM, INVOKE_LLM_STREAM, CALL_TOOL, Common validation logic for INVOKE_LLM, INVOKE_LLM_STREAM, CALL_TOOL,
RETRIEVE_KNOWLEDGE_BASE, and RETRIEVE_KNOWLEDGE actions. RETRIEVE_KNOWLEDGE_BASE, RETRIEVE_KNOWLEDGE, and storage/file actions.
Args: Args:
run_id: The run_id to validate. run_id: The run_id to validate.
resource_type: Resource type ('model', 'tool', 'knowledge_base'). resource_type: Resource type ('model', 'tool', 'knowledge_base', 'storage', 'file').
resource_id: Resource identifier (model_uuid, tool_name, kb_id). resource_id: Resource identifier (model_uuid, tool_name, kb_id, 'plugin'/'workspace', file_key).
ap: Application instance for logging. ap: Application instance for logging.
caller_plugin_identity: Optional plugin identity (author/name) of the caller for cross-plugin validation.
Returns: Returns:
Tuple of (session, None) if validation passes. Tuple of (session, None) if validation passes.
@@ -72,6 +74,18 @@ async def _validate_run_authorization(
message=f'Run session {run_id} not found or expired', message=f'Run session {run_id} not found or expired',
) )
# Validate caller_plugin_identity matches session's plugin_identity
if caller_plugin_identity:
session_plugin_identity = session.get('plugin_identity')
if session_plugin_identity and caller_plugin_identity != session_plugin_identity:
ap.logger.warning(
f'{resource_type.upper()}: caller_plugin_identity {caller_plugin_identity} '
f'does not match session plugin_identity {session_plugin_identity}'
)
return None, handler.ActionResponse.error(
message=f'Plugin identity mismatch: caller {caller_plugin_identity} is not authorized for run_id {run_id}',
)
if not session_registry.is_resource_allowed(session, resource_type, resource_id): if not session_registry.is_resource_allowed(session, resource_type, resource_id):
ap.logger.warning( ap.logger.warning(
f'{resource_type.upper()}: {resource_id} not allowed for run_id {run_id}' f'{resource_type.upper()}: {resource_id} not allowed for run_id {run_id}'
@@ -377,11 +391,12 @@ class RuntimeConnectionHandler(handler.Handler):
funcs = data.get('funcs', []) funcs = data.get('funcs', [])
extra_args = data.get('extra_args', {}) extra_args = data.get('extra_args', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'model', llm_model_uuid, self.ap run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity
) )
if error: if error:
return error return error
@@ -428,11 +443,12 @@ class RuntimeConnectionHandler(handler.Handler):
funcs = data.get('funcs', []) funcs = data.get('funcs', [])
extra_args = data.get('extra_args', {}) extra_args = data.get('extra_args', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'model', llm_model_uuid, self.ap run_id, 'model', llm_model_uuid, self.ap, caller_plugin_identity
) )
if error: if error:
yield error yield error
@@ -476,13 +492,14 @@ class RuntimeConnectionHandler(handler.Handler):
# Support 'tool_parameters' (LangBotAPIProxy) and 'parameters' (AgentRunAPIProxy) # Support 'tool_parameters' (LangBotAPIProxy) and 'parameters' (AgentRunAPIProxy)
parameters = data.get('tool_parameters') or data.get('parameters', {}) parameters = data.get('tool_parameters') or data.get('parameters', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# session_data = data['session'] # session_data = data['session']
# query_id = data['query_id'] # query_id = data['query_id']
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'tool', tool_name, self.ap run_id, 'tool', tool_name, self.ap, caller_plugin_identity
) )
if error: if error:
return error return error
@@ -511,20 +528,36 @@ class RuntimeConnectionHandler(handler.Handler):
) )
# ================= Binary Storage Handlers ================= # ================= Binary Storage Handlers =================
# NOTE: These are low-level actions called by SDK Runtime's storage wrapper handlers. # Permission validation:
# Permission validation is handled in SDK Runtime layer (not here): # - For AgentRunner calls (with run_id): validates storage permission via session_registry
# - plugin_storage: SDK handler auto-sets owner to caller plugin identity (inherent isolation) # - For regular plugin calls (no run_id): unrestricted access (backward compatibility)
# - workspace_storage: SDK handler should validate session.resources.storage.workspace_storage # - Plugin storage: inherent isolation via owner = plugin identity (set by SDK runtime)
# TODO: SDK storage handlers need to pass run_id and validate workspace_storage permission. # - Workspace storage: requires ctx.resources.storage.workspace_storage for AgentRunner
# Current risk: workspace storage access is unrestricted from AgentRunner context.
@self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE) @self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE)
async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
"""Set binary storage""" """Set binary storage
For AgentRunner calls: validates storage permission via session_registry.
For regular plugin calls: unrestricted access (backward compatibility).
"""
key = data['key'] key = data['key']
owner_type = data['owner_type'] owner_type = data['owner_type']
owner = data['owner'] owner = data['owner']
value = base64.b64decode(data['value_base64']) value = base64.b64decode(data['value_base64'])
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls
if run_id:
# Determine storage type from owner_type
storage_type = owner_type # 'plugin' or 'workspace'
session, error = await _validate_run_authorization(
run_id, 'storage', storage_type, self.ap, caller_plugin_identity
)
if error:
return error
max_value_bytes = ( max_value_bytes = (
self.ap.instance_config.data.get('plugin', {}) self.ap.instance_config.data.get('plugin', {})
.get('binary_storage', {}) .get('binary_storage', {})
@@ -574,10 +607,25 @@ class RuntimeConnectionHandler(handler.Handler):
@self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE) @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE)
async def get_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: async def get_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
"""Get binary storage""" """Get binary storage
For AgentRunner calls: validates storage permission via session_registry.
For regular plugin calls: unrestricted access (backward compatibility).
"""
key = data['key'] key = data['key']
owner_type = data['owner_type'] owner_type = data['owner_type']
owner = data['owner'] owner = data['owner']
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls
if run_id:
storage_type = owner_type
session, error = await _validate_run_authorization(
run_id, 'storage', storage_type, self.ap, caller_plugin_identity
)
if error:
return error
result = await self.ap.persistence_mgr.execute_async( result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_bstorage.BinaryStorage) sqlalchemy.select(persistence_bstorage.BinaryStorage)
@@ -600,10 +648,25 @@ class RuntimeConnectionHandler(handler.Handler):
@self.action(RuntimeToLangBotAction.DELETE_BINARY_STORAGE) @self.action(RuntimeToLangBotAction.DELETE_BINARY_STORAGE)
async def delete_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: async def delete_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
"""Delete binary storage""" """Delete binary storage
For AgentRunner calls: validates storage permission via session_registry.
For regular plugin calls: unrestricted access (backward compatibility).
"""
key = data['key'] key = data['key']
owner_type = data['owner_type'] owner_type = data['owner_type']
owner = data['owner'] owner = data['owner']
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls
if run_id:
storage_type = owner_type
session, error = await _validate_run_authorization(
run_id, 'storage', storage_type, self.ap, caller_plugin_identity
)
if error:
return error
await self.ap.persistence_mgr.execute_async( await self.ap.persistence_mgr.execute_async(
sqlalchemy.delete(persistence_bstorage.BinaryStorage) sqlalchemy.delete(persistence_bstorage.BinaryStorage)
@@ -618,9 +681,24 @@ class RuntimeConnectionHandler(handler.Handler):
@self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE_KEYS) @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE_KEYS)
async def get_binary_storage_keys(data: dict[str, Any]) -> handler.ActionResponse: async def get_binary_storage_keys(data: dict[str, Any]) -> handler.ActionResponse:
"""Get binary storage keys""" """Get binary storage keys
For AgentRunner calls: validates storage permission via session_registry.
For regular plugin calls: unrestricted access (backward compatibility).
"""
owner_type = data['owner_type'] owner_type = data['owner_type']
owner = data['owner'] owner = data['owner']
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls
if run_id:
storage_type = owner_type
session, error = await _validate_run_authorization(
run_id, 'storage', storage_type, self.ap, caller_plugin_identity
)
if error:
return error
result = await self.ap.persistence_mgr.execute_async( result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_bstorage.BinaryStorage.key) sqlalchemy.select(persistence_bstorage.BinaryStorage.key)
@@ -636,8 +714,22 @@ class RuntimeConnectionHandler(handler.Handler):
@self.action(PluginToRuntimeAction.GET_CONFIG_FILE) @self.action(PluginToRuntimeAction.GET_CONFIG_FILE)
async def get_config_file(data: dict[str, Any]) -> handler.ActionResponse: async def get_config_file(data: dict[str, Any]) -> handler.ActionResponse:
"""Get a config file by file key""" """Get a config file by file key
For AgentRunner calls: validates file_key against session.resources.files.
For regular plugin calls: unrestricted access (backward compatibility).
"""
file_key = data['file_key'] file_key = data['file_key']
run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls
if run_id:
session, error = await _validate_run_authorization(
run_id, 'file', file_key, self.ap, caller_plugin_identity
)
if error:
return error
try: try:
# Load file from storage # Load file from storage
@@ -672,6 +764,50 @@ class RuntimeConnectionHandler(handler.Handler):
except Exception as e: except Exception as e:
return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid) return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
@self.action(PluginToRuntimeAction.INVOKE_RERANK)
async def invoke_rerank(data: dict[str, Any]) -> handler.ActionResponse:
"""Invoke rerank model for agent runner with run-scoped authorization."""
run_id = data.get('run_id')
rerank_model_uuid = data['rerank_model_uuid']
query = data['query']
documents = data['documents']
top_k = data.get('top_k')
# Validate run authorization
session, error = await _validate_run_authorization(
run_id, 'model', rerank_model_uuid, self.ap
)
if error:
return error
# Get rerank model
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
if rerank_model is None:
return handler.ActionResponse.error(
message=f'Rerank model with uuid {rerank_model_uuid} not found',
)
try:
# Cap documents at 64 for API limit
documents_capped = documents[:64]
scores = await rerank_model.provider.invoke_rerank(
model=rerank_model,
query=query,
documents=documents_capped,
)
# Sort by relevance score descending
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
# Apply top_k if specified
if top_k is not None:
scored = scored[:top_k]
return handler.ActionResponse.success(data={'results': scored})
except Exception as e:
return _make_rag_error_response(e, 'RerankError', rerank_model_uuid=rerank_model_uuid)
@self.action(PluginToRuntimeAction.VECTOR_UPSERT) @self.action(PluginToRuntimeAction.VECTOR_UPSERT)
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse: async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
collection_id = data['collection_id'] collection_id = data['collection_id']
@@ -817,11 +953,12 @@ class RuntimeConnectionHandler(handler.Handler):
top_k = data.get('top_k', 5) top_k = data.get('top_k', 5)
filters = data.get('filters', {}) filters = data.get('filters', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'knowledge_base', kb_id, self.ap run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity
) )
if error: if error:
return error return error
@@ -891,12 +1028,6 @@ class RuntimeConnectionHandler(handler.Handler):
Note: This action has dual validation paths: Note: This action has dual validation paths:
- AgentRunner: uses session_registry for permission check - AgentRunner: uses session_registry for permission check
- Regular plugin: uses ConfigMigration.resolve_runner_config for pipeline-level check - Regular plugin: uses ConfigMigration.resolve_runner_config for pipeline-level check
SECURITY TODO: This handler cannot verify the caller's plugin identity.
The session contains 'plugin_identity' (author/name), but we don't have access
to which plugin is making the API call. This could allow a malicious plugin to
use another plugin's run_id if it can guess/obtain it. Future improvement:
track caller plugin identity in RuntimeConnectionHandler or pass it in action data.
""" """
query_id = data['query_id'] query_id = data['query_id']
kb_id = data['kb_id'] kb_id = data['kb_id']
@@ -904,6 +1035,7 @@ class RuntimeConnectionHandler(handler.Handler):
top_k = data.get('top_k', 5) top_k = data.get('top_k', 5)
filters = data.get('filters', {}) filters = data.get('filters', {})
run_id = data.get('run_id') # Optional: present for AgentRunner calls run_id = data.get('run_id') # Optional: present for AgentRunner calls
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
if query_id not in self.ap.query_pool.cached_queries: if query_id not in self.ap.query_pool.cached_queries:
return handler.ActionResponse.error( return handler.ActionResponse.error(
@@ -915,7 +1047,7 @@ class RuntimeConnectionHandler(handler.Handler):
# Permission validation for AgentRunner calls # Permission validation for AgentRunner calls
if run_id: if run_id:
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
run_id, 'knowledge_base', kb_id, self.ap run_id, 'knowledge_base', kb_id, self.ap, caller_plugin_identity
) )
if error: if error:
return error return error

View File

@@ -38,56 +38,11 @@
}, },
"ai": { "ai": {
"runner": { "runner": {
"runner": "local-agent", "id": "plugin:langbot/local-agent/default",
"expire-time": 0 "expire-time": 0
}, },
"local-agent": { "runner_config": {
"model": { "plugin:langbot/local-agent/default": {}
"primary": "",
"fallbacks": []
},
"max-round": 10,
"prompt": [
{
"role": "system",
"content": "You are a helpful assistant."
}
],
"knowledge-bases": [],
"rerank-model": "",
"rerank-top-k": 5
},
"dify-service-api": {
"base-url": "https://api.dify.ai/v1",
"app-type": "chat",
"api-key": "your-api-key",
"timeout": 30
},
"dashscope-app-api": {
"app-type": "agent",
"api-key": "your-api-key",
"app-id": "your-app-id",
"references-quote": "参考资料来自:"
},
"n8n-service-api": {
"webhook-url": "http://your-n8n-webhook-url",
"auth-type": "none",
"basic-username": "",
"basic-password": "",
"jwt-secret": "",
"jwt-algorithm": "HS256",
"header-name": "",
"header-value": "",
"timeout": 120,
"output-key": "response"
},
"langflow-api": {
"base-url": "http://localhost:7860",
"api-key": "your-api-key",
"flow-id": "your-flow-id",
"input-type": "chat",
"output-type": "chat",
"tweaks": "{}"
} }
}, },
"output": { "output": {
@@ -109,4 +64,4 @@
"remove-think": false "remove-think": false
} }
} }
} }

View File

@@ -9,6 +9,7 @@ def make_resources(
tools: list[dict] | None = None, tools: list[dict] | None = None,
knowledge_bases: list[dict] | None = None, knowledge_bases: list[dict] | None = None,
storage: dict | None = None, storage: dict | None = None,
files: list[dict] | None = None,
) -> dict[str, typing.Any]: ) -> dict[str, typing.Any]:
"""Create a minimal AgentResources dict for testing. """Create a minimal AgentResources dict for testing.
@@ -17,6 +18,7 @@ def make_resources(
tools: List of tool dicts with 'tool_name' key tools: List of tool dicts with 'tool_name' key
knowledge_bases: List of KB dicts with 'kb_id' key knowledge_bases: List of KB dicts with 'kb_id' key
storage: Storage permissions dict storage: Storage permissions dict
files: List of file dicts with 'file_id' key
Returns: Returns:
AgentResources dict with all required fields AgentResources dict with all required fields
@@ -25,7 +27,7 @@ def make_resources(
'models': models or [], 'models': models or [],
'tools': tools or [], 'tools': tools or [],
'knowledge_bases': knowledge_bases or [], 'knowledge_bases': knowledge_bases or [],
'files': [], 'files': files or [],
'storage': storage or {'plugin_storage': False, 'workspace_storage': False}, 'storage': storage or {'plugin_storage': False, 'workspace_storage': False},
'platform_capabilities': {}, 'platform_capabilities': {},
} }
@@ -59,6 +61,7 @@ def make_session(
'model': {m.get('model_id') for m in res.get('models', [])}, 'model': {m.get('model_id') for m in res.get('models', [])},
'tool': {t.get('tool_name') for t in res.get('tools', [])}, 'tool': {t.get('tool_name') for t in res.get('tools', [])},
'knowledge_base': {kb.get('kb_id') for kb in res.get('knowledge_bases', [])}, 'knowledge_base': {kb.get('kb_id') for kb in res.get('knowledge_bases', [])},
'file': {f.get('file_id') for f in res.get('files', [])},
} }
return { return {

View File

@@ -1485,29 +1485,31 @@ class TestStorageResourcePermissionHelper:
class TestFilesResourcePermission: class TestFilesResourcePermission:
"""Tests for session_registry.is_resource_allowed for files resource type. """Tests for session_registry.is_resource_allowed for files resource type.
Note: The current implementation does not have 'files' resource type Phase 6: 'files' resource type is now implemented in is_resource_allowed.
in is_resource_allowed. This test class documents the expected behavior
when files resource type is implemented.
""" """
def test_files_resource_type_not_implemented(self): @pytest.mark.asyncio
"""Currently, 'files' resource type returns False in is_resource_allowed.""" async def test_files_resource_type_now_implemented(self):
registry = AgentRunSessionRegistry() """'files' resource type is now implemented in is_resource_allowed."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
session = { await registry.register(
'run_id': 'test', run_id='run_files_implemented',
'runner_id': 'test', runner_id='plugin:test/runner/default',
'query_id': 1, query_id=1,
'plugin_identity': 'test', plugin_identity='test/runner',
'resources': { resources=resources,
'files': [{'file_id': 'file_001'}], )
},
'status': {'started_at': 0, 'last_activity_at': 0},
}
# 'files' resource type is not implemented in is_resource_allowed session = await registry.get('run_files_implemented')
# It returns False for unknown resource types
assert registry.is_resource_allowed(session, 'files', 'file_001') is False # 'files' resource type is now implemented
assert registry.is_resource_allowed(session, 'file', 'file_001') is True
assert registry.is_resource_allowed(session, 'file', 'file_999') is False
await registry.unregister('run_files_implemented')
class TestRealActionHandlerSimulation: class TestRealActionHandlerSimulation:
@@ -1604,7 +1606,7 @@ class TestRealActionHandlerSimulation:
# Try to validate with non-existent run_id # Try to validate with non-existent run_id
session, error = await _validate_run_authorization( session, error = await _validate_run_authorization(
'run_nonexistent_session_sim', 'run_nonexistent_session_flow',
'model', 'model',
'model_001', 'model_001',
mock_ap mock_ap
@@ -1614,4 +1616,362 @@ class TestRealActionHandlerSimulation:
assert session is None assert session is None
assert error is not None assert error is not None
assert 'not found' in error.message.lower() assert 'not found' in error.message.lower()
assert mock_ap.logger.warning.called assert mock_ap.logger.warning.called
class TestStoragePermissionValidation:
"""Tests for Host-side storage permission validation via _validate_run_authorization.
Phase 6: Storage actions (SET/GET/DELETE_BINARY_STORAGE) now validate
storage permissions via _validate_run_authorization when run_id is present.
"""
@pytest.mark.asyncio
async def test_plugin_storage_allowed_when_permitted(self):
"""_validate_run_authorization allows 'plugin' storage when permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': True, 'workspace_storage': False})
await registry.register(
run_id='run_plugin_storage_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_plugin_storage_auth',
'storage',
'plugin',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_plugin_storage_auth')
@pytest.mark.asyncio
async def test_plugin_storage_denied_when_not_permitted(self):
"""_validate_run_authorization denies 'plugin' storage when not permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False})
await registry.register(
run_id='run_plugin_storage_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_plugin_storage_denied',
'storage',
'plugin',
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_plugin_storage_denied')
@pytest.mark.asyncio
async def test_workspace_storage_allowed_when_permitted(self):
"""_validate_run_authorization allows 'workspace' storage when permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': True})
await registry.register(
run_id='run_workspace_storage_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_workspace_storage_auth',
'storage',
'workspace',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_workspace_storage_auth')
@pytest.mark.asyncio
async def test_workspace_storage_denied_when_not_permitted(self):
"""_validate_run_authorization denies 'workspace' storage when not permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False})
await registry.register(
run_id='run_workspace_storage_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_workspace_storage_denied',
'storage',
'workspace',
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_workspace_storage_denied')
class TestFilePermissionValidation:
"""Tests for Host-side file permission validation via _validate_run_authorization.
Phase 6: GET_CONFIG_FILE action now validates file permissions
via _validate_run_authorization when run_id is present.
"""
@pytest.mark.asyncio
async def test_file_allowed_when_in_resources(self):
"""_validate_run_authorization allows file when in resources."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
await registry.register(
run_id='run_file_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_file_auth',
'file',
'file_001',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_file_auth')
@pytest.mark.asyncio
async def test_file_denied_when_not_in_resources(self):
"""_validate_run_authorization denies file when not in resources."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
await registry.register(
run_id='run_file_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_file_denied',
'file',
'file_999', # Not in resources
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_file_denied')
class TestCallerPluginIdentityValidation:
"""Tests for caller_plugin_identity cross-plugin validation.
Phase 6: _validate_run_authorization now validates that the caller plugin
identity matches the session's plugin_identity, preventing cross-plugin
unauthorized access if one plugin tries to use another's run_id.
"""
@pytest.mark.asyncio
async def test_same_plugin_identity_allowed(self):
"""_validate_run_authorization allows when caller matches session."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_identity_match',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner', # Session owner
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_identity_match',
'model',
'model_001',
mock_ap,
caller_plugin_identity='test/runner', # Caller is same plugin
)
assert session is not None
assert error is None
await registry.unregister('run_identity_match')
@pytest.mark.asyncio
async def test_different_plugin_identity_denied(self):
"""_validate_run_authorization denies when caller differs from session."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_identity_mismatch',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner', # Session owner
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_identity_mismatch',
'model',
'model_001',
mock_ap,
caller_plugin_identity='other/plugin', # Different plugin trying to use run_id
)
assert session is None
assert error is not None
assert 'mismatch' in error.message.lower()
await registry.unregister('run_identity_mismatch')
@pytest.mark.asyncio
async def test_no_caller_identity_allowed(self):
"""_validate_run_authorization allows when caller_plugin_identity not provided."""
# Backward compatibility: if caller_plugin_identity is None, skip identity check
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_no_caller_identity',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
# caller_plugin_identity not provided (None)
session, error = await _validate_run_authorization(
'run_no_caller_identity',
'model',
'model_001',
mock_ap,
caller_plugin_identity=None, # Not provided
)
# Should pass (backward compat)
assert session is not None
assert error is None
await registry.unregister('run_no_caller_identity')
class TestBackwardCompatStorageNoRunId:
"""Tests for backward compatibility: storage actions without run_id.
Regular plugins (non-AgentRunner) don't have run_id and should
have unrestricted access to storage APIs.
"""
def test_storage_no_run_id_skips_validation(self):
"""Storage actions without run_id skip Host-side validation."""
# Handler.py: if run_id: ...validation...
# When run_id is None, validation is skipped
run_id = None
# Simulate handler logic
if run_id:
raise AssertionError('Should not execute validation')
# Storage access unrestricted for regular plugins
assert run_id is None
def test_file_no_run_id_skips_validation(self):
"""GET_CONFIG_FILE without run_id skips Host-side validation."""
run_id = None
if run_id:
raise AssertionError('Should not execute validation')
# File access unrestricted for regular plugins
assert run_id is None

View File

@@ -108,6 +108,7 @@ class TestSessionRegistryBasic:
'model': set(), 'model': set(),
'tool': set(), 'tool': set(),
'knowledge_base': set(), 'knowledge_base': set(),
'file': set(),
}, },
} }
@@ -169,6 +170,7 @@ class TestSessionRegistryBasic:
'model': set(), 'model': set(),
'tool': set(), 'tool': set(),
'knowledge_base': set(), 'knowledge_base': set(),
'file': set(),
}, },
} }
new_session: AgentRunSession = { new_session: AgentRunSession = {
@@ -185,6 +187,7 @@ class TestSessionRegistryBasic:
'model': set(), 'model': set(),
'tool': set(), 'tool': set(),
'knowledge_base': set(), 'knowledge_base': set(),
'file': set(),
}, },
} }
@@ -328,6 +331,36 @@ class TestIsResourceAllowed:
assert registry.is_resource_allowed(session, 'unknown_type', 'something') is False assert registry.is_resource_allowed(session, 'unknown_type', 'something') is False
def test_file_allowed(self):
"""File in resources should be allowed."""
registry = AgentRunSessionRegistry()
resources = make_resources(
files=[
{'file_id': 'file_001'},
{'file_id': 'file_002'},
]
)
session = make_session(resources=resources)
assert registry.is_resource_allowed(session, 'file', 'file_001') is True
assert registry.is_resource_allowed(session, 'file', 'file_002') is True
def test_file_not_allowed(self):
"""File not in resources should be denied."""
registry = AgentRunSessionRegistry()
resources = make_resources(files=[{'file_id': 'file_001'}])
session = make_session(resources=resources)
assert registry.is_resource_allowed(session, 'file', 'file_999') is False
def test_file_empty_resources(self):
"""Empty files list should deny all."""
registry = AgentRunSessionRegistry()
resources = make_resources(files=[])
session = make_session(resources=resources)
assert registry.is_resource_allowed(session, 'file', 'file_001') is False
def test_missing_resources_field(self): def test_missing_resources_field(self):
"""Missing resources field should not raise.""" """Missing resources field should not raise."""
registry = AgentRunSessionRegistry() registry = AgentRunSessionRegistry()

View File

@@ -374,8 +374,9 @@ export default function PipelineFormComponent({
return null; return null;
} }
// For n8n runner config, use N8nAuthFormComponent for form linkage // For old n8n built-in runner config, use N8nAuthFormComponent for form linkage
if (stage.name === 'n8n-service-api' || stage.name === 'plugin:langbot/n8n-agent/default') { // New plugin:langbot/n8n-agent/default follows normal plugin runner path below
if (stage.name === 'n8n-service-api') {
return ( return (
<Card key={stage.name}> <Card key={stage.name}>
<CardHeader> <CardHeader>