mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 12:05:54 +00:00
602 lines
20 KiB
Python
602 lines
20 KiB
Python
"""Tests for EventLog, Transcript, and history/event APIs."""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, MagicMock, patch
|
|
import datetime
|
|
|
|
from langbot.pkg.agent.runner.host_models import (
|
|
AgentEventEnvelope,
|
|
AgentBinding,
|
|
BindingScope,
|
|
ResourcePolicy,
|
|
StatePolicy,
|
|
DeliveryPolicy,
|
|
)
|
|
from langbot.pkg.agent.runner.event_log_store import EventLogStore
|
|
from langbot.pkg.agent.runner.transcript_store import TranscriptStore
|
|
from langbot.pkg.agent.runner.session_registry import get_session_registry
|
|
from langbot_plugin.api.entities.builtin.agent_runner.event import (
|
|
AgentEventContext,
|
|
ActorContext,
|
|
)
|
|
from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput
|
|
from langbot_plugin.api.entities.builtin.agent_runner.delivery import DeliveryContext
|
|
|
|
|
|
def make_event_envelope(
|
|
event_id: str = "evt_1",
|
|
event_type: str = "message.received",
|
|
conversation_id: str | None = "conv_1",
|
|
actor_id: str | None = "user_1",
|
|
input_text: str = "Hello",
|
|
) -> AgentEventEnvelope:
|
|
"""Create a test event envelope."""
|
|
return AgentEventEnvelope(
|
|
event_id=event_id,
|
|
event_type=event_type,
|
|
event_time=1700000000,
|
|
source="platform",
|
|
bot_id="bot_1",
|
|
workspace_id=None,
|
|
conversation_id=conversation_id,
|
|
thread_id=None,
|
|
actor=ActorContext(
|
|
actor_type="user",
|
|
actor_id=actor_id,
|
|
actor_name="Test User",
|
|
) if actor_id else None,
|
|
subject=None,
|
|
input=AgentInput(text=input_text),
|
|
delivery=DeliveryContext(surface="test"),
|
|
)
|
|
|
|
|
|
def make_binding(runner_id: str = "plugin:test/plugin/runner") -> AgentBinding:
|
|
"""Create a test binding."""
|
|
return AgentBinding(
|
|
binding_id="binding_1",
|
|
scope=BindingScope(scope_type="pipeline", scope_id="pipeline_1"),
|
|
event_types=["message.received"],
|
|
runner_id=runner_id,
|
|
runner_config={},
|
|
resource_policy=ResourcePolicy(),
|
|
state_policy=StatePolicy(),
|
|
delivery_policy=DeliveryPolicy(),
|
|
enabled=True,
|
|
)
|
|
|
|
|
|
class TestEventLogStore:
|
|
"""Test EventLogStore operations."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_event(self, mock_db_engine):
|
|
"""Test appending an event to EventLog."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = EventLogStore(mock_db_engine)
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.add = MagicMock()
|
|
mock_session.commit = AsyncMock()
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
event_id = await store.append_event(
|
|
event_id="evt_1",
|
|
event_type="message.received",
|
|
source="platform",
|
|
bot_id="bot_1",
|
|
conversation_id="conv_1",
|
|
actor_type="user",
|
|
actor_id="user_1",
|
|
input_summary="Hello world",
|
|
run_id="run_1",
|
|
runner_id="plugin:test/plugin/runner",
|
|
)
|
|
|
|
assert event_id == "evt_1"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_event_truncates_input_summary(self, mock_db_engine):
|
|
"""Test that long input summaries are truncated."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = EventLogStore(mock_db_engine)
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.add = MagicMock()
|
|
mock_session.commit = AsyncMock()
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
long_text = "x" * 2000
|
|
event_id = await store.append_event(
|
|
event_id="evt_2",
|
|
event_type="message.received",
|
|
source="platform",
|
|
input_summary=long_text,
|
|
)
|
|
|
|
assert event_id == "evt_2"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_page_events_with_conversation_filter(self, mock_db_engine):
|
|
"""Test paging events with conversation_id filter."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = EventLogStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalars.return_value.all.return_value = []
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
items, next_seq, has_more = await store.page_events(
|
|
conversation_id="conv_1",
|
|
limit=10,
|
|
)
|
|
|
|
assert isinstance(items, list)
|
|
|
|
|
|
class TestTranscriptStore:
|
|
"""Test TranscriptStore operations."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_transcript(self, mock_db_engine):
|
|
"""Test appending a transcript item."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.add = MagicMock()
|
|
mock_session.commit = AsyncMock()
|
|
|
|
# Mock _get_next_seq
|
|
with patch.object(store, '_get_next_seq', return_value=1):
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
transcript_id = await store.append_transcript(
|
|
transcript_id=None, # Auto-generate
|
|
event_id="evt_1",
|
|
conversation_id="conv_1",
|
|
role="user",
|
|
content="Hello",
|
|
)
|
|
|
|
assert transcript_id is not None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_transcript_with_artifacts(self, mock_db_engine):
|
|
"""Test appending transcript with artifact refs."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.add = MagicMock()
|
|
mock_session.commit = AsyncMock()
|
|
|
|
with patch.object(store, '_get_next_seq', return_value=1):
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
transcript_id = await store.append_transcript(
|
|
transcript_id=None, # Auto-generate
|
|
event_id="evt_2",
|
|
conversation_id="conv_1",
|
|
role="assistant",
|
|
content="Here's an image",
|
|
artifact_refs=[
|
|
{"artifact_id": "art_1", "artifact_type": "image", "url": "http://example.com/img.png"}
|
|
],
|
|
)
|
|
|
|
assert transcript_id is not None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_page_transcript_backward(self, mock_db_engine):
|
|
"""Test paging transcript backward (older items)."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalars.return_value.all.return_value = []
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
items, next_seq, prev_seq, has_more = await store.page_transcript(
|
|
conversation_id="conv_1",
|
|
limit=10,
|
|
direction="backward",
|
|
)
|
|
|
|
assert isinstance(items, list)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_page_transcript_has_hard_limit(self, mock_db_engine):
|
|
"""Test that transcript paging has a hard limit."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalars.return_value.all.return_value = []
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
# Request more than the hard limit
|
|
items, next_seq, prev_seq, has_more = await store.page_transcript(
|
|
conversation_id="conv_1",
|
|
limit=200, # Request 200, but hard limit is 100
|
|
)
|
|
|
|
# The store should cap at 100
|
|
assert len(items) <= store.HARD_LIMIT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_transcript(self, mock_db_engine):
|
|
"""Test searching transcript."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalars.return_value.all.return_value = []
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
items = await store.search_transcript(
|
|
conversation_id="conv_1",
|
|
query_text="database",
|
|
top_k=10,
|
|
)
|
|
|
|
assert isinstance(items, list)
|
|
|
|
|
|
class TestHistoryPageAuthorization:
|
|
"""Test history.page authorization."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_page_requires_run_id(self, mock_handler, mock_db_engine):
|
|
"""Test history.page requires run_id."""
|
|
from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction
|
|
|
|
# Mock call_action to simulate the handler
|
|
result = await mock_handler.call_action(
|
|
PluginToRuntimeAction.HISTORY_PAGE,
|
|
{"run_id": None},
|
|
)
|
|
|
|
# Should return error
|
|
assert result.get("ok") is False or "error" in str(result).lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_page_validates_conversation_scope(self, mock_db_engine):
|
|
"""Test history.page only allows access to run's conversation."""
|
|
# This test verifies the authorization logic
|
|
# The actual implementation validates conversation_id matches session
|
|
session_registry = get_session_registry()
|
|
|
|
await session_registry.register(
|
|
run_id="run_1",
|
|
runner_id="plugin:test/plugin/runner",
|
|
query_id=None,
|
|
plugin_identity="test/plugin",
|
|
resources={"models": [], "tools": [], "knowledge_bases": [], "storage": {"plugin_storage": True}},
|
|
conversation_id="conv_1",
|
|
)
|
|
|
|
session = await session_registry.get("run_1")
|
|
assert session is not None
|
|
assert session["conversation_id"] == "conv_1"
|
|
|
|
# Cleanup
|
|
await session_registry.unregister("run_1")
|
|
|
|
|
|
class TestEventGetAuthorization:
|
|
"""Test event.get authorization."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_event_get_requires_run_id(self, mock_handler):
|
|
"""Test event.get requires run_id."""
|
|
from langbot_plugin.entities.io.actions.enums import PluginToRuntimeAction
|
|
|
|
result = await mock_handler.call_action(
|
|
PluginToRuntimeAction.EVENT_GET,
|
|
{"run_id": None, "event_id": "evt_1"},
|
|
)
|
|
|
|
# Should return error
|
|
assert result.get("ok") is False or "error" in str(result).lower()
|
|
|
|
|
|
class TestContextAccessPopulation:
|
|
"""Test ContextAccess population in build_context_from_event."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_context_access_has_history_apis_when_permitted(self, mock_db_engine):
|
|
"""Test ContextAccess shows available APIs based on permissions."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalars.return_value.first.return_value = None
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
cursor = await store.get_latest_cursor("conv_1")
|
|
# Should return None or a cursor string
|
|
assert cursor is None or isinstance(cursor, str)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_context_access_shows_has_history_before(self, mock_db_engine):
|
|
"""Test ContextAccess indicates if history exists."""
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
store = TranscriptStore(mock_db_engine)
|
|
|
|
mock_result = MagicMock()
|
|
mock_result.scalar.return_value = 0
|
|
|
|
mock_session = AsyncMock()
|
|
mock_session.execute = AsyncMock(return_value=mock_result)
|
|
|
|
with patch.object(store, '_session_factory') as mock_factory:
|
|
mock_factory.return_value.__aenter__.return_value = mock_session
|
|
|
|
has_history = await store.has_history_before("conv_1", 10)
|
|
assert isinstance(has_history, bool)
|
|
|
|
|
|
class TestEventLogStoreRealSQLite:
|
|
"""Test EventLogStore with real SQLite database."""
|
|
|
|
@pytest.fixture
|
|
async def db_engine(self):
|
|
"""Create an in-memory SQLite database for testing."""
|
|
from sqlalchemy.ext.asyncio import create_async_engine
|
|
from sqlalchemy import text
|
|
from langbot.pkg.entity.persistence.base import Base
|
|
from langbot.pkg.entity.persistence.event_log import EventLog
|
|
|
|
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
|
|
|
|
# Create tables
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
yield engine
|
|
|
|
await engine.dispose()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_get_event_round_trip(self, db_engine):
|
|
"""Test append_event -> get_event round trip with real DB."""
|
|
store = EventLogStore(db_engine)
|
|
|
|
# Append event
|
|
event_id = await store.append_event(
|
|
event_id="evt_real_001",
|
|
event_type="message.received",
|
|
source="platform",
|
|
bot_id="bot_001",
|
|
conversation_id="conv_001",
|
|
actor_type="user",
|
|
actor_id="user_001",
|
|
actor_name="Test User",
|
|
input_summary="Hello world",
|
|
run_id="run_001",
|
|
runner_id="plugin:test/plugin/runner",
|
|
)
|
|
|
|
assert event_id == "evt_real_001"
|
|
|
|
# Get event
|
|
event = await store.get_event(event_id)
|
|
assert event is not None
|
|
assert event["event_id"] == "evt_real_001"
|
|
assert event["event_type"] == "message.received"
|
|
assert event["source"] == "platform"
|
|
assert event["conversation_id"] == "conv_001"
|
|
assert event["actor_type"] == "user"
|
|
assert event["actor_id"] == "user_001"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_page_events(self, db_engine):
|
|
"""Test page_events with real DB."""
|
|
store = EventLogStore(db_engine)
|
|
|
|
# Append multiple events
|
|
for i in range(5):
|
|
await store.append_event(
|
|
event_id=f"evt_real_{i:03d}",
|
|
event_type="message.received",
|
|
source="platform",
|
|
conversation_id="conv_001",
|
|
input_summary=f"Message {i}",
|
|
)
|
|
|
|
# Page events
|
|
items, next_seq, has_more = await store.page_events(
|
|
conversation_id="conv_001",
|
|
limit=3,
|
|
)
|
|
|
|
assert len(items) == 3
|
|
assert has_more is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_latest_cursor(self, db_engine):
|
|
"""Test get_latest_cursor with real DB."""
|
|
store = EventLogStore(db_engine)
|
|
|
|
# Append events
|
|
for i in range(3):
|
|
await store.append_event(
|
|
event_id=f"evt_cursor_{i:03d}",
|
|
event_type="message.received",
|
|
source="platform",
|
|
conversation_id="conv_cursor",
|
|
)
|
|
|
|
# Get latest cursor
|
|
cursor = await store.get_latest_cursor("conv_cursor")
|
|
assert cursor is not None
|
|
assert int(cursor) > 0
|
|
|
|
|
|
class TestTranscriptStoreRealSQLite:
|
|
"""Test TranscriptStore with real SQLite database."""
|
|
|
|
@pytest.fixture
|
|
async def db_engine(self):
|
|
"""Create an in-memory SQLite database for testing."""
|
|
from sqlalchemy.ext.asyncio import create_async_engine
|
|
from sqlalchemy import text
|
|
from langbot.pkg.entity.persistence.base import Base
|
|
from langbot.pkg.entity.persistence.transcript import Transcript
|
|
|
|
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
|
|
|
|
# Create tables
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(Base.metadata.create_all)
|
|
|
|
yield engine
|
|
|
|
await engine.dispose()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_append_page_transcript_round_trip(self, db_engine):
|
|
"""Test append_transcript -> page_transcript round trip with real DB."""
|
|
store = TranscriptStore(db_engine)
|
|
|
|
# Append transcript items
|
|
for i in range(3):
|
|
await store.append_transcript(
|
|
transcript_id=f"trans_real_{i:03d}",
|
|
event_id=f"evt_{i:03d}",
|
|
conversation_id="conv_001",
|
|
role="user" if i % 2 == 0 else "assistant",
|
|
content=f"Message {i}",
|
|
)
|
|
|
|
# Page transcript
|
|
items, next_seq, prev_seq, has_more = await store.page_transcript(
|
|
conversation_id="conv_001",
|
|
limit=10,
|
|
)
|
|
|
|
assert len(items) == 3
|
|
assert items[0]["conversation_id"] == "conv_001"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_search_transcript_real_db(self, db_engine):
|
|
"""Test search_transcript with real DB."""
|
|
store = TranscriptStore(db_engine)
|
|
|
|
# Append transcript items
|
|
await store.append_transcript(
|
|
transcript_id="trans_search_001",
|
|
event_id="evt_search_001",
|
|
conversation_id="conv_search",
|
|
role="user",
|
|
content="I want to learn about databases",
|
|
)
|
|
await store.append_transcript(
|
|
transcript_id="trans_search_002",
|
|
event_id="evt_search_002",
|
|
conversation_id="conv_search",
|
|
role="assistant",
|
|
content="Here is information about databases",
|
|
)
|
|
|
|
# Search for "database"
|
|
items = await store.search_transcript(
|
|
conversation_id="conv_search",
|
|
query_text="database",
|
|
)
|
|
|
|
# Should find at least one match
|
|
assert len(items) >= 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_latest_cursor_real_db(self, db_engine):
|
|
"""Test get_latest_cursor with real DB."""
|
|
store = TranscriptStore(db_engine)
|
|
|
|
# Append transcript items
|
|
for i in range(3):
|
|
await store.append_transcript(
|
|
transcript_id=f"trans_cursor_{i:03d}",
|
|
event_id=f"evt_cursor_{i:03d}",
|
|
conversation_id="conv_cursor",
|
|
role="user",
|
|
content=f"Message {i}",
|
|
)
|
|
|
|
# Get latest cursor
|
|
cursor = await store.get_latest_cursor("conv_cursor")
|
|
assert cursor is not None
|
|
assert int(cursor) > 0
|
|
|
|
|
|
# Fixtures
|
|
@pytest.fixture
|
|
def mock_db_engine():
|
|
"""Create a mock database engine for AsyncSession-based stores."""
|
|
from unittest.mock import MagicMock
|
|
from sqlalchemy.ext.asyncio import AsyncEngine
|
|
|
|
engine = MagicMock(spec=AsyncEngine)
|
|
return engine
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_handler():
|
|
"""Create a mock handler for testing actions."""
|
|
from langbot_plugin.runtime.io.handler import Handler, ActionResponse
|
|
|
|
class MockHandler(Handler):
|
|
def __init__(self):
|
|
self._responses = {}
|
|
|
|
async def call_action(self, action, data, timeout=30):
|
|
# Simulate error response for missing run_id
|
|
if not data.get("run_id"):
|
|
return {"ok": False, "message": "run_id is required"}
|
|
return {"ok": True, "data": {}}
|
|
|
|
return MockHandler()
|