Compare commits
8 Commits
feat/litel
...
feat/addwe
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07b90f12a2 | ||
|
|
fd896c6974 | ||
|
|
1fbfa868fb | ||
|
|
ad05819c2e | ||
|
|
0c6f71738c | ||
|
|
af451e7006 | ||
|
|
59f20bcc73 | ||
|
|
e5b3cced1f |
@@ -38,7 +38,7 @@ LangBot 是一个**开源的生产级平台**,用于构建 AI 驱动的即时
|
||||
|
||||
### 核心能力
|
||||
|
||||
- **AI 对话与 Agent** — 多轮对话、工具调用、多模态、流式输出。自带 RAG(知识库),深度集成 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) 等 LLMOps 平台。
|
||||
- **AI 对话与 Agent** — 多轮对话、工具调用、多模态、流式输出。自带 RAG(知识库),深度集成 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、[Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com)等 LLMOps 平台。
|
||||
- **全平台支持** — 一套代码,覆盖 QQ、微信、企业微信、飞书、钉钉、Discord、Telegram、Slack、LINE、KOOK 等平台。
|
||||
- **生产就绪** — 访问控制、限速、敏感词过滤、全面监控与异常处理,已被多家企业采用。
|
||||
- **插件生态** — 数百个插件,跨进程的事件驱动架构,组件扩展,适配 [MCP 协议](https://modelcontextprotocol.io/)。
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot es una **plataforma de código abierto y grado de producción** para con
|
||||
|
||||
### Capacidades Clave
|
||||
|
||||
- **Conversaciones e Agentes IA** — Diálogos de múltiples turnos, llamadas a herramientas, soporte multimodal, salida en streaming. RAG (base de conocimientos) incorporado con integración profunda con [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
|
||||
- **Conversaciones e Agentes IA** — Diálogos de múltiples turnos, llamadas a herramientas, soporte multimodal, salida en streaming. RAG (base de conocimientos) incorporado con integración profunda con [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com).
|
||||
- **Soporte Universal de Plataformas de MI** — Un solo código base para Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
|
||||
- **Listo para Producción** — Control de acceso, limitación de velocidad, filtrado de palabras sensibles, monitoreo completo y manejo de excepciones. De confianza para empresas.
|
||||
- **Ecosistema de Plugins** — Cientos de plugins, arquitectura basada en eventos, extensiones de componentes y soporte del [protocolo MCP](https://modelcontextprotocol.io/).
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot est une **plateforme open-source de niveau production** pour créer des
|
||||
|
||||
### Capacités Clés
|
||||
|
||||
- **Conversations IA & Agents** — Dialogues multi-tours, appels d'outils, support multimodal, sortie en streaming. RAG (base de connaissances) intégré avec intégration profonde de [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
|
||||
- **Conversations IA & Agents** — Dialogues multi-tours, appels d'outils, support multimodal, sortie en streaming. RAG (base de connaissances) intégré avec intégration profonde de [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
|
||||
- **Support Universel des Plateformes de MI** — Un seul code pour Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
|
||||
- **Prêt pour la Production** — Contrôle d'accès, limitation de débit, filtrage de mots sensibles, surveillance complète et gestion des exceptions. Approuvé par les entreprises.
|
||||
- **Écosystème de Plugins** — Des centaines de plugins, architecture événementielle, extensions de composants, et support du [protocole MCP](https://modelcontextprotocol.io/).
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot は、AI搭載のインスタントメッセージングボットを構
|
||||
|
||||
### 主な機能
|
||||
|
||||
- **AI対話とエージェント** — マルチターン対話、ツール呼び出し、マルチモーダル対応、ストリーミング出力。RAG(ナレッジベース)を内蔵し、[Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) と深く統合。
|
||||
- **AI対話とエージェント** — マルチターン対話、ツール呼び出し、マルチモーダル対応、ストリーミング出力。RAG(ナレッジベース)を内蔵し、[Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、[Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com) と深く統合。
|
||||
- **ユニバーサルIMプラットフォーム対応** — 単一のコードベースで Discord、Telegram、Slack、LINE、QQ、WeChat、WeCom、Lark、DingTalk、KOOK に対応。
|
||||
- **本番環境対応** — アクセス制御、レート制限、センシティブワードフィルタリング、包括的な監視、例外処理を搭載。エンタープライズの信頼に応える品質。
|
||||
- **プラグインエコシステム** — 数百のプラグイン、イベント駆動アーキテクチャ、コンポーネント拡張、[MCPプロトコル](https://modelcontextprotocol.io/)対応。
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot은 AI 기반 인스턴트 메시징 봇을 구축하기 위한 **오픈
|
||||
|
||||
### 핵심 기능
|
||||
|
||||
- **AI 대화 및 에이전트** — 멀티턴 대화, 도구 호출, 멀티모달 지원, 스트리밍 출력. 내장 RAG(지식 베이스)와 [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org) 심층 통합.
|
||||
- **AI 대화 및 에이전트** — 멀티턴 대화, 도구 호출, 멀티모달 지원, 스트리밍 출력. 내장 RAG(지식 베이스)와 [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com) 심층 통합.
|
||||
- **유니버설 IM 플랫폼 지원** — 단일 코드베이스로 Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK 지원.
|
||||
- **프로덕션 레디** — 접근 제어, 속도 제한, 민감어 필터링, 종합 모니터링 및 예외 처리. 기업 환경에서 검증됨.
|
||||
- **플러그인 생태계** — 수백 개의 플러그인, 이벤트 기반 아키텍처, 컴포넌트 확장, [MCP 프로토콜](https://modelcontextprotocol.io/) 지원.
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot — это **платформа с открытым исходным к
|
||||
|
||||
### Ключевые возможности
|
||||
|
||||
- **ИИ-диалоги и агенты** — Многораундовые диалоги, вызов инструментов, мультимодальная поддержка, потоковый вывод. Встроенная реализация RAG (база знаний) с глубокой интеграцией в [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
|
||||
- **ИИ-диалоги и агенты** — Многораундовые диалоги, вызов инструментов, мультимодальная поддержка, потоковый вывод. Встроенная реализация RAG (база знаний) с глубокой интеграцией в [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
|
||||
- **Универсальная поддержка IM-платформ** — Единая кодовая база для Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
|
||||
- **Готовность к продакшену** — Контроль доступа, ограничение скорости, фильтрация чувствительных слов, комплексный мониторинг и обработка исключений. Проверено в корпоративной среде.
|
||||
- **Экосистема плагинов** — Сотни плагинов, событийно-ориентированная архитектура, расширения компонентов и поддержка [протокола MCP](https://modelcontextprotocol.io/).
|
||||
|
||||
@@ -39,7 +39,7 @@ LangBot 是一個**開源的生產級平台**,用於建構 AI 驅動的即時
|
||||
|
||||
### 核心能力
|
||||
|
||||
- **AI 對話與 Agent** — 多輪對話、工具調用、多模態、流式輸出。自帶 RAG(知識庫),深度整合 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) 等 LLMOps 平台。
|
||||
- **AI 對話與 Agent** — 多輪對話、工具調用、多模態、流式輸出。自帶 RAG(知識庫),深度整合 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、 [Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com)等 LLMOps 平台。
|
||||
- **全平台支援** — 一套程式碼,覆蓋 QQ、微信、企業微信、飛書、釘釘、Discord、Telegram、Slack、LINE、KOOK 等平台。
|
||||
- **生產就緒** — 存取控制、限速、敏感詞過濾、全面監控與異常處理,已被多家企業採用。
|
||||
- **外掛生態** — 數百個外掛,事件驅動架構,組件擴展,適配 [MCP 協議](https://modelcontextprotocol.io/)。
|
||||
|
||||
@@ -37,7 +37,7 @@ LangBot là một **nền tảng mã nguồn mở, cấp sản xuất** để x
|
||||
|
||||
### Khả năng chính
|
||||
|
||||
- **Hội thoại AI & Agent** — Đối thoại nhiều lượt, gọi công cụ, hỗ trợ đa phương thức, đầu ra streaming. RAG (cơ sở kiến thức) tích hợp sẵn với tích hợp sâu vào [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
|
||||
- **Hội thoại AI & Agent** — Đối thoại nhiều lượt, gọi công cụ, hỗ trợ đa phương thức, đầu ra streaming. RAG (cơ sở kiến thức) tích hợp sẵn với tích hợp sâu vào [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
|
||||
- **Hỗ trợ đa nền tảng IM** — Một mã nguồn cho Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
|
||||
- **Sẵn sàng cho sản xuất** — Kiểm soát truy cập, giới hạn tốc độ, lọc từ nhạy cảm, giám sát toàn diện và xử lý ngoại lệ. Được doanh nghiệp tin dùng.
|
||||
- **Hệ sinh thái Plugin** — Hàng trăm plugin, kiến trúc hướng sự kiện, mở rộng thành phần, và hỗ trợ [giao thức MCP](https://modelcontextprotocol.io/).
|
||||
|
||||
@@ -79,7 +79,6 @@ dependencies = [
|
||||
"pymilvus>=2.6.4",
|
||||
"pgvector>=0.4.1",
|
||||
"botocore>=1.42.39",
|
||||
"litellm>=1.0.0",
|
||||
]
|
||||
keywords = [
|
||||
"bot",
|
||||
|
||||
5
src/langbot/libs/deerflow_api/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .client import AsyncDeerFlowClient
|
||||
from .errors import DeerFlowAPIError
|
||||
from . import stream_utils
|
||||
|
||||
__all__ = ['AsyncDeerFlowClient', 'DeerFlowAPIError', 'stream_utils']
|
||||
204
src/langbot/libs/deerflow_api/client.py
Normal file
@@ -0,0 +1,204 @@
|
||||
"""DeerFlow LangGraph HTTP API 客户端
|
||||
|
||||
参考 astrbot 的 deerflow_api_client 实现,使用 httpx 适配 LangBot 风格。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import codecs
|
||||
import json
|
||||
import typing
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import httpx
|
||||
|
||||
from .errors import DeerFlowAPIError
|
||||
|
||||
|
||||
SSE_MAX_BUFFER_CHARS = 1_048_576
|
||||
|
||||
|
||||
def _normalize_sse_newlines(text: str) -> str:
|
||||
"""规范化 CRLF/CR 为 LF,确保 SSE 块分割稳定"""
|
||||
return text.replace('\r\n', '\n').replace('\r', '\n')
|
||||
|
||||
|
||||
def _parse_sse_data_lines(data_lines: list[str]) -> typing.Any:
|
||||
raw_data = '\n'.join(data_lines)
|
||||
try:
|
||||
return json.loads(raw_data)
|
||||
except json.JSONDecodeError:
|
||||
# 某些 LangGraph 兼容服务端会在单个 SSE 事件中用多个 data 行
|
||||
# 发送多段 JSON 片段(例如 tuple payload)
|
||||
parsed_lines: list[typing.Any] = []
|
||||
can_parse_all = True
|
||||
for line in data_lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
parsed_lines.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
can_parse_all = False
|
||||
break
|
||||
if can_parse_all and parsed_lines:
|
||||
return parsed_lines[0] if len(parsed_lines) == 1 else parsed_lines
|
||||
return raw_data
|
||||
|
||||
|
||||
def _parse_sse_block(block: str) -> dict[str, typing.Any] | None:
|
||||
if not block.strip():
|
||||
return None
|
||||
|
||||
event_name = 'message'
|
||||
data_lines: list[str] = []
|
||||
for line in block.splitlines():
|
||||
if line.startswith('event:'):
|
||||
event_name = line[6:].strip()
|
||||
elif line.startswith('data:'):
|
||||
data_lines.append(line[5:].lstrip())
|
||||
|
||||
if not data_lines:
|
||||
return None
|
||||
return {'event': event_name, 'data': _parse_sse_data_lines(data_lines)}
|
||||
|
||||
|
||||
class AsyncDeerFlowClient:
|
||||
"""DeerFlow LangGraph HTTP API 客户端"""
|
||||
|
||||
api_base: str
|
||||
headers: dict[str, str]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_base: str = 'http://127.0.0.1:2026',
|
||||
api_key: str = '',
|
||||
auth_header: str = '',
|
||||
) -> None:
|
||||
self.api_base = api_base.rstrip('/')
|
||||
self.headers: dict[str, str] = {}
|
||||
if auth_header:
|
||||
self.headers['Authorization'] = auth_header
|
||||
elif api_key:
|
||||
self.headers['Authorization'] = f'Bearer {api_key}'
|
||||
|
||||
async def create_thread(self, timeout: float = 20) -> dict[str, typing.Any]:
|
||||
"""创建一个新的 LangGraph thread
|
||||
|
||||
Returns:
|
||||
包含 thread_id 等信息的字典
|
||||
"""
|
||||
url = f'{self.api_base}/api/langgraph/threads'
|
||||
payload = {'metadata': {}}
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
response = await client.post(
|
||||
url,
|
||||
headers=self.headers,
|
||||
json=payload,
|
||||
)
|
||||
if response.status_code not in (200, 201):
|
||||
raise DeerFlowAPIError(
|
||||
operation='create thread',
|
||||
status=response.status_code,
|
||||
body=response.text,
|
||||
url=url,
|
||||
)
|
||||
return response.json()
|
||||
|
||||
async def delete_thread(self, thread_id: str, timeout: float = 20) -> None:
|
||||
"""删除指定 thread"""
|
||||
url = f'{self.api_base}/api/threads/{thread_id}'
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
response = await client.delete(url, headers=self.headers)
|
||||
if response.status_code not in (200, 202, 204, 404):
|
||||
raise DeerFlowAPIError(
|
||||
operation='delete thread',
|
||||
status=response.status_code,
|
||||
body=response.text,
|
||||
url=url,
|
||||
thread_id=thread_id,
|
||||
)
|
||||
|
||||
async def stream_run(
|
||||
self,
|
||||
thread_id: str,
|
||||
payload: dict[str, typing.Any],
|
||||
timeout: float = 120,
|
||||
) -> AsyncGenerator[dict[str, typing.Any], None]:
|
||||
"""运行一次 LangGraph stream 请求,逐事件 yield
|
||||
|
||||
Yields:
|
||||
事件字典 {'event': event_name, 'data': parsed_data}
|
||||
"""
|
||||
url = f'{self.api_base}/api/langgraph/threads/{thread_id}/runs/stream'
|
||||
|
||||
# 流式请求使用单独的 read timeout 控制
|
||||
stream_timeout = httpx.Timeout(
|
||||
connect=min(timeout, 30),
|
||||
read=timeout,
|
||||
write=timeout,
|
||||
pool=timeout,
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
trust_env=True,
|
||||
timeout=stream_timeout,
|
||||
) as client:
|
||||
async with client.stream(
|
||||
'POST',
|
||||
url,
|
||||
headers={
|
||||
**self.headers,
|
||||
'Accept': 'text/event-stream',
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
json=payload,
|
||||
) as resp:
|
||||
if resp.status_code != 200:
|
||||
body = await resp.aread()
|
||||
raise DeerFlowAPIError(
|
||||
operation='runs/stream request',
|
||||
status=resp.status_code,
|
||||
body=body.decode('utf-8', errors='replace'),
|
||||
url=url,
|
||||
thread_id=thread_id,
|
||||
)
|
||||
|
||||
decoder = codecs.getincrementaldecoder('utf-8')('replace')
|
||||
buffer = ''
|
||||
|
||||
async for chunk in resp.aiter_bytes(8192):
|
||||
buffer += _normalize_sse_newlines(decoder.decode(chunk))
|
||||
|
||||
while '\n\n' in buffer:
|
||||
block, buffer = buffer.split('\n\n', 1)
|
||||
parsed = _parse_sse_block(block)
|
||||
if parsed is not None:
|
||||
yield parsed
|
||||
|
||||
if len(buffer) > SSE_MAX_BUFFER_CHARS:
|
||||
# 缓冲区过大,强制 flush
|
||||
parsed = _parse_sse_block(buffer)
|
||||
if parsed is not None:
|
||||
yield parsed
|
||||
buffer = ''
|
||||
|
||||
# flush 剩余内容
|
||||
buffer += _normalize_sse_newlines(decoder.decode(b'', final=True))
|
||||
while '\n\n' in buffer:
|
||||
block, buffer = buffer.split('\n\n', 1)
|
||||
parsed = _parse_sse_block(block)
|
||||
if parsed is not None:
|
||||
yield parsed
|
||||
if buffer.strip():
|
||||
parsed = _parse_sse_block(buffer)
|
||||
if parsed is not None:
|
||||
yield parsed
|
||||
30
src/langbot/libs/deerflow_api/errors.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
class DeerFlowAPIError(Exception):
|
||||
"""DeerFlow API 请求失败"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
operation: str = '',
|
||||
status: int = 0,
|
||||
body: str = '',
|
||||
url: str = '',
|
||||
thread_id: str | None = None,
|
||||
message: str = '',
|
||||
) -> None:
|
||||
self.operation = operation
|
||||
self.status = status
|
||||
self.body = body
|
||||
self.url = url
|
||||
self.thread_id = thread_id
|
||||
|
||||
if message:
|
||||
super().__init__(message)
|
||||
return
|
||||
|
||||
msg = f'DeerFlow {operation} failed: status={status}, url={url}, body={body}'
|
||||
if thread_id is not None:
|
||||
msg = f'DeerFlow {operation} failed: thread_id={thread_id}, status={status}, url={url}, body={body}'
|
||||
super().__init__(msg)
|
||||
212
src/langbot/libs/deerflow_api/stream_utils.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""DeerFlow LangGraph 流式响应解析工具
|
||||
|
||||
参考 astrbot 实现的 deerflow_stream_utils。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
from collections.abc import Iterable
|
||||
|
||||
|
||||
def extract_text(content: typing.Any) -> str:
|
||||
"""从消息 content 中提取纯文本"""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, dict):
|
||||
if isinstance(content.get('text'), str):
|
||||
return content['text']
|
||||
if 'content' in content:
|
||||
return extract_text(content.get('content'))
|
||||
if 'kwargs' in content and isinstance(content['kwargs'], dict):
|
||||
return extract_text(content['kwargs'].get('content'))
|
||||
if isinstance(content, list):
|
||||
parts: list[str] = []
|
||||
for item in content:
|
||||
if isinstance(item, str):
|
||||
parts.append(item)
|
||||
elif isinstance(item, dict):
|
||||
item_type = item.get('type')
|
||||
if item_type == 'text' and isinstance(item.get('text'), str):
|
||||
parts.append(item['text'])
|
||||
elif 'content' in item:
|
||||
parts.append(extract_text(item['content']))
|
||||
return '\n'.join([p for p in parts if p]).strip()
|
||||
return str(content) if content is not None else ''
|
||||
|
||||
|
||||
def extract_messages_from_values_data(data: typing.Any) -> list[typing.Any]:
|
||||
"""从 values 事件中提取 messages 列表"""
|
||||
candidates: list[typing.Any] = []
|
||||
if isinstance(data, dict):
|
||||
candidates.append(data)
|
||||
if isinstance(data.get('values'), dict):
|
||||
candidates.append(data['values'])
|
||||
elif isinstance(data, list):
|
||||
candidates.extend([x for x in data if isinstance(x, dict)])
|
||||
|
||||
for item in candidates:
|
||||
messages = item.get('messages')
|
||||
if isinstance(messages, list):
|
||||
return messages
|
||||
return []
|
||||
|
||||
|
||||
def is_ai_message(message: dict[str, typing.Any]) -> bool:
|
||||
"""判断是否为 AI/assistant 消息"""
|
||||
role = str(message.get('role', '')).lower()
|
||||
if role in {'assistant', 'ai'}:
|
||||
return True
|
||||
|
||||
msg_type = str(message.get('type', '')).lower()
|
||||
if msg_type in {'ai', 'assistant', 'aimessage', 'aimessagechunk'}:
|
||||
return True
|
||||
if 'ai' in msg_type and all(token not in msg_type for token in ('human', 'tool', 'system')):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def extract_latest_ai_text(messages: Iterable[typing.Any]) -> str:
|
||||
"""获取最近一条 AI 消息的文本内容"""
|
||||
if isinstance(messages, (list, tuple)):
|
||||
iterable = reversed(messages)
|
||||
else:
|
||||
iterable = reversed(list(messages))
|
||||
|
||||
for msg in iterable:
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
if is_ai_message(msg):
|
||||
text = extract_text(msg.get('content'))
|
||||
if text:
|
||||
return text
|
||||
return ''
|
||||
|
||||
|
||||
def extract_latest_ai_message(messages: Iterable[typing.Any]) -> dict[str, typing.Any] | None:
|
||||
"""获取最近一条 AI 消息对象"""
|
||||
if isinstance(messages, (list, tuple)):
|
||||
iterable = reversed(messages)
|
||||
else:
|
||||
iterable = reversed(list(messages))
|
||||
|
||||
for msg in iterable:
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
if is_ai_message(msg):
|
||||
return msg
|
||||
return None
|
||||
|
||||
|
||||
def is_clarification_tool_message(message: dict[str, typing.Any]) -> bool:
|
||||
"""判断是否为澄清问题工具消息"""
|
||||
msg_type = str(message.get('type', '')).lower()
|
||||
tool_name = str(message.get('name', '')).lower()
|
||||
return msg_type == 'tool' and tool_name == 'ask_clarification'
|
||||
|
||||
|
||||
def extract_latest_clarification_text(messages: Iterable[typing.Any]) -> str:
|
||||
"""提取最近的澄清问题文本"""
|
||||
if isinstance(messages, (list, tuple)):
|
||||
iterable = reversed(messages)
|
||||
else:
|
||||
iterable = reversed(list(messages))
|
||||
|
||||
for msg in iterable:
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
if is_clarification_tool_message(msg):
|
||||
text = extract_text(msg.get('content'))
|
||||
if text:
|
||||
return text
|
||||
return ''
|
||||
|
||||
|
||||
def get_message_id(message: typing.Any) -> str:
|
||||
"""提取消息 ID"""
|
||||
if not isinstance(message, dict):
|
||||
return ''
|
||||
msg_id = message.get('id')
|
||||
return msg_id if isinstance(msg_id, str) else ''
|
||||
|
||||
|
||||
def extract_event_message_obj(data: typing.Any) -> dict[str, typing.Any] | None:
|
||||
"""从事件 data 中提取消息对象"""
|
||||
msg_obj = data
|
||||
if isinstance(data, (list, tuple)) and data:
|
||||
msg_obj = data[0]
|
||||
if isinstance(msg_obj, dict) and isinstance(msg_obj.get('data'), dict):
|
||||
msg_obj = msg_obj['data']
|
||||
return msg_obj if isinstance(msg_obj, dict) else None
|
||||
|
||||
|
||||
def extract_ai_delta_from_event_data(data: typing.Any) -> str:
|
||||
"""从 messages-tuple 事件中提取 AI delta 文本"""
|
||||
msg_obj = extract_event_message_obj(data)
|
||||
if not msg_obj:
|
||||
return ''
|
||||
if is_ai_message(msg_obj):
|
||||
return extract_text(msg_obj.get('content'))
|
||||
return ''
|
||||
|
||||
|
||||
def extract_clarification_from_event_data(data: typing.Any) -> str:
|
||||
"""从事件中提取澄清问题"""
|
||||
msg_obj = extract_event_message_obj(data)
|
||||
if not msg_obj:
|
||||
return ''
|
||||
if is_clarification_tool_message(msg_obj):
|
||||
return extract_text(msg_obj.get('content'))
|
||||
return ''
|
||||
|
||||
|
||||
def _iter_custom_event_items(data: typing.Any) -> list[dict[str, typing.Any]]:
|
||||
items: list[dict[str, typing.Any]] = []
|
||||
if isinstance(data, dict):
|
||||
return [data]
|
||||
if isinstance(data, list):
|
||||
for item in data:
|
||||
if isinstance(item, dict):
|
||||
items.append(item)
|
||||
elif isinstance(item, (list, tuple)):
|
||||
for nested in item:
|
||||
if isinstance(nested, dict):
|
||||
items.append(nested)
|
||||
return items
|
||||
|
||||
|
||||
def extract_task_failures_from_custom_event(data: typing.Any) -> list[str]:
|
||||
"""从 custom 事件中提取子任务失败信息"""
|
||||
failures: list[str] = []
|
||||
for item in _iter_custom_event_items(data):
|
||||
event_type = str(item.get('type', '')).lower()
|
||||
if event_type not in {'task_failed', 'task_timed_out'}:
|
||||
continue
|
||||
|
||||
task_id = str(item.get('task_id', '')).strip()
|
||||
error_text = extract_text(item.get('error')).strip()
|
||||
if task_id and error_text:
|
||||
failures.append(f'{task_id}: {error_text}')
|
||||
elif error_text:
|
||||
failures.append(error_text)
|
||||
elif task_id:
|
||||
failures.append(f'{task_id}: unknown error')
|
||||
else:
|
||||
failures.append('unknown task failure')
|
||||
return failures
|
||||
|
||||
|
||||
def build_task_failure_summary(failures: list[str]) -> str:
|
||||
"""构建任务失败摘要"""
|
||||
if not failures:
|
||||
return ''
|
||||
deduped: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for failure in failures:
|
||||
if failure not in seen:
|
||||
seen.add(failure)
|
||||
deduped.append(failure)
|
||||
if len(deduped) == 1:
|
||||
return f'DeerFlow subtask failed: {deduped[0]}'
|
||||
joined = '\n'.join([f'- {item}' for item in deduped[:5]])
|
||||
return f'DeerFlow subtasks failed:\n{joined}'
|
||||
4
src/langbot/libs/weknora_api/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .client import AsyncWeKnoraClient
|
||||
from .errors import WeKnoraAPIError
|
||||
|
||||
__all__ = ['AsyncWeKnoraClient', 'WeKnoraAPIError']
|
||||
180
src/langbot/libs/weknora_api/client.py
Normal file
@@ -0,0 +1,180 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import httpx
|
||||
import typing
|
||||
import json
|
||||
|
||||
from .errors import WeKnoraAPIError
|
||||
|
||||
|
||||
class AsyncWeKnoraClient:
|
||||
"""WeKnora API 客户端"""
|
||||
|
||||
api_key: str
|
||||
base_url: str
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key: str,
|
||||
base_url: str = 'http://localhost:80/api/v1',
|
||||
) -> None:
|
||||
self.api_key = api_key
|
||||
self.base_url = base_url
|
||||
|
||||
async def create_session(
|
||||
self,
|
||||
title: str = '',
|
||||
description: str = '',
|
||||
timeout: float = 30.0,
|
||||
) -> str:
|
||||
"""创建会话,返回 session_id"""
|
||||
async with httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
payload: dict[str, typing.Any] = {}
|
||||
if title:
|
||||
payload['title'] = title
|
||||
if description:
|
||||
payload['description'] = description
|
||||
|
||||
response = await client.post(
|
||||
'/sessions',
|
||||
headers={
|
||||
'X-API-Key': self.api_key,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
json=payload,
|
||||
)
|
||||
|
||||
if response.status_code not in (200, 201):
|
||||
raise WeKnoraAPIError(f'{response.status_code} {response.text}')
|
||||
|
||||
data = response.json()
|
||||
return data['data']['id']
|
||||
|
||||
async def agent_chat(
|
||||
self,
|
||||
session_id: str,
|
||||
query: str,
|
||||
user: str,
|
||||
agent_id: str = '',
|
||||
knowledge_base_ids: list[str] | None = None,
|
||||
web_search_enabled: bool = False,
|
||||
timeout: float = 120.0,
|
||||
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
||||
"""
|
||||
Agent 智能对话(SSE 流式)
|
||||
|
||||
响应事件类型:
|
||||
- agent_query: Agent 开始处理
|
||||
- thinking: 思考过程
|
||||
- tool_call: 工具调用
|
||||
- tool_result: 工具结果
|
||||
- references: 知识库引用
|
||||
- answer: 回答内容
|
||||
- reflection: 反思
|
||||
- session_title: 会话标题
|
||||
- error: 错误
|
||||
"""
|
||||
if knowledge_base_ids is None:
|
||||
knowledge_base_ids = []
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
payload: dict[str, typing.Any] = {
|
||||
'query': query,
|
||||
'agent_enabled': True,
|
||||
'channel': 'im',
|
||||
}
|
||||
if agent_id:
|
||||
payload['agent_id'] = agent_id
|
||||
if knowledge_base_ids:
|
||||
payload['knowledge_base_ids'] = knowledge_base_ids
|
||||
if web_search_enabled:
|
||||
payload['web_search_enabled'] = True
|
||||
|
||||
async with client.stream(
|
||||
'POST',
|
||||
f'/agent-chat/{session_id}',
|
||||
headers={
|
||||
'X-API-Key': self.api_key,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
json=payload,
|
||||
) as r:
|
||||
async for chunk in r.aiter_lines():
|
||||
if r.status_code != 200:
|
||||
raise WeKnoraAPIError(f'{r.status_code} {chunk}')
|
||||
if chunk.strip() == '':
|
||||
continue
|
||||
if chunk.startswith('data:'):
|
||||
try:
|
||||
data = json.loads(chunk[5:].strip())
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
yield data
|
||||
# 收到 error 事件后主动结束流,避免上层未 raise 时持续等待
|
||||
if data.get('response_type') == 'error':
|
||||
return
|
||||
|
||||
async def knowledge_chat(
|
||||
self,
|
||||
session_id: str,
|
||||
query: str,
|
||||
user: str,
|
||||
agent_id: str = 'builtin-quick-answer',
|
||||
knowledge_base_ids: list[str] | None = None,
|
||||
timeout: float = 120.0,
|
||||
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
||||
"""
|
||||
知识库 RAG 问答(SSE 流式)
|
||||
|
||||
响应事件类型:
|
||||
- references: 知识库引用
|
||||
- answer: 回答内容
|
||||
"""
|
||||
if knowledge_base_ids is None:
|
||||
knowledge_base_ids = []
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
payload: dict[str, typing.Any] = {
|
||||
'query': query,
|
||||
'channel': 'im',
|
||||
}
|
||||
if agent_id:
|
||||
payload['agent_id'] = agent_id
|
||||
if knowledge_base_ids:
|
||||
payload['knowledge_base_ids'] = knowledge_base_ids
|
||||
|
||||
async with client.stream(
|
||||
'POST',
|
||||
f'/knowledge-chat/{session_id}',
|
||||
headers={
|
||||
'X-API-Key': self.api_key,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
json=payload,
|
||||
) as r:
|
||||
async for chunk in r.aiter_lines():
|
||||
if r.status_code != 200:
|
||||
raise WeKnoraAPIError(f'{r.status_code} {chunk}')
|
||||
if chunk.strip() == '':
|
||||
continue
|
||||
if chunk.startswith('data:'):
|
||||
try:
|
||||
data = json.loads(chunk[5:].strip())
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
yield data
|
||||
# 收到 error 事件后主动结束流,避免上层未 raise 时持续等待
|
||||
if data.get('response_type') == 'error':
|
||||
return
|
||||
6
src/langbot/libs/weknora_api/errors.py
Normal file
@@ -0,0 +1,6 @@
|
||||
class WeKnoraAPIError(Exception):
|
||||
"""WeKnora API 请求失败"""
|
||||
|
||||
def __init__(self, message: str = ''):
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
||||
@@ -46,30 +46,6 @@ class MonitoringRouterGroup(group.RouterGroup):
|
||||
|
||||
return self.success(data=metrics)
|
||||
|
||||
@self.route('/token-statistics', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_token_statistics() -> str:
|
||||
"""Get detailed token usage statistics (summary, per-model, timeseries)."""
|
||||
bot_ids = quart.request.args.getlist('botId')
|
||||
pipeline_ids = quart.request.args.getlist('pipelineId')
|
||||
start_time_str = quart.request.args.get('startTime')
|
||||
end_time_str = quart.request.args.get('endTime')
|
||||
bucket = quart.request.args.get('bucket', 'hour')
|
||||
if bucket not in ('hour', 'day'):
|
||||
bucket = 'hour'
|
||||
|
||||
start_time = parse_iso_datetime(start_time_str)
|
||||
end_time = parse_iso_datetime(end_time_str)
|
||||
|
||||
stats = await self.ap.monitoring_service.get_token_statistics(
|
||||
bot_ids=bot_ids if bot_ids else None,
|
||||
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
bucket=bucket,
|
||||
)
|
||||
|
||||
return self.success(data=stats)
|
||||
|
||||
@self.route('/messages', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_messages() -> str:
|
||||
"""Get message logs"""
|
||||
|
||||
@@ -472,185 +472,6 @@ class MonitoringService:
|
||||
'active_sessions': active_sessions,
|
||||
}
|
||||
|
||||
async def get_token_statistics(
|
||||
self,
|
||||
bot_ids: list[str] | None = None,
|
||||
pipeline_ids: list[str] | None = None,
|
||||
start_time: datetime.datetime | None = None,
|
||||
end_time: datetime.datetime | None = None,
|
||||
bucket: str = 'hour',
|
||||
) -> dict:
|
||||
"""Get detailed token usage statistics for production observability.
|
||||
|
||||
Returns:
|
||||
- summary: aggregate token counters and call/latency stats over the window
|
||||
- by_model: per-model token + call breakdown (sorted by total tokens desc)
|
||||
- timeseries: token usage bucketed by `bucket` ('hour' or 'day')
|
||||
|
||||
Only successful LLM calls are counted toward token totals; error calls are
|
||||
reported separately so a spike in failures is visible without polluting
|
||||
token accounting.
|
||||
"""
|
||||
LLMCall = persistence_monitoring.MonitoringLLMCall
|
||||
|
||||
conditions = []
|
||||
if bot_ids:
|
||||
conditions.append(LLMCall.bot_id.in_(bot_ids))
|
||||
if pipeline_ids:
|
||||
conditions.append(LLMCall.pipeline_id.in_(pipeline_ids))
|
||||
if start_time:
|
||||
conditions.append(LLMCall.timestamp >= start_time)
|
||||
if end_time:
|
||||
conditions.append(LLMCall.timestamp <= end_time)
|
||||
|
||||
def _apply(query):
|
||||
if conditions:
|
||||
query = query.where(sqlalchemy.and_(*conditions))
|
||||
return query
|
||||
|
||||
# ---- Summary aggregates ----
|
||||
summary_query = _apply(
|
||||
sqlalchemy.select(
|
||||
sqlalchemy.func.count(LLMCall.id),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
|
||||
sqlalchemy.func.sum(
|
||||
sqlalchemy.case((LLMCall.status == 'success', 1), else_=0)
|
||||
),
|
||||
sqlalchemy.func.sum(
|
||||
sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)
|
||||
),
|
||||
# Count of successful calls that nonetheless recorded zero tokens —
|
||||
# a data-quality signal that usage reporting may be broken upstream.
|
||||
sqlalchemy.func.sum(
|
||||
sqlalchemy.case(
|
||||
(sqlalchemy.and_(LLMCall.status == 'success', LLMCall.total_tokens == 0), 1),
|
||||
else_=0,
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
summary_result = await self.ap.persistence_mgr.execute_async(summary_query)
|
||||
row = summary_result.first()
|
||||
(
|
||||
total_calls,
|
||||
total_input_tokens,
|
||||
total_output_tokens,
|
||||
total_tokens,
|
||||
total_duration,
|
||||
total_cost,
|
||||
success_calls,
|
||||
error_calls,
|
||||
zero_token_success_calls,
|
||||
) = row if row else (0, 0, 0, 0, 0, 0.0, 0, 0, 0)
|
||||
|
||||
total_calls = total_calls or 0
|
||||
success_calls = success_calls or 0
|
||||
error_calls = error_calls or 0
|
||||
zero_token_success_calls = zero_token_success_calls or 0
|
||||
|
||||
summary = {
|
||||
'total_calls': total_calls,
|
||||
'success_calls': success_calls,
|
||||
'error_calls': error_calls,
|
||||
'total_input_tokens': int(total_input_tokens or 0),
|
||||
'total_output_tokens': int(total_output_tokens or 0),
|
||||
'total_tokens': int(total_tokens or 0),
|
||||
'total_cost': round(float(total_cost or 0.0), 6),
|
||||
'avg_tokens_per_call': int((total_tokens or 0) / total_calls) if total_calls > 0 else 0,
|
||||
'avg_duration_ms': int((total_duration or 0) / total_calls) if total_calls > 0 else 0,
|
||||
'avg_tokens_per_second': round((total_output_tokens or 0) / (total_duration / 1000), 2)
|
||||
if total_duration and total_duration > 0
|
||||
else 0,
|
||||
'zero_token_success_calls': zero_token_success_calls,
|
||||
}
|
||||
|
||||
# ---- Per-model breakdown ----
|
||||
by_model_query = _apply(
|
||||
sqlalchemy.select(
|
||||
LLMCall.model_name,
|
||||
sqlalchemy.func.count(LLMCall.id),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.input_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.output_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.total_tokens), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.duration), 0),
|
||||
sqlalchemy.func.coalesce(sqlalchemy.func.sum(LLMCall.cost), 0.0),
|
||||
sqlalchemy.func.sum(
|
||||
sqlalchemy.case((LLMCall.status == 'error', 1), else_=0)
|
||||
),
|
||||
).group_by(LLMCall.model_name)
|
||||
)
|
||||
by_model_result = await self.ap.persistence_mgr.execute_async(by_model_query)
|
||||
by_model = []
|
||||
for mrow in by_model_result.all():
|
||||
(
|
||||
model_name,
|
||||
m_calls,
|
||||
m_in,
|
||||
m_out,
|
||||
m_total,
|
||||
m_duration,
|
||||
m_cost,
|
||||
m_errors,
|
||||
) = mrow
|
||||
m_calls = m_calls or 0
|
||||
by_model.append(
|
||||
{
|
||||
'model_name': model_name,
|
||||
'calls': m_calls,
|
||||
'error_calls': m_errors or 0,
|
||||
'input_tokens': int(m_in or 0),
|
||||
'output_tokens': int(m_out or 0),
|
||||
'total_tokens': int(m_total or 0),
|
||||
'cost': round(float(m_cost or 0.0), 6),
|
||||
'avg_tokens_per_call': int((m_total or 0) / m_calls) if m_calls > 0 else 0,
|
||||
'avg_duration_ms': int((m_duration or 0) / m_calls) if m_calls > 0 else 0,
|
||||
}
|
||||
)
|
||||
by_model.sort(key=lambda x: x['total_tokens'], reverse=True)
|
||||
|
||||
# ---- Time-bucketed series ----
|
||||
# Use a DB-agnostic bucketing approach: fetch (timestamp, tokens) rows and
|
||||
# aggregate in Python. The window is bounded by the time filter, so this is
|
||||
# cheap for typical dashboard ranges (hours/days).
|
||||
series_query = _apply(
|
||||
sqlalchemy.select(
|
||||
LLMCall.timestamp,
|
||||
LLMCall.input_tokens,
|
||||
LLMCall.output_tokens,
|
||||
LLMCall.total_tokens,
|
||||
).order_by(LLMCall.timestamp.asc())
|
||||
)
|
||||
series_result = await self.ap.persistence_mgr.execute_async(series_query)
|
||||
|
||||
bucket_fmt = '%Y-%m-%d %H:00' if bucket == 'hour' else '%Y-%m-%d'
|
||||
buckets: dict[str, dict] = {}
|
||||
for srow in series_result.all():
|
||||
ts, s_in, s_out, s_total = srow
|
||||
if ts is None:
|
||||
continue
|
||||
key = ts.strftime(bucket_fmt)
|
||||
b = buckets.setdefault(
|
||||
key,
|
||||
{'bucket': key, 'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'calls': 0},
|
||||
)
|
||||
b['input_tokens'] += int(s_in or 0)
|
||||
b['output_tokens'] += int(s_out or 0)
|
||||
b['total_tokens'] += int(s_total or 0)
|
||||
b['calls'] += 1
|
||||
|
||||
timeseries = [buckets[k] for k in sorted(buckets.keys())]
|
||||
|
||||
return {
|
||||
'summary': summary,
|
||||
'by_model': by_model,
|
||||
'timeseries': timeseries,
|
||||
'bucket': bucket,
|
||||
}
|
||||
|
||||
async def get_messages(
|
||||
self,
|
||||
bot_ids: list[str] | None = None,
|
||||
|
||||
@@ -42,7 +42,6 @@ required_deps = {
|
||||
'telegramify_markdown': 'telegramify-markdown',
|
||||
'slack_sdk': 'slack_sdk',
|
||||
'asyncpg': 'asyncpg',
|
||||
'litellm': 'litellm',
|
||||
}
|
||||
|
||||
|
||||
|
||||
27
src/langbot/pkg/core/migrations/m042_weknora_api.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .. import migration
|
||||
|
||||
|
||||
@migration.migration_class('weknora-api-config', 42)
|
||||
class WeKnoraAPICfgMigration(migration.Migration):
|
||||
"""WeKnora API 配置迁移"""
|
||||
|
||||
async def need_migrate(self) -> bool:
|
||||
"""判断当前环境是否需要运行此迁移"""
|
||||
return 'weknora-api' not in self.ap.provider_cfg.data
|
||||
|
||||
async def run(self):
|
||||
"""执行迁移"""
|
||||
self.ap.provider_cfg.data['weknora-api'] = {
|
||||
'base-url': 'http://localhost:8080/api/v1',
|
||||
'app-type': 'agent',
|
||||
'api-key': '',
|
||||
'agent-id': 'builtin-smart-reasoning',
|
||||
'knowledge-base-ids': [],
|
||||
'web-search-enabled': False,
|
||||
'timeout': 120,
|
||||
'base-prompt': '请回答用户的问题。',
|
||||
}
|
||||
|
||||
await self.ap.provider_cfg.dump_config()
|
||||
30
src/langbot/pkg/core/migrations/m043_deerflow_api.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .. import migration
|
||||
|
||||
|
||||
@migration.migration_class('deerflow-api-config', 43)
|
||||
class DeerFlowAPICfgMigration(migration.Migration):
|
||||
"""DeerFlow API 配置迁移"""
|
||||
|
||||
async def need_migrate(self) -> bool:
|
||||
"""判断当前环境是否需要运行此迁移"""
|
||||
return 'deerflow-api' not in self.ap.provider_cfg.data
|
||||
|
||||
async def run(self):
|
||||
"""执行迁移"""
|
||||
self.ap.provider_cfg.data['deerflow-api'] = {
|
||||
'api-base': 'http://127.0.0.1:2026',
|
||||
'api-key': '',
|
||||
'auth-header': '',
|
||||
'assistant-id': 'lead_agent',
|
||||
'model-name': '',
|
||||
'thinking-enabled': False,
|
||||
'plan-mode': False,
|
||||
'subagent-enabled': False,
|
||||
'max-concurrent-subagents': 3,
|
||||
'timeout': 300,
|
||||
'recursion-limit': 1000,
|
||||
}
|
||||
|
||||
await self.ap.provider_cfg.dump_config()
|
||||
@@ -109,7 +109,7 @@ class PreProcessor(stage.PipelineStage):
|
||||
if llm_model:
|
||||
query.use_llm_model_uuid = llm_model.model_entity.uuid
|
||||
|
||||
if 'func_call' in (llm_model.model_entity.abilities or []):
|
||||
if llm_model.model_entity.abilities.__contains__('func_call'):
|
||||
# Get bound plugins and MCP servers for filtering tools
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
|
||||
@@ -162,7 +162,7 @@ class PreProcessor(stage.PipelineStage):
|
||||
if (
|
||||
selected_runner == 'local-agent'
|
||||
and llm_model
|
||||
and 'vision' not in (llm_model.model_entity.abilities or [])
|
||||
and not llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
for msg in query.messages:
|
||||
if isinstance(msg.content, list):
|
||||
@@ -181,7 +181,7 @@ class PreProcessor(stage.PipelineStage):
|
||||
plain_text += me.text
|
||||
elif isinstance(me, platform_message.Image):
|
||||
if selected_runner != 'local-agent' or (
|
||||
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
|
||||
llm_model and llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
if me.base64 is not None:
|
||||
content_list.append(provider_message.ContentElement.from_image_base64(me.base64))
|
||||
@@ -202,7 +202,7 @@ class PreProcessor(stage.PipelineStage):
|
||||
content_list.append(provider_message.ContentElement.from_text(msg.text))
|
||||
elif isinstance(msg, platform_message.Image):
|
||||
if selected_runner != 'local-agent' or (
|
||||
llm_model and 'vision' in (llm_model.model_entity.abilities or [])
|
||||
llm_model and llm_model.model_entity.abilities.__contains__('vision')
|
||||
):
|
||||
if msg.base64 is not None:
|
||||
content_list.append(provider_message.ContentElement.from_image_base64(msg.base64))
|
||||
|
||||
@@ -37,41 +37,11 @@ class ModelManager:
|
||||
self.requester_components = []
|
||||
self.requester_dict = {}
|
||||
|
||||
@staticmethod
|
||||
def _get_litellm_provider_from_manifest(component: engine.Component | None) -> str | None:
|
||||
if component is None:
|
||||
return None
|
||||
|
||||
spec = getattr(component, 'spec', None) or {}
|
||||
litellm_provider = None
|
||||
|
||||
if isinstance(spec, dict):
|
||||
litellm_provider = spec.get('litellm_provider')
|
||||
else:
|
||||
getter = getattr(spec, 'get', None)
|
||||
if callable(getter):
|
||||
try:
|
||||
litellm_provider = getter('litellm_provider')
|
||||
except Exception:
|
||||
litellm_provider = None
|
||||
|
||||
if isinstance(litellm_provider, str) and litellm_provider:
|
||||
return litellm_provider
|
||||
return None
|
||||
|
||||
async def initialize(self):
|
||||
self.requester_components = self.ap.discover.get_components_by_kind('LLMAPIRequester')
|
||||
|
||||
requester_dict: dict[str, type[requester.ProviderAPIRequester]] = {}
|
||||
for component in self.requester_components:
|
||||
# Skip components that use litellm_provider (they will use litellmchat.py instead)
|
||||
litellm_provider = self._get_litellm_provider_from_manifest(component)
|
||||
if litellm_provider:
|
||||
self.ap.logger.debug(
|
||||
f'Skipping Python class loading for {component.metadata.name} '
|
||||
f'(uses litellm_provider={litellm_provider})'
|
||||
)
|
||||
continue
|
||||
requester_dict[component.metadata.name] = component.get_python_component_class()
|
||||
|
||||
self.requester_dict = requester_dict
|
||||
@@ -324,37 +294,13 @@ class ModelManager:
|
||||
else:
|
||||
provider_entity = provider_info
|
||||
|
||||
# Get requester manifest to check for litellm_provider
|
||||
requester_manifest = self.get_available_requester_manifest_by_name(provider_entity.requester)
|
||||
litellm_provider = self._get_litellm_provider_from_manifest(requester_manifest)
|
||||
|
||||
# Build config from base_url
|
||||
config = {'base_url': provider_entity.base_url}
|
||||
|
||||
# Check if requester manifest specifies litellm_provider
|
||||
if litellm_provider:
|
||||
from .requesters import litellmchat
|
||||
|
||||
# Use unified LiteLLMRequester with provider prefix
|
||||
# Map litellm_provider (YAML spec) to custom_llm_provider (config)
|
||||
config['custom_llm_provider'] = litellm_provider
|
||||
requester_inst = litellmchat.LiteLLMRequester(
|
||||
ap=self.ap,
|
||||
config=config,
|
||||
)
|
||||
self.ap.logger.debug(
|
||||
f'Using LiteLLMRequester for {provider_entity.requester} '
|
||||
f'with custom_llm_provider={config["custom_llm_provider"]}'
|
||||
)
|
||||
else:
|
||||
# Use original requester class (for backward compatibility)
|
||||
if provider_entity.requester not in self.requester_dict:
|
||||
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
|
||||
requester_inst = self.requester_dict[provider_entity.requester](
|
||||
ap=self.ap,
|
||||
config=config,
|
||||
)
|
||||
if provider_entity.requester not in self.requester_dict:
|
||||
raise provider_errors.RequesterNotFoundError(provider_entity.requester)
|
||||
|
||||
requester_inst = self.requester_dict[provider_entity.requester](
|
||||
ap=self.ap,
|
||||
config={'base_url': provider_entity.base_url},
|
||||
)
|
||||
await requester_inst.initialize()
|
||||
|
||||
token_mgr = token.TokenManager(name=provider_entity.uuid, tokens=provider_entity.api_keys or [])
|
||||
|
||||
@@ -67,8 +67,8 @@ class RuntimeProvider:
|
||||
if isinstance(result, tuple):
|
||||
msg, usage_info = result
|
||||
if usage_info:
|
||||
input_tokens = usage_info.get('prompt_tokens', 0)
|
||||
output_tokens = usage_info.get('completion_tokens', 0)
|
||||
input_tokens = usage_info.get('input_tokens', 0)
|
||||
output_tokens = usage_info.get('output_tokens', 0)
|
||||
return msg
|
||||
else:
|
||||
return result
|
||||
@@ -128,6 +128,7 @@ class RuntimeProvider:
|
||||
start_time = time.time()
|
||||
status = 'success'
|
||||
error_message = None
|
||||
# Note: Stream doesn't easily provide token counts, set to 0
|
||||
input_tokens = 0
|
||||
output_tokens = 0
|
||||
|
||||
@@ -142,15 +143,6 @@ class RuntimeProvider:
|
||||
remove_think=remove_think,
|
||||
):
|
||||
yield chunk
|
||||
# Extract usage from stream if available (stored by LiteLLM requester)
|
||||
if query:
|
||||
if query.variables is None:
|
||||
query.variables = {}
|
||||
if '_stream_usage' in query.variables:
|
||||
usage_info = query.variables['_stream_usage']
|
||||
input_tokens = usage_info.get('prompt_tokens', 0)
|
||||
output_tokens = usage_info.get('completion_tokens', 0)
|
||||
del query.variables['_stream_usage']
|
||||
except Exception as e:
|
||||
status = 'error'
|
||||
error_message = str(e)
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class AI302ChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""302.AI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.302.ai/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 302.AI
|
||||
icon: 302ai.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
370
src/langbot/pkg/provider/modelmgr/requesters/anthropicmsgs.py
Normal file
@@ -0,0 +1,370 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import json
|
||||
import platform
|
||||
import socket
|
||||
import anthropic
|
||||
import httpx
|
||||
|
||||
from .. import errors, requester
|
||||
|
||||
from ....utils import image
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class AnthropicMessages(requester.ProviderAPIRequester):
|
||||
"""Anthropic Messages API 请求器"""
|
||||
|
||||
client: anthropic.AsyncAnthropic
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.anthropic.com',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
# 兼容 Windows 缺失 TCP_KEEPINTVL 和 TCP_KEEPCNT 的问题
|
||||
if platform.system() == 'Windows':
|
||||
if not hasattr(socket, 'TCP_KEEPINTVL'):
|
||||
socket.TCP_KEEPINTVL = 0
|
||||
if not hasattr(socket, 'TCP_KEEPCNT'):
|
||||
socket.TCP_KEEPCNT = 0
|
||||
httpx_client = anthropic._base_client.AsyncHttpxClientWrapper(
|
||||
base_url=self.requester_cfg['base_url'],
|
||||
# cast to a valid type because mypy doesn't understand our type narrowing
|
||||
timeout=typing.cast(httpx.Timeout, self.requester_cfg['timeout']),
|
||||
limits=anthropic._constants.DEFAULT_CONNECTION_LIMITS,
|
||||
follow_redirects=True,
|
||||
trust_env=True,
|
||||
)
|
||||
|
||||
self.client = anthropic.AsyncAnthropic(
|
||||
api_key='',
|
||||
http_client=httpx_client,
|
||||
base_url=self.requester_cfg['base_url'],
|
||||
)
|
||||
|
||||
async def invoke_llm(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
self.client.api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = extra_args.copy()
|
||||
args['model'] = model.model_entity.name
|
||||
|
||||
# 处理消息
|
||||
|
||||
# system
|
||||
system_role_message = None
|
||||
|
||||
for i, m in enumerate(messages):
|
||||
if m.role == 'system':
|
||||
system_role_message = m
|
||||
|
||||
break
|
||||
|
||||
if system_role_message:
|
||||
messages.pop(i)
|
||||
|
||||
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
|
||||
args['system'] = system_role_message.content
|
||||
|
||||
req_messages = []
|
||||
|
||||
for m in messages:
|
||||
if m.role == 'tool':
|
||||
tool_call_id = m.tool_call_id
|
||||
|
||||
req_messages.append(
|
||||
{
|
||||
'role': 'user',
|
||||
'content': [
|
||||
{
|
||||
'type': 'tool_result',
|
||||
'tool_use_id': tool_call_id,
|
||||
'is_error': False,
|
||||
'content': [{'type': 'text', 'text': m.content}],
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
|
||||
if isinstance(m.content, str) and m.content.strip() != '':
|
||||
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
|
||||
elif isinstance(m.content, list):
|
||||
for i, ce in enumerate(m.content):
|
||||
if ce.type == 'image_base64':
|
||||
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
|
||||
|
||||
alter_image_ele = {
|
||||
'type': 'image',
|
||||
'source': {
|
||||
'type': 'base64',
|
||||
'media_type': f'image/{image_format}',
|
||||
'data': image_b64,
|
||||
},
|
||||
}
|
||||
msg_dict['content'][i] = alter_image_ele
|
||||
|
||||
if m.tool_calls:
|
||||
for tool_call in m.tool_calls:
|
||||
msg_dict['content'].append(
|
||||
{
|
||||
'type': 'tool_use',
|
||||
'id': tool_call.id,
|
||||
'name': tool_call.function.name,
|
||||
'input': json.loads(tool_call.function.arguments),
|
||||
}
|
||||
)
|
||||
|
||||
del msg_dict['tool_calls']
|
||||
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
args['messages'] = req_messages
|
||||
|
||||
if 'thinking' in args:
|
||||
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
|
||||
|
||||
if funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
try:
|
||||
resp = await self.client.messages.create(**args)
|
||||
|
||||
args = {
|
||||
'content': '',
|
||||
'role': resp.role,
|
||||
}
|
||||
assert type(resp) is anthropic.types.message.Message
|
||||
|
||||
for block in resp.content:
|
||||
if not remove_think and block.type == 'thinking':
|
||||
args['content'] = '<think>\n' + block.thinking + '\n</think>\n' + args['content']
|
||||
elif block.type == 'text':
|
||||
args['content'] += block.text
|
||||
elif block.type == 'tool_use':
|
||||
assert type(block) is anthropic.types.tool_use_block.ToolUseBlock
|
||||
tool_call = provider_message.ToolCall(
|
||||
id=block.id,
|
||||
type='function',
|
||||
function=provider_message.FunctionCall(name=block.name, arguments=json.dumps(block.input)),
|
||||
)
|
||||
if 'tool_calls' not in args:
|
||||
args['tool_calls'] = []
|
||||
args['tool_calls'].append(tool_call)
|
||||
|
||||
return provider_message.Message(**args)
|
||||
except anthropic.AuthenticationError as e:
|
||||
raise errors.RequesterError(f'api-key 无效: {e.message}')
|
||||
except anthropic.BadRequestError as e:
|
||||
raise errors.RequesterError(str(e.message))
|
||||
except anthropic.NotFoundError as e:
|
||||
if 'model: ' in str(e):
|
||||
raise errors.RequesterError(f'模型无效: {e.message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求地址无效: {e.message}')
|
||||
|
||||
async def invoke_llm_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
self.client.api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = extra_args.copy()
|
||||
args['model'] = model.model_entity.name
|
||||
args['stream'] = True
|
||||
|
||||
# 处理消息
|
||||
|
||||
# system
|
||||
system_role_message = None
|
||||
|
||||
for i, m in enumerate(messages):
|
||||
if m.role == 'system':
|
||||
system_role_message = m
|
||||
|
||||
break
|
||||
|
||||
if system_role_message:
|
||||
messages.pop(i)
|
||||
|
||||
if isinstance(system_role_message, provider_message.Message) and isinstance(system_role_message.content, str):
|
||||
args['system'] = system_role_message.content
|
||||
|
||||
req_messages = []
|
||||
|
||||
for m in messages:
|
||||
if m.role == 'tool':
|
||||
tool_call_id = m.tool_call_id
|
||||
|
||||
req_messages.append(
|
||||
{
|
||||
'role': 'user',
|
||||
'content': [
|
||||
{
|
||||
'type': 'tool_result',
|
||||
'tool_use_id': tool_call_id,
|
||||
'is_error': False, # 暂时直接写false
|
||||
'content': [
|
||||
{'type': 'text', 'text': m.content}
|
||||
], # 这里要是list包裹,应该是多个返回的情况?type类型好像也可以填其他的,暂时只写text
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
|
||||
if isinstance(m.content, str) and m.content.strip() != '':
|
||||
msg_dict['content'] = [{'type': 'text', 'text': m.content}]
|
||||
elif isinstance(m.content, list):
|
||||
for i, ce in enumerate(m.content):
|
||||
if ce.type == 'image_base64':
|
||||
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
|
||||
|
||||
alter_image_ele = {
|
||||
'type': 'image',
|
||||
'source': {
|
||||
'type': 'base64',
|
||||
'media_type': f'image/{image_format}',
|
||||
'data': image_b64,
|
||||
},
|
||||
}
|
||||
msg_dict['content'][i] = alter_image_ele
|
||||
if isinstance(msg_dict['content'], str) and msg_dict['content'] == '':
|
||||
msg_dict['content'] = [] # 这里不知道为什么会莫名有个空导致content为字符
|
||||
if m.tool_calls:
|
||||
for tool_call in m.tool_calls:
|
||||
msg_dict['content'].append(
|
||||
{
|
||||
'type': 'tool_use',
|
||||
'id': tool_call.id,
|
||||
'name': tool_call.function.name,
|
||||
'input': json.loads(tool_call.function.arguments),
|
||||
}
|
||||
)
|
||||
|
||||
del msg_dict['tool_calls']
|
||||
|
||||
req_messages.append(msg_dict)
|
||||
if 'thinking' in args:
|
||||
args['thinking'] = {'type': 'enabled', 'budget_tokens': 10000}
|
||||
|
||||
args['messages'] = req_messages
|
||||
|
||||
if funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
try:
|
||||
role = 'assistant' # 默认角色
|
||||
# chunk_idx = 0
|
||||
think_started = False
|
||||
think_ended = False
|
||||
finish_reason = False
|
||||
tool_name = ''
|
||||
tool_id = ''
|
||||
async for chunk in await self.client.messages.create(**args):
|
||||
content = ''
|
||||
tool_call = {'id': None, 'function': {'name': None, 'arguments': None}, 'type': 'function'}
|
||||
if isinstance(
|
||||
chunk, anthropic.types.raw_content_block_start_event.RawContentBlockStartEvent
|
||||
): # 记录开始
|
||||
if chunk.content_block.type == 'tool_use':
|
||||
if chunk.content_block.name is not None:
|
||||
tool_name = chunk.content_block.name
|
||||
if chunk.content_block.id is not None:
|
||||
tool_id = chunk.content_block.id
|
||||
|
||||
tool_call['function']['name'] = tool_name
|
||||
tool_call['function']['arguments'] = ''
|
||||
tool_call['id'] = tool_id
|
||||
|
||||
if not remove_think:
|
||||
if chunk.content_block.type == 'thinking' and not remove_think:
|
||||
think_started = True
|
||||
elif chunk.content_block.type == 'text' and chunk.index != 0 and not remove_think:
|
||||
think_ended = True
|
||||
continue
|
||||
elif isinstance(chunk, anthropic.types.raw_content_block_delta_event.RawContentBlockDeltaEvent):
|
||||
if chunk.delta.type == 'thinking_delta':
|
||||
if think_started:
|
||||
think_started = False
|
||||
content = '<think>\n' + chunk.delta.thinking
|
||||
elif remove_think:
|
||||
continue
|
||||
else:
|
||||
content = chunk.delta.thinking
|
||||
elif chunk.delta.type == 'text_delta':
|
||||
if think_ended:
|
||||
think_ended = False
|
||||
content = '\n</think>\n' + chunk.delta.text
|
||||
else:
|
||||
content = chunk.delta.text
|
||||
elif chunk.delta.type == 'input_json_delta':
|
||||
tool_call['function']['arguments'] = chunk.delta.partial_json
|
||||
tool_call['function']['name'] = tool_name
|
||||
tool_call['id'] = tool_id
|
||||
elif isinstance(chunk, anthropic.types.raw_content_block_stop_event.RawContentBlockStopEvent):
|
||||
continue # 记录raw_content_block结束的
|
||||
|
||||
elif isinstance(chunk, anthropic.types.raw_message_delta_event.RawMessageDeltaEvent):
|
||||
if chunk.delta.stop_reason == 'end_turn':
|
||||
finish_reason = True
|
||||
elif isinstance(chunk, anthropic.types.raw_message_stop_event.RawMessageStopEvent):
|
||||
continue # 这个好像是完全结束
|
||||
else:
|
||||
# print(chunk)
|
||||
self.ap.logger.debug(f'anthropic chunk: {chunk}')
|
||||
continue
|
||||
|
||||
args = {
|
||||
'content': content,
|
||||
'role': role,
|
||||
'is_final': finish_reason,
|
||||
'tool_calls': None if tool_call['id'] is None else [tool_call],
|
||||
}
|
||||
# if chunk_idx == 0:
|
||||
# chunk_idx += 1
|
||||
# continue
|
||||
|
||||
# assert type(chunk) is anthropic.types.message.Chunk
|
||||
|
||||
yield provider_message.MessageChunk(**args)
|
||||
|
||||
# return llm_entities.Message(**args)
|
||||
except anthropic.AuthenticationError as e:
|
||||
raise errors.RequesterError(f'api-key 无效: {e.message}')
|
||||
except anthropic.BadRequestError as e:
|
||||
raise errors.RequesterError(str(e.message))
|
||||
except anthropic.NotFoundError as e:
|
||||
if 'model: ' in str(e):
|
||||
raise errors.RequesterError(f'模型无效: {e.message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求地址无效: {e.message}')
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Anthropic
|
||||
icon: anthropic.svg
|
||||
spec:
|
||||
litellm_provider: anthropic
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#2932E1"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Baidu</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">ERNIE</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 396 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: baidu-chat-completions
|
||||
label:
|
||||
en_US: Baidu ERNIE
|
||||
zh_Hans: 百度文心一言
|
||||
icon: baidu.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
242
src/langbot/pkg/provider/modelmgr/requesters/bailianchatcmpl.py
Normal file
@@ -0,0 +1,242 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import dashscope
|
||||
import openai
|
||||
|
||||
from . import modelscopechatcmpl
|
||||
from .. import requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class BailianChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions):
|
||||
"""阿里云百炼大模型平台 ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
is_use_dashscope_call = False # 是否使用阿里原生库调用
|
||||
is_enable_multi_model = True # 是否支持多轮对话
|
||||
use_time_num = 0 # 模型已调用次数,防止存在多文件时重复调用
|
||||
use_time_ids = [] # 已调用的ID列表
|
||||
message_id = 0 # 记录消息序号
|
||||
|
||||
for msg in messages:
|
||||
# print(msg)
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
elif me['type'] == 'file_url' and '.' in me.get('file_name', ''):
|
||||
# 1. 视频文件推理
|
||||
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2845871
|
||||
file_type = me.get('file_name').lower().split('.')[-1]
|
||||
if file_type in ['mp4', 'avi', 'mkv', 'mov', 'flv', 'wmv']:
|
||||
me['type'] = 'video_url'
|
||||
me['video_url'] = {'url': me['file_url']}
|
||||
del me['file_url']
|
||||
del me['file_name']
|
||||
use_time_num += 1
|
||||
use_time_ids.append(message_id)
|
||||
is_enable_multi_model = False
|
||||
# 2. 语音文件识别, 无法通过openai的audio字段传递,暂时不支持
|
||||
# https://bailian.console.aliyun.com/?tab=doc#/doc/?type=model&url=2979031
|
||||
elif file_type in [
|
||||
'aac',
|
||||
'amr',
|
||||
'aiff',
|
||||
'flac',
|
||||
'm4a',
|
||||
'mp3',
|
||||
'mpeg',
|
||||
'ogg',
|
||||
'opus',
|
||||
'wav',
|
||||
'webm',
|
||||
'wma',
|
||||
]:
|
||||
me['audio'] = me['file_url']
|
||||
me['type'] = 'audio'
|
||||
del me['file_url']
|
||||
del me['type']
|
||||
del me['file_name']
|
||||
is_use_dashscope_call = True
|
||||
use_time_num += 1
|
||||
use_time_ids.append(message_id)
|
||||
is_enable_multi_model = False
|
||||
message_id += 1
|
||||
|
||||
# 使用列表推导式,保留不在 use_time_ids[:-1] 中的元素,仅保留最后一个多媒体消息
|
||||
if not is_enable_multi_model and use_time_num > 1:
|
||||
messages = [msg for idx, msg in enumerate(messages) if idx not in use_time_ids[:-1]]
|
||||
|
||||
if not is_enable_multi_model:
|
||||
messages = [msg for msg in messages if 'resp_message_id' not in msg]
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# 流式处理状态
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
|
||||
if is_use_dashscope_call:
|
||||
response = dashscope.MultiModalConversation.call(
|
||||
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key = "sk-xxx"
|
||||
api_key=use_model.provider.token_mgr.get_token(),
|
||||
model=use_model.model_entity.name,
|
||||
messages=messages,
|
||||
result_format='message',
|
||||
asr_options={
|
||||
# "language": "zh", # 可选,若已知音频的语种,可通过该参数指定待识别语种,以提升识别准确率
|
||||
'enable_lid': True,
|
||||
'enable_itn': False,
|
||||
},
|
||||
stream=True,
|
||||
)
|
||||
content_length_list = []
|
||||
previous_length = 0 # 记录上一次的内容长度
|
||||
for res in response:
|
||||
chunk = res['output']
|
||||
# 解析 chunk 数据
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta_content = choice['message'].content[0]['text']
|
||||
finish_reason = choice['finish_reason']
|
||||
content_length_list.append(len(delta_content))
|
||||
else:
|
||||
delta_content = ''
|
||||
finish_reason = None
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 检查 content_length_list 是否有足够的数据
|
||||
if len(content_length_list) >= 2:
|
||||
now_content = delta_content[previous_length : content_length_list[-1]]
|
||||
previous_length = content_length_list[-1] # 更新上一次的长度
|
||||
else:
|
||||
now_content = delta_content # 第一次循环时直接使用 delta_content
|
||||
previous_length = len(delta_content) # 更新上一次的长度
|
||||
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': now_content if now_content else None,
|
||||
'is_final': bool(finish_reason) and finish_reason != 'null',
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
else:
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
# 处理 reasoning_content
|
||||
if reasoning_content:
|
||||
# accumulated_reasoning += reasoning_content
|
||||
# 如果设置了 remove_think,跳过 reasoning_content
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 第一次出现 reasoning_content,添加 <think> 开始标签
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
delta_content = '<think>\n' + reasoning_content
|
||||
else:
|
||||
# 继续输出 reasoning_content
|
||||
delta_content = reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta_content:
|
||||
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
|
||||
thinking_ended = True
|
||||
delta_content = '\n</think>\n' + delta_content
|
||||
|
||||
# 处理工具调用增量
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] != '':
|
||||
tool_id = tool_call['id']
|
||||
if tool_call['function']['name'] is not None:
|
||||
tool_name = tool_call['function']['name']
|
||||
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
tool_call['id'] = tool_id
|
||||
tool_call['function']['name'] = tool_name
|
||||
tool_call['function']['arguments'] = (
|
||||
'' if tool_call['function']['arguments'] is None else tool_call['function']['arguments']
|
||||
)
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
# return
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 阿里云百炼
|
||||
icon: bailian.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,7 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: maas
|
||||
execution:
|
||||
|
||||
702
src/langbot/pkg/provider/modelmgr/requesters/chatcmpl.py
Normal file
@@ -0,0 +1,702 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import typing
|
||||
|
||||
import openai
|
||||
import openai.types.chat.chat_completion as chat_completion_module
|
||||
import httpx
|
||||
|
||||
from .. import errors, requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class OpenAIChatCompletions(requester.ProviderAPIRequester):
|
||||
"""OpenAI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.openai.com/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
self.client = openai.AsyncClient(
|
||||
api_key=self.init_api_key,
|
||||
base_url=self.requester_cfg['base_url'].replace(' ', ''),
|
||||
timeout=self.requester_cfg['timeout'],
|
||||
http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']),
|
||||
)
|
||||
|
||||
def _mask_api_key(self, api_key: str | None) -> str:
|
||||
if not api_key:
|
||||
return ''
|
||||
if len(api_key) <= 8:
|
||||
return '****'
|
||||
return f'{api_key[:4]}...{api_key[-4:]}'
|
||||
|
||||
def _infer_model_type(self, model_id: str) -> str:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
embedding_keywords = (
|
||||
'embedding',
|
||||
'embed',
|
||||
'bge-',
|
||||
'e5-',
|
||||
'm3e',
|
||||
'gte-',
|
||||
'multilingual-e5',
|
||||
'text-embedding',
|
||||
)
|
||||
return 'embedding' if any(keyword in normalized_model_id for keyword in embedding_keywords) else 'llm'
|
||||
|
||||
def _infer_model_abilities(self, item: dict[str, typing.Any], model_id: str) -> list[str]:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
abilities: set[str] = set()
|
||||
|
||||
def _flatten(value: typing.Any) -> list[str]:
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
return [value.lower()]
|
||||
if isinstance(value, dict):
|
||||
flattened: list[str] = []
|
||||
for nested_value in value.values():
|
||||
flattened.extend(_flatten(nested_value))
|
||||
return flattened
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
flattened: list[str] = []
|
||||
for nested_value in value:
|
||||
flattened.extend(_flatten(nested_value))
|
||||
return flattened
|
||||
return [str(value).lower()]
|
||||
|
||||
capability_tokens = _flatten(item.get('capabilities'))
|
||||
capability_tokens.extend(_flatten(item.get('modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('input_modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('output_modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('supported_generation_methods')))
|
||||
capability_tokens.extend(_flatten(item.get('supported_parameters')))
|
||||
capability_tokens.extend(_flatten(item.get('architecture')))
|
||||
|
||||
combined_tokens = capability_tokens + [normalized_model_id]
|
||||
|
||||
vision_keywords = (
|
||||
'vision',
|
||||
'image',
|
||||
'file',
|
||||
'video',
|
||||
'multimodal',
|
||||
'vl',
|
||||
'ocr',
|
||||
'omni',
|
||||
)
|
||||
function_call_keywords = (
|
||||
'function',
|
||||
'tool',
|
||||
'tools',
|
||||
'tool_choice',
|
||||
'tool_call',
|
||||
'tool-use',
|
||||
'tool_use',
|
||||
)
|
||||
|
||||
if any(any(keyword in token for keyword in vision_keywords) for token in combined_tokens):
|
||||
abilities.add('vision')
|
||||
|
||||
if any(any(keyword in token for keyword in function_call_keywords) for token in combined_tokens):
|
||||
abilities.add('func_call')
|
||||
|
||||
return sorted(abilities)
|
||||
|
||||
def _normalize_modalities(self, value: typing.Any) -> list[str]:
|
||||
normalized: list[str] = []
|
||||
|
||||
def _collect(item: typing.Any):
|
||||
if item is None:
|
||||
return
|
||||
if isinstance(item, str):
|
||||
for part in item.replace('->', ',').replace('+', ',').split(','):
|
||||
token = part.strip().lower()
|
||||
if token and token not in normalized:
|
||||
normalized.append(token)
|
||||
return
|
||||
if isinstance(item, dict):
|
||||
for nested in item.values():
|
||||
_collect(nested)
|
||||
return
|
||||
if isinstance(item, (list, tuple, set)):
|
||||
for nested in item:
|
||||
_collect(nested)
|
||||
return
|
||||
|
||||
_collect(value)
|
||||
return normalized
|
||||
|
||||
def _extract_scan_metadata(self, item: dict[str, typing.Any], model_id: str) -> dict[str, typing.Any]:
|
||||
display_name = item.get('name')
|
||||
if not isinstance(display_name, str) or not display_name.strip() or display_name == model_id:
|
||||
display_name = ''
|
||||
|
||||
description = item.get('description')
|
||||
if not isinstance(description, str) or not description.strip():
|
||||
description = ''
|
||||
|
||||
context_length = item.get('context_length')
|
||||
if context_length is None and isinstance(item.get('top_provider'), dict):
|
||||
context_length = item['top_provider'].get('context_length')
|
||||
|
||||
if not isinstance(context_length, int):
|
||||
try:
|
||||
context_length = int(context_length) if context_length is not None else None
|
||||
except (TypeError, ValueError):
|
||||
context_length = None
|
||||
|
||||
input_modalities = self._normalize_modalities(item.get('input_modalities'))
|
||||
output_modalities = self._normalize_modalities(item.get('output_modalities'))
|
||||
|
||||
if isinstance(item.get('architecture'), dict):
|
||||
if not input_modalities:
|
||||
input_modalities = self._normalize_modalities(item['architecture'].get('input_modalities'))
|
||||
if not output_modalities:
|
||||
output_modalities = self._normalize_modalities(item['architecture'].get('output_modalities'))
|
||||
|
||||
owned_by = item.get('owned_by')
|
||||
if not isinstance(owned_by, str) or not owned_by.strip():
|
||||
owned_by = ''
|
||||
|
||||
return {
|
||||
'display_name': display_name or None,
|
||||
'description': description or None,
|
||||
'context_length': context_length,
|
||||
'owned_by': owned_by or None,
|
||||
'input_modalities': input_modalities,
|
||||
'output_modalities': output_modalities,
|
||||
}
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
headers = {}
|
||||
if api_key:
|
||||
headers['Authorization'] = f'Bearer {api_key}'
|
||||
|
||||
models_url = f'{self.requester_cfg["base_url"].rstrip("/")}/models'
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
|
||||
response = await client.get(models_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
|
||||
models = []
|
||||
for item in payload.get('data', []):
|
||||
model_id = item.get('id')
|
||||
if not model_id:
|
||||
continue
|
||||
models.append(
|
||||
{
|
||||
'id': model_id,
|
||||
'name': model_id,
|
||||
'type': self._infer_model_type(model_id),
|
||||
'abilities': self._infer_model_abilities(item, model_id),
|
||||
**self._extract_scan_metadata(item, model_id),
|
||||
}
|
||||
)
|
||||
|
||||
models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
|
||||
return {
|
||||
'models': models,
|
||||
'debug': {
|
||||
'request': {
|
||||
'method': 'GET',
|
||||
'url': models_url,
|
||||
'headers': {
|
||||
'Authorization': f'Bearer {self._mask_api_key(api_key)}' if api_key else '',
|
||||
},
|
||||
},
|
||||
'response': payload,
|
||||
},
|
||||
}
|
||||
|
||||
async def _req(
|
||||
self,
|
||||
args: dict,
|
||||
extra_body: dict = {},
|
||||
) -> chat_completion_module.ChatCompletion:
|
||||
return await self.client.chat.completions.create(**args, extra_body=extra_body)
|
||||
|
||||
async def _req_stream(
|
||||
self,
|
||||
args: dict,
|
||||
extra_body: dict = {},
|
||||
):
|
||||
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
|
||||
yield chunk
|
||||
|
||||
async def _make_msg(
|
||||
self,
|
||||
chat_completion: chat_completion_module.ChatCompletion,
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
if not isinstance(chat_completion, chat_completion_module.ChatCompletion):
|
||||
raise TypeError(f'Expected ChatCompletion, got {type(chat_completion).__name__}: {chat_completion[:16]}')
|
||||
|
||||
chatcmpl_message = chat_completion.choices[0].message.model_dump()
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
|
||||
chatcmpl_message['role'] = 'assistant'
|
||||
|
||||
# 处理思维链
|
||||
content = chatcmpl_message.get('content', '')
|
||||
reasoning_content = chatcmpl_message.get('reasoning_content', None)
|
||||
|
||||
processed_content, _ = await self._process_thinking_content(
|
||||
content=content, reasoning_content=reasoning_content, remove_think=remove_think
|
||||
)
|
||||
|
||||
chatcmpl_message['content'] = processed_content
|
||||
|
||||
# 移除 reasoning_content 字段,避免传递给 Message
|
||||
if 'reasoning_content' in chatcmpl_message:
|
||||
del chatcmpl_message['reasoning_content']
|
||||
|
||||
message = provider_message.Message(**chatcmpl_message)
|
||||
|
||||
return message
|
||||
|
||||
async def _process_thinking_content(
|
||||
self,
|
||||
content: str,
|
||||
reasoning_content: str = None,
|
||||
remove_think: bool = False,
|
||||
) -> tuple[str, str]:
|
||||
"""处理思维链内容
|
||||
|
||||
Args:
|
||||
content: 原始内容
|
||||
reasoning_content: reasoning_content 字段内容
|
||||
remove_think: 是否移除思维链
|
||||
|
||||
Returns:
|
||||
(处理后的内容, 提取的思维链内容)
|
||||
"""
|
||||
thinking_content = ''
|
||||
|
||||
# 1. 从 reasoning_content 提取思维链
|
||||
if reasoning_content:
|
||||
thinking_content = reasoning_content
|
||||
|
||||
# 2. 从 content 中提取 <think> 标签内容
|
||||
if content and '<think>' in content and '</think>' in content:
|
||||
import re
|
||||
|
||||
think_pattern = r'<think>(.*?)</think>'
|
||||
think_matches = re.findall(think_pattern, content, re.DOTALL)
|
||||
if think_matches:
|
||||
# 如果已有 reasoning_content,则追加
|
||||
if thinking_content:
|
||||
thinking_content += '\n' + '\n'.join(think_matches)
|
||||
else:
|
||||
thinking_content = '\n'.join(think_matches)
|
||||
# 移除 content 中的 <think> 标签
|
||||
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
|
||||
|
||||
# 3. 根据 remove_think 参数决定是否保留思维链
|
||||
if remove_think:
|
||||
return content, ''
|
||||
else:
|
||||
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
|
||||
if thinking_content:
|
||||
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
|
||||
return content, thinking_content
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# 流式处理状态
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
tool_id = ''
|
||||
tool_name = ''
|
||||
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
|
||||
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
# 处理 reasoning_content
|
||||
if reasoning_content:
|
||||
# accumulated_reasoning += reasoning_content
|
||||
# 如果设置了 remove_think,跳过 reasoning_content
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 第一次出现 reasoning_content,添加 <think> 开始标签
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
delta_content = '<think>\n' + reasoning_content
|
||||
else:
|
||||
# 继续输出 reasoning_content
|
||||
delta_content = reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta_content:
|
||||
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
|
||||
thinking_ended = True
|
||||
delta_content = '\n</think>\n' + delta_content
|
||||
|
||||
# 处理 content 中已有的 <think> 标签(如果需要移除)
|
||||
# if delta_content and remove_think and '<think>' in delta_content:
|
||||
# import re
|
||||
#
|
||||
# # 移除 <think> 标签及其内容
|
||||
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
|
||||
|
||||
# 处理工具调用增量
|
||||
# delta_tool_calls = None
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] and tool_call['function']['name']:
|
||||
tool_id = tool_call['id']
|
||||
tool_name = tool_call['function']['name']
|
||||
else:
|
||||
tool_call['id'] = tool_id
|
||||
tool_call['function']['name'] = tool_name
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
|
||||
async def _closure(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
|
||||
# 发送请求
|
||||
|
||||
resp = await self._req(args, extra_body=extra_args)
|
||||
# 处理请求结果
|
||||
message = await self._make_msg(resp, remove_think)
|
||||
|
||||
# Extract token usage from response
|
||||
usage_info = {}
|
||||
if hasattr(resp, 'usage') and resp.usage:
|
||||
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
|
||||
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
|
||||
usage_info['total_tokens'] = resp.usage.total_tokens or 0
|
||||
|
||||
return message, usage_info
|
||||
|
||||
async def invoke_llm(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
"""Invoke LLM and return message with usage info"""
|
||||
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
|
||||
for m in messages:
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
content = msg_dict.get('content')
|
||||
if isinstance(content, list):
|
||||
# 检查 content 列表中是否每个部分都是文本
|
||||
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
|
||||
# 将所有文本部分合并为一个字符串
|
||||
msg_dict['content'] = '\n'.join(part['text'] for part in content)
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
try:
|
||||
msg, usage_info = await self._closure(
|
||||
query=query,
|
||||
req_messages=req_messages,
|
||||
use_model=model,
|
||||
use_funcs=funcs,
|
||||
extra_args=extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
return msg, usage_info
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
except openai.BadRequestError as e:
|
||||
error_message = str(e.message) if hasattr(e, 'message') else str(e)
|
||||
if 'context_length_exceeded' in str(e):
|
||||
raise errors.RequesterError(f'上文过长,请重置会话: {error_message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求参数错误: {error_message}')
|
||||
except openai.AuthenticationError as e:
|
||||
error_message = str(e.message) if hasattr(e, 'message') else str(e)
|
||||
raise errors.RequesterError(f'无效的 api-key: {error_message}')
|
||||
except openai.NotFoundError as e:
|
||||
error_message = str(e.message) if hasattr(e, 'message') else str(e)
|
||||
raise errors.RequesterError(f'请求路径错误: {error_message}')
|
||||
except openai.RateLimitError as e:
|
||||
error_message = str(e.message) if hasattr(e, 'message') else str(e)
|
||||
raise errors.RequesterError(f'请求过于频繁或余额不足: {error_message}')
|
||||
except openai.APIConnectionError as e:
|
||||
error_message = f'连接错误: {str(e)}'
|
||||
raise errors.RequesterError(error_message)
|
||||
except openai.APIError as e:
|
||||
error_message = str(e.message) if hasattr(e, 'message') else str(e)
|
||||
raise errors.RequesterError(f'请求错误: {error_message}')
|
||||
|
||||
async def invoke_embedding(
|
||||
self,
|
||||
model: requester.RuntimeEmbeddingModel,
|
||||
input_text: list[str],
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
) -> tuple[list[list[float]], dict]:
|
||||
"""调用 Embedding API, returns (embeddings, usage_info)"""
|
||||
self.client.api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = {
|
||||
'model': model.model_entity.name,
|
||||
'input': input_text,
|
||||
}
|
||||
|
||||
if model.model_entity.extra_args:
|
||||
args.update(model.model_entity.extra_args)
|
||||
|
||||
args.update(extra_args)
|
||||
|
||||
try:
|
||||
resp = await self.client.embeddings.create(**args)
|
||||
|
||||
# Extract usage info
|
||||
usage_info = {}
|
||||
if hasattr(resp, 'usage') and resp.usage:
|
||||
usage_info['prompt_tokens'] = resp.usage.prompt_tokens or 0
|
||||
usage_info['total_tokens'] = resp.usage.total_tokens or 0
|
||||
|
||||
return [d.embedding for d in resp.data], usage_info
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
except openai.BadRequestError as e:
|
||||
raise errors.RequesterError(f'请求参数错误: {e.message}')
|
||||
|
||||
async def invoke_llm_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
|
||||
for m in messages:
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
content = msg_dict.get('content')
|
||||
if isinstance(content, list):
|
||||
# 检查 content 列表中是否每个部分都是文本
|
||||
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
|
||||
# 将所有文本部分合并为一个字符串
|
||||
msg_dict['content'] = '\n'.join(part['text'] for part in content)
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
try:
|
||||
async for item in self._closure_stream(
|
||||
query=query,
|
||||
req_messages=req_messages,
|
||||
use_model=model,
|
||||
use_funcs=funcs,
|
||||
extra_args=extra_args,
|
||||
remove_think=remove_think,
|
||||
):
|
||||
yield item
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
except openai.BadRequestError as e:
|
||||
if 'context_length_exceeded' in e.message:
|
||||
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求参数错误: {e.message}')
|
||||
except openai.AuthenticationError as e:
|
||||
raise errors.RequesterError(f'无效的 api-key: {e.message}')
|
||||
except openai.NotFoundError as e:
|
||||
raise errors.RequesterError(f'请求路径错误: {e.message}')
|
||||
except openai.RateLimitError as e:
|
||||
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
|
||||
except openai.APIError as e:
|
||||
raise errors.RequesterError(f'请求错误: {e.message}')
|
||||
|
||||
async def invoke_rerank(
|
||||
self,
|
||||
model: requester.RuntimeRerankModel,
|
||||
query: str,
|
||||
documents: typing.List[str],
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
) -> typing.List[dict]:
|
||||
"""Standard /rerank endpoint (Jina/Cohere/SiliconFlow/Voyage/DashScope compatible)
|
||||
|
||||
Supports extra_args from model.extra_args:
|
||||
- rerank_url: full URL override (e.g. "https://dashscope.aliyuncs.com/compatible-api/v1/reranks")
|
||||
- rerank_path: path override appended to base_url (e.g. "reranks" instead of default "rerank")
|
||||
- Any other fields are merged into the request payload.
|
||||
"""
|
||||
api_key = model.provider.token_mgr.get_token()
|
||||
base_url = self.requester_cfg.get('base_url', '').rstrip('/')
|
||||
timeout = self.requester_cfg.get('timeout', 120)
|
||||
|
||||
merged_args = {}
|
||||
if model.model_entity.extra_args:
|
||||
merged_args.update(model.model_entity.extra_args)
|
||||
if extra_args:
|
||||
merged_args.update(extra_args)
|
||||
|
||||
rerank_url = merged_args.pop('rerank_url', None)
|
||||
rerank_path = merged_args.pop('rerank_path', 'rerank')
|
||||
if not rerank_url:
|
||||
rerank_url = f'{base_url}/{rerank_path}'
|
||||
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': f'Bearer {api_key}',
|
||||
}
|
||||
|
||||
payload = {
|
||||
'model': model.model_entity.name,
|
||||
'query': query,
|
||||
'documents': documents[:64],
|
||||
'top_n': min(len(documents), 64),
|
||||
}
|
||||
|
||||
if merged_args:
|
||||
payload.update(merged_args)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=timeout) as client:
|
||||
resp = await client.post(rerank_url, headers=headers, json=payload)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
results = self._parse_rerank_response(data)
|
||||
|
||||
if results:
|
||||
scores = [r.get('relevance_score', 0.0) for r in results]
|
||||
min_score = min(scores)
|
||||
max_score = max(scores)
|
||||
if max_score - min_score > 1e-6:
|
||||
for r in results:
|
||||
r['relevance_score'] = (r['relevance_score'] - min_score) / (max_score - min_score)
|
||||
|
||||
return results
|
||||
except httpx.HTTPStatusError as e:
|
||||
raise errors.RequesterError(f'Rerank request failed: {e.response.status_code} - {e.response.text}')
|
||||
except httpx.TimeoutException:
|
||||
raise errors.RequesterError('Rerank request timed out')
|
||||
except Exception as e:
|
||||
raise errors.RequesterError(f'Rerank request error: {str(e)}')
|
||||
|
||||
@staticmethod
|
||||
def _parse_rerank_response(data: dict) -> typing.List[dict]:
|
||||
"""Parse rerank response from various providers.
|
||||
|
||||
Handles:
|
||||
- Jina/Cohere/SiliconFlow: {"results": [{"index", "relevance_score"}]}
|
||||
- Voyage AI: {"data": [{"index", "relevance_score"}]}
|
||||
- DashScope: {"output": {"results": [{"index", "relevance_score"}]}}
|
||||
"""
|
||||
if 'results' in data:
|
||||
return data['results']
|
||||
if 'data' in data:
|
||||
return data['data']
|
||||
if 'output' in data and isinstance(data['output'], dict):
|
||||
return data['output'].get('results', [])
|
||||
return []
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: OpenAI
|
||||
icon: openai.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Cohere
|
||||
icon: cohere.svg
|
||||
spec:
|
||||
litellm_provider: cohere
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class CompShareChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""CompShare ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.modelverse.cn/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 优云智算
|
||||
icon: compshare.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: maas
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from . import chatcmpl
|
||||
from .. import errors, requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class DeepseekChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""Deepseek ChatCompletion API 请求器"""
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.deepseek.com',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def _closure(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages
|
||||
|
||||
# deepseek 不支持多模态,把content都转换成纯文字
|
||||
for m in messages:
|
||||
if 'content' in m and isinstance(m['content'], list):
|
||||
m['content'] = ' '.join([c['text'] for c in m['content'] if 'text' in c])
|
||||
|
||||
args['messages'] = messages
|
||||
|
||||
# 发送请求
|
||||
resp = await self._req(args, extra_body=extra_args)
|
||||
|
||||
# print(resp)
|
||||
|
||||
if resp is None:
|
||||
raise errors.RequesterError('接口返回为空,请确定模型提供商服务是否正常')
|
||||
# 处理请求结果
|
||||
message = await self._make_msg(resp, remove_think)
|
||||
|
||||
# Extract token usage from response
|
||||
usage_info = {}
|
||||
if hasattr(resp, 'usage') and resp.usage:
|
||||
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
|
||||
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
|
||||
usage_info['total_tokens'] = resp.usage.total_tokens or 0
|
||||
|
||||
return message, usage_info
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: DeepSeek
|
||||
icon: deepseek.svg
|
||||
spec:
|
||||
litellm_provider: deepseek
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#3B82F6"/>
|
||||
<text x="30" y="32" font-family="Arial, sans-serif" font-size="12" font-weight="bold" fill="white" text-anchor="middle">豆包</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 282 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: doubao-chat-completions
|
||||
label:
|
||||
en_US: ByteDance Doubao
|
||||
zh_Hans: 字节豆包
|
||||
icon: doubao.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://ark.cn-beijing.volces.com/api/v3
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
205
src/langbot/pkg/provider/modelmgr/requesters/geminichatcmpl.py
Normal file
@@ -0,0 +1,205 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import httpx
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
import uuid
|
||||
|
||||
from .. import requester
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
|
||||
|
||||
class GeminiChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""Google Gemini API 请求器"""
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://generativelanguage.googleapis.com/v1beta/openai',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
models_url = 'https://generativelanguage.googleapis.com/v1beta/models'
|
||||
params = {'key': api_key} if api_key else {}
|
||||
|
||||
all_models: list[dict[str, typing.Any]] = []
|
||||
next_page_token = ''
|
||||
last_payload: dict[str, typing.Any] = {}
|
||||
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
|
||||
while True:
|
||||
request_params = dict(params)
|
||||
if next_page_token:
|
||||
request_params['pageToken'] = next_page_token
|
||||
|
||||
response = await client.get(models_url, params=request_params)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
last_payload = payload
|
||||
|
||||
for item in payload.get('models', []):
|
||||
model_name = item.get('name', '')
|
||||
model_id = model_name.replace('models/', '', 1)
|
||||
if not model_id:
|
||||
continue
|
||||
|
||||
supported_methods = item.get('supportedGenerationMethods', []) or []
|
||||
if 'embedContent' in supported_methods and 'generateContent' not in supported_methods:
|
||||
model_type = 'embedding'
|
||||
else:
|
||||
model_type = 'llm'
|
||||
|
||||
all_models.append(
|
||||
{
|
||||
'id': model_id,
|
||||
'name': model_id,
|
||||
'type': model_type,
|
||||
'abilities': self._infer_model_abilities(item, model_id),
|
||||
'display_name': item.get('displayName') or None,
|
||||
'description': item.get('description') or None,
|
||||
'context_length': item.get('inputTokenLimit'),
|
||||
'input_modalities': self._normalize_modalities(item.get('inputModalities')),
|
||||
'output_modalities': self._normalize_modalities(item.get('outputModalities')),
|
||||
}
|
||||
)
|
||||
|
||||
next_page_token = payload.get('nextPageToken', '')
|
||||
if not next_page_token:
|
||||
break
|
||||
|
||||
all_models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
|
||||
return {
|
||||
'models': all_models,
|
||||
'debug': {
|
||||
'request': {
|
||||
'method': 'GET',
|
||||
'url': models_url,
|
||||
'query': {'key': self._mask_api_key(api_key)} if api_key else {},
|
||||
},
|
||||
'response': last_payload,
|
||||
},
|
||||
}
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# 流式处理状态
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
tool_id = ''
|
||||
tool_name = ''
|
||||
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
|
||||
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
# 处理 reasoning_content
|
||||
if reasoning_content:
|
||||
# accumulated_reasoning += reasoning_content
|
||||
# 如果设置了 remove_think,跳过 reasoning_content
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 第一次出现 reasoning_content,添加 <think> 开始标签
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
delta_content = '<think>\n' + reasoning_content
|
||||
else:
|
||||
# 继续输出 reasoning_content
|
||||
delta_content = reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta_content:
|
||||
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
|
||||
thinking_ended = True
|
||||
delta_content = '\n</think>\n' + delta_content
|
||||
|
||||
# 处理 content 中已有的 <think> 标签(如果需要移除)
|
||||
# if delta_content and remove_think and '<think>' in delta_content:
|
||||
# import re
|
||||
#
|
||||
# # 移除 <think> 标签及其内容
|
||||
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
|
||||
|
||||
# 处理工具调用增量
|
||||
# delta_tool_calls = None
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] == '' and tool_id == '':
|
||||
tool_id = str(uuid.uuid4())
|
||||
if tool_call['function']['name']:
|
||||
tool_name = tool_call['function']['name']
|
||||
tool_call['id'] = tool_id
|
||||
tool_call['function']['name'] = tool_name
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Google Gemini
|
||||
icon: gemini.svg
|
||||
spec:
|
||||
litellm_provider: gemini
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
import typing
|
||||
|
||||
from . import ppiochatcmpl
|
||||
|
||||
|
||||
class GiteeAIChatCompletions(ppiochatcmpl.PPIOChatCompletions):
|
||||
"""Gitee AI ChatCompletions API 请求器"""
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://ai.gitee.com/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Gitee AI
|
||||
icon: giteeai.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#F97316"/>
|
||||
<text x="30" y="32" font-family="Arial, sans-serif" font-size="14" font-weight="bold" fill="white" text-anchor="middle">Groq</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 280 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: groq-chat-completions
|
||||
label:
|
||||
en_US: Groq
|
||||
zh_Hans: Groq
|
||||
icon: groq.svg
|
||||
spec:
|
||||
litellm_provider: groq
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.groq.com/openai/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#0066FF"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">iFlytek</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">Spark</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 398 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: iflytek-chat-completions
|
||||
label:
|
||||
en_US: iFlytek Spark
|
||||
zh_Hans: 讯飞星火
|
||||
icon: iflytek.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://spark-api-open.xf-yun.com/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
208
src/langbot/pkg/provider/modelmgr/requesters/jiekouaichatcmpl.py
Normal file
@@ -0,0 +1,208 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import openai
|
||||
import typing
|
||||
|
||||
from . import chatcmpl
|
||||
from .. import requester
|
||||
import openai.types.chat.chat_completion as chat_completion
|
||||
import re
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
|
||||
|
||||
class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""接口 AI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.jiekou.ai/openai',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
is_think: bool = False
|
||||
|
||||
async def _make_msg(
|
||||
self,
|
||||
chat_completion: chat_completion.ChatCompletion,
|
||||
remove_think: bool,
|
||||
) -> provider_message.Message:
|
||||
chatcmpl_message = chat_completion.choices[0].message.model_dump()
|
||||
# print(chatcmpl_message.keys(), chatcmpl_message.values())
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
|
||||
chatcmpl_message['role'] = 'assistant'
|
||||
|
||||
reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
|
||||
|
||||
# deepseek的reasoner模型
|
||||
chatcmpl_message['content'] = await self._process_thinking_content(
|
||||
chatcmpl_message['content'], reasoning_content, remove_think
|
||||
)
|
||||
|
||||
# 移除 reasoning_content 字段,避免传递给 Message
|
||||
if 'reasoning_content' in chatcmpl_message:
|
||||
del chatcmpl_message['reasoning_content']
|
||||
|
||||
message = provider_message.Message(**chatcmpl_message)
|
||||
|
||||
return message
|
||||
|
||||
async def _process_thinking_content(
|
||||
self,
|
||||
content: str,
|
||||
reasoning_content: str = None,
|
||||
remove_think: bool = False,
|
||||
) -> tuple[str, str]:
|
||||
"""处理思维链内容
|
||||
|
||||
Args:
|
||||
content: 原始内容
|
||||
reasoning_content: reasoning_content 字段内容
|
||||
remove_think: 是否移除思维链
|
||||
|
||||
Returns:
|
||||
处理后的内容
|
||||
"""
|
||||
if remove_think:
|
||||
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
|
||||
else:
|
||||
if reasoning_content is not None:
|
||||
content = '<think>\n' + reasoning_content + '\n</think>\n' + content
|
||||
return content
|
||||
|
||||
async def _make_msg_chunk(
|
||||
self,
|
||||
delta: dict[str, typing.Any],
|
||||
idx: int,
|
||||
) -> provider_message.MessageChunk:
|
||||
# 处理流式chunk和完整响应的差异
|
||||
# print(chat_completion.choices[0])
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in delta or delta['role'] is None:
|
||||
delta['role'] = 'assistant'
|
||||
|
||||
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
|
||||
|
||||
delta['content'] = '' if delta['content'] is None else delta['content']
|
||||
# print(reasoning_content)
|
||||
|
||||
# deepseek的reasoner模型
|
||||
|
||||
if reasoning_content is not None:
|
||||
delta['content'] += reasoning_content
|
||||
|
||||
message = provider_message.MessageChunk(**delta)
|
||||
|
||||
return message
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
# reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
if remove_think:
|
||||
if delta['content'] is not None:
|
||||
if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
|
||||
thinking_started = True
|
||||
continue
|
||||
elif delta['content'] == r'</think>' and not thinking_ended:
|
||||
thinking_ended = True
|
||||
continue
|
||||
elif thinking_ended and delta['content'] == '\n\n' and thinking_started:
|
||||
thinking_started = False
|
||||
continue
|
||||
elif thinking_started and not thinking_ended:
|
||||
continue
|
||||
|
||||
# delta_tool_calls = None
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] and tool_call['function']['name']:
|
||||
tool_id = tool_call['id']
|
||||
tool_name = tool_call['function']['name']
|
||||
|
||||
if tool_call['id'] is None:
|
||||
tool_call['id'] = tool_id
|
||||
if tool_call['function']['name'] is None:
|
||||
tool_call['function']['name'] = tool_name
|
||||
if tool_call['function']['arguments'] is None:
|
||||
tool_call['function']['arguments'] = ''
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 接口 AI
|
||||
icon: jiekouai.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Jina
|
||||
icon: jina.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -1,571 +0,0 @@
|
||||
"""LiteLLM unified requester for chat, embedding, and rerank."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
import litellm
|
||||
from litellm import acompletion, aembedding, arerank
|
||||
|
||||
from .. import errors, requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class LiteLLMRequester(requester.ProviderAPIRequester):
|
||||
"""LiteLLM unified API requester supporting chat, embedding, and rerank."""
|
||||
|
||||
_EMBEDDING_MODEL_HINTS = ('embedding', 'embed', 'bge-', 'e5-', 'm3e', 'gte-', 'text-embedding')
|
||||
_RERANK_MODEL_HINTS = ('rerank', 're-rank', 're_rank')
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': '',
|
||||
'timeout': 120,
|
||||
'custom_llm_provider': '',
|
||||
'drop_params': False,
|
||||
'num_retries': 0,
|
||||
'api_version': '',
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize LiteLLM client settings."""
|
||||
# LiteLLM doesn't require explicit client initialization
|
||||
# Configuration is passed per-request via litellm params
|
||||
pass
|
||||
|
||||
def _build_litellm_model_name(self, model_name: str, custom_llm_provider: str | None = None) -> str:
|
||||
"""Build LiteLLM model name with provider prefix if needed."""
|
||||
provider = custom_llm_provider or self.requester_cfg.get('custom_llm_provider', '')
|
||||
if provider:
|
||||
# LiteLLM format: provider/model_name
|
||||
if model_name.startswith(f'{provider}/'):
|
||||
return model_name
|
||||
return f'{provider}/{model_name}'
|
||||
# If no custom provider, assume model_name already includes prefix or is OpenAI-compatible
|
||||
return model_name
|
||||
|
||||
def _get_custom_llm_provider(self) -> str | None:
|
||||
return self.requester_cfg.get('custom_llm_provider') or None
|
||||
|
||||
def _safe_litellm_bool_helper(self, helper_name: str, model_name: str) -> bool:
|
||||
"""Call a LiteLLM boolean capability helper without letting metadata gaps fail requests."""
|
||||
helper = getattr(litellm, helper_name, None)
|
||||
if not callable(helper):
|
||||
return False
|
||||
|
||||
provider = self._get_custom_llm_provider()
|
||||
candidates: list[tuple[str, str | None]] = [(model_name, provider)]
|
||||
litellm_model_name = self._build_litellm_model_name(model_name)
|
||||
if litellm_model_name != model_name:
|
||||
candidates.append((litellm_model_name, None))
|
||||
|
||||
for candidate_model, candidate_provider in candidates:
|
||||
try:
|
||||
if bool(helper(model=candidate_model, custom_llm_provider=candidate_provider)):
|
||||
return True
|
||||
except Exception:
|
||||
continue
|
||||
return False
|
||||
|
||||
def _safe_context_length(self, model_name: str) -> int | None:
|
||||
helper = getattr(litellm, 'get_max_tokens', None)
|
||||
if not callable(helper):
|
||||
return None
|
||||
|
||||
candidates = [model_name]
|
||||
litellm_model_name = self._build_litellm_model_name(model_name)
|
||||
if litellm_model_name != model_name:
|
||||
candidates.append(litellm_model_name)
|
||||
|
||||
for candidate in candidates:
|
||||
try:
|
||||
max_tokens = helper(candidate)
|
||||
except Exception:
|
||||
continue
|
||||
if isinstance(max_tokens, int) and max_tokens > 0:
|
||||
return max_tokens
|
||||
return None
|
||||
|
||||
def _supports_function_calling(self, model_name: str) -> bool:
|
||||
return self._safe_litellm_bool_helper('supports_function_calling', model_name)
|
||||
|
||||
def _supports_vision(self, model_name: str) -> bool:
|
||||
return self._safe_litellm_bool_helper('supports_vision', model_name)
|
||||
|
||||
def _infer_model_type(self, model_id: str) -> str:
|
||||
normalized_id = (model_id or '').lower()
|
||||
if any(kw in normalized_id for kw in self._RERANK_MODEL_HINTS):
|
||||
return 'rerank'
|
||||
if any(kw in normalized_id for kw in self._EMBEDDING_MODEL_HINTS):
|
||||
return 'embedding'
|
||||
return 'llm'
|
||||
|
||||
def _enrich_scanned_model(self, model_id: str) -> dict[str, typing.Any]:
|
||||
model_type = self._infer_model_type(model_id)
|
||||
scanned_model: dict[str, typing.Any] = {
|
||||
'id': model_id,
|
||||
'name': model_id,
|
||||
'type': model_type,
|
||||
}
|
||||
|
||||
if model_type == 'llm':
|
||||
abilities = []
|
||||
if self._supports_function_calling(model_id):
|
||||
abilities.append('func_call')
|
||||
if self._supports_vision(model_id):
|
||||
abilities.append('vision')
|
||||
scanned_model['abilities'] = abilities
|
||||
|
||||
context_length = self._safe_context_length(model_id)
|
||||
if context_length is not None:
|
||||
scanned_model['context_length'] = context_length
|
||||
|
||||
return scanned_model
|
||||
|
||||
def _convert_messages(self, messages: typing.List[provider_message.Message]) -> list[dict]:
|
||||
"""Convert LangBot messages to LiteLLM/OpenAI format."""
|
||||
req_messages = []
|
||||
for m in messages:
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
content = msg_dict.get('content')
|
||||
|
||||
if isinstance(content, list):
|
||||
for part in content:
|
||||
if isinstance(part, dict) and part.get('type') == 'image_base64':
|
||||
part['image_url'] = {'url': part['image_base64']}
|
||||
part['type'] = 'image_url'
|
||||
del part['image_base64']
|
||||
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
return req_messages
|
||||
|
||||
def _process_thinking_content(self, content: str, reasoning_content: str | None, remove_think: bool) -> str:
|
||||
"""Process thinking/reasoning content.
|
||||
|
||||
Args:
|
||||
content: The main content from response
|
||||
reasoning_content: Separate reasoning content from model
|
||||
remove_think: If True, remove thinking markers; if False, preserve them
|
||||
|
||||
Returns:
|
||||
Processed content string
|
||||
"""
|
||||
# Extract and handle thinking tags
|
||||
if content and 'CRETIRE_REASONING_BEGINk' in content and 'CRETIRE_REASONING_ENDk' in content:
|
||||
import re
|
||||
|
||||
think_pattern = r'CRETIRE_REASONING_BEGINk(.*?)CRETIRE_REASONING_ENDk'
|
||||
|
||||
if remove_think:
|
||||
# Remove thinking tags and their content from output
|
||||
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
|
||||
# else: preserve thinking content as-is
|
||||
|
||||
# Handle separate reasoning_content field
|
||||
# Currently we don't include reasoning_content in user-facing output regardless of remove_think
|
||||
# because it's typically internal model reasoning, not user-visible thinking
|
||||
return content or ''
|
||||
|
||||
@staticmethod
|
||||
def _normalize_usage(usage: typing.Any) -> dict:
|
||||
"""Normalize a LiteLLM/OpenAI usage object into a plain token dict.
|
||||
|
||||
Handles several real-world shapes returned by different upstreams:
|
||||
- object with ``prompt_tokens`` / ``completion_tokens`` / ``total_tokens`` attrs
|
||||
- dict with the same keys
|
||||
- missing ``total_tokens`` (derived from prompt + completion)
|
||||
- ``None`` / partially-populated usage (defaults to 0)
|
||||
"""
|
||||
if usage is None:
|
||||
return {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0}
|
||||
|
||||
def _get(key: str) -> typing.Any:
|
||||
if isinstance(usage, dict):
|
||||
return usage.get(key)
|
||||
return getattr(usage, key, None)
|
||||
|
||||
prompt_tokens = _get('prompt_tokens') or 0
|
||||
completion_tokens = _get('completion_tokens') or 0
|
||||
total_tokens = _get('total_tokens') or 0
|
||||
|
||||
# Some providers omit total_tokens in streaming usage; derive it.
|
||||
if not total_tokens:
|
||||
total_tokens = prompt_tokens + completion_tokens
|
||||
|
||||
return {
|
||||
'prompt_tokens': int(prompt_tokens),
|
||||
'completion_tokens': int(completion_tokens),
|
||||
'total_tokens': int(total_tokens),
|
||||
}
|
||||
|
||||
def _extract_usage(self, response) -> dict:
|
||||
"""Extract usage info from a non-streaming LiteLLM response."""
|
||||
return self._normalize_usage(getattr(response, 'usage', None))
|
||||
|
||||
@staticmethod
|
||||
def _as_dict(value: typing.Any) -> dict:
|
||||
if value is None:
|
||||
return {}
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
if hasattr(value, 'model_dump'):
|
||||
return value.model_dump()
|
||||
return {}
|
||||
|
||||
def _normalize_stream_tool_calls(
|
||||
self,
|
||||
raw_tool_calls: typing.Any,
|
||||
tool_call_state: dict[int, dict[str, str]],
|
||||
) -> list[dict] | None:
|
||||
"""Fill OpenAI-style streaming tool-call deltas so MessageChunk can validate them."""
|
||||
if not raw_tool_calls:
|
||||
return None
|
||||
|
||||
normalized = []
|
||||
for fallback_index, raw_tool_call in enumerate(raw_tool_calls):
|
||||
tool_call = self._as_dict(raw_tool_call)
|
||||
index = tool_call.get('index')
|
||||
if not isinstance(index, int):
|
||||
index = fallback_index
|
||||
|
||||
state = tool_call_state.setdefault(index, {'id': '', 'type': 'function', 'name': ''})
|
||||
if tool_call.get('id'):
|
||||
state['id'] = tool_call['id']
|
||||
if tool_call.get('type'):
|
||||
state['type'] = tool_call['type']
|
||||
|
||||
function = self._as_dict(tool_call.get('function'))
|
||||
if function.get('name'):
|
||||
state['name'] = function['name']
|
||||
|
||||
arguments = function.get('arguments')
|
||||
if arguments is None:
|
||||
arguments = ''
|
||||
elif not isinstance(arguments, str):
|
||||
arguments = str(arguments)
|
||||
|
||||
if not state['id'] or not state['name']:
|
||||
continue
|
||||
|
||||
normalized.append(
|
||||
{
|
||||
'id': state['id'],
|
||||
'type': state['type'] or 'function',
|
||||
'function': {
|
||||
'name': state['name'],
|
||||
'arguments': arguments,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
return normalized or None
|
||||
|
||||
def _build_common_args(self, args: dict, include_retry_params: bool = True) -> dict:
|
||||
"""Apply common requester config to args dict."""
|
||||
if self.requester_cfg.get('base_url'):
|
||||
args['api_base'] = self.requester_cfg['base_url']
|
||||
if self.requester_cfg.get('timeout'):
|
||||
args['timeout'] = self.requester_cfg['timeout']
|
||||
if include_retry_params:
|
||||
if self.requester_cfg.get('drop_params'):
|
||||
args['drop_params'] = self.requester_cfg['drop_params']
|
||||
if self.requester_cfg.get('num_retries'):
|
||||
args['num_retries'] = self.requester_cfg['num_retries']
|
||||
if self.requester_cfg.get('api_version'):
|
||||
args['api_version'] = self.requester_cfg['api_version']
|
||||
return args
|
||||
|
||||
def _handle_litellm_error(self, e: Exception) -> None:
|
||||
"""Convert LiteLLM exceptions to RequesterError. Never returns, always raises."""
|
||||
# Check more specific exceptions first (they inherit from base exceptions)
|
||||
if isinstance(e, litellm.ContextWindowExceededError):
|
||||
raise errors.RequesterError(f'上下文长度超限: {str(e)}')
|
||||
if isinstance(e, litellm.BadRequestError):
|
||||
raise errors.RequesterError(f'请求参数错误: {str(e)}')
|
||||
if isinstance(e, litellm.AuthenticationError):
|
||||
raise errors.RequesterError(f'API key 无效: {str(e)}')
|
||||
if isinstance(e, litellm.NotFoundError):
|
||||
raise errors.RequesterError(f'模型或路径无效: {str(e)}')
|
||||
if isinstance(e, litellm.RateLimitError):
|
||||
raise errors.RequesterError(f'请求过于频繁或余额不足: {str(e)}')
|
||||
if isinstance(e, litellm.Timeout):
|
||||
raise errors.RequesterError(f'请求超时: {str(e)}')
|
||||
if isinstance(e, litellm.APIConnectionError):
|
||||
raise errors.RequesterError(f'连接错误: {str(e)}')
|
||||
if isinstance(e, litellm.APIError):
|
||||
raise errors.RequesterError(f'API 错误: {str(e)}')
|
||||
raise errors.RequesterError(f'未知错误: {str(e)}')
|
||||
|
||||
async def _build_completion_args(
|
||||
self,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
stream: bool = False,
|
||||
) -> dict:
|
||||
"""Build common completion arguments for invoke_llm and invoke_llm_stream."""
|
||||
req_messages = self._convert_messages(messages)
|
||||
model_name = self._build_litellm_model_name(model.model_entity.name)
|
||||
api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = {
|
||||
'model': model_name,
|
||||
'messages': req_messages,
|
||||
'api_key': api_key,
|
||||
}
|
||||
if stream:
|
||||
args['stream'] = True
|
||||
args['stream_options'] = {'include_usage': True}
|
||||
self._build_common_args(args)
|
||||
|
||||
# Apply model-level extra_args first, then call-level extra_args
|
||||
if model.model_entity.extra_args:
|
||||
args.update(model.model_entity.extra_args)
|
||||
args.update(extra_args)
|
||||
|
||||
if funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(funcs)
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
args.setdefault('tool_choice', 'auto')
|
||||
|
||||
return args
|
||||
|
||||
async def invoke_llm(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
"""Invoke LLM and return message with usage info."""
|
||||
args = await self._build_completion_args(model, messages, funcs, extra_args, stream=False)
|
||||
|
||||
try:
|
||||
response = await acompletion(**args)
|
||||
|
||||
message_data = response.choices[0].message.model_dump()
|
||||
if 'role' not in message_data or message_data['role'] is None:
|
||||
message_data['role'] = 'assistant'
|
||||
|
||||
content = message_data.get('content', '')
|
||||
reasoning_content = message_data.get('reasoning_content', None)
|
||||
message_data['content'] = self._process_thinking_content(content, reasoning_content, remove_think)
|
||||
|
||||
if 'reasoning_content' in message_data:
|
||||
del message_data['reasoning_content']
|
||||
|
||||
message = provider_message.Message(**message_data)
|
||||
usage_info = self._extract_usage(response)
|
||||
|
||||
return message, usage_info
|
||||
|
||||
except Exception as e:
|
||||
self._handle_litellm_error(e)
|
||||
|
||||
async def invoke_llm_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
"""Invoke LLM streaming and yield chunks."""
|
||||
args = await self._build_completion_args(model, messages, funcs, extra_args, stream=True)
|
||||
|
||||
chunk_idx = 0
|
||||
role = 'assistant'
|
||||
tool_call_state: dict[int, dict[str, str]] = {}
|
||||
|
||||
try:
|
||||
response = await acompletion(**args)
|
||||
async for chunk in response:
|
||||
# Capture usage whenever a chunk carries it.
|
||||
#
|
||||
# Important: many OpenAI-compatible gateways (e.g. new-api) and
|
||||
# providers send the final usage payload in a chunk that STILL
|
||||
# contains a (empty-delta) choice, not an empty `choices` list.
|
||||
# The previous implementation only captured usage when `choices`
|
||||
# was empty, so streamed calls always recorded 0 tokens.
|
||||
# We therefore capture usage independently of `choices`, and then
|
||||
# fall through to also process any content this chunk may carry.
|
||||
if getattr(chunk, 'usage', None):
|
||||
usage_info = self._normalize_usage(chunk.usage)
|
||||
if query is not None:
|
||||
if query.variables is None:
|
||||
query.variables = {}
|
||||
query.variables['_stream_usage'] = usage_info
|
||||
|
||||
if not hasattr(chunk, 'choices') or not chunk.choices:
|
||||
continue
|
||||
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
delta_content = delta.get('content', '')
|
||||
reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
# Handle reasoning_content based on remove_think flag
|
||||
if reasoning_content:
|
||||
if remove_think:
|
||||
# Skip reasoning content when remove_think is True
|
||||
chunk_idx += 1
|
||||
continue
|
||||
else:
|
||||
# Use reasoning_content as the displayed content
|
||||
delta_content = reasoning_content
|
||||
|
||||
tool_calls = self._normalize_stream_tool_calls(delta.get('tool_calls'), tool_call_state)
|
||||
|
||||
if chunk_idx == 0 and not delta_content and not tool_calls:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': tool_calls,
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
|
||||
except Exception as e:
|
||||
self._handle_litellm_error(e)
|
||||
|
||||
async def invoke_embedding(
|
||||
self,
|
||||
model: requester.RuntimeEmbeddingModel,
|
||||
input_text: list[str],
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
) -> tuple[list[list[float]], dict]:
|
||||
"""Invoke embedding and return vectors with usage info."""
|
||||
model_name = self._build_litellm_model_name(model.model_entity.name)
|
||||
api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = {
|
||||
'model': model_name,
|
||||
'input': input_text,
|
||||
'api_key': api_key,
|
||||
}
|
||||
self._build_common_args(args, include_retry_params=False)
|
||||
|
||||
if model.model_entity.extra_args:
|
||||
args.update(model.model_entity.extra_args)
|
||||
|
||||
args.update(extra_args)
|
||||
|
||||
try:
|
||||
response = await aembedding(**args)
|
||||
|
||||
embeddings = [d.embedding for d in response.data]
|
||||
usage_info = self._extract_usage(response)
|
||||
|
||||
return embeddings, usage_info
|
||||
|
||||
except Exception as e:
|
||||
self._handle_litellm_error(e)
|
||||
|
||||
async def invoke_rerank(
|
||||
self,
|
||||
model: requester.RuntimeRerankModel,
|
||||
query: str,
|
||||
documents: typing.List[str],
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
) -> typing.List[dict]:
|
||||
"""Invoke rerank and return relevance scores."""
|
||||
model_name = self._build_litellm_model_name(model.model_entity.name)
|
||||
api_key = model.provider.token_mgr.get_token()
|
||||
|
||||
args = {
|
||||
'model': model_name,
|
||||
'query': query,
|
||||
'documents': documents,
|
||||
'api_key': api_key,
|
||||
'top_n': min(len(documents), 64),
|
||||
}
|
||||
self._build_common_args(args, include_retry_params=False)
|
||||
|
||||
if model.model_entity.extra_args:
|
||||
args.update(model.model_entity.extra_args)
|
||||
|
||||
args.update(extra_args)
|
||||
|
||||
try:
|
||||
response = await arerank(**args)
|
||||
|
||||
results = []
|
||||
for r in response.results:
|
||||
results.append(
|
||||
{
|
||||
'index': r.get('index', 0),
|
||||
'relevance_score': r.get('relevance_score', 0.0),
|
||||
}
|
||||
)
|
||||
|
||||
if results:
|
||||
scores = [r['relevance_score'] for r in results]
|
||||
min_score = min(scores)
|
||||
max_score = max(scores)
|
||||
if max_score - min_score > 1e-6:
|
||||
for r in results:
|
||||
r['relevance_score'] = (r['relevance_score'] - min_score) / (max_score - min_score)
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
self._handle_litellm_error(e)
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
"""Scan models supported by the provider."""
|
||||
import httpx
|
||||
|
||||
base_url = self.requester_cfg.get('base_url', '').rstrip('/')
|
||||
timeout = self.requester_cfg.get('timeout', 120)
|
||||
|
||||
if not base_url:
|
||||
raise errors.RequesterError('Base URL required for model scanning')
|
||||
|
||||
headers = {}
|
||||
if api_key:
|
||||
headers['Authorization'] = f'Bearer {api_key}'
|
||||
|
||||
models_url = f'{base_url}/models'
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=timeout) as client:
|
||||
response = await client.get(models_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
|
||||
models = []
|
||||
for item in payload.get('data', []):
|
||||
model_id = item.get('id')
|
||||
if not model_id:
|
||||
continue
|
||||
|
||||
models.append(self._enrich_scanned_model(model_id))
|
||||
|
||||
models.sort(key=lambda x: (x['type'] != 'llm', x['name'].lower()))
|
||||
|
||||
return {'models': models}
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
raise errors.RequesterError(f'Model scan failed: {e.response.status_code}')
|
||||
except httpx.TimeoutException:
|
||||
raise errors.RequesterError('Model scan timeout')
|
||||
except Exception as e:
|
||||
raise errors.RequesterError(f'Model scan error: {str(e)}')
|
||||
@@ -1,64 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: litellm-chat
|
||||
label:
|
||||
en_US: LiteLLM (Unified)
|
||||
zh_Hans: LiteLLM (统一请求器)
|
||||
icon: litellm.svg
|
||||
spec:
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: false
|
||||
default: ''
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
- name: custom_llm_provider
|
||||
label:
|
||||
en_US: Custom Provider
|
||||
zh_Hans: 自定义 Provider
|
||||
type: string
|
||||
required: false
|
||||
default: ''
|
||||
description:
|
||||
en_US: Force provider type (e.g., anthropic, openai, gemini)
|
||||
zh_Hans: 强制指定 provider 类型(如 anthropic, openai, gemini)
|
||||
- name: drop_params
|
||||
label:
|
||||
en_US: Drop Unsupported Params
|
||||
zh_Hans: 丢弃不支持参数
|
||||
type: boolean
|
||||
required: false
|
||||
default: false
|
||||
- name: num_retries
|
||||
label:
|
||||
en_US: Number of Retries
|
||||
zh_Hans: 重试次数
|
||||
type: integer
|
||||
required: false
|
||||
default: 0
|
||||
- name: api_version
|
||||
label:
|
||||
en_US: API Version
|
||||
zh_Hans: API 版本
|
||||
type: string
|
||||
required: false
|
||||
default: ''
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: unified
|
||||
execution:
|
||||
python:
|
||||
path: ./litellmchat.py
|
||||
attr: LiteLLMRequester
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class LmStudioChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""LMStudio ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'http://127.0.0.1:1234/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: LM Studio
|
||||
icon: lmstudio.webp
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#FF6700"/>
|
||||
<text x="30" y="32" font-family="Arial, sans-serif" font-size="18" font-weight="bold" fill="white" text-anchor="middle">MiMo</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 280 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: mimo-chat-completions
|
||||
label:
|
||||
en_US: Xiaomi MiMo
|
||||
zh_Hans: 小米 MiMo
|
||||
icon: mimo.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.xiaomimimo.com/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -1,4 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#4F46E5"/>
|
||||
<text x="30" y="32" font-family="Arial, sans-serif" font-size="12" font-weight="bold" fill="white" text-anchor="middle">MiniMax</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 283 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: minimax-chat-completions
|
||||
label:
|
||||
en_US: MiniMax
|
||||
zh_Hans: MiniMax
|
||||
icon: minimax.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.minimax.chat/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#FF6B35"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Mistral</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">AI</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 395 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: mistral-chat-completions
|
||||
label:
|
||||
en_US: Mistral AI
|
||||
zh_Hans: Mistral AI
|
||||
icon: mistral.svg
|
||||
spec:
|
||||
litellm_provider: mistral
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.mistral.ai/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -0,0 +1,561 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import typing
|
||||
|
||||
import openai
|
||||
import openai.types.chat.chat_completion as chat_completion
|
||||
import httpx
|
||||
|
||||
from .. import entities, errors, requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class ModelScopeChatCompletions(requester.ProviderAPIRequester):
|
||||
"""ModelScope ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api-inference.modelscope.cn/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
self.client = openai.AsyncClient(
|
||||
api_key=self.init_api_key,
|
||||
base_url=self.requester_cfg['base_url'],
|
||||
timeout=self.requester_cfg['timeout'],
|
||||
http_client=httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']),
|
||||
)
|
||||
|
||||
def _mask_api_key(self, api_key: str | None) -> str:
|
||||
if not api_key:
|
||||
return ''
|
||||
if len(api_key) <= 8:
|
||||
return '****'
|
||||
return f'{api_key[:4]}...{api_key[-4:]}'
|
||||
|
||||
def _infer_model_type(self, model_id: str) -> str:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
embedding_keywords = (
|
||||
'embedding',
|
||||
'embed',
|
||||
'bge-',
|
||||
'e5-',
|
||||
'm3e',
|
||||
'gte-',
|
||||
'multilingual-e5',
|
||||
'text-embedding',
|
||||
)
|
||||
return 'embedding' if any(keyword in normalized_model_id for keyword in embedding_keywords) else 'llm'
|
||||
|
||||
def _infer_model_abilities(self, item: dict[str, typing.Any], model_id: str) -> list[str]:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
abilities: set[str] = set()
|
||||
|
||||
def _flatten(value: typing.Any) -> list[str]:
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, str):
|
||||
return [value.lower()]
|
||||
if isinstance(value, dict):
|
||||
flattened: list[str] = []
|
||||
for nested_value in value.values():
|
||||
flattened.extend(_flatten(nested_value))
|
||||
return flattened
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
flattened: list[str] = []
|
||||
for nested_value in value:
|
||||
flattened.extend(_flatten(nested_value))
|
||||
return flattened
|
||||
return [str(value).lower()]
|
||||
|
||||
capability_tokens = _flatten(item.get('capabilities'))
|
||||
capability_tokens.extend(_flatten(item.get('modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('input_modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('output_modalities')))
|
||||
capability_tokens.extend(_flatten(item.get('supported_generation_methods')))
|
||||
capability_tokens.extend(_flatten(item.get('supported_parameters')))
|
||||
capability_tokens.extend(_flatten(item.get('architecture')))
|
||||
|
||||
combined_tokens = capability_tokens + [normalized_model_id]
|
||||
|
||||
vision_keywords = ('vision', 'image', 'file', 'video', 'multimodal', 'vl', 'ocr', 'omni')
|
||||
function_call_keywords = ('function', 'tool', 'tools', 'tool_choice', 'tool_call', 'tool-use', 'tool_use')
|
||||
|
||||
if any(any(keyword in token for keyword in vision_keywords) for token in combined_tokens):
|
||||
abilities.add('vision')
|
||||
|
||||
if any(any(keyword in token for keyword in function_call_keywords) for token in combined_tokens):
|
||||
abilities.add('func_call')
|
||||
|
||||
return sorted(abilities)
|
||||
|
||||
def _normalize_modalities(self, value: typing.Any) -> list[str]:
|
||||
normalized: list[str] = []
|
||||
|
||||
def _collect(item: typing.Any):
|
||||
if item is None:
|
||||
return
|
||||
if isinstance(item, str):
|
||||
for part in item.replace('->', ',').replace('+', ',').split(','):
|
||||
token = part.strip().lower()
|
||||
if token and token not in normalized:
|
||||
normalized.append(token)
|
||||
return
|
||||
if isinstance(item, dict):
|
||||
for nested in item.values():
|
||||
_collect(nested)
|
||||
return
|
||||
if isinstance(item, (list, tuple, set)):
|
||||
for nested in item:
|
||||
_collect(nested)
|
||||
return
|
||||
|
||||
_collect(value)
|
||||
return normalized
|
||||
|
||||
def _extract_scan_metadata(self, item: dict[str, typing.Any], model_id: str) -> dict[str, typing.Any]:
|
||||
display_name = item.get('name')
|
||||
if not isinstance(display_name, str) or not display_name.strip() or display_name == model_id:
|
||||
display_name = ''
|
||||
|
||||
description = item.get('description')
|
||||
if not isinstance(description, str) or not description.strip():
|
||||
description = ''
|
||||
|
||||
context_length = item.get('context_length')
|
||||
if context_length is None and isinstance(item.get('top_provider'), dict):
|
||||
context_length = item['top_provider'].get('context_length')
|
||||
|
||||
if not isinstance(context_length, int):
|
||||
try:
|
||||
context_length = int(context_length) if context_length is not None else None
|
||||
except (TypeError, ValueError):
|
||||
context_length = None
|
||||
|
||||
input_modalities = self._normalize_modalities(item.get('input_modalities'))
|
||||
output_modalities = self._normalize_modalities(item.get('output_modalities'))
|
||||
|
||||
if isinstance(item.get('architecture'), dict):
|
||||
if not input_modalities:
|
||||
input_modalities = self._normalize_modalities(item['architecture'].get('input_modalities'))
|
||||
if not output_modalities:
|
||||
output_modalities = self._normalize_modalities(item['architecture'].get('output_modalities'))
|
||||
|
||||
owned_by = item.get('owned_by')
|
||||
if not isinstance(owned_by, str) or not owned_by.strip():
|
||||
owned_by = ''
|
||||
|
||||
return {
|
||||
'display_name': display_name or None,
|
||||
'description': description or None,
|
||||
'context_length': context_length,
|
||||
'owned_by': owned_by or None,
|
||||
'input_modalities': input_modalities,
|
||||
'output_modalities': output_modalities,
|
||||
}
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
headers = {}
|
||||
if api_key:
|
||||
headers['Authorization'] = f'Bearer {api_key}'
|
||||
|
||||
models_url = f'{self.requester_cfg["base_url"].rstrip("/")}/models'
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
|
||||
response = await client.get(models_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
|
||||
models = []
|
||||
for item in payload.get('data', []):
|
||||
model_id = item.get('id')
|
||||
if not model_id:
|
||||
continue
|
||||
models.append(
|
||||
{
|
||||
'id': model_id,
|
||||
'name': model_id,
|
||||
'type': self._infer_model_type(model_id),
|
||||
'abilities': self._infer_model_abilities(item, model_id),
|
||||
**self._extract_scan_metadata(item, model_id),
|
||||
}
|
||||
)
|
||||
|
||||
models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
|
||||
return {
|
||||
'models': models,
|
||||
'debug': {
|
||||
'request': {
|
||||
'method': 'GET',
|
||||
'url': models_url,
|
||||
'headers': {
|
||||
'Authorization': f'Bearer {self._mask_api_key(api_key)}' if api_key else '',
|
||||
},
|
||||
},
|
||||
'response': payload,
|
||||
},
|
||||
}
|
||||
|
||||
async def _req(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
args: dict,
|
||||
extra_body: dict = {},
|
||||
remove_think: bool = False,
|
||||
) -> list[dict[str, typing.Any]]:
|
||||
args['stream'] = True
|
||||
|
||||
chunk = None
|
||||
|
||||
pending_content = ''
|
||||
|
||||
tool_calls = []
|
||||
|
||||
resp_gen: openai.AsyncStream = await self.client.chat.completions.create(**args, extra_body=extra_body)
|
||||
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
tool_id = ''
|
||||
tool_name = ''
|
||||
message_delta = {}
|
||||
async for chunk in resp_gen:
|
||||
if not chunk or not chunk.id or not chunk.choices or not chunk.choices[0] or not chunk.choices[0].delta:
|
||||
continue
|
||||
|
||||
delta = chunk.choices[0].delta.model_dump() if hasattr(chunk.choices[0], 'delta') else {}
|
||||
reasoning_content = delta.get('reasoning_content')
|
||||
# 处理 reasoning_content
|
||||
if reasoning_content:
|
||||
# accumulated_reasoning += reasoning_content
|
||||
# 如果设置了 remove_think,跳过 reasoning_content
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 第一次出现 reasoning_content,添加 <think> 开始标签
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
pending_content += '<think>\n' + reasoning_content
|
||||
else:
|
||||
# 继续输出 reasoning_content
|
||||
pending_content += reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta.get('content'):
|
||||
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
|
||||
thinking_ended = True
|
||||
pending_content += '\n</think>\n' + delta.get('content')
|
||||
|
||||
if delta.get('content') is not None:
|
||||
pending_content += delta.get('content')
|
||||
|
||||
if delta.get('tool_calls') is not None:
|
||||
for tool_call in delta.get('tool_calls'):
|
||||
if tool_call['id'] != '':
|
||||
tool_id = tool_call['id']
|
||||
if tool_call['function']['name'] is not None:
|
||||
tool_name = tool_call['function']['name']
|
||||
if tool_call['function']['arguments'] is None:
|
||||
continue
|
||||
tool_call['id'] = tool_id
|
||||
tool_call['name'] = tool_name
|
||||
for tc in tool_calls:
|
||||
if tc['index'] == tool_call['index']:
|
||||
tc['function']['arguments'] += tool_call['function']['arguments']
|
||||
break
|
||||
else:
|
||||
tool_calls.append(tool_call)
|
||||
|
||||
if chunk.choices[0].finish_reason is not None:
|
||||
break
|
||||
message_delta['content'] = pending_content
|
||||
message_delta['role'] = 'assistant'
|
||||
|
||||
message_delta['tool_calls'] = tool_calls if tool_calls else None
|
||||
return [message_delta]
|
||||
|
||||
async def _make_msg(
|
||||
self,
|
||||
chat_completion: list[dict[str, typing.Any]],
|
||||
) -> provider_message.Message:
|
||||
chatcmpl_message = chat_completion[0]
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
|
||||
chatcmpl_message['role'] = 'assistant'
|
||||
|
||||
message = provider_message.Message(**chatcmpl_message)
|
||||
|
||||
return message
|
||||
|
||||
async def _closure(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
|
||||
# 发送请求
|
||||
resp = await self._req(query, args, extra_body=extra_args, remove_think=remove_think)
|
||||
|
||||
# 处理请求结果
|
||||
message = await self._make_msg(resp)
|
||||
|
||||
# ModelScope uses streaming, usage info not available
|
||||
usage_info = {}
|
||||
|
||||
return message, usage_info
|
||||
|
||||
async def _req_stream(
|
||||
self,
|
||||
args: dict,
|
||||
extra_body: dict = {},
|
||||
) -> chat_completion.ChatCompletion:
|
||||
async for chunk in await self.client.chat.completions.create(**args, extra_body=extra_body):
|
||||
yield chunk
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# 流式处理状态
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
# accumulated_reasoning = '' # 仅用于判断何时结束思维链
|
||||
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
# 处理 reasoning_content
|
||||
if reasoning_content:
|
||||
# accumulated_reasoning += reasoning_content
|
||||
# 如果设置了 remove_think,跳过 reasoning_content
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 第一次出现 reasoning_content,添加 <think> 开始标签
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
delta_content = '<think>\n' + reasoning_content
|
||||
else:
|
||||
# 继续输出 reasoning_content
|
||||
delta_content = reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta_content:
|
||||
# reasoning_content 结束,normal content 开始,添加 </think> 结束标签
|
||||
thinking_ended = True
|
||||
delta_content = '\n</think>\n' + delta_content
|
||||
|
||||
# 处理 content 中已有的 <think> 标签(如果需要移除)
|
||||
# if delta_content and remove_think and '<think>' in delta_content:
|
||||
# import re
|
||||
#
|
||||
# # 移除 <think> 标签及其内容
|
||||
# delta_content = re.sub(r'<think>.*?</think>', '', delta_content, flags=re.DOTALL)
|
||||
|
||||
# 处理工具调用增量
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] != '':
|
||||
tool_id = tool_call['id']
|
||||
if tool_call['function']['name'] is not None:
|
||||
tool_name = tool_call['function']['name']
|
||||
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
tool_call['id'] = tool_id
|
||||
tool_call['function']['name'] = tool_name
|
||||
tool_call['function']['arguments'] = (
|
||||
'' if tool_call['function']['arguments'] is None else tool_call['function']['arguments']
|
||||
)
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
# return
|
||||
|
||||
async def invoke_llm(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: entities.LLMModelInfo,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
|
||||
for m in messages:
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
content = msg_dict.get('content')
|
||||
if isinstance(content, list):
|
||||
# 检查 content 列表中是否每个部分都是文本
|
||||
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
|
||||
# 将所有文本部分合并为一个字符串
|
||||
msg_dict['content'] = '\n'.join(part['text'] for part in content)
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
try:
|
||||
return await self._closure(
|
||||
query=query,
|
||||
req_messages=req_messages,
|
||||
use_model=model,
|
||||
use_funcs=funcs,
|
||||
extra_args=extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
except openai.BadRequestError as e:
|
||||
if 'context_length_exceeded' in e.message:
|
||||
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求参数错误: {e.message}')
|
||||
except openai.AuthenticationError as e:
|
||||
raise errors.RequesterError(f'无效的 api-key: {e.message}')
|
||||
except openai.NotFoundError as e:
|
||||
raise errors.RequesterError(f'请求路径错误: {e.message}')
|
||||
except openai.RateLimitError as e:
|
||||
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
|
||||
except openai.APIError as e:
|
||||
raise errors.RequesterError(f'请求错误: {e.message}')
|
||||
|
||||
async def invoke_llm_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
req_messages = [] # req_messages 仅用于类内,外部同步由 query.messages 进行
|
||||
for m in messages:
|
||||
msg_dict = m.dict(exclude_none=True)
|
||||
content = msg_dict.get('content')
|
||||
if isinstance(content, list):
|
||||
# 检查 content 列表中是否每个部分都是文本
|
||||
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
|
||||
# 将所有文本部分合并为一个字符串
|
||||
msg_dict['content'] = '\n'.join(part['text'] for part in content)
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
try:
|
||||
async for item in self._closure_stream(
|
||||
query=query,
|
||||
req_messages=req_messages,
|
||||
use_model=model,
|
||||
use_funcs=funcs,
|
||||
extra_args=extra_args,
|
||||
remove_think=remove_think,
|
||||
):
|
||||
yield item
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
except openai.BadRequestError as e:
|
||||
if 'context_length_exceeded' in e.message:
|
||||
raise errors.RequesterError(f'上文过长,请重置会话: {e.message}')
|
||||
else:
|
||||
raise errors.RequesterError(f'请求参数错误: {e.message}')
|
||||
except openai.AuthenticationError as e:
|
||||
raise errors.RequesterError(f'无效的 api-key: {e.message}')
|
||||
except openai.NotFoundError as e:
|
||||
raise errors.RequesterError(f'请求路径错误: {e.message}')
|
||||
except openai.RateLimitError as e:
|
||||
raise errors.RequesterError(f'请求过于频繁或余额不足: {e.message}')
|
||||
except openai.APIError as e:
|
||||
raise errors.RequesterError(f'请求错误: {e.message}')
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 魔搭社区
|
||||
icon: modelscope.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -32,8 +31,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: maas
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
|
||||
from . import chatcmpl
|
||||
from .. import requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
|
||||
class MoonshotChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""Moonshot ChatCompletion API 请求器"""
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.moonshot.cn/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def _closure(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> tuple[provider_message.Message, dict]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages
|
||||
|
||||
# deepseek 不支持多模态,把content都转换成纯文字
|
||||
for m in messages:
|
||||
if 'content' in m and isinstance(m['content'], list):
|
||||
m['content'] = ' '.join([c['text'] for c in m['content']])
|
||||
|
||||
# 删除空的,不知道干嘛的,直接删了。
|
||||
# messages = [m for m in messages if m["content"].strip() != "" and ('tool_calls' not in m or not m['tool_calls'])]
|
||||
|
||||
args['messages'] = messages
|
||||
|
||||
# 发送请求
|
||||
resp = await self._req(args, extra_body=extra_args)
|
||||
|
||||
# 处理请求结果
|
||||
message = await self._make_msg(resp, remove_think)
|
||||
|
||||
# Extract token usage from response
|
||||
usage_info = {}
|
||||
if hasattr(resp, 'usage') and resp.usage:
|
||||
usage_info['input_tokens'] = resp.usage.prompt_tokens or 0
|
||||
usage_info['output_tokens'] = resp.usage.completion_tokens or 0
|
||||
usage_info['total_tokens'] = resp.usage.total_tokens or 0
|
||||
|
||||
return message, usage_info
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 月之暗面
|
||||
icon: moonshot.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class NewAPIChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""New API ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'http://localhost:3000/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: New API
|
||||
icon: newapi.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
314
src/langbot/pkg/provider/modelmgr/requesters/ollamachat.py
Normal file
@@ -0,0 +1,314 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import typing
|
||||
from typing import Union, Mapping, Any, AsyncIterator
|
||||
import uuid
|
||||
import json
|
||||
|
||||
import ollama
|
||||
import httpx
|
||||
|
||||
from .. import errors, requester
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
REQUESTER_NAME: str = 'ollama-chat'
|
||||
|
||||
|
||||
class OllamaChatCompletions(requester.ProviderAPIRequester):
|
||||
"""Ollama平台 ChatCompletion API请求器"""
|
||||
|
||||
client: ollama.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'http://127.0.0.1:11434',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def initialize(self):
|
||||
os.environ['OLLAMA_HOST'] = self.requester_cfg['base_url']
|
||||
self.client = ollama.AsyncClient(timeout=self.requester_cfg['timeout'])
|
||||
|
||||
def _infer_model_type(self, model_id: str) -> str:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
embedding_keywords = ('embedding', 'embed', 'bge-', 'e5-', 'm3e', 'gte-', 'text-embedding')
|
||||
return 'embedding' if any(keyword in normalized_model_id for keyword in embedding_keywords) else 'llm'
|
||||
|
||||
def _infer_model_abilities(self, item: dict[str, typing.Any], model_id: str) -> list[str]:
|
||||
normalized_model_id = (model_id or '').lower()
|
||||
abilities: set[str] = set()
|
||||
details = item.get('details', {}) or {}
|
||||
families = details.get('families', []) or []
|
||||
tokens = [normalized_model_id, str(details.get('family', '')).lower()]
|
||||
tokens.extend(str(family).lower() for family in families)
|
||||
|
||||
if any(keyword in token for token in tokens for keyword in ('vision', 'vl', 'omni', 'llava', 'ocr')):
|
||||
abilities.add('vision')
|
||||
if any(keyword in token for token in tokens for keyword in ('tool', 'function')):
|
||||
abilities.add('func_call')
|
||||
return sorted(abilities)
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
del api_key
|
||||
models_url = f'{self.requester_cfg["base_url"].rstrip("/")}/api/tags'
|
||||
|
||||
async with httpx.AsyncClient(trust_env=True, timeout=self.requester_cfg['timeout']) as client:
|
||||
response = await client.get(models_url)
|
||||
response.raise_for_status()
|
||||
payload = response.json()
|
||||
|
||||
models: list[dict[str, typing.Any]] = []
|
||||
for item in payload.get('models', []):
|
||||
model_id = item.get('model') or item.get('name')
|
||||
if not model_id:
|
||||
continue
|
||||
models.append(
|
||||
{
|
||||
'id': model_id,
|
||||
'name': item.get('name', model_id),
|
||||
'type': self._infer_model_type(model_id),
|
||||
'abilities': self._infer_model_abilities(item, model_id),
|
||||
}
|
||||
)
|
||||
|
||||
models.sort(key=lambda item: (item['type'] != 'llm', item['name'].lower()))
|
||||
return {
|
||||
'models': models,
|
||||
'debug': {
|
||||
'request': {
|
||||
'method': 'GET',
|
||||
'url': models_url,
|
||||
},
|
||||
'response': payload,
|
||||
},
|
||||
}
|
||||
|
||||
async def _req(
|
||||
self,
|
||||
args: dict,
|
||||
) -> Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]:
|
||||
return await self.client.chat(**args)
|
||||
|
||||
async def _closure(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
args = extra_args.copy()
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
messages: list[dict] = req_messages.copy()
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
text_content: list = []
|
||||
image_urls: list = []
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'text':
|
||||
text_content.append(me['text'])
|
||||
elif me['type'] == 'image_base64':
|
||||
image_urls.append(me['image_base64'])
|
||||
|
||||
msg['content'] = '\n'.join(text_content)
|
||||
msg['images'] = [url.split(',')[1] for url in image_urls]
|
||||
if 'tool_calls' in msg: # LangBot 内部以 str 存储 tool_calls 的参数,这里需要转换为 dict
|
||||
for tool_call in msg['tool_calls']:
|
||||
tool_call['function']['arguments'] = json.loads(tool_call['function']['arguments'])
|
||||
args['messages'] = messages
|
||||
|
||||
args['tools'] = []
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
resp = await self._req(args)
|
||||
message: provider_message.Message = await self._make_msg(resp)
|
||||
return message
|
||||
|
||||
async def _make_msg(self, chat_completions: ollama.ChatResponse) -> provider_message.Message:
|
||||
message: ollama.Message = chat_completions.message
|
||||
if message is None:
|
||||
raise ValueError("chat_completions must contain a 'message' field")
|
||||
|
||||
ret_msg: provider_message.Message = None
|
||||
|
||||
if message.content is not None:
|
||||
ret_msg = provider_message.Message(role='assistant', content=message.content)
|
||||
if message.tool_calls is not None and len(message.tool_calls) > 0:
|
||||
tool_calls: list[provider_message.ToolCall] = []
|
||||
|
||||
for tool_call in message.tool_calls:
|
||||
tool_calls.append(
|
||||
provider_message.ToolCall(
|
||||
id=uuid.uuid4().hex,
|
||||
type='function',
|
||||
function=provider_message.FunctionCall(
|
||||
name=tool_call.function.name,
|
||||
arguments=json.dumps(tool_call.function.arguments),
|
||||
),
|
||||
)
|
||||
)
|
||||
ret_msg.tool_calls = tool_calls
|
||||
|
||||
return ret_msg
|
||||
|
||||
async def _prepare_messages(
|
||||
self,
|
||||
messages: typing.List[provider_message.Message],
|
||||
) -> list[dict]:
|
||||
"""Prepare messages for Ollama API request."""
|
||||
req_messages: list = []
|
||||
for m in messages:
|
||||
msg_dict: dict = m.dict(exclude_none=True)
|
||||
content: Any = msg_dict.get('content')
|
||||
if isinstance(content, list):
|
||||
if all(isinstance(part, dict) and part.get('type') == 'text' for part in content):
|
||||
msg_dict['content'] = '\n'.join(part['text'] for part in content)
|
||||
req_messages.append(msg_dict)
|
||||
return req_messages
|
||||
|
||||
async def invoke_llm(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message:
|
||||
req_messages = await self._prepare_messages(messages)
|
||||
try:
|
||||
return await self._closure(
|
||||
query=query,
|
||||
req_messages=req_messages,
|
||||
use_model=model,
|
||||
use_funcs=funcs,
|
||||
extra_args=extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
|
||||
async def invoke_llm_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
model: requester.RuntimeLLMModel,
|
||||
messages: typing.List[provider_message.Message],
|
||||
funcs: typing.List[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.MessageChunk:
|
||||
req_messages = await self._prepare_messages(messages)
|
||||
|
||||
try:
|
||||
args = extra_args.copy()
|
||||
args['model'] = model.model_entity.name
|
||||
|
||||
# Process messages for Ollama format
|
||||
msgs: list[dict] = req_messages.copy()
|
||||
for msg in msgs:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
text_content: list = []
|
||||
image_urls: list = []
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'text':
|
||||
text_content.append(me['text'])
|
||||
elif me['type'] == 'image_base64':
|
||||
image_urls.append(me['image_base64'])
|
||||
msg['content'] = '\n'.join(text_content)
|
||||
msg['images'] = [url.split(',')[1] for url in image_urls]
|
||||
if 'tool_calls' in msg:
|
||||
for tool_call in msg['tool_calls']:
|
||||
tool_call['function']['arguments'] = json.loads(tool_call['function']['arguments'])
|
||||
args['messages'] = msgs
|
||||
|
||||
args['tools'] = []
|
||||
if funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(funcs)
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
args['stream'] = True
|
||||
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant'
|
||||
|
||||
async for chunk in await self.client.chat(**args):
|
||||
message: ollama.Message = chunk.message
|
||||
done = chunk.done
|
||||
|
||||
delta_content = message.content or ''
|
||||
reasoning_content = getattr(message, 'thinking', '') or ''
|
||||
|
||||
# Handle reasoning/thinking content
|
||||
if reasoning_content:
|
||||
if remove_think:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
if not thinking_started:
|
||||
thinking_started = True
|
||||
delta_content = '<think>\n' + reasoning_content
|
||||
else:
|
||||
delta_content = reasoning_content
|
||||
elif thinking_started and not thinking_ended and delta_content:
|
||||
thinking_ended = True
|
||||
delta_content = '\n</think>\n' + delta_content
|
||||
|
||||
# Handle tool calls
|
||||
tool_calls_data = None
|
||||
if message.tool_calls:
|
||||
tool_calls_data = []
|
||||
for tc in message.tool_calls:
|
||||
tool_calls_data.append(
|
||||
{
|
||||
'id': uuid.uuid4().hex,
|
||||
'type': 'function',
|
||||
'function': {
|
||||
'name': tc.function.name,
|
||||
'arguments': json.dumps(tc.function.arguments),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
# Skip empty first chunk
|
||||
if chunk_idx == 0 and not delta_content and not reasoning_content and not tool_calls_data:
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': tool_calls_data,
|
||||
'is_final': bool(done),
|
||||
}
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise errors.RequesterError('请求超时')
|
||||
|
||||
async def invoke_embedding(
|
||||
self,
|
||||
model: requester.RuntimeEmbeddingModel,
|
||||
input_text: list[str],
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
) -> list[list[float]]:
|
||||
return (
|
||||
await self.client.embed(
|
||||
model=model.model_entity.name,
|
||||
input=input_text,
|
||||
**extra_args,
|
||||
)
|
||||
).embeddings
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Ollama
|
||||
icon: ollama.svg
|
||||
spec:
|
||||
litellm_provider: ollama
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import modelscopechatcmpl
|
||||
|
||||
|
||||
class OpenRouterChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions):
|
||||
"""OpenRouter ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://openrouter.ai/api/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
original_base_url = self.requester_cfg.get('base_url', '')
|
||||
self.requester_cfg['base_url'] = 'https://openrouter.ai/api/v1'
|
||||
try:
|
||||
return await super().scan_models(api_key)
|
||||
finally:
|
||||
self.requester_cfg['base_url'] = original_base_url
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: OpenRouter
|
||||
icon: openrouter.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
208
src/langbot/pkg/provider/modelmgr/requesters/ppiochatcmpl.py
Normal file
@@ -0,0 +1,208 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import openai
|
||||
import typing
|
||||
|
||||
from . import chatcmpl
|
||||
from .. import requester
|
||||
import openai.types.chat.chat_completion as chat_completion
|
||||
import re
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
||||
|
||||
|
||||
class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""欧派云 ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.ppinfra.com/v3/openai',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
is_think: bool = False
|
||||
|
||||
async def _make_msg(
|
||||
self,
|
||||
chat_completion: chat_completion.ChatCompletion,
|
||||
remove_think: bool,
|
||||
) -> provider_message.Message:
|
||||
chatcmpl_message = chat_completion.choices[0].message.model_dump()
|
||||
# print(chatcmpl_message.keys(), chatcmpl_message.values())
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in chatcmpl_message or chatcmpl_message['role'] is None:
|
||||
chatcmpl_message['role'] = 'assistant'
|
||||
|
||||
reasoning_content = chatcmpl_message['reasoning_content'] if 'reasoning_content' in chatcmpl_message else None
|
||||
|
||||
# deepseek的reasoner模型
|
||||
chatcmpl_message['content'] = await self._process_thinking_content(
|
||||
chatcmpl_message['content'], reasoning_content, remove_think
|
||||
)
|
||||
|
||||
# 移除 reasoning_content 字段,避免传递给 Message
|
||||
if 'reasoning_content' in chatcmpl_message:
|
||||
del chatcmpl_message['reasoning_content']
|
||||
|
||||
message = provider_message.Message(**chatcmpl_message)
|
||||
|
||||
return message
|
||||
|
||||
async def _process_thinking_content(
|
||||
self,
|
||||
content: str,
|
||||
reasoning_content: str = None,
|
||||
remove_think: bool = False,
|
||||
) -> tuple[str, str]:
|
||||
"""处理思维链内容
|
||||
|
||||
Args:
|
||||
content: 原始内容
|
||||
reasoning_content: reasoning_content 字段内容
|
||||
remove_think: 是否移除思维链
|
||||
|
||||
Returns:
|
||||
处理后的内容
|
||||
"""
|
||||
if remove_think:
|
||||
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
|
||||
else:
|
||||
if reasoning_content is not None:
|
||||
content = '<think>\n' + reasoning_content + '\n</think>\n' + content
|
||||
return content
|
||||
|
||||
async def _make_msg_chunk(
|
||||
self,
|
||||
delta: dict[str, typing.Any],
|
||||
idx: int,
|
||||
) -> provider_message.MessageChunk:
|
||||
# 处理流式chunk和完整响应的差异
|
||||
# print(chat_completion.choices[0])
|
||||
|
||||
# 确保 role 字段存在且不为 None
|
||||
if 'role' not in delta or delta['role'] is None:
|
||||
delta['role'] = 'assistant'
|
||||
|
||||
reasoning_content = delta['reasoning_content'] if 'reasoning_content' in delta else None
|
||||
|
||||
delta['content'] = '' if delta['content'] is None else delta['content']
|
||||
# print(reasoning_content)
|
||||
|
||||
# deepseek的reasoner模型
|
||||
|
||||
if reasoning_content is not None:
|
||||
delta['content'] += reasoning_content
|
||||
|
||||
message = provider_message.MessageChunk(**delta)
|
||||
|
||||
return message
|
||||
|
||||
async def _closure_stream(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
req_messages: list[dict],
|
||||
use_model: requester.RuntimeLLMModel,
|
||||
use_funcs: list[resource_tool.LLMTool] = None,
|
||||
extra_args: dict[str, typing.Any] = {},
|
||||
remove_think: bool = False,
|
||||
) -> provider_message.Message | typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
self.client.api_key = use_model.provider.token_mgr.get_token()
|
||||
|
||||
args = {}
|
||||
args['model'] = use_model.model_entity.name
|
||||
|
||||
if use_funcs:
|
||||
tools = await self.ap.tool_mgr.generate_tools_for_openai(use_funcs)
|
||||
|
||||
if tools:
|
||||
args['tools'] = tools
|
||||
|
||||
# 设置此次请求中的messages
|
||||
messages = req_messages.copy()
|
||||
|
||||
# 检查vision
|
||||
for msg in messages:
|
||||
if 'content' in msg and isinstance(msg['content'], list):
|
||||
for me in msg['content']:
|
||||
if me['type'] == 'image_base64':
|
||||
me['image_url'] = {'url': me['image_base64']}
|
||||
me['type'] = 'image_url'
|
||||
del me['image_base64']
|
||||
|
||||
args['messages'] = messages
|
||||
args['stream'] = True
|
||||
|
||||
# tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
chunk_idx = 0
|
||||
thinking_started = False
|
||||
thinking_ended = False
|
||||
role = 'assistant' # 默认角色
|
||||
async for chunk in self._req_stream(args, extra_body=extra_args):
|
||||
# 解析 chunk 数据
|
||||
if hasattr(chunk, 'choices') and chunk.choices:
|
||||
choice = chunk.choices[0]
|
||||
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {}
|
||||
finish_reason = getattr(choice, 'finish_reason', None)
|
||||
else:
|
||||
delta = {}
|
||||
finish_reason = None
|
||||
|
||||
# 从第一个 chunk 获取 role,后续使用这个 role
|
||||
if 'role' in delta and delta['role']:
|
||||
role = delta['role']
|
||||
|
||||
# 获取增量内容
|
||||
delta_content = delta.get('content', '')
|
||||
# reasoning_content = delta.get('reasoning_content', '')
|
||||
|
||||
if remove_think:
|
||||
if delta['content'] is not None:
|
||||
if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
|
||||
thinking_started = True
|
||||
continue
|
||||
elif delta['content'] == r'</think>' and not thinking_ended:
|
||||
thinking_ended = True
|
||||
continue
|
||||
elif thinking_ended and delta['content'] == '\n\n' and thinking_started:
|
||||
thinking_started = False
|
||||
continue
|
||||
elif thinking_started and not thinking_ended:
|
||||
continue
|
||||
|
||||
# delta_tool_calls = None
|
||||
if delta.get('tool_calls'):
|
||||
for tool_call in delta['tool_calls']:
|
||||
if tool_call['id'] and tool_call['function']['name']:
|
||||
tool_id = tool_call['id']
|
||||
tool_name = tool_call['function']['name']
|
||||
|
||||
if tool_call['id'] is None:
|
||||
tool_call['id'] = tool_id
|
||||
if tool_call['function']['name'] is None:
|
||||
tool_call['function']['name'] = tool_name
|
||||
if tool_call['function']['arguments'] is None:
|
||||
tool_call['function']['arguments'] = ''
|
||||
if tool_call['type'] is None:
|
||||
tool_call['type'] = 'function'
|
||||
|
||||
# 跳过空的第一个 chunk(只有 role 没有内容)
|
||||
if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'):
|
||||
chunk_idx += 1
|
||||
continue
|
||||
|
||||
# 构建 MessageChunk - 只包含增量内容
|
||||
chunk_data = {
|
||||
'role': role,
|
||||
'content': delta_content if delta_content else None,
|
||||
'tool_calls': delta.get('tool_calls'),
|
||||
'is_final': bool(finish_reason),
|
||||
}
|
||||
|
||||
# 移除 None 值
|
||||
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
|
||||
|
||||
yield provider_message.MessageChunk(**chunk_data)
|
||||
chunk_idx += 1
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 派欧云
|
||||
icon: ppio.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import openai
|
||||
import typing
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class QHAIGCChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""启航 AI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.qhaigc.com/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 启航 AI
|
||||
icon: qhaigc.png
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -2,16 +2,19 @@ from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from . import litellmchat
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class QiniuChatCompletions(litellmchat.LiteLLMRequester):
|
||||
class QiniuChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""七牛云 ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.qnaigc.com/v1',
|
||||
'timeout': 120,
|
||||
'custom_llm_provider': 'openai',
|
||||
}
|
||||
|
||||
async def scan_models(self, api_key: str | None = None) -> dict[str, typing.Any]:
|
||||
|
||||
32
src/langbot/pkg/provider/modelmgr/requesters/shengsuanyun.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import openai
|
||||
import typing
|
||||
|
||||
from . import chatcmpl
|
||||
import openai.types.chat.chat_completion as chat_completion
|
||||
|
||||
|
||||
class ShengSuanYunChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""胜算云(ModelSpot.AI) ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://router.shengsuanyun.com/api/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
|
||||
async def _req(
|
||||
self,
|
||||
args: dict,
|
||||
extra_body: dict = {},
|
||||
) -> chat_completion.ChatCompletion:
|
||||
return await self.client.chat.completions.create(
|
||||
**args,
|
||||
extra_body=extra_body,
|
||||
extra_headers={
|
||||
'HTTP-Referer': 'https://langbot.app',
|
||||
'X-Title': 'LangBot',
|
||||
},
|
||||
)
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 胜算云
|
||||
icon: shengsuanyun.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class SiliconFlowChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""SiliconFlow ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.siliconflow.cn/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 硅基流动
|
||||
icon: siliconflow.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class LangBotSpaceChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""LangBot Space ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.langbot.cloud/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Space
|
||||
icon: space.webp
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#0052D9"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Tencent</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">Hunyuan</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 400 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: tencent-chat-completions
|
||||
label:
|
||||
en_US: Tencent Hunyuan
|
||||
zh_Hans: 腾讯混元
|
||||
icon: tencent.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://hunyuan.tencentcloudapi.com/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#8B5CF6"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">Together</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">AI</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 396 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: together-chat-completions
|
||||
label:
|
||||
en_US: Together AI
|
||||
zh_Hans: Together AI
|
||||
icon: together.svg
|
||||
spec:
|
||||
litellm_provider: together_ai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.together.xyz/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 小马算力
|
||||
icon: tokenpony.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class TokenPonyChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""TokenPony ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.tokenpony.cn/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class VolcArkChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""火山方舟大模型平台 ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://ark.cn-beijing.volces.com/api/v3',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 火山方舟
|
||||
icon: volcark.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: maas
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: Voyage AI
|
||||
icon: voyageai.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
|
||||
17
src/langbot/pkg/provider/modelmgr/requesters/xaichatcmpl.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class XaiChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""xAI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://api.x.ai/v1',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: xAI
|
||||
icon: xai.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
<svg width="60" height="50" viewBox="0 0 60 50" xmlns="http://www.w3.org/2000/svg">
|
||||
<rect width="60" height="50" rx="8" fill="#10B981"/>
|
||||
<text x="30" y="28" font-family="Arial, sans-serif" font-size="10" font-weight="bold" fill="white" text-anchor="middle">01.AI</text>
|
||||
<text x="30" y="40" font-family="Arial, sans-serif" font-size="8" fill="white" text-anchor="middle">Yi</text>
|
||||
</svg>
|
||||
|
Before Width: | Height: | Size: 393 B |
@@ -1,30 +0,0 @@
|
||||
apiVersion: v1
|
||||
kind: LLMAPIRequester
|
||||
metadata:
|
||||
name: yi-chat-completions
|
||||
label:
|
||||
en_US: 01.AI Yi
|
||||
zh_Hans: 零一万物
|
||||
icon: yi.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
en_US: Base URL
|
||||
zh_Hans: 基础 URL
|
||||
type: string
|
||||
required: true
|
||||
default: https://api.lingyiwanwu.com/v1
|
||||
- name: timeout
|
||||
label:
|
||||
en_US: Timeout
|
||||
zh_Hans: 超时时间
|
||||
type: integer
|
||||
required: true
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
@@ -0,0 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import openai
|
||||
|
||||
from . import chatcmpl
|
||||
|
||||
|
||||
class ZhipuAIChatCompletions(chatcmpl.OpenAIChatCompletions):
|
||||
"""智谱AI ChatCompletion API 请求器"""
|
||||
|
||||
client: openai.AsyncClient
|
||||
|
||||
default_config: dict[str, typing.Any] = {
|
||||
'base_url': 'https://open.bigmodel.cn/api/paas/v4',
|
||||
'timeout': 120,
|
||||
}
|
||||
@@ -7,7 +7,6 @@ metadata:
|
||||
zh_Hans: 智谱 AI
|
||||
icon: zhipuai.svg
|
||||
spec:
|
||||
litellm_provider: openai
|
||||
config:
|
||||
- name: base_url
|
||||
label:
|
||||
@@ -25,8 +24,6 @@ spec:
|
||||
default: 120
|
||||
support_type:
|
||||
- llm
|
||||
- text-embedding
|
||||
- rerank
|
||||
provider_category: manufacturer
|
||||
execution:
|
||||
python:
|
||||
|
||||
511
src/langbot/pkg/provider/runners/deerflowapi.py
Normal file
@@ -0,0 +1,511 @@
|
||||
"""DeerFlow LangGraph API Runner
|
||||
|
||||
参考 astrbot 的 deerflow_agent_runner 实现,适配 LangBot 的 Runner 接口。
|
||||
|
||||
特点:
|
||||
- 使用 LangGraph HTTP API 接入 deer-flow 后端
|
||||
- 自动管理 thread_id(按 session 隔离)
|
||||
- 支持 SSE 流式响应解析
|
||||
- 支持 streaming/非流式两种输出
|
||||
- 处理 values / messages-tuple / custom 三种事件
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import typing
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
from langbot.pkg.provider import runner
|
||||
from langbot.pkg.core import app
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
from langbot.libs.deerflow_api import client, errors, stream_utils
|
||||
|
||||
|
||||
_MAX_VALUES_HISTORY = 200
|
||||
|
||||
|
||||
@dataclass
|
||||
class _StreamState:
|
||||
"""流式状态跟踪"""
|
||||
|
||||
latest_text: str = ''
|
||||
prev_text_for_streaming: str = ''
|
||||
clarification_text: str = ''
|
||||
task_failures: list[str] = field(default_factory=list)
|
||||
seen_message_ids: set[str] = field(default_factory=set)
|
||||
seen_message_order: deque[str] = field(default_factory=deque)
|
||||
no_id_message_fingerprints: dict[int, str] = field(default_factory=dict)
|
||||
baseline_initialized: bool = False
|
||||
has_values_text: bool = False
|
||||
run_values_messages: list[dict[str, typing.Any]] = field(default_factory=list)
|
||||
timed_out: bool = False
|
||||
|
||||
|
||||
@runner.runner_class('deerflow-api')
|
||||
class DeerFlowAPIRunner(runner.RequestRunner):
|
||||
"""DeerFlow LangGraph API 对话请求器"""
|
||||
|
||||
deerflow_client: client.AsyncDeerFlowClient
|
||||
|
||||
def __init__(self, ap: app.Application, pipeline_config: dict):
|
||||
super().__init__(ap, pipeline_config)
|
||||
|
||||
cfg = self.pipeline_config['ai']['deerflow-api']
|
||||
|
||||
api_base = cfg.get('api-base', '').strip()
|
||||
if not api_base or not api_base.startswith(('http://', 'https://')):
|
||||
raise errors.DeerFlowAPIError(
|
||||
message='DeerFlow API Base URL 格式错误,必须以 http:// 或 https:// 开头',
|
||||
)
|
||||
|
||||
self.api_base = api_base
|
||||
self.api_key = cfg.get('api-key', '')
|
||||
self.auth_header = cfg.get('auth-header', '')
|
||||
self.assistant_id = cfg.get('assistant-id', 'lead_agent')
|
||||
self.model_name = cfg.get('model-name', '')
|
||||
self.thinking_enabled = bool(cfg.get('thinking-enabled', False))
|
||||
self.plan_mode = bool(cfg.get('plan-mode', False))
|
||||
self.subagent_enabled = bool(cfg.get('subagent-enabled', False))
|
||||
self.max_concurrent_subagents = int(cfg.get('max-concurrent-subagents', 3))
|
||||
self.timeout = int(cfg.get('timeout', 300))
|
||||
self.recursion_limit = int(cfg.get('recursion-limit', 1000))
|
||||
|
||||
self.deerflow_client = client.AsyncDeerFlowClient(
|
||||
api_base=self.api_base,
|
||||
api_key=self.api_key,
|
||||
auth_header=self.auth_header,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 辅助方法
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _fingerprint_message(self, message: dict[str, typing.Any]) -> str:
|
||||
try:
|
||||
raw = json.dumps(message, sort_keys=True, ensure_ascii=False, default=str)
|
||||
except (TypeError, ValueError):
|
||||
raw = repr(message)
|
||||
return hashlib.sha1(raw.encode('utf-8', errors='ignore')).hexdigest()
|
||||
|
||||
def _remember_seen_message_id(self, state: _StreamState, msg_id: str) -> None:
|
||||
if not msg_id or msg_id in state.seen_message_ids:
|
||||
return
|
||||
state.seen_message_ids.add(msg_id)
|
||||
state.seen_message_order.append(msg_id)
|
||||
while len(state.seen_message_order) > _MAX_VALUES_HISTORY:
|
||||
dropped = state.seen_message_order.popleft()
|
||||
state.seen_message_ids.discard(dropped)
|
||||
|
||||
def _extract_new_messages_from_values(
|
||||
self,
|
||||
values_messages: list[typing.Any],
|
||||
state: _StreamState,
|
||||
) -> list[dict[str, typing.Any]]:
|
||||
new_messages: list[dict[str, typing.Any]] = []
|
||||
no_id_indexes_seen: set[int] = set()
|
||||
for idx, msg in enumerate(values_messages):
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
msg_id = stream_utils.get_message_id(msg)
|
||||
if msg_id:
|
||||
if msg_id in state.seen_message_ids:
|
||||
continue
|
||||
self._remember_seen_message_id(state, msg_id)
|
||||
new_messages.append(msg)
|
||||
continue
|
||||
|
||||
no_id_indexes_seen.add(idx)
|
||||
fp = self._fingerprint_message(msg)
|
||||
if state.no_id_message_fingerprints.get(idx) == fp:
|
||||
continue
|
||||
state.no_id_message_fingerprints[idx] = fp
|
||||
new_messages.append(msg)
|
||||
|
||||
for idx in list(state.no_id_message_fingerprints.keys()):
|
||||
if idx not in no_id_indexes_seen:
|
||||
state.no_id_message_fingerprints.pop(idx, None)
|
||||
return new_messages
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 用户输入处理
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_user_content(
|
||||
self,
|
||||
prompt: str,
|
||||
image_urls: list[str],
|
||||
) -> typing.Any:
|
||||
"""构建 LangGraph 兼容的 user content(支持多模态)"""
|
||||
if not image_urls:
|
||||
return prompt
|
||||
|
||||
content: list[dict[str, typing.Any]] = []
|
||||
if prompt:
|
||||
content.append({'type': 'text', 'text': prompt})
|
||||
for url in image_urls:
|
||||
if not isinstance(url, str):
|
||||
continue
|
||||
url = url.strip()
|
||||
if not url:
|
||||
continue
|
||||
if url.startswith(('http://', 'https://', 'data:')):
|
||||
content.append({'type': 'image_url', 'image_url': {'url': url}})
|
||||
return content if content else prompt
|
||||
|
||||
def _preprocess_user_message(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> tuple[str, list[str]]:
|
||||
"""提取用户消息的纯文本与图片 URL 列表"""
|
||||
plain_text = ''
|
||||
image_urls: list[str] = []
|
||||
|
||||
if isinstance(query.user_message.content, str):
|
||||
plain_text = query.user_message.content
|
||||
elif isinstance(query.user_message.content, list):
|
||||
for ce in query.user_message.content:
|
||||
if ce.type == 'text':
|
||||
plain_text += ce.text
|
||||
elif ce.type == 'image_base64':
|
||||
# 转换为 data URI 形式
|
||||
b64 = getattr(ce, 'image_base64', '')
|
||||
if b64:
|
||||
if not b64.startswith('data:'):
|
||||
b64 = f'data:image/png;base64,{b64}'
|
||||
image_urls.append(b64)
|
||||
elif ce.type == 'image_url':
|
||||
url = getattr(ce, 'image_url', '')
|
||||
if url:
|
||||
image_urls.append(url)
|
||||
|
||||
return plain_text, image_urls
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 请求构造
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _build_messages(
|
||||
self,
|
||||
prompt: str,
|
||||
image_urls: list[str],
|
||||
system_prompt: str = '',
|
||||
) -> list[dict[str, typing.Any]]:
|
||||
messages: list[dict[str, typing.Any]] = []
|
||||
if system_prompt:
|
||||
messages.append({'role': 'system', 'content': system_prompt})
|
||||
messages.append(
|
||||
{
|
||||
'role': 'user',
|
||||
'content': self._build_user_content(prompt, image_urls),
|
||||
}
|
||||
)
|
||||
return messages
|
||||
|
||||
def _build_runtime_configurable(self, thread_id: str) -> dict[str, typing.Any]:
|
||||
cfg: dict[str, typing.Any] = {
|
||||
'thread_id': thread_id,
|
||||
'thinking_enabled': self.thinking_enabled,
|
||||
'is_plan_mode': self.plan_mode,
|
||||
'subagent_enabled': self.subagent_enabled,
|
||||
}
|
||||
if self.subagent_enabled:
|
||||
cfg['max_concurrent_subagents'] = self.max_concurrent_subagents
|
||||
if self.model_name:
|
||||
cfg['model_name'] = self.model_name
|
||||
return cfg
|
||||
|
||||
def _build_payload(
|
||||
self,
|
||||
thread_id: str,
|
||||
prompt: str,
|
||||
image_urls: list[str],
|
||||
system_prompt: str = '',
|
||||
) -> dict[str, typing.Any]:
|
||||
runtime_configurable = self._build_runtime_configurable(thread_id)
|
||||
return {
|
||||
'assistant_id': self.assistant_id,
|
||||
'input': {
|
||||
'messages': self._build_messages(prompt, image_urls, system_prompt),
|
||||
},
|
||||
'stream_mode': ['values', 'messages-tuple', 'custom'],
|
||||
# DeerFlow 2.0 从 config.configurable 读取运行时覆盖
|
||||
# 同时保留 context 字段做向后兼容
|
||||
'context': dict(runtime_configurable),
|
||||
'config': {
|
||||
'recursion_limit': self.recursion_limit,
|
||||
'configurable': runtime_configurable,
|
||||
},
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Session/Thread 管理
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _ensure_thread_id(self, query: pipeline_query.Query) -> str:
|
||||
"""从 query.session 取/创建 deerflow thread_id
|
||||
|
||||
LangBot 使用 `query.session.using_conversation.uuid` 持久化 conversation id,
|
||||
我们复用这个字段存储 deerflow thread_id(与 Dify Runner 同样做法)。
|
||||
"""
|
||||
thread_id = query.session.using_conversation.uuid or ''
|
||||
if thread_id:
|
||||
return thread_id
|
||||
|
||||
thread = await self.deerflow_client.create_thread(timeout=min(30, self.timeout))
|
||||
thread_id = thread.get('thread_id', '')
|
||||
if not thread_id:
|
||||
raise errors.DeerFlowAPIError(message=f'DeerFlow create thread 返回数据缺少 thread_id: {thread}')
|
||||
|
||||
query.session.using_conversation.uuid = thread_id
|
||||
return thread_id
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 流式事件处理
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _handle_values_event(
|
||||
self,
|
||||
data: typing.Any,
|
||||
state: _StreamState,
|
||||
) -> str | None:
|
||||
"""处理 values 事件,返回新的完整文本(增量基础上的全量)"""
|
||||
values_messages = stream_utils.extract_messages_from_values_data(data)
|
||||
if not values_messages:
|
||||
return None
|
||||
|
||||
new_messages: list[dict[str, typing.Any]] = []
|
||||
if not state.baseline_initialized:
|
||||
state.baseline_initialized = True
|
||||
for idx, msg in enumerate(values_messages):
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
new_messages.append(msg)
|
||||
msg_id = stream_utils.get_message_id(msg)
|
||||
if msg_id:
|
||||
self._remember_seen_message_id(state, msg_id)
|
||||
continue
|
||||
state.no_id_message_fingerprints[idx] = self._fingerprint_message(msg)
|
||||
else:
|
||||
new_messages = self._extract_new_messages_from_values(values_messages, state)
|
||||
|
||||
latest_text = ''
|
||||
if new_messages:
|
||||
state.run_values_messages.extend(new_messages)
|
||||
if len(state.run_values_messages) > _MAX_VALUES_HISTORY:
|
||||
state.run_values_messages = state.run_values_messages[-_MAX_VALUES_HISTORY:]
|
||||
latest_text = stream_utils.extract_latest_ai_text(state.run_values_messages)
|
||||
if latest_text:
|
||||
state.has_values_text = True
|
||||
latest_clarification = stream_utils.extract_latest_clarification_text(
|
||||
state.run_values_messages,
|
||||
)
|
||||
if latest_clarification:
|
||||
state.clarification_text = latest_clarification
|
||||
|
||||
return latest_text or None
|
||||
|
||||
def _handle_message_event(
|
||||
self,
|
||||
data: typing.Any,
|
||||
state: _StreamState,
|
||||
) -> str | None:
|
||||
"""处理 messages-tuple 事件,返回增量文本
|
||||
|
||||
当 values 事件已经提供完整文本时,跳过 messages-tuple 的增量
|
||||
"""
|
||||
delta = stream_utils.extract_ai_delta_from_event_data(data)
|
||||
if delta and not state.has_values_text:
|
||||
state.latest_text += delta
|
||||
return delta
|
||||
|
||||
maybe_clar = stream_utils.extract_clarification_from_event_data(data)
|
||||
if maybe_clar:
|
||||
state.clarification_text = maybe_clar
|
||||
return None
|
||||
|
||||
def _build_final_text(self, state: _StreamState) -> str:
|
||||
"""构建最终输出文本"""
|
||||
if state.clarification_text:
|
||||
return state.clarification_text
|
||||
|
||||
# 优先使用最后一条 AI message 的文本
|
||||
latest_ai = stream_utils.extract_latest_ai_message(state.run_values_messages)
|
||||
if latest_ai:
|
||||
text = stream_utils.extract_text(latest_ai.get('content'))
|
||||
if text:
|
||||
if state.timed_out:
|
||||
text += f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。'
|
||||
return text
|
||||
|
||||
if state.latest_text:
|
||||
text = state.latest_text
|
||||
if state.timed_out:
|
||||
text += f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。'
|
||||
return text
|
||||
|
||||
# 提取任务失败信息作兜底
|
||||
failure_text = stream_utils.build_task_failure_summary(state.task_failures)
|
||||
if failure_text:
|
||||
return failure_text
|
||||
|
||||
return 'DeerFlow 返回空响应'
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 主流程
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _stream_messages_chunk(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""流式输出生成器"""
|
||||
plain_text, image_urls = self._preprocess_user_message(query)
|
||||
|
||||
system_prompt = ''
|
||||
# LangBot 的 pipeline 通常通过 prompt-preprocess 已注入 system prompt
|
||||
# 这里保持空,让 prompt-preprocess 的内容作为 user message 一并送给 deerflow
|
||||
|
||||
thread_id = await self._ensure_thread_id(query)
|
||||
payload = self._build_payload(
|
||||
thread_id=thread_id,
|
||||
prompt=plain_text or 'continue',
|
||||
image_urls=image_urls,
|
||||
system_prompt=system_prompt,
|
||||
)
|
||||
|
||||
state = _StreamState()
|
||||
prev_text = ''
|
||||
message_idx = 0
|
||||
|
||||
try:
|
||||
async for event in self.deerflow_client.stream_run(
|
||||
thread_id=thread_id,
|
||||
payload=payload,
|
||||
timeout=self.timeout,
|
||||
):
|
||||
event_type = event.get('event')
|
||||
data = event.get('data')
|
||||
|
||||
if event_type == 'values':
|
||||
new_full = self._handle_values_event(data, state)
|
||||
if new_full and new_full != prev_text:
|
||||
delta = new_full[len(prev_text) :] if new_full.startswith(prev_text) else new_full
|
||||
prev_text = new_full
|
||||
if delta:
|
||||
message_idx += 1
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=new_full,
|
||||
is_final=False,
|
||||
)
|
||||
continue
|
||||
|
||||
if event_type in {'messages-tuple', 'messages', 'message'}:
|
||||
delta = self._handle_message_event(data, state)
|
||||
if delta:
|
||||
prev_text = state.latest_text
|
||||
message_idx += 1
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=prev_text,
|
||||
is_final=False,
|
||||
)
|
||||
continue
|
||||
|
||||
if event_type == 'custom':
|
||||
state.task_failures.extend(
|
||||
stream_utils.extract_task_failures_from_custom_event(data),
|
||||
)
|
||||
continue
|
||||
|
||||
if event_type == 'error':
|
||||
raise errors.DeerFlowAPIError(message=f'DeerFlow stream error event: {data}')
|
||||
|
||||
if event_type == 'end':
|
||||
break
|
||||
except (asyncio.TimeoutError, TimeoutError):
|
||||
self.ap.logger.warning(f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}')
|
||||
state.timed_out = True
|
||||
|
||||
# 最终消息
|
||||
final_text = self._build_final_text(state)
|
||||
yield provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=final_text,
|
||||
is_final=True,
|
||||
)
|
||||
|
||||
async def _messages(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""非流式聚合输出"""
|
||||
plain_text, image_urls = self._preprocess_user_message(query)
|
||||
|
||||
thread_id = await self._ensure_thread_id(query)
|
||||
payload = self._build_payload(
|
||||
thread_id=thread_id,
|
||||
prompt=plain_text or 'continue',
|
||||
image_urls=image_urls,
|
||||
)
|
||||
|
||||
state = _StreamState()
|
||||
|
||||
try:
|
||||
async for event in self.deerflow_client.stream_run(
|
||||
thread_id=thread_id,
|
||||
payload=payload,
|
||||
timeout=self.timeout,
|
||||
):
|
||||
event_type = event.get('event')
|
||||
data = event.get('data')
|
||||
|
||||
if event_type == 'values':
|
||||
self._handle_values_event(data, state)
|
||||
continue
|
||||
|
||||
if event_type in {'messages-tuple', 'messages', 'message'}:
|
||||
self._handle_message_event(data, state)
|
||||
continue
|
||||
|
||||
if event_type == 'custom':
|
||||
state.task_failures.extend(
|
||||
stream_utils.extract_task_failures_from_custom_event(data),
|
||||
)
|
||||
continue
|
||||
|
||||
if event_type == 'error':
|
||||
raise errors.DeerFlowAPIError(message=f'DeerFlow stream error event: {data}')
|
||||
|
||||
if event_type == 'end':
|
||||
break
|
||||
except (asyncio.TimeoutError, TimeoutError):
|
||||
self.ap.logger.warning(f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}')
|
||||
state.timed_out = True
|
||||
|
||||
final_text = self._build_final_text(state)
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=final_text,
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""主入口:根据 adapter 是否支持流式输出,选择流式或非流式"""
|
||||
if await query.adapter.is_stream_output_supported():
|
||||
msg_idx = 0
|
||||
async for msg in self._stream_messages_chunk(query):
|
||||
msg_idx += 1
|
||||
msg.msg_sequence = msg_idx
|
||||
yield msg
|
||||
else:
|
||||
async for msg in self._messages(query):
|
||||
yield msg
|
||||
@@ -41,64 +41,6 @@ SANDBOX_EXEC_SYSTEM_GUIDANCE = (
|
||||
MAX_TOOL_CALL_ROUNDS = 128
|
||||
|
||||
|
||||
def _model_has_ability(model: modelmgr_requester.RuntimeLLMModel, ability: str) -> bool:
|
||||
return ability in (model.model_entity.abilities or [])
|
||||
|
||||
|
||||
class _StreamAccumulator:
|
||||
"""Accumulate streamed content and fragmented OpenAI-style tool calls."""
|
||||
|
||||
def __init__(self, msg_sequence: int = 0, initial_content: str | None = None):
|
||||
self.tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
self.msg_idx = 0
|
||||
self.accumulated_content = initial_content or ''
|
||||
self.last_role = 'assistant'
|
||||
self.msg_sequence = msg_sequence
|
||||
|
||||
def add(self, msg: provider_message.MessageChunk) -> provider_message.MessageChunk | None:
|
||||
self.msg_idx += 1
|
||||
|
||||
if msg.role:
|
||||
self.last_role = msg.role
|
||||
|
||||
if msg.content:
|
||||
self.accumulated_content += msg.content
|
||||
|
||||
if msg.tool_calls:
|
||||
for tool_call in msg.tool_calls:
|
||||
if tool_call.id not in self.tool_calls_map:
|
||||
self.tool_calls_map[tool_call.id] = provider_message.ToolCall(
|
||||
id=tool_call.id,
|
||||
type=tool_call.type,
|
||||
function=provider_message.FunctionCall(
|
||||
name=tool_call.function.name if tool_call.function else '',
|
||||
arguments='',
|
||||
),
|
||||
)
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
self.tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
|
||||
|
||||
if self.msg_idx % 8 == 0 or msg.is_final:
|
||||
self.msg_sequence += 1
|
||||
return provider_message.MessageChunk(
|
||||
role=self.last_role,
|
||||
content=self.accumulated_content,
|
||||
tool_calls=list(self.tool_calls_map.values()) if (self.tool_calls_map and msg.is_final) else None,
|
||||
is_final=msg.is_final,
|
||||
msg_sequence=self.msg_sequence,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def final_message(self) -> provider_message.MessageChunk:
|
||||
return provider_message.MessageChunk(
|
||||
role=self.last_role,
|
||||
content=self.accumulated_content,
|
||||
tool_calls=list(self.tool_calls_map.values()) if self.tool_calls_map else None,
|
||||
msg_sequence=self.msg_sequence,
|
||||
)
|
||||
|
||||
|
||||
@runner.runner_class('local-agent')
|
||||
class LocalAgentRunner(runner.RequestRunner):
|
||||
"""Local agent request runner"""
|
||||
@@ -163,7 +105,7 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
query,
|
||||
model,
|
||||
messages,
|
||||
funcs if _model_has_ability(model, 'func_call') else [],
|
||||
funcs if model.model_entity.abilities.__contains__('func_call') else [],
|
||||
extra_args=model.model_entity.extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
@@ -193,7 +135,7 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
query,
|
||||
model,
|
||||
messages,
|
||||
funcs if _model_has_ability(model, 'func_call') else [],
|
||||
funcs if model.model_entity.abilities.__contains__('func_call') else [],
|
||||
extra_args=model.model_entity.extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
@@ -360,7 +302,11 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
final_msg = msg
|
||||
else:
|
||||
# Streaming: invoke with fallback
|
||||
stream_accumulator = _StreamAccumulator(msg_sequence=1)
|
||||
tool_calls_map: dict[str, provider_message.ToolCall] = {}
|
||||
msg_idx = 0
|
||||
accumulated_content = ''
|
||||
last_role = 'assistant'
|
||||
msg_sequence = 1
|
||||
|
||||
stream_src, use_llm_model = await self._invoke_stream_with_fallback(
|
||||
query,
|
||||
@@ -370,12 +316,44 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
remove_think,
|
||||
)
|
||||
async for msg in stream_src:
|
||||
chunk = stream_accumulator.add(msg)
|
||||
if chunk:
|
||||
yield chunk
|
||||
msg_idx = msg_idx + 1
|
||||
|
||||
if msg.role:
|
||||
last_role = msg.role
|
||||
|
||||
if msg.content:
|
||||
accumulated_content += msg.content
|
||||
|
||||
if msg.tool_calls:
|
||||
for tool_call in msg.tool_calls:
|
||||
if tool_call.id not in tool_calls_map:
|
||||
tool_calls_map[tool_call.id] = provider_message.ToolCall(
|
||||
id=tool_call.id,
|
||||
type=tool_call.type,
|
||||
function=provider_message.FunctionCall(
|
||||
name=tool_call.function.name if tool_call.function else '', arguments=''
|
||||
),
|
||||
)
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
|
||||
|
||||
if msg_idx % 8 == 0 or msg.is_final:
|
||||
msg_sequence += 1
|
||||
yield provider_message.MessageChunk(
|
||||
role=last_role,
|
||||
content=accumulated_content,
|
||||
tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None,
|
||||
is_final=msg.is_final,
|
||||
msg_sequence=msg_sequence,
|
||||
)
|
||||
initial_response_emitted = True
|
||||
|
||||
final_msg = stream_accumulator.final_message()
|
||||
final_msg = provider_message.MessageChunk(
|
||||
role=last_role,
|
||||
content=accumulated_content,
|
||||
tool_calls=list(tool_calls_map.values()) if tool_calls_map else None,
|
||||
msg_sequence=msg_sequence,
|
||||
)
|
||||
|
||||
pending_tool_calls = final_msg.tool_calls
|
||||
first_content = final_msg.content
|
||||
@@ -460,36 +438,69 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
)
|
||||
|
||||
if is_stream:
|
||||
stream_accumulator = _StreamAccumulator(
|
||||
msg_sequence=first_end_sequence,
|
||||
initial_content=first_content,
|
||||
)
|
||||
tool_calls_map = {}
|
||||
msg_idx = 0
|
||||
accumulated_content = ''
|
||||
last_role = 'assistant'
|
||||
msg_sequence = first_end_sequence
|
||||
|
||||
tool_stream_src = use_llm_model.provider.invoke_llm_stream(
|
||||
query,
|
||||
use_llm_model,
|
||||
req_messages,
|
||||
query.use_funcs
|
||||
if _model_has_ability(use_llm_model, 'func_call')
|
||||
else [],
|
||||
query.use_funcs if use_llm_model.model_entity.abilities.__contains__('func_call') else [],
|
||||
extra_args=use_llm_model.model_entity.extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
async for msg in tool_stream_src:
|
||||
chunk = stream_accumulator.add(msg)
|
||||
if chunk:
|
||||
yield chunk
|
||||
msg_idx += 1
|
||||
|
||||
final_msg = stream_accumulator.final_message()
|
||||
if msg.role:
|
||||
last_role = msg.role
|
||||
|
||||
# Prepend first-round content on first chunk of tool-call round
|
||||
if msg_idx == 1:
|
||||
accumulated_content = first_content if first_content is not None else accumulated_content
|
||||
|
||||
if msg.content:
|
||||
accumulated_content += msg.content
|
||||
|
||||
if msg.tool_calls:
|
||||
for tool_call in msg.tool_calls:
|
||||
if tool_call.id not in tool_calls_map:
|
||||
tool_calls_map[tool_call.id] = provider_message.ToolCall(
|
||||
id=tool_call.id,
|
||||
type=tool_call.type,
|
||||
function=provider_message.FunctionCall(
|
||||
name=tool_call.function.name if tool_call.function else '', arguments=''
|
||||
),
|
||||
)
|
||||
if tool_call.function and tool_call.function.arguments:
|
||||
tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
|
||||
|
||||
if msg_idx % 8 == 0 or msg.is_final:
|
||||
msg_sequence += 1
|
||||
yield provider_message.MessageChunk(
|
||||
role=last_role,
|
||||
content=accumulated_content,
|
||||
tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None,
|
||||
is_final=msg.is_final,
|
||||
msg_sequence=msg_sequence,
|
||||
)
|
||||
|
||||
final_msg = provider_message.MessageChunk(
|
||||
role=last_role,
|
||||
content=accumulated_content,
|
||||
tool_calls=list(tool_calls_map.values()) if tool_calls_map else None,
|
||||
msg_sequence=msg_sequence,
|
||||
)
|
||||
else:
|
||||
# Non-streaming: use committed model directly (no fallback in tool loop)
|
||||
msg = await use_llm_model.provider.invoke_llm(
|
||||
query,
|
||||
use_llm_model,
|
||||
req_messages,
|
||||
query.use_funcs
|
||||
if _model_has_ability(use_llm_model, 'func_call')
|
||||
else [],
|
||||
query.use_funcs if use_llm_model.model_entity.abilities.__contains__('func_call') else [],
|
||||
extra_args=use_llm_model.model_entity.extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
|
||||