diff --git a/src/langbot/libs/deerflow_api/__init__.py b/src/langbot/libs/deerflow_api/__init__.py new file mode 100644 index 00000000..160778b1 --- /dev/null +++ b/src/langbot/libs/deerflow_api/__init__.py @@ -0,0 +1,5 @@ +from .client import AsyncDeerFlowClient +from .errors import DeerFlowAPIError +from . import stream_utils + +__all__ = ['AsyncDeerFlowClient', 'DeerFlowAPIError', 'stream_utils'] diff --git a/src/langbot/libs/deerflow_api/client.py b/src/langbot/libs/deerflow_api/client.py new file mode 100644 index 00000000..a08b5f91 --- /dev/null +++ b/src/langbot/libs/deerflow_api/client.py @@ -0,0 +1,203 @@ +"""DeerFlow LangGraph HTTP API 客户端 + +参考 astrbot 的 deerflow_api_client 实现,使用 httpx 适配 LangBot 风格。 +""" +from __future__ import annotations + +import codecs +import json +import typing +from collections.abc import AsyncGenerator + +import httpx + +from .errors import DeerFlowAPIError + + +SSE_MAX_BUFFER_CHARS = 1_048_576 + + +def _normalize_sse_newlines(text: str) -> str: + """规范化 CRLF/CR 为 LF,确保 SSE 块分割稳定""" + return text.replace('\r\n', '\n').replace('\r', '\n') + + +def _parse_sse_data_lines(data_lines: list[str]) -> typing.Any: + raw_data = '\n'.join(data_lines) + try: + return json.loads(raw_data) + except json.JSONDecodeError: + # 某些 LangGraph 兼容服务端会在单个 SSE 事件中用多个 data 行 + # 发送多段 JSON 片段(例如 tuple payload) + parsed_lines: list[typing.Any] = [] + can_parse_all = True + for line in data_lines: + line = line.strip() + if not line: + continue + try: + parsed_lines.append(json.loads(line)) + except json.JSONDecodeError: + can_parse_all = False + break + if can_parse_all and parsed_lines: + return parsed_lines[0] if len(parsed_lines) == 1 else parsed_lines + return raw_data + + +def _parse_sse_block(block: str) -> dict[str, typing.Any] | None: + if not block.strip(): + return None + + event_name = 'message' + data_lines: list[str] = [] + for line in block.splitlines(): + if line.startswith('event:'): + event_name = line[6:].strip() + elif 
class AsyncDeerFlowClient:
    """Asynchronous HTTP client for the DeerFlow LangGraph API.

    Every call opens its own short-lived ``httpx.AsyncClient``; the instance
    only stores the base URL and the authentication headers.
    """

    api_base: str
    headers: dict[str, str]

    def __init__(
        self,
        api_base: str = 'http://127.0.0.1:2026',
        api_key: str = '',
        auth_header: str = '',
    ) -> None:
        """Store the base URL and build the authentication headers.

        Args:
            api_base: Root URL of the server; trailing ``/`` is stripped.
            api_key: Sent as ``Authorization: Bearer <api_key>`` when
                ``auth_header`` is empty.
            auth_header: Complete value for the ``Authorization`` header;
                takes precedence over ``api_key``.
        """
        self.api_base = api_base.rstrip('/')
        self.headers: dict[str, str] = {}
        if auth_header:
            self.headers['Authorization'] = auth_header
        elif api_key:
            self.headers['Authorization'] = f'Bearer {api_key}'

    @staticmethod
    def _hold_back_trailing_cr(text: str, carry: str) -> tuple[str, str]:
        """Prepend the carried ``\\r`` and withhold a new trailing ``\\r``.

        A CRLF pair may be split across two network chunks. Normalising each
        chunk on its own would turn the lone ``\\r`` and the following
        ``\\n`` into two line feeds, i.e. a bogus blank line that terminates
        the SSE event early. Keeping a trailing ``\\r`` back until the next
        chunk arrives avoids that.

        Args:
            text: Newly decoded text.
            carry: ``'\\r'`` withheld from the previous chunk, or ``''``.

        Returns:
            ``(ready_text, new_carry)`` where ``new_carry`` is ``'\\r'`` or ``''``.
        """
        text = carry + text
        if text.endswith('\r'):
            return text[:-1], '\r'
        return text, ''

    async def create_thread(self, timeout: float = 20) -> dict[str, typing.Any]:
        """Create a new LangGraph thread.

        Args:
            timeout: httpx timeout in seconds for this request.

        Returns:
            The decoded JSON response body.

        Raises:
            DeerFlowAPIError: If the response status is neither 200 nor 201.
        """
        url = f'{self.api_base}/api/langgraph/threads'
        payload = {'metadata': {}}

        async with httpx.AsyncClient(
            trust_env=True,
            timeout=timeout,
        ) as client:
            response = await client.post(
                url,
                headers=self.headers,
                json=payload,
            )
            if response.status_code not in (200, 201):
                raise DeerFlowAPIError(
                    operation='create thread',
                    status=response.status_code,
                    body=response.text,
                    url=url,
                )
            return response.json()

    async def delete_thread(self, thread_id: str, timeout: float = 20) -> None:
        """Delete a thread; a 404 response is treated as success.

        Args:
            thread_id: Identifier of the thread to delete.
            timeout: httpx timeout in seconds for this request.

        Raises:
            DeerFlowAPIError: If the status is not 200, 202, 204 or 404.
        """
        # NOTE(review): this path is /api/threads/... while the other calls use
        # /api/langgraph/threads/... — confirm this is the intended endpoint.
        url = f'{self.api_base}/api/threads/{thread_id}'

        async with httpx.AsyncClient(
            trust_env=True,
            timeout=timeout,
        ) as client:
            response = await client.delete(url, headers=self.headers)
            if response.status_code not in (200, 202, 204, 404):
                raise DeerFlowAPIError(
                    operation='delete thread',
                    status=response.status_code,
                    body=response.text,
                    url=url,
                    thread_id=thread_id,
                )

    async def stream_run(
        self,
        thread_id: str,
        payload: dict[str, typing.Any],
        timeout: float = 120,
    ) -> AsyncGenerator[dict[str, typing.Any], None]:
        """Start a streaming run on a thread and yield its SSE events.

        Args:
            thread_id: Thread to run on.
            payload: JSON body for the ``runs/stream`` endpoint.
            timeout: Seconds used for the httpx read/write/pool timeouts; the
                connect timeout is capped at 30 seconds. The read timeout
                applies to each read, not to the run as a whole.

        Yields:
            ``{'event': event_name, 'data': parsed_data}`` for each SSE event.

        Raises:
            DeerFlowAPIError: If the server does not answer with status 200.
            TimeoutError: If httpx reports a connect/read/write/pool timeout.
        """
        url = f'{self.api_base}/api/langgraph/threads/{thread_id}/runs/stream'

        stream_timeout = httpx.Timeout(
            connect=min(timeout, 30),
            read=timeout,
            write=timeout,
            pool=timeout,
        )

        try:
            async with httpx.AsyncClient(
                trust_env=True,
                timeout=stream_timeout,
            ) as client:
                async with client.stream(
                    'POST',
                    url,
                    headers={
                        **self.headers,
                        'Accept': 'text/event-stream',
                        'Content-Type': 'application/json',
                    },
                    json=payload,
                ) as resp:
                    if resp.status_code != 200:
                        body = await resp.aread()
                        raise DeerFlowAPIError(
                            operation='runs/stream request',
                            status=resp.status_code,
                            body=body.decode('utf-8', errors='replace'),
                            url=url,
                            thread_id=thread_id,
                        )

                    # Incremental decoder: a multi-byte character may be
                    # split across chunks.
                    decoder = codecs.getincrementaldecoder('utf-8')('replace')
                    buffer = ''
                    carry = ''

                    async for chunk in resp.aiter_bytes(8192):
                        text, carry = self._hold_back_trailing_cr(
                            decoder.decode(chunk), carry
                        )
                        buffer += _normalize_sse_newlines(text)

                        # A blank line terminates one SSE event.
                        while '\n\n' in buffer:
                            block, buffer = buffer.split('\n\n', 1)
                            parsed = _parse_sse_block(block)
                            if parsed is not None:
                                yield parsed

                        if len(buffer) > SSE_MAX_BUFFER_CHARS:
                            # Buffer grew too large without a terminator:
                            # flush it as one block to bound memory use.
                            parsed = _parse_sse_block(buffer)
                            if parsed is not None:
                                yield parsed
                            buffer = ''

                    # Flush the withheld '\r' and whatever the decoder still holds.
                    buffer += _normalize_sse_newlines(
                        carry + decoder.decode(b'', final=True)
                    )
                    while '\n\n' in buffer:
                        block, buffer = buffer.split('\n\n', 1)
                        parsed = _parse_sse_block(block)
                        if parsed is not None:
                            yield parsed
                    if buffer.strip():
                        parsed = _parse_sse_block(buffer)
                        if parsed is not None:
                            yield parsed
        except httpx.TimeoutException as exc:
            # httpx timeouts do not derive from the builtin TimeoutError;
            # translate them so callers can catch the standard exception.
            raise TimeoutError(
                f'DeerFlow runs/stream request timed out: thread_id={thread_id}, url={url}'
            ) from exc
def extract_text(content: typing.Any) -> str:
    """Flatten a message ``content`` value into plain text.

    Handles the shapes visible in this function: a plain string, a dict
    carrying ``text`` / ``content`` / ``kwargs.content``, and a list of
    strings or content-part dicts. Anything else is stringified, with
    ``None`` mapping to the empty string.

    Args:
        content: The raw content value.

    Returns:
        The extracted text. List parts are joined with newlines and the
        result is stripped.
    """
    if content is None:
        return ''
    if isinstance(content, str):
        return content

    if isinstance(content, dict):
        direct_text = content.get('text')
        if isinstance(direct_text, str):
            return direct_text
        if 'content' in content:
            return extract_text(content['content'])
        wrapped = content.get('kwargs')
        if isinstance(wrapped, dict):
            return extract_text(wrapped.get('content'))
        # A dict with none of the known keys is stringified as-is.
        return str(content)

    if isinstance(content, list):
        pieces: list[str] = []
        for part in content:
            if isinstance(part, str):
                piece = part
            elif not isinstance(part, dict):
                continue
            elif part.get('type') == 'text' and isinstance(part.get('text'), str):
                piece = part['text']
            elif 'content' in part:
                piece = extract_text(part['content'])
            else:
                continue
            if piece:
                pieces.append(piece)
        return '\n'.join(pieces).strip()

    return str(content)
def is_ai_message(message: dict[str, typing.Any]) -> bool:
    """Tell whether a message dict represents an AI/assistant message.

    The check is case-insensitive. A message qualifies when its ``role`` is
    ``assistant``/``ai``, when its ``type`` is one of the known AI type
    names, or when its ``type`` contains ``ai`` without also containing
    ``human``, ``tool`` or ``system``.

    Args:
        message: Message dict; ``role`` and ``type`` are both optional.

    Returns:
        True for AI/assistant messages, False otherwise.
    """
    role_name = str(message.get('role', '')).lower()
    if role_name == 'assistant' or role_name == 'ai':
        return True

    kind = str(message.get('type', '')).lower()
    if kind in ('ai', 'assistant', 'aimessage', 'aimessagechunk'):
        return True

    # Substring match: any type containing "ai" passes unless it also names
    # another known sender kind.
    if 'ai' not in kind:
        return False
    return not any(marker in kind for marker in ('human', 'tool', 'system'))
dict): + continue + if is_clarification_tool_message(msg): + text = extract_text(msg.get('content')) + if text: + return text + return '' + + +def get_message_id(message: typing.Any) -> str: + """提取消息 ID""" + if not isinstance(message, dict): + return '' + msg_id = message.get('id') + return msg_id if isinstance(msg_id, str) else '' + + +def extract_event_message_obj(data: typing.Any) -> dict[str, typing.Any] | None: + """从事件 data 中提取消息对象""" + msg_obj = data + if isinstance(data, (list, tuple)) and data: + msg_obj = data[0] + if isinstance(msg_obj, dict) and isinstance(msg_obj.get('data'), dict): + msg_obj = msg_obj['data'] + return msg_obj if isinstance(msg_obj, dict) else None + + +def extract_ai_delta_from_event_data(data: typing.Any) -> str: + """从 messages-tuple 事件中提取 AI delta 文本""" + msg_obj = extract_event_message_obj(data) + if not msg_obj: + return '' + if is_ai_message(msg_obj): + return extract_text(msg_obj.get('content')) + return '' + + +def extract_clarification_from_event_data(data: typing.Any) -> str: + """从事件中提取澄清问题""" + msg_obj = extract_event_message_obj(data) + if not msg_obj: + return '' + if is_clarification_tool_message(msg_obj): + return extract_text(msg_obj.get('content')) + return '' + + +def _iter_custom_event_items(data: typing.Any) -> list[dict[str, typing.Any]]: + items: list[dict[str, typing.Any]] = [] + if isinstance(data, dict): + return [data] + if isinstance(data, list): + for item in data: + if isinstance(item, dict): + items.append(item) + elif isinstance(item, (list, tuple)): + for nested in item: + if isinstance(nested, dict): + items.append(nested) + return items + + +def extract_task_failures_from_custom_event(data: typing.Any) -> list[str]: + """从 custom 事件中提取子任务失败信息""" + failures: list[str] = [] + for item in _iter_custom_event_items(data): + event_type = str(item.get('type', '')).lower() + if event_type not in {'task_failed', 'task_timed_out'}: + continue + + task_id = str(item.get('task_id', '')).strip() + error_text = 
def build_task_failure_summary(failures: list[str]) -> str:
    """Render sub-task failure descriptions as one summary string.

    Duplicates are dropped while keeping first-seen order. A single failure
    is rendered inline; several are rendered as a bullet list limited to the
    first five entries.

    Args:
        failures: Failure descriptions, possibly with repeats.

    Returns:
        The summary text, or ``''`` when there are no failures.
    """
    if not failures:
        return ''

    # dict.fromkeys de-duplicates while preserving insertion order.
    unique = list(dict.fromkeys(failures))
    if len(unique) == 1:
        return f'DeerFlow subtask failed: {unique[0]}'

    bullet_lines = '\n'.join(f'- {entry}' for entry in unique[:5])
    return f'DeerFlow subtasks failed:\n{bullet_lines}'
@migration.migration_class('deerflow-api-config', 43)
class DeerFlowAPICfgMigration(migration.Migration):
    """Migration that adds a default ``deerflow-api`` section to the provider config."""

    async def need_migrate(self) -> bool:
        """Return True when the provider config has no ``deerflow-api`` key yet."""
        provider_data = self.ap.provider_cfg.data
        return not ('deerflow-api' in provider_data)

    async def run(self):
        """Write the default ``deerflow-api`` section and persist the config."""
        provider_cfg = self.ap.provider_cfg

        default_section = {
            'api-base': 'http://127.0.0.1:2026',
            'api-key': '',
            'auth-header': '',
            'assistant-id': 'lead_agent',
            'model-name': '',
            'thinking-enabled': False,
            'plan-mode': False,
            'subagent-enabled': False,
            'max-concurrent-subagents': 3,
            'timeout': 300,
            'recursion-limit': 1000,
        }
        provider_cfg.data['deerflow-api'] = default_section

        await provider_cfg.dump_config()
def __init__(self, ap: app.Application, pipeline_config: dict):
    """Read the ``deerflow-api`` pipeline settings and create the HTTP client.

    Args:
        ap: Application instance, stored on ``self.ap``.
        pipeline_config: Pipeline configuration; the runner settings are read
            from ``pipeline_config['ai']['deerflow-api']``.

    Raises:
        errors.DeerFlowAPIError: If ``api-base`` is empty or does not start
            with ``http://`` or ``https://``.
    """
    self.ap = ap
    self.pipeline_config = pipeline_config

    settings = self.pipeline_config['ai']['deerflow-api']

    base_url = settings.get('api-base', '').strip()
    has_http_scheme = base_url.startswith(('http://', 'https://'))
    if not (base_url and has_http_scheme):
        raise errors.DeerFlowAPIError(
            message='DeerFlow API Base URL 格式错误,必须以 http:// 或 https:// 开头',
        )

    # Connection settings.
    self.api_base = base_url
    self.api_key = settings.get('api-key', '')
    self.auth_header = settings.get('auth-header', '')

    # Run options, coerced to the types the payload builder expects.
    self.assistant_id = settings.get('assistant-id', 'lead_agent')
    self.model_name = settings.get('model-name', '')
    self.thinking_enabled = bool(settings.get('thinking-enabled', False))
    self.plan_mode = bool(settings.get('plan-mode', False))
    self.subagent_enabled = bool(settings.get('subagent-enabled', False))
    self.max_concurrent_subagents = int(settings.get('max-concurrent-subagents', 3))
    self.timeout = int(settings.get('timeout', 300))
    self.recursion_limit = int(settings.get('recursion-limit', 1000))

    self.deerflow_client = client.AsyncDeerFlowClient(
        api_base=self.api_base,
        api_key=self.api_key,
        auth_header=self.auth_header,
    )
def _extract_new_messages_from_values(
    self,
    values_messages: list[typing.Any],
    state: _StreamState,
) -> list[dict[str, typing.Any]]:
    """Return the messages of a ``values`` snapshot not seen before in this run.

    Messages with an id are tracked through ``state.seen_message_ids``.
    Messages without an id are tracked by a content fingerprint keyed on
    their list position, so an id-less message counts as new again whenever
    its content changes.

    Args:
        values_messages: The ``messages`` list of the current snapshot;
            non-dict entries are skipped.
        state: Per-run tracking state; updated in place.

    Returns:
        The new (or changed, for id-less entries) message dicts, in order.
    """
    fresh: list[dict[str, typing.Any]] = []
    anonymous_positions: set[int] = set()
    fingerprints = state.no_id_message_fingerprints

    for position, candidate in enumerate(values_messages):
        if not isinstance(candidate, dict):
            continue

        identifier = stream_utils.get_message_id(candidate)
        if not identifier:
            anonymous_positions.add(position)
            digest = self._fingerprint_message(candidate)
            if fingerprints.get(position) != digest:
                fingerprints[position] = digest
                fresh.append(candidate)
        elif identifier not in state.seen_message_ids:
            self._remember_seen_message_id(state, identifier)
            fresh.append(candidate)

    # Forget fingerprints for positions that no longer hold an id-less message.
    stale_positions = [pos for pos in fingerprints if pos not in anonymous_positions]
    for pos in stale_positions:
        del fingerprints[pos]

    return fresh
{'url': url}}) + return content if content else prompt + + def _preprocess_user_message( + self, + query: pipeline_query.Query, + ) -> tuple[str, list[str]]: + """提取用户消息的纯文本与图片 URL 列表""" + plain_text = '' + image_urls: list[str] = [] + + if isinstance(query.user_message.content, str): + plain_text = query.user_message.content + elif isinstance(query.user_message.content, list): + for ce in query.user_message.content: + if ce.type == 'text': + plain_text += ce.text + elif ce.type == 'image_base64': + # 转换为 data URI 形式 + b64 = getattr(ce, 'image_base64', '') + if b64: + if not b64.startswith('data:'): + b64 = f'data:image/png;base64,{b64}' + image_urls.append(b64) + elif ce.type == 'image_url': + url = getattr(ce, 'image_url', '') + if url: + image_urls.append(url) + + return plain_text, image_urls + + # ------------------------------------------------------------------ + # 请求构造 + # ------------------------------------------------------------------ + + def _build_messages( + self, + prompt: str, + image_urls: list[str], + system_prompt: str = '', + ) -> list[dict[str, typing.Any]]: + messages: list[dict[str, typing.Any]] = [] + if system_prompt: + messages.append({'role': 'system', 'content': system_prompt}) + messages.append( + { + 'role': 'user', + 'content': self._build_user_content(prompt, image_urls), + } + ) + return messages + + def _build_runtime_configurable(self, thread_id: str) -> dict[str, typing.Any]: + cfg: dict[str, typing.Any] = { + 'thread_id': thread_id, + 'thinking_enabled': self.thinking_enabled, + 'is_plan_mode': self.plan_mode, + 'subagent_enabled': self.subagent_enabled, + } + if self.subagent_enabled: + cfg['max_concurrent_subagents'] = self.max_concurrent_subagents + if self.model_name: + cfg['model_name'] = self.model_name + return cfg + + def _build_payload( + self, + thread_id: str, + prompt: str, + image_urls: list[str], + system_prompt: str = '', + ) -> dict[str, typing.Any]: + runtime_configurable = 
def _handle_values_event(
    self,
    data: typing.Any,
    state: _StreamState,
) -> str | None:
    """Process one ``values`` event and return the latest AI text, if any.

    The first snapshot of a run is the baseline: every message in it is
    registered as seen, but only the messages that follow the last
    human/user message are counted as output of this run. Because the thread
    is reused across turns, earlier entries are history; counting them would
    surface a previous answer or clarification as if it were new. When the
    baseline contains no human message at all, every message is kept.

    Args:
        data: Parsed ``data`` of the ``values`` event.
        state: Per-run tracking state; updated in place.

    Returns:
        Text of the most recent AI message collected in this run, or None
        when the event carried no new messages or no AI text.
    """
    values_messages = stream_utils.extract_messages_from_values_data(data)
    if not values_messages:
        return None

    new_messages: list[dict[str, typing.Any]] = []
    if state.baseline_initialized:
        new_messages = self._extract_new_messages_from_values(values_messages, state)
    else:
        state.baseline_initialized = True

        # Locate the last human/user message: the input of the current run.
        last_human_idx = -1
        for idx, msg in enumerate(values_messages):
            if not isinstance(msg, dict):
                continue
            role = str(msg.get('role', '')).lower()
            msg_type = str(msg.get('type', '')).lower()
            if role in {'user', 'human'} or msg_type == 'user' or 'human' in msg_type:
                last_human_idx = idx

        for idx, msg in enumerate(values_messages):
            if not isinstance(msg, dict):
                continue
            if idx > last_human_idx:
                new_messages.append(msg)
            # Register every baseline message so later snapshots skip it.
            msg_id = stream_utils.get_message_id(msg)
            if msg_id:
                self._remember_seen_message_id(state, msg_id)
                continue
            state.no_id_message_fingerprints[idx] = self._fingerprint_message(msg)

    latest_text = ''
    if new_messages:
        state.run_values_messages.extend(new_messages)
        # Bound the per-run history.
        if len(state.run_values_messages) > _MAX_VALUES_HISTORY:
            state.run_values_messages = state.run_values_messages[
                -_MAX_VALUES_HISTORY:
            ]
        latest_text = stream_utils.extract_latest_ai_text(state.run_values_messages)
        if latest_text:
            state.has_values_text = True
        latest_clarification = stream_utils.extract_latest_clarification_text(
            state.run_values_messages,
        )
        if latest_clarification:
            state.clarification_text = latest_clarification

    return latest_text or None
async def _stream_messages_chunk(
    self,
    query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
    """Run DeerFlow for ``query`` and yield assistant message chunks.

    Each intermediate chunk carries the full text accumulated so far, not
    just the delta. One final chunk with ``is_final=True`` is always yielded,
    built by ``_build_final_text``.

    Args:
        query: Pipeline query providing the user message and the session.

    Yields:
        ``MessageChunk`` objects with ``role='assistant'``.

    Raises:
        errors.DeerFlowAPIError: When the stream delivers an ``error`` event.
    """
    plain_text, image_urls = self._preprocess_user_message(query)

    # No system prompt is sent; only the user message goes to DeerFlow.
    thread_id = await self._ensure_thread_id(query)
    payload = self._build_payload(
        thread_id=thread_id,
        prompt=plain_text or 'continue',
        image_urls=image_urls,
    )

    state = _StreamState()
    prev_text = ''

    try:
        async for event in self.deerflow_client.stream_run(
            thread_id=thread_id,
            payload=payload,
            timeout=self.timeout,
        ):
            event_type = event.get('event')
            data = event.get('data')

            if event_type == 'values':
                new_full = self._handle_values_event(data, state)
                # Emit only when the full text actually changed.
                if new_full and new_full != prev_text:
                    prev_text = new_full
                    yield provider_message.MessageChunk(
                        role='assistant',
                        content=new_full,
                        is_final=False,
                    )
            elif event_type in {'messages-tuple', 'messages', 'message'}:
                if self._handle_message_event(data, state):
                    prev_text = state.latest_text
                    yield provider_message.MessageChunk(
                        role='assistant',
                        content=prev_text,
                        is_final=False,
                    )
            elif event_type == 'custom':
                state.task_failures.extend(
                    stream_utils.extract_task_failures_from_custom_event(data),
                )
            elif event_type == 'error':
                raise errors.DeerFlowAPIError(
                    message=f'DeerFlow stream error event: {data}'
                )
            elif event_type == 'end':
                break
    except (asyncio.TimeoutError, TimeoutError):
        # NOTE(review): httpx timeouts derive from httpx.TimeoutException,
        # not TimeoutError, so this fires only if the client raises the
        # builtin exception — confirm.
        self.ap.logger.warning(
            f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}'
        )
        state.timed_out = True

    # Always finish with one final chunk, even after a timeout.
    final_text = self._build_final_text(state)
    yield provider_message.MessageChunk(
        role='assistant',
        content=final_text,
        is_final=True,
    )
self._stream_messages_chunk(query): + msg_idx += 1 + msg.msg_sequence = msg_idx + yield msg + else: + async for msg in self._messages(query): + yield msg diff --git a/src/langbot/pkg/provider/runners/weknoraapi.py b/src/langbot/pkg/provider/runners/weknoraapi.py index d5469986..301be112 100644 --- a/src/langbot/pkg/provider/runners/weknoraapi.py +++ b/src/langbot/pkg/provider/runners/weknoraapi.py @@ -8,7 +8,7 @@ from langbot.pkg.provider import runner from langbot.pkg.core import app import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -from langbot.libs.weknora_api.v1 import client, errors +from langbot.libs.weknora_api import client, errors @runner.runner_class('weknora-api') diff --git a/src/langbot/templates/metadata/pipeline/ai.yaml b/src/langbot/templates/metadata/pipeline/ai.yaml index 1facc60f..16b21069 100644 --- a/src/langbot/templates/metadata/pipeline/ai.yaml +++ b/src/langbot/templates/metadata/pipeline/ai.yaml @@ -51,6 +51,10 @@ stages: label: en_US: WeKnora API zh_Hans: WeKnora API + - name: deerflow-api + label: + en_US: DeerFlow API + zh_Hans: DeerFlow API - name: expire-time label: en_US: Conversation expire time (seconds) @@ -751,3 +755,121 @@ stages: type: string required: false default: '请回答用户的问题。' + - name: deerflow-api + label: + en_US: DeerFlow API + zh_Hans: DeerFlow API + description: + en_US: Configure the DeerFlow LangGraph API of the pipeline + zh_Hans: 配置 DeerFlow LangGraph API + config: + - name: api-base + label: + en_US: API Base URL + zh_Hans: API 基础 URL + description: + en_US: The base URL of the DeerFlow server (e.g. 
http://127.0.0.1:2026) + zh_Hans: DeerFlow 服务器的基础 URL(例如 http://127.0.0.1:2026) + type: string + required: true + default: 'http://127.0.0.1:2026' + - name: api-key + label: + en_US: API Key + zh_Hans: API 密钥 + description: + en_US: Optional API key for DeerFlow (leave empty if not required) + zh_Hans: DeerFlow 的 API 密钥(如果不需要可留空) + type: string + required: false + default: '' + - name: auth-header + label: + en_US: Authorization Header + zh_Hans: Authorization 请求头 + description: + en_US: Custom value for the Authorization header (takes precedence over API Key). Leave empty to send "Bearer <API Key>" + zh_Hans: 自定义 Authorization 请求头的值(优先于 API 密钥),留空则发送 "Bearer <API 密钥>" + type: string + required: false + default: '' + - name: assistant-id + label: + en_US: Assistant ID + zh_Hans: 助手 ID + description: + en_US: The DeerFlow assistant/graph id (default lead_agent) + zh_Hans: DeerFlow 助手/图 ID(默认 lead_agent) + type: string + required: true + default: 'lead_agent' + - name: model-name + label: + en_US: Model Name + zh_Hans: 模型名称 + description: + en_US: Optional model override forwarded to DeerFlow configurable + zh_Hans: 可选的模型名称覆盖,会作为 configurable 转发给 DeerFlow + type: string + required: false + default: '' + - name: thinking-enabled + label: + en_US: Enable Thinking + zh_Hans: 启用思考 + description: + en_US: Whether to enable DeerFlow thinking mode + zh_Hans: 是否启用 DeerFlow 思考模式 + type: boolean + required: false + default: false + - name: plan-mode + label: + en_US: Plan Mode + zh_Hans: 规划模式 + description: + en_US: Whether to enable DeerFlow plan mode + zh_Hans: 是否启用 DeerFlow 规划模式 + type: boolean + required: false + default: false + - name: subagent-enabled + label: + en_US: Enable Subagents + zh_Hans: 启用子代理 + description: + en_US: Whether to enable parallel subagent execution + zh_Hans: 是否启用并行子代理执行 + type: boolean + required: false + default: false + - name: max-concurrent-subagents + label: + en_US: Max Concurrent Subagents + zh_Hans: 最大并发子代理数 + description: + en_US: Maximum number of concurrent subagents (only effective when subagents are enabled) + zh_Hans: 
最大并发子代理数(仅在启用子代理时生效) + type: integer + required: false + default: 3 + - name: timeout + label: + en_US: Timeout + zh_Hans: 超时时间 + description: + en_US: Request timeout in seconds (DeerFlow runs may take a long time) + zh_Hans: 请求超时时间(秒),DeerFlow 运行可能耗时较长 + type: integer + required: false + default: 300 + - name: recursion-limit + label: + en_US: Recursion Limit + zh_Hans: 递归上限 + description: + en_US: LangGraph recursion limit for a single run + zh_Hans: 单次运行的 LangGraph 递归上限 + type: integer + required: false + default: 1000