From c8915ca9648427d573d466e671b70b41f3ef180a Mon Sep 17 00:00:00 2001 From: hzhhong Date: Thu, 16 Apr 2026 22:15:57 +0800 Subject: [PATCH] fix(n8n-runner): fix output_key not applied when n8n returns plain JSON (#2119) --- src/langbot/pkg/provider/runners/n8nsvapi.py | 86 +++-- tests/unit_tests/pipeline/test_n8nsvapi.py | 328 +++++++++++++++++++ 2 files changed, 370 insertions(+), 44 deletions(-) create mode 100644 tests/unit_tests/pipeline/test_n8nsvapi.py diff --git a/src/langbot/pkg/provider/runners/n8nsvapi.py b/src/langbot/pkg/provider/runners/n8nsvapi.py index d177d6b8..f8238eb6 100644 --- a/src/langbot/pkg/provider/runners/n8nsvapi.py +++ b/src/langbot/pkg/provider/runners/n8nsvapi.py @@ -70,11 +70,12 @@ class N8nServiceAPIRunner(runner.RequestRunner): return plain_text - async def _process_stream_response( + async def _process_response( self, response: aiohttp.ClientResponse ) -> typing.AsyncGenerator[provider_message.Message, None]: - """处理流式响应——支持部分 JSON 和多个 JSON 对象在同一 chunk 的情况""" + """处理响应——支持流式格式和普通 JSON 格式""" full_content = '' + full_text = '' chunk_idx = 0 is_final = False message_idx = 0 @@ -93,6 +94,7 @@ class N8nServiceAPIRunner(runner.RequestRunner): else: chunk_str = str(raw_chunk) + full_text += chunk_str buffer += chunk_str # 尝试从 buffer 中循环解析出 JSON 对象(处理多个对象或部分对象) @@ -115,7 +117,7 @@ class N8nServiceAPIRunner(runner.RequestRunner): elif obj.get('type') == 'end': is_final = True - if is_final or chunk_idx % 8 == 0: + if is_final or (chunk_idx > 0 and chunk_idx % 8 == 0): message_idx += 1 yield provider_message.MessageChunk( role='assistant', @@ -142,6 +144,7 @@ class N8nServiceAPIRunner(runner.RequestRunner): obj, _ = decoder.raw_decode(buffer) if isinstance(obj, dict): if obj.get('type') == 'item' and 'content' in obj: + chunk_idx += 1 full_content += obj['content'] elif obj.get('type') == 'end': is_final = True @@ -156,6 +159,28 @@ class N8nServiceAPIRunner(runner.RequestRunner): preview = buffer[:200] self.ap.logger.warning(f'Failed to 
parse remaining buffer: {e}; buffer preview: {preview}') + # n8n 返回普通 JSON 格式(无任何流式 type:item 内容) + if chunk_idx == 0: + output_content = '' + try: + response_data = json.loads(full_text.strip()) + if isinstance(response_data, dict): + if self.output_key in response_data: + output_content = response_data[self.output_key] + else: + output_content = json.dumps(response_data, ensure_ascii=False) + else: + output_content = full_text + except json.JSONDecodeError: + output_content = full_text + self.ap.logger.debug(f'n8n webhook response (non-stream): {full_text[:200]}') + yield provider_message.MessageChunk( + role='assistant', + content=output_content, + is_final=True, + msg_sequence=message_idx + 1, + ) + async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: """调用n8n webhook""" # 生成会话ID(如果不存在) @@ -220,49 +245,22 @@ class N8nServiceAPIRunner(runner.RequestRunner): # 调用webhook session = httpclient.get_session() - if is_stream: - # 流式请求 - async with session.post( - self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout - ) as response: - if response.status != 200: - error_text = await response.text() - self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') - raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') + async with session.post( + self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout + ) as response: + if response.status != 200: + error_text = await response.text() + self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') + raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') - # 处理流式响应 - async for chunk in self._process_stream_response(response): + async for chunk in self._process_response(response): + if is_stream: yield chunk - else: - async with session.post( - self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout - ) 
as response: - try: - async for chunk in self._process_stream_response(response): - output_content = chunk.content if chunk.is_final else '' - except: - # 非流式请求(保持原有逻辑) - if response.status != 200: - error_text = await response.text() - self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') - raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') - - # 解析响应 - response_data = await response.json() - self.ap.logger.debug(f'n8n webhook response: {response_data}') - - # 从响应中提取输出 - if self.output_key in response_data: - output_content = response_data[self.output_key] - else: - # 如果没有指定的输出键,则使用整个响应 - output_content = json.dumps(response_data, ensure_ascii=False) - - # 返回消息 - yield provider_message.Message( - role='assistant', - content=output_content, - ) + elif chunk.is_final: + yield provider_message.Message( + role='assistant', + content=chunk.content, + ) except Exception as e: self.ap.logger.error(f'n8n webhook call exception: {str(e)}') raise N8nAPIError(f'n8n webhook call exception: {str(e)}') diff --git a/tests/unit_tests/pipeline/test_n8nsvapi.py b/tests/unit_tests/pipeline/test_n8nsvapi.py new file mode 100644 index 00000000..68f3cdcc --- /dev/null +++ b/tests/unit_tests/pipeline/test_n8nsvapi.py @@ -0,0 +1,328 @@ +""" +Unit tests for N8nServiceAPIRunner._process_response + +Tests cover four scenarios: +- Stream adapter + n8n stream format (type:item/end) +- Stream adapter + n8n plain JSON +- Non-stream adapter + n8n stream format +- Non-stream adapter + n8n plain JSON +""" + +from __future__ import annotations + +import json +import sys +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +# Break the circular import chain before importing n8nsvapi: +# n8nsvapi → runner → app → pipelinemgr → all runners → runner (partially init) +_mock_runner = MagicMock() +_mock_runner.runner_class = lambda name: (lambda cls: cls) # no-op decorator +_mock_runner.RequestRunner = object 
+sys.modules.setdefault('langbot.pkg.provider.runner', _mock_runner) +sys.modules.setdefault('langbot.pkg.core.app', MagicMock()) +sys.modules.setdefault('langbot.pkg.utils.httpclient', MagicMock()) + +import pytest +import langbot_plugin.api.entities.builtin.provider.message as provider_message +from langbot.pkg.provider.runners.n8nsvapi import N8nServiceAPIRunner + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_runner(output_key: str = 'response') -> N8nServiceAPIRunner: + ap = Mock() + ap.logger = Mock() + pipeline_config = { + 'ai': { + 'n8n-service-api': { + 'webhook-url': 'http://test-n8n/webhook', + 'output-key': output_key, + 'auth-type': 'none', + } + } + } + return N8nServiceAPIRunner(ap, pipeline_config) + + +def make_mock_response(chunks: list[bytes | str], status: int = 200): + """Build a minimal aiohttp.ClientResponse mock with iter_chunked support.""" + response = Mock() + response.status = status + + async def iter_chunked(size): + for chunk in chunks: + yield chunk + + response.content = Mock() + response.content.iter_chunked = iter_chunked + return response + + +async def collect_chunks(runner: N8nServiceAPIRunner, chunks: list[bytes | str]): + """Run _process_response and collect all yielded MessageChunks.""" + response = make_mock_response(chunks) + result = [] + async for chunk in runner._process_response(response): + result.append(chunk) + return result + + +# --------------------------------------------------------------------------- +# _process_response: stream format (type:item/end) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_stream_format_single_item(): + """Single item + end in one chunk yields final chunk with full content.""" + runner = make_runner() + data = b'{"type":"item","content":"hello"}{"type":"end"}' + + chunks = await 
collect_chunks(runner, [data]) + + assert len(chunks) >= 1 + final = chunks[-1] + assert final.is_final is True + assert final.content == 'hello' + + +@pytest.mark.asyncio +async def test_stream_format_multi_item_accumulates(): + """Multiple items accumulate into full_content.""" + runner = make_runner() + chunks_data = [ + b'{"type":"item","content":"foo"}', + b'{"type":"item","content":"bar"}', + b'{"type":"end"}', + ] + + chunks = await collect_chunks(runner, chunks_data) + + final = chunks[-1] + assert final.is_final is True + assert final.content == 'foobar' + + +@pytest.mark.asyncio +async def test_stream_format_batches_every_8_items(): + """Every 8th item triggers an intermediate yield before the final.""" + runner = make_runner() + items = [f'{{"type":"item","content":"{i}"}}' for i in range(8)] + items.append('{"type":"end"}') + data = ''.join(items).encode() + + chunks = await collect_chunks(runner, [data]) + + # At least the batch yield at chunk_idx==8 + final yield + assert len(chunks) >= 2 + assert chunks[-1].is_final is True + + +@pytest.mark.asyncio +async def test_stream_format_split_across_network_chunks(): + """JSON split across multiple network chunks is reassembled correctly.""" + runner = make_runner() + part1 = b'{"type":"item","con' + part2 = b'tent":"world"}{"type":"end"}' + + chunks = await collect_chunks(runner, [part1, part2]) + + final = chunks[-1] + assert final.is_final is True + assert final.content == 'world' + + +@pytest.mark.asyncio +async def test_stream_format_no_spurious_empty_yield(): + """chunk_idx==0 guard prevents spurious empty yield before any item is received.""" + runner = make_runner() + # Pure stream-format payload: a single item followed by the end marker + data = b'{"type":"item","content":"x"}{"type":"end"}' + + chunks = await collect_chunks(runner, [data]) + + # No chunk should have empty content before the real content arrives + non_final = [c for c in chunks if not c.is_final] + for c in non_final: + assert c.content # must be non-empty 
+ + +# --------------------------------------------------------------------------- +# _process_response: plain JSON fallback +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_plain_json_with_output_key(): + """Plain JSON with matching output_key extracts value via output_key.""" + runner = make_runner(output_key='response') + data = json.dumps({'response': 'hello world'}).encode() + + chunks = await collect_chunks(runner, [data]) + + assert len(chunks) == 1 + assert chunks[0].is_final is True + assert chunks[0].content == 'hello world' + + +@pytest.mark.asyncio +async def test_plain_json_output_key_not_found(): + """Plain JSON without output_key falls back to entire JSON string.""" + runner = make_runner(output_key='response') + payload = {'other_key': 'hello'} + data = json.dumps(payload).encode() + + chunks = await collect_chunks(runner, [data]) + + assert len(chunks) == 1 + assert chunks[0].is_final is True + assert json.loads(chunks[0].content) == payload + + +@pytest.mark.asyncio +async def test_plain_json_output_key_empty_string(): + """output_key present but value is empty string — returns empty string, not whole JSON.""" + runner = make_runner(output_key='response') + data = json.dumps({'response': ''}).encode() + + chunks = await collect_chunks(runner, [data]) + + assert len(chunks) == 1 + assert chunks[0].is_final is True + assert chunks[0].content == '' + + +@pytest.mark.asyncio +async def test_plain_json_non_dict_response(): + """Plain JSON array falls back to raw text.""" + runner = make_runner() + data = b'["a", "b"]' + + chunks = await collect_chunks(runner, [data]) + + assert len(chunks) == 1 + assert chunks[0].is_final is True + assert chunks[0].content == '["a", "b"]' + + +@pytest.mark.asyncio +async def test_invalid_json_returns_raw_text(): + """Non-JSON response returns raw text as-is.""" + runner = make_runner() + data = b'plain text response' + + chunks = await 
collect_chunks(runner, [data]) + + assert len(chunks) == 1 + assert chunks[0].is_final is True + assert chunks[0].content == 'plain text response' + + +# --------------------------------------------------------------------------- +# _call_webhook: output type depends on is_stream +# --------------------------------------------------------------------------- + +def make_query(is_stream: bool): + """Build a minimal Query mock.""" + query = Mock() + query.adapter = AsyncMock() + query.adapter.is_stream_output_supported = AsyncMock(return_value=is_stream) + + session = Mock() + session.using_conversation = Mock() + session.using_conversation.uuid = 'test-uuid' + session.launcher_type = Mock() + session.launcher_type.value = 'person' + session.launcher_id = '12345' + query.session = session + + query.user_message = Mock() + query.user_message.content = 'hi' + query.variables = {} + return query + + +def make_http_session_mock(response_bytes: bytes, status: int = 200): + """Mock httpclient.get_session() returning a session whose post() yields response_bytes.""" + mock_response = make_mock_response([response_bytes], status=status) + mock_response.status = status + + mock_cm = AsyncMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_response) + mock_cm.__aexit__ = AsyncMock(return_value=False) + + mock_session = Mock() + mock_session.post = Mock(return_value=mock_cm) + return mock_session + + +@pytest.mark.asyncio +async def test_call_webhook_nonstream_adapter_plain_json(): + """Non-stream adapter + plain JSON → single Message with output_key value.""" + runner = make_runner(output_key='response') + query = make_query(is_stream=False) + http_session = make_http_session_mock(json.dumps({'response': 'result text'}).encode()) + + with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session): + results = [] + async for msg in runner._call_webhook(query): + results.append(msg) + + assert len(results) == 1 + assert 
isinstance(results[0], provider_message.Message) + assert results[0].content == 'result text' + + +@pytest.mark.asyncio +async def test_call_webhook_stream_adapter_stream_format(): + """Stream adapter + stream format → MessageChunks, last is_final.""" + runner = make_runner() + query = make_query(is_stream=True) + data = b'{"type":"item","content":"hi"}{"type":"end"}' + http_session = make_http_session_mock(data) + + with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session): + results = [] + async for msg in runner._call_webhook(query): + results.append(msg) + + assert all(isinstance(r, provider_message.MessageChunk) for r in results) + assert results[-1].is_final is True + assert results[-1].content == 'hi' + + +@pytest.mark.asyncio +async def test_call_webhook_stream_adapter_plain_json(): + """Stream adapter + plain JSON → single MessageChunk with is_final=True.""" + runner = make_runner(output_key='response') + query = make_query(is_stream=True) + data = json.dumps({'response': 'fallback'}).encode() + http_session = make_http_session_mock(data) + + with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session): + results = [] + async for msg in runner._call_webhook(query): + results.append(msg) + + assert all(isinstance(r, provider_message.MessageChunk) for r in results) + assert results[-1].is_final is True + assert results[-1].content == 'fallback' + + +@pytest.mark.asyncio +async def test_call_webhook_nonstream_adapter_stream_format(): + """Non-stream adapter + stream format → single Message with accumulated content.""" + runner = make_runner() + query = make_query(is_stream=False) + data = b'{"type":"item","content":"foo"}{"type":"item","content":"bar"}{"type":"end"}' + http_session = make_http_session_mock(data) + + with patch('langbot.pkg.provider.runners.n8nsvapi.httpclient.get_session', return_value=http_session): + results = [] + async for msg in 
runner._call_webhook(query): + results.append(msg) + + assert len(results) == 1 + assert isinstance(results[0], provider_message.Message) + assert results[0].content == 'foobar'