From 14330741ccc3d0e7480d0da06c9bd3710e1f68b2 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Sat, 16 May 2026 10:53:57 +0800 Subject: [PATCH] fix(rag): reject unsafe runtime file paths --- src/langbot/pkg/rag/service/runtime.py | 21 ++++-- tests/unit_tests/rag/test_runtime_service.py | 68 ++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 tests/unit_tests/rag/test_runtime_service.py diff --git a/src/langbot/pkg/rag/service/runtime.py b/src/langbot/pkg/rag/service/runtime.py index a8bdf25e..0de1ae88 100644 --- a/src/langbot/pkg/rag/service/runtime.py +++ b/src/langbot/pkg/rag/service/runtime.py @@ -1,8 +1,12 @@ from __future__ import annotations import posixpath -from typing import Any -from langbot.pkg.core import app +import re +from typing import TYPE_CHECKING, Any +from urllib.parse import unquote + +if TYPE_CHECKING: + from langbot.pkg.core import app class RAGRuntimeService: @@ -109,8 +113,17 @@ class RAGRuntimeService: regardless of the underlying storage provider. """ # Validate storage_path to prevent path traversal - normalized = posixpath.normpath(storage_path) - if normalized.startswith('/') or '..' in normalized.split('/'): + decoded_path = unquote(storage_path).replace('\\', '/') + decoded_segments = decoded_path.split('/') + normalized = posixpath.normpath(decoded_path) + if ( + not storage_path + or '\x00' in decoded_path + or normalized.startswith('/') + or '..' in decoded_segments + or '..' in normalized.split('/') + or re.match(r'^[A-Za-z]:/', normalized) + ): raise ValueError('Invalid storage path') content_bytes = await self.ap.storage_mgr.storage_provider.load(normalized) return content_bytes if content_bytes else b'' diff --git a/tests/unit_tests/rag/test_runtime_service.py b/tests/unit_tests/rag/test_runtime_service.py new file mode 100644 index 00000000..ba4d8c43 --- /dev/null +++ b/tests/unit_tests/rag/test_runtime_service.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from langbot.pkg.rag.service.runtime import RAGRuntimeService + + +class DummyStorageProvider: + def __init__(self, content: bytes | None = b'data'): + self.content = content + self.loaded_paths: list[str] = [] + + async def load(self, path: str): + self.loaded_paths.append(path) + return self.content + + +def make_service(storage_provider: DummyStorageProvider) -> RAGRuntimeService: + return RAGRuntimeService(SimpleNamespace(storage_mgr=SimpleNamespace(storage_provider=storage_provider))) + + +@pytest.mark.asyncio +async def test_get_file_stream_normalizes_safe_path(): + storage_provider = DummyStorageProvider() + service = make_service(storage_provider) + + content = await service.get_file_stream('safe/./nested/file.pdf') + + assert content == b'data' + assert storage_provider.loaded_paths == ['safe/nested/file.pdf'] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'storage_path', + [ + '', + '../secret.txt', + '/absolute/path.txt', + '..\\secret.txt', + 'nested\\..\\secret.txt', + '%2e%2e/secret.txt', + 'nested/%2e%2e/secret.txt', + 'C:\\secret.txt', + 'safe/\x00file.txt', + ], +) +async def test_get_file_stream_rejects_unsafe_paths(storage_path: str): + storage_provider = DummyStorageProvider() + service = make_service(storage_provider) + + with pytest.raises(ValueError, match='Invalid storage path'): + await service.get_file_stream(storage_path) + + assert storage_provider.loaded_paths == [] + + +@pytest.mark.asyncio +async def test_get_file_stream_returns_empty_bytes_for_missing_content(): + storage_provider = DummyStorageProvider(content=None) + service = make_service(storage_provider) + + content = await service.get_file_stream('safe/file.pdf') + + assert content == b'' + assert storage_provider.loaded_paths == ['safe/file.pdf']