Merge remote-tracking branch 'origin/fix/rag-runtime-safe-file-path' into validation/test-build-with-fixes

# Conflicts:
#	tests/unit_tests/rag/test_runtime_service.py
This commit is contained in:
huanghuoguoguo
2026-05-16 10:57:43 +08:00
2 changed files with 65 additions and 5 deletions

View File

@@ -1,8 +1,12 @@
from __future__ import annotations
import posixpath
from typing import Any
from langbot.pkg.core import app
import re
from typing import TYPE_CHECKING, Any
from urllib.parse import unquote
if TYPE_CHECKING:
from langbot.pkg.core import app
class RAGRuntimeService:
@@ -109,8 +113,17 @@ class RAGRuntimeService:
regardless of the underlying storage provider.
"""
# Validate storage_path to prevent path traversal
normalized = posixpath.normpath(storage_path)
if normalized.startswith('/') or '..' in normalized.split('/'):
decoded_path = unquote(storage_path).replace('\\', '/')
decoded_segments = decoded_path.split('/')
normalized = posixpath.normpath(decoded_path)
if (
not storage_path
or '\x00' in decoded_path
or normalized.startswith('/')
or '..' in decoded_segments
or '..' in normalized.split('/')
or re.match(r'^[A-Za-z]:/', normalized)
):
raise ValueError('Invalid storage path')
content_bytes = await self.ap.storage_mgr.storage_provider.load(normalized)
return content_bytes if content_bytes else b''

View File

@@ -471,4 +471,51 @@ class TestRAGRuntimeServiceGetFileStream:
# But the current implementation checks '..' in split('/')
# Let's test a simple valid path
await service.get_file_stream('knowledge/files/test.pdf')
mock_app.storage_mgr.storage_provider.load.assert_called()
mock_app.storage_mgr.storage_provider.load.assert_called()
@pytest.mark.asyncio
async def test_get_file_stream_normalizes_safe_relative_path(self):
"""Safe relative paths are normalized before loading."""
mock_app = self._create_mock_app()
mocks = self._make_rag_import_mocks()
with isolated_sys_modules(mocks):
from langbot.pkg.rag.service.runtime import RAGRuntimeService
service = RAGRuntimeService(mock_app)
await service.get_file_stream('knowledge/./files/doc.pdf')
mock_app.storage_mgr.storage_provider.load.assert_called_once_with('knowledge/files/doc.pdf')
@pytest.mark.asyncio
@pytest.mark.parametrize(
"storage_path",
[
"",
"../secret.txt",
"/absolute/path.txt",
"..\\secret.txt",
"nested\\..\\secret.txt",
"%2e%2e/secret.txt",
"nested/%2e%2e/secret.txt",
"C:\\secret.txt",
"safe/\x00file.txt",
],
)
async def test_get_file_stream_rejects_unsafe_paths(self, storage_path):
"""Traversal, absolute, encoded, and Windows-style paths are rejected."""
mock_app = self._create_mock_app()
mocks = self._make_rag_import_mocks()
with isolated_sys_modules(mocks):
from langbot.pkg.rag.service.runtime import RAGRuntimeService
service = RAGRuntimeService(mock_app)
with pytest.raises(ValueError, match='Invalid storage path'):
await service.get_file_stream(storage_path)
mock_app.storage_mgr.storage_provider.load.assert_not_called()