mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-07 14:26:03 +00:00
fix(provider): capture streaming token usage; add token observability
The LiteLLM streaming requester only captured usage when a chunk had an empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and providers send the final usage payload in a chunk that still carries an empty-delta choice, so streamed calls always recorded 0 tokens in the monitoring logs/dashboard (non-streaming worked). - Capture stream usage whenever a chunk carries it, regardless of choices - Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens) - Register litellm in bootutils/deps.py (was in pyproject only) - Add MonitoringService.get_token_statistics + /monitoring/token-statistics endpoint: summary, per-model breakdown, token timeseries, and a zero-token-success data-quality signal - Add TokenMonitoring dashboard tab (summary tiles, stacked token chart, per-model table) + i18n (en/zh) - Regression tests for stream usage capture and usage normalization Verified end-to-end against a real OpenAI-compatible endpoint with gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both streaming and non-streaming paths.
This commit is contained in:
@@ -110,6 +110,147 @@ class TestExtractUsage:
|
||||
assert result['completion_tokens'] == 0
|
||||
|
||||
|
||||
class TestNormalizeUsage:
|
||||
"""Test _normalize_usage helper covering real-world usage shapes"""
|
||||
|
||||
def test_none_usage(self):
|
||||
"""None usage -> all zeros (no crash)"""
|
||||
result = litellmchat.LiteLLMRequester._normalize_usage(None)
|
||||
assert result == {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
|
||||
|
||||
def test_dict_usage(self):
|
||||
"""Usage given as a plain dict"""
|
||||
result = litellmchat.LiteLLMRequester._normalize_usage(
|
||||
{'prompt_tokens': 12, 'completion_tokens': 8, 'total_tokens': 20}
|
||||
)
|
||||
assert result == {'prompt_tokens': 12, 'completion_tokens': 8, 'total_tokens': 20}
|
||||
|
||||
def test_missing_total_is_derived(self):
|
||||
"""When total_tokens is absent/zero it is derived from prompt + completion"""
|
||||
usage = Mock()
|
||||
usage.prompt_tokens = 42
|
||||
usage.completion_tokens = 10
|
||||
usage.total_tokens = 0
|
||||
result = litellmchat.LiteLLMRequester._normalize_usage(usage)
|
||||
assert result['total_tokens'] == 52
|
||||
|
||||
def test_partial_attrs_default_to_zero(self):
|
||||
"""Missing attributes default to 0 instead of raising"""
|
||||
usage = Mock(spec=['prompt_tokens'])
|
||||
usage.prompt_tokens = 5
|
||||
result = litellmchat.LiteLLMRequester._normalize_usage(usage)
|
||||
assert result == {'prompt_tokens': 5, 'completion_tokens': 0, 'total_tokens': 5}
|
||||
|
||||
|
||||
class TestInvokeLLMStreamUsage:
|
||||
"""Regression tests for streaming token usage capture.
|
||||
|
||||
Real OpenAI-compatible gateways (e.g. new-api) send the final usage payload
|
||||
in a chunk that still carries a (empty-delta) choice rather than an empty
|
||||
`choices` list. The usage must be captured regardless, otherwise streamed
|
||||
calls record 0 tokens.
|
||||
"""
|
||||
|
||||
def _make_chunk(self, *, content=None, finish_reason=None, usage=None, has_choice=True):
|
||||
chunk = Mock()
|
||||
if usage is not None:
|
||||
chunk.usage = usage
|
||||
else:
|
||||
chunk.usage = None
|
||||
if has_choice:
|
||||
choice = Mock()
|
||||
delta = Mock()
|
||||
delta.model_dump = Mock(
|
||||
return_value={'role': 'assistant', 'content': content, 'tool_calls': None}
|
||||
)
|
||||
choice.delta = delta
|
||||
choice.finish_reason = finish_reason
|
||||
chunk.choices = [choice]
|
||||
else:
|
||||
chunk.choices = []
|
||||
return chunk
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_usage_with_nonempty_choices(self):
|
||||
"""Usage chunk that still has a choice must populate _stream_usage."""
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
mock_ap = Mock()
|
||||
mock_ap.tool_mgr = Mock()
|
||||
mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(return_value=None)
|
||||
requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={})
|
||||
model = MockRuntimeModel('gpt-4o', 'test-api-key')
|
||||
|
||||
usage = Mock()
|
||||
usage.prompt_tokens = 24
|
||||
usage.completion_tokens = 48
|
||||
usage.total_tokens = 72
|
||||
|
||||
chunks = [
|
||||
self._make_chunk(content='Hello'),
|
||||
self._make_chunk(content=None, finish_reason='stop'),
|
||||
# Final usage chunk WITH a non-empty (empty-delta) choice — the bug case.
|
||||
self._make_chunk(content=None, usage=usage, has_choice=True),
|
||||
]
|
||||
|
||||
async def _aiter(*args, **kwargs):
|
||||
for c in chunks:
|
||||
yield c
|
||||
|
||||
query = Mock(spec=pipeline_query.Query)
|
||||
query.variables = {}
|
||||
|
||||
messages = [provider_message.Message(role='user', content='Hi')]
|
||||
|
||||
with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
|
||||
collected = []
|
||||
async for ch in requester.invoke_llm_stream(query=query, model=model, messages=messages):
|
||||
collected.append(ch)
|
||||
|
||||
assert '_stream_usage' in query.variables
|
||||
assert query.variables['_stream_usage']['prompt_tokens'] == 24
|
||||
assert query.variables['_stream_usage']['completion_tokens'] == 48
|
||||
assert query.variables['_stream_usage']['total_tokens'] == 72
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_usage_with_empty_choices(self):
|
||||
"""Usage chunk with empty choices list must also populate _stream_usage."""
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
mock_ap = Mock()
|
||||
mock_ap.tool_mgr = Mock()
|
||||
mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(return_value=None)
|
||||
requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={})
|
||||
model = MockRuntimeModel('gpt-4o', 'test-api-key')
|
||||
|
||||
usage = Mock()
|
||||
usage.prompt_tokens = 5
|
||||
usage.completion_tokens = 7
|
||||
usage.total_tokens = 12
|
||||
|
||||
chunks = [
|
||||
self._make_chunk(content='Hi there'),
|
||||
self._make_chunk(content=None, finish_reason='stop'),
|
||||
self._make_chunk(usage=usage, has_choice=False),
|
||||
]
|
||||
|
||||
async def _aiter(*args, **kwargs):
|
||||
for c in chunks:
|
||||
yield c
|
||||
|
||||
query = Mock(spec=pipeline_query.Query)
|
||||
query.variables = {}
|
||||
messages = [provider_message.Message(role='user', content='Hi')]
|
||||
|
||||
with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
|
||||
async for _ in requester.invoke_llm_stream(query=query, model=model, messages=messages):
|
||||
pass
|
||||
|
||||
assert query.variables['_stream_usage']['total_tokens'] == 12
|
||||
|
||||
|
||||
class TestProcessThinkingContent:
|
||||
"""Test _process_thinking_content method"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user