Compare commits

..

1 Commits

Author SHA1 Message Date
huanghuoguoguo
31f4bc1ad6 fix(api): validate api key prefix 2026-05-16 11:03:17 +08:00
4 changed files with 45 additions and 28 deletions

View File

@@ -52,6 +52,9 @@ class ApiKeyService:
async def verify_api_key(self, key: str) -> bool:
"""Verify if an API key is valid"""
if not isinstance(key, str) or not key.startswith('lbk_'):
return False
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(apikey.ApiKey).where(apikey.ApiKey.key == key)
)

View File

@@ -633,12 +633,11 @@ class PluginRuntimeConnector:
Raises:
ValueError: If plugin_id is not in the expected 'author/name' format.
"""
segments = plugin_id.split('/')
if len(segments) != 2 or not all(segments):
if '/' not in plugin_id:
raise ValueError(
f"Invalid plugin_id format: '{plugin_id}'. Expected 'author/name' format (e.g. 'langbot/rag-engine')."
)
return segments[0], segments[1]
return plugin_id.split('/', 1)
async def call_rag_ingest(self, plugin_id: str, context_data: dict[str, Any]) -> dict[str, Any]:
"""Call plugin to ingest document.

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.api.http.service.apikey import ApiKeyService
@pytest.mark.asyncio
@pytest.mark.parametrize('api_key', [None, 123, b'lbk_bytes', '', 'plain_key', ' LBK_bad', 'sk-lbk_fake'])
async def test_verify_api_key_rejects_non_lbk_keys_without_db_query(api_key):
persistence_mgr = SimpleNamespace(execute_async=AsyncMock())
service = ApiKeyService(SimpleNamespace(persistence_mgr=persistence_mgr))
result = await service.verify_api_key(api_key)
assert result is False
persistence_mgr.execute_async.assert_not_awaited()
@pytest.mark.asyncio
@pytest.mark.parametrize(
('db_row', 'expected'),
[
(object(), True),
(None, False),
],
)
async def test_verify_api_key_keeps_db_validation_for_lbk_keys(db_row, expected):
query_result = Mock()
query_result.first.return_value = db_row
persistence_mgr = SimpleNamespace(execute_async=AsyncMock(return_value=query_result))
service = ApiKeyService(SimpleNamespace(persistence_mgr=persistence_mgr))
result = await service.verify_api_key('lbk_valid_format')
assert result is expected
persistence_mgr.execute_async.assert_awaited_once()

View File

@@ -1,25 +0,0 @@
"""Test plugin ID parsing validation."""
import pytest
from src.langbot.pkg.plugin.connector import PluginRuntimeConnector
def test_parse_plugin_id_accepts_author_name():
assert PluginRuntimeConnector._parse_plugin_id('langbot/rag-engine') == ('langbot', 'rag-engine')
@pytest.mark.parametrize(
'plugin_id',
[
'',
'author',
'author/',
'/name',
'author/name/extra',
'/',
],
)
def test_parse_plugin_id_rejects_malformed_ids(plugin_id):
with pytest.raises(ValueError, match='Expected'):
PluginRuntimeConnector._parse_plugin_id(plugin_id)