Compare commits

...

25 Commits

Author SHA1 Message Date
huanghuoguoguo d92b664136 test(monitoring): cover trace observability 2026-06-17 10:46:41 +08:00
huanghuoguoguo 8789c42eeb feat(monitoring): add host RAG trace observability 2026-06-17 00:13:57 +08:00
huanghuoguoguo b3c6de2072 [codex] cover frontend CRUD smoke flows (#2253)
* test: cover frontend CRUD smoke flows

* test: add bot CRUD smoke coverage

* test: add bot/pipeline advanced flows and cross-resource tests

- Bot enable/disable toggle with state persistence
- Bot detail tab switching (Configuration, Logs, Sessions)
- Bot form dirty state and save button behavior
- Bot name validation error display
- Pipeline tab switching (Configuration, Dashboard)
- Pipeline form dirty state
- Pipeline name validation error display
- Cross-resource flow: create pipeline then bind to bot
- Empty states for bots, pipelines, knowledge bases, MCP servers
2026-06-16 21:34:17 +08:00
RockChinQ 4e45886647 style(web): show Models above API Integration in main sidebar footer 2026-06-16 06:04:59 -04:00
RockChinQ f592656680 refactor(web): unify settings panel layouts with shared toolbar/body
- Add PanelToolbar/PanelBody primitives so all four settings tabs share
  the same top-toolbar + scrollable-body rhythm under the unified header.
- API panel: drop the heavy gray shadowed TabsList; move the create
  action into the toolbar next to the tabs, lighten per-tab hints.
- Storage panel: reuse PanelToolbar for the generated-at/refresh bar.
- Account panel: wrap content in PanelBody for consistent padding.
- Models panel: keep the pinned LangBot Models (Space) card at the very
  top, above the add-custom-provider row (intentional pin), using
  PanelBody instead of a top toolbar.
2026-06-16 06:02:20 -04:00
RockChinQ e9db858dcc feat(web): unified header for settings dialog, shorter sidebar labels
- Add a shared section header (icon + title + description) with right
  padding so the dialog close X no longer overlaps panel content, and
  every tab now shares the same top layout for a consistent look.
- Shorten inner sidebar nav labels (Models/API/Storage/Account) via new
  settingsDialog.nav.* i18n keys across all 8 locales.
- Add common.apiIntegrationDescription and account.settingsDescription
  for the new header.
2026-06-16 05:50:44 -04:00
RockChinQ 2d6faf9d5e refactor(web): drop legacy ModelsDialog, use unified SettingsDialog everywhere
The model-selector in dynamic forms (pipeline / knowledge base settings)
still opened the old standalone ModelsDialog. Point it at the unified
SettingsDialog (section pinned to models) and delete the now-unused
ModelsDialog wrapper so only the new dialog remains.
2026-06-16 05:41:58 -04:00
RockChinQ d4699547e9 i18n(web): localize Bots/Pipelines sidebar titles for es/th/vi
es-ES pipelines, th-TH bots+pipelines and vi-VN pipelines were left in
English in the sidebar. Translate them: es Flujos, th บอท/ไปป์ไลน์,
vi Quy trình.
2026-06-16 05:27:10 -04:00
RockChinQ 716d7aca94 fix(web): fixed-height settings dialog, narrower sidebar
Pin the dialog to a fixed 80vh (cap 800px) so switching sections no
longer resizes it; panels scroll their own content internally. Override
the SidebarProvider wrapper's default h-svh with h-full so both columns
fill the dialog height. Narrow the inner settings sidebar to w-44.
2026-06-16 05:22:42 -04:00
RockChinQ b3c00fe6da fix(web): use fixed height for settings dialog instead of 80vh
Avoid the dialog stretching to fill tall viewports (large empty space).
Pin to 620px with max-h-[85vh] fallback and narrow width to 52rem.
2026-06-16 05:18:14 -04:00
RockChinQ f4a6edf7ec refactor(web): unify settings dialogs into single dialog with sidebar
Merge API integration, model settings, account settings and storage
analysis into one SettingsDialog with a shadcn inner sidebar for
section switching. Preserve existing ?action= query-param deep links
(showModelSettings / showAccountSettings / showApiIntegrationSettings /
showStorageAnalysis) by mapping each to a section. Extract reusable
panels and keep ModelsDialog as a thin wrapper for the dynamic-form
model picker.
2026-06-16 05:06:06 -04:00
huanghuoguoguo f390980d0a test: format test suite (#2252) 2026-06-16 11:22:29 +08:00
huanghuoguoguo 1ae5aacc00 test: add frontend smoke and backend e2e CI (#2251) 2026-06-16 11:09:55 +08:00
huanghuoguoguo e9fe2f2d43 feat(agent-runner): support host tool lookup (#2244) 2026-06-14 11:29:57 +08:00
huanghuoguoguo 27be09ab15 fix(provider): preserve litellm usage details (#2246) 2026-06-14 11:12:29 +08:00
huanghuoguoguo 1ef4507d9a [codex] Delegate web page bot stream helpers (#2245)
* fix(platform): delegate web page bot stream helpers

* style(platform): format web page bot adapter
2026-06-14 10:57:53 +08:00
RockChinQ 2e7978317c chore(release): bump version to 4.10.2 2026-06-13 11:21:44 -04:00
RockChinQ b7d8332cb0 feat(telemetry): include instance_create_ts in heartbeat payload
Load the instance creation timestamp from data/labels/instance_id.json
(backfilling+persisting it for instances created before the field existed),
expose it as constants.instance_create_ts, and include it in the heartbeat
payload so Space can anchor Time-To-Value / onboarding analytics on real
install time rather than first-heartbeat.

Verified: py_compile, ruff, pytest tests/unit_tests/telemetry/ (37 passed).
2026-06-13 11:13:18 -04:00
huanghuoguoguo 7fe3eedeea fix(provider): use LiteLLM input window for context length (#2243) 2026-06-13 21:27:47 +08:00
RockChinQ b6fde30aa7 style(plugins): ruff format logs route 2026-06-13 08:03:29 -04:00
RockChinQ 5bfa38cbf2 feat(plugins): show plugin logs on detail page via Docs/Logs tablist
Add a Logs tab beside Documentation on the plugin detail page, showing
the output a plugin prints through the standard Python logger (per the
wiki style guide). Logs are captured from the plugin's stderr by the
plugin runtime and fetched on demand.

- Bump langbot-plugin pin to 0.4.4 (adds GET_PLUGIN_LOGS action)
- plugin_connector/handler: get_plugin_logs RPC client
- HTTP route GET /api/v1/plugins/<author>/<name>/logs (limit + level)
- Frontend: wrap detail right panel in Docs/Logs Tabs; PluginLogs
  component with level filter, manual + 3s auto refresh, bottom-follow
- i18n: 7 new keys across all 8 locales
2026-06-13 08:01:18 -04:00
RockChinQ a97d2040bb fix(i18n,api): backfill missing token-monitoring keys and fix JWT expiry tz
- i18n: add models.searchProviders, monitoring.tabs.tokens and the
  monitoring.tokens.* block (incl. bucket.hour/day) to es-ES, ja-JP,
  ru-RU, th-TH, vi-VN and zh-Hant, which were missing them and failed
  the Check i18n Keys CI.
- api: generate_jwt_token built 'exp' from a naive datetime.now(), which
  PyJWT validates against UTC — in any timezone ahead of UTC the token
  was already expired at issue time. Use datetime.now(timezone.utc).
2026-06-13 05:26:18 -04:00
RockChinQ a2c6c8201b refactor(persistence): freeze legacy DB migration chain, drop dbm026
The legacy pkg/persistence/migrations (DBMigration / dbmXXX) system now
coexists with Alembic but accepts no new migrations — all new schema
changes go through Alembic.

- remove dbm026_llm_model_context_length (superseded by Alembic
  0005_add_llm_context_length, which makes the identical change)
- cap required_database_version at 25 (legacy chain dbm001-025 kept
  read-only to upgrade pre-existing 3.x DBs to the Alembic baseline)
- add migrations/README.md documenting the freeze
- document the Alembic-only policy and revision-id/idempotency rules in
  AGENTS.md
2026-06-13 05:26:08 -04:00
RockChinQ 672abfe95d refactor(core): remove pre-3.x legacy config migration system
The pkg/core/migrations system (m001-m043 DBMigration-style config
migrations, MigrationStage, and the core.migration base class) only ever
ran when upgrading from LangBot 3.x. The last 3.x release is over a year
old and is no longer supported, so this dead code is removed entirely:

- delete pkg/core/migrations/ (43 mXXX_*.py + __init__)
- delete pkg/core/migration.py (base class + registry)
- delete pkg/core/stages/migrate.py (MigrationStage)
- drop 'MigrationStage' from boot.py stage_order
- delete tests/unit_tests/core/test_migration.py (tested the removed base class)
2026-06-13 05:26:01 -04:00
huanghuoguoguo 9ecb587ac0 refactor(provider): use LiteLLM as unified LLM requester backend (#2150)
* refactor(provider): use LiteLLM as unified LLM requester backend

  - Replace 23+ individual requester implementations with unified litellmchat.py
  - Add litellm_provider field to 27 YAML manifests for provider routing
  - Delete redundant requester subclasses
  - Add unit tests for LiteLLMRequester (29 tests)
  - Fix num_retries parameter name (was max_retries)
  - Fix exception handling order for subclass exceptions

  LiteLLM provides unified API for 100+ providers, eliminating need for
  provider-specific requesters.

* fix: ruff format provider.py

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(provider): simplify LiteLLM requester usage handling

  - Remove unused Anthropic-specific tool schema generation
  - Share completion argument construction between normal and streaming calls
  - Use LiteLLM/OpenAI native usage fields for monitoring
  - Collect stream token usage from LiteLLM stream_options
  - Update LiteLLM requester tests for unified usage fields

* restore: restore deleted provider requester files

Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat: update requesters and improve provider selection UI

- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.

* fix(provider): align litellm rebase with master

* fix(provider): capture streaming token usage; add token observability

The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).

- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
  endpoint: summary, per-model breakdown, token timeseries, and a
  zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
  per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization

Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.

* refactor(provider): simplify litellm capabilities

* style: simplify wrapped expressions

* feat(models): persist context metadata

* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester

- invoke_embedding: support both object- and dict-shaped response.data
  entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
  openai-compatible (or unspecified) providers call the standard
  Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path

* feat(provider): enforce requester support_type when adding models

- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
  rerank) that the provider's requester declares in its manifest
  support_type; ModelsDialog fetches requester manifests and maps
  requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
  create_embedding_model / create_rerank_model so a model cannot be
  attached to a provider whose requester does not support that type,
  even if the frontend restriction is bypassed (manifests without
  support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
  three model types:
  - llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
  - llm + text-embedding: openai, gemini, mistral
  - add rerank to new-api (verified working via /v1/rerank)
  - set llm + text-embedding + rerank for aggregator/unknown gateways

* feat(provider): add searchable alias to requester manifests

- add a free-text 'alias' field to every requester manifest spec,
  containing the vendor's English/Chinese names, pinyin, common
  nicknames and flagship model-series names (e.g. moonshot -> kimi,
  月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
  alias (substring/contains), so searching 'kimi' surfaces Moonshot,
  '硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
  LangBot Space gains rerank (coming soon)

* fix(provider): make support_type guard defensive against incomplete model_mgr

- _validate_provider_supports now uses getattr to gracefully skip when
  model_mgr / provider_dict / manifest lookup is unavailable, instead of
  raising AttributeError (fixes unit tests that mock ap.model_mgr as a
  bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
  reject unsupported type, allow when support_type missing, allow when
  provider unknown, degrade safely when model_mgr is incomplete

* fix(persistence): guard 0004 migration against missing llm_models table

The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.

Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).

* Merge branch 'master' into feat/litellm

Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
  (ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
  branch added 0004_add_llm_model_context_length, both as children of
  0003 (would create multiple heads). Re-chain the litellm migration as
  0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
  for a single linear head. Update test head assertion accordingly.

* fix(persistence): shorten migration revision id to fit varchar(32)

PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-06-13 16:59:48 +08:00
315 changed files with 11194 additions and 8773 deletions
+46
View File
@@ -0,0 +1,46 @@
name: Frontend Tests
on:
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
paths:
- 'web/**'
- '.github/workflows/frontend-tests.yml'
push:
branches:
- master
- develop
paths:
- 'web/**'
- '.github/workflows/frontend-tests.yml'
jobs:
playwright-smoke:
name: Playwright Smoke
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '25'
- name: Install pnpm
uses: pnpm/action-setup@v4
with:
version: 8.9.2
- name: Install dependencies
working-directory: web
run: pnpm install --frozen-lockfile
- name: Install Playwright browsers
working-directory: web
run: pnpm exec playwright install --with-deps chromium
- name: Run Playwright smoke tests
working-directory: web
run: pnpm test:e2e
+1 -1
View File
@@ -29,7 +29,7 @@ jobs:
run: uv sync --dev
- name: Run ruff check
run: uv run ruff check src
run: uv run ruff check src/langbot/ tests/ --output-format=concise
- name: Run ruff format
run: uv run ruff format src --check
+62 -1
View File
@@ -84,6 +84,67 @@ jobs:
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
e2e:
name: E2E Startup Tests
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies
run: uv sync --dev
- name: Run E2E startup tests
run: uv run pytest tests/e2e -q --tb=short
- name: E2E Test Summary
if: always()
run: |
echo "## E2E Startup Test Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
box-integration:
name: Box Integration Tests
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies
run: uv sync --dev
- name: Check Docker runtime
run: docker info
- name: Run Box integration tests
run: uv run pytest tests/integration_tests -q --tb=short
- name: Box Integration Test Summary
if: always()
run: |
echo "## Box Integration Test Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Test Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
coverage:
name: Coverage Gate
runs-on: ubuntu-latest
@@ -129,4 +190,4 @@ jobs:
echo "## Coverage Results" >> $GITHUB_STEP_SUMMARY
echo "" >> $GITHUB_STEP_SUMMARY
echo "Threshold: 18%" >> $GITHUB_STEP_SUMMARY
echo "Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
echo "Status: ${{ job.status }}" >> $GITHUB_STEP_SUMMARY
+8
View File
@@ -125,6 +125,14 @@ uv run python -m langbot.pkg.persistence.alembic_runner autogenerate "descriptio
Review and edit the generated script before committing. Migrations execute automatically on startup. `autogenerate` detects schema changes (add/drop columns, tables, type changes) but **data migrations** (e.g. mutating JSON field contents) must be hand-written into the generated script. `env.py` sets `render_as_batch=True`, so SQLite's ALTER TABLE limits are handled automatically — no need to branch per database. More in the wiki ["开发配置"](https://docs.langbot.app/zh/develop/dev-config#数据库迁移).
When writing a migration, follow these rules:
- **Revision id ≤ 32 characters.** PostgreSQL stores `alembic_version.version_num` as `varchar(32)`; a longer id raises `StringDataRightTruncationError` at runtime. Prefer short, descriptive ids like `0005_add_llm_context_length`.
- **Guard every operation against missing tables/columns.** Fresh installs build the schema via `create_all()` and then stamp the Alembic baseline, so a migration may run against a table that already has the change — or, in tests, against an empty database. Check `inspector.get_table_names()` / `inspector.get_columns(...)` before `add_column` / `drop_column`, mirroring the existing migrations.
- **Keep a single linear head.** Chain `down_revision` to the current head; do not create branches. Run the migration tests after adding one: `uv run pytest tests/integration/persistence/ -q` (the PostgreSQL test needs a running PG via `TEST_POSTGRES_URL`).
> **Legacy migration system (deprecated — do not extend).** The old 3.x migration system under `src/langbot/pkg/persistence/migrations/` (`DBMigration` subclasses in `dbmXXX_*.py`, run from `pkg/persistence/mgr.py`) is **frozen**. Do **not** add new `dbmXXX_*.py` files. The chain is capped at `required_database_version = 25` (`pkg/utils/constants.py`); those files only exist to upgrade pre-existing 3.x databases up to the Alembic baseline and are kept read-only. All new schema changes go through Alembic.
## Some Principles
- Keep it simple, stupid.
+3 -2
View File
@@ -1,6 +1,6 @@
[project]
name = "langbot"
version = "4.10.1"
version = "4.10.2"
description = "Production-grade platform for building agentic IM bots"
readme = "README.md"
license-files = ["LICENSE"]
@@ -70,7 +70,7 @@ dependencies = [
"chromadb>=1.0.0,<2.0.0",
"qdrant-client (>=1.15.1,<2.0.0)",
"pyseekdb==1.1.0.post3",
"langbot-plugin==0.4.3",
"langbot-plugin==0.4.4",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"matrix-nio>=0.25.2",
@@ -79,6 +79,7 @@ dependencies = [
"pymilvus>=2.6.4",
"pgvector>=0.4.1",
"botocore>=1.42.39",
"litellm>=1.0.0",
]
keywords = [
"bot",
+1 -1
View File
@@ -1,3 +1,3 @@
"""LangBot - Production-grade platform for building agentic IM bots"""
__version__ = '4.10.1'
__version__ = '4.10.2'
@@ -46,6 +46,30 @@ class MonitoringRouterGroup(group.RouterGroup):
return self.success(data=metrics)
@self.route('/token-statistics', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_token_statistics() -> str:
"""Get detailed token usage statistics (summary, per-model, timeseries)."""
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
bucket = quart.request.args.get('bucket', 'hour')
if bucket not in ('hour', 'day'):
bucket = 'hour'
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
stats = await self.ap.monitoring_service.get_token_statistics(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
bucket=bucket,
)
return self.success(data=stats)
@self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_messages() -> str:
"""Get message logs"""
@@ -289,18 +313,30 @@ class MonitoringRouterGroup(group.RouterGroup):
offset=0,
)
# Get traces
traces, traces_total = await self.ap.monitoring_service.get_traces(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
return self.success(
data={
'overview': overview,
'messages': messages,
'llmCalls': llm_calls,
'embeddingCalls': embedding_calls,
'traces': traces,
'sessions': sessions,
'errors': errors,
'totalCount': {
'messages': messages_total,
'llmCalls': llm_calls_total,
'embeddingCalls': embedding_calls_total,
'traces': traces_total,
'sessions': sessions_total,
'errors': errors_total,
},
@@ -326,6 +362,49 @@ class MonitoringRouterGroup(group.RouterGroup):
return self.success(data=details)
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_traces() -> str:
"""Get end-to-end trace records."""
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
session_ids = quart.request.args.getlist('sessionId')
statuses = quart.request.args.getlist('status')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
traces, total = await self.ap.monitoring_service.get_traces(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
session_ids=session_ids if session_ids else None,
statuses=statuses if statuses else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return self.success(
data={
'traces': traces,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_trace_details(trace_id: str) -> str:
"""Get one trace with all spans."""
details = await self.ap.monitoring_service.get_trace_details(trace_id)
if not details.get('found'):
return self.http_status(404, -1, f'Trace {trace_id} not found')
return self.success(data=details)
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def export_data() -> tuple[str, int]:
"""Export monitoring data as CSV"""
@@ -271,6 +271,20 @@ class PluginsRouterGroup(group.RouterGroup):
readme = await self.ap.plugin_connector.get_plugin_readme(author, plugin_name, language=language)
return self.success(data={'readme': readme})
@self.route(
'/<author>/<plugin_name>/logs',
methods=['GET'],
auth_type=group.AuthType.USER_TOKEN_OR_API_KEY,
)
async def _(author: str, plugin_name: str) -> quart.Response:
try:
limit = int(quart.request.args.get('limit', 200))
except (TypeError, ValueError):
limit = 200
level = quart.request.args.get('level') or None
logs = await self.ap.plugin_connector.get_plugin_logs(author, plugin_name, limit=limit, level=level)
return self.success(data={'logs': logs})
@self.route(
'/<author>/<plugin_name>/icon',
methods=['GET'],
@@ -336,8 +350,24 @@ class PluginsRouterGroup(group.RouterGroup):
if not endpoint.startswith('/') or '..' in endpoint:
return self.http_status(400, -1, 'invalid endpoint')
caller = {
'plugin_author': author,
'plugin_name': plugin_name,
'page_id': page_id,
'origin': _get_request_origin(),
}
headers = {
key: value
for key, value in {
'user-agent': quart.request.headers.get('User-Agent'),
'x-request-id': quart.request.headers.get('X-Request-ID'),
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
}.items()
if value
}
result = await self.ap.plugin_connector.handle_page_api(
author, plugin_name, page_id, endpoint, method.upper(), body
author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers
)
if result.get('error'):
return self.http_status(400, -1, result['error'])
+46
View File
@@ -34,6 +34,46 @@ def _runtime_model_data(model_uuid: str, model_data: dict) -> dict:
return {**model_data, 'uuid': model_uuid}
async def _validate_provider_supports(ap: app.Application, provider_uuid: str, model_type: str) -> None:
"""Validate that the provider's requester declares support for ``model_type``.
``model_type`` is one of the manifest ``support_type`` values:
'llm', 'text-embedding', 'rerank'. Raises ValueError when the requester
manifest does not list the requested type. This is a server-side guard so
a model cannot be attached to a provider that does not support it, even if
the frontend tab restriction is bypassed.
"""
model_mgr = getattr(ap, 'model_mgr', None)
if model_mgr is None:
return
provider_dict = getattr(model_mgr, 'provider_dict', None)
if not provider_dict:
return
runtime_provider = provider_dict.get(provider_uuid)
if runtime_provider is None:
return
requester_name = getattr(getattr(runtime_provider, 'provider_entity', None), 'requester', None)
if not requester_name:
return
get_manifest = getattr(model_mgr, 'get_available_requester_manifest_by_name', None)
if not callable(get_manifest):
return
manifest = get_manifest(requester_name)
if manifest is None:
return
spec = getattr(manifest, 'spec', None) or {}
support_type = spec.get('support_type') if isinstance(spec, dict) else None
# When a manifest omits support_type, do not block (backward compatible).
if not support_type:
return
if model_type not in support_type:
raise ValueError(f'Provider requester "{requester_name}" does not support {model_type} models')
class LLMModelsService:
ap: app.Application
@@ -96,6 +136,8 @@ class LLMModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'llm')
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_model.LLMModel).values(**model_data))
runtime_provider = self.ap.model_mgr.provider_dict.get(model_data['provider_uuid'])
@@ -274,6 +316,8 @@ class EmbeddingModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'text-embedding')
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_model.EmbeddingModel).values(**model_data)
)
@@ -434,6 +478,8 @@ class RerankModelsService:
)
model_data['provider_uuid'] = provider_uuid
await _validate_provider_supports(self.ap, model_data['provider_uuid'], 'rerank')
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_model.RerankModel).values(**model_data)
)
@@ -3,11 +3,53 @@ from __future__ import annotations
import uuid
import datetime
import sqlalchemy
import json
from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
def _json_dumps(value: dict | list | None) -> str | None:
if value is None:
return None
try:
return json.dumps(value, ensure_ascii=False, default=str)
except Exception:
return json.dumps({'serialization_error': str(value)}, ensure_ascii=False)
def _json_loads(value: str | None) -> dict | list | None:
if not value:
return None
try:
return json.loads(value)
except Exception:
return None
def new_trace_id() -> str:
return f'trace-{uuid.uuid4().hex[:16]}'
def new_span_id() -> str:
return f'span-{uuid.uuid4().hex[:16]}'
def normalize_trace_status(status: str | None) -> str:
"""Normalize operation status to the monitoring UI vocabulary."""
if status in ('completed', 'ok'):
return 'success'
if status in ('failed', 'failure', 'exception'):
return 'error'
if status in ('running', 'success', 'error'):
return status
return 'success'
class MonitoringService:
"""Monitoring service"""
@@ -74,6 +116,18 @@ class MonitoringService:
persistence_monitoring.MonitoringFeedback.timestamp,
persistence_monitoring.MonitoringFeedback.id,
),
(
'monitoring_traces',
persistence_monitoring.MonitoringTrace,
persistence_monitoring.MonitoringTrace.started_at,
persistence_monitoring.MonitoringTrace.trace_id,
),
(
'monitoring_spans',
persistence_monitoring.MonitoringSpan,
persistence_monitoring.MonitoringSpan.started_at,
persistence_monitoring.MonitoringSpan.span_id,
),
]
deleted_counts: dict[str, int] = {}
@@ -133,6 +187,116 @@ class MonitoringService:
# ========== Recording Methods ==========
async def start_trace(
self,
trace_id: str | None = None,
name: str = 'LangBot query',
bot_id: str | None = None,
bot_name: str | None = None,
pipeline_id: str | None = None,
pipeline_name: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
query_id: str | int | None = None,
attributes: dict | None = None,
) -> str:
"""Create or update a trace header row."""
trace_id = trace_id or new_trace_id()
trace_data = {
'trace_id': trace_id,
'started_at': _utc_now(),
'ended_at': None,
'duration': None,
'status': 'running',
'name': name,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'message_id': message_id,
'query_id': str(query_id) if query_id is not None else None,
'attributes': _json_dumps(attributes),
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data)
)
return trace_id
async def finish_trace(
self,
trace_id: str,
status: str = 'success',
duration: int | None = None,
message_id: str | None = None,
attributes: dict | None = None,
) -> None:
"""Mark a trace complete."""
update_values: dict = {
'ended_at': _utc_now(),
'status': normalize_trace_status(status),
}
if duration is not None:
update_values['duration'] = duration
if message_id is not None:
update_values['message_id'] = message_id
if attributes is not None:
update_values['attributes'] = _json_dumps(attributes)
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringTrace)
.where(persistence_monitoring.MonitoringTrace.trace_id == trace_id)
.values(update_values)
)
async def record_span(
self,
trace_id: str,
name: str,
kind: str,
status: str = 'success',
span_id: str | None = None,
parent_span_id: str | None = None,
started_at: datetime.datetime | None = None,
ended_at: datetime.datetime | None = None,
duration: int | None = None,
message_id: str | None = None,
session_id: str | None = None,
bot_id: str | None = None,
pipeline_id: str | None = None,
attributes: dict | None = None,
error_message: str | None = None,
) -> str:
"""Record a single completed span."""
started_at = started_at or _utc_now()
if duration is None and ended_at is not None:
duration = int((ended_at - started_at).total_seconds() * 1000)
elif duration is not None:
duration = int(round(float(duration)))
span_data = {
'span_id': span_id or new_span_id(),
'trace_id': trace_id,
'parent_span_id': parent_span_id,
'name': name,
'kind': kind,
'status': normalize_trace_status(status),
'started_at': started_at,
'ended_at': ended_at or _utc_now(),
'duration': duration,
'message_id': message_id,
'session_id': session_id,
'bot_id': bot_id,
'pipeline_id': pipeline_id,
'attributes': _json_dumps(attributes),
'error_message': error_message,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
)
return span_data['span_id']
async def record_message(
self,
bot_id: str,
@@ -472,6 +636,179 @@ class MonitoringService:
'active_sessions': active_sessions,
}
async def get_token_statistics(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
bucket: str = 'hour',
) -> dict:
"""Get detailed token usage statistics for production observability.
Returns:
- summary: aggregate token counters and call/latency stats over the window
- by_model: per-model token + call breakdown (sorted by total tokens desc)
- timeseries: token usage bucketed by `bucket` ('hour' or 'day')
Only successful LLM calls are counted toward token totals; error calls are
reported separately so a spike in failures is visible without polluting
token accounting.
"""
LLMCall = persistence_monitoring.MonitoringLLMCall
conditions = []
if bot_ids:
conditions.append(LLMCall.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
if start_time:
conditions.append(LLMCall.timestamp >= start_time)
if end_time:
conditions.append(LLMCall.timestamp <= end_time)
def _apply(query):
if conditions:
query = query.where(sqlalchemy.and_(*conditions))
return query
# ---- Summary aggregates ----
summary_query = _apply(
sqlalchemy.select(
sqlalchemy.func.count(LLMCall.id),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
# Count of successful calls that nonetheless recorded zero tokens —
# a data-quality signal that usage reporting may be broken upstream.
sqlalchemy.func.sum(
sqlalchemy.case(
(sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
else_=0,
)
),
)
)
summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
row = summary_result.first()
(
total_calls,
total_input_tokens,
total_output_tokens,
total_tokens,
total_duration,
total_cost,
success_calls,
error_calls,
zero_token_success_calls,
) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)
total_calls = total_calls or 0
success_calls = success_calls or 0
error_calls = error_calls or 0
zero_token_success_calls = zero_token_success_calls or 0
summary = {
'total_calls': total_calls,
'success_calls': success_calls,
'error_calls': error_calls,
'total_input_tokens': int(total_input_tokens or 0),
'total_output_tokens': int(total_output_tokens or 0),
'total_tokens': int(total_tokens or 0),
'total_cost': round(float(total_cost or 0.0), 6),
'avg_tokens_per_call': int((total_tokens or 0) / total_calls) if total_calls > 0 else 0,
'avg_duration_ms': int((total_duration or 0) / total_calls) if total_calls > 0 else 0,
'avg_tokens_per_second': round((total_output_tokens or 0) / (total_duration / 1000), 2)
if total_duration and total_duration > 0
else 0,
'zero_token_success_calls': zero_token_success_calls,
}
# ---- Per-model breakdown ----
by_model_query = _apply(
sqlalchemy.select(
LLMCall.model_name,
sqlalchemy.func.count(LLMCall.id),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
sqlalchemy.func.sum(sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)),
).group_by(LLMCall.model_name)
)
by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
by_model = []
for mrow in by_model_result.all():
(
model_name,
m_calls,
m_in,
m_out,
m_total,
m_duration,
m_cost,
m_errors,
) = mrow
m_calls = m_calls or 0
by_model.append(
{
'model_name': model_name,
'calls': m_calls,
'error_calls': m_errors or 0,
'input_tokens': int(m_in or 0),
'output_tokens': int(m_out or 0),
'total_tokens': int(m_total or 0),
'cost': round(float(m_cost or 0.0), 6),
'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
}
)
by_model.sort(key=lambda x: x['total_tokens'], reverse=True)
# ---- Time-bucketed series ----
# Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
# aggregate in Python. The window is bounded by the time filter, so this is
# cheap for typical dashboard ranges (hours/days).
series_query = _apply(
sqlalchemy.select(
LLMCall.timestamp,
LLMCall.input_tokens,
LLMCall.output_tokens,
LLMCall.total_tokens,
).order_by(LLMCall.timestamp.asc())
)
series_result = await self.ap.persistence_mgr.execute_async(series_query)
bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
buckets: dict[str, dict] = {}
for srow in series_result.all():
ts, s_in, s_out, s_total = srow
if ts is None:
continue
key = ts.strftime(bucket_fmt)
b = buckets.setdefault(
key,
{'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
)
b['input_tokens'] += int(s_in or 0)
b['output_tokens'] += int(s_out or 0)
b['total_tokens'] += int(s_total or 0)
b['calls'] += 1
timeseries = [buckets[k] for k in sorted(buckets.keys())]
return {
'summary': summary,
'by_model': by_model,
'timeseries': timeseries,
'bucket': bucket,
}
async def get_messages(
self,
bot_ids: list[str] | None = None,
@@ -903,6 +1240,19 @@ class MonitoringService:
for row in error_rows
]
trace_query = (
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
.limit(1)
)
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
trace_row = trace_result.first()
trace = None
if trace_row:
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
trace = self._serialize_trace(trace_model)
return {
'message_id': message_id,
'found': True,
@@ -917,6 +1267,84 @@ class MonitoringService:
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
},
'errors': errors,
'trace': trace,
}
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
data['attributes'] = _json_loads(data.get('attributes')) or {}
return data
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
data['attributes'] = _json_loads(data.get('attributes')) or {}
return data
async def get_traces(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
session_ids: list[str] | None = None,
statuses: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get trace headers with filters."""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids))
if session_ids:
conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids))
if statuses:
conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses))
if start_time:
conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time)
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id))
query = sqlalchemy.select(persistence_monitoring.MonitoringTrace)
if conditions:
clause = sqlalchemy.and_(*conditions)
count_query = count_query.where(clause)
query = query.where(clause)
total_result = await self.ap.persistence_mgr.execute_async(count_query)
total = total_result.scalar() or 0
query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()]
return traces, total
async def get_trace_details(self, trace_id: str) -> dict:
"""Get a single trace and all spans in chronological order."""
trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where(
persistence_monitoring.MonitoringTrace.trace_id == trace_id
)
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
trace_row = trace_result.first()
if not trace_row:
return {'trace_id': trace_id, 'found': False}
trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row
span_query = (
sqlalchemy.select(persistence_monitoring.MonitoringSpan)
.where(persistence_monitoring.MonitoringSpan.trace_id == trace_id)
.order_by(persistence_monitoring.MonitoringSpan.started_at.asc())
)
span_result = await self.ap.persistence_mgr.execute_async(span_query)
spans = [self._serialize_span(row[0] if isinstance(row, tuple) else row) for row in span_result.all()]
return {
'trace_id': trace_id,
'found': True,
'trace': self._serialize_trace(trace),
'spans': spans,
}
# ========== Export Methods ==========
+1 -1
View File
@@ -82,7 +82,7 @@ class UserService:
payload = {
'user': user_email,
'iss': 'LangBot-' + constants.edition,
'exp': datetime.datetime.now() + datetime.timedelta(seconds=jwt_expire),
'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=jwt_expire),
}
return jwt.encode(payload, jwt_secret, algorithm='HS256')
-1
View File
@@ -16,7 +16,6 @@ importutil.import_modules_in_pkg(stages)
stage_order = [
'LoadConfigStage',
'MigrationStage',
'GenKeysStage',
'SetupLoggerStage',
'BuildAppStage',
+1
View File
@@ -42,6 +42,7 @@ required_deps = {
'telegramify_markdown': 'telegramify-markdown',
'slack_sdk': 'slack_sdk',
'asyncpg': 'asyncpg',
'litellm': 'litellm',
}
-45
View File
@@ -1,45 +0,0 @@
from __future__ import annotations
import abc
import typing
from . import app
preregistered_migrations: list[typing.Type[Migration]] = []
"""Currently not supported for extension"""
def migration_class(name: str, number: int):
"""Register a migration"""
def decorator(cls: typing.Type[Migration]) -> typing.Type[Migration]:
cls.name = name
cls.number = number
preregistered_migrations.append(cls)
return cls
return decorator
class Migration(abc.ABC):
"""A version migration"""
name: str
number: int
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
@abc.abstractmethod
async def need_migrate(self) -> bool:
"""Determine if the current environment needs to run this migration"""
pass
@abc.abstractmethod
async def run(self):
"""Run migration"""
pass
@@ -1,24 +0,0 @@
from __future__ import annotations
import os
from .. import migration
@migration.migration_class('sensitive-word-migration', 1)
class SensitiveWordMigration(migration.Migration):
"""敏感词迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return os.path.exists('data/config/sensitive-words.json') and not os.path.exists(
'data/metadata/sensitive-words.json'
)
async def run(self):
"""执行迁移"""
# 移动文件
os.rename('data/config/sensitive-words.json', 'data/metadata/sensitive-words.json')
# 重新加载配置
await self.ap.sensitive_meta.load_config()
@@ -1,44 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('openai-config-migration', 2)
class OpenAIConfigMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'openai-config' in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
old_openai_config = self.ap.provider_cfg.data['openai-config'].copy()
if 'keys' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['keys'] = {}
if 'openai' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['openai'] = []
self.ap.provider_cfg.data['keys']['openai'] = old_openai_config['api-keys']
self.ap.provider_cfg.data['model'] = old_openai_config['chat-completions-params']['model']
del old_openai_config['chat-completions-params']['model']
if 'requester' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['requester'] = {}
if 'openai-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['openai-chat-completions'] = {}
self.ap.provider_cfg.data['requester']['openai-chat-completions'] = {
'base-url': old_openai_config['base_url'],
'args': old_openai_config['chat-completions-params'],
'timeout': old_openai_config['request-timeout'],
}
del self.ap.provider_cfg.data['openai-config']
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('anthropic-requester-config-completion', 3)
class AnthropicRequesterConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'anthropic-messages' not in self.ap.provider_cfg.data['requester']
or 'anthropic' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'anthropic-messages' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['anthropic-messages'] = {
'base-url': 'https://api.anthropic.com',
'args': {'max_tokens': 1024},
'timeout': 120,
}
if 'anthropic' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['anthropic'] = []
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('moonshot-config-completion', 4)
class MoonshotConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'moonshot-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'moonshot' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'moonshot-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['moonshot-chat-completions'] = {
'base-url': 'https://api.moonshot.cn/v1',
'args': {},
'timeout': 120,
}
if 'moonshot' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['moonshot'] = []
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('deepseek-config-completion', 5)
class DeepseekConfigCompletionMigration(migration.Migration):
"""OpenAI配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'deepseek-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'deepseek' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'deepseek-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['deepseek-chat-completions'] = {
'base-url': 'https://api.deepseek.com',
'args': {},
'timeout': 120,
}
if 'deepseek' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['deepseek'] = []
await self.ap.provider_cfg.dump_config()
@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('vision-config', 6)
class VisionConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'enable-vision' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
if 'enable-vision' not in self.ap.provider_cfg.data:
self.ap.provider_cfg.data['enable-vision'] = False
await self.ap.provider_cfg.dump_config()
@@ -1,20 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('qcg-center-url-config', 7)
class QCGCenterURLConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'qcg-center-url' not in self.ap.system_cfg.data
async def run(self):
"""执行迁移"""
if 'qcg-center-url' not in self.ap.system_cfg.data:
self.ap.system_cfg.data['qcg-center-url'] = 'https://api.qchatgpt.rockchin.top/api/v2'
await self.ap.system_cfg.dump_config()
@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ad-fixwin-cfg-migration', 8)
class AdFixwinConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return isinstance(self.ap.pipeline_cfg.data['rate-limit']['fixwin']['default'], int)
async def run(self):
"""执行迁移"""
for session_name in self.ap.pipeline_cfg.data['rate-limit']['fixwin']:
temp_dict = {
'window-size': 60,
'limit': self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name],
}
self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name] = temp_dict
await self.ap.pipeline_cfg.dump_config()
@@ -1,22 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('msg-truncator-cfg-migration', 9)
class MsgTruncatorConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'msg-truncate' not in self.ap.pipeline_cfg.data
async def run(self):
"""执行迁移"""
self.ap.pipeline_cfg.data['msg-truncate'] = {
'method': 'round',
'round': {'max-round': 10},
}
await self.ap.pipeline_cfg.dump_config()
@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ollama-requester-config', 10)
class MsgTruncatorConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'ollama-chat' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['ollama-chat'] = {
'base-url': 'http://127.0.0.1:11434',
'args': {},
'timeout': 600,
}
await self.ap.provider_cfg.dump_config()
@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('command-prefix-config', 11)
class CommandPrefixConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'command-prefix' not in self.ap.command_cfg.data
async def run(self):
"""执行迁移"""
self.ap.command_cfg.data['command-prefix'] = ['!', '']
await self.ap.command_cfg.dump_config()
@@ -1,19 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('runner-config', 12)
class RunnerConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'runner' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['runner'] = 'local-agent'
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('http-api-config', 13)
class HttpApiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'http-api' not in self.ap.system_cfg.data or 'persistence' not in self.ap.system_cfg.data
async def run(self):
"""执行迁移"""
self.ap.system_cfg.data['http-api'] = {
'enable': True,
'host': '0.0.0.0',
'port': 5300,
'jwt-expire': 604800,
}
self.ap.system_cfg.data['persistence'] = {
'sqlite': {'path': 'data/persistence.db'},
'use': 'sqlite',
}
await self.ap.system_cfg.dump_config()
@@ -1,22 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('force-delay-config', 14)
class ForceDelayConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return isinstance(self.ap.platform_cfg.data['force-delay'], list)
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['force-delay'] = {
'min': self.ap.platform_cfg.data['force-delay'][0],
'max': self.ap.platform_cfg.data['force-delay'][1],
}
await self.ap.platform_cfg.dump_config()
@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('gitee-ai-config', 15)
class GiteeAIConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'gitee-ai-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'gitee-ai' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['gitee-ai-chat-completions'] = {
'base-url': 'https://ai.gitee.com/v1',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['gitee-ai'] = ['XXXXX']
await self.ap.provider_cfg.dump_config()
@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-service-api-config', 16)
class DifyServiceAPICfgMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'dify-service-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api'] = {
'base-url': 'https://api.dify.ai/v1',
'app-type': 'chat',
'chat': {'api-key': 'app-1234567890'},
'workflow': {'api-key': 'app-1234567890', 'output-key': 'summary'},
}
await self.ap.provider_cfg.dump_config()
@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-api-timeout-params', 17)
class DifyAPITimeoutParamsMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat']
or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow']
or 'agent' not in self.ap.provider_cfg.data['dify-service-api']
)
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['agent'] = {
'api-key': 'app-1234567890',
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()
@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('xai-config', 18)
class XaiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'xai-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['xai-chat-completions'] = {
'base-url': 'https://api.x.ai/v1',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['xai'] = ['xai-1234567890']
await self.ap.provider_cfg.dump_config()
@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('zhipuai-config', 19)
class ZhipuaiConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'zhipuai-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['zhipuai-chat-completions'] = {
'base-url': 'https://open.bigmodel.cn/api/paas/v4',
'args': {},
'timeout': 120,
}
self.ap.provider_cfg.data['keys']['zhipuai'] = ['xxxxxxx']
await self.ap.provider_cfg.dump_config()
@@ -1,36 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wecom-config', 20)
class WecomConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'wecom':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'wecom',
'enable': False,
'host': '0.0.0.0',
'port': 2290,
'corpid': '',
'secret': '',
'token': '',
'EncodingAESKey': '',
'contacts_secret': '',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lark-config', 21)
class LarkConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'lark':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'lark',
'enable': False,
'app_id': 'cli_abcdefgh',
'app_secret': 'XXXXXXXXXX',
'bot_name': 'LangBot',
'enable-webhook': False,
'port': 2285,
'encrypt-key': 'xxxxxxxxx',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,23 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lmstudio-config', 22)
class LmStudioConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'lmstudio-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['requester']['lmstudio-chat-completions'] = {
'base-url': 'http://127.0.0.1:1234/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()
@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('siliconflow-config', 23)
class SiliconFlowConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'siliconflow-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['siliconflow'] = ['xxxxxxx']
self.ap.provider_cfg.data['requester']['siliconflow-chat-completions'] = {
'base-url': 'https://api.siliconflow.cn/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()
@@ -1,31 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('discord-config', 24)
class DiscordConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'discord':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'discord',
'enable': False,
'client_id': '1234567890',
'token': 'XXXXXXXXXX',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('gewechat-config', 25)
class GewechatConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'gewechat':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'gewechat',
'enable': False,
'gewechat_url': 'http://your-gewechat-server:2531',
'gewechat_file_url': 'http://your-gewechat-server:2532',
'port': 2286,
'callback_url': 'http://your-callback-url:2286/gewechat/callback',
'app_id': '',
'token': '',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,33 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('qqofficial-config', 26)
class QQOfficialConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'qqofficial':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'qqofficial',
'enable': False,
'appid': '',
'secret': '',
'port': 2284,
'token': '',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,35 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wx-official-account-config', 27)
class WXOfficialAccountConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'officialaccount':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'officialaccount',
'enable': False,
'token': '',
'EncodingAESKey': '',
'AppID': '',
'AppSecret': '',
'host': '0.0.0.0',
'port': 2287,
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('bailian-requester-config', 28)
class BailianRequesterConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'bailian-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['bailian'] = ['sk-xxxxxxx']
self.ap.provider_cfg.data['requester']['bailian-chat-completions'] = {
'base-url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()
@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dashscope-app-api-config', 29)
class DashscopeAppAPICfgMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'dashscope-app-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dashscope-app-api'] = {
'app-type': 'agent',
'api-key': 'sk-1234567890',
'agent': {'app-id': 'Your_app_id', 'references_quote': '参考资料来自:'},
'workflow': {
'app-id': 'Your_app_id',
'references_quote': '参考资料来自:',
'biz_params': {'city': '北京', 'date': '2023-08-10'},
},
}
await self.ap.provider_cfg.dump_config()
@@ -1,31 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('lark-config-cmpl', 30)
class LarkConfigCmplMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'lark':
if 'enable-webhook' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'lark':
if 'enable-webhook' not in adapter:
adapter['enable-webhook'] = False
if 'port' not in adapter:
adapter['port'] = 2285
if 'encrypt-key' not in adapter:
adapter['encrypt-key'] = 'xxxxxxxxx'
await self.ap.platform_cfg.dump_config()
@@ -1,33 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dingtalk-config', 31)
class DingTalkConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
# for adapter in self.ap.platform_cfg.data['platform-adapters']:
# if adapter['adapter'] == 'dingtalk':
# return False
# return True
return False
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters'].append(
{
'adapter': 'dingtalk',
'enable': False,
'client_id': '',
'client_secret': '',
'robot_code': '',
'robot_name': '',
}
)
await self.ap.platform_cfg.dump_config()
@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('volcark-requester-config', 32)
class VolcArkRequesterConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'volcark-chat-completions' not in self.ap.provider_cfg.data['requester']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['keys']['volcark'] = ['xxxxxxxx']
self.ap.provider_cfg.data['requester']['volcark-chat-completions'] = {
'base-url': 'https://ark.cn-beijing.volces.com/api/v3',
'args': {},
'timeout': 120,
}
await self.ap.provider_cfg.dump_config()
@@ -1,24 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dify-thinking-config', 33)
class DifyThinkingConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
if 'options' not in self.ap.provider_cfg.data['dify-service-api']:
return True
if 'convert-thinking-tips' not in self.ap.provider_cfg.data['dify-service-api']['options']:
return True
return False
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['options'] = {'convert-thinking-tips': 'plain'}
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from urllib.parse import urlparse
from .. import migration
@migration.migration_class('gewechat-file-url-config', 34)
class GewechatFileUrlConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'gewechat':
if 'gewechat_file_url' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'gewechat':
if 'gewechat_file_url' not in adapter:
parsed_url = urlparse(adapter['gewechat_url'])
adapter['gewechat_file_url'] = f'{parsed_url.scheme}://{parsed_url.hostname}:2532'
await self.ap.platform_cfg.dump_config()
@@ -1,26 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wxoa-mode', 35)
class WxoaModeMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'Mode' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'Mode' not in adapter:
adapter['Mode'] = 'drop'
await self.ap.platform_cfg.dump_config()
@@ -1,26 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('wxoa-loading-message', 36)
class WxoaLoadingMessageMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'LoadingMessage' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] == 'officialaccount':
if 'LoadingMessage' not in adapter:
adapter['LoadingMessage'] = 'AI正在思考中,请发送任意内容获取回复。'
await self.ap.platform_cfg.dump_config()
@@ -1,18 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('mcp-config', 37)
class MCPConfigMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'mcp' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['mcp'] = {'servers': []}
await self.ap.provider_cfg.dump_config()
@@ -1,25 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('tg-dingtalk-markdown', 38)
class TgDingtalkMarkdownMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] in ['dingtalk', 'telegram']:
if 'markdown_card' not in adapter:
return True
return False
async def run(self):
"""执行迁移"""
for adapter in self.ap.platform_cfg.data['platform-adapters']:
if adapter['adapter'] in ['dingtalk', 'telegram']:
if 'markdown_card' not in adapter:
adapter['markdown_card'] = False
await self.ap.platform_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('modelscope-config-completion', 39)
class ModelScopeConfigCompletionMigration(migration.Migration):
"""ModelScope配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'modelscope-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'modelscope' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'modelscope-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['modelscope-chat-completions'] = {
'base-url': 'https://api-inference.modelscope.cn/v1',
'args': {},
'timeout': 120,
}
if 'modelscope' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['modelscope'] = []
await self.ap.provider_cfg.dump_config()
@@ -1,29 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('ppio-config', 40)
class PPIOConfigMigration(migration.Migration):
"""PPIO配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return (
'ppio-chat-completions' not in self.ap.provider_cfg.data['requester']
or 'ppio' not in self.ap.provider_cfg.data['keys']
)
async def run(self):
"""执行迁移"""
if 'ppio-chat-completions' not in self.ap.provider_cfg.data['requester']:
self.ap.provider_cfg.data['requester']['ppio-chat-completions'] = {
'base-url': 'https://api.ppinfra.com/v3/openai',
'args': {},
'timeout': 120,
}
if 'ppio' not in self.ap.provider_cfg.data['keys']:
self.ap.provider_cfg.data['keys']['ppio'] = []
await self.ap.provider_cfg.dump_config()
@@ -1,17 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('dingtalk_card_auto_layout', 41)
class DingTalkCardAutoLayoutMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return True
async def run(self):
"""执行迁移"""
self.ap.platform_cfg.data['platform-adapters']['app']['dingtalk']['card_auto_layout'] = False
await self.ap.platform_cfg.dump_config()
@@ -1,27 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('weknora-api-config', 42)
class WeKnoraAPICfgMigration(migration.Migration):
"""WeKnora API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'weknora-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['weknora-api'] = {
'base-url': 'http://localhost:8080/api/v1',
'app-type': 'agent',
'api-key': '',
'agent-id': 'builtin-smart-reasoning',
'knowledge-base-ids': [],
'web-search-enabled': False,
'timeout': 120,
'base-prompt': '请回答用户的问题。',
}
await self.ap.provider_cfg.dump_config()
@@ -1,30 +0,0 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('deerflow-api-config', 43)
class DeerFlowAPICfgMigration(migration.Migration):
"""DeerFlow API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'deerflow-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['deerflow-api'] = {
'api-base': 'http://127.0.0.1:2026',
'api-key': '',
'auth-header': '',
'assistant-id': 'lead_agent',
'model-name': '',
'thinking-enabled': False,
'plan-mode': False,
'subagent-enabled': False,
'max-concurrent-subagents': 3,
'timeout': 300,
'recursion-limit': 1000,
}
await self.ap.provider_cfg.dump_config()
@@ -202,6 +202,16 @@ class LoadConfigStage(stage.BootingStage):
constants.instance_id = new_id
constants.edition = ap.instance_config.data.get('system', {}).get('edition', 'community')
# Instance creation timestamp: sourced from data/labels/instance_id.json.
# Instances created before this field existed (or supplied via
# system.instance_id) won't have it, so backfill with the current time
# and persist it via the dump below — from then on it stays stable.
instance_create_ts = ap.instance_id.data.get('instance_create_ts', 0)
if not isinstance(instance_create_ts, int) or instance_create_ts <= 0:
instance_create_ts = int(time.time())
ap.instance_id.data['instance_create_ts'] = instance_create_ts
constants.instance_create_ts = instance_create_ts
print(f'LangBot instance id: {constants.instance_id}')
print(f'LangBot edition: {constants.edition}')
-43
View File
@@ -1,43 +0,0 @@
from __future__ import annotations
from .. import stage, app
from .. import migration
from ...utils import importutil
from .. import migrations
importutil.import_modules_in_pkg(migrations)
@stage.stage_class('MigrationStage')
class MigrationStage(stage.BootingStage):
"""Migration stage
These migrations are legacy, only performed in version 3.x
"""
async def run(self, ap: app.Application):
"""Run migration"""
if any(
[
ap.command_cfg is None,
ap.pipeline_cfg is None,
ap.platform_cfg is None,
ap.provider_cfg is None,
ap.system_cfg is None,
]
): # only run migration when version is 3.x
return
migrations = migration.preregistered_migrations
# Sort by migration number
migrations.sort(key=lambda x: x.number)
for migration_cls in migrations:
migration_instance = migration_cls(ap)
if await migration_instance.need_migrate():
await migration_instance.run()
print(f'Migration {migration_instance.name} executed')
@@ -31,6 +31,7 @@ class LLMModel(Base):
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
provider_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
abilities = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default=[])
context_length = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
extra_args = sqlalchemy.Column(sqlalchemy.JSON, nullable=False, default={})
prefered_ranking = sqlalchemy.Column(sqlalchemy.Integer, nullable=False, default=0)
created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now())
@@ -3,6 +3,49 @@ import sqlalchemy
from .base import Base
class MonitoringTrace(Base):
"""End-to-end monitoring trace records"""
__tablename__ = 'monitoring_traces'
trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringSpan(Base):
"""Trace span records for pipeline, RAG, model, plugin and tool operations"""
__tablename__ = 'monitoring_spans'
span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True)
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True)
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringMessage(Base):
"""Monitoring message records"""
@@ -0,0 +1,39 @@
"""add llm model context length
Revision ID: 0005_add_llm_context_length
Revises: 0004_add_mcp_readme
Create Date: 2026-06-07
"""
import sqlalchemy as sa
from alembic import op
revision = '0005_add_llm_context_length'
down_revision = '0004_add_mcp_readme'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add ``context_length`` to llm_models if the table exists and the column is
# missing. The table may have been created by create_all() with the column
# already present on fresh installs, so guard against duplicate-add; it may
# also be absent entirely (e.g. migrating a truly empty DB), so guard against
# a missing table too.
conn = op.get_bind()
inspector = sa.inspect(conn)
if 'llm_models' not in inspector.get_table_names():
return
columns = {column['name'] for column in inspector.get_columns('llm_models')}
if 'context_length' not in columns:
op.add_column('llm_models', sa.Column('context_length', sa.Integer(), nullable=True))
def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
if 'llm_models' not in inspector.get_table_names():
return
columns = {column['name'] for column in inspector.get_columns('llm_models')}
if 'context_length' in columns:
op.drop_column('llm_models', 'context_length')
@@ -0,0 +1,88 @@
"""add monitoring traces and spans
Revision ID: 0006_monitoring_traces
Revises: 0005_add_llm_context_length
Create Date: 2026-06-16
"""
import sqlalchemy as sa
from alembic import op
revision = '0006_monitoring_traces'
down_revision = '0005_add_llm_context_length'
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
tables = set(inspector.get_table_names())
if 'monitoring_traces' not in tables:
op.create_table(
'monitoring_traces',
sa.Column('trace_id', sa.String(length=255), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('ended_at', sa.DateTime(), nullable=True),
sa.Column('duration', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('bot_id', sa.String(length=255), nullable=True),
sa.Column('bot_name', sa.String(length=255), nullable=True),
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
sa.Column('pipeline_name', sa.String(length=255), nullable=True),
sa.Column('session_id', sa.String(length=255), nullable=True),
sa.Column('message_id', sa.String(length=255), nullable=True),
sa.Column('query_id', sa.String(length=255), nullable=True),
sa.Column('attributes', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('trace_id'),
)
op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at'])
op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at'])
op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status'])
op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id'])
op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id'])
op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id'])
op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id'])
op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id'])
if 'monitoring_spans' not in tables:
op.create_table(
'monitoring_spans',
sa.Column('span_id', sa.String(length=255), nullable=False),
sa.Column('trace_id', sa.String(length=255), nullable=False),
sa.Column('parent_span_id', sa.String(length=255), nullable=True),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('kind', sa.String(length=80), nullable=False),
sa.Column('status', sa.String(length=50), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('ended_at', sa.DateTime(), nullable=True),
sa.Column('duration', sa.Integer(), nullable=True),
sa.Column('message_id', sa.String(length=255), nullable=True),
sa.Column('session_id', sa.String(length=255), nullable=True),
sa.Column('bot_id', sa.String(length=255), nullable=True),
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
sa.Column('attributes', sa.Text(), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('span_id'),
)
op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id'])
op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id'])
op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind'])
op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status'])
op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at'])
op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id'])
op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id'])
op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id'])
op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id'])
def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
tables = set(inspector.get_table_names())
if 'monitoring_spans' in tables:
op.drop_table('monitoring_spans')
if 'monitoring_traces' in tables:
op.drop_table('monitoring_traces')
@@ -0,0 +1,36 @@
# Legacy migrations (DEPRECATED — do not add new files here)
This directory holds the **legacy 3.x database migration system**
(`DBMigration` subclasses in `dbmXXX_*.py`, registered via
`@migration.migration_class(N)` and run from `pkg/persistence/mgr.py`).
**This system is frozen. Do not add new `dbmXXX_*.py` migrations.**
The chain is capped at version 25 (`required_database_version = 25` in
`pkg/utils/constants.py`). These files exist only to upgrade pre-existing
3.x databases up to the Alembic baseline (`0001_baseline`). Removing them
would break in-place upgrades from old installations, so they are kept
read-only.
## All new schema changes use Alembic
Migrations now live in `pkg/persistence/alembic/versions/`. To create one:
```bash
uv run python -m langbot.pkg.persistence.alembic_runner autogenerate "description of your change"
```
(requires `data/config.yaml` to exist). Review and edit the generated
script before committing — Alembic migrations run automatically on startup
and must be idempotent and guard against missing tables (the test suite
runs them against empty databases).
### Rules for Alembic revision ids
- Keep the revision id **≤ 32 characters** — PostgreSQL stores
`alembic_version.version_num` as `varchar(32)` and will raise
`StringDataRightTruncationError` on overflow.
- Guard every `op` call against a missing table / missing column
(`inspector.get_table_names()` / `inspector.get_columns()`); fresh
installs create the schema via `create_all()` and stamp the baseline,
so migrations may run against tables that already match or do not exist.
+146 -26
View File
@@ -2,6 +2,9 @@ from __future__ import annotations
import typing
import traceback
import time
import uuid
import datetime
import sqlalchemy
@@ -79,6 +82,19 @@ class RuntimePipeline:
enable_all_plugins: bool
"""是否启用所有插件"""
@staticmethod
def _new_span_id() -> str:
return f'span-{uuid.uuid4().hex[:16]}'
@staticmethod
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
@staticmethod
def _query_session_id(query: pipeline_query.Query) -> str:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
return f'{launcher_type}_{query.launcher_id}'
enable_all_mcp_servers: bool
"""是否启用所有MCP服务器"""
@@ -234,44 +250,96 @@ class RuntimePipeline:
stage_container = self.stage_containers[i]
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
span_started_at = self._utc_now()
span_started = time.perf_counter()
span_status = 'success'
span_error = None
span_result_type = None
result = stage_container.inst.process(query, stage_container.inst_name)
try:
result = stage_container.inst.process(query, stage_container.inst_name)
if isinstance(result, typing.Coroutine):
result = await result
if isinstance(result, typing.Coroutine):
result = await result
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
)
await self._check_output(query, result)
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
span_result_type = str(
result.result_type.value if hasattr(result.result_type, 'value') else result.result_type
)
await self._check_output(query, sub_result)
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
)
await self._check_output(query, result)
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
await self._execute_from_stage(i + 1, query)
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器
span_result_type = 'generator'
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
span_result_type = str(
sub_result.result_type.value
if hasattr(sub_result.result_type, 'value')
else sub_result.result_type
)
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
)
await self._check_output(query, sub_result)
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(
f'Stage {stage_container.inst_name} interrupted query {query.query_id}'
)
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
await self._execute_from_stage(i + 1, query)
break
except Exception as e:
span_status = 'error'
span_error = str(e)
raise
finally:
trace_id = (query.variables or {}).get('_monitoring_trace_id')
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
if trace_id:
try:
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=root_span_id,
name=stage_container.inst_name,
kind='pipeline.stage',
status=span_status,
started_at=span_started_at,
duration=int((time.perf_counter() - span_started) * 1000),
message_id=(query.variables or {}).get('_monitoring_message_id'),
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'stage_class': stage_container.inst.__class__.__name__,
'result_type': span_result_type,
'query_id': query.query_id,
},
error_message=span_error,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
i += 1
async def process_query(self, query: pipeline_query.Query):
"""处理请求"""
trace_started_at = self._utc_now()
trace_started = time.perf_counter()
root_span_id = self._new_span_id()
trace_id = None
trace_status = 'success'
# Get monitoring metadata
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
@@ -303,6 +371,28 @@ class RuntimePipeline:
except Exception as e:
self.ap.logger.error(f'Failed to record query start: {e}')
try:
trace_id = await self.ap.monitoring_service.start_trace(
name='LangBot query',
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
session_id=self._query_session_id(query),
message_id=message_id or None,
query_id=query.query_id,
attributes={
'launcher_type': query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
'runner_name': runner_name,
},
)
query.variables['_monitoring_trace_id'] = trace_id
query.variables['_monitoring_root_span_id'] = root_span_id
except Exception as e:
self.ap.logger.error(f'Failed to start query trace: {e}')
try:
# Get bound plugins for this pipeline
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
@@ -361,6 +451,7 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query response: {e}')
except Exception as e:
trace_status = 'error'
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
@@ -383,6 +474,35 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query error: {me}')
finally:
if trace_id:
try:
duration_ms = int((time.perf_counter() - trace_started) * 1000)
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
span_id=root_span_id,
name='LangBot query',
kind='pipeline.query',
status=trace_status,
started_at=trace_started_at,
duration=duration_ms,
message_id=message_id or None,
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'query_id': query.query_id,
'pipeline_name': pipeline_name,
'runner_name': runner_name,
},
)
await self.ap.monitoring_service.finish_trace(
trace_id=trace_id,
status=trace_status,
duration=duration_ms,
message_id=message_id or None,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
self.ap.logger.debug(f'Query {query.query_id} processed')
del self.ap.query_pool.cached_queries[query.query_id]
+4 -8
View File
@@ -109,7 +109,7 @@ class PreProcessor(stage.PipelineStage):
if llm_model:
query.use_llm_model_uuid = llm_model.model_entity.uuid
if llm_model.model_entity.abilities.__contains__('func_call'):
if 'func_call' in (llm_model.model_entity.abilities or []):
# Get bound plugins and MCP servers for filtering tools
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
@@ -159,11 +159,7 @@ class PreProcessor(stage.PipelineStage):
# Check if this model supports vision, if not, remove all images
# TODO this checking should be performed in runner, and in this stage, the image should be reserved
if (
selected_runner == 'local-agent'
and llm_model
and not llm_model.model_entity.abilities.__contains__('vision')
):
if selected_runner == 'local-agent' and llm_model and 'vision' not in (llm_model.model_entity.abilities or []):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
@@ -181,7 +177,7 @@ class PreProcessor(stage.PipelineStage):
plain_text += me.text
elif isinstance(me, platform_message.Image):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if me.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
@@ -202,7 +198,7 @@ class PreProcessor(stage.PipelineStage):
content_list.append(provider_message.ContentElement.from_text(msg.text))
elif isinstance(msg, platform_message.Image):
if selected_runner != 'local-agent' or (
llm_model and llm_model.model_entity.abilities.__contains__('vision')
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
):
if msg.base64 is not None:
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))
@@ -84,6 +84,18 @@ class WebPageBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter
):
self.listeners.pop(event_type, None)
async def is_stream_output_supported(self) -> bool:
"""Delegate stream output check to ws_adapter."""
if self._ws_adapter is not None:
return await self._ws_adapter.is_stream_output_supported()
return False
async def create_message_card(self, message_id: str | int, event: platform_events.MessageEvent) -> bool:
"""Delegate create_message_card to ws_adapter."""
if self._ws_adapter is not None:
return await self._ws_adapter.create_message_card(message_id, event)
return False
async def is_muted(self, group_id: int) -> bool:
return False
+22 -1
View File
@@ -689,6 +689,16 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
async def get_plugin_readme(self, plugin_author: str, plugin_name: str, language: str = 'en') -> str:
return await self.handler.get_plugin_readme(plugin_author, plugin_name, language)
async def get_plugin_logs(
self,
plugin_author: str,
plugin_name: str,
limit: int = 200,
level: str | None = None,
) -> list[dict[str, Any]]:
# Not cached: logs are live and change constantly.
return await self.handler.get_plugin_logs(plugin_author, plugin_name, limit, level)
@alru_cache(ttl=5 * 60)
async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]:
return await self.handler.get_plugin_assets(plugin_author, plugin_name, filepath)
@@ -701,8 +711,19 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
endpoint: str,
method: str,
body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body)
return await self.handler.handle_page_api(
plugin_author,
plugin_name,
page_id,
endpoint,
method,
body,
caller,
headers or {},
)
async def get_debug_info(self) -> dict[str, Any]:
"""Get debug information including debug key and WS URL"""
+44
View File
@@ -755,6 +755,21 @@ class RuntimeConnectionHandler(handler.Handler):
'session_name': session_name,
'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id),
'_trace_context': {
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
'parent_span_id': query.variables.get('_monitoring_root_span_id')
if query.variables
else None,
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
'query_id': query.query_id,
'session_id': session_name,
'bot_id': query.bot_uuid or '',
'pipeline_id': query.pipeline_uuid or '',
'knowledge_base_id': kb_id,
'attributes': {
'source': 'plugin-api',
},
},
},
)
results = [entry.model_dump(mode='json') for entry in entries]
@@ -953,6 +968,31 @@ class RuntimeConnectionHandler(handler.Handler):
return readme_bytes.decode('utf-8')
async def get_plugin_logs(
self,
plugin_author: str,
plugin_name: str,
limit: int = 200,
level: str | None = None,
) -> list[dict[str, Any]]:
"""Get recent log lines captured from the plugin's stderr."""
try:
result = await self.call_action(
LangBotToRuntimeAction.GET_PLUGIN_LOGS,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
'limit': limit,
'level': level,
},
timeout=20,
)
except Exception:
traceback.print_exc()
return []
return result.get('logs', [])
async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]:
"""Get plugin assets"""
result = await self.call_action(
@@ -986,6 +1026,8 @@ class RuntimeConnectionHandler(handler.Handler):
endpoint: str,
method: str,
body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Forward a page API call to the plugin via runtime."""
result = await self.call_action(
@@ -997,6 +1039,8 @@ class RuntimeConnectionHandler(handler.Handler):
'endpoint': endpoint,
'method': method,
'body': body,
'caller': caller,
'headers': headers or {},
},
timeout=30,
)
+62 -6
View File
@@ -37,11 +37,41 @@ class ModelManager:
self.requester_components = []
self.requester_dict = {}
@staticmethod
def _get_litellm_provider_from_manifest(component: engine.Component | None) -> str | None:
if component is None:
return None
spec = getattr(component, 'spec', None) or {}
litellm_provider = None
if isinstance(spec, dict):
litellm_provider = spec.get('litellm_provider')
else:
getter = getattr(spec, 'get', None)
if callable(getter):
try:
litellm_provider = getter('litellm_provider')
except Exception:
litellm_provider = None
if isinstance(litellm_provider, str) and litellm_provider:
return litellm_provider
return None
async def initialize(self):
self.requester_components = self.ap.discover.get_components_by_kind('LLMAPIRequester')
requester_dict: dict[str, type[requester.ProviderAPIRequester]] = {}
for component in self.requester_components:
# Skip components that use litellm_provider (they will use litellmchat.py instead)
litellm_provider = self._get_litellm_provider_from_manifest(component)
if litellm_provider:
self.ap.logger.debug(
f'Skipping Python class loading for {component.metadata.name} '
f'(uses litellm_provider={litellm_provider})'
)
continue
requester_dict[component.metadata.name] = component.get_python_component_class()
self.requester_dict = requester_dict
@@ -236,6 +266,7 @@ class ModelManager:
name=model_info.get('name', ''),
provider_uuid='',
abilities=model_info.get('abilities', []),
context_length=model_info.get('context_length'),
extra_args=model_info.get('extra_args', {}),
),
provider=runtime_provider,
@@ -294,13 +325,37 @@ class ModelManager:
else:
provider_entity = provider_info
if provider_entity.requester not in self.requester_dict:
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
# Get requester manifest to check for litellm_provider
requester_manifest = self.get_available_requester_manifest_by_name(provider_entity.requester)
litellm_provider = self._get_litellm_provider_from_manifest(requester_manifest)
# Build config from base_url
config = {'base_url': provider_entity.base_url}
# Check if requester manifest specifies litellm_provider
if litellm_provider:
from .requesters import litellmchat
# Use unified LiteLLMRequester with provider prefix
# Map litellm_provider (YAML spec) to custom_llm_provider (config)
config['custom_llm_provider'] = litellm_provider
requester_inst = litellmchat.LiteLLMRequester(
ap=self.ap,
config=config,
)
self.ap.logger.debug(
f'Using LiteLLMRequester for {provider_entity.requester} '
f'with custom_llm_provider={config["custom_llm_provider"]}'
)
else:
# Use original requester class (for backward compatibility)
if provider_entity.requester not in self.requester_dict:
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
requester_inst = self.requester_dict[provider_entity.requester](
ap=self.ap,
config=config,
)
requester_inst = self.requester_dict[provider_entity.requester](
ap=self.ap,
config={'base_url': provider_entity.base_url},
)
await requester_inst.initialize()
token_mgr = token.TokenManager(name=provider_entity.uuid, tokens=provider_entity.api_keys or [])
@@ -406,6 +461,7 @@ class ModelManager:
name=model_info.get('name', ''),
provider_uuid=model_info.get('provider_uuid', ''),
abilities=model_info.get('abilities', []),
context_length=model_info.get('context_length'),
extra_args=model_info.get('extra_args', {}),
)
+86 -3
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import abc
import typing
import time
import datetime
from ...core import app
from ...entity.persistence import model as persistence_model
@@ -12,6 +13,28 @@ import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
def _query_session_id(query: pipeline_query.Query) -> str:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
return f'{launcher_type}_{query.launcher_id}'
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
"""Store the latest provider usage on the query for upstream action handlers."""
if query is None or not usage_info:
return
if query.variables is None:
query.variables = {}
query.variables[LLM_USAGE_QUERY_VARIABLE] = dict(usage_info)
class RuntimeProvider:
"""运行时模型提供商"""
@@ -46,6 +69,7 @@ class RuntimeProvider:
"""Bridge method for invoking LLM with monitoring"""
# Start timing for monitoring
start_time = time.time()
span_started_at = _utc_now()
input_tokens = 0
output_tokens = 0
status = 'success'
@@ -67,8 +91,9 @@ class RuntimeProvider:
if isinstance(result, tuple):
msg, usage_info = result
if usage_info:
input_tokens = usage_info.get('input_tokens', 0)
output_tokens = usage_info.get('output_tokens', 0)
_store_llm_usage(query, usage_info)
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
return msg
else:
return result
@@ -111,6 +136,30 @@ class RuntimeProvider:
error_message=error_message,
message_id=message_id,
)
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': False,
},
error_message=error_message,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
@@ -126,9 +175,9 @@ class RuntimeProvider:
"""Bridge method for invoking LLM stream with monitoring"""
# Start timing for monitoring
start_time = time.time()
span_started_at = _utc_now()
status = 'success'
error_message = None
# Note: Stream doesn't easily provide token counts, set to 0
input_tokens = 0
output_tokens = 0
@@ -143,6 +192,16 @@ class RuntimeProvider:
remove_think=remove_think,
):
yield chunk
# Extract usage from stream if available (stored by LiteLLM requester)
if query:
if query.variables is None:
query.variables = {}
if STREAM_USAGE_QUERY_VARIABLE in query.variables:
usage_info = query.variables[STREAM_USAGE_QUERY_VARIABLE]
_store_llm_usage(query, usage_info)
input_tokens = usage_info.get('prompt_tokens', 0)
output_tokens = usage_info.get('completion_tokens', 0)
del query.variables[STREAM_USAGE_QUERY_VARIABLE]
except Exception as e:
status = 'error'
error_message = str(e)
@@ -181,6 +240,30 @@ class RuntimeProvider:
error_message=error_message,
message_id=message_id,
)
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM stream {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': True,
},
error_message=error_message,
)
except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
@@ -1,17 +0,0 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class AI302ChatCompletions(chatcmpl.OpenAIChatCompletions):
"""302.AI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.302.ai/v1',
'timeout': 120,
}
@@ -7,6 +7,7 @@ metadata:
zh_Hans: 302.AI
icon: 302ai.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "302ai 302.AI 302 ai 中转 中转站 aggregator gpt claude gemini"
support_type:
- llm
- text-embedding
@@ -1,370 +0,0 @@
from __future__ import annotations
import typing
import json
import platform
import socket
import anthropic
import httpx
from .. import errors, requester
from ....utils import image
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class AnthropicMessages(requester.ProviderAPIRequester):
"""Anthropic Messages API 请求器"""
client: anthropic.AsyncAnthropic
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.anthropic.com',
'timeout': 120,
}
async def initialize(self):
# 兼容 Windows 缺失 TCP_KEEPINTVL 和 TCP_KEEPCNT 的问题
if platform.system() == 'Windows':
if not hasattr(socket, 'TCP_KEEPINTVL'):
socket.TCP_KEEPINTVL = 0
if not hasattr(socket, 'TCP_KEEPCNT'):
socket.TCP_KEEPCNT = 0
httpx_client = anthropic._base_client.AsyncHttpxClientWrapper(
base_url=self.requester_cfg['base_url'],
# cast to a valid type because mypy doesn't understand our type narrowing
timeout=typing.cast(httpx.Timeout, self.requester_cfg['timeout']),
limits=anthropic._constants.DEFAULT_CONNECTION_LIMITS,
follow_redirects=True,
trust_env=True,
)
self.client = anthropic.AsyncAnthropic(
api_key='',
http_client=httpx_client,
base_url=self.requester_cfg['base_url'],
)
async def invoke_llm(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
self.client.api_key = model.provider.token_mgr.get_token()
args = extra_args.copy()
args['model'] = model.model_entity.name
# 处理消息
# system
system_role_message = None
for i, m in enumerate(messages):
if m.role == 'system':
system_role_message = m
break
if system_role_message:
messages.pop(i)
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
args['system'] = system_role_message.content
req_messages = []
for m in messages:
if m.role == 'tool':
tool_call_id = m.tool_call_id
req_messages.append(
{
'role': 'user',
'content': [
{
'type': 'tool_result',
'tool_use_id': tool_call_id,
'is_error': False,
'content': [{'type': 'text', 'text': m.content}],
}
],
}
)
continue
msg_dict = m.dict(exclude_none=True)
if isinstance(m.content, str) and m.content.strip() != '':
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
elif isinstance(m.content, list):
for i, ce in enumerate(m.content):
if ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
alter_image_ele = {
'type': 'image',
'source': {
'type': 'base64',
'media_type': f'image/{image_format}',
'data': image_b64,
},
}
msg_dict['content'][i] = alter_image_ele
if m.tool_calls:
for tool_call in m.tool_calls:
msg_dict['content'].append(
{
'type': 'tool_use',
'id': tool_call.id,
'name': tool_call.function.name,
'input': json.loads(tool_call.function.arguments),
}
)
del msg_dict['tool_calls']
req_messages.append(msg_dict)
args['messages'] = req_messages
if 'thinking' in args:
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
if funcs:
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
if tools:
args['tools'] = tools
try:
resp = await self.client.messages.create(**args)
args = {
'content': '',
'role': resp.role,
}
assert type(resp) is anthropic.types.message.Message
for block in resp.content:
if not remove_think and block.type == 'thinking':
args['content'] = '<think>\n' + block.thinking + '\n</think>\n' + args['content']
elif block.type == 'text':
args['content'] += block.text
elif block.type == 'tool_use':
assert type(block) is anthropic.types.tool_use_block.ToolUseBlock
tool_call = provider_message.ToolCall(
id=block.id,
type='function',
function=provider_message.FunctionCall(name=block.name, arguments=json.dumps(block.input)),
)
if 'tool_calls' not in args:
args['tool_calls'] = []
args['tool_calls'].append(tool_call)
return provider_message.Message(**args)
except anthropic.AuthenticationError as e:
raise errors.RequesterError(f'api-key 无效: {e.message}')
except anthropic.BadRequestError as e:
raise errors.RequesterError(str(e.message))
except anthropic.NotFoundError as e:
if 'model: ' in str(e):
raise errors.RequesterError(f'模型无效: {e.message}')
else:
raise errors.RequesterError(f'请求地址无效: {e.message}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message:
self.client.api_key = model.provider.token_mgr.get_token()
args = extra_args.copy()
args['model'] = model.model_entity.name
args['stream'] = True
# 处理消息
# system
system_role_message = None
for i, m in enumerate(messages):
if m.role == 'system':
system_role_message = m
break
if system_role_message:
messages.pop(i)
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
args['system'] = system_role_message.content
req_messages = []
for m in messages:
if m.role == 'tool':
tool_call_id = m.tool_call_id
req_messages.append(
{
'role': 'user',
'content': [
{
'type': 'tool_result',
'tool_use_id': tool_call_id,
'is_error': False, # 暂时直接写false
'content': [
{'type': 'text', 'text': m.content}
], # 这里要是list包裹,应该是多个返回的情况?type类型好像也可以填其他的,暂时只写text
}
],
}
)
continue
msg_dict = m.dict(exclude_none=True)
if isinstance(m.content, str) and m.content.strip() != '':
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
elif isinstance(m.content, list):
for i, ce in enumerate(m.content):
if ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
alter_image_ele = {
'type': 'image',
'source': {
'type': 'base64',
'media_type': f'image/{image_format}',
'data': image_b64,
},
}
msg_dict['content'][i] = alter_image_ele
if isinstance(msg_dict['content'], str) and msg_dict['content'] == '':
msg_dict['content'] = [] # 这里不知道为什么会莫名有个空导致content为字符
if m.tool_calls:
for tool_call in m.tool_calls:
msg_dict['content'].append(
{
'type': 'tool_use',
'id': tool_call.id,
'name': tool_call.function.name,
'input': json.loads(tool_call.function.arguments),
}
)
del msg_dict['tool_calls']
req_messages.append(msg_dict)
if 'thinking' in args:
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
args['messages'] = req_messages
if funcs:
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
if tools:
args['tools'] = tools
try:
role = 'assistant' # 默认角色
# chunk_idx = 0
think_started = False
think_ended = False
finish_reason = False
tool_name = ''
tool_id = ''
async for chunk in await self.client.messages.create(**args):
content = ''
tool_call = {'id': None, 'function': {'name': None, 'arguments': None}, 'type': 'function'}
if isinstance(
chunk, anthropic.types.raw_content_block_start_event.RawContentBlockStartEvent
): # 记录开始
if chunk.content_block.type == 'tool_use':
if chunk.content_block.name is not None:
tool_name = chunk.content_block.name
if chunk.content_block.id is not None:
tool_id = chunk.content_block.id
tool_call['function']['name'] = tool_name
tool_call['function']['arguments'] = ''
tool_call['id'] = tool_id
if not remove_think:
if chunk.content_block.type == 'thinking' and not remove_think:
think_started = True
elif chunk.content_block.type == 'text' and chunk.index != 0 and not remove_think:
think_ended = True
continue
elif isinstance(chunk, anthropic.types.raw_content_block_delta_event.RawContentBlockDeltaEvent):
if chunk.delta.type == 'thinking_delta':
if think_started:
think_started = False
content = '<think>\n' + chunk.delta.thinking
elif remove_think:
continue
else:
content = chunk.delta.thinking
elif chunk.delta.type == 'text_delta':
if think_ended:
think_ended = False
content = '\n</think>\n' + chunk.delta.text
else:
content = chunk.delta.text
elif chunk.delta.type == 'input_json_delta':
tool_call['function']['arguments'] = chunk.delta.partial_json
tool_call['function']['name'] = tool_name
tool_call['id'] = tool_id
elif isinstance(chunk, anthropic.types.raw_content_block_stop_event.RawContentBlockStopEvent):
continue # 记录raw_content_block结束的
elif isinstance(chunk, anthropic.types.raw_message_delta_event.RawMessageDeltaEvent):
if chunk.delta.stop_reason == 'end_turn':
finish_reason = True
elif isinstance(chunk, anthropic.types.raw_message_stop_event.RawMessageStopEvent):
continue # 这个好像是完全结束
else:
# print(chunk)
self.ap.logger.debug(f'anthropic chunk: {chunk}')
continue
args = {
'content': content,
'role': role,
'is_final': finish_reason,
'tool_calls': None if tool_call['id'] is None else [tool_call],
}
# if chunk_idx == 0:
# chunk_idx += 1
# continue
# assert type(chunk) is anthropic.types.message.Chunk
yield provider_message.MessageChunk(**args)
# return llm_entities.Message(**args)
except anthropic.AuthenticationError as e:
raise errors.RequesterError(f'api-key 无效: {e.message}')
except anthropic.BadRequestError as e:
raise errors.RequesterError(str(e.message))
except anthropic.NotFoundError as e:
if 'model: ' in str(e):
raise errors.RequesterError(f'模型无效: {e.message}')
else:
raise errors.RequesterError(f'请求地址无效: {e.message}')
@@ -7,6 +7,7 @@ metadata:
zh_Hans: Anthropic
icon: anthropic.svg
spec:
litellm_provider: anthropic
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "anthropic Anthropic 克劳德 claude Claude Opus Sonnet Haiku 安thropic"
support_type:
- llm
provider_category: manufacturer
@@ -0,0 +1,5 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#2932E1"/>
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Baidu</text>
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">ERNIE</text>
</svg>

After

Width:  |  Height:  |  Size: 396 B

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: baidu-chat-completions
label:
en_US: Baidu ERNIE
zh_Hans: 百度文心一言
icon: baidu.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "baidu Baidu 百度 千帆 qianfan wenxin 文心 文心一言 ernie ERNIE bce embedding bce-reranker"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer
@@ -1,242 +0,0 @@
from __future__ import annotations
import typing
import dashscope
import openai
from . import modelscopechatcmpl
from .. import requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class BailianChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions):
"""阿里云百炼大模型平台 ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'timeout': 120,
}
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
is_use_dashscope_call = False # 是否使用阿里原生库调用
is_enable_multi_model = True # 是否支持多轮对话
use_time_num = 0 # 模型已调用次数,防止存在多文件时重复调用
use_time_ids = [] # 已调用的ID列表
message_id = 0 # 记录消息序号
for msg in messages:
# print(msg)
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
elif me['type'] == 'file_url' and '.' in me.get('file_name', ''):
# 1. 视频文件推理
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2845871
file_type = me.get('file_name').lower().split('.')[-1]
if file_type in ['mp4', 'avi', 'mkv', 'mov', 'flv', 'wmv']:
me['type'] = 'video_url'
me['video_url'] = {'url': me['file_url']}
del me['file_url']
del me['file_name']
use_time_num += 1
use_time_ids.append(message_id)
is_enable_multi_model = False
# 2. 语音文件识别, 无法通过openai的audio字段传递,暂时不支持
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2979031
elif file_type in [
'aac',
'amr',
'aiff',
'flac',
'm4a',
'mp3',
'mpeg',
'ogg',
'opus',
'wav',
'webm',
'wma',
]:
me['audio'] = me['file_url']
me['type'] = 'audio'
del me['file_url']
del me['type']
del me['file_name']
is_use_dashscope_call = True
use_time_num += 1
use_time_ids.append(message_id)
is_enable_multi_model = False
message_id += 1
# 使用列表推导式,保留不在 use_time_ids[:-1] 中的元素,仅保留最后一个多媒体消息
if not is_enable_multi_model and use_time_num > 1:
messages = [msg for idx, msg in enumerate(messages) if idx not in use_time_ids[:-1]]
if not is_enable_multi_model:
messages = [msg for msg in messages if 'resp_message_id' not in msg]
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
if is_use_dashscope_call:
response = dashscope.MultiModalConversation.call(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key = "sk-xxx"
api_key=use_model.provider.token_mgr.get_token(),
model=use_model.model_entity.name,
messages=messages,
result_format='message',
asr_options={
# "language": "zh", # 可选,若已知音频的语种,可通过该参数指定待识别语种,以提升识别准确率
'enable_lid': True,
'enable_itn': False,
},
stream=True,
)
content_length_list = []
previous_length = 0 # 记录上一次的内容长度
for res in response:
chunk = res['output']
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta_content = choice['message'].content[0]['text']
finish_reason = choice['finish_reason']
content_length_list.append(len(delta_content))
else:
delta_content = ''
finish_reason = None
# 跳过空的第一个 chunk(只有 role 没有内容)
if chunk_idx == 0 and not delta_content:
chunk_idx += 1
continue
# 检查 content_length_list 是否有足够的数据
if len(content_length_list) >= 2:
now_content = delta_content[previous_length : content_length_list[-1]]
previous_length = content_length_list[-1] # 更新上一次的长度
else:
now_content = delta_content # 第一次循环时直接使用 delta_content
previous_length = len(delta_content) # 更新上一次的长度
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': now_content if now_content else None,
'is_final': bool(finish_reason) and finish_reason != 'null',
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
else:
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role,后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think,跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content,添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理工具调用增量
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] != '':
tool_id = tool_call['id']
if tool_call['function']['name'] is not None:
tool_name = tool_call['function']['name']
if tool_call['type'] is None:
tool_call['type'] = 'function'
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
tool_call['function']['arguments'] = (
'' if tool_call['function']['arguments'] is None else tool_call['function']['arguments']
)
# 跳过空的第一个 chunk(只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
# return
@@ -7,6 +7,7 @@ metadata:
zh_Hans: 阿里云百炼
icon: bailian.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,8 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "bailian 百炼 阿里 阿里云 aliyun alibaba dashscope 通义 通义千问 qwen Qwen tongyi gte-rerank text-embedding-v"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:
@@ -1,702 +0,0 @@
from __future__ import annotations
import asyncio
import typing
import openai
import openai.types.chat.chat_completion as chat_completion_module
import httpx
from .. import errors, requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class OpenAIChatCompletions(requester.ProviderAPIRequester):
"""OpenAI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.openai.com/v1',
'timeout': 120,
}
async def initialize(self):
self.client = openai.AsyncClient(
api_key=self.init_api_key,
base_url=self.requester_cfg['base_url'].replace(' ', ''),
timeout=self.requester_cfg['timeout'],
http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']),
)
def _mask_api_key(self, api_key: str | None) -> str:
if not api_key:
return ''
if len(api_key) <= 8:
return '****'
return f'{api_key[:4]}...{api_key[-4:]}'
def _infer_model_type(self, model_id: str) -> str:
normalized_model_id = (model_id or '').lower()
embedding_keywords = (
'embedding',
'embed',
'bge-',
'e5-',
'm3e',
'gte-',
'multilingual-e5',
'text-embedding',
)
return 'embedding' if any(keyword in normalized_model_id for keyword in embedding_keywords) else 'llm'
def _infer_model_abilities(self, item: dict[str, typing.Any], model_id: str) -> list[str]:
normalized_model_id = (model_id or '').lower()
abilities: set[str] = set()
def _flatten(value: typing.Any) -> list[str]:
if value is None:
return []
if isinstance(value, str):
return [value.lower()]
if isinstance(value, dict):
flattened: list[str] = []
for nested_value in value.values():
flattened.extend(_flatten(nested_value))
return flattened
if isinstance(value, (list, tuple, set)):
flattened: list[str] = []
for nested_value in value:
flattened.extend(_flatten(nested_value))
return flattened
return [str(value).lower()]
capability_tokens = _flatten(item.get('capabilities'))
capability_tokens.extend(_flatten(item.get('modalities')))
capability_tokens.extend(_flatten(item.get('input_modalities')))
capability_tokens.extend(_flatten(item.get('output_modalities')))
capability_tokens.extend(_flatten(item.get('supported_generation_methods')))
capability_tokens.extend(_flatten(item.get('supported_parameters')))
capability_tokens.extend(_flatten(item.get('architecture')))
combined_tokens = capability_tokens + [normalized_model_id]
vision_keywords = (
'vision',
'image',
'file',
'video',
'multimodal',
'vl',
'ocr',
'omni',
)
function_call_keywords = (
'function',
'tool',
'tools',
'tool_choice',
'tool_call',
'tool-use',
'tool_use',
)
if any(any(keyword in token for keyword in vision_keywords) for token in combined_tokens):
abilities.add('vision')
if any(any(keyword in token for keyword in function_call_keywords) for token in combined_tokens):
abilities.add('func_call')
return sorted(abilities)
def _normalize_modalities(self, value: typing.Any) -> list[str]:
normalized: list[str] = []
def _collect(item: typing.Any):
if item is None:
return
if isinstance(item, str):
for part in item.replace('->', ',').replace('+', ',').split(','):
token = part.strip().lower()
if token and token not in normalized:
normalized.append(token)
return
if isinstance(item, dict):
for nested in item.values():
_collect(nested)
return
if isinstance(item, (list, tuple, set)):
for nested in item:
_collect(nested)
return
_collect(value)
return normalized
def _extract_scan_metadata(self, item: dict[str, typing.Any], model_id: str) -> dict[str, typing.Any]:
display_name = item.get('name')
if not isinstance(display_name, str) or not display_name.strip() or display_name == model_id:
display_name = ''
description = item.get('description')
if not isinstance(description, str) or not description.strip():
description = ''
context_length = item.get('context_length')
if context_length is None and isinstance(item.get('top_provider'), dict):
context_length = item['top_provider'].get('context_length')
if not isinstance(context_length, int):
try:
context_length = int(context_length) if context_length is not None else None
except (TypeError, ValueError):
context_length = None
input_modalities = self._normalize_modalities(item.get('input_modalities'))
output_modalities = self._normalize_modalities(item.get('output_modalities'))
if isinstance(item.get('architecture'), dict):
if not input_modalities:
input_modalities = self._normalize_modalities(item['architecture'].get('input_modalities'))
if not output_modalities:
output_modalities = self._normalize_modalities(item['architecture'].get('output_modalities'))
owned_by = item.get('owned_by')
if not isinstance(owned_by, str) or not owned_by.strip():
owned_by = ''
return {
'display_name': display_name or None,
'description': description or None,
'context_length': context_length,
'owned_by': owned_by or None,
'input_modalities': input_modalities,
'output_modalities': output_modalities,
}
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
headers = {}
if api_key:
headers['Authorization'] = f'Bearer {api_key}'
models_url = f'{self.requester_cfg["base_url"].rstrip("/")}/models'
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
response = await client.get(models_url, headers=headers)
response.raise_for_status()
payload = response.json()
models = []
for item in payload.get('data', []):
model_id = item.get('id')
if not model_id:
continue
models.append(
{
'id': model_id,
'name': model_id,
'type': self._infer_model_type(model_id),
'abilities': self._infer_model_abilities(item, model_id),
**self._extract_scan_metadata(item, model_id),
}
)
models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
return {
'models': models,
'debug': {
'request': {
'method': 'GET',
'url': models_url,
'headers': {
'Authorization': f'Bearer {self._mask_api_key(api_key)}' if api_key else '',
},
},
'response': payload,
},
}
async def _req(
self,
args: dict,
extra_body: dict = {},
) -> chat_completion_module.ChatCompletion:
return await self.client.chat.completions.create(**args, extra_body=extra_body)
async def _req_stream(
self,
args: dict,
extra_body: dict = {},
):
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
yield chunk
async def _make_msg(
self,
chat_completion: chat_completion_module.ChatCompletion,
remove_think: bool = False,
) -> provider_message.Message:
if not isinstance(chat_completion, chat_completion_module.ChatCompletion):
raise TypeError(f'Expected ChatCompletion, got {type(chat_completion).__name__}: {chat_completion[:16]}')
chatcmpl_message = chat_completion.choices[0].message.model_dump()
# 确保 role 字段存在且不为 None
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
chatcmpl_message['role'] = 'assistant'
# 处理思维链
content = chatcmpl_message.get('content', '')
reasoning_content = chatcmpl_message.get('reasoning_content', None)
processed_content, _ = await self._process_thinking_content(
content=content, reasoning_content=reasoning_content, remove_think=remove_think
)
chatcmpl_message['content'] = processed_content
# 移除 reasoning_content 字段,避免传递给 Message
if 'reasoning_content' in chatcmpl_message:
del chatcmpl_message['reasoning_content']
message = provider_message.Message(**chatcmpl_message)
return message
async def _process_thinking_content(
self,
content: str,
reasoning_content: str = None,
remove_think: bool = False,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
reasoning_content: reasoning_content 字段内容
remove_think: 是否移除思维链
Returns:
(处理后的内容, 提取的思维链内容)
"""
thinking_content = ''
# 1. 从 reasoning_content 提取思维链
if reasoning_content:
thinking_content = reasoning_content
# 2. 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
# 如果已有 reasoning_content,则追加
if thinking_content:
thinking_content += '\n' + '\n'.join(think_matches)
else:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 3. 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
tool_id = ''
tool_name = ''
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role,后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think,跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content,添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理 content 中已有的 <think> 标签(如果需要移除)
# if delta_content and remove_think and '<think>' in delta_content:
# import re
#
# # 移除 <think> 标签及其内容
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
# 处理工具调用增量
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] and tool_call['function']['name']:
tool_id = tool_call['id']
tool_name = tool_call['function']['name']
else:
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk(只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
async def _closure(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
# 发送请求
resp = await self._req(args, extra_body=extra_args)
# 处理请求结果
message = await self._make_msg(resp, remove_think)
# Extract token usage from response
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return message, usage_info
async def invoke_llm(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
"""Invoke LLM and return message with usage info"""
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
for m in messages:
msg_dict = m.dict(exclude_none=True)
content = msg_dict.get('content')
if isinstance(content, list):
# 检查 content 列表中是否每个部分都是文本
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
# 将所有文本部分合并为一个字符串
msg_dict['content'] = '\n'.join(part['text'] for part in content)
req_messages.append(msg_dict)
try:
msg, usage_info = await self._closure(
query=query,
req_messages=req_messages,
use_model=model,
use_funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
)
return msg, usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
if 'context_length_exceeded' in str(e):
raise errors.RequesterError(f'上文过长,请重置会话: {error_message}')
else:
raise errors.RequesterError(f'请求参数错误: {error_message}')
except openai.AuthenticationError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'无效的 api-key: {error_message}')
except openai.NotFoundError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求路径错误: {error_message}')
except openai.RateLimitError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求过于频繁或余额不足: {error_message}')
except openai.APIConnectionError as e:
error_message = f'连接错误: {str(e)}'
raise errors.RequesterError(error_message)
except openai.APIError as e:
error_message = str(e.message) if hasattr(e, 'message') else str(e)
raise errors.RequesterError(f'请求错误: {error_message}')
async def invoke_embedding(
self,
model: requester.RuntimeEmbeddingModel,
input_text: list[str],
extra_args: dict[str, typing.Any] = {},
) -> tuple[list[list[float]], dict]:
"""调用 Embedding API, returns (embeddings, usage_info)"""
self.client.api_key = model.provider.token_mgr.get_token()
args = {
'model': model.model_entity.name,
'input': input_text,
}
if model.model_entity.extra_args:
args.update(model.model_entity.extra_args)
args.update(extra_args)
try:
resp = await self.client.embeddings.create(**args)
# Extract usage info
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['prompt_tokens'] = resp.usage.prompt_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return [d.embedding for d in resp.data], usage_info
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
raise errors.RequesterError(f'请求参数错误: {e.message}')
async def invoke_llm_stream(
self,
query: pipeline_query.Query,
model: requester.RuntimeLLMModel,
messages: typing.List[provider_message.Message],
funcs: typing.List[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
for m in messages:
msg_dict = m.dict(exclude_none=True)
content = msg_dict.get('content')
if isinstance(content, list):
# 检查 content 列表中是否每个部分都是文本
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
# 将所有文本部分合并为一个字符串
msg_dict['content'] = '\n'.join(part['text'] for part in content)
req_messages.append(msg_dict)
try:
async for item in self._closure_stream(
query=query,
req_messages=req_messages,
use_model=model,
use_funcs=funcs,
extra_args=extra_args,
remove_think=remove_think,
):
yield item
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')
except openai.BadRequestError as e:
if 'context_length_exceeded' in e.message:
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
else:
raise errors.RequesterError(f'请求参数错误: {e.message}')
except openai.AuthenticationError as e:
raise errors.RequesterError(f'无效的 api-key: {e.message}')
except openai.NotFoundError as e:
raise errors.RequesterError(f'请求路径错误: {e.message}')
except openai.RateLimitError as e:
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
except openai.APIError as e:
raise errors.RequesterError(f'请求错误: {e.message}')
async def invoke_rerank(
self,
model: requester.RuntimeRerankModel,
query: str,
documents: typing.List[str],
extra_args: dict[str, typing.Any] = {},
) -> typing.List[dict]:
"""Standard /rerank endpoint (Jina/Cohere/SiliconFlow/Voyage/DashScope compatible)
Supports extra_args from model.extra_args:
- rerank_url: full URL override (e.g. "https://dashscope.aliyuncs.com/compatible-api/v1/reranks")
- rerank_path: path override appended to base_url (e.g. "reranks" instead of default "rerank")
- Any other fields are merged into the request payload.
"""
api_key = model.provider.token_mgr.get_token()
base_url = self.requester_cfg.get('base_url', '').rstrip('/')
timeout = self.requester_cfg.get('timeout', 120)
merged_args = {}
if model.model_entity.extra_args:
merged_args.update(model.model_entity.extra_args)
if extra_args:
merged_args.update(extra_args)
rerank_url = merged_args.pop('rerank_url', None)
rerank_path = merged_args.pop('rerank_path', 'rerank')
if not rerank_url:
rerank_url = f'{base_url}/{rerank_path}'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
payload = {
'model': model.model_entity.name,
'query': query,
'documents': documents[:64],
'top_n': min(len(documents), 64),
}
if merged_args:
payload.update(merged_args)
try:
async with httpx.AsyncClient(trust_env=True, timeout=timeout) as client:
resp = await client.post(rerank_url, headers=headers, json=payload)
resp.raise_for_status()
data = resp.json()
results = self._parse_rerank_response(data)
if results:
scores = [r.get('relevance_score', 0.0) for r in results]
min_score = min(scores)
max_score = max(scores)
if max_score - min_score > 1e-6:
for r in results:
r['relevance_score'] = (r['relevance_score'] - min_score) / (max_score - min_score)
return results
except httpx.HTTPStatusError as e:
raise errors.RequesterError(f'Rerank request failed: {e.response.status_code} - {e.response.text}')
except httpx.TimeoutException:
raise errors.RequesterError('Rerank request timed out')
except Exception as e:
raise errors.RequesterError(f'Rerank request error: {str(e)}')
@staticmethod
def _parse_rerank_response(data: dict) -> typing.List[dict]:
"""Parse rerank response from various providers.
Handles:
- Jina/Cohere/SiliconFlow: {"results": [{"index", "relevance_score"}]}
- Voyage AI: {"data": [{"index", "relevance_score"}]}
- DashScope: {"output": {"results": [{"index", "relevance_score"}]}}
"""
if 'results' in data:
return data['results']
if 'data' in data:
return data['data']
if 'output' in data and isinstance(data['output'], dict):
return data['output'].get('results', [])
return []
@@ -7,6 +7,7 @@ metadata:
zh_Hans: OpenAI
icon: openai.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,10 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "openai OpenAI 欧派 gpt GPT ChatGPT chatgpt o1 o3 o4 text-embedding 通用 openai兼容 compatible"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer
execution:
python:
@@ -12,6 +12,7 @@ metadata:
icon: chroma.svg
spec:
config: []
alias: "chroma Chroma 向量 vector embedding 嵌入 chromadb"
support_type:
- text-embedding
provider_category: builtin
@@ -7,6 +7,7 @@ metadata:
zh_Hans: Cohere
icon: cohere.svg
spec:
litellm_provider: cohere
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "cohere Cohere rerank 重排 reranker rerank-english rerank-multilingual command"
support_type:
- rerank
provider_category: manufacturer
@@ -1,17 +0,0 @@
from __future__ import annotations
import typing
import openai
from . import chatcmpl
class CompShareChatCompletions(chatcmpl.OpenAIChatCompletions):
"""CompShare ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.modelverse.cn/v1',
'timeout': 120,
}
@@ -7,6 +7,7 @@ metadata:
zh_Hans: 优云智算
icon: compshare.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,8 +23,11 @@ spec:
type: integer
required: true
default: 120
alias: "compshare 优刻得 ucloud UCloud 算力 共享算力 GPU"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:
python:
@@ -1,67 +0,0 @@
from __future__ import annotations
import typing
from . import chatcmpl
from .. import errors, requester
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class DeepseekChatCompletions(chatcmpl.OpenAIChatCompletions):
"""Deepseek ChatCompletion API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.deepseek.com',
'timeout': 120,
}
async def _closure(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> tuple[provider_message.Message, dict]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages
# deepseek 不支持多模态,把content都转换成纯文字
for m in messages:
if 'content' in m and isinstance(m['content'], list):
m['content'] = ' '.join([c['text'] for c in m['content'] if 'text' in c])
args['messages'] = messages
# 发送请求
resp = await self._req(args, extra_body=extra_args)
# print(resp)
if resp is None:
raise errors.RequesterError('接口返回为空,请确定模型提供商服务是否正常')
# 处理请求结果
message = await self._make_msg(resp, remove_think)
# Extract token usage from response
usage_info = {}
if hasattr(resp, 'usage') and resp.usage:
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
usage_info['total_tokens'] = resp.usage.total_tokens or 0
return message, usage_info
@@ -7,6 +7,7 @@ metadata:
zh_Hans: DeepSeek
icon: deepseek.svg
spec:
litellm_provider: deepseek
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "deepseek DeepSeek 深度求索 深度 求索 dpsk v3 r1 deepseek-chat deepseek-reasoner"
support_type:
- llm
provider_category: manufacturer
@@ -0,0 +1,4 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#3B82F6"/>
<text x="30" y="32" font-family="Arial, sans-serif" font-size="12" font-weight="bold" fill="white" text-anchor="middle">豆包</text>
</svg>

After

Width:  |  Height:  |  Size: 282 B

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: doubao-chat-completions
label:
en_US: ByteDance Doubao
zh_Hans: 字节豆包
icon: doubao.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://ark.cn-beijing.volces.com/api/v3
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "doubao 豆包 字节 字节跳动 bytedance volcengine 火山 火山引擎 ark 方舟 seed"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer
@@ -1,205 +0,0 @@
from __future__ import annotations
import typing
import httpx
from . import chatcmpl
import uuid
from .. import requester
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
class GeminiChatCompletions(chatcmpl.OpenAIChatCompletions):
"""Google Gemini API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://generativelanguage.googleapis.com/v1beta/openai',
'timeout': 120,
}
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
models_url = 'https://generativelanguage.googleapis.com/v1beta/models'
params = {'key': api_key} if api_key else {}
all_models: list[dict[str, typing.Any]] = []
next_page_token = ''
last_payload: dict[str, typing.Any] = {}
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
while True:
request_params = dict(params)
if next_page_token:
request_params['pageToken'] = next_page_token
response = await client.get(models_url, params=request_params)
response.raise_for_status()
payload = response.json()
last_payload = payload
for item in payload.get('models', []):
model_name = item.get('name', '')
model_id = model_name.replace('models/', '', 1)
if not model_id:
continue
supported_methods = item.get('supportedGenerationMethods', []) or []
if 'embedContent' in supported_methods and 'generateContent' not in supported_methods:
model_type = 'embedding'
else:
model_type = 'llm'
all_models.append(
{
'id': model_id,
'name': model_id,
'type': model_type,
'abilities': self._infer_model_abilities(item, model_id),
'display_name': item.get('displayName') or None,
'description': item.get('description') or None,
'context_length': item.get('inputTokenLimit'),
'input_modalities': self._normalize_modalities(item.get('inputModalities')),
'output_modalities': self._normalize_modalities(item.get('outputModalities')),
}
)
next_page_token = payload.get('nextPageToken', '')
if not next_page_token:
break
all_models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
return {
'models': all_models,
'debug': {
'request': {
'method': 'GET',
'url': models_url,
'query': {'key': self._mask_api_key(api_key)} if api_key else {},
},
'response': last_payload,
},
}
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.MessageChunk:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# 流式处理状态
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
tool_id = ''
tool_name = ''
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role,后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
reasoning_content = delta.get('reasoning_content', '')
# 处理 reasoning_content
if reasoning_content:
# accumulated_reasoning += reasoning_content
# 如果设置了 remove_think,跳过 reasoning_content
if remove_think:
chunk_idx += 1
continue
# 第一次出现 reasoning_content,添加 <think> 开始标签
if not thinking_started:
thinking_started = True
delta_content = '<think>\n' + reasoning_content
else:
# 继续输出 reasoning_content
delta_content = reasoning_content
elif thinking_started and not thinking_ended and delta_content:
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
thinking_ended = True
delta_content = '\n</think>\n' + delta_content
# 处理 content 中已有的 <think> 标签(如果需要移除)
# if delta_content and remove_think and '<think>' in delta_content:
# import re
#
# # 移除 <think> 标签及其内容
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
# 处理工具调用增量
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] == '' and tool_id == '':
tool_id = str(uuid.uuid4())
if tool_call['function']['name']:
tool_name = tool_call['function']['name']
tool_call['id'] = tool_id
tool_call['function']['name'] = tool_name
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk(只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
@@ -7,6 +7,7 @@ metadata:
zh_Hans: Google Gemini
icon: gemini.svg
spec:
litellm_provider: gemini
config:
- name: base_url
label:
@@ -22,8 +23,10 @@ spec:
type: integer
required: true
default: 120
alias: "gemini Gemini 谷歌 google Google 双子座 bard flash pro text-embedding-004"
support_type:
- llm
- text-embedding
provider_category: manufacturer
execution:
python:
@@ -1,15 +0,0 @@
from __future__ import annotations
import typing
from . import ppiochatcmpl
class GiteeAIChatCompletions(ppiochatcmpl.PPIOChatCompletions):
"""Gitee AI ChatCompletions API 请求器"""
default_config: dict[str, typing.Any] = {
'base_url': 'https://ai.gitee.com/v1',
'timeout': 120,
}
@@ -7,6 +7,7 @@ metadata:
zh_Hans: Gitee AI
icon: giteeai.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -22,6 +23,7 @@ spec:
type: integer
required: true
default: 120
alias: "gitee Gitee 码云 gitee-ai gitee ai serverless bge embedding rerank"
support_type:
- llm
- text-embedding
@@ -0,0 +1,4 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#F97316"/>
<text x="30" y="32" font-family="Arial, sans-serif" font-size="14" font-weight="bold" fill="white" text-anchor="middle">Groq</text>
</svg>

After

Width:  |  Height:  |  Size: 280 B

@@ -0,0 +1,29 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: groq-chat-completions
label:
en_US: Groq
zh_Hans: Groq
icon: groq.svg
spec:
litellm_provider: groq
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://api.groq.com/openai/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "groq Groq 高速 llama mixtral 推理加速 lpu"
support_type:
- llm
provider_category: manufacturer
@@ -0,0 +1,5 @@
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
<rect width="60" height="50" rx="8" fill="#0066FF"/>
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">iFlytek</text>
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">Spark</text>
</svg>

After

Width:  |  Height:  |  Size: 398 B

@@ -0,0 +1,31 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: iflytek-chat-completions
label:
en_US: iFlytek Spark
zh_Hans: 讯飞星火
icon: iflytek.svg
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://spark-api-open.xf-yun.com/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "iflytek 讯飞 科大讯飞 星火 spark xinghuo xunfei 讯飞星火"
support_type:
- llm
- text-embedding
- rerank
provider_category: manufacturer
@@ -1,208 +0,0 @@
from __future__ import annotations
import openai
import typing
from . import chatcmpl
from .. import requester
import openai.types.chat.chat_completion as chat_completion
import re
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions):
"""接口 AI ChatCompletion API 请求器"""
client: openai.AsyncClient
default_config: dict[str, typing.Any] = {
'base_url': 'https://api.jiekou.ai/openai',
'timeout': 120,
}
is_think: bool = False
async def _make_msg(
self,
chat_completion: chat_completion.ChatCompletion,
remove_think: bool,
) -> provider_message.Message:
chatcmpl_message = chat_completion.choices[0].message.model_dump()
# print(chatcmpl_message.keys(), chatcmpl_message.values())
# 确保 role 字段存在且不为 None
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
chatcmpl_message['role'] = 'assistant'
reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
# deepseek的reasoner模型
chatcmpl_message['content'] = await self._process_thinking_content(
chatcmpl_message['content'], reasoning_content, remove_think
)
# 移除 reasoning_content 字段,避免传递给 Message
if 'reasoning_content' in chatcmpl_message:
del chatcmpl_message['reasoning_content']
message = provider_message.Message(**chatcmpl_message)
return message
async def _process_thinking_content(
self,
content: str,
reasoning_content: str = None,
remove_think: bool = False,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
reasoning_content: reasoning_content 字段内容
remove_think: 是否移除思维链
Returns:
处理后的内容
"""
if remove_think:
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
else:
if reasoning_content is not None:
content = '<think>\n' + reasoning_content + '\n</think>\n' + content
return content
async def _make_msg_chunk(
self,
delta: dict[str, typing.Any],
idx: int,
) -> provider_message.MessageChunk:
# 处理流式chunk和完整响应的差异
# print(chat_completion.choices[0])
# 确保 role 字段存在且不为 None
if 'role' not in delta or delta['role'] is None:
delta['role'] = 'assistant'
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
delta['content'] = '' if delta['content'] is None else delta['content']
# print(reasoning_content)
# deepseek的reasoner模型
if reasoning_content is not None:
delta['content'] += reasoning_content
message = provider_message.MessageChunk(**delta)
return message
async def _closure_stream(
self,
query: pipeline_query.Query,
req_messages: list[dict],
use_model: requester.RuntimeLLMModel,
use_funcs: list[resource_tool.LLMTool] = None,
extra_args: dict[str, typing.Any] = {},
remove_think: bool = False,
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
self.client.api_key = use_model.provider.token_mgr.get_token()
args = {}
args['model'] = use_model.model_entity.name
if use_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
if tools:
args['tools'] = tools
# 设置此次请求中的messages
messages = req_messages.copy()
# 检查vision
for msg in messages:
if 'content' in msg and isinstance(msg['content'], list):
for me in msg['content']:
if me['type'] == 'image_base64':
me['image_url'] = {'url': me['image_base64']}
me['type'] = 'image_url'
del me['image_base64']
args['messages'] = messages
args['stream'] = True
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
chunk_idx = 0
thinking_started = False
thinking_ended = False
role = 'assistant' # 默认角色
async for chunk in self._req_stream(args, extra_body=extra_args):
# 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
finish_reason = getattr(choice, 'finish_reason', None)
else:
delta = {}
finish_reason = None
# 从第一个 chunk 获取 role,后续使用这个 role
if 'role' in delta and delta['role']:
role = delta['role']
# 获取增量内容
delta_content = delta.get('content', '')
# reasoning_content = delta.get('reasoning_content', '')
if remove_think:
if delta['content'] is not None:
if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
thinking_started = True
continue
elif delta['content'] == r'</think>' and not thinking_ended:
thinking_ended = True
continue
elif thinking_ended and delta['content'] == '\n\n' and thinking_started:
thinking_started = False
continue
elif thinking_started and not thinking_ended:
continue
# delta_tool_calls = None
if delta.get('tool_calls'):
for tool_call in delta['tool_calls']:
if tool_call['id'] and tool_call['function']['name']:
tool_id = tool_call['id']
tool_name = tool_call['function']['name']
if tool_call['id'] is None:
tool_call['id'] = tool_id
if tool_call['function']['name'] is None:
tool_call['function']['name'] = tool_name
if tool_call['function']['arguments'] is None:
tool_call['function']['arguments'] = ''
if tool_call['type'] is None:
tool_call['type'] = 'function'
# 跳过空的第一个 chunk(只有 role 没有内容)
if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'):
chunk_idx += 1
continue
# 构建 MessageChunk - 只包含增量内容
chunk_data = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': delta.get('tool_calls'),
'is_final': bool(finish_reason),
}
# 移除 None 值
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
@@ -7,6 +7,7 @@ metadata:
zh_Hans: 接口 AI
icon: jiekouai.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
@@ -29,9 +30,11 @@ spec:
type: int
required: true
default: 120
alias: "jiekouai 接口AI 接口 jiekou ai 中转 中转站 aggregator"
support_type:
- llm
- text-embedding
- rerank
provider_category: maas
execution:
python:

Some files were not shown because too many files have changed in this diff Show More