diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index 68cd28971..8b2c4762e 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -1318,10 +1318,7 @@ class MonitoringService: query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset) result = await self.ap.persistence_mgr.execute_async(query) - traces = [ - self._serialize_trace(row[0] if isinstance(row, tuple) else row) - for row in result.all() - ] + traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()] return traces, total async def get_trace_details(self, trace_id: str) -> dict: @@ -1341,10 +1338,7 @@ class MonitoringService: .order_by(persistence_monitoring.MonitoringSpan.started_at.asc()) ) span_result = await self.ap.persistence_mgr.execute_async(span_query) - spans = [ - self._serialize_span(row[0] if isinstance(row, tuple) else row) - for row in span_result.all() - ] + spans = [self._serialize_span(row[0] if isinstance(row, tuple) else row) for row in span_result.all()] return { 'trace_id': trace_id, diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index d08e411de..62df2669a 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -263,7 +263,9 @@ class RuntimePipeline: result = await result if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果 - span_result_type = str(result.result_type.value if hasattr(result.result_type, 'value') else result.result_type) + span_result_type = str( + result.result_type.value if hasattr(result.result_type, 'value') else result.result_type + ) self.ap.logger.debug( f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' ) @@ -290,7 +292,9 @@ class RuntimePipeline: await self._check_output(query, sub_result) if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT: - self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') + self.ap.logger.debug( + f'Stage {stage_container.inst_name} interrupted query {query.query_id}' + ) break elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE: query = sub_result.new_query diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 66a343bd5..4ee815411 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -757,7 +757,9 @@ class RuntimeConnectionHandler(handler.Handler): 'sender_id': str(query.sender_id), '_trace_context': { 'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None, - 'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None, + 'parent_span_id': query.variables.get('_monitoring_root_span_id') + if query.variables + else None, 'message_id': query.variables.get('_monitoring_message_id') if query.variables else None, 'query_id': query.query_id, 'session_id': session_name, diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 640af8321..00672bbf3 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -270,7 +270,9 @@ class LocalAgentRunner(runner.RequestRunner): 'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}', '_trace_context': { 'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None, - 'parent_span_id': query.variables.get('_monitoring_root_span_id') if query.variables else None, + 'parent_span_id': query.variables.get('_monitoring_root_span_id') + if query.variables + else None, 'message_id': query.variables.get('_monitoring_message_id') if query.variables else None, 'query_id': query.query_id, 'session_id': f'{query.launcher_type.value}_{query.launcher_id}', diff --git a/tests/integration/api/test_monitoring.py b/tests/integration/api/test_monitoring.py index d7b4af898..57afdb722 100644 --- a/tests/integration/api/test_monitoring.py +++ b/tests/integration/api/test_monitoring.py @@ -8,6 +8,7 @@ Run: uv run pytest tests/integration/api/test_monitoring.py -q from __future__ import annotations +import datetime import pytest from unittest.mock import MagicMock, AsyncMock, Mock @@ -84,11 +85,11 @@ def fake_monitoring_app(): app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10)) app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1)) app.monitoring_service.get_trace_details = AsyncMock( - return_value={ - 'found': True, - 'trace_id': 'trace-1', - 'trace': {'trace_id': 'trace-1'}, - 'spans': [], + side_effect=lambda trace_id: { + 'found': trace_id == 'trace-1', + 'trace_id': trace_id, + 'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None, + 'spans': [] if trace_id == 'trace-1' else None, } ) app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20)) @@ -266,6 +267,51 @@ class TestMonitoringDetailsEndpoints: assert response.status_code == 200 +@pytest.mark.usefixtures('mock_circular_import_chain') +class TestMonitoringTraceEndpoints: + """Tests for trace list and detail endpoints.""" + + @pytest.mark.asyncio + async def test_get_traces_forwards_filters(self, quart_test_client, fake_monitoring_app): + """GET /api/v1/monitoring/traces forwards filters to service.""" + response = await quart_test_client.get( + '/api/v1/monitoring/traces' + '?botId=bot-1' + '&pipelineId=pipeline-1' + '&sessionId=session-1' + '&status=success' + '&startTime=2026-01-01T00:00:00Z' + '&endTime=2026-01-02T00:00:00Z' + '&limit=25' + '&offset=5', + headers={'Authorization': 'Bearer test_token'}, + ) + + assert response.status_code == 200 + data = await response.get_json() + assert data['data']['traces'] == [{'trace_id': 'trace-1'}] + assert data['data']['total'] == 1 + fake_monitoring_app.monitoring_service.get_traces.assert_awaited_with( + bot_ids=['bot-1'], + pipeline_ids=['pipeline-1'], + session_ids=['session-1'], + statuses=['success'], + start_time=datetime.datetime(2026, 1, 1, 0, 0), + end_time=datetime.datetime(2026, 1, 2, 0, 0), + limit=25, + offset=5, + ) + + @pytest.mark.asyncio + async def test_get_trace_details_not_found(self, quart_test_client): + """GET /api/v1/monitoring/traces/{id} returns 404 when missing.""" + response = await quart_test_client.get( + '/api/v1/monitoring/traces/trace-missing', headers={'Authorization': 'Bearer test_token'} + ) + + assert response.status_code == 404 + + @pytest.mark.usefixtures('mock_circular_import_chain') class TestMonitoringFeedbackEndpoints: """Tests for feedback endpoints.""" diff --git a/tests/unit_tests/api/service/test_monitoring_service.py b/tests/unit_tests/api/service/test_monitoring_service.py new file mode 100644 index 000000000..590c3f55b --- /dev/null +++ b/tests/unit_tests/api/service/test_monitoring_service.py @@ -0,0 +1,207 @@ +"""Unit tests for MonitoringService trace observability.""" + +from __future__ import annotations + +import datetime +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest +import sqlalchemy +from sqlalchemy.ext.asyncio import create_async_engine + +from langbot.pkg.api.http.service.monitoring import MonitoringService +from langbot.pkg.entity.persistence.base import Base +from langbot.pkg.entity.persistence import monitoring as persistence_monitoring + + +pytestmark = pytest.mark.asyncio + + +class _SQLitePersistence: + def __init__(self, engine): + self._engine = engine + + def get_db_engine(self): + return self._engine + + async def execute_async(self, *args, **kwargs): + async with self._engine.connect() as conn: + result = await conn.execute(*args, **kwargs) + await conn.commit() + return result + + def serialize_model(self, model, data, masked_columns=None): + masked_columns = masked_columns or [] + return { + column.name: getattr(data, column.name).isoformat() + if isinstance(getattr(data, column.name), datetime.datetime) + else getattr(data, column.name) + for column in model.__table__.columns + if column.name not in masked_columns + } + + +@pytest.fixture +async def monitoring_service(tmp_path): + engine = create_async_engine(f'sqlite+aiosqlite:///{tmp_path / "monitoring.db"}') + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + ap = SimpleNamespace( + persistence_mgr=_SQLitePersistence(engine), + instance_config=SimpleNamespace(data={'database': {'use': 'sqlite'}}), + logger=Mock(), + ) + service = MonitoringService(ap) + yield service + await engine.dispose() + + +async def test_trace_lifecycle_records_spans_and_returns_details(monitoring_service): + started_at = datetime.datetime(2026, 1, 1, 12, 0, 0) + ended_at = started_at + datetime.timedelta(milliseconds=125) + + trace_id = await monitoring_service.start_trace( + trace_id='trace-test', + name='Pipeline query', + bot_id='bot-1', + bot_name='Bot', + pipeline_id='pipeline-1', + pipeline_name='Default', + session_id='session-1', + message_id='message-1', + query_id=42, + attributes={'source': 'unit-test'}, + ) + assert trace_id == 'trace-test' + + root_span_id = await monitoring_service.record_span( + trace_id=trace_id, + span_id='span-root', + name='Pipeline', + kind='pipeline', + status='completed', + started_at=started_at, + ended_at=ended_at, + message_id='message-1', + session_id='session-1', + bot_id='bot-1', + pipeline_id='pipeline-1', + attributes={'stage_count': 2}, + ) + await monitoring_service.record_span( + trace_id=trace_id, + span_id='span-rag', + parent_span_id=root_span_id, + name='RAG retrieval', + kind='rag.retrieval', + status='failed', + started_at=started_at + datetime.timedelta(seconds=1), + duration=12.7, + attributes={'top_k': 5}, + error_message='vector store timeout', + ) + await monitoring_service.finish_trace( + trace_id, + status='completed', + duration=250, + message_id='message-final', + attributes={'result_type': 'reply'}, + ) + + traces, total = await monitoring_service.get_traces( + bot_ids=['bot-1'], + pipeline_ids=['pipeline-1'], + session_ids=['session-1'], + statuses=['success'], + limit=10, + offset=0, + ) + + assert total == 1 + assert traces[0]['trace_id'] == trace_id + assert traces[0]['status'] == 'success' + assert traces[0]['message_id'] == 'message-final' + assert traces[0]['query_id'] == '42' + assert traces[0]['attributes'] == {'result_type': 'reply'} + + details = await monitoring_service.get_trace_details(trace_id) + assert details['found'] is True + assert details['trace']['trace_id'] == trace_id + assert [span['span_id'] for span in details['spans']] == ['span-root', 'span-rag'] + assert details['spans'][0]['status'] == 'success' + assert details['spans'][0]['duration'] == 125 + assert details['spans'][0]['attributes'] == {'stage_count': 2} + assert details['spans'][1]['status'] == 'error' + assert details['spans'][1]['duration'] == 13 + assert details['spans'][1]['parent_span_id'] == 'span-root' + assert details['spans'][1]['error_message'] == 'vector store timeout' + + +async def test_get_trace_details_returns_not_found_for_missing_trace(monitoring_service): + details = await monitoring_service.get_trace_details('trace-missing') + + assert details == {'trace_id': 'trace-missing', 'found': False} + + +async def test_cleanup_expired_records_includes_traces_and_spans(monitoring_service): + old_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(days=30) + recent_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) + + await monitoring_service.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_monitoring.MonitoringTrace), + [ + { + 'trace_id': 'trace-old', + 'started_at': old_time, + 'ended_at': old_time, + 'duration': 10, + 'status': 'success', + 'name': 'Old trace', + }, + { + 'trace_id': 'trace-recent', + 'started_at': recent_time, + 'ended_at': recent_time, + 'duration': 10, + 'status': 'success', + 'name': 'Recent trace', + }, + ], + ) + await monitoring_service.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_monitoring.MonitoringSpan), + [ + { + 'span_id': 'span-old', + 'trace_id': 'trace-old', + 'name': 'Old span', + 'kind': 'pipeline', + 'status': 'success', + 'started_at': old_time, + 'ended_at': old_time, + }, + { + 'span_id': 'span-recent', + 'trace_id': 'trace-recent', + 'name': 'Recent span', + 'kind': 'pipeline', + 'status': 'success', + 'started_at': recent_time, + 'ended_at': recent_time, + }, + ], + ) + + monitoring_service._release_sqlite_space = AsyncMock() + + deleted = await monitoring_service.cleanup_expired_records(retention_days=7, batch_size=1) + + assert deleted['monitoring_traces'] == 1 + assert deleted['monitoring_spans'] == 1 + monitoring_service._release_sqlite_space.assert_awaited_once() + + remaining = await monitoring_service.get_trace_details('trace-recent') + assert remaining['found'] is True + assert remaining['spans'][0]['span_id'] == 'span-recent' diff --git a/tests/unit_tests/api/test_monitoring_routes.py b/tests/unit_tests/api/test_monitoring_routes.py new file mode 100644 index 000000000..8c45afe9f --- /dev/null +++ b/tests/unit_tests/api/test_monitoring_routes.py @@ -0,0 +1,111 @@ +"""Unit tests for monitoring trace HTTP routes.""" + +from __future__ import annotations + +import datetime +from unittest.mock import AsyncMock, Mock + +import pytest +import quart + +from tests.factories import FakeApp +from tests.utils.import_isolation import MockLifecycleControlScope, isolated_sys_modules + + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture +async def monitoring_client(): + mock_app = Mock() + mock_app.Application = type('FakeMinimalApplication', (), {}) + mock_entities = Mock() + mock_entities.LifecycleControlScope = MockLifecycleControlScope + + clear = [ + 'langbot.pkg.api.http.controller.group', + 'langbot.pkg.api.http.controller.groups', + 'langbot.pkg.api.http.controller.groups.monitoring', + 'langbot.pkg.api.http.controller.main', + ] + + app = FakeApp() + app.user_service = Mock() + app.user_service.verify_jwt_token = AsyncMock(return_value='test@example.com') + app.user_service.get_user_by_email = AsyncMock(return_value=Mock(email='test@example.com')) + + app.monitoring_service = Mock() + app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1)) + app.monitoring_service.get_trace_details = AsyncMock( + side_effect=lambda trace_id: { + 'found': trace_id == 'trace-1', + 'trace_id': trace_id, + 'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None, + 'spans': [] if trace_id == 'trace-1' else None, + } + ) + + with isolated_sys_modules( + mocks={ + 'langbot.pkg.core.app': mock_app, + 'langbot.pkg.core.entities': mock_entities, + }, + clear=clear, + ): + from langbot.pkg.api.http.controller.groups.monitoring import MonitoringRouterGroup + + quart_app = quart.Quart(__name__) + group = MonitoringRouterGroup(app, quart_app) + await group.initialize() + + yield app, quart_app.test_client() + + +async def test_get_traces_route_forwards_filters(monitoring_client): + app, client = monitoring_client + + response = await client.get( + '/api/v1/monitoring/traces' + '?botId=bot-1' + '&pipelineId=pipeline-1' + '&sessionId=session-1' + '&status=success' + '&startTime=2026-01-01T00:00:00Z' + '&endTime=2026-01-02T00:00:00Z' + '&limit=25' + '&offset=5', + headers={'Authorization': 'Bearer test_token'}, + ) + + assert response.status_code == 200 + data = await response.get_json() + assert data['data'] == { + 'traces': [{'trace_id': 'trace-1'}], + 'total': 1, + 'limit': 25, + 'offset': 5, + } + app.monitoring_service.get_traces.assert_awaited_once_with( + bot_ids=['bot-1'], + pipeline_ids=['pipeline-1'], + session_ids=['session-1'], + statuses=['success'], + start_time=datetime.datetime(2026, 1, 1, 0, 0), + end_time=datetime.datetime(2026, 1, 2, 0, 0), + limit=25, + offset=5, + ) + + +async def test_get_trace_details_route_returns_404_for_missing_trace(monitoring_client): + _app, client = monitoring_client + + response = await client.get( + '/api/v1/monitoring/traces/trace-missing', + headers={'Authorization': 'Bearer test_token'}, + ) + + assert response.status_code == 404 + data = await response.get_json() + assert data['code'] == -1 + assert data['msg'] == 'Trace trace-missing not found' diff --git a/tests/unit_tests/persistence/test_monitoring_trace_migration.py b/tests/unit_tests/persistence/test_monitoring_trace_migration.py new file mode 100644 index 000000000..1364c93b8 --- /dev/null +++ b/tests/unit_tests/persistence/test_monitoring_trace_migration.py @@ -0,0 +1,87 @@ +"""Unit tests for the monitoring trace Alembic migration.""" + +from __future__ import annotations + +from importlib import import_module + + +class _FakeInspector: + def __init__(self, tables): + self._tables = tables + + def get_table_names(self): + return list(self._tables) + + +class _FakeOp: + def __init__(self): + self.created_tables = [] + self.created_indexes = [] + self.dropped_tables = [] + + def get_bind(self): + return object() + + def create_table(self, table_name, *columns): + self.created_tables.append((table_name, columns)) + + def create_index(self, index_name, table_name, columns): + self.created_indexes.append((index_name, table_name, columns)) + + def drop_table(self, table_name): + self.dropped_tables.append(table_name) + + +def _migration_module(): + return import_module('langbot.pkg.persistence.alembic.versions.0006_monitoring_traces') + + +def test_upgrade_creates_monitoring_trace_tables_and_indexes(monkeypatch): + migration = _migration_module() + fake_op = _FakeOp() + + monkeypatch.setattr(migration, 'op', fake_op) + monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=set())) + + migration.upgrade() + + assert [table_name for table_name, _columns in fake_op.created_tables] == [ + 'monitoring_traces', + 'monitoring_spans', + ] + assert ('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) in fake_op.created_indexes + assert ('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) in fake_op.created_indexes + assert ('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) in fake_op.created_indexes + + +def test_upgrade_skips_existing_monitoring_trace_tables(monkeypatch): + migration = _migration_module() + fake_op = _FakeOp() + + monkeypatch.setattr(migration, 'op', fake_op) + monkeypatch.setattr( + migration.sa, + 'inspect', + lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}), + ) + + migration.upgrade() + + assert fake_op.created_tables == [] + assert fake_op.created_indexes == [] + + +def test_downgrade_drops_spans_before_traces(monkeypatch): + migration = _migration_module() + fake_op = _FakeOp() + + monkeypatch.setattr(migration, 'op', fake_op) + monkeypatch.setattr( + migration.sa, + 'inspect', + lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}), + ) + + migration.downgrade() + + assert fake_op.dropped_tables == ['monitoring_spans', 'monitoring_traces']