Compare commits

...

7 Commits

Author SHA1 Message Date
fdc310
83b0d26e99 feat(dingtalk): implement human input card support and card action handling
- Add a new module `card_callback.py` to handle card action button clicks from DingTalk.
- Introduce `DingTalkCardActionHandler` to process card action callbacks and extract parameters.
- Update `DingTalkAdapter` to manage card state and handle form input through a single card template.
- Add configuration for `human_input_card_template_id` in `dingtalk.yaml` to specify the template for human input.
- Create a new card template `dingtalk_human_input_card.json` for rendering human input prompts and buttons.
2026-06-15 15:51:52 +08:00
fdc310
e08b5db625 feat: Add the function for formatting human input text to support adapters without rich UI. 2026-06-10 15:41:18 +08:00
fdc310
d0f65b17ec feat: Improve TelegramAdapter message handling with enhanced error management and draft message support 2026-06-02 01:14:25 +08:00
fdc310
2b533c4a00 feat: Enhance TelegramAdapter to handle form action buttons and message threading 2026-06-02 00:25:52 +08:00
fdc310
f663d87a60 feat: Enhance Lark and Telegram adapters with new form handling for paused workflows 2026-06-01 23:48:59 +08:00
fdc310
60e5b873ee feat: Add '_routed_by_rule' variable to form action in Lark and Telegram adapters 2026-05-30 12:22:00 +08:00
fdc310
b96f209b98 feat: Implement workflow form handling for paused workflows
- Added module-level storage for pending forms to manage state across sessions.
- Introduced functions to set, get, and clear pending forms with expiration handling.
- Enhanced DifyServiceAPIRunner to support resuming paused workflows via form actions.
- Implemented logic to yield human input requests and display appropriate messages.
- Updated workflow submission methods to handle paused states and resume actions.
- Ensured proper merging of pending form actions with user inputs for seamless interaction.
2026-05-28 23:32:46 +08:00
14 changed files with 3247 additions and 127 deletions

View File

@@ -0,0 +1,648 @@
"""Generate the DingTalk human-input card template JSON.
The output is wrapped in the {editorData, widgetInfo, type, mode} envelope
the DingTalk card builder expects on import. editorData is itself a JSON
string (NOT a nested object), matching real exports from the builder.
Run from the repo root: python scripts/build_dingtalk_card_template.py
"""
from __future__ import annotations
import json
from pathlib import Path
OUTPUT = Path('src/langbot/templates/dingtalk_human_input_card.json')
def markdown_block(node_id, variable='content'):
    """Build a MarkdownBlock node whose ``content`` is bound to a global variable.

    BaseText's ``text`` field needs manual binding inside the editor, whereas
    MarkdownBlock's ``content`` accepts a ``variableValue`` binding directly in
    the JSON, so the imported template renders the variable immediately.

    Args:
        node_id: Value stored under the node's ``id`` key.
        variable: Name of the global card variable bound to ``content``.

    Returns:
        The component node as a plain dict (key order is significant because
        the result is serialized to JSON).
    """
    # Always-visible: fixed True with an empty 'and' condition list.
    always_shown = {
        'type': 'dynamicVisible',
        'value': True,
        'valueType': 'fixed',
        'condition': {'op': 'and', 'conditions': []},
    }
    bound_content = {'variable': variable, 'variableType': 'global', 'type': 'variableValue'}

    props = {
        'mdVer': 0,
        'icon': {'type': 'icon', 'icon': '', 'iconType': 'emoji'},
        'content': bound_content,
        'visible': always_shown,
        'isStreaming': False,
        'enableLinkStatPoint': False,
        'linkStatPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
        'linkStatPointParams': [],
        'marginTop': 6,
        'marginBottom': 6,
        'marginLeft': 12,
        'marginRight': 12,
    }

    node = {'componentName': 'MarkdownBlock', 'id': node_id, 'props': props}
    # Editor bookkeeping fields, appended in the same order as the builder export.
    node.update(
        title='AI 流式富文本',
        hidden=False,
        isLocked=False,
        condition=True,
        conditionGroup='',
    )
    return node
def text_block(
    node_id,
    text,
    *,
    bold=False,
    gravity='left',
    font_size=14,
    line_height=22,
    max_lines=20,
    ml=12,
    mr=12,
    mt=4,
    mb=4,
    color_token='common_level1_base_color',
    style_token='common_body_text_style',
):
    """Build a BaseText node that displays a fixed (non-variable) string.

    Args:
        node_id: Value stored under the node's ``id`` key.
        text: Literal string placed in the ``text`` prop.
        bold: Value of the ``bold`` prop.
        gravity: Value of the ``gravity`` prop.
        font_size: Value of ``customFontSize``.
        line_height: Value of ``customFontLineHeight``.
        max_lines: Fixed value of the ``maxLine`` prop.
        ml, mr, mt, mb: Left / right / top / bottom margins.
        color_token: Fixed value of the ``color`` prop.
        style_token: Value of the ``styleToken`` prop.

    Returns:
        The component node as a plain dict (key order is significant because
        the result is serialized to JSON).
    """

    def fixed(kind, value):
        # Shared shape of dynamicNumber / dynamicColor props holding a fixed value.
        return {
            'type': kind,
            'valueType': 'fixed',
            'value': value,
            'variable': '',
            'variableType': 'global',
        }

    def blank_link():
        # dynamicLink props list 'value' before 'valueType', unlike the others.
        return {
            'type': 'dynamicLink',
            'value': '',
            'valueType': 'fixed',
            'variable': '',
            'variableType': 'global',
        }

    props = {
        'text': {'i18n': False, 'type': 'dynamicString', 'content': text},
        'hoverText': {'type': 'dynamicString', 'content': '', 'i18n': False},
        'iconType': 'iconCode',
        'iconFont': {'type': 'icon', 'icon': '', 'iconType': 'ddIcon'},
        'icon': blank_link(),
        'darkIcon': blank_link(),
        'autoWidth': False,
        'maxWidth': fixed('dynamicNumber', 0),
        'fixedWidth': fixed('dynamicNumber', 0),
        'marginLeft': ml,
        'marginRight': mr,
        'marginTop': mt,
        'marginBottom': mb,
        'fontColorType': 'Standard',
        'enableHighlight': False,
        'maxLine': fixed('dynamicNumber', max_lines),
        'color': fixed('dynamicColor', color_token),
        'customLightColor': fixed('dynamicColor', '#35404b'),
        'customDarkColor': fixed('dynamicColor', '#f6f6f6'),
        'gravity': gravity,
        'fontSizeType': 'Standard',
        'styleType': 'custom',
        'styleToken': style_token,
        'size': 'middle',
        'customFontSize': font_size,
        'customFontLineHeight': line_height,
        'bold': bold,
        'italic': False,
        'strikeThrough': False,
        'lineHeight': 'normal',
        'visible': {
            'type': 'dynamicVisible',
            'value': True,
            'valueType': 'fixed',
            'condition': {'op': 'and', 'conditions': []},
        },
        'autoMaxWidth': False,
        'innerOffset': 0,
        'enableIcon': False,
        'widthMode': 'match_parent',
        'margin': -2,
    }

    node = {'componentName': 'BaseText', 'id': node_id, 'props': props}
    node.update(
        title='基础文本',
        hidden=False,
        isLocked=False,
        condition=True,
        conditionGroup='',
    )
    return node
def button_group(node_id):
    """Build a ButtonGroup node whose buttons come from the global ``btns`` variable.

    Args:
        node_id: Value stored under the node's ``id`` key.

    Returns:
        The component node as a plain dict (key order is significant because
        the result is serialized to JSON).
    """
    # Buttons are sourced from a variable, so the fixed-button lists stay empty.
    buttons_binding = {'type': 'variableValue', 'variableType': 'global', 'variable': 'btns'}
    always_shown = {
        'type': 'dynamicVisible',
        'value': True,
        'valueType': 'fixed',
        'condition': {'op': 'and', 'conditions': []},
    }

    props = {
        'dynamicButtons': buttons_binding,
        'marginLeft': 12,
        'marginRight': 12,
        'marginTop': 6,
        'marginBottom': 12,
        'visible': always_shown,
        'responsiveLayoutWidth': 350,
        'buttonsSource': 'variable',
        'fixedButtonIds': [],
        'fixedButtons': [],
        'enableResponsiveLayout': False,
        'matchContent': False,
        'buttonSpacing': 8,
        'margin': -2,
        'innerOffset': 0,
    }

    node = {'componentName': 'ButtonGroup', 'id': node_id, 'props': props}
    node.update(
        title='按钮组',
        hidden=False,
        isLocked=False,
        condition=True,
        conditionGroup='',
    )
    return node
def build_editor_data():
    """Assemble the full ``editorData`` document for the human-input card.

    The document holds the component registry, a component tree rooted at an
    AICardContainer with three status containers (pending=1, done=3,
    failed=5), mock card data, and the declared card variables (``content``,
    ``flowStatus``, ``btns``).

    Returns:
        A JSON-serializable dict. Key order is kept identical to the literal
        layout because the result is dumped to a file.
    """

    def zero_margins():
        # Four zeroed margin props, spread into several components below.
        return {'marginLeft': 0, 'marginRight': 0, 'marginTop': 0, 'marginBottom': 0}

    def always_visible():
        return {
            'type': 'dynamicVisible',
            'value': True,
            'valueType': 'fixed',
            'condition': {'op': 'and', 'conditions': []},
        }

    def status_container(node_id, status, title, children):
        # All three status containers share every prop except `status`.
        return {
            'componentName': 'AICardStatusContainer',
            'id': node_id,
            'props': {
                'status': status,
                **zero_margins(),
                'enableExtend': False,
                'autoFoldConfig': {
                    'needFold': True,
                    'heightLimit': 480,
                    'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
                },
                'innerOffset': 0,
                'enableCollapse': False,
                'margin': -2,
            },
            'title': title,
            'hidden': False,
            'isLocked': False,
            'condition': True,
            'conditionGroup': '',
            'children': children,
        }

    def card_content(node_id, children, *, visible_first):
        # The done/failed content nodes differ only in whether `visible`
        # precedes or follows the margins; that order is kept per node.
        if visible_first:
            leading = {'visible': always_visible(), **zero_margins()}
        else:
            leading = {**zero_margins(), 'visible': always_visible()}
        return {
            'componentName': 'AICardContent',
            'id': node_id,
            'props': {
                **leading,
                'innerOffset': 0,
                'disabledWhileForward': False,
                'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
                'statPointParams': [
                    {'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
                ],
                'margin': -2,
                'transformToEventChain': False,
                'enableStatPoint': False,
            },
            'hidden': False,
            'title': '',
            'isLocked': False,
            'condition': True,
            'conditionGroup': '',
            'children': children,
        }

    def schema_field(name, kind, description, children=None):
        # One read-only entry of the `btns` variable schema; ids are flat
        # ('btns.<name>') regardless of nesting depth.
        entry = {
            'id': f'btns.{name}',
            'type': kind,
            'name': name,
            'private': False,
            'editorVarType': 'variables',
            'disabled': True,
            'description': description,
        }
        if children is not None:
            entry['schema'] = children
        return entry

    def mock_button(label, color, action):
        # Preview button that fires sendCardRequest carrying its action id.
        return {
            'text': label,
            'color': color,
            'status': 'normal',
            'event': {
                'type': 'sendCardRequest',
                'params': {'actionId': action, 'params': {'action_id': action}},
            },
        }

    # --- component registry -------------------------------------------------
    components_map = []
    for component in (
        'AIPending',
        'AICardStatusContainer',
        'BaseText',
        'AICardContent',
        'AICardContainer',
        'ButtonGroup',
        'MarkdownBlock',
    ):
        components_map.append(
            {
                'package': '@ali/dxComponent',
                'version': '1.0.0',
                'exportName': component,
                'main': './src/index.tsx',
                'destructuring': False,
                'subName': '',
                'componentName': component,
            }
        )

    # --- component tree -----------------------------------------------------
    pending_spinner = {
        'componentName': 'AIPending',
        'id': 'node_pending_inner',
        'props': {
            **zero_margins(),
            'pendingTip': {'type': 'dynamicString', 'content': '处理中...', 'i18n': False},
            'style': 'embed',
            'hideIcon': False,
        },
        'hidden': False,
        'title': '',
        'isLocked': False,
        'condition': True,
        'conditionGroup': '',
    }
    pending_state = status_container('node_status_pending', 1, '处理中状态', [pending_spinner])

    done_state = status_container(
        'node_status_done',
        3,
        '完成状态',
        [
            card_content(
                'node_done_content',
                [
                    markdown_block('node_text_content', variable='content'),
                    button_group('node_btn_group'),
                ],
                visible_first=False,
            )
        ],
    )

    failed_state = status_container(
        'node_status_failed',
        5,
        '失败状态',
        [
            card_content(
                'node_failed_content',
                [
                    text_block(
                        'node_failed_text',
                        '操作失败,请稍后重试。',
                        gravity='center',
                        mt=10,
                        mb=10,
                        ml=10,
                        mr=10,
                        max_lines=2,
                        font_size=15,
                    )
                ],
                visible_first=True,
            )
        ],
    )

    root = {
        'componentName': 'AICardContainer',
        'id': 'node_root',
        'props': {
            **zero_margins(),
            'enablePending': True,
            'enableWriting': False,
            'enableDoing': False,
            'enableFailed': True,
            'summaryContent': {'type': 'variableValue', 'variableType': 'global', 'variable': ''},
            'enableTitle': False,
            'flowStatusVar': {'type': 'variableValue', 'variableType': 'global', 'variable': 'flowStatus'},
            'operationPenalType': 'custom',
            'enableFlowAbort': True,
            'innerOffset': 0,
            'enableGradientBorder': True,
            'cardSizeMode': 'adaptive',
            'cardSizeHeightMode': 'adaptive',
            'cardSizeWidthMode': 'adaptive',
            'cardSizeHeight': {
                'type': 'dynamicNumber',
                'valueType': 'fixed',
                'value': 226,
                'variable': '',
                'variableType': 'global',
            },
            'hasBackground': False,
            'backgroundType': 'Standard',
            'standardBackgroundColor': 'gray',
            'backgroundColor': '#F6F6F6',
            'darkModeBackgroundColor': '#3C3C3C',
            'enableEngineUpgrade': False,
            'enableExposeStatPoint': False,
            'enableDebugTool': False,
        },
        'hidden': False,
        'title': '',
        'isLocked': False,
        'condition': True,
        'conditionGroup': '',
        'children': [pending_state, done_state, failed_state],
    }

    # --- card variables -----------------------------------------------------
    event_params_schema = [
        schema_field('url', 'string', '点击跳转链接type=openLink'),
        schema_field('actionId', 'string', '回传请求 idtype=sendCardRequest'),
        schema_field('params', 'object', '回传请求参数type=sendCardRequest'),
    ]
    event_schema = [
        schema_field('type', 'string', '事件类型openLink / sendCardRequest'),
        schema_field('params', 'object', '事件参数', event_params_schema),
    ]
    btns_var = {
        'name': 'btns',
        'private': False,
        'type': 'buttonGroup',
        'id': 'btns',
        'description': '动态按钮列表Dify actions',
        'editorVarType': 'variables',
        'disabled': False,
        'schema': [
            schema_field('text', 'string', '按钮文案'),
            schema_field('color', 'string', '按钮颜色'),
            schema_field('status', 'string', '按钮状态'),
            schema_field('event', 'dynamicEvent', '按钮点击事件', event_schema),
        ],
    }
    content_var = {
        'id': 'content',
        'type': 'markdown',
        'name': 'content',
        'description': '人工输入提示词Dify form_content 含可选 node_title 前缀)',
        'private': False,
        'editorVarType': 'variables',
        'disabled': False,
    }
    flow_status_var = {
        'id': 'flowStatus',
        'type': 'string',
        'name': 'flowStatus',
        'description': 'AI卡片状态pending(1)、writing(2)、done(3)、failed(5)',
        'private': False,
        'editorVarType': 'variables',
        'disabled': True,
        'visible': False,
    }

    # --- preview data shown inside the card builder -------------------------
    mock_data = {
        'cardData': {
            'flowStatus': 3,
            'content': '请审核以下报销申请:\n\n- 申请人:张三\n- 金额¥1,200\n- 类别:差旅',
            'btns': [
                mock_button('通过', 'blue', 'approve'),
                mock_button('驳回', 'gray', 'reject'),
                mock_button('补充资料', 'gray', 'more_info'),
            ],
        },
        'cardPrivateData': {},
        'localData': {'flowStatus': '', '_cardFoldStatusLocalDataKey': ''},
        'richTextData': {},
    }

    return {
        'schemaVersion': '3.0.0',
        'schema': {
            'componentsMap': components_map,
            'componentsTree': [root],
            'i18n': {},
            'version': '1.0.0',
        },
        'mockData': mock_data,
        'renderContext': {'regenerateEnabled': '1', 'regenerateIndex': '2', 'regenerateTotal': '5'},
        'editVersion': 0,
        'customWidgetInfo': '',
        'useCustomWidgetInfo': False,
        'variableList': [content_var, flow_status_var, btns_var],
        'formList': [],
        'customContextList': [],
        'expList': [],
        'localList': [],
        'hsfList': [],
        'lwpList': [],
        'pageData': {},
        'extension': {'extendType': 'AI', 'aiStatusList': [3, 1, 5], 'fileTypeList': []},
    }
def main():
    """Write the card template to ``OUTPUT`` wrapped in the import envelope.

    ``editorData`` is stored as a compact JSON *string* rather than a nested
    object; the surrounding envelope is pretty-printed with a 2-space indent.
    """
    # Inner document: compact separators, non-ASCII text kept as-is.
    compact_editor_json = json.dumps(
        build_editor_data(),
        ensure_ascii=False,
        separators=(',', ':'),
    )
    envelope = dict(
        editorData=compact_editor_json,
        widgetInfo='',
        type='im',
        mode='card',
    )
    rendered = json.dumps(envelope, ensure_ascii=False, indent=2)
    OUTPUT.write_text(rendered, encoding='utf-8')
    print(f'wrote {OUTPUT}')


if __name__ == '__main__':
    main()

View File

@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
if chunk.startswith('data:'): if chunk.startswith('data:'):
yield json.loads(chunk[5:]) yield json.loads(chunk[5:])
async def workflow_submit(
    self,
    form_token: str,
    workflow_run_id: str,
    inputs: dict[str, typing.Any],
    user: str,
    action: str = '',
    timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
    """Submit human input to resume a paused workflow, then stream events.

    1. POST /form/human_input/{form_token} to submit the form.
    2. GET /workflow/{workflow_run_id}/events to stream the resumed
       workflow's events.

    Args:
        form_token: Token identifying the pending human-input form.
        workflow_run_id: Run id used in the events URL.
        inputs: Form field values; anything that is not a dict is sent as ``{}``.
        user: User identifier sent with both requests.
        action: Action value sent with the form submission.
        timeout: Timeout (seconds) applied to the HTTP client.

    Yields:
        One decoded JSON object per ``data:`` line of the event stream.

    Raises:
        DifyAPIError: If either request answers with a non-200 status.
    """
    auth_header = {'Authorization': f'Bearer {self.api_key}'}

    async with httpx.AsyncClient(
        base_url=self.base_url,
        trust_env=True,
        timeout=timeout,
    ) as client:
        # Step 1: Submit the form
        payload: dict[str, typing.Any] = {
            'inputs': inputs if isinstance(inputs, dict) else {},
            'user': user,
            'action': action,
        }
        submit_resp = await client.post(
            f'/form/human_input/{form_token}',
            headers={**auth_header, 'Content-Type': 'application/json'},
            json=payload,
        )
        if submit_resp.status_code != 200:
            raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')

        # Step 2: Stream resumed workflow events
        async with client.stream(
            'GET',
            f'/workflow/{workflow_run_id}/events',
            headers=auth_header,
            params={'user': user},
        ) as r:
            # Check the status once, before consuming lines: a failed
            # response with an empty body would otherwise produce zero
            # lines and end silently, and a multi-line error body would be
            # cut to its first line.
            if r.status_code != 200:
                error_body = (await r.aread()).decode('utf-8', errors='replace')
                raise DifyAPIError(f'{r.status_code} {error_body}')

            async for chunk in r.aiter_lines():
                if chunk.strip() == '':
                    continue
                if chunk.startswith('data:'):
                    yield json.loads(chunk[5:])
async def upload_file( async def upload_file(
self, self,
file: httpx._types.FileTypes, file: httpx._types.FileTypes,

View File

@@ -1,17 +1,26 @@
import asyncio import asyncio
import base64 import base64
import json import json
import logging
import time import time
import uuid
import urllib.parse import urllib.parse
from typing import Callable from typing import Awaitable, Callable, Optional
import dingtalk_stream # type: ignore import dingtalk_stream # type: ignore
import websockets import websockets
from .EchoHandler import EchoTextHandler from .EchoHandler import EchoTextHandler
from .card_callback import DingTalkCardActionHandler
from .dingtalkevent import DingTalkEvent from .dingtalkevent import DingTalkEvent
import httpx import httpx
import traceback import traceback
_stdout_logger = logging.getLogger('langbot.dingtalk_api')
DINGTALK_OPENAPI_BASE = 'https://api.dingtalk.com'
class DingTalkClient: class DingTalkClient:
def __init__( def __init__(
self, self,
@@ -21,6 +30,7 @@ class DingTalkClient:
robot_code: str, robot_code: str,
markdown_card: bool, markdown_card: bool,
logger: None, logger: None,
card_action_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
): ):
"""初始化 WebSocket 连接并自动启动""" """初始化 WebSocket 连接并自动启动"""
self.credential = dingtalk_stream.Credential(client_id, client_secret) self.credential = dingtalk_stream.Credential(client_id, client_secret)
@@ -30,6 +40,14 @@ class DingTalkClient:
# 在 DingTalkClient 中传入自己作为参数,避免循环导入 # 在 DingTalkClient 中传入自己作为参数,避免循环导入
self.EchoTextHandler = EchoTextHandler(self) self.EchoTextHandler = EchoTextHandler(self)
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler) self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
# STREAM-mode card action button click handler. Forwards parsed payload
# to the adapter so it can resume paused Dify workflows.
self.card_action_callback = card_action_callback
self.card_action_handler = DingTalkCardActionHandler(self.client, self._on_card_action)
self.client.register_callback_handler(
dingtalk_stream.handlers.CallbackHandler.TOPIC_CARD_CALLBACK,
self.card_action_handler,
)
self._message_handlers = { self._message_handlers = {
'example': [], 'example': [],
} }
@@ -41,6 +59,16 @@ class DingTalkClient:
self.logger = logger self.logger = logger
self._stopped = False # Flag to control the event loop self._stopped = False # Flag to control the event loop
async def _on_card_action(self, payload: dict) -> None:
    """Forward a parsed card-action payload to the registered adapter callback.

    Does nothing when no callback was registered. Any exception raised by the
    callback is caught and reported through ``self.logger`` (when one is set)
    instead of propagating to the caller.
    """
    callback = self.card_action_callback
    if callback is not None:
        try:
            await callback(payload)
        except Exception:
            # Best-effort dispatch: log the traceback and carry on.
            if self.logger:
                await self.logger.error(f'DingTalk card action callback error: {traceback.format_exc()}')
async def get_access_token(self): async def get_access_token(self):
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
headers = {'Content-Type': 'application/json'} headers = {'Content-Type': 'application/json'}
@@ -429,18 +457,35 @@ class DingTalkClient:
'Content-Type': 'application/json', 'Content-Type': 'application/json',
} }
# For enterprise-internal robots, robotCode == AppKey (client_id).
# The dedicated robot_code field is only required for scenario-group
# robots or third-party robots; fall back to client_id when empty so
# the common single-bot setup keeps working without manual config.
robot_code = self.robot_code or self.key
data = { data = {
'robotCode': self.robot_code, 'robotCode': robot_code,
'userIds': [target_id], 'userIds': [target_id],
'msgKey': 'sampleText', 'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}), 'msgParam': json.dumps({'content': content}),
} }
_stdout_logger.info(
'DingTalk send_proactive_message_to_one request: robotCode=%s target_id=%s content_len=%d',
robot_code,
target_id,
len(content),
)
try: try:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data) response = await client.post(url, headers=headers, json=data)
_stdout_logger.info(
'DingTalk send_proactive_message_to_one response: status=%d body=%s',
response.status_code,
response.text[:500],
)
if response.status_code == 200: if response.status_code == 200:
return return
except Exception: except Exception:
_stdout_logger.exception('DingTalk send_proactive_message_to_one error')
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}') await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}') raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
@@ -456,7 +501,7 @@ class DingTalkClient:
} }
data = { data = {
'robotCode': self.robot_code, 'robotCode': self.robot_code or self.key,
'openConversationId': target_id, 'openConversationId': target_id,
'msgKey': 'sampleText', 'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}), 'msgParam': json.dumps({'content': content}),
@@ -477,47 +522,274 @@ class DingTalkClient:
quote_origin: bool = False, quote_origin: bool = False,
card_auto_layout: bool = False, card_auto_layout: bool = False,
): ):
card_data = {} """Create + deliver the streaming chat card for a chatbot reply.
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
card_data['content'] = ''
# 将用户的消息内容作为卡片的查询参数,方便后续处理 Replaces the old `dingtalk_stream.AICardReplier`-based path. Returns
if incoming_message.message_type == 'text': `(None, out_track_id)` to keep call sites compatible with the
card_data['query'] = incoming_message.get_text_list()[0] previous `(card_instance, card_instance_id)` shape — the first slot
is unused now that everything is driven by out_track_id.
"""
out_track_id = uuid.uuid4().hex
is_group = str(incoming_message.conversation_type) == '2'
if is_group:
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
else: else:
card_data['query'] = '...' open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message) card_param_map = {'content': ''}
# print(card_instance) if incoming_message.message_type == 'text':
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards card_param_map['query'] = incoming_message.get_text_list()[0]
card_instance_id = await card_instance.async_create_and_deliver_card( else:
temp_card_id, card_param_map['query'] = '...'
card_data,
await self.create_and_deliver_card(
card_template_id=temp_card_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map=card_param_map,
card_data_config={'autoLayout': card_auto_layout},
) )
return card_instance, card_instance_id return None, out_track_id
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool): async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
content_key = 'content' """Stream a single chunk into an existing card's `content` field."""
try: try:
await card_instance.async_streaming( await self.streaming_update_card(
card_instance_id, out_track_id=card_instance_id,
content_key=content_key, content_key='content',
content_value=content, content_value=content,
append=False, append=False,
finished=is_final, finished=is_final,
failed=False, failed=False,
) )
except Exception as e: except Exception as e:
self.logger.exception(e) if self.logger:
await card_instance.async_streaming( self.logger.exception(e)
card_instance_id, await self.streaming_update_card(
content_key=content_key, out_track_id=card_instance_id,
content_key='content',
content_value='', content_value='',
append=False, append=False,
finished=is_final, finished=is_final,
failed=True, failed=True,
) )
async def create_and_deliver_card(
    self,
    *,
    card_template_id: str,
    out_track_id: str,
    open_space_id: str,
    is_group: bool,
    card_param_map: Optional[dict] = None,
    callback_type: str = 'STREAM',
    callback_route_key: Optional[str] = None,
    support_forward: bool = True,
    dynamic_data_source_configs: Optional[list] = None,
    card_data_config: Optional[dict] = None,
    at_user_ids: Optional[dict] = None,
    recipients: Optional[list] = None,
) -> bool:
    """Create a card instance and deliver it via POST /v1.0/card/instances/createAndDeliver.

    Follows the shape of the SDK's `async_create_and_deliver_card`, and
    additionally exposes the dynamic-data-source config slot so a pull URL
    can be registered for variable-length button lists.

    Args:
        card_template_id: Sent as ``cardTemplateId``.
        out_track_id: Sent as ``outTrackId``.
        open_space_id: Sent as ``openSpaceId``.
        is_group: Selects the group deliver model when true, otherwise the
            robot (IM_ROBOT) deliver model.
        card_param_map: Sent as ``cardData.cardParamMap``; ``{}`` when falsy.
        callback_type: Sent as ``callbackType``.
        callback_route_key: Sent as ``callbackRouteKey`` only when
            ``callback_type`` is ``'HTTP'`` and the key is non-empty.
        support_forward: ``supportForward`` flag for both open-space models.
        dynamic_data_source_configs: When non-empty, sent under
            ``openDynamicDataConfig.dynamicDataSourceConfigs``.
        card_data_config: When not None, JSON-encoded into ``cardData.config``.
        at_user_ids: Group delivery only; sent as ``atUserIds`` when truthy.
        recipients: Group delivery only; sent as ``recipients`` when not None.

    Returns:
        True if the endpoint answered HTTP 200. False for any other status or
        if an exception occurred; failures are logged, never raised.
    """
    token_is_valid = await self.check_access_token()
    if not token_is_valid:
        await self.get_access_token()

    # --- assemble the request body (insertion order is kept for the log line)
    card_payload: dict = {'cardParamMap': card_param_map or {}}
    if card_data_config is not None:
        card_payload['config'] = json.dumps(card_data_config)

    request_body: dict = {
        'cardTemplateId': card_template_id,
        'outTrackId': out_track_id,
        'cardData': card_payload,
        'callbackType': callback_type,
        'openSpaceId': open_space_id,
        'imGroupOpenSpaceModel': {'supportForward': support_forward},
        'imRobotOpenSpaceModel': {'supportForward': support_forward},
    }
    if callback_route_key and callback_type == 'HTTP':
        request_body['callbackRouteKey'] = callback_route_key

    if not is_group:
        request_body['imRobotOpenDeliverModel'] = {'spaceType': 'IM_ROBOT'}
    else:
        # robotCode falls back to the client key when no robot_code is set.
        group_deliver: dict = {'robotCode': self.robot_code or self.key}
        if at_user_ids:
            group_deliver['atUserIds'] = at_user_ids
        if recipients is not None:
            group_deliver['recipients'] = recipients
        request_body['imGroupOpenDeliverModel'] = group_deliver

    if dynamic_data_source_configs:
        request_body['openDynamicDataConfig'] = {'dynamicDataSourceConfigs': dynamic_data_source_configs}

    endpoint = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/createAndDeliver'
    request_headers = {
        'x-acs-dingtalk-access-token': self.access_token,
        'Content-Type': 'application/json',
    }

    # --- send; every failure path is logged and reported as False
    try:
        _stdout_logger.info(
            'DingTalk createAndDeliver request body: %s',
            json.dumps(request_body, ensure_ascii=False)[:1500],
        )
        async with httpx.AsyncClient() as client:
            response = await client.post(endpoint, headers=request_headers, json=request_body, timeout=30.0)

        if response.status_code != 200:
            _stdout_logger.error(
                'DingTalk createAndDeliver failed: status=%s body=%s',
                response.status_code,
                response.text,
            )
            if self.logger:
                await self.logger.error(
                    f'DingTalk createAndDeliver failed: status={response.status_code} body={response.text}'
                )
            return False

        _stdout_logger.info(
            'DingTalk createAndDeliver response: %s',
            response.text[:500],
        )
        return True
    except Exception:
        _stdout_logger.exception('DingTalk createAndDeliver error')
        if self.logger:
            await self.logger.error(f'DingTalk createAndDeliver error: {traceback.format_exc()}')
        return False
async def streaming_update_card(
    self,
    *,
    out_track_id: str,
    content_key: str,
    content_value: str,
    append: bool,
    finished: bool,
    failed: bool = False,
) -> bool:
    """Push one streaming update to a card via PUT /v1.0/card/streaming.

    Stands in for `dingtalk_stream.AICardReplier.async_streaming` and sends
    the same body fields (outTrackId / guid / key / content / isFull /
    isFinalize / isError).

    Args:
        out_track_id: Sent as ``outTrackId``.
        content_key: Sent as ``key``.
        content_value: Sent as ``content``.
        append: ``isFull`` is sent as the negation of this flag.
        finished: Sent as ``isFinalize``.
        failed: Sent as ``isError``.

    Returns:
        True if the endpoint answered HTTP 200. False for any other status or
        if an exception occurred; failures are logged, never raised.
    """
    token_is_valid = await self.check_access_token()
    if not token_is_valid:
        await self.get_access_token()

    endpoint = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/streaming'
    request_headers = {
        'x-acs-dingtalk-access-token': self.access_token,
        'Content-Type': 'application/json',
    }
    request_body = {
        'outTrackId': out_track_id,
        # A fresh random guid is generated for every update request.
        'guid': uuid.uuid4().hex,
        'key': content_key,
        'content': content_value,
        'isFull': not append,
        'isFinalize': finished,
        'isError': failed,
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.put(endpoint, headers=request_headers, json=request_body, timeout=30.0)

        if response.status_code != 200:
            if self.logger:
                await self.logger.error(
                    f'DingTalk card streaming failed: status={response.status_code} body={response.text}'
                )
            return False
        return True
    except Exception:
        if self.logger:
            await self.logger.error(f'DingTalk card streaming error: {traceback.format_exc()}')
        return False
async def update_card_data(
    self,
    *,
    out_track_id: str,
    card_param_map: Optional[dict] = None,
    private_data: Optional[dict] = None,
) -> bool:
    """Update a card's data (non-streaming) via PUT /v1.0/card/instances.

    Args:
        out_track_id: Sent as ``outTrackId``.
        card_param_map: Sent as ``cardData.cardParamMap``; ``{}`` when falsy.
        private_data: Sent as ``privateData`` only when truthy.

    Returns:
        True if the endpoint answered HTTP 200. False for any other status or
        if an exception occurred; failures are logged, never raised.
    """
    token_is_valid = await self.check_access_token()
    if not token_is_valid:
        await self.get_access_token()

    request_body: dict = {
        'outTrackId': out_track_id,
        'cardData': {'cardParamMap': card_param_map or {}},
    }
    if private_data:
        request_body['privateData'] = private_data

    endpoint = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances'
    request_headers = {
        'x-acs-dingtalk-access-token': self.access_token,
        'Content-Type': 'application/json',
    }

    try:
        # Request and response are both logged, truncated to keep lines bounded.
        _stdout_logger.info(
            'DingTalk update_card_data request: out_track_id=%s body=%s',
            out_track_id,
            json.dumps(request_body, ensure_ascii=False)[:500],
        )
        async with httpx.AsyncClient() as client:
            response = await client.put(endpoint, headers=request_headers, json=request_body, timeout=30.0)
        _stdout_logger.info(
            'DingTalk update_card_data response: status=%d body=%s',
            response.status_code,
            response.text[:300],
        )

        if response.status_code != 200:
            if self.logger:
                await self.logger.error(
                    f'DingTalk update card failed: status={response.status_code} body={response.text}'
                )
            return False
        return True
    except Exception:
        _stdout_logger.exception('DingTalk update_card_data error')
        if self.logger:
            await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
        return False
async def delete_card(self, *, out_track_id: str) -> bool:
    """Recall a delivered card via POST /v1.0/card/instances/delete.

    Used to remove the initial streaming chat card after the fact when the
    workflow turns out to be paused for human input; the prompt and buttons
    then live only on the dedicated form card.

    Args:
        out_track_id: Sent as ``outTrackId``.

    Returns:
        True if the endpoint answered HTTP 200, otherwise False. Exceptions
        are logged and reported as False.
    """
    token_is_valid = await self.check_access_token()
    if not token_is_valid:
        await self.get_access_token()

    endpoint = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/delete'
    request_headers = {
        'x-acs-dingtalk-access-token': self.access_token,
        'Content-Type': 'application/json',
    }
    request_body = {'outTrackId': out_track_id, 'userIdType': 1}

    try:
        _stdout_logger.info('DingTalk delete_card request: out_track_id=%s', out_track_id)
        async with httpx.AsyncClient() as client:
            response = await client.post(endpoint, headers=request_headers, json=request_body, timeout=30.0)
        _stdout_logger.info(
            'DingTalk delete_card response: status=%d body=%s',
            response.status_code,
            response.text[:300],
        )
        succeeded = response.status_code == 200
        return succeeded
    except Exception:
        _stdout_logger.exception('DingTalk delete_card error')
        return False
async def start(self): async def start(self):
"""启动 WebSocket 连接,监听消息""" """启动 WebSocket 连接,监听消息"""
self._stopped = False self._stopped = False

View File

@@ -0,0 +1,96 @@
"""STREAM-mode handler for DingTalk card action button clicks.
DingTalk delivers card-action callbacks over the same WebSocket stream used
for chatbot messages, under the topic `/v1.0/card/instances/callback`. This
module subclasses `dingtalk_stream.CallbackHandler` and forwards the parsed
payload to a coroutine the adapter registers, so the resume-paused-workflow
logic stays in the platform adapter where it belongs.
The `CardCallbackMessage` returned by `from_dict` exposes:
* `card_instance_id` (from `outTrackId`) — the card whose button was clicked
* `user_id` — the clicker's userId
* `content` — parsed JSON; the click params live here. Where exactly inside
`content` they sit depends on the template binding. We probe
the common paths.
* `extension` — parsed JSON; any extra data we set when delivering the card.
"""
from __future__ import annotations
from typing import Awaitable, Callable, Optional
import dingtalk_stream # type: ignore
from dingtalk_stream import AckMessage
from dingtalk_stream.card_callback import CardCallbackMessage
_PARAM_PATHS = (
('params',),
('cardPrivateData', 'params'),
('userPrivateData', 'params'),
)
def _extract_params(content: dict) -> dict:
"""Return the action params dict regardless of where the template put it."""
for path in _PARAM_PATHS:
node = content
for key in path:
if not isinstance(node, dict):
node = None
break
node = node.get(key)
if node is None:
break
if isinstance(node, dict) and node:
return node
return {}
class DingTalkCardActionHandler(dingtalk_stream.CallbackHandler):
    """Stream callback handler that forwards card button clicks to a coroutine.

    The handler parses the callback into a flat payload dict and awaits
    ``on_action(payload)`` when one is registered. It always acknowledges the
    callback with ``STATUS_OK``, even if parsing or ``on_action`` fails.
    """

    def __init__(
        self,
        dingtalk_stream_client,
        on_action: Optional[Callable[[dict], Awaitable[None]]] = None,
    ):
        super().__init__()
        self.dingtalk_client = dingtalk_stream_client
        self.on_action = on_action

    @staticmethod
    def _resolve_action_id(raw):
        """Find the clicked action id in the raw callback data.

        Probes, in order: top-level ``actionId``, ``actionData.actionId`` and
        the first entry of ``actionData.cardPrivateData.actionIds``. Returns
        ``''`` when none of them yields a value.
        """
        if not isinstance(raw, dict):
            return ''
        action_id = raw.get('actionId') or ''
        if action_id:
            return action_id
        # Some templates nest it under actionData / cardPrivateData.
        action_data = raw.get('actionData') or {}
        if not isinstance(action_data, dict):
            return ''
        action_id = action_data.get('actionId') or ''
        if action_id:
            return action_id
        private_data = action_data.get('cardPrivateData') or {}
        if isinstance(private_data, dict):
            ids = private_data.get('actionIds')
            if isinstance(ids, list) and ids:
                return str(ids[0])
        return ''

    async def process(self, callback: dingtalk_stream.CallbackMessage):
        """Parse one card-action callback and hand it to ``on_action``.

        Returns:
            ``(AckMessage.STATUS_OK, 'OK')`` unconditionally.
        """
        try:
            message = CardCallbackMessage.from_dict(callback.data)
            content = message.content if isinstance(message.content, dict) else {}
            payload = {
                'out_track_id': message.card_instance_id,
                'user_id': message.user_id,
                'corp_id': message.corp_id,
                # `CardCallbackMessage.from_dict` does not surface `actionId`,
                # so it is pulled from the raw callback data instead.
                'action_id': self._resolve_action_id(callback.data),
                'params': _extract_params(content),
                'raw_content': message.content,
                'extension': message.extension if isinstance(message.extension, dict) else {},
            }
            if self.on_action is not None:
                await self.on_action(payload)
        except Exception as e:
            # Keep the traceback: the bare message alone rarely identifies
            # which part of the parsing / dispatch failed.
            self.logger.exception(f'DingTalkCardActionHandler.process error: {e}')
        return AckMessage.STATUS_OK, 'OK'

View File

@@ -157,7 +157,7 @@ class RuntimePipeline:
bot_message=query.resp_messages[-1], bot_message=query.resp_messages[-1],
message=result.user_notice, message=result.user_notice,
quote_origin=query.pipeline_config['output']['misc']['quote-origin'], quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
is_final=[msg.is_final for msg in query.resp_messages][0], is_final=[msg.is_final for msg in query.resp_messages][-1],
) )
else: else:
await query.adapter.reply_message( await query.adapter.reply_message(

View File

@@ -42,9 +42,13 @@ class QueryPool:
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None, pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False, routed_by_rule: bool = False,
variables: typing.Optional[dict[str, typing.Any]] = None,
) -> pipeline_query.Query: ) -> pipeline_query.Query:
async with self.condition: async with self.condition:
query_id = self.query_id_counter query_id = self.query_id_counter
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
if variables:
initial_variables.update(variables)
query = pipeline_query.Query( query = pipeline_query.Query(
bot_uuid=bot_uuid, bot_uuid=bot_uuid,
query_id=query_id, query_id=query_id,
@@ -53,7 +57,7 @@ class QueryPool:
sender_id=sender_id, sender_id=sender_id,
message_event=message_event, message_event=message_event,
message_chain=message_chain, message_chain=message_chain,
variables={'_routed_by_rule': routed_by_rule}, variables=initial_variables,
resp_messages=[], resp_messages=[],
resp_message_chain=[], resp_message_chain=[],
adapter=adapter, adapter=adapter,

View File

@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages) has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
# TODO 命令与流式的兼容性问题 # TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported() and has_chunks: if await query.adapter.is_stream_output_supported() and has_chunks:
is_final = [msg.is_final for msg in query.resp_messages][0] is_final = [msg.is_final for msg in query.resp_messages][-1]
await query.adapter.reply_message_chunk( await query.adapter.reply_message_chunk(
message_source=query.message_event, message_source=query.message_event,
bot_message=query.resp_messages[-1], bot_message=query.resp_messages[-1],

View File

@@ -501,6 +501,8 @@ class PlatformManager:
bot_entity.adapter_config, bot_entity.adapter_config,
logger, logger,
) )
if hasattr(adapter_inst, 'ap'):
adapter_inst.ap = self.ap
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook # 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook
if hasattr(adapter_inst, 'set_bot_uuid'): if hasattr(adapter_inst, 'set_bot_uuid'):

View File

@@ -1,13 +1,19 @@
import asyncio
import json
import traceback import traceback
import typing import typing
import uuid
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.provider.session as provider_session
from langbot.libs.dingtalk_api.api import DingTalkClient from langbot.libs.dingtalk_api.api import DingTalkClient
import datetime import datetime
from langbot.pkg.platform.logger import EventLogger from langbot.pkg.platform.logger import EventLogger
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter): class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@@ -170,6 +176,13 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_instance_id_dict: ( card_instance_id_dict: (
dict # 回复卡片消息字典key为消息idvalue为回复卡片实例id用于在流式消息时判断是否发送到指定卡片 dict # 回复卡片消息字典key为消息idvalue为回复卡片实例id用于在流式消息时判断是否发送到指定卡片
) )
# outTrackId → form snapshot {session_key, launcher_type, launcher_id, sender_user_id,
# form_token, workflow_run_id, actions, node_title, form_content, open_space_id,
# is_group}. Written when a form card is delivered; lookup key for the data-source
# pull endpoint and the STREAM card-action callback. The entry is dropped once the
# click has been consumed.
card_state: dict
ap: typing.Any = None
bot_uuid: str = ''
def __init__(self, config: dict, logger: EventLogger): def __init__(self, config: dict, logger: EventLogger):
required_keys = [ required_keys = [
@@ -194,10 +207,15 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
config=config, config=config,
logger=logger, logger=logger,
card_instance_id_dict={}, card_instance_id_dict={},
card_state={},
bot_account_id=bot_account_id, bot_account_id=bot_account_id,
bot=bot, bot=bot,
listeners={}, listeners={},
) )
# Wire the card-action callback after super().__init__ so we can reference
# self.* — the client's handler stores this as a soft reference and reads
# it at fire time.
self.bot.card_action_callback = self._on_card_action
async def reply_message( async def reply_message(
self, self,
@@ -222,28 +240,79 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
quote_origin: bool = False, quote_origin: bool = False,
is_final: bool = False, is_final: bool = False,
): ):
# event = await DingTalkEventConverter.yiri2target(
# message_source,
# )
# incoming_message = event.incoming_message
# msg_id = incoming_message.message_id
message_id = bot_message.resp_message_id message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence msg_seq = bot_message.msg_sequence
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
form_data = getattr(bot_message, '_form_data', None)
if is_final and self.ap is not None:
self.ap.logger.info(
f'DingTalk reply_message_chunk final: form_data_present={form_data is not None}, '
f'form_template_configured={bool(form_template_id)}'
)
if form_data and is_final:
await self._handle_form_chunk(message_source, bot_message, message, form_data)
return
if (msg_seq - 1) % 8 == 0 or is_final: if (msg_seq - 1) % 8 == 0 or is_final:
markdown_enabled = self.config.get('markdown_card', False) markdown_enabled = self.config.get('markdown_card', False)
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled) content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
card_instance, card_instance_id = self.card_instance_id_dict[message_id]
if not content and bot_message.content: if not content and bot_message.content:
content = bot_message.content # 兼容直接传入content的情况 content = bot_message.content # 兼容直接传入content的情况
# print(card_instance_id)
chat_card_entry = self.card_instance_id_dict.get(message_id)
if chat_card_entry is None:
# No streaming chat card was created for this query — common
# path for synthetic events (e.g. resumed workflow after a
# button click). Lazy-create one so the resumed output streams
# into a card just like a normal conversation, instead of
# being deferred and sent in one shot on is_final.
if not content:
return # nothing to stream yet
chat_card_entry = await self._lazy_create_resume_chat_card(message_source, message_id)
if chat_card_entry is None:
# Lazy-create failed (no template configured); fall back
# to a one-shot proactive message on the final chunk.
if is_final:
await self._send_proactive_to_event(message_source, content)
return
card_instance, card_instance_id = chat_card_entry
if content: if content:
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final) if form_template_id:
if is_final and bot_message.tool_calls is None: # The form template's MarkdownBlock has `isStreaming: false`
# self.seq = 1 # 消息回复结束之后重置seq # — the streaming endpoint (PUT /v1.0/card/streaming) does
self.card_instance_id_dict.pop(message_id) # 消息回复结束之后删除卡片实例id # not propagate to non-streaming components. Use the full
# update_card_data PUT instead so the content actually
# appears in the card body.
try:
await self.bot.update_card_data(
out_track_id=card_instance_id,
card_param_map={
'content': content,
'btns': '[]',
'flowStatus': '3' if is_final else '1',
},
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: update card content failed')
else:
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
if is_final:
if form_template_id and not content:
# Empty final chunk still needs to leave the card with
# flowStatus=3 so the spinner stops.
try:
await self.bot.update_card_data(
out_track_id=card_instance_id,
card_param_map={'flowStatus': '3'},
)
except Exception:
pass
if bot_message.tool_calls is None:
self.card_instance_id_dict.pop(message_id, None)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
markdown_enabled = self.config.get('markdown_card', False) markdown_enabled = self.config.get('markdown_card', False)
@@ -260,16 +329,56 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return is_stream return is_stream
async def create_message_card(self, message_id, event):
    """Create the streaming chat card for an inbound message, when applicable.

    Returns:
        True when a legacy chat card was created and registered under
        ``message_id``; False when card creation is skipped or deferred.
    """
    # When a form template is configured, every card in the conversation
    # uses it (chat output, form prompts, post-click states). The chat
    # template fallback only kicks in if no form template is configured.
    form_template_id = (self.config.get('human_input_card_template_id') or '').strip()

    # Synthetic events (e.g. card button clicks) have no inbound chatbot
    # message — skip card creation. The lazy-create path in
    # reply_message_chunk will spawn a fresh card when the first
    # non-empty resume chunk arrives.
    if event is None or event.source_platform_object is None:
        return False

    if form_template_id:
        # Defer card creation to the first non-empty chunk. If the Dify
        # workflow pauses immediately for human input without producing
        # any LLM text first, no chat card is created at all — only the
        # form card gets delivered. Lazy-create lives in
        # reply_message_chunk → _lazy_create_resume_chat_card.
        return False

    # Legacy chat-card path (no form template configured).
    legacy_template_id = self.config.get('card_template_id', '')
    incoming_message = event.source_platform_object.incoming_message
    # The key used to be read as 'card_ auto_layout' (with an embedded space),
    # which never matches the 'card_auto_layout' key read elsewhere in this
    # adapter. Read the correct key first and keep the misspelt one as a
    # fallback so an existing config relying on it still works.
    card_auto_layout = self.config.get(
        'card_auto_layout',
        self.config.get('card_ auto_layout', False),
    )
    card_instance, card_instance_id = await self.bot.create_and_card(
        legacy_template_id, incoming_message, card_auto_layout=card_auto_layout
    )
    self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
    return True
def _session_key_from_event(self, event) -> str:
"""Return launcher_type_launcher_id for an event, '' if unrecoverable."""
if event is None:
return ''
spo = event.source_platform_object
if spo is None:
try:
if isinstance(event, platform_events.GroupMessage):
return f'group_{event.group.id}'
return f'person_{event.sender.id}'
except Exception:
return ''
try:
inc = spo.incoming_message
if str(inc.conversation_type) == '2':
return f'group_{inc.conversation_id}'
return f'person_{inc.sender_staff_id}'
except Exception:
return ''
def register_listener( def register_listener(
self, self,
event_type: typing.Type[platform_events.Event], event_type: typing.Type[platform_events.Event],
@@ -309,3 +418,449 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
], ],
): ):
return super().unregister_listener(event_type, callback) return super().unregister_listener(event_type, callback)
# ------------------------------------------------------------------
# Dify human-input form support
# ------------------------------------------------------------------
def set_bot_uuid(self, bot_uuid: str):
    """Remember the bot uuid handed over by the platform manager.

    The value is only stored on the adapter here; presumably it is later used
    to compose the unified-webhook URL for the card data-source endpoint —
    TODO confirm against the caller.
    """
    self.bot_uuid = bot_uuid
def _derive_open_space(self, message_source: platform_events.MessageEvent) -> tuple[str, bool]:
    """Map an inbound event to its card open-space id.

    Returns:
        ``(open_space_id, is_group)`` — an ``IM_GROUP`` space keyed by the
        group id for group messages, otherwise an ``IM_ROBOT`` space keyed by
        the sender id.
    """
    is_group = isinstance(message_source, platform_events.GroupMessage)
    if is_group:
        open_space_id = f'dtv1.card//IM_GROUP.{message_source.group.id}'
    else:
        open_space_id = f'dtv1.card//IM_ROBOT.{message_source.sender.id}'
    return open_space_id, is_group
def _derive_session_descriptor(
    self, message_source: platform_events.MessageEvent
) -> tuple[provider_session.LauncherTypes, str, str]:
    """Describe which session an inbound event belongs to.

    Returns:
        ``(launcher_type, launcher_id, sender_user_id)``. For group messages
        the launcher id is the group id; for everything else the sender id
        doubles as the launcher id.
    """
    if not isinstance(message_source, platform_events.GroupMessage):
        person_id = str(message_source.sender.id)
        return provider_session.LauncherTypes.PERSON, person_id, person_id
    group_id = str(message_source.group.id)
    member_id = str(message_source.sender.id)
    return provider_session.LauncherTypes.GROUP, group_id, member_id
async def _handle_form_chunk(
    self,
    message_source: platform_events.MessageEvent,
    bot_message,
    message: platform_message.MessageChain,
    form_data: dict,
) -> None:
    """Finalize the current chat card and deliver the human-input prompt.

    Multi-card flow: every Dify pause spawns its own card. The card the chat
    was streaming into (if any) is closed out with a final
    ``send_card_message`` so its spinner stops. The prompt is then delivered
    as a fresh form card, or as plain text when no form template is
    configured.

    Args:
        message_source: Event the reply is addressed to.
        bot_message: Response message; ``resp_message_id`` selects the chat
            card and ``content`` is the text fallback.
        message: Message chain converted into the chat card's final text.
        form_data: Human-input form description (``actions``, ``node_title``, …).
    """
    if self.ap is not None:
        self.ap.logger.info(
            f'DingTalk _handle_form_chunk: actions={len(form_data.get("actions") or [])}, '
            f'node_title={form_data.get("node_title", "")!r}'
        )
    message_id = bot_message.resp_message_id
    template_id = (self.config.get('human_input_card_template_id') or '').strip()

    # Finalize the previous chat card so its spinner stops. The
    # already-streamed text is used as the final content (an empty string
    # when nothing streamed).
    chat_card_entry = self.card_instance_id_dict.pop(message_id, None)
    if chat_card_entry is not None:
        _, chat_out_track_id = chat_card_entry
        markdown_enabled = self.config.get('markdown_card', False)
        text_content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
        if not text_content and bot_message.content:
            text_content = bot_message.content
        try:
            await self.bot.send_card_message(None, chat_out_track_id, text_content or '', True)
        except Exception:
            await self.logger.error(f'DingTalk: finalize chat card before form failed: {traceback.format_exc()}')
        # When the chat card uses the form template, also flip flowStatus
        # to 3 so it leaves the pending state visibly.
        if template_id:
            try:
                await self.bot.update_card_data(
                    out_track_id=chat_out_track_id,
                    card_param_map={'flowStatus': '3'},
                )
            except Exception:
                # Still best-effort (the form is delivered regardless), but
                # the failure is recorded instead of being dropped silently.
                await self.logger.error(
                    f'DingTalk: set flowStatus on chat card before form failed: {traceback.format_exc()}'
                )

    if not template_id:
        # No form template configured — fall back to plain text so users
        # can still reply with the option number or title.
        await self.send_message_text_form(message_source, form_data)
        return

    await self._send_form_card(message_source, form_data, template_id)
async def _send_form_card(
    self,
    message_source: platform_events.MessageEvent,
    form_data: dict,
    template_id: str,
) -> None:
    """Deliver a new card pre-loaded with the human-input prompt + buttons.

    A snapshot of the form is stored in ``card_state`` under a fresh
    ``out_track_id`` so the click callback can recover it. If delivery raises
    or reports failure, the snapshot is discarded and the prompt is sent as
    plain text instead.

    Args:
        message_source: Event identifying the conversation to deliver into.
        form_data: Human-input form description (``actions``, ``node_title``,
            ``form_content``, ``form_token``, ``workflow_run_id``).
        template_id: Card template used for the form card.
    """
    out_track_id = uuid.uuid4().hex
    open_space_id, is_group = self._derive_open_space(message_source)
    launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
    session_key = f'{launcher_type.value}_{launcher_id}'

    actions = list(form_data.get('actions') or [])
    node_title = form_data.get('node_title', '') or 'Human Input Required'
    form_content = form_data.get('form_content', '') or ''

    self.card_state[out_track_id] = {
        'session_key': session_key,
        'launcher_type': launcher_type.value,
        'launcher_id': launcher_id,
        'sender_user_id': sender_user_id,
        'form_token': form_data.get('form_token', ''),
        'workflow_run_id': form_data.get('workflow_run_id', ''),
        'actions': actions,
        'node_title': node_title,
        'form_content': form_content,
        'open_space_id': open_space_id,
        'is_group': is_group,
    }

    parts = []
    if node_title:
        parts.append(f'**{node_title}**')
    if form_content:
        parts.append(form_content)
    display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'

    btns = []
    for idx, action in enumerate(actions):
        action_id = str(action.get('id') or '')
        title = str(action.get('title') or action_id or f'选项 {idx + 1}')
        style = (action.get('button_style') or '').lower()
        # An unstyled first button is highlighted like a primary one.
        if style == 'primary' or (style == '' and idx == 0):
            color = 'blue'
        elif style == 'danger':
            color = 'red'
        else:
            color = 'gray'
        btns.append(
            {
                'text': title,
                'color': color,
                'status': 'normal',
                'event': {
                    'type': 'sendCardRequest',
                    'params': {
                        'actionId': action_id,
                        'params': {'action_id': action_id, 'out_track_id': out_track_id},
                    },
                },
            }
        )

    delivered = False
    try:
        if self.ap is not None:
            self.ap.logger.info(
                f'DingTalk _send_form_card: out_track_id={out_track_id} template_id={template_id} '
                f'open_space_id={open_space_id} is_group={is_group} btns={len(btns)}'
            )
        # The client reports success through its return value (the resume
        # chat-card path checks it the same way), so a falsy result is a
        # failed delivery even though nothing was raised.
        delivered = await self.bot.create_and_deliver_card(
            card_template_id=template_id,
            out_track_id=out_track_id,
            open_space_id=open_space_id,
            is_group=is_group,
            card_param_map={
                'content': display_content,
                'btns': json.dumps(btns, ensure_ascii=False),
                'flowStatus': '3',
            },
            callback_type='STREAM',
        )
    except Exception:
        await self.logger.error(f'DingTalk: deliver form card failed: {traceback.format_exc()}')
    else:
        if not delivered:
            await self.logger.error(
                f'DingTalk: deliver form card was not accepted: out_track_id={out_track_id}'
            )

    if not delivered:
        # Drop the snapshot first so it cannot leak if the fallback raises,
        # then make sure the user still sees the prompt.
        self.card_state.pop(out_track_id, None)
        await self.send_message_text_form(message_source, form_data)
async def _lazy_create_resume_chat_card(
    self,
    message_source: platform_events.MessageEvent,
    message_id: str,
) -> typing.Optional[tuple]:
    """Deliver a fresh card for streaming output that has no chat card yet.

    The form template is preferred and the legacy chat template is the
    fallback. On success the ``(None, out_track_id)`` entry is registered
    under ``message_id`` and returned. ``None`` is returned when no template
    is configured, delivery raises, or delivery reports failure.
    """
    form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
    legacy_template_id = (self.config.get('card_template_id') or '').strip()
    template_id = form_template_id or legacy_template_id
    if not template_id:
        return None

    out_track_id = uuid.uuid4().hex
    open_space_id, is_group = self._derive_open_space(message_source)
    if self.ap is not None:
        self.ap.logger.info(
            f'DingTalk _lazy_create_resume_chat_card: out_track_id={out_track_id} '
            f'open_space_id={open_space_id} is_group={is_group} '
            f'using_form_template={bool(form_template_id)}'
        )

    # Initial card variables differ per template: the form template starts
    # with no buttons, the legacy template gets a placeholder query and an
    # explicit layout config.
    card_data_config = None
    card_param_map = {'content': '', 'btns': '[]', 'flowStatus': '1'}
    if not form_template_id:
        card_param_map = {'content': '', 'query': '...'}
        card_data_config = {'autoLayout': self.config.get('card_auto_layout', False)}

    try:
        success = await self.bot.create_and_deliver_card(
            card_template_id=template_id,
            out_track_id=out_track_id,
            open_space_id=open_space_id,
            is_group=is_group,
            card_param_map=card_param_map,
            card_data_config=card_data_config,
            callback_type='STREAM',
        )
    except Exception:
        if self.ap is not None:
            self.ap.logger.exception('DingTalk: lazy create resume chat card failed')
        return None

    if success:
        entry = (None, out_track_id)
        self.card_instance_id_dict[message_id] = entry
        return entry
    return None
async def send_message_text_form(
    self,
    message_source: platform_events.MessageEvent,
    form_data: dict,
) -> None:
    """Send the human-input prompt as a plain-text proactive message.

    The text is rendered by ``_format_human_input_text`` from the form's
    ``node_title``, ``form_content`` and ``actions``.
    """
    title = form_data.get('node_title', '')
    body = form_data.get('form_content', '')
    actions = form_data.get('actions', []) or []
    display_text = _format_human_input_text(title, body, actions)
    await self._send_proactive_to_event(message_source, display_text)
async def _send_proactive_to_event(
    self,
    message_source: platform_events.MessageEvent,
    content: str,
) -> None:
    """Push ``content`` proactively to the conversation behind an event.

    Group events are sent to the group id and all others to the sender id.
    Empty content is a no-op. Send failures are logged, never raised.
    """
    if not content:
        return

    to_group = isinstance(message_source, platform_events.GroupMessage)
    if self.ap is not None:
        target = str(message_source.group.id) if to_group else str(message_source.sender.id)
        self.ap.logger.info(
            f'DingTalk _send_proactive_to_event: target={target} '
            f'is_group={to_group} content_len={len(content)}'
        )

    try:
        if to_group:
            await self.bot.send_proactive_message_to_group(str(message_source.group.id), content)
        else:
            await self.bot.send_proactive_message_to_one(str(message_source.sender.id), content)
    except Exception:
        # Recorded in both the application log and the adapter's event log.
        if self.ap is not None:
            self.ap.logger.exception('DingTalk: send proactive message failed')
        await self.logger.error(f'DingTalk: send proactive message failed: {traceback.format_exc()}')
async def _on_card_action(self, payload: dict) -> None:
"""Translate a card button click into a synthetic query.
Reads the clicked button's ``actionId`` (the real Dify action id —
the ButtonGroup template sends it back via `event.params.actionId`),
recovers the action title from ``card_state``, and enqueues a
synthetic `_dify_form_action` query the same way Lark / Telegram do.
"""
if self.ap is not None:
self.ap.logger.info(
f'DingTalk _on_card_action received: out_track_id={payload.get("out_track_id")} '
f'payload_action_id={payload.get("action_id")!r} params={payload.get("params")!r}'
)
out_track_id = payload.get('out_track_id') or ''
params = payload.get('params') or {}
# ButtonGroup `sendCardRequest` events surface the click id at the
# callback top level as `actionId`; fall back to `params.action_id`
# (alternate template wiring) and `params.actionId`.
raw_action_id = (
(payload.get('action_id') or '').strip()
or (params.get('action_id') or '').strip()
or (params.get('actionId') or '').strip()
or (params.get('id') or '').strip()
)
state = self.card_state.get(out_track_id)
if state is None:
await self.logger.warning(f'DingTalk: card action received for unknown out_track_id={out_track_id}')
return
if not raw_action_id:
await self.logger.warning(f'DingTalk: card action with no action_id, payload={payload}')
return
actions = state.get('actions', []) or []
action_id = raw_action_id
action_title = raw_action_id
for action in actions:
if str(action.get('id', '')) == raw_action_id:
action_title = action.get('title') or raw_action_id
break
launcher_type = (
provider_session.LauncherTypes.GROUP
if state.get('launcher_type') == provider_session.LauncherTypes.GROUP.value
else provider_session.LauncherTypes.PERSON
)
launcher_id = state.get('launcher_id', '')
sender_user_id = state.get('sender_user_id') or payload.get('user_id') or launcher_id
form_action_data = {
'form_token': state.get('form_token', ''),
'workflow_run_id': state.get('workflow_run_id', ''),
'action_id': action_id,
'action_title': action_title,
'node_title': state.get('node_title', ''),
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=sender_user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=message_chain,
time=int(datetime.datetime.now().timestamp()),
source_platform_object=None,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=sender_user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=int(datetime.datetime.now().timestamp()),
source_platform_object=None,
)
bot_uuid = ''
pipeline_uuid = None
if self.ap is not None:
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
try:
self.ap.logger.info(
f'DingTalk _on_card_action enqueuing form action: action_id={action_id!r} '
f'action_title={action_title!r} launcher_type={launcher_type.value} '
f'launcher_id={launcher_id} bot_uuid={bot_uuid} pipeline_uuid={pipeline_uuid}'
)
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
self.ap.logger.info('DingTalk _on_card_action: query enqueued OK')
except Exception:
self.ap.logger.exception('DingTalk: enqueue form action query failed')
return
# Visual feedback: collapse the form card to a "已选择" notice so
# the user knows the click registered while the workflow resumes.
asyncio.create_task(
self._mark_card_resolved(
out_track_id,
action_title,
node_title=state.get('node_title', ''),
form_content=state.get('form_content', ''),
)
)
# Once consumed, drop the state — the runner clears _PENDING_FORMS too.
self.card_state.pop(out_track_id, None)
async def _mark_card_resolved(
self,
out_track_id: str,
action_title: str,
*,
node_title: str = '',
form_content: str = '',
) -> None:
"""Update the form card to acknowledge the user's selection.
We rewrite the card content with the original prompt + a green tick
marker, and explicitly clear ``btns`` so the buttons are removed
once chosen. ``flowStatus`` is re-sent because some DingTalk clients
treat the PUT update as a partial *replace* of cardParamMap rather
than a merge — without it, the AICardContainer status containers
would all gate to ``gone`` and the whole card would blank out.
"""
parts: list[str] = []
if node_title:
parts.append(f'**{node_title}**')
if form_content:
parts.append(form_content)
parts.append(f'---\n✅ 已选择:**{action_title}**')
content = '\n\n'.join(parts)
if self.ap is not None:
self.ap.logger.info(f'DingTalk _mark_card_resolved: out_track_id={out_track_id} action={action_title!r}')
try:
await self.bot.update_card_data(
out_track_id=out_track_id,
card_param_map={
'content': content,
'btns': '[]',
'flowStatus': '3',
},
)
except Exception:
await self.logger.error(f'DingTalk: update form card after click failed: {traceback.format_exc()}')

View File

@@ -103,6 +103,18 @@ spec:
type: string type: string
required: true required: true
default: "填写你的卡片template_id" default: "填写你的卡片template_id"
- name: human_input_card_template_id
label:
en_US: Human Input Card Template ID
zh_Hans: 人工输入卡片模板ID
zh_Hant: 人工輸入卡片範本ID
description:
en_US: "Template ID used for every card in the conversation when configured. Streamed LLM text fills the `content` markdown variable. On a Dify human-input pause the current chat card is finalized and a separate form card is delivered with the `btns` buttonGroup variable populated so action buttons appear; after the user clicks a button the buttons are removed from that card and the resumed output streams into a new card. Use the bundled `src/langbot/templates/dingtalk_human_input_card.json` — it ships with `content` (MarkdownBlock) and `btns` (ButtonGroup) already wired. Leave empty to fall back to the legacy behaviour (chat card streaming text + plain-text human-input prompts)."
zh_Hans: "配置后,对话中的所有卡片都使用该模板ID。流式 LLM 文本写入 `content` markdown 变量;Dify 人工输入暂停时,当前聊天卡片先结束流式状态,随后另发一张表单卡片,其 `btns` buttonGroup 变量被填上、按钮浮现;用户点击后该卡片上的按钮消失,恢复的流式内容写入一张新卡片。可使用项目附带的 `src/langbot/templates/dingtalk_human_input_card.json`——已经预先连好 `content` (MarkdownBlock) 与 `btns` (ButtonGroup)。留空则降级为旧行为(聊天卡走流式 + 人工输入走纯文本)。"
zh_Hant: "設定後,對話中的所有卡片都使用該範本ID。流式 LLM 文字寫入 `content` markdown 變數;Dify 人工輸入暫停時,目前的聊天卡片先結束流式狀態,隨後另發一張表單卡片,其 `btns` buttonGroup 變數被填上、按鈕浮現;使用者點擊後該卡片上的按鈕消失,恢復的流式內容寫入一張新卡片。可使用專案附帶的 `src/langbot/templates/dingtalk_human_input_card.json`——已經預先連好 `content` (MarkdownBlock) 與 `btns` (ButtonGroup)。留空則降級為舊行為(聊天卡走流式 + 人工輸入走純文字)。"
type: string
required: false
default: ""
execution: execution:
python: python:
path: ./dingtalk.py path: ./dingtalk.py

View File

@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
import langbot_plugin.api.entities.builtin.provider.session as provider_session
class AESCipher(object): class AESCipher(object):
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True) bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True) api_client: lark_oapi.Client = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识 bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pending_monitoring_msg: dict[str, str] pending_monitoring_msg: dict[str, str]
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks) # Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
reply_to_monitoring_msg: dict[str, tuple[str, float]] reply_to_monitoring_msg: dict[str, tuple[str, float]]
reply_message_card_ids: dict[str, str]
card_sequence_dict: dict[str, int]
# card_id → set of source message ids registered against it (for cleanup)
card_id_to_source_ids: dict[str, set[str]]
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
card_streaming_text: dict[str, str]
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
card_pre_pause_text: dict[str, str]
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
card_resume_transitioned: set[str]
_MONITORING_MAPPING_TTL = 600 # 10 minutes _MONITORING_MAPPING_TTL = 600 # 10 minutes
seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识 seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识
@@ -812,11 +824,134 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
asyncio.create_task(on_message(event)) asyncio.create_task(on_message(event))
def schedule_on_app_loop(coro):
    """Run a coroutine on the application event loop from sync callbacks.

    Submits ``coro`` to ``self.ap.event_loop`` through
    ``asyncio.run_coroutine_threadsafe`` and returns the resulting future.

    The visible callers drop that future, so an exception raised inside the
    coroutine would otherwise disappear without any trace. A done-callback
    is attached that prints the traceback of a failed coroutine.
    """
    future = asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)

    def _report_failure(done):
        # ``exception()`` raises on a cancelled future, so rule that out first.
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            traceback.print_exception(type(exc), exc, exc.__traceback__)

    future.add_done_callback(_report_failure)
    return future
def sync_on_card_action(event): def sync_on_card_action(event):
try: try:
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {}) action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
# Parse JSON string values (from form action buttons)
if isinstance(action_value_raw, str):
try:
action_value_obj = json.loads(action_value_raw)
except (json.JSONDecodeError, TypeError):
action_value_obj = {}
else:
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else '' action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
# Handle Dify form action button clicks
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
form_token = action_value_obj.get('form_token', '')
workflow_run_id = action_value_obj.get('workflow_run_id', '')
action_id = action_value_obj.get('action_id', '')
session_key = action_value_obj.get('session_key', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
# Find the bot entity to get bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
form_action_data = {
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
context = getattr(event.event, 'context', None)
open_message_id = getattr(context, 'open_message_id', None)
source_time = datetime.datetime.now()
event_time = source_time.timestamp()
action_text = action_value_obj.get('action_id', 'confirm')
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
)
if open_message_id:
message_chain.insert(
0,
platform_message.Source(
id=open_message_id,
time=source_time,
),
)
operator = getattr(event.event, 'operator', None)
user_id = (
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
async def add_form_action_query():
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
schedule_on_app_loop(add_form_action_query())
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
if action_value == '有帮助': if action_value == '有帮助':
feedback_type = 1 feedback_type = 1
elif action_value == '无帮助': elif action_value == '无帮助':
@@ -857,17 +992,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
) )
if platform_events.FeedbackEvent in self.listeners: if platform_events.FeedbackEvent in self.listeners:
loop = asyncio.get_event_loop() schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
if loop.is_running():
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
else:
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}}) return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
except Exception: except Exception:
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}')) traceback.print_exc()
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}}) return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
@@ -893,6 +1025,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_id_dict={}, card_id_dict={},
pending_monitoring_msg={}, pending_monitoring_msg={},
reply_to_monitoring_msg={}, reply_to_monitoring_msg={},
reply_message_card_ids={},
card_sequence_dict={},
card_id_to_source_ids={},
card_streaming_text={},
card_pre_pause_text={},
card_resume_transitioned=set(),
seq=1, seq=1,
listeners={}, listeners={},
quart_app=quart_app, quart_app=quart_app,
@@ -1132,6 +1270,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for k in expired: for k in expired:
del self.reply_to_monitoring_msg[k] del self.reply_to_monitoring_msg[k]
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
"""Return the next strictly increasing sequence for a card update."""
current = self.card_sequence_dict.get(card_id, 0)
next_seq = max(current + 1, suggested)
self.card_sequence_dict[card_id] = next_seq
return next_seq
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
"""Register a card_id under one or more source message ids."""
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
for sid in source_ids:
if not sid:
continue
self.reply_message_card_ids[sid] = card_id
bucket.add(sid)
def _drop_card_state(self, card_id: str) -> None:
"""Pop all per-card state for the given card_id."""
if not card_id:
return
for sid in self.card_id_to_source_ids.pop(card_id, set()):
self.reply_message_card_ids.pop(sid, None)
self.card_sequence_dict.pop(card_id, None)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
async def create_card_id(self, message_id): async def create_card_id(self, message_id):
try: try:
# self.logger.debug('飞书支持stream输出,创建卡片......') # self.logger.debug('飞书支持stream输出,创建卡片......')
@@ -1327,6 +1492,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.card_id_dict[message_id] = response.data.card_id self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id card_id = response.data.card_id
self.card_sequence_dict[card_id] = 0
return card_id return card_id
except Exception as e: except Exception as e:
@@ -1339,6 +1505,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
""" """
# message_id = event.message_chain.message_id # message_id = event.message_chain.message_id
source_message_id = str(event.message_chain.message_id)
existing_card_id = self.reply_message_card_ids.get(source_message_id)
if existing_card_id:
self.card_id_dict[message_id] = existing_card_id
return True
card_id = await self.create_card_id(message_id) card_id = await self.create_card_id(message_id)
content = { content = {
'type': 'card', 'type': 'card',
@@ -1377,6 +1549,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
user_msg_id = event.message_chain.message_id user_msg_id = event.message_chain.message_id
reply_msg_id = getattr(response.data, 'message_id', None) reply_msg_id = getattr(response.data, 'message_id', None)
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None) monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
# Register the card under both the user-incoming msg id (so a
# second reply_message_first_chunk for the same user message
# reuses this card) AND the bot-reply msg id (so a synthetic
# event from a form-button callback — whose Source.id equals
# the bot's card message id — hits the same card and renders
# the resume content into it).
if reply_msg_id:
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
else:
self._register_card_for_source(card_id, str(user_msg_id))
if reply_msg_id and monitoring_msg_id: if reply_msg_id and monitoring_msg_id:
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time()) self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
self._cleanup_monitoring_mapping() self._cleanup_monitoring_mapping()
@@ -1385,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return True return True
async def _open_new_form_card(
self,
message_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> str | None:
"""Spawn a fresh card to host a re-paused human-input prompt.
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
replies it to the current incoming message so it appears as the next
step in the chat, registers the new reply_msg_id so subsequent button
callbacks resolve back to it, and renders the prompt + buttons on it.
Returns the new card_id, or ``None`` if creation failed (caller is
responsible for falling back to in-place update so the workflow
remains continuable).
"""
source_message_id = getattr(message_source.message_chain, 'message_id', None)
if not source_message_id:
await self.logger.error('Cannot open new form card: source message_id missing')
return None
try:
new_card_id = await self.create_card_id(message_id)
except Exception:
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
return None
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
content = {
'type': 'card',
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(str(source_message_id))
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(content))
.msg_type('interactive')
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
try:
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
except Exception:
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
return None
if not response.success():
await self.logger.error(
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}'
)
return None
reply_msg_id = getattr(response.data, 'message_id', None)
if reply_msg_id:
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
sequence = self._next_card_sequence(new_card_id, 1)
await self._update_card_layout(
card_id=new_card_id,
message_source=message_source,
text_message='',
sequence=sequence,
form_data=form_data,
show_form_prompt=True,
)
return new_card_id
async def reply_message( async def reply_message(
self, self,
message_source: platform_events.MessageEvent, message_source: platform_events.MessageEvent,
@@ -1504,45 +1773,544 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
): ):
""" """
回复消息变成更新卡片消息 回复消息变成更新卡片消息
Supports Dify form-action resume: when the runner yields a chunk with
``_resume_from_form=True``, the card transitions from buttons to a
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
for subsequent resume chunks to stream into.
When ``_open_new_card=True`` on the final chunk, the existing card is
left as-is and the pipeline will create a new card (with fresh form
buttons) for the re-pause.
""" """
# self.seq += 1
message_id = bot_message.resp_message_id message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence msg_seq = bot_message.msg_sequence
if msg_seq % 8 == 0 or is_final:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = '' form_data = getattr(bot_message, '_form_data', None)
if text_elements: resume_from = getattr(bot_message, '_resume_from_form', False)
parts = [] action_title = getattr(bot_message, '_resume_action_title', '')
for paragraph in text_elements: resume_node_title = getattr(bot_message, '_resume_node_title', '')
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md')) open_new_card = getattr(bot_message, '_open_new_card', False)
if para_text: if action_title:
parts.append(para_text) if resume_node_title:
text_message = '\n\n'.join(parts) selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
else:
selected_notice = f'**已选择**{action_title}'
else:
selected_notice = ''
# content = { # ── decide whether this chunk needs a card update ────────────────────
# 'type': 'card_json', card_id = self.card_id_dict.get(message_id)
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}}, if not card_id:
# } return
request: ContentCardElementRequest = ( # ── convert message chain → text ─────────────────────────────────────
ContentCardElementRequest.builder() text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
.card_id(self.card_id_dict[message_id])
.element_id('streaming_txt') text_message = ''
.request_body( if text_elements:
ContentCardElementRequestBody.builder() parts = []
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204") for paragraph in text_elements:
.content(text_message) para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
.sequence(msg_seq) if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
card_sequence = self._next_card_sequence(card_id, msg_seq)
# ── RESUME: first chunk after button click ───────────────────────────
if resume_from and card_id not in self.card_resume_transitioned:
# Transition the card from the form state into resume mode.
# Preserve the text that was shown before the pause, and seed the
# resume placeholder with the current resume content if we already
# have any on the first yielded chunk.
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
initial_resume_text = text_message or '\u200b'
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=initial_resume_text,
)
self.card_resume_transitioned.add(card_id)
self.card_pre_pause_text[card_id] = pre_pause_text
self.card_streaming_text[card_id] = text_message
if not is_final:
return
# ── RESUME: subsequent chunks → full card update ─────────────────────
if resume_from and card_id in self.card_resume_transitioned:
cached = self.card_streaming_text.get(card_id, '')
if text_message != cached:
self.card_streaming_text[card_id] = text_message
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=text_message,
)
if not is_final:
return
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
if not resume_from and (msg_seq % 8 == 0 or is_final):
cached = self.card_streaming_text.get(card_id)
if text_message != cached:
self.card_streaming_text[card_id] = text_message
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
)
.build() .build()
) )
.build() response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
request, req_opt
)
if not response.success():
raise Exception(
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
# ── FINAL chunk: full card layout update ─────────────────────────────
if is_final:
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
resume_cached = self.card_streaming_text.get(card_id, '')
if form_data:
if open_new_card:
# The old card has already been laid out into resume mode
# by the resume-transition block above (notice + resume
# placeholder). Finalise it as a frozen step snapshot and
# spawn a brand-new card to host the next human-input
# prompt — each step stays visible as its own card in the
# chat history.
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
if new_card_id is None:
# Fallback: keep the existing in-place behaviour so the
# workflow remains continuable even if creating the
# new card failed.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=resume_cached,
show_form_prompt=True,
)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
else:
# The old card is now a frozen snapshot; let go of its
# streaming-side state but keep its source registrations
# intact (no _drop_card_state) so historical button
# callbacks aimed at it can still be matched if needed.
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
else:
# Initial pause path: render prompt + buttons in place on
# the current card.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=final_seq,
form_data=form_data,
show_form_prompt=True,
)
# The human-input prompt itself is rendered as buttons only
# on Lark, so do not keep the hidden fallback text around;
# otherwise it will resurface after the button click.
self.card_streaming_text[card_id] = ''
self.card_pre_pause_text[card_id] = ''
else:
# Normal finish: keep pre-pause + resume content visible,
# remove buttons/notice, drop the resume placeholder.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=None,
notice_text=selected_notice if resume_from else '',
resume_placeholder_text=resume_cached,
)
self._drop_card_state(card_id)
self.card_id_dict.pop(message_id, None)
# ── media (images / files) appended at the end ───────────────────────
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def _add_form_buttons_to_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
text_message: str = '',
sequence: int = 1,
):
"""Update the entire card to include form action buttons.
Uses card.aupdate to replace the card JSON with a template that
includes the streaming text content plus interactive buttons.
"""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=form_data,
)
async def _remove_form_buttons_from_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
):
"""Replace the human-input card layout with the plain final layout."""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=None,
)
async def _update_card_layout(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
form_data: dict | None = None,
notice_text: str = '',
resume_placeholder_text: str = '',
show_form_prompt: bool = True,
):
"""Update the entire card layout.
• form_data → show interactive buttons (initial Dify pause)
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
• resume_placeholder_text → add a streaming_txt_resume markdown element
"""
form_data = form_data or {}
actions = form_data.get('actions', [])
form_token = form_data.get('form_token', '')
workflow_run_id = form_data.get('workflow_run_id', '')
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '')
# When form_data is set, the visible content is rendered inside the
# interactive container, so the top streaming text should stay empty
# to avoid duplicate text above the action area.
#
# For resume notice state, keep the existing text visible in the card
# and only add the grey "selected" notice below it.
if form_data:
render_text_message = ''
else:
render_text_message = text_message
# Determine session key from message source
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'group_{message_source.group.id}'
else:
session_key = f'person_{message_source.sender.id}'
# Build button elements matching the existing card template's thumbsup/down format
action_buttons = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
button_style = action.get('button_style', 'default')
if button_style == 'primary':
lark_button_type = 'primary'
elif button_style == 'danger':
lark_button_type = 'danger'
else:
lark_button_type = 'default'
action_buttons.append(
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': action_title},
'type': lark_button_type,
'width': 'fill',
'size': 'medium',
'hover_tips': {'tag': 'plain_text', 'content': action_title},
'behaviors': [
{
'type': 'callback',
'value': {
'form_action': True,
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'session_key': session_key,
},
}
],
'margin': '0px 0px 0px 0px',
}
) )
if is_final and bot_message.tool_calls is None: interactive_elements = []
# self.seq = 1 # 消息回复结束之后重置seq if form_data:
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片 if show_form_prompt:
interactive_elements = [
{
'tag': 'markdown',
'content': f'**[Human Input Required] {node_title}**',
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 4px 0px',
}
]
if form_content:
interactive_elements.append(
{
'tag': 'markdown',
'content': form_content,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 8px 0px',
}
)
interactive_elements.append(
{
'tag': 'column_set',
'horizontal_spacing': '8px',
'horizontal_align': 'left',
'margin': '0px 0px 0px 0px',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [btn],
'padding': '0px 0px 0px 0px',
}
for btn in action_buttons
],
}
)
# Build the full card JSON with buttons, same structure as create_card_id
# ── mid_section: either form buttons, resume notice, or empty ──
mid_section_elements = []
if form_data:
mid_section_elements = [
{
'tag': 'interactive_container',
'margin': '12px 0px 8px 0px',
'padding': '12px 12px 12px 12px',
'has_border': True,
'elements': interactive_elements,
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
elif notice_text:
mid_section_elements = [
{
'tag': 'markdown',
'content': notice_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '8px 0px 4px 0px',
'text_color': 'grey',
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
resume_elements = []
if resume_placeholder_text:
resume_elements = [
{
'tag': 'markdown',
'content': resume_placeholder_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt_resume',
},
]
card_data = {
'schema': '2.0',
'config': {
'update_multi': True,
'streaming_mode': False,
},
'body': {
'direction': 'vertical',
'padding': '12px 12px 12px 12px',
'elements': [
{
'tag': 'div',
'text': {
'tag': 'plain_text',
'content': 'LangBot',
'text_size': 'normal',
'text_align': 'left',
'text_color': 'default',
},
'icon': {
'tag': 'custom_icon',
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
},
},
{
'tag': 'markdown',
'content': render_text_message,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt',
},
*mid_section_elements,
*resume_elements,
{
'tag': 'column_set',
'horizontal_spacing': '12px',
'horizontal_align': 'right',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [
{
'tag': 'markdown',
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
'text_align': 'left',
'text_size': 'notation',
'margin': '4px 0px 0px 0px',
'icon': {
'tag': 'standard_icon',
'token': 'robot_outlined',
'color': 'grey',
},
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
'weight': 1,
},
*(
[]
if form_data
else [
{
'tag': 'column',
'width': '20px',
'elements': [
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': ''},
'type': 'text',
'width': 'fill',
'size': 'medium',
'icon': {'tag': 'standard_icon', 'token': 'thumbsup_outlined'},
'hover_tips': {'tag': 'plain_text', 'content': '有帮助'},
'behaviors': [{'type': 'callback', 'value': {'feedback': '有帮助'}}],
'margin': '0px 0px 0px 0px',
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
},
{
'tag': 'column',
'width': '30px',
'elements': [
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': ''},
'type': 'text',
'width': 'default',
'size': 'medium',
'icon': {'tag': 'standard_icon', 'token': 'thumbdown_outlined'},
'hover_tips': {'tag': 'plain_text', 'content': '无帮助'},
'behaviors': [{'type': 'callback', 'value': {'feedback': '无帮助'}}],
'margin': '0px 0px 0px 0px',
}
],
'padding': '0px 0px 0px 0px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
},
]
),
],
'margin': '0px 0px 4px 0px',
},
],
},
}
try:
tenant_key = ( tenant_key = (
message_source.source_platform_object.header.tenant_key message_source.source_platform_object.header.tenant_key
if message_source.source_platform_object if message_source.source_platform_object
@@ -1558,39 +2326,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
.tenant_access_token(tenant_access_token) .tenant_access_token(tenant_access_token)
.build() .build()
) )
# 发起请求
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
# 处理失败返回 request: UpdateCardRequest = (
if not response.success(): UpdateCardRequest.builder()
raise Exception( .card_id(card_id)
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' .request_body(
UpdateCardRequestBody.builder()
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
.build()
) )
return .build()
)
# Send media messages when streaming is done response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
if is_final and media_items: if not response.success():
for media in media_items: await self.logger.error(
media_request: ReplyMessageRequest = ( f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
ReplyMessageRequest.builder() f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
.message_id(message_source.message_chain.message_id) )
.request_body( except Exception:
ReplyMessageRequestBody.builder() await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def is_muted(self, group_id: int) -> bool: async def is_muted(self, group_id: int) -> bool:
return False return False

View File

@@ -1,14 +1,14 @@
from __future__ import annotations from __future__ import annotations
import time
import telegram import telegram
import telegram.ext import telegram.ext
from telegram import Update from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
import telegramify_markdown import telegramify_markdown
import typing import typing
import traceback import traceback
import json
import base64 import base64
import pydantic import pydantic
@@ -189,6 +189,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: telegram.Bot = pydantic.Field(exclude=True) bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True) application: telegram.ext.Application = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
message_converter: TelegramMessageConverter = TelegramMessageConverter() message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter() event_converter: TelegramEventConverter = TelegramEventConverter()
@@ -224,6 +225,102 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
telegram_callback, telegram_callback,
) )
) )
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Turn a click on a Dify form button into a synthetic pipeline query.

    The callback payload is JSON. Two key spellings are accepted: the long
    form (`form_action`, `action_id`, `session_key`, `workflow_run_id`) and
    the compact form (`f`, `a`, `s`, `w`). Payloads that are not form
    actions are ignored. The click is forwarded to the query pool with the
    form action attached under the `_dify_form_action` variable.

    Args:
        update: Telegram update carrying the callback query.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    query = update.callback_query
    await query.answer()
    try:
        data = json.loads(query.data)
        if not (data.get('form_action') or data.get('f')):
            return

        import langbot_plugin.api.entities.builtin.provider.session as provider_session

        workflow_run_id = data.get('workflow_run_id', '')
        w_suffix = data.get('w', '')
        action_id = data.get('action_id') or data.get('a', '')
        session_key = data.get('session_key') or data.get('s', '')

        # Recognised session-key prefixes and the launcher type they denote.
        prefix_table = (
            ('group_', provider_session.LauncherTypes.GROUP),
            ('g:', provider_session.LauncherTypes.GROUP),
            ('p:', provider_session.LauncherTypes.PERSON),
            ('person_', provider_session.LauncherTypes.PERSON),
        )
        launcher_type = None
        launcher_id = ''
        if isinstance(session_key, str):
            for prefix, kind in prefix_table:
                if session_key.startswith(prefix):
                    launcher_type = kind
                    launcher_id = session_key[len(prefix) :]
                    break

        # A key without a known prefix (or with nothing after it) cannot be
        # mapped to a session; enqueueing it would create a query with an
        # empty or truncated launcher id.
        if launcher_type is None or not launcher_id:
            await self.logger.error(
                f'Ignoring telegram form callback with invalid session key: {session_key!r}'
            )
            return

        user_id = str(query.from_user.id)

        # Find bot_uuid and pipeline_uuid of the bot that owns this adapter
        bot_uuid = ''
        pipeline_uuid = None
        for b in self.ap.platform_mgr.bots:
            if b.adapter is self:
                bot_uuid = b.bot_entity.uuid
                pipeline_uuid = b.bot_entity.use_pipeline_uuid
                break

        form_action_data = {
            'workflow_run_id': workflow_run_id,
            'w_suffix': w_suffix,
            'action_id': action_id,
            'user': f'{launcher_type.value}_{launcher_id}',
            'inputs': {},
        }

        message_chain = platform_message.MessageChain(
            [platform_message.Plain(text=f'[Form Action: {action_id}]')]
        )

        if launcher_type == provider_session.LauncherTypes.GROUP:
            synthetic_event = platform_events.GroupMessage(
                sender=platform_entities.GroupMember(
                    id=user_id,
                    member_name='',
                    permission=platform_entities.Permission.Member,
                    group=platform_entities.Group(
                        id=launcher_id,
                        name='',
                        permission=platform_entities.Permission.Member,
                    ),
                ),
                message_chain=message_chain,
                source_platform_object=update,
            )
        else:
            synthetic_event = platform_events.FriendMessage(
                sender=platform_entities.Friend(
                    id=user_id,
                    nickname='',
                    remark='',
                ),
                message_chain=message_chain,
                source_platform_object=update,
            )

        await self.ap.query_pool.add_query(
            bot_uuid=bot_uuid,
            launcher_type=launcher_type,
            launcher_id=launcher_id,
            sender_id=user_id,
            message_event=synthetic_event,
            message_chain=message_chain,
            adapter=self,
            pipeline_uuid=pipeline_uuid,
            variables={
                '_dify_form_action': form_action_data,
                '_routed_by_rule': True,
            },
        )
    except Exception:
        # Boundary handler: a malformed payload must not break the bot.
        await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
application.add_handler(CallbackQueryHandler(callback_query_handler))
super().__init__( super().__init__(
config=config, config=config,
logger=logger, logger=logger,
@@ -319,14 +416,19 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
update = event.source_platform_object update = event.source_platform_object
chat_id = update.effective_chat.id chat_id = update.effective_chat.id
chat_type = update.effective_chat.type chat_type = update.effective_chat.type
message_thread_id = update.message.message_thread_id effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
if chat_type == 'private': if chat_type == 'private':
draft_id = int(time.time() * 1000) import time as _time
self.msg_stream_id[message_id] = ('private', draft_id)
draft_id = int(_time.time() * 1000)
self.msg_stream_id[message_id] = ('private', draft_id)
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id) args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
await self.bot.send_message_draft(**args) try:
await self.bot.send_message_draft(**args)
except (telegram.error.RetryAfter, telegram.error.BadRequest):
pass
else: else:
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id) args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
send_msg = await self.bot.send_message(**args) send_msg = await self.bot.send_message(**args)
@@ -347,12 +449,13 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
assert isinstance(message_source.source_platform_object, Update) assert isinstance(message_source.source_platform_object, Update)
update = message_source.source_platform_object update = message_source.source_platform_object
chat_id = update.effective_chat.id chat_id = update.effective_chat.id
message_thread_id = update.message.message_thread_id effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
if message_id not in self.msg_stream_id: if message_id not in self.msg_stream_id:
return return
chat_mode, draft_id = self.msg_stream_id[message_id] chat_mode, stream_id = self.msg_stream_id[message_id]
components = await TelegramMessageConverter.yiri2target(message, self.bot) components = await TelegramMessageConverter.yiri2target(message, self.bot)
if not components or components[0]['type'] != 'text': if not components or components[0]['type'] != 'text':
@@ -361,16 +464,42 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return return
content = components[0]['text'] content = components[0]['text']
form_data = getattr(bot_message, '_form_data', None)
if form_data and is_final:
self.msg_stream_id.pop(message_id, None)
await self._send_form_action_buttons(message_source, form_data)
return
if chat_mode == 'private': if chat_mode == 'private':
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id) # Streaming via draft (ephemeral preview in the chat input area)
await self.bot.send_message_draft(**args) if (msg_seq - 1) % 8 == 0 or is_final:
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
try:
await self.bot.send_message_draft(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
try:
await self.bot.send_message_draft(**args)
except telegram.error.RetryAfter:
pass
else:
pass # Ignore other draft errors (cosmetic)
if is_final and bot_message.tool_calls is None: if is_final and bot_message.tool_calls is None:
del args['draft_id'] # Finalise: send the real message, discard the draft
await self.bot.send_message(**args) args = self._build_message_args(chat_id, content, message_thread_id)
try:
await self.bot.send_message(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
await self.bot.send_message(**args)
else:
raise
self.msg_stream_id.pop(message_id) self.msg_stream_id.pop(message_id)
else: else:
stream_id = draft_id # Streaming via edit_message_text (persistent message)
if (msg_seq - 1) % 8 == 0 or is_final: if (msg_seq - 1) % 8 == 0 or is_final:
args = { args = {
'message_id': stream_id, 'message_id': stream_id,
@@ -379,11 +508,68 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
} }
if self.config.get('markdown_card', False): if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2' args['parse_mode'] = 'MarkdownV2'
await self.bot.edit_message_text(**args) try:
await self.bot.edit_message_text(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
await self.bot.edit_message_text(**args)
else:
raise
if is_final and bot_message.tool_calls is None: if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id) self.msg_stream_id.pop(message_id)
async def _send_form_action_buttons(
    self,
    message_source: platform_events.MessageEvent,
    form_data: dict,
):
    """Send inline keyboard buttons for Dify human_input_required form actions.

    One button is rendered per entry in `form_data['actions']`, each on its
    own row. The message text is the form content (when present) followed
    by a "[node_title] Please select an action:" line.

    Args:
        message_source: Event whose chat (and thread, if any) receives the
            message; also determines the session key put in the payload.
        form_data: Form description; reads `actions`, `node_title`,
            `form_content` and `workflow_run_id`.
    """
    actions = form_data.get('actions', [])
    node_title = form_data.get('node_title', '')
    form_content = form_data.get('form_content', '')
    workflow_run_id = form_data.get('workflow_run_id', '')

    # Telegram callback_data is capped at 64 bytes, so we identify the
    # paused workflow by the last 8 chars of workflow_run_id (unique
    # within a session with overwhelming probability).
    max_callback_bytes = 64
    w_suffix = workflow_run_id[-8:] if workflow_run_id else ''

    if isinstance(message_source, platform_events.GroupMessage):
        session_key = f'g:{message_source.group.id}'
    else:
        session_key = f'p:{message_source.sender.id}'

    def _encode(payload: dict) -> str:
        # Compact separators and raw (non-escaped) Unicode keep the payload short.
        return json.dumps(payload, separators=(',', ':'), ensure_ascii=False)

    keyboard = []
    for action in actions:
        action_id = action.get('id', '')
        action_title = action.get('title', action_id)
        callback_payload = {'f': 1, 'a': action_id, 's': session_key}
        if w_suffix:
            callback_payload['w'] = w_suffix
        callback_data = _encode(callback_payload)

        # Over the limit: drop the optional workflow suffix first. Without
        # it the resume falls back to the newest pending form of the session.
        if 'w' in callback_payload and len(callback_data.encode('utf-8')) > max_callback_bytes:
            del callback_payload['w']
            callback_data = _encode(callback_payload)

        # Still too long: skip this button so the other buttons are still sent.
        if len(callback_data.encode('utf-8')) > max_callback_bytes:
            await self.logger.error(
                f'Skipping telegram form button {action_id!r}: callback data exceeds '
                f'{max_callback_bytes} bytes'
            )
            continue

        keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])

    reply_markup = InlineKeyboardMarkup(keyboard)

    update = message_source.source_platform_object
    chat_id = update.effective_chat.id
    effective_message = update.effective_message
    message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None

    text_lines = [f'[{node_title}] Please select an action:']
    if form_content:
        text_lines.insert(0, form_content)

    args = {
        'chat_id': chat_id,
        'text': '\n\n'.join(text_lines),
        'reply_markup': reply_markup,
    }
    if message_thread_id:
        args['message_thread_id'] = message_thread_id

    await self.bot.send_message(**args)
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update): if not isinstance(event.source_platform_object, Update):
return None return None

View File

@@ -2,9 +2,11 @@ from __future__ import annotations
import typing import typing
import json import json
import time
import uuid import uuid
import base64 import base64
import mimetypes import mimetypes
from collections import OrderedDict
from langbot.pkg.provider import runner from langbot.pkg.provider import runner
@@ -16,6 +18,125 @@ from langbot.libs.dify_service_api.v1 import client, errors
import httpx import httpx
# Module-level store for paused-workflow form state, keyed by session key
# (launcher_type_value + "_" + launcher_id). Each session holds an
# insertion-ordered dict of form_token -> form_data, allowing multiple
# Dify workflows to be paused simultaneously for the same session.
# Stored entries also carry an `_expires_at` epoch timestamp; expired
# entries are dropped lazily by `_prune_pending_forms`.
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
# Lifetime in seconds applied when a form carries no usable `expiration_time`.
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
def _session_key_from_query(query: pipeline_query.Query) -> str:
    """Build the pending-form store key for the session behind *query*.

    The key is the launcher type's value and the launcher id joined by an
    underscore.
    """
    session = query.session
    launcher_kind = session.launcher_type.value
    return f'{launcher_kind}_{session.launcher_id}'
def _prune_pending_forms(now: float | None = None) -> None:
    """Drop every expired pending form, and every session left without forms.

    Args:
        now: Reference epoch time in seconds; defaults to the current time.
            An entry is expired when its `_expires_at` (0 if absent) is at
            or before this value.
    """
    cutoff = time.time() if now is None else now
    # Iterate over a snapshot because sessions may be removed on the way.
    for key, bucket in list(_PENDING_FORMS.items()):
        stale = [tok for tok, entry in bucket.items() if entry.get('_expires_at', 0) <= cutoff]
        for tok in stale:
            bucket.pop(tok, None)
        if not bucket:
            _PENDING_FORMS.pop(key, None)
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
    """Store (or refresh) a pending form for a session as its newest entry.

    A shallow copy of *form_data* is stored under its `form_token` (empty
    string when absent) with an added `_expires_at` epoch timestamp. The
    timestamp is `expiration_time` when that parses to a finite, positive
    number; otherwise it is now plus `_PENDING_FORM_DEFAULT_TTL`.

    Args:
        session_key: Store key of the session that owns the form.
        form_data: Form description; the caller's dict is not mutated.
    """
    import math

    _prune_pending_forms()
    stored = dict(form_data)
    expiration_time = stored.get('expiration_time')
    try:
        expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
    except (TypeError, ValueError):
        expiration_ts = 0.0
    # NaN/inf would never compare as expired (the entry would live forever)
    # and non-positive values would expire at once; treat both as "unset".
    if not math.isfinite(expiration_ts) or expiration_ts <= 0:
        expiration_ts = 0.0
    stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
    form_token = str(stored.get('form_token') or '')
    forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
    # Re-insert at the end so this becomes the "latest" entry
    forms.pop(form_token, None)
    forms[form_token] = stored
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
    """Return the session's pending form stored under *form_token*, if any.

    Expired forms are pruned first. An empty token or an unknown session
    yields None.
    """
    _prune_pending_forms()
    if not form_token:
        return None
    bucket = _PENDING_FORMS.get(session_key)
    return bucket.get(form_token) if bucket else None
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
    """Find the newest pending form whose workflow_run_id ends with *w_suffix*.

    Serves adapters (e.g. Telegram) whose callback payload is too small to
    carry the full form_token / workflow_run_id. Expired forms are pruned
    first; an empty suffix or an unknown session yields None.
    """
    _prune_pending_forms()
    bucket = _PENDING_FORMS.get(session_key)
    if not (bucket and w_suffix):
        return None
    # Walk from the most recently stored form backwards.
    candidates = (bucket[tok] for tok in reversed(bucket))
    return next(
        (entry for entry in candidates if str(entry.get('workflow_run_id', '')).endswith(w_suffix)),
        None,
    )
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
    """Return the most recently stored pending form of a session, or None.

    Expired forms are pruned before the lookup.
    """
    _prune_pending_forms()
    bucket = _PENDING_FORMS.get(session_key)
    if bucket:
        newest_token = next(reversed(bucket))
        return bucket[newest_token]
    return None
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
    """Yield the session's pending forms from newest to oldest.

    Expired forms are pruned when iteration starts. The token order is
    snapshotted up front; each form is looked up as it is yielded.
    """
    _prune_pending_forms()
    bucket = _PENDING_FORMS.get(session_key)
    if bucket:
        tokens = list(bucket.keys())
        tokens.reverse()
        for tok in tokens:
            yield bucket[tok]
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
    """Remove one pending form of a session, or all of them.

    Args:
        session_key: Store key of the session.
        form_token: Token of the form to drop; None drops every form of
            the session. The session entry itself is removed once it holds
            no more forms.
    """
    bucket = _PENDING_FORMS.get(session_key)
    if not bucket:
        return
    if form_token is not None:
        bucket.pop(form_token, None)
        if bucket:
            # Other forms remain; keep the session entry.
            return
    _PENDING_FORMS.pop(session_key, None)
def _format_human_input_text(
    node_title: str,
    form_content: str,
    actions: list[dict[str, typing.Any]],
) -> str:
    """Render a paused-workflow human-input prompt as plain text.

    Intended for adapters without rich UI (no buttons/cards): the options
    are listed with 1-based numbers so the user can answer with a number
    or an option title.

    Args:
        node_title: Title of the paused node; may be empty.
        form_content: Body text of the form; omitted when empty.
        actions: Options to list; each is labelled by its `title`, falling
            back to its `id`, then to an empty string.

    Returns:
        The prompt as newline-joined lines.
    """
    parts: list[str] = [f'[Human Input Required] {node_title or ""}'.rstrip()]
    if form_content:
        parts += ['', form_content]
    if actions:
        parts += ['', 'Reply with the number or title to continue:']
        parts.extend(
            f' {number}. {option.get("title") or option.get("id") or ""}'
            for number, option in enumerate(actions, start=1)
        )
    return '\n'.join(parts)
@runner.runner_class('dify-service-api') @runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner): class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器""" """Dify Service API 对话请求器"""
@@ -335,11 +456,155 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id'] query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form_blocking(
    self, form_action: dict
) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Submit human input to resume a paused Dify workflow (non-streaming).

    Yields the workflow's final answer once it finishes. If the resumed
    workflow pauses again for human input, the new form is stored as
    pending and a plain-text prompt is yielded instead, mirroring what the
    initial blocking run does on its first pause.

    Args:
        form_action: Resume description; requires `form_token`,
            `workflow_run_id` and `user`, optionally `action_id` and
            `inputs`.

    Raises:
        errors.DifyAPIError: The workflow finished with an error.
        KeyError: A required key is missing from *form_action*.
    """
    form_token = form_action['form_token']
    workflow_run_id = form_action['workflow_run_id']
    user = form_action['user']
    action_id = form_action.get('action_id', '')
    inputs = form_action.get('inputs', {})

    paused_again = False

    async for chunk in self.dify_client.workflow_submit(
        form_token=form_token,
        workflow_run_id=workflow_run_id,
        inputs=inputs,
        user=user,
        action=action_id,
        timeout=120,
    ):
        self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))

        if chunk['event'] == 'workflow_paused':
            new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
            for reason in chunk['data'].get('reasons', []):
                if reason.get('TYPE') != 'human_input_required':
                    continue
                form_content = reason.get('form_content', '')
                actions = reason.get('actions', [])
                paused_node_title = reason.get('node_title', '')
                raw_inputs = reason.get('inputs', {})
                # `user` doubles as the store key here, as in the streaming
                # submit path. NOTE(review): relies on callers building
                # `user` in the same "<launcher_type>_<launcher_id>" shape
                # as the session key.
                _set_pending_form(
                    user,
                    {
                        'workflow_run_id': new_run_id,
                        'form_id': reason.get('form_id'),
                        'form_token': reason.get('form_token'),
                        'node_id': reason.get('node_id'),
                        'node_title': paused_node_title,
                        'form_content': form_content,
                        'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
                        'actions': actions,
                        'expiration_time': reason.get('expiration_time'),
                        'user': user,
                    },
                )
                paused_again = True
                yield provider_message.Message(
                    role='assistant',
                    content=_format_human_input_text(paused_node_title, form_content, actions),
                )

        elif chunk['event'] == 'workflow_finished':
            # After a re-pause there is no final answer to read.
            if paused_again:
                return
            if chunk['data'].get('error'):
                raise errors.DifyAPIError(chunk['data']['error'])
            content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
            yield provider_message.Message(
                role='assistant',
                content=content,
            )
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
    """Find the pending form that *form_action* refers to.

    Identifiers are tried from most to least specific: `form_token`, the
    full `workflow_run_id`, then `w_suffix` (the compact id used by
    Telegram buttons). When none of them matches, the newest pending form
    of the session is returned.
    """
    token = form_action.get('form_token')
    by_token = _get_pending_form_by_token(session_key, token) if token else None
    if by_token:
        return by_token

    run_id = form_action.get('workflow_run_id')
    if run_id:
        by_run_id = next(
            (
                candidate
                for candidate in _iter_pending_forms(session_key)
                if candidate.get('workflow_run_id') == run_id
            ),
            None,
        )
        if by_run_id is not None:
            return by_run_id

    suffix = form_action.get('w_suffix')
    by_suffix = _get_pending_form_by_w_suffix(session_key, suffix) if suffix else None
    if by_suffix:
        return by_suffix

    return _get_latest_pending_form(session_key)
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
    """Complete a form action with fields from its matching pending form.

    Returns None for an empty action. Otherwise returns a copy without
    `w_suffix`. When a pending form is resolved, empty `form_token` /
    `workflow_run_id` values are filled from it, absent `inputs`, `user`
    and `node_title` keys are added from it, and `action_title` is looked
    up from its stored actions if not already given.
    """
    if not form_action:
        return None

    result = dict(form_action)
    result.pop('w_suffix', None)

    source = self._resolve_pending_form(session_key, form_action)
    if not source:
        return result

    # Identifiers: an empty value counts as missing.
    for field in ('form_token', 'workflow_run_id'):
        result[field] = result.get(field) or source.get(field, '')

    # Remaining fields: only filled when the key is absent altogether.
    for field, fallback in (('inputs', {}), ('user', ''), ('node_title', '')):
        result.setdefault(field, source.get(field, fallback))

    # Resolve clicked action's display title from the stored actions list
    if 'action_title' in result:
        return result
    clicked_id = result.get('action_id', '')
    clicked = next(
        (option for option in source.get('actions', []) if str(option.get('id', '')) == str(clicked_id)),
        None,
    )
    if clicked is not None:
        result['action_title'] = clicked.get('title', clicked_id)
    return result
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
    """Match plain text replies against pending Dify form actions.

    Resolution order:
    1. A pure decimal reply (e.g. "1", "2") maps to the 1-indexed action of
       the most recent pending form. Lets users on plain-text platforms
       pick options without retyping titles.
    2. Otherwise, iterate pending forms newest-first and match each
       action's title/id case-insensitively. The first hit wins, so when
       two forms share a button label the newer one resolves.

    Args:
        session_key: Store key of the session whose forms are searched.
        user_text: Raw text the user sent.

    Returns:
        A form-action dict for the matched option, or None when the text
        is blank or matches nothing.
    """
    normalized_text = user_text.strip().lower()
    if not normalized_text:
        return None

    def _build(pending_form: dict, action: dict) -> dict:
        return {
            'form_token': pending_form.get('form_token', ''),
            'workflow_run_id': pending_form.get('workflow_run_id', ''),
            'action_id': action.get('id', ''),
            'action_title': action.get('title', action.get('id', '')),
            'node_title': pending_form.get('node_title', ''),
            'inputs': pending_form.get('inputs', {}),
            'user': pending_form.get('user', ''),
        }

    def _label(value: typing.Any) -> str:
        # A missing/None label must not turn into the matchable text "none".
        return '' if value is None else str(value).strip().lower()

    # isdecimal(), not isdigit(): the latter also accepts characters such
    # as superscripts that int() cannot parse.
    if normalized_text.isdecimal():
        try:
            position = int(normalized_text)
        except ValueError:
            # e.g. more digits than the interpreter's int/str conversion limit
            position = 0
        latest_form = _get_latest_pending_form(session_key)
        if latest_form is not None:
            actions = latest_form.get('actions', [])
            if 1 <= position <= len(actions):
                return _build(latest_form, actions[position - 1])

    for pending_form in _iter_pending_forms(session_key):
        for action in pending_form.get('actions', []):
            labels = {_label(action.get('title')), _label(action.get('id'))}
            if normalized_text in labels:
                return _build(pending_form, action)
    return None
async def _workflow_messages( async def _workflow_messages(
self, query: pipeline_query.Query self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]: ) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流""" """调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
if not query.session.using_conversation.uuid: if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4()) query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -366,6 +631,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
} }
inputs.update(query.variables) inputs.update(query.variables)
human_input_yielded = False
async for chunk in self.dify_client.workflow_run( async for chunk in self.dify_client.workflow_run(
inputs=inputs, inputs=inputs,
@@ -377,6 +643,45 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] in ignored_events: if chunk['event'] in ignored_events:
continue continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': reason.get('inputs', {}),
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
human_input_yielded = True
yield provider_message.Message(
role='assistant',
content=display_text,
)
if chunk['event'] == 'node_started': if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue continue
@@ -399,6 +704,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg yield msg
elif chunk['event'] == 'workflow_finished': elif chunk['event'] == 'workflow_finished':
if human_input_yielded:
break
if chunk['data']['error']: if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error']) raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary']) content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
@@ -636,11 +943,153 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id'] query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form(
    self, form_action: dict
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
    """Submit human input to resume a paused Dify workflow (streaming).

    Accumulated text is yielded as a chunk on every 8th text chunk and
    once more, marked final, when the workflow finishes. If the workflow
    pauses again for human input, the new form is stored as pending and a
    final chunk carrying `_form_data` / `_open_new_card` is yielded so the
    adapter can render the next form. Every yielded chunk is tagged with
    `_resume_from_form` and, when known, the resolved action/node titles.

    Args:
        form_action: Resume description; requires `form_token`,
            `workflow_run_id` and `user`, optionally `action_id`,
            `action_title`, `node_title` and `inputs`.

    Raises:
        errors.DifyAPIError: The workflow finished with an error.
        KeyError: A required key is missing from *form_action*.
    """
    import re

    form_token = form_action['form_token']
    workflow_run_id = form_action['workflow_run_id']
    user = form_action['user']
    action_id = form_action.get('action_id', '')
    action_title = form_action.get('action_title', '') or action_id
    node_title = form_action.get('node_title', '')
    inputs = form_action.get('inputs', {})

    message_idx = 0
    is_final = False
    think_start = False
    think_end = False
    workflow_contents = ''
    repause_form_data: dict | None = None

    remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')

    async for chunk in self.dify_client.workflow_submit(
        form_token=form_token,
        workflow_run_id=workflow_run_id,
        inputs=inputs,
        user=user,
        action=action_id,
        timeout=120,
    ):
        self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
        yield_this_iteration = False

        if chunk['event'] == 'workflow_finished':
            is_final = True
            yield_this_iteration = True
            if chunk['data'].get('error'):
                raise errors.DifyAPIError(chunk['data']['error'])

        if chunk['event'] == 'workflow_paused':
            reasons = chunk['data'].get('reasons', [])
            new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
            for reason in reasons:
                if reason.get('TYPE') != 'human_input_required':
                    continue
                form_content = reason.get('form_content', '')
                actions = reason.get('actions', [])
                # Use a distinct name — `node_title` (the just-resolved step)
                # must keep its value so the resume notice on the previous
                # card still shows which step the user acted on.
                paused_node_title = reason.get('node_title', '')
                raw_inputs = reason.get('inputs', {})
                # `user` doubles as the pending-form store key here.
                _set_pending_form(
                    user,
                    {
                        'workflow_run_id': new_run_id,
                        'form_id': reason.get('form_id'),
                        'form_token': reason.get('form_token'),
                        'node_id': reason.get('node_id'),
                        'node_title': paused_node_title,
                        'form_content': form_content,
                        'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
                        'actions': actions,
                        'expiration_time': reason.get('expiration_time'),
                        'user': user,
                    },
                )
                repause_form_data = {
                    'form_content': form_content,
                    'actions': actions,
                    'node_title': paused_node_title,
                    'workflow_run_id': new_run_id,
                    'form_token': reason.get('form_token', ''),
                }
                # Ensure the final chunk has non-empty content so
                # ResponseWrapper (which skips empty-content chunks) lets it
                # propagate to SendResponseBackStage. Use a zero-width space
                # so neither Lark nor Telegram renders visible noise — the
                # adapter substitutes its own card text from _form_data.
                # Written as an escape so the invisible character cannot be
                # lost by an editor or a copy/paste.
                if not workflow_contents:
                    workflow_contents = '\u200b'
                is_final = True
                yield_this_iteration = True
                # Only the first human-input reason is rendered.
                break

        if chunk['event'] == 'text_chunk':
            message_idx += 1
            text = chunk['data']['text']
            if remove_think:
                # NOTE(review): with remove-think enabled, text is only
                # accumulated once a closing </think> has been seen, and
                # once a <think> was seen no intermediate chunk is yielded
                # any more — confirm this is intended.
                if '<think>' in text and not think_start:
                    think_start = True
                    continue
                if '</think>' in text and not think_end:
                    content = re.sub(r'^\n</think>', '', text)
                    workflow_contents += content
                    think_end = True
                elif think_end:
                    workflow_contents += text
                if think_start:
                    continue
            else:
                workflow_contents += text

            if message_idx % 8 == 0:
                yield_this_iteration = True

        if yield_this_iteration:
            msg = provider_message.MessageChunk(
                role='assistant',
                content=workflow_contents,
                is_final=is_final,
            )
            msg._resume_from_form = True
            if action_title:
                msg._resume_action_title = action_title
            if node_title:
                msg._resume_node_title = node_title
            if is_final and repause_form_data:
                msg._form_data = repause_form_data
                msg._open_new_card = True
            yield msg
            if is_final:
                return
async def _workflow_messages_chunk( async def _workflow_messages_chunk(
self, query: pipeline_query.Query self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流""" """调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
# Resume paused workflow via submit endpoint
async for msg in self._submit_workflow_form(form_action):
yield msg
return
if not query.session.using_conversation.uuid: if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4()) query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -672,6 +1121,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False think_start = False
think_end = False think_end = False
workflow_contents = '' workflow_contents = ''
workflow_run_id = ''
human_input_yielded = False
# Saved form data to attach to the final MessageChunk so the adapter
# can detect it when is_final=True and render buttons.
pending_form_data = None
display_text = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run( async for chunk in self.dify_client.workflow_run(
@@ -682,7 +1138,61 @@ class DifyServiceAPIRunner(runner.RequestRunner):
): ):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk)) self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events: if chunk['event'] in ignored_events:
if chunk['event'] == 'workflow_started':
workflow_run_id = chunk['data'].get('workflow_run_id', '')
continue continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
# Persist form state in module-level store keyed by session
raw_inputs = reason.get('inputs', {})
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
# Pass form render metadata to downstream stages
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
workflow_contents += display_text + '\n'
# Save form data to attach to the final chunk later.
# We do NOT yield here — the form content will be sent
# as the final MessageChunk (with is_final=True and
# _form_data) so the adapter can update the card and
# add buttons in one pass.
pending_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': workflow_run_id,
'form_token': reason.get('form_token', ''),
}
human_input_yielded = True
if chunk['event'] == 'workflow_finished': if chunk['event'] == 'workflow_finished':
is_final = True is_final = True
if chunk['data']['error']: if chunk['data']['error']:
@@ -730,11 +1240,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg yield msg
if messsage_idx % 8 == 0 or is_final: if messsage_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk( final_content = workflow_contents if workflow_contents.strip() else ''
msg = provider_message.MessageChunk(
role='assistant', role='assistant',
content=workflow_contents, content=final_content,
is_final=is_final, is_final=is_final,
) )
# Attach form data to the final chunk for the adapter
if is_final and pending_form_data:
msg._form_data = pending_form_data
pending_form_data = None
yield msg
# If the stream ended after workflow_paused without a
# workflow_finished event, yield a final chunk so the adapter
# can update the card and add buttons.
if human_input_yielded and not is_final:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents or display_text,
is_final=True,
)
msg._form_data = pending_form_data
yield msg
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求""" """运行请求"""

File diff suppressed because one or more lines are too long