diff --git a/src/langbot/pkg/rag/service/runtime.py b/src/langbot/pkg/rag/service/runtime.py index a8bdf25e..0de1ae88 100644 --- a/src/langbot/pkg/rag/service/runtime.py +++ b/src/langbot/pkg/rag/service/runtime.py @@ -1,8 +1,12 @@ from __future__ import annotations import posixpath -from typing import Any -from langbot.pkg.core import app +import re +from typing import TYPE_CHECKING, Any +from urllib.parse import unquote + +if TYPE_CHECKING: + from langbot.pkg.core import app class RAGRuntimeService: @@ -109,8 +113,17 @@ class RAGRuntimeService: regardless of the underlying storage provider. """ # Validate storage_path to prevent path traversal - normalized = posixpath.normpath(storage_path) - if normalized.startswith('/') or '..' in normalized.split('/'): + decoded_path = unquote(storage_path).replace('\\', '/') + decoded_segments = decoded_path.split('/') + normalized = posixpath.normpath(decoded_path) + if ( + not storage_path + or '\x00' in decoded_path + or normalized.startswith('/') + or '..' in decoded_segments + or '..' in normalized.split('/') + or re.match(r'^[A-Za-z]:/', normalized) + ): raise ValueError('Invalid storage path') content_bytes = await self.ap.storage_mgr.storage_provider.load(normalized) return content_bytes if content_bytes else b'' diff --git a/tests/unit_tests/rag/test_runtime_service.py b/tests/unit_tests/rag/test_runtime_service.py index 1ae2831b..2e5dbcce 100644 --- a/tests/unit_tests/rag/test_runtime_service.py +++ b/tests/unit_tests/rag/test_runtime_service.py @@ -471,4 +471,51 @@ class TestRAGRuntimeServiceGetFileStream: # But the current implementation checks '..' in split('/') # Let's test a simple valid path await service.get_file_stream('knowledge/files/test.pdf') - mock_app.storage_mgr.storage_provider.load.assert_called() \ No newline at end of file + mock_app.storage_mgr.storage_provider.load.assert_called() + + @pytest.mark.asyncio + async def test_get_file_stream_normalizes_safe_relative_path(self): + """Safe relative paths are normalized before loading.""" + mock_app = self._create_mock_app() + + mocks = self._make_rag_import_mocks() + + with isolated_sys_modules(mocks): + from langbot.pkg.rag.service.runtime import RAGRuntimeService + + service = RAGRuntimeService(mock_app) + + await service.get_file_stream('knowledge/./files/doc.pdf') + + mock_app.storage_mgr.storage_provider.load.assert_called_once_with('knowledge/files/doc.pdf') + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "storage_path", + [ + "", + "../secret.txt", + "/absolute/path.txt", + "..\\secret.txt", + "nested\\..\\secret.txt", + "%2e%2e/secret.txt", + "nested/%2e%2e/secret.txt", + "C:\\secret.txt", + "safe/\x00file.txt", + ], + ) + async def test_get_file_stream_rejects_unsafe_paths(self, storage_path): + """Traversal, absolute, encoded, and Windows-style paths are rejected.""" + mock_app = self._create_mock_app() + + mocks = self._make_rag_import_mocks() + + with isolated_sys_modules(mocks): + from langbot.pkg.rag.service.runtime import RAGRuntimeService + + service = RAGRuntimeService(mock_app) + + with pytest.raises(ValueError, match='Invalid storage path'): + await service.get_file_stream(storage_path) + + mock_app.storage_mgr.storage_provider.load.assert_not_called()