diff --git a/docs/agent-runner-pluginization/PROTOCOL_V1.md b/docs/agent-runner-pluginization/PROTOCOL_V1.md index a6f7a2eb..dda2ee44 100644 --- a/docs/agent-runner-pluginization/PROTOCOL_V1.md +++ b/docs/agent-runner-pluginization/PROTOCOL_V1.md @@ -89,6 +89,8 @@ class AgentRunnerCapabilities(BaseModel): tool_calling: bool = False knowledge_retrieval: bool = False multimodal_input: bool = False + skill_authoring: bool = False + skill_injection: bool = False event_context: bool = True platform_api: bool = False interrupt: bool = False @@ -102,6 +104,8 @@ class AgentRunnerCapabilities(BaseModel): - `tool_calling`: runner 可能调用 Host tool API。 - `knowledge_retrieval`: runner 可能调用 Host knowledge API。 - `multimodal_input`: runner 可以处理非纯文本 input / artifact。 +- `skill_authoring`: runner 需要 Host 提供的 skill authoring tools。 +- `skill_injection`: runner 需要 Host 在 effective prompt 中注入 skill index。 - `event_context`: runner 理解 event-first 输入。 - `platform_api`: runner 可能请求平台动作。 - `interrupt`: runner 支持取消或中断。 diff --git a/src/langbot/pkg/agent/runner/config_schema.py b/src/langbot/pkg/agent/runner/config_schema.py index 430d2d5e..b09d25fd 100644 --- a/src/langbot/pkg/agent/runner/config_schema.py +++ b/src/langbot/pkg/agent/runner/config_schema.py @@ -64,6 +64,20 @@ def uses_host_knowledge_bases(descriptor: AgentRunnerDescriptor | None) -> bool: ) +def supports_skill_authoring(descriptor: AgentRunnerDescriptor | None) -> bool: + """Return whether the runner wants Host skill-authoring tools.""" + if descriptor is None: + return False + return bool(descriptor.capabilities.get('skill_authoring', False)) + + +def supports_skill_injection(descriptor: AgentRunnerDescriptor | None) -> bool: + """Return whether the runner wants the Host skill index in the effective prompt.""" + if descriptor is None: + return False + return bool(descriptor.capabilities.get('skill_injection', False)) + + def extract_prompt_config( descriptor: AgentRunnerDescriptor | None, runner_config: dict[str, typing.Any], diff --git a/src/langbot/pkg/agent/runner/orchestrator.py b/src/langbot/pkg/agent/runner/orchestrator.py index 97092452..6fd934a6 100644 --- a/src/langbot/pkg/agent/runner/orchestrator.py +++ b/src/langbot/pkg/agent/runner/orchestrator.py @@ -137,6 +137,8 @@ class AgentRunOrchestrator: # Merge params into adapter.extra if 'params' in adapter_context: context['adapter']['extra']['params'] = adapter_context['params'] + if adapter_context.get('prompt_get'): + context['context']['available_apis']['prompt_get'] = True # Build state context for State API handlers state_context = build_state_context(event, binding, descriptor) diff --git a/src/langbot/pkg/agent/runner/query_entry_adapter.py b/src/langbot/pkg/agent/runner/query_entry_adapter.py index 9591de72..b5d9a65e 100644 --- a/src/langbot/pkg/agent/runner/query_entry_adapter.py +++ b/src/langbot/pkg/agent/runner/query_entry_adapter.py @@ -148,6 +148,7 @@ class QueryEntryAdapter: return { 'params': cls.build_params(query), 'query_id': getattr(query, 'query_id', None), + 'prompt_get': cls._has_effective_prompt(query), } @classmethod @@ -185,6 +186,12 @@ class QueryEntryAdapter: ) return False + @classmethod + def _has_effective_prompt(cls, query: pipeline_query.Query) -> bool: + prompt = getattr(query, 'prompt', None) + messages = getattr(prompt, 'messages', None) if prompt is not None else None + return isinstance(messages, list) + # Private helper methods @classmethod @@ -374,24 +381,18 @@ class QueryEntryAdapter: content = getattr(user_message, 'content', None) if isinstance(content, list): for elem in content: - # Handle both real objects and mocks + elem_dict = None if hasattr(elem, 'model_dump'): - contents.append(elem.model_dump(mode='json')) + elem_dict = elem.model_dump(mode='json') elif isinstance(elem, dict): - contents.append(elem) - else: - # For mocks, extract type and text attributes - elem_type = getattr(elem, 'type', None) - if elem_type == 'text': - elem_text = getattr(elem, 'text', None) - contents.append({'type': 'text', 'text': elem_text}) - if elem_text: - text_parts.append(elem_text) + elem_dict = elem + + if not isinstance(elem_dict, dict): continue - # Extract text for the text field - if hasattr(elem, 'type') and getattr(elem, 'type', None) == 'text': - elem_text = getattr(elem, 'text', None) + contents.append(elem_dict) + if elem_dict.get('type') == 'text': + elem_text = elem_dict.get('text') if elem_text: text_parts.append(elem_text) elif content is not None: @@ -466,36 +467,37 @@ class QueryEntryAdapter: message_chain = getattr(query, 'message_chain', None) if message_chain: try: - for component in message_chain: - artifact_id = str(uuid.uuid4()) # Generate unique ID - - if isinstance(component, platform_message.Image): - attachments.append({ - 'artifact_id': artifact_id, - 'artifact_type': 'image', - 'source': 'message_chain', - 'id': component.image_id or None, - 'url': component.url or None, - }) - elif isinstance(component, platform_message.File): - attachments.append({ - 'artifact_id': artifact_id, - 'artifact_type': 'file', - 'source': 'message_chain', - 'id': component.id or None, - 'name': component.name or None, - }) - elif isinstance(component, platform_message.Voice): - attachments.append({ - 'artifact_id': artifact_id, - 'artifact_type': 'voice', - 'source': 'message_chain', - 'id': component.voice_id or None, - 'url': component.url or None, - }) + message_components = iter(message_chain) except TypeError: - # message_chain is not iterable (e.g., a Mock object) - pass + message_components = iter(()) + + for component in message_components: + artifact_id = str(uuid.uuid4()) # Generate unique ID + + if isinstance(component, platform_message.Image): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'image', + 'source': 'message_chain', + 'id': component.image_id or None, + 'url': component.url or None, + }) + elif isinstance(component, platform_message.File): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'file', + 'source': 'message_chain', + 'id': component.id or None, + 'name': component.name or None, + }) + elif isinstance(component, platform_message.Voice): + attachments.append({ + 'artifact_id': artifact_id, + 'artifact_type': 'voice', + 'source': 'message_chain', + 'id': component.voice_id or None, + 'url': component.url or None, + }) return attachments diff --git a/src/langbot/pkg/agent/runner/transcript_store.py b/src/langbot/pkg/agent/runner/transcript_store.py index ef18115f..b1d7487b 100644 --- a/src/langbot/pkg/agent/runner/transcript_store.py +++ b/src/langbot/pkg/agent/runner/transcript_store.py @@ -11,6 +11,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession from sqlalchemy.orm import sessionmaker from ...entity.persistence.transcript import Transcript +from langbot_plugin.api.entities.builtin.provider import message as provider_message class TranscriptStore: @@ -225,6 +226,30 @@ class TranscriptStore: return None return str(row) + async def get_legacy_provider_messages( + self, + conversation_id: str, + limit: int = HARD_LIMIT, + ) -> list[provider_message.Message]: + """Project Transcript rows into the legacy provider Message view. + + AgentRunner history is canonical in Transcript. This view exists for + legacy Pipeline readers such as PromptPreProcessing that still expect + query.messages. + """ + items, _, _, _ = await self.page_transcript( + conversation_id=conversation_id, + limit=limit, + direction="backward", + ) + + messages: list[provider_message.Message] = [] + for item in reversed(items): + message = self._transcript_item_to_provider_message(item) + if message is not None: + messages.append(message) + return messages + async def has_history_before( self, conversation_id: str, @@ -288,3 +313,29 @@ class TranscriptStore: result['artifact_refs'] = [] return result + + def _transcript_item_to_provider_message( + self, + item: dict[str, typing.Any], + ) -> provider_message.Message | None: + """Convert one Transcript API item into a provider Message.""" + if item.get('item_type') != 'message': + return None + + role = item.get('role') + if role not in {'user', 'assistant'}: + return None + + content_json = item.get('content_json') + if isinstance(content_json, dict): + message_data = dict(content_json) + message_data['role'] = role + try: + return provider_message.Message.model_validate(message_data) + except Exception: + pass + + content = item.get('content') + if content is None: + return None + return provider_message.Message(role=role, content=content) diff --git a/src/langbot/pkg/pipeline/preproc/preproc.py b/src/langbot/pkg/pipeline/preproc/preproc.py index 13265530..b223e438 100644 --- a/src/langbot/pkg/pipeline/preproc/preproc.py +++ b/src/langbot/pkg/pipeline/preproc/preproc.py @@ -19,8 +19,6 @@ DEFAULT_PROMPT_CONFIG = [ {'role': 'system', 'content': 'You are a helpful assistant.'}, ] -LOCAL_AGENT_RUNNER_ID = 'plugin:langbot/local-agent/default' - @stage.stage_class('PreProcessor') class PreProcessor(stage.PipelineStage): @@ -107,6 +105,48 @@ class PreProcessor(stage.PipelineStage): if isinstance(msg.content, list): msg.content = [elem for elem in msg.content if elem.type != 'image_url'] + def _has_declared_db_engine(self) -> bool: + persistence_mgr = getattr(self.ap, 'persistence_mgr', None) + if persistence_mgr is None: + return False + if 'get_db_engine' in getattr(persistence_mgr, '__dict__', {}): + return True + return hasattr(type(persistence_mgr), 'get_db_engine') + + async def _load_agent_runner_history_messages( + self, + runner_id: str | None, + conversation_uuid: str | None, + ) -> list[provider_message.Message] | None: + if not runner_id or not conversation_uuid or not self._has_declared_db_engine(): + return None + + try: + from ...agent.runner.transcript_store import TranscriptStore + + store = TranscriptStore(self.ap.persistence_mgr.get_db_engine()) + messages = await store.get_legacy_provider_messages(str(conversation_uuid)) + except Exception as e: + self.ap.logger.warning( + f'Unable to load Transcript history view for conversation {conversation_uuid}: {e}' + ) + return None + + return messages or None + + async def _resolve_history_messages( + self, + runner_id: str | None, + conversation: typing.Any, + ) -> list[provider_message.Message]: + transcript_messages = await self._load_agent_runner_history_messages( + runner_id, + getattr(conversation, 'uuid', None), + ) + if transcript_messages is not None: + return transcript_messages + return conversation.messages.copy() + async def process( self, query: pipeline_query.Query, @@ -127,8 +167,11 @@ class PreProcessor(stage.PipelineStage): uses_host_models = config_schema.uses_host_models(descriptor) uses_host_tools = config_schema.uses_host_tools(descriptor) - is_local_agent = runner_id == LOCAL_AGENT_RUNNER_ID - include_skill_authoring = is_local_agent and getattr(self.ap, 'skill_service', None) is not None + include_skill_authoring = ( + config_schema.supports_skill_authoring(descriptor) + and getattr(self.ap, 'skill_service', None) is not None + ) + inject_skill_context = config_schema.supports_skill_injection(descriptor) llm_model = None if uses_host_models: primary_uuid, fallback_uuids = config_schema.extract_model_selection(descriptor, runner_config) @@ -171,7 +214,7 @@ class PreProcessor(stage.PipelineStage): # 设置query query.session = session query.prompt = conversation.prompt.copy() - query.messages = conversation.messages.copy() + query.messages = await self._resolve_history_messages(runner_id, conversation) if uses_host_models: query.use_funcs = [] @@ -307,7 +350,7 @@ class PreProcessor(stage.PipelineStage): query.prompt.messages = event_ctx.event.default_prompt query.messages = event_ctx.event.prompt - # =========== Skill awareness for the local-agent runner =========== + # =========== Skill awareness for capable runners =========== # The actual activation goes through the ``activate`` Tool Call so the # LLM doesn't see full SKILL.md instructions until it commits to a # skill (Claude Code's progressive disclosure). But the LLM still has @@ -319,7 +362,7 @@ class PreProcessor(stage.PipelineStage): # only) into the system prompt. The contributor's original PR # relied on this injection; without it the LLM never discovers # the skills are there and just calls native tools instead. - if is_local_agent and self.ap.skill_mgr: + if inject_skill_context and self.ap.skill_mgr: pipeline_data = await self.ap.pipeline_service.get_pipeline(query.pipeline_uuid) extensions_prefs = (pipeline_data or {}).get('extensions_preferences', {}) enable_all_skills = extensions_prefs.get('enable_all_skills', True) diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index 34eea9bf..c12d3b38 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -146,10 +146,11 @@ class ChatMessageHandler(handler.MessageHandler): f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars' ) - # Update conversation history - conversation = await self._ensure_conversation_for_history(query) - conversation.messages.append(query.user_message) - conversation.messages.extend(query.resp_messages) + # Keep a conversation object available for downstream legacy + # readers, but do not mirror AgentRunner history into + # conversation.messages. TranscriptStore is the canonical + # history source for this path. + await self._ensure_conversation_for_history(query) except Exception as e: # Import orchestrator errors for specific handling diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index f9cfe5f4..2fb78cb0 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -368,6 +368,25 @@ def _resolve_remove_think(data: dict[str, Any], query: Any | None) -> bool: return False +def _dump_prompt_messages(query: Any) -> list[dict[str, Any]]: + """Serialize the current effective prompt from a cached Query.""" + prompt = getattr(query, 'prompt', None) + messages = getattr(prompt, 'messages', None) if prompt is not None else None + if not isinstance(messages, list): + return [] + + dumped: list[dict[str, Any]] = [] + for message in messages: + if hasattr(message, 'model_dump'): + try: + dumped.append(message.model_dump(mode='json')) + except TypeError: + dumped.append(message.model_dump()) + elif isinstance(message, dict): + dumped.append(message) + return dumped + + def _merge_model_extra_args(model: Any, call_extra_args: Any) -> dict[str, Any]: """Merge persisted model extra_args with action-level overrides.""" merged: dict[str, Any] = {} @@ -787,17 +806,21 @@ class RuntimeConnectionHandler(handler.Handler): For AgentRunner calls: requires run_id and validates tool_name against session.resources.tools. For regular plugin calls: no run_id, unrestricted access (backward compatibility). - - Note: SDK LangBotAPIProxy (legacy) sends 'tool_parameters' and expects 'tool_response'. - SDK AgentRunAPIProxy sends 'parameters' and expects 'result'. - Handler returns both for backward compatibility. """ tool_name = data['tool_name'] - # Support 'tool_parameters' (LangBotAPIProxy) and 'parameters' (AgentRunAPIProxy) - parameters = data.get('tool_parameters') or data.get('parameters', {}) run_id = data.get('run_id') # Optional: present for AgentRunner calls caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation session = None + is_agent_runner_call = bool(run_id) + + if is_agent_runner_call: + if 'parameters' not in data: + return handler.ActionResponse.error( + message='parameters is required for AgentRunner tool calls', + ) + parameters = data.get('parameters') or {} + else: + parameters = data.get('tool_parameters') or {} # Permission validation for AgentRunner calls if run_id: @@ -817,14 +840,9 @@ class RuntimeConnectionHandler(handler.Handler): parameters=parameters, query=query, ) - # Return both 'tool_response' (LangBotAPIProxy) and 'result' (AgentRunAPIProxy) - # LangBotAPIProxy expects 'tool_response', AgentRunAPIProxy expects 'result' - return handler.ActionResponse.success( - data={ - 'tool_response': result, - 'result': result, # backward compatibility - }, - ) + if is_agent_runner_call: + return handler.ActionResponse.success(data={'result': result}) + return handler.ActionResponse.success(data={'tool_response': result}) except Exception as e: traceback.print_exc() return handler.ActionResponse.error( @@ -1430,6 +1448,32 @@ class RuntimeConnectionHandler(handler.Handler): # ================= Agent History/Event APIs ================= + @self.action(PluginToRuntimeAction.PROMPT_GET) + async def prompt_get(data: dict[str, Any]) -> handler.ActionResponse: + """Return the post-preprocessing effective prompt for a query-backed run.""" + run_id = data.get('run_id') + caller_plugin_identity = data.get('caller_plugin_identity') + + if not run_id: + return handler.ActionResponse.error(message='run_id is required') + + session, error = await _validate_agent_run_session( + run_id, + caller_plugin_identity, + self.ap, + 'Prompt get', + ) + if error: + return error + + query = _resolve_action_query(data, session, self.ap) + if query is None: + return handler.ActionResponse.error( + message='Prompt get is only available for query-backed agent runs', + ) + + return handler.ActionResponse.success(data={'prompt': _dump_prompt_messages(query)}) + @self.action(PluginToRuntimeAction.HISTORY_PAGE) async def history_page(data: dict[str, Any]) -> handler.ActionResponse: """Page through transcript history for a conversation. diff --git a/tests/unit_tests/agent/test_chat_handler.py b/tests/unit_tests/agent/test_chat_handler.py index 602296a5..c3f62550 100644 --- a/tests/unit_tests/agent/test_chat_handler.py +++ b/tests/unit_tests/agent/test_chat_handler.py @@ -408,8 +408,8 @@ class TestChatHandlerAsyncBehavior: assert query.resp_messages[1].content == 'Response 2' @pytest.mark.asyncio - async def test_history_update_recreates_conversation_if_tool_resets_it(self): - """History update should tolerate CREATE_NEW_CONVERSATION during runner execution.""" + async def test_agent_turn_recreates_conversation_if_tool_resets_it(self): + """Agent turn bookkeeping should tolerate CREATE_NEW_CONVERSATION during runner execution.""" from langbot.pkg.pipeline.process.handlers.chat import ChatMessageHandler from langbot.pkg.pipeline import entities @@ -449,7 +449,7 @@ class TestChatHandlerAsyncBehavior: assert results[0].result_type == entities.ResultType.CONTINUE mock_ap.sess_mgr.get_conversation.assert_awaited_once() assert query.session.using_conversation is new_conversation - assert new_conversation.messages == [query.user_message, response] + assert new_conversation.messages == [] @pytest.mark.asyncio async def test_runner_not_found_error(self): diff --git a/tests/unit_tests/agent/test_context_builder_params_state.py b/tests/unit_tests/agent/test_context_builder_params_state.py index 05e868eb..711a1c9b 100644 --- a/tests/unit_tests/agent/test_context_builder_params_state.py +++ b/tests/unit_tests/agent/test_context_builder_params_state.py @@ -159,4 +159,4 @@ class TestBuildAdapterContext: context = QueryEntryAdapter.build_adapter_context(query, binding=None) - assert context == {'params': {}, 'query_id': 123} + assert context == {'params': {}, 'query_id': 123, 'prompt_get': False} diff --git a/tests/unit_tests/agent/test_event_log_transcript.py b/tests/unit_tests/agent/test_event_log_transcript.py index 814144ec..fa24e4b6 100644 --- a/tests/unit_tests/agent/test_event_log_transcript.py +++ b/tests/unit_tests/agent/test_event_log_transcript.py @@ -2,8 +2,6 @@ from __future__ import annotations import pytest -from unittest.mock import Mock, MagicMock, patch -import datetime from langbot.pkg.agent.runner.host_models import ( AgentEventEnvelope, @@ -17,7 +15,6 @@ from langbot.pkg.agent.runner.event_log_store import EventLogStore from langbot.pkg.agent.runner.transcript_store import TranscriptStore from langbot.pkg.agent.runner.session_registry import get_session_registry from langbot_plugin.api.entities.builtin.agent_runner.event import ( - AgentEventContext, ActorContext, ) from langbot_plugin.api.entities.builtin.agent_runner.input import AgentInput @@ -386,9 +383,7 @@ class TestEventLogStoreRealSQLite: async def db_engine(self): """Create an in-memory SQLite database for testing.""" from sqlalchemy.ext.asyncio import create_async_engine - from sqlalchemy import text from langbot.pkg.entity.persistence.base import Base - from langbot.pkg.entity.persistence.event_log import EventLog engine = create_async_engine("sqlite+aiosqlite:///:memory:") @@ -483,9 +478,7 @@ class TestTranscriptStoreRealSQLite: async def db_engine(self): """Create an in-memory SQLite database for testing.""" from sqlalchemy.ext.asyncio import create_async_engine - from sqlalchemy import text from langbot.pkg.entity.persistence.base import Base - from langbot.pkg.entity.persistence.transcript import Transcript engine = create_async_engine("sqlite+aiosqlite:///:memory:") @@ -521,6 +514,44 @@ class TestTranscriptStoreRealSQLite: assert len(items) == 3 assert items[0]["conversation_id"] == "conv_001" + @pytest.mark.asyncio + async def test_get_legacy_provider_messages_projects_transcript_history(self, db_engine): + """Transcript is the canonical source; legacy Pipeline readers get a Message view.""" + store = TranscriptStore(db_engine) + + await store.append_transcript( + transcript_id="trans_view_001", + event_id="evt_view_001", + conversation_id="conv_view", + role="user", + content="User text", + content_json={ + "role": "user", + "content": [{"type": "text", "text": "User structured text"}], + }, + ) + await store.append_transcript( + transcript_id="trans_view_002", + event_id="evt_view_002", + conversation_id="conv_view", + role="tool", + item_type="tool_result", + content="ignored tool result", + ) + await store.append_transcript( + transcript_id="trans_view_003", + event_id="evt_view_003", + conversation_id="conv_view", + role="assistant", + content="Assistant text", + ) + + messages = await store.get_legacy_provider_messages("conv_view") + + assert [message.role for message in messages] == ["user", "assistant"] + assert messages[0].content[0].text == "User structured text" + assert messages[1].content == "Assistant text" + @pytest.mark.asyncio async def test_search_transcript_real_db(self, db_engine): """Test search_transcript with real DB.""" @@ -586,7 +617,7 @@ def mock_db_engine(): @pytest.fixture def mock_handler(): """Create a mock handler for testing actions.""" - from langbot_plugin.runtime.io.handler import Handler, ActionResponse + from langbot_plugin.runtime.io.handler import Handler class MockHandler(Handler): def __init__(self): diff --git a/tests/unit_tests/agent/test_orchestrator_integration.py b/tests/unit_tests/agent/test_orchestrator_integration.py index cf3deaf6..81557f87 100644 --- a/tests/unit_tests/agent/test_orchestrator_integration.py +++ b/tests/unit_tests/agent/test_orchestrator_integration.py @@ -593,6 +593,7 @@ class TestQueryEntryAdapterParams: context = plugin_connector.contexts[0] assert "prompt" not in context assert "prompt" not in context["adapter"]["extra"] + assert context["context"]["available_apis"]["prompt_get"] is True @pytest.mark.asyncio async def test_params_filtering_keeps_public_param(self, clean_agent_state): diff --git a/tests/unit_tests/pipeline/test_preproc.py b/tests/unit_tests/pipeline/test_preproc.py index 9620a1c1..9495b69c 100644 --- a/tests/unit_tests/pipeline/test_preproc.py +++ b/tests/unit_tests/pipeline/test_preproc.py @@ -50,6 +50,8 @@ def make_host_model_runner_descriptor( multimodal_input: bool = True, tool_calling: bool = True, knowledge_retrieval: bool = True, + skill_authoring: bool = False, + skill_injection: bool = False, ): from langbot.pkg.agent.runner.descriptor import AgentRunnerDescriptor @@ -69,6 +71,8 @@ def make_host_model_runner_descriptor( 'tool_calling': tool_calling, 'knowledge_retrieval': knowledge_retrieval, 'multimodal_input': multimodal_input, + 'skill_authoring': skill_authoring, + 'skill_injection': skill_injection, }, permissions={ 'models': ['list', 'invoke', 'stream'], diff --git a/tests/unit_tests/test_preproc.py b/tests/unit_tests/test_preproc.py index 3164f35b..079a5201 100644 --- a/tests/unit_tests/test_preproc.py +++ b/tests/unit_tests/test_preproc.py @@ -17,6 +17,32 @@ from langbot_plugin.api.entities.builtin.provider.prompt import Prompt from langbot_plugin.api.entities.builtin.provider.session import Conversation, LauncherTypes, Session +class _FakeRunnerDescriptor: + config_schema = [ + {'name': 'model', 'type': 'model-fallback-selector'}, + {'name': 'prompt', 'type': 'prompt-editor', 'default': []}, + {'name': 'knowledge-bases', 'type': 'knowledge-base-multi-selector', 'default': []}, + ] + permissions = { + 'models': ['list', 'invoke', 'stream'], + 'tools': ['list', 'detail', 'call'], + 'knowledge_bases': ['list', 'retrieve'], + } + capabilities = { + 'tool_calling': True, + 'knowledge_retrieval': True, + 'multimodal_input': True, + 'skill_authoring': True, + 'skill_injection': True, + } + + def supports_tool_calling(self): + return self.capabilities.get('tool_calling', False) + + def supports_knowledge_retrieval(self): + return self.capabilities.get('knowledge_retrieval', False) + + def _make_query() -> Query: message_chain = MessageChain([Plain(text='create a skill')]) return Query( @@ -34,11 +60,13 @@ def _make_query() -> Query: pipeline_uuid='pipe-1', pipeline_config={ 'ai': { - 'runner': {'runner': 'local-agent'}, - 'local-agent': { - 'model': {'primary': 'model-1', 'fallbacks': []}, - 'prompt': 'default', - 'knowledge-bases': [], + 'runner': {'id': 'plugin:langbot/local-agent/default'}, + 'runner_config': { + 'plugin:langbot/local-agent/default': { + 'model': {'primary': 'model-1', 'fallbacks': []}, + 'prompt': [], + 'knowledge-bases': [], + }, }, }, 'trigger': {'misc': {}}, @@ -57,6 +85,15 @@ def _make_conversation() -> Conversation: ) +async def _passthrough_preproc_event(event, bound_plugins): + return SimpleNamespace( + event=SimpleNamespace( + default_prompt=event.default_prompt, + prompt=event.prompt, + ) + ) + + def _make_app(*, skill_service) -> SimpleNamespace: session = Session(launcher_type=LauncherTypes.PERSON, launcher_id='launcher-1', sender_id='sender-1') conversation = _make_conversation() @@ -83,6 +120,7 @@ def _make_app(*, skill_service) -> SimpleNamespace: pipeline_service=SimpleNamespace( get_pipeline=AsyncMock(return_value={'extensions_preferences': {'enable_all_skills': True}}) ), + agent_runner_registry=SimpleNamespace(get=AsyncMock(return_value=_FakeRunnerDescriptor())), skill_mgr=SimpleNamespace( build_skill_aware_prompt_addition=Mock(return_value=''), skills={}, @@ -197,6 +235,49 @@ async def test_preproc_skips_injection_when_addendum_is_empty(): assert 'Available Skills' not in (query.prompt.messages[0].content or '') +@pytest.mark.asyncio +async def test_preproc_uses_transcript_history_view_when_available(): + preproc_module, entities_module = _import_preproc_modules() + + app = _make_app(skill_service=SimpleNamespace()) + conversation = app.sess_mgr.get_conversation.return_value + conversation.messages = [Message(role='user', content='legacy history')] + app.plugin_connector.emit_event = AsyncMock(side_effect=_passthrough_preproc_event) + + transcript_messages = [ + Message(role='user', content='from transcript user'), + Message(role='assistant', content='from transcript assistant'), + ] + + stage = preproc_module.PreProcessor(app) + stage._load_agent_runner_history_messages = AsyncMock(return_value=transcript_messages) + + query = _make_query() + result = await stage.process(query, 'PreProcessor') + + assert result.result_type == entities_module.ResultType.CONTINUE + assert query.messages == transcript_messages + + +@pytest.mark.asyncio +async def test_preproc_falls_back_to_conversation_messages_when_transcript_empty(): + preproc_module, entities_module = _import_preproc_modules() + + app = _make_app(skill_service=SimpleNamespace()) + legacy_messages = [Message(role='user', content='legacy history')] + app.sess_mgr.get_conversation.return_value.messages = legacy_messages + app.plugin_connector.emit_event = AsyncMock(side_effect=_passthrough_preproc_event) + + stage = preproc_module.PreProcessor(app) + stage._load_agent_runner_history_messages = AsyncMock(return_value=None) + + query = _make_query() + result = await stage.process(query, 'PreProcessor') + + assert result.result_type == entities_module.ResultType.CONTINUE + assert query.messages == legacy_messages + + async def stage_process_capture(preproc_module, app, query): """Run PreProcessor.process and return the result while keeping ``query`` accessible to the assertions (process mutates query in place)."""