From 32b400dcb1efeb5e53fc9034eda5edfade2b77c0 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 16 Dec 2024 23:54:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20dify=E7=9A=84timeout=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89=20(#949)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../m017_dify_api_timeout_params.py | 19 ++ pkg/core/stages/migrate.py | 2 +- pkg/pipeline/process/handlers/chat.py | 2 +- pkg/provider/runners/difysvapi.py | 162 ++++++++++++------ pkg/utils/pkgmgr.py | 10 -- templates/provider.json | 6 +- 6 files changed, 131 insertions(+), 70 deletions(-) create mode 100644 pkg/core/migrations/m017_dify_api_timeout_params.py diff --git a/pkg/core/migrations/m017_dify_api_timeout_params.py b/pkg/core/migrations/m017_dify_api_timeout_params.py new file mode 100644 index 00000000..e0837732 --- /dev/null +++ b/pkg/core/migrations/m017_dify_api_timeout_params.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from .. import migration + + +@migration.migration_class("dify-api-timeout-params", 17) +class DifyAPITimeoutParamsMigration(migration.Migration): + """迁移""" + + async def need_migrate(self) -> bool: + """判断当前环境是否需要运行此迁移""" + return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] + + async def run(self): + """执行迁移""" + self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120 + self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120 + + await self.ap.provider_cfg.dump_config() diff --git a/pkg/core/stages/migrate.py b/pkg/core/stages/migrate.py index ec70a2ab..3134dc3b 100644 --- a/pkg/core/stages/migrate.py +++ b/pkg/core/stages/migrate.py @@ -7,7 +7,7 @@ from .. import migration from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config -from ..migrations import m015_gitee_ai_config, m016_dify_service_api +from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params @stage.stage_class("MigrationStage") diff --git a/pkg/pipeline/process/handlers/chat.py b/pkg/pipeline/process/handlers/chat.py index 6e192b78..975b18cc 100644 --- a/pkg/pipeline/process/handlers/chat.py +++ b/pkg/pipeline/process/handlers/chat.py @@ -91,7 +91,7 @@ class ChatMessageHandler(handler.MessageHandler): query.session.using_conversation.messages.extend(query.resp_messages) except Exception as e: - self.ap.logger.error(f'对话({query.query_id})请求失败: {str(e)}') + self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}') yield entities.StageProcessResult( result_type=entities.ResultType.INTERRUPT, diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index c3173d6d..87c9761c 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -20,125 +20,175 @@ class DifyServiceAPIRunner(runner.RequestRunner): async def initialize(self): """初始化""" - valid_app_types = ['chat', 'workflow'] - if self.ap.provider_cfg.data['dify-service-api']['app-type'] not in valid_app_types: - raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") + valid_app_types = ["chat", "workflow"] + if ( + self.ap.provider_cfg.data["dify-service-api"]["app-type"] + not in valid_app_types + ): + raise errors.DifyAPIError( + f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}" + ) - api_key = self.ap.provider_cfg.data['dify-service-api'][self.ap.provider_cfg.data['dify-service-api']['app-type']]['api-key'] + api_key = self.ap.provider_cfg.data["dify-service-api"][ + self.ap.provider_cfg.data["dify-service-api"]["app-type"] + ]["api-key"] self.dify_client = client.AsyncDifyServiceClient( api_key=api_key, - base_url=self.ap.provider_cfg.data['dify-service-api']['base-url'] + base_url=self.ap.provider_cfg.data["dify-service-api"]["base-url"], ) - async def _preprocess_user_message(self, query: core_entities.Query) -> tuple[str, list[str]]: + async def _preprocess_user_message( + self, query: core_entities.Query + ) -> tuple[str, list[str]]: """预处理用户消息,提取纯文本,并将图片上传到 Dify 服务 - + Returns: tuple[str, list[str]]: 纯文本和图片的 Dify 服务图片 ID """ - plain_text = '' + plain_text = "" image_ids = [] if isinstance(query.user_message.content, list): for ce in query.user_message.content: - if ce.type == 'text': + if ce.type == "text": plain_text += ce.text - elif ce.type == 'image_url': - file_bytes, image_format = await image.get_qq_image_bytes(ce.image_url.url) + elif ce.type == "image_url": + file_bytes, image_format = await image.get_qq_image_bytes( + ce.image_url.url + ) file = ("img.png", file_bytes, f"image/{image_format}") - file_upload_resp = await self.dify_client.upload_file(file, f"{query.session.launcher_type.value}_{query.session.launcher_id}") - image_id = file_upload_resp['id'] + file_upload_resp = await self.dify_client.upload_file( + file, + f"{query.session.launcher_type.value}_{query.session.launcher_id}", + ) + image_id = file_upload_resp["id"] image_ids.append(image_id) elif isinstance(query.user_message.content, str): plain_text = query.user_message.content return plain_text, image_ids - async def _chat_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def _chat_messages( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """调用聊天助手""" cov_id = query.session.using_conversation.uuid or "" plain_text, image_ids = await self._preprocess_user_message(query) - files = [{ - 'type': 'image', - 'transfer_method': 'local_file', - 'upload_file_id': image_id, - } for image_id in image_ids] + files = [ + { + "type": "image", + "transfer_method": "local_file", + "upload_file_id": image_id, + } + for image_id in image_ids + ] - resp = await self.dify_client.chat_messages(inputs={}, query=plain_text, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", conversation_id=cov_id, files=files) + resp = await self.dify_client.chat_messages( + inputs={}, + query=plain_text, + user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", + conversation_id=cov_id, + files=files, + timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], + ) msg = llm_entities.Message( - role='assistant', - content=resp['answer'], + role="assistant", + content=resp["answer"], ) yield msg - query.session.using_conversation.uuid = resp['conversation_id'] + query.session.using_conversation.uuid = resp["conversation_id"] - async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def _workflow_messages( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """调用工作流""" if not query.session.using_conversation.uuid: query.session.using_conversation.uuid = str(uuid.uuid4()) - + cov_id = query.session.using_conversation.uuid plain_text, image_ids = await self._preprocess_user_message(query) - files = [{ - 'type': 'image', - 'transfer_method': 'local_file', - 'upload_file_id': image_id, - } for image_id in image_ids] + files = [ + { + "type": "image", + "transfer_method": "local_file", + "upload_file_id": image_id, + } + for image_id in image_ids + ] - ignored_events = ['text_chunk', 'workflow_started'] + ignored_events = ["text_chunk", "workflow_started"] - async for chunk in self.dify_client.workflow_run(inputs={ - "langbot_user_message_text": plain_text, - "langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}", - "langbot_conversation_id": cov_id, - }, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", files=files): - if chunk['event'] in ignored_events: + async for chunk in self.dify_client.workflow_run( + inputs={ + "langbot_user_message_text": plain_text, + "langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}", + "langbot_conversation_id": cov_id, + }, + user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", + files=files, + timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"], + ): + + if chunk["event"] in ignored_events: continue - if chunk['event'] == 'node_started': - - if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': + if chunk["event"] == "node_started": + + if ( + chunk["data"]["node_type"] == "start" + or chunk["data"]["node_type"] == "end" + ): continue msg = llm_entities.Message( - role='assistant', + role="assistant", content=None, - tool_calls=[llm_entities.ToolCall( - id=chunk['data']['node_id'], - type='function', - function=llm_entities.FunctionCall( - name=chunk['data']['title'], - arguments=json.dumps({}), - ), - )], + tool_calls=[ + llm_entities.ToolCall( + id=chunk["data"]["node_id"], + type="function", + function=llm_entities.FunctionCall( + name=chunk["data"]["title"], + arguments=json.dumps({}), + ), + ) + ], ) yield msg - elif chunk['event'] == 'workflow_finished': + elif chunk["event"] == "workflow_finished": msg = llm_entities.Message( - role='assistant', - content=chunk['data']['outputs'][self.ap.provider_cfg.data['dify-service-api']['workflow']['output-key']], + role="assistant", + content=chunk["data"]["outputs"][ + self.ap.provider_cfg.data["dify-service-api"]["workflow"][ + "output-key" + ] + ], ) yield msg - async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def run( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """运行请求""" - if self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'chat': + if self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "chat": async for msg in self._chat_messages(query): yield msg - elif self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'workflow': + elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "workflow": async for msg in self._workflow_messages(query): yield msg else: - raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") + raise errors.DifyAPIError( + f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}" + ) diff --git a/pkg/utils/pkgmgr.py b/pkg/utils/pkgmgr.py index ed0d3dbf..9c0f8b72 100644 --- a/pkg/utils/pkgmgr.py +++ b/pkg/utils/pkgmgr.py @@ -22,13 +22,3 @@ def install_requirements(file): pipmain(['install', '-r', file, "-i", "https://pypi.tuna.tsinghua.edu.cn/simple", "--trusted-host", "pypi.tuna.tsinghua.edu.cn"]) # log.reset_logging() - - -if __name__ == "__main__": - try: - install("openai11") - except Exception as e: - print(111) - print(e) - - print(222) \ No newline at end of file diff --git a/templates/provider.json b/templates/provider.json index 74f9fb3b..c46ae793 100644 --- a/templates/provider.json +++ b/templates/provider.json @@ -62,11 +62,13 @@ "base-url": "https://api.dify.ai/v1", "app-type": "chat", "chat": { - "api-key": "app-1234567890" + "api-key": "app-1234567890", + "timeout": 120 }, "workflow": { "api-key": "app-1234567890", - "output-key": "summary" + "output-key": "summary", + "timeout": 120 } } } \ No newline at end of file