diff --git a/src/langbot/libs/dify_service_api/v1/client.py b/src/langbot/libs/dify_service_api/v1/client.py
index cc8008e1..a608a0db 100644
--- a/src/langbot/libs/dify_service_api/v1/client.py
+++ b/src/langbot/libs/dify_service_api/v1/client.py
@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
+ async def workflow_submit(
+ self,
+ form_token: str,
+ workflow_run_id: str,
+ inputs: dict[str, typing.Any],
+ user: str,
+ action: str = '',
+ timeout: float = 120.0,
+ ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+ """Submit human input to resume a paused workflow, then stream events.
+
+ 1. POST /form/human_input/{form_token} to submit the form
+ 2. GET /workflow/{task_id}/events to stream the resumed workflow events
+ """
+
+ headers = {
+ 'Authorization': f'Bearer {self.api_key}',
+ 'Content-Type': 'application/json',
+ }
+
+ async with httpx.AsyncClient(
+ base_url=self.base_url,
+ trust_env=True,
+ timeout=timeout,
+ ) as client:
+ # Step 1: Submit the form
+ payload: dict[str, typing.Any] = {
+ 'inputs': inputs if isinstance(inputs, dict) else {},
+ 'user': user,
+ 'action': action,
+ }
+
+ submit_resp = await client.post(
+ f'/form/human_input/{form_token}',
+ headers=headers,
+ json=payload,
+ )
+ if submit_resp.status_code != 200:
+ raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
+
+ # Step 2: Stream resumed workflow events
+ async with client.stream(
+ 'GET',
+ f'/workflow/{workflow_run_id}/events',
+ headers={'Authorization': f'Bearer {self.api_key}'},
+ params={'user': user},
+ ) as r:
+ async for chunk in r.aiter_lines():
+ if r.status_code != 200:
+ raise DifyAPIError(f'{r.status_code} {chunk}')
+ if chunk.strip() == '':
+ continue
+ if chunk.startswith('data:'):
+ yield json.loads(chunk[5:])
+
async def upload_file(
self,
file: httpx._types.FileTypes,
diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py
index 1426fe3d..6f917b2d 100644
--- a/src/langbot/pkg/pipeline/pipelinemgr.py
+++ b/src/langbot/pkg/pipeline/pipelinemgr.py
@@ -157,7 +157,7 @@ class RuntimePipeline:
bot_message=query.resp_messages[-1],
message=result.user_notice,
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
- is_final=[msg.is_final for msg in query.resp_messages][0],
+ is_final=[msg.is_final for msg in query.resp_messages][-1],
)
else:
await query.adapter.reply_message(
diff --git a/src/langbot/pkg/pipeline/pool.py b/src/langbot/pkg/pipeline/pool.py
index d2d4563b..55ce7fe1 100644
--- a/src/langbot/pkg/pipeline/pool.py
+++ b/src/langbot/pkg/pipeline/pool.py
@@ -42,9 +42,13 @@ class QueryPool:
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False,
+ variables: typing.Optional[dict[str, typing.Any]] = None,
) -> pipeline_query.Query:
async with self.condition:
query_id = self.query_id_counter
+ initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
+ if variables:
+ initial_variables.update(variables)
query = pipeline_query.Query(
bot_uuid=bot_uuid,
query_id=query_id,
@@ -53,7 +57,7 @@ class QueryPool:
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
- variables={'_routed_by_rule': routed_by_rule},
+ variables=initial_variables,
resp_messages=[],
resp_message_chain=[],
adapter=adapter,
diff --git a/src/langbot/pkg/pipeline/respback/respback.py b/src/langbot/pkg/pipeline/respback/respback.py
index 574404bc..39cba04a 100644
--- a/src/langbot/pkg/pipeline/respback/respback.py
+++ b/src/langbot/pkg/pipeline/respback/respback.py
@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
# TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported() and has_chunks:
- is_final = [msg.is_final for msg in query.resp_messages][0]
+ is_final = [msg.is_final for msg in query.resp_messages][-1]
await query.adapter.reply_message_chunk(
message_source=query.message_event,
bot_message=query.resp_messages[-1],
diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py
index 8e99618c..6e995206 100644
--- a/src/langbot/pkg/platform/botmgr.py
+++ b/src/langbot/pkg/platform/botmgr.py
@@ -501,6 +501,8 @@ class PlatformManager:
bot_entity.adapter_config,
logger,
)
+ if hasattr(adapter_inst, 'ap'):
+ adapter_inst.ap = self.ap
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook)
if hasattr(adapter_inst, 'set_bot_uuid'):
diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py
index f0938f56..7ed0b8c9 100644
--- a/src/langbot/pkg/platform/sources/lark.py
+++ b/src/langbot/pkg/platform/sources/lark.py
@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
+import langbot_plugin.api.entities.builtin.provider.session as provider_session
class AESCipher(object):
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
+ ap: typing.Any = pydantic.Field(exclude=True, default=None)
bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pending_monitoring_msg: dict[str, str]
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
reply_to_monitoring_msg: dict[str, tuple[str, float]]
+ reply_message_card_ids: dict[str, str]
+ card_sequence_dict: dict[str, int]
+ # card_id → set of source message ids registered against it (for cleanup)
+ card_id_to_source_ids: dict[str, set[str]]
+ # card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
+ card_streaming_text: dict[str, str]
+ # card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
+ card_pre_pause_text: dict[str, str]
+ # set of card_ids that have already transitioned from "buttons visible" to "resume layout"
+ card_resume_transitioned: set[str]
_MONITORING_MAPPING_TTL = 600 # 10 minutes
seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识
@@ -812,11 +824,131 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
asyncio.create_task(on_message(event))
+ def schedule_on_app_loop(coro):
+ """Run a coroutine on the application event loop from sync callbacks."""
+ return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
+
def sync_on_card_action(event):
try:
- action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
+ action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
+ # Parse JSON string values (from form action buttons)
+ if isinstance(action_value_raw, str):
+ try:
+ action_value_obj = json.loads(action_value_raw)
+ except (json.JSONDecodeError, TypeError):
+ action_value_obj = {}
+ else:
+ action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
+ # Handle Dify form action button clicks
+ if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
+ form_token = action_value_obj.get('form_token', '')
+ workflow_run_id = action_value_obj.get('workflow_run_id', '')
+ action_id = action_value_obj.get('action_id', '')
+ session_key = action_value_obj.get('session_key', '')
+
+ if session_key.startswith('group_') or session_key.startswith('g:'):
+ launcher_type = provider_session.LauncherTypes.GROUP
+ launcher_id = (
+ session_key.split(':', 1)[1]
+ if session_key.startswith('g:')
+ else session_key[len('group_') :]
+ )
+ else:
+ launcher_type = provider_session.LauncherTypes.PERSON
+ launcher_id = (
+ session_key.split(':', 1)[1]
+ if session_key.startswith('p:')
+ else session_key[len('person_') :]
+ )
+
+ # Find the bot entity to get bot_uuid and pipeline_uuid
+ bot_uuid = ''
+ pipeline_uuid = None
+ for bot in self.ap.platform_mgr.bots:
+ if bot.adapter is self:
+ bot_uuid = bot.bot_entity.uuid
+ pipeline_uuid = bot.bot_entity.use_pipeline_uuid
+ break
+
+ form_action_data = {
+ 'form_token': form_token,
+ 'workflow_run_id': workflow_run_id,
+ 'action_id': action_id,
+ 'user': f'{launcher_type.value}_{launcher_id}',
+ 'inputs': {},
+ }
+
+ context = getattr(event.event, 'context', None)
+ open_message_id = getattr(context, 'open_message_id', None)
+ source_time = datetime.datetime.now()
+ event_time = source_time.timestamp()
+ action_text = action_value_obj.get('action_id', 'confirm')
+ message_chain = platform_message.MessageChain(
+ [platform_message.Plain(text=f'[Form Action: {action_text}]')]
+ )
+ if open_message_id:
+ message_chain.insert(
+ 0,
+ platform_message.Source(
+ id=open_message_id,
+ time=source_time,
+ ),
+ )
+
+ operator = getattr(event.event, 'operator', None)
+ user_id = (
+ getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
+ )
+
+ if launcher_type == provider_session.LauncherTypes.GROUP:
+ synthetic_event = platform_events.GroupMessage(
+ sender=platform_entities.GroupMember(
+ id=user_id,
+ member_name='',
+ permission=platform_entities.Permission.Member,
+ group=platform_entities.Group(
+ id=launcher_id,
+ name='',
+ permission=platform_entities.Permission.Member,
+ ),
+ ),
+ message_chain=message_chain,
+ time=event_time,
+ source_platform_object=event,
+ )
+ else:
+ synthetic_event = platform_events.FriendMessage(
+ sender=platform_entities.Friend(
+ id=user_id,
+ nickname='',
+ remark='',
+ ),
+ message_chain=message_chain,
+ time=event_time,
+ source_platform_object=event,
+ )
+
+ async def add_form_action_query():
+ await self.ap.query_pool.add_query(
+ bot_uuid=bot_uuid,
+ launcher_type=launcher_type,
+ launcher_id=launcher_id,
+ sender_id=user_id,
+ message_event=synthetic_event,
+ message_chain=message_chain,
+ adapter=self,
+ pipeline_uuid=pipeline_uuid,
+ variables={'_dify_form_action': form_action_data},
+ )
+
+ schedule_on_app_loop(add_form_action_query())
+
+ from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
+
+ return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
+
if action_value == '有帮助':
feedback_type = 1
elif action_value == '无帮助':
@@ -857,17 +989,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
)
if platform_events.FeedbackEvent in self.listeners:
- loop = asyncio.get_event_loop()
- if loop.is_running():
- asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
- else:
- loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
+ schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
except Exception:
- asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
+ traceback.print_exc()
+ schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
@@ -893,6 +1022,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_id_dict={},
pending_monitoring_msg={},
reply_to_monitoring_msg={},
+ reply_message_card_ids={},
+ card_sequence_dict={},
+ card_id_to_source_ids={},
+ card_streaming_text={},
+ card_pre_pause_text={},
+ card_resume_transitioned=set(),
seq=1,
listeners={},
quart_app=quart_app,
@@ -1132,6 +1267,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for k in expired:
del self.reply_to_monitoring_msg[k]
+ def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
+ """Return the next strictly increasing sequence for a card update."""
+ current = self.card_sequence_dict.get(card_id, 0)
+ next_seq = max(current + 1, suggested)
+ self.card_sequence_dict[card_id] = next_seq
+ return next_seq
+
+ def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
+ """Register a card_id under one or more source message ids."""
+ bucket = self.card_id_to_source_ids.setdefault(card_id, set())
+ for sid in source_ids:
+ if not sid:
+ continue
+ self.reply_message_card_ids[sid] = card_id
+ bucket.add(sid)
+
+ def _drop_card_state(self, card_id: str) -> None:
+ """Pop all per-card state for the given card_id."""
+ if not card_id:
+ return
+ for sid in self.card_id_to_source_ids.pop(card_id, set()):
+ self.reply_message_card_ids.pop(sid, None)
+ self.card_sequence_dict.pop(card_id, None)
+ self.card_streaming_text.pop(card_id, None)
+ self.card_pre_pause_text.pop(card_id, None)
+ self.card_resume_transitioned.discard(card_id)
+
async def create_card_id(self, message_id):
try:
# self.logger.debug('飞书支持stream输出,创建卡片......')
@@ -1327,6 +1489,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id
+ self.card_sequence_dict[card_id] = 0
return card_id
except Exception as e:
@@ -1339,6 +1502,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""
# message_id = event.message_chain.message_id
+ source_message_id = str(event.message_chain.message_id)
+ existing_card_id = self.reply_message_card_ids.get(source_message_id)
+ if existing_card_id:
+ self.card_id_dict[message_id] = existing_card_id
+ return True
+
card_id = await self.create_card_id(message_id)
content = {
'type': 'card',
@@ -1377,6 +1546,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
user_msg_id = event.message_chain.message_id
reply_msg_id = getattr(response.data, 'message_id', None)
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
+ # Register the card under both the user-incoming msg id (so a
+ # second reply_message_first_chunk for the same user message
+ # reuses this card) AND the bot-reply msg id (so a synthetic
+ # event from a form-button callback — whose Source.id equals
+ # the bot's card message id — hits the same card and renders
+ # the resume content into it).
+ if reply_msg_id:
+ self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
+ else:
+ self._register_card_for_source(card_id, str(user_msg_id))
if reply_msg_id and monitoring_msg_id:
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
self._cleanup_monitoring_mapping()
@@ -1504,45 +1683,462 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
):
"""
回复消息变成更新卡片消息
+
+ Supports Dify form-action resume: when the runner yields a chunk with
+ ``_resume_from_form=True``, the card transitions from buttons to a
+ grey "已选择" notice and a new ``streaming_txt_resume`` element is added
+ for subsequent resume chunks to stream into.
+
+ When ``_open_new_card=True`` on the final chunk, the existing card is
+ left as-is and the pipeline will create a new card (with fresh form
+ buttons) for the re-pause.
"""
- # self.seq += 1
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
- if msg_seq % 8 == 0 or is_final:
- text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
- text_message = ''
- if text_elements:
- parts = []
- for paragraph in text_elements:
- para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
- if para_text:
- parts.append(para_text)
- text_message = '\n\n'.join(parts)
+ form_data = getattr(bot_message, '_form_data', None)
+ resume_from = getattr(bot_message, '_resume_from_form', False)
+ action_title = getattr(bot_message, '_resume_action_title', '')
+ open_new_card = getattr(bot_message, '_open_new_card', False)
+ selected_notice = f'> **已选择**:{action_title}' if action_title else ''
- # content = {
- # 'type': 'card_json',
- # 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
- # }
+ # ── decide whether this chunk needs a card update ────────────────────
+ card_id = self.card_id_dict.get(message_id)
+ if not card_id:
+ return
- request: ContentCardElementRequest = (
- ContentCardElementRequest.builder()
- .card_id(self.card_id_dict[message_id])
- .element_id('streaming_txt')
- .request_body(
- ContentCardElementRequestBody.builder()
- # .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
- .content(text_message)
- .sequence(msg_seq)
+ # ── convert message chain → text ─────────────────────────────────────
+ text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
+
+ text_message = ''
+ if text_elements:
+ parts = []
+ for paragraph in text_elements:
+ para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
+ if para_text:
+ parts.append(para_text)
+ text_message = '\n\n'.join(parts)
+
+ tenant_key = (
+ message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
+ )
+ app_access_token = self.get_app_access_token()
+ tenant_access_token = self.get_tenant_access_token(tenant_key)
+ req_opt: RequestOption = (
+ RequestOption.builder()
+ .app_ticket(self.app_ticket)
+ .tenant_key(tenant_key)
+ .app_access_token(app_access_token)
+ .tenant_access_token(tenant_access_token)
+ .build()
+ )
+
+ card_sequence = self._next_card_sequence(card_id, msg_seq)
+
+ # ── RESUME: first chunk after button click ───────────────────────────
+ if resume_from and card_id not in self.card_resume_transitioned:
+ # Transition the card from the form state into resume mode.
+ # Preserve the text that was shown before the pause, and seed the
+ # resume placeholder with the current resume content if we already
+ # have any on the first yielded chunk.
+ pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
+ initial_resume_text = text_message or '\u200b'
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=pre_pause_text,
+ sequence=card_sequence,
+ form_data=None,
+ notice_text=selected_notice,
+ resume_placeholder_text=initial_resume_text,
+ )
+ self.card_resume_transitioned.add(card_id)
+ self.card_pre_pause_text[card_id] = pre_pause_text
+ self.card_streaming_text[card_id] = text_message
+ if not is_final:
+ return
+
+ # ── RESUME: subsequent chunks → full card update ─────────────────────
+ if resume_from and card_id in self.card_resume_transitioned:
+ cached = self.card_streaming_text.get(card_id, '')
+ if text_message != cached:
+ self.card_streaming_text[card_id] = text_message
+ pre_pause_text = self.card_pre_pause_text.get(card_id, '')
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=pre_pause_text,
+ sequence=card_sequence,
+ form_data=None,
+ notice_text=selected_notice,
+ resume_placeholder_text=text_message,
+ )
+ if not is_final:
+ return
+
+ # ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
+ if not resume_from and (msg_seq % 8 == 0 or is_final):
+ cached = self.card_streaming_text.get(card_id)
+ if text_message != cached:
+ self.card_streaming_text[card_id] = text_message
+ request: ContentCardElementRequest = (
+ ContentCardElementRequest.builder()
+ .card_id(card_id)
+ .element_id('streaming_txt')
+ .request_body(
+ ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
+ )
.build()
)
- .build()
+ response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
+ request, req_opt
+ )
+ if not response.success():
+ raise Exception(
+ f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
+ f'msg: {response.msg}, log_id: {response.get_log_id()}, '
+ f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
+ )
+
+ # ── FINAL chunk: full card layout update ─────────────────────────────
+ if is_final:
+ final_seq = self._next_card_sequence(card_id, card_sequence + 1)
+ pre_pause = self.card_pre_pause_text.get(card_id, text_message)
+ resume_cached = self.card_streaming_text.get(card_id, '')
+ if form_data:
+ # Keep the current card and render the human-input action area.
+ # Initial pause uses the latest stream text, while re-pause after
+ # a button click preserves the pre-pause text and keeps the resume
+ # stream content in the extra placeholder area.
+ final_text = pre_pause if open_new_card else text_message
+ final_resume_placeholder = resume_cached if open_new_card else ''
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=final_text,
+ sequence=final_seq,
+ form_data=form_data,
+ resume_placeholder_text=final_resume_placeholder,
+ show_form_prompt=not open_new_card,
+ )
+ if open_new_card:
+ self.card_streaming_text.pop(card_id, None)
+ self.card_pre_pause_text.pop(card_id, None)
+ else:
+ # The human-input prompt itself is rendered as buttons only
+ # on Lark, so do not keep the hidden fallback text around;
+ # otherwise it will resurface after the button click.
+ self.card_streaming_text[card_id] = ''
+ self.card_pre_pause_text[card_id] = ''
+ else:
+ # Normal finish: keep pre-pause + resume content visible,
+ # remove buttons/notice, drop the resume placeholder.
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=pre_pause,
+ sequence=final_seq,
+ form_data=None,
+ notice_text=selected_notice if resume_from else '',
+ resume_placeholder_text=resume_cached,
+ )
+ self._drop_card_state(card_id)
+ self.card_id_dict.pop(message_id, None)
+
+ # ── media (images / files) appended at the end ───────────────────────
+ if is_final and media_items:
+ for media in media_items:
+ media_request: ReplyMessageRequest = (
+ ReplyMessageRequest.builder()
+ .message_id(message_source.message_chain.message_id)
+ .request_body(
+ ReplyMessageRequestBody.builder()
+ .content(json.dumps(media['content']))
+ .msg_type(media['msg_type'])
+ .reply_in_thread(False)
+ .uuid(str(uuid.uuid4()))
+ .build()
+ )
+ .build()
+ )
+ media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
+ media_request, req_opt
+ )
+ if not media_response.success():
+ raise Exception(
+ f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
+ )
+
+ async def _add_form_buttons_to_card(
+ self,
+ card_id: str,
+ message_source: platform_events.MessageEvent,
+ form_data: dict,
+ text_message: str = '',
+ sequence: int = 1,
+ ):
+ """Update the entire card to include form action buttons.
+
+ Uses card.aupdate to replace the card JSON with a template that
+ includes the streaming text content plus interactive buttons.
+ """
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=text_message,
+ sequence=sequence,
+ form_data=form_data,
+ )
+
+ async def _remove_form_buttons_from_card(
+ self,
+ card_id: str,
+ message_source: platform_events.MessageEvent,
+ text_message: str = '',
+ sequence: int = 1,
+ ):
+ """Replace the human-input card layout with the plain final layout."""
+ await self._update_card_layout(
+ card_id=card_id,
+ message_source=message_source,
+ text_message=text_message,
+ sequence=sequence,
+ form_data=None,
+ )
+
+ async def _update_card_layout(
+ self,
+ card_id: str,
+ message_source: platform_events.MessageEvent,
+ text_message: str = '',
+ sequence: int = 1,
+ form_data: dict | None = None,
+ notice_text: str = '',
+ resume_placeholder_text: str = '',
+ show_form_prompt: bool = True,
+ ):
+ """Update the entire card layout.
+
+ • form_data → show interactive buttons (initial Dify pause)
+ • notice_text → replace buttons with a grey "已选择" notice (resume transition)
+ • resume_placeholder_text → add a streaming_txt_resume markdown element
+ """
+ form_data = form_data or {}
+ actions = form_data.get('actions', [])
+ form_token = form_data.get('form_token', '')
+ workflow_run_id = form_data.get('workflow_run_id', '')
+ node_title = form_data.get('node_title', '') or 'Human Input Required'
+ form_content = form_data.get('form_content', '')
+
+ # When form_data is set, the visible content is rendered inside the
+ # interactive container, so the top streaming text should stay empty
+ # to avoid duplicate text above the action area.
+ #
+ # For resume notice state, keep the existing text visible in the card
+ # and only add the grey "selected" notice below it.
+ if form_data:
+ render_text_message = ''
+ else:
+ render_text_message = text_message
+
+ # Determine session key from message source
+ if isinstance(message_source, platform_events.GroupMessage):
+ session_key = f'group_{message_source.group.id}'
+ else:
+ session_key = f'person_{message_source.sender.id}'
+
+ # Build button elements matching the existing card template's thumbsup/down format
+ action_buttons = []
+ for action in actions:
+ action_id = action.get('id', '')
+ action_title = action.get('title', action_id)
+ button_style = action.get('button_style', 'default')
+
+ if button_style == 'primary':
+ lark_button_type = 'primary'
+ elif button_style == 'danger':
+ lark_button_type = 'danger'
+ else:
+ lark_button_type = 'default'
+
+ action_buttons.append(
+ {
+ 'tag': 'button',
+ 'text': {'tag': 'plain_text', 'content': action_title},
+ 'type': lark_button_type,
+ 'width': 'fill',
+ 'size': 'medium',
+ 'hover_tips': {'tag': 'plain_text', 'content': action_title},
+ 'behaviors': [
+ {
+ 'type': 'callback',
+ 'value': {
+ 'form_action': True,
+ 'form_token': form_token,
+ 'workflow_run_id': workflow_run_id,
+ 'action_id': action_id,
+ 'session_key': session_key,
+ },
+ }
+ ],
+ 'margin': '0px 0px 0px 0px',
+ }
)
- if is_final and bot_message.tool_calls is None:
- # self.seq = 1 # 消息回复结束之后重置seq
- self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
+ interactive_elements = []
+ if form_data:
+ if show_form_prompt:
+ interactive_elements = [
+ {
+ 'tag': 'markdown',
+ 'content': f'**[Human Input Required] {node_title}**',
+ 'text_align': 'left',
+ 'text_size': 'normal',
+ 'margin': '0px 0px 4px 0px',
+ }
+ ]
+ if form_content:
+ interactive_elements.append(
+ {
+ 'tag': 'markdown',
+ 'content': form_content,
+ 'text_align': 'left',
+ 'text_size': 'normal',
+ 'margin': '0px 0px 8px 0px',
+ }
+ )
+ interactive_elements.append(
+ {
+ 'tag': 'column_set',
+ 'horizontal_spacing': '8px',
+ 'horizontal_align': 'left',
+ 'margin': '0px 0px 0px 0px',
+ 'columns': [
+ {
+ 'tag': 'column',
+ 'width': 'weighted',
+ 'elements': [btn],
+ 'padding': '0px 0px 0px 0px',
+ }
+ for btn in action_buttons
+ ],
+ }
+ )
+ # Build the full card JSON with buttons, same structure as create_card_id
+ # ── mid_section: either form buttons, resume notice, or empty ──
+ mid_section_elements = []
+ if form_data:
+ mid_section_elements = [
+ {
+ 'tag': 'interactive_container',
+ 'margin': '12px 0px 8px 0px',
+ 'padding': '12px 12px 12px 12px',
+ 'has_border': True,
+ 'elements': interactive_elements,
+ },
+ {'tag': 'hr', 'margin': '0px 0px 0px 0px'},
+ ]
+ elif notice_text:
+ mid_section_elements = [
+ {
+ 'tag': 'markdown',
+ 'content': notice_text,
+ 'text_align': 'left',
+ 'text_size': 'normal',
+ 'margin': '8px 0px 4px 0px',
+ 'text_color': 'grey',
+ },
+ {'tag': 'hr', 'margin': '0px 0px 0px 0px'},
+ ]
+
+ # ── resume placeholder element (empty, filled via acontent on each chunk) ──
+ resume_elements = []
+ if resume_placeholder_text:
+ resume_elements = [
+ {
+ 'tag': 'markdown',
+ 'content': resume_placeholder_text,
+ 'text_align': 'left',
+ 'text_size': 'normal',
+ 'margin': '0px 0px 0px 0px',
+ 'element_id': 'streaming_txt_resume',
+ },
+ ]
+
+ card_data = {
+ 'schema': '2.0',
+ 'config': {
+ 'update_multi': True,
+ 'streaming_mode': False,
+ },
+ 'body': {
+ 'direction': 'vertical',
+ 'padding': '12px 12px 12px 12px',
+ 'elements': [
+ {
+ 'tag': 'div',
+ 'text': {
+ 'tag': 'plain_text',
+ 'content': 'LangBot',
+ 'text_size': 'normal',
+ 'text_align': 'left',
+ 'text_color': 'default',
+ },
+ 'icon': {
+ 'tag': 'custom_icon',
+ 'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
+ },
+ },
+ {
+ 'tag': 'markdown',
+ 'content': render_text_message,
+ 'text_align': 'left',
+ 'text_size': 'normal',
+ 'margin': '0px 0px 0px 0px',
+ 'element_id': 'streaming_txt',
+ },
+ *mid_section_elements,
+ *resume_elements,
+ {
+ 'tag': 'column_set',
+ 'horizontal_spacing': '12px',
+ 'horizontal_align': 'right',
+ 'columns': [
+ {
+ 'tag': 'column',
+ 'width': 'weighted',
+ 'elements': [
+ {
+ 'tag': 'markdown',
+ 'content': '以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看',
+ 'text_align': 'left',
+ 'text_size': 'notation',
+ 'margin': '4px 0px 0px 0px',
+ 'icon': {
+ 'tag': 'standard_icon',
+ 'token': 'robot_outlined',
+ 'color': 'grey',
+ },
+ }
+ ],
+ 'padding': '0px 0px 0px 0px',
+ 'direction': 'vertical',
+ 'horizontal_spacing': '8px',
+ 'vertical_spacing': '8px',
+ 'horizontal_align': 'left',
+ 'vertical_align': 'top',
+ 'margin': '0px 0px 0px 0px',
+ 'weight': 1,
+ }
+ ],
+ 'margin': '0px 0px 4px 0px',
+ },
+ ],
+ },
+ }
+
+ try:
tenant_key = (
message_source.source_platform_object.header.tenant_key
if message_source.source_platform_object
@@ -1558,39 +2154,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
.tenant_access_token(tenant_access_token)
.build()
)
- # 发起请求
- response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
- # 处理失败返回
- if not response.success():
- raise Exception(
- f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
+ request: UpdateCardRequest = (
+ UpdateCardRequest.builder()
+ .card_id(card_id)
+ .request_body(
+ UpdateCardRequestBody.builder()
+ .sequence(sequence)
+ .uuid(str(uuid.uuid4()))
+ .card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
+ .build()
)
- return
-
- # Send media messages when streaming is done
- if is_final and media_items:
- for media in media_items:
- media_request: ReplyMessageRequest = (
- ReplyMessageRequest.builder()
- .message_id(message_source.message_chain.message_id)
- .request_body(
- ReplyMessageRequestBody.builder()
- .content(json.dumps(media['content']))
- .msg_type(media['msg_type'])
- .reply_in_thread(False)
- .uuid(str(uuid.uuid4()))
- .build()
- )
- .build()
- )
- media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
- media_request, req_opt
- )
- if not media_response.success():
- raise Exception(
- f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
- )
+ .build()
+ )
+ response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
+ if not response.success():
+ await self.logger.error(
+ f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
+ f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
+ )
+ except Exception:
+ await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
async def is_muted(self, group_id: int) -> bool:
return False
diff --git a/src/langbot/pkg/platform/sources/telegram.py b/src/langbot/pkg/platform/sources/telegram.py
index 1833975b..c1d9ab68 100644
--- a/src/langbot/pkg/platform/sources/telegram.py
+++ b/src/langbot/pkg/platform/sources/telegram.py
@@ -4,11 +4,12 @@ import time
import telegram
import telegram.ext
-from telegram import Update
-from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
+from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
+from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
import telegramify_markdown
import typing
import traceback
+import json
import base64
import pydantic
@@ -189,6 +190,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True)
+ ap: typing.Any = pydantic.Field(exclude=True, default=None)
message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter()
@@ -224,6 +226,95 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
telegram_callback,
)
)
+
+ async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
+ query = update.callback_query
+ await query.answer()
+ try:
+ data = json.loads(query.data)
+ if data.get('form_action') or data.get('f'):
+ import langbot_plugin.api.entities.builtin.provider.session as provider_session
+
+ workflow_run_id = data.get('workflow_run_id') or data.get('w', '')
+ action_id = data.get('action_id') or data.get('a', '')
+ session_key = data.get('session_key') or data.get('s', '')
+
+ if session_key.startswith('group_') or session_key.startswith('g:'):
+ launcher_type = provider_session.LauncherTypes.GROUP
+ launcher_id = (
+ session_key.split(':', 1)[1]
+ if session_key.startswith('g:')
+ else session_key[len('group_') :]
+ )
+ else:
+ launcher_type = provider_session.LauncherTypes.PERSON
+ launcher_id = (
+ session_key.split(':', 1)[1]
+ if session_key.startswith('p:')
+ else session_key[len('person_') :]
+ )
+
+ user_id = str(query.from_user.id)
+
+ # Find bot_uuid and pipeline_uuid
+ bot_uuid = ''
+ pipeline_uuid = None
+ for b in self.ap.platform_mgr.bots:
+ if b.adapter is self:
+ bot_uuid = b.bot_entity.uuid
+ pipeline_uuid = b.bot_entity.use_pipeline_uuid
+ break
+
+ form_action_data = {
+ 'workflow_run_id': workflow_run_id,
+ 'action_id': action_id,
+ 'user': f'{launcher_type.value}_{launcher_id}',
+ 'inputs': {},
+ }
+
+ message_chain = platform_message.MessageChain(
+ [platform_message.Plain(text=f'[Form Action: {action_id}]')]
+ )
+
+ if launcher_type == provider_session.LauncherTypes.GROUP:
+ synthetic_event = platform_events.GroupMessage(
+ sender=platform_entities.GroupMember(
+ id=user_id,
+ member_name='',
+ permission=platform_entities.Permission.Member,
+ group=platform_entities.Group(
+ id=launcher_id,
+ name='',
+ permission=platform_entities.Permission.Member,
+ ),
+ ),
+ message_chain=message_chain,
+ )
+ else:
+ synthetic_event = platform_events.FriendMessage(
+ sender=platform_entities.Friend(
+ id=user_id,
+ nickname='',
+ remark='',
+ ),
+ message_chain=message_chain,
+ )
+
+ await self.ap.query_pool.add_query(
+ bot_uuid=bot_uuid,
+ launcher_type=launcher_type,
+ launcher_id=launcher_id,
+ sender_id=user_id,
+ message_event=synthetic_event,
+ message_chain=message_chain,
+ adapter=self,
+ pipeline_uuid=pipeline_uuid,
+ variables={'_dify_form_action': form_action_data},
+ )
+ except Exception:
+ await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
+
+ application.add_handler(CallbackQueryHandler(callback_query_handler))
super().__init__(
config=config,
logger=logger,
@@ -369,6 +460,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
del args['draft_id']
await self.bot.send_message(**args)
self.msg_stream_id.pop(message_id)
+
+ # Send form action buttons if form data is present
+ form_data = getattr(bot_message, '_form_data', None)
+ if form_data:
+ await self._send_form_action_buttons(message_source, form_data)
else:
stream_id = draft_id
if (msg_seq - 1) % 8 == 0 or is_final:
@@ -384,6 +480,51 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id)
+ # Send form action buttons if form data is present
+ form_data = getattr(bot_message, '_form_data', None)
+ if form_data:
+ await self._send_form_action_buttons(message_source, form_data)
+
+ async def _send_form_action_buttons(
+ self,
+ message_source: platform_events.MessageEvent,
+ form_data: dict,
+ ):
+ """Send inline keyboard buttons for Dify human_input_required form actions."""
+ actions = form_data.get('actions', [])
+ node_title = form_data.get('node_title', '')
+
+ if isinstance(message_source, platform_events.GroupMessage):
+ session_key = f'g:{message_source.group.id}'
+ else:
+ session_key = f'p:{message_source.sender.id}'
+
+ keyboard = []
+ for action in actions:
+ action_id = action.get('id', '')
+ action_title = action.get('title', action_id)
+ callback_data = json.dumps(
+ {'f': 1, 'a': action_id, 's': session_key},
+ separators=(',', ':'),
+ )
+ keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
+
+ reply_markup = InlineKeyboardMarkup(keyboard)
+
+ update = message_source.source_platform_object
+ chat_id = update.effective_chat.id
+ message_thread_id = update.message.message_thread_id
+
+ args = {
+ 'chat_id': chat_id,
+ 'text': f'[{node_title}] Please select an action:',
+ 'reply_markup': reply_markup,
+ }
+ if message_thread_id:
+ args['message_thread_id'] = message_thread_id
+
+ await self.bot.send_message(**args)
+
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update):
return None
diff --git a/src/langbot/pkg/provider/runners/difysvapi.py b/src/langbot/pkg/provider/runners/difysvapi.py
index 039bf33a..b871a324 100644
--- a/src/langbot/pkg/provider/runners/difysvapi.py
+++ b/src/langbot/pkg/provider/runners/difysvapi.py
@@ -2,6 +2,7 @@ from __future__ import annotations
import typing
import json
+import time
import uuid
import base64
import mimetypes
@@ -16,6 +17,46 @@ from langbot.libs.dify_service_api.v1 import client, errors
import httpx
+# Module-level store for paused-workflow form state, keyed by session key
+# (launcher_type_value + "_" + launcher_id). Keeps state out of the SDK
+# Conversation type, which may not accept arbitrary attribute assignment.
+_PENDING_FORMS: dict[str, dict[str, typing.Any]] = {}
+_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
+
+
+def _session_key_from_query(query: pipeline_query.Query) -> str:
+ return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+
+
+def _prune_pending_forms(now: float | None = None) -> None:
+ if now is None:
+ now = time.time()
+ expired = [key for key, data in _PENDING_FORMS.items() if data.get('_expires_at', 0) <= now]
+ for key in expired:
+ _PENDING_FORMS.pop(key, None)
+
+
+def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
+ _prune_pending_forms()
+ stored = dict(form_data)
+ expiration_time = stored.get('expiration_time')
+ try:
+ expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
+ except (TypeError, ValueError):
+ expiration_ts = 0.0
+ stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
+ _PENDING_FORMS[session_key] = stored
+
+
+def _get_pending_form(session_key: str) -> dict[str, typing.Any] | None:
+ _prune_pending_forms()
+ return _PENDING_FORMS.get(session_key)
+
+
+def _clear_pending_form(session_key: str) -> None:
+ _PENDING_FORMS.pop(session_key, None)
+
+
@runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器"""
@@ -335,11 +376,107 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
+ async def _submit_workflow_form_blocking(
+ self, form_action: dict
+ ) -> typing.AsyncGenerator[provider_message.Message, None]:
+ """Submit human input to resume a paused Dify workflow (non-streaming)."""
+
+ form_token = form_action['form_token']
+ workflow_run_id = form_action['workflow_run_id']
+ user = form_action['user']
+ action_id = form_action.get('action_id', '')
+ inputs = form_action.get('inputs', {})
+
+ async for chunk in self.dify_client.workflow_submit(
+ form_token=form_token,
+ workflow_run_id=workflow_run_id,
+ inputs=inputs,
+ user=user,
+ action=action_id,
+ timeout=120,
+ ):
+ self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
+
+ if chunk['event'] == 'workflow_finished':
+ if chunk['data'].get('error'):
+ raise errors.DifyAPIError(chunk['data']['error'])
+ content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
+ yield provider_message.Message(
+ role='assistant',
+ content=content,
+ )
+
+ def _merge_pending_form_action(self, form_action: dict | None, pending_form: dict | None) -> dict | None:
+ """Backfill resume fields from the pending form stored on the conversation."""
+ if not form_action:
+ return None
+
+ merged_action = dict(form_action)
+ if pending_form:
+ merged_action.setdefault('form_token', pending_form.get('form_token', ''))
+ merged_action.setdefault('workflow_run_id', pending_form.get('workflow_run_id', ''))
+ merged_action.setdefault('inputs', pending_form.get('inputs', {}))
+ merged_action.setdefault('user', pending_form.get('user', ''))
+
+ # Resolve clicked action's display title from the stored actions list
+ if 'action_title' not in merged_action:
+ clicked_id = merged_action.get('action_id', '')
+ for action in pending_form.get('actions', []):
+ if str(action.get('id', '')) == str(clicked_id):
+ merged_action['action_title'] = action.get('title', clicked_id)
+ break
+
+ return merged_action
+
+ def _match_pending_form_action(self, user_text: str, pending_form: dict | None) -> dict | None:
+ """Match plain text replies against pending Dify form actions."""
+ if not pending_form:
+ return None
+
+ normalized_text = user_text.strip().lower()
+ if not normalized_text:
+ return None
+
+ for action in pending_form.get('actions', []):
+ titles = {
+ str(action.get('title', '')).strip().lower(),
+ str(action.get('id', '')).strip().lower(),
+ }
+ if normalized_text in titles:
+ return {
+ 'form_token': pending_form.get('form_token', ''),
+ 'workflow_run_id': pending_form.get('workflow_run_id', ''),
+ 'action_id': action.get('id', ''),
+ 'action_title': action.get('title', action.get('id', '')),
+ 'inputs': pending_form.get('inputs', {}),
+ 'user': pending_form.get('user', ''),
+ }
+
+ return None
+
async def _workflow_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流"""
+ # Check if this is a form action resume (button click or text match)
+ form_action = query.variables.get('_dify_form_action')
+ session_key = _session_key_from_query(query)
+ pending_form = _get_pending_form(session_key)
+
+ if form_action:
+ form_action = self._merge_pending_form_action(form_action, pending_form)
+ _clear_pending_form(session_key)
+ elif pending_form:
+ form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
+ if form_action:
+ _clear_pending_form(session_key)
+
+ if form_action:
+ async for msg in self._submit_workflow_form_blocking(form_action):
+ yield msg
+ return
+
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -366,6 +503,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
}
inputs.update(query.variables)
+ human_input_yielded = False
async for chunk in self.dify_client.workflow_run(
inputs=inputs,
@@ -377,6 +515,46 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] in ignored_events:
continue
+ if chunk['event'] == 'workflow_paused':
+ reasons = chunk['data'].get('reasons', [])
+ workflow_run_id = chunk['data'].get('workflow_run_id', '')
+ for reason in reasons:
+ if reason.get('TYPE') == 'human_input_required':
+ form_content = reason.get('form_content', '')
+ actions = reason.get('actions', [])
+ node_title = reason.get('node_title', '')
+
+ _set_pending_form(
+ _session_key_from_query(query),
+ {
+ 'workflow_run_id': workflow_run_id,
+ 'form_id': reason.get('form_id'),
+ 'form_token': reason.get('form_token'),
+ 'node_id': reason.get('node_id'),
+ 'node_title': node_title,
+ 'form_content': form_content,
+ 'inputs': reason.get('inputs', {}),
+ 'actions': actions,
+ 'expiration_time': reason.get('expiration_time'),
+ 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
+ },
+ )
+
+ query.variables['_dify_form_render'] = {
+ 'form_content': form_content,
+ 'actions': actions,
+ 'node_title': node_title,
+ }
+
+ action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
+ display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
+
+ human_input_yielded = True
+ yield provider_message.Message(
+ role='assistant',
+ content=display_text,
+ )
+
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue
@@ -399,6 +577,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
elif chunk['event'] == 'workflow_finished':
+ if human_input_yielded:
+ break
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
@@ -636,11 +816,143 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
+ async def _submit_workflow_form(
+ self, form_action: dict
+ ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
+ """Submit human input to resume a paused Dify workflow."""
+
+ form_token = form_action['form_token']
+ workflow_run_id = form_action['workflow_run_id']
+ user = form_action['user']
+ action_id = form_action.get('action_id', '')
+ action_title = form_action.get('action_title', '') or action_id
+ inputs = form_action.get('inputs', {})
+
+ messsage_idx = 0
+ is_final = False
+ think_start = False
+ think_end = False
+ workflow_contents = ''
+ repause_form_data: dict | None = None
+
+ remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
+ async for chunk in self.dify_client.workflow_submit(
+ form_token=form_token,
+ workflow_run_id=workflow_run_id,
+ inputs=inputs,
+ user=user,
+ action=action_id,
+ timeout=120,
+ ):
+ self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
+
+ yield_this_iteration = False
+
+ if chunk['event'] == 'workflow_finished':
+ is_final = True
+ yield_this_iteration = True
+ if chunk['data'].get('error'):
+ raise errors.DifyAPIError(chunk['data']['error'])
+
+ if chunk['event'] == 'workflow_paused':
+ reasons = chunk['data'].get('reasons', [])
+ new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
+ for reason in reasons:
+ if reason.get('TYPE') != 'human_input_required':
+ continue
+ form_content = reason.get('form_content', '')
+ actions = reason.get('actions', [])
+ node_title = reason.get('node_title', '')
+ raw_inputs = reason.get('inputs', {})
+
+ _set_pending_form(
+ user,
+ {
+ 'workflow_run_id': new_run_id,
+ 'form_id': reason.get('form_id'),
+ 'form_token': reason.get('form_token'),
+ 'node_id': reason.get('node_id'),
+ 'node_title': node_title,
+ 'form_content': form_content,
+ 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
+ 'actions': actions,
+ 'expiration_time': reason.get('expiration_time'),
+ 'user': user,
+ },
+ )
+
+ repause_form_data = {
+ 'form_content': form_content,
+ 'actions': actions,
+ 'node_title': node_title,
+ 'workflow_run_id': new_run_id,
+ 'form_token': reason.get('form_token', ''),
+ }
+ is_final = True
+ yield_this_iteration = True
+ break
+
+ if chunk['event'] == 'text_chunk':
+ messsage_idx += 1
+ if remove_think:
+ if '' in chunk['data']['text'] and not think_start:
+ think_start = True
+ continue
+ if '' in chunk['data']['text'] and not think_end:
+ import re
+
+ content = re.sub(r'^\n', '', chunk['data']['text'])
+ workflow_contents += content
+ think_end = True
+ elif think_end:
+ workflow_contents += chunk['data']['text']
+ if think_start:
+ continue
+ else:
+ workflow_contents += chunk['data']['text']
+ if messsage_idx % 8 == 0:
+ yield_this_iteration = True
+
+ if yield_this_iteration:
+ msg = provider_message.MessageChunk(
+ role='assistant',
+ content=workflow_contents,
+ is_final=is_final,
+ )
+ msg._resume_from_form = True
+ if action_title:
+ msg._resume_action_title = action_title
+ if is_final and repause_form_data:
+ msg._form_data = repause_form_data
+ msg._open_new_card = True
+ yield msg
+ if is_final:
+ return
+
async def _workflow_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流"""
+ # Check if this is a form action resume (button click or text match)
+ form_action = query.variables.get('_dify_form_action')
+ session_key = _session_key_from_query(query)
+ pending_form = _get_pending_form(session_key)
+
+ if form_action:
+ form_action = self._merge_pending_form_action(form_action, pending_form)
+ _clear_pending_form(session_key)
+ elif pending_form:
+ form_action = self._match_pending_form_action(str(query.message_chain), pending_form)
+ if form_action:
+ _clear_pending_form(session_key)
+
+ if form_action:
+ # Resume paused workflow via submit endpoint
+ async for msg in self._submit_workflow_form(form_action):
+ yield msg
+ return
+
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -672,6 +984,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False
think_end = False
workflow_contents = ''
+ workflow_run_id = ''
+ human_input_yielded = False
+
+ # Saved form data to attach to the final MessageChunk so the adapter
+ # can detect it when is_final=True and render buttons.
+ pending_form_data = None
+ display_text = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run(
@@ -682,7 +1001,62 @@ class DifyServiceAPIRunner(runner.RequestRunner):
):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
+ if chunk['event'] == 'workflow_started':
+ workflow_run_id = chunk['data'].get('workflow_run_id', '')
continue
+
+ if chunk['event'] == 'workflow_paused':
+ reasons = chunk['data'].get('reasons', [])
+ workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
+ for reason in reasons:
+ if reason.get('TYPE') == 'human_input_required':
+ form_content = reason.get('form_content', '')
+ actions = reason.get('actions', [])
+ node_title = reason.get('node_title', '')
+
+ # Persist form state in module-level store keyed by session
+ raw_inputs = reason.get('inputs', {})
+ _set_pending_form(
+ _session_key_from_query(query),
+ {
+ 'workflow_run_id': workflow_run_id,
+ 'form_id': reason.get('form_id'),
+ 'form_token': reason.get('form_token'),
+ 'node_id': reason.get('node_id'),
+ 'node_title': node_title,
+ 'form_content': form_content,
+ 'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
+ 'actions': actions,
+ 'expiration_time': reason.get('expiration_time'),
+ 'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
+ },
+ )
+
+ # Pass form render metadata to downstream stages
+ query.variables['_dify_form_render'] = {
+ 'form_content': form_content,
+ 'actions': actions,
+ 'node_title': node_title,
+ }
+
+ action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
+ display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
+ workflow_contents += display_text + '\n'
+
+ # Save form data to attach to the final chunk later.
+ # We do NOT yield here — the form content will be sent
+ # as the final MessageChunk (with is_final=True and
+ # _form_data) so the adapter can update the card and
+ # add buttons in one pass.
+ pending_form_data = {
+ 'form_content': form_content,
+ 'actions': actions,
+ 'node_title': node_title,
+ 'workflow_run_id': workflow_run_id,
+ 'form_token': reason.get('form_token', ''),
+ }
+ human_input_yielded = True
+
if chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data']['error']:
@@ -730,11 +1104,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
if messsage_idx % 8 == 0 or is_final:
- yield provider_message.MessageChunk(
+ final_content = workflow_contents if workflow_contents.strip() else ''
+ msg = provider_message.MessageChunk(
role='assistant',
- content=workflow_contents,
+ content=final_content,
is_final=is_final,
)
+ # Attach form data to the final chunk for the adapter
+ if is_final and pending_form_data:
+ msg._form_data = pending_form_data
+ pending_form_data = None
+ yield msg
+
+ # If the stream ended after workflow_paused without a
+ # workflow_finished event, yield a final chunk so the adapter
+ # can update the card and add buttons.
+ if human_input_yielded and not is_final:
+ msg = provider_message.MessageChunk(
+ role='assistant',
+ content=workflow_contents or display_text,
+ is_final=True,
+ )
+ msg._form_data = pending_form_data
+ yield msg
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""