diff --git a/src/langbot/pkg/platform/sources/discord.py b/src/langbot/pkg/platform/sources/discord.py
index e9cc7a37e..0b22067c5 100644
--- a/src/langbot/pkg/platform/sources/discord.py
+++ b/src/langbot/pkg/platform/sources/discord.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import discord
+from discord import ui as discord_ui
 
 import typing
 import re
@@ -8,6 +9,8 @@ import base64
 import uuid
 import os
 import datetime
+import time
+import traceback
 
 # 使用BytesIO创建文件对象,避免路径问题
 import io
@@ -824,6 +827,88 @@ class DiscordEventConverter(abstract_platform_adapter.AbstractEventConverter):
         )
 
 
+class DiscordFormView(discord_ui.View):
+    """Discord ``ui.View`` that renders one button per Dify form action.
+
+    Each button's click triggers ``adapter._on_form_button_click`` which
+    acks the interaction, locks the buttons in place, and enqueues a
+    synthetic ``_dify_form_action`` query so the runner resumes the
+    workflow.
+
+    Args:
+        adapter: Owning adapter; receives the click callbacks.
+        session_key: Key of the pending form in ``adapter._pending_forms``.
+        actions: Dify action dicts (``id``, ``title``, ``button_style``).
+        timeout: Seconds of inactivity before the view stops listening.
+    """
+
+    # Discord button style mapping for Dify ``button_style`` values.
+    _STYLE_MAP: typing.ClassVar[dict] = {
+        'primary': discord.ButtonStyle.primary,
+        'danger': discord.ButtonStyle.danger,
+        'warning': discord.ButtonStyle.danger,
+        'success': discord.ButtonStyle.success,
+        'default': discord.ButtonStyle.secondary,
+        '': discord.ButtonStyle.secondary,
+    }
+
+    def __init__(
+        self,
+        adapter: 'DiscordAdapter',
+        session_key: str,
+        actions: list[dict],
+        timeout: float = 1800,
+    ):
+        super().__init__(timeout=timeout)
+        self._adapter = adapter
+        self._session_key = session_key
+        # Discord caps a view at 25 children (5 rows × 5 buttons). Trim
+        # silently — most Dify forms have ≤10 actions in practice.
+        for idx, action in enumerate(actions[:25]):
+            action_id = str(action.get('id') or '')
+            label = str(action.get('title') or action_id or f'Option {idx + 1}')
+            style = self._STYLE_MAP.get(
+                str(action.get('button_style') or '').lower(),
+                discord.ButtonStyle.secondary,
+            )
+            # custom_id must be unique within the view and ≤100 chars.
+            # The ``idx`` prefix keeps ids unique even when Dify ids
+            # collide, are empty, or only differ past the truncation.
+            custom_id = f'lb_form:{idx}:{action_id[:80]}'[:100]
+            button = discord_ui.Button(
+                label=label[:80],  # Discord label limit
+                style=style,
+                custom_id=custom_id,
+            )
+            button.callback = self._make_callback(action_id, label)
+            self.add_item(button)
+
+    def _make_callback(self, action_id: str, action_title: str):
+        """Return the click coroutine bound to one action's id and title."""
+
+        async def _cb(interaction: discord.Interaction):
+            await self._adapter._on_form_button_click(
+                interaction=interaction,
+                session_key=self._session_key,
+                action_id=action_id,
+                action_title=action_title,
+                view=self,
+            )
+
+        return _cb
+
+    async def on_timeout(self) -> None:
+        """Drop this view's pending-form entry once the view expires.
+
+        Without this, forms nobody clicks stay in ``_pending_forms``
+        forever. Only the entry that belongs to this view is removed; a
+        newer form for the same session is left untouched.
+        """
+        pending = self._adapter._pending_forms.get(self._session_key)
+        if pending is not None and pending.get('view') is self:
+            self._adapter._pending_forms.pop(self._session_key, None)
+
+
 class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
     bot: discord.Client = pydantic.Field(exclude=True)
 
@@ -837,6 +922,10 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
 
     voice_manager: VoiceConnectionManager | None = pydantic.Field(exclude=True, default=None)
 
+    # Injected by botmgr at construction so the form-button callback can
+    # enqueue a synthetic resume query (`_dify_form_action`) on the pool.
+    ap: typing.Any = pydantic.Field(exclude=True, default=None)
+
     def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
         bot_account_id = config['client_id']
 
@@ -860,8 +949,18 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
 
         args = {}
 
-        if os.getenv('http_proxy'):
-            args['proxy'] = os.getenv('http_proxy')
+        # Proxy precedence: config['proxy'], then the http(s)_proxy env vars.
+        # discord.py uses aiohttp which does NOT respect http_proxy env
+        # vars by default — we must pass proxy= explicitly.
+        proxy = (
+            config.get('proxy')
+            or os.getenv('http_proxy')
+            or os.getenv('HTTP_PROXY')
+            or os.getenv('https_proxy')
+            or os.getenv('HTTPS_PROXY')
+        )
+        if proxy:
+            args['proxy'] = proxy
 
         bot = MyClient(intents=intents, **args)
 
@@ -875,6 +974,19 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
             **kwargs,
         )
 
+        # Per-resp-message-id stream context created by create_message_card:
+        # {channel, sent_message, last_content, chunk_count, rendered}.
+        # reply_message_chunk sends the first non-empty chunk, then edits
+        # that message in place (throttled to respect Discord's edit
+        # ratelimit) and drops the context on the final chunk.
+        self._stream_buffer: dict[str, dict[str, typing.Any]] = {}
+        # session_key -> {form_data, channel_id, guild_id, sender_id,
+        #   posted_at, view_message_id, view}; populated when we send a
+        # form view and consumed when the user clicks a button so we know
+        # which workflow_run / form_token to resume. ``session_key +
+        # '__last_channel'`` entries cache the channel of the last click.
+        self._pending_forms: dict[str, dict] = {}
+
     # Voice functionality methods
     async def join_voice_channel(self, guild_id: int, channel_id: int, user_id: int = None) -> discord.VoiceClient:
         """
@@ -1068,7 +1180,12 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
     ):
         msg_to_send, files = await self.message_converter.yiri2target(message)
 
-        assert isinstance(message_source.source_platform_object, discord.Message)
+        # Synthetic events (button-click resume) have no inbound discord
+        # Message. Route via the channel we cached when the user clicked.
+        source = message_source.source_platform_object
+        if not isinstance(source, discord.Message):
+            await self._reply_synthetic(message_source, msg_to_send, files)
+            return
 
         args = {
             'content': msg_to_send,
@@ -1078,7 +1195,7 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
             args['files'] = files
 
         if quote_origin:
-            args['reference'] = message_source.source_platform_object
+            args['reference'] = source
 
         has_at = False
 
@@ -1090,7 +1207,491 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
         if has_at:
             args['mention_author'] = True
 
-        await message_source.source_platform_object.channel.send(**args)
+        await source.channel.send(**args)
+
+    async def _reply_synthetic(
+        self,
+        message_source: platform_events.MessageEvent,
+        msg_to_send: str,
+        files: list,
+    ) -> None:
+        """Deliver a reply for a button-click-resumed (synthetic) event.
+
+        We don't have an inbound discord.Message to anchor to; instead
+        look up the channel cached in ``_pending_forms[session_key +
+        '__last_channel']`` from the most recent button click.
+        """
+        if isinstance(message_source, platform_events.GroupMessage):
+            # _handle_form_chunk uses channel_id alone as the session
+            # scope, and launcher_id was set to channel_id when
+            # synthesizing the event.
+            session_key = f'c:{message_source.group.id}'
+        else:
+            session_key = f'p:{message_source.sender.id}'
+
+        cached = self._pending_forms.get(session_key + '__last_channel') or {}
+        channel = cached.get('channel')
+        if channel is None:
+            if self.ap is not None:
+                self.ap.logger.warning(
+                    f'Discord: synthetic reply has no cached channel for '
+                    f'{session_key}; dropping content (len={len(msg_to_send)})'
+                )
+            return
+
+        args: dict[str, typing.Any] = {'content': msg_to_send}
+        if files:
+            args['files'] = files
+        try:
+            await channel.send(**args)
+        except Exception:
+            if self.ap is not None:
+                self.ap.logger.error(f'Discord: synthetic reply send failed: {traceback.format_exc()}')
+
+    # Discord allows 5 edits per 5 seconds per message. We throttle
+    # to one edit per 8 runner-chunks (runner already yields every 8
+    # text_chunks internally), which stays comfortably within limits.
+    _STREAM_EDIT_INTERVAL = 8
+    # Maximum length of a Discord message's ``content`` field.
+    _DISCORD_CONTENT_LIMIT = 2000
+
+    async def is_stream_output_supported(self) -> bool:
+        """Report that this adapter implements ``reply_message_chunk``."""
+        return True
+
+    async def create_message_card(self, message_id: str, event: platform_events.MessageEvent) -> bool:
+        """Set up a stream context for progressive editing.
+
+        The first non-empty reply_message_chunk will send the initial
+        message; subsequent chunks edit it in place.
+        """
+        source = event.source_platform_object
+        if not isinstance(source, discord.Message):
+            return False
+        self._stream_buffer[message_id] = {
+            'channel': source.channel,
+            'sent_message': None,  # discord.Message set on first send
+            'last_content': '',
+            'chunk_count': 0,
+            'rendered': '',  # content currently shown by sent_message
+        }
+        return True
+
+    async def reply_message_chunk(
+        self,
+        message_source: platform_events.MessageEvent,
+        bot_message: typing.Any,
+        message: platform_message.MessageChain,
+        quote_origin: bool = False,
+        is_final: bool = False,
+    ):
+        """Render one runner chunk by sending, then editing, a Discord message.
+
+        Args:
+            message_source: Event being answered.
+            bot_message: Runner message (object or dict); supplies
+                ``resp_message_id`` and, on objects, ``_form_data``.
+            message: Chain whose ``Plain`` parts hold the text so far.
+            quote_origin: Forwarded to ``reply_message`` on the fallback path.
+            is_final: True on the last chunk of the response.
+        """
+        msg_id = (
+            bot_message.get('resp_message_id')
+            if isinstance(bot_message, dict)
+            else getattr(bot_message, 'resp_message_id', None)
+        )
+
+        text_parts = [m.text for m in message if isinstance(m, platform_message.Plain)]
+        chunk_text = '\n\n'.join(t for t in text_parts if t)
+
+        form_data = getattr(bot_message, '_form_data', None) if not isinstance(bot_message, dict) else None
+
+        ctx = self._stream_buffer.get(msg_id) if msg_id else None
+
+        # If the stream ctx was not set up (create_message_card wasn't
+        # called, e.g. synthetic event), or the final chunk carries a
+        # form, skip progressive editing entirely.
+        if ctx is None or form_data:
+            try:
+                if form_data and is_final:
+                    await self._handle_form_chunk(message_source, form_data)
+                elif is_final and chunk_text:
+                    await self.reply_message(
+                        message_source,
+                        platform_message.MessageChain([platform_message.Plain(text=chunk_text)]),
+                        quote_origin,
+                    )
+            finally:
+                self._stream_buffer.pop(msg_id, None)
+            return
+
+        # Progressive streaming path: send first chunk, edit subsequent.
+        ctx['chunk_count'] += 1
+
+        # Runner yields the full accumulated text on each chunk, so we
+        # always replace (not append).
+        if chunk_text:
+            ctx['last_content'] = chunk_text
+        content = ctx['last_content']
+        limit = self._DISCORD_CONTENT_LIMIT
+        # One message holds at most ``limit`` chars; the remainder is
+        # delivered as follow-up messages on the final chunk.
+        head = content[:limit]
+
+        sent = ctx['sent_message']
+
+        if sent is None:
+            if not content:
+                # Nothing to show yet. On the final chunk release the
+                # ctx, otherwise it would leak in ``_stream_buffer``.
+                if is_final:
+                    self._stream_buffer.pop(msg_id, None)
+                return
+            # First non-empty chunk — send the initial message.
+            try:
+                sent = await ctx['channel'].send(head)
+            except Exception:
+                if self.ap is not None:
+                    self.ap.logger.error(f'Discord stream send failed: {traceback.format_exc()}')
+                self._stream_buffer.pop(msg_id, None)
+                return
+            ctx['sent_message'] = sent
+            ctx['rendered'] = head
+
+        # Edit on the final chunk, otherwise only every Nth chunk to stay
+        # inside the ratelimit. Compare with what we last rendered rather
+        # than ``sent.content``, which ``Message.edit`` does not refresh.
+        edit_due = is_final or ctx['chunk_count'] % self._STREAM_EDIT_INTERVAL == 0
+        if edit_due and head != ctx.get('rendered'):
+            try:
+                await sent.edit(content=head)
+                ctx['rendered'] = head
+            except Exception:
+                # Best-effort (rate-limited or message deleted), but leave
+                # a trace instead of failing silently.
+                if self.ap is not None:
+                    self.ap.logger.warning(f'Discord stream edit failed: {traceback.format_exc()}')
+
+        if not is_final:
+            return
+
+        self._stream_buffer.pop(msg_id, None)
+        # Deliver whatever did not fit into the first message.
+        for start in range(limit, len(content), limit):
+            try:
+                await ctx['channel'].send(content[start : start + limit])
+            except Exception:
+                if self.ap is not None:
+                    self.ap.logger.error(f'Discord stream overflow send failed: {traceback.format_exc()}')
+                break
+
+    async def _handle_form_chunk(
+        self,
+        message_source: platform_events.MessageEvent,
+        form_data: dict,
+    ) -> None:
+        """Render a Dify form pause as a Discord embed + button View.
+
+        Mirrors the QQ / Telegram / Lark form path: the button's click
+        callback synthesizes a ``_dify_form_action`` query so the runner's
+        ``_merge_pending_form_action`` resumes the workflow.
+
+        Args:
+            message_source: Event that triggered the form (real or synthetic).
+            form_data: Dify form payload (``actions``, ``node_title``,
+                ``form_content``, ``form_token``, ``workflow_run_id``).
+        """
+        source = message_source.source_platform_object
+
+        actions = form_data.get('actions') or []
+        node_title = str(form_data.get('node_title') or '')
+        form_content = str(form_data.get('form_content') or '').strip()
+
+        if not actions:
+            # Nothing clickable — fall back to plain text. reply_message
+            # handles synthetic events too, and Discord rejects empty
+            # content, so gate on the text rather than on ``source``.
+            fallback_text = node_title or form_content
+            if fallback_text:
+                await self.reply_message(
+                    message_source,
+                    platform_message.MessageChain([platform_message.Plain(text=fallback_text)]),
+                )
+            return
+
+        node_title = node_title or 'Confirmation needed'
+
+        # Two paths:
+        #   (a) Real message — extract channel from source.
+        #   (b) Synthetic event (button-click resume) — no
+        #       source_platform_object; recover the channel we cached
+        #       when the user clicked.
+        if isinstance(source, discord.Message):
+            channel = source.channel
+            guild_id = str(source.guild.id) if source.guild else ''
+            sender_id = str(source.author.id)
+            channel_id = str(source.channel.id)
+            session_key = f'c:{channel_id}' if guild_id else f'p:{sender_id}'
+        else:
+            # Synthetic event — resolve session_key from event shape,
+            # then look up the cached channel from the click.
+            if isinstance(message_source, platform_events.GroupMessage):
+                # launcher_id was set to channel_id when we synthesized.
+                channel_id = str(message_source.group.id)
+                session_key = f'c:{channel_id}'
+            else:
+                session_key = f'p:{message_source.sender.id}'
+                channel_id = ''
+
+            cached = self._pending_forms.get(session_key + '__last_channel')
+            channel = cached.get('channel') if cached else None
+            guild_id = (cached or {}).get('guild_id', '')
+            sender_id = str(message_source.sender.id) if message_source.sender else ''
+            if channel is None:
+                if self.ap is not None:
+                    self.ap.logger.warning(
+                        f'Discord: synthetic form chunk has no cached channel for '
+                        f'{session_key}; cannot render form buttons'
+                    )
+                return
+
+        body_parts: list[str] = []
+        if form_content:
+            body_parts.append(form_content)
+        body_parts.append('Please select an option below:')
+        embed_body = '\n\n'.join(body_parts)
+        # Discord embed.description has a 4096 char limit — defensive trim.
+        if len(embed_body) > 4000:
+            embed_body = embed_body[:3990] + '\n\n…(truncated)'
+
+        embed = discord.Embed(
+            title=node_title[:256],
+            description=embed_body,
+            color=discord.Color.blurple(),
+        )
+
+        view = DiscordFormView(
+            adapter=self,
+            session_key=session_key,
+            actions=actions,
+            timeout=1800,  # 30 min — matches Dify form_token TTL
+        )
+
+        try:
+            sent_msg = await channel.send(embed=embed, view=view)
+        except Exception:
+            if self.ap is not None:
+                self.ap.logger.error(f'Discord: form view send failed: {traceback.format_exc()}')
+            return
+
+        self._pending_forms[session_key] = {
+            'form_data': form_data,
+            'channel_id': channel_id,
+            'guild_id': guild_id,
+            'sender_id': sender_id,
+            'view_message_id': str(sent_msg.id),
+            'view': view,  # lets DiscordFormView.on_timeout recognise its own entry
+            'posted_at': time.time(),
+        }
+
+        if self.ap is not None:
+            self.ap.logger.info(f'Discord: form view posted session={session_key} actions={len(actions)}')
+
+    async def _on_form_button_click(
+        self,
+        interaction: discord.Interaction,
+        session_key: str,
+        action_id: str,
+        action_title: str,
+        view: DiscordFormView,
+    ) -> None:
+        """Handle a click on a form button — ack, resume the workflow,
+        and disable the View buttons so the choice is visually locked in.
+
+        Args:
+            interaction: The component interaction raised by the click.
+            session_key: Key of the pending form in ``_pending_forms``.
+            action_id: Dify id of the clicked action.
+            action_title: Title of the clicked action.
+            view: The view that owns the clicked button.
+        """
+        import langbot_plugin.api.entities.builtin.provider.session as provider_session
+
+        # ACK first (3-second deadline before Discord shows "interaction failed").
+        try:
+            await interaction.response.defer()
+        except (discord.HTTPException, discord.InteractionResponded):
+            # The ack failed or the interaction was already responded to
+            # (InteractionResponded is not an HTTPException) — proceed.
+            pass
+
+        # Without ``ap`` the query cannot be enqueued; bail out before
+        # consuming the pending form or locking the buttons.
+        if self.ap is None:
+            if self.logger:
+                await self.logger.error('Discord: ap not injected; cannot enqueue button-click query')
+            return
+
+        # Only consume the pending form when the click comes from the
+        # message that carries it: a click on an older view must not
+        # resume (and discard) a newer form of the same session.
+        pending = self._pending_forms.get(session_key)
+        if pending and interaction.message is not None:
+            if str(interaction.message.id) != pending.get('view_message_id'):
+                pending = None
+        if not pending:
+            self.ap.logger.warning(
+                f'Discord: button click on stale session {session_key}; ignoring (action_id={action_id!r})'
+            )
+            await self._lock_view_message(interaction, view, action_title, stale=True)
+            return
+        self._pending_forms.pop(session_key, None)
+
+        # Lock the buttons in place: disable everything, mark chosen one.
+        await self._lock_view_message(interaction, view, action_title)
+
+        form_data: dict = pending.get('form_data') or {}
+        guild_id = pending.get('guild_id', '')
+        channel_id = pending.get('channel_id', '')
+        # Fall back to the clicker so the synthetic sender id is never
+        # empty and always agrees with the PERSON launcher id.
+        sender_id = pending.get('sender_id') or (str(interaction.user.id) if interaction.user else '')
+
+        # In group context the launcher is the CHANNEL (not the user who
+        # clicked) — matches how the original message was routed through
+        # the pipeline. Using the clicker's user id would mismatch the
+        # Dify session and produce "Workflow run not found".
+        if guild_id:
+            launcher_type = provider_session.LauncherTypes.GROUP
+            launcher_id = channel_id
+        else:
+            launcher_type = provider_session.LauncherTypes.PERSON
+            launcher_id = sender_id
+
+        form_action_data = {
+            'form_token': form_data.get('form_token', ''),
+            'workflow_run_id': form_data.get('workflow_run_id', ''),
+            'action_id': action_id,
+            'action_title': action_title,
+            'node_title': form_data.get('node_title', ''),
+            'user': f'{launcher_type.value}_{launcher_id}',
+            'inputs': {},
+        }
+
+        message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
+
+        # Synthesize a platform event so the pipeline can run the resume
+        # query. source_platform_object=None signals "no inbound discord
+        # message"; reply_message routes such events to _reply_synthetic,
+        # which sends to the channel cached below.
+        if launcher_type == provider_session.LauncherTypes.GROUP:
+            synthetic_event: platform_events.MessageEvent = platform_events.GroupMessage(
+                sender=platform_entities.GroupMember(
+                    id=sender_id,
+                    member_name=interaction.user.display_name if interaction.user else '',
+                    permission='MEMBER',
+                    group=platform_entities.Group(
+                        id=launcher_id,
+                        name=channel_id,
+                        permission=platform_entities.Permission.Member,
+                    ),
+                    special_title='',
+                ),
+                message_chain=message_chain,
+                time=int(time.time()),
+                source_platform_object=None,
+            )
+        else:
+            synthetic_event = platform_events.FriendMessage(
+                sender=platform_entities.Friend(
+                    id=sender_id,
+                    nickname=interaction.user.display_name if interaction.user else '',
+                    remark='',
+                ),
+                message_chain=message_chain,
+                time=int(time.time()),
+                source_platform_object=None,
+            )
+
+        bot_uuid = ''
+        pipeline_uuid = None
+        for bot in self.ap.platform_mgr.bots:
+            if bot.adapter is self:
+                bot_uuid = bot.bot_entity.uuid
+                pipeline_uuid = bot.bot_entity.use_pipeline_uuid
+                break
+
+        # Remember the channel so _reply_synthetic and _handle_form_chunk
+        # (synthetic-event path) can find a target. guild_id is needed
+        # to reconstruct the launcher_type on subsequent form pauses.
+        self._pending_forms[session_key + '__last_channel'] = {
+            'channel': interaction.channel,
+            'guild_id': guild_id,
+            'posted_at': time.time(),
+        }
+
+        try:
+            await self.ap.query_pool.add_query(
+                bot_uuid=bot_uuid,
+                launcher_type=launcher_type,
+                launcher_id=launcher_id,
+                sender_id=sender_id,
+                message_event=synthetic_event,
+                message_chain=message_chain,
+                adapter=self,
+                pipeline_uuid=pipeline_uuid,
+                variables={
+                    '_dify_form_action': form_action_data,
+                    '_routed_by_rule': True,
+                },
+            )
+        except Exception:
+            self.ap.logger.error(f'Discord: enqueue button-click query failed: {traceback.format_exc()}')
+        else:
+            self.ap.logger.info(
+                f'Discord: button-click query enqueued action_id={action_id!r} session={session_key}'
+            )
+
+    async def _lock_view_message(
+        self,
+        interaction: discord.Interaction,
+        view: DiscordFormView,
+        chosen_title: str,
+        stale: bool = False,
+    ) -> None:
+        """Disable all buttons on the form view and annotate the chosen
+        one — mirrors DingTalk/Lark's in-card "selected" feedback.
+
+        Args:
+            interaction: Click interaction; its message is edited in place.
+            view: View whose buttons are disabled and which is stopped.
+            chosen_title: Title of the clicked action (fallback matcher).
+            stale: When True, only disable; do not highlight a choice.
+        """
+        try:
+            # Identify the clicked button by custom_id: labels are cut to
+            # 80 chars and need not be unique, so matching them against
+            # ``chosen_title`` can miss or over-match.
+            clicked_id = (interaction.data or {}).get('custom_id')
+            for child in view.children:
+                if not isinstance(child, discord_ui.Button):
+                    continue
+                child.disabled = True
+                if clicked_id is not None:
+                    is_chosen = child.custom_id == clicked_id
+                else:
+                    is_chosen = child.label == chosen_title[:80]
+                if is_chosen and not stale:
+                    child.style = discord.ButtonStyle.success
+                    if not (child.label or '').startswith('✓ '):
+                        # Keep the decorated label within Discord's 80-char limit.
+                        child.label = f'✓ {child.label}'[:80]
+            view.stop()
+            if interaction.message is not None:
+                await interaction.message.edit(view=view)
+        except Exception:
+            if self.ap is not None:
+                self.ap.logger.warning(f'Discord: lock-view-message failed (non-fatal): {traceback.format_exc()}')
 
     async def is_muted(self, group_id: int) -> bool:
         return False