fix(n8n-runner): fix output_key not applied when n8n returns plain JSON (#2119)

This commit is contained in:
hzhhong
2026-04-16 22:15:57 +08:00
committed by GitHub
parent a715eddd06
commit c8915ca964
2 changed files with 370 additions and 44 deletions

View File

@@ -70,11 +70,12 @@ class N8nServiceAPIRunner(runner.RequestRunner):
return plain_text
async def _process_stream_response(
async def _process_response(
self, response: aiohttp.ClientResponse
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""处理流式响应——支持部分 JSON 和多个 JSON 对象在同一 chunk 的情况"""
"""处理响应——支持流式格式和普通 JSON 格式"""
full_content = ''
full_text = ''
chunk_idx = 0
is_final = False
message_idx = 0
@@ -93,6 +94,7 @@ class N8nServiceAPIRunner(runner.RequestRunner):
else:
chunk_str = str(raw_chunk)
full_text += chunk_str
buffer += chunk_str
# 尝试从 buffer 中循环解析出 JSON 对象(处理多个对象或部分对象)
@@ -115,7 +117,7 @@ class N8nServiceAPIRunner(runner.RequestRunner):
elif obj.get('type') == 'end':
is_final = True
if is_final or chunk_idx % 8 == 0:
if is_final or (chunk_idx > 0 and chunk_idx % 8 == 0):
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
@@ -142,6 +144,7 @@ class N8nServiceAPIRunner(runner.RequestRunner):
obj, _ = decoder.raw_decode(buffer)
if isinstance(obj, dict):
if obj.get('type') == 'item' and 'content' in obj:
chunk_idx += 1
full_content += obj['content']
elif obj.get('type') == 'end':
is_final = True
@@ -156,6 +159,28 @@ class N8nServiceAPIRunner(runner.RequestRunner):
preview = buffer[:200]
self.ap.logger.warning(f'Failed to parse remaining buffer: {e}; buffer preview: {preview}')
# n8n 返回普通 JSON 格式(无任何流式 type:item 内容)
if chunk_idx == 0:
output_content = ''
try:
response_data = json.loads(full_text.strip())
if isinstance(response_data, dict):
if self.output_key in response_data:
output_content = response_data[self.output_key]
else:
output_content = json.dumps(response_data, ensure_ascii=False)
else:
output_content = full_text
except json.JSONDecodeError:
output_content = full_text
self.ap.logger.debug(f'n8n webhook response (non-stream): {full_text[:200]}')
yield provider_message.MessageChunk(
role='assistant',
content=output_content,
is_final=True,
msg_sequence=message_idx + 1,
)
async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用n8n webhook"""
# 生成会话ID如果不存在
@@ -220,49 +245,22 @@ class N8nServiceAPIRunner(runner.RequestRunner):
# 调用webhook
session = httpclient.get_session()
if is_stream:
# 流式请求
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
# 处理流式响应
async for chunk in self._process_stream_response(response):
async for chunk in self._process_response(response):
if is_stream:
yield chunk
else:
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
try:
async for chunk in self._process_stream_response(response):
output_content = chunk.content if chunk.is_final else ''
except:
# 非流式请求(保持原有逻辑)
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
# 解析响应
response_data = await response.json()
self.ap.logger.debug(f'n8n webhook response: {response_data}')
# 从响应中提取输出
if self.output_key in response_data:
output_content = response_data[self.output_key]
else:
# 如果没有指定的输出键,则使用整个响应
output_content = json.dumps(response_data, ensure_ascii=False)
# 返回消息
yield provider_message.Message(
role='assistant',
content=output_content,
)
elif chunk.is_final:
yield provider_message.Message(
role='assistant',
content=chunk.content,
)
except Exception as e:
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')

View File

@@ -0,0 +1,328 @@
"""
Unit tests for N8nServiceAPIRunner._process_response
Tests cover four scenarios:
- Stream adapter + n8n stream format (type:item/end)
- Stream adapter + n8n plain JSON
- Non-stream adapter + n8n stream format
- Non-stream adapter + n8n plain JSON
"""
from __future__ import annotations
import json
import sys
from unittest.mock import AsyncMock, MagicMock, Mock, patch
# Break the circular import chain before importing n8nsvapi:
# n8nsvapi → runner → app → pipelinemgr → all runners → runner (partially init)
_mock_runner = MagicMock()
_mock_runner.runner_class = lambda name: (lambda cls: cls) # no-op decorator
_mock_runner.RequestRunner = object
sys.modules.setdefault('langbot.pkg.provider.runner', _mock_runner)
sys.modules.setdefault('langbot.pkg.core.app', MagicMock())
sys.modules.setdefault('langbot.pkg.utils.httpclient', MagicMock())
import pytest
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.runners.n8nsvapi import N8nServiceAPIRunner
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_runner(output_key: str = 'response') -> N8nServiceAPIRunner:
ap = Mock()
ap.logger = Mock()
pipeline_config = {
'ai': {
'n8n-service-api': {
'webhook-url': 'http://test-n8n/webhook',
'output-key': output_key,
'auth-type': 'none',
}
}
}
return N8nServiceAPIRunner(ap, pipeline_config)
def make_mock_response(chunks: list[bytes | str], status: int = 200):
"""Build a minimal aiohttp.ClientResponse mock with iter_chunked support."""
response = Mock()
response.status = status
async def iter_chunked(size):
for chunk in chunks:
yield chunk
response.content = Mock()
response.content.iter_chunked = iter_chunked
return response
async def collect_chunks(runner: N8nServiceAPIRunner, chunks: list[bytes | str]):
"""Run _process_response and collect all yielded MessageChunks."""
response = make_mock_response(chunks)
result = []
async for chunk in runner._process_response(response):
result.append(chunk)
return result
# ---------------------------------------------------------------------------
# _process_response: stream format (type:item/end)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_stream_format_single_item():
"""Single item + end in one chunk yields final chunk with full content."""
runner = make_runner()
data = b'{"type":"item","content":"hello"}{"type":"end"}'
chunks = await collect_chunks(runner, [data])
assert len(chunks) >= 1
final = chunks[-1]
assert final.is_final is True
assert final.content == 'hello'
@pytest.mark.asyncio
async def test_stream_format_multi_item_accumulates():
"""Multiple items accumulate into full_content."""
runner = make_runner()
chunks_data = [
b'{"type":"item","content":"foo"}',
b'{"type":"item","content":"bar"}',
b'{"type":"end"}',
]
chunks = await collect_chunks(runner, chunks_data)
final = chunks[-1]
assert final.is_final is True
assert final.content == 'foobar'
@pytest.mark.asyncio
async def test_stream_format_batches_every_8_items():
"""Every 8th item triggers an intermediate yield before the final."""
runner = make_runner()
items = [f'{{"type":"item","content":"{i}"}}' for i in range(8)]
items.append('{"type":"end"}')
data = ''.join(items).encode()
chunks = await collect_chunks(runner, [data])
# At least the batch yield at chunk_idx==8 + final yield
assert len(chunks) >= 2
assert chunks[-1].is_final is True
@pytest.mark.asyncio
async def test_stream_format_split_across_network_chunks():
"""JSON split across multiple network chunks is reassembled correctly."""
runner = make_runner()
part1 = b'{"type":"item","con'
part2 = b'tent":"world"}{"type":"end"}'
chunks = await collect_chunks(runner, [part1, part2])
final = chunks[-1]
assert final.is_final is True
assert final.content == 'world'
@pytest.mark.asyncio
async def test_stream_format_no_spurious_empty_yield():
"""chunk_idx==0 guard prevents spurious empty yield before any item is received."""
runner = make_runner()
# Send some non-stream JSON first, then stream
data = b'{"type":"item","content":"x"}{"type":"end"}'
chunks = await collect_chunks(runner, [data])
# No chunk should have empty content before the real content arrives
non_final = [c for c in chunks if not c.is_final]
for c in non_final:
assert c.content # must be non-empty
# ---------------------------------------------------------------------------
# _process_response: plain JSON fallback
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_plain_json_with_output_key():
"""Plain JSON with matching output_key extracts value via output_key."""
runner = make_runner(output_key='response')
data = json.dumps({'response': 'hello world'}).encode()
chunks = await collect_chunks(runner, [data])
assert len(chunks) == 1
assert chunks[0].is_final is True
assert chunks[0].content == 'hello world'
@pytest.mark.asyncio
async def test_plain_json_output_key_not_found():
"""Plain JSON without output_key falls back to entire JSON string."""
runner = make_runner(output_key='response')
payload = {'other_key': 'hello'}
data = json.dumps(payload).encode()
chunks = await collect_chunks(runner, [data])
assert len(chunks) == 1
assert chunks[0].is_final is True
assert json.loads(chunks[0].content) == payload
@pytest.mark.asyncio
async def test_plain_json_output_key_empty_string():
"""output_key present but value is empty string — returns empty string, not whole JSON."""
runner = make_runner(output_key='response')
data = json.dumps({'response': ''}).encode()
chunks = await collect_chunks(runner, [data])
assert len(chunks) == 1
assert chunks[0].is_final is True
assert chunks[0].content == ''
@pytest.mark.asyncio
async def test_plain_json_non_dict_response():
"""Plain JSON array falls back to raw text."""
runner = make_runner()
data = b'["a", "b"]'
chunks = await collect_chunks(runner, [data])
assert len(chunks) == 1
assert chunks[0].is_final is True
assert chunks[0].content == '["a", "b"]'
@pytest.mark.asyncio
async def test_invalid_json_returns_raw_text():
"""Non-JSON response returns raw text as-is."""
runner = make_runner()
data = b'plain text response'
chunks = await collect_chunks(runner, [data])
assert len(chunks) == 1
assert chunks[0].is_final is True
assert chunks[0].content == 'plain text response'
# ---------------------------------------------------------------------------
# _call_webhook: output type depends on is_stream
# ---------------------------------------------------------------------------
def make_query(is_stream: bool):
"""Build a minimal Query mock."""
query = Mock()
query.adapter = AsyncMock()
query.adapter.is_stream_output_supported = AsyncMock(return_value=is_stream)
session = Mock()
session.using_conversation = Mock()
session.using_conversation.uuid = 'test-uuid'
session.launcher_type = Mock()
session.launcher_type.value = 'person'
session.launcher_id = '12345'
query.session = session
query.user_message = Mock()
query.user_message.content = 'hi'
query.variables = {}
return query
def make_http_session_mock(response_bytes: bytes, status: int = 200):
"""Mock httpclient.get_session() returning a session whose post() yields response_bytes."""
mock_response = make_mock_response([response_bytes], status=status)
mock_response.status = status
mock_cm = AsyncMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_response)
mock_cm.__aexit__ = AsyncMock(return_value=False)
mock_session = Mock()
mock_session.post = Mock(return_value=mock_cm)
return mock_session
@pytest.mark.asyncio
async def test_call_webhook_nonstream_adapter_plain_json():
"""Non-stream adapter + plain JSON → single Message with output_key value."""
runner = make_runner(output_key='response')
query = make_query(is_stream=False)
http_session = make_http_session_mock(json.dumps({'response': 'result text'}).encode())
with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session):
results = []
async for msg in runner._call_webhook(query):
results.append(msg)
assert len(results) == 1
assert isinstance(results[0], provider_message.Message)
assert results[0].content == 'result text'
@pytest.mark.asyncio
async def test_call_webhook_stream_adapter_stream_format():
"""Stream adapter + stream format → MessageChunks, last is_final."""
runner = make_runner()
query = make_query(is_stream=True)
data = b'{"type":"item","content":"hi"}{"type":"end"}'
http_session = make_http_session_mock(data)
with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session):
results = []
async for msg in runner._call_webhook(query):
results.append(msg)
assert all(isinstance(r, provider_message.MessageChunk) for r in results)
assert results[-1].is_final is True
assert results[-1].content == 'hi'
@pytest.mark.asyncio
async def test_call_webhook_stream_adapter_plain_json():
"""Stream adapter + plain JSON → single MessageChunk with is_final=True."""
runner = make_runner(output_key='response')
query = make_query(is_stream=True)
data = json.dumps({'response': 'fallback'}).encode()
http_session = make_http_session_mock(data)
with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session):
results = []
async for msg in runner._call_webhook(query):
results.append(msg)
assert all(isinstance(r, provider_message.MessageChunk) for r in results)
assert results[-1].is_final is True
assert results[-1].content == 'fallback'
@pytest.mark.asyncio
async def test_call_webhook_nonstream_adapter_stream_format():
"""Non-stream adapter + stream format → single Message with accumulated content."""
runner = make_runner()
query = make_query(is_stream=False)
data = b'{"type":"item","content":"foo"}{"type":"item","content":"bar"}{"type":"end"}'
http_session = make_http_session_mock(data)
with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session):
results = []
async for msg in runner._call_webhook(query):
results.append(msg)
assert len(results) == 1
assert isinstance(results[0], provider_message.Message)
assert results[0].content == 'foobar'