mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-15 18:26:02 +00:00
Compare commits
9 Commits
codex/agen
...
feat/card_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f10ea82418 | ||
|
|
a32c4d152f | ||
|
|
83b0d26e99 | ||
|
|
e08b5db625 | ||
|
|
d0f65b17ec | ||
|
|
2b533c4a00 | ||
|
|
f663d87a60 | ||
|
|
60e5b873ee | ||
|
|
b96f209b98 |
649
scripts/build_dingtalk_card_template.py
Normal file
649
scripts/build_dingtalk_card_template.py
Normal file
@@ -0,0 +1,649 @@
|
||||
"""Generate the DingTalk human-input card template JSON.
|
||||
|
||||
The output is wrapped in the {editorData, widgetInfo, type, mode} envelope
|
||||
the DingTalk card builder expects on import. editorData is itself a JSON
|
||||
string (NOT a nested object), matching real exports from the builder.
|
||||
|
||||
Run from the repo root: python scripts/build_dingtalk_card_template.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
OUTPUT = Path('src/langbot/templates/dingtalk_human_input_card.json')
|
||||
|
||||
|
||||
def markdown_block(node_id, variable='content'):
|
||||
"""A MarkdownBlock whose content is bound to a global variable.
|
||||
|
||||
Unlike BaseText (whose `text` field requires editor-side manual binding),
|
||||
MarkdownBlock's `content` accepts `variableValue` binding at JSON load
|
||||
time — the imported template renders the variable straight away.
|
||||
"""
|
||||
return {
|
||||
'componentName': 'MarkdownBlock',
|
||||
'id': node_id,
|
||||
'props': {
|
||||
'mdVer': 0,
|
||||
'icon': {'type': 'icon', 'icon': '', 'iconType': 'emoji'},
|
||||
'content': {'variable': variable, 'variableType': 'global', 'type': 'variableValue'},
|
||||
'visible': {
|
||||
'type': 'dynamicVisible',
|
||||
'value': True,
|
||||
'valueType': 'fixed',
|
||||
'condition': {'op': 'and', 'conditions': []},
|
||||
},
|
||||
'isStreaming': True,
|
||||
'enableLinkStatPoint': False,
|
||||
'linkStatPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||
'linkStatPointParams': [],
|
||||
'marginTop': 6,
|
||||
'marginBottom': 6,
|
||||
'marginLeft': 12,
|
||||
'marginRight': 12,
|
||||
},
|
||||
'title': 'AI 流式富文本',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
}
|
||||
|
||||
|
||||
def text_block(
|
||||
node_id,
|
||||
text,
|
||||
*,
|
||||
bold=False,
|
||||
gravity='left',
|
||||
font_size=14,
|
||||
line_height=22,
|
||||
max_lines=20,
|
||||
ml=12,
|
||||
mr=12,
|
||||
mt=4,
|
||||
mb=4,
|
||||
color_token='common_level1_base_color',
|
||||
style_token='common_body_text_style',
|
||||
):
|
||||
return {
|
||||
'componentName': 'BaseText',
|
||||
'id': node_id,
|
||||
'props': {
|
||||
'text': {'i18n': False, 'type': 'dynamicString', 'content': text},
|
||||
'hoverText': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||
'iconType': 'iconCode',
|
||||
'iconFont': {'type': 'icon', 'icon': '', 'iconType': 'ddIcon'},
|
||||
'icon': {
|
||||
'type': 'dynamicLink',
|
||||
'value': '',
|
||||
'valueType': 'fixed',
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'darkIcon': {
|
||||
'type': 'dynamicLink',
|
||||
'value': '',
|
||||
'valueType': 'fixed',
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'autoWidth': False,
|
||||
'maxWidth': {
|
||||
'type': 'dynamicNumber',
|
||||
'valueType': 'fixed',
|
||||
'value': 0,
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'fixedWidth': {
|
||||
'type': 'dynamicNumber',
|
||||
'valueType': 'fixed',
|
||||
'value': 0,
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'marginLeft': ml,
|
||||
'marginRight': mr,
|
||||
'marginTop': mt,
|
||||
'marginBottom': mb,
|
||||
'fontColorType': 'Standard',
|
||||
'enableHighlight': False,
|
||||
'maxLine': {
|
||||
'type': 'dynamicNumber',
|
||||
'valueType': 'fixed',
|
||||
'value': max_lines,
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'color': {
|
||||
'type': 'dynamicColor',
|
||||
'valueType': 'fixed',
|
||||
'value': color_token,
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'customLightColor': {
|
||||
'type': 'dynamicColor',
|
||||
'valueType': 'fixed',
|
||||
'value': '#35404b',
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'customDarkColor': {
|
||||
'type': 'dynamicColor',
|
||||
'valueType': 'fixed',
|
||||
'value': '#f6f6f6',
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'gravity': gravity,
|
||||
'fontSizeType': 'Standard',
|
||||
'styleType': 'custom',
|
||||
'styleToken': style_token,
|
||||
'size': 'middle',
|
||||
'customFontSize': font_size,
|
||||
'customFontLineHeight': line_height,
|
||||
'bold': bold,
|
||||
'italic': False,
|
||||
'strikeThrough': False,
|
||||
'lineHeight': 'normal',
|
||||
'visible': {
|
||||
'type': 'dynamicVisible',
|
||||
'value': True,
|
||||
'valueType': 'fixed',
|
||||
'condition': {'op': 'and', 'conditions': []},
|
||||
},
|
||||
'autoMaxWidth': False,
|
||||
'innerOffset': 0,
|
||||
'enableIcon': False,
|
||||
'widthMode': 'match_parent',
|
||||
'margin': -2,
|
||||
},
|
||||
'title': '基础文本',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
}
|
||||
|
||||
|
||||
def button_group(node_id):
|
||||
return {
|
||||
'componentName': 'ButtonGroup',
|
||||
'id': node_id,
|
||||
'props': {
|
||||
'dynamicButtons': {'type': 'variableValue', 'variableType': 'global', 'variable': 'btns'},
|
||||
'marginLeft': 12,
|
||||
'marginRight': 12,
|
||||
'marginTop': 6,
|
||||
'marginBottom': 12,
|
||||
'visible': {
|
||||
'type': 'dynamicVisible',
|
||||
'value': True,
|
||||
'valueType': 'fixed',
|
||||
'condition': {'op': 'and', 'conditions': []},
|
||||
},
|
||||
'responsiveLayoutWidth': 350,
|
||||
'buttonsSource': 'variable',
|
||||
'fixedButtonIds': [],
|
||||
'fixedButtons': [],
|
||||
'enableResponsiveLayout': False,
|
||||
'matchContent': False,
|
||||
'buttonSpacing': 8,
|
||||
'margin': -2,
|
||||
'innerOffset': 0,
|
||||
},
|
||||
'title': '按钮组',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
}
|
||||
|
||||
|
||||
def build_editor_data():
|
||||
component_names = [
|
||||
'AIPending',
|
||||
'AICardStatusContainer',
|
||||
'BaseText',
|
||||
'AICardContent',
|
||||
'AICardContainer',
|
||||
'ButtonGroup',
|
||||
'MarkdownBlock',
|
||||
]
|
||||
components_map = [
|
||||
{
|
||||
'package': '@ali/dxComponent',
|
||||
'version': '1.0.0',
|
||||
'exportName': n,
|
||||
'main': './src/index.tsx',
|
||||
'destructuring': False,
|
||||
'subName': '',
|
||||
'componentName': n,
|
||||
}
|
||||
for n in component_names
|
||||
]
|
||||
|
||||
pending_state = {
|
||||
'componentName': 'AICardStatusContainer',
|
||||
'id': 'node_status_pending',
|
||||
'props': {
|
||||
'status': 1,
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'enableExtend': False,
|
||||
'autoFoldConfig': {
|
||||
'needFold': True,
|
||||
'heightLimit': 480,
|
||||
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||
},
|
||||
'innerOffset': 0,
|
||||
'enableCollapse': False,
|
||||
'margin': -2,
|
||||
},
|
||||
'title': '处理中状态',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [
|
||||
{
|
||||
'componentName': 'AIPending',
|
||||
'id': 'node_pending_inner',
|
||||
'props': {
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'pendingTip': {'type': 'dynamicString', 'content': '处理中...', 'i18n': False},
|
||||
'style': 'embed',
|
||||
'hideIcon': False,
|
||||
},
|
||||
'hidden': False,
|
||||
'title': '',
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
done_state = {
|
||||
'componentName': 'AICardStatusContainer',
|
||||
'id': 'node_status_done',
|
||||
'props': {
|
||||
'status': 3,
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'enableExtend': False,
|
||||
'autoFoldConfig': {
|
||||
'needFold': True,
|
||||
'heightLimit': 480,
|
||||
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||
},
|
||||
'innerOffset': 0,
|
||||
'enableCollapse': False,
|
||||
'margin': -2,
|
||||
},
|
||||
'title': '完成状态',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [
|
||||
{
|
||||
'componentName': 'AICardContent',
|
||||
'id': 'node_done_content',
|
||||
'props': {
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'visible': {
|
||||
'type': 'dynamicVisible',
|
||||
'value': True,
|
||||
'valueType': 'fixed',
|
||||
'condition': {'op': 'and', 'conditions': []},
|
||||
},
|
||||
'innerOffset': 0,
|
||||
'disabledWhileForward': False,
|
||||
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||
'statPointParams': [
|
||||
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
|
||||
],
|
||||
'margin': -2,
|
||||
'transformToEventChain': False,
|
||||
'enableStatPoint': False,
|
||||
},
|
||||
'hidden': False,
|
||||
'title': '',
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [
|
||||
markdown_block('node_text_content', variable='content'),
|
||||
button_group('node_btn_group'),
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
failed_state = {
|
||||
'componentName': 'AICardStatusContainer',
|
||||
'id': 'node_status_failed',
|
||||
'props': {
|
||||
'status': 5,
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'enableExtend': False,
|
||||
'autoFoldConfig': {
|
||||
'needFold': True,
|
||||
'heightLimit': 480,
|
||||
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
|
||||
},
|
||||
'innerOffset': 0,
|
||||
'enableCollapse': False,
|
||||
'margin': -2,
|
||||
},
|
||||
'title': '失败状态',
|
||||
'hidden': False,
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [
|
||||
{
|
||||
'componentName': 'AICardContent',
|
||||
'id': 'node_failed_content',
|
||||
'props': {
|
||||
'visible': {
|
||||
'type': 'dynamicVisible',
|
||||
'value': True,
|
||||
'valueType': 'fixed',
|
||||
'condition': {'op': 'and', 'conditions': []},
|
||||
},
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'innerOffset': 0,
|
||||
'disabledWhileForward': False,
|
||||
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
|
||||
'statPointParams': [
|
||||
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
|
||||
],
|
||||
'margin': -2,
|
||||
'transformToEventChain': False,
|
||||
'enableStatPoint': False,
|
||||
},
|
||||
'hidden': False,
|
||||
'title': '',
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [
|
||||
text_block(
|
||||
'node_failed_text',
|
||||
'操作失败,请稍后重试。',
|
||||
gravity='center',
|
||||
mt=10,
|
||||
mb=10,
|
||||
ml=10,
|
||||
mr=10,
|
||||
max_lines=2,
|
||||
font_size=15,
|
||||
)
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
root = {
|
||||
'componentName': 'AICardContainer',
|
||||
'id': 'node_root',
|
||||
'props': {
|
||||
'marginLeft': 0,
|
||||
'marginRight': 0,
|
||||
'marginTop': 0,
|
||||
'marginBottom': 0,
|
||||
'enablePending': True,
|
||||
'enableWriting': False,
|
||||
'enableDoing': False,
|
||||
'enableFailed': True,
|
||||
'summaryContent': {'type': 'variableValue', 'variableType': 'global', 'variable': ''},
|
||||
'enableTitle': False,
|
||||
'flowStatusVar': {'type': 'variableValue', 'variableType': 'global', 'variable': 'flowStatus'},
|
||||
'operationPenalType': 'custom',
|
||||
'enableFlowAbort': True,
|
||||
'innerOffset': 0,
|
||||
'enableGradientBorder': True,
|
||||
'cardSizeMode': 'adaptive',
|
||||
'cardSizeHeightMode': 'adaptive',
|
||||
'cardSizeWidthMode': 'adaptive',
|
||||
'cardSizeHeight': {
|
||||
'type': 'dynamicNumber',
|
||||
'valueType': 'fixed',
|
||||
'value': 226,
|
||||
'variable': '',
|
||||
'variableType': 'global',
|
||||
},
|
||||
'hasBackground': False,
|
||||
'backgroundType': 'Standard',
|
||||
'standardBackgroundColor': 'gray',
|
||||
'backgroundColor': '#F6F6F6',
|
||||
'darkModeBackgroundColor': '#3C3C3C',
|
||||
'enableEngineUpgrade': False,
|
||||
'enableExposeStatPoint': False,
|
||||
'enableDebugTool': False,
|
||||
},
|
||||
'hidden': False,
|
||||
'title': '',
|
||||
'isLocked': False,
|
||||
'condition': True,
|
||||
'conditionGroup': '',
|
||||
'children': [pending_state, done_state, failed_state],
|
||||
}
|
||||
|
||||
btns_var = {
|
||||
'name': 'btns',
|
||||
'private': False,
|
||||
'type': 'buttonGroup',
|
||||
'id': 'btns',
|
||||
'description': '动态按钮列表(Dify actions)',
|
||||
'editorVarType': 'variables',
|
||||
'disabled': False,
|
||||
'schema': [
|
||||
{
|
||||
'id': 'btns.text',
|
||||
'type': 'string',
|
||||
'name': 'text',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '按钮文案',
|
||||
},
|
||||
{
|
||||
'id': 'btns.color',
|
||||
'type': 'string',
|
||||
'name': 'color',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '按钮颜色',
|
||||
},
|
||||
{
|
||||
'id': 'btns.status',
|
||||
'type': 'string',
|
||||
'name': 'status',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '按钮状态',
|
||||
},
|
||||
{
|
||||
'id': 'btns.event',
|
||||
'type': 'dynamicEvent',
|
||||
'name': 'event',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '按钮点击事件',
|
||||
'schema': [
|
||||
{
|
||||
'id': 'btns.type',
|
||||
'type': 'string',
|
||||
'name': 'type',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '事件类型:openLink / sendCardRequest',
|
||||
},
|
||||
{
|
||||
'id': 'btns.params',
|
||||
'type': 'object',
|
||||
'name': 'params',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '事件参数',
|
||||
'schema': [
|
||||
{
|
||||
'id': 'btns.url',
|
||||
'type': 'string',
|
||||
'name': 'url',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '点击跳转链接(type=openLink)',
|
||||
},
|
||||
{
|
||||
'id': 'btns.actionId',
|
||||
'type': 'string',
|
||||
'name': 'actionId',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '回传请求 id(type=sendCardRequest)',
|
||||
},
|
||||
{
|
||||
'id': 'btns.params',
|
||||
'type': 'object',
|
||||
'name': 'params',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'description': '回传请求参数(type=sendCardRequest)',
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
return {
|
||||
'schemaVersion': '3.0.0',
|
||||
'schema': {
|
||||
'config': {'update_multi': True, 'streaming_mode': True},
|
||||
'componentsMap': components_map,
|
||||
'componentsTree': [root],
|
||||
'i18n': {},
|
||||
'version': '1.0.0',
|
||||
},
|
||||
'mockData': {
|
||||
'cardData': {
|
||||
'flowStatus': 3,
|
||||
'content': '请审核以下报销申请:\n\n- 申请人:张三\n- 金额:¥1,200\n- 类别:差旅',
|
||||
'btns': [
|
||||
{
|
||||
'text': '通过',
|
||||
'color': 'blue',
|
||||
'status': 'normal',
|
||||
'event': {
|
||||
'type': 'sendCardRequest',
|
||||
'params': {'actionId': 'approve', 'params': {'action_id': 'approve'}},
|
||||
},
|
||||
},
|
||||
{
|
||||
'text': '驳回',
|
||||
'color': 'gray',
|
||||
'status': 'normal',
|
||||
'event': {
|
||||
'type': 'sendCardRequest',
|
||||
'params': {'actionId': 'reject', 'params': {'action_id': 'reject'}},
|
||||
},
|
||||
},
|
||||
{
|
||||
'text': '补充资料',
|
||||
'color': 'gray',
|
||||
'status': 'normal',
|
||||
'event': {
|
||||
'type': 'sendCardRequest',
|
||||
'params': {'actionId': 'more_info', 'params': {'action_id': 'more_info'}},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
'cardPrivateData': {},
|
||||
'localData': {'flowStatus': '', '_cardFoldStatusLocalDataKey': ''},
|
||||
'richTextData': {},
|
||||
},
|
||||
'renderContext': {'regenerateEnabled': '1', 'regenerateIndex': '2', 'regenerateTotal': '5'},
|
||||
'editVersion': 0,
|
||||
'customWidgetInfo': '',
|
||||
'useCustomWidgetInfo': False,
|
||||
'variableList': [
|
||||
{
|
||||
'id': 'content',
|
||||
'type': 'markdown',
|
||||
'name': 'content',
|
||||
'description': '人工输入提示词(Dify form_content 含可选 node_title 前缀)',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': False,
|
||||
},
|
||||
{
|
||||
'id': 'flowStatus',
|
||||
'type': 'string',
|
||||
'name': 'flowStatus',
|
||||
'description': 'AI卡片状态:pending(1)、writing(2)、done(3)、failed(5)',
|
||||
'private': False,
|
||||
'editorVarType': 'variables',
|
||||
'disabled': True,
|
||||
'visible': False,
|
||||
},
|
||||
btns_var,
|
||||
],
|
||||
'formList': [],
|
||||
'customContextList': [],
|
||||
'expList': [],
|
||||
'localList': [],
|
||||
'hsfList': [],
|
||||
'lwpList': [],
|
||||
'pageData': {},
|
||||
'extension': {'extendType': 'AI', 'aiStatusList': [3, 1, 5], 'fileTypeList': []},
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
editor_data = build_editor_data()
|
||||
wrapper = {
|
||||
'editorData': json.dumps(editor_data, ensure_ascii=False, separators=(',', ':')),
|
||||
'widgetInfo': '',
|
||||
'type': 'im',
|
||||
'mode': 'card',
|
||||
}
|
||||
OUTPUT.write_text(json.dumps(wrapper, ensure_ascii=False, indent=2), encoding='utf-8')
|
||||
print(f'wrote {OUTPUT}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
|
||||
if chunk.startswith('data:'):
|
||||
yield json.loads(chunk[5:])
|
||||
|
||||
async def workflow_submit(
|
||||
self,
|
||||
form_token: str,
|
||||
workflow_run_id: str,
|
||||
inputs: dict[str, typing.Any],
|
||||
user: str,
|
||||
action: str = '',
|
||||
timeout: float = 120.0,
|
||||
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
||||
"""Submit human input to resume a paused workflow, then stream events.
|
||||
|
||||
1. POST /form/human_input/{form_token} to submit the form
|
||||
2. GET /workflow/{task_id}/events to stream the resumed workflow events
|
||||
"""
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
# Step 1: Submit the form
|
||||
payload: dict[str, typing.Any] = {
|
||||
'inputs': inputs if isinstance(inputs, dict) else {},
|
||||
'user': user,
|
||||
'action': action,
|
||||
}
|
||||
|
||||
submit_resp = await client.post(
|
||||
f'/form/human_input/{form_token}',
|
||||
headers=headers,
|
||||
json=payload,
|
||||
)
|
||||
if submit_resp.status_code != 200:
|
||||
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
|
||||
|
||||
# Step 2: Stream resumed workflow events
|
||||
async with client.stream(
|
||||
'GET',
|
||||
f'/workflow/{workflow_run_id}/events',
|
||||
headers={'Authorization': f'Bearer {self.api_key}'},
|
||||
params={'user': user},
|
||||
) as r:
|
||||
async for chunk in r.aiter_lines():
|
||||
if r.status_code != 200:
|
||||
raise DifyAPIError(f'{r.status_code} {chunk}')
|
||||
if chunk.strip() == '':
|
||||
continue
|
||||
if chunk.startswith('data:'):
|
||||
yield json.loads(chunk[5:])
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
file: httpx._types.FileTypes,
|
||||
|
||||
@@ -1,17 +1,26 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import uuid
|
||||
import urllib.parse
|
||||
from typing import Callable
|
||||
from typing import Awaitable, Callable, Optional
|
||||
import dingtalk_stream # type: ignore
|
||||
import websockets
|
||||
from .EchoHandler import EchoTextHandler
|
||||
from .card_callback import DingTalkCardActionHandler
|
||||
from .dingtalkevent import DingTalkEvent
|
||||
import httpx
|
||||
import traceback
|
||||
|
||||
|
||||
_stdout_logger = logging.getLogger('langbot.dingtalk_api')
|
||||
|
||||
|
||||
DINGTALK_OPENAPI_BASE = 'https://api.dingtalk.com'
|
||||
|
||||
|
||||
class DingTalkClient:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -21,6 +30,7 @@ class DingTalkClient:
|
||||
robot_code: str,
|
||||
markdown_card: bool,
|
||||
logger: None,
|
||||
card_action_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
|
||||
):
|
||||
"""初始化 WebSocket 连接并自动启动"""
|
||||
self.credential = dingtalk_stream.Credential(client_id, client_secret)
|
||||
@@ -30,6 +40,14 @@ class DingTalkClient:
|
||||
# 在 DingTalkClient 中传入自己作为参数,避免循环导入
|
||||
self.EchoTextHandler = EchoTextHandler(self)
|
||||
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
|
||||
# STREAM-mode card action button click handler. Forwards parsed payload
|
||||
# to the adapter so it can resume paused Dify workflows.
|
||||
self.card_action_callback = card_action_callback
|
||||
self.card_action_handler = DingTalkCardActionHandler(self.client, self._on_card_action)
|
||||
self.client.register_callback_handler(
|
||||
dingtalk_stream.handlers.CallbackHandler.TOPIC_CARD_CALLBACK,
|
||||
self.card_action_handler,
|
||||
)
|
||||
self._message_handlers = {
|
||||
'example': [],
|
||||
}
|
||||
@@ -41,6 +59,16 @@ class DingTalkClient:
|
||||
self.logger = logger
|
||||
self._stopped = False # Flag to control the event loop
|
||||
|
||||
async def _on_card_action(self, payload: dict) -> None:
|
||||
"""Dispatch a parsed card-action payload to the adapter callback."""
|
||||
if self.card_action_callback is None:
|
||||
return
|
||||
try:
|
||||
await self.card_action_callback(payload)
|
||||
except Exception:
|
||||
if self.logger:
|
||||
await self.logger.error(f'DingTalk card action callback error: {traceback.format_exc()}')
|
||||
|
||||
async def get_access_token(self):
|
||||
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
@@ -429,18 +457,35 @@ class DingTalkClient:
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
# For enterprise-internal robots, robotCode == AppKey (client_id).
|
||||
# The dedicated robot_code field is only required for scenario-group
|
||||
# robots or third-party robots; fall back to client_id when empty so
|
||||
# the common single-bot setup keeps working without manual config.
|
||||
robot_code = self.robot_code or self.key
|
||||
data = {
|
||||
'robotCode': self.robot_code,
|
||||
'robotCode': robot_code,
|
||||
'userIds': [target_id],
|
||||
'msgKey': 'sampleText',
|
||||
'msgParam': json.dumps({'content': content}),
|
||||
}
|
||||
_stdout_logger.info(
|
||||
'DingTalk send_proactive_message_to_one request: robotCode=%s target_id=%s content_len=%d',
|
||||
robot_code,
|
||||
target_id,
|
||||
len(content),
|
||||
)
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(url, headers=headers, json=data)
|
||||
_stdout_logger.info(
|
||||
'DingTalk send_proactive_message_to_one response: status=%d body=%s',
|
||||
response.status_code,
|
||||
response.text[:500],
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return
|
||||
except Exception:
|
||||
_stdout_logger.exception('DingTalk send_proactive_message_to_one error')
|
||||
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||||
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
|
||||
|
||||
@@ -456,7 +501,7 @@ class DingTalkClient:
|
||||
}
|
||||
|
||||
data = {
|
||||
'robotCode': self.robot_code,
|
||||
'robotCode': self.robot_code or self.key,
|
||||
'openConversationId': target_id,
|
||||
'msgKey': 'sampleText',
|
||||
'msgParam': json.dumps({'content': content}),
|
||||
@@ -477,47 +522,244 @@ class DingTalkClient:
|
||||
quote_origin: bool = False,
|
||||
card_auto_layout: bool = False,
|
||||
):
|
||||
card_data = {}
|
||||
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
|
||||
card_data['content'] = ''
|
||||
"""Create + deliver the streaming chat card for a chatbot reply.
|
||||
|
||||
# 将用户的消息内容作为卡片的查询参数,方便后续处理
|
||||
if incoming_message.message_type == 'text':
|
||||
card_data['query'] = incoming_message.get_text_list()[0]
|
||||
Replaces the old `dingtalk_stream.AICardReplier`-based path. Returns
|
||||
`(None, out_track_id)` to keep call sites compatible with the
|
||||
previous `(card_instance, card_instance_id)` shape — the first slot
|
||||
is unused now that everything is driven by out_track_id.
|
||||
"""
|
||||
out_track_id = uuid.uuid4().hex
|
||||
is_group = str(incoming_message.conversation_type) == '2'
|
||||
if is_group:
|
||||
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
|
||||
else:
|
||||
card_data['query'] = '...'
|
||||
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
|
||||
|
||||
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message)
|
||||
# print(card_instance)
|
||||
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards
|
||||
card_instance_id = await card_instance.async_create_and_deliver_card(
|
||||
temp_card_id,
|
||||
card_data,
|
||||
card_param_map = {'content': ''}
|
||||
if incoming_message.message_type == 'text':
|
||||
card_param_map['query'] = incoming_message.get_text_list()[0]
|
||||
else:
|
||||
card_param_map['query'] = '...'
|
||||
|
||||
await self.create_and_deliver_card(
|
||||
card_template_id=temp_card_id,
|
||||
out_track_id=out_track_id,
|
||||
open_space_id=open_space_id,
|
||||
is_group=is_group,
|
||||
card_param_map=card_param_map,
|
||||
card_data_config={'autoLayout': card_auto_layout},
|
||||
)
|
||||
return card_instance, card_instance_id
|
||||
return None, out_track_id
|
||||
|
||||
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
|
||||
content_key = 'content'
|
||||
"""Stream a single chunk into an existing card's `content` field."""
|
||||
try:
|
||||
await card_instance.async_streaming(
|
||||
card_instance_id,
|
||||
content_key=content_key,
|
||||
await self.streaming_update_card(
|
||||
out_track_id=card_instance_id,
|
||||
content_key='content',
|
||||
content_value=content,
|
||||
append=False,
|
||||
finished=is_final,
|
||||
failed=False,
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.exception(e)
|
||||
await card_instance.async_streaming(
|
||||
card_instance_id,
|
||||
content_key=content_key,
|
||||
if self.logger:
|
||||
self.logger.exception(e)
|
||||
await self.streaming_update_card(
|
||||
out_track_id=card_instance_id,
|
||||
content_key='content',
|
||||
content_value='',
|
||||
append=False,
|
||||
finished=is_final,
|
||||
failed=True,
|
||||
)
|
||||
|
||||
async def create_and_deliver_card(
|
||||
self,
|
||||
*,
|
||||
card_template_id: str,
|
||||
out_track_id: str,
|
||||
open_space_id: str,
|
||||
is_group: bool,
|
||||
card_param_map: Optional[dict] = None,
|
||||
callback_type: str = 'STREAM',
|
||||
callback_route_key: Optional[str] = None,
|
||||
support_forward: bool = True,
|
||||
dynamic_data_source_configs: Optional[list] = None,
|
||||
card_data_config: Optional[dict] = None,
|
||||
at_user_ids: Optional[dict] = None,
|
||||
recipients: Optional[list] = None,
|
||||
) -> bool:
|
||||
"""POST /v1.0/card/instances/createAndDeliver.
|
||||
|
||||
Mirrors the SDK's `async_create_and_deliver_card` shape but exposes
|
||||
the dynamic-data-source config slot so we can register a pull URL
|
||||
for variable-length button lists.
|
||||
"""
|
||||
if not await self.check_access_token():
|
||||
await self.get_access_token()
|
||||
|
||||
cardData: dict = {'cardParamMap': card_param_map or {}}
|
||||
if card_data_config is not None:
|
||||
cardData['config'] = json.dumps(card_data_config)
|
||||
|
||||
body: dict = {
|
||||
'cardTemplateId': card_template_id,
|
||||
'outTrackId': out_track_id,
|
||||
'cardData': cardData,
|
||||
'callbackType': callback_type,
|
||||
'openSpaceId': open_space_id,
|
||||
'imGroupOpenSpaceModel': {'supportForward': support_forward},
|
||||
'imRobotOpenSpaceModel': {'supportForward': support_forward},
|
||||
}
|
||||
if callback_type == 'HTTP' and callback_route_key:
|
||||
body['callbackRouteKey'] = callback_route_key
|
||||
|
||||
if is_group:
|
||||
deliver: dict = {'robotCode': self.robot_code or self.key}
|
||||
if at_user_ids:
|
||||
deliver['atUserIds'] = at_user_ids
|
||||
if recipients is not None:
|
||||
deliver['recipients'] = recipients
|
||||
body['imGroupOpenDeliverModel'] = deliver
|
||||
else:
|
||||
body['imRobotOpenDeliverModel'] = {'spaceType': 'IM_ROBOT'}
|
||||
|
||||
if dynamic_data_source_configs:
|
||||
body['openDynamicDataConfig'] = {'dynamicDataSourceConfigs': dynamic_data_source_configs}
|
||||
|
||||
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/createAndDeliver'
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': self.access_token,
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
try:
|
||||
_stdout_logger.info(
|
||||
'DingTalk createAndDeliver request body: %s',
|
||||
json.dumps(body, ensure_ascii=False)[:1500],
|
||||
)
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(url, headers=headers, json=body, timeout=30.0)
|
||||
if response.status_code == 200:
|
||||
_stdout_logger.info(
|
||||
'DingTalk createAndDeliver response: %s',
|
||||
response.text[:500],
|
||||
)
|
||||
return True
|
||||
_stdout_logger.error(
|
||||
'DingTalk createAndDeliver failed: status=%s body=%s',
|
||||
response.status_code,
|
||||
response.text,
|
||||
)
|
||||
if self.logger:
|
||||
await self.logger.error(
|
||||
f'DingTalk createAndDeliver failed: status={response.status_code} body={response.text}'
|
||||
)
|
||||
return False
|
||||
except Exception:
|
||||
_stdout_logger.exception('DingTalk createAndDeliver error')
|
||||
if self.logger:
|
||||
await self.logger.error(f'DingTalk createAndDeliver error: {traceback.format_exc()}')
|
||||
return False
|
||||
|
||||
async def streaming_update_card(
|
||||
self,
|
||||
*,
|
||||
out_track_id: str,
|
||||
content_key: str,
|
||||
content_value: str,
|
||||
append: bool,
|
||||
finished: bool,
|
||||
failed: bool = False,
|
||||
) -> bool:
|
||||
"""PUT /v1.0/card/streaming.
|
||||
|
||||
Replaces `dingtalk_stream.AICardReplier.async_streaming` — same body
|
||||
shape (outTrackId / guid / key / content / isFull / isFinalize /
|
||||
isError) per the SDK source.
|
||||
"""
|
||||
if not await self.check_access_token():
|
||||
await self.get_access_token()
|
||||
|
||||
body = {
|
||||
'outTrackId': out_track_id,
|
||||
'guid': uuid.uuid4().hex,
|
||||
'key': content_key,
|
||||
'content': content_value,
|
||||
'isFull': not append,
|
||||
'isFinalize': finished,
|
||||
'isError': failed,
|
||||
}
|
||||
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/streaming'
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': self.access_token,
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(url, headers=headers, json=body, timeout=30.0)
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
if self.logger:
|
||||
await self.logger.error(
|
||||
f'DingTalk card streaming failed: status={response.status_code} body={response.text}'
|
||||
)
|
||||
return False
|
||||
except Exception:
|
||||
if self.logger:
|
||||
await self.logger.error(f'DingTalk card streaming error: {traceback.format_exc()}')
|
||||
return False
|
||||
|
||||
async def update_card_data(
|
||||
self,
|
||||
*,
|
||||
out_track_id: str,
|
||||
card_param_map: Optional[dict] = None,
|
||||
private_data: Optional[dict] = None,
|
||||
) -> bool:
|
||||
"""PUT /v1.0/card/instances — non-streaming card content update."""
|
||||
if not await self.check_access_token():
|
||||
await self.get_access_token()
|
||||
|
||||
body: dict = {
|
||||
'outTrackId': out_track_id,
|
||||
'cardData': {'cardParamMap': card_param_map or {}},
|
||||
}
|
||||
if private_data:
|
||||
body['privateData'] = private_data
|
||||
|
||||
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances'
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': self.access_token,
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
try:
|
||||
_stdout_logger.info(
|
||||
'DingTalk update_card_data request: out_track_id=%s body=%s',
|
||||
out_track_id,
|
||||
json.dumps(body, ensure_ascii=False)[:500],
|
||||
)
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(url, headers=headers, json=body, timeout=30.0)
|
||||
_stdout_logger.info(
|
||||
'DingTalk update_card_data response: status=%d body=%s',
|
||||
response.status_code,
|
||||
response.text[:300],
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return True
|
||||
if self.logger:
|
||||
await self.logger.error(
|
||||
f'DingTalk update card failed: status={response.status_code} body={response.text}'
|
||||
)
|
||||
return False
|
||||
except Exception:
|
||||
_stdout_logger.exception('DingTalk update_card_data error')
|
||||
if self.logger:
|
||||
await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
|
||||
return False
|
||||
|
||||
async def start(self):
|
||||
"""启动 WebSocket 连接,监听消息"""
|
||||
self._stopped = False
|
||||
|
||||
96
src/langbot/libs/dingtalk_api/card_callback.py
Normal file
96
src/langbot/libs/dingtalk_api/card_callback.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""STREAM-mode handler for DingTalk card action button clicks.
|
||||
|
||||
DingTalk delivers card-action callbacks over the same WebSocket stream used
|
||||
for chatbot messages, under the topic `/v1.0/card/instances/callback`. This
|
||||
module subclasses `dingtalk_stream.CallbackHandler` and forwards the parsed
|
||||
payload to a coroutine the adapter registers, so the resume-paused-workflow
|
||||
logic stays in the platform adapter where it belongs.
|
||||
|
||||
The `CardCallbackMessage` returned by `from_dict` exposes:
|
||||
|
||||
* `card_instance_id` (from `outTrackId`) — the card whose button was clicked
|
||||
* `user_id` — the clicker's userId
|
||||
* `content` — parsed JSON; the click params live here. Where exactly inside
|
||||
`content` they sit depends on the template binding. We probe
|
||||
the common paths.
|
||||
* `extension` — parsed JSON; any extra data we set when delivering the card.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Awaitable, Callable, Optional
|
||||
|
||||
import dingtalk_stream # type: ignore
|
||||
from dingtalk_stream import AckMessage
|
||||
from dingtalk_stream.card_callback import CardCallbackMessage
|
||||
|
||||
|
||||
_PARAM_PATHS = (
|
||||
('params',),
|
||||
('cardPrivateData', 'params'),
|
||||
('userPrivateData', 'params'),
|
||||
)
|
||||
|
||||
|
||||
def _extract_params(content: dict) -> dict:
|
||||
"""Return the action params dict regardless of where the template put it."""
|
||||
for path in _PARAM_PATHS:
|
||||
node = content
|
||||
for key in path:
|
||||
if not isinstance(node, dict):
|
||||
node = None
|
||||
break
|
||||
node = node.get(key)
|
||||
if node is None:
|
||||
break
|
||||
if isinstance(node, dict) and node:
|
||||
return node
|
||||
return {}
|
||||
|
||||
|
||||
class DingTalkCardActionHandler(dingtalk_stream.CallbackHandler):
|
||||
def __init__(
|
||||
self,
|
||||
dingtalk_stream_client,
|
||||
on_action: Optional[Callable[[dict], Awaitable[None]]] = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.dingtalk_client = dingtalk_stream_client
|
||||
self.on_action = on_action
|
||||
|
||||
async def process(self, callback: dingtalk_stream.CallbackMessage):
|
||||
try:
|
||||
message = CardCallbackMessage.from_dict(callback.data)
|
||||
params = _extract_params(message.content if isinstance(message.content, dict) else {})
|
||||
|
||||
# `CardCallbackMessage.from_dict` does not surface `actionId` (the
|
||||
# top-level field that ButtonGroup's sendCardRequest event puts
|
||||
# there). Pull it from the raw callback.data instead.
|
||||
raw = callback.data if isinstance(callback.data, dict) else {}
|
||||
action_id = raw.get('actionId') or ''
|
||||
if not action_id:
|
||||
# Some templates nest it under actionData / cardPrivateData.
|
||||
action_data = raw.get('actionData') or {}
|
||||
if isinstance(action_data, dict):
|
||||
action_id = action_data.get('actionId') or action_id
|
||||
if not action_id:
|
||||
cpd = action_data.get('cardPrivateData') or {}
|
||||
if isinstance(cpd, dict):
|
||||
ids = cpd.get('actionIds')
|
||||
if isinstance(ids, list) and ids:
|
||||
action_id = str(ids[0])
|
||||
|
||||
payload = {
|
||||
'out_track_id': message.card_instance_id,
|
||||
'user_id': message.user_id,
|
||||
'corp_id': message.corp_id,
|
||||
'action_id': action_id,
|
||||
'params': params,
|
||||
'raw_content': message.content,
|
||||
'extension': message.extension if isinstance(message.extension, dict) else {},
|
||||
}
|
||||
if self.on_action is not None:
|
||||
await self.on_action(payload)
|
||||
except Exception as e:
|
||||
self.logger.error(f'DingTalkCardActionHandler.process error: {e}')
|
||||
return AckMessage.STATUS_OK, 'OK'
|
||||
@@ -67,6 +67,16 @@ class StreamSession:
|
||||
# 反馈 ID,用于接收用户点赞/点踩反馈
|
||||
feedback_id: Optional[str] = None
|
||||
|
||||
# Dify 人工输入暂停态:runner 把 _form_data 传过来时填充。
|
||||
# 一旦设置,下次企微 followup 请求时返回 button_interaction 模板卡
|
||||
# 替代 stream chunk。点击按钮会回调 template_card_event,EventKey
|
||||
# 就是 Dify 的 action_id。
|
||||
pending_form: Optional[dict] = None
|
||||
|
||||
# template_card task_id(企微要求 button_interaction 必填且不可重复)。
|
||||
# 创建 pending_form 时生成;按钮点击回调里用来反查 session。
|
||||
pending_form_task_id: Optional[str] = None
|
||||
|
||||
|
||||
class StreamSessionManager:
|
||||
"""管理 stream 会话的生命周期,并负责队列的生产消费。"""
|
||||
@@ -83,6 +93,9 @@ class StreamSessionManager:
|
||||
self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射
|
||||
self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话
|
||||
self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射
|
||||
# task_id (button_interaction template_card 的) -> stream_id 映射,
|
||||
# 用于按钮点击回调里反查 pending_form。
|
||||
self._task_index: dict[str, str] = {}
|
||||
|
||||
def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]:
|
||||
if not msg_id:
|
||||
@@ -118,6 +131,40 @@ class StreamSessionManager:
|
||||
if feedback_id and stream_id:
|
||||
self._feedback_index[feedback_id] = stream_id
|
||||
|
||||
def set_pending_form(self, stream_id: str, form_data: dict, task_id: str) -> None:
|
||||
"""把 Dify 人工输入暂停态绑定到 stream session。
|
||||
|
||||
下一次企微 followup 请求时,adapter 检测到 pending_form,
|
||||
返回 button_interaction 模板卡而不是 stream chunk。
|
||||
"""
|
||||
session = self._sessions.get(stream_id)
|
||||
if not session:
|
||||
return
|
||||
session.pending_form = form_data
|
||||
session.pending_form_task_id = task_id
|
||||
if task_id:
|
||||
self._task_index[task_id] = stream_id
|
||||
|
||||
def get_session_by_task_id(self, task_id: str) -> Optional[StreamSession]:
|
||||
"""按按钮点击回调里的 TaskId 反查 session。"""
|
||||
if not task_id:
|
||||
return None
|
||||
stream_id = self._task_index.get(task_id)
|
||||
if not stream_id:
|
||||
return None
|
||||
return self._sessions.get(stream_id)
|
||||
|
||||
def clear_pending_form(self, stream_id: str) -> None:
|
||||
"""按钮点击消费完后清掉 pending_form,避免重复弹卡。"""
|
||||
session = self._sessions.get(stream_id)
|
||||
if not session:
|
||||
return
|
||||
task_id = session.pending_form_task_id
|
||||
session.pending_form = None
|
||||
session.pending_form_task_id = None
|
||||
if task_id:
|
||||
self._task_index.pop(task_id, None)
|
||||
|
||||
def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]:
|
||||
"""根据企业微信回调创建或获取会话。
|
||||
|
||||
@@ -723,6 +770,79 @@ async def parse_wecom_bot_message(
|
||||
return message_data
|
||||
|
||||
|
||||
def build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
|
||||
"""Build a `template_card` (button_interaction) WeCom payload.
|
||||
|
||||
Shared by both the webhook-mode client (returns the payload as the
|
||||
response to a stream-followup callback) and the ws_client (sends it
|
||||
as a reply frame). Output shape is `{"msgtype": "template_card",
|
||||
"template_card": {...}}` per the WeCom spec.
|
||||
|
||||
Args:
|
||||
form_data: Dify human-input form data with keys ``actions`` (list of
|
||||
``{id, title, button_style}``), ``node_title``, ``form_content``.
|
||||
task_id: Unique per-card identifier. WeCom requires this for
|
||||
button_interaction. The click callback returns it as TaskId so we
|
||||
can find the originating session.
|
||||
|
||||
Notes:
|
||||
* ``button.key`` is set directly to the Dify ``action_id``. The click
|
||||
callback's ``EventKey`` carries this back unchanged (1024-byte limit
|
||||
per the spec, far more than we ever need).
|
||||
* WeCom caps the button list at 6. Extra actions are appended to
|
||||
``sub_title_text`` so users can still reply with the id as text.
|
||||
* Styles map ``primary``→1 (blue), ``danger``→2 (red), default→0
|
||||
(gray). First button is auto-promoted to primary when no style.
|
||||
"""
|
||||
actions = list(form_data.get('actions') or [])
|
||||
node_title = (form_data.get('node_title') or '').strip() or '人工介入'
|
||||
form_content = (form_data.get('form_content') or '').strip()
|
||||
|
||||
visible_actions = actions[:6]
|
||||
overflow = actions[6:]
|
||||
|
||||
sub_title_parts: list[str] = []
|
||||
if form_content:
|
||||
sub_title_parts.append(form_content)
|
||||
if overflow:
|
||||
extra_lines = [f' - {a.get("title") or a.get("id") or ""} (回复 id: {a.get("id") or ""})' for a in overflow]
|
||||
sub_title_parts.append(f'另有 {len(overflow)} 个选项不在按钮列表中,可直接回复 id:\n' + '\n'.join(extra_lines))
|
||||
sub_title_text = '\n\n'.join(sub_title_parts) or '请选择一个操作以继续。'
|
||||
|
||||
button_list = []
|
||||
for idx, action in enumerate(visible_actions):
|
||||
action_id = str(action.get('id') or '')
|
||||
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
|
||||
style_raw = (action.get('button_style') or '').lower()
|
||||
if style_raw == 'primary' or (style_raw == '' and idx == 0):
|
||||
style = 1
|
||||
elif style_raw == 'danger':
|
||||
style = 2
|
||||
else:
|
||||
style = 0
|
||||
button_list.append(
|
||||
{
|
||||
'text': title,
|
||||
'style': style,
|
||||
'key': action_id,
|
||||
}
|
||||
)
|
||||
|
||||
card = {
|
||||
'card_type': 'button_interaction',
|
||||
'main_title': {
|
||||
'title': node_title,
|
||||
},
|
||||
'sub_title_text': sub_title_text,
|
||||
'button_list': button_list,
|
||||
'task_id': task_id,
|
||||
}
|
||||
return {
|
||||
'msgtype': 'template_card',
|
||||
'template_card': card,
|
||||
}
|
||||
|
||||
|
||||
class WecomBotClient:
|
||||
def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False):
|
||||
"""企业微信智能机器人客户端。
|
||||
@@ -761,6 +881,7 @@ class WecomBotClient:
|
||||
self.stream_poll_timeout = 0.5
|
||||
|
||||
self._feedback_callback: Optional[Callable] = None
|
||||
self._card_action_callback: Optional[Callable] = None
|
||||
|
||||
def set_feedback_callback(self, callback: Callable) -> None:
|
||||
"""设置反馈回调函数。
|
||||
@@ -770,6 +891,19 @@ class WecomBotClient:
|
||||
"""
|
||||
self._feedback_callback = callback
|
||||
|
||||
def set_card_action_callback(self, callback: Callable) -> None:
|
||||
"""设置按钮卡片点击回调函数。
|
||||
|
||||
Signature: ``async def callback(session, action_id, task_id, raw_event) -> None``
|
||||
|
||||
``session`` is the StreamSession the card was attached to;
|
||||
``action_id`` is the Dify action_id reflected back via the
|
||||
button's ``key`` field; ``task_id`` is the card's task_id
|
||||
(matches ``session.pending_form_task_id``); ``raw_event`` is the
|
||||
decoded callback JSON for any extra fields the adapter wants.
|
||||
"""
|
||||
self._card_action_callback = callback
|
||||
|
||||
@staticmethod
|
||||
def _build_stream_payload(
|
||||
stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None
|
||||
@@ -800,6 +934,12 @@ class WecomBotClient:
|
||||
'stream': stream_payload,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
|
||||
"""Class-level shim — delegates to module-level builder so ws_client
|
||||
can reuse the exact same payload shape without importing the class."""
|
||||
return build_button_interaction_payload(form_data, task_id)
|
||||
|
||||
async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]:
|
||||
"""对响应进行加密封装并返回给企业微信。
|
||||
|
||||
@@ -892,6 +1032,22 @@ class WecomBotClient:
|
||||
return await self._encrypt_and_reply(self._build_stream_payload('', '', True), nonce)
|
||||
|
||||
session = self.stream_sessions.get_session(stream_id)
|
||||
|
||||
# If a Dify human-input pause arrived during this stream, switch
|
||||
# the response from `msgtype: stream` to `msgtype: template_card`
|
||||
# (button_interaction). The session's stream is also marked
|
||||
# finished so future followups aren't expected (assuming the
|
||||
# WeCom client treats template_card as the terminal response —
|
||||
# we'll know from the next callback whether it kept polling).
|
||||
if session and session.pending_form and session.pending_form_task_id:
|
||||
await self.logger.info(
|
||||
f'WeComBot: returning button_interaction for stream_id={stream_id} '
|
||||
f'task_id={session.pending_form_task_id} actions={len(session.pending_form.get("actions") or [])}'
|
||||
)
|
||||
card_payload = self._build_button_interaction_payload(session.pending_form, session.pending_form_task_id)
|
||||
self.stream_sessions.mark_finished(stream_id)
|
||||
return await self._encrypt_and_reply(card_payload, nonce)
|
||||
|
||||
chunk = await self.stream_sessions.consume(stream_id, timeout=self.stream_poll_timeout)
|
||||
|
||||
if not chunk:
|
||||
@@ -1000,11 +1156,50 @@ class WecomBotClient:
|
||||
if event_type == 'feedback_event':
|
||||
return await self._handle_feedback_event(msg_json, nonce)
|
||||
|
||||
# Button click on a button_interaction template_card. The WeCom doc
|
||||
# calls this `template_card_event`; some routes wrap the button
|
||||
# event payload inside `event.template_card_event`.
|
||||
if event_type == 'template_card_event':
|
||||
return await self._handle_template_card_event(msg_json, nonce)
|
||||
|
||||
if msg_json.get('msgtype') == 'stream':
|
||||
return await self._handle_post_followup_response(msg_json, nonce)
|
||||
|
||||
return await self._handle_post_initial_response(msg_json, nonce)
|
||||
|
||||
async def _handle_template_card_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
|
||||
"""Handle a button click on a button_interaction template_card.
|
||||
|
||||
WeCom carries the click info in ``event.template_card_event`` with
|
||||
``TaskId`` matching the card we created and ``EventKey`` carrying
|
||||
the button's ``key`` (which we set to the Dify ``action_id``).
|
||||
"""
|
||||
try:
|
||||
tce = msg_json.get('event', {}).get('template_card_event', {})
|
||||
task_id = tce.get('TaskId') or tce.get('task_id') or ''
|
||||
event_key = tce.get('EventKey') or tce.get('event_key') or ''
|
||||
card_type = tce.get('CardType') or tce.get('card_type') or ''
|
||||
|
||||
await self.logger.info(f'收到按钮点击: task_id={task_id} event_key={event_key!r} card_type={card_type}')
|
||||
|
||||
session = self.stream_sessions.get_session_by_task_id(task_id)
|
||||
if session is None:
|
||||
await self.logger.warning(f'未找到 task_id={task_id} 对应的 session,按钮点击被丢弃')
|
||||
else:
|
||||
if self._card_action_callback is not None:
|
||||
try:
|
||||
await self._card_action_callback(session, event_key, task_id, msg_json)
|
||||
except Exception:
|
||||
await self.logger.error(f'card action callback raised: {traceback.format_exc()}')
|
||||
# Drop the form so a fresh chunk/followup doesn't re-render
|
||||
# the same card (and so the task_id can be GC'd).
|
||||
self.stream_sessions.clear_pending_form(session.stream_id)
|
||||
except Exception:
|
||||
await self.logger.error(f'_handle_template_card_event error: {traceback.format_exc()}')
|
||||
|
||||
# WeCom expects an empty success ack for event callbacks.
|
||||
return await self._encrypt_and_reply({}, nonce)
|
||||
|
||||
async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
|
||||
"""处理企业微信用户反馈事件(点赞/点踩)。
|
||||
|
||||
@@ -1114,6 +1309,29 @@ class WecomBotClient:
|
||||
self.stream_sessions.mark_finished(stream_id)
|
||||
return True
|
||||
|
||||
async def push_form_pause(
|
||||
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
|
||||
) -> tuple[bool, Optional[str], Optional[str]]:
|
||||
"""Attach a Dify human-input pause to the active stream session.
|
||||
|
||||
On the next WeCom followup poll, the response switches from
|
||||
``msgtype: stream`` to ``msgtype: template_card`` (button_interaction)
|
||||
carrying the buttons. ``task_id`` is auto-generated if not provided
|
||||
and is what the button-click callback uses to look the session back up.
|
||||
|
||||
Returns:
|
||||
``(ok, stream_id, task_id)``. ``ok`` is False if the
|
||||
adapter's msg_id maps to no stream session (e.g. non-stream mode).
|
||||
"""
|
||||
stream_id = self.stream_sessions.get_stream_id_by_msg(msg_id)
|
||||
if not stream_id:
|
||||
return False, None, None
|
||||
if not task_id:
|
||||
# WeCom requires task_id [A-Za-z0-9_-@], <= 128 bytes, unique per bot.
|
||||
task_id = f'dify-{uuid.uuid4().hex[:24]}'
|
||||
self.stream_sessions.set_pending_form(stream_id, form_data, task_id)
|
||||
return True, stream_id, task_id
|
||||
|
||||
async def set_message(self, msg_id: str, content: str):
|
||||
"""兼容旧逻辑:若无法流式返回则缓存最终结果。
|
||||
|
||||
|
||||
@@ -20,7 +20,11 @@ from typing import Any, Callable, Optional
|
||||
import aiohttp
|
||||
|
||||
from langbot.libs.wecom_ai_bot_api import wecombotevent
|
||||
from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession
|
||||
from langbot.libs.wecom_ai_bot_api.api import (
|
||||
parse_wecom_bot_message,
|
||||
StreamSession,
|
||||
build_button_interaction_payload,
|
||||
)
|
||||
from langbot.pkg.platform.logger import EventLogger
|
||||
|
||||
DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com'
|
||||
@@ -103,6 +107,18 @@ class WecomBotWsClient:
|
||||
# msg_id -> feedback_id (for associating feedback with message)
|
||||
self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id
|
||||
|
||||
# Dify human-input pause state for ws mode. Keys are task_id (echoed
|
||||
# back in template_card_event.TaskId so we can rebuild the session
|
||||
# context on click).
|
||||
# task_id -> {form_data, msg_id, user_id, chat_id, stream_id, req_id}
|
||||
self._pending_forms_by_task: dict[str, dict] = {}
|
||||
# Reverse: msg_id -> task_id (for cleanup when stream finishes).
|
||||
self._task_id_by_msg: dict[str, str] = {}
|
||||
# Optional card-action callback registered by the adapter.
|
||||
# Signature mirrors the http-mode WecomBotClient:
|
||||
# async def callback(session, action_id, task_id, raw_event) -> None
|
||||
self._card_action_callback: Optional[Callable] = None
|
||||
|
||||
# ── Public API ──────────────────────────────────────────────────
|
||||
|
||||
async def connect(self):
|
||||
@@ -236,6 +252,83 @@ class WecomBotWsClient:
|
||||
}
|
||||
return await self._send_reply(req_id, body)
|
||||
|
||||
async def reply_template_card(self, req_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
|
||||
"""Send a template_card (button_interaction etc.) reply.
|
||||
|
||||
Args:
|
||||
req_id: The req_id from the original message frame.
|
||||
card_payload: Body produced by ``build_button_interaction_payload``;
|
||||
must contain ``msgtype`` and ``template_card`` keys.
|
||||
|
||||
Returns:
|
||||
ACK frame dict, or None on failure.
|
||||
"""
|
||||
return await self._send_reply(req_id, card_payload)
|
||||
|
||||
def set_card_action_callback(self, callback: Callable) -> None:
|
||||
"""Register the button-click handler.
|
||||
|
||||
``async def callback(session, action_id, task_id, raw_event) -> None``
|
||||
— same signature as the http-mode WecomBotClient version so the
|
||||
adapter can hand both off to the same coroutine.
|
||||
"""
|
||||
self._card_action_callback = callback
|
||||
|
||||
async def push_form_pause(
|
||||
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
|
||||
) -> tuple[bool, Optional[str], Optional[str]]:
|
||||
"""Attach a Dify human-input pause to the active stream and send
|
||||
the button_interaction card immediately.
|
||||
|
||||
ws mode has no notion of polled "followup" responses — each reply
|
||||
is a one-shot frame send. So unlike the http path (which defers
|
||||
card delivery to the next followup), here we just craft the card
|
||||
and reply with it on the original req_id. The corresponding stream
|
||||
session is then torn down so subsequent chunks don't re-send.
|
||||
|
||||
Returns:
|
||||
``(ok, stream_id, task_id)``. ``ok=False`` if no active stream
|
||||
for this msg_id (e.g. message arrived in non-stream mode).
|
||||
"""
|
||||
key = self._stream_ids.get(msg_id)
|
||||
if not key:
|
||||
return False, None, None
|
||||
req_id, stream_id = key.split('|', 1)
|
||||
|
||||
if not task_id:
|
||||
task_id = f'dify-{secrets.token_hex(12)}'
|
||||
|
||||
session_info = self._stream_sessions.get(msg_id) or {}
|
||||
self._pending_forms_by_task[task_id] = {
|
||||
'form_data': form_data,
|
||||
'msg_id': msg_id,
|
||||
'user_id': session_info.get('user_id', ''),
|
||||
'chat_id': session_info.get('chat_id', ''),
|
||||
'stream_id': stream_id,
|
||||
'req_id': req_id,
|
||||
}
|
||||
self._task_id_by_msg[msg_id] = task_id
|
||||
|
||||
card_payload = build_button_interaction_payload(form_data, task_id)
|
||||
try:
|
||||
await self.reply_template_card(req_id, card_payload)
|
||||
except Exception:
|
||||
await self.logger.error(f'Failed to send button_interaction card: {traceback.format_exc()}')
|
||||
# Roll back the bookkeeping so the next attempt isn't blocked.
|
||||
self._pending_forms_by_task.pop(task_id, None)
|
||||
self._task_id_by_msg.pop(msg_id, None)
|
||||
return False, stream_id, None
|
||||
|
||||
# Tear down the stream — WeCom expects either stream chunks OR a
|
||||
# template_card, not both on the same req_id. Subsequent
|
||||
# push_stream_chunk calls for this msg_id become no-ops.
|
||||
self._stream_ids.pop(msg_id, None)
|
||||
self._stream_last_content.pop(msg_id, None)
|
||||
# Keep _stream_sessions so the button callback can still resolve
|
||||
# user/chat context; it gets cleaned up when the click fires.
|
||||
|
||||
return True, stream_id, task_id
|
||||
|
||||
async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]:
|
||||
"""Proactively send a message to a specified chat.
|
||||
|
||||
@@ -258,6 +351,23 @@ class WecomBotWsClient:
|
||||
body['text'] = {'content': content}
|
||||
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
|
||||
|
||||
async def send_template_card(self, chat_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
|
||||
"""Proactively push a template_card to a chat.
|
||||
|
||||
Used for the resumed-workflow path (button click → new query):
|
||||
synthetic events have no inbound req_id to reply against, so we
|
||||
fall back to proactive ``aibot_send_msg`` instead of reply mode.
|
||||
|
||||
Args:
|
||||
chat_id: userid (single chat) or chatid (group chat).
|
||||
card_payload: ``{"msgtype": "template_card", "template_card": {...}}``
|
||||
as produced by :func:`build_button_interaction_payload`.
|
||||
"""
|
||||
req_id = _generate_req_id(CMD_SEND_MSG)
|
||||
body = dict(card_payload)
|
||||
body['chatid'] = chat_id
|
||||
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
|
||||
|
||||
async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool:
|
||||
"""Push a streaming chunk for a given message ID.
|
||||
|
||||
@@ -568,6 +678,38 @@ class WecomBotWsClient:
|
||||
await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}')
|
||||
return
|
||||
|
||||
if event_type == 'template_card_event':
|
||||
tce = event_info.get('template_card_event', {})
|
||||
task_id = tce.get('TaskId') or tce.get('task_id') or ''
|
||||
event_key = tce.get('EventKey') or tce.get('event_key') or ''
|
||||
card_type = tce.get('CardType') or tce.get('card_type') or ''
|
||||
await self.logger.info(
|
||||
f'收到按钮点击 (ws): task_id={task_id} event_key={event_key!r} card_type={card_type}'
|
||||
)
|
||||
pending = self._pending_forms_by_task.get(task_id)
|
||||
if pending is None:
|
||||
await self.logger.warning(f'未找到 task_id={task_id} 对应的 pending_form (ws),按钮点击被丢弃')
|
||||
elif self._card_action_callback is not None:
|
||||
try:
|
||||
session = StreamSession(
|
||||
stream_id=pending.get('stream_id', ''),
|
||||
msg_id=pending.get('msg_id', ''),
|
||||
chat_id=pending.get('chat_id') or None,
|
||||
user_id=pending.get('user_id') or None,
|
||||
)
|
||||
session.pending_form = pending.get('form_data')
|
||||
session.pending_form_task_id = task_id
|
||||
await self._card_action_callback(session, event_key, task_id, body)
|
||||
except Exception:
|
||||
await self.logger.error(f'card action callback raised (ws): {traceback.format_exc()}')
|
||||
# Consume — drop bookkeeping so a stale click can't re-fire.
|
||||
self._pending_forms_by_task.pop(task_id, None)
|
||||
msg_id = pending.get('msg_id', '')
|
||||
if msg_id:
|
||||
self._task_id_by_msg.pop(msg_id, None)
|
||||
self._stream_sessions.pop(msg_id, None)
|
||||
return
|
||||
|
||||
event = wecombotevent.WecomBotEvent(message_data)
|
||||
|
||||
if event_type in self._message_handlers:
|
||||
|
||||
@@ -157,7 +157,7 @@ class RuntimePipeline:
|
||||
bot_message=query.resp_messages[-1],
|
||||
message=result.user_notice,
|
||||
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
|
||||
is_final=[msg.is_final for msg in query.resp_messages][0],
|
||||
is_final=[msg.is_final for msg in query.resp_messages][-1],
|
||||
)
|
||||
else:
|
||||
await query.adapter.reply_message(
|
||||
|
||||
@@ -42,9 +42,13 @@ class QueryPool:
|
||||
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
|
||||
pipeline_uuid: typing.Optional[str] = None,
|
||||
routed_by_rule: bool = False,
|
||||
variables: typing.Optional[dict[str, typing.Any]] = None,
|
||||
) -> pipeline_query.Query:
|
||||
async with self.condition:
|
||||
query_id = self.query_id_counter
|
||||
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
|
||||
if variables:
|
||||
initial_variables.update(variables)
|
||||
query = pipeline_query.Query(
|
||||
bot_uuid=bot_uuid,
|
||||
query_id=query_id,
|
||||
@@ -53,7 +57,7 @@ class QueryPool:
|
||||
sender_id=sender_id,
|
||||
message_event=message_event,
|
||||
message_chain=message_chain,
|
||||
variables={'_routed_by_rule': routed_by_rule},
|
||||
variables=initial_variables,
|
||||
resp_messages=[],
|
||||
resp_message_chain=[],
|
||||
adapter=adapter,
|
||||
|
||||
@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
|
||||
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
|
||||
# TODO 命令与流式的兼容性问题
|
||||
if await query.adapter.is_stream_output_supported() and has_chunks:
|
||||
is_final = [msg.is_final for msg in query.resp_messages][0]
|
||||
is_final = [msg.is_final for msg in query.resp_messages][-1]
|
||||
await query.adapter.reply_message_chunk(
|
||||
message_source=query.message_event,
|
||||
bot_message=query.resp_messages[-1],
|
||||
|
||||
@@ -501,6 +501,8 @@ class PlatformManager:
|
||||
bot_entity.adapter_config,
|
||||
logger,
|
||||
)
|
||||
if hasattr(adapter_inst, 'ap'):
|
||||
adapter_inst.ap = self.ap
|
||||
|
||||
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook)
|
||||
if hasattr(adapter_inst, 'set_bot_uuid'):
|
||||
|
||||
@@ -1,13 +1,19 @@
|
||||
import asyncio
|
||||
import json
|
||||
import traceback
|
||||
import typing
|
||||
import uuid
|
||||
|
||||
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
from langbot.libs.dingtalk_api.api import DingTalkClient
|
||||
import datetime
|
||||
from langbot.pkg.platform.logger import EventLogger
|
||||
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
|
||||
|
||||
|
||||
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
||||
@@ -170,6 +176,22 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
card_instance_id_dict: (
|
||||
dict # 回复卡片消息字典,key为消息id,value为回复卡片实例id,用于在流式消息时判断是否发送到指定卡片
|
||||
)
|
||||
# outTrackId → form snapshot {session_key, launcher_type, launcher_id, form_token,
|
||||
# workflow_run_id, actions, node_title, form_content, expires_at, open_space_id,
|
||||
# user_id_hint, current_text}. Lookup keys for the data-source pull endpoint and
|
||||
# the STREAM card-action callback.
|
||||
card_state: dict
|
||||
# session_key → out_track_id of the currently-active card for the
|
||||
# conversation turn. Lets resumed-workflow chunks (which arrive on a
|
||||
# synthetic event with a fresh resp_message_id) keep updating the same
|
||||
# card the user clicked instead of getting a new one.
|
||||
active_turn_card: dict
|
||||
# session_key → accumulated streaming text for the active turn. Read
|
||||
# by _paint_form_on_card so the post-pause form keeps the streamed
|
||||
# context above the new prompt.
|
||||
active_turn_text: dict
|
||||
ap: typing.Any = None
|
||||
bot_uuid: str = ''
|
||||
|
||||
def __init__(self, config: dict, logger: EventLogger):
|
||||
required_keys = [
|
||||
@@ -194,10 +216,17 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
config=config,
|
||||
logger=logger,
|
||||
card_instance_id_dict={},
|
||||
card_state={},
|
||||
active_turn_card={},
|
||||
active_turn_text={},
|
||||
bot_account_id=bot_account_id,
|
||||
bot=bot,
|
||||
listeners={},
|
||||
)
|
||||
# Wire the card-action callback after super().__init__ so we can reference
|
||||
# self.* — the client's handler stores this as a soft reference and reads
|
||||
# it at fire time.
|
||||
self.bot.card_action_callback = self._on_card_action
|
||||
|
||||
async def reply_message(
|
||||
self,
|
||||
@@ -222,28 +251,82 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
quote_origin: bool = False,
|
||||
is_final: bool = False,
|
||||
):
|
||||
# event = await DingTalkEventConverter.yiri2target(
|
||||
# message_source,
|
||||
# )
|
||||
# incoming_message = event.incoming_message
|
||||
|
||||
# msg_id = incoming_message.message_id
|
||||
message_id = bot_message.resp_message_id
|
||||
msg_seq = bot_message.msg_sequence
|
||||
|
||||
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
if is_final and self.ap is not None:
|
||||
self.ap.logger.info(
|
||||
f'DingTalk reply_message_chunk final: form_data_present={form_data is not None}, '
|
||||
f'form_template_configured={bool(form_template_id)}'
|
||||
)
|
||||
|
||||
if form_data and is_final:
|
||||
await self._handle_form_chunk(message_source, bot_message, message, form_data)
|
||||
return
|
||||
|
||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||
markdown_enabled = self.config.get('markdown_card', False)
|
||||
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
|
||||
|
||||
card_instance, card_instance_id = self.card_instance_id_dict[message_id]
|
||||
if not content and bot_message.content:
|
||||
content = bot_message.content # 兼容直接传入content的情况
|
||||
# print(card_instance_id)
|
||||
|
||||
chat_card_entry = self.card_instance_id_dict.get(message_id)
|
||||
if chat_card_entry is None:
|
||||
# No streaming chat card was created for this query — common
|
||||
# path for synthetic events (e.g. resumed workflow after a
|
||||
# button click). Lazy-create one so the resumed output streams
|
||||
# into a card just like a normal conversation, instead of
|
||||
# being deferred and sent in one shot on is_final.
|
||||
if not content:
|
||||
return # nothing to stream yet
|
||||
chat_card_entry = await self._lazy_create_resume_chat_card(message_source, message_id)
|
||||
if chat_card_entry is None:
|
||||
# Lazy-create failed (no template configured); fall back
|
||||
# to a one-shot proactive message on the final chunk.
|
||||
if is_final:
|
||||
await self._send_proactive_to_event(message_source, content)
|
||||
return
|
||||
|
||||
card_instance, card_instance_id = chat_card_entry
|
||||
if content:
|
||||
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
# self.seq = 1 # 消息回复结束之后重置seq
|
||||
self.card_instance_id_dict.pop(message_id) # 消息回复结束之后删除卡片实例id
|
||||
if form_template_id:
|
||||
# The card content has already been written via
|
||||
# update_card_data (in _paint_form_on_card and the
|
||||
# initial card creation). The streaming endpoint
|
||||
# (PUT /v1.0/card/streaming) does not propagate
|
||||
# updates on cards whose content was last set via
|
||||
# update_card_data — they take different code paths
|
||||
# on the DingTalk client. Stick with update_card_data
|
||||
# for the whole turn for consistency.
|
||||
try:
|
||||
await self.bot.update_card_data(
|
||||
out_track_id=card_instance_id,
|
||||
card_param_map={
|
||||
'content': content,
|
||||
'btns': '[]',
|
||||
'flowStatus': '3' if is_final else '1',
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: update card content failed')
|
||||
else:
|
||||
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
|
||||
if is_final:
|
||||
if form_template_id and not content:
|
||||
# Empty final chunk still needs to leave the card with
|
||||
# flowStatus=3 so the spinner stops.
|
||||
try:
|
||||
await self.bot.update_card_data(
|
||||
out_track_id=card_instance_id,
|
||||
card_param_map={'flowStatus': '3'},
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
if bot_message.tool_calls is None:
|
||||
self.card_instance_id_dict.pop(message_id, None)
|
||||
|
||||
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
|
||||
markdown_enabled = self.config.get('markdown_card', False)
|
||||
@@ -260,16 +343,80 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
return is_stream
|
||||
|
||||
async def create_message_card(self, message_id, event):
|
||||
card_template_id = self.config['card_template_id']
|
||||
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||
legacy_template_id = self.config.get('card_template_id', '')
|
||||
|
||||
# Synthetic events (button clicks): look up the card already in
|
||||
# active_turn_card so reply_message_chunk can stream to it.
|
||||
if event is None or event.source_platform_object is None:
|
||||
if form_template_id:
|
||||
session_key = self._session_key_from_event(event) if event is not None else ''
|
||||
carry = self.active_turn_card.get(session_key, '') if session_key else ''
|
||||
if carry:
|
||||
self.card_instance_id_dict[message_id] = (None, carry)
|
||||
return True
|
||||
return False
|
||||
|
||||
if form_template_id:
|
||||
# Create one card with the form template, empty buttons,
|
||||
# pending state. Streaming writes content to it; form pause
|
||||
# paints buttons onto it. One card per turn, no duplication.
|
||||
incoming_message = event.source_platform_object.incoming_message
|
||||
out_track_id = uuid.uuid4().hex
|
||||
is_group = str(incoming_message.conversation_type) == '2'
|
||||
if is_group:
|
||||
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
|
||||
else:
|
||||
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
|
||||
try:
|
||||
await self.bot.create_and_deliver_card(
|
||||
card_template_id=form_template_id,
|
||||
out_track_id=out_track_id,
|
||||
open_space_id=open_space_id,
|
||||
is_group=is_group,
|
||||
card_param_map={'content': '', 'btns': '[]', 'flowStatus': '1'},
|
||||
callback_type='STREAM',
|
||||
)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: create form-template card failed')
|
||||
return False
|
||||
self.card_instance_id_dict[message_id] = (None, out_track_id)
|
||||
session_key = self._session_key_from_event(event)
|
||||
if session_key:
|
||||
self.active_turn_card[session_key] = out_track_id
|
||||
self.active_turn_text[session_key] = ''
|
||||
return True
|
||||
|
||||
# Legacy chat-card path (no form template).
|
||||
incoming_message = event.source_platform_object.incoming_message
|
||||
# message_id = incoming_message.message_id
|
||||
card_auto_layout = self.config.get('card_ auto_layout', False)
|
||||
card_auto_layout = self.config.get('card_auto_layout', False)
|
||||
card_instance, card_instance_id = await self.bot.create_and_card(
|
||||
card_template_id, incoming_message, card_auto_layout=card_auto_layout
|
||||
legacy_template_id, incoming_message, card_auto_layout=card_auto_layout
|
||||
)
|
||||
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
|
||||
return True
|
||||
|
||||
def _session_key_from_event(self, event) -> str:
|
||||
"""Return launcher_type_launcher_id for an event, '' if unrecoverable."""
|
||||
if event is None:
|
||||
return ''
|
||||
spo = event.source_platform_object
|
||||
if spo is None:
|
||||
try:
|
||||
if isinstance(event, platform_events.GroupMessage):
|
||||
return f'group_{event.group.id}'
|
||||
return f'person_{event.sender.id}'
|
||||
except Exception:
|
||||
return ''
|
||||
try:
|
||||
inc = spo.incoming_message
|
||||
if str(inc.conversation_type) == '2':
|
||||
return f'group_{inc.conversation_id}'
|
||||
return f'person_{inc.sender_staff_id}'
|
||||
except Exception:
|
||||
return ''
|
||||
|
||||
def register_listener(
|
||||
self,
|
||||
event_type: typing.Type[platform_events.Event],
|
||||
@@ -309,3 +456,543 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
],
|
||||
):
|
||||
return super().unregister_listener(event_type, callback)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Dify human-input form support
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def set_bot_uuid(self, bot_uuid: str):
|
||||
"""Receive the bot uuid from the platform manager.
|
||||
|
||||
Used to compose the public-facing unified-webhook URL for the card
|
||||
dynamic-data-source pull endpoint.
|
||||
"""
|
||||
self.bot_uuid = bot_uuid
|
||||
|
||||
def _derive_open_space(self, message_source: platform_events.MessageEvent) -> tuple[str, bool]:
|
||||
"""Return (openSpaceId, is_group) for the given inbound event."""
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
return f'dtv1.card//IM_GROUP.{message_source.group.id}', True
|
||||
return f'dtv1.card//IM_ROBOT.{message_source.sender.id}', False
|
||||
|
||||
def _derive_session_descriptor(
|
||||
self, message_source: platform_events.MessageEvent
|
||||
) -> tuple[provider_session.LauncherTypes, str, str]:
|
||||
"""Return (launcher_type, launcher_id, sender_user_id) for routing."""
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
return (
|
||||
provider_session.LauncherTypes.GROUP,
|
||||
str(message_source.group.id),
|
||||
str(message_source.sender.id),
|
||||
)
|
||||
return (
|
||||
provider_session.LauncherTypes.PERSON,
|
||||
str(message_source.sender.id),
|
||||
str(message_source.sender.id),
|
||||
)
|
||||
|
||||
async def _handle_form_chunk(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
bot_message,
|
||||
message: platform_message.MessageChain,
|
||||
form_data: dict,
|
||||
) -> None:
|
||||
"""Surface human-input prompt + buttons on the active card.
|
||||
|
||||
In single-card mode (form_template_id configured): update the
|
||||
EXISTING card with form buttons so it transitions from streaming
|
||||
output to prompt+buttons on the same card. In legacy mode:
|
||||
finalize the chat card and deliver a separate form card.
|
||||
"""
|
||||
if self.ap is not None:
|
||||
self.ap.logger.info(
|
||||
f'DingTalk _handle_form_chunk: actions={len(form_data.get("actions") or [])}, '
|
||||
f'node_title={form_data.get("node_title", "")!r}'
|
||||
)
|
||||
message_id = bot_message.resp_message_id
|
||||
template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||
|
||||
if template_id:
|
||||
# Single-card mode: paint prompt + buttons onto the existing card.
|
||||
session_key = self._session_key_from_event(message_source)
|
||||
entry = self.card_instance_id_dict.get(message_id)
|
||||
out_track_id = entry[1] if entry else None
|
||||
if not out_track_id and session_key:
|
||||
out_track_id = self.active_turn_card.get(session_key, '')
|
||||
if out_track_id:
|
||||
await self._paint_form_on_card(message_source, out_track_id, form_data, session_key)
|
||||
self.card_instance_id_dict.pop(message_id, None)
|
||||
return
|
||||
|
||||
# No existing card (e.g. Dify paused immediately with no LLM
|
||||
# output before the pause). Create a form card directly.
|
||||
await self._send_form_card(message_source, form_data, template_id)
|
||||
self.card_instance_id_dict.pop(message_id, None)
|
||||
return
|
||||
|
||||
# Legacy mode: finalize the streaming card with text fallback.
|
||||
chat_card_entry = self.card_instance_id_dict.pop(message_id, None)
|
||||
if chat_card_entry is not None:
|
||||
_, chat_out_track_id = chat_card_entry
|
||||
markdown_enabled = self.config.get('markdown_card', False)
|
||||
text_content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
|
||||
if not text_content and bot_message.content:
|
||||
text_content = bot_message.content
|
||||
try:
|
||||
await self.bot.send_card_message(None, chat_out_track_id, text_content or '', True)
|
||||
except Exception:
|
||||
await self.logger.error(f'DingTalk: finalize chat card before form failed: {traceback.format_exc()}')
|
||||
|
||||
await self.send_message_text_form(message_source, form_data)
|
||||
|
||||
async def _paint_form_on_card(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
out_track_id: str,
|
||||
form_data: dict,
|
||||
session_key: str,
|
||||
) -> None:
|
||||
"""Update an existing card's content + buttons for human-input."""
|
||||
actions = list(form_data.get('actions') or [])
|
||||
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||
form_content = form_data.get('form_content', '') or ''
|
||||
|
||||
# Record form state for the click-handler.
|
||||
launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
|
||||
self.card_state[out_track_id] = {
|
||||
'session_key': session_key,
|
||||
'launcher_type': launcher_type.value,
|
||||
'launcher_id': launcher_id,
|
||||
'sender_user_id': sender_user_id,
|
||||
'form_token': form_data.get('form_token', ''),
|
||||
'workflow_run_id': form_data.get('workflow_run_id', ''),
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
}
|
||||
|
||||
btns = self._build_btns(actions, out_track_id)
|
||||
parts: list[str] = []
|
||||
prior = self.active_turn_text.get(session_key, '') if session_key else ''
|
||||
if prior.strip():
|
||||
parts.append(prior.rstrip())
|
||||
parts.append('---')
|
||||
if node_title:
|
||||
parts.append(f'**{node_title}**')
|
||||
if form_content:
|
||||
parts.append(form_content)
|
||||
display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'
|
||||
|
||||
try:
|
||||
await self.bot.update_card_data(
|
||||
out_track_id=out_track_id,
|
||||
card_param_map={
|
||||
'content': display_content,
|
||||
'btns': json.dumps(btns, ensure_ascii=False),
|
||||
'flowStatus': '3',
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: paint form on card failed')
|
||||
await self.send_message_text_form(message_source, form_data)
|
||||
return
|
||||
|
||||
if session_key:
|
||||
self.active_turn_text[session_key] = display_content
|
||||
|
||||
@staticmethod
|
||||
def _build_btns(actions: list, out_track_id: str) -> list:
|
||||
btns = []
|
||||
for idx, action in enumerate(actions):
|
||||
action_id = str(action.get('id') or '')
|
||||
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
|
||||
style = (action.get('button_style') or '').lower()
|
||||
if style == 'primary' or (style == '' and idx == 0):
|
||||
color = 'blue'
|
||||
elif style == 'danger':
|
||||
color = 'red'
|
||||
else:
|
||||
color = 'gray'
|
||||
btns.append(
|
||||
{
|
||||
'text': title,
|
||||
'color': color,
|
||||
'status': 'normal',
|
||||
'event': {
|
||||
'type': 'sendCardRequest',
|
||||
'params': {
|
||||
'actionId': action_id,
|
||||
'params': {'action_id': action_id, 'out_track_id': out_track_id},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
return btns
|
||||
|
||||
async def _send_form_card(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
template_id: str,
|
||||
) -> None:
|
||||
"""Deliver a new card pre-loaded with the human-input prompt + buttons."""
|
||||
out_track_id = uuid.uuid4().hex
|
||||
open_space_id, is_group = self._derive_open_space(message_source)
|
||||
launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
|
||||
session_key = f'{launcher_type.value}_{launcher_id}'
|
||||
|
||||
actions = list(form_data.get('actions') or [])
|
||||
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||
form_content = form_data.get('form_content', '') or ''
|
||||
|
||||
self.card_state[out_track_id] = {
|
||||
'session_key': session_key,
|
||||
'launcher_type': launcher_type.value,
|
||||
'launcher_id': launcher_id,
|
||||
'sender_user_id': sender_user_id,
|
||||
'form_token': form_data.get('form_token', ''),
|
||||
'workflow_run_id': form_data.get('workflow_run_id', ''),
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
'open_space_id': open_space_id,
|
||||
'is_group': is_group,
|
||||
}
|
||||
|
||||
parts = []
|
||||
if node_title:
|
||||
parts.append(f'**{node_title}**')
|
||||
if form_content:
|
||||
parts.append(form_content)
|
||||
display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'
|
||||
|
||||
btns = []
|
||||
for idx, action in enumerate(actions):
|
||||
action_id = str(action.get('id') or '')
|
||||
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
|
||||
style = (action.get('button_style') or '').lower()
|
||||
if style == 'primary' or (style == '' and idx == 0):
|
||||
color = 'blue'
|
||||
elif style == 'danger':
|
||||
color = 'red'
|
||||
else:
|
||||
color = 'gray'
|
||||
btns.append(
|
||||
{
|
||||
'text': title,
|
||||
'color': color,
|
||||
'status': 'normal',
|
||||
'event': {
|
||||
'type': 'sendCardRequest',
|
||||
'params': {
|
||||
'actionId': action_id,
|
||||
'params': {'action_id': action_id, 'out_track_id': out_track_id},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.info(
|
||||
f'DingTalk _send_form_card: out_track_id={out_track_id} template_id={template_id} '
|
||||
f'open_space_id={open_space_id} is_group={is_group} btns={len(btns)}'
|
||||
)
|
||||
await self.bot.create_and_deliver_card(
|
||||
card_template_id=template_id,
|
||||
out_track_id=out_track_id,
|
||||
open_space_id=open_space_id,
|
||||
is_group=is_group,
|
||||
card_param_map={
|
||||
'content': display_content,
|
||||
'btns': json.dumps(btns, ensure_ascii=False),
|
||||
'flowStatus': '3',
|
||||
},
|
||||
callback_type='STREAM',
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'DingTalk: deliver form card failed: {traceback.format_exc()}')
|
||||
await self.send_message_text_form(message_source, form_data)
|
||||
self.card_state.pop(out_track_id, None)
|
||||
|
||||
async def _lazy_create_resume_chat_card(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
message_id: str,
|
||||
) -> typing.Optional[tuple]:
|
||||
"""Create a new card for resumed-workflow streaming output.
|
||||
|
||||
Used after a button click triggers a synthetic event — the form
|
||||
card stays put with the "已选择" notice, and a fresh card is
|
||||
spawned here for the LLM reply to stream into.
|
||||
"""
|
||||
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
|
||||
legacy_template_id = (self.config.get('card_template_id') or '').strip()
|
||||
template_id = form_template_id or legacy_template_id
|
||||
if not template_id:
|
||||
return None
|
||||
out_track_id = uuid.uuid4().hex
|
||||
open_space_id, is_group = self._derive_open_space(message_source)
|
||||
if form_template_id:
|
||||
card_param_map = {'content': '', 'btns': '[]', 'flowStatus': '1'}
|
||||
card_data_config = None
|
||||
else:
|
||||
card_param_map = {'content': '', 'query': '...'}
|
||||
card_data_config = {'autoLayout': self.config.get('card_auto_layout', False)}
|
||||
try:
|
||||
success = await self.bot.create_and_deliver_card(
|
||||
card_template_id=template_id,
|
||||
out_track_id=out_track_id,
|
||||
open_space_id=open_space_id,
|
||||
is_group=is_group,
|
||||
card_param_map=card_param_map,
|
||||
card_data_config=card_data_config,
|
||||
callback_type='STREAM',
|
||||
)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: lazy create resume chat card failed')
|
||||
return None
|
||||
if not success:
|
||||
return None
|
||||
entry = (None, out_track_id)
|
||||
self.card_instance_id_dict[message_id] = entry
|
||||
# Register as the active card so any further chunks on this turn
|
||||
# (and a subsequent re-pause) land on the same new card.
|
||||
session_key = self._session_key_from_event(message_source)
|
||||
if session_key:
|
||||
self.active_turn_card[session_key] = out_track_id
|
||||
self.active_turn_text[session_key] = ''
|
||||
return entry
|
||||
|
||||
async def send_message_text_form(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
) -> None:
|
||||
"""Fallback: send the human-input prompt as plain text."""
|
||||
display_text = _format_human_input_text(
|
||||
form_data.get('node_title', ''),
|
||||
form_data.get('form_content', ''),
|
||||
form_data.get('actions', []) or [],
|
||||
)
|
||||
await self._send_proactive_to_event(message_source, display_text)
|
||||
|
||||
async def _send_proactive_to_event(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
content: str,
|
||||
) -> None:
|
||||
"""Send `content` as a proactive message to the conversation behind
|
||||
`message_source`. Used when no inbound chatbot message exists to
|
||||
anchor a card on (e.g. resumed flows triggered by card actions).
|
||||
"""
|
||||
if not content:
|
||||
return
|
||||
if self.ap is not None:
|
||||
target = (
|
||||
str(message_source.group.id)
|
||||
if isinstance(message_source, platform_events.GroupMessage)
|
||||
else str(message_source.sender.id)
|
||||
)
|
||||
self.ap.logger.info(
|
||||
f'DingTalk _send_proactive_to_event: target={target} '
|
||||
f'is_group={isinstance(message_source, platform_events.GroupMessage)} content_len={len(content)}'
|
||||
)
|
||||
try:
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
await self.bot.send_proactive_message_to_group(str(message_source.group.id), content)
|
||||
else:
|
||||
await self.bot.send_proactive_message_to_one(str(message_source.sender.id), content)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: send proactive message failed')
|
||||
await self.logger.error(f'DingTalk: send proactive message failed: {traceback.format_exc()}')
|
||||
|
||||
async def _on_card_action(self, payload: dict) -> None:
|
||||
"""Translate a card button click into a synthetic query.
|
||||
|
||||
Reads the clicked button's ``actionId`` (the real Dify action id —
|
||||
the ButtonGroup template sends it back via `event.params.actionId`),
|
||||
recovers the action title from ``card_state``, and enqueues a
|
||||
synthetic `_dify_form_action` query the same way Lark / Telegram do.
|
||||
"""
|
||||
if self.ap is not None:
|
||||
self.ap.logger.info(
|
||||
f'DingTalk _on_card_action received: out_track_id={payload.get("out_track_id")} '
|
||||
f'payload_action_id={payload.get("action_id")!r} params={payload.get("params")!r}'
|
||||
)
|
||||
out_track_id = payload.get('out_track_id') or ''
|
||||
params = payload.get('params') or {}
|
||||
# ButtonGroup `sendCardRequest` events surface the click id at the
|
||||
# callback top level as `actionId`; fall back to `params.action_id`
|
||||
# (alternate template wiring) and `params.actionId`.
|
||||
raw_action_id = (
|
||||
(payload.get('action_id') or '').strip()
|
||||
or (params.get('action_id') or '').strip()
|
||||
or (params.get('actionId') or '').strip()
|
||||
or (params.get('id') or '').strip()
|
||||
)
|
||||
state = self.card_state.get(out_track_id)
|
||||
if state is None:
|
||||
await self.logger.warning(f'DingTalk: card action received for unknown out_track_id={out_track_id}')
|
||||
return
|
||||
if not raw_action_id:
|
||||
await self.logger.warning(f'DingTalk: card action with no action_id, payload={payload}')
|
||||
return
|
||||
|
||||
actions = state.get('actions', []) or []
|
||||
action_id = raw_action_id
|
||||
action_title = raw_action_id
|
||||
for action in actions:
|
||||
if str(action.get('id', '')) == raw_action_id:
|
||||
action_title = action.get('title') or raw_action_id
|
||||
break
|
||||
|
||||
launcher_type = (
|
||||
provider_session.LauncherTypes.GROUP
|
||||
if state.get('launcher_type') == provider_session.LauncherTypes.GROUP.value
|
||||
else provider_session.LauncherTypes.PERSON
|
||||
)
|
||||
launcher_id = state.get('launcher_id', '')
|
||||
sender_user_id = state.get('sender_user_id') or payload.get('user_id') or launcher_id
|
||||
|
||||
form_action_data = {
|
||||
'form_token': state.get('form_token', ''),
|
||||
'workflow_run_id': state.get('workflow_run_id', ''),
|
||||
'action_id': action_id,
|
||||
'action_title': action_title,
|
||||
'node_title': state.get('node_title', ''),
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=sender_user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
special_title='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=int(datetime.datetime.now().timestamp()),
|
||||
source_platform_object=None,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=sender_user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=int(datetime.datetime.now().timestamp()),
|
||||
source_platform_object=None,
|
||||
)
|
||||
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
if self.ap is not None:
|
||||
for bot in self.ap.platform_mgr.bots:
|
||||
if bot.adapter is self:
|
||||
bot_uuid = bot.bot_entity.uuid
|
||||
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
try:
|
||||
self.ap.logger.info(
|
||||
f'DingTalk _on_card_action enqueuing form action: action_id={action_id!r} '
|
||||
f'action_title={action_title!r} launcher_type={launcher_type.value} '
|
||||
f'launcher_id={launcher_id} bot_uuid={bot_uuid} pipeline_uuid={pipeline_uuid}'
|
||||
)
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=sender_user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
self.ap.logger.info('DingTalk _on_card_action: query enqueued OK')
|
||||
except Exception:
|
||||
self.ap.logger.exception('DingTalk: enqueue form action query failed')
|
||||
return
|
||||
|
||||
# Visual feedback on the form card itself: keep the prompt visible,
|
||||
# add a "已选择" line, remove the buttons. The resumed-workflow
|
||||
# output lives on a separate new card (lazy-created in
|
||||
# reply_message_chunk on the synthetic event), so the form card
|
||||
# stays put as a record of the user's selection.
|
||||
asyncio.create_task(
|
||||
self._mark_card_resolved(
|
||||
out_track_id,
|
||||
action_title,
|
||||
node_title=state.get('node_title', ''),
|
||||
form_content=state.get('form_content', ''),
|
||||
)
|
||||
)
|
||||
|
||||
# Crucial: do NOT leave the form card's out_track_id in
|
||||
# active_turn_card — otherwise create_message_card for the
|
||||
# synthetic event would reuse it for the resume output, painting
|
||||
# the LLM reply on top of the "已选择" notice. Clear it so the
|
||||
# resume goes through the lazy-create path and spawns a fresh card.
|
||||
session_key = state.get('session_key', '')
|
||||
if session_key and self.active_turn_card.get(session_key) == out_track_id:
|
||||
self.active_turn_card.pop(session_key, None)
|
||||
self.active_turn_text.pop(session_key, None)
|
||||
|
||||
# Once consumed, drop the state — the runner clears _PENDING_FORMS too.
|
||||
self.card_state.pop(out_track_id, None)
|
||||
|
||||
async def _mark_card_resolved(
|
||||
self,
|
||||
out_track_id: str,
|
||||
action_title: str,
|
||||
*,
|
||||
node_title: str = '',
|
||||
form_content: str = '',
|
||||
) -> None:
|
||||
"""Update the form card to acknowledge the user's selection.
|
||||
|
||||
Keeps the original prompt visible, adds a "已选择: X" notice, and
|
||||
clears the buttons. The card stays as a permanent record of the
|
||||
choice; the resumed workflow's output goes to a separate new card.
|
||||
"""
|
||||
parts: list[str] = []
|
||||
if node_title:
|
||||
parts.append(f'**{node_title}**')
|
||||
if form_content:
|
||||
parts.append(form_content)
|
||||
parts.append(f'---\n✅ 已选择:**{action_title}**')
|
||||
content = '\n\n'.join(parts)
|
||||
if self.ap is not None:
|
||||
self.ap.logger.info(f'DingTalk _mark_card_resolved: out_track_id={out_track_id} action={action_title!r}')
|
||||
try:
|
||||
await self.bot.update_card_data(
|
||||
out_track_id=out_track_id,
|
||||
card_param_map={
|
||||
'content': content,
|
||||
'btns': '[]',
|
||||
'flowStatus': '3',
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
if self.ap is not None:
|
||||
self.ap.logger.exception('DingTalk: mark card resolved failed')
|
||||
|
||||
@@ -103,6 +103,18 @@ spec:
|
||||
type: string
|
||||
required: true
|
||||
default: "填写你的卡片template_id"
|
||||
- name: human_input_card_template_id
|
||||
label:
|
||||
en_US: Human Input Card Template ID
|
||||
zh_Hans: 人工输入卡片模板ID
|
||||
zh_Hant: 人工輸入卡片範本ID
|
||||
description:
|
||||
en_US: "Template ID used as the SINGLE card for the whole conversation turn. Streamed LLM text fills the `content` markdown variable; on a Dify human-input pause the `btns` buttonGroup variable is populated so action buttons appear on the SAME card; after the user clicks a button the buttons disappear and resumed streaming continues into the same card. Use the bundled `src/langbot/templates/dingtalk_human_input_card.json` — it ships with `content` (MarkdownBlock) and `btns` (ButtonGroup) already wired. Leave empty to fall back to the legacy two-card behaviour (chat card streaming text + plain-text human-input prompts)."
|
||||
zh_Hans: "用作整个对话回合**唯一**卡片的模板ID。流式 LLM 文本写入 `content` markdown 变量;Dify 人工输入暂停时同一张卡的 `btns` buttonGroup 变量被填上、按钮浮现;用户点击后按钮消失、恢复的流式内容继续追加到同一张卡。可使用项目附带的 `src/langbot/templates/dingtalk_human_input_card.json`——已经预先连好 `content` (MarkdownBlock) 与 `btns` (ButtonGroup)。留空则降级为旧的双卡行为(聊天卡走流式 + 人工输入走纯文本)。"
|
||||
zh_Hant: "用作整個對話回合**唯一**卡片的範本ID。流式 LLM 文字寫入 `content` markdown 變數;Dify 人工輸入暫停時同一張卡的 `btns` buttonGroup 變數被填上、按鈕浮現;使用者點擊後按鈕消失、恢復的流式內容繼續追加到同一張卡。可使用專案附帶的 `src/langbot/templates/dingtalk_human_input_card.json`——已經預先連好 `content` (MarkdownBlock) 與 `btns` (ButtonGroup)。留空則降級為舊的雙卡行為(聊天卡走流式 + 人工輸入走純文字)。"
|
||||
type: string
|
||||
required: false
|
||||
default: ""
|
||||
execution:
|
||||
python:
|
||||
path: ./dingtalk.py
|
||||
|
||||
@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||||
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
|
||||
|
||||
class AESCipher(object):
|
||||
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
|
||||
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
|
||||
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
|
||||
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||
|
||||
bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识
|
||||
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
|
||||
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
pending_monitoring_msg: dict[str, str]
|
||||
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
|
||||
reply_to_monitoring_msg: dict[str, tuple[str, float]]
|
||||
reply_message_card_ids: dict[str, str]
|
||||
card_sequence_dict: dict[str, int]
|
||||
# card_id → set of source message ids registered against it (for cleanup)
|
||||
card_id_to_source_ids: dict[str, set[str]]
|
||||
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
|
||||
card_streaming_text: dict[str, str]
|
||||
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
|
||||
card_pre_pause_text: dict[str, str]
|
||||
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
|
||||
card_resume_transitioned: set[str]
|
||||
_MONITORING_MAPPING_TTL = 600 # 10 minutes
|
||||
|
||||
seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识
|
||||
@@ -812,11 +824,134 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
|
||||
asyncio.create_task(on_message(event))
|
||||
|
||||
def schedule_on_app_loop(coro):
|
||||
"""Run a coroutine on the application event loop from sync callbacks."""
|
||||
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
|
||||
|
||||
def sync_on_card_action(event):
|
||||
try:
|
||||
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
|
||||
action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
|
||||
# Parse JSON string values (from form action buttons)
|
||||
if isinstance(action_value_raw, str):
|
||||
try:
|
||||
action_value_obj = json.loads(action_value_raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
action_value_obj = {}
|
||||
else:
|
||||
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
|
||||
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
|
||||
|
||||
# Handle Dify form action button clicks
|
||||
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
|
||||
form_token = action_value_obj.get('form_token', '')
|
||||
workflow_run_id = action_value_obj.get('workflow_run_id', '')
|
||||
action_id = action_value_obj.get('action_id', '')
|
||||
session_key = action_value_obj.get('session_key', '')
|
||||
|
||||
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||
launcher_type = provider_session.LauncherTypes.GROUP
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('g:')
|
||||
else session_key[len('group_') :]
|
||||
)
|
||||
else:
|
||||
launcher_type = provider_session.LauncherTypes.PERSON
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('p:')
|
||||
else session_key[len('person_') :]
|
||||
)
|
||||
|
||||
# Find the bot entity to get bot_uuid and pipeline_uuid
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
for bot in self.ap.platform_mgr.bots:
|
||||
if bot.adapter is self:
|
||||
bot_uuid = bot.bot_entity.uuid
|
||||
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
form_action_data = {
|
||||
'form_token': form_token,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'action_id': action_id,
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
context = getattr(event.event, 'context', None)
|
||||
open_message_id = getattr(context, 'open_message_id', None)
|
||||
source_time = datetime.datetime.now()
|
||||
event_time = source_time.timestamp()
|
||||
action_text = action_value_obj.get('action_id', 'confirm')
|
||||
message_chain = platform_message.MessageChain(
|
||||
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
|
||||
)
|
||||
if open_message_id:
|
||||
message_chain.insert(
|
||||
0,
|
||||
platform_message.Source(
|
||||
id=open_message_id,
|
||||
time=source_time,
|
||||
),
|
||||
)
|
||||
|
||||
operator = getattr(event.event, 'operator', None)
|
||||
user_id = (
|
||||
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
|
||||
)
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=event_time,
|
||||
source_platform_object=event,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=event_time,
|
||||
source_platform_object=event,
|
||||
)
|
||||
|
||||
async def add_form_action_query():
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
|
||||
schedule_on_app_loop(add_form_action_query())
|
||||
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
|
||||
|
||||
if action_value == '有帮助':
|
||||
feedback_type = 1
|
||||
elif action_value == '无帮助':
|
||||
@@ -857,17 +992,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
)
|
||||
|
||||
if platform_events.FeedbackEvent in self.listeners:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
else:
|
||||
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
|
||||
except Exception:
|
||||
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
||||
traceback.print_exc()
|
||||
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
|
||||
@@ -893,6 +1025,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
card_id_dict={},
|
||||
pending_monitoring_msg={},
|
||||
reply_to_monitoring_msg={},
|
||||
reply_message_card_ids={},
|
||||
card_sequence_dict={},
|
||||
card_id_to_source_ids={},
|
||||
card_streaming_text={},
|
||||
card_pre_pause_text={},
|
||||
card_resume_transitioned=set(),
|
||||
seq=1,
|
||||
listeners={},
|
||||
quart_app=quart_app,
|
||||
@@ -1132,6 +1270,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
for k in expired:
|
||||
del self.reply_to_monitoring_msg[k]
|
||||
|
||||
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
|
||||
"""Return the next strictly increasing sequence for a card update."""
|
||||
current = self.card_sequence_dict.get(card_id, 0)
|
||||
next_seq = max(current + 1, suggested)
|
||||
self.card_sequence_dict[card_id] = next_seq
|
||||
return next_seq
|
||||
|
||||
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
|
||||
"""Register a card_id under one or more source message ids."""
|
||||
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
|
||||
for sid in source_ids:
|
||||
if not sid:
|
||||
continue
|
||||
self.reply_message_card_ids[sid] = card_id
|
||||
bucket.add(sid)
|
||||
|
||||
def _drop_card_state(self, card_id: str) -> None:
|
||||
"""Pop all per-card state for the given card_id."""
|
||||
if not card_id:
|
||||
return
|
||||
for sid in self.card_id_to_source_ids.pop(card_id, set()):
|
||||
self.reply_message_card_ids.pop(sid, None)
|
||||
self.card_sequence_dict.pop(card_id, None)
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
self.card_resume_transitioned.discard(card_id)
|
||||
|
||||
async def create_card_id(self, message_id):
|
||||
try:
|
||||
# self.logger.debug('飞书支持stream输出,创建卡片......')
|
||||
@@ -1327,6 +1492,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
self.card_id_dict[message_id] = response.data.card_id
|
||||
|
||||
card_id = response.data.card_id
|
||||
self.card_sequence_dict[card_id] = 0
|
||||
return card_id
|
||||
|
||||
except Exception as e:
|
||||
@@ -1339,6 +1505,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
"""
|
||||
# message_id = event.message_chain.message_id
|
||||
|
||||
source_message_id = str(event.message_chain.message_id)
|
||||
existing_card_id = self.reply_message_card_ids.get(source_message_id)
|
||||
if existing_card_id:
|
||||
self.card_id_dict[message_id] = existing_card_id
|
||||
return True
|
||||
|
||||
card_id = await self.create_card_id(message_id)
|
||||
content = {
|
||||
'type': 'card',
|
||||
@@ -1377,6 +1549,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
user_msg_id = event.message_chain.message_id
|
||||
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
|
||||
# Register the card under both the user-incoming msg id (so a
|
||||
# second reply_message_first_chunk for the same user message
|
||||
# reuses this card) AND the bot-reply msg id (so a synthetic
|
||||
# event from a form-button callback — whose Source.id equals
|
||||
# the bot's card message id — hits the same card and renders
|
||||
# the resume content into it).
|
||||
if reply_msg_id:
|
||||
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
|
||||
else:
|
||||
self._register_card_for_source(card_id, str(user_msg_id))
|
||||
if reply_msg_id and monitoring_msg_id:
|
||||
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
|
||||
self._cleanup_monitoring_mapping()
|
||||
@@ -1385,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
return True
|
||||
|
||||
async def _open_new_form_card(
|
||||
self,
|
||||
message_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
) -> str | None:
|
||||
"""Spawn a fresh card to host a re-paused human-input prompt.
|
||||
|
||||
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
|
||||
replies it to the current incoming message so it appears as the next
|
||||
step in the chat, registers the new reply_msg_id so subsequent button
|
||||
callbacks resolve back to it, and renders the prompt + buttons on it.
|
||||
|
||||
Returns the new card_id, or ``None`` if creation failed (caller is
|
||||
responsible for falling back to in-place update so the workflow
|
||||
remains continuable).
|
||||
"""
|
||||
source_message_id = getattr(message_source.message_chain, 'message_id', None)
|
||||
if not source_message_id:
|
||||
await self.logger.error('Cannot open new form card: source message_id missing')
|
||||
return None
|
||||
|
||||
try:
|
||||
new_card_id = await self.create_card_id(message_id)
|
||||
except Exception:
|
||||
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
|
||||
return None
|
||||
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||
)
|
||||
app_access_token = self.get_app_access_token()
|
||||
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||
req_opt: RequestOption = (
|
||||
RequestOption.builder()
|
||||
.app_ticket(self.app_ticket)
|
||||
.tenant_key(tenant_key)
|
||||
.app_access_token(app_access_token)
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
|
||||
content = {
|
||||
'type': 'card',
|
||||
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
|
||||
}
|
||||
request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(str(source_message_id))
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(content))
|
||||
.msg_type('interactive')
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
|
||||
try:
|
||||
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
|
||||
except Exception:
|
||||
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
|
||||
return None
|
||||
|
||||
if not response.success():
|
||||
await self.logger.error(
|
||||
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
|
||||
f'log_id={response.get_log_id()}'
|
||||
)
|
||||
return None
|
||||
|
||||
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||
if reply_msg_id:
|
||||
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
|
||||
|
||||
sequence = self._next_card_sequence(new_card_id, 1)
|
||||
await self._update_card_layout(
|
||||
card_id=new_card_id,
|
||||
message_source=message_source,
|
||||
text_message='',
|
||||
sequence=sequence,
|
||||
form_data=form_data,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
return new_card_id
|
||||
|
||||
async def reply_message(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
@@ -1504,45 +1773,544 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
):
|
||||
"""
|
||||
回复消息变成更新卡片消息
|
||||
|
||||
Supports Dify form-action resume: when the runner yields a chunk with
|
||||
``_resume_from_form=True``, the card transitions from buttons to a
|
||||
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
|
||||
for subsequent resume chunks to stream into.
|
||||
|
||||
When ``_open_new_card=True`` on the final chunk, the existing card is
|
||||
left as-is and the pipeline will create a new card (with fresh form
|
||||
buttons) for the re-pause.
|
||||
"""
|
||||
# self.seq += 1
|
||||
message_id = bot_message.resp_message_id
|
||||
msg_seq = bot_message.msg_sequence
|
||||
if msg_seq % 8 == 0 or is_final:
|
||||
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
||||
|
||||
text_message = ''
|
||||
if text_elements:
|
||||
parts = []
|
||||
for paragraph in text_elements:
|
||||
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
|
||||
if para_text:
|
||||
parts.append(para_text)
|
||||
text_message = '\n\n'.join(parts)
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
resume_from = getattr(bot_message, '_resume_from_form', False)
|
||||
action_title = getattr(bot_message, '_resume_action_title', '')
|
||||
resume_node_title = getattr(bot_message, '_resume_node_title', '')
|
||||
open_new_card = getattr(bot_message, '_open_new_card', False)
|
||||
if action_title:
|
||||
if resume_node_title:
|
||||
selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
|
||||
else:
|
||||
selected_notice = f'**已选择**:{action_title}'
|
||||
else:
|
||||
selected_notice = ''
|
||||
|
||||
# content = {
|
||||
# 'type': 'card_json',
|
||||
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
|
||||
# }
|
||||
# ── decide whether this chunk needs a card update ────────────────────
|
||||
card_id = self.card_id_dict.get(message_id)
|
||||
if not card_id:
|
||||
return
|
||||
|
||||
request: ContentCardElementRequest = (
|
||||
ContentCardElementRequest.builder()
|
||||
.card_id(self.card_id_dict[message_id])
|
||||
.element_id('streaming_txt')
|
||||
.request_body(
|
||||
ContentCardElementRequestBody.builder()
|
||||
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
|
||||
.content(text_message)
|
||||
.sequence(msg_seq)
|
||||
# ── convert message chain → text ─────────────────────────────────────
|
||||
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
||||
|
||||
text_message = ''
|
||||
if text_elements:
|
||||
parts = []
|
||||
for paragraph in text_elements:
|
||||
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
|
||||
if para_text:
|
||||
parts.append(para_text)
|
||||
text_message = '\n\n'.join(parts)
|
||||
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||
)
|
||||
app_access_token = self.get_app_access_token()
|
||||
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||
req_opt: RequestOption = (
|
||||
RequestOption.builder()
|
||||
.app_ticket(self.app_ticket)
|
||||
.tenant_key(tenant_key)
|
||||
.app_access_token(app_access_token)
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
|
||||
card_sequence = self._next_card_sequence(card_id, msg_seq)
|
||||
|
||||
# ── RESUME: first chunk after button click ───────────────────────────
|
||||
if resume_from and card_id not in self.card_resume_transitioned:
|
||||
# Transition the card from the form state into resume mode.
|
||||
# Preserve the text that was shown before the pause, and seed the
|
||||
# resume placeholder with the current resume content if we already
|
||||
# have any on the first yielded chunk.
|
||||
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
|
||||
initial_resume_text = text_message or '\u200b'
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause_text,
|
||||
sequence=card_sequence,
|
||||
form_data=None,
|
||||
notice_text=selected_notice,
|
||||
resume_placeholder_text=initial_resume_text,
|
||||
)
|
||||
self.card_resume_transitioned.add(card_id)
|
||||
self.card_pre_pause_text[card_id] = pre_pause_text
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
if not is_final:
|
||||
return
|
||||
|
||||
# ── RESUME: subsequent chunks → full card update ─────────────────────
|
||||
if resume_from and card_id in self.card_resume_transitioned:
|
||||
cached = self.card_streaming_text.get(card_id, '')
|
||||
if text_message != cached:
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause_text,
|
||||
sequence=card_sequence,
|
||||
form_data=None,
|
||||
notice_text=selected_notice,
|
||||
resume_placeholder_text=text_message,
|
||||
)
|
||||
if not is_final:
|
||||
return
|
||||
|
||||
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
|
||||
if not resume_from and (msg_seq % 8 == 0 or is_final):
|
||||
cached = self.card_streaming_text.get(card_id)
|
||||
if text_message != cached:
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
request: ContentCardElementRequest = (
|
||||
ContentCardElementRequest.builder()
|
||||
.card_id(card_id)
|
||||
.element_id('streaming_txt')
|
||||
.request_body(
|
||||
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
|
||||
request, req_opt
|
||||
)
|
||||
if not response.success():
|
||||
raise Exception(
|
||||
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
|
||||
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
|
||||
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
||||
)
|
||||
|
||||
# ── FINAL chunk: full card layout update ─────────────────────────────
|
||||
if is_final:
|
||||
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
|
||||
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
|
||||
resume_cached = self.card_streaming_text.get(card_id, '')
|
||||
if form_data:
|
||||
if open_new_card:
|
||||
# The old card has already been laid out into resume mode
|
||||
# by the resume-transition block above (notice + resume
|
||||
# placeholder). Finalise it as a frozen step snapshot and
|
||||
# spawn a brand-new card to host the next human-input
|
||||
# prompt — each step stays visible as its own card in the
|
||||
# chat history.
|
||||
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
|
||||
if new_card_id is None:
|
||||
# Fallback: keep the existing in-place behaviour so the
|
||||
# workflow remains continuable even if creating the
|
||||
# new card failed.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause,
|
||||
sequence=final_seq,
|
||||
form_data=form_data,
|
||||
resume_placeholder_text=resume_cached,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
else:
|
||||
# The old card is now a frozen snapshot; let go of its
|
||||
# streaming-side state but keep its source registrations
|
||||
# intact (no _drop_card_state) so historical button
|
||||
# callbacks aimed at it can still be matched if needed.
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
self.card_resume_transitioned.discard(card_id)
|
||||
else:
|
||||
# Initial pause path: render prompt + buttons in place on
|
||||
# the current card.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=final_seq,
|
||||
form_data=form_data,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
# The human-input prompt itself is rendered as buttons only
|
||||
# on Lark, so do not keep the hidden fallback text around;
|
||||
# otherwise it will resurface after the button click.
|
||||
self.card_streaming_text[card_id] = ''
|
||||
self.card_pre_pause_text[card_id] = ''
|
||||
else:
|
||||
# Normal finish: keep pre-pause + resume content visible,
|
||||
# remove buttons/notice, drop the resume placeholder.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause,
|
||||
sequence=final_seq,
|
||||
form_data=None,
|
||||
notice_text=selected_notice if resume_from else '',
|
||||
resume_placeholder_text=resume_cached,
|
||||
)
|
||||
self._drop_card_state(card_id)
|
||||
self.card_id_dict.pop(message_id, None)
|
||||
|
||||
# ── media (images / files) appended at the end ───────────────────────
|
||||
if is_final and media_items:
|
||||
for media in media_items:
|
||||
media_request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(message_source.message_chain.message_id)
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(media['content']))
|
||||
.msg_type(media['msg_type'])
|
||||
.reply_in_thread(False)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
|
||||
media_request, req_opt
|
||||
)
|
||||
if not media_response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
||||
)
|
||||
|
||||
async def _add_form_buttons_to_card(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
):
|
||||
"""Update the entire card to include form action buttons.
|
||||
|
||||
Uses card.aupdate to replace the card JSON with a template that
|
||||
includes the streaming text content plus interactive buttons.
|
||||
"""
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=sequence,
|
||||
form_data=form_data,
|
||||
)
|
||||
|
||||
async def _remove_form_buttons_from_card(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
):
|
||||
"""Replace the human-input card layout with the plain final layout."""
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=sequence,
|
||||
form_data=None,
|
||||
)
|
||||
|
||||
async def _update_card_layout(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
form_data: dict | None = None,
|
||||
notice_text: str = '',
|
||||
resume_placeholder_text: str = '',
|
||||
show_form_prompt: bool = True,
|
||||
):
|
||||
"""Update the entire card layout.
|
||||
|
||||
• form_data → show interactive buttons (initial Dify pause)
|
||||
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
|
||||
• resume_placeholder_text → add a streaming_txt_resume markdown element
|
||||
"""
|
||||
form_data = form_data or {}
|
||||
actions = form_data.get('actions', [])
|
||||
form_token = form_data.get('form_token', '')
|
||||
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||
form_content = form_data.get('form_content', '')
|
||||
|
||||
# When form_data is set, the visible content is rendered inside the
|
||||
# interactive container, so the top streaming text should stay empty
|
||||
# to avoid duplicate text above the action area.
|
||||
#
|
||||
# For resume notice state, keep the existing text visible in the card
|
||||
# and only add the grey "selected" notice below it.
|
||||
if form_data:
|
||||
render_text_message = ''
|
||||
else:
|
||||
render_text_message = text_message
|
||||
|
||||
# Determine session key from message source
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_key = f'group_{message_source.group.id}'
|
||||
else:
|
||||
session_key = f'person_{message_source.sender.id}'
|
||||
|
||||
# Build button elements matching the existing card template's thumbsup/down format
|
||||
action_buttons = []
|
||||
for action in actions:
|
||||
action_id = action.get('id', '')
|
||||
action_title = action.get('title', action_id)
|
||||
button_style = action.get('button_style', 'default')
|
||||
|
||||
if button_style == 'primary':
|
||||
lark_button_type = 'primary'
|
||||
elif button_style == 'danger':
|
||||
lark_button_type = 'danger'
|
||||
else:
|
||||
lark_button_type = 'default'
|
||||
|
||||
action_buttons.append(
|
||||
{
|
||||
'tag': 'button',
|
||||
'text': {'tag': 'plain_text', 'content': action_title},
|
||||
'type': lark_button_type,
|
||||
'width': 'fill',
|
||||
'size': 'medium',
|
||||
'hover_tips': {'tag': 'plain_text', 'content': action_title},
|
||||
'behaviors': [
|
||||
{
|
||||
'type': 'callback',
|
||||
'value': {
|
||||
'form_action': True,
|
||||
'form_token': form_token,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'action_id': action_id,
|
||||
'session_key': session_key,
|
||||
},
|
||||
}
|
||||
],
|
||||
'margin': '0px 0px 0px 0px',
|
||||
}
|
||||
)
|
||||
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
# self.seq = 1 # 消息回复结束之后重置seq
|
||||
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
|
||||
interactive_elements = []
|
||||
if form_data:
|
||||
if show_form_prompt:
|
||||
interactive_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': f'**[Human Input Required] {node_title}**',
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 4px 0px',
|
||||
}
|
||||
]
|
||||
if form_content:
|
||||
interactive_elements.append(
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': form_content,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 8px 0px',
|
||||
}
|
||||
)
|
||||
interactive_elements.append(
|
||||
{
|
||||
'tag': 'column_set',
|
||||
'horizontal_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'columns': [
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': 'weighted',
|
||||
'elements': [btn],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
}
|
||||
for btn in action_buttons
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
# Build the full card JSON with buttons, same structure as create_card_id
|
||||
# ── mid_section: either form buttons, resume notice, or empty ──
|
||||
mid_section_elements = []
|
||||
if form_data:
|
||||
mid_section_elements = [
|
||||
{
|
||||
'tag': 'interactive_container',
|
||||
'margin': '12px 0px 8px 0px',
|
||||
'padding': '12px 12px 12px 12px',
|
||||
'has_border': True,
|
||||
'elements': interactive_elements,
|
||||
},
|
||||
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||
]
|
||||
elif notice_text:
|
||||
mid_section_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': notice_text,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '8px 0px 4px 0px',
|
||||
'text_color': 'grey',
|
||||
},
|
||||
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||
]
|
||||
|
||||
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
|
||||
resume_elements = []
|
||||
if resume_placeholder_text:
|
||||
resume_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': resume_placeholder_text,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'element_id': 'streaming_txt_resume',
|
||||
},
|
||||
]
|
||||
|
||||
card_data = {
|
||||
'schema': '2.0',
|
||||
'config': {
|
||||
'update_multi': True,
|
||||
'streaming_mode': False,
|
||||
},
|
||||
'body': {
|
||||
'direction': 'vertical',
|
||||
'padding': '12px 12px 12px 12px',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'div',
|
||||
'text': {
|
||||
'tag': 'plain_text',
|
||||
'content': 'LangBot',
|
||||
'text_size': 'normal',
|
||||
'text_align': 'left',
|
||||
'text_color': 'default',
|
||||
},
|
||||
'icon': {
|
||||
'tag': 'custom_icon',
|
||||
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
|
||||
},
|
||||
},
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': render_text_message,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'element_id': 'streaming_txt',
|
||||
},
|
||||
*mid_section_elements,
|
||||
*resume_elements,
|
||||
{
|
||||
'tag': 'column_set',
|
||||
'horizontal_spacing': '12px',
|
||||
'horizontal_align': 'right',
|
||||
'columns': [
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': 'weighted',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
|
||||
'text_align': 'left',
|
||||
'text_size': 'notation',
|
||||
'margin': '4px 0px 0px 0px',
|
||||
'icon': {
|
||||
'tag': 'standard_icon',
|
||||
'token': 'robot_outlined',
|
||||
'color': 'grey',
|
||||
},
|
||||
}
|
||||
],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
'direction': 'vertical',
|
||||
'horizontal_spacing': '8px',
|
||||
'vertical_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'vertical_align': 'top',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'weight': 1,
|
||||
},
|
||||
*(
|
||||
[]
|
||||
if form_data
|
||||
else [
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': '20px',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'button',
|
||||
'text': {'tag': 'plain_text', 'content': ''},
|
||||
'type': 'text',
|
||||
'width': 'fill',
|
||||
'size': 'medium',
|
||||
'icon': {'tag': 'standard_icon', 'token': 'thumbsup_outlined'},
|
||||
'hover_tips': {'tag': 'plain_text', 'content': '有帮助'},
|
||||
'behaviors': [{'type': 'callback', 'value': {'feedback': '有帮助'}}],
|
||||
'margin': '0px 0px 0px 0px',
|
||||
}
|
||||
],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
'direction': 'vertical',
|
||||
'horizontal_spacing': '8px',
|
||||
'vertical_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'vertical_align': 'top',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
},
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': '30px',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'button',
|
||||
'text': {'tag': 'plain_text', 'content': ''},
|
||||
'type': 'text',
|
||||
'width': 'default',
|
||||
'size': 'medium',
|
||||
'icon': {'tag': 'standard_icon', 'token': 'thumbdown_outlined'},
|
||||
'hover_tips': {'tag': 'plain_text', 'content': '无帮助'},
|
||||
'behaviors': [{'type': 'callback', 'value': {'feedback': '无帮助'}}],
|
||||
'margin': '0px 0px 0px 0px',
|
||||
}
|
||||
],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
'vertical_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'vertical_align': 'top',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
},
|
||||
]
|
||||
),
|
||||
],
|
||||
'margin': '0px 0px 4px 0px',
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
try:
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key
|
||||
if message_source.source_platform_object
|
||||
@@ -1558,39 +2326,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
# 发起请求
|
||||
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
|
||||
|
||||
# 处理失败返回
|
||||
if not response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
||||
request: UpdateCardRequest = (
|
||||
UpdateCardRequest.builder()
|
||||
.card_id(card_id)
|
||||
.request_body(
|
||||
UpdateCardRequestBody.builder()
|
||||
.sequence(sequence)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
|
||||
.build()
|
||||
)
|
||||
return
|
||||
|
||||
# Send media messages when streaming is done
|
||||
if is_final and media_items:
|
||||
for media in media_items:
|
||||
media_request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(message_source.message_chain.message_id)
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(media['content']))
|
||||
.msg_type(media['msg_type'])
|
||||
.reply_in_thread(False)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
|
||||
media_request, req_opt
|
||||
)
|
||||
if not media_response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
||||
)
|
||||
.build()
|
||||
)
|
||||
response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
|
||||
if not response.success():
|
||||
await self.logger.error(
|
||||
f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
|
||||
f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
|
||||
|
||||
async def is_muted(self, group_id: int) -> bool:
|
||||
return False
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
from __future__ import annotations
|
||||
import time
|
||||
|
||||
|
||||
import telegram
|
||||
import telegram.ext
|
||||
from telegram import Update
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
|
||||
import telegramify_markdown
|
||||
import typing
|
||||
import traceback
|
||||
import json
|
||||
import base64
|
||||
import pydantic
|
||||
|
||||
@@ -189,6 +189,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
|
||||
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
bot: telegram.Bot = pydantic.Field(exclude=True)
|
||||
application: telegram.ext.Application = pydantic.Field(exclude=True)
|
||||
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||
|
||||
message_converter: TelegramMessageConverter = TelegramMessageConverter()
|
||||
event_converter: TelegramEventConverter = TelegramEventConverter()
|
||||
@@ -224,6 +225,102 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
telegram_callback,
|
||||
)
|
||||
)
|
||||
|
||||
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
query = update.callback_query
|
||||
await query.answer()
|
||||
try:
|
||||
data = json.loads(query.data)
|
||||
if data.get('form_action') or data.get('f'):
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
|
||||
workflow_run_id = data.get('workflow_run_id', '')
|
||||
w_suffix = data.get('w', '')
|
||||
action_id = data.get('action_id') or data.get('a', '')
|
||||
session_key = data.get('session_key') or data.get('s', '')
|
||||
|
||||
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||
launcher_type = provider_session.LauncherTypes.GROUP
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('g:')
|
||||
else session_key[len('group_') :]
|
||||
)
|
||||
else:
|
||||
launcher_type = provider_session.LauncherTypes.PERSON
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('p:')
|
||||
else session_key[len('person_') :]
|
||||
)
|
||||
|
||||
user_id = str(query.from_user.id)
|
||||
|
||||
# Find bot_uuid and pipeline_uuid
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
for b in self.ap.platform_mgr.bots:
|
||||
if b.adapter is self:
|
||||
bot_uuid = b.bot_entity.uuid
|
||||
pipeline_uuid = b.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
form_action_data = {
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'w_suffix': w_suffix,
|
||||
'action_id': action_id,
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
message_chain = platform_message.MessageChain(
|
||||
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
|
||||
)
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
),
|
||||
message_chain=message_chain,
|
||||
source_platform_object=update,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
source_platform_object=update,
|
||||
)
|
||||
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
|
||||
|
||||
application.add_handler(CallbackQueryHandler(callback_query_handler))
|
||||
super().__init__(
|
||||
config=config,
|
||||
logger=logger,
|
||||
@@ -319,14 +416,19 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
update = event.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
chat_type = update.effective_chat.type
|
||||
message_thread_id = update.message.message_thread_id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
if chat_type == 'private':
|
||||
draft_id = int(time.time() * 1000)
|
||||
self.msg_stream_id[message_id] = ('private', draft_id)
|
||||
import time as _time
|
||||
|
||||
draft_id = int(_time.time() * 1000)
|
||||
self.msg_stream_id[message_id] = ('private', draft_id)
|
||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
|
||||
await self.bot.send_message_draft(**args)
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except (telegram.error.RetryAfter, telegram.error.BadRequest):
|
||||
pass
|
||||
else:
|
||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
|
||||
send_msg = await self.bot.send_message(**args)
|
||||
@@ -347,12 +449,13 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
assert isinstance(message_source.source_platform_object, Update)
|
||||
update = message_source.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
message_thread_id = update.message.message_thread_id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
if message_id not in self.msg_stream_id:
|
||||
return
|
||||
|
||||
chat_mode, draft_id = self.msg_stream_id[message_id]
|
||||
chat_mode, stream_id = self.msg_stream_id[message_id]
|
||||
components = await TelegramMessageConverter.yiri2target(message, self.bot)
|
||||
|
||||
if not components or components[0]['type'] != 'text':
|
||||
@@ -361,16 +464,42 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
return
|
||||
|
||||
content = components[0]['text']
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
|
||||
if form_data and is_final:
|
||||
self.msg_stream_id.pop(message_id, None)
|
||||
await self._send_form_action_buttons(message_source, form_data)
|
||||
return
|
||||
|
||||
if chat_mode == 'private':
|
||||
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id)
|
||||
await self.bot.send_message_draft(**args)
|
||||
# Streaming via draft (ephemeral preview in the chat input area)
|
||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except telegram.error.RetryAfter:
|
||||
pass
|
||||
else:
|
||||
pass # Ignore other draft errors (cosmetic)
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
del args['draft_id']
|
||||
await self.bot.send_message(**args)
|
||||
# Finalise: send the real message, discard the draft
|
||||
args = self._build_message_args(chat_id, content, message_thread_id)
|
||||
try:
|
||||
await self.bot.send_message(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||
await self.bot.send_message(**args)
|
||||
else:
|
||||
raise
|
||||
self.msg_stream_id.pop(message_id)
|
||||
else:
|
||||
stream_id = draft_id
|
||||
# Streaming via edit_message_text (persistent message)
|
||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||
args = {
|
||||
'message_id': stream_id,
|
||||
@@ -379,11 +508,68 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
}
|
||||
if self.config.get('markdown_card', False):
|
||||
args['parse_mode'] = 'MarkdownV2'
|
||||
await self.bot.edit_message_text(**args)
|
||||
try:
|
||||
await self.bot.edit_message_text(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
|
||||
await self.bot.edit_message_text(**args)
|
||||
else:
|
||||
raise
|
||||
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
self.msg_stream_id.pop(message_id)
|
||||
|
||||
async def _send_form_action_buttons(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
):
|
||||
"""Send inline keyboard buttons for Dify human_input_required form actions."""
|
||||
actions = form_data.get('actions', [])
|
||||
node_title = form_data.get('node_title', '')
|
||||
form_content = form_data.get('form_content', '')
|
||||
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||
# Telegram callback_data is capped at 64 bytes, so we identify the
|
||||
# paused workflow by the last 8 chars of workflow_run_id (unique
|
||||
# within a session with overwhelming probability).
|
||||
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
|
||||
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_key = f'g:{message_source.group.id}'
|
||||
else:
|
||||
session_key = f'p:{message_source.sender.id}'
|
||||
|
||||
keyboard = []
|
||||
for action in actions:
|
||||
action_id = action.get('id', '')
|
||||
action_title = action.get('title', action_id)
|
||||
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
|
||||
if w_suffix:
|
||||
callback_payload['w'] = w_suffix
|
||||
callback_data = json.dumps(callback_payload, separators=(',', ':'))
|
||||
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
|
||||
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
|
||||
update = message_source.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
text_lines = [f'[{node_title}] Please select an action:']
|
||||
if form_content:
|
||||
text_lines.insert(0, form_content)
|
||||
args = {
|
||||
'chat_id': chat_id,
|
||||
'text': '\n\n'.join(text_lines),
|
||||
'reply_markup': reply_markup,
|
||||
}
|
||||
if message_thread_id:
|
||||
args['message_thread_id'] = message_thread_id
|
||||
|
||||
await self.bot.send_message(**args)
|
||||
|
||||
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
|
||||
if not isinstance(event.source_platform_object, Update):
|
||||
return None
|
||||
|
||||
@@ -296,6 +296,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
listeners: dict = {}
|
||||
_stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp)
|
||||
_STREAM_MAPPING_TTL = 600 # 10 minutes
|
||||
ap: typing.Any = None
|
||||
|
||||
def __init__(self, config: dict, logger: EventLogger):
|
||||
enable_webhook = config.get('enable-webhook', False)
|
||||
@@ -336,6 +337,12 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
_stream_to_monitoring_msg={},
|
||||
)
|
||||
|
||||
# Both WecomBotClient (webhook) and WecomBotWsClient (ws long-conn)
|
||||
# expose ``set_card_action_callback``. Wire the click handler so
|
||||
# Dify human-input button taps resume the workflow on either mode.
|
||||
if hasattr(self.bot, 'set_card_action_callback'):
|
||||
self.bot.set_card_action_callback(self._on_card_action)
|
||||
|
||||
async def reply_message(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
@@ -345,15 +352,37 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
content = await self.message_converter.yiri2target(message)
|
||||
_ws_mode = not self.config.get('enable-webhook', False)
|
||||
|
||||
event = message_source.source_platform_object
|
||||
# Synthetic events (button-click resume queries) have no inbound
|
||||
# platform object. Fall back to a proactive send so error
|
||||
# messages and one-shot replies still reach the user.
|
||||
if event is None:
|
||||
if _ws_mode:
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
chat_id = str(message_source.group.id)
|
||||
else:
|
||||
chat_id = str(message_source.sender.id)
|
||||
try:
|
||||
await self.bot.send_message(chat_id, content)
|
||||
except Exception:
|
||||
await self.logger.error(
|
||||
f'WeComBot: proactive reply for synthetic event failed: {traceback.format_exc()}'
|
||||
)
|
||||
else:
|
||||
await self.logger.warning(
|
||||
'WeComBot webhook mode cannot reply to a synthetic event '
|
||||
'(no req_id and no proactive-send credentials); dropping.'
|
||||
)
|
||||
return
|
||||
|
||||
if _ws_mode:
|
||||
event = message_source.source_platform_object
|
||||
req_id = event.get('req_id', '')
|
||||
req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
|
||||
if req_id:
|
||||
await self.bot.reply_text(req_id, content)
|
||||
else:
|
||||
await self.bot.set_message(event.message_id, content)
|
||||
else:
|
||||
await self.bot.set_message(message_source.source_platform_object.message_id, content)
|
||||
await self.bot.set_message(event.message_id, content)
|
||||
|
||||
async def reply_message_chunk(
|
||||
self,
|
||||
@@ -364,9 +393,56 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
is_final: bool = False,
|
||||
):
|
||||
content = await self.message_converter.yiri2target(message)
|
||||
msg_id = message_source.source_platform_object.message_id
|
||||
_ws_mode = not self.config.get('enable-webhook', False)
|
||||
|
||||
# Synthetic events (e.g. button-click triggered form resume) have
|
||||
# no inbound platform message — no msg_id, no req_id, no stream
|
||||
# session. The output must go via the proactive-send path instead
|
||||
# of the stream/reply path.
|
||||
spo = message_source.source_platform_object
|
||||
if spo is None:
|
||||
return await self._handle_synthetic_chunk(message_source, bot_message, content, is_final, _ws_mode)
|
||||
|
||||
msg_id = spo.message_id
|
||||
|
||||
# Dify human-input pause: when the runner attaches `_form_data` to
|
||||
# the final chunk, hand the button_interaction card off to the
|
||||
# underlying client. In webhook mode the card is queued for the
|
||||
# next followup poll; in ws mode it's sent as a reply frame
|
||||
# immediately. Falls back to plain text when the bot has no active
|
||||
# stream session for this msg_id (rare).
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
if form_data and is_final:
|
||||
if hasattr(self.bot, 'push_form_pause'):
|
||||
ok, stream_id, task_id = await self.bot.push_form_pause(msg_id, form_data)
|
||||
if ok:
|
||||
await self.logger.info(
|
||||
f'WeComBot: pending button_interaction registered '
|
||||
f'stream_id={stream_id} task_id={task_id} ws_mode={_ws_mode}'
|
||||
)
|
||||
return {'stream': True, 'form': True, 'task_id': task_id}
|
||||
await self.logger.warning(
|
||||
'WeComBot: cannot register form pause (no active stream session); falling back to plain text'
|
||||
)
|
||||
try:
|
||||
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
|
||||
|
||||
fallback = _format_human_input_text(
|
||||
form_data.get('node_title', ''),
|
||||
form_data.get('form_content', ''),
|
||||
form_data.get('actions', []) or [],
|
||||
)
|
||||
except Exception:
|
||||
fallback = content or '(人工输入)'
|
||||
if _ws_mode:
|
||||
event = message_source.source_platform_object
|
||||
req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
|
||||
if req_id:
|
||||
await self.bot.reply_text(req_id, fallback)
|
||||
else:
|
||||
await self.bot.set_message(msg_id, fallback)
|
||||
return {'stream': False, 'form': True, 'fallback': True}
|
||||
|
||||
if _ws_mode:
|
||||
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
|
||||
if not success and is_final:
|
||||
@@ -385,6 +461,129 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
"""Whether streaming output is enabled for this bot instance."""
|
||||
return self.config.get('enable-stream-reply', True)
|
||||
|
||||
async def _handle_synthetic_chunk(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
bot_message,
|
||||
content: str,
|
||||
is_final: bool,
|
||||
ws_mode: bool,
|
||||
) -> dict:
|
||||
"""Handle reply_message_chunk for synthetic events (button clicks).
|
||||
|
||||
Synthetic events have no inbound message → no msg_id, no req_id,
|
||||
no stream session. We can't do incremental streaming, so we
|
||||
buffer chunks per-conversation and flush on ``is_final`` via the
|
||||
proactive send path.
|
||||
|
||||
Buffer keyed by ``(launcher_type, launcher_id)`` from the
|
||||
synthetic event itself. Only ws mode has a usable proactive-send
|
||||
path right now (``ws_client.send_message`` /
|
||||
``ws_client.send_template_card``); webhook mode requires a
|
||||
corpid/secret we don't have, so it logs and drops.
|
||||
"""
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
chat_id = str(message_source.group.id)
|
||||
else:
|
||||
chat_id = str(message_source.sender.id)
|
||||
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
|
||||
# Buffer streaming content until is_final.
|
||||
buf_key = chat_id
|
||||
if not hasattr(self, '_synthetic_buffers'):
|
||||
# Attribute-not-declared trick: pydantic forbids dynamic attrs
|
||||
# on the model, but plain instance dicts via object.__setattr__
|
||||
# do work. Lazy-create on first call.
|
||||
object.__setattr__(self, '_synthetic_buffers', {})
|
||||
buffers: dict[str, str] = self._synthetic_buffers
|
||||
if content and not form_data:
|
||||
buffers[buf_key] = buffers.get(buf_key, '') + content
|
||||
|
||||
if not is_final:
|
||||
return {'stream': True, 'synthetic': True, 'buffered': True}
|
||||
|
||||
final_content = buffers.pop(buf_key, '')
|
||||
if content and final_content.startswith(content):
|
||||
# is_final chunk re-emitted the full accumulated text — keep
|
||||
# whichever is longer.
|
||||
final_content = final_content if len(final_content) >= len(content) else content
|
||||
elif content and not final_content:
|
||||
final_content = content
|
||||
|
||||
if not ws_mode:
|
||||
await self.logger.warning(
|
||||
'WeComBot webhook mode cannot proactively push synthetic-event '
|
||||
'output (no corpid/secret); the resume reply is dropped. '
|
||||
f'content_len={len(final_content)} form_data_present={form_data is not None}'
|
||||
)
|
||||
return {'stream': False, 'synthetic': True, 'dropped': True}
|
||||
|
||||
# ws mode: proactive send.
|
||||
try:
|
||||
if form_data:
|
||||
# Determine user_id / chat_id for the routing context of any
|
||||
# subsequent click on this card.
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
routing_chat_id = str(message_source.group.id)
|
||||
routing_user_id = str(message_source.sender.id)
|
||||
else:
|
||||
routing_chat_id = ''
|
||||
routing_user_id = str(message_source.sender.id)
|
||||
payload = self._build_button_interaction_payload_from_form(
|
||||
form_data,
|
||||
user_id=routing_user_id,
|
||||
chat_id=routing_chat_id,
|
||||
)
|
||||
await self.bot.send_template_card(chat_id, payload)
|
||||
await self.logger.info(
|
||||
f'WeComBot ws: proactively sent template_card for synthetic event '
|
||||
f'chat_id={chat_id} form_token={form_data.get("form_token")!r} '
|
||||
f'workflow_run_id={form_data.get("workflow_run_id")!r}'
|
||||
)
|
||||
elif final_content:
|
||||
await self.bot.send_message(chat_id, final_content)
|
||||
await self.logger.info(
|
||||
f'WeComBot ws: proactively sent text for synthetic event chat_id={chat_id} len={len(final_content)}'
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'WeComBot: synthetic event proactive send failed: {traceback.format_exc()}')
|
||||
return {'stream': False, 'synthetic': True, 'error': True}
|
||||
|
||||
return {'stream': True, 'synthetic': True}
|
||||
|
||||
def _build_button_interaction_payload_from_form(
|
||||
self, form_data: dict, *, user_id: str = '', chat_id: str = ''
|
||||
) -> dict:
|
||||
"""Build a button_interaction payload + track task_id for click resolution.
|
||||
|
||||
Unlike the inbound-event path (where push_form_pause registers the
|
||||
task_id with the active stream session), proactive sends still
|
||||
need the task_id registered so button clicks find pending_form.
|
||||
For ws mode we stash it directly on the ws_client's pending dict.
|
||||
"""
|
||||
from langbot.libs.wecom_ai_bot_api.api import build_button_interaction_payload
|
||||
import secrets as _secrets
|
||||
|
||||
task_id = f'dify-{_secrets.token_hex(12)}'
|
||||
payload = build_button_interaction_payload(form_data, task_id)
|
||||
|
||||
# Register task_id → form_data so the click callback can find it.
|
||||
# user_id / chat_id are required so _on_card_action can route the
|
||||
# resulting synthetic query back to the right user. msg_id / req_id
|
||||
# / stream_id are intentionally empty — synthetic cards have no
|
||||
# inbound message to anchor on.
|
||||
if hasattr(self.bot, '_pending_forms_by_task'):
|
||||
self.bot._pending_forms_by_task[task_id] = {
|
||||
'form_data': form_data,
|
||||
'msg_id': '',
|
||||
'user_id': user_id,
|
||||
'chat_id': chat_id,
|
||||
'stream_id': '',
|
||||
'req_id': '',
|
||||
}
|
||||
return payload
|
||||
|
||||
async def send_message(self, target_type, target_id, message):
|
||||
_ws_mode = not self.config.get('enable-webhook', False)
|
||||
if _ws_mode:
|
||||
@@ -531,3 +730,114 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
async def is_muted(self, group_id: int) -> bool:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Dify human-input button-interaction click handling
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _on_card_action(self, session, action_id: str, task_id: str, raw_event: dict) -> None:
|
||||
"""Translate a button click on a button_interaction card into a
|
||||
synthetic ``_dify_form_action`` query enqueued on the pool.
|
||||
|
||||
Pattern mirrors DingTalk / Lark / Telegram so the runner's
|
||||
``_merge_pending_form_action`` path resumes the workflow.
|
||||
"""
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
|
||||
form = session.pending_form or {}
|
||||
await self.logger.info(
|
||||
f'WeComBot _on_card_action: task_id={task_id} action_id={action_id!r} '
|
||||
f'form_token={form.get("form_token")!r} workflow_run_id={form.get("workflow_run_id")!r} '
|
||||
f'session.user_id={session.user_id!r} session.chat_id={session.chat_id!r}'
|
||||
)
|
||||
|
||||
actions = form.get('actions') or []
|
||||
clean_action_id = (action_id or '').strip()
|
||||
action_title = clean_action_id
|
||||
for a in actions:
|
||||
if str(a.get('id', '')) == clean_action_id:
|
||||
action_title = a.get('title') or clean_action_id
|
||||
break
|
||||
|
||||
launcher_id = session.user_id or session.chat_id or ''
|
||||
sender_user_id = session.user_id or launcher_id
|
||||
# WeCom AI bot has both single-chat and group-chat; chat_id present
|
||||
# indicates group context.
|
||||
if session.chat_id:
|
||||
launcher_type = provider_session.LauncherTypes.GROUP
|
||||
launcher_id = session.chat_id
|
||||
else:
|
||||
launcher_type = provider_session.LauncherTypes.PERSON
|
||||
launcher_id = session.user_id or ''
|
||||
|
||||
form_action_data = {
|
||||
'form_token': form.get('form_token', ''),
|
||||
'workflow_run_id': form.get('workflow_run_id', ''),
|
||||
'action_id': clean_action_id,
|
||||
'action_title': action_title,
|
||||
'node_title': form.get('node_title', ''),
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=sender_user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
special_title='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=int(time.time()),
|
||||
source_platform_object=None,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=sender_user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=int(time.time()),
|
||||
source_platform_object=None,
|
||||
)
|
||||
|
||||
if self.ap is None:
|
||||
await self.logger.error('WeComBot: ap not injected; cannot enqueue button-click query')
|
||||
return
|
||||
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
for bot in self.ap.platform_mgr.bots:
|
||||
if bot.adapter is self:
|
||||
bot_uuid = bot.bot_entity.uuid
|
||||
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
try:
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=sender_user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
await self.logger.info(f'WeComBot: button-click query enqueued action_id={clean_action_id!r}')
|
||||
except Exception:
|
||||
await self.logger.error(f'WeComBot: enqueue button-click query failed: {traceback.format_exc()}')
|
||||
|
||||
@@ -2,9 +2,11 @@ from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
import base64
|
||||
import mimetypes
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
from langbot.pkg.provider import runner
|
||||
@@ -16,6 +18,125 @@ from langbot.libs.dify_service_api.v1 import client, errors
|
||||
import httpx
|
||||
|
||||
|
||||
# Module-level store for paused-workflow form state, keyed by session key
|
||||
# (launcher_type_value + "_" + launcher_id). Each session holds an
|
||||
# insertion-ordered dict of form_token -> form_data, allowing multiple
|
||||
# Dify workflows to be paused simultaneously for the same session.
|
||||
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
|
||||
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
|
||||
|
||||
|
||||
def _session_key_from_query(query: pipeline_query.Query) -> str:
|
||||
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
|
||||
|
||||
|
||||
def _prune_pending_forms(now: float | None = None) -> None:
|
||||
if now is None:
|
||||
now = time.time()
|
||||
for session_key in list(_PENDING_FORMS.keys()):
|
||||
forms = _PENDING_FORMS[session_key]
|
||||
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
|
||||
for token in expired_tokens:
|
||||
forms.pop(token, None)
|
||||
if not forms:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
|
||||
|
||||
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
|
||||
_prune_pending_forms()
|
||||
stored = dict(form_data)
|
||||
expiration_time = stored.get('expiration_time')
|
||||
try:
|
||||
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
|
||||
except (TypeError, ValueError):
|
||||
expiration_ts = 0.0
|
||||
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
|
||||
form_token = str(stored.get('form_token') or '')
|
||||
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
|
||||
# Re-insert at the end so this becomes the "latest" entry
|
||||
forms.pop(form_token, None)
|
||||
forms[form_token] = stored
|
||||
|
||||
|
||||
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms or not form_token:
|
||||
return None
|
||||
return forms.get(form_token)
|
||||
|
||||
|
||||
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
|
||||
"""Look up a pending form whose workflow_run_id ends with the given suffix.
|
||||
|
||||
Used by adapters (e.g. Telegram) whose callback payload is too small to
|
||||
carry the full form_token / workflow_run_id.
|
||||
"""
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms or not w_suffix:
|
||||
return None
|
||||
for token in reversed(forms):
|
||||
form = forms[token]
|
||||
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
|
||||
return form
|
||||
return None
|
||||
|
||||
|
||||
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return None
|
||||
return forms[next(reversed(forms))]
|
||||
|
||||
|
||||
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
|
||||
"""Iterate pending forms for a session, newest-first."""
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return
|
||||
for token in reversed(list(forms.keys())):
|
||||
yield forms[token]
|
||||
|
||||
|
||||
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
|
||||
"""Clear one specific pending form (by token) or all forms for the session."""
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return
|
||||
if form_token is None:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
return
|
||||
forms.pop(form_token, None)
|
||||
if not forms:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
|
||||
|
||||
def _format_human_input_text(
|
||||
node_title: str,
|
||||
form_content: str,
|
||||
actions: list[dict[str, typing.Any]],
|
||||
) -> str:
|
||||
"""Render a paused-workflow human-input prompt as plain text.
|
||||
|
||||
Used by adapters without rich UI (no buttons/cards) so users can reply
|
||||
with the option number or the option title to resume the workflow.
|
||||
"""
|
||||
lines: list[str] = [f'[Human Input Required] {node_title or ""}'.rstrip()]
|
||||
if form_content:
|
||||
lines.append('')
|
||||
lines.append(form_content)
|
||||
if actions:
|
||||
lines.append('')
|
||||
lines.append('Reply with the number or title to continue:')
|
||||
for idx, action in enumerate(actions, start=1):
|
||||
title = action.get('title') or action.get('id') or ''
|
||||
lines.append(f' {idx}. {title}')
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
@runner.runner_class('dify-service-api')
|
||||
class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
"""Dify Service API 对话请求器"""
|
||||
@@ -335,11 +456,155 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
|
||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||
|
||||
async def _submit_workflow_form_blocking(
|
||||
self, form_action: dict
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
|
||||
|
||||
form_token = form_action['form_token']
|
||||
workflow_run_id = form_action['workflow_run_id']
|
||||
user = form_action['user']
|
||||
action_id = form_action.get('action_id', '')
|
||||
inputs = form_action.get('inputs', {})
|
||||
|
||||
async for chunk in self.dify_client.workflow_submit(
|
||||
form_token=form_token,
|
||||
workflow_run_id=workflow_run_id,
|
||||
inputs=inputs,
|
||||
user=user,
|
||||
action=action_id,
|
||||
timeout=120,
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
if chunk['data'].get('error'):
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=content,
|
||||
)
|
||||
|
||||
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
|
||||
"""Locate the pending form this action targets.
|
||||
|
||||
Tries identifiers in order of specificity: form_token, full
|
||||
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
|
||||
then falls back to the newest pending form for the session.
|
||||
"""
|
||||
form_token = form_action.get('form_token')
|
||||
if form_token:
|
||||
form = _get_pending_form_by_token(session_key, form_token)
|
||||
if form:
|
||||
return form
|
||||
|
||||
workflow_run_id = form_action.get('workflow_run_id')
|
||||
if workflow_run_id:
|
||||
for form in _iter_pending_forms(session_key):
|
||||
if form.get('workflow_run_id') == workflow_run_id:
|
||||
return form
|
||||
|
||||
w_suffix = form_action.get('w_suffix')
|
||||
if w_suffix:
|
||||
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
|
||||
if form:
|
||||
return form
|
||||
|
||||
return _get_latest_pending_form(session_key)
|
||||
|
||||
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
|
||||
"""Backfill resume fields from the matching pending form."""
|
||||
if not form_action:
|
||||
return None
|
||||
|
||||
merged_action = dict(form_action)
|
||||
merged_action.pop('w_suffix', None)
|
||||
pending_form = self._resolve_pending_form(session_key, form_action)
|
||||
if pending_form:
|
||||
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
|
||||
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
|
||||
'workflow_run_id', ''
|
||||
)
|
||||
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
|
||||
merged_action.setdefault('user', pending_form.get('user', ''))
|
||||
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
|
||||
|
||||
# Resolve clicked action's display title from the stored actions list
|
||||
if 'action_title' not in merged_action:
|
||||
clicked_id = merged_action.get('action_id', '')
|
||||
for action in pending_form.get('actions', []):
|
||||
if str(action.get('id', '')) == str(clicked_id):
|
||||
merged_action['action_title'] = action.get('title', clicked_id)
|
||||
break
|
||||
|
||||
return merged_action
|
||||
|
||||
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
|
||||
"""Match plain text replies against pending Dify form actions.
|
||||
|
||||
Resolution order:
|
||||
1. A pure digit reply (e.g. "1", "2") maps to the 1-indexed action of
|
||||
the most recent pending form. Lets users on plain-text platforms
|
||||
pick options without retyping titles.
|
||||
2. Otherwise, iterate pending forms newest-first and match each
|
||||
action's title/id case-insensitively. The first hit wins, so when
|
||||
two forms share a button label the newer one resolves.
|
||||
"""
|
||||
normalized_text = user_text.strip().lower()
|
||||
if not normalized_text:
|
||||
return None
|
||||
|
||||
def _build(pending_form: dict, action: dict) -> dict:
|
||||
return {
|
||||
'form_token': pending_form.get('form_token', ''),
|
||||
'workflow_run_id': pending_form.get('workflow_run_id', ''),
|
||||
'action_id': action.get('id', ''),
|
||||
'action_title': action.get('title', action.get('id', '')),
|
||||
'node_title': pending_form.get('node_title', ''),
|
||||
'inputs': pending_form.get('inputs', {}),
|
||||
'user': pending_form.get('user', ''),
|
||||
}
|
||||
|
||||
if normalized_text.isdigit():
|
||||
position = int(normalized_text)
|
||||
latest_form = _get_latest_pending_form(session_key)
|
||||
if latest_form is not None:
|
||||
actions = latest_form.get('actions', [])
|
||||
if 1 <= position <= len(actions):
|
||||
return _build(latest_form, actions[position - 1])
|
||||
|
||||
for pending_form in _iter_pending_forms(session_key):
|
||||
for action in pending_form.get('actions', []):
|
||||
titles = {
|
||||
str(action.get('title', '')).strip().lower(),
|
||||
str(action.get('id', '')).strip().lower(),
|
||||
}
|
||||
if normalized_text in titles:
|
||||
return _build(pending_form, action)
|
||||
|
||||
return None
|
||||
|
||||
async def _workflow_messages(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""调用工作流"""
|
||||
|
||||
# Check if this is a form action resume (button click or text match)
|
||||
form_action_raw = query.variables.get('_dify_form_action')
|
||||
session_key = _session_key_from_query(query)
|
||||
|
||||
if form_action_raw:
|
||||
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||
else:
|
||||
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||
|
||||
if form_action:
|
||||
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||
async for msg in self._submit_workflow_form_blocking(form_action):
|
||||
yield msg
|
||||
return
|
||||
|
||||
if not query.session.using_conversation.uuid:
|
||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||
|
||||
@@ -366,6 +631,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
}
|
||||
|
||||
inputs.update(query.variables)
|
||||
human_input_yielded = False
|
||||
|
||||
async for chunk in self.dify_client.workflow_run(
|
||||
inputs=inputs,
|
||||
@@ -377,6 +643,45 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
if chunk['event'] in ignored_events:
|
||||
continue
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') == 'human_input_required':
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
node_title = reason.get('node_title', '')
|
||||
|
||||
_set_pending_form(
|
||||
_session_key_from_query(query),
|
||||
{
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': reason.get('inputs', {}),
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
},
|
||||
)
|
||||
|
||||
query.variables['_dify_form_render'] = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
}
|
||||
|
||||
display_text = _format_human_input_text(node_title, form_content, actions)
|
||||
|
||||
human_input_yielded = True
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=display_text,
|
||||
)
|
||||
|
||||
if chunk['event'] == 'node_started':
|
||||
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
|
||||
continue
|
||||
@@ -399,6 +704,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
yield msg
|
||||
|
||||
elif chunk['event'] == 'workflow_finished':
|
||||
if human_input_yielded:
|
||||
break
|
||||
if chunk['data']['error']:
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||
@@ -636,11 +943,153 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
|
||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||
|
||||
async def _submit_workflow_form(
|
||||
self, form_action: dict
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""Submit human input to resume a paused Dify workflow."""
|
||||
|
||||
form_token = form_action['form_token']
|
||||
workflow_run_id = form_action['workflow_run_id']
|
||||
user = form_action['user']
|
||||
action_id = form_action.get('action_id', '')
|
||||
action_title = form_action.get('action_title', '') or action_id
|
||||
node_title = form_action.get('node_title', '')
|
||||
inputs = form_action.get('inputs', {})
|
||||
|
||||
messsage_idx = 0
|
||||
is_final = False
|
||||
think_start = False
|
||||
think_end = False
|
||||
workflow_contents = ''
|
||||
repause_form_data: dict | None = None
|
||||
|
||||
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
|
||||
async for chunk in self.dify_client.workflow_submit(
|
||||
form_token=form_token,
|
||||
workflow_run_id=workflow_run_id,
|
||||
inputs=inputs,
|
||||
user=user,
|
||||
action=action_id,
|
||||
timeout=120,
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||
|
||||
yield_this_iteration = False
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
is_final = True
|
||||
yield_this_iteration = True
|
||||
if chunk['data'].get('error'):
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') != 'human_input_required':
|
||||
continue
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
# Use a distinct name — `node_title` (the just-resolved step)
|
||||
# must keep its value so the resume notice on the previous
|
||||
# card still shows which step the user acted on.
|
||||
paused_node_title = reason.get('node_title', '')
|
||||
raw_inputs = reason.get('inputs', {})
|
||||
|
||||
_set_pending_form(
|
||||
user,
|
||||
{
|
||||
'workflow_run_id': new_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': paused_node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': user,
|
||||
},
|
||||
)
|
||||
|
||||
repause_form_data = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': paused_node_title,
|
||||
'workflow_run_id': new_run_id,
|
||||
'form_token': reason.get('form_token', ''),
|
||||
}
|
||||
# Ensure the final chunk has non-empty content so
|
||||
# ResponseWrapper (which skips empty-content chunks) lets it
|
||||
# propagate to SendResponseBackStage. Use a zero-width space
|
||||
# so neither Lark nor Telegram renders visible noise — the
|
||||
# adapter substitutes its own card text from _form_data.
|
||||
if not workflow_contents:
|
||||
workflow_contents = ''
|
||||
is_final = True
|
||||
yield_this_iteration = True
|
||||
break
|
||||
|
||||
if chunk['event'] == 'text_chunk':
|
||||
messsage_idx += 1
|
||||
if remove_think:
|
||||
if '<think>' in chunk['data']['text'] and not think_start:
|
||||
think_start = True
|
||||
continue
|
||||
if '</think>' in chunk['data']['text'] and not think_end:
|
||||
import re
|
||||
|
||||
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
|
||||
workflow_contents += content
|
||||
think_end = True
|
||||
elif think_end:
|
||||
workflow_contents += chunk['data']['text']
|
||||
if think_start:
|
||||
continue
|
||||
else:
|
||||
workflow_contents += chunk['data']['text']
|
||||
if messsage_idx % 8 == 0:
|
||||
yield_this_iteration = True
|
||||
|
||||
if yield_this_iteration:
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents,
|
||||
is_final=is_final,
|
||||
)
|
||||
msg._resume_from_form = True
|
||||
if action_title:
|
||||
msg._resume_action_title = action_title
|
||||
if node_title:
|
||||
msg._resume_node_title = node_title
|
||||
if is_final and repause_form_data:
|
||||
msg._form_data = repause_form_data
|
||||
msg._open_new_card = True
|
||||
yield msg
|
||||
if is_final:
|
||||
return
|
||||
|
||||
async def _workflow_messages_chunk(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""调用工作流"""
|
||||
|
||||
# Check if this is a form action resume (button click or text match)
|
||||
form_action_raw = query.variables.get('_dify_form_action')
|
||||
session_key = _session_key_from_query(query)
|
||||
|
||||
if form_action_raw:
|
||||
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||
else:
|
||||
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||
|
||||
if form_action:
|
||||
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||
# Resume paused workflow via submit endpoint
|
||||
async for msg in self._submit_workflow_form(form_action):
|
||||
yield msg
|
||||
return
|
||||
|
||||
if not query.session.using_conversation.uuid:
|
||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||
|
||||
@@ -672,6 +1121,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
think_start = False
|
||||
think_end = False
|
||||
workflow_contents = ''
|
||||
workflow_run_id = ''
|
||||
human_input_yielded = False
|
||||
|
||||
# Saved form data to attach to the final MessageChunk so the adapter
|
||||
# can detect it when is_final=True and render buttons.
|
||||
pending_form_data = None
|
||||
display_text = ''
|
||||
|
||||
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
|
||||
async for chunk in self.dify_client.workflow_run(
|
||||
@@ -682,7 +1138,61 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
|
||||
if chunk['event'] in ignored_events:
|
||||
if chunk['event'] == 'workflow_started':
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||
continue
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') == 'human_input_required':
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
node_title = reason.get('node_title', '')
|
||||
|
||||
# Persist form state in module-level store keyed by session
|
||||
raw_inputs = reason.get('inputs', {})
|
||||
_set_pending_form(
|
||||
_session_key_from_query(query),
|
||||
{
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
},
|
||||
)
|
||||
|
||||
# Pass form render metadata to downstream stages
|
||||
query.variables['_dify_form_render'] = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
}
|
||||
|
||||
display_text = _format_human_input_text(node_title, form_content, actions)
|
||||
workflow_contents += display_text + '\n'
|
||||
|
||||
# Save form data to attach to the final chunk later.
|
||||
# We do NOT yield here — the form content will be sent
|
||||
# as the final MessageChunk (with is_final=True and
|
||||
# _form_data) so the adapter can update the card and
|
||||
# add buttons in one pass.
|
||||
pending_form_data = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_token': reason.get('form_token', ''),
|
||||
}
|
||||
human_input_yielded = True
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
is_final = True
|
||||
if chunk['data']['error']:
|
||||
@@ -730,11 +1240,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
yield msg
|
||||
|
||||
if messsage_idx % 8 == 0 or is_final:
|
||||
yield provider_message.MessageChunk(
|
||||
final_content = workflow_contents if workflow_contents.strip() else ''
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents,
|
||||
content=final_content,
|
||||
is_final=is_final,
|
||||
)
|
||||
# Attach form data to the final chunk for the adapter
|
||||
if is_final and pending_form_data:
|
||||
msg._form_data = pending_form_data
|
||||
pending_form_data = None
|
||||
yield msg
|
||||
|
||||
# If the stream ended after workflow_paused without a
|
||||
# workflow_finished event, yield a final chunk so the adapter
|
||||
# can update the card and add buttons.
|
||||
if human_input_yielded and not is_final:
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents or display_text,
|
||||
is_final=True,
|
||||
)
|
||||
msg._form_data = pending_form_data
|
||||
yield msg
|
||||
|
||||
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""运行请求"""
|
||||
|
||||
6
src/langbot/templates/dingtalk_human_input_card.json
Normal file
6
src/langbot/templates/dingtalk_human_input_card.json
Normal file
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user