diff --git a/README_CN.md b/README_CN.md
index 7366827d..55c7e380 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -38,7 +38,7 @@ LangBot 是一个**开源的生产级平台**,用于构建 AI 驱动的即时
 
 ### 核心能力
 
-- **AI 对话与 Agent** — 多轮对话、工具调用、多模态、流式输出。自带 RAG(知识库),深度集成 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) 等 LLMOps 平台。
+- **AI 对话与 Agent** — 多轮对话、工具调用、多模态、流式输出。自带 RAG(知识库),深度集成 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、[Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com) 等 LLMOps 平台。
 - **全平台支持** — 一套代码,覆盖 QQ、微信、企业微信、飞书、钉钉、Discord、Telegram、Slack、LINE、KOOK 等平台。
 - **生产就绪** — 访问控制、限速、敏感词过滤、全面监控与异常处理,已被多家企业采用。
 - **插件生态** — 数百个插件,跨进程的事件驱动架构,组件扩展,适配 [MCP 协议](https://modelcontextprotocol.io/)。
diff --git a/README_ES.md b/README_ES.md
index b57fa103..a8ba44f7 100644
--- a/README_ES.md
+++ b/README_ES.md
@@ -37,7 +37,7 @@ LangBot es una **plataforma de código abierto y grado de producción** para con
 
 ### Capacidades Clave
 
-- **Conversaciones e Agentes IA** — Diálogos de múltiples turnos, llamadas a herramientas, soporte multimodal, salida en streaming. RAG (base de conocimientos) incorporado con integración profunda con [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
+- **Conversaciones e Agentes IA** — Diálogos de múltiples turnos, llamadas a herramientas, soporte multimodal, salida en streaming. RAG (base de conocimientos) incorporado con integración profunda con [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
 - **Soporte Universal de Plataformas de MI** — Un solo código base para Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
 - **Listo para Producción** — Control de acceso, limitación de velocidad, filtrado de palabras sensibles, monitoreo completo y manejo de excepciones. De confianza para empresas.
 - **Ecosistema de Plugins** — Cientos de plugins, arquitectura basada en eventos, extensiones de componentes y soporte del [protocolo MCP](https://modelcontextprotocol.io/).
diff --git a/README_FR.md b/README_FR.md
index be44aa09..64f3f1f4 100644
--- a/README_FR.md
+++ b/README_FR.md
@@ -37,7 +37,7 @@ LangBot est une **plateforme open-source de niveau production** pour créer des
 
 ### Capacités Clés
 
-- **Conversations IA & Agents** — Dialogues multi-tours, appels d'outils, support multimodal, sortie en streaming. RAG (base de connaissances) intégré avec intégration profonde de [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
+- **Conversations IA & Agents** — Dialogues multi-tours, appels d'outils, support multimodal, sortie en streaming. RAG (base de connaissances) intégré avec intégration profonde de [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
 - **Support Universel des Plateformes de MI** — Un seul code pour Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
 - **Prêt pour la Production** — Contrôle d'accès, limitation de débit, filtrage de mots sensibles, surveillance complète et gestion des exceptions. Approuvé par les entreprises.
 - **Écosystème de Plugins** — Des centaines de plugins, architecture événementielle, extensions de composants, et support du [protocole MCP](https://modelcontextprotocol.io/).
diff --git a/README_JP.md b/README_JP.md
index 098d796d..59686372 100644
--- a/README_JP.md
+++ b/README_JP.md
@@ -37,7 +37,7 @@ LangBot は、AI搭載のインスタントメッセージングボットを構
 
 ### 主な機能
 
-- **AI対話とエージェント** — マルチターン対話、ツール呼び出し、マルチモーダル対応、ストリーミング出力。RAG(ナレッジベース)を内蔵し、[Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) と深く統合。
+- **AI対話とエージェント** — マルチターン対話、ツール呼び出し、マルチモーダル対応、ストリーミング出力。RAG(ナレッジベース)を内蔵し、[Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、[Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com) と深く統合。
 - **ユニバーサルIMプラットフォーム対応** — 単一のコードベースで Discord、Telegram、Slack、LINE、QQ、WeChat、WeCom、Lark、DingTalk、KOOK に対応。
 - **本番環境対応** — アクセス制御、レート制限、センシティブワードフィルタリング、包括的な監視、例外処理を搭載。エンタープライズの信頼に応える品質。
 - **プラグインエコシステム** — 数百のプラグイン、イベント駆動アーキテクチャ、コンポーネント拡張、[MCPプロトコル](https://modelcontextprotocol.io/)対応。
diff --git a/README_KO.md b/README_KO.md
index e699a53e..1e4a1ada 100644
--- a/README_KO.md
+++ b/README_KO.md
@@ -37,7 +37,7 @@ LangBot은 AI 기반 인스턴트 메시징 봇을 구축하기 위한 **오픈
 
 ### 핵심 기능
 
-- **AI 대화 및 에이전트** — 멀티턴 대화, 도구 호출, 멀티모달 지원, 스트리밍 출력. 내장 RAG(지식 베이스)와 [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org) 심층 통합.
+- **AI 대화 및 에이전트** — 멀티턴 대화, 도구 호출, 멀티모달 지원, 스트리밍 출력. 내장 RAG(지식 베이스)와 [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com) 심층 통합.
 - **유니버설 IM 플랫폼 지원** — 단일 코드베이스로 Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK 지원.
 - **프로덕션 레디** — 접근 제어, 속도 제한, 민감어 필터링, 종합 모니터링 및 예외 처리. 기업 환경에서 검증됨.
 - **플러그인 생태계** — 수백 개의 플러그인, 이벤트 기반 아키텍처, 컴포넌트 확장, [MCP 프로토콜](https://modelcontextprotocol.io/) 지원.
diff --git a/README_RU.md b/README_RU.md
index 1ca28b19..79103684 100644
--- a/README_RU.md
+++ b/README_RU.md
@@ -37,7 +37,7 @@ LangBot — это **платформа с открытым исходным к
 
 ### Ключевые возможности
 
-- **ИИ-диалоги и агенты** — Многораундовые диалоги, вызов инструментов, мультимодальная поддержка, потоковый вывод. Встроенная реализация RAG (база знаний) с глубокой интеграцией в [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
+- **ИИ-диалоги и агенты** — Многораундовые диалоги, вызов инструментов, мультимодальная поддержка, потоковый вывод. Встроенная реализация RAG (база знаний) с глубокой интеграцией в [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
 - **Универсальная поддержка IM-платформ** — Единая кодовая база для Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
 - **Готовность к продакшену** — Контроль доступа, ограничение скорости, фильтрация чувствительных слов, комплексный мониторинг и обработка исключений. Проверено в корпоративной среде.
 - **Экосистема плагинов** — Сотни плагинов, событийно-ориентированная архитектура, расширения компонентов и поддержка [протокола MCP](https://modelcontextprotocol.io/).
diff --git a/README_TW.md b/README_TW.md
index 2fdf5a85..8afeebcd 100644
--- a/README_TW.md
+++ b/README_TW.md
@@ -39,7 +39,7 @@ LangBot 是一個**開源的生產級平台**,用於建構 AI 驅動的即時
 
 ### 核心能力
 
-- **AI 對話與 Agent** — 多輪對話、工具調用、多模態、流式輸出。自帶 RAG(知識庫),深度整合 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org) 等 LLMOps 平台。
+- **AI 對話與 Agent** — 多輪對話、工具調用、多模態、流式輸出。自帶 RAG(知識庫),深度整合 [Dify](https://dify.ai)、[Coze](https://coze.com)、[n8n](https://n8n.io)、[Langflow](https://langflow.org)、[Deerflow](https://deerflow.tech)、[Weknora](https://weknora.weixin.qq.com) 等 LLMOps 平台。
 - **全平台支援** — 一套程式碼,覆蓋 QQ、微信、企業微信、飛書、釘釘、Discord、Telegram、Slack、LINE、KOOK 等平台。
 - **生產就緒** — 存取控制、限速、敏感詞過濾、全面監控與異常處理,已被多家企業採用。
 - **外掛生態** — 數百個外掛,事件驅動架構,組件擴展,適配 [MCP 協議](https://modelcontextprotocol.io/)。
diff --git a/README_VI.md b/README_VI.md
index 7fbf6247..e5766ad2 100644
--- a/README_VI.md
+++ b/README_VI.md
@@ -37,7 +37,7 @@ LangBot là một **nền tảng mã nguồn mở, cấp sản xuất** để x
 
 ### Khả năng chính
 
-- **Hội thoại AI & Agent** — Đối thoại nhiều lượt, gọi công cụ, hỗ trợ đa phương thức, đầu ra streaming. RAG (cơ sở kiến thức) tích hợp sẵn với tích hợp sâu vào [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org).
+- **Hội thoại AI & Agent** — Đối thoại nhiều lượt, gọi công cụ, hỗ trợ đa phương thức, đầu ra streaming. RAG (cơ sở kiến thức) tích hợp sẵn với tích hợp sâu vào [Dify](https://dify.ai), [Coze](https://coze.com), [n8n](https://n8n.io), [Langflow](https://langflow.org), [Deerflow](https://deerflow.tech), [Weknora](https://weknora.weixin.qq.com).
 - **Hỗ trợ đa nền tảng IM** — Một mã nguồn cho Discord, Telegram, Slack, LINE, QQ, WeChat, WeCom, Lark, DingTalk, KOOK.
 - **Sẵn sàng cho sản xuất** — Kiểm soát truy cập, giới hạn tốc độ, lọc từ nhạy cảm, giám sát toàn diện và xử lý ngoại lệ. Được doanh nghiệp tin dùng.
 - **Hệ sinh thái Plugin** — Hàng trăm plugin, kiến trúc hướng sự kiện, mở rộng thành phần, và hỗ trợ [giao thức MCP](https://modelcontextprotocol.io/).
diff --git a/src/langbot/libs/deerflow_api/__init__.py b/src/langbot/libs/deerflow_api/__init__.py
new file mode 100644
index 00000000..160778b1
--- /dev/null
+++ b/src/langbot/libs/deerflow_api/__init__.py
@@ -0,0 +1,5 @@
+from .client import AsyncDeerFlowClient
+from .errors import DeerFlowAPIError
+from . import stream_utils
+
+__all__ = ['AsyncDeerFlowClient', 'DeerFlowAPIError', 'stream_utils']
diff --git a/src/langbot/libs/deerflow_api/client.py b/src/langbot/libs/deerflow_api/client.py
new file mode 100644
index 00000000..b66bf7e2
--- /dev/null
+++ b/src/langbot/libs/deerflow_api/client.py
@@ -0,0 +1,204 @@
+"""DeerFlow LangGraph HTTP API client
+
+Modeled on astrbot's deerflow_api_client implementation, using httpx to fit LangBot's style.
+"""
+
+from __future__ import annotations
+
+import codecs
+import json
+import typing
+from collections.abc import AsyncGenerator
+
+import httpx
+
+from .errors import DeerFlowAPIError
+
+
+SSE_MAX_BUFFER_CHARS = 1_048_576
+
+
+def _normalize_sse_newlines(text: str) -> str:
+    """Normalize CRLF/CR to LF so that SSE block splitting is stable"""
+    return text.replace('\r\n', '\n').replace('\r', '\n')
+
+
+def _parse_sse_data_lines(data_lines: list[str]) -> typing.Any:
+    raw_data = '\n'.join(data_lines)
+    try:
+        return json.loads(raw_data)
+    except json.JSONDecodeError:
+        # Some LangGraph-compatible servers use multiple data lines in a single
+        # SSE event to send several JSON fragments (e.g. a tuple payload)
+        parsed_lines: list[typing.Any] = []
+        can_parse_all = True
+        for line in data_lines:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                parsed_lines.append(json.loads(line))
+            except json.JSONDecodeError:
+                can_parse_all = False
+                break
+        if can_parse_all and parsed_lines:
+            return parsed_lines[0] if len(parsed_lines) == 1 else parsed_lines
+        return raw_data
+
+
+def _parse_sse_block(block: str) -> dict[str, typing.Any] | None:
+    if not block.strip():
+        return None
+
+    event_name = 'message'
+    data_lines: list[str] = []
+    for line in block.splitlines():
+        if line.startswith('event:'):
+            event_name = line[6:].strip()
+        elif line.startswith('data:'):
+            data_lines.append(line[5:].lstrip())
+
+    if not data_lines:
+        return None
+    return {'event': event_name, 'data': _parse_sse_data_lines(data_lines)}
+
+
+class AsyncDeerFlowClient:
+    """DeerFlow LangGraph HTTP API client"""
+
+    api_base: str
+    headers: dict[str, str]
+
+    def __init__(
+        self,
+        api_base: str = 'http://127.0.0.1:2026',
+        api_key: str = '',
+        auth_header: str = '',
+    ) -> None:
+        self.api_base = api_base.rstrip('/')
+        self.headers: dict[str, str] = {}
+        if auth_header:
+            self.headers['Authorization'] = auth_header
+        elif api_key:
+            self.headers['Authorization'] = f'Bearer {api_key}'
+
+    async def create_thread(self, timeout: float = 20) -> dict[str, typing.Any]:
+        """Create a new LangGraph thread
+
+        Returns:
+            A dict containing thread_id and related information
+        """
+        url = f'{self.api_base}/api/langgraph/threads'
+        payload = {'metadata': {}}
+
+        async with httpx.AsyncClient(
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            response = await client.post(
+                url,
+                headers=self.headers,
+                json=payload,
+            )
+            if response.status_code not in (200, 201):
+                raise DeerFlowAPIError(
+                    operation='create thread',
+                    status=response.status_code,
+                    body=response.text,
+                    url=url,
+                )
+            return response.json()
+
+    async def delete_thread(self, thread_id: str, timeout: float = 20) -> None:
+        """Delete the given thread"""
+        url = f'{self.api_base}/api/threads/{thread_id}'
+
+        async with httpx.AsyncClient(
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            response = await client.delete(url, headers=self.headers)
+            if response.status_code not in (200, 202, 204, 404):
+                raise DeerFlowAPIError(
+                    operation='delete thread',
+                    status=response.status_code,
+                    body=response.text,
+                    url=url,
+                    thread_id=thread_id,
+                )
+
+    async def stream_run(
+        self,
+        thread_id: str,
+        payload: dict[str, typing.Any],
+        timeout: float = 120,
+    ) -> AsyncGenerator[dict[str, typing.Any], None]:
+        """Run one LangGraph stream request, yielding event by event
+
+        Yields:
+            Event dict {'event': event_name, 'data': parsed_data}
+        """
+        url = f'{self.api_base}/api/langgraph/threads/{thread_id}/runs/stream'
+
+        # The streaming request uses its own read timeout
+        stream_timeout = httpx.Timeout(
+            connect=min(timeout, 30),
+            read=timeout,
+            write=timeout,
+            pool=timeout,
+        )
+
+        async with httpx.AsyncClient(
+            trust_env=True,
+            timeout=stream_timeout,
+        ) as client:
+            async with client.stream(
+                'POST',
+                url,
+                headers={
+                    **self.headers,
+                    'Accept': 'text/event-stream',
+                    'Content-Type': 'application/json',
+                },
+                json=payload,
+            ) as resp:
+                if resp.status_code != 200:
+                    body = await resp.aread()
+                    raise DeerFlowAPIError(
+                        operation='runs/stream request',
+                        status=resp.status_code,
+                        body=body.decode('utf-8', errors='replace'),
+                        url=url,
+                        thread_id=thread_id,
+                    )
+
+                decoder = codecs.getincrementaldecoder('utf-8')('replace')
+                buffer = ''
+
+                async for chunk in resp.aiter_bytes(8192):
+                    buffer += _normalize_sse_newlines(decoder.decode(chunk))
+
+                    while '\n\n' in buffer:
+                        block, buffer = buffer.split('\n\n', 1)
+                        parsed = _parse_sse_block(block)
+                        if parsed is not None:
+                            yield parsed
+
+                    if len(buffer) > SSE_MAX_BUFFER_CHARS:
+                        # The buffer is too large, force a flush
+                        parsed = _parse_sse_block(buffer)
+                        if parsed is not None:
+                            yield parsed
+                        buffer = ''
+
+                # Flush whatever remains
+                buffer += _normalize_sse_newlines(decoder.decode(b'', final=True))
+                while '\n\n' in buffer:
+                    block, buffer = buffer.split('\n\n', 1)
+                    parsed = _parse_sse_block(block)
+                    if parsed is not None:
+                        yield parsed
+                if buffer.strip():
+                    parsed = _parse_sse_block(buffer)
+                    if parsed is not None:
+                        yield parsed
diff --git a/src/langbot/libs/deerflow_api/errors.py b/src/langbot/libs/deerflow_api/errors.py
new file mode 100644
index 00000000..a3a6c0ab
--- /dev/null
+++ b/src/langbot/libs/deerflow_api/errors.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+
+class DeerFlowAPIError(Exception):
+    """DeerFlow API request failed"""
+
+    def __init__(
+        self,
+        *,
+        operation: str = '',
+        status: int = 0,
+        body: str = '',
+        url: str = '',
+        thread_id: str | None = None,
+        message: str = '',
+    ) -> None:
+        self.operation = operation
+        self.status = status
+        self.body = body
+        self.url = url
+        self.thread_id = thread_id
+
+        if message:
+            super().__init__(message)
+            return
+
+        msg = f'DeerFlow {operation} failed: status={status}, url={url}, body={body}'
+        if thread_id is not None:
+            msg = f'DeerFlow {operation} failed: thread_id={thread_id}, status={status}, url={url}, body={body}'
+        super().__init__(msg)
diff --git a/src/langbot/libs/deerflow_api/stream_utils.py b/src/langbot/libs/deerflow_api/stream_utils.py
new file mode 100644
index 00000000..702cb14a
--- /dev/null
+++ b/src/langbot/libs/deerflow_api/stream_utils.py
@@ -0,0 +1,212 @@
+"""DeerFlow LangGraph streaming response parsing utilities
+
+Modeled on astrbot's deerflow_stream_utils implementation.
+"""
+
+from __future__ import annotations
+
+import typing
+from collections.abc import Iterable
+
+
+def extract_text(content: typing.Any) -> str:
+    """Extract plain text from a message's content"""
+    if isinstance(content, str):
+        return content
+    if isinstance(content, dict):
+        if isinstance(content.get('text'), str):
+            return content['text']
+        if 'content' in content:
+            return extract_text(content.get('content'))
+        if 'kwargs' in content and isinstance(content['kwargs'], dict):
+            return extract_text(content['kwargs'].get('content'))
+    if isinstance(content, list):
+        parts: list[str] = []
+        for item in content:
+            if isinstance(item, str):
+                parts.append(item)
+            elif isinstance(item, dict):
+                item_type = item.get('type')
+                if item_type == 'text' and isinstance(item.get('text'), str):
+                    parts.append(item['text'])
+                elif 'content' in item:
+                    parts.append(extract_text(item['content']))
+        return '\n'.join([p for p in parts if p]).strip()
+    return str(content) if content is not None else ''
+
+
+def extract_messages_from_values_data(data: typing.Any) -> list[typing.Any]:
+    """Extract the messages list from a values event"""
+    candidates: list[typing.Any] = []
+    if isinstance(data, dict):
+        candidates.append(data)
+        if isinstance(data.get('values'), dict):
+            candidates.append(data['values'])
+    elif isinstance(data, list):
+        candidates.extend([x for x in data if isinstance(x, dict)])
+
+    for item in candidates:
+        messages = item.get('messages')
+        if isinstance(messages, list):
+            return messages
+    return []
+
+
+def is_ai_message(message: dict[str, typing.Any]) -> bool:
+    """Determine whether this is an AI/assistant message"""
+    role = str(message.get('role', '')).lower()
+    if role in {'assistant', 'ai'}:
+        return True
+
+    msg_type = str(message.get('type', '')).lower()
+    if msg_type in {'ai', 'assistant', 'aimessage', 'aimessagechunk'}:
+        return True
+    if 'ai' in msg_type and all(token not in msg_type for token in ('human', 'tool', 'system')):
+        return True
+    return False
+
+
+def extract_latest_ai_text(messages: Iterable[typing.Any]) -> str:
+    """Get the text content of the most recent AI message"""
+    if isinstance(messages, (list, tuple)):
+        iterable = reversed(messages)
+    else:
+        iterable = reversed(list(messages))
+
+    for msg in iterable:
+        if not isinstance(msg, dict):
+            continue
+        if is_ai_message(msg):
+            text = extract_text(msg.get('content'))
+            if text:
+                return text
+    return ''
+
+
+def extract_latest_ai_message(messages: Iterable[typing.Any]) -> dict[str, typing.Any] | None:
+    """Get the most recent AI message object"""
+    if isinstance(messages, (list, tuple)):
+        iterable = reversed(messages)
+    else:
+        iterable = reversed(list(messages))
+
+    for msg in iterable:
+        if not isinstance(msg, dict):
+            continue
+        if is_ai_message(msg):
+            return msg
+    return None
+
+
+def is_clarification_tool_message(message: dict[str, typing.Any]) -> bool:
+    """Determine whether this is a clarification-question tool message"""
+    msg_type = str(message.get('type', '')).lower()
+    tool_name = str(message.get('name', '')).lower()
+    return msg_type == 'tool' and tool_name == 'ask_clarification'
+
+
+def extract_latest_clarification_text(messages: Iterable[typing.Any]) -> str:
+    """Extract the most recent clarification question text"""
+    if isinstance(messages, (list, tuple)):
+        iterable = reversed(messages)
+    else:
+        iterable = reversed(list(messages))
+
+    for msg in iterable:
+        if not isinstance(msg, dict):
+            continue
+        if is_clarification_tool_message(msg):
+            text = extract_text(msg.get('content'))
+            if text:
+                return text
+    return ''
+
+
+def get_message_id(message: typing.Any) -> str:
+    """Extract the message ID"""
+    if not isinstance(message, dict):
+        return ''
+    msg_id = message.get('id')
+    return msg_id if isinstance(msg_id, str) else ''
+
+
+def extract_event_message_obj(data: typing.Any) -> dict[str, typing.Any] | None:
+    """Extract the message object from event data"""
+    msg_obj = data
+    if isinstance(data, (list, tuple)) and data:
+        msg_obj = data[0]
+    if isinstance(msg_obj, dict) and isinstance(msg_obj.get('data'), dict):
+        msg_obj = msg_obj['data']
+    return msg_obj if isinstance(msg_obj, dict) else None
+
+
+def extract_ai_delta_from_event_data(data: typing.Any) -> str:
+    """Extract the AI delta text from a messages-tuple event"""
+    msg_obj = extract_event_message_obj(data)
+    if not msg_obj:
+        return ''
+    if is_ai_message(msg_obj):
+        return extract_text(msg_obj.get('content'))
+    return ''
+
+
+def extract_clarification_from_event_data(data: typing.Any) -> str:
+    """Extract the clarification question from an event"""
+    msg_obj = extract_event_message_obj(data)
+    if not msg_obj:
+        return ''
+    if is_clarification_tool_message(msg_obj):
+        return extract_text(msg_obj.get('content'))
+    return ''
+
+
+def _iter_custom_event_items(data: typing.Any) -> list[dict[str, typing.Any]]:
+    items: list[dict[str, typing.Any]] = []
+    if isinstance(data, dict):
+        return [data]
+    if isinstance(data, list):
+        for item in data:
+            if isinstance(item, dict):
+                items.append(item)
+            elif isinstance(item, (list, tuple)):
+                for nested in item:
+                    if isinstance(nested, dict):
+                        items.append(nested)
+    return items
+
+
+def extract_task_failures_from_custom_event(data: typing.Any) -> list[str]:
+    """Extract subtask failure information from a custom event"""
+    failures: list[str] = []
+    for item in _iter_custom_event_items(data):
+        event_type = str(item.get('type', '')).lower()
+        if event_type not in {'task_failed', 'task_timed_out'}:
+            continue
+
+        task_id = str(item.get('task_id', '')).strip()
+        error_text = extract_text(item.get('error')).strip()
+        if task_id and error_text:
+            failures.append(f'{task_id}: {error_text}')
+        elif error_text:
+            failures.append(error_text)
+        elif task_id:
+            failures.append(f'{task_id}: unknown error')
+        else:
+            failures.append('unknown task failure')
+    return failures
+
+
+def build_task_failure_summary(failures: list[str]) -> str:
+    """Build a task failure summary"""
+    if not failures:
+        return ''
+    deduped: list[str] = []
+    seen: set[str] = set()
+    for failure in failures:
+        if failure not in seen:
+            seen.add(failure)
+            deduped.append(failure)
+    if len(deduped) == 1:
+        return f'DeerFlow subtask failed: {deduped[0]}'
+    joined = '\n'.join([f'- {item}' for item in deduped[:5]])
+    return f'DeerFlow subtasks failed:\n{joined}'
diff --git a/src/langbot/libs/weknora_api/__init__.py b/src/langbot/libs/weknora_api/__init__.py
new file mode 100644
index 00000000..23926101
--- /dev/null
+++ b/src/langbot/libs/weknora_api/__init__.py
@@ -0,0 +1,4 @@
+from .client import AsyncWeKnoraClient
+from .errors import WeKnoraAPIError
+
+__all__ = ['AsyncWeKnoraClient', 'WeKnoraAPIError']
diff --git a/src/langbot/libs/weknora_api/client.py b/src/langbot/libs/weknora_api/client.py
new file mode 100644
index 00000000..f753136d
--- /dev/null
+++ b/src/langbot/libs/weknora_api/client.py
@@ -0,0 +1,180 @@
+from __future__ import annotations
+
+import httpx
+import typing
+import json
+
+from .errors import WeKnoraAPIError
+
+
+class AsyncWeKnoraClient:
+    """WeKnora API client"""
+
+    api_key: str
+    base_url: str
+
+    def __init__(
+        self,
+        api_key: str,
+        base_url: str = 'http://localhost:80/api/v1',
+    ) -> None:
+        self.api_key = api_key
+        self.base_url = base_url
+
+    async def create_session(
+        self,
+        title: str = '',
+        description: str = '',
+        timeout: float = 30.0,
+    ) -> str:
+        """Create a session and return its session_id"""
+        async with httpx.AsyncClient(
+            base_url=self.base_url,
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            payload: dict[str, typing.Any] = {}
+            if title:
+                payload['title'] = title
+            if description:
+                payload['description'] = description
+
+            response = await client.post(
+                '/sessions',
+                headers={
+                    'X-API-Key': self.api_key,
+                    'Content-Type': 'application/json',
+                },
+                json=payload,
+            )
+
+            if response.status_code not in (200, 201):
+                raise WeKnoraAPIError(f'{response.status_code} {response.text}')
+
+            data = response.json()
+            return data['data']['id']
+
+    async def agent_chat(
+        self,
+        session_id: str,
+        query: str,
+        user: str,
+        agent_id: str = '',
+        knowledge_base_ids: list[str] | None = None,
+        web_search_enabled: bool = False,
+        timeout: float = 120.0,
+    ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+        """
+        Agent chat (SSE streaming)
+
+        Response event types:
+        - agent_query: the agent starts processing
+        - thinking: reasoning process
+        - tool_call: tool call
+        - tool_result: tool result
+        - references: knowledge-base references
+        - answer: answer content
+        - reflection: reflection
+        - session_title: session title
+        - error: error
+        """
+        if knowledge_base_ids is None:
+            knowledge_base_ids = []
+
+        async with httpx.AsyncClient(
+            base_url=self.base_url,
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            payload: dict[str, typing.Any] = {
+                'query': query,
+                'agent_enabled': True,
+                'channel': 'im',
+            }
+            if agent_id:
+                payload['agent_id'] = agent_id
+            if knowledge_base_ids:
+                payload['knowledge_base_ids'] = knowledge_base_ids
+            if web_search_enabled:
+                payload['web_search_enabled'] = True
+
+            async with client.stream(
+                'POST',
+                f'/agent-chat/{session_id}',
+                headers={
+                    'X-API-Key': self.api_key,
+                    'Content-Type': 'application/json',
+                },
+                json=payload,
+            ) as r:
+                async for chunk in r.aiter_lines():
+                    if r.status_code != 200:
+                        raise WeKnoraAPIError(f'{r.status_code} {chunk}')
+                    if chunk.strip() == '':
+                        continue
+                    if chunk.startswith('data:'):
+                        try:
+                            data = json.loads(chunk[5:].strip())
+                        except json.JSONDecodeError:
+                            continue
+                        yield data
+                        # End the stream proactively after an error event, so the caller does not keep waiting if it fails to raise
+                        if data.get('response_type') == 'error':
+                            return
+
+    async def knowledge_chat(
+        self,
+        session_id: str,
+        query: str,
+        user: str,
+        agent_id: str = 'builtin-quick-answer',
+        knowledge_base_ids: list[str] | None = None,
+        timeout: float = 120.0,
+    ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+        """
+        Knowledge-base RAG Q&A (SSE streaming)
+
+        Response event types:
+        - references: knowledge-base references
+        - answer: answer content
+        """
+        if knowledge_base_ids is None:
+            knowledge_base_ids = []
+
+        async with httpx.AsyncClient(
+            base_url=self.base_url,
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            payload: dict[str, typing.Any] = {
+                'query': query,
+                'channel': 'im',
+            }
+            if agent_id:
+                payload['agent_id'] = agent_id
+            if knowledge_base_ids:
+                payload['knowledge_base_ids'] = knowledge_base_ids
+
+            async with client.stream(
+                'POST',
+                f'/knowledge-chat/{session_id}',
+                headers={
+                    'X-API-Key': self.api_key,
+                    'Content-Type': 'application/json',
+                },
+                json=payload,
+            ) as r:
+                async for chunk in r.aiter_lines():
+                    if r.status_code != 200:
+                        raise WeKnoraAPIError(f'{r.status_code} {chunk}')
+                    if chunk.strip() == '':
+                        continue
+                    if chunk.startswith('data:'):
+                        try:
+                            data = json.loads(chunk[5:].strip())
+                        except json.JSONDecodeError:
+                            continue
+                        yield data
+                        # End the stream proactively after an error event, so the caller does not keep waiting if it fails to raise
+                        if data.get('response_type') == 'error':
+                            return
diff --git a/src/langbot/libs/weknora_api/errors.py b/src/langbot/libs/weknora_api/errors.py
new file mode 100644
index 00000000..c43b724c
--- /dev/null
+++ b/src/langbot/libs/weknora_api/errors.py
@@ -0,0 +1,6 @@
+class WeKnoraAPIError(Exception):
+    """WeKnora API request failed"""
+
+    def __init__(self, message: str = ''):
+        self.message = message
+        super().__init__(self.message)
diff --git a/src/langbot/pkg/core/migrations/m042_weknora_api.py b/src/langbot/pkg/core/migrations/m042_weknora_api.py
new file mode 100644
index 00000000..c3a5bf78
--- /dev/null
+++ b/src/langbot/pkg/core/migrations/m042_weknora_api.py
@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+from .. import migration
+
+
+@migration.migration_class('weknora-api-config', 42)
+class WeKnoraAPICfgMigration(migration.Migration):
+    """WeKnora API config migration"""
+
+    async def need_migrate(self) -> bool:
+        """Determine whether this migration needs to run in the current environment"""
+        return 'weknora-api' not in self.ap.provider_cfg.data
+
+    async def run(self):
+        """Run the migration"""
+        self.ap.provider_cfg.data['weknora-api'] = {
+            'base-url': 'http://localhost:8080/api/v1',
+            'app-type': 'agent',
+            'api-key': '',
+            'agent-id': 'builtin-smart-reasoning',
+            'knowledge-base-ids': [],
+            'web-search-enabled': False,
+            'timeout': 120,
+            'base-prompt': '请回答用户的问题。',
+        }
+
+        await self.ap.provider_cfg.dump_config()
diff --git a/src/langbot/pkg/core/migrations/m043_deerflow_api.py b/src/langbot/pkg/core/migrations/m043_deerflow_api.py
new file mode 100644
index 00000000..e6cc2c7d
--- /dev/null
+++ b/src/langbot/pkg/core/migrations/m043_deerflow_api.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+from .. import migration
+
+
+@migration.migration_class('deerflow-api-config', 43)
+class DeerFlowAPICfgMigration(migration.Migration):
+    """DeerFlow API config migration"""
+
+    async def need_migrate(self) -> bool:
+        """Determine whether this migration needs to run in the current environment"""
+        return 'deerflow-api' not in self.ap.provider_cfg.data
+
+    async def run(self):
+        """Run the migration"""
+        self.ap.provider_cfg.data['deerflow-api'] = {
+            'api-base': 'http://127.0.0.1:2026',
+            'api-key': '',
+            'auth-header': '',
+            'assistant-id': 'lead_agent',
+            'model-name': '',
+            'thinking-enabled': False,
+            'plan-mode': False,
+            'subagent-enabled': False,
+            'max-concurrent-subagents': 3,
+            'timeout': 300,
+            'recursion-limit': 1000,
+        }
+
+        await self.ap.provider_cfg.dump_config()
diff --git a/src/langbot/pkg/provider/runners/deerflowapi.py b/src/langbot/pkg/provider/runners/deerflowapi.py
new file mode 100644
index 00000000..79c77126
--- /dev/null
+++ b/src/langbot/pkg/provider/runners/deerflowapi.py
@@ -0,0 +1,511 @@
+"""DeerFlow LangGraph API Runner
+
+Modeled on astrbot's deerflow_agent_runner implementation, adapted to LangBot's Runner interface.
+
+Features:
+- Connects to the deer-flow backend through the LangGraph HTTP API
+- Manages thread_id automatically (isolated per session)
+- Supports parsing SSE streaming responses
+- Supports both streaming and non-streaming output
+- Handles three event types: values / messages-tuple / custom
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import json
+import typing
+from collections import deque
+from dataclasses import dataclass, field
+
+
+from langbot.pkg.provider import runner
+from langbot.pkg.core import app
+import langbot_plugin.api.entities.builtin.provider.message as provider_message
+import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
+from langbot.libs.deerflow_api import client, errors, stream_utils
+
+
+_MAX_VALUES_HISTORY = 200
+
+
+@dataclass
+class _StreamState:
+    """Streaming state tracking"""
+
+    latest_text: str = ''
+    prev_text_for_streaming: str = ''
+    clarification_text: str = ''
+    task_failures: list[str] = field(default_factory=list)
+    seen_message_ids: set[str] = field(default_factory=set)
+    seen_message_order: deque[str] = field(default_factory=deque)
+    no_id_message_fingerprints: dict[int, str] = field(default_factory=dict)
+    baseline_initialized: bool = False
+    has_values_text: bool = False
+    run_values_messages: list[dict[str, typing.Any]] = field(default_factory=list)
+    timed_out: bool = False
+
+
+@runner.runner_class('deerflow-api')
+class DeerFlowAPIRunner(runner.RequestRunner):
+    """DeerFlow LangGraph API chat requester"""
+
+    deerflow_client: client.AsyncDeerFlowClient
+
+    def __init__(self, ap: app.Application, pipeline_config: dict):
+        super().__init__(ap, pipeline_config)
+
+        cfg = self.pipeline_config['ai']['deerflow-api']
+
+        api_base = cfg.get('api-base', '').strip()
+        if not api_base or not api_base.startswith(('http://', 'https://')):
+            raise errors.DeerFlowAPIError(
+                message='DeerFlow API Base URL 格式错误,必须以 http:// 或 https:// 开头',
+            )
+
+        self.api_base = api_base
+        self.api_key = cfg.get('api-key', '')
+        self.auth_header = cfg.get('auth-header', '')
+        self.assistant_id = cfg.get('assistant-id', 'lead_agent')
+        self.model_name = cfg.get('model-name', '')
+        self.thinking_enabled = bool(cfg.get('thinking-enabled', False))
+        self.plan_mode = bool(cfg.get('plan-mode', False))
+        self.subagent_enabled = bool(cfg.get('subagent-enabled', False))
+        self.max_concurrent_subagents = int(cfg.get('max-concurrent-subagents', 3))
+        self.timeout = int(cfg.get('timeout', 300))
+        self.recursion_limit = int(cfg.get('recursion-limit', 1000))
+
+        self.deerflow_client = client.AsyncDeerFlowClient(
+            api_base=self.api_base,
+            api_key=self.api_key,
+            auth_header=self.auth_header,
+        )
+
+    # ------------------------------------------------------------------
+    # Helper methods
+    # ------------------------------------------------------------------
+
+    def _fingerprint_message(self, message: dict[str, typing.Any]) -> str:
+        try:
+            raw = json.dumps(message, sort_keys=True, ensure_ascii=False, default=str)
+        except (TypeError, ValueError):
+            raw = repr(message)
+        return hashlib.sha1(raw.encode('utf-8', errors='ignore')).hexdigest()
+
+    def _remember_seen_message_id(self, state: _StreamState, msg_id: str) -> None:
+        if not msg_id or msg_id in state.seen_message_ids:
+            return
+        state.seen_message_ids.add(msg_id)
+        state.seen_message_order.append(msg_id)
+        while len(state.seen_message_order) > _MAX_VALUES_HISTORY:
+            dropped = state.seen_message_order.popleft()
+            state.seen_message_ids.discard(dropped)
+
+    def _extract_new_messages_from_values(
+        self,
+        values_messages: list[typing.Any],
+        state: _StreamState,
+    ) -> list[dict[str, typing.Any]]:
+        new_messages: list[dict[str, typing.Any]] = []
+        no_id_indexes_seen: set[int] = set()
+        for idx, msg in enumerate(values_messages):
+            if not isinstance(msg, dict):
+                continue
+            msg_id = stream_utils.get_message_id(msg)
+            if msg_id:
+                if msg_id in state.seen_message_ids:
+                    continue
+                self._remember_seen_message_id(state, msg_id)
+                new_messages.append(msg)
+                continue
+
+            no_id_indexes_seen.add(idx)
+            fp = self._fingerprint_message(msg)
+            if state.no_id_message_fingerprints.get(idx) == fp:
+                continue
+            state.no_id_message_fingerprints[idx] = fp
+            new_messages.append(msg)
+
+        for idx in list(state.no_id_message_fingerprints.keys()):
+            if idx not in no_id_indexes_seen:
+                state.no_id_message_fingerprints.pop(idx, None)
+        return new_messages
+
+    # ------------------------------------------------------------------
+    # User input handling
+    # ------------------------------------------------------------------
+
+    def _build_user_content(
+        self,
+        prompt: str,
+        image_urls: list[str],
+    ) -> typing.Any:
+        """Build LangGraph-compatible user content (multimodal supported)"""
+        if not image_urls:
+            return prompt
+
+        content: list[dict[str, typing.Any]] = []
+        if prompt:
+            content.append({'type': 'text', 'text': prompt})
+        for url in image_urls:
+            if not isinstance(url, str):
+                continue
+            url = url.strip()
+            if not url:
+                continue
+            if url.startswith(('http://', 'https://', 'data:')):
+                content.append({'type': 'image_url', 'image_url': {'url': url}})
+        return content if content else prompt
+
+    def _preprocess_user_message(
+        self,
+        query: pipeline_query.Query,
+    ) -> tuple[str, list[str]]:
+        """Extract the plain text and the list of image URLs from the user message"""
+        plain_text = ''
+        image_urls: list[str] = []
+
+        if isinstance(query.user_message.content, str):
+            plain_text = query.user_message.content
+        elif isinstance(query.user_message.content, list):
+            for ce in query.user_message.content:
+                if ce.type == 'text':
+                    plain_text += ce.text
+                elif ce.type == 'image_base64':
+                    # Convert to data URI form
+                    b64 = getattr(ce, 'image_base64', '')
+                    if b64:
+                        if not b64.startswith('data:'):
+                            b64 = f'data:image/png;base64,{b64}'
+                        image_urls.append(b64)
+                elif ce.type == 'image_url':
+                    url = getattr(ce, 'image_url', '')
+                    if url:
+                        image_urls.append(url)
+
+        return plain_text, image_urls
+
+    # ------------------------------------------------------------------
+    # Request construction
+    # ------------------------------------------------------------------
+
+    def _build_messages(
+        self,
+        prompt: str,
+        image_urls: list[str],
system_prompt: str = '', + ) -> list[dict[str, typing.Any]]: + messages: list[dict[str, typing.Any]] = [] + if system_prompt: + messages.append({'role': 'system', 'content': system_prompt}) + messages.append( + { + 'role': 'user', + 'content': self._build_user_content(prompt, image_urls), + } + ) + return messages + + def _build_runtime_configurable(self, thread_id: str) -> dict[str, typing.Any]: + cfg: dict[str, typing.Any] = { + 'thread_id': thread_id, + 'thinking_enabled': self.thinking_enabled, + 'is_plan_mode': self.plan_mode, + 'subagent_enabled': self.subagent_enabled, + } + if self.subagent_enabled: + cfg['max_concurrent_subagents'] = self.max_concurrent_subagents + if self.model_name: + cfg['model_name'] = self.model_name + return cfg + + def _build_payload( + self, + thread_id: str, + prompt: str, + image_urls: list[str], + system_prompt: str = '', + ) -> dict[str, typing.Any]: + runtime_configurable = self._build_runtime_configurable(thread_id) + return { + 'assistant_id': self.assistant_id, + 'input': { + 'messages': self._build_messages(prompt, image_urls, system_prompt), + }, + 'stream_mode': ['values', 'messages-tuple', 'custom'], + # DeerFlow 2.0 从 config.configurable 读取运行时覆盖 + # 同时保留 context 字段做向后兼容 + 'context': dict(runtime_configurable), + 'config': { + 'recursion_limit': self.recursion_limit, + 'configurable': runtime_configurable, + }, + } + + # ------------------------------------------------------------------ + # Session/Thread 管理 + # ------------------------------------------------------------------ + + async def _ensure_thread_id(self, query: pipeline_query.Query) -> str: + """从 query.session 取/创建 deerflow thread_id + + LangBot 使用 `query.session.using_conversation.uuid` 持久化 conversation id, + 我们复用这个字段存储 deerflow thread_id(与 Dify Runner 同样做法)。 + """ + thread_id = query.session.using_conversation.uuid or '' + if thread_id: + return thread_id + + thread = await self.deerflow_client.create_thread(timeout=min(30, self.timeout)) + thread_id = 
thread.get('thread_id', '') + if not thread_id: + raise errors.DeerFlowAPIError(message=f'DeerFlow create thread 返回数据缺少 thread_id: {thread}') + + query.session.using_conversation.uuid = thread_id + return thread_id + + # ------------------------------------------------------------------ + # 流式事件处理 + # ------------------------------------------------------------------ + + def _handle_values_event( + self, + data: typing.Any, + state: _StreamState, + ) -> str | None: + """处理 values 事件,返回新的完整文本(增量基础上的全量)""" + values_messages = stream_utils.extract_messages_from_values_data(data) + if not values_messages: + return None + + new_messages: list[dict[str, typing.Any]] = [] + if not state.baseline_initialized: + state.baseline_initialized = True + for idx, msg in enumerate(values_messages): + if not isinstance(msg, dict): + continue + new_messages.append(msg) + msg_id = stream_utils.get_message_id(msg) + if msg_id: + self._remember_seen_message_id(state, msg_id) + continue + state.no_id_message_fingerprints[idx] = self._fingerprint_message(msg) + else: + new_messages = self._extract_new_messages_from_values(values_messages, state) + + latest_text = '' + if new_messages: + state.run_values_messages.extend(new_messages) + if len(state.run_values_messages) > _MAX_VALUES_HISTORY: + state.run_values_messages = state.run_values_messages[-_MAX_VALUES_HISTORY:] + latest_text = stream_utils.extract_latest_ai_text(state.run_values_messages) + if latest_text: + state.has_values_text = True + latest_clarification = stream_utils.extract_latest_clarification_text( + state.run_values_messages, + ) + if latest_clarification: + state.clarification_text = latest_clarification + + return latest_text or None + + def _handle_message_event( + self, + data: typing.Any, + state: _StreamState, + ) -> str | None: + """处理 messages-tuple 事件,返回增量文本 + + 当 values 事件已经提供完整文本时,跳过 messages-tuple 的增量 + """ + delta = stream_utils.extract_ai_delta_from_event_data(data) + if delta and not state.has_values_text: 
+ state.latest_text += delta + return delta + + maybe_clar = stream_utils.extract_clarification_from_event_data(data) + if maybe_clar: + state.clarification_text = maybe_clar + return None + + def _build_final_text(self, state: _StreamState) -> str: + """构建最终输出文本""" + if state.clarification_text: + return state.clarification_text + + # 优先使用最后一条 AI message 的文本 + latest_ai = stream_utils.extract_latest_ai_message(state.run_values_messages) + if latest_ai: + text = stream_utils.extract_text(latest_ai.get('content')) + if text: + if state.timed_out: + text += f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。' + return text + + if state.latest_text: + text = state.latest_text + if state.timed_out: + text += f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。' + return text + + # 提取任务失败信息作兜底 + failure_text = stream_utils.build_task_failure_summary(state.task_failures) + if failure_text: + return failure_text + + return 'DeerFlow 返回空响应' + + # ------------------------------------------------------------------ + # 主流程 + # ------------------------------------------------------------------ + + async def _stream_messages_chunk( + self, + query: pipeline_query.Query, + ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: + """流式输出生成器""" + plain_text, image_urls = self._preprocess_user_message(query) + + system_prompt = '' + # LangBot 的 pipeline 通常通过 prompt-preprocess 已注入 system prompt + # 这里保持空,让 prompt-preprocess 的内容作为 user message 一并送给 deerflow + + thread_id = await self._ensure_thread_id(query) + payload = self._build_payload( + thread_id=thread_id, + prompt=plain_text or 'continue', + image_urls=image_urls, + system_prompt=system_prompt, + ) + + state = _StreamState() + prev_text = '' + message_idx = 0 + + try: + async for event in self.deerflow_client.stream_run( + thread_id=thread_id, + payload=payload, + timeout=self.timeout, + ): + event_type = event.get('event') + data = event.get('data') + + if event_type == 'values': + new_full = 
self._handle_values_event(data, state) + if new_full and new_full != prev_text: + delta = new_full[len(prev_text) :] if new_full.startswith(prev_text) else new_full + prev_text = new_full + if delta: + message_idx += 1 + yield provider_message.MessageChunk( + role='assistant', + content=new_full, + is_final=False, + ) + continue + + if event_type in {'messages-tuple', 'messages', 'message'}: + delta = self._handle_message_event(data, state) + if delta: + prev_text = state.latest_text + message_idx += 1 + yield provider_message.MessageChunk( + role='assistant', + content=prev_text, + is_final=False, + ) + continue + + if event_type == 'custom': + state.task_failures.extend( + stream_utils.extract_task_failures_from_custom_event(data), + ) + continue + + if event_type == 'error': + raise errors.DeerFlowAPIError(message=f'DeerFlow stream error event: {data}') + + if event_type == 'end': + break + except (asyncio.TimeoutError, TimeoutError): + self.ap.logger.warning(f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}') + state.timed_out = True + + # 最终消息 + final_text = self._build_final_text(state) + yield provider_message.MessageChunk( + role='assistant', + content=final_text, + is_final=True, + ) + + async def _messages( + self, + query: pipeline_query.Query, + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """非流式聚合输出""" + plain_text, image_urls = self._preprocess_user_message(query) + + thread_id = await self._ensure_thread_id(query) + payload = self._build_payload( + thread_id=thread_id, + prompt=plain_text or 'continue', + image_urls=image_urls, + ) + + state = _StreamState() + + try: + async for event in self.deerflow_client.stream_run( + thread_id=thread_id, + payload=payload, + timeout=self.timeout, + ): + event_type = event.get('event') + data = event.get('data') + + if event_type == 'values': + self._handle_values_event(data, state) + continue + + if event_type in {'messages-tuple', 'messages', 'message'}: + 
self._handle_message_event(data, state) + continue + + if event_type == 'custom': + state.task_failures.extend( + stream_utils.extract_task_failures_from_custom_event(data), + ) + continue + + if event_type == 'error': + raise errors.DeerFlowAPIError(message=f'DeerFlow stream error event: {data}') + + if event_type == 'end': + break + except (asyncio.TimeoutError, TimeoutError): + self.ap.logger.warning(f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}') + state.timed_out = True + + final_text = self._build_final_text(state) + yield provider_message.Message( + role='assistant', + content=final_text, + ) + + async def run( + self, + query: pipeline_query.Query, + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """主入口:根据 adapter 是否支持流式输出,选择流式或非流式""" + if await query.adapter.is_stream_output_supported(): + msg_idx = 0 + async for msg in self._stream_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + else: + async for msg in self._messages(query): + yield msg diff --git a/src/langbot/pkg/provider/runners/weknoraapi.py b/src/langbot/pkg/provider/runners/weknoraapi.py new file mode 100644 index 00000000..9d46eebb --- /dev/null +++ b/src/langbot/pkg/provider/runners/weknoraapi.py @@ -0,0 +1,351 @@ +from __future__ import annotations + +import typing +import json + + +from langbot.pkg.provider import runner +from langbot.pkg.core import app +import langbot_plugin.api.entities.builtin.provider.message as provider_message +import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query +from langbot.libs.weknora_api import client, errors + + +@runner.runner_class('weknora-api') +class WeKnoraAPIRunner(runner.RequestRunner): + """WeKnora API 对话请求器""" + + weknora_client: client.AsyncWeKnoraClient + + def __init__(self, ap: app.Application, pipeline_config: dict): + super().__init__(ap, pipeline_config) + + valid_app_types = ['chat', 'agent'] + if 
self.pipeline_config['ai']['weknora-api']['app-type'] not in valid_app_types: + raise errors.WeKnoraAPIError( + f'不支持的 WeKnora 应用类型: {self.pipeline_config["ai"]["weknora-api"]["app-type"]}' + ) + + api_key = self.pipeline_config['ai']['weknora-api'].get('api-key', '').strip() + if not api_key: + raise errors.WeKnoraAPIError( + 'WeKnora API Key 未配置,请在流水线的 WeKnora API 配置中填入 API Key ' + '(从 WeKnora 前端 设置 → API Keys 生成)' + ) + + base_url = self.pipeline_config['ai']['weknora-api'].get('base-url', '').strip() + if not base_url: + raise errors.WeKnoraAPIError('WeKnora Base URL 未配置,请填入服务器地址,例如 http://localhost:8080/api/v1') + + self.weknora_client = client.AsyncWeKnoraClient( + api_key=api_key, + base_url=base_url, + ) + + async def _extract_plain_text(self, query: pipeline_query.Query) -> str: + """从用户消息中提取纯文本内容""" + plain_text = '' + if isinstance(query.user_message.content, str): + plain_text = query.user_message.content + elif isinstance(query.user_message.content, list): + for ce in query.user_message.content: + if ce.type == 'text': + plain_text += ce.text + + if not plain_text: + plain_text = self.pipeline_config['ai']['weknora-api'].get('base-prompt', '') + + return plain_text + + async def _ensure_session(self, query: pipeline_query.Query) -> str: + """确保会话存在,如果不存在则创建""" + session_id = query.session.using_conversation.uuid or '' + + if not session_id: + user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}' + session_id = await self.weknora_client.create_session(title=f'IM Chat - {user_tag}') + query.session.using_conversation.uuid = session_id + + return session_id + + async def _agent_chat_messages( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """调用 Agent 智能对话(非流式聚合输出)""" + session_id = await self._ensure_session(query) + plain_text = await self._extract_plain_text(query) + user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}' + + config = 
self.pipeline_config['ai']['weknora-api'] + agent_id = config.get('agent-id', 'builtin-smart-reasoning') + knowledge_base_ids = config.get('knowledge-base-ids', []) + web_search_enabled = config.get('web-search-enabled', False) + timeout = config.get('timeout', 120) + + full_answer = '' + chunk = None + + async for chunk in self.weknora_client.agent_chat( + session_id=session_id, + query=plain_text, + user=user_tag, + agent_id=agent_id, + knowledge_base_ids=knowledge_base_ids, + web_search_enabled=web_search_enabled, + timeout=timeout, + ): + self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk)) + + response_type = chunk.get('response_type', '') + content = chunk.get('content', '') + + if response_type == 'tool_call': + # 工具调用 + tool_data = chunk.get('data', {}) + tool_name = tool_data.get('tool_name', '') + if tool_name: + yield provider_message.Message( + role='assistant', + tool_calls=[ + provider_message.ToolCall( + id=chunk.get('id', ''), + type='function', + function=provider_message.FunctionCall( + name=tool_name, + arguments=json.dumps(tool_data.get('arguments', {})), + ), + ) + ], + ) + + elif response_type == 'answer': + if content: + full_answer += content + + elif response_type == 'error': + raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}') + + if chunk is None: + raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置') + + if full_answer: + yield provider_message.Message( + role='assistant', + content=full_answer, + ) + + async def _chat_messages( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.Message, None]: + """调用知识库 RAG 问答(非流式聚合输出)""" + session_id = await self._ensure_session(query) + plain_text = await self._extract_plain_text(query) + user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}' + + config = self.pipeline_config['ai']['weknora-api'] + agent_id = config.get('agent-id', 'builtin-quick-answer') + knowledge_base_ids = config.get('knowledge-base-ids', []) + 
timeout = config.get('timeout', 120) + + full_answer = '' + chunk = None + + async for chunk in self.weknora_client.knowledge_chat( + session_id=session_id, + query=plain_text, + user=user_tag, + agent_id=agent_id, + knowledge_base_ids=knowledge_base_ids, + timeout=timeout, + ): + self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk)) + + response_type = chunk.get('response_type', '') + content = chunk.get('content', '') + + if response_type == 'answer': + if content: + full_answer += content + + elif response_type == 'error': + raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}') + + if chunk is None: + raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置') + + if full_answer: + yield provider_message.Message( + role='assistant', + content=full_answer, + ) + + async def _agent_chat_messages_chunk( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: + """调用 Agent 智能对话(流式输出)""" + session_id = await self._ensure_session(query) + plain_text = await self._extract_plain_text(query) + user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}' + + config = self.pipeline_config['ai']['weknora-api'] + agent_id = config.get('agent-id', 'builtin-smart-reasoning') + knowledge_base_ids = config.get('knowledge-base-ids', []) + web_search_enabled = config.get('web-search-enabled', False) + timeout = config.get('timeout', 120) + + pending_answer = '' + message_idx = 0 + is_final = False + chunk = None + + async for chunk in self.weknora_client.agent_chat( + session_id=session_id, + query=plain_text, + user=user_tag, + agent_id=agent_id, + knowledge_base_ids=knowledge_base_ids, + web_search_enabled=web_search_enabled, + timeout=timeout, + ): + self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk)) + + response_type = chunk.get('response_type', '') + content = chunk.get('content', '') + done = chunk.get('done', False) + + if response_type == 'tool_call': + tool_data = chunk.get('data', {}) + 
tool_name = tool_data.get('tool_name', '') + if tool_name: + message_idx += 1 + yield provider_message.MessageChunk( + role='assistant', + tool_calls=[ + provider_message.ToolCall( + id=chunk.get('id', ''), + type='function', + function=provider_message.FunctionCall( + name=tool_name, + arguments=json.dumps(tool_data.get('arguments', {})), + ), + ) + ], + ) + + elif response_type == 'answer': + message_idx += 1 + if content: + pending_answer += content + + if done: + is_final = True + + # 每 8 个 chunk 输出一次,或最终输出 + if message_idx % 8 == 0 or is_final: + yield provider_message.MessageChunk( + role='assistant', + content=pending_answer, + is_final=is_final, + ) + + elif response_type == 'error': + raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}') + + if chunk is None: + raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置') + + # 确保最终消息已发出 + if not is_final and pending_answer: + yield provider_message.MessageChunk( + role='assistant', + content=pending_answer, + is_final=True, + ) + + async def _chat_messages_chunk( + self, query: pipeline_query.Query + ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: + """调用知识库 RAG 问答(流式输出)""" + session_id = await self._ensure_session(query) + plain_text = await self._extract_plain_text(query) + user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}' + + config = self.pipeline_config['ai']['weknora-api'] + agent_id = config.get('agent-id', 'builtin-quick-answer') + knowledge_base_ids = config.get('knowledge-base-ids', []) + timeout = config.get('timeout', 120) + + pending_answer = '' + message_idx = 0 + is_final = False + chunk = None + + async for chunk in self.weknora_client.knowledge_chat( + session_id=session_id, + query=plain_text, + user=user_tag, + agent_id=agent_id, + knowledge_base_ids=knowledge_base_ids, + timeout=timeout, + ): + self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk)) + + response_type = chunk.get('response_type', '') + content = chunk.get('content', 
'') + done = chunk.get('done', False) + + if response_type == 'answer': + message_idx += 1 + if content: + pending_answer += content + + if done: + is_final = True + + if message_idx % 8 == 0 or is_final: + yield provider_message.MessageChunk( + role='assistant', + content=pending_answer, + is_final=is_final, + ) + + elif response_type == 'error': + raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}') + + if chunk is None: + raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置') + + if not is_final and pending_answer: + yield provider_message.MessageChunk( + role='assistant', + content=pending_answer, + is_final=True, + ) + + async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: + """运行请求""" + app_type = self.pipeline_config['ai']['weknora-api']['app-type'] + + if await query.adapter.is_stream_output_supported(): + msg_idx = 0 + if app_type == 'agent': + async for msg in self._agent_chat_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + elif app_type == 'chat': + async for msg in self._chat_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + else: + raise errors.WeKnoraAPIError(f'不支持的 WeKnora 应用类型: {app_type}') + else: + if app_type == 'agent': + async for msg in self._agent_chat_messages(query): + yield msg + elif app_type == 'chat': + async for msg in self._chat_messages(query): + yield msg + else: + raise errors.WeKnoraAPIError(f'不支持的 WeKnora 应用类型: {app_type}') diff --git a/src/langbot/templates/metadata/pipeline/ai.yaml b/src/langbot/templates/metadata/pipeline/ai.yaml index 32f4115f..16b21069 100644 --- a/src/langbot/templates/metadata/pipeline/ai.yaml +++ b/src/langbot/templates/metadata/pipeline/ai.yaml @@ -47,6 +47,14 @@ stages: label: en_US: Langflow API zh_Hans: Langflow API + - name: weknora-api + label: + en_US: WeKnora API + zh_Hans: WeKnora API + - name: deerflow-api + label: + en_US: DeerFlow API + zh_Hans: DeerFlow API - 
name: expire-time label: en_US: Conversation expire time (seconds) @@ -653,3 +661,215 @@ stages: type: json required: false default: '{}' + - name: weknora-api + label: + en_US: WeKnora API + zh_Hans: WeKnora API + description: + en_US: Configure the WeKnora API of the pipeline + zh_Hans: 配置 WeKnora API + config: + - name: base-url + label: + en_US: Base URL + zh_Hans: 基础 URL + description: + en_US: The base URL of the WeKnora server (with /api/v1) + zh_Hans: WeKnora 服务器的基础 URL(包含 /api/v1) + type: string + required: true + default: 'http://localhost:8080/api/v1' + - name: api-key + label: + en_US: API Key + zh_Hans: API 密钥 + description: + en_US: The API key for WeKnora, generated from WeKnora frontend Settings → API Keys + zh_Hans: WeKnora 的 API 密钥,从 WeKnora 前端 设置 → API Keys 生成 + type: string + required: true + default: '' + - name: app-type + label: + en_US: App Type + zh_Hans: 应用类型 + type: select + required: true + default: agent + options: + - name: agent + label: + en_US: Agent (Smart Reasoning) + zh_Hans: Agent(智能推理) + - name: chat + label: + en_US: Chat (Knowledge Base RAG) + zh_Hans: 聊天(知识库 RAG) + - name: agent-id + label: + en_US: Agent ID + zh_Hans: 智能体 ID + description: + en_US: The Agent ID to use. 
Built-in agents include builtin-quick-answer, builtin-smart-reasoning, builtin-data-analyst + zh_Hans: 要使用的智能体 ID。内置智能体:builtin-quick-answer、builtin-smart-reasoning、builtin-data-analyst + type: string + required: true + default: 'builtin-smart-reasoning' + - name: knowledge-base-ids + label: + en_US: Knowledge Base IDs + zh_Hans: 知识库 ID 列表 + description: + en_US: List of WeKnora knowledge base IDs to use (one per line) + zh_Hans: 要使用的 WeKnora 知识库 ID 列表(每行一个) + type: array + required: false + default: [] + - name: web-search-enabled + label: + en_US: Enable Web Search + zh_Hans: 启用网络搜索 + description: + en_US: Whether to enable web search in agent mode + zh_Hans: 在 Agent 模式下是否启用网络搜索 + type: boolean + required: false + default: false + - name: timeout + label: + en_US: Timeout + zh_Hans: 超时时间 + description: + en_US: Request timeout in seconds + zh_Hans: 请求超时时间(秒) + type: integer + required: false + default: 120 + - name: base-prompt + label: + en_US: Base Prompt + zh_Hans: 基础提示词 + description: + en_US: Default prompt when user message is empty (e.g. only images) + zh_Hans: 当用户消息为空(例如仅图片)时使用的默认提示词 + type: string + required: false + default: '请回答用户的问题。' + - name: deerflow-api + label: + en_US: DeerFlow API + zh_Hans: DeerFlow API + description: + en_US: Configure the DeerFlow LangGraph API of the pipeline + zh_Hans: 配置 DeerFlow LangGraph API + config: + - name: api-base + label: + en_US: API Base URL + zh_Hans: API 基础 URL + description: + en_US: The base URL of the DeerFlow server (e.g. 
http://127.0.0.1:2026) + zh_Hans: DeerFlow 服务器的基础 URL(例如 http://127.0.0.1:2026) + type: string + required: true + default: 'http://127.0.0.1:2026' + - name: api-key + label: + en_US: API Key + zh_Hans: API 密钥 + description: + en_US: Optional API key for DeerFlow (leave empty if not required) + zh_Hans: DeerFlow 的 API 密钥(如果不需要可留空) + type: string + required: false + default: '' + - name: auth-header + label: + en_US: Auth Header Name + zh_Hans: 鉴权请求头名称 + description: + en_US: Custom auth header name. Leave empty to use "x-api-key" + zh_Hans: 自定义鉴权请求头名称,留空则使用 "x-api-key" + type: string + required: false + default: '' + - name: assistant-id + label: + en_US: Assistant ID + zh_Hans: 助手 ID + description: + en_US: The DeerFlow assistant/graph id (default lead_agent) + zh_Hans: DeerFlow 助手/图 ID(默认 lead_agent) + type: string + required: true + default: 'lead_agent' + - name: model-name + label: + en_US: Model Name + zh_Hans: 模型名称 + description: + en_US: Optional model override forwarded to DeerFlow configurable + zh_Hans: 可选的模型名称覆盖,会作为 configurable 转发给 DeerFlow + type: string + required: false + default: '' + - name: thinking-enabled + label: + en_US: Enable Thinking + zh_Hans: 启用思考 + description: + en_US: Whether to enable DeerFlow thinking mode + zh_Hans: 是否启用 DeerFlow 思考模式 + type: boolean + required: false + default: false + - name: plan-mode + label: + en_US: Plan Mode + zh_Hans: 规划模式 + description: + en_US: Whether to enable DeerFlow plan mode + zh_Hans: 是否启用 DeerFlow 规划模式 + type: boolean + required: false + default: false + - name: subagent-enabled + label: + en_US: Enable Subagents + zh_Hans: 启用子代理 + description: + en_US: Whether to enable parallel subagent execution + zh_Hans: 是否启用并行子代理执行 + type: boolean + required: false + default: false + - name: max-concurrent-subagents + label: + en_US: Max Concurrent Subagents + zh_Hans: 最大并发子代理数 + description: + en_US: Maximum number of concurrent subagents (only effective when subagents are enabled) + zh_Hans: 
最大并发子代理数(仅在启用子代理时生效) + type: integer + required: false + default: 3 + - name: timeout + label: + en_US: Timeout + zh_Hans: 超时时间 + description: + en_US: Request timeout in seconds (DeerFlow runs may take a long time) + zh_Hans: 请求超时时间(秒),DeerFlow 运行可能耗时较长 + type: integer + required: false + default: 300 + - name: recursion-limit + label: + en_US: Recursion Limit + zh_Hans: 递归上限 + description: + en_US: LangGraph recursion limit for a single run + zh_Hans: 单次运行的 LangGraph 递归上限 + type: integer + required: false + default: 1000
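Review note: the `deerflow-api` options defined above are read by `DeerFlowAPIRunner` and forwarded as LangGraph run configuration. Below is a minimal standalone sketch of that mapping, assuming the same key names and defaults as in this diff. `build_deerflow_payload` is a hypothetical helper written for illustration and is not part of the patch. It omits image content and the system prompt.

```python
from __future__ import annotations

from typing import Any


def build_deerflow_payload(cfg: dict[str, Any], thread_id: str, prompt: str) -> dict[str, Any]:
    """Map a `deerflow-api` pipeline config dict onto a LangGraph run payload.

    Hypothetical helper: mirrors the key names and defaults in the diff above,
    but handles only a plain-text user prompt.
    """
    subagent_enabled = bool(cfg.get('subagent-enabled', False))

    # Runtime overrides; hyphenated YAML keys become snake_case fields.
    configurable: dict[str, Any] = {
        'thread_id': thread_id,
        'thinking_enabled': bool(cfg.get('thinking-enabled', False)),
        'is_plan_mode': bool(cfg.get('plan-mode', False)),
        'subagent_enabled': subagent_enabled,
    }
    # The subagent limit is only forwarded when subagents are enabled.
    if subagent_enabled:
        configurable['max_concurrent_subagents'] = int(cfg.get('max-concurrent-subagents', 3))
    # An empty model name means "no override", so the key is left out.
    model_name = cfg.get('model-name', '')
    if model_name:
        configurable['model_name'] = model_name

    return {
        'assistant_id': cfg.get('assistant-id', 'lead_agent'),
        'input': {'messages': [{'role': 'user', 'content': prompt}]},
        'stream_mode': ['values', 'messages-tuple', 'custom'],
        # The same overrides are sent twice, as a copy under `context`
        # and under `config.configurable`, as the runner in the diff does.
        'context': dict(configurable),
        'config': {
            'recursion_limit': int(cfg.get('recursion-limit', 1000)),
            'configurable': configurable,
        },
    }


payload = build_deerflow_payload({'subagent-enabled': True}, 'thread-123', 'hello')
```

`timeout` is absent from the payload on purpose. In the runner it is passed to the client call (`stream_run(..., timeout=self.timeout)`), not sent to the server as run configuration.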