mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-12 16:56:02 +00:00
fix(agent-runner): stabilize event context and streams
This commit is contained in:
committed by
huanghuoguoguo
parent
9bdebcdc5a
commit
a2b38f5bf2
@@ -3,28 +3,20 @@ from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
import uuid
|
||||
|
||||
# SDK imports for validation
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.context import AgentRunContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.event import AgentEventContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.context_access import ContextAccess
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.resources import AgentResources
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.runtime import AgentRuntimeContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.state import AgentRunState
|
||||
|
||||
# LangBot imports
|
||||
from langbot.pkg.agent.runner.context_builder import (
|
||||
AgentRunContextBuilder,
|
||||
AgentTrigger as BuilderTrigger,
|
||||
ConversationContext as BuilderConversation,
|
||||
AgentInput as BuilderInput,
|
||||
AgentRunState as BuilderState,
|
||||
AgentResources as BuilderResources,
|
||||
AgentRuntimeContext as BuilderRuntime,
|
||||
)
|
||||
from langbot.pkg.agent.runner.host_models import AgentEventEnvelope, AgentBinding, BindingScope
|
||||
from langbot.pkg.core import app
|
||||
@@ -171,6 +163,65 @@ class TestContextValidation:
|
||||
# Verify input
|
||||
assert validated.input.text == "Hello world"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_build_context_preserves_subject_data_for_non_message_events(self):
|
||||
"""Non-message EBA events keep subject.data instead of relying on message text."""
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.event import ActorContext, SubjectContext
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput as EventInput
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
|
||||
|
||||
mock_app = self._make_mock_app()
|
||||
builder = AgentRunContextBuilder(mock_app)
|
||||
event = AgentEventEnvelope(
|
||||
event_id="evt_recall_1",
|
||||
event_type="message.recalled",
|
||||
event_time=1700000001,
|
||||
source="platform",
|
||||
source_event_type="platform.message.recall",
|
||||
bot_id="bot_1",
|
||||
workspace_id="workspace_1",
|
||||
conversation_id="conv_1",
|
||||
actor=ActorContext(actor_type="user", actor_id="user_1"),
|
||||
subject=SubjectContext(
|
||||
subject_type="message",
|
||||
subject_id="message_1",
|
||||
data={"recalled_message_id": "message_1", "reason": "user_recall"},
|
||||
),
|
||||
input=EventInput(text=None),
|
||||
delivery=DeliveryContext(surface="test"),
|
||||
data={"source_event_id": "source_recall_1"},
|
||||
)
|
||||
binding = self._make_binding()
|
||||
binding.event_types = ["message.recalled"]
|
||||
resources = self._make_resources()
|
||||
descriptor = self._make_descriptor()
|
||||
|
||||
with patch('langbot.pkg.agent.runner.context_builder.get_persistent_state_store') as mock_get_store:
|
||||
mock_store = AsyncMock()
|
||||
mock_store.build_snapshot_from_event = AsyncMock(return_value={
|
||||
'conversation': {},
|
||||
'actor': {},
|
||||
'subject': {},
|
||||
'runner': {},
|
||||
})
|
||||
mock_get_store.return_value = mock_store
|
||||
|
||||
context_dict = await builder.build_context_from_event(
|
||||
event=event,
|
||||
binding=binding,
|
||||
descriptor=descriptor,
|
||||
resources=resources,
|
||||
)
|
||||
|
||||
validated = AgentRunContext.model_validate(context_dict)
|
||||
|
||||
assert validated.event.event_type == "message.recalled"
|
||||
assert validated.input.text is None
|
||||
assert validated.subject is not None
|
||||
assert validated.subject.subject_type == "message"
|
||||
assert validated.subject.subject_id == "message_1"
|
||||
assert validated.subject.data == {"recalled_message_id": "message_1", "reason": "user_recall"}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_build_context_from_event_has_no_legacy_top_level_fields(self):
|
||||
"""Test that build_context_from_event does NOT have top-level messages/prompt/params."""
|
||||
|
||||
@@ -10,15 +10,11 @@ Tests cover:
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, MagicMock, patch
|
||||
import typing
|
||||
from unittest.mock import Mock
|
||||
|
||||
# Import SDK entities
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.event import (
|
||||
AgentEventContext,
|
||||
ConversationContext,
|
||||
ActorContext,
|
||||
SubjectContext,
|
||||
)
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.trigger import AgentTrigger
|
||||
@@ -33,22 +29,8 @@ from langbot_plugin.api.entities.builtin.agent_runner.capabilities import (
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.permissions import (
|
||||
AgentRunnerPermissions,
|
||||
)
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.context_policy import (
|
||||
AgentRunnerContextPolicy,
|
||||
)
|
||||
from langbot_plugin.api.entities.builtin.agent_runner.manifest import (
|
||||
AgentRunnerManifest,
|
||||
)
|
||||
|
||||
# Import LangBot host models
|
||||
from langbot.pkg.agent.runner.host_models import (
|
||||
AgentEventEnvelope,
|
||||
AgentBinding,
|
||||
BindingScope,
|
||||
ResourcePolicy,
|
||||
StatePolicy,
|
||||
DeliveryPolicy,
|
||||
)
|
||||
from langbot.pkg.agent.runner.pipeline_adapter import PipelineAdapter
|
||||
|
||||
|
||||
@@ -76,9 +58,24 @@ class TestPipelineQueryToEventEnvelope:
|
||||
"""Test conversation context extraction."""
|
||||
event = PipelineAdapter.query_to_event(mock_query)
|
||||
|
||||
# Conversation may be None if no session
|
||||
if event.conversation_id:
|
||||
assert event.conversation_id is not None
|
||||
assert event.conversation_id == "conv-uuid-123"
|
||||
|
||||
def test_query_to_event_prefers_variable_conversation_id_when_conversation_uuid_missing(self, mock_query):
|
||||
"""Pipeline variables can provide the conversation identity for state scope."""
|
||||
mock_query.session.using_conversation.uuid = None
|
||||
mock_query.variables["conversation_id"] = "conv-from-vars"
|
||||
|
||||
event = PipelineAdapter.query_to_event(mock_query)
|
||||
|
||||
assert event.conversation_id == "conv-from-vars"
|
||||
|
||||
def test_query_to_event_falls_back_to_launcher_session_for_state_scope(self, mock_query):
|
||||
"""Debug Chat and legacy pipeline runs may not have a conversation UUID."""
|
||||
mock_query.session.using_conversation.uuid = None
|
||||
|
||||
event = PipelineAdapter.query_to_event(mock_query)
|
||||
|
||||
assert event.conversation_id == "person_launcher-123"
|
||||
|
||||
def test_query_to_event_delivery_context(self, mock_query):
|
||||
"""Test delivery context extraction."""
|
||||
@@ -118,6 +115,17 @@ class TestPipelineQueryToEventEnvelope:
|
||||
|
||||
assert event.delivery.reply_target == {"message_id": None}
|
||||
|
||||
def test_query_to_event_scopes_pipeline_local_event_ids(self, mock_query):
|
||||
"""Pipeline-local message IDs must not become global audit IDs."""
|
||||
first = PipelineAdapter.query_to_event(mock_query)
|
||||
|
||||
mock_query.launcher_id = "launcher-456"
|
||||
second = PipelineAdapter.query_to_event(mock_query)
|
||||
|
||||
assert first.event_id.startswith("pipeline:")
|
||||
assert first.event_id != "789"
|
||||
assert second.event_id != first.event_id
|
||||
|
||||
|
||||
class TestPipelineConfigToBinding:
|
||||
"""Test Pipeline config -> AgentBinding conversion."""
|
||||
|
||||
@@ -511,6 +511,53 @@ class TestAgentRunProxyActions:
|
||||
assert provider.kwargs['remove_think'] is True
|
||||
assert provider.kwargs['funcs'] == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invoke_llm_stream_skips_none_chunks(self, app):
|
||||
"""INVOKE_LLM_STREAM tolerates provider heartbeat/no-op chunks."""
|
||||
from langbot.pkg.agent.runner.session_registry import get_session_registry
|
||||
|
||||
class StreamProvider:
|
||||
async def invoke_llm_stream(self, **kwargs):
|
||||
yield provider_message.MessageChunk(role='assistant', content='ok')
|
||||
yield None
|
||||
yield provider_message.MessageChunk(role='assistant', content=' done', is_final=True)
|
||||
|
||||
run_id = 'run_proxy_invoke_llm_stream_none_chunks'
|
||||
query = self.query()
|
||||
app.query_pool.cached_queries[904] = query
|
||||
|
||||
registry = get_session_registry()
|
||||
await registry.unregister(run_id)
|
||||
await registry.register(
|
||||
run_id=run_id,
|
||||
runner_id='plugin:test/runner/default',
|
||||
query_id=904,
|
||||
plugin_identity='test/runner',
|
||||
resources=make_agent_resources(models=[{'model_id': 'llm_stream_002'}]),
|
||||
)
|
||||
|
||||
model = SimpleNamespace(
|
||||
model_entity=SimpleNamespace(abilities=[], extra_args={}),
|
||||
provider=StreamProvider(),
|
||||
)
|
||||
app.model_mgr.get_model_by_uuid.return_value = model
|
||||
runtime_handler = make_handler(app)
|
||||
|
||||
responses = []
|
||||
try:
|
||||
stream = runtime_handler.actions[PluginToRuntimeAction.INVOKE_LLM_STREAM.value]({
|
||||
'run_id': run_id,
|
||||
'llm_model_uuid': 'llm_stream_002',
|
||||
'messages': [{'role': 'user', 'content': 'hello'}],
|
||||
})
|
||||
async for response in stream:
|
||||
responses.append(response)
|
||||
finally:
|
||||
await registry.unregister(run_id)
|
||||
|
||||
assert [response.code for response in responses] == [0, 0]
|
||||
assert [response.data['chunk']['content'] for response in responses] == ['ok', ' done']
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_call_tool_passes_current_query(self, app):
|
||||
"""CALL_TOOL passes the current Query back into tool execution."""
|
||||
|
||||
@@ -7,6 +7,7 @@ only the necessary dependencies.
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
import pytest
|
||||
import openai # Import real openai package
|
||||
@@ -140,6 +141,36 @@ class TestInvokeLLMErrorHandling:
|
||||
|
||||
assert '频繁' in str(exc.value) or '余额' in str(exc.value)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_skips_choice_with_none_delta(self, requester_with_mocked_client, mock_model):
|
||||
"""OpenAI-compatible streams may include choice frames with delta=None."""
|
||||
|
||||
async def fake_req_stream(args, extra_body=None):
|
||||
yield SimpleNamespace(choices=[SimpleNamespace(delta=None, finish_reason=None)])
|
||||
yield SimpleNamespace(
|
||||
choices=[
|
||||
SimpleNamespace(
|
||||
delta=SimpleNamespace(model_dump=lambda: {'role': 'assistant', 'content': 'OK'}),
|
||||
finish_reason='stop',
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
requester_with_mocked_client._req_stream = fake_req_stream
|
||||
|
||||
chunks = [
|
||||
chunk
|
||||
async for chunk in requester_with_mocked_client._closure_stream(
|
||||
query=None,
|
||||
req_messages=[{'role': 'user', 'content': 'hello'}],
|
||||
use_model=mock_model,
|
||||
)
|
||||
]
|
||||
|
||||
assert len(chunks) == 1
|
||||
assert chunks[0].content == 'OK'
|
||||
assert chunks[0].is_final is True
|
||||
|
||||
|
||||
class TestInvokeEmbeddingErrorHandling:
|
||||
"""Tests for invoke_embedding error handling."""
|
||||
|
||||
Reference in New Issue
Block a user