mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-15 10:16:03 +00:00
Compare commits
7 Commits
codex/agen
...
feat/card_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83b0d26e99 | ||
|
|
e08b5db625 | ||
|
|
d0f65b17ec | ||
|
|
2b533c4a00 | ||
|
|
f663d87a60 | ||
|
|
60e5b873ee | ||
|
|
b96f209b98 |
648
scripts/build_dingtalk_card_template.py
Normal file
648
scripts/build_dingtalk_card_template.py
Normal file
@@ -0,0 +1,648 @@
|
|||||||
|
"""Generate the DingTalk human-input card template JSON.
|
||||||
|
|
||||||
|
The output is wrapped in the {editorData, widgetInfo, type, mode} envelope
|
||||||
|
the DingTalk card builder expects on import. editorData is itself a JSON
|
||||||
|
string (NOT a nested object), matching real exports from the builder.
|
||||||
|
|
||||||
|
Run from the repo root: python scripts/build_dingtalk_card_template.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
OUTPUT = Path('src/langbot/templates/dingtalk_human_input_card.json')
|
||||||
|
|
||||||
|
|
||||||
|
def markdown_block(node_id, variable='content'):
|
||||||
|
"""A MarkdownBlock whose content is bound to a global variable.
|
||||||
|
|
||||||
|
Unlike BaseText (whose `text` field requires editor-side manual binding),
|
||||||
|
MarkdownBlock's `content` accepts `variableValue` binding at JSON load
|
||||||
|
time — the imported template renders the variable straight away.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
'componentName': 'MarkdownBlock',
|
||||||
|
'id': node_id,
|
||||||
|
'props': {
|
||||||
|
'mdVer': 0,
|
||||||
|
'icon': {'type': 'icon', 'icon': '', 'iconType': 'emoji'},
|
||||||
|
'content': {'variable': variable, 'variableType': 'global', 'type': 'variableValue'},
|
||||||
|
'visible': {
|
||||||
|
'type': 'dynamicVisible',
|
||||||
|
'value': True,
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'condition': {'op': 'and', 'conditions': []},
|
||||||
|
},
|
||||||
|
'isStreaming': False,
|
||||||
|
'enableLinkStatPoint': False,
|
||||||
|
'linkStatPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||||
|
'linkStatPointParams': [],
|
||||||
|
'marginTop': 6,
|
||||||
|
'marginBottom': 6,
|
||||||
|
'marginLeft': 12,
|
||||||
|
'marginRight': 12,
|
||||||
|
},
|
||||||
|
'title': 'AI 流式富文本',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def text_block(
|
||||||
|
node_id,
|
||||||
|
text,
|
||||||
|
*,
|
||||||
|
bold=False,
|
||||||
|
gravity='left',
|
||||||
|
font_size=14,
|
||||||
|
line_height=22,
|
||||||
|
max_lines=20,
|
||||||
|
ml=12,
|
||||||
|
mr=12,
|
||||||
|
mt=4,
|
||||||
|
mb=4,
|
||||||
|
color_token='common_level1_base_color',
|
||||||
|
style_token='common_body_text_style',
|
||||||
|
):
|
||||||
|
return {
|
||||||
|
'componentName': 'BaseText',
|
||||||
|
'id': node_id,
|
||||||
|
'props': {
|
||||||
|
'text': {'i18n': False, 'type': 'dynamicString', 'content': text},
|
||||||
|
'hoverText': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||||
|
'iconType': 'iconCode',
|
||||||
|
'iconFont': {'type': 'icon', 'icon': '', 'iconType': 'ddIcon'},
|
||||||
|
'icon': {
|
||||||
|
'type': 'dynamicLink',
|
||||||
|
'value': '',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'darkIcon': {
|
||||||
|
'type': 'dynamicLink',
|
||||||
|
'value': '',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'autoWidth': False,
|
||||||
|
'maxWidth': {
|
||||||
|
'type': 'dynamicNumber',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': 0,
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'fixedWidth': {
|
||||||
|
'type': 'dynamicNumber',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': 0,
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'marginLeft': ml,
|
||||||
|
'marginRight': mr,
|
||||||
|
'marginTop': mt,
|
||||||
|
'marginBottom': mb,
|
||||||
|
'fontColorType': 'Standard',
|
||||||
|
'enableHighlight': False,
|
||||||
|
'maxLine': {
|
||||||
|
'type': 'dynamicNumber',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': max_lines,
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'color': {
|
||||||
|
'type': 'dynamicColor',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': color_token,
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'customLightColor': {
|
||||||
|
'type': 'dynamicColor',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': '#35404b',
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'customDarkColor': {
|
||||||
|
'type': 'dynamicColor',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': '#f6f6f6',
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'gravity': gravity,
|
||||||
|
'fontSizeType': 'Standard',
|
||||||
|
'styleType': 'custom',
|
||||||
|
'styleToken': style_token,
|
||||||
|
'size': 'middle',
|
||||||
|
'customFontSize': font_size,
|
||||||
|
'customFontLineHeight': line_height,
|
||||||
|
'bold': bold,
|
||||||
|
'italic': False,
|
||||||
|
'strikeThrough': False,
|
||||||
|
'lineHeight': 'normal',
|
||||||
|
'visible': {
|
||||||
|
'type': 'dynamicVisible',
|
||||||
|
'value': True,
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'condition': {'op': 'and', 'conditions': []},
|
||||||
|
},
|
||||||
|
'autoMaxWidth': False,
|
||||||
|
'innerOffset': 0,
|
||||||
|
'enableIcon': False,
|
||||||
|
'widthMode': 'match_parent',
|
||||||
|
'margin': -2,
|
||||||
|
},
|
||||||
|
'title': '基础文本',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def button_group(node_id):
|
||||||
|
return {
|
||||||
|
'componentName': 'ButtonGroup',
|
||||||
|
'id': node_id,
|
||||||
|
'props': {
|
||||||
|
'dynamicButtons': {'type': 'variableValue', 'variableType': 'global', 'variable': 'btns'},
|
||||||
|
'marginLeft': 12,
|
||||||
|
'marginRight': 12,
|
||||||
|
'marginTop': 6,
|
||||||
|
'marginBottom': 12,
|
||||||
|
'visible': {
|
||||||
|
'type': 'dynamicVisible',
|
||||||
|
'value': True,
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'condition': {'op': 'and', 'conditions': []},
|
||||||
|
},
|
||||||
|
'responsiveLayoutWidth': 350,
|
||||||
|
'buttonsSource': 'variable',
|
||||||
|
'fixedButtonIds': [],
|
||||||
|
'fixedButtons': [],
|
||||||
|
'enableResponsiveLayout': False,
|
||||||
|
'matchContent': False,
|
||||||
|
'buttonSpacing': 8,
|
||||||
|
'margin': -2,
|
||||||
|
'innerOffset': 0,
|
||||||
|
},
|
||||||
|
'title': '按钮组',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def build_editor_data():
|
||||||
|
component_names = [
|
||||||
|
'AIPending',
|
||||||
|
'AICardStatusContainer',
|
||||||
|
'BaseText',
|
||||||
|
'AICardContent',
|
||||||
|
'AICardContainer',
|
||||||
|
'ButtonGroup',
|
||||||
|
'MarkdownBlock',
|
||||||
|
]
|
||||||
|
components_map = [
|
||||||
|
{
|
||||||
|
'package': '@ali/dxComponent',
|
||||||
|
'version': '1.0.0',
|
||||||
|
'exportName': n,
|
||||||
|
'main': './src/index.tsx',
|
||||||
|
'destructuring': False,
|
||||||
|
'subName': '',
|
||||||
|
'componentName': n,
|
||||||
|
}
|
||||||
|
for n in component_names
|
||||||
|
]
|
||||||
|
|
||||||
|
pending_state = {
|
||||||
|
'componentName': 'AICardStatusContainer',
|
||||||
|
'id': 'node_status_pending',
|
||||||
|
'props': {
|
||||||
|
'status': 1,
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'enableExtend': False,
|
||||||
|
'autoFoldConfig': {
|
||||||
|
'needFold': True,
|
||||||
|
'heightLimit': 480,
|
||||||
|
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||||
|
},
|
||||||
|
'innerOffset': 0,
|
||||||
|
'enableCollapse': False,
|
||||||
|
'margin': -2,
|
||||||
|
},
|
||||||
|
'title': '处理中状态',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [
|
||||||
|
{
|
||||||
|
'componentName': 'AIPending',
|
||||||
|
'id': 'node_pending_inner',
|
||||||
|
'props': {
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'pendingTip': {'type': 'dynamicString', 'content': '处理中...', 'i18n': False},
|
||||||
|
'style': 'embed',
|
||||||
|
'hideIcon': False,
|
||||||
|
},
|
||||||
|
'hidden': False,
|
||||||
|
'title': '',
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
done_state = {
|
||||||
|
'componentName': 'AICardStatusContainer',
|
||||||
|
'id': 'node_status_done',
|
||||||
|
'props': {
|
||||||
|
'status': 3,
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'enableExtend': False,
|
||||||
|
'autoFoldConfig': {
|
||||||
|
'needFold': True,
|
||||||
|
'heightLimit': 480,
|
||||||
|
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||||
|
},
|
||||||
|
'innerOffset': 0,
|
||||||
|
'enableCollapse': False,
|
||||||
|
'margin': -2,
|
||||||
|
},
|
||||||
|
'title': '完成状态',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [
|
||||||
|
{
|
||||||
|
'componentName': 'AICardContent',
|
||||||
|
'id': 'node_done_content',
|
||||||
|
'props': {
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'visible': {
|
||||||
|
'type': 'dynamicVisible',
|
||||||
|
'value': True,
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'condition': {'op': 'and', 'conditions': []},
|
||||||
|
},
|
||||||
|
'innerOffset': 0,
|
||||||
|
'disabledWhileForward': False,
|
||||||
|
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||||
|
'statPointParams': [
|
||||||
|
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
|
||||||
|
],
|
||||||
|
'margin': -2,
|
||||||
|
'transformToEventChain': False,
|
||||||
|
'enableStatPoint': False,
|
||||||
|
},
|
||||||
|
'hidden': False,
|
||||||
|
'title': '',
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [
|
||||||
|
markdown_block('node_text_content', variable='content'),
|
||||||
|
button_group('node_btn_group'),
|
||||||
|
],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
failed_state = {
|
||||||
|
'componentName': 'AICardStatusContainer',
|
||||||
|
'id': 'node_status_failed',
|
||||||
|
'props': {
|
||||||
|
'status': 5,
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'enableExtend': False,
|
||||||
|
'autoFoldConfig': {
|
||||||
|
'needFold': True,
|
||||||
|
'heightLimit': 480,
|
||||||
|
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||||
|
},
|
||||||
|
'innerOffset': 0,
|
||||||
|
'enableCollapse': False,
|
||||||
|
'margin': -2,
|
||||||
|
},
|
||||||
|
'title': '失败状态',
|
||||||
|
'hidden': False,
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [
|
||||||
|
{
|
||||||
|
'componentName': 'AICardContent',
|
||||||
|
'id': 'node_failed_content',
|
||||||
|
'props': {
|
||||||
|
'visible': {
|
||||||
|
'type': 'dynamicVisible',
|
||||||
|
'value': True,
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'condition': {'op': 'and', 'conditions': []},
|
||||||
|
},
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'innerOffset': 0,
|
||||||
|
'disabledWhileForward': False,
|
||||||
|
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||||
|
'statPointParams': [
|
||||||
|
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
|
||||||
|
],
|
||||||
|
'margin': -2,
|
||||||
|
'transformToEventChain': False,
|
||||||
|
'enableStatPoint': False,
|
||||||
|
},
|
||||||
|
'hidden': False,
|
||||||
|
'title': '',
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [
|
||||||
|
text_block(
|
||||||
|
'node_failed_text',
|
||||||
|
'操作失败,请稍后重试。',
|
||||||
|
gravity='center',
|
||||||
|
mt=10,
|
||||||
|
mb=10,
|
||||||
|
ml=10,
|
||||||
|
mr=10,
|
||||||
|
max_lines=2,
|
||||||
|
font_size=15,
|
||||||
|
)
|
||||||
|
],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
root = {
|
||||||
|
'componentName': 'AICardContainer',
|
||||||
|
'id': 'node_root',
|
||||||
|
'props': {
|
||||||
|
'marginLeft': 0,
|
||||||
|
'marginRight': 0,
|
||||||
|
'marginTop': 0,
|
||||||
|
'marginBottom': 0,
|
||||||
|
'enablePending': True,
|
||||||
|
'enableWriting': False,
|
||||||
|
'enableDoing': False,
|
||||||
|
'enableFailed': True,
|
||||||
|
'summaryContent': {'type': 'variableValue', 'variableType': 'global', 'variable': ''},
|
||||||
|
'enableTitle': False,
|
||||||
|
'flowStatusVar': {'type': 'variableValue', 'variableType': 'global', 'variable': 'flowStatus'},
|
||||||
|
'operationPenalType': 'custom',
|
||||||
|
'enableFlowAbort': True,
|
||||||
|
'innerOffset': 0,
|
||||||
|
'enableGradientBorder': True,
|
||||||
|
'cardSizeMode': 'adaptive',
|
||||||
|
'cardSizeHeightMode': 'adaptive',
|
||||||
|
'cardSizeWidthMode': 'adaptive',
|
||||||
|
'cardSizeHeight': {
|
||||||
|
'type': 'dynamicNumber',
|
||||||
|
'valueType': 'fixed',
|
||||||
|
'value': 226,
|
||||||
|
'variable': '',
|
||||||
|
'variableType': 'global',
|
||||||
|
},
|
||||||
|
'hasBackground': False,
|
||||||
|
'backgroundType': 'Standard',
|
||||||
|
'standardBackgroundColor': 'gray',
|
||||||
|
'backgroundColor': '#F6F6F6',
|
||||||
|
'darkModeBackgroundColor': '#3C3C3C',
|
||||||
|
'enableEngineUpgrade': False,
|
||||||
|
'enableExposeStatPoint': False,
|
||||||
|
'enableDebugTool': False,
|
||||||
|
},
|
||||||
|
'hidden': False,
|
||||||
|
'title': '',
|
||||||
|
'isLocked': False,
|
||||||
|
'condition': True,
|
||||||
|
'conditionGroup': '',
|
||||||
|
'children': [pending_state, done_state, failed_state],
|
||||||
|
}
|
||||||
|
|
||||||
|
btns_var = {
|
||||||
|
'name': 'btns',
|
||||||
|
'private': False,
|
||||||
|
'type': 'buttonGroup',
|
||||||
|
'id': 'btns',
|
||||||
|
'description': '动态按钮列表(Dify actions)',
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': False,
|
||||||
|
'schema': [
|
||||||
|
{
|
||||||
|
'id': 'btns.text',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'text',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '按钮文案',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.color',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'color',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '按钮颜色',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.status',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'status',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '按钮状态',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.event',
|
||||||
|
'type': 'dynamicEvent',
|
||||||
|
'name': 'event',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '按钮点击事件',
|
||||||
|
'schema': [
|
||||||
|
{
|
||||||
|
'id': 'btns.type',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'type',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '事件类型:openLink / sendCardRequest',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.params',
|
||||||
|
'type': 'object',
|
||||||
|
'name': 'params',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '事件参数',
|
||||||
|
'schema': [
|
||||||
|
{
|
||||||
|
'id': 'btns.url',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'url',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '点击跳转链接(type=openLink)',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.actionId',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'actionId',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '回传请求 id(type=sendCardRequest)',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'btns.params',
|
||||||
|
'type': 'object',
|
||||||
|
'name': 'params',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'description': '回传请求参数(type=sendCardRequest)',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
'schemaVersion': '3.0.0',
|
||||||
|
'schema': {
|
||||||
|
'componentsMap': components_map,
|
||||||
|
'componentsTree': [root],
|
||||||
|
'i18n': {},
|
||||||
|
'version': '1.0.0',
|
||||||
|
},
|
||||||
|
'mockData': {
|
||||||
|
'cardData': {
|
||||||
|
'flowStatus': 3,
|
||||||
|
'content': '请审核以下报销申请:\n\n- 申请人:张三\n- 金额:¥1,200\n- 类别:差旅',
|
||||||
|
'btns': [
|
||||||
|
{
|
||||||
|
'text': '通过',
|
||||||
|
'color': 'blue',
|
||||||
|
'status': 'normal',
|
||||||
|
'event': {
|
||||||
|
'type': 'sendCardRequest',
|
||||||
|
'params': {'actionId': 'approve', 'params': {'action_id': 'approve'}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'text': '驳回',
|
||||||
|
'color': 'gray',
|
||||||
|
'status': 'normal',
|
||||||
|
'event': {
|
||||||
|
'type': 'sendCardRequest',
|
||||||
|
'params': {'actionId': 'reject', 'params': {'action_id': 'reject'}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'text': '补充资料',
|
||||||
|
'color': 'gray',
|
||||||
|
'status': 'normal',
|
||||||
|
'event': {
|
||||||
|
'type': 'sendCardRequest',
|
||||||
|
'params': {'actionId': 'more_info', 'params': {'action_id': 'more_info'}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
'cardPrivateData': {},
|
||||||
|
'localData': {'flowStatus': '', '_cardFoldStatusLocalDataKey': ''},
|
||||||
|
'richTextData': {},
|
||||||
|
},
|
||||||
|
'renderContext': {'regenerateEnabled': '1', 'regenerateIndex': '2', 'regenerateTotal': '5'},
|
||||||
|
'editVersion': 0,
|
||||||
|
'customWidgetInfo': '',
|
||||||
|
'useCustomWidgetInfo': False,
|
||||||
|
'variableList': [
|
||||||
|
{
|
||||||
|
'id': 'content',
|
||||||
|
'type': 'markdown',
|
||||||
|
'name': 'content',
|
||||||
|
'description': '人工输入提示词(Dify form_content 含可选 node_title 前缀)',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': False,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 'flowStatus',
|
||||||
|
'type': 'string',
|
||||||
|
'name': 'flowStatus',
|
||||||
|
'description': 'AI卡片状态:pending(1)、writing(2)、done(3)、failed(5)',
|
||||||
|
'private': False,
|
||||||
|
'editorVarType': 'variables',
|
||||||
|
'disabled': True,
|
||||||
|
'visible': False,
|
||||||
|
},
|
||||||
|
btns_var,
|
||||||
|
],
|
||||||
|
'formList': [],
|
||||||
|
'customContextList': [],
|
||||||
|
'expList': [],
|
||||||
|
'localList': [],
|
||||||
|
'hsfList': [],
|
||||||
|
'lwpList': [],
|
||||||
|
'pageData': {},
|
||||||
|
'extension': {'extendType': 'AI', 'aiStatusList': [3, 1, 5], 'fileTypeList': []},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
editor_data = build_editor_data()
|
||||||
|
wrapper = {
|
||||||
|
'editorData': json.dumps(editor_data, ensure_ascii=False, separators=(',', ':')),
|
||||||
|
'widgetInfo': '',
|
||||||
|
'type': 'im',
|
||||||
|
'mode': 'card',
|
||||||
|
}
|
||||||
|
OUTPUT.write_text(json.dumps(wrapper, ensure_ascii=False, indent=2), encoding='utf-8')
|
||||||
|
print(f'wrote {OUTPUT}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
|
|||||||
if chunk.startswith('data:'):
|
if chunk.startswith('data:'):
|
||||||
yield json.loads(chunk[5:])
|
yield json.loads(chunk[5:])
|
||||||
|
|
||||||
|
async def workflow_submit(
|
||||||
|
self,
|
||||||
|
form_token: str,
|
||||||
|
workflow_run_id: str,
|
||||||
|
inputs: dict[str, typing.Any],
|
||||||
|
user: str,
|
||||||
|
action: str = '',
|
||||||
|
timeout: float = 120.0,
|
||||||
|
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
||||||
|
"""Submit human input to resume a paused workflow, then stream events.
|
||||||
|
|
||||||
|
1. POST /form/human_input/{form_token} to submit the form
|
||||||
|
2. GET /workflow/{task_id}/events to stream the resumed workflow events
|
||||||
|
"""
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {self.api_key}',
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
base_url=self.base_url,
|
||||||
|
trust_env=True,
|
||||||
|
timeout=timeout,
|
||||||
|
) as client:
|
||||||
|
# Step 1: Submit the form
|
||||||
|
payload: dict[str, typing.Any] = {
|
||||||
|
'inputs': inputs if isinstance(inputs, dict) else {},
|
||||||
|
'user': user,
|
||||||
|
'action': action,
|
||||||
|
}
|
||||||
|
|
||||||
|
submit_resp = await client.post(
|
||||||
|
f'/form/human_input/{form_token}',
|
||||||
|
headers=headers,
|
||||||
|
json=payload,
|
||||||
|
)
|
||||||
|
if submit_resp.status_code != 200:
|
||||||
|
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
|
||||||
|
|
||||||
|
# Step 2: Stream resumed workflow events
|
||||||
|
async with client.stream(
|
||||||
|
'GET',
|
||||||
|
f'/workflow/{workflow_run_id}/events',
|
||||||
|
headers={'Authorization': f'Bearer {self.api_key}'},
|
||||||
|
params={'user': user},
|
||||||
|
) as r:
|
||||||
|
async for chunk in r.aiter_lines():
|
||||||
|
if r.status_code != 200:
|
||||||
|
raise DifyAPIError(f'{r.status_code} {chunk}')
|
||||||
|
if chunk.strip() == '':
|
||||||
|
continue
|
||||||
|
if chunk.startswith('data:'):
|
||||||
|
yield json.loads(chunk[5:])
|
||||||
|
|
||||||
async def upload_file(
|
async def upload_file(
|
||||||
self,
|
self,
|
||||||
file: httpx._types.FileTypes,
|
file: httpx._types.FileTypes,
|
||||||
|
|||||||
@@ -1,17 +1,26 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import base64
|
import base64
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import time
|
import time
|
||||||
|
import uuid
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
from typing import Callable
|
from typing import Awaitable, Callable, Optional
|
||||||
import dingtalk_stream # type: ignore
|
import dingtalk_stream # type: ignore
|
||||||
import websockets
|
import websockets
|
||||||
from .EchoHandler import EchoTextHandler
|
from .EchoHandler import EchoTextHandler
|
||||||
|
from .card_callback import DingTalkCardActionHandler
|
||||||
from .dingtalkevent import DingTalkEvent
|
from .dingtalkevent import DingTalkEvent
|
||||||
import httpx
|
import httpx
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
|
||||||
|
_stdout_logger = logging.getLogger('langbot.dingtalk_api')
|
||||||
|
|
||||||
|
|
||||||
|
DINGTALK_OPENAPI_BASE = 'https://api.dingtalk.com'
|
||||||
|
|
||||||
|
|
||||||
class DingTalkClient:
|
class DingTalkClient:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@@ -21,6 +30,7 @@ class DingTalkClient:
|
|||||||
robot_code: str,
|
robot_code: str,
|
||||||
markdown_card: bool,
|
markdown_card: bool,
|
||||||
logger: None,
|
logger: None,
|
||||||
|
card_action_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
|
||||||
):
|
):
|
||||||
"""初始化 WebSocket 连接并自动启动"""
|
"""初始化 WebSocket 连接并自动启动"""
|
||||||
self.credential = dingtalk_stream.Credential(client_id, client_secret)
|
self.credential = dingtalk_stream.Credential(client_id, client_secret)
|
||||||
@@ -30,6 +40,14 @@ class DingTalkClient:
|
|||||||
# 在 DingTalkClient 中传入自己作为参数,避免循环导入
|
# 在 DingTalkClient 中传入自己作为参数,避免循环导入
|
||||||
self.EchoTextHandler = EchoTextHandler(self)
|
self.EchoTextHandler = EchoTextHandler(self)
|
||||||
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
|
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
|
||||||
|
# STREAM-mode card action button click handler. Forwards parsed payload
|
||||||
|
# to the adapter so it can resume paused Dify workflows.
|
||||||
|
self.card_action_callback = card_action_callback
|
||||||
|
self.card_action_handler = DingTalkCardActionHandler(self.client, self._on_card_action)
|
||||||
|
self.client.register_callback_handler(
|
||||||
|
dingtalk_stream.handlers.CallbackHandler.TOPIC_CARD_CALLBACK,
|
||||||
|
self.card_action_handler,
|
||||||
|
)
|
||||||
self._message_handlers = {
|
self._message_handlers = {
|
||||||
'example': [],
|
'example': [],
|
||||||
}
|
}
|
||||||
@@ -41,6 +59,16 @@ class DingTalkClient:
|
|||||||
self.logger = logger
|
self.logger = logger
|
||||||
self._stopped = False # Flag to control the event loop
|
self._stopped = False # Flag to control the event loop
|
||||||
|
|
||||||
|
async def _on_card_action(self, payload: dict) -> None:
|
||||||
|
"""Dispatch a parsed card-action payload to the adapter callback."""
|
||||||
|
if self.card_action_callback is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
await self.card_action_callback(payload)
|
||||||
|
except Exception:
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(f'DingTalk card action callback error: {traceback.format_exc()}')
|
||||||
|
|
||||||
async def get_access_token(self):
|
async def get_access_token(self):
|
||||||
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
||||||
headers = {'Content-Type': 'application/json'}
|
headers = {'Content-Type': 'application/json'}
|
||||||
@@ -429,18 +457,35 @@ class DingTalkClient:
|
|||||||
'Content-Type': 'application/json',
|
'Content-Type': 'application/json',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# For enterprise-internal robots, robotCode == AppKey (client_id).
|
||||||
|
# The dedicated robot_code field is only required for scenario-group
|
||||||
|
# robots or third-party robots; fall back to client_id when empty so
|
||||||
|
# the common single-bot setup keeps working without manual config.
|
||||||
|
robot_code = self.robot_code or self.key
|
||||||
data = {
|
data = {
|
||||||
'robotCode': self.robot_code,
|
'robotCode': robot_code,
|
||||||
'userIds': [target_id],
|
'userIds': [target_id],
|
||||||
'msgKey': 'sampleText',
|
'msgKey': 'sampleText',
|
||||||
'msgParam': json.dumps({'content': content}),
|
'msgParam': json.dumps({'content': content}),
|
||||||
}
|
}
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk send_proactive_message_to_one request: robotCode=%s target_id=%s content_len=%d',
|
||||||
|
robot_code,
|
||||||
|
target_id,
|
||||||
|
len(content),
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient() as client:
|
async with httpx.AsyncClient() as client:
|
||||||
response = await client.post(url, headers=headers, json=data)
|
response = await client.post(url, headers=headers, json=data)
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk send_proactive_message_to_one response: status=%d body=%s',
|
||||||
|
response.status_code,
|
||||||
|
response.text[:500],
|
||||||
|
)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
|
_stdout_logger.exception('DingTalk send_proactive_message_to_one error')
|
||||||
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||||||
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||||||
|
|
||||||
@@ -456,7 +501,7 @@ class DingTalkClient:
|
|||||||
}
|
}
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
'robotCode': self.robot_code,
|
'robotCode': self.robot_code or self.key,
|
||||||
'openConversationId': target_id,
|
'openConversationId': target_id,
|
||||||
'msgKey': 'sampleText',
|
'msgKey': 'sampleText',
|
||||||
'msgParam': json.dumps({'content': content}),
|
'msgParam': json.dumps({'content': content}),
|
||||||
@@ -477,47 +522,274 @@ class DingTalkClient:
|
|||||||
quote_origin: bool = False,
|
quote_origin: bool = False,
|
||||||
card_auto_layout: bool = False,
|
card_auto_layout: bool = False,
|
||||||
):
|
):
|
||||||
card_data = {}
|
"""Create + deliver the streaming chat card for a chatbot reply.
|
||||||
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
|
|
||||||
card_data['content'] = ''
|
|
||||||
|
|
||||||
# 将用户的消息内容作为卡片的查询参数,方便后续处理
|
Replaces the old `dingtalk_stream.AICardReplier`-based path. Returns
|
||||||
if incoming_message.message_type == 'text':
|
`(None, out_track_id)` to keep call sites compatible with the
|
||||||
card_data['query'] = incoming_message.get_text_list()[0]
|
previous `(card_instance, card_instance_id)` shape — the first slot
|
||||||
|
is unused now that everything is driven by out_track_id.
|
||||||
|
"""
|
||||||
|
out_track_id = uuid.uuid4().hex
|
||||||
|
is_group = str(incoming_message.conversation_type) == '2'
|
||||||
|
if is_group:
|
||||||
|
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
|
||||||
else:
|
else:
|
||||||
card_data['query'] = '...'
|
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
|
||||||
|
|
||||||
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message)
|
card_param_map = {'content': ''}
|
||||||
# print(card_instance)
|
if incoming_message.message_type == 'text':
|
||||||
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
|
card_param_map['query'] = incoming_message.get_text_list()[0]
|
||||||
card_instance_id = await card_instance.async_create_and_deliver_card(
|
else:
|
||||||
temp_card_id,
|
card_param_map['query'] = '...'
|
||||||
card_data,
|
|
||||||
|
await self.create_and_deliver_card(
|
||||||
|
card_template_id=temp_card_id,
|
||||||
|
out_track_id=out_track_id,
|
||||||
|
open_space_id=open_space_id,
|
||||||
|
is_group=is_group,
|
||||||
|
card_param_map=card_param_map,
|
||||||
|
card_data_config={'autoLayout': card_auto_layout},
|
||||||
)
|
)
|
||||||
return card_instance, card_instance_id
|
return None, out_track_id
|
||||||
|
|
||||||
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
|
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
|
||||||
content_key = 'content'
|
"""Stream a single chunk into an existing card's `content` field."""
|
||||||
try:
|
try:
|
||||||
await card_instance.async_streaming(
|
await self.streaming_update_card(
|
||||||
card_instance_id,
|
out_track_id=card_instance_id,
|
||||||
content_key=content_key,
|
content_key='content',
|
||||||
content_value=content,
|
content_value=content,
|
||||||
append=False,
|
append=False,
|
||||||
finished=is_final,
|
finished=is_final,
|
||||||
failed=False,
|
failed=False,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
if self.logger:
|
||||||
self.logger.exception(e)
|
self.logger.exception(e)
|
||||||
await card_instance.async_streaming(
|
await self.streaming_update_card(
|
||||||
card_instance_id,
|
out_track_id=card_instance_id,
|
||||||
content_key=content_key,
|
content_key='content',
|
||||||
content_value='',
|
content_value='',
|
||||||
append=False,
|
append=False,
|
||||||
finished=is_final,
|
finished=is_final,
|
||||||
failed=True,
|
failed=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def create_and_deliver_card(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
card_template_id: str,
|
||||||
|
out_track_id: str,
|
||||||
|
open_space_id: str,
|
||||||
|
is_group: bool,
|
||||||
|
card_param_map: Optional[dict] = None,
|
||||||
|
callback_type: str = 'STREAM',
|
||||||
|
callback_route_key: Optional[str] = None,
|
||||||
|
support_forward: bool = True,
|
||||||
|
dynamic_data_source_configs: Optional[list] = None,
|
||||||
|
card_data_config: Optional[dict] = None,
|
||||||
|
at_user_ids: Optional[dict] = None,
|
||||||
|
recipients: Optional[list] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""POST /v1.0/card/instances/createAndDeliver.
|
||||||
|
|
||||||
|
Mirrors the SDK's `async_create_and_deliver_card` shape but exposes
|
||||||
|
the dynamic-data-source config slot so we can register a pull URL
|
||||||
|
for variable-length button lists.
|
||||||
|
"""
|
||||||
|
if not await self.check_access_token():
|
||||||
|
await self.get_access_token()
|
||||||
|
|
||||||
|
cardData: dict = {'cardParamMap': card_param_map or {}}
|
||||||
|
if card_data_config is not None:
|
||||||
|
cardData['config'] = json.dumps(card_data_config)
|
||||||
|
|
||||||
|
body: dict = {
|
||||||
|
'cardTemplateId': card_template_id,
|
||||||
|
'outTrackId': out_track_id,
|
||||||
|
'cardData': cardData,
|
||||||
|
'callbackType': callback_type,
|
||||||
|
'openSpaceId': open_space_id,
|
||||||
|
'imGroupOpenSpaceModel': {'supportForward': support_forward},
|
||||||
|
'imRobotOpenSpaceModel': {'supportForward': support_forward},
|
||||||
|
}
|
||||||
|
if callback_type == 'HTTP' and callback_route_key:
|
||||||
|
body['callbackRouteKey'] = callback_route_key
|
||||||
|
|
||||||
|
if is_group:
|
||||||
|
deliver: dict = {'robotCode': self.robot_code or self.key}
|
||||||
|
if at_user_ids:
|
||||||
|
deliver['atUserIds'] = at_user_ids
|
||||||
|
if recipients is not None:
|
||||||
|
deliver['recipients'] = recipients
|
||||||
|
body['imGroupOpenDeliverModel'] = deliver
|
||||||
|
else:
|
||||||
|
body['imRobotOpenDeliverModel'] = {'spaceType': 'IM_ROBOT'}
|
||||||
|
|
||||||
|
if dynamic_data_source_configs:
|
||||||
|
body['openDynamicDataConfig'] = {'dynamicDataSourceConfigs': dynamic_data_source_configs}
|
||||||
|
|
||||||
|
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/createAndDeliver'
|
||||||
|
headers = {
|
||||||
|
'x-acs-dingtalk-access-token': self.access_token,
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk createAndDeliver request body: %s',
|
||||||
|
json.dumps(body, ensure_ascii=False)[:1500],
|
||||||
|
)
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.post(url, headers=headers, json=body, timeout=30.0)
|
||||||
|
if response.status_code == 200:
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk createAndDeliver response: %s',
|
||||||
|
response.text[:500],
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
_stdout_logger.error(
|
||||||
|
'DingTalk createAndDeliver failed: status=%s body=%s',
|
||||||
|
response.status_code,
|
||||||
|
response.text,
|
||||||
|
)
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(
|
||||||
|
f'DingTalk createAndDeliver failed: status={response.status_code} body={response.text}'
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
_stdout_logger.exception('DingTalk createAndDeliver error')
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(f'DingTalk createAndDeliver error: {traceback.format_exc()}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def streaming_update_card(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
out_track_id: str,
|
||||||
|
content_key: str,
|
||||||
|
content_value: str,
|
||||||
|
append: bool,
|
||||||
|
finished: bool,
|
||||||
|
failed: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
"""PUT /v1.0/card/streaming.
|
||||||
|
|
||||||
|
Replaces `dingtalk_stream.AICardReplier.async_streaming` — same body
|
||||||
|
shape (outTrackId / guid / key / content / isFull / isFinalize /
|
||||||
|
isError) per the SDK source.
|
||||||
|
"""
|
||||||
|
if not await self.check_access_token():
|
||||||
|
await self.get_access_token()
|
||||||
|
|
||||||
|
body = {
|
||||||
|
'outTrackId': out_track_id,
|
||||||
|
'guid': uuid.uuid4().hex,
|
||||||
|
'key': content_key,
|
||||||
|
'content': content_value,
|
||||||
|
'isFull': not append,
|
||||||
|
'isFinalize': finished,
|
||||||
|
'isError': failed,
|
||||||
|
}
|
||||||
|
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/streaming'
|
||||||
|
headers = {
|
||||||
|
'x-acs-dingtalk-access-token': self.access_token,
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.put(url, headers=headers, json=body, timeout=30.0)
|
||||||
|
if response.status_code == 200:
|
||||||
|
return True
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(
|
||||||
|
f'DingTalk card streaming failed: status={response.status_code} body={response.text}'
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(f'DingTalk card streaming error: {traceback.format_exc()}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def update_card_data(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
out_track_id: str,
|
||||||
|
card_param_map: Optional[dict] = None,
|
||||||
|
private_data: Optional[dict] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""PUT /v1.0/card/instances — non-streaming card content update."""
|
||||||
|
if not await self.check_access_token():
|
||||||
|
await self.get_access_token()
|
||||||
|
|
||||||
|
body: dict = {
|
||||||
|
'outTrackId': out_track_id,
|
||||||
|
'cardData': {'cardParamMap': card_param_map or {}},
|
||||||
|
}
|
||||||
|
if private_data:
|
||||||
|
body['privateData'] = private_data
|
||||||
|
|
||||||
|
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances'
|
||||||
|
headers = {
|
||||||
|
'x-acs-dingtalk-access-token': self.access_token,
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk update_card_data request: out_track_id=%s body=%s',
|
||||||
|
out_track_id,
|
||||||
|
json.dumps(body, ensure_ascii=False)[:500],
|
||||||
|
)
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.put(url, headers=headers, json=body, timeout=30.0)
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk update_card_data response: status=%d body=%s',
|
||||||
|
response.status_code,
|
||||||
|
response.text[:300],
|
||||||
|
)
|
||||||
|
if response.status_code == 200:
|
||||||
|
return True
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(
|
||||||
|
f'DingTalk update card failed: status={response.status_code} body={response.text}'
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
except Exception:
|
||||||
|
_stdout_logger.exception('DingTalk update_card_data error')
|
||||||
|
if self.logger:
|
||||||
|
await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def delete_card(self, *, out_track_id: str) -> bool:
|
||||||
|
"""POST /v1.0/card/instances/delete — recall a delivered card.
|
||||||
|
|
||||||
|
Used to retroactively remove the initial streaming chat card when
|
||||||
|
the workflow turns out to be paused for human input — the prompt
|
||||||
|
and buttons then live entirely on the dedicated form card.
|
||||||
|
"""
|
||||||
|
if not await self.check_access_token():
|
||||||
|
await self.get_access_token()
|
||||||
|
|
||||||
|
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/delete'
|
||||||
|
headers = {
|
||||||
|
'x-acs-dingtalk-access-token': self.access_token,
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
}
|
||||||
|
body = {'outTrackId': out_track_id, 'userIdType': 1}
|
||||||
|
try:
|
||||||
|
_stdout_logger.info('DingTalk delete_card request: out_track_id=%s', out_track_id)
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.post(url, headers=headers, json=body, timeout=30.0)
|
||||||
|
_stdout_logger.info(
|
||||||
|
'DingTalk delete_card response: status=%d body=%s',
|
||||||
|
response.status_code,
|
||||||
|
response.text[:300],
|
||||||
|
)
|
||||||
|
return response.status_code == 200
|
||||||
|
except Exception:
|
||||||
|
_stdout_logger.exception('DingTalk delete_card error')
|
||||||
|
return False
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
"""启动 WebSocket 连接,监听消息"""
|
"""启动 WebSocket 连接,监听消息"""
|
||||||
self._stopped = False
|
self._stopped = False
|
||||||
|
|||||||
96
src/langbot/libs/dingtalk_api/card_callback.py
Normal file
96
src/langbot/libs/dingtalk_api/card_callback.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
"""STREAM-mode handler for DingTalk card action button clicks.
|
||||||
|
|
||||||
|
DingTalk delivers card-action callbacks over the same WebSocket stream used
|
||||||
|
for chatbot messages, under the topic `/v1.0/card/instances/callback`. This
|
||||||
|
module subclasses `dingtalk_stream.CallbackHandler` and forwards the parsed
|
||||||
|
payload to a coroutine the adapter registers, so the resume-paused-workflow
|
||||||
|
logic stays in the platform adapter where it belongs.
|
||||||
|
|
||||||
|
The `CardCallbackMessage` returned by `from_dict` exposes:
|
||||||
|
|
||||||
|
* `card_instance_id` (from `outTrackId`) — the card whose button was clicked
|
||||||
|
* `user_id` — the clicker's userId
|
||||||
|
* `content` — parsed JSON; the click params live here. Where exactly inside
|
||||||
|
`content` they sit depends on the template binding. We probe
|
||||||
|
the common paths.
|
||||||
|
* `extension` — parsed JSON; any extra data we set when delivering the card.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Awaitable, Callable, Optional
|
||||||
|
|
||||||
|
import dingtalk_stream # type: ignore
|
||||||
|
from dingtalk_stream import AckMessage
|
||||||
|
from dingtalk_stream.card_callback import CardCallbackMessage
|
||||||
|
|
||||||
|
|
||||||
|
_PARAM_PATHS = (
|
||||||
|
('params',),
|
||||||
|
('cardPrivateData', 'params'),
|
||||||
|
('userPrivateData', 'params'),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_params(content: dict) -> dict:
|
||||||
|
"""Return the action params dict regardless of where the template put it."""
|
||||||
|
for path in _PARAM_PATHS:
|
||||||
|
node = content
|
||||||
|
for key in path:
|
||||||
|
if not isinstance(node, dict):
|
||||||
|
node = None
|
||||||
|
break
|
||||||
|
node = node.get(key)
|
||||||
|
if node is None:
|
||||||
|
break
|
||||||
|
if isinstance(node, dict) and node:
|
||||||
|
return node
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
class DingTalkCardActionHandler(dingtalk_stream.CallbackHandler):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
dingtalk_stream_client,
|
||||||
|
on_action: Optional[Callable[[dict], Awaitable[None]]] = None,
|
||||||
|
):
|
||||||
|
super().__init__()
|
||||||
|
self.dingtalk_client = dingtalk_stream_client
|
||||||
|
self.on_action = on_action
|
||||||
|
|
||||||
|
async def process(self, callback: dingtalk_stream.CallbackMessage):
|
||||||
|
try:
|
||||||
|
message = CardCallbackMessage.from_dict(callback.data)
|
||||||
|
params = _extract_params(message.content if isinstance(message.content, dict) else {})
|
||||||
|
|
||||||
|
# `CardCallbackMessage.from_dict` does not surface `actionId` (the
|
||||||
|
# top-level field that ButtonGroup's sendCardRequest event puts
|
||||||
|
# there). Pull it from the raw callback.data instead.
|
||||||
|
raw = callback.data if isinstance(callback.data, dict) else {}
|
||||||
|
action_id = raw.get('actionId') or ''
|
||||||
|
if not action_id:
|
||||||
|
# Some templates nest it under actionData / cardPrivateData.
|
||||||
|
action_data = raw.get('actionData') or {}
|
||||||
|
if isinstance(action_data, dict):
|
||||||
|
action_id = action_data.get('actionId') or action_id
|
||||||
|
if not action_id:
|
||||||
|
cpd = action_data.get('cardPrivateData') or {}
|
||||||
|
if isinstance(cpd, dict):
|
||||||
|
ids = cpd.get('actionIds')
|
||||||
|
if isinstance(ids, list) and ids:
|
||||||
|
action_id = str(ids[0])
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
'out_track_id': message.card_instance_id,
|
||||||
|
'user_id': message.user_id,
|
||||||
|
'corp_id': message.corp_id,
|
||||||
|
'action_id': action_id,
|
||||||
|
'params': params,
|
||||||
|
'raw_content': message.content,
|
||||||
|
'extension': message.extension if isinstance(message.extension, dict) else {},
|
||||||
|
}
|
||||||
|
if self.on_action is not None:
|
||||||
|
await self.on_action(payload)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f'DingTalkCardActionHandler.process error: {e}')
|
||||||
|
return AckMessage.STATUS_OK, 'OK'
|
||||||
@@ -157,7 +157,7 @@ class RuntimePipeline:
|
|||||||
bot_message=query.resp_messages[-1],
|
bot_message=query.resp_messages[-1],
|
||||||
message=result.user_notice,
|
message=result.user_notice,
|
||||||
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
|
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
|
||||||
is_final=[msg.is_final for msg in query.resp_messages][0],
|
is_final=[msg.is_final for msg in query.resp_messages][-1],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await query.adapter.reply_message(
|
await query.adapter.reply_message(
|
||||||
|
|||||||
@@ -42,9 +42,13 @@ class QueryPool:
|
|||||||
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
|
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
|
||||||
pipeline_uuid: typing.Optional[str] = None,
|
pipeline_uuid: typing.Optional[str] = None,
|
||||||
routed_by_rule: bool = False,
|
routed_by_rule: bool = False,
|
||||||
|
variables: typing.Optional[dict[str, typing.Any]] = None,
|
||||||
) -> pipeline_query.Query:
|
) -> pipeline_query.Query:
|
||||||
async with self.condition:
|
async with self.condition:
|
||||||
query_id = self.query_id_counter
|
query_id = self.query_id_counter
|
||||||
|
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
|
||||||
|
if variables:
|
||||||
|
initial_variables.update(variables)
|
||||||
query = pipeline_query.Query(
|
query = pipeline_query.Query(
|
||||||
bot_uuid=bot_uuid,
|
bot_uuid=bot_uuid,
|
||||||
query_id=query_id,
|
query_id=query_id,
|
||||||
@@ -53,7 +57,7 @@ class QueryPool:
|
|||||||
sender_id=sender_id,
|
sender_id=sender_id,
|
||||||
message_event=message_event,
|
message_event=message_event,
|
||||||
message_chain=message_chain,
|
message_chain=message_chain,
|
||||||
variables={'_routed_by_rule': routed_by_rule},
|
variables=initial_variables,
|
||||||
resp_messages=[],
|
resp_messages=[],
|
||||||
resp_message_chain=[],
|
resp_message_chain=[],
|
||||||
adapter=adapter,
|
adapter=adapter,
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
|
|||||||
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
|
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
|
||||||
# TODO 命令与流式的兼容性问题
|
# TODO 命令与流式的兼容性问题
|
||||||
if await query.adapter.is_stream_output_supported() and has_chunks:
|
if await query.adapter.is_stream_output_supported() and has_chunks:
|
||||||
is_final = [msg.is_final for msg in query.resp_messages][0]
|
is_final = [msg.is_final for msg in query.resp_messages][-1]
|
||||||
await query.adapter.reply_message_chunk(
|
await query.adapter.reply_message_chunk(
|
||||||
message_source=query.message_event,
|
message_source=query.message_event,
|
||||||
bot_message=query.resp_messages[-1],
|
bot_message=query.resp_messages[-1],
|
||||||
|
|||||||
@@ -501,6 +501,8 @@ class PlatformManager:
|
|||||||
bot_entity.adapter_config,
|
bot_entity.adapter_config,
|
||||||
logger,
|
logger,
|
||||||
)
|
)
|
||||||
|
if hasattr(adapter_inst, 'ap'):
|
||||||
|
adapter_inst.ap = self.ap
|
||||||
|
|
||||||
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook)
|
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook)
|
||||||
if hasattr(adapter_inst, 'set_bot_uuid'):
|
if hasattr(adapter_inst, 'set_bot_uuid'):
|
||||||
|
|||||||
@@ -1,13 +1,19 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
import traceback
|
import traceback
|
||||||
import typing
|
import typing
|
||||||
|
import uuid
|
||||||
|
|
||||||
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
|
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
|
||||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||||||
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||||
from langbot.libs.dingtalk_api.api import DingTalkClient
|
from langbot.libs.dingtalk_api.api import DingTalkClient
|
||||||
import datetime
|
import datetime
|
||||||
from langbot.pkg.platform.logger import EventLogger
|
from langbot.pkg.platform.logger import EventLogger
|
||||||
|
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
|
||||||
|
|
||||||
|
|
||||||
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
||||||
@@ -170,6 +176,13 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
card_instance_id_dict: (
|
card_instance_id_dict: (
|
||||||
dict # 回复卡片消息字典,key为消息id,value为回复卡片实例id,用于在流式消息时判断是否发送到指定卡片
|
dict # 回复卡片消息字典,key为消息id,value为回复卡片实例id,用于在流式消息时判断是否发送到指定卡片
|
||||||
)
|
)
|
||||||
|
# outTrackId → form snapshot {session_key, launcher_type, launcher_id, form_token,
|
||||||
|
# workflow_run_id, actions, node_title, form_content, expires_at, open_space_id,
|
||||||
|
# user_id_hint, current_text}. Lookup keys for the data-source pull endpoint and
|
||||||
|
# the STREAM card-action callback.
|
||||||
|
card_state: dict
|
||||||
|
ap: typing.Any = None
|
||||||
|
bot_uuid: str = ''
|
||||||
|
|
||||||
def __init__(self, config: dict, logger: EventLogger):
|
def __init__(self, config: dict, logger: EventLogger):
|
||||||
required_keys = [
|
required_keys = [
|
||||||
@@ -194,10 +207,15 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
config=config,
|
config=config,
|
||||||
logger=logger,
|
logger=logger,
|
||||||
card_instance_id_dict={},
|
card_instance_id_dict={},
|
||||||
|
card_state={},
|
||||||
bot_account_id=bot_account_id,
|
bot_account_id=bot_account_id,
|
||||||
bot=bot,
|
bot=bot,
|
||||||
listeners={},
|
listeners={},
|
||||||
)
|
)
|
||||||
|
# Wire the card-action callback after super().__init__ so we can reference
|
||||||
|
# self.* — the client's handler stores this as a soft reference and reads
|
||||||
|
# it at fire time.
|
||||||
|
self.bot.card_action_callback = self._on_card_action
|
||||||
|
|
||||||
async def reply_message(
|
async def reply_message(
|
||||||
self,
|
self,
|
||||||
@@ -222,28 +240,79 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
quote_origin: bool = False,
|
quote_origin: bool = False,
|
||||||
is_final: bool = False,
|
is_final: bool = False,
|
||||||
):
|
):
|
||||||
# event = await DingTalkEventConverter.yiri2target(
|
|
||||||
# message_source,
|
|
||||||
# )
|
|
||||||
# incoming_message = event.incoming_message
|
|
||||||
|
|
||||||
# msg_id = incoming_message.message_id
|
|
||||||
message_id = bot_message.resp_message_id
|
message_id = bot_message.resp_message_id
|
||||||
msg_seq = bot_message.msg_sequence
|
msg_seq = bot_message.msg_sequence
|
||||||
|
|
||||||
|
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||||
|
form_data = getattr(bot_message, '_form_data', None)
|
||||||
|
if is_final and self.ap is not None:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk reply_message_chunk final: form_data_present={form_data is not None}, '
|
||||||
|
f'form_template_configured={bool(form_template_id)}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if form_data and is_final:
|
||||||
|
await self._handle_form_chunk(message_source, bot_message, message, form_data)
|
||||||
|
return
|
||||||
|
|
||||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||||
markdown_enabled = self.config.get('markdown_card', False)
|
markdown_enabled = self.config.get('markdown_card', False)
|
||||||
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
|
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
|
||||||
|
|
||||||
card_instance, card_instance_id = self.card_instance_id_dict[message_id]
|
|
||||||
if not content and bot_message.content:
|
if not content and bot_message.content:
|
||||||
content = bot_message.content # 兼容直接传入content的情况
|
content = bot_message.content # 兼容直接传入content的情况
|
||||||
# print(card_instance_id)
|
|
||||||
|
chat_card_entry = self.card_instance_id_dict.get(message_id)
|
||||||
|
if chat_card_entry is None:
|
||||||
|
# No streaming chat card was created for this query — common
|
||||||
|
# path for synthetic events (e.g. resumed workflow after a
|
||||||
|
# button click). Lazy-create one so the resumed output streams
|
||||||
|
# into a card just like a normal conversation, instead of
|
||||||
|
# being deferred and sent in one shot on is_final.
|
||||||
|
if not content:
|
||||||
|
return # nothing to stream yet
|
||||||
|
chat_card_entry = await self._lazy_create_resume_chat_card(message_source, message_id)
|
||||||
|
if chat_card_entry is None:
|
||||||
|
# Lazy-create failed (no template configured); fall back
|
||||||
|
# to a one-shot proactive message on the final chunk.
|
||||||
|
if is_final:
|
||||||
|
await self._send_proactive_to_event(message_source, content)
|
||||||
|
return
|
||||||
|
|
||||||
|
card_instance, card_instance_id = chat_card_entry
|
||||||
if content:
|
if content:
|
||||||
|
if form_template_id:
|
||||||
|
# The form template's MarkdownBlock has `isStreaming: false`
|
||||||
|
# — the streaming endpoint (PUT /v1.0/card/streaming) does
|
||||||
|
# not propagate to non-streaming components. Use the full
|
||||||
|
# update_card_data PUT instead so the content actually
|
||||||
|
# appears in the card body.
|
||||||
|
try:
|
||||||
|
await self.bot.update_card_data(
|
||||||
|
out_track_id=card_instance_id,
|
||||||
|
card_param_map={
|
||||||
|
'content': content,
|
||||||
|
'btns': '[]',
|
||||||
|
'flowStatus': '3' if is_final else '1',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.exception('DingTalk: update card content failed')
|
||||||
|
else:
|
||||||
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
|
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
|
||||||
if is_final and bot_message.tool_calls is None:
|
if is_final:
|
||||||
# self.seq = 1 # 消息回复结束之后重置seq
|
if form_template_id and not content:
|
||||||
self.card_instance_id_dict.pop(message_id) # 消息回复结束之后删除卡片实例id
|
# Empty final chunk still needs to leave the card with
|
||||||
|
# flowStatus=3 so the spinner stops.
|
||||||
|
try:
|
||||||
|
await self.bot.update_card_data(
|
||||||
|
out_track_id=card_instance_id,
|
||||||
|
card_param_map={'flowStatus': '3'},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
if bot_message.tool_calls is None:
|
||||||
|
self.card_instance_id_dict.pop(message_id, None)
|
||||||
|
|
||||||
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
|
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
|
||||||
markdown_enabled = self.config.get('markdown_card', False)
|
markdown_enabled = self.config.get('markdown_card', False)
|
||||||
@@ -260,16 +329,56 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
return is_stream
|
return is_stream
|
||||||
|
|
||||||
async def create_message_card(self, message_id, event):
|
async def create_message_card(self, message_id, event):
|
||||||
card_template_id = self.config['card_template_id']
|
# When a form template is configured, every card in the conversation
|
||||||
|
# uses it (chat output, form prompts, post-click states). The chat
|
||||||
|
# template fallback only kicks in if no form template is configured.
|
||||||
|
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||||
|
legacy_template_id = self.config.get('card_template_id', '')
|
||||||
|
|
||||||
|
# Synthetic events (e.g. card button clicks) have no inbound chatbot
|
||||||
|
# message — skip card creation. The lazy-create path in
|
||||||
|
# reply_message_chunk will spawn a fresh card when the first
|
||||||
|
# non-empty resume chunk arrives.
|
||||||
|
if event is None or event.source_platform_object is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if form_template_id:
|
||||||
|
# Defer card creation to the first non-empty chunk. If the Dify
|
||||||
|
# workflow pauses immediately for human input without producing
|
||||||
|
# any LLM text first, no chat card is created at all — only the
|
||||||
|
# form card gets delivered. Lazy-create lives in
|
||||||
|
# reply_message_chunk → _lazy_create_resume_chat_card.
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Legacy chat-card path (no form template configured).
|
||||||
incoming_message = event.source_platform_object.incoming_message
|
incoming_message = event.source_platform_object.incoming_message
|
||||||
# message_id = incoming_message.message_id
|
|
||||||
card_auto_layout = self.config.get('card_ auto_layout', False)
|
card_auto_layout = self.config.get('card_ auto_layout', False)
|
||||||
card_instance, card_instance_id = await self.bot.create_and_card(
|
card_instance, card_instance_id = await self.bot.create_and_card(
|
||||||
card_template_id, incoming_message, card_auto_layout=card_auto_layout
|
legacy_template_id, incoming_message, card_auto_layout=card_auto_layout
|
||||||
)
|
)
|
||||||
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
|
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def _session_key_from_event(self, event) -> str:
|
||||||
|
"""Return launcher_type_launcher_id for an event, '' if unrecoverable."""
|
||||||
|
if event is None:
|
||||||
|
return ''
|
||||||
|
spo = event.source_platform_object
|
||||||
|
if spo is None:
|
||||||
|
try:
|
||||||
|
if isinstance(event, platform_events.GroupMessage):
|
||||||
|
return f'group_{event.group.id}'
|
||||||
|
return f'person_{event.sender.id}'
|
||||||
|
except Exception:
|
||||||
|
return ''
|
||||||
|
try:
|
||||||
|
inc = spo.incoming_message
|
||||||
|
if str(inc.conversation_type) == '2':
|
||||||
|
return f'group_{inc.conversation_id}'
|
||||||
|
return f'person_{inc.sender_staff_id}'
|
||||||
|
except Exception:
|
||||||
|
return ''
|
||||||
|
|
||||||
def register_listener(
|
def register_listener(
|
||||||
self,
|
self,
|
||||||
event_type: typing.Type[platform_events.Event],
|
event_type: typing.Type[platform_events.Event],
|
||||||
@@ -309,3 +418,449 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
],
|
],
|
||||||
):
|
):
|
||||||
return super().unregister_listener(event_type, callback)
|
return super().unregister_listener(event_type, callback)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Dify human-input form support
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def set_bot_uuid(self, bot_uuid: str):
|
||||||
|
"""Receive the bot uuid from the platform manager.
|
||||||
|
|
||||||
|
Used to compose the public-facing unified-webhook URL for the card
|
||||||
|
dynamic-data-source pull endpoint.
|
||||||
|
"""
|
||||||
|
self.bot_uuid = bot_uuid
|
||||||
|
|
||||||
|
def _derive_open_space(self, message_source: platform_events.MessageEvent) -> tuple[str, bool]:
|
||||||
|
"""Return (openSpaceId, is_group) for the given inbound event."""
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage):
|
||||||
|
return f'dtv1.card//IM_GROUP.{message_source.group.id}', True
|
||||||
|
return f'dtv1.card//IM_ROBOT.{message_source.sender.id}', False
|
||||||
|
|
||||||
|
def _derive_session_descriptor(
|
||||||
|
self, message_source: platform_events.MessageEvent
|
||||||
|
) -> tuple[provider_session.LauncherTypes, str, str]:
|
||||||
|
"""Return (launcher_type, launcher_id, sender_user_id) for routing."""
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage):
|
||||||
|
return (
|
||||||
|
provider_session.LauncherTypes.GROUP,
|
||||||
|
str(message_source.group.id),
|
||||||
|
str(message_source.sender.id),
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
provider_session.LauncherTypes.PERSON,
|
||||||
|
str(message_source.sender.id),
|
||||||
|
str(message_source.sender.id),
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _handle_form_chunk(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
bot_message,
|
||||||
|
message: platform_message.MessageChain,
|
||||||
|
form_data: dict,
|
||||||
|
) -> None:
|
||||||
|
"""Finalize the current chat card and deliver a new form card.
|
||||||
|
|
||||||
|
Multi-card flow: every Dify pause spawns its own card. The card the
|
||||||
|
chat was streaming into (if any) is closed out via streaming_update
|
||||||
|
with finished=True so its spinner stops; a fresh card is then
|
||||||
|
delivered carrying the prompt + buttons.
|
||||||
|
"""
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _handle_form_chunk: actions={len(form_data.get("actions") or [])}, '
|
||||||
|
f'node_title={form_data.get("node_title", "")!r}'
|
||||||
|
)
|
||||||
|
message_id = bot_message.resp_message_id
|
||||||
|
template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||||
|
|
||||||
|
# Finalize the previous chat card so its spinner stops. Use the
|
||||||
|
# already-streamed text as the final content (or zero-width space
|
||||||
|
# when nothing streamed, to satisfy any non-empty-content guards).
|
||||||
|
chat_card_entry = self.card_instance_id_dict.pop(message_id, None)
|
||||||
|
if chat_card_entry is not None:
|
||||||
|
_, chat_out_track_id = chat_card_entry
|
||||||
|
markdown_enabled = self.config.get('markdown_card', False)
|
||||||
|
text_content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
|
||||||
|
if not text_content and bot_message.content:
|
||||||
|
text_content = bot_message.content
|
||||||
|
try:
|
||||||
|
await self.bot.send_card_message(None, chat_out_track_id, text_content or '', True)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'DingTalk: finalize chat card before form failed: {traceback.format_exc()}')
|
||||||
|
# When the chat card uses the form template, also flip flowStatus
|
||||||
|
# to 3 so it leaves the pending state visibly.
|
||||||
|
if template_id:
|
||||||
|
try:
|
||||||
|
await self.bot.update_card_data(
|
||||||
|
out_track_id=chat_out_track_id,
|
||||||
|
card_param_map={'flowStatus': '3'},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not template_id:
|
||||||
|
# No form template configured — fall back to plain text so users
|
||||||
|
# can still reply with the option number or title.
|
||||||
|
await self.send_message_text_form(message_source, form_data)
|
||||||
|
return
|
||||||
|
|
||||||
|
await self._send_form_card(message_source, form_data, template_id)
|
||||||
|
|
||||||
|
async def _send_form_card(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
form_data: dict,
|
||||||
|
template_id: str,
|
||||||
|
) -> None:
|
||||||
|
"""Deliver a new card pre-loaded with the human-input prompt + buttons."""
|
||||||
|
out_track_id = uuid.uuid4().hex
|
||||||
|
open_space_id, is_group = self._derive_open_space(message_source)
|
||||||
|
launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
|
||||||
|
session_key = f'{launcher_type.value}_{launcher_id}'
|
||||||
|
|
||||||
|
actions = list(form_data.get('actions') or [])
|
||||||
|
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||||
|
form_content = form_data.get('form_content', '') or ''
|
||||||
|
|
||||||
|
self.card_state[out_track_id] = {
|
||||||
|
'session_key': session_key,
|
||||||
|
'launcher_type': launcher_type.value,
|
||||||
|
'launcher_id': launcher_id,
|
||||||
|
'sender_user_id': sender_user_id,
|
||||||
|
'form_token': form_data.get('form_token', ''),
|
||||||
|
'workflow_run_id': form_data.get('workflow_run_id', ''),
|
||||||
|
'actions': actions,
|
||||||
|
'node_title': node_title,
|
||||||
|
'form_content': form_content,
|
||||||
|
'open_space_id': open_space_id,
|
||||||
|
'is_group': is_group,
|
||||||
|
}
|
||||||
|
|
||||||
|
parts = []
|
||||||
|
if node_title:
|
||||||
|
parts.append(f'**{node_title}**')
|
||||||
|
if form_content:
|
||||||
|
parts.append(form_content)
|
||||||
|
display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'
|
||||||
|
|
||||||
|
btns = []
|
||||||
|
for idx, action in enumerate(actions):
|
||||||
|
action_id = str(action.get('id') or '')
|
||||||
|
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
|
||||||
|
style = (action.get('button_style') or '').lower()
|
||||||
|
if style == 'primary' or (style == '' and idx == 0):
|
||||||
|
color = 'blue'
|
||||||
|
elif style == 'danger':
|
||||||
|
color = 'red'
|
||||||
|
else:
|
||||||
|
color = 'gray'
|
||||||
|
btns.append(
|
||||||
|
{
|
||||||
|
'text': title,
|
||||||
|
'color': color,
|
||||||
|
'status': 'normal',
|
||||||
|
'event': {
|
||||||
|
'type': 'sendCardRequest',
|
||||||
|
'params': {
|
||||||
|
'actionId': action_id,
|
||||||
|
'params': {'action_id': action_id, 'out_track_id': out_track_id},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _send_form_card: out_track_id={out_track_id} template_id={template_id} '
|
||||||
|
f'open_space_id={open_space_id} is_group={is_group} btns={len(btns)}'
|
||||||
|
)
|
||||||
|
await self.bot.create_and_deliver_card(
|
||||||
|
card_template_id=template_id,
|
||||||
|
out_track_id=out_track_id,
|
||||||
|
open_space_id=open_space_id,
|
||||||
|
is_group=is_group,
|
||||||
|
card_param_map={
|
||||||
|
'content': display_content,
|
||||||
|
'btns': json.dumps(btns, ensure_ascii=False),
|
||||||
|
'flowStatus': '3',
|
||||||
|
},
|
||||||
|
callback_type='STREAM',
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'DingTalk: deliver form card failed: {traceback.format_exc()}')
|
||||||
|
await self.send_message_text_form(message_source, form_data)
|
||||||
|
self.card_state.pop(out_track_id, None)
|
||||||
|
|
||||||
|
async def _lazy_create_resume_chat_card(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
message_id: str,
|
||||||
|
) -> typing.Optional[tuple]:
|
||||||
|
"""Create a new card for resumed-workflow streaming output.
|
||||||
|
|
||||||
|
Used after a button click triggers a synthetic event — no inbound
|
||||||
|
chatbot message means no card was created upstream, so we spin one
|
||||||
|
up here when the first non-empty chunk arrives. Prefers the form
|
||||||
|
template (so empty `btns` keep the layout consistent across the
|
||||||
|
whole conversation); falls back to the legacy chat template.
|
||||||
|
"""
|
||||||
|
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||||
|
legacy_template_id = (self.config.get('card_template_id') or '').strip()
|
||||||
|
template_id = form_template_id or legacy_template_id
|
||||||
|
if not template_id:
|
||||||
|
return None
|
||||||
|
out_track_id = uuid.uuid4().hex
|
||||||
|
open_space_id, is_group = self._derive_open_space(message_source)
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _lazy_create_resume_chat_card: out_track_id={out_track_id} '
|
||||||
|
f'open_space_id={open_space_id} is_group={is_group} '
|
||||||
|
f'using_form_template={bool(form_template_id)}'
|
||||||
|
)
|
||||||
|
if form_template_id:
|
||||||
|
card_param_map = {'content': '', 'btns': '[]', 'flowStatus': '1'}
|
||||||
|
card_data_config = None
|
||||||
|
else:
|
||||||
|
card_param_map = {'content': '', 'query': '...'}
|
||||||
|
card_data_config = {'autoLayout': self.config.get('card_auto_layout', False)}
|
||||||
|
try:
|
||||||
|
success = await self.bot.create_and_deliver_card(
|
||||||
|
card_template_id=template_id,
|
||||||
|
out_track_id=out_track_id,
|
||||||
|
open_space_id=open_space_id,
|
||||||
|
is_group=is_group,
|
||||||
|
card_param_map=card_param_map,
|
||||||
|
card_data_config=card_data_config,
|
||||||
|
callback_type='STREAM',
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.exception('DingTalk: lazy create resume chat card failed')
|
||||||
|
return None
|
||||||
|
if not success:
|
||||||
|
return None
|
||||||
|
entry = (None, out_track_id)
|
||||||
|
self.card_instance_id_dict[message_id] = entry
|
||||||
|
return entry
|
||||||
|
|
||||||
|
async def send_message_text_form(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
form_data: dict,
|
||||||
|
) -> None:
|
||||||
|
"""Fallback: send the human-input prompt as plain text."""
|
||||||
|
display_text = _format_human_input_text(
|
||||||
|
form_data.get('node_title', ''),
|
||||||
|
form_data.get('form_content', ''),
|
||||||
|
form_data.get('actions', []) or [],
|
||||||
|
)
|
||||||
|
await self._send_proactive_to_event(message_source, display_text)
|
||||||
|
|
||||||
|
async def _send_proactive_to_event(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
content: str,
|
||||||
|
) -> None:
|
||||||
|
"""Send `content` as a proactive message to the conversation behind
|
||||||
|
`message_source`. Used when no inbound chatbot message exists to
|
||||||
|
anchor a card on (e.g. resumed flows triggered by card actions).
|
||||||
|
"""
|
||||||
|
if not content:
|
||||||
|
return
|
||||||
|
if self.ap is not None:
|
||||||
|
target = (
|
||||||
|
str(message_source.group.id)
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage)
|
||||||
|
else str(message_source.sender.id)
|
||||||
|
)
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _send_proactive_to_event: target={target} '
|
||||||
|
f'is_group={isinstance(message_source, platform_events.GroupMessage)} content_len={len(content)}'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage):
|
||||||
|
await self.bot.send_proactive_message_to_group(str(message_source.group.id), content)
|
||||||
|
else:
|
||||||
|
await self.bot.send_proactive_message_to_one(str(message_source.sender.id), content)
|
||||||
|
except Exception:
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.exception('DingTalk: send proactive message failed')
|
||||||
|
await self.logger.error(f'DingTalk: send proactive message failed: {traceback.format_exc()}')
|
||||||
|
|
||||||
|
async def _on_card_action(self, payload: dict) -> None:
|
||||||
|
"""Translate a card button click into a synthetic query.
|
||||||
|
|
||||||
|
Reads the clicked button's ``actionId`` (the real Dify action id —
|
||||||
|
the ButtonGroup template sends it back via `event.params.actionId`),
|
||||||
|
recovers the action title from ``card_state``, and enqueues a
|
||||||
|
synthetic `_dify_form_action` query the same way Lark / Telegram do.
|
||||||
|
"""
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _on_card_action received: out_track_id={payload.get("out_track_id")} '
|
||||||
|
f'payload_action_id={payload.get("action_id")!r} params={payload.get("params")!r}'
|
||||||
|
)
|
||||||
|
out_track_id = payload.get('out_track_id') or ''
|
||||||
|
params = payload.get('params') or {}
|
||||||
|
# ButtonGroup `sendCardRequest` events surface the click id at the
|
||||||
|
# callback top level as `actionId`; fall back to `params.action_id`
|
||||||
|
# (alternate template wiring) and `params.actionId`.
|
||||||
|
raw_action_id = (
|
||||||
|
(payload.get('action_id') or '').strip()
|
||||||
|
or (params.get('action_id') or '').strip()
|
||||||
|
or (params.get('actionId') or '').strip()
|
||||||
|
or (params.get('id') or '').strip()
|
||||||
|
)
|
||||||
|
state = self.card_state.get(out_track_id)
|
||||||
|
if state is None:
|
||||||
|
await self.logger.warning(f'DingTalk: card action received for unknown out_track_id={out_track_id}')
|
||||||
|
return
|
||||||
|
if not raw_action_id:
|
||||||
|
await self.logger.warning(f'DingTalk: card action with no action_id, payload={payload}')
|
||||||
|
return
|
||||||
|
|
||||||
|
actions = state.get('actions', []) or []
|
||||||
|
action_id = raw_action_id
|
||||||
|
action_title = raw_action_id
|
||||||
|
for action in actions:
|
||||||
|
if str(action.get('id', '')) == raw_action_id:
|
||||||
|
action_title = action.get('title') or raw_action_id
|
||||||
|
break
|
||||||
|
|
||||||
|
launcher_type = (
|
||||||
|
provider_session.LauncherTypes.GROUP
|
||||||
|
if state.get('launcher_type') == provider_session.LauncherTypes.GROUP.value
|
||||||
|
else provider_session.LauncherTypes.PERSON
|
||||||
|
)
|
||||||
|
launcher_id = state.get('launcher_id', '')
|
||||||
|
sender_user_id = state.get('sender_user_id') or payload.get('user_id') or launcher_id
|
||||||
|
|
||||||
|
form_action_data = {
|
||||||
|
'form_token': state.get('form_token', ''),
|
||||||
|
'workflow_run_id': state.get('workflow_run_id', ''),
|
||||||
|
'action_id': action_id,
|
||||||
|
'action_title': action_title,
|
||||||
|
'node_title': state.get('node_title', ''),
|
||||||
|
'user': f'{launcher_type.value}_{launcher_id}',
|
||||||
|
'inputs': {},
|
||||||
|
}
|
||||||
|
|
||||||
|
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
|
||||||
|
|
||||||
|
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||||
|
synthetic_event = platform_events.GroupMessage(
|
||||||
|
sender=platform_entities.GroupMember(
|
||||||
|
id=sender_user_id,
|
||||||
|
member_name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
group=platform_entities.Group(
|
||||||
|
id=launcher_id,
|
||||||
|
name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
),
|
||||||
|
special_title='',
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
time=int(datetime.datetime.now().timestamp()),
|
||||||
|
source_platform_object=None,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
synthetic_event = platform_events.FriendMessage(
|
||||||
|
sender=platform_entities.Friend(
|
||||||
|
id=sender_user_id,
|
||||||
|
nickname='',
|
||||||
|
remark='',
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
time=int(datetime.datetime.now().timestamp()),
|
||||||
|
source_platform_object=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
bot_uuid = ''
|
||||||
|
pipeline_uuid = None
|
||||||
|
if self.ap is not None:
|
||||||
|
for bot in self.ap.platform_mgr.bots:
|
||||||
|
if bot.adapter is self:
|
||||||
|
bot_uuid = bot.bot_entity.uuid
|
||||||
|
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.ap.logger.info(
|
||||||
|
f'DingTalk _on_card_action enqueuing form action: action_id={action_id!r} '
|
||||||
|
f'action_title={action_title!r} launcher_type={launcher_type.value} '
|
||||||
|
f'launcher_id={launcher_id} bot_uuid={bot_uuid} pipeline_uuid={pipeline_uuid}'
|
||||||
|
)
|
||||||
|
await self.ap.query_pool.add_query(
|
||||||
|
bot_uuid=bot_uuid,
|
||||||
|
launcher_type=launcher_type,
|
||||||
|
launcher_id=launcher_id,
|
||||||
|
sender_id=sender_user_id,
|
||||||
|
message_event=synthetic_event,
|
||||||
|
message_chain=message_chain,
|
||||||
|
adapter=self,
|
||||||
|
pipeline_uuid=pipeline_uuid,
|
||||||
|
variables={
|
||||||
|
'_dify_form_action': form_action_data,
|
||||||
|
'_routed_by_rule': True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
self.ap.logger.info('DingTalk _on_card_action: query enqueued OK')
|
||||||
|
except Exception:
|
||||||
|
self.ap.logger.exception('DingTalk: enqueue form action query failed')
|
||||||
|
return
|
||||||
|
|
||||||
|
# Visual feedback: collapse the form card to a "已选择" notice so
|
||||||
|
# the user knows the click registered while the workflow resumes.
|
||||||
|
asyncio.create_task(
|
||||||
|
self._mark_card_resolved(
|
||||||
|
out_track_id,
|
||||||
|
action_title,
|
||||||
|
node_title=state.get('node_title', ''),
|
||||||
|
form_content=state.get('form_content', ''),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Once consumed, drop the state — the runner clears _PENDING_FORMS too.
|
||||||
|
self.card_state.pop(out_track_id, None)
|
||||||
|
|
||||||
|
async def _mark_card_resolved(
|
||||||
|
self,
|
||||||
|
out_track_id: str,
|
||||||
|
action_title: str,
|
||||||
|
*,
|
||||||
|
node_title: str = '',
|
||||||
|
form_content: str = '',
|
||||||
|
) -> None:
|
||||||
|
"""Update the form card to acknowledge the user's selection.
|
||||||
|
|
||||||
|
We rewrite the card content with the original prompt + a green tick
|
||||||
|
marker, and explicitly clear ``btns`` so the buttons are removed
|
||||||
|
once chosen. ``flowStatus`` is re-sent because some DingTalk clients
|
||||||
|
treat the PUT update as a partial *replace* of cardParamMap rather
|
||||||
|
than a merge — without it, the AICardContainer status containers
|
||||||
|
would all gate to ``gone`` and the whole card would blank out.
|
||||||
|
"""
|
||||||
|
parts: list[str] = []
|
||||||
|
if node_title:
|
||||||
|
parts.append(f'**{node_title}**')
|
||||||
|
if form_content:
|
||||||
|
parts.append(form_content)
|
||||||
|
parts.append(f'---\n✅ 已选择:**{action_title}**')
|
||||||
|
content = '\n\n'.join(parts)
|
||||||
|
if self.ap is not None:
|
||||||
|
self.ap.logger.info(f'DingTalk _mark_card_resolved: out_track_id={out_track_id} action={action_title!r}')
|
||||||
|
try:
|
||||||
|
await self.bot.update_card_data(
|
||||||
|
out_track_id=out_track_id,
|
||||||
|
card_param_map={
|
||||||
|
'content': content,
|
||||||
|
'btns': '[]',
|
||||||
|
'flowStatus': '3',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'DingTalk: update form card after click failed: {traceback.format_exc()}')
|
||||||
|
|||||||
@@ -103,6 +103,18 @@ spec:
|
|||||||
type: string
|
type: string
|
||||||
required: true
|
required: true
|
||||||
default: "填写你的卡片template_id"
|
default: "填写你的卡片template_id"
|
||||||
|
- name: human_input_card_template_id
|
||||||
|
label:
|
||||||
|
en_US: Human Input Card Template ID
|
||||||
|
zh_Hans: 人工输入卡片模板ID
|
||||||
|
zh_Hant: 人工輸入卡片範本ID
|
||||||
|
description:
|
||||||
|
en_US: "Template ID used as the SINGLE card for the whole conversation turn. Streamed LLM text fills the `content` markdown variable; on a Dify human-input pause the `btns` buttonGroup variable is populated so action buttons appear on the SAME card; after the user clicks a button the buttons disappear and resumed streaming continues into the same card. Use the bundled `src/langbot/templates/dingtalk_human_input_card.json` — it ships with `content` (MarkdownBlock) and `btns` (ButtonGroup) already wired. Leave empty to fall back to the legacy two-card behaviour (chat card streaming text + plain-text human-input prompts)."
|
||||||
|
zh_Hans: "用作整个对话回合**唯一**卡片的模板ID。流式 LLM 文本写入 `content` markdown 变量;Dify 人工输入暂停时同一张卡的 `btns` buttonGroup 变量被填上、按钮浮现;用户点击后按钮消失、恢复的流式内容继续追加到同一张卡。可使用项目附带的 `src/langbot/templates/dingtalk_human_input_card.json`——已经预先连好 `content` (MarkdownBlock) 与 `btns` (ButtonGroup)。留空则降级为旧的双卡行为(聊天卡走流式 + 人工输入走纯文本)。"
|
||||||
|
zh_Hant: "用作整個對話回合**唯一**卡片的範本ID。流式 LLM 文字寫入 `content` markdown 變數;Dify 人工輸入暫停時同一張卡的 `btns` buttonGroup 變數被填上、按鈕浮現;使用者點擊後按鈕消失、恢復的流式內容繼續追加到同一張卡。可使用專案附帶的 `src/langbot/templates/dingtalk_human_input_card.json`——已經預先連好 `content` (MarkdownBlock) 與 `btns` (ButtonGroup)。留空則降級為舊的雙卡行為(聊天卡走流式 + 人工輸入走純文字)。"
|
||||||
|
type: string
|
||||||
|
required: false
|
||||||
|
default: ""
|
||||||
execution:
|
execution:
|
||||||
python:
|
python:
|
||||||
path: ./dingtalk.py
|
path: ./dingtalk.py
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
|||||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||||||
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
|
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
|
||||||
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||||
|
|
||||||
|
|
||||||
class AESCipher(object):
|
class AESCipher(object):
|
||||||
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
|
|||||||
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||||
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
|
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
|
||||||
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
|
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
|
||||||
|
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||||
|
|
||||||
bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识
|
bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识
|
||||||
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
|
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
|
||||||
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
pending_monitoring_msg: dict[str, str]
|
pending_monitoring_msg: dict[str, str]
|
||||||
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
|
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
|
||||||
reply_to_monitoring_msg: dict[str, tuple[str, float]]
|
reply_to_monitoring_msg: dict[str, tuple[str, float]]
|
||||||
|
reply_message_card_ids: dict[str, str]
|
||||||
|
card_sequence_dict: dict[str, int]
|
||||||
|
# card_id → set of source message ids registered against it (for cleanup)
|
||||||
|
card_id_to_source_ids: dict[str, set[str]]
|
||||||
|
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
|
||||||
|
card_streaming_text: dict[str, str]
|
||||||
|
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
|
||||||
|
card_pre_pause_text: dict[str, str]
|
||||||
|
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
|
||||||
|
card_resume_transitioned: set[str]
|
||||||
_MONITORING_MAPPING_TTL = 600 # 10 minutes
|
_MONITORING_MAPPING_TTL = 600 # 10 minutes
|
||||||
|
|
||||||
seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识
|
seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识
|
||||||
@@ -812,11 +824,134 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
|
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
|
||||||
asyncio.create_task(on_message(event))
|
asyncio.create_task(on_message(event))
|
||||||
|
|
||||||
|
def schedule_on_app_loop(coro):
|
||||||
|
"""Run a coroutine on the application event loop from sync callbacks."""
|
||||||
|
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
|
||||||
|
|
||||||
def sync_on_card_action(event):
|
def sync_on_card_action(event):
|
||||||
try:
|
try:
|
||||||
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
|
action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
|
||||||
|
# Parse JSON string values (from form action buttons)
|
||||||
|
if isinstance(action_value_raw, str):
|
||||||
|
try:
|
||||||
|
action_value_obj = json.loads(action_value_raw)
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
action_value_obj = {}
|
||||||
|
else:
|
||||||
|
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
|
||||||
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
|
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
|
||||||
|
|
||||||
|
# Handle Dify form action button clicks
|
||||||
|
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
|
||||||
|
form_token = action_value_obj.get('form_token', '')
|
||||||
|
workflow_run_id = action_value_obj.get('workflow_run_id', '')
|
||||||
|
action_id = action_value_obj.get('action_id', '')
|
||||||
|
session_key = action_value_obj.get('session_key', '')
|
||||||
|
|
||||||
|
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||||
|
launcher_type = provider_session.LauncherTypes.GROUP
|
||||||
|
launcher_id = (
|
||||||
|
session_key.split(':', 1)[1]
|
||||||
|
if session_key.startswith('g:')
|
||||||
|
else session_key[len('group_') :]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
launcher_type = provider_session.LauncherTypes.PERSON
|
||||||
|
launcher_id = (
|
||||||
|
session_key.split(':', 1)[1]
|
||||||
|
if session_key.startswith('p:')
|
||||||
|
else session_key[len('person_') :]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Find the bot entity to get bot_uuid and pipeline_uuid
|
||||||
|
bot_uuid = ''
|
||||||
|
pipeline_uuid = None
|
||||||
|
for bot in self.ap.platform_mgr.bots:
|
||||||
|
if bot.adapter is self:
|
||||||
|
bot_uuid = bot.bot_entity.uuid
|
||||||
|
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||||
|
break
|
||||||
|
|
||||||
|
form_action_data = {
|
||||||
|
'form_token': form_token,
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'action_id': action_id,
|
||||||
|
'user': f'{launcher_type.value}_{launcher_id}',
|
||||||
|
'inputs': {},
|
||||||
|
}
|
||||||
|
|
||||||
|
context = getattr(event.event, 'context', None)
|
||||||
|
open_message_id = getattr(context, 'open_message_id', None)
|
||||||
|
source_time = datetime.datetime.now()
|
||||||
|
event_time = source_time.timestamp()
|
||||||
|
action_text = action_value_obj.get('action_id', 'confirm')
|
||||||
|
message_chain = platform_message.MessageChain(
|
||||||
|
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
|
||||||
|
)
|
||||||
|
if open_message_id:
|
||||||
|
message_chain.insert(
|
||||||
|
0,
|
||||||
|
platform_message.Source(
|
||||||
|
id=open_message_id,
|
||||||
|
time=source_time,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
operator = getattr(event.event, 'operator', None)
|
||||||
|
user_id = (
|
||||||
|
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||||
|
synthetic_event = platform_events.GroupMessage(
|
||||||
|
sender=platform_entities.GroupMember(
|
||||||
|
id=user_id,
|
||||||
|
member_name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
group=platform_entities.Group(
|
||||||
|
id=launcher_id,
|
||||||
|
name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
time=event_time,
|
||||||
|
source_platform_object=event,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
synthetic_event = platform_events.FriendMessage(
|
||||||
|
sender=platform_entities.Friend(
|
||||||
|
id=user_id,
|
||||||
|
nickname='',
|
||||||
|
remark='',
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
time=event_time,
|
||||||
|
source_platform_object=event,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def add_form_action_query():
|
||||||
|
await self.ap.query_pool.add_query(
|
||||||
|
bot_uuid=bot_uuid,
|
||||||
|
launcher_type=launcher_type,
|
||||||
|
launcher_id=launcher_id,
|
||||||
|
sender_id=user_id,
|
||||||
|
message_event=synthetic_event,
|
||||||
|
message_chain=message_chain,
|
||||||
|
adapter=self,
|
||||||
|
pipeline_uuid=pipeline_uuid,
|
||||||
|
variables={
|
||||||
|
'_dify_form_action': form_action_data,
|
||||||
|
'_routed_by_rule': True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
schedule_on_app_loop(add_form_action_query())
|
||||||
|
|
||||||
|
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||||
|
|
||||||
|
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
|
||||||
|
|
||||||
if action_value == '有帮助':
|
if action_value == '有帮助':
|
||||||
feedback_type = 1
|
feedback_type = 1
|
||||||
elif action_value == '无帮助':
|
elif action_value == '无帮助':
|
||||||
@@ -857,17 +992,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if platform_events.FeedbackEvent in self.listeners:
|
if platform_events.FeedbackEvent in self.listeners:
|
||||||
loop = asyncio.get_event_loop()
|
schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||||
if loop.is_running():
|
|
||||||
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
|
||||||
else:
|
|
||||||
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
|
||||||
|
|
||||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||||
|
|
||||||
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
|
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
|
||||||
except Exception:
|
except Exception:
|
||||||
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
traceback.print_exc()
|
||||||
|
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
||||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||||
|
|
||||||
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
|
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
|
||||||
@@ -893,6 +1025,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
card_id_dict={},
|
card_id_dict={},
|
||||||
pending_monitoring_msg={},
|
pending_monitoring_msg={},
|
||||||
reply_to_monitoring_msg={},
|
reply_to_monitoring_msg={},
|
||||||
|
reply_message_card_ids={},
|
||||||
|
card_sequence_dict={},
|
||||||
|
card_id_to_source_ids={},
|
||||||
|
card_streaming_text={},
|
||||||
|
card_pre_pause_text={},
|
||||||
|
card_resume_transitioned=set(),
|
||||||
seq=1,
|
seq=1,
|
||||||
listeners={},
|
listeners={},
|
||||||
quart_app=quart_app,
|
quart_app=quart_app,
|
||||||
@@ -1132,6 +1270,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
for k in expired:
|
for k in expired:
|
||||||
del self.reply_to_monitoring_msg[k]
|
del self.reply_to_monitoring_msg[k]
|
||||||
|
|
||||||
|
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
|
||||||
|
"""Return the next strictly increasing sequence for a card update."""
|
||||||
|
current = self.card_sequence_dict.get(card_id, 0)
|
||||||
|
next_seq = max(current + 1, suggested)
|
||||||
|
self.card_sequence_dict[card_id] = next_seq
|
||||||
|
return next_seq
|
||||||
|
|
||||||
|
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
|
||||||
|
"""Register a card_id under one or more source message ids."""
|
||||||
|
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
|
||||||
|
for sid in source_ids:
|
||||||
|
if not sid:
|
||||||
|
continue
|
||||||
|
self.reply_message_card_ids[sid] = card_id
|
||||||
|
bucket.add(sid)
|
||||||
|
|
||||||
|
def _drop_card_state(self, card_id: str) -> None:
|
||||||
|
"""Pop all per-card state for the given card_id."""
|
||||||
|
if not card_id:
|
||||||
|
return
|
||||||
|
for sid in self.card_id_to_source_ids.pop(card_id, set()):
|
||||||
|
self.reply_message_card_ids.pop(sid, None)
|
||||||
|
self.card_sequence_dict.pop(card_id, None)
|
||||||
|
self.card_streaming_text.pop(card_id, None)
|
||||||
|
self.card_pre_pause_text.pop(card_id, None)
|
||||||
|
self.card_resume_transitioned.discard(card_id)
|
||||||
|
|
||||||
async def create_card_id(self, message_id):
|
async def create_card_id(self, message_id):
|
||||||
try:
|
try:
|
||||||
# self.logger.debug('飞书支持stream输出,创建卡片......')
|
# self.logger.debug('飞书支持stream输出,创建卡片......')
|
||||||
@@ -1327,6 +1492,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
self.card_id_dict[message_id] = response.data.card_id
|
self.card_id_dict[message_id] = response.data.card_id
|
||||||
|
|
||||||
card_id = response.data.card_id
|
card_id = response.data.card_id
|
||||||
|
self.card_sequence_dict[card_id] = 0
|
||||||
return card_id
|
return card_id
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -1339,6 +1505,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
"""
|
"""
|
||||||
# message_id = event.message_chain.message_id
|
# message_id = event.message_chain.message_id
|
||||||
|
|
||||||
|
source_message_id = str(event.message_chain.message_id)
|
||||||
|
existing_card_id = self.reply_message_card_ids.get(source_message_id)
|
||||||
|
if existing_card_id:
|
||||||
|
self.card_id_dict[message_id] = existing_card_id
|
||||||
|
return True
|
||||||
|
|
||||||
card_id = await self.create_card_id(message_id)
|
card_id = await self.create_card_id(message_id)
|
||||||
content = {
|
content = {
|
||||||
'type': 'card',
|
'type': 'card',
|
||||||
@@ -1377,6 +1549,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
user_msg_id = event.message_chain.message_id
|
user_msg_id = event.message_chain.message_id
|
||||||
reply_msg_id = getattr(response.data, 'message_id', None)
|
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||||
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
|
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
|
||||||
|
# Register the card under both the user-incoming msg id (so a
|
||||||
|
# second reply_message_first_chunk for the same user message
|
||||||
|
# reuses this card) AND the bot-reply msg id (so a synthetic
|
||||||
|
# event from a form-button callback — whose Source.id equals
|
||||||
|
# the bot's card message id — hits the same card and renders
|
||||||
|
# the resume content into it).
|
||||||
|
if reply_msg_id:
|
||||||
|
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
|
||||||
|
else:
|
||||||
|
self._register_card_for_source(card_id, str(user_msg_id))
|
||||||
if reply_msg_id and monitoring_msg_id:
|
if reply_msg_id and monitoring_msg_id:
|
||||||
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
|
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
|
||||||
self._cleanup_monitoring_mapping()
|
self._cleanup_monitoring_mapping()
|
||||||
@@ -1385,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
async def _open_new_form_card(
|
||||||
|
self,
|
||||||
|
message_id: str,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
form_data: dict,
|
||||||
|
) -> str | None:
|
||||||
|
"""Spawn a fresh card to host a re-paused human-input prompt.
|
||||||
|
|
||||||
|
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
|
||||||
|
replies it to the current incoming message so it appears as the next
|
||||||
|
step in the chat, registers the new reply_msg_id so subsequent button
|
||||||
|
callbacks resolve back to it, and renders the prompt + buttons on it.
|
||||||
|
|
||||||
|
Returns the new card_id, or ``None`` if creation failed (caller is
|
||||||
|
responsible for falling back to in-place update so the workflow
|
||||||
|
remains continuable).
|
||||||
|
"""
|
||||||
|
source_message_id = getattr(message_source.message_chain, 'message_id', None)
|
||||||
|
if not source_message_id:
|
||||||
|
await self.logger.error('Cannot open new form card: source message_id missing')
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
new_card_id = await self.create_card_id(message_id)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
tenant_key = (
|
||||||
|
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||||
|
)
|
||||||
|
app_access_token = self.get_app_access_token()
|
||||||
|
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||||
|
req_opt: RequestOption = (
|
||||||
|
RequestOption.builder()
|
||||||
|
.app_ticket(self.app_ticket)
|
||||||
|
.tenant_key(tenant_key)
|
||||||
|
.app_access_token(app_access_token)
|
||||||
|
.tenant_access_token(tenant_access_token)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
|
content = {
|
||||||
|
'type': 'card',
|
||||||
|
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
|
||||||
|
}
|
||||||
|
request: ReplyMessageRequest = (
|
||||||
|
ReplyMessageRequest.builder()
|
||||||
|
.message_id(str(source_message_id))
|
||||||
|
.request_body(
|
||||||
|
ReplyMessageRequestBody.builder()
|
||||||
|
.content(json.dumps(content))
|
||||||
|
.msg_type('interactive')
|
||||||
|
.uuid(str(uuid.uuid4()))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
if not response.success():
|
||||||
|
await self.logger.error(
|
||||||
|
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
|
||||||
|
f'log_id={response.get_log_id()}'
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||||
|
if reply_msg_id:
|
||||||
|
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
|
||||||
|
|
||||||
|
sequence = self._next_card_sequence(new_card_id, 1)
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=new_card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message='',
|
||||||
|
sequence=sequence,
|
||||||
|
form_data=form_data,
|
||||||
|
show_form_prompt=True,
|
||||||
|
)
|
||||||
|
return new_card_id
|
||||||
|
|
||||||
async def reply_message(
|
async def reply_message(
|
||||||
self,
|
self,
|
||||||
message_source: platform_events.MessageEvent,
|
message_source: platform_events.MessageEvent,
|
||||||
@@ -1504,11 +1773,38 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
回复消息变成更新卡片消息
|
回复消息变成更新卡片消息
|
||||||
|
|
||||||
|
Supports Dify form-action resume: when the runner yields a chunk with
|
||||||
|
``_resume_from_form=True``, the card transitions from buttons to a
|
||||||
|
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
|
||||||
|
for subsequent resume chunks to stream into.
|
||||||
|
|
||||||
|
When ``_open_new_card=True`` on the final chunk, the existing card is
|
||||||
|
left as-is and the pipeline will create a new card (with fresh form
|
||||||
|
buttons) for the re-pause.
|
||||||
"""
|
"""
|
||||||
# self.seq += 1
|
|
||||||
message_id = bot_message.resp_message_id
|
message_id = bot_message.resp_message_id
|
||||||
msg_seq = bot_message.msg_sequence
|
msg_seq = bot_message.msg_sequence
|
||||||
if msg_seq % 8 == 0 or is_final:
|
|
||||||
|
form_data = getattr(bot_message, '_form_data', None)
|
||||||
|
resume_from = getattr(bot_message, '_resume_from_form', False)
|
||||||
|
action_title = getattr(bot_message, '_resume_action_title', '')
|
||||||
|
resume_node_title = getattr(bot_message, '_resume_node_title', '')
|
||||||
|
open_new_card = getattr(bot_message, '_open_new_card', False)
|
||||||
|
if action_title:
|
||||||
|
if resume_node_title:
|
||||||
|
selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
|
||||||
|
else:
|
||||||
|
selected_notice = f'**已选择**:{action_title}'
|
||||||
|
else:
|
||||||
|
selected_notice = ''
|
||||||
|
|
||||||
|
# ── decide whether this chunk needs a card update ────────────────────
|
||||||
|
card_id = self.card_id_dict.get(message_id)
|
||||||
|
if not card_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
# ── convert message chain → text ─────────────────────────────────────
|
||||||
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
||||||
|
|
||||||
text_message = ''
|
text_message = ''
|
||||||
@@ -1520,33 +1816,8 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
parts.append(para_text)
|
parts.append(para_text)
|
||||||
text_message = '\n\n'.join(parts)
|
text_message = '\n\n'.join(parts)
|
||||||
|
|
||||||
# content = {
|
|
||||||
# 'type': 'card_json',
|
|
||||||
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
|
|
||||||
# }
|
|
||||||
|
|
||||||
request: ContentCardElementRequest = (
|
|
||||||
ContentCardElementRequest.builder()
|
|
||||||
.card_id(self.card_id_dict[message_id])
|
|
||||||
.element_id('streaming_txt')
|
|
||||||
.request_body(
|
|
||||||
ContentCardElementRequestBody.builder()
|
|
||||||
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
|
|
||||||
.content(text_message)
|
|
||||||
.sequence(msg_seq)
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
|
|
||||||
if is_final and bot_message.tool_calls is None:
|
|
||||||
# self.seq = 1 # 消息回复结束之后重置seq
|
|
||||||
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
|
|
||||||
|
|
||||||
tenant_key = (
|
tenant_key = (
|
||||||
message_source.source_platform_object.header.tenant_key
|
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||||
if message_source.source_platform_object
|
|
||||||
else None
|
|
||||||
)
|
)
|
||||||
app_access_token = self.get_app_access_token()
|
app_access_token = self.get_app_access_token()
|
||||||
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||||
@@ -1558,17 +1829,143 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
.tenant_access_token(tenant_access_token)
|
.tenant_access_token(tenant_access_token)
|
||||||
.build()
|
.build()
|
||||||
)
|
)
|
||||||
# 发起请求
|
|
||||||
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
|
|
||||||
|
|
||||||
# 处理失败返回
|
card_sequence = self._next_card_sequence(card_id, msg_seq)
|
||||||
if not response.success():
|
|
||||||
raise Exception(
|
# ── RESUME: first chunk after button click ───────────────────────────
|
||||||
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
if resume_from and card_id not in self.card_resume_transitioned:
|
||||||
|
# Transition the card from the form state into resume mode.
|
||||||
|
# Preserve the text that was shown before the pause, and seed the
|
||||||
|
# resume placeholder with the current resume content if we already
|
||||||
|
# have any on the first yielded chunk.
|
||||||
|
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
|
||||||
|
initial_resume_text = text_message or '\u200b'
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=pre_pause_text,
|
||||||
|
sequence=card_sequence,
|
||||||
|
form_data=None,
|
||||||
|
notice_text=selected_notice,
|
||||||
|
resume_placeholder_text=initial_resume_text,
|
||||||
)
|
)
|
||||||
|
self.card_resume_transitioned.add(card_id)
|
||||||
|
self.card_pre_pause_text[card_id] = pre_pause_text
|
||||||
|
self.card_streaming_text[card_id] = text_message
|
||||||
|
if not is_final:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Send media messages when streaming is done
|
# ── RESUME: subsequent chunks → full card update ─────────────────────
|
||||||
|
if resume_from and card_id in self.card_resume_transitioned:
|
||||||
|
cached = self.card_streaming_text.get(card_id, '')
|
||||||
|
if text_message != cached:
|
||||||
|
self.card_streaming_text[card_id] = text_message
|
||||||
|
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=pre_pause_text,
|
||||||
|
sequence=card_sequence,
|
||||||
|
form_data=None,
|
||||||
|
notice_text=selected_notice,
|
||||||
|
resume_placeholder_text=text_message,
|
||||||
|
)
|
||||||
|
if not is_final:
|
||||||
|
return
|
||||||
|
|
||||||
|
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
|
||||||
|
if not resume_from and (msg_seq % 8 == 0 or is_final):
|
||||||
|
cached = self.card_streaming_text.get(card_id)
|
||||||
|
if text_message != cached:
|
||||||
|
self.card_streaming_text[card_id] = text_message
|
||||||
|
request: ContentCardElementRequest = (
|
||||||
|
ContentCardElementRequest.builder()
|
||||||
|
.card_id(card_id)
|
||||||
|
.element_id('streaming_txt')
|
||||||
|
.request_body(
|
||||||
|
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
|
||||||
|
request, req_opt
|
||||||
|
)
|
||||||
|
if not response.success():
|
||||||
|
raise Exception(
|
||||||
|
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
|
||||||
|
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
|
||||||
|
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── FINAL chunk: full card layout update ─────────────────────────────
|
||||||
|
if is_final:
|
||||||
|
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
|
||||||
|
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
|
||||||
|
resume_cached = self.card_streaming_text.get(card_id, '')
|
||||||
|
if form_data:
|
||||||
|
if open_new_card:
|
||||||
|
# The old card has already been laid out into resume mode
|
||||||
|
# by the resume-transition block above (notice + resume
|
||||||
|
# placeholder). Finalise it as a frozen step snapshot and
|
||||||
|
# spawn a brand-new card to host the next human-input
|
||||||
|
# prompt — each step stays visible as its own card in the
|
||||||
|
# chat history.
|
||||||
|
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
|
||||||
|
if new_card_id is None:
|
||||||
|
# Fallback: keep the existing in-place behaviour so the
|
||||||
|
# workflow remains continuable even if creating the
|
||||||
|
# new card failed.
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=pre_pause,
|
||||||
|
sequence=final_seq,
|
||||||
|
form_data=form_data,
|
||||||
|
resume_placeholder_text=resume_cached,
|
||||||
|
show_form_prompt=True,
|
||||||
|
)
|
||||||
|
self.card_streaming_text.pop(card_id, None)
|
||||||
|
self.card_pre_pause_text.pop(card_id, None)
|
||||||
|
else:
|
||||||
|
# The old card is now a frozen snapshot; let go of its
|
||||||
|
# streaming-side state but keep its source registrations
|
||||||
|
# intact (no _drop_card_state) so historical button
|
||||||
|
# callbacks aimed at it can still be matched if needed.
|
||||||
|
self.card_streaming_text.pop(card_id, None)
|
||||||
|
self.card_pre_pause_text.pop(card_id, None)
|
||||||
|
self.card_resume_transitioned.discard(card_id)
|
||||||
|
else:
|
||||||
|
# Initial pause path: render prompt + buttons in place on
|
||||||
|
# the current card.
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=text_message,
|
||||||
|
sequence=final_seq,
|
||||||
|
form_data=form_data,
|
||||||
|
show_form_prompt=True,
|
||||||
|
)
|
||||||
|
# The human-input prompt itself is rendered as buttons only
|
||||||
|
# on Lark, so do not keep the hidden fallback text around;
|
||||||
|
# otherwise it will resurface after the button click.
|
||||||
|
self.card_streaming_text[card_id] = ''
|
||||||
|
self.card_pre_pause_text[card_id] = ''
|
||||||
|
else:
|
||||||
|
# Normal finish: keep pre-pause + resume content visible,
|
||||||
|
# remove buttons/notice, drop the resume placeholder.
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=pre_pause,
|
||||||
|
sequence=final_seq,
|
||||||
|
form_data=None,
|
||||||
|
notice_text=selected_notice if resume_from else '',
|
||||||
|
resume_placeholder_text=resume_cached,
|
||||||
|
)
|
||||||
|
self._drop_card_state(card_id)
|
||||||
|
self.card_id_dict.pop(message_id, None)
|
||||||
|
|
||||||
|
# ── media (images / files) appended at the end ───────────────────────
|
||||||
if is_final and media_items:
|
if is_final and media_items:
|
||||||
for media in media_items:
|
for media in media_items:
|
||||||
media_request: ReplyMessageRequest = (
|
media_request: ReplyMessageRequest = (
|
||||||
@@ -1592,6 +1989,365 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def _add_form_buttons_to_card(
|
||||||
|
self,
|
||||||
|
card_id: str,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
form_data: dict,
|
||||||
|
text_message: str = '',
|
||||||
|
sequence: int = 1,
|
||||||
|
):
|
||||||
|
"""Update the entire card to include form action buttons.
|
||||||
|
|
||||||
|
Uses card.aupdate to replace the card JSON with a template that
|
||||||
|
includes the streaming text content plus interactive buttons.
|
||||||
|
"""
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=text_message,
|
||||||
|
sequence=sequence,
|
||||||
|
form_data=form_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _remove_form_buttons_from_card(
|
||||||
|
self,
|
||||||
|
card_id: str,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
text_message: str = '',
|
||||||
|
sequence: int = 1,
|
||||||
|
):
|
||||||
|
"""Replace the human-input card layout with the plain final layout."""
|
||||||
|
await self._update_card_layout(
|
||||||
|
card_id=card_id,
|
||||||
|
message_source=message_source,
|
||||||
|
text_message=text_message,
|
||||||
|
sequence=sequence,
|
||||||
|
form_data=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _update_card_layout(
|
||||||
|
self,
|
||||||
|
card_id: str,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
text_message: str = '',
|
||||||
|
sequence: int = 1,
|
||||||
|
form_data: dict | None = None,
|
||||||
|
notice_text: str = '',
|
||||||
|
resume_placeholder_text: str = '',
|
||||||
|
show_form_prompt: bool = True,
|
||||||
|
):
|
||||||
|
"""Update the entire card layout.
|
||||||
|
|
||||||
|
• form_data → show interactive buttons (initial Dify pause)
|
||||||
|
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
|
||||||
|
• resume_placeholder_text → add a streaming_txt_resume markdown element
|
||||||
|
"""
|
||||||
|
form_data = form_data or {}
|
||||||
|
actions = form_data.get('actions', [])
|
||||||
|
form_token = form_data.get('form_token', '')
|
||||||
|
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||||
|
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||||
|
form_content = form_data.get('form_content', '')
|
||||||
|
|
||||||
|
# When form_data is set, the visible content is rendered inside the
|
||||||
|
# interactive container, so the top streaming text should stay empty
|
||||||
|
# to avoid duplicate text above the action area.
|
||||||
|
#
|
||||||
|
# For resume notice state, keep the existing text visible in the card
|
||||||
|
# and only add the grey "selected" notice below it.
|
||||||
|
if form_data:
|
||||||
|
render_text_message = ''
|
||||||
|
else:
|
||||||
|
render_text_message = text_message
|
||||||
|
|
||||||
|
# Determine session key from message source
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage):
|
||||||
|
session_key = f'group_{message_source.group.id}'
|
||||||
|
else:
|
||||||
|
session_key = f'person_{message_source.sender.id}'
|
||||||
|
|
||||||
|
# Build button elements matching the existing card template's thumbsup/down format
|
||||||
|
action_buttons = []
|
||||||
|
for action in actions:
|
||||||
|
action_id = action.get('id', '')
|
||||||
|
action_title = action.get('title', action_id)
|
||||||
|
button_style = action.get('button_style', 'default')
|
||||||
|
|
||||||
|
if button_style == 'primary':
|
||||||
|
lark_button_type = 'primary'
|
||||||
|
elif button_style == 'danger':
|
||||||
|
lark_button_type = 'danger'
|
||||||
|
else:
|
||||||
|
lark_button_type = 'default'
|
||||||
|
|
||||||
|
action_buttons.append(
|
||||||
|
{
|
||||||
|
'tag': 'button',
|
||||||
|
'text': {'tag': 'plain_text', 'content': action_title},
|
||||||
|
'type': lark_button_type,
|
||||||
|
'width': 'fill',
|
||||||
|
'size': 'medium',
|
||||||
|
'hover_tips': {'tag': 'plain_text', 'content': action_title},
|
||||||
|
'behaviors': [
|
||||||
|
{
|
||||||
|
'type': 'callback',
|
||||||
|
'value': {
|
||||||
|
'form_action': True,
|
||||||
|
'form_token': form_token,
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'action_id': action_id,
|
||||||
|
'session_key': session_key,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
interactive_elements = []
|
||||||
|
if form_data:
|
||||||
|
if show_form_prompt:
|
||||||
|
interactive_elements = [
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': f'**[Human Input Required] {node_title}**',
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'margin': '0px 0px 4px 0px',
|
||||||
|
}
|
||||||
|
]
|
||||||
|
if form_content:
|
||||||
|
interactive_elements.append(
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': form_content,
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'margin': '0px 0px 8px 0px',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
interactive_elements.append(
|
||||||
|
{
|
||||||
|
'tag': 'column_set',
|
||||||
|
'horizontal_spacing': '8px',
|
||||||
|
'horizontal_align': 'left',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
'columns': [
|
||||||
|
{
|
||||||
|
'tag': 'column',
|
||||||
|
'width': 'weighted',
|
||||||
|
'elements': [btn],
|
||||||
|
'padding': '0px 0px 0px 0px',
|
||||||
|
}
|
||||||
|
for btn in action_buttons
|
||||||
|
],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Build the full card JSON with buttons, same structure as create_card_id
|
||||||
|
# ── mid_section: either form buttons, resume notice, or empty ──
|
||||||
|
mid_section_elements = []
|
||||||
|
if form_data:
|
||||||
|
mid_section_elements = [
|
||||||
|
{
|
||||||
|
'tag': 'interactive_container',
|
||||||
|
'margin': '12px 0px 8px 0px',
|
||||||
|
'padding': '12px 12px 12px 12px',
|
||||||
|
'has_border': True,
|
||||||
|
'elements': interactive_elements,
|
||||||
|
},
|
||||||
|
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||||
|
]
|
||||||
|
elif notice_text:
|
||||||
|
mid_section_elements = [
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': notice_text,
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'margin': '8px 0px 4px 0px',
|
||||||
|
'text_color': 'grey',
|
||||||
|
},
|
||||||
|
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||||
|
]
|
||||||
|
|
||||||
|
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
|
||||||
|
resume_elements = []
|
||||||
|
if resume_placeholder_text:
|
||||||
|
resume_elements = [
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': resume_placeholder_text,
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
'element_id': 'streaming_txt_resume',
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
card_data = {
|
||||||
|
'schema': '2.0',
|
||||||
|
'config': {
|
||||||
|
'update_multi': True,
|
||||||
|
'streaming_mode': False,
|
||||||
|
},
|
||||||
|
'body': {
|
||||||
|
'direction': 'vertical',
|
||||||
|
'padding': '12px 12px 12px 12px',
|
||||||
|
'elements': [
|
||||||
|
{
|
||||||
|
'tag': 'div',
|
||||||
|
'text': {
|
||||||
|
'tag': 'plain_text',
|
||||||
|
'content': 'LangBot',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_color': 'default',
|
||||||
|
},
|
||||||
|
'icon': {
|
||||||
|
'tag': 'custom_icon',
|
||||||
|
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': render_text_message,
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'normal',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
'element_id': 'streaming_txt',
|
||||||
|
},
|
||||||
|
*mid_section_elements,
|
||||||
|
*resume_elements,
|
||||||
|
{
|
||||||
|
'tag': 'column_set',
|
||||||
|
'horizontal_spacing': '12px',
|
||||||
|
'horizontal_align': 'right',
|
||||||
|
'columns': [
|
||||||
|
{
|
||||||
|
'tag': 'column',
|
||||||
|
'width': 'weighted',
|
||||||
|
'elements': [
|
||||||
|
{
|
||||||
|
'tag': 'markdown',
|
||||||
|
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
|
||||||
|
'text_align': 'left',
|
||||||
|
'text_size': 'notation',
|
||||||
|
'margin': '4px 0px 0px 0px',
|
||||||
|
'icon': {
|
||||||
|
'tag': 'standard_icon',
|
||||||
|
'token': 'robot_outlined',
|
||||||
|
'color': 'grey',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
'padding': '0px 0px 0px 0px',
|
||||||
|
'direction': 'vertical',
|
||||||
|
'horizontal_spacing': '8px',
|
||||||
|
'vertical_spacing': '8px',
|
||||||
|
'horizontal_align': 'left',
|
||||||
|
'vertical_align': 'top',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
'weight': 1,
|
||||||
|
},
|
||||||
|
*(
|
||||||
|
[]
|
||||||
|
if form_data
|
||||||
|
else [
|
||||||
|
{
|
||||||
|
'tag': 'column',
|
||||||
|
'width': '20px',
|
||||||
|
'elements': [
|
||||||
|
{
|
||||||
|
'tag': 'button',
|
||||||
|
'text': {'tag': 'plain_text', 'content': ''},
|
||||||
|
'type': 'text',
|
||||||
|
'width': 'fill',
|
||||||
|
'size': 'medium',
|
||||||
|
'icon': {'tag': 'standard_icon', 'token': 'thumbsup_outlined'},
|
||||||
|
'hover_tips': {'tag': 'plain_text', 'content': '有帮助'},
|
||||||
|
'behaviors': [{'type': 'callback', 'value': {'feedback': '有帮助'}}],
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
}
|
||||||
|
],
|
||||||
|
'padding': '0px 0px 0px 0px',
|
||||||
|
'direction': 'vertical',
|
||||||
|
'horizontal_spacing': '8px',
|
||||||
|
'vertical_spacing': '8px',
|
||||||
|
'horizontal_align': 'left',
|
||||||
|
'vertical_align': 'top',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'tag': 'column',
|
||||||
|
'width': '30px',
|
||||||
|
'elements': [
|
||||||
|
{
|
||||||
|
'tag': 'button',
|
||||||
|
'text': {'tag': 'plain_text', 'content': ''},
|
||||||
|
'type': 'text',
|
||||||
|
'width': 'default',
|
||||||
|
'size': 'medium',
|
||||||
|
'icon': {'tag': 'standard_icon', 'token': 'thumbdown_outlined'},
|
||||||
|
'hover_tips': {'tag': 'plain_text', 'content': '无帮助'},
|
||||||
|
'behaviors': [{'type': 'callback', 'value': {'feedback': '无帮助'}}],
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
}
|
||||||
|
],
|
||||||
|
'padding': '0px 0px 0px 0px',
|
||||||
|
'vertical_spacing': '8px',
|
||||||
|
'horizontal_align': 'left',
|
||||||
|
'vertical_align': 'top',
|
||||||
|
'margin': '0px 0px 0px 0px',
|
||||||
|
},
|
||||||
|
]
|
||||||
|
),
|
||||||
|
],
|
||||||
|
'margin': '0px 0px 4px 0px',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
tenant_key = (
|
||||||
|
message_source.source_platform_object.header.tenant_key
|
||||||
|
if message_source.source_platform_object
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
app_access_token = self.get_app_access_token()
|
||||||
|
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||||
|
req_opt: RequestOption = (
|
||||||
|
RequestOption.builder()
|
||||||
|
.app_ticket(self.app_ticket)
|
||||||
|
.tenant_key(tenant_key)
|
||||||
|
.app_access_token(app_access_token)
|
||||||
|
.tenant_access_token(tenant_access_token)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
|
request: UpdateCardRequest = (
|
||||||
|
UpdateCardRequest.builder()
|
||||||
|
.card_id(card_id)
|
||||||
|
.request_body(
|
||||||
|
UpdateCardRequestBody.builder()
|
||||||
|
.sequence(sequence)
|
||||||
|
.uuid(str(uuid.uuid4()))
|
||||||
|
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
|
||||||
|
if not response.success():
|
||||||
|
await self.logger.error(
|
||||||
|
f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
|
||||||
|
f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
|
||||||
|
|
||||||
async def is_muted(self, group_id: int) -> bool:
|
async def is_muted(self, group_id: int) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
import telegram
|
import telegram
|
||||||
import telegram.ext
|
import telegram.ext
|
||||||
from telegram import Update
|
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
|
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
|
||||||
import telegramify_markdown
|
import telegramify_markdown
|
||||||
import typing
|
import typing
|
||||||
import traceback
|
import traceback
|
||||||
|
import json
|
||||||
import base64
|
import base64
|
||||||
import pydantic
|
import pydantic
|
||||||
|
|
||||||
@@ -189,6 +189,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
|
|||||||
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||||
bot: telegram.Bot = pydantic.Field(exclude=True)
|
bot: telegram.Bot = pydantic.Field(exclude=True)
|
||||||
application: telegram.ext.Application = pydantic.Field(exclude=True)
|
application: telegram.ext.Application = pydantic.Field(exclude=True)
|
||||||
|
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||||
|
|
||||||
message_converter: TelegramMessageConverter = TelegramMessageConverter()
|
message_converter: TelegramMessageConverter = TelegramMessageConverter()
|
||||||
event_converter: TelegramEventConverter = TelegramEventConverter()
|
event_converter: TelegramEventConverter = TelegramEventConverter()
|
||||||
@@ -224,6 +225,102 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
telegram_callback,
|
telegram_callback,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||||
|
query = update.callback_query
|
||||||
|
await query.answer()
|
||||||
|
try:
|
||||||
|
data = json.loads(query.data)
|
||||||
|
if data.get('form_action') or data.get('f'):
|
||||||
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||||
|
|
||||||
|
workflow_run_id = data.get('workflow_run_id', '')
|
||||||
|
w_suffix = data.get('w', '')
|
||||||
|
action_id = data.get('action_id') or data.get('a', '')
|
||||||
|
session_key = data.get('session_key') or data.get('s', '')
|
||||||
|
|
||||||
|
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||||
|
launcher_type = provider_session.LauncherTypes.GROUP
|
||||||
|
launcher_id = (
|
||||||
|
session_key.split(':', 1)[1]
|
||||||
|
if session_key.startswith('g:')
|
||||||
|
else session_key[len('group_') :]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
launcher_type = provider_session.LauncherTypes.PERSON
|
||||||
|
launcher_id = (
|
||||||
|
session_key.split(':', 1)[1]
|
||||||
|
if session_key.startswith('p:')
|
||||||
|
else session_key[len('person_') :]
|
||||||
|
)
|
||||||
|
|
||||||
|
user_id = str(query.from_user.id)
|
||||||
|
|
||||||
|
# Find bot_uuid and pipeline_uuid
|
||||||
|
bot_uuid = ''
|
||||||
|
pipeline_uuid = None
|
||||||
|
for b in self.ap.platform_mgr.bots:
|
||||||
|
if b.adapter is self:
|
||||||
|
bot_uuid = b.bot_entity.uuid
|
||||||
|
pipeline_uuid = b.bot_entity.use_pipeline_uuid
|
||||||
|
break
|
||||||
|
|
||||||
|
form_action_data = {
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'w_suffix': w_suffix,
|
||||||
|
'action_id': action_id,
|
||||||
|
'user': f'{launcher_type.value}_{launcher_id}',
|
||||||
|
'inputs': {},
|
||||||
|
}
|
||||||
|
|
||||||
|
message_chain = platform_message.MessageChain(
|
||||||
|
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
|
||||||
|
)
|
||||||
|
|
||||||
|
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||||
|
synthetic_event = platform_events.GroupMessage(
|
||||||
|
sender=platform_entities.GroupMember(
|
||||||
|
id=user_id,
|
||||||
|
member_name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
group=platform_entities.Group(
|
||||||
|
id=launcher_id,
|
||||||
|
name='',
|
||||||
|
permission=platform_entities.Permission.Member,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
source_platform_object=update,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
synthetic_event = platform_events.FriendMessage(
|
||||||
|
sender=platform_entities.Friend(
|
||||||
|
id=user_id,
|
||||||
|
nickname='',
|
||||||
|
remark='',
|
||||||
|
),
|
||||||
|
message_chain=message_chain,
|
||||||
|
source_platform_object=update,
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.ap.query_pool.add_query(
|
||||||
|
bot_uuid=bot_uuid,
|
||||||
|
launcher_type=launcher_type,
|
||||||
|
launcher_id=launcher_id,
|
||||||
|
sender_id=user_id,
|
||||||
|
message_event=synthetic_event,
|
||||||
|
message_chain=message_chain,
|
||||||
|
adapter=self,
|
||||||
|
pipeline_uuid=pipeline_uuid,
|
||||||
|
variables={
|
||||||
|
'_dify_form_action': form_action_data,
|
||||||
|
'_routed_by_rule': True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
|
||||||
|
|
||||||
|
application.add_handler(CallbackQueryHandler(callback_query_handler))
|
||||||
super().__init__(
|
super().__init__(
|
||||||
config=config,
|
config=config,
|
||||||
logger=logger,
|
logger=logger,
|
||||||
@@ -319,14 +416,19 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
update = event.source_platform_object
|
update = event.source_platform_object
|
||||||
chat_id = update.effective_chat.id
|
chat_id = update.effective_chat.id
|
||||||
chat_type = update.effective_chat.type
|
chat_type = update.effective_chat.type
|
||||||
message_thread_id = update.message.message_thread_id
|
effective_message = update.effective_message
|
||||||
|
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||||
|
|
||||||
if chat_type == 'private':
|
if chat_type == 'private':
|
||||||
draft_id = int(time.time() * 1000)
|
import time as _time
|
||||||
self.msg_stream_id[message_id] = ('private', draft_id)
|
|
||||||
|
|
||||||
|
draft_id = int(_time.time() * 1000)
|
||||||
|
self.msg_stream_id[message_id] = ('private', draft_id)
|
||||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
|
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
|
||||||
|
try:
|
||||||
await self.bot.send_message_draft(**args)
|
await self.bot.send_message_draft(**args)
|
||||||
|
except (telegram.error.RetryAfter, telegram.error.BadRequest):
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
|
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
|
||||||
send_msg = await self.bot.send_message(**args)
|
send_msg = await self.bot.send_message(**args)
|
||||||
@@ -347,12 +449,13 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
assert isinstance(message_source.source_platform_object, Update)
|
assert isinstance(message_source.source_platform_object, Update)
|
||||||
update = message_source.source_platform_object
|
update = message_source.source_platform_object
|
||||||
chat_id = update.effective_chat.id
|
chat_id = update.effective_chat.id
|
||||||
message_thread_id = update.message.message_thread_id
|
effective_message = update.effective_message
|
||||||
|
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||||
|
|
||||||
if message_id not in self.msg_stream_id:
|
if message_id not in self.msg_stream_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
chat_mode, draft_id = self.msg_stream_id[message_id]
|
chat_mode, stream_id = self.msg_stream_id[message_id]
|
||||||
components = await TelegramMessageConverter.yiri2target(message, self.bot)
|
components = await TelegramMessageConverter.yiri2target(message, self.bot)
|
||||||
|
|
||||||
if not components or components[0]['type'] != 'text':
|
if not components or components[0]['type'] != 'text':
|
||||||
@@ -361,16 +464,42 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
return
|
return
|
||||||
|
|
||||||
content = components[0]['text']
|
content = components[0]['text']
|
||||||
|
form_data = getattr(bot_message, '_form_data', None)
|
||||||
|
|
||||||
|
if form_data and is_final:
|
||||||
|
self.msg_stream_id.pop(message_id, None)
|
||||||
|
await self._send_form_action_buttons(message_source, form_data)
|
||||||
|
return
|
||||||
|
|
||||||
if chat_mode == 'private':
|
if chat_mode == 'private':
|
||||||
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id)
|
# Streaming via draft (ephemeral preview in the chat input area)
|
||||||
|
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||||
|
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
|
||||||
|
try:
|
||||||
await self.bot.send_message_draft(**args)
|
await self.bot.send_message_draft(**args)
|
||||||
|
except telegram.error.BadRequest as exc:
|
||||||
|
if 'Message_too_long' in str(exc):
|
||||||
|
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||||
|
try:
|
||||||
|
await self.bot.send_message_draft(**args)
|
||||||
|
except telegram.error.RetryAfter:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
pass # Ignore other draft errors (cosmetic)
|
||||||
if is_final and bot_message.tool_calls is None:
|
if is_final and bot_message.tool_calls is None:
|
||||||
del args['draft_id']
|
# Finalise: send the real message, discard the draft
|
||||||
|
args = self._build_message_args(chat_id, content, message_thread_id)
|
||||||
|
try:
|
||||||
await self.bot.send_message(**args)
|
await self.bot.send_message(**args)
|
||||||
|
except telegram.error.BadRequest as exc:
|
||||||
|
if 'Message_too_long' in str(exc):
|
||||||
|
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||||
|
await self.bot.send_message(**args)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
self.msg_stream_id.pop(message_id)
|
self.msg_stream_id.pop(message_id)
|
||||||
else:
|
else:
|
||||||
stream_id = draft_id
|
# Streaming via edit_message_text (persistent message)
|
||||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||||
args = {
|
args = {
|
||||||
'message_id': stream_id,
|
'message_id': stream_id,
|
||||||
@@ -379,11 +508,68 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
|||||||
}
|
}
|
||||||
if self.config.get('markdown_card', False):
|
if self.config.get('markdown_card', False):
|
||||||
args['parse_mode'] = 'MarkdownV2'
|
args['parse_mode'] = 'MarkdownV2'
|
||||||
|
try:
|
||||||
await self.bot.edit_message_text(**args)
|
await self.bot.edit_message_text(**args)
|
||||||
|
except telegram.error.BadRequest as exc:
|
||||||
|
if 'Message_too_long' in str(exc):
|
||||||
|
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
|
||||||
|
await self.bot.edit_message_text(**args)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
if is_final and bot_message.tool_calls is None:
|
if is_final and bot_message.tool_calls is None:
|
||||||
self.msg_stream_id.pop(message_id)
|
self.msg_stream_id.pop(message_id)
|
||||||
|
|
||||||
|
async def _send_form_action_buttons(
|
||||||
|
self,
|
||||||
|
message_source: platform_events.MessageEvent,
|
||||||
|
form_data: dict,
|
||||||
|
):
|
||||||
|
"""Send inline keyboard buttons for Dify human_input_required form actions."""
|
||||||
|
actions = form_data.get('actions', [])
|
||||||
|
node_title = form_data.get('node_title', '')
|
||||||
|
form_content = form_data.get('form_content', '')
|
||||||
|
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||||
|
# Telegram callback_data is capped at 64 bytes, so we identify the
|
||||||
|
# paused workflow by the last 8 chars of workflow_run_id (unique
|
||||||
|
# within a session with overwhelming probability).
|
||||||
|
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
|
||||||
|
|
||||||
|
if isinstance(message_source, platform_events.GroupMessage):
|
||||||
|
session_key = f'g:{message_source.group.id}'
|
||||||
|
else:
|
||||||
|
session_key = f'p:{message_source.sender.id}'
|
||||||
|
|
||||||
|
keyboard = []
|
||||||
|
for action in actions:
|
||||||
|
action_id = action.get('id', '')
|
||||||
|
action_title = action.get('title', action_id)
|
||||||
|
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
|
||||||
|
if w_suffix:
|
||||||
|
callback_payload['w'] = w_suffix
|
||||||
|
callback_data = json.dumps(callback_payload, separators=(',', ':'))
|
||||||
|
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
|
||||||
|
|
||||||
|
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||||
|
|
||||||
|
update = message_source.source_platform_object
|
||||||
|
chat_id = update.effective_chat.id
|
||||||
|
effective_message = update.effective_message
|
||||||
|
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||||
|
|
||||||
|
text_lines = [f'[{node_title}] Please select an action:']
|
||||||
|
if form_content:
|
||||||
|
text_lines.insert(0, form_content)
|
||||||
|
args = {
|
||||||
|
'chat_id': chat_id,
|
||||||
|
'text': '\n\n'.join(text_lines),
|
||||||
|
'reply_markup': reply_markup,
|
||||||
|
}
|
||||||
|
if message_thread_id:
|
||||||
|
args['message_thread_id'] = message_thread_id
|
||||||
|
|
||||||
|
await self.bot.send_message(**args)
|
||||||
|
|
||||||
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
|
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
|
||||||
if not isinstance(event.source_platform_object, Update):
|
if not isinstance(event.source_platform_object, Update):
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -2,9 +2,11 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import typing
|
import typing
|
||||||
import json
|
import json
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
import base64
|
import base64
|
||||||
import mimetypes
|
import mimetypes
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
|
||||||
from langbot.pkg.provider import runner
|
from langbot.pkg.provider import runner
|
||||||
@@ -16,6 +18,125 @@ from langbot.libs.dify_service_api.v1 import client, errors
|
|||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level store for paused-workflow form state, keyed by session key
|
||||||
|
# (launcher_type_value + "_" + launcher_id). Each session holds an
|
||||||
|
# insertion-ordered dict of form_token -> form_data, allowing multiple
|
||||||
|
# Dify workflows to be paused simultaneously for the same session.
|
||||||
|
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
|
||||||
|
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
|
||||||
|
|
||||||
|
|
||||||
|
def _session_key_from_query(query: pipeline_query.Query) -> str:
|
||||||
|
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
|
||||||
|
|
||||||
|
|
||||||
|
def _prune_pending_forms(now: float | None = None) -> None:
|
||||||
|
if now is None:
|
||||||
|
now = time.time()
|
||||||
|
for session_key in list(_PENDING_FORMS.keys()):
|
||||||
|
forms = _PENDING_FORMS[session_key]
|
||||||
|
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
|
||||||
|
for token in expired_tokens:
|
||||||
|
forms.pop(token, None)
|
||||||
|
if not forms:
|
||||||
|
_PENDING_FORMS.pop(session_key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
|
||||||
|
_prune_pending_forms()
|
||||||
|
stored = dict(form_data)
|
||||||
|
expiration_time = stored.get('expiration_time')
|
||||||
|
try:
|
||||||
|
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
expiration_ts = 0.0
|
||||||
|
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
|
||||||
|
form_token = str(stored.get('form_token') or '')
|
||||||
|
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
|
||||||
|
# Re-insert at the end so this becomes the "latest" entry
|
||||||
|
forms.pop(form_token, None)
|
||||||
|
forms[form_token] = stored
|
||||||
|
|
||||||
|
|
||||||
|
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
|
||||||
|
_prune_pending_forms()
|
||||||
|
forms = _PENDING_FORMS.get(session_key)
|
||||||
|
if not forms or not form_token:
|
||||||
|
return None
|
||||||
|
return forms.get(form_token)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
|
||||||
|
"""Look up a pending form whose workflow_run_id ends with the given suffix.
|
||||||
|
|
||||||
|
Used by adapters (e.g. Telegram) whose callback payload is too small to
|
||||||
|
carry the full form_token / workflow_run_id.
|
||||||
|
"""
|
||||||
|
_prune_pending_forms()
|
||||||
|
forms = _PENDING_FORMS.get(session_key)
|
||||||
|
if not forms or not w_suffix:
|
||||||
|
return None
|
||||||
|
for token in reversed(forms):
|
||||||
|
form = forms[token]
|
||||||
|
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
|
||||||
|
return form
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
|
||||||
|
_prune_pending_forms()
|
||||||
|
forms = _PENDING_FORMS.get(session_key)
|
||||||
|
if not forms:
|
||||||
|
return None
|
||||||
|
return forms[next(reversed(forms))]
|
||||||
|
|
||||||
|
|
||||||
|
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
|
||||||
|
"""Iterate pending forms for a session, newest-first."""
|
||||||
|
_prune_pending_forms()
|
||||||
|
forms = _PENDING_FORMS.get(session_key)
|
||||||
|
if not forms:
|
||||||
|
return
|
||||||
|
for token in reversed(list(forms.keys())):
|
||||||
|
yield forms[token]
|
||||||
|
|
||||||
|
|
||||||
|
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
|
||||||
|
"""Clear one specific pending form (by token) or all forms for the session."""
|
||||||
|
forms = _PENDING_FORMS.get(session_key)
|
||||||
|
if not forms:
|
||||||
|
return
|
||||||
|
if form_token is None:
|
||||||
|
_PENDING_FORMS.pop(session_key, None)
|
||||||
|
return
|
||||||
|
forms.pop(form_token, None)
|
||||||
|
if not forms:
|
||||||
|
_PENDING_FORMS.pop(session_key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_human_input_text(
|
||||||
|
node_title: str,
|
||||||
|
form_content: str,
|
||||||
|
actions: list[dict[str, typing.Any]],
|
||||||
|
) -> str:
|
||||||
|
"""Render a paused-workflow human-input prompt as plain text.
|
||||||
|
|
||||||
|
Used by adapters without rich UI (no buttons/cards) so users can reply
|
||||||
|
with the option number or the option title to resume the workflow.
|
||||||
|
"""
|
||||||
|
lines: list[str] = [f'[Human Input Required] {node_title or ""}'.rstrip()]
|
||||||
|
if form_content:
|
||||||
|
lines.append('')
|
||||||
|
lines.append(form_content)
|
||||||
|
if actions:
|
||||||
|
lines.append('')
|
||||||
|
lines.append('Reply with the number or title to continue:')
|
||||||
|
for idx, action in enumerate(actions, start=1):
|
||||||
|
title = action.get('title') or action.get('id') or ''
|
||||||
|
lines.append(f' {idx}. {title}')
|
||||||
|
return '\n'.join(lines)
|
||||||
|
|
||||||
|
|
||||||
@runner.runner_class('dify-service-api')
|
@runner.runner_class('dify-service-api')
|
||||||
class DifyServiceAPIRunner(runner.RequestRunner):
|
class DifyServiceAPIRunner(runner.RequestRunner):
|
||||||
"""Dify Service API 对话请求器"""
|
"""Dify Service API 对话请求器"""
|
||||||
@@ -335,11 +456,155 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
|
|
||||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||||
|
|
||||||
|
async def _submit_workflow_form_blocking(
|
||||||
|
self, form_action: dict
|
||||||
|
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||||
|
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
|
||||||
|
|
||||||
|
form_token = form_action['form_token']
|
||||||
|
workflow_run_id = form_action['workflow_run_id']
|
||||||
|
user = form_action['user']
|
||||||
|
action_id = form_action.get('action_id', '')
|
||||||
|
inputs = form_action.get('inputs', {})
|
||||||
|
|
||||||
|
async for chunk in self.dify_client.workflow_submit(
|
||||||
|
form_token=form_token,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
inputs=inputs,
|
||||||
|
user=user,
|
||||||
|
action=action_id,
|
||||||
|
timeout=120,
|
||||||
|
):
|
||||||
|
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||||
|
|
||||||
|
if chunk['event'] == 'workflow_finished':
|
||||||
|
if chunk['data'].get('error'):
|
||||||
|
raise errors.DifyAPIError(chunk['data']['error'])
|
||||||
|
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||||
|
yield provider_message.Message(
|
||||||
|
role='assistant',
|
||||||
|
content=content,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
|
||||||
|
"""Locate the pending form this action targets.
|
||||||
|
|
||||||
|
Tries identifiers in order of specificity: form_token, full
|
||||||
|
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
|
||||||
|
then falls back to the newest pending form for the session.
|
||||||
|
"""
|
||||||
|
form_token = form_action.get('form_token')
|
||||||
|
if form_token:
|
||||||
|
form = _get_pending_form_by_token(session_key, form_token)
|
||||||
|
if form:
|
||||||
|
return form
|
||||||
|
|
||||||
|
workflow_run_id = form_action.get('workflow_run_id')
|
||||||
|
if workflow_run_id:
|
||||||
|
for form in _iter_pending_forms(session_key):
|
||||||
|
if form.get('workflow_run_id') == workflow_run_id:
|
||||||
|
return form
|
||||||
|
|
||||||
|
w_suffix = form_action.get('w_suffix')
|
||||||
|
if w_suffix:
|
||||||
|
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
|
||||||
|
if form:
|
||||||
|
return form
|
||||||
|
|
||||||
|
return _get_latest_pending_form(session_key)
|
||||||
|
|
||||||
|
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
|
||||||
|
"""Backfill resume fields from the matching pending form."""
|
||||||
|
if not form_action:
|
||||||
|
return None
|
||||||
|
|
||||||
|
merged_action = dict(form_action)
|
||||||
|
merged_action.pop('w_suffix', None)
|
||||||
|
pending_form = self._resolve_pending_form(session_key, form_action)
|
||||||
|
if pending_form:
|
||||||
|
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
|
||||||
|
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
|
||||||
|
'workflow_run_id', ''
|
||||||
|
)
|
||||||
|
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
|
||||||
|
merged_action.setdefault('user', pending_form.get('user', ''))
|
||||||
|
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
|
||||||
|
|
||||||
|
# Resolve clicked action's display title from the stored actions list
|
||||||
|
if 'action_title' not in merged_action:
|
||||||
|
clicked_id = merged_action.get('action_id', '')
|
||||||
|
for action in pending_form.get('actions', []):
|
||||||
|
if str(action.get('id', '')) == str(clicked_id):
|
||||||
|
merged_action['action_title'] = action.get('title', clicked_id)
|
||||||
|
break
|
||||||
|
|
||||||
|
return merged_action
|
||||||
|
|
||||||
|
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
|
||||||
|
"""Match plain text replies against pending Dify form actions.
|
||||||
|
|
||||||
|
Resolution order:
|
||||||
|
1. A pure digit reply (e.g. "1", "2") maps to the 1-indexed action of
|
||||||
|
the most recent pending form. Lets users on plain-text platforms
|
||||||
|
pick options without retyping titles.
|
||||||
|
2. Otherwise, iterate pending forms newest-first and match each
|
||||||
|
action's title/id case-insensitively. The first hit wins, so when
|
||||||
|
two forms share a button label the newer one resolves.
|
||||||
|
"""
|
||||||
|
normalized_text = user_text.strip().lower()
|
||||||
|
if not normalized_text:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _build(pending_form: dict, action: dict) -> dict:
|
||||||
|
return {
|
||||||
|
'form_token': pending_form.get('form_token', ''),
|
||||||
|
'workflow_run_id': pending_form.get('workflow_run_id', ''),
|
||||||
|
'action_id': action.get('id', ''),
|
||||||
|
'action_title': action.get('title', action.get('id', '')),
|
||||||
|
'node_title': pending_form.get('node_title', ''),
|
||||||
|
'inputs': pending_form.get('inputs', {}),
|
||||||
|
'user': pending_form.get('user', ''),
|
||||||
|
}
|
||||||
|
|
||||||
|
if normalized_text.isdigit():
|
||||||
|
position = int(normalized_text)
|
||||||
|
latest_form = _get_latest_pending_form(session_key)
|
||||||
|
if latest_form is not None:
|
||||||
|
actions = latest_form.get('actions', [])
|
||||||
|
if 1 <= position <= len(actions):
|
||||||
|
return _build(latest_form, actions[position - 1])
|
||||||
|
|
||||||
|
for pending_form in _iter_pending_forms(session_key):
|
||||||
|
for action in pending_form.get('actions', []):
|
||||||
|
titles = {
|
||||||
|
str(action.get('title', '')).strip().lower(),
|
||||||
|
str(action.get('id', '')).strip().lower(),
|
||||||
|
}
|
||||||
|
if normalized_text in titles:
|
||||||
|
return _build(pending_form, action)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
async def _workflow_messages(
|
async def _workflow_messages(
|
||||||
self, query: pipeline_query.Query
|
self, query: pipeline_query.Query
|
||||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||||
"""调用工作流"""
|
"""调用工作流"""
|
||||||
|
|
||||||
|
# Check if this is a form action resume (button click or text match)
|
||||||
|
form_action_raw = query.variables.get('_dify_form_action')
|
||||||
|
session_key = _session_key_from_query(query)
|
||||||
|
|
||||||
|
if form_action_raw:
|
||||||
|
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||||
|
else:
|
||||||
|
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||||
|
|
||||||
|
if form_action:
|
||||||
|
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||||
|
async for msg in self._submit_workflow_form_blocking(form_action):
|
||||||
|
yield msg
|
||||||
|
return
|
||||||
|
|
||||||
if not query.session.using_conversation.uuid:
|
if not query.session.using_conversation.uuid:
|
||||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||||
|
|
||||||
@@ -366,6 +631,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
}
|
}
|
||||||
|
|
||||||
inputs.update(query.variables)
|
inputs.update(query.variables)
|
||||||
|
human_input_yielded = False
|
||||||
|
|
||||||
async for chunk in self.dify_client.workflow_run(
|
async for chunk in self.dify_client.workflow_run(
|
||||||
inputs=inputs,
|
inputs=inputs,
|
||||||
@@ -377,6 +643,45 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
if chunk['event'] in ignored_events:
|
if chunk['event'] in ignored_events:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if chunk['event'] == 'workflow_paused':
|
||||||
|
reasons = chunk['data'].get('reasons', [])
|
||||||
|
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||||
|
for reason in reasons:
|
||||||
|
if reason.get('TYPE') == 'human_input_required':
|
||||||
|
form_content = reason.get('form_content', '')
|
||||||
|
actions = reason.get('actions', [])
|
||||||
|
node_title = reason.get('node_title', '')
|
||||||
|
|
||||||
|
_set_pending_form(
|
||||||
|
_session_key_from_query(query),
|
||||||
|
{
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'form_id': reason.get('form_id'),
|
||||||
|
'form_token': reason.get('form_token'),
|
||||||
|
'node_id': reason.get('node_id'),
|
||||||
|
'node_title': node_title,
|
||||||
|
'form_content': form_content,
|
||||||
|
'inputs': reason.get('inputs', {}),
|
||||||
|
'actions': actions,
|
||||||
|
'expiration_time': reason.get('expiration_time'),
|
||||||
|
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
query.variables['_dify_form_render'] = {
|
||||||
|
'form_content': form_content,
|
||||||
|
'actions': actions,
|
||||||
|
'node_title': node_title,
|
||||||
|
}
|
||||||
|
|
||||||
|
display_text = _format_human_input_text(node_title, form_content, actions)
|
||||||
|
|
||||||
|
human_input_yielded = True
|
||||||
|
yield provider_message.Message(
|
||||||
|
role='assistant',
|
||||||
|
content=display_text,
|
||||||
|
)
|
||||||
|
|
||||||
if chunk['event'] == 'node_started':
|
if chunk['event'] == 'node_started':
|
||||||
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
|
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
|
||||||
continue
|
continue
|
||||||
@@ -399,6 +704,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
yield msg
|
yield msg
|
||||||
|
|
||||||
elif chunk['event'] == 'workflow_finished':
|
elif chunk['event'] == 'workflow_finished':
|
||||||
|
if human_input_yielded:
|
||||||
|
break
|
||||||
if chunk['data']['error']:
|
if chunk['data']['error']:
|
||||||
raise errors.DifyAPIError(chunk['data']['error'])
|
raise errors.DifyAPIError(chunk['data']['error'])
|
||||||
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||||
@@ -636,11 +943,153 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
|
|
||||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||||
|
|
||||||
|
async def _submit_workflow_form(
|
||||||
|
self, form_action: dict
|
||||||
|
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||||
|
"""Submit human input to resume a paused Dify workflow."""
|
||||||
|
|
||||||
|
form_token = form_action['form_token']
|
||||||
|
workflow_run_id = form_action['workflow_run_id']
|
||||||
|
user = form_action['user']
|
||||||
|
action_id = form_action.get('action_id', '')
|
||||||
|
action_title = form_action.get('action_title', '') or action_id
|
||||||
|
node_title = form_action.get('node_title', '')
|
||||||
|
inputs = form_action.get('inputs', {})
|
||||||
|
|
||||||
|
messsage_idx = 0
|
||||||
|
is_final = False
|
||||||
|
think_start = False
|
||||||
|
think_end = False
|
||||||
|
workflow_contents = ''
|
||||||
|
repause_form_data: dict | None = None
|
||||||
|
|
||||||
|
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
|
||||||
|
async for chunk in self.dify_client.workflow_submit(
|
||||||
|
form_token=form_token,
|
||||||
|
workflow_run_id=workflow_run_id,
|
||||||
|
inputs=inputs,
|
||||||
|
user=user,
|
||||||
|
action=action_id,
|
||||||
|
timeout=120,
|
||||||
|
):
|
||||||
|
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||||
|
|
||||||
|
yield_this_iteration = False
|
||||||
|
|
||||||
|
if chunk['event'] == 'workflow_finished':
|
||||||
|
is_final = True
|
||||||
|
yield_this_iteration = True
|
||||||
|
if chunk['data'].get('error'):
|
||||||
|
raise errors.DifyAPIError(chunk['data']['error'])
|
||||||
|
|
||||||
|
if chunk['event'] == 'workflow_paused':
|
||||||
|
reasons = chunk['data'].get('reasons', [])
|
||||||
|
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||||
|
for reason in reasons:
|
||||||
|
if reason.get('TYPE') != 'human_input_required':
|
||||||
|
continue
|
||||||
|
form_content = reason.get('form_content', '')
|
||||||
|
actions = reason.get('actions', [])
|
||||||
|
# Use a distinct name — `node_title` (the just-resolved step)
|
||||||
|
# must keep its value so the resume notice on the previous
|
||||||
|
# card still shows which step the user acted on.
|
||||||
|
paused_node_title = reason.get('node_title', '')
|
||||||
|
raw_inputs = reason.get('inputs', {})
|
||||||
|
|
||||||
|
_set_pending_form(
|
||||||
|
user,
|
||||||
|
{
|
||||||
|
'workflow_run_id': new_run_id,
|
||||||
|
'form_id': reason.get('form_id'),
|
||||||
|
'form_token': reason.get('form_token'),
|
||||||
|
'node_id': reason.get('node_id'),
|
||||||
|
'node_title': paused_node_title,
|
||||||
|
'form_content': form_content,
|
||||||
|
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||||
|
'actions': actions,
|
||||||
|
'expiration_time': reason.get('expiration_time'),
|
||||||
|
'user': user,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
repause_form_data = {
|
||||||
|
'form_content': form_content,
|
||||||
|
'actions': actions,
|
||||||
|
'node_title': paused_node_title,
|
||||||
|
'workflow_run_id': new_run_id,
|
||||||
|
'form_token': reason.get('form_token', ''),
|
||||||
|
}
|
||||||
|
# Ensure the final chunk has non-empty content so
|
||||||
|
# ResponseWrapper (which skips empty-content chunks) lets it
|
||||||
|
# propagate to SendResponseBackStage. Use a zero-width space
|
||||||
|
# so neither Lark nor Telegram renders visible noise — the
|
||||||
|
# adapter substitutes its own card text from _form_data.
|
||||||
|
if not workflow_contents:
|
||||||
|
workflow_contents = ''
|
||||||
|
is_final = True
|
||||||
|
yield_this_iteration = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if chunk['event'] == 'text_chunk':
|
||||||
|
messsage_idx += 1
|
||||||
|
if remove_think:
|
||||||
|
if '<think>' in chunk['data']['text'] and not think_start:
|
||||||
|
think_start = True
|
||||||
|
continue
|
||||||
|
if '</think>' in chunk['data']['text'] and not think_end:
|
||||||
|
import re
|
||||||
|
|
||||||
|
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
|
||||||
|
workflow_contents += content
|
||||||
|
think_end = True
|
||||||
|
elif think_end:
|
||||||
|
workflow_contents += chunk['data']['text']
|
||||||
|
if think_start:
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
workflow_contents += chunk['data']['text']
|
||||||
|
if messsage_idx % 8 == 0:
|
||||||
|
yield_this_iteration = True
|
||||||
|
|
||||||
|
if yield_this_iteration:
|
||||||
|
msg = provider_message.MessageChunk(
|
||||||
|
role='assistant',
|
||||||
|
content=workflow_contents,
|
||||||
|
is_final=is_final,
|
||||||
|
)
|
||||||
|
msg._resume_from_form = True
|
||||||
|
if action_title:
|
||||||
|
msg._resume_action_title = action_title
|
||||||
|
if node_title:
|
||||||
|
msg._resume_node_title = node_title
|
||||||
|
if is_final and repause_form_data:
|
||||||
|
msg._form_data = repause_form_data
|
||||||
|
msg._open_new_card = True
|
||||||
|
yield msg
|
||||||
|
if is_final:
|
||||||
|
return
|
||||||
|
|
||||||
async def _workflow_messages_chunk(
|
async def _workflow_messages_chunk(
|
||||||
self, query: pipeline_query.Query
|
self, query: pipeline_query.Query
|
||||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||||
"""调用工作流"""
|
"""调用工作流"""
|
||||||
|
|
||||||
|
# Check if this is a form action resume (button click or text match)
|
||||||
|
form_action_raw = query.variables.get('_dify_form_action')
|
||||||
|
session_key = _session_key_from_query(query)
|
||||||
|
|
||||||
|
if form_action_raw:
|
||||||
|
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||||
|
else:
|
||||||
|
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||||
|
|
||||||
|
if form_action:
|
||||||
|
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||||
|
# Resume paused workflow via submit endpoint
|
||||||
|
async for msg in self._submit_workflow_form(form_action):
|
||||||
|
yield msg
|
||||||
|
return
|
||||||
|
|
||||||
if not query.session.using_conversation.uuid:
|
if not query.session.using_conversation.uuid:
|
||||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||||
|
|
||||||
@@ -672,6 +1121,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
think_start = False
|
think_start = False
|
||||||
think_end = False
|
think_end = False
|
||||||
workflow_contents = ''
|
workflow_contents = ''
|
||||||
|
workflow_run_id = ''
|
||||||
|
human_input_yielded = False
|
||||||
|
|
||||||
|
# Saved form data to attach to the final MessageChunk so the adapter
|
||||||
|
# can detect it when is_final=True and render buttons.
|
||||||
|
pending_form_data = None
|
||||||
|
display_text = ''
|
||||||
|
|
||||||
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
|
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
|
||||||
async for chunk in self.dify_client.workflow_run(
|
async for chunk in self.dify_client.workflow_run(
|
||||||
@@ -682,7 +1138,61 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
):
|
):
|
||||||
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
|
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
|
||||||
if chunk['event'] in ignored_events:
|
if chunk['event'] in ignored_events:
|
||||||
|
if chunk['event'] == 'workflow_started':
|
||||||
|
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if chunk['event'] == 'workflow_paused':
|
||||||
|
reasons = chunk['data'].get('reasons', [])
|
||||||
|
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||||
|
for reason in reasons:
|
||||||
|
if reason.get('TYPE') == 'human_input_required':
|
||||||
|
form_content = reason.get('form_content', '')
|
||||||
|
actions = reason.get('actions', [])
|
||||||
|
node_title = reason.get('node_title', '')
|
||||||
|
|
||||||
|
# Persist form state in module-level store keyed by session
|
||||||
|
raw_inputs = reason.get('inputs', {})
|
||||||
|
_set_pending_form(
|
||||||
|
_session_key_from_query(query),
|
||||||
|
{
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'form_id': reason.get('form_id'),
|
||||||
|
'form_token': reason.get('form_token'),
|
||||||
|
'node_id': reason.get('node_id'),
|
||||||
|
'node_title': node_title,
|
||||||
|
'form_content': form_content,
|
||||||
|
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||||
|
'actions': actions,
|
||||||
|
'expiration_time': reason.get('expiration_time'),
|
||||||
|
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pass form render metadata to downstream stages
|
||||||
|
query.variables['_dify_form_render'] = {
|
||||||
|
'form_content': form_content,
|
||||||
|
'actions': actions,
|
||||||
|
'node_title': node_title,
|
||||||
|
}
|
||||||
|
|
||||||
|
display_text = _format_human_input_text(node_title, form_content, actions)
|
||||||
|
workflow_contents += display_text + '\n'
|
||||||
|
|
||||||
|
# Save form data to attach to the final chunk later.
|
||||||
|
# We do NOT yield here — the form content will be sent
|
||||||
|
# as the final MessageChunk (with is_final=True and
|
||||||
|
# _form_data) so the adapter can update the card and
|
||||||
|
# add buttons in one pass.
|
||||||
|
pending_form_data = {
|
||||||
|
'form_content': form_content,
|
||||||
|
'actions': actions,
|
||||||
|
'node_title': node_title,
|
||||||
|
'workflow_run_id': workflow_run_id,
|
||||||
|
'form_token': reason.get('form_token', ''),
|
||||||
|
}
|
||||||
|
human_input_yielded = True
|
||||||
|
|
||||||
if chunk['event'] == 'workflow_finished':
|
if chunk['event'] == 'workflow_finished':
|
||||||
is_final = True
|
is_final = True
|
||||||
if chunk['data']['error']:
|
if chunk['data']['error']:
|
||||||
@@ -730,11 +1240,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
|||||||
yield msg
|
yield msg
|
||||||
|
|
||||||
if messsage_idx % 8 == 0 or is_final:
|
if messsage_idx % 8 == 0 or is_final:
|
||||||
yield provider_message.MessageChunk(
|
final_content = workflow_contents if workflow_contents.strip() else ''
|
||||||
|
msg = provider_message.MessageChunk(
|
||||||
role='assistant',
|
role='assistant',
|
||||||
content=workflow_contents,
|
content=final_content,
|
||||||
is_final=is_final,
|
is_final=is_final,
|
||||||
)
|
)
|
||||||
|
# Attach form data to the final chunk for the adapter
|
||||||
|
if is_final and pending_form_data:
|
||||||
|
msg._form_data = pending_form_data
|
||||||
|
pending_form_data = None
|
||||||
|
yield msg
|
||||||
|
|
||||||
|
# If the stream ended after workflow_paused without a
|
||||||
|
# workflow_finished event, yield a final chunk so the adapter
|
||||||
|
# can update the card and add buttons.
|
||||||
|
if human_input_yielded and not is_final:
|
||||||
|
msg = provider_message.MessageChunk(
|
||||||
|
role='assistant',
|
||||||
|
content=workflow_contents or display_text,
|
||||||
|
is_final=True,
|
||||||
|
)
|
||||||
|
msg._form_data = pending_form_data
|
||||||
|
yield msg
|
||||||
|
|
||||||
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
|
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||||
"""运行请求"""
|
"""运行请求"""
|
||||||
|
|||||||
6
src/langbot/templates/dingtalk_human_input_card.json
Normal file
6
src/langbot/templates/dingtalk_human_input_card.json
Normal file
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user