feat(discord): implement Discord form view with button interactions for Dify actions

This commit is contained in:
fdc310
2026-06-17 11:25:59 +08:00
parent b55f073e62
commit b1750882ed
+509 -5
View File
@@ -1,6 +1,7 @@
from __future__ import annotations
import discord
from discord import ui as discord_ui
import typing
import re
@@ -8,6 +9,8 @@ import base64
import uuid
import os
import datetime
import time
import traceback
# 使用BytesIO创建文件对象,避免路径问题
import io
@@ -824,6 +827,69 @@ class DiscordEventConverter(abstract_platform_adapter.AbstractEventConverter):
)
class DiscordFormView(discord_ui.View):
"""Discord ``ui.View`` that renders one button per Dify form action.
Each button's click triggers ``adapter._on_form_button_click`` which
acks the interaction, locks the buttons in place, and enqueues a
synthetic ``_dify_form_action`` query so the runner resumes the
workflow.
"""
# Discord button style mapping for Dify ``button_style`` values.
_STYLE_MAP: typing.ClassVar[dict] = {
'primary': discord.ButtonStyle.primary,
'danger': discord.ButtonStyle.danger,
'warning': discord.ButtonStyle.danger,
'success': discord.ButtonStyle.success,
'default': discord.ButtonStyle.secondary,
'': discord.ButtonStyle.secondary,
}
def __init__(
self,
adapter: 'DiscordAdapter',
session_key: str,
actions: list,
timeout: float = 1800,
):
super().__init__(timeout=timeout)
self._adapter = adapter
self._session_key = session_key
# Discord caps a view at 25 children (5 rows × 5 buttons). Trim
# silently — most Dify forms have ≤10 actions in practice.
for idx, action in enumerate(actions[:25]):
action_id = str(action.get('id') or '')
label = str(action.get('title') or action_id or f'Option {idx + 1}')
style = self._STYLE_MAP.get(
str(action.get('button_style') or '').lower(),
discord.ButtonStyle.secondary,
)
# custom_id must be unique within the view and ≤100 chars.
# Encode (session, idx) so we can recover the action even
# if Dify ids contain unsafe characters.
custom_id = f'lb_form:{idx}:{action_id[:80]}'[:100]
button = discord_ui.Button(
label=label[:80], # Discord label limit
style=style,
custom_id=custom_id,
)
button.callback = self._make_callback(action_id, label)
self.add_item(button)
def _make_callback(self, action_id: str, action_title: str):
async def _cb(interaction: discord.Interaction):
await self._adapter._on_form_button_click(
interaction=interaction,
session_key=self._session_key,
action_id=action_id,
action_title=action_title,
view=self,
)
return _cb
class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: discord.Client = pydantic.Field(exclude=True)
@@ -837,6 +903,10 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
voice_manager: VoiceConnectionManager | None = pydantic.Field(exclude=True, default=None)
# Injected by botmgr at construction so the form-button callback can
# enqueue a synthetic resume query (`_dify_form_action`) on the pool.
ap: typing.Any = pydantic.Field(exclude=True, default=None)
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
bot_account_id = config['client_id']
@@ -860,8 +930,18 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
args = {}
if os.getenv('http_proxy'):
args['proxy'] = os.getenv('http_proxy')
# Proxy: config > env var > auto-detect.
# discord.py uses aiohttp which does NOT respect http_proxy env
# vars by default — we must pass proxy= explicitly.
proxy = (
config.get('proxy')
or os.getenv('http_proxy')
or os.getenv('HTTP_PROXY')
or os.getenv('https_proxy')
or os.getenv('HTTPS_PROXY')
)
if proxy:
args['proxy'] = proxy
bot = MyClient(intents=intents, **args)
@@ -875,6 +955,19 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
**kwargs,
)
# Per-resp-message-id buffer for the accumulated text yielded by
# the runner. Discord's edit-message ratelimit (5/5s) makes true
# progressive streaming impractical, so we collect chunks and
# render once on is_final. ``_form_data`` on the final chunk
# diverts to the button-view path.
self._stream_buffer: dict[str, str] = {}
# session_key -> {form_data, channel_id, thread_id, sender_id,
# posted_at, view_message_id}
# Populated when we send a form view; consumed when the user
# clicks a button so we know which workflow_run / form_token to
# resume.
self._pending_forms: dict[str, dict] = {}
# Voice functionality methods
async def join_voice_channel(self, guild_id: int, channel_id: int, user_id: int = None) -> discord.VoiceClient:
"""
@@ -1068,7 +1161,12 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
):
msg_to_send, files = await self.message_converter.yiri2target(message)
assert isinstance(message_source.source_platform_object, discord.Message)
# Synthetic events (button-click resume) have no inbound discord
# Message. Route via the channel we cached when the user clicked.
source = message_source.source_platform_object
if not isinstance(source, discord.Message):
await self._reply_synthetic(message_source, msg_to_send, files)
return
args = {
'content': msg_to_send,
@@ -1078,7 +1176,7 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
args['files'] = files
if quote_origin:
args['reference'] = message_source.source_platform_object
args['reference'] = source
has_at = False
@@ -1090,7 +1188,413 @@ class DiscordAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
if has_at:
args['mention_author'] = True
await message_source.source_platform_object.channel.send(**args)
await source.channel.send(**args)
async def _reply_synthetic(
self,
message_source: platform_events.MessageEvent,
msg_to_send: str,
files: list,
) -> None:
"""Deliver a reply for a button-click-resumed (synthetic) event.
We don't have an inbound discord.Message to anchor to; instead
look up the channel cached in ``_pending_forms[session_key +
'__last_channel']`` from the most recent button click.
"""
if isinstance(message_source, platform_events.GroupMessage):
# _handle_form_chunk uses channel_id alone as the session
# scope, and launcher_id was set to channel_id when
# synthesizing the event.
session_key = f'c:{message_source.group.id}'
else:
session_key = f'p:{message_source.sender.id}'
cached = self._pending_forms.get(session_key + '__last_channel') or {}
channel = cached.get('channel')
if channel is None:
if self.ap is not None:
self.ap.logger.warning(
f'Discord: synthetic reply has no cached channel for '
f'{session_key}; dropping content (len={len(msg_to_send)})'
)
return
args: dict[str, typing.Any] = {'content': msg_to_send}
if files:
args['files'] = files
try:
await channel.send(**args)
except Exception:
if self.ap is not None:
self.ap.logger.error(f'Discord: synthetic reply send failed: {traceback.format_exc()}')
# Discord allows 5 edits per 5 seconds per message. We throttle
# to one edit per 8 runner-chunks (runner already yields every 8
# text_chunks internally), which stays comfortably within limits.
_STREAM_EDIT_INTERVAL = 8
async def is_stream_output_supported(self) -> bool:
return True
async def create_message_card(self, message_id: str, event: platform_events.MessageEvent) -> bool:
"""Set up a stream context for progressive editing.
The first non-empty reply_message_chunk will send the initial
message; subsequent chunks edit it in place.
"""
source = event.source_platform_object
if not isinstance(source, discord.Message):
return False
self._stream_buffer[message_id] = {
'channel': source.channel,
'sent_message': None, # discord.Message set on first send
'last_content': '',
'chunk_count': 0,
}
return True
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message: typing.Any,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
):
msg_id = (
bot_message.get('resp_message_id')
if isinstance(bot_message, dict)
else getattr(bot_message, 'resp_message_id', None)
)
text_parts = [m.text for m in message if isinstance(m, platform_message.Plain)]
chunk_text = '\n\n'.join(t for t in text_parts if t)
form_data = getattr(bot_message, '_form_data', None) if not isinstance(bot_message, dict) else None
ctx = self._stream_buffer.get(msg_id) if msg_id else None
# If the stream ctx was not set up (create_message_card wasn't
# called, e.g. synthetic event), or the final chunk carries a
# form, skip progressive editing entirely.
if ctx is None or form_data:
try:
if form_data and is_final:
await self._handle_form_chunk(message_source, form_data)
elif is_final and chunk_text:
await self.reply_message(
message_source,
platform_message.MessageChain([platform_message.Plain(text=chunk_text)]),
quote_origin,
)
finally:
self._stream_buffer.pop(msg_id, None)
return
# Progressive streaming path: send first chunk, edit subsequent.
ctx['chunk_count'] += 1
# Runner yields the full accumulated text on each chunk, so we
# always replace (not append).
if chunk_text:
ctx['last_content'] = chunk_text
sent = ctx['sent_message']
if sent is None:
# First non-empty chunk — send the initial message.
if not ctx['last_content']:
return # No content yet, wait for next chunk.
try:
sent = await ctx['channel'].send(ctx['last_content'])
ctx['sent_message'] = sent
except Exception:
if self.ap is not None:
self.ap.logger.error(f'Discord stream send failed: {traceback.format_exc()}')
self._stream_buffer.pop(msg_id, None)
return
if is_final:
# Final chunk — edit to the full content, then clean up.
if ctx['last_content'] and ctx['last_content'] != sent.content:
try:
await sent.edit(content=ctx['last_content'][:2000])
except Exception:
pass # Best-effort
self._stream_buffer.pop(msg_id, None)
elif (ctx['chunk_count'] % self._STREAM_EDIT_INTERVAL) == 0:
# Intermediate edit — throttle to avoid rate limits.
if ctx['last_content'] and ctx['last_content'] != sent.content:
try:
await sent.edit(content=ctx['last_content'][:2000])
except Exception:
pass # Rate-limited or deleted — ignore.
async def _handle_form_chunk(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> None:
"""Render a Dify form pause as a Discord embed + button View.
Mirrors the QQ / Telegram / Lark form path: the button's click
callback synthesizes a ``_dify_form_action`` query so the runner's
``_merge_pending_form_action`` resumes the workflow.
"""
source = message_source.source_platform_object
actions = form_data.get('actions') or []
if not actions:
# Nothing clickable — fall back to plain text.
if source is not None:
await self.reply_message(
message_source,
platform_message.MessageChain(
[platform_message.Plain(text=str(form_data.get('node_title') or ''))]
),
)
return
node_title = str(form_data.get('node_title') or 'Confirmation needed')
form_content = str(form_data.get('form_content') or '').strip()
# Two paths:
# (a) Real message — extract channel from source.
# (b) Synthetic event (button-click resume) — no
# source_platform_object; recover the channel we cached
# when the user clicked.
if isinstance(source, discord.Message):
channel = source.channel
guild_id = str(source.guild.id) if source.guild else ''
sender_id = str(source.author.id)
channel_id = str(source.channel.id)
session_key = f'c:{channel_id}' if guild_id else f'p:{sender_id}'
else:
# Synthetic event — resolve session_key from event shape,
# then look up the cached channel from the click.
if isinstance(message_source, platform_events.GroupMessage):
# launcher_id was set to channel_id when we synthesized.
channel_id = str(message_source.group.id)
session_key = f'c:{channel_id}'
else:
session_key = f'p:{message_source.sender.id}'
channel_id = ''
cached = self._pending_forms.get(session_key + '__last_channel')
channel = cached.get('channel') if cached else None
guild_id = (cached or {}).get('guild_id', '')
sender_id = str(message_source.sender.id) if message_source.sender else ''
if channel is None:
if self.ap is not None:
self.ap.logger.warning(
f'Discord: synthetic form chunk has no cached channel for '
f'{session_key}; cannot render form buttons'
)
return
body_parts: list[str] = []
if form_content:
body_parts.append(form_content)
body_parts.append('Please select an option below:')
embed_body = '\n\n'.join(body_parts)
# Discord embed.description has a 4096 char limit — defensive trim.
if len(embed_body) > 4000:
embed_body = embed_body[:3990] + '\n\n…(truncated)'
embed = discord.Embed(
title=node_title[:256],
description=embed_body,
color=discord.Color.blurple(),
)
view = DiscordFormView(
adapter=self,
session_key=session_key,
actions=actions,
timeout=1800, # 30 min — matches Dify form_token TTL
)
try:
sent_msg = await channel.send(embed=embed, view=view)
except Exception:
if self.ap is not None:
self.ap.logger.error(f'Discord: form view send failed: {traceback.format_exc()}')
return
self._pending_forms[session_key] = {
'form_data': form_data,
'channel_id': channel_id,
'guild_id': guild_id,
'sender_id': sender_id,
'view_message_id': str(sent_msg.id),
'posted_at': time.time(),
}
if self.ap is not None:
self.ap.logger.info(f'Discord: form view posted session={session_key} actions={len(actions)}')
async def _on_form_button_click(
self,
interaction: discord.Interaction,
session_key: str,
action_id: str,
action_title: str,
view: DiscordFormView,
) -> None:
"""Handle a click on a form button — ack, resume the workflow,
and disable the View buttons so the choice is visually locked in."""
import langbot_plugin.api.entities.builtin.provider.session as provider_session
# ACK first (3-second deadline before Discord shows "interaction failed").
try:
await interaction.response.defer()
except discord.HTTPException:
# Already responded somehow — proceed regardless.
pass
pending = self._pending_forms.pop(session_key, None)
if not pending:
if self.ap is not None:
self.ap.logger.warning(
f'Discord: button click on stale session {session_key}; ignoring (action_id={action_id!r})'
)
await self._lock_view_message(interaction, view, action_title, stale=True)
return
# Lock the buttons in place: disable everything, mark chosen one.
await self._lock_view_message(interaction, view, action_title)
form_data: dict = pending.get('form_data') or {}
guild_id = pending.get('guild_id', '')
channel_id = pending.get('channel_id', '')
sender_id = pending.get('sender_id', '')
# In group context the launcher is the CHANNEL (not the user who
# clicked) — matches how the original message was routed through
# the pipeline. Using the clicker's user id would mismatch the
# Dify session and produce "Workflow run not found".
if guild_id:
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = channel_id
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = sender_id or str(interaction.user.id)
form_action_data = {
'form_token': form_data.get('form_token', ''),
'workflow_run_id': form_data.get('workflow_run_id', ''),
'action_id': action_id,
'action_title': action_title,
'node_title': form_data.get('node_title', ''),
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
# Synthesize a platform event so the pipeline can run the resume
# query. source_platform_object=None signals "no inbound discord
# message" — reply_message must tolerate this (it falls through
# to channel.send via the cached interaction.channel below).
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event: platform_events.MessageEvent = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=sender_id,
member_name=interaction.user.display_name if interaction.user else '',
permission='MEMBER',
group=platform_entities.Group(
id=launcher_id,
name=channel_id,
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=sender_id,
nickname=interaction.user.display_name if interaction.user else '',
remark='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
if self.ap is None:
if self.logger:
await self.logger.error('Discord: ap not injected; cannot enqueue button-click query')
return
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
# Remember the channel so _reply_synthetic and _handle_form_chunk
# (synthetic-event path) can find a target. guild_id is needed
# to reconstruct the launcher_type on subsequent form pauses.
self._pending_forms[session_key + '__last_channel'] = {
'channel': interaction.channel,
'guild_id': guild_id,
'posted_at': time.time(),
}
try:
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
if self.ap is not None:
self.ap.logger.info(
f'Discord: button-click query enqueued action_id={action_id!r} session={session_key}'
)
except Exception:
if self.ap is not None:
self.ap.logger.error(f'Discord: enqueue button-click query failed: {traceback.format_exc()}')
async def _lock_view_message(
self,
interaction: discord.Interaction,
view: DiscordFormView,
chosen_title: str,
stale: bool = False,
) -> None:
"""Disable all buttons on the form view and annotate the chosen
one — mirrors DingTalk/Lark's in-card 已选择 feedback."""
try:
for child in view.children:
if not isinstance(child, discord_ui.Button):
continue
child.disabled = True
if not stale and child.label == chosen_title:
child.style = discord.ButtonStyle.success
if not (child.label or '').startswith(''):
child.label = f'{child.label}'
view.stop()
if interaction.message is not None:
await interaction.message.edit(view=view)
except Exception:
if self.ap is not None:
self.ap.logger.warning(f'Discord: lock-view-message failed (non-fatal): {traceback.format_exc()}')
async def is_muted(self, group_id: int) -> bool:
return False