fix(agent-runner): stabilize event context and streams

This commit is contained in:
huanghuoguoguo
2026-05-29 21:05:20 +08:00
parent 056e62aa03
commit 58e4b35770
13 changed files with 232 additions and 56 deletions

View File

@@ -229,7 +229,7 @@ class AgentRunContextBuilder:
subject_context = { subject_context = {
'subject_type': event.subject.subject_type, 'subject_type': event.subject.subject_type,
'subject_id': event.subject.subject_id, 'subject_id': event.subject.subject_id,
'subject_data': event.subject.data, 'data': event.subject.data,
} }
# Build input from event # Build input from event

View File

@@ -5,8 +5,8 @@ Protocol v1 architecture.
""" """
from __future__ import annotations from __future__ import annotations
import hashlib
import typing import typing
import time
from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query from langbot_plugin.api.entities.builtin.pipeline import query as pipeline_query
from langbot_plugin.api.entities.builtin.platform import message as platform_message from langbot_plugin.api.entities.builtin.platform import message as platform_message
@@ -19,7 +19,6 @@ from langbot_plugin.api.entities.builtin.agent_runner.event import (
) )
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger
from .host_models import ( from .host_models import (
AgentEventEnvelope, AgentEventEnvelope,
@@ -305,8 +304,9 @@ class PipelineAdapter:
if isinstance(event_time, (int, float)): if isinstance(event_time, (int, float)):
event_time = int(event_time) event_time = int(event_time)
source_event_id = str(message_id or query.query_id)
return AgentEventContext( return AgentEventContext(
event_id=str(message_id or query.query_id), event_id=cls._build_scoped_event_id(query, source_event_id, event_time),
event_type=runner_events.MESSAGE_RECEIVED, event_type=runner_events.MESSAGE_RECEIVED,
event_time=event_time, event_time=event_time,
source="pipeline_adapter", source="pipeline_adapter",
@@ -314,20 +314,36 @@ class PipelineAdapter:
data=event_data, data=event_data,
) )
@classmethod
def _build_scoped_event_id(
cls,
query: pipeline_query.Query,
source_event_id: str,
event_time: int | None,
) -> str:
"""Build a globally unique host event id from pipeline-local ids."""
launcher_type = getattr(query, 'launcher_type', None)
launcher_type_value = getattr(launcher_type, 'value', launcher_type) if launcher_type is not None else None
scope_parts = [
'pipeline_adapter',
getattr(query, 'pipeline_uuid', None),
getattr(query, 'bot_uuid', None),
launcher_type_value,
getattr(query, 'launcher_id', None),
getattr(query, 'sender_id', None),
source_event_id,
event_time,
]
scoped = '|'.join('' if part is None else str(part) for part in scope_parts)
digest = hashlib.sha256(scoped.encode('utf-8')).hexdigest()[:32]
return f'pipeline:{digest}'
@classmethod @classmethod
def _build_conversation_context( def _build_conversation_context(
cls, cls,
query: pipeline_query.Query, query: pipeline_query.Query,
) -> ConversationContext: ) -> ConversationContext:
"""Build ConversationContext from Query.""" """Build ConversationContext from Query."""
# Handle session and conversation_id
conversation_id = None
session = getattr(query, 'session', None)
if session:
conversation = getattr(session, 'using_conversation', None)
if conversation:
conversation_id = getattr(conversation, 'uuid', None)
# Handle launcher_type safely # Handle launcher_type safely
launcher_type = getattr(query, 'launcher_type', None) launcher_type = getattr(query, 'launcher_type', None)
launcher_type_value = None launcher_type_value = None
@@ -337,6 +353,26 @@ class PipelineAdapter:
# Handle launcher_id # Handle launcher_id
launcher_id = getattr(query, 'launcher_id', None) launcher_id = getattr(query, 'launcher_id', None)
# Build session_id from launcher info if available
session_id = None
if launcher_type_value and launcher_id:
session_id = f'{launcher_type_value}_{launcher_id}'
# Handle session and conversation_id
conversation_id = None
session = getattr(query, 'session', None)
if session:
conversation = getattr(session, 'using_conversation', None)
if conversation:
conversation_id = getattr(conversation, 'uuid', None)
if not conversation_id:
variables = getattr(query, 'variables', None) or {}
conversation_id = variables.get('conversation_id') or None
if not conversation_id:
conversation_id = session_id
# Handle sender_id # Handle sender_id
sender_id = getattr(query, 'sender_id', None) sender_id = getattr(query, 'sender_id', None)
if sender_id is not None: if sender_id is not None:
@@ -348,13 +384,8 @@ class PipelineAdapter:
# Handle pipeline_uuid # Handle pipeline_uuid
pipeline_uuid = getattr(query, 'pipeline_uuid', None) pipeline_uuid = getattr(query, 'pipeline_uuid', None)
# Build session_id from launcher info if available
session_id = None
if launcher_type_value and launcher_id:
session_id = f'{launcher_type_value}_{launcher_id}'
return ConversationContext( return ConversationContext(
conversation_id=conversation_id, conversation_id=str(conversation_id) if conversation_id is not None else None,
thread_id=None, thread_id=None,
launcher_type=launcher_type_value, launcher_type=launcher_type_value,
launcher_id=launcher_id, launcher_id=launcher_id,

View File

@@ -674,6 +674,8 @@ class RuntimeConnectionHandler(handler.Handler):
extra_args=effective_extra_args, extra_args=effective_extra_args,
remove_think=remove_think, remove_think=remove_think,
): ):
if chunk is None:
continue
yield handler.ActionResponse.success( yield handler.ActionResponse.success(
data={ data={
'chunk': chunk.model_dump(), 'chunk': chunk.model_dump(),

View File

@@ -171,7 +171,8 @@ class BailianChatCompletions(modelscopechatcmpl.ModelScopeChatCompletions):
# 解析 chunk 数据 # 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:
delta = {} delta = {}

View File

@@ -359,7 +359,8 @@ class OpenAIChatCompletions(requester.ProviderAPIRequester):
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:

View File

@@ -132,7 +132,8 @@ class GeminiChatCompletions(chatcmpl.OpenAIChatCompletions):
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:

View File

@@ -144,7 +144,8 @@ class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions):
# 解析 chunk 数据 # 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:
delta = {} delta = {}
@@ -159,7 +160,7 @@ class JieKouAIChatCompletions(chatcmpl.OpenAIChatCompletions):
# reasoning_content = delta.get('reasoning_content', '') # reasoning_content = delta.get('reasoning_content', '')
if remove_think: if remove_think:
if delta['content'] is not None: if delta.get('content') is not None:
if '<think>' in delta['content'] and not thinking_started and not thinking_ended: if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
thinking_started = True thinking_started = True
continue continue

View File

@@ -391,7 +391,8 @@ class ModelScopeChatCompletions(requester.ProviderAPIRequester):
# 解析 chunk 数据 # 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:
delta = {} delta = {}

View File

@@ -144,7 +144,8 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
# 解析 chunk 数据 # 解析 chunk 数据
if hasattr(chunk, 'choices') and chunk.choices: if hasattr(chunk, 'choices') and chunk.choices:
choice = chunk.choices[0] choice = chunk.choices[0]
delta = choice.delta.model_dump() if hasattr(choice, 'delta') else {} delta_obj = getattr(choice, 'delta', None)
delta = delta_obj.model_dump() if delta_obj is not None else {}
finish_reason = getattr(choice, 'finish_reason', None) finish_reason = getattr(choice, 'finish_reason', None)
else: else:
delta = {} delta = {}
@@ -159,7 +160,7 @@ class PPIOChatCompletions(chatcmpl.OpenAIChatCompletions):
# reasoning_content = delta.get('reasoning_content', '') # reasoning_content = delta.get('reasoning_content', '')
if remove_think: if remove_think:
if delta['content'] is not None: if delta.get('content') is not None:
if '<think>' in delta['content'] and not thinking_started and not thinking_ended: if '<think>' in delta['content'] and not thinking_started and not thinking_ended:
thinking_started = True thinking_started = True
continue continue

View File

@@ -3,28 +3,20 @@ from __future__ import annotations
import pytest import pytest
from unittest.mock import MagicMock, AsyncMock, patch from unittest.mock import MagicMock, AsyncMock, patch
import uuid
# SDK imports for validation # SDK imports for validation
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
from langbot_plugin.api.entities.builtin.agent_runner.event import AgentEventContext from langbot_plugin.api.entities.builtin.agent_runner.event import AgentEventContext
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
from langbot_plugin.api.entities.builtin.agent_runner.context_access import ContextAccess from langbot_plugin.api.entities.builtin.agent_runner.context_access import ContextAccess
from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources
from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext
from langbot_plugin.api.entities.builtin.agent_runner.state import AgentRunState
# LangBot imports # LangBot imports
from langbot.pkg.agent.runner.context_builder import ( from langbot.pkg.agent.runner.context_builder import (
AgentRunContextBuilder, AgentRunContextBuilder,
AgentTrigger as BuilderTrigger,
ConversationContext as BuilderConversation,
AgentInput as BuilderInput,
AgentRunState as BuilderState,
AgentResources as BuilderResources, AgentResources as BuilderResources,
AgentRuntimeContext as BuilderRuntime,
) )
from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope
from langbot.pkg.core import app from langbot.pkg.core import app
@@ -171,6 +163,65 @@ class TestContextValidation:
# Verify input # Verify input
assert validated.input.text == "Hello world" assert validated.input.text == "Hello world"
@pytest.mark.asyncio
async def test_build_context_preserves_subject_data_for_non_message_events(self):
"""Non-message EBA events keep subject.data instead of relying on message text."""
from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput as EventInput
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
mock_app = self._make_mock_app()
builder = AgentRunContextBuilder(mock_app)
event = AgentEventEnvelope(
event_id="evt_recall_1",
event_type="message.recalled",
event_time=1700000001,
source="platform",
source_event_type="platform.message.recall",
bot_id="bot_1",
workspace_id="workspace_1",
conversation_id="conv_1",
actor=ActorContext(actor_type="user", actor_id="user_1"),
subject=SubjectContext(
subject_type="message",
subject_id="message_1",
data={"recalled_message_id": "message_1", "reason": "user_recall"},
),
input=EventInput(text=None),
delivery=DeliveryContext(surface="test"),
data={"source_event_id": "source_recall_1"},
)
binding = self._make_binding()
binding.event_types = ["message.recalled"]
resources = self._make_resources()
descriptor = self._make_descriptor()
with patch('langbot.pkg.agent.runner.context_builder.get_persistent_state_store') as mock_get_store:
mock_store = AsyncMock()
mock_store.build_snapshot_from_event = AsyncMock(return_value={
'conversation': {},
'actor': {},
'subject': {},
'runner': {},
})
mock_get_store.return_value = mock_store
context_dict = await builder.build_context_from_event(
event=event,
binding=binding,
descriptor=descriptor,
resources=resources,
)
validated = AgentRunContext.model_validate(context_dict)
assert validated.event.event_type == "message.recalled"
assert validated.input.text is None
assert validated.subject is not None
assert validated.subject.subject_type == "message"
assert validated.subject.subject_id == "message_1"
assert validated.subject.data == {"recalled_message_id": "message_1", "reason": "user_recall"}
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_build_context_from_event_has_no_legacy_top_level_fields(self): async def test_build_context_from_event_has_no_legacy_top_level_fields(self):
"""Test that build_context_from_event does NOT have top-level messages/prompt/params.""" """Test that build_context_from_event does NOT have top-level messages/prompt/params."""

View File

@@ -10,15 +10,11 @@ Tests cover:
from __future__ import annotations from __future__ import annotations
import pytest import pytest
from unittest.mock import Mock, MagicMock, patch from unittest.mock import Mock
import typing
# Import SDK entities # Import SDK entities
from langbot_plugin.api.entities.builtin.agent_runner.event import ( from langbot_plugin.api.entities.builtin.agent_runner.event import (
AgentEventContext, AgentEventContext,
ConversationContext,
ActorContext,
SubjectContext,
) )
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger
@@ -33,22 +29,8 @@ from langbot_plugin.api.entities.builtin.agent_runner.capabilities import (
from langbot_plugin.api.entities.builtin.agent_runner.permissions import ( from langbot_plugin.api.entities.builtin.agent_runner.permissions import (
AgentRunnerPermissions, AgentRunnerPermissions,
) )
from langbot_plugin.api.entities.builtin.agent_runner.context_policy import (
AgentRunnerContextPolicy,
)
from langbot_plugin.api.entities.builtin.agent_runner.manifest import (
AgentRunnerManifest,
)
# Import LangBot host models # Import LangBot host models
from langbot.pkg.agent.runner.host_models import (
AgentEventEnvelope,
AgentBinding,
BindingScope,
ResourcePolicy,
StatePolicy,
DeliveryPolicy,
)
from langbot.pkg.agent.runner.pipeline_adapter import PipelineAdapter from langbot.pkg.agent.runner.pipeline_adapter import PipelineAdapter
@@ -76,9 +58,24 @@ class TestPipelineQueryToEventEnvelope:
"""Test conversation context extraction.""" """Test conversation context extraction."""
event = PipelineAdapter.query_to_event(mock_query) event = PipelineAdapter.query_to_event(mock_query)
# Conversation may be None if no session assert event.conversation_id == "conv-uuid-123"
if event.conversation_id:
assert event.conversation_id is not None def test_query_to_event_prefers_variable_conversation_id_when_conversation_uuid_missing(self, mock_query):
"""Pipeline variables can provide the conversation identity for state scope."""
mock_query.session.using_conversation.uuid = None
mock_query.variables["conversation_id"] = "conv-from-vars"
event = PipelineAdapter.query_to_event(mock_query)
assert event.conversation_id == "conv-from-vars"
def test_query_to_event_falls_back_to_launcher_session_for_state_scope(self, mock_query):
"""Debug Chat and legacy pipeline runs may not have a conversation UUID."""
mock_query.session.using_conversation.uuid = None
event = PipelineAdapter.query_to_event(mock_query)
assert event.conversation_id == "person_launcher-123"
def test_query_to_event_delivery_context(self, mock_query): def test_query_to_event_delivery_context(self, mock_query):
"""Test delivery context extraction.""" """Test delivery context extraction."""
@@ -118,6 +115,17 @@ class TestPipelineQueryToEventEnvelope:
assert event.delivery.reply_target == {"message_id": None} assert event.delivery.reply_target == {"message_id": None}
def test_query_to_event_scopes_pipeline_local_event_ids(self, mock_query):
"""Pipeline-local message IDs must not become global audit IDs."""
first = PipelineAdapter.query_to_event(mock_query)
mock_query.launcher_id = "launcher-456"
second = PipelineAdapter.query_to_event(mock_query)
assert first.event_id.startswith("pipeline:")
assert first.event_id != "789"
assert second.event_id != first.event_id
class TestPipelineConfigToBinding: class TestPipelineConfigToBinding:
"""Test Pipeline config -> AgentBinding conversion.""" """Test Pipeline config -> AgentBinding conversion."""

View File

@@ -511,6 +511,53 @@ class TestAgentRunProxyActions:
assert provider.kwargs['remove_think'] is True assert provider.kwargs['remove_think'] is True
assert provider.kwargs['funcs'] == [] assert provider.kwargs['funcs'] == []
@pytest.mark.asyncio
async def test_invoke_llm_stream_skips_none_chunks(self, app):
"""INVOKE_LLM_STREAM tolerates provider heartbeat/no-op chunks."""
from langbot.pkg.agent.runner.session_registry import get_session_registry
class StreamProvider:
async def invoke_llm_stream(self, **kwargs):
yield provider_message.MessageChunk(role='assistant', content='ok')
yield None
yield provider_message.MessageChunk(role='assistant', content=' done', is_final=True)
run_id = 'run_proxy_invoke_llm_stream_none_chunks'
query = self.query()
app.query_pool.cached_queries[904] = query
registry = get_session_registry()
await registry.unregister(run_id)
await registry.register(
run_id=run_id,
runner_id='plugin:test/runner/default',
query_id=904,
plugin_identity='test/runner',
resources=make_agent_resources(models=[{'model_id': 'llm_stream_002'}]),
)
model = SimpleNamespace(
model_entity=SimpleNamespace(abilities=[], extra_args={}),
provider=StreamProvider(),
)
app.model_mgr.get_model_by_uuid.return_value = model
runtime_handler = make_handler(app)
responses = []
try:
stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({
'run_id': run_id,
'llm_model_uuid': 'llm_stream_002',
'messages': [{'role': 'user', 'content': 'hello'}],
})
async for response in stream:
responses.append(response)
finally:
await registry.unregister(run_id)
assert [response.code for response in responses] == [0, 0]
assert [response.data['chunk']['content'] for response in responses] == ['ok', ' done']
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_call_tool_passes_current_query(self, app): async def test_call_tool_passes_current_query(self, app):
"""CALL_TOOL passes the current Query back into tool execution.""" """CALL_TOOL passes the current Query back into tool execution."""

View File

@@ -7,6 +7,7 @@ only the necessary dependencies.
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
import openai # Import real openai package import openai # Import real openai package
@@ -140,6 +141,36 @@ class TestInvokeLLMErrorHandling:
assert '频繁' in str(exc.value) or '余额' in str(exc.value) assert '频繁' in str(exc.value) or '余额' in str(exc.value)
@pytest.mark.asyncio
async def test_stream_skips_choice_with_none_delta(self, requester_with_mocked_client, mock_model):
"""OpenAI-compatible streams may include choice frames with delta=None."""
async def fake_req_stream(args, extra_body=None):
yield SimpleNamespace(choices=[SimpleNamespace(delta=None, finish_reason=None)])
yield SimpleNamespace(
choices=[
SimpleNamespace(
delta=SimpleNamespace(model_dump=lambda: {'role': 'assistant', 'content': 'OK'}),
finish_reason='stop',
)
]
)
requester_with_mocked_client._req_stream = fake_req_stream
chunks = [
chunk
async for chunk in requester_with_mocked_client._closure_stream(
query=None,
req_messages=[{'role': 'user', 'content': 'hello'}],
use_model=mock_model,
)
]
assert len(chunks) == 1
assert chunks[0].content == 'OK'
assert chunks[0].is_final is True
class TestInvokeEmbeddingErrorHandling: class TestInvokeEmbeddingErrorHandling:
"""Tests for invoke_embedding error handling.""" """Tests for invoke_embedding error handling."""