fix: dify的timeout无法自定义 (#949)

This commit is contained in:
Junyan Qin
2024-12-16 23:54:56 +08:00
parent 0dcd2d8179
commit 32b400dcb1
6 changed files with 131 additions and 70 deletions

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from .. import migration
@migration.migration_class("dify-api-timeout-params", 17)
class DifyAPITimeoutParamsMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120
await self.ap.provider_cfg.dump_config()

View File

@@ -7,7 +7,7 @@ from .. import migration
from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion
from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg
from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config
from ..migrations import m015_gitee_ai_config, m016_dify_service_api from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params
@stage.stage_class("MigrationStage") @stage.stage_class("MigrationStage")

View File

@@ -91,7 +91,7 @@ class ChatMessageHandler(handler.MessageHandler):
query.session.using_conversation.messages.extend(query.resp_messages) query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e: except Exception as e:
self.ap.logger.error(f'对话({query.query_id})请求失败: {str(e)}') self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}')
yield entities.StageProcessResult( yield entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT, result_type=entities.ResultType.INTERRUPT,

View File

@@ -20,125 +20,175 @@ class DifyServiceAPIRunner(runner.RequestRunner):
async def initialize(self): async def initialize(self):
"""初始化""" """初始化"""
valid_app_types = ['chat', 'workflow'] valid_app_types = ["chat", "workflow"]
if self.ap.provider_cfg.data['dify-service-api']['app-type'] not in valid_app_types: if (
raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") self.ap.provider_cfg.data["dify-service-api"]["app-type"]
not in valid_app_types
):
raise errors.DifyAPIError(
f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}"
)
api_key = self.ap.provider_cfg.data['dify-service-api'][self.ap.provider_cfg.data['dify-service-api']['app-type']]['api-key'] api_key = self.ap.provider_cfg.data["dify-service-api"][
self.ap.provider_cfg.data["dify-service-api"]["app-type"]
]["api-key"]
self.dify_client = client.AsyncDifyServiceClient( self.dify_client = client.AsyncDifyServiceClient(
api_key=api_key, api_key=api_key,
base_url=self.ap.provider_cfg.data['dify-service-api']['base-url'] base_url=self.ap.provider_cfg.data["dify-service-api"]["base-url"],
) )
async def _preprocess_user_message(self, query: core_entities.Query) -> tuple[str, list[str]]: async def _preprocess_user_message(
self, query: core_entities.Query
) -> tuple[str, list[str]]:
"""预处理用户消息,提取纯文本,并将图片上传到 Dify 服务 """预处理用户消息,提取纯文本,并将图片上传到 Dify 服务
Returns: Returns:
tuple[str, list[str]]: 纯文本和图片的 Dify 服务图片 ID tuple[str, list[str]]: 纯文本和图片的 Dify 服务图片 ID
""" """
plain_text = '' plain_text = ""
image_ids = [] image_ids = []
if isinstance(query.user_message.content, list): if isinstance(query.user_message.content, list):
for ce in query.user_message.content: for ce in query.user_message.content:
if ce.type == 'text': if ce.type == "text":
plain_text += ce.text plain_text += ce.text
elif ce.type == 'image_url': elif ce.type == "image_url":
file_bytes, image_format = await image.get_qq_image_bytes(ce.image_url.url) file_bytes, image_format = await image.get_qq_image_bytes(
ce.image_url.url
)
file = ("img.png", file_bytes, f"image/{image_format}") file = ("img.png", file_bytes, f"image/{image_format}")
file_upload_resp = await self.dify_client.upload_file(file, f"{query.session.launcher_type.value}_{query.session.launcher_id}") file_upload_resp = await self.dify_client.upload_file(
image_id = file_upload_resp['id'] file,
f"{query.session.launcher_type.value}_{query.session.launcher_id}",
)
image_id = file_upload_resp["id"]
image_ids.append(image_id) image_ids.append(image_id)
elif isinstance(query.user_message.content, str): elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content plain_text = query.user_message.content
return plain_text, image_ids return plain_text, image_ids
async def _chat_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: async def _chat_messages(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用聊天助手""" """调用聊天助手"""
cov_id = query.session.using_conversation.uuid or "" cov_id = query.session.using_conversation.uuid or ""
plain_text, image_ids = await self._preprocess_user_message(query) plain_text, image_ids = await self._preprocess_user_message(query)
files = [{ files = [
'type': 'image', {
'transfer_method': 'local_file', "type": "image",
'upload_file_id': image_id, "transfer_method": "local_file",
} for image_id in image_ids] "upload_file_id": image_id,
}
for image_id in image_ids
]
resp = await self.dify_client.chat_messages(inputs={}, query=plain_text, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", conversation_id=cov_id, files=files) resp = await self.dify_client.chat_messages(
inputs={},
query=plain_text,
user=f"{query.session.launcher_type.value}_{query.session.launcher_id}",
conversation_id=cov_id,
files=files,
timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"],
)
msg = llm_entities.Message( msg = llm_entities.Message(
role='assistant', role="assistant",
content=resp['answer'], content=resp["answer"],
) )
yield msg yield msg
query.session.using_conversation.uuid = resp['conversation_id'] query.session.using_conversation.uuid = resp["conversation_id"]
async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: async def _workflow_messages(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用工作流""" """调用工作流"""
if not query.session.using_conversation.uuid: if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4()) query.session.using_conversation.uuid = str(uuid.uuid4())
cov_id = query.session.using_conversation.uuid cov_id = query.session.using_conversation.uuid
plain_text, image_ids = await self._preprocess_user_message(query) plain_text, image_ids = await self._preprocess_user_message(query)
files = [{ files = [
'type': 'image', {
'transfer_method': 'local_file', "type": "image",
'upload_file_id': image_id, "transfer_method": "local_file",
} for image_id in image_ids] "upload_file_id": image_id,
}
for image_id in image_ids
]
ignored_events = ['text_chunk', 'workflow_started'] ignored_events = ["text_chunk", "workflow_started"]
async for chunk in self.dify_client.workflow_run(inputs={ async for chunk in self.dify_client.workflow_run(
"langbot_user_message_text": plain_text, inputs={
"langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}", "langbot_user_message_text": plain_text,
"langbot_conversation_id": cov_id, "langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}",
}, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", files=files): "langbot_conversation_id": cov_id,
if chunk['event'] in ignored_events: },
user=f"{query.session.launcher_type.value}_{query.session.launcher_id}",
files=files,
timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"],
):
if chunk["event"] in ignored_events:
continue continue
if chunk['event'] == 'node_started': if chunk["event"] == "node_started":
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': if (
chunk["data"]["node_type"] == "start"
or chunk["data"]["node_type"] == "end"
):
continue continue
msg = llm_entities.Message( msg = llm_entities.Message(
role='assistant', role="assistant",
content=None, content=None,
tool_calls=[llm_entities.ToolCall( tool_calls=[
id=chunk['data']['node_id'], llm_entities.ToolCall(
type='function', id=chunk["data"]["node_id"],
function=llm_entities.FunctionCall( type="function",
name=chunk['data']['title'], function=llm_entities.FunctionCall(
arguments=json.dumps({}), name=chunk["data"]["title"],
), arguments=json.dumps({}),
)], ),
)
],
) )
yield msg yield msg
elif chunk['event'] == 'workflow_finished': elif chunk["event"] == "workflow_finished":
msg = llm_entities.Message( msg = llm_entities.Message(
role='assistant', role="assistant",
content=chunk['data']['outputs'][self.ap.provider_cfg.data['dify-service-api']['workflow']['output-key']], content=chunk["data"]["outputs"][
self.ap.provider_cfg.data["dify-service-api"]["workflow"][
"output-key"
]
],
) )
yield msg yield msg
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: async def run(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""运行请求""" """运行请求"""
if self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'chat': if self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "chat":
async for msg in self._chat_messages(query): async for msg in self._chat_messages(query):
yield msg yield msg
elif self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'workflow': elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "workflow":
async for msg in self._workflow_messages(query): async for msg in self._workflow_messages(query):
yield msg yield msg
else: else:
raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") raise errors.DifyAPIError(
f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}"
)

View File

@@ -22,13 +22,3 @@ def install_requirements(file):
pipmain(['install', '-r', file, "-i", "https://pypi.tuna.tsinghua.edu.cn/simple", pipmain(['install', '-r', file, "-i", "https://pypi.tuna.tsinghua.edu.cn/simple",
"--trusted-host", "pypi.tuna.tsinghua.edu.cn"]) "--trusted-host", "pypi.tuna.tsinghua.edu.cn"])
# log.reset_logging() # log.reset_logging()
if __name__ == "__main__":
try:
install("openai11")
except Exception as e:
print(111)
print(e)
print(222)

View File

@@ -62,11 +62,13 @@
"base-url": "https://api.dify.ai/v1", "base-url": "https://api.dify.ai/v1",
"app-type": "chat", "app-type": "chat",
"chat": { "chat": {
"api-key": "app-1234567890" "api-key": "app-1234567890",
"timeout": 120
}, },
"workflow": { "workflow": {
"api-key": "app-1234567890", "api-key": "app-1234567890",
"output-key": "summary" "output-key": "summary",
"timeout": 120
} }
} }
} }