From b4f92eba38b974438a1526249dd8a16830b05e0e Mon Sep 17 00:00:00 2001 From: "Junyan Qin (Chin)" Date: Thu, 4 Dec 2025 13:40:26 +0800 Subject: [PATCH] feat(platform): add skip_pipeline parameter for webhook responses (#1837) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(platform): add skip_pipeline parameter for webhook responses Add support for skip_pipeline parameter in webhook responses, allowing webhook targets to instruct LangBot to skip pipeline processing for specific messages. When a webhook responds with skip_pipeline=true, the message is treated as a notification only and bypasses the query pool. Changes: - webhook_pusher.py: Parse JSON responses and return skip_pipeline flag - botmgr.py: Check skip_pipeline before adding messages to query pool - docker-compose.yaml: Add DNS configuration to fix container networking 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix: webhook crud bug * chore: revert docker-compose.yaml --------- Co-authored-by: Claude --- .../http/controller/groups/webhook_mgmt.py | 49 +++++++++++++++ src/langbot/pkg/platform/botmgr.py | 62 +++++++++++-------- src/langbot/pkg/platform/webhook_pusher.py | 58 ++++++++++++++--- 3 files changed, 133 insertions(+), 36 deletions(-) create mode 100644 src/langbot/pkg/api/http/controller/groups/webhook_mgmt.py diff --git a/src/langbot/pkg/api/http/controller/groups/webhook_mgmt.py b/src/langbot/pkg/api/http/controller/groups/webhook_mgmt.py new file mode 100644 index 00000000..f82184c1 --- /dev/null +++ b/src/langbot/pkg/api/http/controller/groups/webhook_mgmt.py @@ -0,0 +1,49 @@ +import quart + +from .. import group + + +@group.group_class('webhook_mgmt', '/api/v1/webhooks') +class WebhookManagementRouterGroup(group.RouterGroup): + async def initialize(self) -> None: + @self.route('', methods=['GET', 'POST']) + async def _() -> str: + if quart.request.method == 'GET': + webhooks = await self.ap.webhook_service.get_webhooks() + return self.success(data={'webhooks': webhooks}) + elif quart.request.method == 'POST': + json_data = await quart.request.json + name = json_data.get('name', '') + url = json_data.get('url', '') + description = json_data.get('description', '') + enabled = json_data.get('enabled', True) + + if not name: + return self.http_status(400, -1, 'Name is required') + if not url: + return self.http_status(400, -1, 'URL is required') + + webhook = await self.ap.webhook_service.create_webhook(name, url, description, enabled) + return self.success(data={'webhook': webhook}) + + @self.route('/', methods=['GET', 'PUT', 'DELETE']) + async def _(webhook_id: int) -> str: + if quart.request.method == 'GET': + webhook = await self.ap.webhook_service.get_webhook(webhook_id) + if webhook is None: + return self.http_status(404, -1, 'Webhook not found') + return self.success(data={'webhook': webhook}) + + elif quart.request.method == 'PUT': + json_data = await quart.request.json + name = json_data.get('name') + url = json_data.get('url') + description = json_data.get('description') + enabled = json_data.get('enabled') + + await self.ap.webhook_service.update_webhook(webhook_id, name, url, description, enabled) + return self.success() + + elif quart.request.method == 'DELETE': + await self.ap.webhook_service.delete_webhook(webhook_id) + return self.success() diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index 6e2f20b0..b9d7a5fe 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -66,22 +66,27 @@ class RuntimeBot: message_session_id=f'person_{event.sender.id}', ) - # Push to webhooks + # Push to webhooks and check if pipeline should be skipped + skip_pipeline = False if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher: - asyncio.create_task( - self.ap.webhook_pusher.push_person_message(event, self.bot_entity.uuid, adapter.__class__.__name__) + skip_pipeline = await self.ap.webhook_pusher.push_person_message( + event, self.bot_entity.uuid, adapter.__class__.__name__ ) - await self.ap.query_pool.add_query( - bot_uuid=self.bot_entity.uuid, - launcher_type=provider_session.LauncherTypes.PERSON, - launcher_id=event.sender.id, - sender_id=event.sender.id, - message_event=event, - message_chain=event.message_chain, - adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, - ) + # Only add to query pool if no webhook requested to skip pipeline + if not skip_pipeline: + await self.ap.query_pool.add_query( + bot_uuid=self.bot_entity.uuid, + launcher_type=provider_session.LauncherTypes.PERSON, + launcher_id=event.sender.id, + sender_id=event.sender.id, + message_event=event, + message_chain=event.message_chain, + adapter=adapter, + pipeline_uuid=self.bot_entity.use_pipeline_uuid, + ) + else: + await self.logger.info(f'Pipeline skipped for person message due to webhook response') async def on_group_message( event: platform_events.GroupMessage, @@ -97,22 +102,27 @@ class RuntimeBot: message_session_id=f'group_{event.group.id}', ) - # Push to webhooks + # Push to webhooks and check if pipeline should be skipped + skip_pipeline = False if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher: - asyncio.create_task( - self.ap.webhook_pusher.push_group_message(event, self.bot_entity.uuid, adapter.__class__.__name__) + skip_pipeline = await self.ap.webhook_pusher.push_group_message( + event, self.bot_entity.uuid, adapter.__class__.__name__ ) - await self.ap.query_pool.add_query( - bot_uuid=self.bot_entity.uuid, - launcher_type=provider_session.LauncherTypes.GROUP, - launcher_id=event.group.id, - sender_id=event.sender.id, - message_event=event, - message_chain=event.message_chain, - adapter=adapter, - pipeline_uuid=self.bot_entity.use_pipeline_uuid, - ) + # Only add to query pool if no webhook requested to skip pipeline + if not skip_pipeline: + await self.ap.query_pool.add_query( + bot_uuid=self.bot_entity.uuid, + launcher_type=provider_session.LauncherTypes.GROUP, + launcher_id=event.group.id, + sender_id=event.sender.id, + message_event=event, + message_chain=event.message_chain, + adapter=adapter, + pipeline_uuid=self.bot_entity.use_pipeline_uuid, + ) + else: + await self.logger.info(f'Pipeline skipped for group message due to webhook response') self.adapter.register_listener(platform_events.FriendMessage, on_friend_message) self.adapter.register_listener(platform_events.GroupMessage, on_group_message) diff --git a/src/langbot/pkg/platform/webhook_pusher.py b/src/langbot/pkg/platform/webhook_pusher.py index ab34bfad..15d25733 100644 --- a/src/langbot/pkg/platform/webhook_pusher.py +++ b/src/langbot/pkg/platform/webhook_pusher.py @@ -22,12 +22,16 @@ class WebhookPusher: self.ap = ap self.logger = self.ap.logger - async def push_person_message(self, event: platform_events.FriendMessage, bot_uuid: str, adapter_name: str) -> None: - """Push person message event to webhooks""" + async def push_person_message(self, event: platform_events.FriendMessage, bot_uuid: str, adapter_name: str) -> bool: + """Push person message event to webhooks + + Returns: + bool: True if any webhook responded with skip_pipeline=true, False otherwise + """ try: webhooks = await self.ap.webhook_service.get_enabled_webhooks() if not webhooks: - return + return False # Build payload payload = { @@ -47,17 +51,30 @@ class WebhookPusher: # Push to all webhooks asynchronously tasks = [self._push_to_webhook(webhook['url'], payload) for webhook in webhooks] - await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check if any webhook responded with skip_pipeline=true + for result in results: + if isinstance(result, dict) and result.get('skip_pipeline') is True: + self.logger.info(f'Webhook responded with skip_pipeline=true, skipping pipeline for person message') + return True + + return False except Exception as e: self.logger.error(f'Failed to push person message to webhooks: {e}') + return False - async def push_group_message(self, event: platform_events.GroupMessage, bot_uuid: str, adapter_name: str) -> None: - """Push group message event to webhooks""" + async def push_group_message(self, event: platform_events.GroupMessage, bot_uuid: str, adapter_name: str) -> bool: + """Push group message event to webhooks + + Returns: + bool: True if any webhook responded with skip_pipeline=true, False otherwise + """ try: webhooks = await self.ap.webhook_service.get_enabled_webhooks() if not webhooks: - return + return False # Build payload payload = { @@ -81,13 +98,26 @@ class WebhookPusher: # Push to all webhooks asynchronously tasks = [self._push_to_webhook(webhook['url'], payload) for webhook in webhooks] - await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check if any webhook responded with skip_pipeline=true + for result in results: + if isinstance(result, dict) and result.get('skip_pipeline') is True: + self.logger.info(f'Webhook responded with skip_pipeline=true, skipping pipeline for group message') + return True + + return False except Exception as e: self.logger.error(f'Failed to push group message to webhooks: {e}') + return False - async def _push_to_webhook(self, url: str, payload: dict) -> None: - """Push payload to a single webhook URL""" + async def _push_to_webhook(self, url: str, payload: dict) -> dict | None: + """Push payload to a single webhook URL + + Returns: + dict | None: The response JSON if successful, None otherwise + """ try: async with aiohttp.ClientSession() as session: async with session.post( @@ -98,9 +128,17 @@ class WebhookPusher: ) as response: if response.status >= 400: self.logger.warning(f'Webhook {url} returned status {response.status}') + return None else: self.logger.debug(f'Successfully pushed to webhook {url}') + try: + return await response.json() + except Exception as json_error: + self.logger.debug(f'Failed to parse JSON response from webhook {url}: {json_error}') + return None except asyncio.TimeoutError: self.logger.warning(f'Timeout pushing to webhook {url}') + return None except Exception as e: self.logger.warning(f'Error pushing to webhook {url}: {e}') + return None