Compare commits

..

9 Commits

Author SHA1 Message Date
RockChinQ
2e7978317c chore(release): bump version to 4.10.2 2026-06-13 11:21:44 -04:00
RockChinQ
b7d8332cb0 feat(telemetry): include instance_create_ts in heartbeat payload
Load the instance creation timestamp from data/labels/instance_id.json
(backfilling+persisting it for instances created before the field existed),
expose it as constants.instance_create_ts, and include it in the heartbeat
payload so Space can anchor Time-To-Value / onboarding analytics on real
install time rather than first-heartbeat.

Verified: py_compile, ruff, pytest tests/unit_tests/telemetry/ (37 passed).
2026-06-13 11:13:18 -04:00
huanghuoguoguo
7fe3eedeea fix(provider): use LiteLLM input window for context length (#2243) 2026-06-13 21:27:47 +08:00
RockChinQ
b6fde30aa7 style(plugins): ruff format logs route 2026-06-13 08:03:29 -04:00
RockChinQ
5bfa38cbf2 feat(plugins): show plugin logs on detail page via Docs/Logs tablist
Add a Logs tab beside Documentation on the plugin detail page, showing
the output a plugin prints through the standard Python logger (per the
wiki style guide). Logs are captured from the plugin's stderr by the
plugin runtime and fetched on demand.

- Bump langbot-plugin pin to 0.4.4 (adds GET_PLUGIN_LOGS action)
- plugin_connector/handler: get_plugin_logs RPC client
- HTTP route GET /api/v1/plugins/<author>/<name>/logs (limit + level)
- Frontend: wrap detail right panel in Docs/Logs Tabs; PluginLogs
  component with level filter, manual + 3s auto refresh, bottom-follow
- i18n: 7 new keys across all 8 locales
2026-06-13 08:01:18 -04:00
RockChinQ
a97d2040bb fix(i18n,api): backfill missing token-monitoring keys and fix JWT expiry tz
- i18n: add models.searchProviders, monitoring.tabs.tokens and the
  monitoring.tokens.* block (incl. bucket.hour/day) to es-ES, ja-JP,
  ru-RU, th-TH, vi-VN and zh-Hant, which were missing them and failed
  the Check i18n Keys CI.
- api: generate_jwt_token built 'exp' from a naive datetime.now(), which
  PyJWT validates against UTC — in any timezone ahead of UTC the token
  was already expired at issue time. Use datetime.now(timezone.utc).
2026-06-13 05:26:18 -04:00
RockChinQ
a2c6c8201b refactor(persistence): freeze legacy DB migration chain, drop dbm026
The legacy pkg/persistence/migrations (DBMigration / dbmXXX) system now
coexists with Alembic but accepts no new migrations — all new schema
changes go through Alembic.

- remove dbm026_llm_model_context_length (superseded by Alembic
  0005_add_llm_context_length, which makes the identical change)
- cap required_database_version at 25 (legacy chain dbm001-025 kept
  read-only to upgrade pre-existing 3.x DBs to the Alembic baseline)
- add migrations/README.md documenting the freeze
- document the Alembic-only policy and revision-id/idempotency rules in
  AGENTS.md
2026-06-13 05:26:08 -04:00
RockChinQ
672abfe95d refactor(core): remove pre-3.x legacy config migration system
The pkg/core/migrations system (m001-m043 DBMigration-style config
migrations, MigrationStage, and the core.migration base class) only ever
ran when upgrading from LangBot 3.x. The last 3.x release is over a year
old and is no longer supported, so this dead code is removed entirely:

- delete pkg/core/migrations/ (43 mXXX_*.py + __init__)
- delete pkg/core/migration.py (base class + registry)
- delete pkg/core/stages/migrate.py (MigrationStage)
- drop 'MigrationStage' from boot.py stage_order
- delete tests/unit_tests/core/test_migration.py (tested the removed base class)
2026-06-13 05:26:01 -04:00
huanghuoguoguo
9ecb587ac0 refactor(provider): use LiteLLM as unified LLM requester backend (#2150)
* refactor(provider): use LiteLLM as unified LLM requester backend

  - Replace 23+ individual requester implementations with unified litellmchat.py
  - Add litellm_provider field to 27 YAML manifests for provider routing
  - Delete redundant requester subclasses
  - Add unit tests for LiteLLMRequester (29 tests)
  - Fix num_retries parameter name (was max_retries)
  - Fix exception handling order for subclass exceptions

  LiteLLM provides unified API for 100+ providers, eliminating need for
  provider-specific requesters.

* fix: ruff format provider.py

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(provider): simplify LiteLLM requester usage handling

  - Remove unused Anthropic-specific tool schema generation
  - Share completion argument construction between normal and streaming calls
  - Use LiteLLM/OpenAI native usage fields for monitoring
  - Collect stream token usage from LiteLLM stream_options
  - Update LiteLLM requester tests for unified usage fields

* restore: restore deleted provider requester files

Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat: update requesters and improve provider selection UI

- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.

* fix(provider): align litellm rebase with master

* fix(provider): capture streaming token usage; add token observability

The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).

- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
  endpoint: summary, per-model breakdown, token timeseries, and a
  zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
  per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization

Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.

* refactor(provider): simplify litellm capabilities

* style: simplify wrapped expressions

* feat(models): persist context metadata

* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester

- invoke_embedding: support both object- and dict-shaped response.data
  entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
  openai-compatible (or unspecified) providers call the standard
  Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path

* feat(provider): enforce requester support_type when adding models

- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
  rerank) that the provider's requester declares in its manifest
  support_type; ModelsDialog fetches requester manifests and maps
  requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
  create_embedding_model / create_rerank_model so a model cannot be
  attached to a provider whose requester does not support that type,
  even if the frontend restriction is bypassed (manifests without
  support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
  three model types:
  - llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
  - llm + text-embedding: openai, gemini, mistral
  - add rerank to new-api (verified working via /v1/rerank)
  - set llm + text-embedding + rerank for aggregator/unknown gateways

* feat(provider): add searchable alias to requester manifests

- add a free-text 'alias' field to every requester manifest spec,
  containing the vendor's English/Chinese names, pinyin, common
  nicknames and flagship model-series names (e.g. moonshot -> kimi,
  月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
  alias (substring/contains), so searching 'kimi' surfaces Moonshot,
  '硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
  LangBot Space gains rerank (coming soon)

* fix(provider): make support_type guard defensive against incomplete model_mgr

- _validate_provider_supports now uses getattr to gracefully skip when
  model_mgr / provider_dict / manifest lookup is unavailable, instead of
  raising AttributeError (fixes unit tests that mock ap.model_mgr as a
  bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
  reject unsupported type, allow when support_type missing, allow when
  provider unknown, degrade safely when model_mgr is incomplete

* fix(persistence): guard 0004 migration against missing llm_models table

The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.

Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).

* Merge branch 'master' into feat/litellm

Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
  (ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
  branch added 0004_add_llm_model_context_length, both as children of
  0003 (would create multiple heads). Re-chain the litellm migration as
  0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
  for a single linear head. Update test head assertion accordingly.

* fix(persistence): shorten migration revision id to fit varchar(32)

PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-06-13 16:59:48 +08:00
184 changed files with 4641 additions and 6090 deletions

View File

@@ -125,6 +125,14 @@ uv run python -m langbot.pkg.persistence.alembic_runner autogenerate "descriptio
Review and edit the generated script before committing. Migrations execute automatically on startup. `autogenerate` detects schema changes (add/drop columns, tables, type changes) but **data migrations** (e.g. mutating JSON field contents) must be hand-written into the generated script. `env.py` sets `render_as_batch=True`, so SQLite's ALTER TABLE limits are handled automatically — no need to branch per database. More in the wiki ["开发配置"](https://docs.langbot.app/zh/develop/dev-config#数据库迁移).
When writing a migration, follow these rules:
- **Revision id ≤ 32 characters.** PostgreSQL stores `alembic_version.version_num` as `varchar(32)`; a longer id raises `StringDataRightTruncationError` at runtime. Prefer short, descriptive ids like `0005_add_llm_context_length`.
- **Guard every operation against missing tables/columns.** Fresh installs build the schema via `create_all()` and then stamp the Alembic baseline, so a migration may run against a table that already has the change — or, in tests, against an empty database. Check `inspector.get_table_names()` / `inspector.get_columns(...)` before `add_column` / `drop_column`, mirroring the existing migrations.
- **Keep a single linear head.** Chain `down_revision` to the current head; do not create branches. Run the migration tests after adding one: `uv run pytest tests/integration/persistence/ -q` (the PostgreSQL test needs a running PG via `TEST_POSTGRES_URL`).
> **Legacy migration system (deprecated — do not extend).** The old 3.x migration system under `src/langbot/pkg/persistence/migrations/` (`DBMigration` subclasses in `dbmXXX_*.py`, run from `pkg/persistence/mgr.py`) is **frozen**. Do **not** add new `dbmXXX_*.py` files. The chain is capped at `required_database_version = 25` (`pkg/utils/constants.py`); those files only exist to upgrade pre-existing 3.x databases up to the Alembic baseline and are kept read-only. All new schema changes go through Alembic.
## Some Principles
- Keep it simple, stupid.

View File

@@ -1,6 +1,6 @@
[project]
name = "langbot"
version = "4.10.1"
version = "4.10.2"
description = "Production-grade platform for building agentic IM bots"
readme = "README.md"
license-files = ["LICENSE"]
@@ -70,7 +70,7 @@ dependencies = [
"chromadb>=1.0.0,<2.0.0",
"qdrant-client (>=1.15.1,<2.0.0)",
"pyseekdb==1.1.0.post3",
"langbot-plugin==0.4.3",
"langbot-plugin==0.4.4",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"matrix-nio>=0.25.2",
@@ -79,6 +79,7 @@ dependencies = [
"pymilvus>=2.6.4",
"pgvector>=0.4.1",
"botocore>=1.42.39",
"litellm>=1.0.0",
]
keywords = [
"bot",

View File

@@ -1,3 +1,3 @@
"""LangBot - Production-grade platform for building agentic IM bots"""
__version__ = '4.10.1'
__version__ = '4.10.2'

View File

@@ -46,6 +46,30 @@ class MonitoringRouterGroup(group.RouterGroup):
return self.success(data=metrics)
@self.route('/token-statistics', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_token_statistics() -> str:
"""Get detailed token usage statistics (summary, per-model, timeseries)."""
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
bucket = quart.request.args.get('bucket', 'hour')
if bucket not in ('hour', 'day'):
bucket = 'hour'
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
stats = await self.ap.monitoring_service.get_token_statistics(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
bucket=bucket,
)
return self.success(data=stats)
@self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_messages() -> str:
"""Get message logs"""

View File

@@ -271,6 +271,20 @@ class PluginsRouterGroup(group.RouterGroup):
readme = await self.ap.plugin_connector.get_plugin_readme(author, plugin_name, language=language)
return self.success(data={'readme': readme})
@self.route(
'/<author>/<plugin_name>/logs',
methods=['GET'],
auth_type=group.AuthType.USER_TOKEN_OR_API_KEY,
)
async def _(author: str, plugin_name: str) -> quart.Response:
try:
limit = int(quart.request.args.get('limit', 200))
except (TypeError, ValueError):
limit = 200
level = quart.request.args.get('level') or None
logs = await self.ap.plugin_connector.get_plugin_logs(author, plugin_name, limit=limit, level=level)
return self.success(data={'logs': logs})
@self.route(
'/<author>/<plugin_name>/icon',
methods=['GET'],

View File

@@ -34,6 +34,46 @@ def _runtime_model_data(model_uuid: str, model_data: dict) -> dict:
return {**model_data, 'uuid': model_uuid}
async def _validate_provider_supports(ap: app.Application, provider_uuid: str, model_type: str) -> None:
"""Validate that the provider's requester declares support for ``model_type``.
``model_type`` is one of the manifest ``support_type`` values:
'llm', 'text-embedding', 'rerank'. Raises ValueError when the requester
manifest does not list the requested type. This is a server-side guard so
a model cannot be attached to a provider that does not support it, even if
the frontend tab restriction is bypassed.
"""
model_mgr = getattr(ap, 'model_mgr', None)
if model_mgr is None:
return
provider_dict = getattr(model_mgr, 'provider_dict', None)
if not provider_dict:
return
runtime_provider = provider_dict.get(provider_uuid)
if runtime_provider is None:
return
requester_name = getattr(getattr(runtime_provider, 'provider_entity', None), 'requester', None)
if not requester_name:
return
get_manifest = getattr(model_mgr, 'get_available_requester_manifest_by_name', None)
if not callable(get_manifest):
return
manifest = get_manifest(requester_name)
if manifest is None:
return
spec = getattr(manifest, 'spec', None) or {}
support_type = spec.get('support_type') if isinstance(spec, dict) else None
# When a manifest omits support_type, do not block (backward compatible).
if not support_type:
return
if model_type not in support_type:
raise ValueError(f'Provider requester "{requester_name}" does not support {model_type} models')
class LLMModelsService:
ap: app.Application
@@ -96,6 +136,8 @@ class LLMModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'llm')
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_model.LLMModel).values(**model_data))
runtime_provider = self.ap.model_mgr.provider_dict.get(model_data['provider_uuid'])
@@ -274,6 +316,8 @@ class EmbeddingModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'text-embedding')
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_model.EmbeddingModel).values(**model_data)
)
@@ -434,6 +478,8 @@ class RerankModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'rerank')
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_model.RerankModel).values(**model_data)
)

View File

@@ -472,6 +472,179 @@ class MonitoringService:
'active_sessions': active_sessions,
}
async def get_token_statistics(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
bucket: str = 'hour',
) -> dict:
"""Get detailed token usage statistics for production observability.
Returns:
- summary: aggregate token counters and call/latency stats over the window
- by_model: per-model token + call breakdown (sorted by total tokens desc)
- timeseries: token usage bucketed by `bucket` ('hour' or 'day')
Only successful LLM calls are counted toward token totals; error calls are
reported separately so a spike in failures is visible without polluting
token accounting.
"""
LLMCall = persistence_monitoring.MonitoringLLMCall
conditions = []
if bot_ids:
conditions.append(LLMCall.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(LLMCall.timestamp >= start_time)
if end_time:
conditions.append(LLMCall.timestamp <= end_time)
def _apply(query):
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
return query
# ---- Summary aggregates ----
summary_query = _apply(
sqlalchemy.select(
sqlalchemy.func.count(LLMCall.id),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
# Count of successful calls that nonetheless recorded zero tokens —
# a data-quality signal that usage reporting may be broken upstream.
sqlalchemy.func.sum(
sqlalchemy.case(
(sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
else_=0,
)
),
)
)
summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
row = summary_result.first()
(
total_calls,
total_input_tokens,
total_output_tokens,
total_tokens,
total_duration,
total_cost,
success_calls,
error_calls,
zero_token_success_calls,
) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)
total_calls = total_calls or 0
success_calls = success_calls or 0
error_calls = error_calls or 0
zero_token_success_calls = zero_token_success_calls or 0
summary = {
'total_calls': total_calls,
'success_calls': success_calls,
'error_calls': error_calls,
'total_input_tokens': int(total_input_tokens or 0),
'total_output_tokens': int(total_output_tokens or 0),
'total_tokens': int(total_tokens or 0),
'total_cost': round(float(total_cost or 0.0), 6),
'avg_tokens_per_call': int((total_tokens or 0) / total_calls) if total_calls > 0 else 0,
'avg_duration_ms': int((total_duration or 0) / total_calls) if total_calls > 0 else 0,
'avg_tokens_per_second': round((total_output_tokens or 0) / (total_duration / 1000), 2)
if total_duration and total_duration > 0
else 0,
'zero_token_success_calls': zero_token_success_calls,
}
# ---- Per-model breakdown ----
by_model_query = _apply(
sqlalchemy.select(
LLMCall.model_name,
sqlalchemy.func.count(LLMCall.id),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
).group_by(LLMCall.model_name)
)
by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
by_model = []
for mrow in by_model_result.all():
(
model_name,
m_calls,
m_in,
m_out,
m_total,
m_duration,
m_cost,
m_errors,
) = mrow
m_calls = m_calls or 0
by_model.append(
{
'model_name': model_name,
'calls': m_calls,
'error_calls': m_errors or 0,
'input_tokens': int(m_in or 0),
'output_tokens': int(m_out or 0),
'total_tokens': int(m_total or 0),
'cost': round(float(m_cost or 0.0), 6),
'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
}
)
by_model.sort(key=lambda x: x['total_tokens'], reverse=True)
# ---- Time-bucketed series ----
# Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
# aggregate in Python. The window is bounded by the time filter, so this is
# cheap for typical dashboard ranges (hours/days).
series_query = _apply(
sqlalchemy.select(
LLMCall.timestamp,
LLMCall.input_tokens,
LLMCall.output_tokens,
LLMCall.total_tokens,
).order_by(LLMCall.timestamp.asc())
)
series_result = await self.ap.persistence_mgr.execute_async(series_query)
bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
buckets: dict[str, dict] = {}
for srow in series_result.all():
ts, s_in, s_out, s_total = srow
if ts is None:
continue
key = ts.strftime(bucket_fmt)
b = buckets.setdefault(
key,
{'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
)
b['input_tokens'] += int(s_in or 0)
b['output_tokens'] += int(s_out or 0)
b['total_tokens'] += int(s_total or 0)
b['calls'] += 1
timeseries = [buckets[k] for k in sorted(buckets.keys())]
return {
'summary': summary,
'by_model': by_model,
'timeseries': timeseries,
'bucket': bucket,
}
async def get_messages(
self,
bot_ids: list[str] | None = None,

View File

@@ -82,7 +82,7 @@ class UserService:
payload = {
'user': user_email,
'iss': 'LangBot-' + constants.edition,
'exp': datetime.datetime.now() + datetime.timedelta(seconds=jwt_expire),
'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=jwt_expire),
}
return jwt.encode(payload, jwt_secret, algorithm='HS256')

View File

@@ -16,7 +16,6 @@ importutil.import_modules_in_pkg(stages)
stage_order = [
'LoadConfigStage',
'MigrationStage',
'GenKeysStage',
'SetupLoggerStage',
'BuildAppStage',

View File

@@ -42,6 +42,7 @@ required_deps = {
'telegramify_markdown': 'telegramify-markdown',
'slack_sdk': 'slack_sdk',
'asyncpg': 'asyncpg',
'litellm': 'litellm',
}

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import abc
import typing
from . import app
preregistered_migrations: list[typing.Type[Migration]] = []
"""Currently not supported for extension"""
def migration_class(name: str, number: int):
"""Register a migration"""
def decorator(cls: typing.Type[Migration]) -> typing.Type[Migration]:
cls.name = name
cls.number = number
preregistered_migrations.append(cls)
return cls
return decorator
class Migration(abc.ABC):
"""A version migration"""
name: str
number: int
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
@abc.abstractmethod
async def need_migrate(self) -> bool:
"""Determine if the current environment needs to run this migration"""
pass
@abc.abstractmethod
async def run(self):
"""Run migration"""
pass

View File

@@ -1,24 +0,0 @@
from __future__ import annotations
import os
from .. import migration
@migration.migration_class('sensitive-word-migration', 1)
class SensitiveWordMigration(migration.Migration):
"""敏感词迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return os.path.exists('data/config/sensitive-words.json') and not os.path.exists(
'data/metadata/sensitive-words.json'
)
async def run(self):
"""执行迁移"""
# 移动文件
os.rename('data/config/sensitive-words.json', 'data/metadata/sensitive-words.json')
# 重新加载配置
await self.ap.sensitive_meta.load_config()

View File

@@ -1,44 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('openai-config-migration', 2)
class OpenAIConfigMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'openai-config' in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
old_openai_config = self.ap.provider_cfg.data['openai-config'].copy()
if 'keys' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['keys'] = {}
if 'openai' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['openai'] = []
self.ap.provider_cfg.data['keys']['openai'] = old_openai_config['api-keys']
self.ap.provider_cfg.data['model'] = old_openai_config['chat-completions-params']['model']
del old_openai_config['chat-completions-params']['model']
if 'requester' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['requester'] = {}
if 'openai-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['openai-chat-completions'] = {}
self.ap.provider_cfg.data['requester']['openai-chat-completions'] = {
'base-url': old_openai_config['base_url'],
'args': old_openai_config['chat-completions-params'],
'timeout': old_openai_config['request-timeout'],
}
del self.ap.provider_cfg.data['openai-config']
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('anthropic-requester-config-completion', 3)
class AnthropicRequesterConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'anthropic-messages' not in self.ap.provider_cfg.data['requester']
or 'anthropic' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'anthropic-messages' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['anthropic-messages'] = {
'base-url': 'https://api.anthropic.com',
'args': {'max_tokens': 1024},
'timeout': 120,
}
if 'anthropic' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['anthropic'] = []
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('moonshot-config-completion', 4)
class MoonshotConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'moonshot-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'moonshot' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'moonshot-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['moonshot-chat-completions'] = {
'base-url': 'https://api.moonshot.cn/v1',
'args': {},
'timeout': 120,
}
if 'moonshot' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['moonshot'] = []
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('deepseek-config-completion', 5)
class DeepseekConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'deepseek-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'deepseek' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'deepseek-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['deepseek-chat-completions'] = {
'base-url': 'https://api.deepseek.com',
'args': {},
'timeout': 120,
}
if 'deepseek' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['deepseek'] = []
await self.ap.provider_cfg.dump_config()

View File

@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('vision-config', 6)
class VisionConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'enable-vision' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
if 'enable-vision' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['enable-vision'] = False
await self.ap.provider_cfg.dump_config()

View File

@@ -1,20 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('qcg-center-url-config', 7)
class QCGCenterURLConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'qcg-center-url' not in self.ap.system_cfg.data
async def run(self):
"""执行迁移"""
if 'qcg-center-url' not in self.ap.system_cfg.data:
self.ap.system_cfg.data['qcg-center-url'] = 'https://api.qchatgpt.rockchin.top/api/v2'
await self.ap.system_cfg.dump_config()

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ad-fixwin-cfg-migration', 8)
class AdFixwinConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return isinstance(self.ap.pipeline_cfg.data['rate-limit']['fixwin']['default'], int)
async def run(self):
"""执行迁移"""
for session_name in self.ap.pipeline_cfg.data['rate-limit']['fixwin']:
temp_dict = {
'window-size': 60,
'limit': self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name],
}
self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name] = temp_dict
await self.ap.pipeline_cfg.dump_config()

View File

@@ -1,22 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('msg-truncator-cfg-migration', 9)
class MsgTruncatorConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'msg-truncate' not in self.ap.pipeline_cfg.data
async def run(self):
"""执行迁移"""
self.ap.pipeline_cfg.data['msg-truncate'] = {
'method': 'round',
'round': {'max-round': 10},
}
await self.ap.pipeline_cfg.dump_config()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ollama-requester-config', 10)
class MsgTruncatorConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'ollama-chat' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['ollama-chat'] = {
'base-url': 'http://127.0.0.1:11434',
'args': {},
'timeout': 600,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('command-prefix-config', 11)
class CommandPrefixConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'command-prefix' not in self.ap.command_cfg.data
async def run(self):
"""执行迁移"""
self.ap.command_cfg.data['command-prefix'] = ['!', '']
await self.ap.command_cfg.dump_config()

View File

@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('runner-config', 12)
class RunnerConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'runner' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['runner'] = 'local-agent'
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('http-api-config', 13)
class HttpApiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'http-api' not in self.ap.system_cfg.data or 'persistence' not in self.ap.system_cfg.data
async def run(self):
"""执行迁移"""
self.ap.system_cfg.data['http-api'] = {
'enable': True,
'host': '0.0.0.0',
'port': 5300,
'jwt-expire': 604800,
}
self.ap.system_cfg.data['persistence'] = {
'sqlite': {'path': 'data/persistence.db'},
'use': 'sqlite',
}
await self.ap.system_cfg.dump_config()

View File

@@ -1,22 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('force-delay-config', 14)
class ForceDelayConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return isinstance(self.ap.platform_cfg.data['force-delay'], list)
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['force-delay'] = {
'min': self.ap.platform_cfg.data['force-delay'][0],
'max': self.ap.platform_cfg.data['force-delay'][1],
}
await self.ap.platform_cfg.dump_config()

View File

@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('gitee-ai-config', 15)
class GiteeAIConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'gitee-ai-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'gitee-ai' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['gitee-ai-chat-completions'] = {
'base-url': 'https://ai.gitee.com/v1',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['gitee-ai'] = ['XXXXX']
await self.ap.provider_cfg.dump_config()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-service-api-config', 16)
class DifyServiceAPICfgMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'dify-service-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api'] = {
'base-url': 'https://api.dify.ai/v1',
'app-type': 'chat',
'chat': {'api-key': 'app-1234567890'},
'workflow': {'api-key': 'app-1234567890', 'output-key': 'summary'},
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-api-timeout-params', 17)
class DifyAPITimeoutParamsMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat']
or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow']
or 'agent' not in self.ap.provider_cfg.data['dify-service-api']
)
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['agent'] = {
'api-key': 'app-1234567890',
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('xai-config', 18)
class XaiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'xai-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['xai-chat-completions'] = {
'base-url': 'https://api.x.ai/v1',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['xai'] = ['xai-1234567890']
await self.ap.provider_cfg.dump_config()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('zhipuai-config', 19)
class ZhipuaiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'zhipuai-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['zhipuai-chat-completions'] = {
'base-url': 'https://open.bigmodel.cn/api/paas/v4',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['zhipuai'] = ['xxxxxxx']
await self.ap.provider_cfg.dump_config()

View File

@@ -1,36 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wecom-config', 20)
class WecomConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'wecom':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'wecom',
'enable': False,
'host': '0.0.0.0',
'port': 2290,
'corpid': '',
'secret': '',
'token': '',
'EncodingAESKey': '',
'contacts_secret': '',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lark-config', 21)
class LarkConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'lark':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'lark',
'enable': False,
'app_id': 'cli_abcdefgh',
'app_secret': 'XXXXXXXXXX',
'bot_name': 'LangBot',
'enable-webhook': False,
'port': 2285,
'encrypt-key': 'xxxxxxxxx',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lmstudio-config', 22)
class LmStudioConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'lmstudio-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['lmstudio-chat-completions'] = {
'base-url': 'http://127.0.0.1:1234/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('siliconflow-config', 23)
class SiliconFlowConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'siliconflow-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['siliconflow'] = ['xxxxxxx']
self.ap.provider_cfg.data['requester']['siliconflow-chat-completions'] = {
'base-url': 'https://api.siliconflow.cn/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,31 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('discord-config', 24)
class DiscordConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'discord':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'discord',
'enable': False,
'client_id': '1234567890',
'token': 'XXXXXXXXXX',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('gewechat-config', 25)
class GewechatConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'gewechat':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'gewechat',
'enable': False,
'gewechat_url': 'http://your-gewechat-server:2531',
'gewechat_file_url': 'http://your-gewechat-server:2532',
'port': 2286,
'callback_url': 'http://your-callback-url:2286/gewechat/callback',
'app_id': '',
'token': '',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,33 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('qqofficial-config', 26)
class QQOfficialConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'qqofficial':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'qqofficial',
'enable': False,
'appid': '',
'secret': '',
'port': 2284,
'token': '',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wx-official-account-config', 27)
class WXOfficialAccountConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'officialaccount':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'officialaccount',
'enable': False,
'token': '',
'EncodingAESKey': '',
'AppID': '',
'AppSecret': '',
'host': '0.0.0.0',
'port': 2287,
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('bailian-requester-config', 28)
class BailianRequesterConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'bailian-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['bailian'] = ['sk-xxxxxxx']
self.ap.provider_cfg.data['requester']['bailian-chat-completions'] = {
'base-url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dashscope-app-api-config', 29)
class DashscopeAppAPICfgMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'dashscope-app-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dashscope-app-api'] = {
'app-type': 'agent',
'api-key': 'sk-1234567890',
'agent': {'app-id': 'Your_app_id', 'references_quote': '参考资料来自:'},
'workflow': {
'app-id': 'Your_app_id',
'references_quote': '参考资料来自:',
'biz_params': {'city': '北京', 'date': '2023-08-10'},
},
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,31 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lark-config-cmpl', 30)
class LarkConfigCmplMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'lark':
if 'enable-webhook' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'lark':
if 'enable-webhook' not in adapter:
adapter['enable-webhook'] = False
if 'port' not in adapter:
adapter['port'] = 2285
if 'encrypt-key' not in adapter:
adapter['encrypt-key'] = 'xxxxxxxxx'
await self.ap.platform_cfg.dump_config()

View File

@@ -1,33 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dingtalk-config', 31)
class DingTalkConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'dingtalk':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'dingtalk',
'enable': False,
'client_id': '',
'client_secret': '',
'robot_code': '',
'robot_name': '',
}
)
await self.ap.platform_cfg.dump_config()

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('volcark-requester-config', 32)
class VolcArkRequesterConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'volcark-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['volcark'] = ['xxxxxxxx']
self.ap.provider_cfg.data['requester']['volcark-chat-completions'] = {
'base-url': 'https://ark.cn-beijing.volces.com/api/v3',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,24 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-thinking-config', 33)
class DifyThinkingConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
if 'options' not in self.ap.provider_cfg.data['dify-service-api']:
return True
if 'convert-thinking-tips' not in self.ap.provider_cfg.data['dify-service-api']['options']:
return True
return False
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['options'] = {'convert-thinking-tips': 'plain'}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from urllib.parse import urlparse
from .. import migration
@migration.migration_class('gewechat-file-url-config', 34)
class GewechatFileUrlConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'gewechat':
if 'gewechat_file_url' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'gewechat':
if 'gewechat_file_url' not in adapter:
parsed_url = urlparse(adapter['gewechat_url'])
adapter['gewechat_file_url'] = f'{parsed_url.scheme}://{parsed_url.hostname}:2532'
await self.ap.platform_cfg.dump_config()

View File

@@ -1,26 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wxoa-mode', 35)
class WxoaModeMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'Mode' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'Mode' not in adapter:
adapter['Mode'] = 'drop'
await self.ap.platform_cfg.dump_config()

View File

@@ -1,26 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wxoa-loading-message', 36)
class WxoaLoadingMessageMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'LoadingMessage' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'LoadingMessage' not in adapter:
adapter['LoadingMessage'] = 'AI正在思考中请发送任意内容获取回复。'
await self.ap.platform_cfg.dump_config()

View File

@@ -1,18 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('mcp-config', 37)
class MCPConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'mcp' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['mcp'] = {'servers': []}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('tg-dingtalk-markdown', 38)
class TgDingtalkMarkdownMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] in ['dingtalk', 'telegram']:
if 'markdown_card' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] in ['dingtalk', 'telegram']:
if 'markdown_card' not in adapter:
adapter['markdown_card'] = False
await self.ap.platform_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('modelscope-config-completion', 39)
class ModelScopeConfigCompletionMigration(migration.Migration):
"""ModelScope配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'modelscope-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'modelscope' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'modelscope-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['modelscope-chat-completions'] = {
'base-url': 'https://api-inference.modelscope.cn/v1',
'args': {},
'timeout': 120,
}
if 'modelscope' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['modelscope'] = []
await self.ap.provider_cfg.dump_config()

View File

@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ppio-config', 40)
class PPIOConfigMigration(migration.Migration):
"""PPIO配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'ppio-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'ppio' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'ppio-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['ppio-chat-completions'] = {
'base-url': 'https://api.ppinfra.com/v3/openai',
'args': {},
'timeout': 120,
}
if 'ppio' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['ppio'] = []
await self.ap.provider_cfg.dump_config()

View File

@@ -1,17 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dingtalk_card_auto_layout', 41)
class DingTalkCardAutoLayoutMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return True
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters']['app']['dingtalk']['card_auto_layout'] = False
await self.ap.platform_cfg.dump_config()

View File

@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('weknora-api-config', 42)
class WeKnoraAPICfgMigration(migration.Migration):
"""WeKnora API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'weknora-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['weknora-api'] = {
'base-url': 'http://localhost:8080/api/v1',
'app-type': 'agent',
'api-key': '',
'agent-id': 'builtin-smart-reasoning',
'knowledge-base-ids': [],
'web-search-enabled': False,
'timeout': 120,
'base-prompt': '请回答用户的问题。',
}
await self.ap.provider_cfg.dump_config()

View File

@@ -1,30 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('deerflow-api-config', 43)
class DeerFlowAPICfgMigration(migration.Migration):
"""DeerFlow API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'deerflow-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['deerflow-api'] = {
'api-base': 'http://127.0.0.1:2026',
'api-key': '',
'auth-header': '',
'assistant-id': 'lead_agent',
'model-name': '',
'thinking-enabled': False,
'plan-mode': False,
'subagent-enabled': False,
'max-concurrent-subagents': 3,
'timeout': 300,
'recursion-limit': 1000,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -202,6 +202,16 @@ class LoadConfigStage(stage.BootingStage):
constants.instance_id = new_id
constants.edition = ap.instance_config.data.get('system', {}).get('edition', 'community')
# Instance creation timestamp: sourced from data/labels/instance_id.json.
# Instances created before this field existed (or supplied via
# system.instance_id) won't have it, so backfill with the current time
# and persist it via the dump below — from then on it stays stable.
instance_create_ts = ap.instance_id.data.get('instance_create_ts', 0)
if not isinstance(instance_create_ts, int) or instance_create_ts <= 0:
instance_create_ts = int(time.time())
ap.instance_id.data['instance_create_ts'] = instance_create_ts
constants.instance_create_ts = instance_create_ts
print(f'LangBot instance id: {constants.instance_id}')
print(f'LangBot edition: {constants.edition}')

View File

@@ -1,43 +0,0 @@
from __future__ import annotations
from .. import stage, app
from .. import migration
from ...utils import importutil
from .. import migrations
importutil.import_modules_in_pkg(migrations)
@stage.stage_class('MigrationStage')
class MigrationStage(stage.BootingStage):
"""Migration stage
These migrations are legacy, only performed in version 3.x
"""
async def run(self, ap: app.Application):
"""Run migration"""
if any(
[
ap.command_cfg is None,
ap.pipeline_cfg is None,
ap.platform_cfg is None,
ap.provider_cfg is None,
ap.system_cfg is None,
]
): # only run migration when version is 3.x
return
migrations = migration.preregistered_migrations
# Sort by migration number
migrations.sort(key=lambda x: x.number)
for migration_cls in migrations:
migration_instance = migration_cls(ap)
if await migration_instance.need_migrate():
await migration_instance.run()
print(f'Migration {migration_instance.name} executed')

View File

@@ -31,6 +31,7 @@ class LLMModel(Base):
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
provider_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
abilities = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default=[])
context_length = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
extra_args = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default={})
prefered_ranking = sqlalchemy.Column(sqlalchemy.Integer, nullable=False, default=0)
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now())

View File

@@ -0,0 +1,39 @@
"""add llm model context length
Revision ID: 0005_add_llm_context_length
Revises: 0004_add_mcp_readme
Create Date: 2026-06-07
"""
import sqlalchemy as sa
from alembic import op
revision = '0005_add_llm_context_length'
down_revision = '0004_add_mcp_readme'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add ``context_length`` to llm_models if the table exists and the column is
# missing. The table may have been created by create_all() with the column
# already present on fresh installs, so guard against duplicate-add; it may
# also be absent entirely (e.g. migrating a truly empty DB), so guard against
# a missing table too.
conn = op.get_bind()
inspector = sa.inspect(conn)
if 'llm_models' not in inspector.get_table_names():
return
columns = {column['name'] for column in inspector.get_columns('llm_models')}
if 'context_length' not in columns:
op.add_column('llm_models', sa.Column('context_length', sa.Integer(), nullable=True))
def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
if 'llm_models' not in inspector.get_table_names():
return
columns = {column['name'] for column in inspector.get_columns('llm_models')}
if 'context_length' in columns:
op.drop_column('llm_models', 'context_length')

View File

@@ -0,0 +1,36 @@
# Legacy migrations (DEPRECATED — do not add new files here)
This directory holds the **legacy 3.x database migration system**
(`DBMigration` subclasses in `dbmXXX_*.py`, registered via
`@migration.migration_class(N)` and run from `pkg/persistence/mgr.py`).
**This system is frozen. Do not add new `dbmXXX_*.py` migrations.**
The chain is capped at version 25 (`required_database_version = 25` in
`pkg/utils/constants.py`). These files exist only to upgrade pre-existing
3.x databases up to the Alembic baseline (`0001_baseline`). Removing them
would break in-place upgrades from old installations, so they are kept
read-only.
## All new schema changes use Alembic
Migrations now live in `pkg/persistence/alembic/versions/`. To create one:
```bash
uv run python -m langbot.pkg.persistence.alembic_runner autogenerate "description of your change"
```
(requires `data/config.yaml` to exist). Review and edit the generated
script before committing — Alembic migrations run automatically on startup
and must be idempotent and guard against missing tables (the test suite
runs them against empty databases).
### Rules for Alembic revision ids
- Keep the revision id **≤ 32 characters** — PostgreSQL stores
`alembic_version.version_num` as `varchar(32)` and will raise
`StringDataRightTruncationError` on overflow.
- Guard every `op` call against a missing table / missing column
(`inspector.get_table_names()` / `inspector.get_columns()`); fresh
installs create the schema via `create_all()` and stamp the baseline,
so migrations may run against tables that already match or do not exist.

View File

@@ -109,7 +109,7 @@ class PreProcessor(stage.PipelineStage):
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid
if llm_model.model_entity.abilities.__contains__('func_call'):
if 'func_call' in (llm_model.model_entity.abilities or []):
# Get bound plugins and MCP servers for filtering tools
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
@@ -159,11 +159,7 @@ class PreProcessor(stage.PipelineStage):
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if (
selected_runner == 'local-agent'
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
):
if selected_runner == 'local-agent' and llm_model and 'vision' not in (llm_model.model_entity.abilities or []):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
@@ -181,7 +177,7 @@ class PreProcessor(stage.PipelineStage):
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if me.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
@@ -202,7 +198,7 @@ class PreProcessor(stage.PipelineStage):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if msg.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))

View File

@@ -689,6 +689,16 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
async def get_plugin_readme(self, plugin_author: str, plugin_name: str, language: str = 'en') -> str:
return await self.handler.get_plugin_readme(plugin_author, plugin_name, language)
async def get_plugin_logs(
self,
plugin_author: str,
plugin_name: str,
limit: int = 200,
level: str | None = None,
) -> list[dict[str, Any]]:
# Not cached: logs are live and change constantly.
return await self.handler.get_plugin_logs(plugin_author, plugin_name, limit, level)
@alru_cache(ttl=5 * 60)
async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]:
return await self.handler.get_plugin_assets(plugin_author, plugin_name, filepath)

View File

@@ -514,35 +514,6 @@ class RuntimeConnectionHandler(handler.Handler):
except Exception as e:
return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
@self.action(PluginToRuntimeAction.INVOKE_RERANK)
async def invoke_rerank(data: dict[str, Any]) -> handler.ActionResponse:
rerank_model_uuid = data['rerank_model_uuid']
query = data['query']
documents = data['documents']
top_k = data.get('top_k')
extra_args = data.get('extra_args', {})
try:
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
except ValueError:
return handler.ActionResponse.error(
message=f'Rerank model with rerank_model_uuid {rerank_model_uuid} not found',
)
try:
scores = await rerank_model.provider.invoke_rerank(
model=rerank_model,
query=query,
documents=documents[:64],
extra_args=extra_args,
)
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
if top_k is not None:
scored = scored[: int(top_k)]
return handler.ActionResponse.success(data={'results': scored})
except Exception as e:
return _make_rag_error_response(e, 'RerankError', rerank_model_uuid=rerank_model_uuid)
@self.action(PluginToRuntimeAction.VECTOR_UPSERT)
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
collection_id = data['collection_id']
@@ -982,6 +953,31 @@ class RuntimeConnectionHandler(handler.Handler):
return readme_bytes.decode('utf-8')
async def get_plugin_logs(
self,
plugin_author: str,
plugin_name: str,
limit: int = 200,
level: str | None = None,
) -> list[dict[str, Any]]:
"""Get recent log lines captured from the plugin's stderr."""
try:
result = await self.call_action(
LangBotToRuntimeAction.GET_PLUGIN_LOGS,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
'limit': limit,
'level': level,
},
timeout=20,
)
except Exception:
traceback.print_exc()
return []
return result.get('logs', [])
async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]:
"""Get plugin assets"""
result = await self.call_action(

View File

@@ -37,11 +37,41 @@ class ModelManager:
self.requester_components = []
self.requester_dict = {}
@staticmethod
def _get_litellm_provider_from_manifest(component: engine.Component | None) -> str | None:
if component is None:
return None
spec = getattr(component, 'spec', None) or {}
litellm_provider = None
if isinstance(spec, dict):
litellm_provider = spec.get('litellm_provider')
else:
getter = getattr(spec, 'get', None)
if callable(getter):
try:
litellm_provider = getter('litellm_provider')
except Exception:
litellm_provider = None
if isinstance(litellm_provider, str) and litellm_provider:
return litellm_provider
return None
async def initialize(self):
self.requester_components = self.ap.discover.get_components_by_kind('LLMAPIRequester')
requester_dict: dict[str, type[requester.ProviderAPIRequester]] = {}
for component in self.requester_components:
# Skip components that use litellm_provider (they will use litellmchat.py instead)
litellm_provider = self._get_litellm_provider_from_manifest(component)
if litellm_provider:
self.ap.logger.debug(
f'Skipping Python class loading for {component.metadata.name} '
f'(uses litellm_provider={litellm_provider})'
)
continue
requester_dict[component.metadata.name] = component.get_python_component_class()
self.requester_dict = requester_dict
@@ -236,6 +266,7 @@ class ModelManager:
name=model_info.get('name', ''),
provider_uuid='',
abilities=model_info.get('abilities', []),
context_length=model_info.get('context_length'),
extra_args=model_info.get('extra_args', {}),
),
provider=runtime_provider,
@@ -294,13 +325,37 @@ class ModelManager:
else:
provider_entity = provider_info
if provider_entity.requester not in self.requester_dict:
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
# Get requester manifest to check for litellm_provider
requester_manifest = self.get_available_requester_manifest_by_name(provider_entity.requester)
litellm_provider = self._get_litellm_provider_from_manifest(requester_manifest)
# Build config from base_url
config = {'base_url': provider_entity.base_url}
# Check if requester manifest specifies litellm_provider
if litellm_provider:
from .requesters import litellmchat
# Use unified LiteLLMRequester with provider prefix
# Map litellm_provider (YAML spec) to custom_llm_provider (config)
config['custom_llm_provider'] = litellm_provider
requester_inst = litellmchat.LiteLLMRequester(
ap=self.ap,
config=config,
)
self.ap.logger.debug(
f'Using LiteLLMRequester for {provider_entity.requester} '
f'with custom_llm_provider={config["custom_llm_provider"]}'
)
else:
# Use original requester class (for backward compatibility)
if provider_entity.requester not in self.requester_dict:
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
requester_inst = self.requester_dict[provider_entity.requester](
ap=self.ap,
config=config,
)
requester_inst = self.requester_dict[provider_entity.requester](
ap=self.ap,
config={'base_url': provider_entity.base_url},
)
await requester_inst.initialize()
token_mgr = token.TokenManager(name=provider_entity.uuid, tokens=provider_entity.api_keys or [])
@@ -406,6 +461,7 @@ class ModelManager:
name=model_info.get('name', ''),
provider_uuid=model_info.get('provider_uuid', ''),
abilities=model_info.get('abilities', []),
context_length=model_info.get('context_length'),
extra_args=model_info.get('extra_args', {}),
)

View File

@@ -67,8 +67,8 @@ class RuntimeProvider:
if isinstance(result, tuple):
msg, usage_info = result
if usage_info:
input_tokens = usage_info.get('input_tokens', 0)
output_tokens = usage_info.get('output_tokens', 0)
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
return msg
else:
return result
@@ -128,7 +128,6 @@ class RuntimeProvider:
start_time = time.time()
status = 'success'
error_message = None
# Note: Stream doesn't easily provide token counts, set to 0
input_tokens = 0
output_tokens = 0
@@ -143,6 +142,15 @@ class RuntimeProvider:
remove_think=remove_think,
):
yield chunk
# Extract usage from stream if available (stored by LiteLLM requester)
if query:
if query.variables is None:
query.variables = {}
if '_stream_usage' in query.variables:
usage_info = query.variables['_stream_usage']
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
del query.variables['_stream_usage']
except Exception as e:
status = 'error'
error_message = str(e)

View File

@@ -1,17 +0,0 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class AI302ChatCompletions(chatcmpl.OpenAIChatCompletions):
"""302.AI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.302.ai/v1',
'timeout': 120,
}

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: 302.AI
icon: 302ai.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "302ai 302.AI 302 ai 中转 中转站 aggregator gpt claude gemini"
support_type:
- llm
- text-embedding

View File

@@ -1,370 +0,0 @@
from __future__ import annotations
import typing
import json
import platform
import socket
import anthropic
import httpx
from .. import errors, requester
from ....utils import image
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class AnthropicMessages(requester.ProviderAPIRequester):
"""Anthropic Messages API 请求器"""
client: anthropic.AsyncAnthropic
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.anthropic.com',
'timeout': 120,
}
async def initialize(self):
# 兼容 Windows 缺失 TCP_KEEPINTVL 和 TCP_KEEPCNT 的问题
if platform.system() == 'Windows':
if not hasattr(socket, 'TCP_KEEPINTVL'):
socket.TCP_KEEPINTVL = 0
if not hasattr(socket, 'TCP_KEEPCNT'):
socket.TCP_KEEPCNT = 0
httpx_client = anthropic._base_client.AsyncHttpxClientWrapper(
base_url=self.requester_cfg['base_url'],
# cast to a valid type because mypy doesn't understand our type narrowing
timeout=typing.cast(httpx.Timeout, self.requester_cfg['timeout']),
limits=anthropic._constants.DEFAULT_CONNECTION_LIMITS,
follow_redirects=True,
trust_env=True,
)
self.client = anthropic.AsyncAnthropic(
api_key='',
http_client=httpx_client,
base_url=self.requester_cfg['base_url'],
)
async def invoke_llm(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
self.client.api_key = model.provider.token_mgr.get_token()
args = extra_args.copy()
args['model'] = model.model_entity.name
# 处理消息
# system
system_role_message = None
for i, m in enumerate(messages):
if m.role == 'system':
system_role_message = m
break
if system_role_message:
messages.pop(i)
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
args['system'] = system_role_message.content
req_messages = []
for m in messages:
if m.role == 'tool':
tool_call_id = m.tool_call_id
req_messages.append(
{
'role': 'user',
'content': [
{
'type': 'tool_result',
'tool_use_id': tool_call_id,
'is_error': False,
'content': [{'type': 'text', 'text': m.content}],
}
],
}
)
continue
msg_dict = m.dict(exclude_none=True)
if isinstance(m.content, str) and m.content.strip() != '':
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
elif isinstance(m.content, list):
for i, ce in enumerate(m.content):
if ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
alter_image_ele = {
'type': 'image',
'source': {
'type': 'base64',
'media_type': f'image/{image_format}',
'data': image_b64,
},
}
msg_dict['content'][i] = alter_image_ele
if m.tool_calls:
for tool_call in m.tool_calls:
msg_dict['content'].append(
{
'type': 'tool_use',
'id': tool_call.id,
'name': tool_call.function.name,
'input': json.loads(tool_call.function.arguments),
}
)
del msg_dict['tool_calls']
req_messages.append(msg_dict)
args['messages'] = req_messages
if 'thinking' in args:
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
if funcs:
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
if tools:
args['tools'] = tools
try:
resp = await self.client.messages.create(**args)
args = {
'content': '',
'role': resp.role,
}
assert type(resp) is anthropic.types.message.Message
for block in resp.content:
if not remove_think and block.type == 'thinking':
args['content'] = '<think>\n' + block.thinking + '\n</think>\n' + args['content']
elif block.type == 'text':
args['content'] += block.text
elif block.type == 'tool_use':
assert type(block) is anthropic.types.tool_use_block.ToolUseBlock
tool_call = provider_message.ToolCall(
id=block.id,
type='function',
function=provider_message.FunctionCall(name=block.name, arguments=json.dumps(block.input)),
)
if 'tool_calls' not in args:
args['tool_calls'] = []
args['tool_calls'].append(tool_call)
return provider_message.Message(**args)
except anthropic.AuthenticationError as e:
raise errors.RequesterError(f'api-key 无效: {e.message}')
except anthropic.BadRequestError as e:
raise errors.RequesterError(str(e.message))
except anthropic.NotFoundError as e:
if 'model: ' in str(e):
raise errors.RequesterError(f'模型无效: {e.message}')
else:
raise errors.RequesterError(f'请求地址无效: {e.message}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
self.client.api_key = model.provider.token_mgr.get_token()
args = extra_args.copy()
args['model'] = model.model_entity.name
args['stream'] = True
# 处理消息
# system
system_role_message = None
for i, m in enumerate(messages):
if m.role == 'system':
system_role_message = m
break
if system_role_message:
messages.pop(i)
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
args['system'] = system_role_message.content
req_messages = []
for m in messages:
if m.role == 'tool':
tool_call_id = m.tool_call_id
req_messages.append(
{
'role': 'user',
'content': [
{
'type': 'tool_result',
'tool_use_id': tool_call_id,
'is_error': False, # 暂时直接写false
'content': [
{'type': 'text', 'text': m.content}
], # 这里要是list包裹应该是多个返回的情况type类型好像也可以填其他的暂时只写text
}
],
}
)
continue
msg_dict = m.dict(exclude_none=True)
if isinstance(m.content, str) and m.content.strip() != '':
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
elif isinstance(m.content, list):
for i, ce in enumerate(m.content):
if ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
alter_image_ele = {
'type': 'image',
'source': {
'type': 'base64',
'media_type': f'image/{image_format}',
'data': image_b64,
},
}
msg_dict['content'][i] = alter_image_ele
if isinstance(msg_dict['content'], str) and msg_dict['content'] == '':
msg_dict['content'] = [] # 这里不知道为什么会莫名有个空导致content为字符
if m.tool_calls:
for tool_call in m.tool_calls:
msg_dict['content'].append(
{
'type': 'tool_use',
'id': tool_call.id,
'name': tool_call.function.name,
'input': json.loads(tool_call.function.arguments),
}
)
del msg_dict['tool_calls']
req_messages.append(msg_dict)
if 'thinking' in args:
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
args['messages'] = req_messages
if funcs:
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
if tools:
args['tools'] = tools
try:
role = 'assistant' # 默认角色
# chunk_idx = 0
think_started = False
think_ended = False
finish_reason = False
tool_name = ''
tool_id = ''
async for chunk in await self.client.messages.create(**args):
content = ''
tool_call = {'id': None, 'function': {'name': None, 'arguments': None}, 'type': 'function'}
if isinstance(
chunk, anthropic.types.raw_content_block_start_event.RawContentBlockStartEvent
): # 记录开始
if chunk.content_block.type == 'tool_use':
if chunk.content_block.name is not None:
tool_name = chunk.content_block.name
if chunk.content_block.id is not None:
tool_id = chunk.content_block.id
tool_call['function']['name'] = tool_name
tool_call['function']['arguments'] = ''
tool_call['id'] = tool_id
if not remove_think:
if chunk.content_block.type == 'thinking' and not remove_think:
think_started = True
elif chunk.content_block.type == 'text' and chunk.index != 0 and not remove_think:
think_ended = True
continue
elif isinstance(chunk, anthropic.types.raw_content_block_delta_event.RawContentBlockDeltaEvent):
if chunk.delta.type == 'thinking_delta':
if think_started:
think_started = False
content = '<think>\n' + chunk.delta.thinking
elif remove_think:
continue
else:
content = chunk.delta.thinking
elif chunk.delta.type == 'text_delta':
if think_ended:
think_ended = False
content = '\n</think>\n' + chunk.delta.text
else:
content = chunk.delta.text
elif chunk.delta.type == 'input_json_delta':
tool_call['function']['arguments'] = chunk.delta.partial_json
tool_call['function']['name'] = tool_name
tool_call['id'] = tool_id
elif isinstance(chunk, anthropic.types.raw_content_block_stop_event.RawContentBlockStopEvent):
continue # 记录raw_content_block结束的
elif isinstance(chunk, anthropic.types.raw_message_delta_event.RawMessageDeltaEvent):
if chunk.delta.stop_reason == 'end_turn':
finish_reason = True
elif isinstance(chunk, anthropic.types.raw_message_stop_event.RawMessageStopEvent):
continue # 这个好像是完全结束
else:
# print(chunk)
self.ap.logger.debug(f'anthropic chunk: {chunk}')
continue
args = {
'content': content,
'role': role,
'is_final': finish_reason,
'tool_calls': None if tool_call['id'] is None else [tool_call],
}
# if chunk_idx == 0:
# chunk_idx += 1
# continue
# assert type(chunk) is anthropic.types.message.Chunk
yield provider_message.MessageChunk(**args)
# return llm_entities.Message(**args)
except anthropic.AuthenticationError as e:
raise errors.RequesterError(f'api-key 无效: {e.message}')
except anthropic.BadRequestError as e:
raise errors.RequesterError(str(e.message))
except anthropic.NotFoundError as e:
if 'model: ' in str(e):
raise errors.RequesterError(f'模型无效: {e.message}')
else:
raise errors.RequesterError(f'请求地址无效: {e.message}')

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: Anthropic
icon: anthropic.svg
spec:
litellm_provider: anthropic
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "anthropic Anthropic 克劳德 claude Claude Opus Sonnet Haiku 安thropic"
support_type:
- llm
provider_category: manufacturer

View File

@@ -0,0 +1,5 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#2932E1"/>
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Baidu</text>
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">ERNIE</text>
</svg>

After

Width:  |  Height:  |  Size: 396 B

View File

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: baidu-chat-completions
label:
en_US: Baidu ERNIE
zh_Hans: 百度文心一言
icon: baidu.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "baidu Baidu 百度 千帆 qianfan wenxin 文心 文心一言 ernie ERNIE bce embedding bce-reranker"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer

View File

@@ -1,242 +0,0 @@
from __future__ import annotations
import typing
import dashscope
import openai
from . import modelscopechatcmpl
from .. import requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class BailianChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions):
"""阿里云百炼大模型平台 ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'timeout': 120,
}
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
is_use_dashscope_call = False # 是否使用阿里原生库调用
is_enable_multi_model = True # 是否支持多轮对话
use_time_num = 0 # 模型已调用次数,防止存在多文件时重复调用
use_time_ids = [] # 已调用的ID列表
message_id = 0 # 记录消息序号
for msg in messages:
# print(msg)
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
elif me['type'] == 'file_url' and '.' in me.get('file_name', ''):
# 1. 视频文件推理
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2845871
file_type = me.get('file_name').lower().split('.')[-1]
if file_type in ['mp4', 'avi', 'mkv', 'mov', 'flv', 'wmv']:
me['type'] = 'video_url'
me['video_url'] = {'url': me['file_url']}
del me['file_url']
del me['file_name']
use_time_num += 1
use_time_ids.append(message_id)
is_enable_multi_model = False
# 2. 语音文件识别, 无法通过openai的audio字段传递暂时不支持
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2979031
elif file_type in [
'aac',
'amr',
'aiff',
'flac',
'm4a',
'mp3',
'mpeg',
'ogg',
'opus',
'wav',
'webm',
'wma',
]:
me['audio'] = me['file_url']
me['type'] = 'audio'
del me['file_url']
del me['type']
del me['file_name']
is_use_dashscope_call = True
use_time_num += 1
use_time_ids.append(message_id)
is_enable_multi_model = False
message_id += 1
# 使用列表推导式,保留不在 use_time_ids[:-1] 中的元素,仅保留最后一个多媒体消息
if not is_enable_multi_model and use_time_num > 1:
messages = [msg for idx, msg in enumerate(messages) if idx not in use_time_ids[:-1]]
if not is_enable_multi_model:
messages = [msg for msg in messages if 'resp_message_id' not in msg]
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
if is_use_dashscope_call:
response = dashscope.MultiModalConversation.call(
# 若没有配置环境变量请用百炼API Key将下行替换为api_key = "sk-xxx"
api_key=use_model.provider.token_mgr.get_token(),
model=use_model.model_entity.name,
messages=messages,
result_format='message',
asr_options={
# "language": "zh", # 可选,若已知音频的语种,可通过该参数指定待识别语种,以提升识别准确率
'enable_lid': True,
'enable_itn': False,
},
stream=True,
)
content_length_list = []
previous_length = 0 # 记录上一次的内容长度
for res in response:
chunk = res['output']
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta_content = choice['message'].content[0]['text']
finish_reason = choice['finish_reason']
content_length_list.append(len(delta_content))
else:
delta_content = ''
finish_reason = None
# 跳过空的第一个 chunk只有 role 没有内容)
if chunk_idx == 0 and not delta_content:
chunk_idx += 1
continue
# 检查 content_length_list 是否有足够的数据
if len(content_length_list) >= 2:
now_content = delta_content[previous_length : content_length_list[-1]]
previous_length = content_length_list[-1] # 更新上一次的长度
else:
now_content = delta_content # 第一次循环时直接使用 delta_content
previous_length = len(delta_content) # 更新上一次的长度
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': now_content if now_content else None,
'is_final': bool(finish_reason) and finish_reason != 'null',
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
else:
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理工具调用增量
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] != '':
tool_id = tool_call['id']
if tool_call['function']['name'] is not None:
tool_name = tool_call['function']['name']
if tool_call['type'] is None:
tool_call['type'] = 'function'
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
tool_call['function']['arguments'] = (
'' if tool_call['function']['arguments'] is None else tool_call['function']['arguments']
)
# 跳过空的第一个 chunk只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
# return

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: 阿里云百炼
icon: bailian.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,8 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "bailian 百炼 阿里 阿里云 aliyun alibaba dashscope 通义 通义千问 qwen Qwen tongyi gte-rerank text-embedding-v"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:

View File

@@ -1,702 +0,0 @@
from __future__ import annotations
import asyncio
import typing
import openai
import openai.types.chat.chat_completion as chat_completion_module
import httpx
from .. import errors, requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class OpenAIChatCompletions(requester.ProviderAPIRequester):
"""OpenAI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.openai.com/v1',
'timeout': 120,
}
async def initialize(self):
self.client = openai.AsyncClient(
api_key=self.init_api_key,
base_url=self.requester_cfg['base_url'].replace(' ', ''),
timeout=self.requester_cfg['timeout'],
http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']),
)
def _mask_api_key(self, api_key: str | None) -> str:
if not api_key:
return ''
if len(api_key) <= 8:
return '****'
return f'{api_key[:4]}...{api_key[-4:]}'
def _infer_model_type(self, model_id: str) -> str:
normalized_model_id = (model_id or '').lower()
embedding_keywords = (
'embedding',
'embed',
'bge-',
'e5-',
'm3e',
'gte-',
'multilingual-e5',
'text-embedding',
)
return 'embedding' if any(keyword in normalized_model_id for keyword in embedding_keywords) else 'llm'
def _infer_model_abilities(self, item: dict[str, typing.Any], model_id: str) -> list[str]:
normalized_model_id = (model_id or '').lower()
abilities: set[str] = set()
def _flatten(value: typing.Any) -> list[str]:
if value is None:
return []
if isinstance(value, str):
return [value.lower()]
if isinstance(value, dict):
flattened: list[str] = []
for nested_value in value.values():
flattened.extend(_flatten(nested_value))
return flattened
if isinstance(value, (list, tuple, set)):
flattened: list[str] = []
for nested_value in value:
flattened.extend(_flatten(nested_value))
return flattened
return [str(value).lower()]
capability_tokens = _flatten(item.get('capabilities'))
capability_tokens.extend(_flatten(item.get('modalities')))
capability_tokens.extend(_flatten(item.get('input_modalities')))
capability_tokens.extend(_flatten(item.get('output_modalities')))
capability_tokens.extend(_flatten(item.get('supported_generation_methods')))
capability_tokens.extend(_flatten(item.get('supported_parameters')))
capability_tokens.extend(_flatten(item.get('architecture')))
combined_tokens = capability_tokens + [normalized_model_id]
vision_keywords = (
'vision',
'image',
'file',
'video',
'multimodal',
'vl',
'ocr',
'omni',
)
function_call_keywords = (
'function',
'tool',
'tools',
'tool_choice',
'tool_call',
'tool-use',
'tool_use',
)
if any(any(keyword in token for keyword in vision_keywords) for token in combined_tokens):
abilities.add('vision')
if any(any(keyword in token for keyword in function_call_keywords) for token in combined_tokens):
abilities.add('func_call')
return sorted(abilities)
def _normalize_modalities(self, value: typing.Any) -> list[str]:
normalized: list[str] = []
def _collect(item: typing.Any):
if item is None:
return
if isinstance(item, str):
for part in item.replace('->', ',').replace('+', ',').split(','):
token = part.strip().lower()
if token and token not in normalized:
normalized.append(token)
return
if isinstance(item, dict):
for nested in item.values():
_collect(nested)
return
if isinstance(item, (list, tuple, set)):
for nested in item:
_collect(nested)
return
_collect(value)
return normalized
def _extract_scan_metadata(self, item: dict[str, typing.Any], model_id: str) -> dict[str, typing.Any]:
display_name = item.get('name')
if not isinstance(display_name, str) or not display_name.strip() or display_name == model_id:
display_name = ''
description = item.get('description')
if not isinstance(description, str) or not description.strip():
description = ''
context_length = item.get('context_length')
if context_length is None and isinstance(item.get('top_provider'), dict):
context_length = item['top_provider'].get('context_length')
if not isinstance(context_length, int):
try:
context_length = int(context_length) if context_length is not None else None
except (TypeError, ValueError):
context_length = None
input_modalities = self._normalize_modalities(item.get('input_modalities'))
output_modalities = self._normalize_modalities(item.get('output_modalities'))
if isinstance(item.get('architecture'), dict):
if not input_modalities:
input_modalities = self._normalize_modalities(item['architecture'].get('input_modalities'))
if not output_modalities:
output_modalities = self._normalize_modalities(item['architecture'].get('output_modalities'))
owned_by = item.get('owned_by')
if not isinstance(owned_by, str) or not owned_by.strip():
owned_by = ''
return {
'display_name': display_name or None,
'description': description or None,
'context_length': context_length,
'owned_by': owned_by or None,
'input_modalities': input_modalities,
'output_modalities': output_modalities,
}
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
headers = {}
if api_key:
headers['Authorization'] = f'Bearer {api_key}'
models_url = f'{self.requester_cfg["base_url"].rstrip("/")}/models'
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
response = await client.get(models_url, headers=headers)
response.raise_for_status()
payload = response.json()
models = []
for item in payload.get('data', []):
model_id = item.get('id')
if not model_id:
continue
models.append(
{
'id': model_id,
'name': model_id,
'type': self._infer_model_type(model_id),
'abilities': self._infer_model_abilities(item, model_id),
**self._extract_scan_metadata(item, model_id),
}
)
models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
return {
'models': models,
'debug': {
'request': {
'method': 'GET',
'url': models_url,
'headers': {
'Authorization': f'Bearer {self._mask_api_key(api_key)}' if api_key else '',
},
},
'response': payload,
},
}
async def _req(
self,
args: dict,
extra_body: dict = {},
) -> chat_completion_module.ChatCompletion:
return await self.client.chat.completions.create(**args, extra_body=extra_body)
async def _req_stream(
self,
args: dict,
extra_body: dict = {},
):
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
yield chunk
async def _make_msg(
self,
chat_completion: chat_completion_module.ChatCompletion,
remove_think: bool = False,
) -> provider_message.Message:
if not isinstance(chat_completion, chat_completion_module.ChatCompletion):
raise TypeError(f'Expected ChatCompletion, got {type(chat_completion).__name__}: {chat_completion[:16]}')
chatcmpl_message = chat_completion.choices[0].message.model_dump()
# 确保 role 字段存在且不为 None
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
chatcmpl_message['role'] = 'assistant'
# 处理思维链
content = chatcmpl_message.get('content', '')
reasoning_content = chatcmpl_message.get('reasoning_content', None)
processed_content, _ = await self._process_thinking_content(
content=content, reasoning_content=reasoning_content, remove_think=remove_think
)
chatcmpl_message['content'] = processed_content
# 移除 reasoning_content 字段,避免传递给 Message
if 'reasoning_content' in chatcmpl_message:
del chatcmpl_message['reasoning_content']
message = provider_message.Message(**chatcmpl_message)
return message
async def _process_thinking_content(
self,
content: str,
reasoning_content: str = None,
remove_think: bool = False,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
reasoning_content: reasoning_content 字段内容
remove_think: 是否移除思维链
Returns:
(处理后的内容, 提取的思维链内容)
"""
thinking_content = ''
# 1. 从 reasoning_content 提取思维链
if reasoning_content:
thinking_content = reasoning_content
# 2. 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
# 如果已有 reasoning_content则追加
if thinking_content:
thinking_content += '\n' + '\n'.join(think_matches)
else:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 3. 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
tool_id = ''
tool_name = ''
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理 content 中已有的 <think> 标签(如果需要移除)
# if delta_content and remove_think and '<think>' in delta_content:
# import re
#
# # 移除 <think> 标签及其内容
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
# 处理工具调用增量
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] and tool_call['function']['name']:
tool_id = tool_call['id']
tool_name = tool_call['function']['name']
else:
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
async def _closure(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
# 发送请求
resp = await self._req(args, extra_body=extra_args)
# 处理请求结果
message = await self._make_msg(resp, remove_think)
# Extract token usage from response
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return message, usage_info
async def invoke_llm(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
"""Invoke LLM and return message with usage info"""
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
for m in messages:
msg_dict = m.dict(exclude_none=True)
content = msg_dict.get('content')
if isinstance(content, list):
# 检查 content 列表中是否每个部分都是文本
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
# 将所有文本部分合并为一个字符串
msg_dict['content'] = '\n'.join(part['text'] for part in content)
req_messages.append(msg_dict)
try:
msg, usage_info = await self._closure(
query=query,
req_messages=req_messages,
use_model=model,
use_funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
)
return msg, usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
if 'context_length_exceeded' in str(e):
raise errors.RequesterError(f'上文过长,请重置会话: {error_message}')
else:
raise errors.RequesterError(f'请求参数错误: {error_message}')
except openai.AuthenticationError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'无效的 api-key: {error_message}')
except openai.NotFoundError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求路径错误: {error_message}')
except openai.RateLimitError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求过于频繁或余额不足: {error_message}')
except openai.APIConnectionError as e:
error_message = f'连接错误: {str(e)}'
raise errors.RequesterError(error_message)
except openai.APIError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求错误: {error_message}')
async def invoke_embedding(
self,
model: requester.RuntimeEmbeddingModel,
input_text: list[str],
extra_args: dict[str, typing.Any] = {},
) -> tuple[list[list[float]], dict]:
"""调用 Embedding API, returns (embeddings, usage_info)"""
self.client.api_key = model.provider.token_mgr.get_token()
args = {
'model': model.model_entity.name,
'input': input_text,
}
if model.model_entity.extra_args:
args.update(model.model_entity.extra_args)
args.update(extra_args)
try:
resp = await self.client.embeddings.create(**args)
# Extract usage info
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['prompt_tokens'] = resp.usage.prompt_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return [d.embedding for d in resp.data], usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
raise errors.RequesterError(f'请求参数错误: {e.message}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
for m in messages:
msg_dict = m.dict(exclude_none=True)
content = msg_dict.get('content')
if isinstance(content, list):
# 检查 content 列表中是否每个部分都是文本
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
# 将所有文本部分合并为一个字符串
msg_dict['content'] = '\n'.join(part['text'] for part in content)
req_messages.append(msg_dict)
try:
async for item in self._closure_stream(
query=query,
req_messages=req_messages,
use_model=model,
use_funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
):
yield item
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
if 'context_length_exceeded' in e.message:
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
else:
raise errors.RequesterError(f'请求参数错误: {e.message}')
except openai.AuthenticationError as e:
raise errors.RequesterError(f'无效的 api-key: {e.message}')
except openai.NotFoundError as e:
raise errors.RequesterError(f'请求路径错误: {e.message}')
except openai.RateLimitError as e:
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
except openai.APIError as e:
raise errors.RequesterError(f'请求错误: {e.message}')
async def invoke_rerank(
self,
model: requester.RuntimeRerankModel,
query: str,
documents: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""Standard /rerank endpoint (Jina/Cohere/SiliconFlow/Voyage/DashScope compatible)
Supports extra_args from model.extra_args:
- rerank_url: full URL override (e.g. "https://dashscope.aliyuncs.com/compatible-api/v1/reranks")
- rerank_path: path override appended to base_url (e.g. "reranks" instead of default "rerank")
- Any other fields are merged into the request payload.
"""
api_key = model.provider.token_mgr.get_token()
base_url = self.requester_cfg.get('base_url', '').rstrip('/')
timeout = self.requester_cfg.get('timeout', 120)
merged_args = {}
if model.model_entity.extra_args:
merged_args.update(model.model_entity.extra_args)
if extra_args:
merged_args.update(extra_args)
rerank_url = merged_args.pop('rerank_url', None)
rerank_path = merged_args.pop('rerank_path', 'rerank')
if not rerank_url:
rerank_url = f'{base_url}/{rerank_path}'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
payload = {
'model': model.model_entity.name,
'query': query,
'documents': documents[:64],
'top_n': min(len(documents), 64),
}
if merged_args:
payload.update(merged_args)
try:
async with httpx.AsyncClient(trust_env=True, timeout=timeout) as client:
resp = await client.post(rerank_url, headers=headers, json=payload)
resp.raise_for_status()
data = resp.json()
results = self._parse_rerank_response(data)
if results:
scores = [r.get('relevance_score', 0.0) for r in results]
min_score = min(scores)
max_score = max(scores)
if max_score - min_score > 1e-6:
for r in results:
r['relevance_score'] = (r['relevance_score'] - min_score) / (max_score - min_score)
return results
except httpx.HTTPStatusError as e:
raise errors.RequesterError(f'Rerank request failed: {e.response.status_code} - {e.response.text}')
except httpx.TimeoutException:
raise errors.RequesterError('Rerank request timed out')
except Exception as e:
raise errors.RequesterError(f'Rerank request error: {str(e)}')
@staticmethod
def _parse_rerank_response(data: dict) -> typing.List[dict]:
"""Parse rerank response from various providers.
Handles:
- Jina/Cohere/SiliconFlow: {"results": [{"index", "relevance_score"}]}
- Voyage AI: {"data": [{"index", "relevance_score"}]}
- DashScope: {"output": {"results": [{"index", "relevance_score"}]}}
"""
if 'results' in data:
return data['results']
if 'data' in data:
return data['data']
if 'output' in data and isinstance(data['output'], dict):
return data['output'].get('results', [])
return []

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: OpenAI
icon: openai.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,10 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "openai OpenAI 欧派 gpt GPT ChatGPT chatgpt o1 o3 o4 text-embedding 通用 openai兼容 compatible"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer
execution:
python:

View File

@@ -12,6 +12,7 @@ metadata:
icon: chroma.svg
spec:
config: []
alias: "chroma Chroma 向量 vector embedding 嵌入 chromadb"
support_type:
- text-embedding
provider_category: builtin

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: Cohere
icon: cohere.svg
spec:
litellm_provider: cohere
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "cohere Cohere rerank 重排 reranker rerank-english rerank-multilingual command"
support_type:
- rerank
provider_category: manufacturer

View File

@@ -1,17 +0,0 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class CompShareChatCompletions(chatcmpl.OpenAIChatCompletions):
"""CompShare ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.modelverse.cn/v1',
'timeout': 120,
}

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: 优云智算
icon: compshare.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,8 +23,11 @@ spec:
type: integer
required: true
default: 120
alias: "compshare 优刻得 ucloud UCloud 算力 共享算力 GPU"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:
python:

View File

@@ -1,67 +0,0 @@
from __future__ import annotations
import typing
from . import chatcmpl
from .. import errors, requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class DeepseekChatCompletions(chatcmpl.OpenAIChatCompletions):
"""Deepseek ChatCompletion API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.deepseek.com',
'timeout': 120,
}
async def _closure(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages
# deepseek 不支持多模态把content都转换成纯文字
for m in messages:
if 'content' in m and isinstance(m['content'], list):
m['content'] = ' '.join([c['text'] for c in m['content'] if 'text' in c])
args['messages'] = messages
# 发送请求
resp = await self._req(args, extra_body=extra_args)
# print(resp)
if resp is None:
raise errors.RequesterError('接口返回为空,请确定模型提供商服务是否正常')
# 处理请求结果
message = await self._make_msg(resp, remove_think)
# Extract token usage from response
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return message, usage_info

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: DeepSeek
icon: deepseek.svg
spec:
litellm_provider: deepseek
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "deepseek DeepSeek 深度求索 深度 求索 dpsk v3 r1 deepseek-chat deepseek-reasoner"
support_type:
- llm
provider_category: manufacturer

View File

@@ -0,0 +1,4 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#3B82F6"/>
<text x="30" y="32" font-family="Arial, sans-serif" font-size="12" font-weight="bold" fill="white" text-anchor="middle">豆包</text>
</svg>

After

Width:  |  Height:  |  Size: 282 B

View File

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: doubao-chat-completions
label:
en_US: ByteDance Doubao
zh_Hans: 字节豆包
icon: doubao.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://ark.cn-beijing.volces.com/api/v3
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "doubao 豆包 字节 字节跳动 bytedance volcengine 火山 火山引擎 ark 方舟 seed"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer

View File

@@ -1,205 +0,0 @@
from __future__ import annotations
import typing
import httpx
from . import chatcmpl
import uuid
from .. import requester
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
class GeminiChatCompletions(chatcmpl.OpenAIChatCompletions):
"""Google Gemini API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://generativelanguage.googleapis.com/v1beta/openai',
'timeout': 120,
}
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
models_url = 'https://generativelanguage.googleapis.com/v1beta/models'
params = {'key': api_key} if api_key else {}
all_models: list[dict[str, typing.Any]] = []
next_page_token = ''
last_payload: dict[str, typing.Any] = {}
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
while True:
request_params = dict(params)
if next_page_token:
request_params['pageToken'] = next_page_token
response = await client.get(models_url, params=request_params)
response.raise_for_status()
payload = response.json()
last_payload = payload
for item in payload.get('models', []):
model_name = item.get('name', '')
model_id = model_name.replace('models/', '', 1)
if not model_id:
continue
supported_methods = item.get('supportedGenerationMethods', []) or []
if 'embedContent' in supported_methods and 'generateContent' not in supported_methods:
model_type = 'embedding'
else:
model_type = 'llm'
all_models.append(
{
'id': model_id,
'name': model_id,
'type': model_type,
'abilities': self._infer_model_abilities(item, model_id),
'display_name': item.get('displayName') or None,
'description': item.get('description') or None,
'context_length': item.get('inputTokenLimit'),
'input_modalities': self._normalize_modalities(item.get('inputModalities')),
'output_modalities': self._normalize_modalities(item.get('outputModalities')),
}
)
next_page_token = payload.get('nextPageToken', '')
if not next_page_token:
break
all_models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
return {
'models': all_models,
'debug': {
'request': {
'method': 'GET',
'url': models_url,
'query': {'key': self._mask_api_key(api_key)} if api_key else {},
},
'response': last_payload,
},
}
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
tool_id = ''
tool_name = ''
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理 content 中已有的 <think> 标签(如果需要移除)
# if delta_content and remove_think and '<think>' in delta_content:
# import re
#
# # 移除 <think> 标签及其内容
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
# 处理工具调用增量
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] == '' and tool_id == '':
tool_id = str(uuid.uuid4())
if tool_call['function']['name']:
tool_name = tool_call['function']['name']
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: Google Gemini
icon: gemini.svg
spec:
litellm_provider: gemini
config:
- name: base_url
label:
@@ -22,8 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "gemini Gemini 谷歌 google Google 双子座 bard flash pro text-embedding-004"
support_type:
- llm
- text-embedding
provider_category: manufacturer
execution:
python:

View File

@@ -1,15 +0,0 @@
from __future__ import annotations
import typing
from . import ppiochatcmpl
class GiteeAIChatCompletions(ppiochatcmpl.PPIOChatCompletions):
"""Gitee AI ChatCompletions API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://ai.gitee.com/v1',
'timeout': 120,
}

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: Gitee AI
icon: giteeai.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "gitee Gitee 码云 gitee-ai gitee ai serverless bge embedding rerank"
support_type:
- llm
- text-embedding

View File

@@ -0,0 +1,4 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#F97316"/>
<text x="30" y="32" font-family="Arial, sans-serif" font-size="14" font-weight="bold" fill="white" text-anchor="middle">Groq</text>
</svg>

After

Width:  |  Height:  |  Size: 280 B

View File

@@ -0,0 +1,29 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: groq-chat-completions
label:
en_US: Groq
zh_Hans: Groq
icon: groq.svg
spec:
litellm_provider: groq
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://api.groq.com/openai/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "groq Groq 高速 llama mixtral 推理加速 lpu"
support_type:
- llm
provider_category: manufacturer

View File

@@ -0,0 +1,5 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#0066FF"/>
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">iFlytek</text>
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">Spark</text>
</svg>

After

Width:  |  Height:  |  Size: 398 B

View File

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: iflytek-chat-completions
label:
en_US: iFlytek Spark
zh_Hans: 讯飞星火
icon: iflytek.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://spark-api-open.xf-yun.com/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "iflytek 讯飞 科大讯飞 星火 spark xinghuo xunfei 讯飞星火"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer

View File

@@ -1,208 +0,0 @@
from __future__ import annotations
import openai
import typing
from . import chatcmpl
from .. import requester
import openai.types.chat.chat_completion as chat_completion
import re
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions):
"""接口 AI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.jiekou.ai/openai',
'timeout': 120,
}
is_think: bool = False
async def _make_msg(
self,
chat_completion: chat_completion.ChatCompletion,
remove_think: bool,
) -> provider_message.Message:
chatcmpl_message = chat_completion.choices[0].message.model_dump()
# print(chatcmpl_message.keys(), chatcmpl_message.values())
# 确保 role 字段存在且不为 None
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
chatcmpl_message['role'] = 'assistant'
reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
# deepseek的reasoner模型
chatcmpl_message['content'] = await self._process_thinking_content(
chatcmpl_message['content'], reasoning_content, remove_think
)
# 移除 reasoning_content 字段,避免传递给 Message
if 'reasoning_content' in chatcmpl_message:
del chatcmpl_message['reasoning_content']
message = provider_message.Message(**chatcmpl_message)
return message
async def _process_thinking_content(
self,
content: str,
reasoning_content: str = None,
remove_think: bool = False,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
reasoning_content: reasoning_content 字段内容
remove_think: 是否移除思维链
Returns:
处理后的内容
"""
if remove_think:
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
else:
if reasoning_content is not None:
content = '<think>\n' + reasoning_content + '\n</think>\n' + content
return content
async def _make_msg_chunk(
self,
delta: dict[str, typing.Any],
idx: int,
) -> provider_message.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
# 确保 role 字段存在且不为 None
if 'role' not in delta or delta['role'] is None:
delta['role'] = 'assistant'
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
delta['content'] = '' if delta['content'] is None else delta['content']
# print(reasoning_content)
# deepseek的reasoner模型
if reasoning_content is not None:
delta['content'] += reasoning_content
message = provider_message.MessageChunk(**delta)
return message
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
# reasoning_content = delta.get('reasoning_content', '')
if remove_think:
if delta['content'] is not None:
if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
thinking_started = True
continue
elif delta['content'] == r'</think>' and not thinking_ended:
thinking_ended = True
continue
elif thinking_ended and delta['content'] == '\n\n' and thinking_started:
thinking_started = False
continue
elif thinking_started and not thinking_ended:
continue
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] and tool_call['function']['name']:
tool_id = tool_call['id']
tool_name = tool_call['function']['name']
if tool_call['id'] is None:
tool_call['id'] = tool_id
if tool_call['function']['name'] is None:
tool_call['function']['name'] = tool_name
if tool_call['function']['arguments'] is None:
tool_call['function']['arguments'] = ''
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: 接口 AI
icon: jiekouai.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -29,9 +30,11 @@ spec:
type: int
required: true
default: 120
alias: "jiekouai 接口AI 接口 jiekou ai 中转 中转站 aggregator"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:
python:

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: Jina
icon: jina.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "jina Jina jina-ai jinaai rerank 重排 reranker jina-reranker embedding"
support_type:
- rerank
provider_category: manufacturer

View File

@@ -0,0 +1,745 @@
"""LiteLLM unified requester for chat, embedding, and rerank."""
from __future__ import annotations
import typing
import litellm
from litellm import acompletion, aembedding, arerank
from .. import errors, requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class LiteLLMRequester(requester.ProviderAPIRequester):
"""LiteLLM unified API requester supporting chat, embedding, and rerank."""
_EMBEDDING_MODEL_HINTS = ('embedding', 'embed', 'bge-', 'e5-', 'm3e', 'gte-', 'text-embedding')
_RERANK_MODEL_HINTS = ('rerank', 're-rank', 're_rank')
default_config: dict[str, typing.Any] = {
'base_url': '',
'timeout': 120,
'custom_llm_provider': '',
'drop_params': False,
'num_retries': 0,
'api_version': '',
}
async def initialize(self):
"""Initialize LiteLLM client settings."""
# LiteLLM doesn't require explicit client initialization
# Configuration is passed per-request via litellm params
pass
def _build_litellm_model_name(self, model_name: str, custom_llm_provider: str | None = None) -> str:
"""Build LiteLLM model name with provider prefix if needed."""
provider = custom_llm_provider or self.requester_cfg.get('custom_llm_provider', '')
if provider:
# LiteLLM format: provider/model_name
if model_name.startswith(f'{provider}/'):
return model_name
return f'{provider}/{model_name}'
# If no custom provider, assume model_name already includes prefix or is OpenAI-compatible
return model_name
def _get_custom_llm_provider(self) -> str | None:
return self.requester_cfg.get('custom_llm_provider') or None
def _safe_litellm_bool_helper(self, helper_name: str, model_name: str) -> bool:
"""Call a LiteLLM boolean capability helper without letting metadata gaps fail requests."""
helper = getattr(litellm, helper_name, None)
if not callable(helper):
return False
provider = self._get_custom_llm_provider()
candidates: list[tuple[str, str | None]] = [(model_name, provider)]
litellm_model_name = self._build_litellm_model_name(model_name)
if litellm_model_name != model_name:
candidates.append((litellm_model_name, None))
for metadata_provider in self._metadata_provider_candidates(model_name):
candidates.append((f'{metadata_provider}/{model_name}', None))
tried_candidates: set[tuple[str, str | None]] = set()
for candidate_model, candidate_provider in candidates:
candidate_key = (candidate_model, candidate_provider)
if candidate_key in tried_candidates:
continue
tried_candidates.add(candidate_key)
try:
if bool(helper(model=candidate_model, custom_llm_provider=candidate_provider)):
return True
except Exception:
continue
return False
@staticmethod
def _positive_int(value: typing.Any) -> int | None:
if isinstance(value, bool):
return None
if isinstance(value, int) and value > 0:
return value
if isinstance(value, str) and value.isdigit():
parsed_value = int(value)
if parsed_value > 0:
return parsed_value
return None
def _context_length_from_scan_payload(self, model_payload: dict[str, typing.Any] | None) -> int | None:
if not model_payload:
return None
for field_name in ('context_length', 'context_window', 'max_context_length'):
context_length = self._positive_int(model_payload.get(field_name))
if context_length is not None:
return context_length
return None
def _context_length_from_litellm_model_info(self, model_info: typing.Any) -> int | None:
if isinstance(model_info, dict):
return self._positive_int(model_info.get('max_input_tokens'))
return self._positive_int(getattr(model_info, 'max_input_tokens', None))
def _metadata_provider_candidates(self, model_name: str) -> list[str]:
normalized_model_name = (model_name or '').lower()
candidates = []
if normalized_model_name.startswith(('moonshot-', 'kimi-')):
candidates.append('moonshot')
if normalized_model_name.startswith('deepseek-'):
candidates.append('deepseek')
base_url = self.requester_cfg.get('base_url', '').lower()
if 'moonshot' in base_url:
candidates.append('moonshot')
if 'deepseek' in base_url:
candidates.append('deepseek')
deduped_candidates = []
for candidate in candidates:
if candidate not in deduped_candidates:
deduped_candidates.append(candidate)
return deduped_candidates
def _known_context_length_fallback(self, model_name: str) -> int | None:
normalized_model_name = (model_name or '').lower()
if normalized_model_name.startswith('deepseek-v4-'):
return 1_000_000
if normalized_model_name.startswith(('kimi-k2.5', 'kimi-k2.6')):
return 256 * 1024
if normalized_model_name.startswith('moonshot-v1-8k'):
return 8 * 1024
if normalized_model_name.startswith('moonshot-v1-32k'):
return 32 * 1024
if normalized_model_name.startswith('moonshot-v1-128k') or normalized_model_name == 'moonshot-v1-auto':
return 128 * 1024
return None
def _safe_context_length(self, model_name: str) -> int | None:
helper = getattr(litellm, 'get_model_info', None)
if not callable(helper):
return self._known_context_length_fallback(model_name)
candidates = [model_name]
litellm_model_name = self._build_litellm_model_name(model_name)
if litellm_model_name != model_name:
candidates.append(litellm_model_name)
for provider in self._metadata_provider_candidates(model_name):
candidates.append(f'{provider}/{model_name}')
tried_candidates = []
for candidate in candidates:
if candidate in tried_candidates:
continue
tried_candidates.append(candidate)
try:
model_info = helper(candidate)
except Exception:
continue
context_length = self._context_length_from_litellm_model_info(model_info)
if context_length is not None:
return context_length
return self._known_context_length_fallback(model_name)
def _supports_function_calling(self, model_name: str) -> bool:
return self._safe_litellm_bool_helper('supports_function_calling', model_name)
def _supports_vision(self, model_name: str) -> bool:
return self._safe_litellm_bool_helper('supports_vision', model_name)
def _infer_model_type(self, model_id: str) -> str:
normalized_id = (model_id or '').lower()
if any(kw in normalized_id for kw in self._RERANK_MODEL_HINTS):
return 'rerank'
if any(kw in normalized_id for kw in self._EMBEDDING_MODEL_HINTS):
return 'embedding'
return 'llm'
def _enrich_scanned_model(
self,
model_id: str,
model_payload: dict[str, typing.Any] | None = None,
) -> dict[str, typing.Any]:
model_type = self._infer_model_type(model_id)
scanned_model: dict[str, typing.Any] = {
'id': model_id,
'name': model_id,
'type': model_type,
}
if model_type == 'llm':
abilities = []
if self._supports_function_calling(model_id):
abilities.append('func_call')
supports_provider_reported_vision = bool(
model_payload
and (model_payload.get('supports_image_in') is True or model_payload.get('supports_vision') is True)
)
if supports_provider_reported_vision or self._supports_vision(model_id):
abilities.append('vision')
scanned_model['abilities'] = abilities
context_length = self._context_length_from_scan_payload(model_payload)
if context_length is None:
context_length = self._safe_context_length(model_id)
if context_length is not None:
scanned_model['context_length'] = context_length
return scanned_model
def _convert_messages(self, messages: typing.List[provider_message.Message]) -> list[dict]:
"""Convert LangBot messages to LiteLLM/OpenAI format."""
req_messages = []
for m in messages:
msg_dict = m.dict(exclude_none=True)
content = msg_dict.get('content')
if isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get('type') == 'image_base64':
part['image_url'] = {'url': part['image_base64']}
part['type'] = 'image_url'
del part['image_base64']
req_messages.append(msg_dict)
return req_messages
def _process_thinking_content(self, content: str, reasoning_content: str | None, remove_think: bool) -> str:
"""Process thinking/reasoning content.
Args:
content: The main content from response
reasoning_content: Separate reasoning content from model
remove_think: If True, remove thinking markers; if False, preserve them
Returns:
Processed content string
"""
# Extract and handle thinking tags
if content and 'CRETIRE_REASONING_BEGINk' in content and 'CRETIRE_REASONING_ENDk' in content:
import re
think_pattern = r'CRETIRE_REASONING_BEGINk(.*?)CRETIRE_REASONING_ENDk'
if remove_think:
# Remove thinking tags and their content from output
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# else: preserve thinking content as-is
# Handle separate reasoning_content field
# Currently we don't include reasoning_content in user-facing output regardless of remove_think
# because it's typically internal model reasoning, not user-visible thinking
return content or ''
@staticmethod
def _normalize_usage(usage: typing.Any) -> dict:
"""Normalize a LiteLLM/OpenAI usage object into a plain token dict.
Handles several real-world shapes returned by different upstreams:
- object with ``prompt_tokens`` / ``completion_tokens`` / ``total_tokens`` attrs
- dict with the same keys
- missing ``total_tokens`` (derived from prompt + completion)
- ``None`` / partially-populated usage (defaults to 0)
"""
if usage is None:
return {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
def _get(key: str) -> typing.Any:
if isinstance(usage, dict):
return usage.get(key)
return getattr(usage, key, None)
prompt_tokens = _get('prompt_tokens') or 0
completion_tokens = _get('completion_tokens') or 0
total_tokens = _get('total_tokens') or 0
# Some providers omit total_tokens in streaming usage; derive it.
if not total_tokens:
total_tokens = prompt_tokens + completion_tokens
return {
'prompt_tokens': int(prompt_tokens),
'completion_tokens': int(completion_tokens),
'total_tokens': int(total_tokens),
}
def _extract_usage(self, response) -> dict:
"""Extract usage info from a non-streaming LiteLLM response."""
return self._normalize_usage(getattr(response, 'usage', None))
@staticmethod
def _as_dict(value: typing.Any) -> dict:
if value is None:
return {}
if isinstance(value, dict):
return value
if hasattr(value, 'model_dump'):
return value.model_dump()
return {}
def _normalize_stream_tool_calls(
self,
raw_tool_calls: typing.Any,
tool_call_state: dict[int, dict[str, str]],
) -> list[dict] | None:
"""Fill OpenAI-style streaming tool-call deltas so MessageChunk can validate them."""
if not raw_tool_calls:
return None
normalized = []
for fallback_index, raw_tool_call in enumerate(raw_tool_calls):
tool_call = self._as_dict(raw_tool_call)
index = tool_call.get('index')
if not isinstance(index, int):
index = fallback_index
state = tool_call_state.setdefault(index, {'id': '', 'type': 'function', 'name': ''})
if tool_call.get('id'):
state['id'] = tool_call['id']
if tool_call.get('type'):
state['type'] = tool_call['type']
function = self._as_dict(tool_call.get('function'))
if function.get('name'):
state['name'] = function['name']
arguments = function.get('arguments')
if arguments is None:
arguments = ''
elif not isinstance(arguments, str):
arguments = str(arguments)
if not state['id'] or not state['name']:
continue
normalized.append(
{
'id': state['id'],
'type': state['type'] or 'function',
'function': {
'name': state['name'],
'arguments': arguments,
},
}
)
return normalized or None
def _build_common_args(self, args: dict, include_retry_params: bool = True) -> dict:
"""Apply common requester config to args dict."""
if self.requester_cfg.get('base_url'):
args['api_base'] = self.requester_cfg['base_url']
if self.requester_cfg.get('timeout'):
args['timeout'] = self.requester_cfg['timeout']
if include_retry_params:
if self.requester_cfg.get('drop_params'):
args['drop_params'] = self.requester_cfg['drop_params']
if self.requester_cfg.get('num_retries'):
args['num_retries'] = self.requester_cfg['num_retries']
if self.requester_cfg.get('api_version'):
args['api_version'] = self.requester_cfg['api_version']
return args
def _handle_litellm_error(self, e: Exception) -> None:
"""Convert LiteLLM exceptions to RequesterError. Never returns, always raises."""
# Check more specific exceptions first (they inherit from base exceptions)
if isinstance(e, litellm.ContextWindowExceededError):
raise errors.RequesterError(f'上下文长度超限: {str(e)}')
if isinstance(e, litellm.BadRequestError):
raise errors.RequesterError(f'请求参数错误: {str(e)}')
if isinstance(e, litellm.AuthenticationError):
raise errors.RequesterError(f'API key 无效: {str(e)}')
if isinstance(e, litellm.NotFoundError):
raise errors.RequesterError(f'模型或路径无效: {str(e)}')
if isinstance(e, litellm.RateLimitError):
raise errors.RequesterError(f'请求过于频繁或余额不足: {str(e)}')
if isinstance(e, litellm.Timeout):
raise errors.RequesterError(f'请求超时: {str(e)}')
if isinstance(e, litellm.APIConnectionError):
raise errors.RequesterError(f'连接错误: {str(e)}')
if isinstance(e, litellm.APIError):
raise errors.RequesterError(f'API 错误: {str(e)}')
raise errors.RequesterError(f'未知错误: {str(e)}')
async def _build_completion_args(
self,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
stream: bool = False,
) -> dict:
"""Build common completion arguments for invoke_llm and invoke_llm_stream."""
req_messages = self._convert_messages(messages)
model_name = self._build_litellm_model_name(model.model_entity.name)
api_key = model.provider.token_mgr.get_token()
args = {
'model': model_name,
'messages': req_messages,
'api_key': api_key,
}
if stream:
args['stream'] = True
args['stream_options'] = {'include_usage': True}
self._build_common_args(args)
# Apply model-level extra_args first, then call-level extra_args
if model.model_entity.extra_args:
args.update(model.model_entity.extra_args)
args.update(extra_args)
if funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(funcs)
if tools:
args['tools'] = tools
args.setdefault('tool_choice', 'auto')
return args
async def invoke_llm(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
"""Invoke LLM and return message with usage info."""
args = await self._build_completion_args(model, messages, funcs, extra_args, stream=False)
try:
response = await acompletion(**args)
message_data = response.choices[0].message.model_dump()
if 'role' not in message_data or message_data['role'] is None:
message_data['role'] = 'assistant'
content = message_data.get('content', '')
reasoning_content = message_data.get('reasoning_content', None)
message_data['content'] = self._process_thinking_content(content, reasoning_content, remove_think)
if 'reasoning_content' in message_data:
del message_data['reasoning_content']
message = provider_message.Message(**message_data)
usage_info = self._extract_usage(response)
return message, usage_info
except Exception as e:
self._handle_litellm_error(e)
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
"""Invoke LLM streaming and yield chunks."""
args = await self._build_completion_args(model, messages, funcs, extra_args, stream=True)
chunk_idx = 0
role = 'assistant'
tool_call_state: dict[int, dict[str, str]] = {}
try:
response = await acompletion(**args)
async for chunk in response:
# Capture usage whenever a chunk carries it.
#
# Important: many OpenAI-compatible gateways (e.g. new-api) and
# providers send the final usage payload in a chunk that STILL
# contains a (empty-delta) choice, not an empty `choices` list.
# The previous implementation only captured usage when `choices`
# was empty, so streamed calls always recorded 0 tokens.
# We therefore capture usage independently of `choices`, and then
# fall through to also process any content this chunk may carry.
if getattr(chunk, 'usage', None):
usage_info = self._normalize_usage(chunk.usage)
if query is not None:
if query.variables is None:
query.variables = {}
query.variables['_stream_usage'] = usage_info
if not hasattr(chunk, 'choices') or not chunk.choices:
continue
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
if 'role' in delta and delta['role']:
role = delta['role']
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# Handle reasoning_content based on remove_think flag
if reasoning_content:
if remove_think:
# Skip reasoning content when remove_think is True
chunk_idx += 1
continue
else:
# Use reasoning_content as the displayed content
delta_content = reasoning_content
tool_calls = self._normalize_stream_tool_calls(delta.get('tool_calls'), tool_call_state)
if chunk_idx == 0 and not delta_content and not tool_calls:
chunk_idx += 1
continue
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': tool_calls,
'is_final': bool(finish_reason),
}
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
except Exception as e:
self._handle_litellm_error(e)
async def invoke_embedding(
self,
model: requester.RuntimeEmbeddingModel,
input_text: list[str],
extra_args: dict[str, typing.Any] = {},
) -> tuple[list[list[float]], dict]:
"""Invoke embedding and return vectors with usage info."""
model_name = self._build_litellm_model_name(model.model_entity.name)
api_key = model.provider.token_mgr.get_token()
args = {
'model': model_name,
'input': input_text,
'api_key': api_key,
}
self._build_common_args(args, include_retry_params=False)
if model.model_entity.extra_args:
args.update(model.model_entity.extra_args)
args.update(extra_args)
try:
response = await aembedding(**args)
# LiteLLM returns response.data entries either as objects with an
# `.embedding` attribute or as plain dicts (many OpenAI-compatible
# gateways, e.g. new-api, yield dict-shaped entries). Handle both.
embeddings = [d['embedding'] if isinstance(d, dict) else d.embedding for d in response.data]
usage_info = self._extract_usage(response)
return embeddings, usage_info
except Exception as e:
self._handle_litellm_error(e)
async def invoke_rerank(
self,
model: requester.RuntimeRerankModel,
query: str,
documents: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""Invoke rerank and return relevance scores."""
model_name = self._build_litellm_model_name(model.model_entity.name)
api_key = model.provider.token_mgr.get_token()
top_n = min(len(documents), 64)
provider = self._get_custom_llm_provider()
try:
# LiteLLM's rerank API does not support the `openai` provider
# (litellm/rerank_api/main.py raises "Unsupported provider: openai").
# OpenAI-compatible gateways (newapi / one-api / vLLM / Xinference, etc.)
# expose the standard Jina/Cohere-style POST /v1/rerank endpoint, so
# call it directly over HTTP for openai-compatible (or unspecified) providers.
if provider in (None, '', 'openai'):
results = await self._invoke_rerank_openai_compatible(
model_name=model.model_entity.name,
query=query,
documents=documents,
api_key=api_key,
top_n=top_n,
extra_args={**(model.model_entity.extra_args or {}), **extra_args},
)
else:
args = {
'model': model_name,
'query': query,
'documents': documents,
'api_key': api_key,
'top_n': top_n,
}
self._build_common_args(args, include_retry_params=False)
if model.model_entity.extra_args:
args.update(model.model_entity.extra_args)
args.update(extra_args)
response = await arerank(**args)
results = []
for r in response.results:
results.append(
{
'index': r.get('index', 0),
'relevance_score': r.get('relevance_score', 0.0),
}
)
if results:
scores = [r['relevance_score'] for r in results]
min_score = min(scores)
max_score = max(scores)
if max_score - min_score > 1e-6:
for r in results:
r['relevance_score'] = (r['relevance_score'] - min_score) / (max_score - min_score)
return results
except errors.RequesterError:
raise
except Exception as e:
self._handle_litellm_error(e)
async def _invoke_rerank_openai_compatible(
self,
model_name: str,
query: str,
documents: typing.List[str],
api_key: str,
top_n: int,
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""Call the standard Jina/Cohere-style POST /v1/rerank endpoint over HTTP.
Used for OpenAI-compatible gateways where litellm.arerank rejects the
`openai` provider. Returns the same shape as the litellm path:
a list of {'index': int, 'relevance_score': float}.
"""
import httpx
base_url = (self.requester_cfg.get('base_url') or '').rstrip('/')
if not base_url:
raise errors.RequesterError('Base URL required for rerank')
timeout = self.requester_cfg.get('timeout', 120)
headers = {'Content-Type': 'application/json'}
if api_key:
headers['Authorization'] = f'Bearer {api_key}'
payload: dict[str, typing.Any] = {
'model': model_name,
'query': query,
'documents': documents,
'top_n': top_n,
}
if extra_args:
payload.update(extra_args)
rerank_url = f'{base_url}/rerank'
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.post(rerank_url, headers=headers, json=payload)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPStatusError as e:
body = ''
try:
body = e.response.text
except Exception:
pass
raise errors.RequesterError(f'rerank 请求失败 (HTTP {e.response.status_code}): {body or str(e)}')
except httpx.HTTPError as e:
raise errors.RequesterError(f'rerank 连接错误: {str(e)}')
raw_results = data.get('results', []) if isinstance(data, dict) else []
results = []
for r in raw_results:
results.append(
{
'index': r.get('index', 0),
'relevance_score': r.get('relevance_score', r.get('score', 0.0)) or 0.0,
}
)
return results
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
"""Scan models supported by the provider."""
import httpx
base_url = self.requester_cfg.get('base_url', '').rstrip('/')
timeout = self.requester_cfg.get('timeout', 120)
if not base_url:
raise errors.RequesterError('Base URL required for model scanning')
headers = {}
if api_key:
headers['Authorization'] = f'Bearer {api_key}'
models_url = f'{base_url}/models'
try:
async with httpx.AsyncClient(trust_env=True, timeout=timeout) as client:
response = await client.get(models_url, headers=headers)
response.raise_for_status()
payload = response.json()
models = []
for item in payload.get('data', []):
model_id = item.get('id')
if not model_id:
continue
models.append(self._enrich_scanned_model(model_id, item))
models.sort(key=lambda x: (x['type'] != 'llm', x['name'].lower()))
return {'models': models}
except httpx.HTTPStatusError as e:
raise errors.RequesterError(f'Model scan failed: {e.response.status_code}')
except httpx.TimeoutException:
raise errors.RequesterError('Model scan timeout')
except Exception as e:
raise errors.RequesterError(f'Model scan error: {str(e)}')

View File

@@ -0,0 +1,65 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: litellm-chat
label:
en_US: LiteLLM (Unified)
zh_Hans: LiteLLM (统一请求器)
icon: litellm.svg
spec:
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: false
default: ''
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
- name: custom_llm_provider
label:
en_US: Custom Provider
zh_Hans: 自定义 Provider
type: string
required: false
default: ''
description:
en_US: Force provider type (e.g., anthropic, openai, gemini)
zh_Hans: 强制指定 provider 类型(如 anthropic, openai, gemini
- name: drop_params
label:
en_US: Drop Unsupported Params
zh_Hans: 丢弃不支持参数
type: boolean
required: false
default: false
- name: num_retries
label:
en_US: Number of Retries
zh_Hans: 重试次数
type: integer
required: false
default: 0
- name: api_version
label:
en_US: API Version
zh_Hans: API 版本
type: string
required: false
default: ''
alias: "litellm LiteLLM 通用 universal 万能 兼容 compatible proxy 代理 中转"
support_type:
- llm
- text-embedding
- rerank
provider_category: unified
execution:
python:
path: ./litellmchat.py
attr: LiteLLMRequester

View File

@@ -1,17 +0,0 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class LmStudioChatCompletions(chatcmpl.OpenAIChatCompletions):
"""LMStudio ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'http://127.0.0.1:1234/v1',
'timeout': 120,
}

View File

@@ -7,6 +7,7 @@ metadata:
zh_Hans: LM Studio
icon: lmstudio.webp
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "lmstudio LM Studio lm-studio 本地 local 本地部署 self-hosted gguf"
support_type:
- llm
- text-embedding

View File

@@ -0,0 +1,4 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#FF6700"/>
<text x="30" y="32" font-family="Arial, sans-serif" font-size="18" font-weight="bold" fill="white" text-anchor="middle">MiMo</text>
</svg>

After

Width:  |  Height:  |  Size: 280 B

View File

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: mimo-chat-completions
label:
en_US: Xiaomi MiMo
zh_Hans: 小米 MiMo
icon: mimo.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://api.xiaomimimo.com/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "mimo MiMo 小米 xiaomi 小米大模型 xiaomi-mimo"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer

Some files were not shown because too many files have changed in this diff Show More