Files
LangBot/tests/unit_tests/agent/test_handler_auth.py
2026-05-19 12:20:28 +08:00

2029 lines
72 KiB
Python

"""Tests for RuntimeConnectionHandler proxy action authorization.
Tests focus on:
- INVOKE_LLM authorization
- INVOKE_LLM_STREAM authorization
- CALL_TOOL authorization
- RETRIEVE_KNOWLEDGE_BASE authorization
Authorization paths:
1. AgentRunner calls: has run_id, validates against session_registry
2. Regular plugin calls: no run_id, unrestricted (backward compatibility)
"""
from __future__ import annotations
import pytest
import types
from unittest.mock import AsyncMock, MagicMock
from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor
from langbot.pkg.agent.runner.session_registry import AgentRunSessionRegistry
from langbot.pkg.plugin.handler import _build_tool_detail, _get_pipeline_knowledge_base_uuids
# Import shared test fixtures from conftest.py
from .conftest import make_resources
class MockModel:
"""Mock LLM model for testing."""
def __init__(self, uuid: str):
self.uuid = uuid
self.provider = MagicMock()
self.provider.invoke_llm = AsyncMock(return_value=MagicMock(model_dump=lambda: {'content': 'response'}))
class MockEmbeddingModel:
"""Mock embedding model for testing."""
def __init__(self, uuid: str):
self.uuid = uuid
self.provider = MagicMock()
class MockKnowledgeBase:
"""Mock knowledge base for testing."""
def __init__(self, uuid: str, name: str = 'KB'):
self.knowledge_base_entity = MagicMock()
self.knowledge_base_entity.description = f'{name} description'
self._uuid = uuid
self._name = name
self.retrieve = AsyncMock(return_value=[])
def get_uuid(self):
return self._uuid
def get_name(self):
return self._name
class MockQuery:
"""Mock query for testing."""
def __init__(self, query_id: int = 1):
self.query_id = query_id
self.session = MagicMock()
self.session.launcher_type = MagicMock()
self.session.launcher_type.value = 'telegram'
self.session.launcher_id = 'group_123'
self.sender_id = 'user_001'
self.bot_uuid = 'bot_001'
self.pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:test/runner/default',
},
'runner_config': {
'plugin:test/runner/default': {
'knowledge-bases': ['kb_001', 'kb_002'],
},
},
},
}
class MockApplication:
"""Mock Application for testing."""
def __init__(self):
self.logger = MagicMock()
self.logger.debug = MagicMock()
self.logger.warning = MagicMock()
self.logger.info = MagicMock()
self.logger.error = MagicMock()
self.query_pool = MagicMock()
self.query_pool.cached_queries = {}
self.model_mgr = MagicMock()
self.model_mgr.get_model_by_uuid = AsyncMock(return_value=None)
self.model_mgr.get_embedding_model_by_uuid = AsyncMock(return_value=None)
self.tool_mgr = MagicMock()
self.tool_mgr.execute_func_call = AsyncMock(return_value={'result': 'success'})
self.rag_mgr = MagicMock()
self.rag_mgr.get_knowledge_base_by_uuid = AsyncMock(return_value=None)
self.rag_mgr.knowledge_bases = {}
self.persistence_mgr = MagicMock()
self.persistence_mgr.execute_async = AsyncMock(return_value=MagicMock(first=lambda: None))
class FakeAgentRunnerRegistry:
async def get(self, runner_id, bound_plugins=None):
return AgentRunnerDescriptor(
id=runner_id,
source='plugin',
label={'en_US': 'Test Runner'},
plugin_author='test',
plugin_name='runner',
runner_name='default',
config_schema=[
{'name': 'knowledge-bases', 'type': 'knowledge-base-multi-selector', 'default': []},
],
capabilities={'knowledge_retrieval': True},
permissions={'knowledge_bases': ['list', 'retrieve']},
)
class MockConnection:
"""Mock connection for testing."""
pass
class TestPipelineKnowledgeBaseScope:
"""Tests for schema-driven pipeline KB scope resolution."""
@pytest.mark.asyncio
async def test_uses_preprocessed_query_scope(self):
app = MockApplication()
query = MockQuery()
query.variables = {'_knowledge_base_uuids': ['kb_var', '__none__', 'kb_var']}
kb_uuids = await _get_pipeline_knowledge_base_uuids(app, query)
assert kb_uuids == ['kb_var']
@pytest.mark.asyncio
async def test_uses_runner_schema_when_query_scope_not_preprocessed(self):
app = MockApplication()
app.agent_runner_registry = FakeAgentRunnerRegistry()
query = MockQuery()
query.variables = {}
kb_uuids = await _get_pipeline_knowledge_base_uuids(app, query)
assert kb_uuids == ['kb_001', 'kb_002']
class MockDisconnectCallback:
"""Mock disconnect callback for testing."""
async def __call__(self):
return True
class TestInvokeLLMAuthorization:
"""Tests for INVOKE_LLM authorization."""
@pytest.mark.asyncio
async def test_invoke_llm_authorized_with_run_id(self):
"""INVOKE_LLM: authorized when model in session.resources."""
# Setup registry with session
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_authorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Verify authorization logic directly
session = await registry.get('run_authorized')
assert session is not None
assert registry.is_resource_allowed(session, 'model', 'model_001') is True
# Cleanup
await registry.unregister('run_authorized')
@pytest.mark.asyncio
async def test_invoke_llm_unauthorized_with_run_id(self):
"""INVOKE_LLM: unauthorized when model not in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_unauthorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Test authorization logic directly
session = await registry.get('run_unauthorized')
assert session is not None
# model_002 is not in resources
assert registry.is_resource_allowed(session, 'model', 'model_002') is False
await registry.unregister('run_unauthorized')
@pytest.mark.asyncio
async def test_invoke_llm_session_not_found(self):
"""INVOKE_LLM: session not found should return error."""
registry = AgentRunSessionRegistry()
# No session registered for this run_id
session = await registry.get('run_nonexistent')
assert session is None
@pytest.mark.asyncio
async def test_invoke_llm_no_run_id_unrestricted(self):
"""INVOKE_LLM: no run_id should be unrestricted (backward compat)."""
# When no run_id is provided, the authorization check is skipped
# This is the backward compatibility path for regular plugin calls
# Simulate: if not run_id, skip authorization
run_id = None
# Authorization check should NOT be triggered
assert run_id is None # No authorization check
class TestInvokeLLMStreamAuthorization:
"""Tests for INVOKE_LLM_STREAM authorization."""
@pytest.mark.asyncio
async def test_invoke_llm_stream_authorized_with_run_id(self):
"""INVOKE_LLM_STREAM: authorized when model in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_stream_authorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_stream_authorized')
assert session is not None
assert registry.is_resource_allowed(session, 'model', 'model_001') is True
await registry.unregister('run_stream_authorized')
@pytest.mark.asyncio
async def test_invoke_llm_stream_unauthorized_with_run_id(self):
"""INVOKE_LLM_STREAM: unauthorized when model not in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_stream_unauthorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_stream_unauthorized')
assert session is not None
assert registry.is_resource_allowed(session, 'model', 'model_002') is False
await registry.unregister('run_stream_unauthorized')
@pytest.mark.asyncio
async def test_invoke_llm_stream_no_run_id_unrestricted(self):
"""INVOKE_LLM_STREAM: no run_id should be unrestricted."""
run_id = None
# No authorization check
assert run_id is None
def test_build_tool_detail_normalizes_plugin_component_manifest():
"""GET_TOOL_DETAIL returns a uniform schema for ordinary plugin Tool manifests."""
manifest_tool = types.SimpleNamespace(
metadata=types.SimpleNamespace(
name='search',
label={'en_US': 'Search'},
description={'en_US': 'Search public data'},
),
spec={
'llm_prompt': 'Search test data',
'parameters': {
'type': 'object',
'properties': {'q': {'type': 'string'}},
},
},
)
detail = _build_tool_detail(manifest_tool, requested_tool_name='author/plugin/search')
assert detail['name'] == 'author/plugin/search'
assert detail['description'] == 'Search test data'
assert detail['human_desc'] == 'Search test data'
assert detail['parameters']['properties']['q']['type'] == 'string'
assert detail['label'] == {'en_US': 'Search'}
assert detail['spec'] == manifest_tool.spec
class TestCallToolAuthorization:
"""Tests for CALL_TOOL authorization."""
@pytest.mark.asyncio
async def test_call_tool_authorized_with_run_id(self):
"""CALL_TOOL: authorized when tool in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(tools=[{'tool_name': 'web_search'}])
await registry.register(
run_id='run_tool_authorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_tool_authorized')
assert session is not None
assert registry.is_resource_allowed(session, 'tool', 'web_search') is True
await registry.unregister('run_tool_authorized')
@pytest.mark.asyncio
async def test_call_tool_unauthorized_with_run_id(self):
"""CALL_TOOL: unauthorized when tool not in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(tools=[{'tool_name': 'web_search'}])
await registry.register(
run_id='run_tool_unauthorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_tool_unauthorized')
assert session is not None
assert registry.is_resource_allowed(session, 'tool', 'image_gen') is False
await registry.unregister('run_tool_unauthorized')
@pytest.mark.asyncio
async def test_call_tool_no_run_id_unrestricted(self):
"""CALL_TOOL: no run_id should be unrestricted."""
run_id = None
# No authorization check
assert run_id is None
class TestRetrieveKnowledgeBaseAuthorization:
"""Tests for RETRIEVE_KNOWLEDGE_BASE authorization."""
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_authorized_with_run_id(self):
"""RETRIEVE_KNOWLEDGE_BASE: authorized when kb in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_kb_authorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_kb_authorized')
assert session is not None
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_001') is True
await registry.unregister('run_kb_authorized')
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_unauthorized_with_run_id(self):
"""RETRIEVE_KNOWLEDGE_BASE: unauthorized when kb not in session.resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_kb_unauthorized',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_kb_unauthorized')
assert session is not None
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_999') is False
await registry.unregister('run_kb_unauthorized')
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_no_run_id_pipeline_check(self):
"""RETRIEVE_KNOWLEDGE_BASE: no run_id checks pipeline config."""
# When no run_id, the handler checks against pipeline's configured KBs
# This is the backward compatibility path for regular plugin calls
from langbot.pkg.agent.runner.config_migration import ConfigMigration
# Simulate pipeline config
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:test/runner/default',
},
'runner_config': {
'plugin:test/runner/default': {
'knowledge-bases': ['kb_001', 'kb_002'],
},
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
assert runner_id == 'plugin:test/runner/default'
runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
allowed_kbs = runner_config.get('knowledge-bases', [])
assert 'kb_001' in allowed_kbs
assert 'kb_999' not in allowed_kbs
class TestAuthorizationPathDifferentiation:
"""Tests that verify AgentRunner vs regular plugin call differentiation."""
@pytest.mark.asyncio
async def test_agent_runner_path_with_run_id(self):
"""AgentRunner calls provide run_id and use session_registry."""
registry = AgentRunSessionRegistry()
# AgentRunner call has run_id
run_id = 'run_agent_123'
# Register session with resources
await registry.register(
run_id=run_id,
runner_id='plugin:test/agent/default',
query_id=1,
plugin_identity='test/agent',
resources=make_resources(
models=[{'model_id': 'model_xyz'}],
tools=[{'tool_name': 'agent_tool'}],
knowledge_bases=[{'kb_id': 'kb_agent'}],
),
)
session = await registry.get(run_id)
assert session is not None
# Authorization checks
assert registry.is_resource_allowed(session, 'model', 'model_xyz') is True
assert registry.is_resource_allowed(session, 'model', 'other_model') is False
assert registry.is_resource_allowed(session, 'tool', 'agent_tool') is True
assert registry.is_resource_allowed(session, 'tool', 'other_tool') is False
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_agent') is True
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_other') is False
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_regular_plugin_path_no_run_id(self):
"""Regular plugin calls have no run_id and skip session check."""
# Regular plugin call has no run_id
run_id = None
# Authorization check should be skipped when run_id is None.
# This is handled in handler.py with: if run_id: ...
assert run_id is None
# For regular plugins:
# - INVOKE_LLM: unrestricted access to any model
# - CALL_TOOL: unrestricted access to any tool
# - RETRIEVE_KNOWLEDGE_BASE: checks pipeline config instead
class TestHandlerAuthorizationErrorMessages:
"""Tests for error message content in authorization failures."""
def test_model_not_authorized_error_message(self):
"""Error message should mention model not authorized."""
expected_msg = "Model model_999 is not authorized for this agent run"
assert 'not authorized' in expected_msg
assert 'model_999' in expected_msg
def test_tool_not_authorized_error_message(self):
"""Error message should mention tool not authorized."""
expected_msg = "Tool image_gen is not authorized for this agent run"
assert 'not authorized' in expected_msg
assert 'image_gen' in expected_msg
def test_kb_not_authorized_error_message(self):
"""Error message should mention kb not authorized."""
expected_msg = "Knowledge base kb_999 is not authorized for this agent run"
assert 'not authorized' in expected_msg
assert 'kb_999' in expected_msg
def test_session_not_found_error_message(self):
"""Error message should mention session not found."""
expected_msg = "Run session run_xyz not found or expired"
assert 'not found' in expected_msg
assert 'run_xyz' in expected_msg
class TestRETRIEVEKNOWLEDGEBASEBugFix:
"""Tests for the RETRIEVE_KNOWLEDGE_BASE bug fix in handler.py.
Bug: Previously, the handler directly accessed pipeline_config['ai']['local-agent']
without first resolving the runner_id, causing issues when non-local-agent runners
were used.
Fix: Now uses ConfigMigration.resolve_runner_id first, then resolve_runner_config.
"""
def test_retrieve_kb_fix_local_agent_runner(self):
"""Fix should work for local-agent runner."""
from langbot.pkg.agent.runner.config_migration import ConfigMigration
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
},
'runner_config': {
'plugin:langbot/local-agent/default': {
'knowledge-bases': ['kb_001'],
},
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
allowed_kbs = runner_config.get('knowledge-bases', [])
assert 'kb_001' in allowed_kbs
def test_retrieve_kb_fix_other_runner(self):
"""Fix should work for non-local-agent runners."""
from langbot.pkg.agent.runner.config_migration import ConfigMigration
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:custom/my-agent/default',
},
'runner_config': {
'plugin:custom/my-agent/default': {
'knowledge-bases': ['kb_custom'],
},
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
allowed_kbs = runner_config.get('knowledge-bases', [])
assert 'kb_custom' in allowed_kbs
def test_retrieve_kb_fix_old_format(self):
"""Fix should work for old format pipeline config."""
from langbot.pkg.agent.runner.config_migration import ConfigMigration
# Old format: ai.runner.runner = 'local-agent'
pipeline_config = {
'ai': {
'runner': {
'runner': 'local-agent',
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
# Should resolve to plugin:langbot/local-agent/default
assert 'local-agent' in runner_id
def test_retrieve_kb_fix_backward_compat_knowledge_base(self):
"""Fix should handle backward compat for old 'knowledge-base' field."""
from langbot.pkg.agent.runner.config_migration import ConfigMigration
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:langbot/local-agent/default',
},
'runner_config': {
'plugin:langbot/local-agent/default': {
'knowledge-base': 'kb_single', # Old singular field
},
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
# Handler.py checks both knowledge-bases and knowledge-base
allowed_kbs = runner_config.get('knowledge-bases', [])
if not allowed_kbs:
old_kb = runner_config.get('knowledge-base', '')
if old_kb and old_kb != '__none__':
allowed_kbs = [old_kb]
assert 'kb_single' in allowed_kbs
class TestHandlerActionAuthorization:
"""Tests for real handler action-level authorization.
These tests simulate RuntimeConnectionHandler action handlers
to verify actual authorization behavior at the action level.
"""
@pytest.mark.asyncio
async def test_invoke_llm_handler_authorized_path(self):
"""INVOKE_LLM handler: authorized when model in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_invoke_llm_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Simulate handler authorization logic
run_id = 'run_invoke_llm_auth'
llm_model_uuid = 'model_001'
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check (same as handler.py line 352)
is_allowed = session_registry.is_resource_allowed(session, 'model', llm_model_uuid)
assert is_allowed is True
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_invoke_llm_handler_unauthorized_path(self):
"""INVOKE_LLM handler: unauthorized when model not in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_invoke_llm_unauth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
run_id = 'run_invoke_llm_unauth'
llm_model_uuid = 'model_999' # Not in resources
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check (same as handler.py line 352)
is_allowed = session_registry.is_resource_allowed(session, 'model', llm_model_uuid)
assert is_allowed is False
# Should return error response (handler.py line 357)
expected_error = f'Model {llm_model_uuid} is not authorized for this agent run'
assert 'not authorized' in expected_error
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_invoke_llm_handler_session_not_found(self):
"""INVOKE_LLM handler: session not found returns error."""
registry = AgentRunSessionRegistry()
# No session registered
run_id = 'run_nonexistent'
session = await registry.get(run_id)
assert session is None
# Handler should return error (handler.py line 348)
expected_error = f'Run session {run_id} not found or expired'
assert 'not found' in expected_error
@pytest.mark.asyncio
async def test_invoke_llm_handler_no_run_id_unrestricted(self):
"""INVOKE_LLM handler: no run_id skips authorization (backward compat)."""
# Simulate handler logic: if not run_id, skip authorization
run_id = None
# In handler.py, authorization check is inside: if run_id: ...
# So when run_id is None, authorization is skipped.
assert run_id is None
@pytest.mark.asyncio
async def test_call_tool_handler_authorized_path(self):
"""CALL_TOOL handler: authorized when tool in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(tools=[{'tool_name': 'web_search'}])
await registry.register(
run_id='run_call_tool_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
run_id = 'run_call_tool_auth'
tool_name = 'web_search'
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check (handler.py line 475)
is_allowed = session_registry.is_resource_allowed(session, 'tool', tool_name)
assert is_allowed is True
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_call_tool_handler_unauthorized_path(self):
"""CALL_TOOL handler: unauthorized when tool not in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(tools=[{'tool_name': 'web_search'}])
await registry.register(
run_id='run_call_tool_unauth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
run_id = 'run_call_tool_unauth'
tool_name = 'image_gen' # Not in resources
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check
is_allowed = session_registry.is_resource_allowed(session, 'tool', tool_name)
assert is_allowed is False
# Should return error (handler.py line 480)
expected_error = f'Tool {tool_name} is not authorized for this agent run'
assert 'not authorized' in expected_error
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_call_tool_handler_no_run_id_unrestricted(self):
"""CALL_TOOL handler: no run_id skips authorization."""
run_id = None
# Authorization check is inside: if run_id: ...
assert run_id is None
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_handler_authorized_path(self):
"""RETRIEVE_KNOWLEDGE_BASE handler: authorized when kb in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_kb_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
run_id = 'run_kb_auth'
kb_id = 'kb_001'
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check (handler.py line 889)
is_allowed = session_registry.is_resource_allowed(session, 'knowledge_base', kb_id)
assert is_allowed is True
await registry.unregister(run_id)
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_handler_unauthorized_path(self):
"""RETRIEVE_KNOWLEDGE_BASE handler: unauthorized when kb not in resources."""
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_kb_unauth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
run_id = 'run_kb_unauth'
kb_id = 'kb_999' # Not in resources
session_registry = registry
session = await session_registry.get(run_id)
assert session is not None
# Authorization check
is_allowed = session_registry.is_resource_allowed(session, 'knowledge_base', kb_id)
assert is_allowed is False
# Should return error (handler.py line 894)
expected_error = f'Knowledge base {kb_id} is not authorized for this agent run'
assert 'not authorized' in expected_error
await registry.unregister(run_id)
class TestSDKAgentRunAPIProxyFieldConsistency:
"""Tests for SDK AgentRunAPIProxy field name consistency with Host handler.
These tests verify that SDK sends field names that match what Host handler reads.
"""
def test_call_tool_field_names_match(self):
"""CALL_TOOL: SDK 'parameters' matches Host 'parameters'."""
# SDK agent_run_api.py line 146: "parameters": parameters
# Host handler.py line 457: parameters = data['parameters']
sdk_field = 'parameters'
host_field = 'parameters'
assert sdk_field == host_field
def test_call_tool_run_id_field_present(self):
"""CALL_TOOL: SDK includes 'run_id' field."""
# SDK agent_run_api.py line 144: "run_id": self.run_id
# Host handler.py line 458: run_id = data.get('run_id')
sdk_fields = ['run_id', 'tool_name', 'parameters', 'session', 'query_id']
host_expected_fields = ['tool_name', 'parameters', 'run_id']
for field in host_expected_fields:
assert field in sdk_fields
def test_invoke_llm_field_names_match(self):
"""INVOKE_LLM: SDK fields match Host handler."""
# SDK agent_run_api.py lines 77-82
sdk_fields = ['run_id', 'llm_model_uuid', 'messages', 'funcs', 'extra_args', 'timeout']
# Host handler.py lines 333-337
host_fields = ['llm_model_uuid', 'messages', 'funcs', 'extra_args', 'run_id']
for field in host_fields:
assert field in sdk_fields
def test_invoke_llm_stream_field_names_match(self):
"""INVOKE_LLM_STREAM: SDK fields match Host handler."""
# SDK agent_run_api.py lines 111-116
sdk_fields = ['run_id', 'llm_model_uuid', 'messages', 'funcs', 'extra_args']
# Host handler.py lines 397-401
host_fields = ['llm_model_uuid', 'messages', 'funcs', 'extra_args', 'run_id']
for field in host_fields:
assert field in sdk_fields
def test_retrieve_knowledge_base_field_names_match(self):
"""RETRIEVE_KNOWLEDGE_BASE: SDK fields match Host handler."""
# SDK agent_run_api.py lines 178-183
sdk_fields = ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']
# Note: query_id is from query context, not SDK proxy
for field in ['run_id', 'kb_id', 'query_text', 'top_k', 'filters']:
assert field in sdk_fields
def test_retrieve_knowledge_base_action_enum_correct(self):
"""RETRIEVE_KNOWLEDGE_BASE: SDK uses correct action enum."""
from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction
# SDK agent_run_api.py line 178: PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE
# Host handler.py line 851: @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE)
action = PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE
assert action.value == 'retrieve_knowledge_base'
# Verify it's different from unrestricted RETRIEVE_KNOWLEDGE
unrestricted_action = PluginToRuntimeAction.RETRIEVE_KNOWLEDGE
assert unrestricted_action.value == 'retrieve_knowledge'
assert action != unrestricted_action
class TestNoRunIdBackwardCompatPath:
"""Tests for backward compatibility path when no run_id is provided.
Regular plugins (non-AgentRunner) don't have run_id and should
have unrestricted access to certain APIs.
"""
@pytest.mark.asyncio
async def test_invoke_llm_no_run_id_unrestricted_access(self):
"""INVOKE_LLM: no run_id means unrestricted model access."""
# Handler.py line 340: if run_id: ...
# When run_id is None, the authorization block is skipped
run_id = None
llm_model_uuid = 'any_model'
# Simulate handler logic: no run_id skips authorization.
assert run_id is None
# Model can be any UUID (unrestricted)
assert llm_model_uuid == 'any_model'
@pytest.mark.asyncio
async def test_call_tool_no_run_id_unrestricted_access(self):
"""CALL_TOOL: no run_id means unrestricted tool access."""
run_id = None
tool_name = 'any_tool'
# Handler.py line 463: if run_id: ...
assert run_id is None
assert tool_name == 'any_tool'
@pytest.mark.asyncio
async def test_retrieve_knowledge_base_no_run_id_pipeline_check(self):
"""RETRIEVE_KNOWLEDGE_BASE: no run_id uses pipeline config check."""
from langbot.pkg.agent.runner.config_migration import ConfigMigration
# When no run_id, handler.py lines 897-914 check pipeline config
pipeline_config = {
'ai': {
'runner': {
'id': 'plugin:test/runner/default',
},
'runner_config': {
'plugin:test/runner/default': {
'knowledge-bases': ['kb_001', 'kb_002'],
},
},
},
}
runner_id = ConfigMigration.resolve_runner_id(pipeline_config)
runner_config = ConfigMigration.resolve_runner_config(pipeline_config, runner_id)
allowed_kb_uuids = runner_config.get('knowledge-bases', [])
# kb_001 should be allowed
assert 'kb_001' in allowed_kb_uuids
# kb_999 should NOT be allowed
assert 'kb_999' not in allowed_kb_uuids
class TestSessionExpiryAndCleanup:
"""Tests for session expiry and cleanup scenarios."""
@pytest.mark.asyncio
async def test_session_expiry_detection(self):
"""Session expiry: old session should be considered expired."""
import time
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
# Register session
await registry.register(
run_id='run_expiry_test',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_expiry_test')
assert session is not None
# Check session status
started_at = session['status']['started_at']
last_activity = session['status']['last_activity_at']
assert last_activity >= started_at
# Session should be valid initially
current_time = int(time.time())
assert current_time - started_at < 10 # Less than 10 seconds old
await registry.unregister('run_expiry_test')
@pytest.mark.asyncio
async def test_cleanup_stale_sessions(self):
"""Cleanup: stale sessions should be removed."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
# Register session
await registry.register(
run_id='run_cleanup_test',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Session exists
session = await registry.get('run_cleanup_test')
assert session is not None
# Cleanup with max_age=0 (immediate cleanup)
# Note: This won't actually cleanup because session is just created
# We need to manually test cleanup logic
cleaned = await registry.cleanup_stale_sessions(max_age_seconds=0)
assert isinstance(cleaned, int)
# Session should still exist (it was just created)
# With max_age=0, sessions with last_activity > 0 seconds ago would be cleaned
# But since it's just created, last_activity_at is current time
session_after = await registry.get('run_cleanup_test')
assert session_after is not None
await registry.unregister('run_cleanup_test')
@pytest.mark.asyncio
async def test_unregister_removes_session(self):
"""Unregister: session should be removed from registry."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_unregister_test',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Session exists
session = await registry.get('run_unregister_test')
assert session is not None
# Unregister
await registry.unregister('run_unregister_test')
# Session should not exist
session_after = await registry.get('run_unregister_test')
assert session_after is None
class TestResourceTypeValidation:
"""Tests for different resource type validation in is_resource_allowed."""
@pytest.mark.asyncio
async def test_model_resource_validation(self):
"""Model resource: correct model_id validation."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[
{'model_id': 'model_001'},
{'model_id': 'model_002'},
])
await registry.register(
run_id='run_model_validation',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_model_validation')
# Authorized models
assert registry.is_resource_allowed(session, 'model', 'model_001') is True
assert registry.is_resource_allowed(session, 'model', 'model_002') is True
# Unauthorized models
assert registry.is_resource_allowed(session, 'model', 'model_999') is False
await registry.unregister('run_model_validation')
@pytest.mark.asyncio
async def test_tool_resource_validation(self):
"""Tool resource: correct tool_name validation."""
registry = AgentRunSessionRegistry()
resources = make_resources(tools=[
{'tool_name': 'web_search'},
{'tool_name': 'image_gen'},
])
await registry.register(
run_id='run_tool_validation',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_tool_validation')
# Authorized tools
assert registry.is_resource_allowed(session, 'tool', 'web_search') is True
assert registry.is_resource_allowed(session, 'tool', 'image_gen') is True
# Unauthorized tools
assert registry.is_resource_allowed(session, 'tool', 'file_upload') is False
await registry.unregister('run_tool_validation')
@pytest.mark.asyncio
async def test_knowledge_base_resource_validation(self):
"""Knowledge base resource: correct kb_id validation."""
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[
{'kb_id': 'kb_001'},
{'kb_id': 'kb_002'},
])
await registry.register(
run_id='run_kb_validation',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_kb_validation')
# Authorized KBs
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_001') is True
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_002') is True
# Unauthorized KBs
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_999') is False
await registry.unregister('run_kb_validation')
@pytest.mark.asyncio
async def test_storage_resource_validation(self):
"""Storage resource: boolean permission validation."""
registry = AgentRunSessionRegistry()
resources = make_resources()
resources['storage'] = {'plugin_storage': True, 'workspace_storage': False}
await registry.register(
run_id='run_storage_validation',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_storage_validation')
# Plugin storage allowed
assert registry.is_resource_allowed(session, 'storage', 'plugin') is True
# Workspace storage not allowed
assert registry.is_resource_allowed(session, 'storage', 'workspace') is False
await registry.unregister('run_storage_validation')
def test_unknown_resource_type_returns_false(self):
"""Unknown resource type: should return False."""
registry = AgentRunSessionRegistry()
resources = make_resources()
# Create session manually for this test
session = {
'run_id': 'test',
'runner_id': 'test',
'query_id': 1,
'plugin_identity': 'test',
'resources': resources,
'status': {'started_at': 0, 'last_activity_at': 0},
}
# Unknown resource type should return False
assert registry.is_resource_allowed(session, 'unknown_type', 'any_id') is False
class TestBypassPrevention:
"""Tests to ensure AgentRunAPIProxy cannot bypass authorization."""
@pytest.mark.asyncio
async def test_cannot_bypass_via_unrestricted_retrieve_knowledge(self):
"""Cannot bypass KB authorization via unrestricted RETRIEVE_KNOWLEDGE action."""
# AgentRunAPIProxy uses RETRIEVE_KNOWLEDGE_BASE (with run_id)
# RETRIEVE_KNOWLEDGE is unrestricted and separate
# AgentRunner should NOT use RETRIEVE_KNOWLEDGE to bypass authorization
registry = AgentRunSessionRegistry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_bypass_test',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_bypass_test')
# kb_002 is not authorized
assert registry.is_resource_allowed(session, 'knowledge_base', 'kb_002') is False
# If AgentRunner tried to use RETRIEVE_KNOWLEDGE (unrestricted),
# it would bypass authorization - but AgentRunAPIProxy correctly uses
# RETRIE_KNOWLEDGE_BASE which requires authorization
from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction
# Verify SDK uses correct action
assert PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE.value == 'retrieve_knowledge_base'
await registry.unregister('run_bypass_test')
@pytest.mark.asyncio
async def test_cannot_bypass_via_missing_run_id_in_session(self):
"""Cannot bypass by using run_id that doesn't exist in registry."""
registry = AgentRunSessionRegistry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_valid',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Try to use a run_id that doesn't exist
fake_run_id = 'run_fake'
session = await registry.get(fake_run_id)
assert session is None
# Handler should return error for non-existent run_id
# (handler.py line 348, 466, 881)
expected_error = f'Run session {fake_run_id} not found or expired'
assert 'not found' in expected_error
await registry.unregister('run_valid')
class TestValidateRunAuthorizationHelper:
"""Tests for _validate_run_authorization helper function.
This helper is used by INVOKE_LLM, INVOKE_LLM_STREAM, CALL_TOOL,
and RETRIEVE_KNOWLEDGE_BASE handlers to validate run_id authorization.
Note: This helper uses get_session_registry() which returns the global singleton.
Tests must use the same global registry.
"""
@pytest.mark.asyncio
async def test_validate_returns_session_when_authorized(self):
"""_validate_run_authorization returns session when resource is authorized."""
# Use global session registry (same as _validate_run_authorization)
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_validate_test_helper',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Import the helper
from langbot.pkg.plugin.handler import _validate_run_authorization
# Create mock application
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_validate_test_helper',
'model',
'model_001',
mock_ap
)
# Should return session, no error
assert session is not None
assert error is None
assert session['run_id'] == 'run_validate_test_helper'
await registry.unregister('run_validate_test_helper')
@pytest.mark.asyncio
async def test_validate_returns_error_when_session_not_found(self):
"""_validate_run_authorization returns error when session not found."""
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_nonexistent_helper',
'model',
'model_001',
mock_ap
)
# Should return no session, error response
assert session is None
assert error is not None
assert 'not found' in error.message.lower()
assert mock_ap.logger.warning.called
@pytest.mark.asyncio
async def test_validate_returns_error_when_resource_not_allowed(self):
"""_validate_run_authorization returns error when resource not allowed."""
# Use global session registry
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_unauthorized_helper',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_unauthorized_helper',
'model',
'model_999', # Not in resources
mock_ap
)
# Should return no session, error response
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
assert mock_ap.logger.warning.called
await registry.unregister('run_unauthorized_helper')
@pytest.mark.asyncio
async def test_validate_for_tool_resource_type(self):
"""_validate_run_authorization works for tool resource type."""
# Use global session registry
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(tools=[{'tool_name': 'web_search'}])
await registry.register(
run_id='run_tool_test_helper',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_tool_test_helper',
'tool',
'web_search',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_tool_test_helper')
@pytest.mark.asyncio
async def test_validate_for_knowledge_base_resource_type(self):
"""_validate_run_authorization works for knowledge_base resource type."""
# Use global session registry
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(knowledge_bases=[{'kb_id': 'kb_001'}])
await registry.register(
run_id='run_kb_test_helper',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_kb_test_helper',
'knowledge_base',
'kb_001',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_kb_test_helper')
class TestStorageResourcePermissionHelper:
"""Tests for session_registry.is_resource_allowed for storage resource type.
The 'storage' resource type has different permission model:
- resource_id can be 'plugin' or 'workspace'
- Permission is boolean flag, not list membership
"""
@pytest.mark.asyncio
async def test_plugin_storage_allowed_when_true(self):
"""is_resource_allowed returns True when plugin_storage=True."""
registry = AgentRunSessionRegistry()
resources = make_resources()
resources['storage'] = {'plugin_storage': True, 'workspace_storage': False}
await registry.register(
run_id='run_plugin_storage',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_plugin_storage')
assert registry.is_resource_allowed(session, 'storage', 'plugin') is True
assert registry.is_resource_allowed(session, 'storage', 'workspace') is False
await registry.unregister('run_plugin_storage')
@pytest.mark.asyncio
async def test_workspace_storage_allowed_when_true(self):
"""is_resource_allowed returns True when workspace_storage=True."""
registry = AgentRunSessionRegistry()
resources = make_resources()
resources['storage'] = {'plugin_storage': False, 'workspace_storage': True}
await registry.register(
run_id='run_workspace_storage',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_workspace_storage')
assert registry.is_resource_allowed(session, 'storage', 'plugin') is False
assert registry.is_resource_allowed(session, 'storage', 'workspace') is True
await registry.unregister('run_workspace_storage')
@pytest.mark.asyncio
async def test_both_storage_types_disabled(self):
"""is_resource_allowed returns False when both storage types disabled."""
registry = AgentRunSessionRegistry()
resources = make_resources()
resources['storage'] = {'plugin_storage': False, 'workspace_storage': False}
await registry.register(
run_id='run_no_storage',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_no_storage')
assert registry.is_resource_allowed(session, 'storage', 'plugin') is False
assert registry.is_resource_allowed(session, 'storage', 'workspace') is False
await registry.unregister('run_no_storage')
@pytest.mark.asyncio
async def test_unknown_storage_resource_id_returns_false(self):
"""is_resource_allowed returns False for unknown storage resource_id."""
registry = AgentRunSessionRegistry()
resources = make_resources()
resources['storage'] = {'plugin_storage': True, 'workspace_storage': True}
await registry.register(
run_id='run_unknown_storage',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_unknown_storage')
# Unknown storage resource_id
assert registry.is_resource_allowed(session, 'storage', 'unknown_type') is False
await registry.unregister('run_unknown_storage')
def test_storage_permission_with_missing_storage_field(self):
"""is_resource_allowed handles missing storage field gracefully."""
registry = AgentRunSessionRegistry()
# Create session without storage field
session = {
'run_id': 'test',
'runner_id': 'test',
'query_id': 1,
'plugin_identity': 'test',
'resources': {}, # No storage field
'status': {'started_at': 0, 'last_activity_at': 0},
}
# Should return False for both storage types
assert registry.is_resource_allowed(session, 'storage', 'plugin') is False
assert registry.is_resource_allowed(session, 'storage', 'workspace') is False
class TestFilesResourcePermission:
"""Tests for session_registry.is_resource_allowed for files resource type.
Phase 6: 'files' resource type is now implemented in is_resource_allowed.
"""
@pytest.mark.asyncio
async def test_files_resource_type_now_implemented(self):
"""'files' resource type is now implemented in is_resource_allowed."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
await registry.register(
run_id='run_files_implemented',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
session = await registry.get('run_files_implemented')
# 'files' resource type is now implemented
assert registry.is_resource_allowed(session, 'file', 'file_001') is True
assert registry.is_resource_allowed(session, 'file', 'file_999') is False
await registry.unregister('run_files_implemented')
class TestRealActionHandlerSimulation:
"""Tests that simulate real RuntimeConnectionHandler action registration and execution.
These tests attempt to verify the actual handler behavior without full integration.
Uses global session registry to match _validate_run_authorization behavior.
"""
@pytest.mark.asyncio
async def test_action_handler_invoke_llm_flow(self):
"""Simulate INVOKE_LLM action handler authorization flow."""
# Use global session registry
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_invoke_llm_flow_sim',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
# Simulate handler logic
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
# Step 1: Validate authorization
session, error = await _validate_run_authorization(
'run_invoke_llm_flow_sim',
'model',
'model_001',
mock_ap
)
# Should pass authorization
assert session is not None
assert error is None
# Step 2: Handler would invoke LLM (not tested here, would need mock model)
await registry.unregister('run_invoke_llm_flow_sim')
@pytest.mark.asyncio
async def test_action_handler_rejects_unauthorized_model(self):
"""Simulate INVOKE_LLM handler rejecting unauthorized model."""
# Use global session registry
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_reject_model_sim',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
# Try to access unauthorized model
session, error = await _validate_run_authorization(
'run_reject_model_sim',
'model',
'model_999',
mock_ap
)
# Should reject
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
assert mock_ap.logger.warning.called
await registry.unregister('run_reject_model_sim')
@pytest.mark.asyncio
async def test_action_handler_session_not_found_flow(self):
"""Simulate handler behavior when session not found."""
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
# Try to validate with non-existent run_id
session, error = await _validate_run_authorization(
'run_nonexistent_session_flow',
'model',
'model_001',
mock_ap
)
# Should return error
assert session is None
assert error is not None
assert 'not found' in error.message.lower()
assert mock_ap.logger.warning.called
class TestStoragePermissionValidation:
"""Tests for Host-side storage permission validation via _validate_run_authorization.
Phase 6: Storage actions (SET/GET/DELETE_BINARY_STORAGE) now validate
storage permissions via _validate_run_authorization when run_id is present.
"""
@pytest.mark.asyncio
async def test_plugin_storage_allowed_when_permitted(self):
"""_validate_run_authorization allows 'plugin' storage when permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': True, 'workspace_storage': False})
await registry.register(
run_id='run_plugin_storage_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_plugin_storage_auth',
'storage',
'plugin',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_plugin_storage_auth')
@pytest.mark.asyncio
async def test_plugin_storage_denied_when_not_permitted(self):
"""_validate_run_authorization denies 'plugin' storage when not permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False})
await registry.register(
run_id='run_plugin_storage_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_plugin_storage_denied',
'storage',
'plugin',
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_plugin_storage_denied')
@pytest.mark.asyncio
async def test_workspace_storage_allowed_when_permitted(self):
"""_validate_run_authorization allows 'workspace' storage when permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': True})
await registry.register(
run_id='run_workspace_storage_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_workspace_storage_auth',
'storage',
'workspace',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_workspace_storage_auth')
@pytest.mark.asyncio
async def test_workspace_storage_denied_when_not_permitted(self):
"""_validate_run_authorization denies 'workspace' storage when not permitted."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(storage={'plugin_storage': False, 'workspace_storage': False})
await registry.register(
run_id='run_workspace_storage_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_workspace_storage_denied',
'storage',
'workspace',
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_workspace_storage_denied')
class TestFilePermissionValidation:
"""Tests for Host-side file permission validation via _validate_run_authorization.
Phase 6: GET_CONFIG_FILE action now validates file permissions
via _validate_run_authorization when run_id is present.
"""
@pytest.mark.asyncio
async def test_file_allowed_when_in_resources(self):
"""_validate_run_authorization allows file when in resources."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
await registry.register(
run_id='run_file_auth',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_file_auth',
'file',
'file_001',
mock_ap
)
assert session is not None
assert error is None
await registry.unregister('run_file_auth')
@pytest.mark.asyncio
async def test_file_denied_when_not_in_resources(self):
"""_validate_run_authorization denies file when not in resources."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(files=[{'file_id': 'file_001'}])
await registry.register(
run_id='run_file_denied',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_file_denied',
'file',
'file_999', # Not in resources
mock_ap
)
assert session is None
assert error is not None
assert 'not authorized' in error.message.lower()
await registry.unregister('run_file_denied')
class TestCallerPluginIdentityValidation:
"""Tests for caller_plugin_identity cross-plugin validation.
Phase 6: _validate_run_authorization now validates that the caller plugin
identity matches the session's plugin_identity, preventing cross-plugin
unauthorized access if one plugin tries to use another's run_id.
"""
@pytest.mark.asyncio
async def test_same_plugin_identity_allowed(self):
"""_validate_run_authorization allows when caller matches session."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_identity_match',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner', # Session owner
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
session, error = await _validate_run_authorization(
'run_identity_match',
'model',
'model_001',
mock_ap,
caller_plugin_identity='test/runner', # Caller is same plugin
)
assert session is not None
assert error is None
await registry.unregister('run_identity_match')
@pytest.mark.asyncio
async def test_different_plugin_identity_denied(self):
"""_validate_run_authorization denies when caller differs from session."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_identity_mismatch',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner', # Session owner
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
mock_ap.logger.warning = MagicMock()
session, error = await _validate_run_authorization(
'run_identity_mismatch',
'model',
'model_001',
mock_ap,
caller_plugin_identity='other/plugin', # Different plugin trying to use run_id
)
assert session is None
assert error is not None
assert 'mismatch' in error.message.lower()
await registry.unregister('run_identity_mismatch')
@pytest.mark.asyncio
async def test_no_caller_identity_allowed(self):
"""_validate_run_authorization allows when caller_plugin_identity not provided."""
# Backward compatibility: if caller_plugin_identity is None, skip identity check
from langbot.pkg.agent.runner.session_registry import get_session_registry
registry = get_session_registry()
resources = make_resources(models=[{'model_id': 'model_001'}])
await registry.register(
run_id='run_no_caller_identity',
runner_id='plugin:test/runner/default',
query_id=1,
plugin_identity='test/runner',
resources=resources,
)
from langbot.pkg.plugin.handler import _validate_run_authorization
mock_ap = MagicMock()
mock_ap.logger = MagicMock()
# caller_plugin_identity not provided (None)
session, error = await _validate_run_authorization(
'run_no_caller_identity',
'model',
'model_001',
mock_ap,
caller_plugin_identity=None, # Not provided
)
# Should pass (backward compat)
assert session is not None
assert error is None
await registry.unregister('run_no_caller_identity')
class TestBackwardCompatStorageNoRunId:
"""Tests for backward compatibility: storage actions without run_id.
Regular plugins (non-AgentRunner) don't have run_id and should
have unrestricted access to storage APIs.
"""
def test_storage_no_run_id_skips_validation(self):
"""Storage actions without run_id skip Host-side validation."""
# Handler.py: if run_id: ...validation...
# When run_id is None, validation is skipped
run_id = None
# Simulate handler logic: no run_id skips validation.
assert run_id is None
# Storage access unrestricted for regular plugins
assert run_id is None
def test_file_no_run_id_skips_validation(self):
"""GET_CONFIG_FILE without run_id skips Host-side validation."""
run_id = None
assert run_id is None
# File access unrestricted for regular plugins
assert run_id is None