Files
LangBot/src/langbot/pkg/provider/modelmgr/requester.py
huanghuoguoguo 9ecb587ac0 refactor(provider): use LiteLLM as unified LLM requester backend (#2150)
* refactor(provider): use LiteLLM as unified LLM requester backend

  - Replace 23+ individual requester implementations with unified litellmchat.py
  - Add litellm_provider field to 27 YAML manifests for provider routing
  - Delete redundant requester subclasses
  - Add unit tests for LiteLLMRequester (29 tests)
  - Fix num_retries parameter name (was max_retries)
  - Fix exception handling order for subclass exceptions

  LiteLLM provides unified API for 100+ providers, eliminating need for
  provider-specific requesters.

* fix: ruff format provider.py

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(provider): simplify LiteLLM requester usage handling

  - Remove unused Anthropic-specific tool schema generation
  - Share completion argument construction between normal and streaming calls
  - Use LiteLLM/OpenAI native usage fields for monitoring
  - Collect stream token usage from LiteLLM stream_options
  - Update LiteLLM requester tests for unified usage fields

* restore: restore deleted provider requester files

Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat: update requesters and improve provider selection UI

- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.

* fix(provider): align litellm rebase with master

* fix(provider): capture streaming token usage; add token observability

The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).

- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
  endpoint: summary, per-model breakdown, token timeseries, and a
  zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
  per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization

Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.

* refactor(provider): simplify litellm capabilities

* style: simplify wrapped expressions

* feat(models): persist context metadata

* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester

- invoke_embedding: support both object- and dict-shaped response.data
  entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
  openai-compatible (or unspecified) providers call the standard
  Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path

* feat(provider): enforce requester support_type when adding models

- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
  rerank) that the provider's requester declares in its manifest
  support_type; ModelsDialog fetches requester manifests and maps
  requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
  create_embedding_model / create_rerank_model so a model cannot be
  attached to a provider whose requester does not support that type,
  even if the frontend restriction is bypassed (manifests without
  support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
  three model types:
  - llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
  - llm + text-embedding: openai, gemini, mistral
  - add rerank to new-api (verified working via /v1/rerank)
  - set llm + text-embedding + rerank for aggregator/unknown gateways

* feat(provider): add searchable alias to requester manifests

- add a free-text 'alias' field to every requester manifest spec,
  containing the vendor's English/Chinese names, pinyin, common
  nicknames and flagship model-series names (e.g. moonshot -> kimi,
  月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
  alias (substring/contains), so searching 'kimi' surfaces Moonshot,
  '硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
  LangBot Space gains rerank (coming soon)

* fix(provider): make support_type guard defensive against incomplete model_mgr

- _validate_provider_supports now uses getattr to gracefully skip when
  model_mgr / provider_dict / manifest lookup is unavailable, instead of
  raising AttributeError (fixes unit tests that mock ap.model_mgr as a
  bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
  reject unsupported type, allow when support_type missing, allow when
  provider unknown, degrade safely when model_mgr is incomplete

* fix(persistence): guard 0004 migration against missing llm_models table

The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.

Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).

* Merge branch 'master' into feat/litellm

Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
  (ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
  branch added 0004_add_llm_model_context_length, both as children of
  0003 (would create multiple heads). Re-chain the litellm migration as
  0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
  for a single linear head. Update test head assertion accordingly.

* fix(persistence): shorten migration revision id to fit varchar(32)

PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-06-13 16:59:48 +08:00

460 lines
16 KiB
Python

from __future__ import annotations
import abc
import typing
import time
from ...core import app
from ...entity.persistence import model as persistence_model
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from . import token
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class RuntimeProvider:
"""运行时模型提供商"""
provider_entity: persistence_model.ModelProvider
"""提供商数据"""
token_mgr: token.TokenManager
"""api key管理器"""
requester: ProviderAPIRequester
"""请求器实例"""
def __init__(
self,
provider_entity: persistence_model.ModelProvider,
token_mgr: token.TokenManager,
requester: ProviderAPIRequester,
):
self.provider_entity = provider_entity
self.token_mgr = token_mgr
self.requester = requester
async def invoke_llm(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
"""Bridge method for invoking LLM with monitoring"""
# Start timing for monitoring
start_time = time.time()
input_tokens = 0
output_tokens = 0
status = 'success'
error_message = None
try:
# Call the underlying requester
result = await self.requester.invoke_llm(
query=query,
model=model,
messages=messages,
funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
)
# Try to extract token usage if the requester returns it
# For requesters that return tuple (message, usage_info)
if isinstance(result, tuple):
msg, usage_info = result
if usage_info:
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
return msg
else:
return result
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record LLM call monitoring data (only if query is provided)
if query is not None:
duration_ms = int((time.time() - start_time) * 1000)
# Import monitoring helper
try:
from ...pipeline import monitoring_helper
# Get monitoring metadata from query variables
if query.variables:
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
message_id = query.variables.get('_monitoring_message_id')
else:
bot_name = 'Unknown'
pipeline_name = 'Unknown'
message_id = None
await monitoring_helper.MonitoringHelper.record_llm_call(
ap=self.requester.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=query.pipeline_uuid or 'unknown',
pipeline_name=pipeline_name,
model_name=model.model_entity.name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
status=status,
error_message=error_message,
message_id=message_id,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
"""Bridge method for invoking LLM stream with monitoring"""
# Start timing for monitoring
start_time = time.time()
status = 'success'
error_message = None
input_tokens = 0
output_tokens = 0
try:
# Stream the response
async for chunk in self.requester.invoke_llm_stream(
query=query,
model=model,
messages=messages,
funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
):
yield chunk
# Extract usage from stream if available (stored by LiteLLM requester)
if query:
if query.variables is None:
query.variables = {}
if '_stream_usage' in query.variables:
usage_info = query.variables['_stream_usage']
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
del query.variables['_stream_usage']
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record LLM call monitoring data (only if query is provided)
if query is not None:
duration_ms = int((time.time() - start_time) * 1000)
# Import monitoring helper
try:
from ...pipeline import monitoring_helper
# Get monitoring metadata from query variables
if query.variables:
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
message_id = query.variables.get('_monitoring_message_id')
else:
bot_name = 'Unknown'
pipeline_name = 'Unknown'
message_id = None
await monitoring_helper.MonitoringHelper.record_llm_call(
ap=self.requester.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=query.pipeline_uuid or 'unknown',
pipeline_name=pipeline_name,
model_name=model.model_entity.name,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
status=status,
error_message=error_message,
message_id=message_id,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
async def invoke_embedding(
self,
model: RuntimeEmbeddingModel,
input_text: typing.List[str],
extra_args: dict[str, typing.Any] = {},
knowledge_base_id: str | None = None,
query_text: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
call_type: str | None = None,
) -> typing.List[typing.List[float]]:
"""Bridge method for invoking embedding with monitoring"""
# Start timing for monitoring
start_time = time.time()
prompt_tokens = 0
total_tokens = 0
status = 'success'
error_message = None
try:
# Call the underlying requester
result = await self.requester.invoke_embedding(
model=model,
input_text=input_text,
extra_args=extra_args,
)
# Handle both old format (list only) and new format (tuple with usage)
if isinstance(result, tuple):
embeddings, usage_info = result
if usage_info:
prompt_tokens = usage_info.get('prompt_tokens', 0)
total_tokens = usage_info.get('total_tokens', 0)
return embeddings
else:
return result
except Exception as e:
status = 'error'
error_message = str(e)
raise
finally:
# Record embedding call monitoring data
duration_ms = int((time.time() - start_time) * 1000)
try:
await self.requester.ap.monitoring_service.record_embedding_call(
model_name=model.model_entity.name,
prompt_tokens=prompt_tokens,
total_tokens=total_tokens,
duration=duration_ms,
input_count=len(input_text),
status=status,
error_message=error_message,
knowledge_base_id=knowledge_base_id,
query_text=query_text,
session_id=session_id,
message_id=message_id,
call_type=call_type,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record embedding call: {monitor_err}')
async def invoke_rerank(
self,
model: RuntimeRerankModel,
query: str,
documents: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""Bridge method for invoking rerank with monitoring"""
start_time = time.time()
status = 'success'
try:
result = await self.requester.invoke_rerank(
model=model,
query=query,
documents=documents,
extra_args=extra_args,
)
return result
except Exception:
status = 'error'
raise
finally:
duration_ms = int((time.time() - start_time) * 1000)
try:
self.requester.ap.logger.debug(
f'[Rerank] model={model.model_entity.name} docs={len(documents)} '
f'duration={duration_ms}ms status={status}'
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record rerank call: {monitor_err}')
class RuntimeLLMModel:
"""运行时模型"""
model_entity: persistence_model.LLMModel
"""模型数据"""
provider: RuntimeProvider
"""提供商实例"""
def __init__(
self,
model_entity: persistence_model.LLMModel,
provider: RuntimeProvider,
):
self.model_entity = model_entity
self.provider = provider
class RuntimeEmbeddingModel:
"""运行时 Embedding 模型"""
model_entity: persistence_model.EmbeddingModel
"""模型数据"""
provider: RuntimeProvider
"""提供商实例"""
def __init__(
self,
model_entity: persistence_model.EmbeddingModel,
provider: RuntimeProvider,
):
self.model_entity = model_entity
self.provider = provider
class RuntimeRerankModel:
"""运行时 Rerank 模型"""
model_entity: persistence_model.RerankModel
"""模型数据"""
provider: RuntimeProvider
"""提供商实例"""
def __init__(
self,
model_entity: persistence_model.RerankModel,
provider: RuntimeProvider,
):
self.model_entity = model_entity
self.provider = provider
class ProviderAPIRequester(metaclass=abc.ABCMeta):
"""Provider API请求器"""
name: str = None
init_api_key: str = 'langbot-init-placeholder'
ap: app.Application
default_config: dict[str, typing.Any] = {}
requester_cfg: dict[str, typing.Any] = {}
def __init__(self, ap: app.Application, config: dict[str, typing.Any]):
self.ap = ap
self.requester_cfg = {**self.default_config}
self.requester_cfg.update(config)
async def initialize(self):
pass
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
"""Scan models supported by the provider.
The default implementation does not support scanning. Requesters that
can enumerate remote models should override this method.
"""
raise NotImplementedError('This provider does not support model scanning')
@abc.abstractmethod
async def invoke_llm(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
"""调用API
Args:
model (RuntimeLLMModel): 使用的模型信息
messages (typing.List[llm_entities.Message]): 消息对象列表
funcs (typing.List[tools_entities.LLMFunction], optional): 使用的工具函数列表. Defaults to None.
extra_args (dict[str, typing.Any], optional): 额外的参数. Defaults to {}.
remove_think (bool, optional): 是否移思考中的消息. Defaults to False.
Returns:
llm_entities.Message: 返回消息对象
"""
pass
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
"""调用API
Args:
model (RuntimeLLMModel): 使用的模型信息
messages (typing.List[provider_message.Message]): 消息对象列表
funcs (typing.List[resource_tool.LLMTool], optional): 使用的工具函数列表. Defaults to None.
extra_args (dict[str, typing.Any], optional): 额外的参数. Defaults to {}.
remove_think (bool, optional): 是否移除思考中的消息. Defaults to False.
Returns:
typing.AsyncGenerator[provider_message.MessageChunk]: 返回消息对象
"""
pass
async def invoke_embedding(
self,
model: RuntimeEmbeddingModel,
input_text: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.Union[typing.List[typing.List[float]], tuple[typing.List[typing.List[float]], dict]]:
"""调用 Embedding API
Args:
model (RuntimeEmbeddingModel): 使用的模型信息
input_text (typing.List[str]): 输入文本
extra_args (dict[str, typing.Any], optional): 额外的参数. Defaults to {}.
Returns:
typing.List[typing.List[float]]: 返回的 embedding 向量
或者 tuple[typing.List[typing.List[float]], dict]: 返回 (embedding 向量, usage_info)
"""
pass
async def invoke_rerank(
self,
model: RuntimeRerankModel,
query: str,
documents: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""调用 Rerank API
Args:
model (RuntimeRerankModel): 使用的模型信息
query (str): 查询文本
documents (typing.List[str]): 待重排序的文档列表
extra_args (dict[str, typing.Any], optional): 额外的参数. Defaults to {}.
Returns:
typing.List[dict]: [{"index": int, "relevance_score": float}, ...]
"""
raise NotImplementedError('This requester does not support rerank')