From 32b400dcb1efeb5e53fc9034eda5edfade2b77c0 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 16 Dec 2024 23:54:56 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20dify=E7=9A=84timeout=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E8=87=AA=E5=AE=9A=E4=B9=89=20(#949)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../m017_dify_api_timeout_params.py | 19 ++ pkg/core/stages/migrate.py | 2 +- pkg/pipeline/process/handlers/chat.py | 2 +- pkg/provider/runners/difysvapi.py | 162 ++++++++++++------ pkg/utils/pkgmgr.py | 10 -- templates/provider.json | 6 +- 6 files changed, 131 insertions(+), 70 deletions(-) create mode 100644 pkg/core/migrations/m017_dify_api_timeout_params.py diff --git a/pkg/core/migrations/m017_dify_api_timeout_params.py b/pkg/core/migrations/m017_dify_api_timeout_params.py new file mode 100644 index 00000000..e0837732 --- /dev/null +++ b/pkg/core/migrations/m017_dify_api_timeout_params.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +from .. import migration + + +@migration.migration_class("dify-api-timeout-params", 17) +class DifyAPITimeoutParamsMigration(migration.Migration): + """迁移""" + + async def need_migrate(self) -> bool: + """判断当前环境是否需要运行此迁移""" + return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] + + async def run(self): + """执行迁移""" + self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120 + self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120 + + await self.ap.provider_cfg.dump_config() diff --git a/pkg/core/stages/migrate.py b/pkg/core/stages/migrate.py index ec70a2ab..3134dc3b 100644 --- a/pkg/core/stages/migrate.py +++ b/pkg/core/stages/migrate.py @@ -7,7 +7,7 @@ from .. import migration from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config -from ..migrations import m015_gitee_ai_config, m016_dify_service_api +from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params @stage.stage_class("MigrationStage") diff --git a/pkg/pipeline/process/handlers/chat.py b/pkg/pipeline/process/handlers/chat.py index 6e192b78..975b18cc 100644 --- a/pkg/pipeline/process/handlers/chat.py +++ b/pkg/pipeline/process/handlers/chat.py @@ -91,7 +91,7 @@ class ChatMessageHandler(handler.MessageHandler): query.session.using_conversation.messages.extend(query.resp_messages) except Exception as e: - self.ap.logger.error(f'对话({query.query_id})请求失败: {str(e)}') + self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}') yield entities.StageProcessResult( result_type=entities.ResultType.INTERRUPT, diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index c3173d6d..87c9761c 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -20,125 +20,175 @@ class DifyServiceAPIRunner(runner.RequestRunner): async def initialize(self): """初始化""" - valid_app_types = ['chat', 'workflow'] - if self.ap.provider_cfg.data['dify-service-api']['app-type'] not in valid_app_types: - raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") + valid_app_types = ["chat", "workflow"] + if ( + self.ap.provider_cfg.data["dify-service-api"]["app-type"] + not in valid_app_types + ): + raise errors.DifyAPIError( + f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}" + ) - api_key = self.ap.provider_cfg.data['dify-service-api'][self.ap.provider_cfg.data['dify-service-api']['app-type']]['api-key'] + api_key = self.ap.provider_cfg.data["dify-service-api"][ + self.ap.provider_cfg.data["dify-service-api"]["app-type"] + ]["api-key"] self.dify_client = client.AsyncDifyServiceClient( api_key=api_key, - base_url=self.ap.provider_cfg.data['dify-service-api']['base-url'] + base_url=self.ap.provider_cfg.data["dify-service-api"]["base-url"], ) - async def _preprocess_user_message(self, query: core_entities.Query) -> tuple[str, list[str]]: + async def _preprocess_user_message( + self, query: core_entities.Query + ) -> tuple[str, list[str]]: """预处理用户消息,提取纯文本,并将图片上传到 Dify 服务 - + Returns: tuple[str, list[str]]: 纯文本和图片的 Dify 服务图片 ID """ - plain_text = '' + plain_text = "" image_ids = [] if isinstance(query.user_message.content, list): for ce in query.user_message.content: - if ce.type == 'text': + if ce.type == "text": plain_text += ce.text - elif ce.type == 'image_url': - file_bytes, image_format = await image.get_qq_image_bytes(ce.image_url.url) + elif ce.type == "image_url": + file_bytes, image_format = await image.get_qq_image_bytes( + ce.image_url.url + ) file = ("img.png", file_bytes, f"image/{image_format}") - file_upload_resp = await self.dify_client.upload_file(file, f"{query.session.launcher_type.value}_{query.session.launcher_id}") - image_id = file_upload_resp['id'] + file_upload_resp = await self.dify_client.upload_file( + file, + f"{query.session.launcher_type.value}_{query.session.launcher_id}", + ) + image_id = file_upload_resp["id"] image_ids.append(image_id) elif isinstance(query.user_message.content, str): plain_text = query.user_message.content return plain_text, image_ids - async def _chat_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def _chat_messages( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """调用聊天助手""" cov_id = query.session.using_conversation.uuid or "" plain_text, image_ids = await self._preprocess_user_message(query) - files = [{ - 'type': 'image', - 'transfer_method': 'local_file', - 'upload_file_id': image_id, - } for image_id in image_ids] + files = [ + { + "type": "image", + "transfer_method": "local_file", + "upload_file_id": image_id, + } + for image_id in image_ids + ] - resp = await self.dify_client.chat_messages(inputs={}, query=plain_text, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", conversation_id=cov_id, files=files) + resp = await self.dify_client.chat_messages( + inputs={}, + query=plain_text, + user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", + conversation_id=cov_id, + files=files, + timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], + ) msg = llm_entities.Message( - role='assistant', - content=resp['answer'], + role="assistant", + content=resp["answer"], ) yield msg - query.session.using_conversation.uuid = resp['conversation_id'] + query.session.using_conversation.uuid = resp["conversation_id"] - async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def _workflow_messages( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """调用工作流""" if not query.session.using_conversation.uuid: query.session.using_conversation.uuid = str(uuid.uuid4()) - + cov_id = query.session.using_conversation.uuid plain_text, image_ids = await self._preprocess_user_message(query) - files = [{ - 'type': 'image', - 'transfer_method': 'local_file', - 'upload_file_id': image_id, - } for image_id in image_ids] + files = [ + { + "type": "image", + "transfer_method": "local_file", + "upload_file_id": image_id, + } + for image_id in image_ids + ] - ignored_events = ['text_chunk', 'workflow_started'] + ignored_events = ["text_chunk", "workflow_started"] - async for chunk in self.dify_client.workflow_run(inputs={ - "langbot_user_message_text": plain_text, - "langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}", - "langbot_conversation_id": cov_id, - }, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", files=files): - if chunk['event'] in ignored_events: + async for chunk in self.dify_client.workflow_run( + inputs={ + "langbot_user_message_text": plain_text, + "langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}", + "langbot_conversation_id": cov_id, + }, + user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", + files=files, + timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"], + ): + + if chunk["event"] in ignored_events: continue - if chunk['event'] == 'node_started': - - if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': + if chunk["event"] == "node_started": + + if ( + chunk["data"]["node_type"] == "start" + or chunk["data"]["node_type"] == "end" + ): continue msg = llm_entities.Message( - role='assistant', + role="assistant", content=None, - tool_calls=[llm_entities.ToolCall( - id=chunk['data']['node_id'], - type='function', - function=llm_entities.FunctionCall( - name=chunk['data']['title'], - arguments=json.dumps({}), - ), - )], + tool_calls=[ + llm_entities.ToolCall( + id=chunk["data"]["node_id"], + type="function", + function=llm_entities.FunctionCall( + name=chunk["data"]["title"], + arguments=json.dumps({}), + ), + ) + ], ) yield msg - elif chunk['event'] == 'workflow_finished': + elif chunk["event"] == "workflow_finished": msg = llm_entities.Message( - role='assistant', - content=chunk['data']['outputs'][self.ap.provider_cfg.data['dify-service-api']['workflow']['output-key']], + role="assistant", + content=chunk["data"]["outputs"][ + self.ap.provider_cfg.data["dify-service-api"]["workflow"][ + "output-key" + ] + ], ) yield msg - async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]: + async def run( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: """运行请求""" - if self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'chat': + if self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "chat": async for msg in self._chat_messages(query): yield msg - elif self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'workflow': + elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "workflow": async for msg in self._workflow_messages(query): yield msg else: - raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}") + raise errors.DifyAPIError( + f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}" + ) diff --git a/pkg/utils/pkgmgr.py b/pkg/utils/pkgmgr.py index ed0d3dbf..9c0f8b72 100644 --- a/pkg/utils/pkgmgr.py +++ b/pkg/utils/pkgmgr.py @@ -22,13 +22,3 @@ def install_requirements(file): pipmain(['install', '-r', file, "-i", "https://pypi.tuna.tsinghua.edu.cn/simple", "--trusted-host", "pypi.tuna.tsinghua.edu.cn"]) # log.reset_logging() - - -if __name__ == "__main__": - try: - install("openai11") - except Exception as e: - print(111) - print(e) - - print(222) \ No newline at end of file diff --git a/templates/provider.json b/templates/provider.json index 74f9fb3b..c46ae793 100644 --- a/templates/provider.json +++ b/templates/provider.json @@ -62,11 +62,13 @@ "base-url": "https://api.dify.ai/v1", "app-type": "chat", "chat": { - "api-key": "app-1234567890" + "api-key": "app-1234567890", + "timeout": 120 }, "workflow": { "api-key": "app-1234567890", - "output-key": "summary" + "output-key": "summary", + "timeout": 120 } } } \ No newline at end of file From 6642498f00dafff038a2c3268e7d61717df26adb Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 17 Dec 2024 00:41:28 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AF=B9=20agent?= =?UTF-8?q?=20=E5=BA=94=E7=94=A8=E7=9A=84=E6=94=AF=E6=8C=81=20(#951)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libs/dify_service_api/test.py | 6 +- libs/dify_service_api/v1/client.py | 27 +++--- .../m017_dify_api_timeout_params.py | 7 +- pkg/provider/runners/difysvapi.py | 84 ++++++++++++++++--- templates/provider.json | 4 + 5 files changed, 103 insertions(+), 25 deletions(-) diff --git a/libs/dify_service_api/test.py b/libs/dify_service_api/test.py index 4c2662fa..faf7571a 100644 --- a/libs/dify_service_api/test.py +++ b/libs/dify_service_api/test.py @@ -10,8 +10,8 @@ class TestDifyClient: async def test_chat_messages(self): cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL")) - resp = await cln.chat_messages(inputs={}, query="Who are you?", user="test") - print(json.dumps(resp, ensure_ascii=False, indent=4)) + async for chunk in cln.chat_messages(inputs={}, query="调用工具查看现在几点?", user="test"): + print(json.dumps(chunk, ensure_ascii=False, indent=4)) async def test_upload_file(self): cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL")) @@ -41,4 +41,4 @@ class TestDifyClient: print(json.dumps(chunks, ensure_ascii=False, indent=4)) if __name__ == "__main__": - asyncio.run(TestDifyClient().test_workflow_run()) + asyncio.run(TestDifyClient().test_chat_messages()) diff --git a/libs/dify_service_api/v1/client.py b/libs/dify_service_api/v1/client.py index 91b60052..efa70ea5 100644 --- a/libs/dify_service_api/v1/client.py +++ b/libs/dify_service_api/v1/client.py @@ -26,21 +26,22 @@ class AsyncDifyServiceClient: inputs: dict[str, typing.Any], query: str, user: str, - response_mode: str = "blocking", # 当前不支持 streaming + response_mode: str = "streaming", # 当前不支持 blocking conversation_id: str = "", files: list[dict[str, typing.Any]] = [], timeout: float = 30.0, - ) -> dict[str, typing.Any]: + ) -> typing.AsyncGenerator[dict[str, typing.Any], None]: """发送消息""" - if response_mode != "blocking": - raise DifyAPIError("当前仅支持 blocking 模式") + if response_mode != "streaming": + raise DifyAPIError("当前仅支持 streaming 模式") async with httpx.AsyncClient( base_url=self.base_url, trust_env=True, timeout=timeout, ) as client: - response = await client.post( + async with client.stream( + "POST", "/chat-messages", headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}, json={ @@ -51,12 +52,14 @@ class AsyncDifyServiceClient: "conversation_id": conversation_id, "files": files, }, - ) - - if response.status_code != 200: - raise DifyAPIError(f"{response.status_code} {response.text}") - - return response.json() + ) as r: + async for chunk in r.aiter_lines(): + if r.status_code != 200: + raise DifyAPIError(f"{r.status_code} {chunk}") + if chunk.strip() == "": + continue + if chunk.startswith("data:"): + yield json.loads(chunk[5:]) async def workflow_run( self, @@ -88,6 +91,8 @@ class AsyncDifyServiceClient: }, ) as r: async for chunk in r.aiter_lines(): + if r.status_code != 200: + raise DifyAPIError(f"{r.status_code} {chunk}") if chunk.strip() == "": continue if chunk.startswith("data:"): diff --git a/pkg/core/migrations/m017_dify_api_timeout_params.py b/pkg/core/migrations/m017_dify_api_timeout_params.py index e0837732..a0e502a4 100644 --- a/pkg/core/migrations/m017_dify_api_timeout_params.py +++ b/pkg/core/migrations/m017_dify_api_timeout_params.py @@ -9,11 +9,16 @@ class DifyAPITimeoutParamsMigration(migration.Migration): async def need_migrate(self) -> bool: """判断当前环境是否需要运行此迁移""" - return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] + return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] \ + or 'agent' not in self.ap.provider_cfg.data['dify-service-api'] async def run(self): """执行迁移""" self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120 self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120 + self.ap.provider_cfg.data['dify-service-api']['agent'] = { + "api-key": "app-1234567890", + "timeout": 120 + } await self.ap.provider_cfg.dump_config() diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index 87c9761c..4fed4277 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -20,7 +20,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): async def initialize(self): """初始化""" - valid_app_types = ["chat", "workflow"] + valid_app_types = ["chat", "agent", "workflow"] if ( self.ap.provider_cfg.data["dify-service-api"]["app-type"] not in valid_app_types @@ -85,23 +85,84 @@ class DifyServiceAPIRunner(runner.RequestRunner): for image_id in image_ids ] - resp = await self.dify_client.chat_messages( + async for chunk in self.dify_client.chat_messages( inputs={}, query=plain_text, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", conversation_id=cov_id, files=files, timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], - ) + ): + self.ap.logger.debug("dify-chat-chunk: "+chunk) + if chunk['event'] == 'node_finished': + if chunk['data']['node_type'] == 'answer': + yield llm_entities.Message( + role="assistant", + content=chunk['data']['outputs']['answer'], + ) - msg = llm_entities.Message( - role="assistant", - content=resp["answer"], - ) + query.session.using_conversation.uuid = chunk["conversation_id"] - yield msg + async def _agent_chat_messages( + self, query: core_entities.Query + ) -> typing.AsyncGenerator[llm_entities.Message, None]: + """调用聊天助手""" + cov_id = query.session.using_conversation.uuid or "" - query.session.using_conversation.uuid = resp["conversation_id"] + plain_text, image_ids = await self._preprocess_user_message(query) + + files = [ + { + "type": "image", + "transfer_method": "local_file", + "upload_file_id": image_id, + } + for image_id in image_ids + ] + + ignored_events = ["agent_message"] + + async for chunk in self.dify_client.chat_messages( + inputs={}, + query=plain_text, + user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", + response_mode="streaming", + conversation_id=cov_id, + files=files, + timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], + ): + self.ap.logger.debug("dify-agent-chunk: "+chunk) + if chunk["event"] in ignored_events: + continue + if chunk["event"] == "agent_thought": + + if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过 + continue + + if chunk['thought'].strip() != '': # 文字回复内容 + msg = llm_entities.Message( + role="assistant", + content=chunk["thought"], + ) + yield msg + + if chunk['tool']: + msg = llm_entities.Message( + role="assistant", + tool_calls=[ + llm_entities.ToolCall( + id=chunk['id'], + type="function", + function=llm_entities.FunctionCall( + name=chunk["tool"], + arguments=json.dumps({}), + ), + ) + ], + ) + yield msg + + query.session.using_conversation.uuid = chunk["conversation_id"] async def _workflow_messages( self, query: core_entities.Query @@ -136,7 +197,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): files=files, timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"], ): - + self.ap.logger.debug("dify-workflow-chunk: "+chunk) if chunk["event"] in ignored_events: continue @@ -185,6 +246,9 @@ class DifyServiceAPIRunner(runner.RequestRunner): if self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "chat": async for msg in self._chat_messages(query): yield msg + elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "agent": + async for msg in self._agent_chat_messages(query): + yield msg elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "workflow": async for msg in self._workflow_messages(query): yield msg diff --git a/templates/provider.json b/templates/provider.json index c46ae793..30656f8c 100644 --- a/templates/provider.json +++ b/templates/provider.json @@ -65,6 +65,10 @@ "api-key": "app-1234567890", "timeout": 120 }, + "agent": { + "api-key": "app-1234567890", + "timeout": 120 + }, "workflow": { "api-key": "app-1234567890", "output-key": "summary", From 793d64303e81d20ed7b2afe13ca03a1e71ffca55 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 17 Dec 2024 01:04:08 +0800 Subject: [PATCH 3/4] =?UTF-8?q?perf:=20=E5=AE=8C=E5=96=84dify=20api=20runn?= =?UTF-8?q?er?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/provider/runners/difysvapi.py | 32 +++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/provider/runners/difysvapi.py b/pkg/provider/runners/difysvapi.py index 4fed4277..beb49115 100644 --- a/pkg/provider/runners/difysvapi.py +++ b/pkg/provider/runners/difysvapi.py @@ -85,6 +85,10 @@ class DifyServiceAPIRunner(runner.RequestRunner): for image_id in image_ids ] + mode = "basic" # 标记是基础编排还是工作流编排 + + basic_mode_pending_chunk = '' + async for chunk in self.dify_client.chat_messages( inputs={}, query=plain_text, @@ -93,13 +97,27 @@ class DifyServiceAPIRunner(runner.RequestRunner): files=files, timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], ): - self.ap.logger.debug("dify-chat-chunk: "+chunk) - if chunk['event'] == 'node_finished': - if chunk['data']['node_type'] == 'answer': + self.ap.logger.debug("dify-chat-chunk: ", chunk) + + if chunk['event'] == 'workflow_started': + mode = "workflow" + + if mode == "workflow": + if chunk['event'] == 'node_finished': + if chunk['data']['node_type'] == 'answer': + yield llm_entities.Message( + role="assistant", + content=chunk['data']['outputs']['answer'], + ) + elif mode == "basic": + if chunk['event'] == 'message': + basic_mode_pending_chunk += chunk['answer'] + elif chunk['event'] == 'message_end': yield llm_entities.Message( role="assistant", - content=chunk['data']['outputs']['answer'], + content=basic_mode_pending_chunk, ) + basic_mode_pending_chunk = '' query.session.using_conversation.uuid = chunk["conversation_id"] @@ -131,7 +149,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): files=files, timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"], ): - self.ap.logger.debug("dify-agent-chunk: "+chunk) + self.ap.logger.debug("dify-agent-chunk: ", chunk) if chunk["event"] in ignored_events: continue if chunk["event"] == "agent_thought": @@ -197,7 +215,7 @@ class DifyServiceAPIRunner(runner.RequestRunner): files=files, timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"], ): - self.ap.logger.debug("dify-workflow-chunk: "+chunk) + self.ap.logger.debug("dify-workflow-chunk: ", chunk) if chunk["event"] in ignored_events: continue @@ -227,6 +245,8 @@ class DifyServiceAPIRunner(runner.RequestRunner): yield msg elif chunk["event"] == "workflow_finished": + if chunk['data']['error']: + raise errors.DifyAPIError(chunk['data']['error']) msg = llm_entities.Message( role="assistant", From 3314a7a9e9a824d293e6c24d631f1c4b3f5a48e5 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 17 Dec 2024 01:17:57 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20=E5=9C=A8=E8=AE=BE=E7=BD=AEmodel?= =?UTF-8?q?=E4=B8=BA=E9=9D=9E=E8=A7=86=E8=A7=89=E6=A8=A1=E5=9E=8B=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E9=9D=9Elocal-agent=E7=9A=84runner=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E8=8E=B7=E5=BE=97=E5=9B=BE=E7=89=87=E6=B6=88=E6=81=AF=20(#948)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libs/dify_service_api/v1/client.py | 4 ---- pkg/pipeline/preproc/preproc.py | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/libs/dify_service_api/v1/client.py b/libs/dify_service_api/v1/client.py index efa70ea5..70a804b7 100644 --- a/libs/dify_service_api/v1/client.py +++ b/libs/dify_service_api/v1/client.py @@ -105,10 +105,6 @@ class AsyncDifyServiceClient: timeout: float = 30.0, ) -> str: """上传文件""" - # curl -X POST 'http://dify.rockchin.top/v1/files/upload' \ - # --header 'Authorization: Bearer {api_key}' \ - # --form 'file=@localfile;type=image/[png|jpeg|jpg|webp|gif] \ - # --form 'user=abc-123' async with httpx.AsyncClient( base_url=self.base_url, trust_env=True, diff --git a/pkg/pipeline/preproc/preproc.py b/pkg/pipeline/preproc/preproc.py index 3a71a841..4fa32c65 100644 --- a/pkg/pipeline/preproc/preproc.py +++ b/pkg/pipeline/preproc/preproc.py @@ -45,7 +45,7 @@ class PreProcessor(stage.PipelineStage): # 检查vision是否启用,没启用就删除所有图片 - if not self.ap.provider_cfg.data['enable-vision'] or not query.use_model.vision_supported: + if not self.ap.provider_cfg.data['enable-vision'] or (self.ap.provider_cfg.data['runner'] == 'local-agent' and not query.use_model.vision_supported): for msg in query.messages: if isinstance(msg.content, list): for me in msg.content: @@ -60,13 +60,13 @@ class PreProcessor(stage.PipelineStage): llm_entities.ContentElement.from_text(me.text) ) elif isinstance(me, platform_message.Image): - if self.ap.provider_cfg.data['enable-vision'] and query.use_model.vision_supported: + if self.ap.provider_cfg.data['enable-vision'] and (self.ap.provider_cfg.data['runner'] != 'local-agent' or query.use_model.vision_supported): if me.url is not None: content_list.append( llm_entities.ContentElement.from_image_url(str(me.url)) ) - query.user_message = llm_entities.Message( # TODO 适配多模态输入 + query.user_message = llm_entities.Message( role='user', content=content_list )