From 58e4b3577089dfb6915d193c93040e1ccb52822d Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <1051233107@qq.com> Date: Fri, 29 May 2026 21:05:20 +0800 Subject: [PATCH] fix(agent-runner): stabilize event context and streams --- .../pkg/agent/runner/context_builder.py | 2 +- .../pkg/agent/runner/pipeline_adapter.py | 65 +++++++++++++----- src/langbot/pkg/plugin/handler.py | 2 + .../modelmgr/requesters/bailianchatcmpl.py | 3 +- .../provider/modelmgr/requesters/chatcmpl.py | 3 +- .../modelmgr/requesters/geminichatcmpl.py | 3 +- .../modelmgr/requesters/jiekouaichatcmpl.py | 5 +- .../modelmgr/requesters/modelscopechatcmpl.py | 3 +- .../modelmgr/requesters/ppiochatcmpl.py | 5 +- .../agent/test_context_validation.py | 67 ++++++++++++++++--- .../agent/test_event_first_protocol.py | 52 ++++++++------ .../unit_tests/plugin/test_handler_actions.py | 47 +++++++++++++ .../requesters/test_chatcmpl_errors_direct.py | 31 +++++++++ 13 files changed, 232 insertions(+), 56 deletions(-) diff --git a/src/langbot/pkg/agent/runner/context_builder.py b/src/langbot/pkg/agent/runner/context_builder.py index da7321e4..a6be6940 100644 --- a/src/langbot/pkg/agent/runner/context_builder.py +++ b/src/langbot/pkg/agent/runner/context_builder.py @@ -229,7 +229,7 @@ class AgentRunContextBuilder: subject_context = { 'subject_type': event.subject.subject_type, 'subject_id': event.subject.subject_id, - 'subject_data': event.subject.data, + 'data': event.subject.data, } # Build input from event diff --git a/src/langbot/pkg/agent/runner/pipeline_adapter.py b/src/langbot/pkg/agent/runner/pipeline_adapter.py index cbabebaa..ca0d67ae 100644 --- a/src/langbot/pkg/agent/runner/pipeline_adapter.py +++ b/src/langbot/pkg/agent/runner/pipeline_adapter.py @@ -5,8 +5,8 @@ Protocol v1 architecture. """ from __future__ import annotations +import hashlib import typing -import time from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query from langbot_plugin.api.entities.builtin.platform import message as platform_message @@ -19,7 +19,6 @@ from langbot_plugin.api.entities.builtin.agent_runner.event import ( ) from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext -from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger from .host_models import ( AgentEventEnvelope, @@ -305,8 +304,9 @@ class PipelineAdapter: if isinstance(event_time, (int, float)): event_time = int(event_time) + source_event_id = str(message_id or query.query_id) return AgentEventContext( - event_id=str(message_id or query.query_id), + event_id=cls._build_scoped_event_id(query, source_event_id, event_time), event_type=runner_events.MESSAGE_RECEIVED, event_time=event_time, source="pipeline_adapter", @@ -314,20 +314,36 @@ class PipelineAdapter: data=event_data, ) + @classmethod + def _build_scoped_event_id( + cls, + query: pipeline_query.Query, + source_event_id: str, + event_time: int | None, + ) -> str: + """Build a globally unique host event id from pipeline-local ids.""" + launcher_type = getattr(query, 'launcher_type', None) + launcher_type_value = getattr(launcher_type, 'value', launcher_type) if launcher_type is not None else None + scope_parts = [ + 'pipeline_adapter', + getattr(query, 'pipeline_uuid', None), + getattr(query, 'bot_uuid', None), + launcher_type_value, + getattr(query, 'launcher_id', None), + getattr(query, 'sender_id', None), + source_event_id, + event_time, + ] + scoped = '|'.join('' if part is None else str(part) for part in scope_parts) + digest = hashlib.sha256(scoped.encode('utf-8')).hexdigest()[:32] + return f'pipeline:{digest}' + @classmethod def _build_conversation_context( cls, query: pipeline_query.Query, ) -> ConversationContext: """Build ConversationContext from Query.""" - # Handle session and conversation_id - conversation_id = None - session = getattr(query, 'session', None) - if session: - conversation = getattr(session, 'using_conversation', None) - if conversation: - conversation_id = getattr(conversation, 'uuid', None) - # Handle launcher_type safely launcher_type = getattr(query, 'launcher_type', None) launcher_type_value = None @@ -337,6 +353,26 @@ class PipelineAdapter: # Handle launcher_id launcher_id = getattr(query, 'launcher_id', None) + # Build session_id from launcher info if available + session_id = None + if launcher_type_value and launcher_id: + session_id = f'{launcher_type_value}_{launcher_id}' + + # Handle session and conversation_id + conversation_id = None + session = getattr(query, 'session', None) + if session: + conversation = getattr(session, 'using_conversation', None) + if conversation: + conversation_id = getattr(conversation, 'uuid', None) + + if not conversation_id: + variables = getattr(query, 'variables', None) or {} + conversation_id = variables.get('conversation_id') or None + + if not conversation_id: + conversation_id = session_id + # Handle sender_id sender_id = getattr(query, 'sender_id', None) if sender_id is not None: @@ -348,13 +384,8 @@ class PipelineAdapter: # Handle pipeline_uuid pipeline_uuid = getattr(query, 'pipeline_uuid', None) - # Build session_id from launcher info if available - session_id = None - if launcher_type_value and launcher_id: - session_id = f'{launcher_type_value}_{launcher_id}' - return ConversationContext( - conversation_id=conversation_id, + conversation_id=str(conversation_id) if conversation_id is not None else None, thread_id=None, launcher_type=launcher_type_value, launcher_id=launcher_id, diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index d83726e4..312d2877 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -674,6 +674,8 @@ class RuntimeConnectionHandler(handler.Handler): extra_args=effective_extra_args, remove_think=remove_think, ): + if chunk is None: + continue yield handler.ActionResponse.success( data={ 'chunk': chunk.model_dump(), diff --git a/src/langbot/pkg/provider/modelmgr/requesters/bailianchatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/bailianchatcmpl.py index 9da6e1b4..6b0da927 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/bailianchatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/bailianchatcmpl.py @@ -171,7 +171,8 @@ class BailianChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions): # 解析 chunk 数据 if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: delta = {} diff --git a/src/langbot/pkg/provider/modelmgr/requesters/chatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/chatcmpl.py index e63e362b..ff5e6027 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/chatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/chatcmpl.py @@ -359,7 +359,8 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester): if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: diff --git a/src/langbot/pkg/provider/modelmgr/requesters/geminichatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/geminichatcmpl.py index 956b49f6..149a7356 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/geminichatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/geminichatcmpl.py @@ -132,7 +132,8 @@ class GeminiChatCompletions(chatcmpl.OpenAIChatCompletions): if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: diff --git a/src/langbot/pkg/provider/modelmgr/requesters/jiekouaichatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/jiekouaichatcmpl.py index 305ae21f..67a45651 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/jiekouaichatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/jiekouaichatcmpl.py @@ -144,7 +144,8 @@ class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions): # 解析 chunk 数据 if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: delta = {} @@ -159,7 +160,7 @@ class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions): # reasoning_content = delta.get('reasoning_content', '') if remove_think: - if delta['content'] is not None: + if delta.get('content') is not None: if '' in delta['content'] and not thinking_started and not thinking_ended: thinking_started = True continue diff --git a/src/langbot/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py index c98a71d7..479e6775 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/modelscopechatcmpl.py @@ -391,7 +391,8 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester): # 解析 chunk 数据 if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: delta = {} diff --git a/src/langbot/pkg/provider/modelmgr/requesters/ppiochatcmpl.py b/src/langbot/pkg/provider/modelmgr/requesters/ppiochatcmpl.py index 1836bd62..594cb841 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/ppiochatcmpl.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/ppiochatcmpl.py @@ -144,7 +144,8 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): # 解析 chunk 数据 if hasattr(chunk, 'choices') and chunk.choices: choice = chunk.choices[0] - delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} + delta_obj = getattr(choice, 'delta', None) + delta = delta_obj.model_dump() if delta_obj is not None else {} finish_reason = getattr(choice, 'finish_reason', None) else: delta = {} @@ -159,7 +160,7 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions): # reasoning_content = delta.get('reasoning_content', '') if remove_think: - if delta['content'] is not None: + if delta.get('content') is not None: if '' in delta['content'] and not thinking_started and not thinking_ended: thinking_started = True continue diff --git a/tests/unit_tests/agent/test_context_validation.py b/tests/unit_tests/agent/test_context_validation.py index 480b64b3..0ea003a8 100644 --- a/tests/unit_tests/agent/test_context_validation.py +++ b/tests/unit_tests/agent/test_context_validation.py @@ -3,28 +3,20 @@ from __future__ import annotations import pytest from unittest.mock import MagicMock, AsyncMock, patch -import uuid # SDK imports for validation from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext from langbot_plugin.api.entities.builtin.agent_runner.event import AgentEventContext from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext from langbot_plugin.api.entities.builtin.agent_runner.context_access import ContextAccess -from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext -from langbot_plugin.api.entities.builtin.agent_runner.state import AgentRunState # LangBot imports from langbot.pkg.agent.runner.context_builder import ( AgentRunContextBuilder, - AgentTrigger as BuilderTrigger, - ConversationContext as BuilderConversation, - AgentInput as BuilderInput, - AgentRunState as BuilderState, AgentResources as BuilderResources, - AgentRuntimeContext as BuilderRuntime, ) from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope from langbot.pkg.core import app @@ -171,6 +163,65 @@ class TestContextValidation: # Verify input assert validated.input.text == "Hello world" + @pytest.mark.asyncio + async def test_build_context_preserves_subject_data_for_non_message_events(self): + """Non-message EBA events keep subject.data instead of relying on message text.""" + from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext + from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput as EventInput + from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext + + mock_app = self._make_mock_app() + builder = AgentRunContextBuilder(mock_app) + event = AgentEventEnvelope( + event_id="evt_recall_1", + event_type="message.recalled", + event_time=1700000001, + source="platform", + source_event_type="platform.message.recall", + bot_id="bot_1", + workspace_id="workspace_1", + conversation_id="conv_1", + actor=ActorContext(actor_type="user", actor_id="user_1"), + subject=SubjectContext( + subject_type="message", + subject_id="message_1", + data={"recalled_message_id": "message_1", "reason": "user_recall"}, + ), + input=EventInput(text=None), + delivery=DeliveryContext(surface="test"), + data={"source_event_id": "source_recall_1"}, + ) + binding = self._make_binding() + binding.event_types = ["message.recalled"] + resources = self._make_resources() + descriptor = self._make_descriptor() + + with patch('langbot.pkg.agent.runner.context_builder.get_persistent_state_store') as mock_get_store: + mock_store = AsyncMock() + mock_store.build_snapshot_from_event = AsyncMock(return_value={ + 'conversation': {}, + 'actor': {}, + 'subject': {}, + 'runner': {}, + }) + mock_get_store.return_value = mock_store + + context_dict = await builder.build_context_from_event( + event=event, + binding=binding, + descriptor=descriptor, + resources=resources, + ) + + validated = AgentRunContext.model_validate(context_dict) + + assert validated.event.event_type == "message.recalled" + assert validated.input.text is None + assert validated.subject is not None + assert validated.subject.subject_type == "message" + assert validated.subject.subject_id == "message_1" + assert validated.subject.data == {"recalled_message_id": "message_1", "reason": "user_recall"} + @pytest.mark.asyncio async def test_build_context_from_event_has_no_legacy_top_level_fields(self): """Test that build_context_from_event does NOT have top-level messages/prompt/params.""" diff --git a/tests/unit_tests/agent/test_event_first_protocol.py b/tests/unit_tests/agent/test_event_first_protocol.py index 6dee9c30..a58c1848 100644 --- a/tests/unit_tests/agent/test_event_first_protocol.py +++ b/tests/unit_tests/agent/test_event_first_protocol.py @@ -10,15 +10,11 @@ Tests cover: from __future__ import annotations import pytest -from unittest.mock import Mock, MagicMock, patch -import typing +from unittest.mock import Mock # Import SDK entities from langbot_plugin.api.entities.builtin.agent_runner.event import ( AgentEventContext, - ConversationContext, - ActorContext, - SubjectContext, ) from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger @@ -33,22 +29,8 @@ from langbot_plugin.api.entities.builtin.agent_runner.capabilities import ( from langbot_plugin.api.entities.builtin.agent_runner.permissions import ( AgentRunnerPermissions, ) -from langbot_plugin.api.entities.builtin.agent_runner.context_policy import ( - AgentRunnerContextPolicy, -) -from langbot_plugin.api.entities.builtin.agent_runner.manifest import ( - AgentRunnerManifest, -) # Import LangBot host models -from langbot.pkg.agent.runner.host_models import ( - AgentEventEnvelope, - AgentBinding, - BindingScope, - ResourcePolicy, - StatePolicy, - DeliveryPolicy, -) from langbot.pkg.agent.runner.pipeline_adapter import PipelineAdapter @@ -76,9 +58,24 @@ class TestPipelineQueryToEventEnvelope: """Test conversation context extraction.""" event = PipelineAdapter.query_to_event(mock_query) - # Conversation may be None if no session - if event.conversation_id: - assert event.conversation_id is not None + assert event.conversation_id == "conv-uuid-123" + + def test_query_to_event_prefers_variable_conversation_id_when_conversation_uuid_missing(self, mock_query): + """Pipeline variables can provide the conversation identity for state scope.""" + mock_query.session.using_conversation.uuid = None + mock_query.variables["conversation_id"] = "conv-from-vars" + + event = PipelineAdapter.query_to_event(mock_query) + + assert event.conversation_id == "conv-from-vars" + + def test_query_to_event_falls_back_to_launcher_session_for_state_scope(self, mock_query): + """Debug Chat and legacy pipeline runs may not have a conversation UUID.""" + mock_query.session.using_conversation.uuid = None + + event = PipelineAdapter.query_to_event(mock_query) + + assert event.conversation_id == "person_launcher-123" def test_query_to_event_delivery_context(self, mock_query): """Test delivery context extraction.""" @@ -118,6 +115,17 @@ class TestPipelineQueryToEventEnvelope: assert event.delivery.reply_target == {"message_id": None} + def test_query_to_event_scopes_pipeline_local_event_ids(self, mock_query): + """Pipeline-local message IDs must not become global audit IDs.""" + first = PipelineAdapter.query_to_event(mock_query) + + mock_query.launcher_id = "launcher-456" + second = PipelineAdapter.query_to_event(mock_query) + + assert first.event_id.startswith("pipeline:") + assert first.event_id != "789" + assert second.event_id != first.event_id + class TestPipelineConfigToBinding: """Test Pipeline config -> AgentBinding conversion.""" diff --git a/tests/unit_tests/plugin/test_handler_actions.py b/tests/unit_tests/plugin/test_handler_actions.py index 29e6a0cb..60b52552 100644 --- a/tests/unit_tests/plugin/test_handler_actions.py +++ b/tests/unit_tests/plugin/test_handler_actions.py @@ -511,6 +511,53 @@ class TestAgentRunProxyActions: assert provider.kwargs['remove_think'] is True assert provider.kwargs['funcs'] == [] + @pytest.mark.asyncio + async def test_invoke_llm_stream_skips_none_chunks(self, app): + """INVOKE_LLM_STREAM tolerates provider heartbeat/no-op chunks.""" + from langbot.pkg.agent.runner.session_registry import get_session_registry + + class StreamProvider: + async def invoke_llm_stream(self, **kwargs): + yield provider_message.MessageChunk(role='assistant', content='ok') + yield None + yield provider_message.MessageChunk(role='assistant', content=' done', is_final=True) + + run_id = 'run_proxy_invoke_llm_stream_none_chunks' + query = self.query() + app.query_pool.cached_queries[904] = query + + registry = get_session_registry() + await registry.unregister(run_id) + await registry.register( + run_id=run_id, + runner_id='plugin:test/runner/default', + query_id=904, + plugin_identity='test/runner', + resources=make_agent_resources(models=[{'model_id': 'llm_stream_002'}]), + ) + + model = SimpleNamespace( + model_entity=SimpleNamespace(abilities=[], extra_args={}), + provider=StreamProvider(), + ) + app.model_mgr.get_model_by_uuid.return_value = model + runtime_handler = make_handler(app) + + responses = [] + try: + stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({ + 'run_id': run_id, + 'llm_model_uuid': 'llm_stream_002', + 'messages': [{'role': 'user', 'content': 'hello'}], + }) + async for response in stream: + responses.append(response) + finally: + await registry.unregister(run_id) + + assert [response.code for response in responses] == [0, 0] + assert [response.data['chunk']['content'] for response in responses] == ['ok', ' done'] + @pytest.mark.asyncio async def test_call_tool_passes_current_query(self, app): """CALL_TOOL passes the current Query back into tool execution.""" diff --git a/tests/unit_tests/provider/requesters/test_chatcmpl_errors_direct.py b/tests/unit_tests/provider/requesters/test_chatcmpl_errors_direct.py index c51476c2..946c32a4 100644 --- a/tests/unit_tests/provider/requesters/test_chatcmpl_errors_direct.py +++ b/tests/unit_tests/provider/requesters/test_chatcmpl_errors_direct.py @@ -7,6 +7,7 @@ only the necessary dependencies. from __future__ import annotations import asyncio +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest import openai # Import real openai package @@ -140,6 +141,36 @@ class TestInvokeLLMErrorHandling: assert '频繁' in str(exc.value) or '余额' in str(exc.value) + @pytest.mark.asyncio + async def test_stream_skips_choice_with_none_delta(self, requester_with_mocked_client, mock_model): + """OpenAI-compatible streams may include choice frames with delta=None.""" + + async def fake_req_stream(args, extra_body=None): + yield SimpleNamespace(choices=[SimpleNamespace(delta=None, finish_reason=None)]) + yield SimpleNamespace( + choices=[ + SimpleNamespace( + delta=SimpleNamespace(model_dump=lambda: {'role': 'assistant', 'content': 'OK'}), + finish_reason='stop', + ) + ] + ) + + requester_with_mocked_client._req_stream = fake_req_stream + + chunks = [ + chunk + async for chunk in requester_with_mocked_client._closure_stream( + query=None, + req_messages=[{'role': 'user', 'content': 'hello'}], + use_model=mock_model, + ) + ] + + assert len(chunks) == 1 + assert chunks[0].content == 'OK' + assert chunks[0].is_final is True + class TestInvokeEmbeddingErrorHandling: """Tests for invoke_embedding error handling."""