feat(agent-runner): enforce typed host permissions

This commit is contained in:
huanghuoguoguo
2026-06-10 22:36:23 +08:00
parent 4e016ad23e
commit 86ec12a391
41 changed files with 584 additions and 3886 deletions
-45
View File
@@ -1,45 +0,0 @@
from __future__ import annotations
import abc
import typing
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
preregistered_runners: list[typing.Type[RequestRunner]] = []
def runner_class(name: str):
"""注册一个请求运行器"""
def decorator(cls: typing.Type[RequestRunner]) -> typing.Type[RequestRunner]:
cls.name = name
preregistered_runners.append(cls)
return cls
return decorator
class RequestRunner(abc.ABC):
"""请求运行器"""
name: str = None
ap: app.Application
pipeline_config: dict
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
@abc.abstractmethod
async def run(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""运行请求"""
pass
-295
View File
@@ -1,295 +0,0 @@
"""
Legacy Coze API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/coze-agent` plugin instead.
"""
from __future__ import annotations
import typing
import json
import base64
from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.utils import image
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.coze_server_api.client import AsyncCozeAPIClient
@runner.runner_class('coze-api')
class CozeAPIRunner(runner.RequestRunner):
"""Coze API 对话请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.pipeline_config = pipeline_config
self.ap = ap
self.agent_token = pipeline_config['ai']['coze-api']['api-key']
self.bot_id = pipeline_config['ai']['coze-api'].get('bot-id')
self.chat_timeout = pipeline_config['ai']['coze-api'].get('timeout')
self.auto_save_history = pipeline_config['ai']['coze-api'].get('auto_save_history')
self.api_base = pipeline_config['ai']['coze-api'].get('api-base')
self.coze = AsyncCozeAPIClient(self.agent_token, self.api_base)
def _process_thinking_content(
self,
content: str,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
Returns:
(处理后的内容, 提取的思维链内容)
"""
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
thinking_content = ''
# 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
async def _preprocess_user_message(self, query: pipeline_query.Query) -> list[dict]:
"""预处理用户消息,转换为Coze消息格式
Returns:
list[dict]: Coze消息列表
"""
messages = []
if isinstance(query.user_message.content, list):
# 多模态消息处理
content_parts = []
for ce in query.user_message.content:
if ce.type == 'text':
content_parts.append({'type': 'text', 'text': ce.text})
elif ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
file_bytes = base64.b64decode(image_b64)
file_id = await self._get_file_id(file_bytes)
content_parts.append({'type': 'image', 'file_id': file_id})
elif ce.type == 'file':
# 处理文件,上传到Coze
file_id = await self._get_file_id(ce.file)
content_parts.append({'type': 'file', 'file_id': file_id})
# 创建多模态消息
if content_parts:
messages.append(
{
'role': 'user',
'content': json.dumps(content_parts),
'content_type': 'object_string',
'meta_data': None,
}
)
elif isinstance(query.user_message.content, str):
# 纯文本消息
messages.append(
{'role': 'user', 'content': query.user_message.content, 'content_type': 'text', 'meta_data': None}
)
return messages
async def _get_file_id(self, file) -> str:
"""上传文件到Coze服务
Args:
file: 文件
Returns:
str: 文件ID
"""
file_id = await self.coze.upload(file=file)
return file_id
async def _chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用聊天助手(非流式)
注意:由于cozepy没有提供非流式API,这里使用流式API并在结束后一次性返回完整内容
"""
user_id = f'{query.launcher_type.value}_{query.launcher_id}'
# 预处理用户消息
additional_messages = await self._preprocess_user_message(query)
# 获取会话ID
conversation_id = None
# 收集完整内容
full_content = ''
full_reasoning = ''
try:
# 调用Coze API流式接口
async for chunk in self.coze.chat_messages(
bot_id=self.bot_id,
user_id=user_id,
additional_messages=additional_messages,
conversation_id=conversation_id,
timeout=self.chat_timeout,
auto_save_history=self.auto_save_history,
stream=True,
):
self.ap.logger.debug(f'coze-chat-stream: {chunk}')
event_type = chunk.get('event')
data = chunk.get('data', {})
# Removed debug print statement to avoid cluttering logs in production
if event_type == 'conversation.message.delta':
# 收集内容
if 'content' in data:
full_content += data.get('content', '')
# 收集推理内容(如果有)
if 'reasoning_content' in data:
full_reasoning += data.get('reasoning_content', '')
elif event_type.split('.')[-1] == 'done': # 本地部署coze时,结束event不为done
# 保存会话ID
if 'conversation_id' in data:
conversation_id = data.get('conversation_id')
elif event_type == 'error':
# 处理错误
error_msg = f'Coze API错误: {data.get("message", "未知错误")}'
yield provider_message.Message(
role='assistant',
content=error_msg,
)
return
# 处理思维链内容
content, thinking_content = self._process_thinking_content(full_content)
if full_reasoning:
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
if not remove_think:
content = f'<think>\n{full_reasoning}\n</think>\n{content}'.strip()
# 一次性返回完整内容
yield provider_message.Message(
role='assistant',
content=content,
)
# 保存会话ID
if conversation_id and query.session.using_conversation:
query.session.using_conversation.uuid = conversation_id
except Exception as e:
self.ap.logger.error(f'Coze API错误: {str(e)}')
yield provider_message.Message(
role='assistant',
content=f'Coze API调用失败: {str(e)}',
)
async def _chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用聊天助手(流式)"""
user_id = f'{query.launcher_type.value}_{query.launcher_id}'
# 预处理用户消息
additional_messages = await self._preprocess_user_message(query)
# 获取会话ID
conversation_id = None
start_reasoning = False
stop_reasoning = False
message_idx = 1
is_final = False
full_content = ''
remove_think = self.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
try:
# 调用Coze API流式接口
async for chunk in self.coze.chat_messages(
bot_id=self.bot_id,
user_id=user_id,
additional_messages=additional_messages,
conversation_id=conversation_id,
timeout=self.chat_timeout,
auto_save_history=self.auto_save_history,
stream=True,
):
self.ap.logger.debug(f'coze-chat-stream-chunk: {chunk}')
event_type = chunk.get('event')
data = chunk.get('data', {})
content = ''
if event_type == 'conversation.message.delta':
message_idx += 1
# 处理内容增量
if 'reasoning_content' in data and not remove_think:
reasoning_content = data.get('reasoning_content', '')
if reasoning_content and not start_reasoning:
content = '<think/>\n'
start_reasoning = True
content += reasoning_content
if 'content' in data:
if data.get('content', ''):
content += data.get('content', '')
if not stop_reasoning and start_reasoning:
content = f'</think>\n{content}'
stop_reasoning = True
elif event_type.split('.')[-1] == 'done': # 本地部署coze时,结束event不为done
# 保存会话ID
if 'conversation_id' in data:
conversation_id = data.get('conversation_id')
if query.session.using_conversation:
query.session.using_conversation.uuid = conversation_id
is_final = True
elif event_type == 'error':
# 处理错误
error_msg = f'Coze API错误: {data.get("message", "未知错误")}'
yield provider_message.MessageChunk(role='assistant', content=error_msg, finish_reason='error')
return
full_content += content
if message_idx % 8 == 0 or is_final:
if full_content:
yield provider_message.MessageChunk(role='assistant', content=full_content, is_final=is_final)
except Exception as e:
self.ap.logger.error(f'Coze API流式调用错误: {str(e)}')
yield provider_message.MessageChunk(
role='assistant', content=f'Coze API流式调用失败: {str(e)}', finish_reason='error'
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行"""
msg_seq = 0
if await query.adapter.is_stream_output_supported():
async for msg in self._chat_messages_chunk(query):
if isinstance(msg, provider_message.MessageChunk):
msg_seq += 1
msg.msg_sequence = msg_seq
yield msg
else:
async for msg in self._chat_messages(query):
yield msg
@@ -1,362 +0,0 @@
"""
Legacy DashScope (阿里云百炼) API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/dashscope-agent` plugin instead.
"""
from __future__ import annotations
import typing
import re
import dashscope
from .. import runner
from ...core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class DashscopeAPIError(Exception):
"""Dashscope API 请求失败"""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
@runner.runner_class('dashscope-app-api')
class DashScopeAPIRunner(runner.RequestRunner):
"阿里云百炼DashsscopeAPI对话请求器"
# 运行器内部使用的配置
app_type: str # 应用类型
app_id: str # 应用ID
api_key: str # API Key
references_quote: (
str # 引用资料提示(当展示回答来源功能开启时,这个变量会作为引用资料名前的提示,可在provider.json中配置)
)
def __init__(self, ap: app.Application, pipeline_config: dict):
"""初始化"""
self.ap = ap
self.pipeline_config = pipeline_config
valid_app_types = ['agent', 'workflow']
self.app_type = self.pipeline_config['ai']['dashscope-app-api']['app-type']
# 检查配置文件中使用的应用类型是否支持
if self.app_type not in valid_app_types:
raise DashscopeAPIError(f'不支持的 Dashscope 应用类型: {self.app_type}')
# 初始化Dashscope 参数配置
self.app_id = self.pipeline_config['ai']['dashscope-app-api']['app-id']
self.api_key = self.pipeline_config['ai']['dashscope-app-api']['api-key']
self.references_quote = self.pipeline_config['ai']['dashscope-app-api']['references_quote']
def _replace_references(self, text, references_dict):
"""阿里云百炼平台的自定义应用支持资料引用,此函数可以将引用标签替换为参考资料"""
# 匹配 <ref>[index_id]</ref> 形式的字符串
pattern = re.compile(r'<ref>\[(.*?)\]</ref>')
def replacement(match):
# 获取引用编号
ref_key = match.group(1)
if ref_key in references_dict:
# 如果有对应的参考资料按照provider.json中的reference_quote返回提示,来自哪个参考资料文件
return f'({self.references_quote} {references_dict[ref_key]})'
else:
# 如果没有对应的参考资料,保留原样
return match.group(0)
# 使用 re.sub() 进行替换
return pattern.sub(replacement, text)
async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[str]]:
"""预处理用户消息,提取纯文本,阿里云提供的上传文件方法过于复杂,暂不支持上传文件(包括图片)"""
plain_text = ''
image_ids = []
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
# 暂时不支持上传图片,保留代码以便后续扩展
# elif ce.type == "image_base64":
# image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
# file_bytes = base64.b64decode(image_b64)
# file = ("img.png", file_bytes, f"image/{image_format}")
# file_upload_resp = await self.dify_client.upload_file(
# file,
# f"{query.session.launcher_type.value}_{query.session.launcher_id}",
# )
# image_id = file_upload_resp["id"]
# image_ids.append(image_id)
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
return plain_text, image_ids
async def _agent_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""Dashscope 智能体对话请求"""
# 局部变量
chunk = None # 流式传输的块
pending_content = '' # 待处理的Agent输出内容
references_dict = {} # 用于存储引用编号和对应的参考资料
plain_text = '' # 用户输入的纯文本信息
image_ids = [] # 用户输入的图片ID列表 (暂不支持)
think_start = False
think_end = False
plain_text, image_ids = await self._preprocess_user_message(query)
has_thoughts = True # 获取思考过程
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
if remove_think:
has_thoughts = False
# 发送对话请求
response = dashscope.Application.call(
api_key=self.api_key, # 智能体应用的API Key
app_id=self.app_id, # 智能体应用的ID
prompt=plain_text, # 用户输入的文本信息
stream=True, # 流式输出
incremental_output=True, # 增量输出,使用流式输出需要开启增量输出
session_id=query.session.using_conversation.uuid, # 会话ID用于,多轮对话
enable_thinking=has_thoughts,
has_thoughts=has_thoughts,
# rag_options={ # 主要用于文件交互,暂不支持
# "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个
# }
)
idx_chunk = 0
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
if is_stream:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
stream_think = stream_output.get('thoughts') or []
if stream_think and stream_think[0].get('thought'):
if not think_start:
think_start = True
pending_content += f'<think>\n{stream_think[0].get("thought")}'
else:
# 继续输出 reasoning_content
pending_content += stream_think[0].get('thought')
elif think_start and (not stream_think or stream_think[0].get('thought') == '') and not think_end:
think_end = True
pending_content += '\n</think>\n'
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 是否是流式最后一个chunk
is_final = False if stream_output.get('finish_reason', False) == 'null' else True
# 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
if idx_chunk % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=pending_content,
is_final=is_final,
)
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
else:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
stream_think = stream_output.get('thoughts') or []
if stream_think and stream_think[0].get('thought'):
if not think_start:
think_start = True
pending_content += f'<think>\n{stream_think[0].get("thought")}'
else:
# 继续输出 reasoning_content
pending_content += stream_think[0].get('thought')
elif think_start and (not stream_think or stream_think[0].get('thought') == '') and not think_end:
think_end = True
pending_content += '\n</think>\n'
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
# 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
yield provider_message.Message(
role='assistant',
content=pending_content,
)
async def _workflow_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""Dashscope 工作流对话请求"""
# 局部变量
chunk = None # 流式传输的块
pending_content = '' # 待处理的Agent输出内容
references_dict = {} # 用于存储引用编号和对应的参考资料
plain_text = '' # 用户输入的纯文本信息
image_ids = [] # 用户输入的图片ID列表 (暂不支持)
plain_text, image_ids = await self._preprocess_user_message(query)
biz_params = {}
biz_params.update(query.variables)
# 发送对话请求
response = dashscope.Application.call(
api_key=self.api_key, # 智能体应用的API Key
app_id=self.app_id, # 智能体应用的ID
prompt=plain_text, # 用户输入的文本信息
stream=True, # 流式输出
incremental_output=True, # 增量输出,使用流式输出需要开启增量输出
session_id=query.session.using_conversation.uuid, # 会话ID用于,多轮对话
biz_params=biz_params, # 工作流应用的自定义输入参数传递
flow_stream_mode='message_format', # 消息模式,输出/结束节点的流式结果
# rag_options={ # 主要用于文件交互,暂不支持
# "session_file_ids": ["FILE_ID1"], # FILE_ID1 替换为实际的临时文件ID,逗号隔开多个
# }
)
# 处理API返回的流式输出
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
idx_chunk = 0
if is_stream:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
idx_chunk += 1
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('workflow_message') is not None:
pending_content += stream_output.get('workflow_message').get('message').get('content')
# if stream_output.get('text') is not None:
# pending_content += stream_output.get('text')
is_final = False if stream_output.get('finish_reason', False) == 'null' else True
# 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
if idx_chunk % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=pending_content,
is_final=is_final,
)
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
else:
for chunk in response:
if chunk.get('status_code') != 200:
raise DashscopeAPIError(
f'Dashscope API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if not chunk:
continue
# 获取流式传输的output
stream_output = chunk.get('output', {})
if stream_output.get('text') is not None:
pending_content += stream_output.get('text')
is_final = False if stream_output.get('finish_reason', False) == 'null' else True
# 保存当前会话的session_id用于下次对话的语境
query.session.using_conversation.uuid = stream_output.get('session_id')
# 获取模型传出的参考资料列表
references_dict_list = stream_output.get('doc_references', [])
# 从模型传出的参考资料信息中提取用于替换的字典
if references_dict_list is not None:
for doc in references_dict_list:
if doc.get('index_id') is not None:
references_dict[doc.get('index_id')] = doc.get('doc_name')
# 将参考资料替换到文本中
pending_content = self._replace_references(pending_content, references_dict)
yield provider_message.Message(
role='assistant',
content=pending_content,
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行"""
msg_seq = 0
if self.app_type == 'agent':
async for msg in self._agent_messages(query):
if isinstance(msg, provider_message.MessageChunk):
msg_seq += 1
msg.msg_sequence = msg_seq
yield msg
elif self.app_type == 'workflow':
async for msg in self._workflow_messages(query):
if isinstance(msg, provider_message.MessageChunk):
msg_seq += 1
msg.msg_sequence = msg_seq
yield msg
else:
raise DashscopeAPIError(f'不支持的 Dashscope 应用类型: {self.app_type}')
@@ -1,782 +0,0 @@
"""
Legacy Dify Service API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/dify-agent` plugin instead.
"""
from __future__ import annotations
import typing
import json
import uuid
import base64
import mimetypes
from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.utils import image
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.dify_service_api.v1 import client, errors
import httpx
@runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器"""
dify_client: client.AsyncDifyServiceClient
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
valid_app_types = ['chat', 'agent', 'workflow']
if self.pipeline_config['ai']['dify-service-api']['app-type'] not in valid_app_types:
raise errors.DifyAPIError(
f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
)
api_key = self.pipeline_config['ai']['dify-service-api']['api-key']
self.dify_client = client.AsyncDifyServiceClient(
api_key=api_key,
base_url=self.pipeline_config['ai']['dify-service-api']['base-url'],
)
def _process_thinking_content(
self,
content: str,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
Returns:
(处理后的内容, 提取的思维链内容)
"""
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
thinking_content = ''
# 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 3. 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
def _extract_dify_text_output(self, value: typing.Any) -> str:
"""Extract text content from Dify output payload."""
if value is None:
return ''
if isinstance(value, dict):
content = value.get('content')
if isinstance(content, str):
return content
return json.dumps(value, ensure_ascii=False)
if isinstance(value, str):
text = value.strip()
if not text:
return ''
try:
parsed = json.loads(text)
except json.JSONDecodeError:
return value
if isinstance(parsed, dict) and isinstance(parsed.get('content'), str):
return parsed['content']
return value
return str(value)
async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[dict]]:
"""预处理用户消息,提取纯文本,并将图片/文件上传到 Dify 服务
Returns:
tuple[str, list[dict]]: 纯文本和上传后的文件描述(包含 type 与 id)
"""
plain_text = ''
upload_files: list[dict] = []
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
async def upload_file_bytes(file_name: str, file_bytes: bytes, content_type: str) -> str:
file_name = file_name or 'file'
content_type = content_type or 'application/octet-stream'
file = (file_name, file_bytes, content_type)
resp = await self.dify_client.upload_file(file, user_tag)
return resp['id']
async def download_file(file_url: str) -> tuple[bytes, str]:
"""Download file from url (supports data url)."""
async with httpx.AsyncClient() as client_session:
resp = await client_session.get(file_url)
resp.raise_for_status()
content_type = (
resp.headers.get('content-type') or mimetypes.guess_type(file_url)[0] or 'application/octet-stream'
)
return resp.content, content_type
def _detect_file_type(content_type: str) -> str:
"""Map MIME to dify file type."""
if content_type and content_type.startswith('image/'):
return 'image'
if content_type and content_type.startswith('audio/'):
return 'audio'
if content_type and content_type.startswith('video/'):
return 'video'
return 'document'
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
elif ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
file_bytes = base64.b64decode(image_b64)
image_id = await upload_file_bytes(f'img.{image_format}', file_bytes, f'image/{image_format}')
upload_files.append({'type': 'image', 'id': image_id})
elif ce.type == 'file_url':
file_url = getattr(ce, 'file_url', None)
file_name = getattr(ce, 'file_name', None) or 'file'
try:
file_bytes, content_type = await download_file(file_url)
file_id = await upload_file_bytes(file_name, file_bytes, content_type)
file_type = _detect_file_type(content_type)
upload_files.append({'type': file_type, 'id': file_id})
except Exception as e:
self.ap.logger.warning(f'dify file upload failed: {e}')
elif ce.type == 'file_base64':
file_name = getattr(ce, 'file_name', None) or 'file'
header, b64_data = ce.file_base64.split(',', 1)
content_type = 'application/octet-stream'
if ';' in header:
content_type = header.split(';')[0][5:] or content_type
file_bytes = base64.b64decode(b64_data)
file_id = await upload_file_bytes(file_name, file_bytes, content_type)
file_type = _detect_file_type(content_type)
upload_files.append({'type': file_type, 'id': file_id})
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
plain_text = plain_text if plain_text else self.pipeline_config['ai']['dify-service-api']['base-prompt']
return plain_text, upload_files
async def _chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
mode = 'basic' # 标记是基础编排还是工作流编排
basic_mode_pending_chunk = ''
inputs = {}
inputs.update(query.variables)
chunk = None # 初始化chunk变量,防止在没有响应时引用错误
async for chunk in self.dify_client.chat_messages(
inputs=inputs,
query=plain_text,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
conversation_id=cov_id,
files=files,
timeout=120,
):
self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))
if chunk['event'] == 'workflow_started':
mode = 'workflow'
if mode == 'workflow':
if chunk['event'] == 'node_finished':
if chunk['data']['node_type'] == 'answer':
answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer'))
content, _ = self._process_thinking_content(answer)
yield provider_message.Message(
role='assistant',
content=content,
)
elif mode == 'basic':
if chunk['event'] == 'message':
basic_mode_pending_chunk += chunk['answer']
elif chunk['event'] == 'message_end':
content, _ = self._process_thinking_content(basic_mode_pending_chunk)
yield provider_message.Message(
role='assistant',
content=content,
)
basic_mode_pending_chunk = ''
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
query.session.using_conversation.uuid = chunk['conversation_id']
async def _agent_chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
ignored_events = []
inputs = {}
inputs.update(query.variables)
pending_agent_message = ''
chunk = None # 初始化chunk变量,防止在没有响应时引用错误
async for chunk in self.dify_client.chat_messages(
inputs=inputs,
query=plain_text,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
response_mode='streaming',
conversation_id=cov_id,
files=files,
timeout=120,
):
self.ap.logger.debug('dify-agent-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'agent_message' or chunk['event'] == 'message':
pending_agent_message += chunk['answer']
else:
if pending_agent_message.strip() != '':
pending_agent_message = pending_agent_message.replace('</details>Action:', '</details>')
content, _ = self._process_thinking_content(pending_agent_message)
yield provider_message.Message(
role='assistant',
content=content,
)
pending_agent_message = ''
if chunk['event'] == 'agent_thought':
if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过
continue
if chunk['tool']:
msg = provider_message.Message(
role='assistant',
tool_calls=[
provider_message.ToolCall(
id=chunk['id'],
type='function',
function=provider_message.FunctionCall(
name=chunk['tool'],
arguments=json.dumps({}),
),
)
],
)
yield msg
if chunk['event'] == 'message_file':
if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant':
# 检查URL是否已经是完整的连接
if chunk['url'].startswith('http://') or chunk['url'].startswith('https://'):
image_url = chunk['url']
else:
base_url = self.dify_client.base_url
if base_url.endswith('/v1'):
base_url = base_url[:-3]
image_url = base_url + chunk['url']
yield provider_message.Message(
role='assistant',
content=[provider_message.ContentElement.from_image_url(image_url)],
)
if chunk['event'] == 'error':
raise errors.DifyAPIError('dify 服务错误: ' + chunk['message'])
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
query.session.using_conversation.uuid = chunk['conversation_id']
async def _workflow_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流"""
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
query.variables['conversation_id'] = query.session.using_conversation.uuid
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
ignored_events = ['text_chunk', 'workflow_started']
inputs = { # these variables are legacy variables, we need to keep them for compatibility
'langbot_user_message_text': plain_text,
'langbot_session_id': query.variables['session_id'],
'langbot_conversation_id': query.variables['conversation_id'],
'langbot_msg_create_time': query.variables['msg_create_time'],
}
inputs.update(query.variables)
async for chunk in self.dify_client.workflow_run(
inputs=inputs,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
files=files,
timeout=120,
):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue
msg = provider_message.Message(
role='assistant',
content=None,
tool_calls=[
provider_message.ToolCall(
id=chunk['data']['node_id'],
type='function',
function=provider_message.FunctionCall(
name=chunk['data']['title'],
arguments=json.dumps({}),
),
)
],
)
yield msg
elif chunk['event'] == 'workflow_finished':
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
msg = provider_message.Message(
role='assistant',
content=content,
)
yield msg
async def _chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
mode = 'basic'
basic_mode_pending_chunk = ''
inputs = {}
inputs.update(query.variables)
message_idx = 0
chunk = None # 初始化chunk变量,防止在没有响应时引用错误
is_final = False
think_start = False
think_end = False
yielded_final = False
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.chat_messages(
inputs=inputs,
query=plain_text,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
conversation_id=cov_id,
files=files,
timeout=120,
):
self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))
if chunk['event'] == 'workflow_started':
mode = 'workflow'
elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'):
# Some Dify deployments may omit workflow_started in streamed chunks.
mode = 'workflow'
if chunk['event'] == 'message':
message_idx += 1
if remove_think:
if '<think>' in chunk['answer'] and not think_start:
think_start = True
continue
if '</think>' in chunk['answer'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['answer'])
basic_mode_pending_chunk += content
think_end = True
elif think_end:
basic_mode_pending_chunk += chunk['answer']
if think_start:
continue
else:
basic_mode_pending_chunk += chunk['answer']
if chunk['event'] == 'message_end':
is_final = True
elif chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if mode == 'workflow' and chunk['event'] == 'node_finished':
if chunk['data'].get('node_type') == 'answer':
answer = self._extract_dify_text_output(chunk['data'].get('outputs', {}).get('answer'))
if answer:
basic_mode_pending_chunk = answer
if (
not yielded_final
and (is_final or message_idx % 8 == 0)
and (basic_mode_pending_chunk != '' or is_final)
):
# content, _ = self._process_thinking_content(basic_mode_pending_chunk)
yield provider_message.MessageChunk(
role='assistant',
content=basic_mode_pending_chunk,
is_final=is_final,
)
if is_final:
yielded_final = True
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
query.session.using_conversation.uuid = chunk['conversation_id']
async def _agent_chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or None
query.variables['conversation_id'] = cov_id
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
ignored_events = []
inputs = {}
inputs.update(query.variables)
pending_agent_message = ''
chunk = None # 初始化chunk变量,防止在没有响应时引用错误
message_idx = 0
is_final = False
think_start = False
think_end = False
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.chat_messages(
inputs=inputs,
query=plain_text,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
response_mode='streaming',
conversation_id=cov_id,
files=files,
timeout=120,
):
self.ap.logger.debug('dify-agent-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'agent_message':
message_idx += 1
if remove_think:
if '<think>' in chunk['answer'] and not think_start:
think_start = True
continue
if '</think>' in chunk['answer'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['answer'])
pending_agent_message += content
think_end = True
elif think_end or not think_start:
pending_agent_message += chunk['answer']
if think_start and not think_end:
continue
else:
pending_agent_message += chunk['answer']
elif chunk['event'] == 'message_end':
is_final = True
else:
if chunk['event'] == 'agent_thought':
if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过
continue
message_idx += 1
if chunk['tool']:
msg = provider_message.MessageChunk(
role='assistant',
tool_calls=[
provider_message.ToolCall(
id=chunk['id'],
type='function',
function=provider_message.FunctionCall(
name=chunk['tool'],
arguments=json.dumps({}),
),
)
],
)
yield msg
if chunk['event'] == 'message_file':
message_idx += 1
if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant':
# 检查URL是否已经是完整的连接
if chunk['url'].startswith('http://') or chunk['url'].startswith('https://'):
image_url = chunk['url']
else:
base_url = self.dify_client.base_url
if base_url.endswith('/v1'):
base_url = base_url[:-3]
image_url = base_url + chunk['url']
yield provider_message.MessageChunk(
role='assistant',
content=[provider_message.ContentElement.from_image_url(image_url)],
is_final=is_final,
)
if chunk['event'] == 'error':
raise errors.DifyAPIError('dify 服务错误: ' + chunk['message'])
if message_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=pending_agent_message,
is_final=is_final,
)
if chunk is None:
raise errors.DifyAPIError('Dify API 没有返回任何响应,请检查网络连接和API配置')
query.session.using_conversation.uuid = chunk['conversation_id']
async def _workflow_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流"""
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
query.variables['conversation_id'] = query.session.using_conversation.uuid
plain_text, upload_files = await self._preprocess_user_message(query)
files = [
{
'type': f['type'],
'transfer_method': 'local_file',
'upload_file_id': f['id'],
}
for f in upload_files
]
ignored_events = ['workflow_started']
inputs = { # these variables are legacy variables, we need to keep them for compatibility
'langbot_user_message_text': plain_text,
'langbot_session_id': query.variables['session_id'],
'langbot_conversation_id': query.variables['conversation_id'],
'langbot_msg_create_time': query.variables['msg_create_time'],
}
inputs.update(query.variables)
messsage_idx = 0
is_final = False
think_start = False
think_end = False
workflow_contents = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run(
inputs=inputs,
user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
files=files,
timeout=120,
):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
if chunk['event'] == 'text_chunk':
messsage_idx += 1
if remove_think:
if '<think>' in chunk['data']['text'] and not think_start:
think_start = True
continue
if '</think>' in chunk['data']['text'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
workflow_contents += content
think_end = True
elif think_end:
workflow_contents += chunk['data']['text']
if think_start:
continue
else:
workflow_contents += chunk['data']['text']
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue
messsage_idx += 1
msg = provider_message.MessageChunk(
role='assistant',
content=None,
tool_calls=[
provider_message.ToolCall(
id=chunk['data']['node_id'],
type='function',
function=provider_message.FunctionCall(
name=chunk['data']['title'],
arguments=json.dumps({}),
),
)
],
)
yield msg
if messsage_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=workflow_contents,
is_final=is_final,
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""
if await query.adapter.is_stream_output_supported():
msg_idx = 0
if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
async for msg in self._chat_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':
async for msg in self._agent_chat_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow':
async for msg in self._workflow_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
else:
raise errors.DifyAPIError(
f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
)
else:
if self.pipeline_config['ai']['dify-service-api']['app-type'] == 'chat':
async for msg in self._chat_messages(query):
yield msg
elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'agent':
async for msg in self._agent_chat_messages(query):
yield msg
elif self.pipeline_config['ai']['dify-service-api']['app-type'] == 'workflow':
async for msg in self._workflow_messages(query):
yield msg
else:
raise errors.DifyAPIError(
f'不支持的 Dify 应用类型: {self.pipeline_config["ai"]["dify-service-api"]["app-type"]}'
)
@@ -1,187 +0,0 @@
"""
Legacy Langflow API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/langflow-agent` plugin instead.
"""
from __future__ import annotations
import typing
import json
import httpx
import uuid
import traceback
from .. import runner
from ...core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
@runner.runner_class('langflow-api')
class LangflowAPIRunner(runner.RequestRunner):
"""Langflow API 对话请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
async def _build_request_payload(self, query: pipeline_query.Query) -> dict:
"""构建请求负载
Args:
query: 用户查询对象
Returns:
dict: 请求负载
"""
# 获取用户消息文本
user_message_text = ''
if isinstance(query.user_message.content, str):
user_message_text = query.user_message.content
elif isinstance(query.user_message.content, list):
for item in query.user_message.content:
if item.type == 'text':
user_message_text += item.text
# 从配置中获取 input_type 和 output_type,如果未配置则使用默认值
input_type = self.pipeline_config['ai']['langflow-api'].get('input_type', 'chat')
output_type = self.pipeline_config['ai']['langflow-api'].get('output_type', 'chat')
# 构建基本负载
payload = {
'output_type': output_type,
'input_type': input_type,
'input_value': user_message_text,
'session_id': str(uuid.uuid4()),
}
# 如果配置中有tweaks,则添加到负载中
tweaks = json.loads(self.pipeline_config['ai']['langflow-api'].get('tweaks'))
if tweaks:
payload['tweaks'] = tweaks
return payload
async def run(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""运行请求
Args:
query: 用户查询对象
Yields:
Message: 回复消息
"""
# 检查是否支持流式输出
is_stream = False
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
# 从配置中获取API参数
base_url = self.pipeline_config['ai']['langflow-api']['base-url']
api_key = self.pipeline_config['ai']['langflow-api']['api-key']
flow_id = self.pipeline_config['ai']['langflow-api']['flow-id']
# 构建API URL
url = f'{base_url.rstrip("/")}/api/v1/run/{flow_id}'
# 构建请求负载
payload = await self._build_request_payload(query)
# 设置请求头
headers = {'Content-Type': 'application/json', 'x-api-key': api_key}
# 发送请求
async with httpx.AsyncClient() as client:
if is_stream:
# 流式请求
async with client.stream('POST', url, json=payload, headers=headers, timeout=120.0) as response:
response.raise_for_status()
accumulated_content = ''
message_count = 0
async for line in response.aiter_lines():
data_str = line
if data_str.startswith('data: '):
data_str = data_str[6:] # 移除 "data: " 前缀
try:
data = json.loads(data_str)
# 提取消息内容
message_text = ''
if 'outputs' in data and len(data['outputs']) > 0:
output = data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in data:
messages = data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
if message_text:
# 更新累积内容
accumulated_content = message_text
message_count += 1
# 每8条消息或有新内容时生成一个chunk
if message_count % 8 == 0 or len(message_text) > 0:
yield provider_message.MessageChunk(
role='assistant', content=accumulated_content, is_final=False
)
except json.JSONDecodeError:
# 如果不是JSON,跳过这一行
traceback.print_exc()
continue
# 发送最终消息
yield provider_message.MessageChunk(role='assistant', content=accumulated_content, is_final=True)
else:
# 非流式请求
response = await client.post(url, json=payload, headers=headers, timeout=120.0)
response.raise_for_status()
# 解析响应
response_data = response.json()
# 提取消息内容
# 根据Langflow API文档,响应结构可能在outputs[0].outputs[0].outputs.message.message中
message_text = ''
if 'outputs' in response_data and len(response_data['outputs']) > 0:
output = response_data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in response_data:
messages = response_data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
# 如果仍然没有找到消息,返回完整响应的字符串表示
if not message_text:
message_text = json.dumps(response_data, ensure_ascii=False, indent=2)
# 生成回复消息
if is_stream:
yield provider_message.MessageChunk(role='assistant', content=message_text, is_final=True)
else:
reply_message = provider_message.Message(role='assistant', content=message_text)
yield reply_message
@@ -1,526 +0,0 @@
"""
Legacy Local Agent Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/local-agent` plugin instead.
"""
from __future__ import annotations
import json
import copy
import typing
from .. import runner
from ...telemetry import features as telemetry_features
from ..modelmgr import requester as modelmgr_requester
from ..tools.loaders.native import EXEC_TOOL_NAME
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.rag.context as rag_context
rag_combined_prompt_template = """
The following are relevant context entries retrieved from the knowledge base.
Please use them to answer the user's message.
Respond in the same language as the user's input.
<context>
{rag_context}
</context>
<user_message>
{user_message}
</user_message>
"""
SANDBOX_EXEC_TOOL_NAME = 'sandbox_exec'
SANDBOX_EXEC_SYSTEM_GUIDANCE = (
'When sandbox_exec is available, use it for exact calculations, statistics, structured data parsing, '
'and code execution instead of estimating mentally. If the user provides numbers, tables, CSV-like text, '
'JSON, or other data and asks for a computed answer, prefer running a short Python script in sandbox_exec '
'and then answer from the tool result.'
)
# Hard cap on tool-call rounds within a single agent turn. A looping or
# adversarial model can otherwise emit tool calls indefinitely (each potentially
# a sandbox exec), yielding a non-terminating request and runaway cost. Set
# generously so it never interrupts legitimate multi-step agentic workflows.
MAX_TOOL_CALL_ROUNDS = 128
def _model_has_ability(model: modelmgr_requester.RuntimeLLMModel, ability: str) -> bool:
return ability in (model.model_entity.abilities or [])
class _StreamAccumulator:
"""Accumulate streamed content and fragmented OpenAI-style tool calls."""
def __init__(self, msg_sequence: int = 0, initial_content: str | None = None):
self.tool_calls_map: dict[str, provider_message.ToolCall] = {}
self.msg_idx = 0
self.accumulated_content = initial_content or ''
self.last_role = 'assistant'
self.msg_sequence = msg_sequence
def add(self, msg: provider_message.MessageChunk) -> provider_message.MessageChunk | None:
self.msg_idx += 1
if msg.role:
self.last_role = msg.role
if msg.content:
self.accumulated_content += msg.content
if msg.tool_calls:
for tool_call in msg.tool_calls:
if tool_call.id not in self.tool_calls_map:
self.tool_calls_map[tool_call.id] = provider_message.ToolCall(
id=tool_call.id,
type=tool_call.type,
function=provider_message.FunctionCall(
name=tool_call.function.name if tool_call.function else '',
arguments='',
),
)
if tool_call.function and tool_call.function.arguments:
self.tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments
if self.msg_idx % 8 == 0 or msg.is_final:
self.msg_sequence += 1
return provider_message.MessageChunk(
role=self.last_role,
content=self.accumulated_content,
tool_calls=list(self.tool_calls_map.values()) if (self.tool_calls_map and msg.is_final) else None,
is_final=msg.is_final,
msg_sequence=self.msg_sequence,
)
return None
def final_message(self) -> provider_message.MessageChunk:
return provider_message.MessageChunk(
role=self.last_role,
content=self.accumulated_content,
tool_calls=list(self.tool_calls_map.values()) if self.tool_calls_map else None,
msg_sequence=self.msg_sequence,
)
@runner.runner_class('local-agent')
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
def _build_request_messages(
self,
query: pipeline_query.Query,
user_message: provider_message.Message,
) -> list[provider_message.Message]:
req_messages = query.prompt.messages.copy() + query.messages.copy()
if any(getattr(tool, 'name', None) == EXEC_TOOL_NAME for tool in query.use_funcs or []):
req_messages.append(
provider_message.Message(
role='system',
content=self.ap.box_service.get_system_guidance(),
)
)
req_messages.append(user_message)
return req_messages
async def _get_model_candidates(
self,
query: pipeline_query.Query,
) -> list[modelmgr_requester.RuntimeLLMModel]:
"""Build ordered list of models to try: primary model + fallback models."""
candidates = []
# Primary model
if query.use_llm_model_uuid:
try:
primary = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid)
candidates.append(primary)
except ValueError:
self.ap.logger.warning(f'Primary model {query.use_llm_model_uuid} not found')
# Fallback models
fallback_uuids = (query.variables or {}).get('_fallback_model_uuids', [])
for fb_uuid in fallback_uuids:
try:
fb_model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
candidates.append(fb_model)
except ValueError:
self.ap.logger.warning(f'Fallback model {fb_uuid} not found, skipping')
return candidates
async def _invoke_with_fallback(
self,
query: pipeline_query.Query,
candidates: list[modelmgr_requester.RuntimeLLMModel],
messages: list,
funcs: list,
remove_think: bool,
) -> tuple[provider_message.Message, modelmgr_requester.RuntimeLLMModel]:
"""Try non-streaming invocation with sequential fallback. Returns (message, model_used)."""
last_error = None
for model in candidates:
try:
msg = await model.provider.invoke_llm(
query,
model,
messages,
funcs if _model_has_ability(model, 'func_call') else [],
extra_args=model.model_entity.extra_args,
remove_think=remove_think,
)
return msg, model
except Exception as e:
last_error = e
self.ap.logger.warning(f'Model {model.model_entity.name} failed: {e}, trying next fallback...')
raise last_error or RuntimeError('No model candidates available')
async def _invoke_stream_with_fallback(
self,
query: pipeline_query.Query,
candidates: list[modelmgr_requester.RuntimeLLMModel],
messages: list,
funcs: list,
remove_think: bool,
) -> tuple[typing.AsyncGenerator, modelmgr_requester.RuntimeLLMModel]:
"""Try streaming invocation with sequential fallback. Returns (stream_generator, model_used).
Fallback is only possible before any chunks have been yielded to the client.
Once streaming starts, the model is committed.
"""
last_error = None
for model in candidates:
try:
stream = model.provider.invoke_llm_stream(
query,
model,
messages,
funcs if _model_has_ability(model, 'func_call') else [],
extra_args=model.model_entity.extra_args,
remove_think=remove_think,
)
# Attempt to get the first chunk to verify the stream works
first_chunk = await stream.__anext__()
async def _chain_stream(first, rest):
yield first
async for chunk in rest:
yield chunk
return _chain_stream(first_chunk, stream), model
except StopAsyncIteration:
# Empty stream — treat as success (model returned nothing)
async def _empty_stream():
return
yield # make it a generator
return _empty_stream(), model
except Exception as e:
last_error = e
self.ap.logger.warning(f'Model {model.model_entity.name} stream failed: {e}, trying next fallback...')
raise last_error or RuntimeError('No model candidates available')
async def run(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
"""Run request"""
pending_tool_calls = []
initial_response_emitted = False
# Get knowledge bases list from query variables (set by PreProcessor,
# may have been modified by plugins during PromptPreProcessing)
kb_uuids = query.variables.get('_knowledge_base_uuids', [])
user_message = copy.deepcopy(query.user_message)
user_message_text = ''
if isinstance(user_message.content, str):
user_message_text = user_message.content
elif isinstance(user_message.content, list):
for ce in user_message.content:
if ce.type == 'text':
user_message_text += ce.text
break
if kb_uuids and user_message_text:
# only support text for now
all_results: list[rag_context.RetrievalResultEntry] = []
kb_engine_plugins: set[str] = set()
# Retrieve from each knowledge base
for kb_uuid in kb_uuids:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
if not kb:
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
continue
try:
engine_plugin_id = kb.get_knowledge_engine_plugin_id() or 'builtin'
except Exception:
engine_plugin_id = 'builtin'
kb_engine_plugins.add(engine_plugin_id)
result = await kb.retrieve(
user_message_text,
settings={
'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id),
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
if result:
all_results.extend(result)
# Telemetry: knowledge base usage (counts and engine categories only)
telemetry_features.set_value(
query,
'kb',
{
'kb_count': len(kb_uuids),
'engine_plugins': sorted(kb_engine_plugins),
'retrieved_entries': len(all_results),
},
)
# Rerank step: re-score results using a rerank model if configured
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
rerank_model_uuid = local_agent_config.get('rerank-model', '')
if rerank_model_uuid == '__none__':
rerank_model_uuid = ''
self.ap.logger.info(
f'Rerank config: model_uuid={rerank_model_uuid!r}, '
f'results={len(all_results)}, '
f'local_agent_keys={list(local_agent_config.keys())}'
)
if all_results and rerank_model_uuid:
try:
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
rerank_top_k = int(local_agent_config.get('rerank-top-k', 5))
doc_texts = []
for entry in all_results:
text = ' '.join(c.text for c in entry.content if c.type == 'text' and c.text)
doc_texts.append(text)
doc_texts_capped = doc_texts[:64]
scores = await rerank_model.provider.invoke_rerank(
model=rerank_model,
query=user_message_text,
documents=doc_texts_capped,
)
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
top_indices = [s['index'] for s in scored[:rerank_top_k] if s['index'] < len(all_results)]
all_results = [all_results[i] for i in top_indices]
self.ap.logger.info(
f'Rerank complete: {len(doc_texts)} docs reranked -> top {len(all_results)} kept (top_k={rerank_top_k})'
)
except ValueError:
self.ap.logger.warning(f'Rerank model {rerank_model_uuid} not found, skipping rerank')
except Exception as e:
self.ap.logger.warning(f'Rerank failed, using original order: {e}')
final_user_message_text = ''
if all_results:
texts = []
idx = 1
for entry in all_results:
for content in entry.content:
if content.type == 'text' and content.text is not None:
texts.append(f'[{idx}] {content.text}')
idx += 1
rag_context_text = '\n\n'.join(texts)
final_user_message_text = rag_combined_prompt_template.format(
rag_context=rag_context_text, user_message=user_message_text
)
else:
final_user_message_text = user_message_text
self.ap.logger.debug(f'Final user message text: {final_user_message_text}')
for ce in user_message.content:
if ce.type == 'text':
ce.text = final_user_message_text
break
req_messages = self._build_request_messages(query, user_message)
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
remove_think = query.pipeline_config['output'].get('misc', '').get('remove-think')
# Build ordered candidate list (primary + fallbacks)
candidates = await self._get_model_candidates(query)
if not candidates:
raise RuntimeError('No LLM model configured for local-agent runner')
self.ap.logger.debug(
f'localagent req: query={query.query_id} req_messages={req_messages} '
f'candidates={[m.model_entity.name for m in candidates]}'
)
if not is_stream:
# Non-streaming: invoke with fallback
msg, use_llm_model = await self._invoke_with_fallback(
query,
candidates,
req_messages,
query.use_funcs,
remove_think,
)
final_msg = msg
else:
# Streaming: invoke with fallback
stream_accumulator = _StreamAccumulator(msg_sequence=1)
stream_src, use_llm_model = await self._invoke_stream_with_fallback(
query,
candidates,
req_messages,
query.use_funcs,
remove_think,
)
async for msg in stream_src:
chunk = stream_accumulator.add(msg)
if chunk:
yield chunk
initial_response_emitted = True
final_msg = stream_accumulator.final_message()
pending_tool_calls = final_msg.tool_calls
first_content = final_msg.content
if isinstance(final_msg, provider_message.MessageChunk):
first_end_sequence = final_msg.msg_sequence
if not is_stream:
yield final_msg
elif not initial_response_emitted:
yield final_msg
initial_response_emitted = True
req_messages.append(final_msg)
# Once a model succeeds, commit to it for the tool call loop
# (no fallback mid-conversation — different models may interpret tool results differently)
tool_call_round = 0
while pending_tool_calls:
tool_call_round += 1
telemetry_features.set_value(query, 'tool_call_rounds', tool_call_round)
if tool_call_round > MAX_TOOL_CALL_ROUNDS:
self.ap.logger.warning(
f'Tool-call loop reached the {MAX_TOOL_CALL_ROUNDS}-round cap '
f'(query_id={query.query_id}); stopping to avoid a non-terminating request.'
)
break
for tool_call in pending_tool_calls:
try:
func = tool_call.function
if func.arguments:
parameters = json.loads(func.arguments)
else:
parameters = {}
func_ret = await self.ap.tool_mgr.execute_func_call(func.name, parameters, query=query)
# Handle return value content
tool_content = None
if (
isinstance(func_ret, list)
and len(func_ret) > 0
and isinstance(func_ret[0], provider_message.ContentElement)
):
tool_content = func_ret
else:
tool_content = json.dumps(func_ret, ensure_ascii=False)
if is_stream:
msg = provider_message.MessageChunk(
role='tool',
content=tool_content,
tool_call_id=tool_call.id,
)
else:
msg = provider_message.Message(
role='tool',
content=tool_content,
tool_call_id=tool_call.id,
)
yield msg
req_messages.append(msg)
except Exception as e:
if is_stream:
err_msg = provider_message.MessageChunk(
role='tool',
content=f'err: {e}',
tool_call_id=tool_call.id,
is_final=True,
)
else:
err_msg = provider_message.Message(role='tool', content=f'err: {e}', tool_call_id=tool_call.id)
yield err_msg
req_messages.append(err_msg)
self.ap.logger.debug(
f'localagent req: query={query.query_id} req_messages={req_messages} '
f'use_llm_model={use_llm_model.model_entity.name}'
)
if is_stream:
stream_accumulator = _StreamAccumulator(
msg_sequence=first_end_sequence,
initial_content=first_content,
)
tool_stream_src = use_llm_model.provider.invoke_llm_stream(
query,
use_llm_model,
req_messages,
query.use_funcs if _model_has_ability(use_llm_model, 'func_call') else [],
extra_args=use_llm_model.model_entity.extra_args,
remove_think=remove_think,
)
async for msg in tool_stream_src:
chunk = stream_accumulator.add(msg)
if chunk:
yield chunk
final_msg = stream_accumulator.final_message()
else:
# Non-streaming: use committed model directly (no fallback in tool loop)
msg = await use_llm_model.provider.invoke_llm(
query,
use_llm_model,
req_messages,
query.use_funcs if _model_has_ability(use_llm_model, 'func_call') else [],
extra_args=use_llm_model.model_entity.extra_args,
remove_think=remove_think,
)
yield msg
final_msg = msg
pending_tool_calls = final_msg.tool_calls
req_messages.append(final_msg)
@@ -1,284 +0,0 @@
"""
Legacy n8n Service API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/n8n-agent` plugin instead.
"""
from __future__ import annotations
import typing
import json
import uuid
import aiohttp
from langbot.pkg.utils import httpclient
from .. import runner
from ...core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class N8nAPIError(Exception):
"""N8n API 请求失败"""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
@runner.runner_class('n8n-service-api')
class N8nServiceAPIRunner(runner.RequestRunner):
"""N8n Service API 工作流请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
# 获取webhook URL
self.webhook_url = self.pipeline_config['ai']['n8n-service-api']['webhook-url']
# 获取超时设置,默认为120秒
self.timeout = self.pipeline_config['ai']['n8n-service-api'].get('timeout', 120)
# 获取输出键名,默认为response
self.output_key = self.pipeline_config['ai']['n8n-service-api'].get('output-key', 'response')
# 获取认证类型,默认为none
self.auth_type = self.pipeline_config['ai']['n8n-service-api'].get('auth-type', 'none')
# 根据认证类型获取相应的认证信息
if self.auth_type == 'basic':
self.basic_username = self.pipeline_config['ai']['n8n-service-api'].get('basic-username', '')
self.basic_password = self.pipeline_config['ai']['n8n-service-api'].get('basic-password', '')
elif self.auth_type == 'jwt':
self.jwt_secret = self.pipeline_config['ai']['n8n-service-api'].get('jwt-secret', '')
self.jwt_algorithm = self.pipeline_config['ai']['n8n-service-api'].get('jwt-algorithm', 'HS256')
elif self.auth_type == 'header':
self.header_name = self.pipeline_config['ai']['n8n-service-api'].get('header-name', '')
self.header_value = self.pipeline_config['ai']['n8n-service-api'].get('header-value', '')
async def _preprocess_user_message(self, query: pipeline_query.Query) -> str:
"""预处理用户消息,提取纯文本
Returns:
str: 纯文本消息
"""
plain_text = ''
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
# 注意:n8n webhook目前不支持直接处理图片,如需支持可在此扩展
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
return plain_text
async def _process_response(
self, response: aiohttp.ClientResponse
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""处理响应——支持流式格式和普通 JSON 格式"""
full_content = ''
full_text = ''
chunk_idx = 0
is_final = False
message_idx = 0
buffer = ''
decoder = json.JSONDecoder()
async for raw_chunk in response.content.iter_chunked(1024):
if not raw_chunk:
continue
try:
# 将 bytes 解码为字符串(容忍错误)
if isinstance(raw_chunk, (bytes, bytearray)):
chunk_str = raw_chunk.decode('utf-8', errors='replace')
else:
chunk_str = str(raw_chunk)
full_text += chunk_str
buffer += chunk_str
# 尝试从 buffer 中循环解析出 JSON 对象(处理多个对象或部分对象)
while buffer:
buffer = buffer.lstrip()
if not buffer:
break
try:
obj, idx = decoder.raw_decode(buffer)
buffer = buffer[idx:]
if not isinstance(obj, dict):
# 忽略非字典类型的顶级 JSON
continue
if obj.get('type') == 'item' and 'content' in obj:
chunk_idx += 1
content = obj['content']
full_content += content
elif obj.get('type') == 'end':
is_final = True
if is_final or (chunk_idx > 0 and chunk_idx % 8 == 0):
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
content=full_content,
is_final=is_final,
msg_sequence=message_idx,
)
except json.JSONDecodeError:
# buffer 末尾可能是一个不完整的 JSON,等待更多数据
break
except Exception as e:
# 记录解析失败并继续接收后续 chunk
try:
preview = chunk_str[:200]
except Exception:
preview = '<unavailable>'
self.ap.logger.warning(f'Failed to process chunk: {e}; chunk preview: {preview}')
# 流结束后,尝试解析残余 buffer
if buffer:
try:
buffer = buffer.strip()
if buffer:
obj, _ = decoder.raw_decode(buffer)
if isinstance(obj, dict):
if obj.get('type') == 'item' and 'content' in obj:
chunk_idx += 1
full_content += obj['content']
elif obj.get('type') == 'end':
is_final = True
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
content=full_content,
is_final=is_final,
msg_sequence=message_idx,
)
except Exception as e:
preview = buffer[:200]
self.ap.logger.warning(f'Failed to parse remaining buffer: {e}; buffer preview: {preview}')
# n8n 返回普通 JSON 格式(无任何流式 type:item 内容)
if chunk_idx == 0:
output_content = ''
try:
response_data = json.loads(full_text.strip())
if isinstance(response_data, dict):
if self.output_key in response_data:
output_content = response_data[self.output_key]
else:
output_content = json.dumps(response_data, ensure_ascii=False)
else:
output_content = full_text
except json.JSONDecodeError:
output_content = full_text
self.ap.logger.debug(f'n8n webhook response (non-stream): {full_text[:200]}')
yield provider_message.MessageChunk(
role='assistant',
content=output_content,
is_final=True,
msg_sequence=message_idx + 1,
)
async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用n8n webhook"""
# 生成会话ID(如果不存在)
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
# Keep query variables in sync with the generated/new conversation id.
# query.variables is later merged into payload and would otherwise
# overwrite the generated conversation_id with the stale preprocessor
# value (usually None for a new conversation).
query.variables['conversation_id'] = query.session.using_conversation.uuid
# 预处理用户消息
plain_text = await self._preprocess_user_message(query)
# 准备请求数据
payload = {
# 基本消息内容
'chatInput': plain_text, # 考虑到之前用户直接用的message model这里添加新键
'message': plain_text,
'user_message_text': plain_text,
'conversation_id': query.session.using_conversation.uuid,
'session_id': query.variables.get('session_id', ''),
'user_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'msg_create_time': query.variables.get('msg_create_time', ''),
}
# 添加所有变量到payload
payload.update(query.variables)
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
try:
# 准备请求头和认证信息
headers = {}
auth = None
# 根据认证类型设置相应的认证信息
if self.auth_type == 'basic':
# 使用Basic认证
auth = aiohttp.BasicAuth(self.basic_username, self.basic_password)
self.ap.logger.debug(f'using basic auth: {self.basic_username}')
elif self.auth_type == 'jwt':
# 使用JWT认证
import jwt
import time
# 创建JWT令牌
payload_jwt = {
'exp': int(time.time()) + 3600, # 1小时过期
'iat': int(time.time()),
'sub': 'n8n-webhook',
}
token = jwt.encode(payload_jwt, self.jwt_secret, algorithm=self.jwt_algorithm)
# 添加到Authorization头
headers['Authorization'] = f'Bearer {token}'
self.ap.logger.debug('using jwt auth')
elif self.auth_type == 'header':
# 使用自定义请求头认证
headers[self.header_name] = self.header_value
self.ap.logger.debug(f'using header auth: {self.header_name}')
else:
self.ap.logger.debug('no auth')
# 调用webhook
session = httpclient.get_session()
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
async for chunk in self._process_response(response):
if is_stream:
yield chunk
elif chunk.is_final:
yield provider_message.Message(
role='assistant',
content=chunk.content,
)
except Exception as e:
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""
async for msg in self._call_webhook(query):
yield msg
-209
View File
@@ -1,209 +0,0 @@
"""
Legacy Tbox (蚂蚁百宝箱) API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/tbox-agent` plugin instead.
"""
from __future__ import annotations
import typing
import json
import base64
import tempfile
import os
from tboxsdk.tbox import TboxClient
from tboxsdk.model.file import File, FileType
from .. import runner
from ...core import app
from ...utils import image
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
class TboxAPIError(Exception):
"""TBox API 请求失败"""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
@runner.runner_class('tbox-app-api')
class TboxAPIRunner(runner.RequestRunner):
"蚂蚁百宝箱API对话请求器"
# 运行器内部使用的配置
app_id: str # 蚂蚁百宝箱平台中的应用ID
api_key: str # 在蚂蚁百宝箱平台中申请的令牌
def __init__(self, ap: app.Application, pipeline_config: dict):
"""初始化"""
self.ap = ap
self.pipeline_config = pipeline_config
# 初始化Tbox 参数配置
self.app_id = self.pipeline_config['ai']['tbox-app-api']['app-id']
self.api_key = self.pipeline_config['ai']['tbox-app-api']['api-key']
# 初始化Tbox client
self.tbox_client = TboxClient(authorization=self.api_key)
async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[str]]:
"""预处理用户消息,提取纯文本,并将图片上传到 Tbox 服务
Returns:
tuple[str, list[str]]: 纯文本和图片的 Tbox 文件ID
"""
plain_text = ''
image_ids = []
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
elif ce.type == 'image_base64':
image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
# 创建临时文件
file_bytes = base64.b64decode(image_b64)
try:
with tempfile.NamedTemporaryFile(suffix=f'.{image_format}', delete=False) as tmp_file:
tmp_file.write(file_bytes)
tmp_file_path = tmp_file.name
file_upload_resp = self.tbox_client.upload_file(tmp_file_path)
image_id = file_upload_resp.get('data', '')
image_ids.append(image_id)
finally:
# 清理临时文件
if os.path.exists(tmp_file_path):
os.unlink(tmp_file_path)
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
return plain_text, image_ids
async def _agent_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""TBox 智能体对话请求"""
plain_text, image_ids = await self._preprocess_user_message(query)
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
# 获取Tbox的conversation_id
conversation_id = query.session.using_conversation.uuid or None
files = None
if image_ids:
files = [File(file_id=image_id, type=FileType.IMAGE) for image_id in image_ids]
# 发送对话请求
response = self.tbox_client.chat(
app_id=self.app_id, # Tbox中智能体应用的ID
user_id=query.bot_uuid, # 用户ID
query=plain_text, # 用户输入的文本信息
stream=is_stream, # 是否流式输出
conversation_id=conversation_id, # 会话ID,为None时Tbox会自动创建一个新会话
files=files, # 图片内容
)
if is_stream:
# 解析Tbox流式输出内容,并发送给上游
for chunk in self._process_stream_message(response, query, remove_think):
yield chunk
else:
message = self._process_non_stream_message(response, query, remove_think)
yield provider_message.Message(
role='assistant',
content=message,
)
def _process_non_stream_message(self, response: typing.Dict, query: pipeline_query.Query, remove_think: bool):
if response.get('errorCode') != '0':
raise TboxAPIError(f'Tbox API 请求失败: {response.get("errorMsg", "")}')
payload = response.get('data', {})
conversation_id = payload.get('conversationId', '')
query.session.using_conversation.uuid = conversation_id
thinking_content = payload.get('reasoningContent', [])
result = ''
if thinking_content and not remove_think:
result += f'<think>\n{thinking_content[0].get("text", "")}\n</think>\n'
content = payload.get('result', [])
if content:
result += content[0].get('chunk', '')
return result
def _process_stream_message(
self, response: typing.Generator[dict], query: pipeline_query.Query, remove_think: bool
):
idx_msg = 0
pending_content = ''
conversation_id = None
think_start = False
think_end = False
for chunk in response:
if chunk.get('type', '') == 'chunk':
"""
Tbox返回的消息内容chunk结构
{'lane': 'default', 'payload': {'conversationId': '20250918tBI947065406', 'messageId': '20250918TB1f53230954', 'text': ''}, 'type': 'chunk'}
"""
# 如果包含思考过程,拼接</think>
if think_start and not think_end:
pending_content += '\n</think>\n'
think_end = True
payload = chunk.get('payload', {})
if not conversation_id:
conversation_id = payload.get('conversationId')
query.session.using_conversation.uuid = conversation_id
if payload.get('text'):
idx_msg += 1
pending_content += payload.get('text')
elif chunk.get('type', '') == 'thinking' and not remove_think:
"""
Tbox返回的思考过程chunk结构
{'payload': '{"ext_data":{"text":"日期"},"event":"flow.node.llm.thinking","entity":{"node_type":"text-completion","execute_id":"6","group_id":0,"parent_execute_id":"6","node_name":"模型推理","node_id":"TC_5u6gl0"}}', 'type': 'thinking'}
"""
payload = json.loads(chunk.get('payload', '{}'))
if payload.get('ext_data', {}).get('text'):
idx_msg += 1
content = payload.get('ext_data', {}).get('text')
if not think_start:
think_start = True
pending_content += f'<think>\n{content}'
else:
pending_content += content
elif chunk.get('type', '') == 'error':
raise TboxAPIError(
f'Tbox API 请求失败: status_code={chunk.get("status_code")} message={chunk.get("message")} request_id={chunk.get("request_id")} '
)
if idx_msg % 8 == 0:
yield provider_message.MessageChunk(
role='assistant',
content=pending_content,
is_final=False,
)
# Tbox不返回END事件,默认发一个最终消息
yield provider_message.MessageChunk(
role='assistant',
content=pending_content,
is_final=True,
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行"""
msg_seq = 0
async for msg in self._agent_messages(query):
if isinstance(msg, provider_message.MessageChunk):
msg_seq += 1
msg.msg_sequence = msg_seq
yield msg