diff --git a/pkg/api/http/controller/groups/pipelines/webchat.py b/pkg/api/http/controller/groups/pipelines/webchat.py index c094731b..ae201934 100644 --- a/pkg/api/http/controller/groups/pipelines/webchat.py +++ b/pkg/api/http/controller/groups/pipelines/webchat.py @@ -13,6 +13,7 @@ class WebChatDebugRouterGroup(group.RouterGroup): """Send a message to the pipeline for debugging""" async def stream_generator(generator): + yield 'data: {"type": "start"}\n\n' async for message in generator: yield f'data: {json.dumps({"message": message})}\n\n' yield 'data: {"type": "end"}\n\n' @@ -38,8 +39,14 @@ class WebChatDebugRouterGroup(group.RouterGroup): generator = webchat_adapter.send_webchat_message( pipeline_uuid, session_type, message_chain_obj, is_stream ) - - return quart.Response(stream_generator(generator), mimetype='text/event-stream') + # 设置正确的响应头 + headers = { + 'Content-Type': 'text/event-stream', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive' + } + return quart.Response(stream_generator(generator), mimetype='text/event-stream',headers=headers) else: # result = await webchat_adapter.send_webchat_message(pipeline_uuid, session_type, message_chain_obj) diff --git a/pkg/pipeline/pipelinemgr.py b/pkg/pipeline/pipelinemgr.py index 77df09dc..79bd4ec6 100644 --- a/pkg/pipeline/pipelinemgr.py +++ b/pkg/pipeline/pipelinemgr.py @@ -93,12 +93,21 @@ class RuntimePipeline: query.message_event, platform_events.GroupMessage ): result.user_notice.insert(0, platform_message.At(query.message_event.sender.id)) - - await query.adapter.reply_message( - message_source=query.message_event, - message=result.user_notice, - quote_origin=query.pipeline_config['output']['misc']['quote-origin'], - ) + if await query.adapter.is_stream_output_supported(): + print(query.resp_messages[-1]) + await query.adapter.reply_message_chunk( + message_source=query.message_event, + message_id=str(query.resp_messages[-1].resp_message_id), + 
message=result.user_notice, + quote_origin=query.pipeline_config['output']['misc']['quote-origin'], + is_final=[msg.is_final for msg in query.resp_messages][0] + ) + else: + await query.adapter.reply_message( + message_source=query.message_event, + message=result.user_notice, + quote_origin=query.pipeline_config['output']['misc']['quote-origin'], + ) if result.debug_notice: self.ap.logger.debug(result.debug_notice) if result.console_notice: diff --git a/pkg/pipeline/process/handlers/chat.py b/pkg/pipeline/process/handlers/chat.py index 6c428473..e913bbc2 100644 --- a/pkg/pipeline/process/handlers/chat.py +++ b/pkg/pipeline/process/handlers/chat.py @@ -72,9 +72,9 @@ class ChatMessageHandler(handler.MessageHandler): raise ValueError(f'未找到请求运行器: {query.pipeline_config["ai"]["runner"]["runner"]}') if is_stream: resp_message_id = uuid.uuid4() - await query.adapter.create_message_card(resp_message_id, query.message_event) + await query.adapter.create_message_card(str(resp_message_id), query.message_event) async for result in runner.run(query): - result.resp_message_id = resp_message_id + result.resp_message_id = str(resp_message_id) if query.resp_messages: query.resp_messages.pop() if query.resp_message_chain: diff --git a/pkg/pipeline/respback/respback.py b/pkg/pipeline/respback/respback.py index bc91dffe..5683ce92 100644 --- a/pkg/pipeline/respback/respback.py +++ b/pkg/pipeline/respback/respback.py @@ -41,6 +41,7 @@ class SendResponseBackStage(stage.PipelineStage): # TODO 命令与流式的兼容性问题 if await query.adapter.is_stream_output_supported(): is_final = [msg.is_final for msg in query.resp_messages][0] + print(query.resp_messages[-1]) await query.adapter.reply_message_chunk( message_source=query.message_event, message_id=query.resp_messages[-1].resp_message_id, diff --git a/pkg/provider/modelmgr/requesters/anthropicmsgs.py b/pkg/provider/modelmgr/requesters/anthropicmsgs.py index f89fb136..a337af4f 100644 --- a/pkg/provider/modelmgr/requesters/anthropicmsgs.py +++ 
b/pkg/provider/modelmgr/requesters/anthropicmsgs.py @@ -273,13 +273,11 @@ class AnthropicMessages(requester.ProviderAPIRequester): del msg_dict['tool_calls'] req_messages.append(msg_dict) - if args["thinking"]: + if args.get("thinking", False): args['thinking'] = { "type": "enabled", "budget_tokens": 10000 } - else: - args.pop('thinking') args['messages'] = req_messages @@ -296,15 +294,32 @@ class AnthropicMessages(requester.ProviderAPIRequester): think_ended = False finish_reason = False content = '' + tool_name = '' + tool_id = '' + tool_calls = [] async for chunk in await self.client.messages.create(**args): - # print(chunk) + print(chunk) + tool_call = {"id":None, 'function': {"name": None, "arguments": None},'type':'function'} if isinstance(chunk, anthropic.types.raw_content_block_start_event.RawContentBlockStartEvent): # 记录开始 + if chunk.content_block.type == 'tool_use': + + if chunk.content_block.name is not None: + tool_name = chunk.content_block.name + if chunk.content_block.id is not None: + tool_id = chunk.content_block.id + + + tool_call['function']['name'] = tool_name + tool_call['function']['arguments'] = '' + tool_call['id'] = tool_id + + print(chunk.content_block) if not remove_think: - if chunk.content_block.type == 'thinking': + if chunk.content_block.type == 'thinking' and not remove_think: think_started = True - elif chunk.content_block.type == 'text': + elif chunk.content_block.type == 'text' and chunk.index != 0 and not remove_think: think_ended = True - continue + continue elif isinstance(chunk, anthropic.types.raw_content_block_delta_event.RawContentBlockDeltaEvent): if chunk.delta.type == "thinking_delta": if think_started: @@ -320,6 +335,10 @@ class AnthropicMessages(requester.ProviderAPIRequester): content = '\n\n' + chunk.delta.text else: content = chunk.delta.text + elif chunk.delta.type == "input_json_delta": + tool_call['function']["arguments"] = chunk.delta.partial_json + tool_call['function']['name'] = tool_name + tool_call['id'] = 
tool_id elif isinstance(chunk, anthropic.types.raw_content_block_stop_event.RawContentBlockStopEvent): continue # 记录raw_content_block结束的 @@ -333,10 +352,12 @@ class AnthropicMessages(requester.ProviderAPIRequester): continue + args = { 'content': content, 'role': role, - "is_final": finish_reason + "is_final": finish_reason, + 'tool_calls': None if tool_call['id'] is None else [tool_call], } # if chunk_idx == 0: # chunk_idx += 1 diff --git a/pkg/provider/modelmgr/requesters/chatcmpl.py b/pkg/provider/modelmgr/requesters/chatcmpl.py index 2d2a0b7e..7afda84f 100644 --- a/pkg/provider/modelmgr/requesters/chatcmpl.py +++ b/pkg/provider/modelmgr/requesters/chatcmpl.py @@ -160,18 +160,21 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): thinking_started = False thinking_ended = False role = 'assistant' # 默认角色 + tool_id = "" + tool_name = '' # accumulated_reasoning = '' # 仅用于判断何时结束思维链 async for chunk in self._req_stream(args, extra_body=extra_args): # 解析 chunk 数据 + if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + finish_reason = getattr(choice, 'finish_reason', None) else: delta = {} finish_reason = None - # 从第一个 chunk 获取 role,后续使用这个 role if 'role' in delta and delta['role']: role = delta['role'] @@ -208,41 +211,29 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): # delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL) # 处理工具调用增量 - delta_tool_calls = None + # delta_tool_calls = None if delta.get('tool_calls'): - delta_tool_calls = [] for tool_call in delta['tool_calls']: - tc_id = tool_call.get('id') - if tc_id: - if tc_id not in tool_calls_map: - # 新的工具调用 - tool_calls_map[tc_id] = llm_entities.ToolCall( - id=tc_id, - type=tool_call.get('type', 'function'), - function=llm_entities.FunctionCall( - name=tool_call.get('function', {}).get('name', ''), - arguments=tool_call.get('function', {}).get('arguments', ''), - ), - ) - 
delta_tool_calls.append(tool_calls_map[tc_id]) - else: - # 追加函数参数 - func_args = tool_call.get('function', {}).get('arguments', '') - if func_args: - tool_calls_map[tc_id].function.arguments += func_args - # 返回更新后的完整工具调用 - delta_tool_calls.append(tool_calls_map[tc_id]) + if tool_call['id'] and tool_call['function']['name']: + tool_id = tool_call['id'] + tool_name = tool_call['function']['name'] + else: + tool_call['id'] = tool_id + tool_call['function']['name'] = tool_name + if tool_call['type'] is None: + tool_call['type'] = 'function' + + # 跳过空的第一个 chunk(只有 role 没有内容) if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'): chunk_idx += 1 continue - # 构建 MessageChunk - 只包含增量内容 chunk_data = { 'role': role, 'content': delta_content if delta_content else None, - 'tool_calls': delta_tool_calls if delta_tool_calls else None, + 'tool_calls': delta.get('tool_calls'), 'is_final': bool(finish_reason), } @@ -289,7 +280,6 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): # 发送请求 resp = await self._req(args, extra_body=extra_args) - print(resp) # 处理请求结果 message = await self._make_msg(resp, remove_think) diff --git a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py index c526313a..9d8861da 100644 --- a/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py +++ b/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py @@ -289,30 +289,16 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): # delta_content = re.sub(r'.*?', '', delta_content, flags=re.DOTALL) # 处理工具调用增量 - delta_tool_calls = None if delta.get('tool_calls'): - delta_tool_calls = [] for tool_call in delta['tool_calls']: - tc_id = tool_call.get('id') - if tc_id: - if tc_id not in tool_calls_map: - # 新的工具调用 - tool_calls_map[tc_id] = llm_entities.ToolCall( - id=tc_id, - type=tool_call.get('type', 'function'), - function=llm_entities.FunctionCall( - name=tool_call.get('function', 
{}).get('name', ''), - arguments=tool_call.get('function', {}).get('arguments', ''), - ), - ) - delta_tool_calls.append(tool_calls_map[tc_id]) - else: - # 追加函数参数 - func_args = tool_call.get('function', {}).get('arguments', '') - if func_args: - tool_calls_map[tc_id].function.arguments += func_args - # 返回更新后的完整工具调用 - delta_tool_calls.append(tool_calls_map[tc_id]) + if tool_call['id'] and tool_call['function']['name']: + tool_id = tool_call['id'] + tool_name = tool_call['function']['name'] + else: + tool_call['id'] = tool_id + tool_call['function']['name'] = tool_name + if tool_call['type'] is None: + tool_call['type'] = 'function' # 跳过空的第一个 chunk(只有 role 没有内容) if chunk_idx == 0 and not delta_content and not reasoning_content and not delta.get('tool_calls'): @@ -323,7 +309,7 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): chunk_data = { 'role': role, 'content': delta_content if delta_content else None, - 'tool_calls': delta_tool_calls if delta_tool_calls else None, + 'tool_calls': delta.get('tool_calls'), 'is_final': bool(finish_reason), } diff --git a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py index 967bb676..4af1cde0 100644 --- a/pkg/provider/modelmgr/requesters/ppiochatcmpl.py +++ b/pkg/provider/modelmgr/requesters/ppiochatcmpl.py @@ -179,39 +179,30 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): delta_tool_calls = None if delta.get('tool_calls'): - delta_tool_calls = [] for tool_call in delta['tool_calls']: - tc_id = tool_call.get('id') - if tc_id: - if tc_id not in tool_calls_map: - # 新的工具调用 - tool_calls_map[tc_id] = llm_entities.ToolCall( - id=tc_id, - type=tool_call.get('type', 'function'), - function=llm_entities.FunctionCall( - name=tool_call.get('function', {}).get('name', ''), - arguments=tool_call.get('function', {}).get('arguments', ''), - ), - ) - delta_tool_calls.append(tool_calls_map[tc_id]) - else: - # 追加函数参数 - func_args = tool_call.get('function', 
{}).get('arguments', '') - if func_args: - tool_calls_map[tc_id].function.arguments += func_args - # 返回更新后的完整工具调用 - delta_tool_calls.append(tool_calls_map[tc_id]) + if tool_call['id'] and tool_call['function']['name']: + tool_id = tool_call['id'] + tool_name = tool_call['function']['name'] + + if tool_call['id'] is None: + tool_call['id'] = tool_id + if tool_call['function']['name'] is None: + tool_call['function']['name'] = tool_name + if tool_call['function']['arguments'] is None: + tool_call['function']['arguments'] = '' + if tool_call['type'] is None: + tool_call['type'] = 'function' # 跳过空的第一个 chunk(只有 role 没有内容) if chunk_idx == 0 and not delta_content and not delta.get('tool_calls'): chunk_idx += 1 continue - # 构建 MessageChunk - 只包含增量内容 + # 构建 MessageChunk - 只包含增量内容 chunk_data = { 'role': role, 'content': delta_content if delta_content else None, - 'tool_calls': delta_tool_calls if delta_tool_calls else None, + 'tool_calls': delta.get('tool_calls'), 'is_final': bool(finish_reason), } diff --git a/pkg/provider/runners/localagent.py b/pkg/provider/runners/localagent.py index 754082ea..73d873ec 100644 --- a/pkg/provider/runners/localagent.py +++ b/pkg/provider/runners/localagent.py @@ -148,13 +148,13 @@ class LocalAgentRunner(runner.RequestRunner): if tool_call.function and tool_call.function.arguments: # 流式处理中,工具调用参数可能分多个chunk返回,需要追加而不是覆盖 tool_calls_map[tool_call.id].function.arguments += tool_call.function.arguments - + print(list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None) + # continue # 每8个chunk或最后一个chunk时,输出所有累积的内容 if msg_idx % 8 == 0 or msg.is_final: yield llm_entities.MessageChunk( role=last_role, content=accumulated_content, # 输出所有累积内容 - tool_calls=list(tool_calls_map.values()) if (tool_calls_map and msg.is_final) else None, is_final=msg.is_final, ) @@ -178,12 +178,18 @@ class LocalAgentRunner(runner.RequestRunner): parameters = json.loads(func.arguments) func_ret = await self.ap.tool_mgr.execute_func_call(query, func.name, 
parameters) - - msg = llm_entities.Message( - role='tool', - content=json.dumps(func_ret, ensure_ascii=False), - tool_call_id=tool_call.id, - ) + if is_stream: + msg = llm_entities.MessageChunk( + role='tool', + content=json.dumps(func_ret, ensure_ascii=False), + tool_call_id=tool_call.id, + ) + else: + msg = llm_entities.Message( + role='tool', + content=json.dumps(func_ret, ensure_ascii=False), + tool_call_id=tool_call.id, + ) yield msg diff --git a/web/src/app/infra/http/HttpClient.ts b/web/src/app/infra/http/HttpClient.ts index f6ff6a50..35c18680 100644 --- a/web/src/app/infra/http/HttpClient.ts +++ b/web/src/app/infra/http/HttpClient.ts @@ -443,6 +443,9 @@ class HttpClient { onComplete(); return; } + if (data.type === 'start') { + console.log(data.type) + } if (data.message) { // 处理消息数据