diff --git a/skills/scripts/e2e/ensure-acp-agent-runner-pipeline.mjs b/skills/scripts/e2e/ensure-acp-agent-runner-pipeline.mjs index 1278fcd1b..e8b2e515b 100644 --- a/skills/scripts/e2e/ensure-acp-agent-runner-pipeline.mjs +++ b/skills/scripts/e2e/ensure-acp-agent-runner-pipeline.mjs @@ -102,7 +102,7 @@ try { }); Object.assign(result, prepared); if (result.pipeline_id) { - result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/pipelines?id=${encodeURIComponent(result.pipeline_id)}`; + result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/agents?id=${encodeURIComponent(result.pipeline_id)}`; } if (writeEnv && result.pipeline_id) { diff --git a/skills/scripts/e2e/ensure-local-agent-pipeline.mjs b/skills/scripts/e2e/ensure-local-agent-pipeline.mjs index 0962c6bf5..6dffacf27 100644 --- a/skills/scripts/e2e/ensure-local-agent-pipeline.mjs +++ b/skills/scripts/e2e/ensure-local-agent-pipeline.mjs @@ -89,7 +89,7 @@ try { }); Object.assign(result, prepared); if (result.pipeline_id) { - result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/pipelines?id=${encodeURIComponent(result.pipeline_id)}`; + result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/agents?id=${encodeURIComponent(result.pipeline_id)}`; } if (writeEnv && result.pipeline_id) { diff --git a/skills/scripts/e2e/ensure-qa-agent-runner-pipeline.mjs b/skills/scripts/e2e/ensure-qa-agent-runner-pipeline.mjs index abc43241a..7346eff29 100644 --- a/skills/scripts/e2e/ensure-qa-agent-runner-pipeline.mjs +++ b/skills/scripts/e2e/ensure-qa-agent-runner-pipeline.mjs @@ -74,7 +74,7 @@ try { }); Object.assign(result, prepared); if (result.pipeline_id) { - result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/pipelines?id=${encodeURIComponent(result.pipeline_id)}`; + result.pipeline_url = `${frontendUrl.replace(/\/$/, "")}/home/agents?id=${encodeURIComponent(result.pipeline_id)}`; } if (writeEnv && result.pipeline_id) { diff --git a/skills/test/lbs-cli.test.ts b/skills/test/lbs-cli.test.ts index d0fac2ee6..d38c55d44 100644 --- a/skills/test/lbs-cli.test.ts +++ b/skills/test/lbs-cli.test.ts @@ -1448,7 +1448,7 @@ test("generic pipeline readiness accepts either URL or name target", () => { LANGBOT_BROWSER_PROFILE: "/tmp/langbot-test-profile", LANGBOT_CHROMIUM_EXECUTABLE: "/tmp/langbot-test-chromium", }, () => { - process.env.LANGBOT_PIPELINE_URL = "http://127.0.0.1:3000/home/pipelines?id=only-url"; + process.env.LANGBOT_PIPELINE_URL = "http://127.0.0.1:3000/home/agents?id=only-url"; process.env.LANGBOT_PIPELINE_NAME = ""; const ready = capture(() => commandTestPlan(ctx(["test", "plan", "pipeline-debug-chat", "--json"]))); diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 244f57c04..d539fbdd0 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -922,17 +922,31 @@ class RuntimeBot: ) async def initialize(self): + def websocket_pipeline_uuid(event, adapter): + if adapter.__class__.__name__ != 'WebSocketAdapter': + return None + value = getattr(event, '_langbot_pipeline_uuid', None) + return value if isinstance(value, str) and value else None + async def on_friend_message( event: platform_events.FriendMessage, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, ): - await self._handle_legacy_message_event(event, adapter) + await self._handle_legacy_message_event( + event, + adapter, + pipeline_uuid_override=websocket_pipeline_uuid(event, adapter), + ) async def on_group_message( event: platform_events.GroupMessage, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, ): - await self._handle_legacy_message_event(event, adapter) + await self._handle_legacy_message_event( + event, + adapter, + pipeline_uuid_override=websocket_pipeline_uuid(event, adapter), + ) self.adapter.register_listener(platform_events.FriendMessage, on_friend_message) self.adapter.register_listener(platform_events.GroupMessage, on_group_message) diff --git a/src/langbot/pkg/platform/sources/websocket_adapter.py b/src/langbot/pkg/platform/sources/websocket_adapter.py index 0574292f3..c34027792 100644 --- a/src/langbot/pkg/platform/sources/websocket_adapter.py +++ b/src/langbot/pkg/platform/sources/websocket_adapter.py @@ -447,6 +447,8 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter) sender=sender, message_chain=message_chain, time=datetime.now().timestamp() ) + object.__setattr__(event, '_langbot_pipeline_uuid', pipeline_uuid) + # 设置流水线UUID (proxy bot always needs it for reply_message routing) self.ap.platform_mgr.websocket_proxy_bot.bot_entity.use_pipeline_uuid = pipeline_uuid if owner_bot is not None: diff --git a/tests/unit_tests/platform/test_websocket_adapter_attachments.py b/tests/unit_tests/platform/test_websocket_adapter_attachments.py index 18138383d..71bce9024 100644 --- a/tests/unit_tests/platform/test_websocket_adapter_attachments.py +++ b/tests/unit_tests/platform/test_websocket_adapter_attachments.py @@ -10,11 +10,17 @@ type and graceful error handling. from __future__ import annotations import base64 +import asyncio +from types import SimpleNamespace from unittest.mock import AsyncMock, Mock import pytest -from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter +import langbot_plugin.api.entities.builtin.platform.events as platform_events +import langbot_plugin.api.entities.builtin.platform.entities as platform_entities +import langbot_plugin.api.entities.builtin.platform.message as platform_message +from langbot.pkg.platform.botmgr import RuntimeBot +from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter, WebSocketSession def _make_adapter(load_return=b'hello', load_side_effect=None): @@ -90,3 +96,64 @@ async def test_load_failure_is_logged_not_raised(): await adapter._process_image_components(chain) assert 'base64' not in chain[0] adapter.logger.error.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_handle_websocket_message_marks_event_with_pipeline_uuid(): + adapter, _ = _make_adapter() + adapter.websocket_person_session = WebSocketSession(id='websocketperson') + adapter.listeners = {} + adapter.listeners[platform_events.FriendMessage] = AsyncMock() + adapter.ap.platform_mgr.websocket_proxy_bot.bot_entity.use_pipeline_uuid = '' + + connection = SimpleNamespace( + pipeline_uuid='pipeline-123', + session_type='person', + connection_id='conn-1', + ) + + await adapter.handle_websocket_message( + connection, + {'message': [{'type': 'Plain', 'text': 'hello'}], 'stream': True}, + ) + await asyncio.sleep(0) + + event = adapter.listeners[platform_events.FriendMessage].await_args.args[0] + assert getattr(event, '_langbot_pipeline_uuid') == 'pipeline-123' + + +@pytest.mark.asyncio +async def test_runtime_bot_websocket_listener_uses_event_pipeline_uuid(): + app = Mock() + app.msg_aggregator.add_message = AsyncMock() + app.webhook_pusher = None + logger = Mock() + logger.info = AsyncMock() + logger.warning = AsyncMock() + logger.error = AsyncMock() + bot_entity = Mock() + bot_entity.uuid = 'websocket-proxy-bot' + bot_entity.enable = True + bot_entity.use_pipeline_uuid = '' + adapter = WebSocketAdapter.model_construct( + ap=app, + logger=Mock(error=AsyncMock()), + listeners={}, + websocket_person_session=WebSocketSession(id='websocketperson'), + websocket_group_session=WebSocketSession(id='websocketgroup'), + ) + bot = RuntimeBot(ap=app, bot_entity=bot_entity, adapter=adapter, logger=logger) + + await bot.initialize() + + event = platform_events.FriendMessage( + sender=platform_entities.Friend(id='sender-1', nickname='User', remark='User'), + message_chain=platform_message.MessageChain([platform_message.Plain(text='hello')]), + time=1, + ) + object.__setattr__(event, '_langbot_pipeline_uuid', 'pipeline-123') + + await adapter.listeners[platform_events.FriendMessage](event, adapter) + + app.msg_aggregator.add_message.assert_awaited_once() + assert app.msg_aggregator.add_message.await_args.kwargs['pipeline_uuid'] == 'pipeline-123'