Fix/Incomplete JSON data returned by N8N streaming data causes the loss of chunks. (#1880)

* fix: Incomplete JSON data returned by N8N streaming data causes the loss of chunks.
This commit is contained in:
fdc310
2025-12-23 09:42:26 +08:00
committed by GitHub
parent 76d8eea41d
commit 60ca688bcb

View File

@@ -70,24 +70,78 @@ class N8nServiceAPIRunner(runner.RequestRunner):
async def _process_stream_response(self, response: aiohttp.ClientResponse) -> typing.AsyncGenerator[
provider_message.Message, None]:
"""处理流式响应"""
"""处理流式响应——支持部分 JSON 和多个 JSON 对象在同一 chunk 的情况"""
full_content = ""
chunk_idx = 0
is_final = False
message_idx = 0
async for chunk in response.content.iter_chunked(1024):
if not chunk:
buffer = ""
decoder = json.JSONDecoder()
async for raw_chunk in response.content.iter_chunked(1024):
if not raw_chunk:
continue
try:
data = json.loads(chunk)
if data.get('type') == 'item' and 'content' in data:
chunk_idx += 1
content = data['content']
full_content += content
elif data.get('type') == 'end':
is_final = True
if is_final or chunk_idx % 8 == 0:
# 将 bytes 解码为字符串(容忍错误)
if isinstance(raw_chunk, (bytes, bytearray)):
chunk_str = raw_chunk.decode('utf-8', errors='replace')
else:
chunk_str = str(raw_chunk)
buffer += chunk_str
# 尝试从 buffer 中循环解析出 JSON 对象(处理多个对象或部分对象)
while buffer:
buffer = buffer.lstrip()
if not buffer:
break
try:
obj, idx = decoder.raw_decode(buffer)
buffer = buffer[idx:]
if not isinstance(obj, dict):
# 忽略非字典类型的顶级 JSON
continue
if obj.get('type') == 'item' and 'content' in obj:
chunk_idx += 1
content = obj['content']
full_content += content
elif obj.get('type') == 'end':
is_final = True
if is_final or chunk_idx % 8 == 0:
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
content=full_content,
is_final=is_final,
msg_sequence=message_idx,
)
except json.JSONDecodeError:
# buffer 末尾可能是一个不完整的 JSON等待更多数据
break
except Exception as e:
# 记录解析失败并继续接收后续 chunk
try:
preview = chunk_str[:200]
except Exception:
preview = '<unavailable>'
self.ap.logger.warning(f"Failed to process chunk: {e}; chunk preview: {preview}")
# 流结束后,尝试解析残余 buffer
if buffer:
try:
buffer = buffer.strip()
if buffer:
obj, _ = decoder.raw_decode(buffer)
if isinstance(obj, dict):
if obj.get('type') == 'item' and 'content' in obj:
full_content += obj['content']
elif obj.get('type') == 'end':
is_final = True
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
@@ -95,8 +149,9 @@ class N8nServiceAPIRunner(runner.RequestRunner):
is_final=is_final,
msg_sequence=message_idx,
)
except json.JSONDecodeError:
self.ap.logger.warning(f"Failed to parse final JSON line: {response.text()}")
except Exception as e:
preview = buffer[:200]
self.ap.logger.warning(f"Failed to parse remaining buffer: {e}; buffer preview: {preview}")
async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用n8n webhook"""