diff --git a/src/langbot/pkg/api/http/controller/groups/monitoring.py b/src/langbot/pkg/api/http/controller/groups/monitoring.py
index 11c9e272..65640b6e 100644
--- a/src/langbot/pkg/api/http/controller/groups/monitoring.py
+++ b/src/langbot/pkg/api/http/controller/groups/monitoring.py
@@ -46,6 +46,30 @@ class MonitoringRouterGroup(group.RouterGroup):
 
             return self.success(data=metrics)
 
+        @self.route('/token-statistics', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
+        async def get_token_statistics() -> str:
+            """Get detailed token usage statistics (summary, per-model, timeseries)."""
+            bot_ids = quart.request.args.getlist('botId')
+            pipeline_ids = quart.request.args.getlist('pipelineId')
+            start_time_str = quart.request.args.get('startTime')
+            end_time_str = quart.request.args.get('endTime')
+            bucket = quart.request.args.get('bucket', 'hour')
+            if bucket not in ('hour', 'day'):
+                bucket = 'hour'
+
+            start_time = parse_iso_datetime(start_time_str)
+            end_time = parse_iso_datetime(end_time_str)
+
+            stats = await self.ap.monitoring_service.get_token_statistics(
+                bot_ids=bot_ids if bot_ids else None,
+                pipeline_ids=pipeline_ids if pipeline_ids else None,
+                start_time=start_time,
+                end_time=end_time,
+                bucket=bucket,
+            )
+
+            return self.success(data=stats)
+
         @self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
         async def get_messages() -> str:
             """Get message logs"""
diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py
index 1ba66482..78761c6a 100644
--- a/src/langbot/pkg/api/http/service/monitoring.py
+++ b/src/langbot/pkg/api/http/service/monitoring.py
@@ -472,6 +472,203 @@ class MonitoringService:
             'active_sessions': active_sessions,
         }
 
+    async def get_token_statistics(
+        self,
+        bot_ids: list[str] | None = None,
+        pipeline_ids: list[str] | None = None,
+        start_time: datetime.datetime | None = None,
+        end_time: datetime.datetime | None = None,
+        bucket: str = 'hour',
+    ) -> dict:
+        """Get detailed token usage statistics for production observability.
+
+        Args:
+            bot_ids: Only include calls from these bots; ``None``/empty disables the filter.
+            pipeline_ids: Only include calls from these pipelines; ``None``/empty disables the filter.
+            start_time: Inclusive lower bound on the call timestamp; ``None`` leaves it open.
+            end_time: Inclusive upper bound on the call timestamp; ``None`` leaves it open.
+            bucket: ``'hour'`` groups the timeseries per hour; any other value groups per day.
+
+        Returns:
+            - summary: aggregate token counters and call/latency stats over the window
+            - by_model: per-model token + call breakdown (sorted by total tokens desc)
+            - timeseries: token usage bucketed by `bucket` ('hour' or 'day')
+
+        Token, duration and cost totals are summed over every call matching the
+        filters, whatever its status; ``success_calls`` / ``error_calls`` are
+        separate counters reported alongside them.
+        NOTE(review): failed calls are not filtered out of the token sums -- if
+        totals must reflect successful calls only, confirm error rows are stored
+        with zero tokens or add a status condition to the SUMs.
+        """
+        LLMCall = persistence_monitoring.MonitoringLLMCall
+
+        conditions = []
+        if bot_ids:
+            conditions.append(LLMCall.bot_id.in_(bot_ids))
+        if pipeline_ids:
+            conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
+        if start_time:
+            conditions.append(LLMCall.timestamp >= start_time)
+        if end_time:
+            conditions.append(LLMCall.timestamp <= end_time)
+
+        def _apply(query):
+            if conditions:
+                query = query.where(sqlalchemy.and_(*conditions))
+            return query
+
+        # ---- Summary aggregates ----
+        summary_query = _apply(
+            sqlalchemy.select(
+                sqlalchemy.func.count(LLMCall.id),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
+                sqlalchemy.func.sum(
+                    sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)
+                ),
+                sqlalchemy.func.sum(
+                    sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)
+                ),
+                # Count of successful calls that nonetheless recorded zero tokens —
+                # a data-quality signal that usage reporting may be broken upstream.
+                sqlalchemy.func.sum(
+                    sqlalchemy.case(
+                        (sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
+                        else_=0,
+                    )
+                ),
+            )
+        )
+        summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
+        row = summary_result.first()
+        (
+            total_calls,
+            total_input_tokens,
+            total_output_tokens,
+            total_tokens,
+            total_duration,
+            total_cost,
+            success_calls,
+            error_calls,
+            zero_token_success_calls,
+        ) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)
+
+        # SUM() can come back as Decimal (e.g. PostgreSQL ``numeric`` for bigint
+        # columns) or NULL; normalise every aggregate to a plain int/float once so
+        # the arithmetic below never mixes types and the payload stays JSON-numeric.
+        total_calls = int(total_calls or 0)
+        success_calls = int(success_calls or 0)
+        error_calls = int(error_calls or 0)
+        zero_token_success_calls = int(zero_token_success_calls or 0)
+        total_input_tokens = int(total_input_tokens or 0)
+        total_output_tokens = int(total_output_tokens or 0)
+        total_tokens = int(total_tokens or 0)
+        total_duration = float(total_duration or 0)
+        total_cost = float(total_cost or 0.0)
+
+        summary = {
+            'total_calls': total_calls,
+            'success_calls': success_calls,
+            'error_calls': error_calls,
+            'total_input_tokens': total_input_tokens,
+            'total_output_tokens': total_output_tokens,
+            'total_tokens': total_tokens,
+            'total_cost': round(total_cost, 6),
+            'avg_tokens_per_call': int(total_tokens / total_calls) if total_calls > 0 else 0,
+            'avg_duration_ms': int(total_duration / total_calls) if total_calls > 0 else 0,
+            'avg_tokens_per_second': round(total_output_tokens / (total_duration / 1000), 2)
+            if total_duration > 0
+            else 0,
+            'zero_token_success_calls': zero_token_success_calls,
+        }
+
+        # ---- Per-model breakdown ----
+        by_model_query = _apply(
+            sqlalchemy.select(
+                LLMCall.model_name,
+                sqlalchemy.func.count(LLMCall.id),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
+                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
+                sqlalchemy.func.sum(
+                    sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)
+                ),
+            ).group_by(LLMCall.model_name)
+        )
+        by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
+        by_model = []
+        for mrow in by_model_result.all():
+            (
+                model_name,
+                m_calls,
+                m_in,
+                m_out,
+                m_total,
+                m_duration,
+                m_cost,
+                m_errors,
+            ) = mrow
+            m_calls = int(m_calls or 0)
+            by_model.append(
+                {
+                    'model_name': model_name,
+                    'calls': m_calls,
+                    'error_calls': int(m_errors or 0),
+                    'input_tokens': int(m_in or 0),
+                    'output_tokens': int(m_out or 0),
+                    'total_tokens': int(m_total or 0),
+                    'cost': round(float(m_cost or 0.0), 6),
+                    'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
+                    'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
+                }
+            )
+        by_model.sort(key=lambda x: x['total_tokens'], reverse=True)
+
+        # ---- Time-bucketed series ----
+        # Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
+        # aggregate in Python. The window is bounded by the time filter, so this is
+        # cheap for typical dashboard ranges (hours/days).
+        series_query = _apply(
+            sqlalchemy.select(
+                LLMCall.timestamp,
+                LLMCall.input_tokens,
+                LLMCall.output_tokens,
+                LLMCall.total_tokens,
+            ).order_by(LLMCall.timestamp.asc())
+        )
+        series_result = await self.ap.persistence_mgr.execute_async(series_query)
+
+        bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
+        buckets: dict[str, dict] = {}
+        for srow in series_result.all():
+            ts, s_in, s_out, s_total = srow
+            if ts is None:
+                continue
+            key = ts.strftime(bucket_fmt)
+            b = buckets.setdefault(
+                key,
+                {'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
+            )
+            b['input_tokens'] += int(s_in or 0)
+            b['output_tokens'] += int(s_out or 0)
+            b['total_tokens'] += int(s_total or 0)
+            b['calls'] += 1
+
+        timeseries = [buckets[k] for k in sorted(buckets.keys())]
+
+        return {
+            'summary': summary,
+            'by_model': by_model,
+            'timeseries': timeseries,
+            'bucket': bucket,
+        }
+
     async def get_messages(
         self,
         bot_ids: list[str] | None = None,
diff --git
a/src/langbot/pkg/core/bootutils/deps.py b/src/langbot/pkg/core/bootutils/deps.py
index 1f653037..2cfd57e0 100644
--- a/src/langbot/pkg/core/bootutils/deps.py
+++ b/src/langbot/pkg/core/bootutils/deps.py
@@ -42,6 +42,7 @@ required_deps = {
     'telegramify_markdown': 'telegramify-markdown',
     'slack_sdk': 'slack_sdk',
     'asyncpg': 'asyncpg',
+    'litellm': 'litellm',
 }
 
 
diff --git a/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py b/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py
index 46f81179..e0eb3eb8 100644
--- a/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py
+++ b/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py
@@ -85,15 +85,56 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
         # because it's typically internal model reasoning, not user-visible thinking
         return content or ''
 
-    def _extract_usage(self, response) -> dict:
-        """Extract usage info from LiteLLM response."""
-        usage = response.usage
+    @staticmethod
+    def _normalize_usage(usage: typing.Any) -> dict:
+        """Normalize a LiteLLM/OpenAI usage object into a plain token dict.
+
+        Handles several real-world shapes returned by different upstreams:
+        - object with ``prompt_tokens`` / ``completion_tokens`` / ``total_tokens`` attrs
+        - dict with the same keys
+        - missing ``total_tokens`` (derived from prompt + completion)
+        - ``None`` / partially-populated usage (defaults to 0)
+        - counters that cannot be converted to ``int`` (treated as 0)
+
+        Accounting is best-effort: a malformed counter is counted as 0 rather
+        than raising out of the response/stream handling that calls this helper.
+
+        Args:
+            usage: Usage payload as an attribute object, a dict, or ``None``.
+
+        Returns:
+            Dict with integer ``prompt_tokens``, ``completion_tokens`` and
+            ``total_tokens`` entries.
+        """
+        if usage is None:
+            return {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
+
+        def _get(key: str) -> int:
+            value = usage.get(key) if isinstance(usage, dict) else getattr(usage, key, None)
+            try:
+                return int(value or 0)
+            except (TypeError, ValueError, OverflowError):
+                # e.g. a non-numeric string, NaN/inf, or an arbitrary object.
+                return 0
+
+        prompt_tokens = _get('prompt_tokens')
+        completion_tokens = _get('completion_tokens')
+        total_tokens = _get('total_tokens')
+
+        # Some providers omit total_tokens in streaming usage; derive it.
+        if not total_tokens:
+            total_tokens = prompt_tokens + completion_tokens
+
         return {
-            'prompt_tokens': usage.prompt_tokens or 0,
-            'completion_tokens': usage.completion_tokens or 0,
-            'total_tokens': usage.total_tokens or 0,
+            'prompt_tokens': prompt_tokens,
+            'completion_tokens': completion_tokens,
+            'total_tokens': total_tokens,
         }
 
+    def _extract_usage(self, response) -> dict:
+        """Extract usage info from a non-streaming LiteLLM response."""
+        return self._normalize_usage(getattr(response, 'usage', None))
+
     def _build_common_args(self, args: dict, include_retry_params: bool = True) -> dict:
         """Apply common requester config to args dict."""
         if self.requester_cfg.get('base_url'):
@@ -217,18 +258,21 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
         try:
             response = await acompletion(**args)
             async for chunk in response:
-                # Check for usage chunk (final chunk with stream_options include_usage)
-                if hasattr(chunk, 'usage') and chunk.usage and (not hasattr(chunk, 'choices') or not chunk.choices):
-                    usage_info = {
-                        'prompt_tokens': chunk.usage.prompt_tokens or 0,
-                        'completion_tokens': chunk.usage.completion_tokens or 0,
-                        'total_tokens': chunk.usage.total_tokens or 0,
-                    }
-                    if query:
+                # Capture usage whenever a chunk carries it.
+                #
+                # Important: many OpenAI-compatible gateways (e.g. new-api) and
+                # providers send the final usage payload in a chunk that STILL
+                # contains a (empty-delta) choice, not an empty `choices` list.
+                # The previous implementation only captured usage when `choices`
+                # was empty, so streamed calls always recorded 0 tokens.
+                # We therefore capture usage independently of `choices`, and then
+                # fall through to also process any content this chunk may carry.
+                if getattr(chunk, 'usage', None):
+                    usage_info = self._normalize_usage(chunk.usage)
+                    if query is not None:
                         if query.variables is None:
                             query.variables = {}
                         query.variables['_stream_usage'] = usage_info
-                    continue
 
                 if not hasattr(chunk, 'choices') or not chunk.choices:
                     continue
diff --git a/tests/unit_tests/provider/test_litellmchat.py b/tests/unit_tests/provider/test_litellmchat.py
index d80ea50e..ad8d9fd3 100644
--- a/tests/unit_tests/provider/test_litellmchat.py
+++ b/tests/unit_tests/provider/test_litellmchat.py
@@ -110,6 +110,147 @@ class TestExtractUsage:
         assert result['completion_tokens'] == 0
 
 
+class TestNormalizeUsage:
+    """Test _normalize_usage helper covering real-world usage shapes"""
+
+    def test_none_usage(self):
+        """None usage -> all zeros (no crash)"""
+        result = litellmchat.LiteLLMRequester._normalize_usage(None)
+        assert result == {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
+
+    def test_dict_usage(self):
+        """Usage given as a plain dict"""
+        result = litellmchat.LiteLLMRequester._normalize_usage(
+            {'prompt_tokens': 12, 'completion_tokens': 8, 'total_tokens': 20}
+        )
+        assert result == {'prompt_tokens': 12, 'completion_tokens': 8, 'total_tokens': 20}
+
+    def test_missing_total_is_derived(self):
+        """When total_tokens is absent/zero it is derived from prompt + completion"""
+        usage = Mock()
+        usage.prompt_tokens = 42
+        usage.completion_tokens = 10
+        usage.total_tokens = 0  # falsy total, so the helper must derive 42 + 10
+        result = litellmchat.LiteLLMRequester._normalize_usage(usage)
+        assert result['total_tokens'] == 52
+
+    def test_partial_attrs_default_to_zero(self):
+        """Missing attributes default to 0 instead of raising"""
+        usage = Mock(spec=['prompt_tokens'])  # spec restricts the Mock to this single attribute
+        usage.prompt_tokens = 5
+        result = litellmchat.LiteLLMRequester._normalize_usage(usage)
+        assert result == {'prompt_tokens': 5, 'completion_tokens': 0, 'total_tokens': 5}
+
+
+class TestInvokeLLMStreamUsage:
+    """Regression tests for streaming token usage capture.
+
+    Real OpenAI-compatible gateways (e.g. new-api) send the final usage payload
+    in a chunk that still carries a (empty-delta) choice rather than an empty
+    `choices` list. The usage must be captured regardless, otherwise streamed
+    calls record 0 tokens.
+    """
+
+    def _make_chunk(self, *, content=None, finish_reason=None, usage=None, has_choice=True):  # Mock stream chunk; has_choice=False gives an empty `choices` list
+        chunk = Mock()
+        if usage is not None:
+            chunk.usage = usage
+        else:
+            chunk.usage = None
+        if has_choice:
+            choice = Mock()
+            delta = Mock()
+            delta.model_dump = Mock(
+                return_value={'role': 'assistant', 'content': content, 'tool_calls': None}
+            )
+            choice.delta = delta
+            choice.finish_reason = finish_reason
+            chunk.choices = [choice]
+        else:
+            chunk.choices = []
+        return chunk
+
+    @pytest.mark.asyncio
+    async def test_stream_usage_with_nonempty_choices(self):
+        """Usage chunk that still has a choice must populate _stream_usage."""
+        import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
+        import langbot_plugin.api.entities.builtin.provider.message as provider_message
+
+        mock_ap = Mock()
+        mock_ap.tool_mgr = Mock()
+        mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(return_value=None)
+        requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={})
+        model = MockRuntimeModel('gpt-4o', 'test-api-key')
+
+        usage = Mock()
+        usage.prompt_tokens = 24
+        usage.completion_tokens = 48
+        usage.total_tokens = 72
+
+        chunks = [
+            self._make_chunk(content='Hello'),
+            self._make_chunk(content=None, finish_reason='stop'),
+            # Final usage chunk WITH a non-empty (empty-delta) choice — the bug case.
+            self._make_chunk(content=None, usage=usage, has_choice=True),
+        ]
+
+        async def _aiter(*args, **kwargs):  # async generator standing in for the stream returned by acompletion
+            for c in chunks:
+                yield c
+
+        query = Mock(spec=pipeline_query.Query)
+        query.variables = {}
+
+        messages = [provider_message.Message(role='user', content='Hi')]
+
+        with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
+            collected = []  # filled while draining the stream; its contents are not asserted on
+            async for ch in requester.invoke_llm_stream(query=query, model=model, messages=messages):
+                collected.append(ch)
+
+        assert '_stream_usage' in query.variables
+        assert query.variables['_stream_usage']['prompt_tokens'] == 24
+        assert query.variables['_stream_usage']['completion_tokens'] == 48
+        assert query.variables['_stream_usage']['total_tokens'] == 72
+
+    @pytest.mark.asyncio
+    async def test_stream_usage_with_empty_choices(self):
+        """Usage chunk with empty choices list must also populate _stream_usage."""
+        import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
+        import langbot_plugin.api.entities.builtin.provider.message as provider_message
+
+        mock_ap = Mock()
+        mock_ap.tool_mgr = Mock()
+        mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(return_value=None)
+        requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={})
+        model = MockRuntimeModel('gpt-4o', 'test-api-key')
+
+        usage = Mock()
+        usage.prompt_tokens = 5
+        usage.completion_tokens = 7
+        usage.total_tokens = 12
+
+        chunks = [
+            self._make_chunk(content='Hi there'),
+            self._make_chunk(content=None, finish_reason='stop'),
+            self._make_chunk(usage=usage, has_choice=False),  # usage-only chunk: empty `choices` list
+        ]
+
+        async def _aiter(*args, **kwargs):  # async generator standing in for the stream returned by acompletion
+            for c in chunks:
+                yield c
+
+        query = Mock(spec=pipeline_query.Query)
+        query.variables = {}
+        messages = [provider_message.Message(role='user', content='Hi')]
+
+        with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
+            async for _ in requester.invoke_llm_stream(query=query, model=model, messages=messages):
+                pass
+
+        assert query.variables['_stream_usage']['total_tokens'] == 12
+
+
 class TestProcessThinkingContent:
     """Test _process_thinking_content method"""
 
diff --git a/web/src/app/home/monitoring/components/TokenMonitoring.tsx b/web/src/app/home/monitoring/components/TokenMonitoring.tsx
new file mode 100644
index 00000000..1571fdb0
--- /dev/null
+++ b/web/src/app/home/monitoring/components/TokenMonitoring.tsx
@@ -0,0 +1,462 @@
+import React, { useEffect, useMemo, useState, useCallback } from 'react';
+import { useTranslation } from 'react-i18next';
+import {
+  ComposedChart,
+  Area,
+  Bar,
+  XAxis,
+  YAxis,
+  CartesianGrid,
+  Tooltip,
+  ResponsiveContainer,
+  Legend,
+} from 'recharts';
+import {
+  Coins,
+  ArrowDownToLine,
+  ArrowUpFromLine,
+  Gauge,
+  AlertTriangle,
+  TrendingUp,
+} from 'lucide-react';
+import { httpClient } from '@/app/infra/http/HttpClient';
+
+interface TokenSummary {
+  total_calls: number;
+  success_calls: number;
+  error_calls: number;
+  total_input_tokens: number;
+  total_output_tokens: number;
+  total_tokens: number;
+  total_cost: number;
+  avg_tokens_per_call: number;
+  avg_duration_ms: number;
+  avg_tokens_per_second: number;
+  zero_token_success_calls: number; // successful calls that recorded zero total tokens
+}
+
+interface TokenByModel {
+  model_name: string;
+  calls: number;
+  error_calls: number;
+  input_tokens: number;
+  output_tokens: number;
+  total_tokens: number;
+  cost: number;
+  avg_tokens_per_call: number;
+  avg_duration_ms: number;
+}
+
+interface TokenTimeseriesPoint {
+  bucket: string; // bucket label; presumably 'YYYY-MM-DD HH:00' (hour) or 'YYYY-MM-DD' (day) — confirm against the API
+  input_tokens: number;
+  output_tokens: number;
+  total_tokens: number;
+  calls: number;
+}
+
+interface TokenStatistics {
+  summary: TokenSummary;
+  by_model: TokenByModel[];
+  timeseries: TokenTimeseriesPoint[];
+  bucket: string; // bucket granularity; presumably 'hour' or 'day' — confirm against the API
+}
+
+interface TokenMonitoringProps {
+  botIds?: string[];
+  pipelineIds?: string[];
+  startTime?: string;
+  endTime?: string;
+  /** Bumped by the parent to trigger a refetch on manual refresh.
*/ + refreshKey?: number; +} + +function formatNumber(n: number): string { + if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(2)}M`; + if (n >= 1_000) return `${(n / 1_000).toFixed(1)}K`; + return n.toLocaleString(); +} + +const TOOLTIP_STYLE: React.CSSProperties = { + backgroundColor: 'var(--card)', + border: '1px solid var(--border)', + borderRadius: '12px', + boxShadow: + '0 10px 15px -3px rgb(0 0 0 / 0.1), 0 4px 6px -4px rgb(0 0 0 / 0.1)', + fontSize: '13px', + padding: '12px', + color: 'var(--foreground)', +}; + +function MetricTile({ + icon, + label, + value, + sub, + accent, +}: { + icon: React.ReactNode; + label: string; + value: string; + sub?: string; + accent?: string; +}) { + return ( +
| + {t('monitoring.tokens.model')} + | ++ {t('monitoring.tokens.calls')} + | ++ {t('monitoring.tokens.inputTokens')} + | ++ {t('monitoring.tokens.outputTokens')} + | ++ {t('monitoring.tokens.totalTokens')} + | ++ {t('monitoring.tokens.avgPerCall')} + | ++ {t('monitoring.tokens.avgLatency')} + | +
|---|---|---|---|---|---|---|
|
+
+ {m.model_name}
+
+
+
+
+ |
+ + {m.calls} + {m.error_calls > 0 && ( + + {' '} + ({m.error_calls}✕) + + )} + | ++ {formatNumber(m.input_tokens)} + | ++ {formatNumber(m.output_tokens)} + | ++ {formatNumber(m.total_tokens)} + | ++ {formatNumber(m.avg_tokens_per_call)} + | ++ {m.avg_duration_ms}ms + | +