feat: Implement workflow form handling for paused workflows

- Added module-level storage for pending forms to manage state across sessions.
- Introduced functions to set, get, and clear pending forms with expiration handling.
- Enhanced DifyServiceAPIRunner to support resuming paused workflows via form actions.
- Implemented logic to yield human input requests and display appropriate messages.
- Updated workflow submission methods to handle paused states and resume actions.
- Ensured proper merging of pending form actions with user inputs for seamless interaction.
This commit is contained in:
fdc310
2026-05-28 23:32:46 +08:00
parent 894709d577
commit b96f209b98
8 changed files with 1251 additions and 73 deletions

View File

@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
async def workflow_submit(
self,
form_token: str,
workflow_run_id: str,
inputs: dict[str, typing.Any],
user: str,
action: str = '',
timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""Submit human input to resume a paused workflow, then stream events.
1. POST /form/human_input/{form_token} to submit the form
2. GET /workflow/{task_id}/events to stream the resumed workflow events
"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
}
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
# Step 1: Submit the form
payload: dict[str, typing.Any] = {
'inputs': inputs if isinstance(inputs, dict) else {},
'user': user,
'action': action,
}
submit_resp = await client.post(
f'/form/human_input/{form_token}',
headers=headers,
json=payload,
)
if submit_resp.status_code != 200:
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
# Step 2: Stream resumed workflow events
async with client.stream(
'GET',
f'/workflow/{workflow_run_id}/events',
headers={'Authorization': f'Bearer {self.api_key}'},
params={'user': user},
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f'{r.status_code} {chunk}')
if chunk.strip() == '':
continue
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
async def upload_file(
self,
file: httpx._types.FileTypes,

View File

@@ -157,7 +157,7 @@ class RuntimePipeline:
bot_message=query.resp_messages[-1],
message=result.user_notice,
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
is_final=[msg.is_final for msg in query.resp_messages][0],
is_final=[msg.is_final for msg in query.resp_messages][-1],
)
else:
await query.adapter.reply_message(

View File

@@ -42,9 +42,13 @@ class QueryPool:
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False,
variables: typing.Optional[dict[str, typing.Any]] = None,
) -> pipeline_query.Query:
async with self.condition:
query_id = self.query_id_counter
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
if variables:
initial_variables.update(variables)
query = pipeline_query.Query(
bot_uuid=bot_uuid,
query_id=query_id,
@@ -53,7 +57,7 @@ class QueryPool:
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
variables={'_routed_by_rule': routed_by_rule},
variables=initial_variables,
resp_messages=[],
resp_message_chain=[],
adapter=adapter,

View File

@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
# TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported() and has_chunks:
is_final = [msg.is_final for msg in query.resp_messages][0]
is_final = [msg.is_final for msg in query.resp_messages][-1]
await query.adapter.reply_message_chunk(
message_source=query.message_event,
bot_message=query.resp_messages[-1],

View File

@@ -501,6 +501,8 @@ class PlatformManager:
bot_entity.adapter_config,
logger,
)
if hasattr(adapter_inst, 'ap'):
adapter_inst.ap = self.ap
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook
if hasattr(adapter_inst, 'set_bot_uuid'):

View File

@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
import langbot_plugin.api.entities.builtin.provider.session as provider_session
class AESCipher(object):
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pending_monitoring_msg: dict[str, str]
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
reply_to_monitoring_msg: dict[str, tuple[str, float]]
reply_message_card_ids: dict[str, str]
card_sequence_dict: dict[str, int]
# card_id → set of source message ids registered against it (for cleanup)
card_id_to_source_ids: dict[str, set[str]]
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
card_streaming_text: dict[str, str]
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
card_pre_pause_text: dict[str, str]
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
card_resume_transitioned: set[str]
_MONITORING_MAPPING_TTL = 600 # 10 minutes
seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识
@@ -812,11 +824,131 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
asyncio.create_task(on_message(event))
def schedule_on_app_loop(coro):
"""Run a coroutine on the application event loop from sync callbacks."""
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
def sync_on_card_action(event):
try:
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
# Parse JSON string values (from form action buttons)
if isinstance(action_value_raw, str):
try:
action_value_obj = json.loads(action_value_raw)
except (json.JSONDecodeError, TypeError):
action_value_obj = {}
else:
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
# Handle Dify form action button clicks
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
form_token = action_value_obj.get('form_token', '')
workflow_run_id = action_value_obj.get('workflow_run_id', '')
action_id = action_value_obj.get('action_id', '')
session_key = action_value_obj.get('session_key', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
# Find the bot entity to get bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
form_action_data = {
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
context = getattr(event.event, 'context', None)
open_message_id = getattr(context, 'open_message_id', None)
source_time = datetime.datetime.now()
event_time = source_time.timestamp()
action_text = action_value_obj.get('action_id', 'confirm')
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
)
if open_message_id:
message_chain.insert(
0,
platform_message.Source(
id=open_message_id,
time=source_time,
),
)
operator = getattr(event.event, 'operator', None)
user_id = (
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
async def add_form_action_query():
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={'_dify_form_action': form_action_data},
)
schedule_on_app_loop(add_form_action_query())
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
if action_value == '有帮助':
feedback_type = 1
elif action_value == '无帮助':
@@ -857,17 +989,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
)
if platform_events.FeedbackEvent in self.listeners:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
else:
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
except Exception:
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
traceback.print_exc()
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
@@ -893,6 +1022,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_id_dict={},
pending_monitoring_msg={},
reply_to_monitoring_msg={},
reply_message_card_ids={},
card_sequence_dict={},
card_id_to_source_ids={},
card_streaming_text={},
card_pre_pause_text={},
card_resume_transitioned=set(),
seq=1,
listeners={},
quart_app=quart_app,
@@ -1132,6 +1267,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for k in expired:
del self.reply_to_monitoring_msg[k]
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
"""Return the next strictly increasing sequence for a card update."""
current = self.card_sequence_dict.get(card_id, 0)
next_seq = max(current + 1, suggested)
self.card_sequence_dict[card_id] = next_seq
return next_seq
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
"""Register a card_id under one or more source message ids."""
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
for sid in source_ids:
if not sid:
continue
self.reply_message_card_ids[sid] = card_id
bucket.add(sid)
def _drop_card_state(self, card_id: str) -> None:
"""Pop all per-card state for the given card_id."""
if not card_id:
return
for sid in self.card_id_to_source_ids.pop(card_id, set()):
self.reply_message_card_ids.pop(sid, None)
self.card_sequence_dict.pop(card_id, None)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
async def create_card_id(self, message_id):
try:
# self.logger.debug('飞书支持stream输出,创建卡片......')
@@ -1327,6 +1489,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id
self.card_sequence_dict[card_id] = 0
return card_id
except Exception as e:
@@ -1339,6 +1502,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""
# message_id = event.message_chain.message_id
source_message_id = str(event.message_chain.message_id)
existing_card_id = self.reply_message_card_ids.get(source_message_id)
if existing_card_id:
self.card_id_dict[message_id] = existing_card_id
return True
card_id = await self.create_card_id(message_id)
content = {
'type': 'card',
@@ -1377,6 +1546,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
user_msg_id = event.message_chain.message_id
reply_msg_id = getattr(response.data, 'message_id', None)
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
# Register the card under both the user-incoming msg id (so a
# second reply_message_first_chunk for the same user message
# reuses this card) AND the bot-reply msg id (so a synthetic
# event from a form-button callback — whose Source.id equals
# the bot's card message id — hits the same card and renders
# the resume content into it).
if reply_msg_id:
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
else:
self._register_card_for_source(card_id, str(user_msg_id))
if reply_msg_id and monitoring_msg_id:
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
self._cleanup_monitoring_mapping()
@@ -1504,45 +1683,462 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
):
"""
回复消息变成更新卡片消息
Supports Dify form-action resume: when the runner yields a chunk with
``_resume_from_form=True``, the card transitions from buttons to a
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
for subsequent resume chunks to stream into.
When ``_open_new_card=True`` on the final chunk, the existing card is
left as-is and the pipeline will create a new card (with fresh form
buttons) for the re-pause.
"""
# self.seq += 1
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
if msg_seq % 8 == 0 or is_final:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = ''
if text_elements:
parts = []
for paragraph in text_elements:
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
form_data = getattr(bot_message, '_form_data', None)
resume_from = getattr(bot_message, '_resume_from_form', False)
action_title = getattr(bot_message, '_resume_action_title', '')
open_new_card = getattr(bot_message, '_open_new_card', False)
selected_notice = f'> **已选择**{action_title}' if action_title else ''
# content = {
# 'type': 'card_json',
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
# }
# ── decide whether this chunk needs a card update ────────────────────
card_id = self.card_id_dict.get(message_id)
if not card_id:
return
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(self.card_id_dict[message_id])
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder()
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
.content(text_message)
.sequence(msg_seq)
# ── convert message chain → text ─────────────────────────────────────
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = ''
if text_elements:
parts = []
for paragraph in text_elements:
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
card_sequence = self._next_card_sequence(card_id, msg_seq)
# ── RESUME: first chunk after button click ───────────────────────────
if resume_from and card_id not in self.card_resume_transitioned:
# Transition the card from the form state into resume mode.
# Preserve the text that was shown before the pause, and seed the
# resume placeholder with the current resume content if we already
# have any on the first yielded chunk.
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
initial_resume_text = text_message or '\u200b'
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=initial_resume_text,
)
self.card_resume_transitioned.add(card_id)
self.card_pre_pause_text[card_id] = pre_pause_text
self.card_streaming_text[card_id] = text_message
if not is_final:
return
# ── RESUME: subsequent chunks → full card update ─────────────────────
if resume_from and card_id in self.card_resume_transitioned:
cached = self.card_streaming_text.get(card_id, '')
if text_message != cached:
self.card_streaming_text[card_id] = text_message
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=text_message,
)
if not is_final:
return
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
if not resume_from and (msg_seq % 8 == 0 or is_final):
cached = self.card_streaming_text.get(card_id)
if text_message != cached:
self.card_streaming_text[card_id] = text_message
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
)
.build()
)
.build()
response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
request, req_opt
)
if not response.success():
raise Exception(
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
# ── FINAL chunk: full card layout update ─────────────────────────────
if is_final:
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
resume_cached = self.card_streaming_text.get(card_id, '')
if form_data:
# Keep the current card and render the human-input action area.
# Initial pause uses the latest stream text, while re-pause after
# a button click preserves the pre-pause text and keeps the resume
# stream content in the extra placeholder area.
final_text = pre_pause if open_new_card else text_message
final_resume_placeholder = resume_cached if open_new_card else ''
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=final_text,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=final_resume_placeholder,
show_form_prompt=not open_new_card,
)
if open_new_card:
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
else:
# The human-input prompt itself is rendered as buttons only
# on Lark, so do not keep the hidden fallback text around;
# otherwise it will resurface after the button click.
self.card_streaming_text[card_id] = ''
self.card_pre_pause_text[card_id] = ''
else:
# Normal finish: keep pre-pause + resume content visible,
# remove buttons/notice, drop the resume placeholder.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=None,
notice_text=selected_notice if resume_from else '',
resume_placeholder_text=resume_cached,
)
self._drop_card_state(card_id)
self.card_id_dict.pop(message_id, None)
# ── media (images / files) appended at the end ───────────────────────
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def _add_form_buttons_to_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
text_message: str = '',
sequence: int = 1,
):
"""Update the entire card to include form action buttons.
Uses card.aupdate to replace the card JSON with a template that
includes the streaming text content plus interactive buttons.
"""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=form_data,
)
async def _remove_form_buttons_from_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
):
"""Replace the human-input card layout with the plain final layout."""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=None,
)
async def _update_card_layout(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
form_data: dict | None = None,
notice_text: str = '',
resume_placeholder_text: str = '',
show_form_prompt: bool = True,
):
"""Update the entire card layout.
• form_data → show interactive buttons (initial Dify pause)
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
• resume_placeholder_text → add a streaming_txt_resume markdown element
"""
form_data = form_data or {}
actions = form_data.get('actions', [])
form_token = form_data.get('form_token', '')
workflow_run_id = form_data.get('workflow_run_id', '')
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '')
# When form_data is set, the visible content is rendered inside the
# interactive container, so the top streaming text should stay empty
# to avoid duplicate text above the action area.
#
# For resume notice state, keep the existing text visible in the card
# and only add the grey "selected" notice below it.
if form_data:
render_text_message = ''
else:
render_text_message = text_message
# Determine session key from message source
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'group_{message_source.group.id}'
else:
session_key = f'person_{message_source.sender.id}'
# Build button elements matching the existing card template's thumbsup/down format
action_buttons = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
button_style = action.get('button_style', 'default')
if button_style == 'primary':
lark_button_type = 'primary'
elif button_style == 'danger':
lark_button_type = 'danger'
else:
lark_button_type = 'default'
action_buttons.append(
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': action_title},
'type': lark_button_type,
'width': 'fill',
'size': 'medium',
'hover_tips': {'tag': 'plain_text', 'content': action_title},
'behaviors': [
{
'type': 'callback',
'value': {
'form_action': True,
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'session_key': session_key,
},
}
],
'margin': '0px 0px 0px 0px',
}
)
if is_final and bot_message.tool_calls is None:
# self.seq = 1 # 消息回复结束之后重置seq
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
interactive_elements = []
if form_data:
if show_form_prompt:
interactive_elements = [
{
'tag': 'markdown',
'content': f'**[Human Input Required] {node_title}**',
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 4px 0px',
}
]
if form_content:
interactive_elements.append(
{
'tag': 'markdown',
'content': form_content,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 8px 0px',
}
)
interactive_elements.append(
{
'tag': 'column_set',
'horizontal_spacing': '8px',
'horizontal_align': 'left',
'margin': '0px 0px 0px 0px',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [btn],
'padding': '0px 0px 0px 0px',
}
for btn in action_buttons
],
}
)
# Build the full card JSON with buttons, same structure as create_card_id
# ── mid_section: either form buttons, resume notice, or empty ──
mid_section_elements = []
if form_data:
mid_section_elements = [
{
'tag': 'interactive_container',
'margin': '12px 0px 8px 0px',
'padding': '12px 12px 12px 12px',
'has_border': True,
'elements': interactive_elements,
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
elif notice_text:
mid_section_elements = [
{
'tag': 'markdown',
'content': notice_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '8px 0px 4px 0px',
'text_color': 'grey',
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
resume_elements = []
if resume_placeholder_text:
resume_elements = [
{
'tag': 'markdown',
'content': resume_placeholder_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt_resume',
},
]
card_data = {
'schema': '2.0',
'config': {
'update_multi': True,
'streaming_mode': False,
},
'body': {
'direction': 'vertical',
'padding': '12px 12px 12px 12px',
'elements': [
{
'tag': 'div',
'text': {
'tag': 'plain_text',
'content': 'LangBot',
'text_size': 'normal',
'text_align': 'left',
'text_color': 'default',
},
'icon': {
'tag': 'custom_icon',
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
},
},
{
'tag': 'markdown',
'content': render_text_message,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt',
},
*mid_section_elements,
*resume_elements,
{
'tag': 'column_set',
'horizontal_spacing': '12px',
'horizontal_align': 'right',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [
{
'tag': 'markdown',
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
'text_align': 'left',
'text_size': 'notation',
'margin': '4px 0px 0px 0px',
'icon': {
'tag': 'standard_icon',
'token': 'robot_outlined',
'color': 'grey',
},
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
'weight': 1,
}
],
'margin': '0px 0px 4px 0px',
},
],
},
}
try:
tenant_key = (
message_source.source_platform_object.header.tenant_key
if message_source.source_platform_object
@@ -1558,39 +2154,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
.tenant_access_token(tenant_access_token)
.build()
)
# 发起请求
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
# 处理失败返回
if not response.success():
raise Exception(
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
request: UpdateCardRequest = (
UpdateCardRequest.builder()
.card_id(card_id)
.request_body(
UpdateCardRequestBody.builder()
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
.build()
)
return
# Send media messages when streaming is done
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
.build()
)
response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
if not response.success():
await self.logger.error(
f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
)
except Exception:
await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
async def is_muted(self, group_id: int) -> bool:
return False

View File

@@ -4,11 +4,12 @@ import time
import telegram
import telegram.ext
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
import telegramify_markdown
import typing
import traceback
import json
import base64
import pydantic
@@ -189,6 +190,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter()
@@ -224,6 +226,95 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
telegram_callback,
)
)
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
try:
data = json.loads(query.data)
if data.get('form_action') or data.get('f'):
import langbot_plugin.api.entities.builtin.provider.session as provider_session
workflow_run_id = data.get('workflow_run_id') or data.get('w', '')
action_id = data.get('action_id') or data.get('a', '')
session_key = data.get('session_key') or data.get('s', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
user_id = str(query.from_user.id)
# Find bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for b in self.ap.platform_mgr.bots:
if b.adapter is self:
bot_uuid = b.bot_entity.uuid
pipeline_uuid = b.bot_entity.use_pipeline_uuid
break
form_action_data = {
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
)
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={'_dify_form_action': form_action_data},
)
except Exception:
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
application.add_handler(CallbackQueryHandler(callback_query_handler))
super().__init__(
config=config,
logger=logger,
@@ -369,6 +460,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
del args['draft_id']
await self.bot.send_message(**args)
self.msg_stream_id.pop(message_id)
# Send form action buttons if form data is present
form_data = getattr(bot_message, '_form_data', None)
if form_data:
await self._send_form_action_buttons(message_source, form_data)
else:
stream_id = draft_id
if (msg_seq - 1) % 8 == 0 or is_final:
@@ -384,6 +480,51 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id)
# Send form action buttons if form data is present
form_data = getattr(bot_message, '_form_data', None)
if form_data:
await self._send_form_action_buttons(message_source, form_data)
async def _send_form_action_buttons(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
):
"""Send inline keyboard buttons for Dify human_input_required form actions."""
actions = form_data.get('actions', [])
node_title = form_data.get('node_title', '')
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'g:{message_source.group.id}'
else:
session_key = f'p:{message_source.sender.id}'
keyboard = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
callback_data = json.dumps(
{'f': 1, 'a': action_id, 's': session_key},
separators=(',', ':'),
)
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
reply_markup = InlineKeyboardMarkup(keyboard)
update = message_source.source_platform_object
chat_id = update.effective_chat.id
message_thread_id = update.message.message_thread_id
args = {
'chat_id': chat_id,
'text': f'[{node_title}] Please select an action:',
'reply_markup': reply_markup,
}
if message_thread_id:
args['message_thread_id'] = message_thread_id
await self.bot.send_message(**args)
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update):
return None

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import typing
import json
import time
import uuid
import base64
import mimetypes
@@ -16,6 +17,46 @@ from langbot.libs.dify_service_api.v1 import client, errors
import httpx
# Module-level store for paused-workflow form state, keyed by session key
# (launcher_type_value + "_" + launcher_id). Keeps state out of the SDK
# Conversation type, which may not accept arbitrary attribute assignment.
_PENDING_FORMS: dict[str, dict[str, typing.Any]] = {}
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
def _session_key_from_query(query: pipeline_query.Query) -> str:
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
def _prune_pending_forms(now: float | None = None) -> None:
if now is None:
now = time.time()
expired = [key for key, data in _PENDING_FORMS.items() if data.get('_expires_at', 0) <= now]
for key in expired:
_PENDING_FORMS.pop(key, None)
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
_prune_pending_forms()
stored = dict(form_data)
expiration_time = stored.get('expiration_time')
try:
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
except (TypeError, ValueError):
expiration_ts = 0.0
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
_PENDING_FORMS[session_key] = stored
def _get_pending_form(session_key: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
return _PENDING_FORMS.get(session_key)
def _clear_pending_form(session_key: str) -> None:
_PENDING_FORMS.pop(session_key, None)
@runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器"""
@@ -335,11 +376,107 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form_blocking(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
inputs = form_action.get('inputs', {})
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
if chunk['event'] == 'workflow_finished':
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
yield provider_message.Message(
role='assistant',
content=content,
)
def _merge_pending_form_action(self, form_action: dict | None, pending_form: dict | None) -> dict | None:
"""Backfill resume fields from the pending form stored on the conversation."""
if not form_action:
return None
merged_action = dict(form_action)
if pending_form:
merged_action.setdefault('form_token', pending_form.get('form_token', ''))
merged_action.setdefault('workflow_run_id', pending_form.get('workflow_run_id', ''))
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
merged_action.setdefault('user', pending_form.get('user', ''))
# Resolve clicked action's display title from the stored actions list
if 'action_title' not in merged_action:
clicked_id = merged_action.get('action_id', '')
for action in pending_form.get('actions', []):
if str(action.get('id', '')) == str(clicked_id):
merged_action['action_title'] = action.get('title', clicked_id)
break
return merged_action
def _match_pending_form_action(self, user_text: str, pending_form: dict | None) -> dict | None:
"""Match plain text replies against pending Dify form actions."""
if not pending_form:
return None
normalized_text = user_text.strip().lower()
if not normalized_text:
return None
for action in pending_form.get('actions', []):
titles = {
str(action.get('title', '')).strip().lower(),
str(action.get('id', '')).strip().lower(),
}
if normalized_text in titles:
return {
'form_token': pending_form.get('form_token', ''),
'workflow_run_id': pending_form.get('workflow_run_id', ''),
'action_id': action.get('id', ''),
'action_title': action.get('title', action.get('id', '')),
'inputs': pending_form.get('inputs', {}),
'user': pending_form.get('user', ''),
}
return None
async def _workflow_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
pending_form = _get_pending_form(session_key)
if form_action:
form_action = self._merge_pending_form_action(form_action, pending_form)
_clear_pending_form(session_key)
elif pending_form:
form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
if form_action:
_clear_pending_form(session_key)
if form_action:
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -366,6 +503,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
}
inputs.update(query.variables)
human_input_yielded = False
async for chunk in self.dify_client.workflow_run(
inputs=inputs,
@@ -377,6 +515,46 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': reason.get('inputs', {}),
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
human_input_yielded = True
yield provider_message.Message(
role='assistant',
content=display_text,
)
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue
@@ -399,6 +577,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
elif chunk['event'] == 'workflow_finished':
if human_input_yielded:
break
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
@@ -636,11 +816,143 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""Submit human input to resume a paused Dify workflow."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
action_title = form_action.get('action_title', '') or action_id
inputs = form_action.get('inputs', {})
messsage_idx = 0
is_final = False
think_start = False
think_end = False
workflow_contents = ''
repause_form_data: dict | None = None
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
yield_this_iteration = False
if chunk['event'] == 'workflow_finished':
is_final = True
yield_this_iteration = True
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
user,
{
'workflow_run_id': new_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': user,
},
)
repause_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': new_run_id,
'form_token': reason.get('form_token', ''),
}
is_final = True
yield_this_iteration = True
break
if chunk['event'] == 'text_chunk':
messsage_idx += 1
if remove_think:
if '<think>' in chunk['data']['text'] and not think_start:
think_start = True
continue
if '</think>' in chunk['data']['text'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
workflow_contents += content
think_end = True
elif think_end:
workflow_contents += chunk['data']['text']
if think_start:
continue
else:
workflow_contents += chunk['data']['text']
if messsage_idx % 8 == 0:
yield_this_iteration = True
if yield_this_iteration:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents,
is_final=is_final,
)
msg._resume_from_form = True
if action_title:
msg._resume_action_title = action_title
if is_final and repause_form_data:
msg._form_data = repause_form_data
msg._open_new_card = True
yield msg
if is_final:
return
async def _workflow_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
pending_form = _get_pending_form(session_key)
if form_action:
form_action = self._merge_pending_form_action(form_action, pending_form)
_clear_pending_form(session_key)
elif pending_form:
form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
if form_action:
_clear_pending_form(session_key)
if form_action:
# Resume paused workflow via submit endpoint
async for msg in self._submit_workflow_form(form_action):
yield msg
return
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -672,6 +984,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False
think_end = False
workflow_contents = ''
workflow_run_id = ''
human_input_yielded = False
# Saved form data to attach to the final MessageChunk so the adapter
# can detect it when is_final=True and render buttons.
pending_form_data = None
display_text = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run(
@@ -682,7 +1001,62 @@ class DifyServiceAPIRunner(runner.RequestRunner):
):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
if chunk['event'] == 'workflow_started':
workflow_run_id = chunk['data'].get('workflow_run_id', '')
continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
# Persist form state in module-level store keyed by session
raw_inputs = reason.get('inputs', {})
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
# Pass form render metadata to downstream stages
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
workflow_contents += display_text + '\n'
# Save form data to attach to the final chunk later.
# We do NOT yield here — the form content will be sent
# as the final MessageChunk (with is_final=True and
# _form_data) so the adapter can update the card and
# add buttons in one pass.
pending_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': workflow_run_id,
'form_token': reason.get('form_token', ''),
}
human_input_yielded = True
if chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data']['error']:
@@ -730,11 +1104,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
if messsage_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk(
final_content = workflow_contents if workflow_contents.strip() else ''
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents,
content=final_content,
is_final=is_final,
)
# Attach form data to the final chunk for the adapter
if is_final and pending_form_data:
msg._form_data = pending_form_data
pending_form_data = None
yield msg
# If the stream ended after workflow_paused without a
# workflow_finished event, yield a final chunk so the adapter
# can update the card and add buttons.
if human_input_yielded and not is_final:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents or display_text,
is_final=True,
)
msg._form_data = pending_form_data
yield msg
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""