Files
LangBot/src/langbot/pkg/provider/runners/difysvapi.py
huanghuoguoguo b220cf02e5 docs(runner): mark legacy runners and add PROGRESS.md
- Add DEPRECATED docstring to all legacy runners in pkg/provider/runners/
- Mark migration target for each runner (local-agent, dify, n8n, coze, dashscope, langflow, tbox)
- Add PROGRESS.md to track agent-runner-pluginization implementation status
- Remove completed PHASE0_INTEGRATION_RECORD.md
2026-05-17 11:07:52 +08:00

785 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Legacy Dify Service API Runner.
DEPRECATED: This runner has been migrated to the AgentRunner plugin format.
Use the official `langbot/dify-agent` plugin instead.
Migration target: /home/glwuy/langbot-app/langbot-agent-runner/dify-agent/
"""
from __future__ import annotations
import typing
import json
import uuid
import base64
import mimetypes
from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.utils import image
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.dify_service_api.v1 import client, errors
import httpx
@runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器"""
dify_client: client.AsyncDifyServiceClient
def __init__(self, ap: app.Application, pipeline_config: dict):
    """Keep the application handle and pipeline config, then build the Dify client.

    Args:
        ap: Application object, stored as ``self.ap``.
        pipeline_config: Pipeline configuration. Its ``ai`` -> ``dify-service-api``
            section provides ``app-type``, ``api-key`` and ``base-url``.

    Raises:
        errors.DifyAPIError: If ``app-type`` is not ``chat``, ``agent`` or ``workflow``.
    """
    self.ap = ap
    self.pipeline_config = pipeline_config

    dify_settings = self.pipeline_config['ai']['dify-service-api']
    app_type = dify_settings['app-type']
    if app_type not in ('chat', 'agent', 'workflow'):
        raise errors.DifyAPIError(f'不支持的 Dify 应用类型: {app_type}')

    api_key = dify_settings['api-key']
    base_url = dify_settings['base-url']
    self.dify_client = client.AsyncDifyServiceClient(api_key=api_key, base_url=base_url)
def _process_thinking_content(
self,
content: str,
) -> tuple[str, str]:
"""处理思维链内容
Args:
content: 原始内容
Returns:
(处理后的内容, 提取的思维链内容)
"""
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
thinking_content = ''
# 从 content 中提取 <think> 标签内容
if content and '<think>' in content and '</think>' in content:
import re
think_pattern = r'<think>(.*?)</think>'
think_matches = re.findall(think_pattern, content, re.DOTALL)
if think_matches:
thinking_content = '\n'.join(think_matches)
# 移除 content 中的 <think> 标签
content = re.sub(think_pattern, '', content, flags=re.DOTALL).strip()
# 3. 根据 remove_think 参数决定是否保留思维链
if remove_think:
return content, ''
else:
# 如果有思维链内容,将其以 <think> 格式添加到 content 开头
if thinking_content:
content = f'<think>\n{thinking_content}\n</think>\n{content}'.strip()
return content, thinking_content
def _extract_dify_text_output(self, value: typing.Any) -> str:
"""Extract text content from Dify output payload."""
if value is None:
return ''
if isinstance(value, dict):
content = value.get('content')
if isinstance(content, str):
return content
return json.dumps(value, ensure_ascii=False)
if isinstance(value, str):
text = value.strip()
if not text:
return ''
try:
parsed = json.loads(text)
except json.JSONDecodeError:
return value
if isinstance(parsed, dict) and isinstance(parsed.get('content'), str):
return parsed['content']
return value
return str(value)
async def _preprocess_user_message(self, query: pipeline_query.Query) -> tuple[str, list[dict]]:
    """Extract the plain text of the user message and upload its attachments to Dify.

    Text elements are concatenated. ``image_base64``, ``file_url`` and
    ``file_base64`` elements are uploaded through ``self.dify_client.upload_file``.
    A failing ``file_url`` download or upload is logged as a warning and skipped.

    Returns:
        tuple[str, list[dict]]: The plain text (the configured ``base-prompt`` when
        the message carries no text) and one ``{'type': ..., 'id': ...}`` entry per
        uploaded file.
    """
    default_content_type = 'application/octet-stream'
    plain_text = ''
    upload_files: list[dict] = []
    user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'

    async def upload_file_bytes(file_name: str, file_bytes: bytes, content_type: str) -> str:
        """Upload raw bytes to Dify and return the ``id`` field of the response."""
        file_name = file_name or 'file'
        content_type = content_type or default_content_type
        file = (file_name, file_bytes, content_type)
        resp = await self.dify_client.upload_file(file, user_tag)
        return resp['id']

    async def download_file(file_url: str) -> tuple[bytes, str]:
        """Fetch ``file_url`` with httpx and return ``(bytes, content type)``."""
        async with httpx.AsyncClient() as client_session:
            resp = await client_session.get(file_url)
            resp.raise_for_status()
            # Prefer the response header, then a guess from the URL, then the default.
            content_type = (
                resp.headers.get('content-type') or mimetypes.guess_type(file_url)[0] or default_content_type
            )
            return resp.content, content_type

    def split_base64_payload(encoded: str) -> tuple[str, str]:
        """Split ``data:<mime>;base64,<payload>`` into ``(content type, payload)``.

        A string without a ``,`` separator is treated as a bare base64 payload
        instead of raising on tuple unpacking.
        """
        header, separator, payload = encoded.partition(',')
        if not separator:
            return default_content_type, encoded
        content_type = default_content_type
        if ';' in header:
            # header looks like 'data:<mime>;base64' -> strip the 'data:' prefix.
            content_type = header.split(';')[0][5:] or content_type
        return content_type, payload

    def _detect_file_type(content_type: str) -> str:
        """Map a MIME type to the Dify file type."""
        for dify_type in ('image', 'audio', 'video'):
            if content_type and content_type.startswith(f'{dify_type}/'):
                return dify_type
        return 'document'

    if isinstance(query.user_message.content, list):
        for ce in query.user_message.content:
            if ce.type == 'text':
                plain_text += ce.text
            elif ce.type == 'image_base64':
                image_b64, image_format = await image.extract_b64_and_format(ce.image_base64)
                file_bytes = base64.b64decode(image_b64)
                image_id = await upload_file_bytes(f'img.{image_format}', file_bytes, f'image/{image_format}')
                upload_files.append({'type': 'image', 'id': image_id})
            elif ce.type == 'file_url':
                file_url = getattr(ce, 'file_url', None)
                file_name = getattr(ce, 'file_name', None) or 'file'
                try:
                    file_bytes, content_type = await download_file(file_url)
                    file_id = await upload_file_bytes(file_name, file_bytes, content_type)
                    file_type = _detect_file_type(content_type)
                    upload_files.append({'type': file_type, 'id': file_id})
                except Exception as e:
                    # Best effort: one unreachable attachment must not abort the request.
                    self.ap.logger.warning(f'dify file upload failed: {e}')
            elif ce.type == 'file_base64':
                file_name = getattr(ce, 'file_name', None) or 'file'
                content_type, b64_data = split_base64_payload(ce.file_base64)
                file_bytes = base64.b64decode(b64_data)
                file_id = await upload_file_bytes(file_name, file_bytes, content_type)
                file_type = _detect_file_type(content_type)
                upload_files.append({'type': file_type, 'id': file_id})
    elif isinstance(query.user_message.content, str):
        plain_text = query.user_message.content

    # An empty message falls back to the configured base prompt.
    plain_text = plain_text if plain_text else self.pipeline_config['ai']['dify-service-api']['base-prompt']
    return plain_text, upload_files
async def _chat_messages(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Call a Dify chat app and yield whole assistant messages.

    Until a ``workflow_started`` event is seen, ``message`` events are buffered
    and emitted as one message on ``message_end``. After it, the output of each
    finished ``answer`` node is emitted instead. The conversation id of the last
    chunk is stored back on the session.

    Raises:
        errors.DifyAPIError: If the client produced no chunk at all.
    """
    conversation_id = query.session.using_conversation.uuid or None
    query.variables['conversation_id'] = conversation_id

    plain_text, uploaded = await self._preprocess_user_message(query)
    files = []
    for item in uploaded:
        files.append(
            {
                'type': item['type'],
                'transfer_method': 'local_file',
                'upload_file_id': item['id'],
            }
        )

    in_workflow = False  # False: basic orchestration, True: workflow orchestration
    buffered_answer = ''
    request_inputs = dict(query.variables)

    chunk = None  # stays None when the stream yields nothing
    async for chunk in self.dify_client.chat_messages(
        inputs=request_inputs,
        query=plain_text,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        conversation_id=conversation_id,
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug(f'dify-chat-chunk: {chunk!s}')
        event = chunk['event']

        if event == 'workflow_started':
            in_workflow = True

        if in_workflow:
            if event == 'node_finished' and chunk['data']['node_type'] == 'answer':
                answer = self._extract_dify_text_output(chunk['data']['outputs'].get('answer'))
                content, _ = self._process_thinking_content(answer)
                yield provider_message.Message(role='assistant', content=content)
        elif event == 'message':
            buffered_answer += chunk['answer']
        elif event == 'message_end':
            content, _ = self._process_thinking_content(buffered_answer)
            yield provider_message.Message(role='assistant', content=content)
            buffered_answer = ''

    if chunk is None:
        raise errors.DifyAPIError('Dify API 没有返回任何响应请检查网络连接和API配置')

    query.session.using_conversation.uuid = chunk['conversation_id']
async def _agent_chat_messages(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Call a Dify agent app and yield whole assistant messages.

    ``agent_message`` / ``message`` text is buffered and flushed as one message
    when any other event arrives. ``agent_thought`` events naming a tool become
    tool-call messages, assistant ``message_file`` images become image messages,
    and an ``error`` event raises. The conversation id of the last chunk is
    stored back on the session.

    Raises:
        errors.DifyAPIError: On an ``error`` event, or if no chunk was produced.
    """
    conversation_id = query.session.using_conversation.uuid or None
    query.variables['conversation_id'] = conversation_id

    plain_text, uploaded = await self._preprocess_user_message(query)
    files = []
    for item in uploaded:
        files.append(
            {
                'type': item['type'],
                'transfer_method': 'local_file',
                'upload_file_id': item['id'],
            }
        )

    skipped_events: list = []
    request_inputs = dict(query.variables)
    buffered_text = ''

    chunk = None  # stays None when the stream yields nothing
    async for chunk in self.dify_client.chat_messages(
        inputs=request_inputs,
        query=plain_text,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        response_mode='streaming',
        conversation_id=conversation_id,
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug(f'dify-agent-chunk: {chunk!s}')
        event = chunk['event']
        if event in skipped_events:
            continue

        if event in ('agent_message', 'message'):
            buffered_text += chunk['answer']
            continue

        # Any non-text event closes the text collected so far.
        if buffered_text.strip():
            cleaned = buffered_text.replace('</details>Action:', '</details>')
            content, _ = self._process_thinking_content(cleaned)
            yield provider_message.Message(role='assistant', content=content)
        buffered_text = ''

        if event == 'agent_thought':
            tool_name = chunk['tool']
            if tool_name != '' and chunk['observation'] != '':
                # Tool call result: nothing to forward.
                continue
            if tool_name:
                tool_call = provider_message.ToolCall(
                    id=chunk['id'],
                    type='function',
                    function=provider_message.FunctionCall(
                        name=tool_name,
                        arguments=json.dumps({}),
                    ),
                )
                yield provider_message.Message(role='assistant', tool_calls=[tool_call])
        elif event == 'message_file':
            if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant':
                raw_url = chunk['url']
                if raw_url.startswith(('http://', 'https://')):
                    # Already an absolute URL.
                    image_url = raw_url
                else:
                    host_root = self.dify_client.base_url
                    if host_root.endswith('/v1'):
                        host_root = host_root[:-3]
                    image_url = host_root + raw_url
                yield provider_message.Message(
                    role='assistant',
                    content=[provider_message.ContentElement.from_image_url(image_url)],
                )
        elif event == 'error':
            raise errors.DifyAPIError('dify 服务错误: ' + chunk['message'])

    if chunk is None:
        raise errors.DifyAPIError('Dify API 没有返回任何响应请检查网络连接和API配置')

    query.session.using_conversation.uuid = chunk['conversation_id']
async def _workflow_messages(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Run a Dify workflow and yield whole assistant messages.

    A conversation uuid is generated when the session has none. Every started
    node other than ``start`` / ``end`` is reported as a tool-call message, and
    the ``summary`` output of ``workflow_finished`` becomes the final message.

    Raises:
        errors.DifyAPIError: If ``workflow_finished`` carries an error.
    """
    conversation = query.session.using_conversation
    if not conversation.uuid:
        conversation.uuid = str(uuid.uuid4())
    query.variables['conversation_id'] = query.session.using_conversation.uuid

    plain_text, uploaded = await self._preprocess_user_message(query)
    files = []
    for item in uploaded:
        files.append(
            {
                'type': item['type'],
                'transfer_method': 'local_file',
                'upload_file_id': item['id'],
            }
        )

    skipped_events = ('text_chunk', 'workflow_started')

    # Legacy variable names, kept for compatibility; query variables are merged on top.
    request_inputs = {
        'langbot_user_message_text': plain_text,
        'langbot_session_id': query.variables['session_id'],
        'langbot_conversation_id': query.variables['conversation_id'],
        'langbot_msg_create_time': query.variables['msg_create_time'],
    }
    request_inputs.update(query.variables)

    async for chunk in self.dify_client.workflow_run(
        inputs=request_inputs,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug(f'dify-workflow-chunk: {chunk!s}')
        event = chunk['event']
        if event in skipped_events:
            continue

        if event == 'node_started':
            node = chunk['data']
            if node['node_type'] in ('start', 'end'):
                continue
            node_call = provider_message.ToolCall(
                id=node['node_id'],
                type='function',
                function=provider_message.FunctionCall(
                    name=node['title'],
                    arguments=json.dumps({}),
                ),
            )
            yield provider_message.Message(role='assistant', content=None, tool_calls=[node_call])
        elif event == 'workflow_finished':
            result = chunk['data']
            if result['error']:
                raise errors.DifyAPIError(result['error'])
            content, _ = self._process_thinking_content(result['outputs']['summary'])
            yield provider_message.Message(role='assistant', content=content)
async def _chat_messages_chunk(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
    """Call a Dify chat app and yield cumulative streaming chunks.

    ``message`` text is accumulated and flushed on every 8th message event and
    once more, marked final, on ``message_end`` / ``workflow_finished``. In
    workflow mode a non-empty ``answer`` node output replaces the accumulated
    text. The conversation id of the last chunk is stored back on the session.

    When the ``remove-think`` option under ``output`` -> ``misc`` is truthy,
    chunks between ``<think>`` and ``</think>`` are left out of the output.

    Raises:
        errors.DifyAPIError: If ``workflow_finished`` carries an error, or if
            the client produced no chunk at all.
    """
    import re

    cov_id = query.session.using_conversation.uuid or None
    query.variables['conversation_id'] = cov_id

    plain_text, upload_files = await self._preprocess_user_message(query)

    files = [
        {
            'type': f['type'],
            'transfer_method': 'local_file',
            'upload_file_id': f['id'],
        }
        for f in upload_files
    ]

    mode = 'basic'
    basic_mode_pending_chunk = ''

    inputs = {}
    inputs.update(query.variables)
    message_idx = 0

    chunk = None  # stays None when the stream yields nothing

    is_final = False
    think_start = False
    think_end = False
    yielded_final = False

    # 'misc' may be absent (or None); never call .get() on a non-dict default.
    misc_config = self.pipeline_config['output'].get('misc') or {}
    remove_think = misc_config.get('remove-think')

    async for chunk in self.dify_client.chat_messages(
        inputs=inputs,
        query=plain_text,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        conversation_id=cov_id,
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug('dify-chat-chunk: ' + str(chunk))

        if chunk['event'] == 'workflow_started':
            mode = 'workflow'
        elif chunk['event'] in ('node_started', 'node_finished', 'workflow_finished'):
            # Some Dify deployments may omit workflow_started in streamed chunks.
            mode = 'workflow'

        if chunk['event'] == 'message':
            message_idx += 1
            if remove_think:
                if '<think>' in chunk['answer'] and not think_start:
                    think_start = True
                    continue
                if '</think>' in chunk['answer'] and not think_end:
                    content = re.sub(r'^\n</think>', '', chunk['answer'])
                    basic_mode_pending_chunk += content
                    think_end = True
                elif think_end or not think_start:
                    # Text after the reasoning, or a reply that never opened a
                    # <think> section at all, is ordinary answer text.
                    basic_mode_pending_chunk += chunk['answer']
                if think_start and not think_end:
                    # Still inside the reasoning section: nothing to flush yet.
                    continue
            else:
                basic_mode_pending_chunk += chunk['answer']

        if chunk['event'] == 'message_end':
            is_final = True
        elif chunk['event'] == 'workflow_finished':
            is_final = True
            if chunk['data'].get('error'):
                raise errors.DifyAPIError(chunk['data']['error'])

        if mode == 'workflow' and chunk['event'] == 'node_finished':
            if chunk['data'].get('node_type') == 'answer':
                outputs = chunk['data'].get('outputs') or {}
                answer = self._extract_dify_text_output(outputs.get('answer'))
                if answer:
                    basic_mode_pending_chunk = answer

        # Flush every 8th message, and exactly once with is_final=True.
        if (
            not yielded_final
            and (is_final or message_idx % 8 == 0)
            and (basic_mode_pending_chunk != '' or is_final)
        ):
            yield provider_message.MessageChunk(
                role='assistant',
                content=basic_mode_pending_chunk,
                is_final=is_final,
            )
            if is_final:
                yielded_final = True

    if chunk is None:
        raise errors.DifyAPIError('Dify API 没有返回任何响应请检查网络连接和API配置')

    query.session.using_conversation.uuid = chunk['conversation_id']
async def _agent_chat_messages_chunk(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
    """Call a Dify agent app and yield cumulative streaming chunks.

    ``agent_message`` / ``message`` text is accumulated; the accumulated text is
    yielded whenever the event counter is a multiple of 8 and, marked final,
    after ``message_end``. ``agent_thought`` events naming a tool are yielded as
    tool-call chunks and assistant ``message_file`` images as image chunks. The
    conversation id of the last chunk is stored back on the session.

    When the ``remove-think`` option under ``output`` -> ``misc`` is truthy,
    chunks between ``<think>`` and ``</think>`` are left out of the output.

    Raises:
        errors.DifyAPIError: On an ``error`` event, or if no chunk was produced.
    """
    import re

    cov_id = query.session.using_conversation.uuid or None
    query.variables['conversation_id'] = cov_id

    plain_text, upload_files = await self._preprocess_user_message(query)

    files = [
        {
            'type': f['type'],
            'transfer_method': 'local_file',
            'upload_file_id': f['id'],
        }
        for f in upload_files
    ]

    ignored_events = []

    inputs = {}
    inputs.update(query.variables)

    pending_agent_message = ''

    chunk = None  # stays None when the stream yields nothing
    message_idx = 0
    is_final = False
    think_start = False
    think_end = False

    # 'misc' may be absent (or None); never call .get() on a non-dict default.
    misc_config = self.pipeline_config['output'].get('misc') or {}
    remove_think = misc_config.get('remove-think')

    async for chunk in self.dify_client.chat_messages(
        inputs=inputs,
        query=plain_text,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        response_mode='streaming',
        conversation_id=cov_id,
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug('dify-agent-chunk: ' + str(chunk))

        if chunk['event'] in ignored_events:
            continue

        # 'message' is accepted as well, matching the non-streaming agent path.
        if chunk['event'] in ('agent_message', 'message'):
            message_idx += 1
            if remove_think:
                if '<think>' in chunk['answer'] and not think_start:
                    think_start = True
                    continue
                if '</think>' in chunk['answer'] and not think_end:
                    content = re.sub(r'^\n</think>', '', chunk['answer'])
                    pending_agent_message += content
                    think_end = True
                elif think_end or not think_start:
                    pending_agent_message += chunk['answer']
                if think_start and not think_end:
                    # Still inside the reasoning section: nothing to flush yet.
                    continue
            else:
                pending_agent_message += chunk['answer']
        elif chunk['event'] == 'message_end':
            is_final = True
        else:
            if chunk['event'] == 'agent_thought':
                if chunk['tool'] != '' and chunk['observation'] != '':
                    # Tool call result: nothing to forward.
                    continue
                message_idx += 1
                if chunk['tool']:
                    msg = provider_message.MessageChunk(
                        role='assistant',
                        tool_calls=[
                            provider_message.ToolCall(
                                id=chunk['id'],
                                type='function',
                                function=provider_message.FunctionCall(
                                    name=chunk['tool'],
                                    arguments=json.dumps({}),
                                ),
                            )
                        ],
                    )
                    yield msg
            if chunk['event'] == 'message_file':
                message_idx += 1
                if chunk['type'] == 'image' and chunk['belongs_to'] == 'assistant':
                    # Relative URLs are resolved against the API host without '/v1'.
                    if chunk['url'].startswith(('http://', 'https://')):
                        image_url = chunk['url']
                    else:
                        base_url = self.dify_client.base_url
                        if base_url.endswith('/v1'):
                            base_url = base_url[:-3]
                        image_url = base_url + chunk['url']

                    yield provider_message.MessageChunk(
                        role='assistant',
                        content=[provider_message.ContentElement.from_image_url(image_url)],
                        is_final=is_final,
                    )
            if chunk['event'] == 'error':
                raise errors.DifyAPIError('dify 服务错误: ' + chunk['message'])

        if message_idx % 8 == 0 or is_final:
            yield provider_message.MessageChunk(
                role='assistant',
                content=pending_agent_message,
                is_final=is_final,
            )

    if chunk is None:
        raise errors.DifyAPIError('Dify API 没有返回任何响应请检查网络连接和API配置')

    query.session.using_conversation.uuid = chunk['conversation_id']
async def _workflow_messages_chunk(
    self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
    """Run a Dify workflow and yield cumulative streaming chunks.

    A conversation uuid is generated when the session has none. ``text_chunk``
    text is accumulated; the accumulated text is yielded whenever the event
    counter is a multiple of 8 and, marked final, after ``workflow_finished``.
    Every started node other than ``start`` / ``end`` is yielded as a tool-call
    chunk.

    When the ``remove-think`` option under ``output`` -> ``misc`` is truthy,
    chunks between ``<think>`` and ``</think>`` are left out of the output.

    Raises:
        errors.DifyAPIError: If ``workflow_finished`` carries an error.
    """
    import re

    if not query.session.using_conversation.uuid:
        query.session.using_conversation.uuid = str(uuid.uuid4())

    query.variables['conversation_id'] = query.session.using_conversation.uuid

    plain_text, upload_files = await self._preprocess_user_message(query)

    files = [
        {
            'type': f['type'],
            'transfer_method': 'local_file',
            'upload_file_id': f['id'],
        }
        for f in upload_files
    ]

    ignored_events = ['workflow_started']

    inputs = {  # these variables are legacy variables, we need to keep them for compatibility
        'langbot_user_message_text': plain_text,
        'langbot_session_id': query.variables['session_id'],
        'langbot_conversation_id': query.variables['conversation_id'],
        'langbot_msg_create_time': query.variables['msg_create_time'],
    }

    inputs.update(query.variables)
    message_idx = 0
    is_final = False
    think_start = False
    think_end = False
    workflow_contents = ''

    # 'misc' may be absent (or None); never call .get() on a non-dict default.
    misc_config = self.pipeline_config['output'].get('misc') or {}
    remove_think = misc_config.get('remove-think')

    async for chunk in self.dify_client.workflow_run(
        inputs=inputs,
        user=f'{query.session.launcher_type.value}_{query.session.launcher_id}',
        files=files,
        timeout=120,
    ):
        self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
        if chunk['event'] in ignored_events:
            continue

        if chunk['event'] == 'workflow_finished':
            is_final = True
            if chunk['data']['error']:
                raise errors.DifyAPIError(chunk['data']['error'])

        if chunk['event'] == 'text_chunk':
            message_idx += 1
            text = chunk['data']['text']
            if remove_think:
                if '<think>' in text and not think_start:
                    think_start = True
                    continue
                if '</think>' in text and not think_end:
                    content = re.sub(r'^\n</think>', '', text)
                    workflow_contents += content
                    think_end = True
                elif think_end or not think_start:
                    # Text after the reasoning, or output that never opened a
                    # <think> section at all, is ordinary answer text.
                    workflow_contents += text
                if think_start and not think_end:
                    # Still inside the reasoning section: nothing to flush yet.
                    continue
            else:
                workflow_contents += text

        if chunk['event'] == 'node_started':
            if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
                continue
            message_idx += 1
            msg = provider_message.MessageChunk(
                role='assistant',
                content=None,
                tool_calls=[
                    provider_message.ToolCall(
                        id=chunk['data']['node_id'],
                        type='function',
                        function=provider_message.FunctionCall(
                            name=chunk['data']['title'],
                            arguments=json.dumps({}),
                        ),
                    )
                ],
            )
            yield msg

        if message_idx % 8 == 0 or is_final:
            yield provider_message.MessageChunk(
                role='assistant',
                content=workflow_contents,
                is_final=is_final,
            )
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Dispatch the query to the handler for the configured Dify app type.

    When the adapter reports stream output support, the ``*_chunk`` handlers are
    used and each yielded chunk gets a 1-based ``msg_sequence``. Otherwise the
    whole-message handlers are used and their output is forwarded unchanged.

    Raises:
        errors.DifyAPIError: If the configured ``app-type`` has no handler.
    """
    streaming = await query.adapter.is_stream_output_supported()
    app_type = self.pipeline_config['ai']['dify-service-api']['app-type']

    if streaming:
        handlers = {
            'chat': self._chat_messages_chunk,
            'agent': self._agent_chat_messages_chunk,
            'workflow': self._workflow_messages_chunk,
        }
    else:
        handlers = {
            'chat': self._chat_messages,
            'agent': self._agent_chat_messages,
            'workflow': self._workflow_messages,
        }

    handler = handlers.get(app_type)
    if handler is None:
        raise errors.DifyAPIError(f'不支持的 Dify 应用类型: {app_type}')

    if not streaming:
        async for msg in handler(query):
            yield msg
        return

    sequence = 0
    async for msg in handler(query):
        sequence += 1
        msg.msg_sequence = sequence
        yield msg