From 60ca688bcb82b82d6c9412806dd9af5e26161ce4 Mon Sep 17 00:00:00 2001 From: fdc310 <82008029+fdc310@users.noreply.github.com> Date: Tue, 23 Dec 2025 09:42:26 +0800 Subject: [PATCH] Fix/Incomplete JSON data returned by N8N streaming data causes the loss of chunks. (#1880) * fix: Incomplete JSON data returned by N8N streaming data causes the loss of chunks. --- src/langbot/pkg/provider/runners/n8nsvapi.py | 81 ++++++++++++++++---- 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/src/langbot/pkg/provider/runners/n8nsvapi.py b/src/langbot/pkg/provider/runners/n8nsvapi.py index 5c565007..89cb6679 100644 --- a/src/langbot/pkg/provider/runners/n8nsvapi.py +++ b/src/langbot/pkg/provider/runners/n8nsvapi.py @@ -70,24 +70,78 @@ class N8nServiceAPIRunner(runner.RequestRunner): async def _process_stream_response(self, response: aiohttp.ClientResponse) -> typing.AsyncGenerator[ provider_message.Message, None]: - """处理流式响应""" + """处理流式响应——支持部分 JSON 和多个 JSON 对象在同一 chunk 的情况""" full_content = "" chunk_idx = 0 is_final = False message_idx = 0 - async for chunk in response.content.iter_chunked(1024): - if not chunk: + + buffer = "" + decoder = json.JSONDecoder() + + async for raw_chunk in response.content.iter_chunked(1024): + if not raw_chunk: continue try: - data = json.loads(chunk) - if data.get('type') == 'item' and 'content' in data: - chunk_idx += 1 - content = data['content'] - full_content += content - elif data.get('type') == 'end': - is_final = True - if is_final or chunk_idx % 8 == 0: + # 将 bytes 解码为字符串(容忍错误) + if isinstance(raw_chunk, (bytes, bytearray)): + chunk_str = raw_chunk.decode('utf-8', errors='replace') + else: + chunk_str = str(raw_chunk) + + buffer += chunk_str + + # 尝试从 buffer 中循环解析出 JSON 对象(处理多个对象或部分对象) + while buffer: + buffer = buffer.lstrip() + if not buffer: + break + try: + obj, idx = decoder.raw_decode(buffer) + buffer = buffer[idx:] + + if not isinstance(obj, dict): + # 忽略非字典类型的顶级 JSON + continue + + if obj.get('type') == 'item' and 'content' in obj: + chunk_idx += 1 + content = obj['content'] + full_content += content + elif obj.get('type') == 'end': + is_final = True + + if is_final or chunk_idx % 8 == 0: + message_idx += 1 + yield provider_message.MessageChunk( + role='assistant', + content=full_content, + is_final=is_final, + msg_sequence=message_idx, + ) + except json.JSONDecodeError: + # buffer 末尾可能是一个不完整的 JSON,等待更多数据 + break + except Exception as e: + # 记录解析失败并继续接收后续 chunk + try: + preview = chunk_str[:200] + except Exception: + preview = '' + self.ap.logger.warning(f"Failed to process chunk: {e}; chunk preview: {preview}") + + # 流结束后,尝试解析残余 buffer + if buffer: + try: + buffer = buffer.strip() + if buffer: + obj, _ = decoder.raw_decode(buffer) + if isinstance(obj, dict): + if obj.get('type') == 'item' and 'content' in obj: + full_content += obj['content'] + elif obj.get('type') == 'end': + is_final = True message_idx += 1 yield provider_message.MessageChunk( role='assistant', @@ -95,8 +149,9 @@ class N8nServiceAPIRunner(runner.RequestRunner): is_final=is_final, msg_sequence=message_idx, ) - except json.JSONDecodeError: - self.ap.logger.warning(f"Failed to parse final JSON line: {response.text()}") + except Exception as e: + preview = buffer[:200] + self.ap.logger.warning(f"Failed to parse remaining buffer: {e}; buffer preview: {preview}") async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: """调用n8n webhook"""