Mirror of https://github.com/langbot-app/LangBot.git, synced 2026-06-14 17:56:03 +00:00
refactor(provider): use LiteLLM as unified LLM requester backend (#2150)
* refactor(provider): use LiteLLM as unified LLM requester backend
- Replace 23+ individual requester implementations with unified litellmchat.py
- Add litellm_provider field to 27 YAML manifests for provider routing
- Delete redundant requester subclasses
- Add unit tests for LiteLLMRequester (29 tests)
- Fix num_retries parameter name (was max_retries)
- Fix exception handling order for subclass exceptions
LiteLLM provides a unified API for 100+ providers, eliminating the need for
provider-specific requesters.
* fix: ruff format provider.py
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* refactor(provider): simplify LiteLLM requester usage handling
- Remove unused Anthropic-specific tool schema generation
- Share completion argument construction between normal and streaming calls
- Use LiteLLM/OpenAI native usage fields for monitoring
- Collect stream token usage from LiteLLM stream_options
- Update LiteLLM requester tests for unified usage fields
* restore: restore deleted provider requester files
Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* feat: update requesters and improve provider selection UI
- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.
* fix(provider): align litellm rebase with master
* fix(provider): capture streaming token usage; add token observability
The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).
- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
endpoint: summary, per-model breakdown, token timeseries, and a
zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization
Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.
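The capture-and-normalize behavior described above can be sketched roughly as follows. This is a minimal illustration, not the commit's actual `_normalize_usage`: the function names `normalize_usage` and `collect_stream_usage` are hypothetical, chunks are modeled as plain dicts, and the field names assume the OpenAI-style `prompt_tokens` / `completion_tokens` / `total_tokens` usage shape.

```python
from types import SimpleNamespace


def normalize_usage(usage):
    """Return token counts from a dict- or object-shaped usage payload (hypothetical helper)."""
    if usage is None:
        return {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}

    def read(field):
        # Support both dict payloads (some gateways) and attribute-style objects.
        value = usage.get(field) if isinstance(usage, dict) else getattr(usage, field, None)
        return int(value or 0)

    prompt = read('prompt_tokens')
    completion = read('completion_tokens')
    # Derive the total when the provider omits it or reports zero.
    total = read('total_tokens') or prompt + completion
    return {'prompt_tokens': prompt, 'completion_tokens': completion, 'total_tokens': total}


def collect_stream_usage(chunks):
    """Keep the last usage payload seen on any chunk, whether or not it has choices."""
    usage = None
    for chunk in chunks:
        chunk_usage = chunk.get('usage')
        if chunk_usage:
            usage = chunk_usage
    return normalize_usage(usage)
```

The point of the sketch is that the usage check does not look at `choices` at all, so a final chunk carrying both an empty-delta choice and a usage payload is still counted.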
* refactor(provider): simplify litellm capabilities
* style: simplify wrapped expressions
* feat(models): persist context metadata
* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester
- invoke_embedding: support both object- and dict-shaped response.data
entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
openai-compatible (or unspecified) providers call the standard
Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path
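Accepting both score field names might look something like the sketch below. The helper name `extract_rerank_scores`, the dict-shaped result items, and the fallback to `0.0` when neither field is present are assumptions for illustration and are not taken from the commit.

```python
def extract_rerank_scores(results):
    """Return (index, score) pairs, reading 'relevance_score' first and 'score' second."""
    scores = []
    for item in results:
        score = item.get('relevance_score')
        if score is None:
            # Some gateways use 'score' instead of 'relevance_score'.
            score = item.get('score')
        # Assumed fallback: treat a missing score as 0.0.
        scores.append((item.get('index'), float(score) if score is not None else 0.0))
    return scores
```

The explicit `is None` check is deliberate: a plain `or` chain would skip a legitimate `relevance_score` of `0.0`.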
* feat(provider): enforce requester support_type when adding models
- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
rerank) that the provider's requester declares in its manifest
support_type; ModelsDialog fetches requester manifests and maps
requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
create_embedding_model / create_rerank_model so a model cannot be
attached to a provider whose requester does not support that type,
even if the frontend restriction is bypassed (manifests without
support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
three model types:
- llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
- llm + text-embedding: openai, gemini, mistral
- add rerank to new-api (verified working via /v1/rerank)
- set llm + text-embedding + rerank for aggregator/unknown gateways
* feat(provider): add searchable alias to requester manifests
- add a free-text 'alias' field to every requester manifest spec,
containing the vendor's English/Chinese names, pinyin, common
nicknames and flagship model-series names (e.g. moonshot -> kimi,
月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
alias (substring/contains), so searching 'kimi' surfaces Moonshot,
'硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
LangBot Space gains rerank (coming soon)
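The alias matching described above is frontend logic, but the idea can be sketched in Python. The function name `matches_requester`, the case-insensitive comparison, and the sample alias strings are assumptions for illustration only.

```python
def matches_requester(query, label, alias=''):
    """Return True when the query is a substring of the label or the free-text alias."""
    needle = query.strip().lower()
    if not needle:
        # An empty search box matches every requester.
        return True
    return needle in label.lower() or needle in alias.lower()
```

With an alias such as `'moonshot kimi 月之暗面'`, a search for `kimi` would match a requester labeled `Moonshot` even though the label itself does not contain the query.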
* fix(provider): make support_type guard defensive against incomplete model_mgr
- _validate_provider_supports now uses getattr to gracefully skip when
model_mgr / provider_dict / manifest lookup is unavailable, instead of
raising AttributeError (fixes unit tests that mock ap.model_mgr as a
bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
reject unsupported type, allow when support_type missing, allow when
provider unknown, degrade safely when model_mgr is incomplete
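The defensive lookup can be illustrated with a simplified, synchronous sketch. The helper name `requester_name_for` and the requester value `'openai-chat'` are hypothetical; only the `getattr`-with-default pattern mirrors the guard described above.

```python
from types import SimpleNamespace


def requester_name_for(ap, provider_uuid):
    """Resolve a provider's requester name, returning None when any link is missing."""
    model_mgr = getattr(ap, 'model_mgr', None)
    # getattr with a default also works when model_mgr itself is None.
    provider_dict = getattr(model_mgr, 'provider_dict', None)
    if not provider_dict:
        return None
    runtime_provider = provider_dict.get(provider_uuid)
    entity = getattr(runtime_provider, 'provider_entity', None)
    return getattr(entity, 'requester', None)


# A bare mock, as a unit test might construct it, degrades to None instead of raising.
bare_ap = SimpleNamespace(model_mgr=SimpleNamespace())
full_ap = SimpleNamespace(
    model_mgr=SimpleNamespace(
        provider_dict={'p1': SimpleNamespace(provider_entity=SimpleNamespace(requester='openai-chat'))}
    )
)
```

Here `requester_name_for(bare_ap, 'p1')` returns `None` rather than raising `AttributeError`, which is the behavior the guard relies on to skip validation.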
* fix(persistence): guard 0004 migration against missing llm_models table
The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.
Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).
* Merge branch 'master' into feat/litellm
Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
(ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
branch added 0004_add_llm_model_context_length, both as children of
0003 (would create multiple heads). Re-chain the litellm migration as
0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
for a single linear head. Update test head assertion accordingly.
* fix(persistence): shorten migration revision id to fit varchar(32)
PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.
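The length arithmetic is easy to check directly. In this small sketch the constant and helper names are hypothetical, and the 32-character limit is the `varchar(32)` width stated above.

```python
# Width of alembic_version.version_num on PostgreSQL, per the commit message.
VERSION_NUM_MAX_LENGTH = 32


def fits_version_num(revision_id, limit=VERSION_NUM_MAX_LENGTH):
    """Return True when the revision id fits in the version_num column."""
    return len(revision_id) <= limit


old_revision = '0005_add_llm_model_context_length'
new_revision = '0005_add_llm_context_length'

for revision in (old_revision, new_revision):
    print(revision, len(revision), fits_version_num(revision))
```

A check like this could be run over all revision ids in a test so an overlong id fails before it reaches a PostgreSQL migration.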
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
@@ -46,6 +46,30 @@ class MonitoringRouterGroup(group.RouterGroup):
            return self.success(data=metrics)

        @self.route('/token-statistics', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
        async def get_token_statistics() -> str:
            """Get detailed token usage statistics (summary, per-model, timeseries)."""
            bot_ids = quart.request.args.getlist('botId')
            pipeline_ids = quart.request.args.getlist('pipelineId')
            start_time_str = quart.request.args.get('startTime')
            end_time_str = quart.request.args.get('endTime')
            bucket = quart.request.args.get('bucket', 'hour')
            if bucket not in ('hour', 'day'):
                bucket = 'hour'

            start_time = parse_iso_datetime(start_time_str)
            end_time = parse_iso_datetime(end_time_str)

            stats = await self.ap.monitoring_service.get_token_statistics(
                bot_ids=bot_ids if bot_ids else None,
                pipeline_ids=pipeline_ids if pipeline_ids else None,
                start_time=start_time,
                end_time=end_time,
                bucket=bucket,
            )

            return self.success(data=stats)

        @self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
        async def get_messages() -> str:
            """Get message logs"""

@@ -34,6 +34,46 @@ def _runtime_model_data(model_uuid: str, model_data: dict) -> dict:
    return {**model_data, 'uuid': model_uuid}


async def _validate_provider_supports(ap: app.Application, provider_uuid: str, model_type: str) -> None:
    """Validate that the provider's requester declares support for ``model_type``.

    ``model_type`` is one of the manifest ``support_type`` values:
    'llm', 'text-embedding', 'rerank'. Raises ValueError when the requester
    manifest does not list the requested type. This is a server-side guard so
    a model cannot be attached to a provider that does not support it, even if
    the frontend tab restriction is bypassed.
    """
    model_mgr = getattr(ap, 'model_mgr', None)
    if model_mgr is None:
        return

    provider_dict = getattr(model_mgr, 'provider_dict', None)
    if not provider_dict:
        return
    runtime_provider = provider_dict.get(provider_uuid)
    if runtime_provider is None:
        return

    requester_name = getattr(getattr(runtime_provider, 'provider_entity', None), 'requester', None)
    if not requester_name:
        return

    get_manifest = getattr(model_mgr, 'get_available_requester_manifest_by_name', None)
    if not callable(get_manifest):
        return
    manifest = get_manifest(requester_name)
    if manifest is None:
        return

    spec = getattr(manifest, 'spec', None) or {}
    support_type = spec.get('support_type') if isinstance(spec, dict) else None
    # When a manifest omits support_type, do not block (backward compatible).
    if not support_type:
        return
    if model_type not in support_type:
        raise ValueError(f'Provider requester "{requester_name}" does not support {model_type} models')


class LLMModelsService:
    ap: app.Application

@@ -96,6 +136,8 @@ class LLMModelsService:
            )
            model_data['provider_uuid'] = provider_uuid

        await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'llm')

        await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_model.LLMModel).values(**model_data))

        runtime_provider = self.ap.model_mgr.provider_dict.get(model_data['provider_uuid'])
@@ -274,6 +316,8 @@ class EmbeddingModelsService:
            )
            model_data['provider_uuid'] = provider_uuid

        await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'text-embedding')

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_model.EmbeddingModel).values(**model_data)
        )
@@ -434,6 +478,8 @@ class RerankModelsService:
            )
            model_data['provider_uuid'] = provider_uuid

        await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'rerank')

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(persistence_model.RerankModel).values(**model_data)
        )

@@ -472,6 +472,179 @@ class MonitoringService:
            'active_sessions': active_sessions,
        }

    async def get_token_statistics(
        self,
        bot_ids: list[str] | None = None,
        pipeline_ids: list[str] | None = None,
        start_time: datetime.datetime | None = None,
        end_time: datetime.datetime | None = None,
        bucket: str = 'hour',
    ) -> dict:
        """Get detailed token usage statistics for production observability.

        Returns:
            - summary: aggregate token counters and call/latency stats over the window
            - by_model: per-model token + call breakdown (sorted by total tokens desc)
            - timeseries: token usage bucketed by `bucket` ('hour' or 'day')

        Only successful LLM calls are counted toward token totals; error calls are
        reported separately so a spike in failures is visible without polluting
        token accounting.
        """
        LLMCall = persistence_monitoring.MonitoringLLMCall

        conditions = []
        if bot_ids:
            conditions.append(LLMCall.bot_id.in_(bot_ids))
        if pipeline_ids:
            conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
        if start_time:
            conditions.append(LLMCall.timestamp >= start_time)
        if end_time:
            conditions.append(LLMCall.timestamp <= end_time)

        def _apply(query):
            if conditions:
                query = query.where(sqlalchemy.and_(*conditions))
            return query

        # ---- Summary aggregates ----
        summary_query = _apply(
            sqlalchemy.select(
                sqlalchemy.func.count(LLMCall.id),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
                # Count of successful calls that nonetheless recorded zero tokens —
                # a data-quality signal that usage reporting may be broken upstream.
                sqlalchemy.func.sum(
                    sqlalchemy.case(
                        (sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
                        else_=0,
                    )
                ),
            )
        )
        summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
        row = summary_result.first()
        (
            total_calls,
            total_input_tokens,
            total_output_tokens,
            total_tokens,
            total_duration,
            total_cost,
            success_calls,
            error_calls,
            zero_token_success_calls,
        ) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)

        total_calls = total_calls or 0
        success_calls = success_calls or 0
        error_calls = error_calls or 0
        zero_token_success_calls = zero_token_success_calls or 0

        summary = {
            'total_calls': total_calls,
            'success_calls': success_calls,
            'error_calls': error_calls,
            'total_input_tokens': int(total_input_tokens or 0),
            'total_output_tokens': int(total_output_tokens or 0),
            'total_tokens': int(total_tokens or 0),
            'total_cost': round(float(total_cost or 0.0), 6),
            'avg_tokens_per_call': int((total_tokens or 0) / total_calls) if total_calls > 0 else 0,
            'avg_duration_ms': int((total_duration or 0) / total_calls) if total_calls > 0 else 0,
            'avg_tokens_per_second': round((total_output_tokens or 0) / (total_duration / 1000), 2)
            if total_duration and total_duration > 0
            else 0,
            'zero_token_success_calls': zero_token_success_calls,
        }

        # ---- Per-model breakdown ----
        by_model_query = _apply(
            sqlalchemy.select(
                LLMCall.model_name,
                sqlalchemy.func.count(LLMCall.id),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
                sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
                sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
            ).group_by(LLMCall.model_name)
        )
        by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
        by_model = []
        for mrow in by_model_result.all():
            (
                model_name,
                m_calls,
                m_in,
                m_out,
                m_total,
                m_duration,
                m_cost,
                m_errors,
            ) = mrow
            m_calls = m_calls or 0
            by_model.append(
                {
                    'model_name': model_name,
                    'calls': m_calls,
                    'error_calls': m_errors or 0,
                    'input_tokens': int(m_in or 0),
                    'output_tokens': int(m_out or 0),
                    'total_tokens': int(m_total or 0),
                    'cost': round(float(m_cost or 0.0), 6),
                    'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
                    'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
                }
            )
        by_model.sort(key=lambda x: x['total_tokens'], reverse=True)

        # ---- Time-bucketed series ----
        # Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
        # aggregate in Python. The window is bounded by the time filter, so this is
        # cheap for typical dashboard ranges (hours/days).
        series_query = _apply(
            sqlalchemy.select(
                LLMCall.timestamp,
                LLMCall.input_tokens,
                LLMCall.output_tokens,
                LLMCall.total_tokens,
            ).order_by(LLMCall.timestamp.asc())
        )
        series_result = await self.ap.persistence_mgr.execute_async(series_query)

        bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
        buckets: dict[str, dict] = {}
        for srow in series_result.all():
            ts, s_in, s_out, s_total = srow
            if ts is None:
                continue
            key = ts.strftime(bucket_fmt)
            b = buckets.setdefault(
                key,
                {'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
            )
            b['input_tokens'] += int(s_in or 0)
            b['output_tokens'] += int(s_out or 0)
            b['total_tokens'] += int(s_total or 0)
            b['calls'] += 1

        timeseries = [buckets[k] for k in sorted(buckets.keys())]

        return {
            'summary': summary,
            'by_model': by_model,
            'timeseries': timeseries,
            'bucket': bucket,
        }

    async def get_messages(
        self,
        bot_ids: list[str] | None = None,