feat(platform): add skip_pipeline parameter for webhook responses (#1837)

* feat(platform): add skip_pipeline parameter for webhook responses

Add support for skip_pipeline parameter in webhook responses, allowing
webhook targets to instruct LangBot to skip pipeline processing for
specific messages. When a webhook responds with skip_pipeline=true,
the message is treated as a notification only and bypasses the query pool.

Changes:
- webhook_pusher.py: Parse JSON responses and return skip_pipeline flag
- botmgr.py: Check skip_pipeline before adding messages to query pool
- docker-compose.yaml: Add DNS configuration to fix container networking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: webhook crud bug

* chore: revert docker-compose.yaml

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Junyan Qin (Chin)
2025-12-04 13:40:26 +08:00
committed by GitHub
parent 905e48c8ed
commit b4f92eba38
3 changed files with 133 additions and 36 deletions
+36 -26
View File
@@ -66,22 +66,27 @@ class RuntimeBot:
message_session_id=f'person_{event.sender.id}',
)
# Push to webhooks
# Push to webhooks and check if pipeline should be skipped
skip_pipeline = False
if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher:
asyncio.create_task(
self.ap.webhook_pusher.push_person_message(event, self.bot_entity.uuid, adapter.__class__.__name__)
skip_pipeline = await self.ap.webhook_pusher.push_person_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
await self.ap.query_pool.add_query(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)
# Only add to query pool if no webhook requested to skip pipeline
if not skip_pipeline:
await self.ap.query_pool.add_query(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=event.sender.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)
else:
await self.logger.info(f'Pipeline skipped for person message due to webhook response')
async def on_group_message(
event: platform_events.GroupMessage,
@@ -97,22 +102,27 @@ class RuntimeBot:
message_session_id=f'group_{event.group.id}',
)
# Push to webhooks
# Push to webhooks and check if pipeline should be skipped
skip_pipeline = False
if hasattr(self.ap, 'webhook_pusher') and self.ap.webhook_pusher:
asyncio.create_task(
self.ap.webhook_pusher.push_group_message(event, self.bot_entity.uuid, adapter.__class__.__name__)
skip_pipeline = await self.ap.webhook_pusher.push_group_message(
event, self.bot_entity.uuid, adapter.__class__.__name__
)
await self.ap.query_pool.add_query(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
launcher_id=event.group.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)
# Only add to query pool if no webhook requested to skip pipeline
if not skip_pipeline:
await self.ap.query_pool.add_query(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
launcher_id=event.group.id,
sender_id=event.sender.id,
message_event=event,
message_chain=event.message_chain,
adapter=adapter,
pipeline_uuid=self.bot_entity.use_pipeline_uuid,
)
else:
await self.logger.info(f'Pipeline skipped for group message due to webhook response')
self.adapter.register_listener(platform_events.FriendMessage, on_friend_message)
self.adapter.register_listener(platform_events.GroupMessage, on_group_message)