This commit is contained in:
Typer_Body
2026-06-07 02:17:40 +08:00
parent af451e7006
commit 0c6f71738c
11 changed files with 1138 additions and 1 deletions

View File

@@ -0,0 +1,5 @@
from .client import AsyncDeerFlowClient
from .errors import DeerFlowAPIError
from . import stream_utils
__all__ = ['AsyncDeerFlowClient', 'DeerFlowAPIError', 'stream_utils']

View File

@@ -0,0 +1,203 @@
"""DeerFlow LangGraph HTTP API 客户端
参考 astrbot 的 deerflow_api_client 实现,使用 httpx 适配 LangBot 风格。
"""
from __future__ import annotations
import codecs
import json
import typing
from collections.abc import AsyncGenerator
import httpx
from .errors import DeerFlowAPIError
SSE_MAX_BUFFER_CHARS = 1_048_576
def _normalize_sse_newlines(text: str) -> str:
"""规范化 CRLF/CR 为 LF确保 SSE 块分割稳定"""
return text.replace('\r\n', '\n').replace('\r', '\n')
def _parse_sse_data_lines(data_lines: list[str]) -> typing.Any:
raw_data = '\n'.join(data_lines)
try:
return json.loads(raw_data)
except json.JSONDecodeError:
# 某些 LangGraph 兼容服务端会在单个 SSE 事件中用多个 data 行
# 发送多段 JSON 片段(例如 tuple payload
parsed_lines: list[typing.Any] = []
can_parse_all = True
for line in data_lines:
line = line.strip()
if not line:
continue
try:
parsed_lines.append(json.loads(line))
except json.JSONDecodeError:
can_parse_all = False
break
if can_parse_all and parsed_lines:
return parsed_lines[0] if len(parsed_lines) == 1 else parsed_lines
return raw_data
def _parse_sse_block(block: str) -> dict[str, typing.Any] | None:
if not block.strip():
return None
event_name = 'message'
data_lines: list[str] = []
for line in block.splitlines():
if line.startswith('event:'):
event_name = line[6:].strip()
elif line.startswith('data:'):
data_lines.append(line[5:].lstrip())
if not data_lines:
return None
return {'event': event_name, 'data': _parse_sse_data_lines(data_lines)}
class AsyncDeerFlowClient:
"""DeerFlow LangGraph HTTP API 客户端"""
api_base: str
headers: dict[str, str]
def __init__(
self,
api_base: str = 'http://127.0.0.1:2026',
api_key: str = '',
auth_header: str = '',
) -> None:
self.api_base = api_base.rstrip('/')
self.headers: dict[str, str] = {}
if auth_header:
self.headers['Authorization'] = auth_header
elif api_key:
self.headers['Authorization'] = f'Bearer {api_key}'
async def create_thread(self, timeout: float = 20) -> dict[str, typing.Any]:
"""创建一个新的 LangGraph thread
Returns:
包含 thread_id 等信息的字典
"""
url = f'{self.api_base}/api/langgraph/threads'
payload = {'metadata': {}}
async with httpx.AsyncClient(
trust_env=True,
timeout=timeout,
) as client:
response = await client.post(
url,
headers=self.headers,
json=payload,
)
if response.status_code not in (200, 201):
raise DeerFlowAPIError(
operation='create thread',
status=response.status_code,
body=response.text,
url=url,
)
return response.json()
async def delete_thread(self, thread_id: str, timeout: float = 20) -> None:
"""删除指定 thread"""
url = f'{self.api_base}/api/threads/{thread_id}'
async with httpx.AsyncClient(
trust_env=True,
timeout=timeout,
) as client:
response = await client.delete(url, headers=self.headers)
if response.status_code not in (200, 202, 204, 404):
raise DeerFlowAPIError(
operation='delete thread',
status=response.status_code,
body=response.text,
url=url,
thread_id=thread_id,
)
async def stream_run(
self,
thread_id: str,
payload: dict[str, typing.Any],
timeout: float = 120,
) -> AsyncGenerator[dict[str, typing.Any], None]:
"""运行一次 LangGraph stream 请求,逐事件 yield
Yields:
事件字典 {'event': event_name, 'data': parsed_data}
"""
url = f'{self.api_base}/api/langgraph/threads/{thread_id}/runs/stream'
# 流式请求使用单独的 read timeout 控制
stream_timeout = httpx.Timeout(
connect=min(timeout, 30),
read=timeout,
write=timeout,
pool=timeout,
)
async with httpx.AsyncClient(
trust_env=True,
timeout=stream_timeout,
) as client:
async with client.stream(
'POST',
url,
headers={
**self.headers,
'Accept': 'text/event-stream',
'Content-Type': 'application/json',
},
json=payload,
) as resp:
if resp.status_code != 200:
body = await resp.aread()
raise DeerFlowAPIError(
operation='runs/stream request',
status=resp.status_code,
body=body.decode('utf-8', errors='replace'),
url=url,
thread_id=thread_id,
)
decoder = codecs.getincrementaldecoder('utf-8')('replace')
buffer = ''
async for chunk in resp.aiter_bytes(8192):
buffer += _normalize_sse_newlines(decoder.decode(chunk))
while '\n\n' in buffer:
block, buffer = buffer.split('\n\n', 1)
parsed = _parse_sse_block(block)
if parsed is not None:
yield parsed
if len(buffer) > SSE_MAX_BUFFER_CHARS:
# 缓冲区过大,强制 flush
parsed = _parse_sse_block(buffer)
if parsed is not None:
yield parsed
buffer = ''
# flush 剩余内容
buffer += _normalize_sse_newlines(decoder.decode(b'', final=True))
while '\n\n' in buffer:
block, buffer = buffer.split('\n\n', 1)
parsed = _parse_sse_block(block)
if parsed is not None:
yield parsed
if buffer.strip():
parsed = _parse_sse_block(buffer)
if parsed is not None:
yield parsed

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
class DeerFlowAPIError(Exception):
"""DeerFlow API 请求失败"""
def __init__(
self,
*,
operation: str = '',
status: int = 0,
body: str = '',
url: str = '',
thread_id: str | None = None,
message: str = '',
) -> None:
self.operation = operation
self.status = status
self.body = body
self.url = url
self.thread_id = thread_id
if message:
super().__init__(message)
return
msg = f'DeerFlow {operation} failed: status={status}, url={url}, body={body}'
if thread_id is not None:
msg = (
f'DeerFlow {operation} failed: thread_id={thread_id}, '
f'status={status}, url={url}, body={body}'
)
super().__init__(msg)

View File

@@ -0,0 +1,213 @@
"""DeerFlow LangGraph 流式响应解析工具
参考 astrbot 实现的 deerflow_stream_utils。
"""
from __future__ import annotations
import typing
from collections.abc import Iterable
def extract_text(content: typing.Any) -> str:
"""从消息 content 中提取纯文本"""
if isinstance(content, str):
return content
if isinstance(content, dict):
if isinstance(content.get('text'), str):
return content['text']
if 'content' in content:
return extract_text(content.get('content'))
if 'kwargs' in content and isinstance(content['kwargs'], dict):
return extract_text(content['kwargs'].get('content'))
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
item_type = item.get('type')
if item_type == 'text' and isinstance(item.get('text'), str):
parts.append(item['text'])
elif 'content' in item:
parts.append(extract_text(item['content']))
return '\n'.join([p for p in parts if p]).strip()
return str(content) if content is not None else ''
def extract_messages_from_values_data(data: typing.Any) -> list[typing.Any]:
"""从 values 事件中提取 messages 列表"""
candidates: list[typing.Any] = []
if isinstance(data, dict):
candidates.append(data)
if isinstance(data.get('values'), dict):
candidates.append(data['values'])
elif isinstance(data, list):
candidates.extend([x for x in data if isinstance(x, dict)])
for item in candidates:
messages = item.get('messages')
if isinstance(messages, list):
return messages
return []
def is_ai_message(message: dict[str, typing.Any]) -> bool:
"""判断是否为 AI/assistant 消息"""
role = str(message.get('role', '')).lower()
if role in {'assistant', 'ai'}:
return True
msg_type = str(message.get('type', '')).lower()
if msg_type in {'ai', 'assistant', 'aimessage', 'aimessagechunk'}:
return True
if 'ai' in msg_type and all(
token not in msg_type for token in ('human', 'tool', 'system')
):
return True
return False
def extract_latest_ai_text(messages: Iterable[typing.Any]) -> str:
"""获取最近一条 AI 消息的文本内容"""
if isinstance(messages, (list, tuple)):
iterable = reversed(messages)
else:
iterable = reversed(list(messages))
for msg in iterable:
if not isinstance(msg, dict):
continue
if is_ai_message(msg):
text = extract_text(msg.get('content'))
if text:
return text
return ''
def extract_latest_ai_message(messages: Iterable[typing.Any]) -> dict[str, typing.Any] | None:
"""获取最近一条 AI 消息对象"""
if isinstance(messages, (list, tuple)):
iterable = reversed(messages)
else:
iterable = reversed(list(messages))
for msg in iterable:
if not isinstance(msg, dict):
continue
if is_ai_message(msg):
return msg
return None
def is_clarification_tool_message(message: dict[str, typing.Any]) -> bool:
"""判断是否为澄清问题工具消息"""
msg_type = str(message.get('type', '')).lower()
tool_name = str(message.get('name', '')).lower()
return msg_type == 'tool' and tool_name == 'ask_clarification'
def extract_latest_clarification_text(messages: Iterable[typing.Any]) -> str:
"""提取最近的澄清问题文本"""
if isinstance(messages, (list, tuple)):
iterable = reversed(messages)
else:
iterable = reversed(list(messages))
for msg in iterable:
if not isinstance(msg, dict):
continue
if is_clarification_tool_message(msg):
text = extract_text(msg.get('content'))
if text:
return text
return ''
def get_message_id(message: typing.Any) -> str:
"""提取消息 ID"""
if not isinstance(message, dict):
return ''
msg_id = message.get('id')
return msg_id if isinstance(msg_id, str) else ''
def extract_event_message_obj(data: typing.Any) -> dict[str, typing.Any] | None:
"""从事件 data 中提取消息对象"""
msg_obj = data
if isinstance(data, (list, tuple)) and data:
msg_obj = data[0]
if isinstance(msg_obj, dict) and isinstance(msg_obj.get('data'), dict):
msg_obj = msg_obj['data']
return msg_obj if isinstance(msg_obj, dict) else None
def extract_ai_delta_from_event_data(data: typing.Any) -> str:
"""从 messages-tuple 事件中提取 AI delta 文本"""
msg_obj = extract_event_message_obj(data)
if not msg_obj:
return ''
if is_ai_message(msg_obj):
return extract_text(msg_obj.get('content'))
return ''
def extract_clarification_from_event_data(data: typing.Any) -> str:
"""从事件中提取澄清问题"""
msg_obj = extract_event_message_obj(data)
if not msg_obj:
return ''
if is_clarification_tool_message(msg_obj):
return extract_text(msg_obj.get('content'))
return ''
def _iter_custom_event_items(data: typing.Any) -> list[dict[str, typing.Any]]:
items: list[dict[str, typing.Any]] = []
if isinstance(data, dict):
return [data]
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
items.append(item)
elif isinstance(item, (list, tuple)):
for nested in item:
if isinstance(nested, dict):
items.append(nested)
return items
def extract_task_failures_from_custom_event(data: typing.Any) -> list[str]:
"""从 custom 事件中提取子任务失败信息"""
failures: list[str] = []
for item in _iter_custom_event_items(data):
event_type = str(item.get('type', '')).lower()
if event_type not in {'task_failed', 'task_timed_out'}:
continue
task_id = str(item.get('task_id', '')).strip()
error_text = extract_text(item.get('error')).strip()
if task_id and error_text:
failures.append(f'{task_id}: {error_text}')
elif error_text:
failures.append(error_text)
elif task_id:
failures.append(f'{task_id}: unknown error')
else:
failures.append('unknown task failure')
return failures
def build_task_failure_summary(failures: list[str]) -> str:
"""构建任务失败摘要"""
if not failures:
return ''
deduped: list[str] = []
seen: set[str] = set()
for failure in failures:
if failure not in seen:
seen.add(failure)
deduped.append(failure)
if len(deduped) == 1:
return f'DeerFlow subtask failed: {deduped[0]}'
joined = '\n'.join([f'- {item}' for item in deduped[:5]])
return f'DeerFlow subtasks failed:\n{joined}'

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('deerflow-api-config', 43)
class DeerFlowAPICfgMigration(migration.Migration):
"""DeerFlow API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'deerflow-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['deerflow-api'] = {
'api-base': 'http://127.0.0.1:2026',
'api-key': '',
'auth-header': '',
'assistant-id': 'lead_agent',
'model-name': '',
'thinking-enabled': False,
'plan-mode': False,
'subagent-enabled': False,
'max-concurrent-subagents': 3,
'timeout': 300,
'recursion-limit': 1000,
}
await self.ap.provider_cfg.dump_config()

View File

@@ -0,0 +1,531 @@
"""DeerFlow LangGraph API Runner
参考 astrbot 的 deerflow_agent_runner 实现,适配 LangBot 的 Runner 接口。
特点:
- 使用 LangGraph HTTP API 接入 deer-flow 后端
- 自动管理 thread_id按 session 隔离)
- 支持 SSE 流式响应解析
- 支持 streaming/非流式两种输出
- 处理 values / messages-tuple / custom 三种事件
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import typing
import uuid
from collections import deque
from dataclasses import dataclass, field
from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.deerflow_api import client, errors, stream_utils
_MAX_VALUES_HISTORY = 200
@dataclass
class _StreamState:
"""流式状态跟踪"""
latest_text: str = ''
prev_text_for_streaming: str = ''
clarification_text: str = ''
task_failures: list[str] = field(default_factory=list)
seen_message_ids: set[str] = field(default_factory=set)
seen_message_order: deque[str] = field(default_factory=deque)
no_id_message_fingerprints: dict[int, str] = field(default_factory=dict)
baseline_initialized: bool = False
has_values_text: bool = False
run_values_messages: list[dict[str, typing.Any]] = field(default_factory=list)
timed_out: bool = False
@runner.runner_class('deerflow-api')
class DeerFlowAPIRunner(runner.RequestRunner):
"""DeerFlow LangGraph API 对话请求器"""
deerflow_client: client.AsyncDeerFlowClient
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
cfg = self.pipeline_config['ai']['deerflow-api']
api_base = cfg.get('api-base', '').strip()
if not api_base or not api_base.startswith(('http://', 'https://')):
raise errors.DeerFlowAPIError(
message='DeerFlow API Base URL 格式错误,必须以 http:// 或 https:// 开头',
)
self.api_base = api_base
self.api_key = cfg.get('api-key', '')
self.auth_header = cfg.get('auth-header', '')
self.assistant_id = cfg.get('assistant-id', 'lead_agent')
self.model_name = cfg.get('model-name', '')
self.thinking_enabled = bool(cfg.get('thinking-enabled', False))
self.plan_mode = bool(cfg.get('plan-mode', False))
self.subagent_enabled = bool(cfg.get('subagent-enabled', False))
self.max_concurrent_subagents = int(cfg.get('max-concurrent-subagents', 3))
self.timeout = int(cfg.get('timeout', 300))
self.recursion_limit = int(cfg.get('recursion-limit', 1000))
self.deerflow_client = client.AsyncDeerFlowClient(
api_base=self.api_base,
api_key=self.api_key,
auth_header=self.auth_header,
)
# ------------------------------------------------------------------
# 辅助方法
# ------------------------------------------------------------------
def _fingerprint_message(self, message: dict[str, typing.Any]) -> str:
try:
raw = json.dumps(message, sort_keys=True, ensure_ascii=False, default=str)
except (TypeError, ValueError):
raw = repr(message)
return hashlib.sha1(raw.encode('utf-8', errors='ignore')).hexdigest()
def _remember_seen_message_id(self, state: _StreamState, msg_id: str) -> None:
if not msg_id or msg_id in state.seen_message_ids:
return
state.seen_message_ids.add(msg_id)
state.seen_message_order.append(msg_id)
while len(state.seen_message_order) > _MAX_VALUES_HISTORY:
dropped = state.seen_message_order.popleft()
state.seen_message_ids.discard(dropped)
def _extract_new_messages_from_values(
self,
values_messages: list[typing.Any],
state: _StreamState,
) -> list[dict[str, typing.Any]]:
new_messages: list[dict[str, typing.Any]] = []
no_id_indexes_seen: set[int] = set()
for idx, msg in enumerate(values_messages):
if not isinstance(msg, dict):
continue
msg_id = stream_utils.get_message_id(msg)
if msg_id:
if msg_id in state.seen_message_ids:
continue
self._remember_seen_message_id(state, msg_id)
new_messages.append(msg)
continue
no_id_indexes_seen.add(idx)
fp = self._fingerprint_message(msg)
if state.no_id_message_fingerprints.get(idx) == fp:
continue
state.no_id_message_fingerprints[idx] = fp
new_messages.append(msg)
for idx in list(state.no_id_message_fingerprints.keys()):
if idx not in no_id_indexes_seen:
state.no_id_message_fingerprints.pop(idx, None)
return new_messages
# ------------------------------------------------------------------
# 用户输入处理
# ------------------------------------------------------------------
def _build_user_content(
self,
prompt: str,
image_urls: list[str],
) -> typing.Any:
"""构建 LangGraph 兼容的 user content支持多模态"""
if not image_urls:
return prompt
content: list[dict[str, typing.Any]] = []
if prompt:
content.append({'type': 'text', 'text': prompt})
for url in image_urls:
if not isinstance(url, str):
continue
url = url.strip()
if not url:
continue
if url.startswith(('http://', 'https://', 'data:')):
content.append({'type': 'image_url', 'image_url': {'url': url}})
return content if content else prompt
def _preprocess_user_message(
self,
query: pipeline_query.Query,
) -> tuple[str, list[str]]:
"""提取用户消息的纯文本与图片 URL 列表"""
plain_text = ''
image_urls: list[str] = []
if isinstance(query.user_message.content, str):
plain_text = query.user_message.content
elif isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
elif ce.type == 'image_base64':
# 转换为 data URI 形式
b64 = getattr(ce, 'image_base64', '')
if b64:
if not b64.startswith('data:'):
b64 = f'data:image/png;base64,{b64}'
image_urls.append(b64)
elif ce.type == 'image_url':
url = getattr(ce, 'image_url', '')
if url:
image_urls.append(url)
return plain_text, image_urls
# ------------------------------------------------------------------
# 请求构造
# ------------------------------------------------------------------
def _build_messages(
self,
prompt: str,
image_urls: list[str],
system_prompt: str = '',
) -> list[dict[str, typing.Any]]:
messages: list[dict[str, typing.Any]] = []
if system_prompt:
messages.append({'role': 'system', 'content': system_prompt})
messages.append(
{
'role': 'user',
'content': self._build_user_content(prompt, image_urls),
}
)
return messages
def _build_runtime_configurable(self, thread_id: str) -> dict[str, typing.Any]:
cfg: dict[str, typing.Any] = {
'thread_id': thread_id,
'thinking_enabled': self.thinking_enabled,
'is_plan_mode': self.plan_mode,
'subagent_enabled': self.subagent_enabled,
}
if self.subagent_enabled:
cfg['max_concurrent_subagents'] = self.max_concurrent_subagents
if self.model_name:
cfg['model_name'] = self.model_name
return cfg
def _build_payload(
self,
thread_id: str,
prompt: str,
image_urls: list[str],
system_prompt: str = '',
) -> dict[str, typing.Any]:
runtime_configurable = self._build_runtime_configurable(thread_id)
return {
'assistant_id': self.assistant_id,
'input': {
'messages': self._build_messages(prompt, image_urls, system_prompt),
},
'stream_mode': ['values', 'messages-tuple', 'custom'],
# DeerFlow 2.0 从 config.configurable 读取运行时覆盖
# 同时保留 context 字段做向后兼容
'context': dict(runtime_configurable),
'config': {
'recursion_limit': self.recursion_limit,
'configurable': runtime_configurable,
},
}
# ------------------------------------------------------------------
# Session/Thread 管理
# ------------------------------------------------------------------
async def _ensure_thread_id(self, query: pipeline_query.Query) -> str:
"""从 query.session 取/创建 deerflow thread_id
LangBot 使用 `query.session.using_conversation.uuid` 持久化 conversation id
我们复用这个字段存储 deerflow thread_id与 Dify Runner 同样做法)。
"""
thread_id = query.session.using_conversation.uuid or ''
if thread_id:
return thread_id
thread = await self.deerflow_client.create_thread(timeout=min(30, self.timeout))
thread_id = thread.get('thread_id', '')
if not thread_id:
raise errors.DeerFlowAPIError(
message=f'DeerFlow create thread 返回数据缺少 thread_id: {thread}'
)
query.session.using_conversation.uuid = thread_id
return thread_id
# ------------------------------------------------------------------
# 流式事件处理
# ------------------------------------------------------------------
def _handle_values_event(
self,
data: typing.Any,
state: _StreamState,
) -> str | None:
"""处理 values 事件,返回新的完整文本(增量基础上的全量)"""
values_messages = stream_utils.extract_messages_from_values_data(data)
if not values_messages:
return None
new_messages: list[dict[str, typing.Any]] = []
if not state.baseline_initialized:
state.baseline_initialized = True
for idx, msg in enumerate(values_messages):
if not isinstance(msg, dict):
continue
new_messages.append(msg)
msg_id = stream_utils.get_message_id(msg)
if msg_id:
self._remember_seen_message_id(state, msg_id)
continue
state.no_id_message_fingerprints[idx] = self._fingerprint_message(msg)
else:
new_messages = self._extract_new_messages_from_values(values_messages, state)
latest_text = ''
if new_messages:
state.run_values_messages.extend(new_messages)
if len(state.run_values_messages) > _MAX_VALUES_HISTORY:
state.run_values_messages = state.run_values_messages[
-_MAX_VALUES_HISTORY:
]
latest_text = stream_utils.extract_latest_ai_text(state.run_values_messages)
if latest_text:
state.has_values_text = True
latest_clarification = stream_utils.extract_latest_clarification_text(
state.run_values_messages,
)
if latest_clarification:
state.clarification_text = latest_clarification
return latest_text or None
def _handle_message_event(
self,
data: typing.Any,
state: _StreamState,
) -> str | None:
"""处理 messages-tuple 事件,返回增量文本
当 values 事件已经提供完整文本时,跳过 messages-tuple 的增量
"""
delta = stream_utils.extract_ai_delta_from_event_data(data)
if delta and not state.has_values_text:
state.latest_text += delta
return delta
maybe_clar = stream_utils.extract_clarification_from_event_data(data)
if maybe_clar:
state.clarification_text = maybe_clar
return None
def _build_final_text(self, state: _StreamState) -> str:
"""构建最终输出文本"""
if state.clarification_text:
return state.clarification_text
# 优先使用最后一条 AI message 的文本
latest_ai = stream_utils.extract_latest_ai_message(state.run_values_messages)
if latest_ai:
text = stream_utils.extract_text(latest_ai.get('content'))
if text:
if state.timed_out:
text += (
f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。'
)
return text
if state.latest_text:
text = state.latest_text
if state.timed_out:
text += (
f'\n\nDeerFlow stream 在 {self.timeout}s 后超时,返回部分结果。'
)
return text
# 提取任务失败信息作兜底
failure_text = stream_utils.build_task_failure_summary(state.task_failures)
if failure_text:
return failure_text
return 'DeerFlow 返回空响应'
# ------------------------------------------------------------------
# 主流程
# ------------------------------------------------------------------
async def _stream_messages_chunk(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""流式输出生成器"""
plain_text, image_urls = self._preprocess_user_message(query)
system_prompt = ''
# LangBot 的 pipeline 通常通过 prompt-preprocess 已注入 system prompt
# 这里保持空,让 prompt-preprocess 的内容作为 user message 一并送给 deerflow
thread_id = await self._ensure_thread_id(query)
payload = self._build_payload(
thread_id=thread_id,
prompt=plain_text or 'continue',
image_urls=image_urls,
system_prompt=system_prompt,
)
state = _StreamState()
prev_text = ''
message_idx = 0
try:
async for event in self.deerflow_client.stream_run(
thread_id=thread_id,
payload=payload,
timeout=self.timeout,
):
event_type = event.get('event')
data = event.get('data')
if event_type == 'values':
new_full = self._handle_values_event(data, state)
if new_full and new_full != prev_text:
delta = (
new_full[len(prev_text):]
if new_full.startswith(prev_text)
else new_full
)
prev_text = new_full
if delta:
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
content=new_full,
is_final=False,
)
continue
if event_type in {'messages-tuple', 'messages', 'message'}:
delta = self._handle_message_event(data, state)
if delta:
prev_text = state.latest_text
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
content=prev_text,
is_final=False,
)
continue
if event_type == 'custom':
state.task_failures.extend(
stream_utils.extract_task_failures_from_custom_event(data),
)
continue
if event_type == 'error':
raise errors.DeerFlowAPIError(
message=f'DeerFlow stream error event: {data}'
)
if event_type == 'end':
break
except (asyncio.TimeoutError, TimeoutError):
self.ap.logger.warning(
f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}'
)
state.timed_out = True
# 最终消息
final_text = self._build_final_text(state)
yield provider_message.MessageChunk(
role='assistant',
content=final_text,
is_final=True,
)
async def _messages(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""非流式聚合输出"""
plain_text, image_urls = self._preprocess_user_message(query)
thread_id = await self._ensure_thread_id(query)
payload = self._build_payload(
thread_id=thread_id,
prompt=plain_text or 'continue',
image_urls=image_urls,
)
state = _StreamState()
try:
async for event in self.deerflow_client.stream_run(
thread_id=thread_id,
payload=payload,
timeout=self.timeout,
):
event_type = event.get('event')
data = event.get('data')
if event_type == 'values':
self._handle_values_event(data, state)
continue
if event_type in {'messages-tuple', 'messages', 'message'}:
self._handle_message_event(data, state)
continue
if event_type == 'custom':
state.task_failures.extend(
stream_utils.extract_task_failures_from_custom_event(data),
)
continue
if event_type == 'error':
raise errors.DeerFlowAPIError(
message=f'DeerFlow stream error event: {data}'
)
if event_type == 'end':
break
except (asyncio.TimeoutError, TimeoutError):
self.ap.logger.warning(
f'DeerFlow stream timed out after {self.timeout}s for thread_id={thread_id}'
)
state.timed_out = True
final_text = self._build_final_text(state)
yield provider_message.Message(
role='assistant',
content=final_text,
)
async def run(
self,
query: pipeline_query.Query,
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""主入口:根据 adapter 是否支持流式输出,选择流式或非流式"""
if await query.adapter.is_stream_output_supported():
msg_idx = 0
async for msg in self._stream_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
else:
async for msg in self._messages(query):
yield msg

View File

@@ -8,7 +8,7 @@ from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.weknora_api.v1 import client, errors
from langbot.libs.weknora_api import client, errors
@runner.runner_class('weknora-api')

View File

@@ -51,6 +51,10 @@ stages:
label:
en_US: WeKnora API
zh_Hans: WeKnora API
- name: deerflow-api
label:
en_US: DeerFlow API
zh_Hans: DeerFlow API
- name: expire-time
label:
en_US: Conversation expire time (seconds)
@@ -751,3 +755,121 @@ stages:
type: string
required: false
default: '请回答用户的问题。'
- name: deerflow-api
label:
en_US: DeerFlow API
zh_Hans: DeerFlow API
description:
en_US: Configure the DeerFlow LangGraph API of the pipeline
zh_Hans: 配置 DeerFlow LangGraph API
config:
- name: api-base
label:
en_US: API Base URL
zh_Hans: API 基础 URL
description:
en_US: The base URL of the DeerFlow server (e.g. http://127.0.0.1:2026)
zh_Hans: DeerFlow 服务器的基础 URL例如 http://127.0.0.1:2026
type: string
required: true
default: 'http://127.0.0.1:2026'
- name: api-key
label:
en_US: API Key
zh_Hans: API 密钥
description:
en_US: Optional API key for DeerFlow (leave empty if not required)
zh_Hans: DeerFlow 的 API 密钥(如果不需要可留空)
type: string
required: false
default: ''
- name: auth-header
label:
en_US: Auth Header Name
zh_Hans: 鉴权请求头名称
description:
en_US: Custom auth header name. Leave empty to use "x-api-key"
zh_Hans: 自定义鉴权请求头名称,留空则使用 "x-api-key"
type: string
required: false
default: ''
- name: assistant-id
label:
en_US: Assistant ID
zh_Hans: 助手 ID
description:
en_US: The DeerFlow assistant/graph id (default lead_agent)
zh_Hans: DeerFlow 助手/图 ID默认 lead_agent
type: string
required: true
default: 'lead_agent'
- name: model-name
label:
en_US: Model Name
zh_Hans: 模型名称
description:
en_US: Optional model override forwarded to DeerFlow configurable
zh_Hans: 可选的模型名称覆盖,会作为 configurable 转发给 DeerFlow
type: string
required: false
default: ''
- name: thinking-enabled
label:
en_US: Enable Thinking
zh_Hans: 启用思考
description:
en_US: Whether to enable DeerFlow thinking mode
zh_Hans: 是否启用 DeerFlow 思考模式
type: boolean
required: false
default: false
- name: plan-mode
label:
en_US: Plan Mode
zh_Hans: 规划模式
description:
en_US: Whether to enable DeerFlow plan mode
zh_Hans: 是否启用 DeerFlow 规划模式
type: boolean
required: false
default: false
- name: subagent-enabled
label:
en_US: Enable Subagents
zh_Hans: 启用子代理
description:
en_US: Whether to enable parallel subagent execution
zh_Hans: 是否启用并行子代理执行
type: boolean
required: false
default: false
- name: max-concurrent-subagents
label:
en_US: Max Concurrent Subagents
zh_Hans: 最大并发子代理数
description:
en_US: Maximum number of concurrent subagents (only effective when subagents are enabled)
zh_Hans: 最大并发子代理数(仅在启用子代理时生效)
type: integer
required: false
default: 3
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
description:
en_US: Request timeout in seconds (DeerFlow runs may take a long time)
zh_Hans: 请求超时时间DeerFlow 运行可能耗时较长
type: integer
required: false
default: 300
- name: recursion-limit
label:
en_US: Recursion Limit
zh_Hans: 递归上限
description:
en_US: LangGraph recursion limit for a single run
zh_Hans: 单次运行的 LangGraph 递归上限
type: integer
required: false
default: 1000