Compare commits

...

9 Commits

Author SHA1 Message Date
fdc310
f10ea82418 feat(wecom): implement Dify human input pause handling with button interaction support 2026-06-16 00:07:11 +08:00
fdc310
a32c4d152f feat(dingtalk): enhance human input card functionality with streaming support and active turn management
- Updated the DingTalk card template to enable streaming mode and multi-update configuration.
- Removed the obsolete delete_card method from DingTalkClient to streamline card management.
- Enhanced DingTalkAdapter to manage active turn cards and accumulated streaming text, ensuring a seamless user experience during human input prompts.
- Modified the create_message_card method to utilize existing active cards for resumed workflows, preventing duplication.
- Improved the _paint_form_on_card method to update existing cards with human input prompts and buttons dynamically.
- Updated the dingtalk_human_input_card.json template to reflect the new streaming capabilities and configuration options.
2026-06-15 17:45:09 +08:00
fdc310
83b0d26e99 feat(dingtalk): implement human input card support and card action handling
- Add a new module `card_callback.py` to handle card action button clicks from DingTalk.
- Introduce `DingTalkCardActionHandler` to process card action callbacks and extract parameters.
- Update `DingTalkAdapter` to manage card state and handle form input through a single card template.
- Add configuration for `human_input_card_template_id` in `dingtalk.yaml` to specify the template for human input.
- Create a new card template `dingtalk_human_input_card.json` for rendering human input prompts and buttons.
2026-06-15 15:51:52 +08:00
fdc310
e08b5db625 feat: Add the function for formatting human input text to support adapters without rich UI. 2026-06-10 15:41:18 +08:00
fdc310
d0f65b17ec feat: Improve TelegramAdapter message handling with enhanced error management and draft message support 2026-06-02 01:14:25 +08:00
fdc310
2b533c4a00 feat: Enhance TelegramAdapter to handle form action buttons and message threading 2026-06-02 00:25:52 +08:00
fdc310
f663d87a60 feat: Enhance Lark and Telegram adapters with new form handling for paused workflows 2026-06-01 23:48:59 +08:00
fdc310
60e5b873ee feat: Add '_routed_by_rule' variable to form action in Lark and Telegram adapters 2026-05-30 12:22:00 +08:00
fdc310
b96f209b98 feat: Implement workflow form handling for paused workflows
- Added module-level storage for pending forms to manage state across sessions.
- Introduced functions to set, get, and clear pending forms with expiration handling.
- Enhanced DifyServiceAPIRunner to support resuming paused workflows via form actions.
- Implemented logic to yield human input requests and display appropriate messages.
- Updated workflow submission methods to handle paused states and resume actions.
- Ensured proper merging of pending form actions with user inputs for seamless interaction.
2026-05-28 23:32:46 +08:00
17 changed files with 4026 additions and 133 deletions

View File

@@ -0,0 +1,649 @@
"""Generate the DingTalk human-input card template JSON.
The output is wrapped in the {editorData, widgetInfo, type, mode} envelope
the DingTalk card builder expects on import. editorData is itself a JSON
string (NOT a nested object), matching real exports from the builder.
Run from the repo root: python scripts/build_dingtalk_card_template.py
"""
from __future__ import annotations
import json
from pathlib import Path
OUTPUT = Path('src/langbot/templates/dingtalk_human_input_card.json')
def markdown_block(node_id, variable='content'):
"""A MarkdownBlock whose content is bound to a global variable.
Unlike BaseText (whose `text` field requires editor-side manual binding),
MarkdownBlock's `content` accepts `variableValue` binding at JSON load
time — the imported template renders the variable straight away.
"""
return {
'componentName': 'MarkdownBlock',
'id': node_id,
'props': {
'mdVer': 0,
'icon': {'type': 'icon', 'icon': '', 'iconType': 'emoji'},
'content': {'variable': variable, 'variableType': 'global', 'type': 'variableValue'},
'visible': {
'type': 'dynamicVisible',
'value': True,
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'isStreaming': True,
'enableLinkStatPoint': False,
'linkStatPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
'linkStatPointParams': [],
'marginTop': 6,
'marginBottom': 6,
'marginLeft': 12,
'marginRight': 12,
},
'title': 'AI 流式富文本',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
}
def text_block(
node_id,
text,
*,
bold=False,
gravity='left',
font_size=14,
line_height=22,
max_lines=20,
ml=12,
mr=12,
mt=4,
mb=4,
color_token='common_level1_base_color',
style_token='common_body_text_style',
):
return {
'componentName': 'BaseText',
'id': node_id,
'props': {
'text': {'i18n': False, 'type': 'dynamicString', 'content': text},
'hoverText': {'type': 'dynamicString', 'content': '', 'i18n': False},
'iconType': 'iconCode',
'iconFont': {'type': 'icon', 'icon': '', 'iconType': 'ddIcon'},
'icon': {
'type': 'dynamicLink',
'value': '',
'valueType': 'fixed',
'variable': '',
'variableType': 'global',
},
'darkIcon': {
'type': 'dynamicLink',
'value': '',
'valueType': 'fixed',
'variable': '',
'variableType': 'global',
},
'autoWidth': False,
'maxWidth': {
'type': 'dynamicNumber',
'valueType': 'fixed',
'value': 0,
'variable': '',
'variableType': 'global',
},
'fixedWidth': {
'type': 'dynamicNumber',
'valueType': 'fixed',
'value': 0,
'variable': '',
'variableType': 'global',
},
'marginLeft': ml,
'marginRight': mr,
'marginTop': mt,
'marginBottom': mb,
'fontColorType': 'Standard',
'enableHighlight': False,
'maxLine': {
'type': 'dynamicNumber',
'valueType': 'fixed',
'value': max_lines,
'variable': '',
'variableType': 'global',
},
'color': {
'type': 'dynamicColor',
'valueType': 'fixed',
'value': color_token,
'variable': '',
'variableType': 'global',
},
'customLightColor': {
'type': 'dynamicColor',
'valueType': 'fixed',
'value': '#35404b',
'variable': '',
'variableType': 'global',
},
'customDarkColor': {
'type': 'dynamicColor',
'valueType': 'fixed',
'value': '#f6f6f6',
'variable': '',
'variableType': 'global',
},
'gravity': gravity,
'fontSizeType': 'Standard',
'styleType': 'custom',
'styleToken': style_token,
'size': 'middle',
'customFontSize': font_size,
'customFontLineHeight': line_height,
'bold': bold,
'italic': False,
'strikeThrough': False,
'lineHeight': 'normal',
'visible': {
'type': 'dynamicVisible',
'value': True,
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'autoMaxWidth': False,
'innerOffset': 0,
'enableIcon': False,
'widthMode': 'match_parent',
'margin': -2,
},
'title': '基础文本',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
}
def button_group(node_id):
return {
'componentName': 'ButtonGroup',
'id': node_id,
'props': {
'dynamicButtons': {'type': 'variableValue', 'variableType': 'global', 'variable': 'btns'},
'marginLeft': 12,
'marginRight': 12,
'marginTop': 6,
'marginBottom': 12,
'visible': {
'type': 'dynamicVisible',
'value': True,
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'responsiveLayoutWidth': 350,
'buttonsSource': 'variable',
'fixedButtonIds': [],
'fixedButtons': [],
'enableResponsiveLayout': False,
'matchContent': False,
'buttonSpacing': 8,
'margin': -2,
'innerOffset': 0,
},
'title': '按钮组',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
}
def build_editor_data():
component_names = [
'AIPending',
'AICardStatusContainer',
'BaseText',
'AICardContent',
'AICardContainer',
'ButtonGroup',
'MarkdownBlock',
]
components_map = [
{
'package': '@ali/dxComponent',
'version': '1.0.0',
'exportName': n,
'main': './src/index.tsx',
'destructuring': False,
'subName': '',
'componentName': n,
}
for n in component_names
]
pending_state = {
'componentName': 'AICardStatusContainer',
'id': 'node_status_pending',
'props': {
'status': 1,
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'enableExtend': False,
'autoFoldConfig': {
'needFold': True,
'heightLimit': 480,
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
},
'innerOffset': 0,
'enableCollapse': False,
'margin': -2,
},
'title': '处理中状态',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [
{
'componentName': 'AIPending',
'id': 'node_pending_inner',
'props': {
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'pendingTip': {'type': 'dynamicString', 'content': '处理中...', 'i18n': False},
'style': 'embed',
'hideIcon': False,
},
'hidden': False,
'title': '',
'isLocked': False,
'condition': True,
'conditionGroup': '',
}
],
}
done_state = {
'componentName': 'AICardStatusContainer',
'id': 'node_status_done',
'props': {
'status': 3,
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'enableExtend': False,
'autoFoldConfig': {
'needFold': True,
'heightLimit': 480,
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
},
'innerOffset': 0,
'enableCollapse': False,
'margin': -2,
},
'title': '完成状态',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [
{
'componentName': 'AICardContent',
'id': 'node_done_content',
'props': {
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'visible': {
'type': 'dynamicVisible',
'value': True,
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'innerOffset': 0,
'disabledWhileForward': False,
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
'statPointParams': [
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
],
'margin': -2,
'transformToEventChain': False,
'enableStatPoint': False,
},
'hidden': False,
'title': '',
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [
markdown_block('node_text_content', variable='content'),
button_group('node_btn_group'),
],
}
],
}
failed_state = {
'componentName': 'AICardStatusContainer',
'id': 'node_status_failed',
'props': {
'status': 5,
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'enableExtend': False,
'autoFoldConfig': {
'needFold': True,
'heightLimit': 480,
'foldStatusLocalDataKey': '_cardFoldStatusLocalDataKey',
},
'innerOffset': 0,
'enableCollapse': False,
'margin': -2,
},
'title': '失败状态',
'hidden': False,
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [
{
'componentName': 'AICardContent',
'id': 'node_failed_content',
'props': {
'visible': {
'type': 'dynamicVisible',
'value': True,
'valueType': 'fixed',
'condition': {'op': 'and', 'conditions': []},
},
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'innerOffset': 0,
'disabledWhileForward': False,
'statPoint': {'type': 'dynamicString', 'content': '', 'i18n': False},
'statPointParams': [
{'type': 'fixed', 'variable': '', 'value': '', 'name': '', 'variableType': 'global', 'id': '1'}
],
'margin': -2,
'transformToEventChain': False,
'enableStatPoint': False,
},
'hidden': False,
'title': '',
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [
text_block(
'node_failed_text',
'操作失败,请稍后重试。',
gravity='center',
mt=10,
mb=10,
ml=10,
mr=10,
max_lines=2,
font_size=15,
)
],
}
],
}
root = {
'componentName': 'AICardContainer',
'id': 'node_root',
'props': {
'marginLeft': 0,
'marginRight': 0,
'marginTop': 0,
'marginBottom': 0,
'enablePending': True,
'enableWriting': False,
'enableDoing': False,
'enableFailed': True,
'summaryContent': {'type': 'variableValue', 'variableType': 'global', 'variable': ''},
'enableTitle': False,
'flowStatusVar': {'type': 'variableValue', 'variableType': 'global', 'variable': 'flowStatus'},
'operationPenalType': 'custom',
'enableFlowAbort': True,
'innerOffset': 0,
'enableGradientBorder': True,
'cardSizeMode': 'adaptive',
'cardSizeHeightMode': 'adaptive',
'cardSizeWidthMode': 'adaptive',
'cardSizeHeight': {
'type': 'dynamicNumber',
'valueType': 'fixed',
'value': 226,
'variable': '',
'variableType': 'global',
},
'hasBackground': False,
'backgroundType': 'Standard',
'standardBackgroundColor': 'gray',
'backgroundColor': '#F6F6F6',
'darkModeBackgroundColor': '#3C3C3C',
'enableEngineUpgrade': False,
'enableExposeStatPoint': False,
'enableDebugTool': False,
},
'hidden': False,
'title': '',
'isLocked': False,
'condition': True,
'conditionGroup': '',
'children': [pending_state, done_state, failed_state],
}
btns_var = {
'name': 'btns',
'private': False,
'type': 'buttonGroup',
'id': 'btns',
'description': '动态按钮列表Dify actions',
'editorVarType': 'variables',
'disabled': False,
'schema': [
{
'id': 'btns.text',
'type': 'string',
'name': 'text',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '按钮文案',
},
{
'id': 'btns.color',
'type': 'string',
'name': 'color',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '按钮颜色',
},
{
'id': 'btns.status',
'type': 'string',
'name': 'status',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '按钮状态',
},
{
'id': 'btns.event',
'type': 'dynamicEvent',
'name': 'event',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '按钮点击事件',
'schema': [
{
'id': 'btns.type',
'type': 'string',
'name': 'type',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '事件类型openLink / sendCardRequest',
},
{
'id': 'btns.params',
'type': 'object',
'name': 'params',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '事件参数',
'schema': [
{
'id': 'btns.url',
'type': 'string',
'name': 'url',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '点击跳转链接type=openLink',
},
{
'id': 'btns.actionId',
'type': 'string',
'name': 'actionId',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '回传请求 idtype=sendCardRequest',
},
{
'id': 'btns.params',
'type': 'object',
'name': 'params',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'description': '回传请求参数type=sendCardRequest',
},
],
},
],
},
],
}
return {
'schemaVersion': '3.0.0',
'schema': {
'config': {'update_multi': True, 'streaming_mode': True},
'componentsMap': components_map,
'componentsTree': [root],
'i18n': {},
'version': '1.0.0',
},
'mockData': {
'cardData': {
'flowStatus': 3,
'content': '请审核以下报销申请:\n\n- 申请人:张三\n- 金额¥1,200\n- 类别:差旅',
'btns': [
{
'text': '通过',
'color': 'blue',
'status': 'normal',
'event': {
'type': 'sendCardRequest',
'params': {'actionId': 'approve', 'params': {'action_id': 'approve'}},
},
},
{
'text': '驳回',
'color': 'gray',
'status': 'normal',
'event': {
'type': 'sendCardRequest',
'params': {'actionId': 'reject', 'params': {'action_id': 'reject'}},
},
},
{
'text': '补充资料',
'color': 'gray',
'status': 'normal',
'event': {
'type': 'sendCardRequest',
'params': {'actionId': 'more_info', 'params': {'action_id': 'more_info'}},
},
},
],
},
'cardPrivateData': {},
'localData': {'flowStatus': '', '_cardFoldStatusLocalDataKey': ''},
'richTextData': {},
},
'renderContext': {'regenerateEnabled': '1', 'regenerateIndex': '2', 'regenerateTotal': '5'},
'editVersion': 0,
'customWidgetInfo': '',
'useCustomWidgetInfo': False,
'variableList': [
{
'id': 'content',
'type': 'markdown',
'name': 'content',
'description': '人工输入提示词Dify form_content 含可选 node_title 前缀)',
'private': False,
'editorVarType': 'variables',
'disabled': False,
},
{
'id': 'flowStatus',
'type': 'string',
'name': 'flowStatus',
'description': 'AI卡片状态pending(1)、writing(2)、done(3)、failed(5)',
'private': False,
'editorVarType': 'variables',
'disabled': True,
'visible': False,
},
btns_var,
],
'formList': [],
'customContextList': [],
'expList': [],
'localList': [],
'hsfList': [],
'lwpList': [],
'pageData': {},
'extension': {'extendType': 'AI', 'aiStatusList': [3, 1, 5], 'fileTypeList': []},
}
def main():
editor_data = build_editor_data()
wrapper = {
'editorData': json.dumps(editor_data, ensure_ascii=False, separators=(',', ':')),
'widgetInfo': '',
'type': 'im',
'mode': 'card',
}
OUTPUT.write_text(json.dumps(wrapper, ensure_ascii=False, indent=2), encoding='utf-8')
print(f'wrote {OUTPUT}')
if __name__ == '__main__':
main()

View File

@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
if chunk.startswith('data:'): if chunk.startswith('data:'):
yield json.loads(chunk[5:]) yield json.loads(chunk[5:])
async def workflow_submit(
self,
form_token: str,
workflow_run_id: str,
inputs: dict[str, typing.Any],
user: str,
action: str = '',
timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""Submit human input to resume a paused workflow, then stream events.
1. POST /form/human_input/{form_token} to submit the form
2. GET /workflow/{task_id}/events to stream the resumed workflow events
"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
}
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
# Step 1: Submit the form
payload: dict[str, typing.Any] = {
'inputs': inputs if isinstance(inputs, dict) else {},
'user': user,
'action': action,
}
submit_resp = await client.post(
f'/form/human_input/{form_token}',
headers=headers,
json=payload,
)
if submit_resp.status_code != 200:
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
# Step 2: Stream resumed workflow events
async with client.stream(
'GET',
f'/workflow/{workflow_run_id}/events',
headers={'Authorization': f'Bearer {self.api_key}'},
params={'user': user},
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f'{r.status_code} {chunk}')
if chunk.strip() == '':
continue
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
async def upload_file( async def upload_file(
self, self,
file: httpx._types.FileTypes, file: httpx._types.FileTypes,

View File

@@ -1,17 +1,26 @@
import asyncio import asyncio
import base64 import base64
import json import json
import logging
import time import time
import uuid
import urllib.parse import urllib.parse
from typing import Callable from typing import Awaitable, Callable, Optional
import dingtalk_stream # type: ignore import dingtalk_stream # type: ignore
import websockets import websockets
from .EchoHandler import EchoTextHandler from .EchoHandler import EchoTextHandler
from .card_callback import DingTalkCardActionHandler
from .dingtalkevent import DingTalkEvent from .dingtalkevent import DingTalkEvent
import httpx import httpx
import traceback import traceback
_stdout_logger = logging.getLogger('langbot.dingtalk_api')
DINGTALK_OPENAPI_BASE = 'https://api.dingtalk.com'
class DingTalkClient: class DingTalkClient:
def __init__( def __init__(
self, self,
@@ -21,6 +30,7 @@ class DingTalkClient:
robot_code: str, robot_code: str,
markdown_card: bool, markdown_card: bool,
logger: None, logger: None,
card_action_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
): ):
"""初始化 WebSocket 连接并自动启动""" """初始化 WebSocket 连接并自动启动"""
self.credential = dingtalk_stream.Credential(client_id, client_secret) self.credential = dingtalk_stream.Credential(client_id, client_secret)
@@ -30,6 +40,14 @@ class DingTalkClient:
# 在 DingTalkClient 中传入自己作为参数,避免循环导入 # 在 DingTalkClient 中传入自己作为参数,避免循环导入
self.EchoTextHandler = EchoTextHandler(self) self.EchoTextHandler = EchoTextHandler(self)
self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler) self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler)
# STREAM-mode card action button click handler. Forwards parsed payload
# to the adapter so it can resume paused Dify workflows.
self.card_action_callback = card_action_callback
self.card_action_handler = DingTalkCardActionHandler(self.client, self._on_card_action)
self.client.register_callback_handler(
dingtalk_stream.handlers.CallbackHandler.TOPIC_CARD_CALLBACK,
self.card_action_handler,
)
self._message_handlers = { self._message_handlers = {
'example': [], 'example': [],
} }
@@ -41,6 +59,16 @@ class DingTalkClient:
self.logger = logger self.logger = logger
self._stopped = False # Flag to control the event loop self._stopped = False # Flag to control the event loop
async def _on_card_action(self, payload: dict) -> None:
"""Dispatch a parsed card-action payload to the adapter callback."""
if self.card_action_callback is None:
return
try:
await self.card_action_callback(payload)
except Exception:
if self.logger:
await self.logger.error(f'DingTalk card action callback error: {traceback.format_exc()}')
async def get_access_token(self): async def get_access_token(self):
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
headers = {'Content-Type': 'application/json'} headers = {'Content-Type': 'application/json'}
@@ -429,18 +457,35 @@ class DingTalkClient:
'Content-Type': 'application/json', 'Content-Type': 'application/json',
} }
# For enterprise-internal robots, robotCode == AppKey (client_id).
# The dedicated robot_code field is only required for scenario-group
# robots or third-party robots; fall back to client_id when empty so
# the common single-bot setup keeps working without manual config.
robot_code = self.robot_code or self.key
data = { data = {
'robotCode': self.robot_code, 'robotCode': robot_code,
'userIds': [target_id], 'userIds': [target_id],
'msgKey': 'sampleText', 'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}), 'msgParam': json.dumps({'content': content}),
} }
_stdout_logger.info(
'DingTalk send_proactive_message_to_one request: robotCode=%s target_id=%s content_len=%d',
robot_code,
target_id,
len(content),
)
try: try:
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data) response = await client.post(url, headers=headers, json=data)
_stdout_logger.info(
'DingTalk send_proactive_message_to_one response: status=%d body=%s',
response.status_code,
response.text[:500],
)
if response.status_code == 200: if response.status_code == 200:
return return
except Exception: except Exception:
_stdout_logger.exception('DingTalk send_proactive_message_to_one error')
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}') await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}') raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
@@ -456,7 +501,7 @@ class DingTalkClient:
} }
data = { data = {
'robotCode': self.robot_code, 'robotCode': self.robot_code or self.key,
'openConversationId': target_id, 'openConversationId': target_id,
'msgKey': 'sampleText', 'msgKey': 'sampleText',
'msgParam': json.dumps({'content': content}), 'msgParam': json.dumps({'content': content}),
@@ -477,47 +522,244 @@ class DingTalkClient:
quote_origin: bool = False, quote_origin: bool = False,
card_auto_layout: bool = False, card_auto_layout: bool = False,
): ):
card_data = {} """Create + deliver the streaming chat card for a chatbot reply.
card_data['config'] = json.dumps({'autoLayout': card_auto_layout})
card_data['content'] = ''
# 将用户的消息内容作为卡片的查询参数,方便后续处理 Replaces the old `dingtalk_stream.AICardReplier`-based path. Returns
if incoming_message.message_type == 'text': `(None, out_track_id)` to keep call sites compatible with the
card_data['query'] = incoming_message.get_text_list()[0] previous `(card_instance, card_instance_id)` shape — the first slot
is unused now that everything is driven by out_track_id.
"""
out_track_id = uuid.uuid4().hex
is_group = str(incoming_message.conversation_type) == '2'
if is_group:
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
else: else:
card_data['query'] = '...' open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
card_instance = dingtalk_stream.AICardReplier(self.client, incoming_message) card_param_map = {'content': ''}
# print(card_instance) if incoming_message.message_type == 'text':
# 先投放卡片: https://open.dingtalk.com/document/orgapp/create-and-deliver-cards card_param_map['query'] = incoming_message.get_text_list()[0]
card_instance_id = await card_instance.async_create_and_deliver_card( else:
temp_card_id, card_param_map['query'] = '...'
card_data,
await self.create_and_deliver_card(
card_template_id=temp_card_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map=card_param_map,
card_data_config={'autoLayout': card_auto_layout},
) )
return card_instance, card_instance_id return None, out_track_id
async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool): async def send_card_message(self, card_instance, card_instance_id: str, content: str, is_final: bool):
content_key = 'content' """Stream a single chunk into an existing card's `content` field."""
try: try:
await card_instance.async_streaming( await self.streaming_update_card(
card_instance_id, out_track_id=card_instance_id,
content_key=content_key, content_key='content',
content_value=content, content_value=content,
append=False, append=False,
finished=is_final, finished=is_final,
failed=False, failed=False,
) )
except Exception as e: except Exception as e:
self.logger.exception(e) if self.logger:
await card_instance.async_streaming( self.logger.exception(e)
card_instance_id, await self.streaming_update_card(
content_key=content_key, out_track_id=card_instance_id,
content_key='content',
content_value='', content_value='',
append=False, append=False,
finished=is_final, finished=is_final,
failed=True, failed=True,
) )
async def create_and_deliver_card(
self,
*,
card_template_id: str,
out_track_id: str,
open_space_id: str,
is_group: bool,
card_param_map: Optional[dict] = None,
callback_type: str = 'STREAM',
callback_route_key: Optional[str] = None,
support_forward: bool = True,
dynamic_data_source_configs: Optional[list] = None,
card_data_config: Optional[dict] = None,
at_user_ids: Optional[dict] = None,
recipients: Optional[list] = None,
) -> bool:
"""POST /v1.0/card/instances/createAndDeliver.
Mirrors the SDK's `async_create_and_deliver_card` shape but exposes
the dynamic-data-source config slot so we can register a pull URL
for variable-length button lists.
"""
if not await self.check_access_token():
await self.get_access_token()
cardData: dict = {'cardParamMap': card_param_map or {}}
if card_data_config is not None:
cardData['config'] = json.dumps(card_data_config)
body: dict = {
'cardTemplateId': card_template_id,
'outTrackId': out_track_id,
'cardData': cardData,
'callbackType': callback_type,
'openSpaceId': open_space_id,
'imGroupOpenSpaceModel': {'supportForward': support_forward},
'imRobotOpenSpaceModel': {'supportForward': support_forward},
}
if callback_type == 'HTTP' and callback_route_key:
body['callbackRouteKey'] = callback_route_key
if is_group:
deliver: dict = {'robotCode': self.robot_code or self.key}
if at_user_ids:
deliver['atUserIds'] = at_user_ids
if recipients is not None:
deliver['recipients'] = recipients
body['imGroupOpenDeliverModel'] = deliver
else:
body['imRobotOpenDeliverModel'] = {'spaceType': 'IM_ROBOT'}
if dynamic_data_source_configs:
body['openDynamicDataConfig'] = {'dynamicDataSourceConfigs': dynamic_data_source_configs}
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances/createAndDeliver'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
_stdout_logger.info(
'DingTalk createAndDeliver request body: %s',
json.dumps(body, ensure_ascii=False)[:1500],
)
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=body, timeout=30.0)
if response.status_code == 200:
_stdout_logger.info(
'DingTalk createAndDeliver response: %s',
response.text[:500],
)
return True
_stdout_logger.error(
'DingTalk createAndDeliver failed: status=%s body=%s',
response.status_code,
response.text,
)
if self.logger:
await self.logger.error(
f'DingTalk createAndDeliver failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
_stdout_logger.exception('DingTalk createAndDeliver error')
if self.logger:
await self.logger.error(f'DingTalk createAndDeliver error: {traceback.format_exc()}')
return False
async def streaming_update_card(
self,
*,
out_track_id: str,
content_key: str,
content_value: str,
append: bool,
finished: bool,
failed: bool = False,
) -> bool:
"""PUT /v1.0/card/streaming.
Replaces `dingtalk_stream.AICardReplier.async_streaming` — same body
shape (outTrackId / guid / key / content / isFull / isFinalize /
isError) per the SDK source.
"""
if not await self.check_access_token():
await self.get_access_token()
body = {
'outTrackId': out_track_id,
'guid': uuid.uuid4().hex,
'key': content_key,
'content': content_value,
'isFull': not append,
'isFinalize': finished,
'isError': failed,
}
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/streaming'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
async with httpx.AsyncClient() as client:
response = await client.put(url, headers=headers, json=body, timeout=30.0)
if response.status_code == 200:
return True
if self.logger:
await self.logger.error(
f'DingTalk card streaming failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
if self.logger:
await self.logger.error(f'DingTalk card streaming error: {traceback.format_exc()}')
return False
async def update_card_data(
self,
*,
out_track_id: str,
card_param_map: Optional[dict] = None,
private_data: Optional[dict] = None,
) -> bool:
"""PUT /v1.0/card/instances — non-streaming card content update."""
if not await self.check_access_token():
await self.get_access_token()
body: dict = {
'outTrackId': out_track_id,
'cardData': {'cardParamMap': card_param_map or {}},
}
if private_data:
body['privateData'] = private_data
url = f'{DINGTALK_OPENAPI_BASE}/v1.0/card/instances'
headers = {
'x-acs-dingtalk-access-token': self.access_token,
'Content-Type': 'application/json',
}
try:
_stdout_logger.info(
'DingTalk update_card_data request: out_track_id=%s body=%s',
out_track_id,
json.dumps(body, ensure_ascii=False)[:500],
)
async with httpx.AsyncClient() as client:
response = await client.put(url, headers=headers, json=body, timeout=30.0)
_stdout_logger.info(
'DingTalk update_card_data response: status=%d body=%s',
response.status_code,
response.text[:300],
)
if response.status_code == 200:
return True
if self.logger:
await self.logger.error(
f'DingTalk update card failed: status={response.status_code} body={response.text}'
)
return False
except Exception:
_stdout_logger.exception('DingTalk update_card_data error')
if self.logger:
await self.logger.error(f'DingTalk update card error: {traceback.format_exc()}')
return False
async def start(self): async def start(self):
"""启动 WebSocket 连接,监听消息""" """启动 WebSocket 连接,监听消息"""
self._stopped = False self._stopped = False

View File

@@ -0,0 +1,96 @@
"""STREAM-mode handler for DingTalk card action button clicks.
DingTalk delivers card-action callbacks over the same WebSocket stream used
for chatbot messages, under the topic `/v1.0/card/instances/callback`. This
module subclasses `dingtalk_stream.CallbackHandler` and forwards the parsed
payload to a coroutine the adapter registers, so the resume-paused-workflow
logic stays in the platform adapter where it belongs.
The `CardCallbackMessage` returned by `from_dict` exposes:
* `card_instance_id` (from `outTrackId`) — the card whose button was clicked
* `user_id` — the clicker's userId
* `content` — parsed JSON; the click params live here. Where exactly inside
`content` they sit depends on the template binding. We probe
the common paths.
* `extension` — parsed JSON; any extra data we set when delivering the card.
"""
from __future__ import annotations
from typing import Awaitable, Callable, Optional
import dingtalk_stream # type: ignore
from dingtalk_stream import AckMessage
from dingtalk_stream.card_callback import CardCallbackMessage
_PARAM_PATHS = (
('params',),
('cardPrivateData', 'params'),
('userPrivateData', 'params'),
)
def _extract_params(content: dict) -> dict:
"""Return the action params dict regardless of where the template put it."""
for path in _PARAM_PATHS:
node = content
for key in path:
if not isinstance(node, dict):
node = None
break
node = node.get(key)
if node is None:
break
if isinstance(node, dict) and node:
return node
return {}
class DingTalkCardActionHandler(dingtalk_stream.CallbackHandler):
def __init__(
self,
dingtalk_stream_client,
on_action: Optional[Callable[[dict], Awaitable[None]]] = None,
):
super().__init__()
self.dingtalk_client = dingtalk_stream_client
self.on_action = on_action
async def process(self, callback: dingtalk_stream.CallbackMessage):
try:
message = CardCallbackMessage.from_dict(callback.data)
params = _extract_params(message.content if isinstance(message.content, dict) else {})
# `CardCallbackMessage.from_dict` does not surface `actionId` (the
# top-level field that ButtonGroup's sendCardRequest event puts
# there). Pull it from the raw callback.data instead.
raw = callback.data if isinstance(callback.data, dict) else {}
action_id = raw.get('actionId') or ''
if not action_id:
# Some templates nest it under actionData / cardPrivateData.
action_data = raw.get('actionData') or {}
if isinstance(action_data, dict):
action_id = action_data.get('actionId') or action_id
if not action_id:
cpd = action_data.get('cardPrivateData') or {}
if isinstance(cpd, dict):
ids = cpd.get('actionIds')
if isinstance(ids, list) and ids:
action_id = str(ids[0])
payload = {
'out_track_id': message.card_instance_id,
'user_id': message.user_id,
'corp_id': message.corp_id,
'action_id': action_id,
'params': params,
'raw_content': message.content,
'extension': message.extension if isinstance(message.extension, dict) else {},
}
if self.on_action is not None:
await self.on_action(payload)
except Exception as e:
self.logger.error(f'DingTalkCardActionHandler.process error: {e}')
return AckMessage.STATUS_OK, 'OK'

View File

@@ -67,6 +67,16 @@ class StreamSession:
# 反馈 ID用于接收用户点赞/点踩反馈 # 反馈 ID用于接收用户点赞/点踩反馈
feedback_id: Optional[str] = None feedback_id: Optional[str] = None
# Dify 人工输入暂停态runner 把 _form_data 传过来时填充。
# 一旦设置,下次企微 followup 请求时返回 button_interaction 模板卡
# 替代 stream chunk。点击按钮会回调 template_card_eventEventKey
# 就是 Dify 的 action_id。
pending_form: Optional[dict] = None
# template_card task_id企微要求 button_interaction 必填且不可重复)。
# 创建 pending_form 时生成;按钮点击回调里用来反查 session。
pending_form_task_id: Optional[str] = None
class StreamSessionManager: class StreamSessionManager:
"""管理 stream 会话的生命周期,并负责队列的生产消费。""" """管理 stream 会话的生命周期,并负责队列的生产消费。"""
@@ -83,6 +93,9 @@ class StreamSessionManager:
self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射 self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射
self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话 self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话
self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射 self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射
# task_id (button_interaction template_card 的) -> stream_id 映射,
# 用于按钮点击回调里反查 pending_form。
self._task_index: dict[str, str] = {}
def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]: def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]:
if not msg_id: if not msg_id:
@@ -118,6 +131,40 @@ class StreamSessionManager:
if feedback_id and stream_id: if feedback_id and stream_id:
self._feedback_index[feedback_id] = stream_id self._feedback_index[feedback_id] = stream_id
def set_pending_form(self, stream_id: str, form_data: dict, task_id: str) -> None:
"""把 Dify 人工输入暂停态绑定到 stream session。
下一次企微 followup 请求时adapter 检测到 pending_form
返回 button_interaction 模板卡而不是 stream chunk。
"""
session = self._sessions.get(stream_id)
if not session:
return
session.pending_form = form_data
session.pending_form_task_id = task_id
if task_id:
self._task_index[task_id] = stream_id
def get_session_by_task_id(self, task_id: str) -> Optional[StreamSession]:
"""按按钮点击回调里的 TaskId 反查 session。"""
if not task_id:
return None
stream_id = self._task_index.get(task_id)
if not stream_id:
return None
return self._sessions.get(stream_id)
def clear_pending_form(self, stream_id: str) -> None:
"""按钮点击消费完后清掉 pending_form避免重复弹卡。"""
session = self._sessions.get(stream_id)
if not session:
return
task_id = session.pending_form_task_id
session.pending_form = None
session.pending_form_task_id = None
if task_id:
self._task_index.pop(task_id, None)
def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]: def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]:
"""根据企业微信回调创建或获取会话。 """根据企业微信回调创建或获取会话。
@@ -723,6 +770,79 @@ async def parse_wecom_bot_message(
return message_data return message_data
def build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
"""Build a `template_card` (button_interaction) WeCom payload.
Shared by both the webhook-mode client (returns the payload as the
response to a stream-followup callback) and the ws_client (sends it
as a reply frame). Output shape is `{"msgtype": "template_card",
"template_card": {...}}` per the WeCom spec.
Args:
form_data: Dify human-input form data with keys ``actions`` (list of
``{id, title, button_style}``), ``node_title``, ``form_content``.
task_id: Unique per-card identifier. WeCom requires this for
button_interaction. The click callback returns it as TaskId so we
can find the originating session.
Notes:
* ``button.key`` is set directly to the Dify ``action_id``. The click
callback's ``EventKey`` carries this back unchanged (1024-byte limit
per the spec, far more than we ever need).
* WeCom caps the button list at 6. Extra actions are appended to
``sub_title_text`` so users can still reply with the id as text.
* Styles map ``primary``→1 (blue), ``danger``→2 (red), default→0
(gray). First button is auto-promoted to primary when no style.
"""
actions = list(form_data.get('actions') or [])
node_title = (form_data.get('node_title') or '').strip() or '人工介入'
form_content = (form_data.get('form_content') or '').strip()
visible_actions = actions[:6]
overflow = actions[6:]
sub_title_parts: list[str] = []
if form_content:
sub_title_parts.append(form_content)
if overflow:
extra_lines = [f' - {a.get("title") or a.get("id") or ""} (回复 id: {a.get("id") or ""})' for a in overflow]
sub_title_parts.append(f'另有 {len(overflow)} 个选项不在按钮列表中,可直接回复 id\n' + '\n'.join(extra_lines))
sub_title_text = '\n\n'.join(sub_title_parts) or '请选择一个操作以继续。'
button_list = []
for idx, action in enumerate(visible_actions):
action_id = str(action.get('id') or '')
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
style_raw = (action.get('button_style') or '').lower()
if style_raw == 'primary' or (style_raw == '' and idx == 0):
style = 1
elif style_raw == 'danger':
style = 2
else:
style = 0
button_list.append(
{
'text': title,
'style': style,
'key': action_id,
}
)
card = {
'card_type': 'button_interaction',
'main_title': {
'title': node_title,
},
'sub_title_text': sub_title_text,
'button_list': button_list,
'task_id': task_id,
}
return {
'msgtype': 'template_card',
'template_card': card,
}
class WecomBotClient: class WecomBotClient:
def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False): def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLogger, unified_mode: bool = False):
"""企业微信智能机器人客户端。 """企业微信智能机器人客户端。
@@ -761,6 +881,7 @@ class WecomBotClient:
self.stream_poll_timeout = 0.5 self.stream_poll_timeout = 0.5
self._feedback_callback: Optional[Callable] = None self._feedback_callback: Optional[Callable] = None
self._card_action_callback: Optional[Callable] = None
def set_feedback_callback(self, callback: Callable) -> None: def set_feedback_callback(self, callback: Callable) -> None:
"""设置反馈回调函数。 """设置反馈回调函数。
@@ -770,6 +891,19 @@ class WecomBotClient:
""" """
self._feedback_callback = callback self._feedback_callback = callback
def set_card_action_callback(self, callback: Callable) -> None:
"""设置按钮卡片点击回调函数。
Signature: ``async def callback(session, action_id, task_id, raw_event) -> None``
``session`` is the StreamSession the card was attached to;
``action_id`` is the Dify action_id reflected back via the
button's ``key`` field; ``task_id`` is the card's task_id
(matches ``session.pending_form_task_id``); ``raw_event`` is the
decoded callback JSON for any extra fields the adapter wants.
"""
self._card_action_callback = callback
@staticmethod @staticmethod
def _build_stream_payload( def _build_stream_payload(
stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None
@@ -800,6 +934,12 @@ class WecomBotClient:
'stream': stream_payload, 'stream': stream_payload,
} }
@staticmethod
def _build_button_interaction_payload(form_data: dict, task_id: str) -> dict[str, Any]:
"""Class-level shim — delegates to module-level builder so ws_client
can reuse the exact same payload shape without importing the class."""
return build_button_interaction_payload(form_data, task_id)
async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]: async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""对响应进行加密封装并返回给企业微信。 """对响应进行加密封装并返回给企业微信。
@@ -892,6 +1032,22 @@ class WecomBotClient:
return await self._encrypt_and_reply(self._build_stream_payload('', '', True), nonce) return await self._encrypt_and_reply(self._build_stream_payload('', '', True), nonce)
session = self.stream_sessions.get_session(stream_id) session = self.stream_sessions.get_session(stream_id)
# If a Dify human-input pause arrived during this stream, switch
# the response from `msgtype: stream` to `msgtype: template_card`
# (button_interaction). The session's stream is also marked
# finished so future followups aren't expected (assuming the
# WeCom client treats template_card as the terminal response —
# we'll know from the next callback whether it kept polling).
if session and session.pending_form and session.pending_form_task_id:
await self.logger.info(
f'WeComBot: returning button_interaction for stream_id={stream_id} '
f'task_id={session.pending_form_task_id} actions={len(session.pending_form.get("actions") or [])}'
)
card_payload = self._build_button_interaction_payload(session.pending_form, session.pending_form_task_id)
self.stream_sessions.mark_finished(stream_id)
return await self._encrypt_and_reply(card_payload, nonce)
chunk = await self.stream_sessions.consume(stream_id, timeout=self.stream_poll_timeout) chunk = await self.stream_sessions.consume(stream_id, timeout=self.stream_poll_timeout)
if not chunk: if not chunk:
@@ -1000,11 +1156,50 @@ class WecomBotClient:
if event_type == 'feedback_event': if event_type == 'feedback_event':
return await self._handle_feedback_event(msg_json, nonce) return await self._handle_feedback_event(msg_json, nonce)
# Button click on a button_interaction template_card. The WeCom doc
# calls this `template_card_event`; some routes wrap the button
# event payload inside `event.template_card_event`.
if event_type == 'template_card_event':
return await self._handle_template_card_event(msg_json, nonce)
if msg_json.get('msgtype') == 'stream': if msg_json.get('msgtype') == 'stream':
return await self._handle_post_followup_response(msg_json, nonce) return await self._handle_post_followup_response(msg_json, nonce)
return await self._handle_post_initial_response(msg_json, nonce) return await self._handle_post_initial_response(msg_json, nonce)
async def _handle_template_card_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""Handle a button click on a button_interaction template_card.
WeCom carries the click info in ``event.template_card_event`` with
``TaskId`` matching the card we created and ``EventKey`` carrying
the button's ``key`` (which we set to the Dify ``action_id``).
"""
try:
tce = msg_json.get('event', {}).get('template_card_event', {})
task_id = tce.get('TaskId') or tce.get('task_id') or ''
event_key = tce.get('EventKey') or tce.get('event_key') or ''
card_type = tce.get('CardType') or tce.get('card_type') or ''
await self.logger.info(f'收到按钮点击: task_id={task_id} event_key={event_key!r} card_type={card_type}')
session = self.stream_sessions.get_session_by_task_id(task_id)
if session is None:
await self.logger.warning(f'未找到 task_id={task_id} 对应的 session按钮点击被丢弃')
else:
if self._card_action_callback is not None:
try:
await self._card_action_callback(session, event_key, task_id, msg_json)
except Exception:
await self.logger.error(f'card action callback raised: {traceback.format_exc()}')
# Drop the form so a fresh chunk/followup doesn't re-render
# the same card (and so the task_id can be GC'd).
self.stream_sessions.clear_pending_form(session.stream_id)
except Exception:
await self.logger.error(f'_handle_template_card_event error: {traceback.format_exc()}')
# WeCom expects an empty success ack for event callbacks.
return await self._encrypt_and_reply({}, nonce)
async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]: async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""处理企业微信用户反馈事件(点赞/点踩)。 """处理企业微信用户反馈事件(点赞/点踩)。
@@ -1114,6 +1309,29 @@ class WecomBotClient:
self.stream_sessions.mark_finished(stream_id) self.stream_sessions.mark_finished(stream_id)
return True return True
async def push_form_pause(
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""Attach a Dify human-input pause to the active stream session.
On the next WeCom followup poll, the response switches from
``msgtype: stream`` to ``msgtype: template_card`` (button_interaction)
carrying the buttons. ``task_id`` is auto-generated if not provided
and is what the button-click callback uses to look the session back up.
Returns:
``(ok, stream_id, task_id)``. ``ok`` is False if the
adapter's msg_id maps to no stream session (e.g. non-stream mode).
"""
stream_id = self.stream_sessions.get_stream_id_by_msg(msg_id)
if not stream_id:
return False, None, None
if not task_id:
# WeCom requires task_id [A-Za-z0-9_-@], <= 128 bytes, unique per bot.
task_id = f'dify-{uuid.uuid4().hex[:24]}'
self.stream_sessions.set_pending_form(stream_id, form_data, task_id)
return True, stream_id, task_id
async def set_message(self, msg_id: str, content: str): async def set_message(self, msg_id: str, content: str):
"""兼容旧逻辑:若无法流式返回则缓存最终结果。 """兼容旧逻辑:若无法流式返回则缓存最终结果。

View File

@@ -20,7 +20,11 @@ from typing import Any, Callable, Optional
import aiohttp import aiohttp
from langbot.libs.wecom_ai_bot_api import wecombotevent from langbot.libs.wecom_ai_bot_api import wecombotevent
from langbot.libs.wecom_ai_bot_api.api import parse_wecom_bot_message, StreamSession from langbot.libs.wecom_ai_bot_api.api import (
parse_wecom_bot_message,
StreamSession,
build_button_interaction_payload,
)
from langbot.pkg.platform.logger import EventLogger from langbot.pkg.platform.logger import EventLogger
DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com' DEFAULT_WS_URL = 'wss://openws.work.weixin.qq.com'
@@ -103,6 +107,18 @@ class WecomBotWsClient:
# msg_id -> feedback_id (for associating feedback with message) # msg_id -> feedback_id (for associating feedback with message)
self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id self._msg_feedback_ids: dict[str, str] = {} # msg_id -> feedback_id
# Dify human-input pause state for ws mode. Keys are task_id (echoed
# back in template_card_event.TaskId so we can rebuild the session
# context on click).
# task_id -> {form_data, msg_id, user_id, chat_id, stream_id, req_id}
self._pending_forms_by_task: dict[str, dict] = {}
# Reverse: msg_id -> task_id (for cleanup when stream finishes).
self._task_id_by_msg: dict[str, str] = {}
# Optional card-action callback registered by the adapter.
# Signature mirrors the http-mode WecomBotClient:
# async def callback(session, action_id, task_id, raw_event) -> None
self._card_action_callback: Optional[Callable] = None
# ── Public API ────────────────────────────────────────────────── # ── Public API ──────────────────────────────────────────────────
async def connect(self): async def connect(self):
@@ -236,6 +252,83 @@ class WecomBotWsClient:
} }
return await self._send_reply(req_id, body) return await self._send_reply(req_id, body)
async def reply_template_card(self, req_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
"""Send a template_card (button_interaction etc.) reply.
Args:
req_id: The req_id from the original message frame.
card_payload: Body produced by ``build_button_interaction_payload``;
must contain ``msgtype`` and ``template_card`` keys.
Returns:
ACK frame dict, or None on failure.
"""
return await self._send_reply(req_id, card_payload)
def set_card_action_callback(self, callback: Callable) -> None:
"""Register the button-click handler.
``async def callback(session, action_id, task_id, raw_event) -> None``
— same signature as the http-mode WecomBotClient version so the
adapter can hand both off to the same coroutine.
"""
self._card_action_callback = callback
async def push_form_pause(
self, msg_id: str, form_data: dict, task_id: Optional[str] = None
) -> tuple[bool, Optional[str], Optional[str]]:
"""Attach a Dify human-input pause to the active stream and send
the button_interaction card immediately.
ws mode has no notion of polled "followup" responses — each reply
is a one-shot frame send. So unlike the http path (which defers
card delivery to the next followup), here we just craft the card
and reply with it on the original req_id. The corresponding stream
session is then torn down so subsequent chunks don't re-send.
Returns:
``(ok, stream_id, task_id)``. ``ok=False`` if no active stream
for this msg_id (e.g. message arrived in non-stream mode).
"""
key = self._stream_ids.get(msg_id)
if not key:
return False, None, None
req_id, stream_id = key.split('|', 1)
if not task_id:
task_id = f'dify-{secrets.token_hex(12)}'
session_info = self._stream_sessions.get(msg_id) or {}
self._pending_forms_by_task[task_id] = {
'form_data': form_data,
'msg_id': msg_id,
'user_id': session_info.get('user_id', ''),
'chat_id': session_info.get('chat_id', ''),
'stream_id': stream_id,
'req_id': req_id,
}
self._task_id_by_msg[msg_id] = task_id
card_payload = build_button_interaction_payload(form_data, task_id)
try:
await self.reply_template_card(req_id, card_payload)
except Exception:
await self.logger.error(f'Failed to send button_interaction card: {traceback.format_exc()}')
# Roll back the bookkeeping so the next attempt isn't blocked.
self._pending_forms_by_task.pop(task_id, None)
self._task_id_by_msg.pop(msg_id, None)
return False, stream_id, None
# Tear down the stream — WeCom expects either stream chunks OR a
# template_card, not both on the same req_id. Subsequent
# push_stream_chunk calls for this msg_id become no-ops.
self._stream_ids.pop(msg_id, None)
self._stream_last_content.pop(msg_id, None)
# Keep _stream_sessions so the button callback can still resolve
# user/chat context; it gets cleaned up when the click fires.
return True, stream_id, task_id
async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]: async def send_message(self, chat_id: str, content: str, msgtype: str = 'markdown') -> Optional[dict]:
"""Proactively send a message to a specified chat. """Proactively send a message to a specified chat.
@@ -258,6 +351,23 @@ class WecomBotWsClient:
body['text'] = {'content': content} body['text'] = {'content': content}
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG) return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
async def send_template_card(self, chat_id: str, card_payload: dict[str, Any]) -> Optional[dict]:
"""Proactively push a template_card to a chat.
Used for the resumed-workflow path (button click → new query):
synthetic events have no inbound req_id to reply against, so we
fall back to proactive ``aibot_send_msg`` instead of reply mode.
Args:
chat_id: userid (single chat) or chatid (group chat).
card_payload: ``{"msgtype": "template_card", "template_card": {...}}``
as produced by :func:`build_button_interaction_payload`.
"""
req_id = _generate_req_id(CMD_SEND_MSG)
body = dict(card_payload)
body['chatid'] = chat_id
return await self._send_reply(req_id, body, cmd=CMD_SEND_MSG)
async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool: async def push_stream_chunk(self, msg_id: str, content: str, is_final: bool = False) -> bool:
"""Push a streaming chunk for a given message ID. """Push a streaming chunk for a given message ID.
@@ -568,6 +678,38 @@ class WecomBotWsClient:
await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}') await self.logger.error(f'Error in feedback handler: {traceback.format_exc()}')
return return
if event_type == 'template_card_event':
tce = event_info.get('template_card_event', {})
task_id = tce.get('TaskId') or tce.get('task_id') or ''
event_key = tce.get('EventKey') or tce.get('event_key') or ''
card_type = tce.get('CardType') or tce.get('card_type') or ''
await self.logger.info(
f'收到按钮点击 (ws): task_id={task_id} event_key={event_key!r} card_type={card_type}'
)
pending = self._pending_forms_by_task.get(task_id)
if pending is None:
await self.logger.warning(f'未找到 task_id={task_id} 对应的 pending_form (ws),按钮点击被丢弃')
elif self._card_action_callback is not None:
try:
session = StreamSession(
stream_id=pending.get('stream_id', ''),
msg_id=pending.get('msg_id', ''),
chat_id=pending.get('chat_id') or None,
user_id=pending.get('user_id') or None,
)
session.pending_form = pending.get('form_data')
session.pending_form_task_id = task_id
await self._card_action_callback(session, event_key, task_id, body)
except Exception:
await self.logger.error(f'card action callback raised (ws): {traceback.format_exc()}')
# Consume — drop bookkeeping so a stale click can't re-fire.
self._pending_forms_by_task.pop(task_id, None)
msg_id = pending.get('msg_id', '')
if msg_id:
self._task_id_by_msg.pop(msg_id, None)
self._stream_sessions.pop(msg_id, None)
return
event = wecombotevent.WecomBotEvent(message_data) event = wecombotevent.WecomBotEvent(message_data)
if event_type in self._message_handlers: if event_type in self._message_handlers:

View File

@@ -157,7 +157,7 @@ class RuntimePipeline:
bot_message=query.resp_messages[-1], bot_message=query.resp_messages[-1],
message=result.user_notice, message=result.user_notice,
quote_origin=query.pipeline_config['output']['misc']['quote-origin'], quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
is_final=[msg.is_final for msg in query.resp_messages][0], is_final=[msg.is_final for msg in query.resp_messages][-1],
) )
else: else:
await query.adapter.reply_message( await query.adapter.reply_message(

View File

@@ -42,9 +42,13 @@ class QueryPool:
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter, adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None, pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False, routed_by_rule: bool = False,
variables: typing.Optional[dict[str, typing.Any]] = None,
) -> pipeline_query.Query: ) -> pipeline_query.Query:
async with self.condition: async with self.condition:
query_id = self.query_id_counter query_id = self.query_id_counter
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
if variables:
initial_variables.update(variables)
query = pipeline_query.Query( query = pipeline_query.Query(
bot_uuid=bot_uuid, bot_uuid=bot_uuid,
query_id=query_id, query_id=query_id,
@@ -53,7 +57,7 @@ class QueryPool:
sender_id=sender_id, sender_id=sender_id,
message_event=message_event, message_event=message_event,
message_chain=message_chain, message_chain=message_chain,
variables={'_routed_by_rule': routed_by_rule}, variables=initial_variables,
resp_messages=[], resp_messages=[],
resp_message_chain=[], resp_message_chain=[],
adapter=adapter, adapter=adapter,

View File

@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages) has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
# TODO 命令与流式的兼容性问题 # TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported() and has_chunks: if await query.adapter.is_stream_output_supported() and has_chunks:
is_final = [msg.is_final for msg in query.resp_messages][0] is_final = [msg.is_final for msg in query.resp_messages][-1]
await query.adapter.reply_message_chunk( await query.adapter.reply_message_chunk(
message_source=query.message_event, message_source=query.message_event,
bot_message=query.resp_messages[-1], bot_message=query.resp_messages[-1],

View File

@@ -501,6 +501,8 @@ class PlatformManager:
bot_entity.adapter_config, bot_entity.adapter_config,
logger, logger,
) )
if hasattr(adapter_inst, 'ap'):
adapter_inst.ap = self.ap
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook # 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook
if hasattr(adapter_inst, 'set_bot_uuid'): if hasattr(adapter_inst, 'set_bot_uuid'):

View File

@@ -1,13 +1,19 @@
import asyncio
import json
import traceback import traceback
import typing import typing
import uuid
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.provider.session as provider_session
from langbot.libs.dingtalk_api.api import DingTalkClient from langbot.libs.dingtalk_api.api import DingTalkClient
import datetime import datetime
from langbot.pkg.platform.logger import EventLogger from langbot.pkg.platform.logger import EventLogger
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter): class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@@ -170,6 +176,22 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_instance_id_dict: ( card_instance_id_dict: (
dict # 回复卡片消息字典key为消息idvalue为回复卡片实例id用于在流式消息时判断是否发送到指定卡片 dict # 回复卡片消息字典key为消息idvalue为回复卡片实例id用于在流式消息时判断是否发送到指定卡片
) )
# outTrackId → form snapshot {session_key, launcher_type, launcher_id, form_token,
# workflow_run_id, actions, node_title, form_content, expires_at, open_space_id,
# user_id_hint, current_text}. Lookup keys for the data-source pull endpoint and
# the STREAM card-action callback.
card_state: dict
# session_key → out_track_id of the currently-active card for the
# conversation turn. Lets resumed-workflow chunks (which arrive on a
# synthetic event with a fresh resp_message_id) keep updating the same
# card the user clicked instead of getting a new one.
active_turn_card: dict
# session_key → accumulated streaming text for the active turn. Read
# by _paint_form_on_card so the post-pause form keeps the streamed
# context above the new prompt.
active_turn_text: dict
ap: typing.Any = None
bot_uuid: str = ''
def __init__(self, config: dict, logger: EventLogger): def __init__(self, config: dict, logger: EventLogger):
required_keys = [ required_keys = [
@@ -194,10 +216,17 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
config=config, config=config,
logger=logger, logger=logger,
card_instance_id_dict={}, card_instance_id_dict={},
card_state={},
active_turn_card={},
active_turn_text={},
bot_account_id=bot_account_id, bot_account_id=bot_account_id,
bot=bot, bot=bot,
listeners={}, listeners={},
) )
# Wire the card-action callback after super().__init__ so we can reference
# self.* — the client's handler stores this as a soft reference and reads
# it at fire time.
self.bot.card_action_callback = self._on_card_action
async def reply_message( async def reply_message(
self, self,
@@ -222,28 +251,82 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
quote_origin: bool = False, quote_origin: bool = False,
is_final: bool = False, is_final: bool = False,
): ):
# event = await DingTalkEventConverter.yiri2target(
# message_source,
# )
# incoming_message = event.incoming_message
# msg_id = incoming_message.message_id
message_id = bot_message.resp_message_id message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence msg_seq = bot_message.msg_sequence
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
form_data = getattr(bot_message, '_form_data', None)
if is_final and self.ap is not None:
self.ap.logger.info(
f'DingTalk reply_message_chunk final: form_data_present={form_data is not None}, '
f'form_template_configured={bool(form_template_id)}'
)
if form_data and is_final:
await self._handle_form_chunk(message_source, bot_message, message, form_data)
return
if (msg_seq - 1) % 8 == 0 or is_final: if (msg_seq - 1) % 8 == 0 or is_final:
markdown_enabled = self.config.get('markdown_card', False) markdown_enabled = self.config.get('markdown_card', False)
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled) content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
card_instance, card_instance_id = self.card_instance_id_dict[message_id]
if not content and bot_message.content: if not content and bot_message.content:
content = bot_message.content # 兼容直接传入content的情况 content = bot_message.content # 兼容直接传入content的情况
# print(card_instance_id)
chat_card_entry = self.card_instance_id_dict.get(message_id)
if chat_card_entry is None:
# No streaming chat card was created for this query — common
# path for synthetic events (e.g. resumed workflow after a
# button click). Lazy-create one so the resumed output streams
# into a card just like a normal conversation, instead of
# being deferred and sent in one shot on is_final.
if not content:
return # nothing to stream yet
chat_card_entry = await self._lazy_create_resume_chat_card(message_source, message_id)
if chat_card_entry is None:
# Lazy-create failed (no template configured); fall back
# to a one-shot proactive message on the final chunk.
if is_final:
await self._send_proactive_to_event(message_source, content)
return
card_instance, card_instance_id = chat_card_entry
if content: if content:
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final) if form_template_id:
if is_final and bot_message.tool_calls is None: # The card content has already been written via
# self.seq = 1 # 消息回复结束之后重置seq # update_card_data (in _paint_form_on_card and the
self.card_instance_id_dict.pop(message_id) # 消息回复结束之后删除卡片实例id # initial card creation). The streaming endpoint
# (PUT /v1.0/card/streaming) does not propagate
# updates on cards whose content was last set via
# update_card_data — they take different code paths
# on the DingTalk client. Stick with update_card_data
# for the whole turn for consistency.
try:
await self.bot.update_card_data(
out_track_id=card_instance_id,
card_param_map={
'content': content,
'btns': '[]',
'flowStatus': '3' if is_final else '1',
},
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: update card content failed')
else:
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
if is_final:
if form_template_id and not content:
# Empty final chunk still needs to leave the card with
# flowStatus=3 so the spinner stops.
try:
await self.bot.update_card_data(
out_track_id=card_instance_id,
card_param_map={'flowStatus': '3'},
)
except Exception:
pass
if bot_message.tool_calls is None:
self.card_instance_id_dict.pop(message_id, None)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
markdown_enabled = self.config.get('markdown_card', False) markdown_enabled = self.config.get('markdown_card', False)
@@ -260,16 +343,80 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return is_stream return is_stream
async def create_message_card(self, message_id, event): async def create_message_card(self, message_id, event):
card_template_id = self.config['card_template_id'] form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
legacy_template_id = self.config.get('card_template_id', '')
# Synthetic events (button clicks): look up the card already in
# active_turn_card so reply_message_chunk can stream to it.
if event is None or event.source_platform_object is None:
if form_template_id:
session_key = self._session_key_from_event(event) if event is not None else ''
carry = self.active_turn_card.get(session_key, '') if session_key else ''
if carry:
self.card_instance_id_dict[message_id] = (None, carry)
return True
return False
if form_template_id:
# Create one card with the form template, empty buttons,
# pending state. Streaming writes content to it; form pause
# paints buttons onto it. One card per turn, no duplication.
incoming_message = event.source_platform_object.incoming_message
out_track_id = uuid.uuid4().hex
is_group = str(incoming_message.conversation_type) == '2'
if is_group:
open_space_id = f'dtv1.card//IM_GROUP.{incoming_message.conversation_id}'
else:
open_space_id = f'dtv1.card//IM_ROBOT.{incoming_message.sender_staff_id}'
try:
await self.bot.create_and_deliver_card(
card_template_id=form_template_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map={'content': '', 'btns': '[]', 'flowStatus': '1'},
callback_type='STREAM',
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: create form-template card failed')
return False
self.card_instance_id_dict[message_id] = (None, out_track_id)
session_key = self._session_key_from_event(event)
if session_key:
self.active_turn_card[session_key] = out_track_id
self.active_turn_text[session_key] = ''
return True
# Legacy chat-card path (no form template).
incoming_message = event.source_platform_object.incoming_message incoming_message = event.source_platform_object.incoming_message
# message_id = incoming_message.message_id card_auto_layout = self.config.get('card_auto_layout', False)
card_auto_layout = self.config.get('card_ auto_layout', False)
card_instance, card_instance_id = await self.bot.create_and_card( card_instance, card_instance_id = await self.bot.create_and_card(
card_template_id, incoming_message, card_auto_layout=card_auto_layout legacy_template_id, incoming_message, card_auto_layout=card_auto_layout
) )
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id) self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
return True return True
def _session_key_from_event(self, event) -> str:
"""Return launcher_type_launcher_id for an event, '' if unrecoverable."""
if event is None:
return ''
spo = event.source_platform_object
if spo is None:
try:
if isinstance(event, platform_events.GroupMessage):
return f'group_{event.group.id}'
return f'person_{event.sender.id}'
except Exception:
return ''
try:
inc = spo.incoming_message
if str(inc.conversation_type) == '2':
return f'group_{inc.conversation_id}'
return f'person_{inc.sender_staff_id}'
except Exception:
return ''
def register_listener( def register_listener(
self, self,
event_type: typing.Type[platform_events.Event], event_type: typing.Type[platform_events.Event],
@@ -309,3 +456,543 @@ class DingTalkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
], ],
): ):
return super().unregister_listener(event_type, callback) return super().unregister_listener(event_type, callback)
# ------------------------------------------------------------------
# Dify human-input form support
# ------------------------------------------------------------------
def set_bot_uuid(self, bot_uuid: str):
"""Receive the bot uuid from the platform manager.
Used to compose the public-facing unified-webhook URL for the card
dynamic-data-source pull endpoint.
"""
self.bot_uuid = bot_uuid
def _derive_open_space(self, message_source: platform_events.MessageEvent) -> tuple[str, bool]:
"""Return (openSpaceId, is_group) for the given inbound event."""
if isinstance(message_source, platform_events.GroupMessage):
return f'dtv1.card//IM_GROUP.{message_source.group.id}', True
return f'dtv1.card//IM_ROBOT.{message_source.sender.id}', False
def _derive_session_descriptor(
self, message_source: platform_events.MessageEvent
) -> tuple[provider_session.LauncherTypes, str, str]:
"""Return (launcher_type, launcher_id, sender_user_id) for routing."""
if isinstance(message_source, platform_events.GroupMessage):
return (
provider_session.LauncherTypes.GROUP,
str(message_source.group.id),
str(message_source.sender.id),
)
return (
provider_session.LauncherTypes.PERSON,
str(message_source.sender.id),
str(message_source.sender.id),
)
async def _handle_form_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
form_data: dict,
) -> None:
"""Surface human-input prompt + buttons on the active card.
In single-card mode (form_template_id configured): update the
EXISTING card with form buttons so it transitions from streaming
output to prompt+buttons on the same card. In legacy mode:
finalize the chat card and deliver a separate form card.
"""
if self.ap is not None:
self.ap.logger.info(
f'DingTalk _handle_form_chunk: actions={len(form_data.get("actions") or [])}, '
f'node_title={form_data.get("node_title", "")!r}'
)
message_id = bot_message.resp_message_id
template_id = (self.config.get('human_input_card_template_id') or '').strip()
if template_id:
# Single-card mode: paint prompt + buttons onto the existing card.
session_key = self._session_key_from_event(message_source)
entry = self.card_instance_id_dict.get(message_id)
out_track_id = entry[1] if entry else None
if not out_track_id and session_key:
out_track_id = self.active_turn_card.get(session_key, '')
if out_track_id:
await self._paint_form_on_card(message_source, out_track_id, form_data, session_key)
self.card_instance_id_dict.pop(message_id, None)
return
# No existing card (e.g. Dify paused immediately with no LLM
# output before the pause). Create a form card directly.
await self._send_form_card(message_source, form_data, template_id)
self.card_instance_id_dict.pop(message_id, None)
return
# Legacy mode: finalize the streaming card with text fallback.
chat_card_entry = self.card_instance_id_dict.pop(message_id, None)
if chat_card_entry is not None:
_, chat_out_track_id = chat_card_entry
markdown_enabled = self.config.get('markdown_card', False)
text_content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
if not text_content and bot_message.content:
text_content = bot_message.content
try:
await self.bot.send_card_message(None, chat_out_track_id, text_content or '', True)
except Exception:
await self.logger.error(f'DingTalk: finalize chat card before form failed: {traceback.format_exc()}')
await self.send_message_text_form(message_source, form_data)
async def _paint_form_on_card(
self,
message_source: platform_events.MessageEvent,
out_track_id: str,
form_data: dict,
session_key: str,
) -> None:
"""Update an existing card's content + buttons for human-input."""
actions = list(form_data.get('actions') or [])
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '') or ''
# Record form state for the click-handler.
launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
self.card_state[out_track_id] = {
'session_key': session_key,
'launcher_type': launcher_type.value,
'launcher_id': launcher_id,
'sender_user_id': sender_user_id,
'form_token': form_data.get('form_token', ''),
'workflow_run_id': form_data.get('workflow_run_id', ''),
'actions': actions,
'node_title': node_title,
'form_content': form_content,
}
btns = self._build_btns(actions, out_track_id)
parts: list[str] = []
prior = self.active_turn_text.get(session_key, '') if session_key else ''
if prior.strip():
parts.append(prior.rstrip())
parts.append('---')
if node_title:
parts.append(f'**{node_title}**')
if form_content:
parts.append(form_content)
display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'
try:
await self.bot.update_card_data(
out_track_id=out_track_id,
card_param_map={
'content': display_content,
'btns': json.dumps(btns, ensure_ascii=False),
'flowStatus': '3',
},
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: paint form on card failed')
await self.send_message_text_form(message_source, form_data)
return
if session_key:
self.active_turn_text[session_key] = display_content
@staticmethod
def _build_btns(actions: list, out_track_id: str) -> list:
btns = []
for idx, action in enumerate(actions):
action_id = str(action.get('id') or '')
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
style = (action.get('button_style') or '').lower()
if style == 'primary' or (style == '' and idx == 0):
color = 'blue'
elif style == 'danger':
color = 'red'
else:
color = 'gray'
btns.append(
{
'text': title,
'color': color,
'status': 'normal',
'event': {
'type': 'sendCardRequest',
'params': {
'actionId': action_id,
'params': {'action_id': action_id, 'out_track_id': out_track_id},
},
},
}
)
return btns
async def _send_form_card(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
template_id: str,
) -> None:
"""Deliver a new card pre-loaded with the human-input prompt + buttons."""
out_track_id = uuid.uuid4().hex
open_space_id, is_group = self._derive_open_space(message_source)
launcher_type, launcher_id, sender_user_id = self._derive_session_descriptor(message_source)
session_key = f'{launcher_type.value}_{launcher_id}'
actions = list(form_data.get('actions') or [])
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '') or ''
self.card_state[out_track_id] = {
'session_key': session_key,
'launcher_type': launcher_type.value,
'launcher_id': launcher_id,
'sender_user_id': sender_user_id,
'form_token': form_data.get('form_token', ''),
'workflow_run_id': form_data.get('workflow_run_id', ''),
'actions': actions,
'node_title': node_title,
'form_content': form_content,
'open_space_id': open_space_id,
'is_group': is_group,
}
parts = []
if node_title:
parts.append(f'**{node_title}**')
if form_content:
parts.append(form_content)
display_content = '\n\n'.join(parts) or '请选择一个操作以继续。'
btns = []
for idx, action in enumerate(actions):
action_id = str(action.get('id') or '')
title = str(action.get('title') or action_id or f'选项 {idx + 1}')
style = (action.get('button_style') or '').lower()
if style == 'primary' or (style == '' and idx == 0):
color = 'blue'
elif style == 'danger':
color = 'red'
else:
color = 'gray'
btns.append(
{
'text': title,
'color': color,
'status': 'normal',
'event': {
'type': 'sendCardRequest',
'params': {
'actionId': action_id,
'params': {'action_id': action_id, 'out_track_id': out_track_id},
},
},
}
)
try:
if self.ap is not None:
self.ap.logger.info(
f'DingTalk _send_form_card: out_track_id={out_track_id} template_id={template_id} '
f'open_space_id={open_space_id} is_group={is_group} btns={len(btns)}'
)
await self.bot.create_and_deliver_card(
card_template_id=template_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map={
'content': display_content,
'btns': json.dumps(btns, ensure_ascii=False),
'flowStatus': '3',
},
callback_type='STREAM',
)
except Exception:
await self.logger.error(f'DingTalk: deliver form card failed: {traceback.format_exc()}')
await self.send_message_text_form(message_source, form_data)
self.card_state.pop(out_track_id, None)
async def _lazy_create_resume_chat_card(
self,
message_source: platform_events.MessageEvent,
message_id: str,
) -> typing.Optional[tuple]:
"""Create a new card for resumed-workflow streaming output.
Used after a button click triggers a synthetic event — the form
card stays put with the "已选择" notice, and a fresh card is
spawned here for the LLM reply to stream into.
"""
form_template_id = (self.config.get('human_input_card_template_id') or '').strip()
legacy_template_id = (self.config.get('card_template_id') or '').strip()
template_id = form_template_id or legacy_template_id
if not template_id:
return None
out_track_id = uuid.uuid4().hex
open_space_id, is_group = self._derive_open_space(message_source)
if form_template_id:
card_param_map = {'content': '', 'btns': '[]', 'flowStatus': '1'}
card_data_config = None
else:
card_param_map = {'content': '', 'query': '...'}
card_data_config = {'autoLayout': self.config.get('card_auto_layout', False)}
try:
success = await self.bot.create_and_deliver_card(
card_template_id=template_id,
out_track_id=out_track_id,
open_space_id=open_space_id,
is_group=is_group,
card_param_map=card_param_map,
card_data_config=card_data_config,
callback_type='STREAM',
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: lazy create resume chat card failed')
return None
if not success:
return None
entry = (None, out_track_id)
self.card_instance_id_dict[message_id] = entry
# Register as the active card so any further chunks on this turn
# (and a subsequent re-pause) land on the same new card.
session_key = self._session_key_from_event(message_source)
if session_key:
self.active_turn_card[session_key] = out_track_id
self.active_turn_text[session_key] = ''
return entry
async def send_message_text_form(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> None:
"""Fallback: send the human-input prompt as plain text."""
display_text = _format_human_input_text(
form_data.get('node_title', ''),
form_data.get('form_content', ''),
form_data.get('actions', []) or [],
)
await self._send_proactive_to_event(message_source, display_text)
async def _send_proactive_to_event(
self,
message_source: platform_events.MessageEvent,
content: str,
) -> None:
"""Send `content` as a proactive message to the conversation behind
`message_source`. Used when no inbound chatbot message exists to
anchor a card on (e.g. resumed flows triggered by card actions).
"""
if not content:
return
if self.ap is not None:
target = (
str(message_source.group.id)
if isinstance(message_source, platform_events.GroupMessage)
else str(message_source.sender.id)
)
self.ap.logger.info(
f'DingTalk _send_proactive_to_event: target={target} '
f'is_group={isinstance(message_source, platform_events.GroupMessage)} content_len={len(content)}'
)
try:
if isinstance(message_source, platform_events.GroupMessage):
await self.bot.send_proactive_message_to_group(str(message_source.group.id), content)
else:
await self.bot.send_proactive_message_to_one(str(message_source.sender.id), content)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: send proactive message failed')
await self.logger.error(f'DingTalk: send proactive message failed: {traceback.format_exc()}')
async def _on_card_action(self, payload: dict) -> None:
"""Translate a card button click into a synthetic query.
Reads the clicked button's ``actionId`` (the real Dify action id —
the ButtonGroup template sends it back via `event.params.actionId`),
recovers the action title from ``card_state``, and enqueues a
synthetic `_dify_form_action` query the same way Lark / Telegram do.
"""
if self.ap is not None:
self.ap.logger.info(
f'DingTalk _on_card_action received: out_track_id={payload.get("out_track_id")} '
f'payload_action_id={payload.get("action_id")!r} params={payload.get("params")!r}'
)
out_track_id = payload.get('out_track_id') or ''
params = payload.get('params') or {}
# ButtonGroup `sendCardRequest` events surface the click id at the
# callback top level as `actionId`; fall back to `params.action_id`
# (alternate template wiring) and `params.actionId`.
raw_action_id = (
(payload.get('action_id') or '').strip()
or (params.get('action_id') or '').strip()
or (params.get('actionId') or '').strip()
or (params.get('id') or '').strip()
)
state = self.card_state.get(out_track_id)
if state is None:
await self.logger.warning(f'DingTalk: card action received for unknown out_track_id={out_track_id}')
return
if not raw_action_id:
await self.logger.warning(f'DingTalk: card action with no action_id, payload={payload}')
return
actions = state.get('actions', []) or []
action_id = raw_action_id
action_title = raw_action_id
for action in actions:
if str(action.get('id', '')) == raw_action_id:
action_title = action.get('title') or raw_action_id
break
launcher_type = (
provider_session.LauncherTypes.GROUP
if state.get('launcher_type') == provider_session.LauncherTypes.GROUP.value
else provider_session.LauncherTypes.PERSON
)
launcher_id = state.get('launcher_id', '')
sender_user_id = state.get('sender_user_id') or payload.get('user_id') or launcher_id
form_action_data = {
'form_token': state.get('form_token', ''),
'workflow_run_id': state.get('workflow_run_id', ''),
'action_id': action_id,
'action_title': action_title,
'node_title': state.get('node_title', ''),
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=sender_user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=message_chain,
time=int(datetime.datetime.now().timestamp()),
source_platform_object=None,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=sender_user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=int(datetime.datetime.now().timestamp()),
source_platform_object=None,
)
bot_uuid = ''
pipeline_uuid = None
if self.ap is not None:
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
try:
self.ap.logger.info(
f'DingTalk _on_card_action enqueuing form action: action_id={action_id!r} '
f'action_title={action_title!r} launcher_type={launcher_type.value} '
f'launcher_id={launcher_id} bot_uuid={bot_uuid} pipeline_uuid={pipeline_uuid}'
)
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
self.ap.logger.info('DingTalk _on_card_action: query enqueued OK')
except Exception:
self.ap.logger.exception('DingTalk: enqueue form action query failed')
return
# Visual feedback on the form card itself: keep the prompt visible,
# add a "已选择" line, remove the buttons. The resumed-workflow
# output lives on a separate new card (lazy-created in
# reply_message_chunk on the synthetic event), so the form card
# stays put as a record of the user's selection.
asyncio.create_task(
self._mark_card_resolved(
out_track_id,
action_title,
node_title=state.get('node_title', ''),
form_content=state.get('form_content', ''),
)
)
# Crucial: do NOT leave the form card's out_track_id in
# active_turn_card — otherwise create_message_card for the
# synthetic event would reuse it for the resume output, painting
# the LLM reply on top of the "已选择" notice. Clear it so the
# resume goes through the lazy-create path and spawns a fresh card.
session_key = state.get('session_key', '')
if session_key and self.active_turn_card.get(session_key) == out_track_id:
self.active_turn_card.pop(session_key, None)
self.active_turn_text.pop(session_key, None)
# Once consumed, drop the state — the runner clears _PENDING_FORMS too.
self.card_state.pop(out_track_id, None)
async def _mark_card_resolved(
self,
out_track_id: str,
action_title: str,
*,
node_title: str = '',
form_content: str = '',
) -> None:
"""Update the form card to acknowledge the user's selection.
Keeps the original prompt visible, adds a "已选择: X" notice, and
clears the buttons. The card stays as a permanent record of the
choice; the resumed workflow's output goes to a separate new card.
"""
parts: list[str] = []
if node_title:
parts.append(f'**{node_title}**')
if form_content:
parts.append(form_content)
parts.append(f'---\n✅ 已选择:**{action_title}**')
content = '\n\n'.join(parts)
if self.ap is not None:
self.ap.logger.info(f'DingTalk _mark_card_resolved: out_track_id={out_track_id} action={action_title!r}')
try:
await self.bot.update_card_data(
out_track_id=out_track_id,
card_param_map={
'content': content,
'btns': '[]',
'flowStatus': '3',
},
)
except Exception:
if self.ap is not None:
self.ap.logger.exception('DingTalk: mark card resolved failed')

View File

@@ -103,6 +103,18 @@ spec:
type: string type: string
required: true required: true
default: "填写你的卡片template_id" default: "填写你的卡片template_id"
- name: human_input_card_template_id
label:
en_US: Human Input Card Template ID
zh_Hans: 人工输入卡片模板ID
zh_Hant: 人工輸入卡片範本ID
description:
en_US: "Template ID used as the SINGLE card for the whole conversation turn. Streamed LLM text fills the `content` markdown variable; on a Dify human-input pause the `btns` buttonGroup variable is populated so action buttons appear on the SAME card; after the user clicks a button the buttons disappear and resumed streaming continues into the same card. Use the bundled `src/langbot/templates/dingtalk_human_input_card.json` — it ships with `content` (MarkdownBlock) and `btns` (ButtonGroup) already wired. Leave empty to fall back to the legacy two-card behaviour (chat card streaming text + plain-text human-input prompts)."
zh_Hans: "用作整个对话回合**唯一**卡片的模板ID。流式 LLM 文本写入 `content` markdown 变量Dify 人工输入暂停时同一张卡的 `btns` buttonGroup 变量被填上、按钮浮现;用户点击后按钮消失、恢复的流式内容继续追加到同一张卡。可使用项目附带的 `src/langbot/templates/dingtalk_human_input_card.json`——已经预先连好 `content` (MarkdownBlock) 与 `btns` (ButtonGroup)。留空则降级为旧的双卡行为(聊天卡走流式 + 人工输入走纯文本)。"
zh_Hant: "用作整個對話回合**唯一**卡片的範本ID。流式 LLM 文字寫入 `content` markdown 變數Dify 人工輸入暫停時同一張卡的 `btns` buttonGroup 變數被填上、按鈕浮現;使用者點擊後按鈕消失、恢復的流式內容繼續追加到同一張卡。可使用專案附帶的 `src/langbot/templates/dingtalk_human_input_card.json`——已經預先連好 `content` (MarkdownBlock) 與 `btns` (ButtonGroup)。留空則降級為舊的雙卡行為(聊天卡走流式 + 人工輸入走純文字)。"
type: string
required: false
default: ""
execution: execution:
python: python:
path: ./dingtalk.py path: ./dingtalk.py

View File

@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
import langbot_plugin.api.entities.builtin.provider.session as provider_session
class AESCipher(object): class AESCipher(object):
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True) bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True) api_client: lark_oapi.Client = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识 bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pending_monitoring_msg: dict[str, str] pending_monitoring_msg: dict[str, str]
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks) # Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
reply_to_monitoring_msg: dict[str, tuple[str, float]] reply_to_monitoring_msg: dict[str, tuple[str, float]]
reply_message_card_ids: dict[str, str]
card_sequence_dict: dict[str, int]
# card_id → set of source message ids registered against it (for cleanup)
card_id_to_source_ids: dict[str, set[str]]
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
card_streaming_text: dict[str, str]
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
card_pre_pause_text: dict[str, str]
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
card_resume_transitioned: set[str]
_MONITORING_MAPPING_TTL = 600 # 10 minutes _MONITORING_MAPPING_TTL = 600 # 10 minutes
seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识 seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识
@@ -812,11 +824,134 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
asyncio.create_task(on_message(event)) asyncio.create_task(on_message(event))
def schedule_on_app_loop(coro):
"""Run a coroutine on the application event loop from sync callbacks."""
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
def sync_on_card_action(event): def sync_on_card_action(event):
try: try:
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {}) action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
# Parse JSON string values (from form action buttons)
if isinstance(action_value_raw, str):
try:
action_value_obj = json.loads(action_value_raw)
except (json.JSONDecodeError, TypeError):
action_value_obj = {}
else:
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else '' action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
# Handle Dify form action button clicks
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
form_token = action_value_obj.get('form_token', '')
workflow_run_id = action_value_obj.get('workflow_run_id', '')
action_id = action_value_obj.get('action_id', '')
session_key = action_value_obj.get('session_key', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
# Find the bot entity to get bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
form_action_data = {
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
context = getattr(event.event, 'context', None)
open_message_id = getattr(context, 'open_message_id', None)
source_time = datetime.datetime.now()
event_time = source_time.timestamp()
action_text = action_value_obj.get('action_id', 'confirm')
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
)
if open_message_id:
message_chain.insert(
0,
platform_message.Source(
id=open_message_id,
time=source_time,
),
)
operator = getattr(event.event, 'operator', None)
user_id = (
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
async def add_form_action_query():
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
schedule_on_app_loop(add_form_action_query())
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
if action_value == '有帮助': if action_value == '有帮助':
feedback_type = 1 feedback_type = 1
elif action_value == '无帮助': elif action_value == '无帮助':
@@ -857,17 +992,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
) )
if platform_events.FeedbackEvent in self.listeners: if platform_events.FeedbackEvent in self.listeners:
loop = asyncio.get_event_loop() schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
if loop.is_running():
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
else:
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}}) return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
except Exception: except Exception:
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}')) traceback.print_exc()
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}}) return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
@@ -893,6 +1025,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_id_dict={}, card_id_dict={},
pending_monitoring_msg={}, pending_monitoring_msg={},
reply_to_monitoring_msg={}, reply_to_monitoring_msg={},
reply_message_card_ids={},
card_sequence_dict={},
card_id_to_source_ids={},
card_streaming_text={},
card_pre_pause_text={},
card_resume_transitioned=set(),
seq=1, seq=1,
listeners={}, listeners={},
quart_app=quart_app, quart_app=quart_app,
@@ -1132,6 +1270,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for k in expired: for k in expired:
del self.reply_to_monitoring_msg[k] del self.reply_to_monitoring_msg[k]
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
"""Return the next strictly increasing sequence for a card update."""
current = self.card_sequence_dict.get(card_id, 0)
next_seq = max(current + 1, suggested)
self.card_sequence_dict[card_id] = next_seq
return next_seq
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
"""Register a card_id under one or more source message ids."""
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
for sid in source_ids:
if not sid:
continue
self.reply_message_card_ids[sid] = card_id
bucket.add(sid)
def _drop_card_state(self, card_id: str) -> None:
"""Pop all per-card state for the given card_id."""
if not card_id:
return
for sid in self.card_id_to_source_ids.pop(card_id, set()):
self.reply_message_card_ids.pop(sid, None)
self.card_sequence_dict.pop(card_id, None)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
async def create_card_id(self, message_id): async def create_card_id(self, message_id):
try: try:
# self.logger.debug('飞书支持stream输出,创建卡片......') # self.logger.debug('飞书支持stream输出,创建卡片......')
@@ -1327,6 +1492,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.card_id_dict[message_id] = response.data.card_id self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id card_id = response.data.card_id
self.card_sequence_dict[card_id] = 0
return card_id return card_id
except Exception as e: except Exception as e:
@@ -1339,6 +1505,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
""" """
# message_id = event.message_chain.message_id # message_id = event.message_chain.message_id
source_message_id = str(event.message_chain.message_id)
existing_card_id = self.reply_message_card_ids.get(source_message_id)
if existing_card_id:
self.card_id_dict[message_id] = existing_card_id
return True
card_id = await self.create_card_id(message_id) card_id = await self.create_card_id(message_id)
content = { content = {
'type': 'card', 'type': 'card',
@@ -1377,6 +1549,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
user_msg_id = event.message_chain.message_id user_msg_id = event.message_chain.message_id
reply_msg_id = getattr(response.data, 'message_id', None) reply_msg_id = getattr(response.data, 'message_id', None)
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None) monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
# Register the card under both the user-incoming msg id (so a
# second reply_message_first_chunk for the same user message
# reuses this card) AND the bot-reply msg id (so a synthetic
# event from a form-button callback — whose Source.id equals
# the bot's card message id — hits the same card and renders
# the resume content into it).
if reply_msg_id:
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
else:
self._register_card_for_source(card_id, str(user_msg_id))
if reply_msg_id and monitoring_msg_id: if reply_msg_id and monitoring_msg_id:
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time()) self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
self._cleanup_monitoring_mapping() self._cleanup_monitoring_mapping()
@@ -1385,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return True return True
async def _open_new_form_card(
self,
message_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> str | None:
"""Spawn a fresh card to host a re-paused human-input prompt.
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
replies it to the current incoming message so it appears as the next
step in the chat, registers the new reply_msg_id so subsequent button
callbacks resolve back to it, and renders the prompt + buttons on it.
Returns the new card_id, or ``None`` if creation failed (caller is
responsible for falling back to in-place update so the workflow
remains continuable).
"""
source_message_id = getattr(message_source.message_chain, 'message_id', None)
if not source_message_id:
await self.logger.error('Cannot open new form card: source message_id missing')
return None
try:
new_card_id = await self.create_card_id(message_id)
except Exception:
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
return None
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
content = {
'type': 'card',
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(str(source_message_id))
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(content))
.msg_type('interactive')
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
try:
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
except Exception:
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
return None
if not response.success():
await self.logger.error(
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}'
)
return None
reply_msg_id = getattr(response.data, 'message_id', None)
if reply_msg_id:
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
sequence = self._next_card_sequence(new_card_id, 1)
await self._update_card_layout(
card_id=new_card_id,
message_source=message_source,
text_message='',
sequence=sequence,
form_data=form_data,
show_form_prompt=True,
)
return new_card_id
async def reply_message( async def reply_message(
self, self,
message_source: platform_events.MessageEvent, message_source: platform_events.MessageEvent,
@@ -1504,45 +1773,544 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
): ):
""" """
回复消息变成更新卡片消息 回复消息变成更新卡片消息
Supports Dify form-action resume: when the runner yields a chunk with
``_resume_from_form=True``, the card transitions from buttons to a
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
for subsequent resume chunks to stream into.
When ``_open_new_card=True`` on the final chunk, the existing card is
left as-is and the pipeline will create a new card (with fresh form
buttons) for the re-pause.
""" """
# self.seq += 1
message_id = bot_message.resp_message_id message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence msg_seq = bot_message.msg_sequence
if msg_seq % 8 == 0 or is_final:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = '' form_data = getattr(bot_message, '_form_data', None)
if text_elements: resume_from = getattr(bot_message, '_resume_from_form', False)
parts = [] action_title = getattr(bot_message, '_resume_action_title', '')
for paragraph in text_elements: resume_node_title = getattr(bot_message, '_resume_node_title', '')
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md')) open_new_card = getattr(bot_message, '_open_new_card', False)
if para_text: if action_title:
parts.append(para_text) if resume_node_title:
text_message = '\n\n'.join(parts) selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
else:
selected_notice = f'**已选择**{action_title}'
else:
selected_notice = ''
# content = { # ── decide whether this chunk needs a card update ────────────────────
# 'type': 'card_json', card_id = self.card_id_dict.get(message_id)
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}}, if not card_id:
# } return
request: ContentCardElementRequest = ( # ── convert message chain → text ─────────────────────────────────────
ContentCardElementRequest.builder() text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
.card_id(self.card_id_dict[message_id])
.element_id('streaming_txt') text_message = ''
.request_body( if text_elements:
ContentCardElementRequestBody.builder() parts = []
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204") for paragraph in text_elements:
.content(text_message) para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
.sequence(msg_seq) if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
card_sequence = self._next_card_sequence(card_id, msg_seq)
# ── RESUME: first chunk after button click ───────────────────────────
if resume_from and card_id not in self.card_resume_transitioned:
# Transition the card from the form state into resume mode.
# Preserve the text that was shown before the pause, and seed the
# resume placeholder with the current resume content if we already
# have any on the first yielded chunk.
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
initial_resume_text = text_message or '\u200b'
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=initial_resume_text,
)
self.card_resume_transitioned.add(card_id)
self.card_pre_pause_text[card_id] = pre_pause_text
self.card_streaming_text[card_id] = text_message
if not is_final:
return
# ── RESUME: subsequent chunks → full card update ─────────────────────
if resume_from and card_id in self.card_resume_transitioned:
cached = self.card_streaming_text.get(card_id, '')
if text_message != cached:
self.card_streaming_text[card_id] = text_message
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=text_message,
)
if not is_final:
return
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
if not resume_from and (msg_seq % 8 == 0 or is_final):
cached = self.card_streaming_text.get(card_id)
if text_message != cached:
self.card_streaming_text[card_id] = text_message
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
)
.build() .build()
) )
.build() response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
request, req_opt
)
if not response.success():
raise Exception(
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
# ── FINAL chunk: full card layout update ─────────────────────────────
if is_final:
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
resume_cached = self.card_streaming_text.get(card_id, '')
if form_data:
if open_new_card:
# The old card has already been laid out into resume mode
# by the resume-transition block above (notice + resume
# placeholder). Finalise it as a frozen step snapshot and
# spawn a brand-new card to host the next human-input
# prompt — each step stays visible as its own card in the
# chat history.
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
if new_card_id is None:
# Fallback: keep the existing in-place behaviour so the
# workflow remains continuable even if creating the
# new card failed.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=resume_cached,
show_form_prompt=True,
)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
else:
# The old card is now a frozen snapshot; let go of its
# streaming-side state but keep its source registrations
# intact (no _drop_card_state) so historical button
# callbacks aimed at it can still be matched if needed.
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
else:
# Initial pause path: render prompt + buttons in place on
# the current card.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=final_seq,
form_data=form_data,
show_form_prompt=True,
)
# The human-input prompt itself is rendered as buttons only
# on Lark, so do not keep the hidden fallback text around;
# otherwise it will resurface after the button click.
self.card_streaming_text[card_id] = ''
self.card_pre_pause_text[card_id] = ''
else:
# Normal finish: keep pre-pause + resume content visible,
# remove buttons/notice, drop the resume placeholder.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=None,
notice_text=selected_notice if resume_from else '',
resume_placeholder_text=resume_cached,
)
self._drop_card_state(card_id)
self.card_id_dict.pop(message_id, None)
# ── media (images / files) appended at the end ───────────────────────
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def _add_form_buttons_to_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
text_message: str = '',
sequence: int = 1,
):
"""Update the entire card to include form action buttons.
Uses card.aupdate to replace the card JSON with a template that
includes the streaming text content plus interactive buttons.
"""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=form_data,
)
async def _remove_form_buttons_from_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
):
"""Replace the human-input card layout with the plain final layout."""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=None,
)
async def _update_card_layout(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
form_data: dict | None = None,
notice_text: str = '',
resume_placeholder_text: str = '',
show_form_prompt: bool = True,
):
"""Update the entire card layout.
• form_data → show interactive buttons (initial Dify pause)
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
• resume_placeholder_text → add a streaming_txt_resume markdown element
"""
form_data = form_data or {}
actions = form_data.get('actions', [])
form_token = form_data.get('form_token', '')
workflow_run_id = form_data.get('workflow_run_id', '')
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '')
# When form_data is set, the visible content is rendered inside the
# interactive container, so the top streaming text should stay empty
# to avoid duplicate text above the action area.
#
# For resume notice state, keep the existing text visible in the card
# and only add the grey "selected" notice below it.
if form_data:
render_text_message = ''
else:
render_text_message = text_message
# Determine session key from message source
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'group_{message_source.group.id}'
else:
session_key = f'person_{message_source.sender.id}'
# Build button elements matching the existing card template's thumbsup/down format
action_buttons = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
button_style = action.get('button_style', 'default')
if button_style == 'primary':
lark_button_type = 'primary'
elif button_style == 'danger':
lark_button_type = 'danger'
else:
lark_button_type = 'default'
action_buttons.append(
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': action_title},
'type': lark_button_type,
'width': 'fill',
'size': 'medium',
'hover_tips': {'tag': 'plain_text', 'content': action_title},
'behaviors': [
{
'type': 'callback',
'value': {
'form_action': True,
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'session_key': session_key,
},
}
],
'margin': '0px 0px 0px 0px',
}
) )
if is_final and bot_message.tool_calls is None: interactive_elements = []
# self.seq = 1 # 消息回复结束之后重置seq if form_data:
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片 if show_form_prompt:
interactive_elements = [
{
'tag': 'markdown',
'content': f'**[Human Input Required] {node_title}**',
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 4px 0px',
}
]
if form_content:
interactive_elements.append(
{
'tag': 'markdown',
'content': form_content,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 8px 0px',
}
)
interactive_elements.append(
{
'tag': 'column_set',
'horizontal_spacing': '8px',
'horizontal_align': 'left',
'margin': '0px 0px 0px 0px',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [btn],
'padding': '0px 0px 0px 0px',
}
for btn in action_buttons
],
}
)
# Build the full card JSON with buttons, same structure as create_card_id
# ── mid_section: either form buttons, resume notice, or empty ──
mid_section_elements = []
if form_data:
mid_section_elements = [
{
'tag': 'interactive_container',
'margin': '12px 0px 8px 0px',
'padding': '12px 12px 12px 12px',
'has_border': True,
'elements': interactive_elements,
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
elif notice_text:
mid_section_elements = [
{
'tag': 'markdown',
'content': notice_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '8px 0px 4px 0px',
'text_color': 'grey',
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
resume_elements = []
if resume_placeholder_text:
resume_elements = [
{
'tag': 'markdown',
'content': resume_placeholder_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt_resume',
},
]
card_data = {
'schema': '2.0',
'config': {
'update_multi': True,
'streaming_mode': False,
},
'body': {
'direction': 'vertical',
'padding': '12px 12px 12px 12px',
'elements': [
{
'tag': 'div',
'text': {
'tag': 'plain_text',
'content': 'LangBot',
'text_size': 'normal',
'text_align': 'left',
'text_color': 'default',
},
'icon': {
'tag': 'custom_icon',
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
},
},
{
'tag': 'markdown',
'content': render_text_message,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt',
},
*mid_section_elements,
*resume_elements,
{
'tag': 'column_set',
'horizontal_spacing': '12px',
'horizontal_align': 'right',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [
{
'tag': 'markdown',
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
'text_align': 'left',
'text_size': 'notation',
'margin': '4px 0px 0px 0px',
'icon': {
'tag': 'standard_icon',
'token': 'robot_outlined',
'color': 'grey',
},
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
'weight': 1,
},
*(
[]
if form_data
else [
{
'tag': 'column',
'width': '20px',
'elements': [
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': ''},
'type': 'text',
'width': 'fill',
'size': 'medium',
'icon': {'tag': 'standard_icon', 'token': 'thumbsup_outlined'},
'hover_tips': {'tag': 'plain_text', 'content': '有帮助'},
'behaviors': [{'type': 'callback', 'value': {'feedback': '有帮助'}}],
'margin': '0px 0px 0px 0px',
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
},
{
'tag': 'column',
'width': '30px',
'elements': [
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': ''},
'type': 'text',
'width': 'default',
'size': 'medium',
'icon': {'tag': 'standard_icon', 'token': 'thumbdown_outlined'},
'hover_tips': {'tag': 'plain_text', 'content': '无帮助'},
'behaviors': [{'type': 'callback', 'value': {'feedback': '无帮助'}}],
'margin': '0px 0px 0px 0px',
}
],
'padding': '0px 0px 0px 0px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
},
]
),
],
'margin': '0px 0px 4px 0px',
},
],
},
}
try:
tenant_key = ( tenant_key = (
message_source.source_platform_object.header.tenant_key message_source.source_platform_object.header.tenant_key
if message_source.source_platform_object if message_source.source_platform_object
@@ -1558,39 +2326,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
.tenant_access_token(tenant_access_token) .tenant_access_token(tenant_access_token)
.build() .build()
) )
# 发起请求
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
# 处理失败返回 request: UpdateCardRequest = (
if not response.success(): UpdateCardRequest.builder()
raise Exception( .card_id(card_id)
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}' .request_body(
UpdateCardRequestBody.builder()
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
.build()
) )
return .build()
)
# Send media messages when streaming is done response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
if is_final and media_items: if not response.success():
for media in media_items: await self.logger.error(
media_request: ReplyMessageRequest = ( f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
ReplyMessageRequest.builder() f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
.message_id(message_source.message_chain.message_id) )
.request_body( except Exception:
ReplyMessageRequestBody.builder() await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def is_muted(self, group_id: int) -> bool: async def is_muted(self, group_id: int) -> bool:
return False return False

View File

@@ -1,14 +1,14 @@
from __future__ import annotations from __future__ import annotations
import time
import telegram import telegram
import telegram.ext import telegram.ext
from telegram import Update from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
import telegramify_markdown import telegramify_markdown
import typing import typing
import traceback import traceback
import json
import base64 import base64
import pydantic import pydantic
@@ -189,6 +189,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: telegram.Bot = pydantic.Field(exclude=True) bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True) application: telegram.ext.Application = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
message_converter: TelegramMessageConverter = TelegramMessageConverter() message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter() event_converter: TelegramEventConverter = TelegramEventConverter()
@@ -224,6 +225,102 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
telegram_callback, telegram_callback,
) )
) )
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
try:
data = json.loads(query.data)
if data.get('form_action') or data.get('f'):
import langbot_plugin.api.entities.builtin.provider.session as provider_session
workflow_run_id = data.get('workflow_run_id', '')
w_suffix = data.get('w', '')
action_id = data.get('action_id') or data.get('a', '')
session_key = data.get('session_key') or data.get('s', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
user_id = str(query.from_user.id)
# Find bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for b in self.ap.platform_mgr.bots:
if b.adapter is self:
bot_uuid = b.bot_entity.uuid
pipeline_uuid = b.bot_entity.use_pipeline_uuid
break
form_action_data = {
'workflow_run_id': workflow_run_id,
'w_suffix': w_suffix,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
source_platform_object=update,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
source_platform_object=update,
)
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
except Exception:
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
application.add_handler(CallbackQueryHandler(callback_query_handler))
super().__init__( super().__init__(
config=config, config=config,
logger=logger, logger=logger,
@@ -319,14 +416,19 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
update = event.source_platform_object update = event.source_platform_object
chat_id = update.effective_chat.id chat_id = update.effective_chat.id
chat_type = update.effective_chat.type chat_type = update.effective_chat.type
message_thread_id = update.message.message_thread_id effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
if chat_type == 'private': if chat_type == 'private':
draft_id = int(time.time() * 1000) import time as _time
self.msg_stream_id[message_id] = ('private', draft_id)
draft_id = int(_time.time() * 1000)
self.msg_stream_id[message_id] = ('private', draft_id)
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id) args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
await self.bot.send_message_draft(**args) try:
await self.bot.send_message_draft(**args)
except (telegram.error.RetryAfter, telegram.error.BadRequest):
pass
else: else:
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id) args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
send_msg = await self.bot.send_message(**args) send_msg = await self.bot.send_message(**args)
@@ -347,12 +449,13 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
assert isinstance(message_source.source_platform_object, Update) assert isinstance(message_source.source_platform_object, Update)
update = message_source.source_platform_object update = message_source.source_platform_object
chat_id = update.effective_chat.id chat_id = update.effective_chat.id
message_thread_id = update.message.message_thread_id effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
if message_id not in self.msg_stream_id: if message_id not in self.msg_stream_id:
return return
chat_mode, draft_id = self.msg_stream_id[message_id] chat_mode, stream_id = self.msg_stream_id[message_id]
components = await TelegramMessageConverter.yiri2target(message, self.bot) components = await TelegramMessageConverter.yiri2target(message, self.bot)
if not components or components[0]['type'] != 'text': if not components or components[0]['type'] != 'text':
@@ -361,16 +464,42 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return return
content = components[0]['text'] content = components[0]['text']
form_data = getattr(bot_message, '_form_data', None)
if form_data and is_final:
self.msg_stream_id.pop(message_id, None)
await self._send_form_action_buttons(message_source, form_data)
return
if chat_mode == 'private': if chat_mode == 'private':
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id) # Streaming via draft (ephemeral preview in the chat input area)
await self.bot.send_message_draft(**args) if (msg_seq - 1) % 8 == 0 or is_final:
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
try:
await self.bot.send_message_draft(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
try:
await self.bot.send_message_draft(**args)
except telegram.error.RetryAfter:
pass
else:
pass # Ignore other draft errors (cosmetic)
if is_final and bot_message.tool_calls is None: if is_final and bot_message.tool_calls is None:
del args['draft_id'] # Finalise: send the real message, discard the draft
await self.bot.send_message(**args) args = self._build_message_args(chat_id, content, message_thread_id)
try:
await self.bot.send_message(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
await self.bot.send_message(**args)
else:
raise
self.msg_stream_id.pop(message_id) self.msg_stream_id.pop(message_id)
else: else:
stream_id = draft_id # Streaming via edit_message_text (persistent message)
if (msg_seq - 1) % 8 == 0 or is_final: if (msg_seq - 1) % 8 == 0 or is_final:
args = { args = {
'message_id': stream_id, 'message_id': stream_id,
@@ -379,11 +508,68 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
} }
if self.config.get('markdown_card', False): if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2' args['parse_mode'] = 'MarkdownV2'
await self.bot.edit_message_text(**args) try:
await self.bot.edit_message_text(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
await self.bot.edit_message_text(**args)
else:
raise
if is_final and bot_message.tool_calls is None: if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id) self.msg_stream_id.pop(message_id)
async def _send_form_action_buttons(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
):
"""Send inline keyboard buttons for Dify human_input_required form actions."""
actions = form_data.get('actions', [])
node_title = form_data.get('node_title', '')
form_content = form_data.get('form_content', '')
workflow_run_id = form_data.get('workflow_run_id', '')
# Telegram callback_data is capped at 64 bytes, so we identify the
# paused workflow by the last 8 chars of workflow_run_id (unique
# within a session with overwhelming probability).
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'g:{message_source.group.id}'
else:
session_key = f'p:{message_source.sender.id}'
keyboard = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
if w_suffix:
callback_payload['w'] = w_suffix
callback_data = json.dumps(callback_payload, separators=(',', ':'))
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
reply_markup = InlineKeyboardMarkup(keyboard)
update = message_source.source_platform_object
chat_id = update.effective_chat.id
effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
text_lines = [f'[{node_title}] Please select an action:']
if form_content:
text_lines.insert(0, form_content)
args = {
'chat_id': chat_id,
'text': '\n\n'.join(text_lines),
'reply_markup': reply_markup,
}
if message_thread_id:
args['message_thread_id'] = message_thread_id
await self.bot.send_message(**args)
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update): if not isinstance(event.source_platform_object, Update):
return None return None

View File

@@ -296,6 +296,7 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
listeners: dict = {} listeners: dict = {}
_stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp) _stream_to_monitoring_msg: dict = {} # Maps stream_id to (monitoring_message_id, timestamp)
_STREAM_MAPPING_TTL = 600 # 10 minutes _STREAM_MAPPING_TTL = 600 # 10 minutes
ap: typing.Any = None
def __init__(self, config: dict, logger: EventLogger): def __init__(self, config: dict, logger: EventLogger):
enable_webhook = config.get('enable-webhook', False) enable_webhook = config.get('enable-webhook', False)
@@ -336,6 +337,12 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
_stream_to_monitoring_msg={}, _stream_to_monitoring_msg={},
) )
# Both WecomBotClient (webhook) and WecomBotWsClient (ws long-conn)
# expose ``set_card_action_callback``. Wire the click handler so
# Dify human-input button taps resume the workflow on either mode.
if hasattr(self.bot, 'set_card_action_callback'):
self.bot.set_card_action_callback(self._on_card_action)
async def reply_message( async def reply_message(
self, self,
message_source: platform_events.MessageEvent, message_source: platform_events.MessageEvent,
@@ -345,15 +352,37 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
content = await self.message_converter.yiri2target(message) content = await self.message_converter.yiri2target(message)
_ws_mode = not self.config.get('enable-webhook', False) _ws_mode = not self.config.get('enable-webhook', False)
event = message_source.source_platform_object
# Synthetic events (button-click resume queries) have no inbound
# platform object. Fall back to a proactive send so error
# messages and one-shot replies still reach the user.
if event is None:
if _ws_mode:
if isinstance(message_source, platform_events.GroupMessage):
chat_id = str(message_source.group.id)
else:
chat_id = str(message_source.sender.id)
try:
await self.bot.send_message(chat_id, content)
except Exception:
await self.logger.error(
f'WeComBot: proactive reply for synthetic event failed: {traceback.format_exc()}'
)
else:
await self.logger.warning(
'WeComBot webhook mode cannot reply to a synthetic event '
'(no req_id and no proactive-send credentials); dropping.'
)
return
if _ws_mode: if _ws_mode:
event = message_source.source_platform_object req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
req_id = event.get('req_id', '')
if req_id: if req_id:
await self.bot.reply_text(req_id, content) await self.bot.reply_text(req_id, content)
else: else:
await self.bot.set_message(event.message_id, content) await self.bot.set_message(event.message_id, content)
else: else:
await self.bot.set_message(message_source.source_platform_object.message_id, content) await self.bot.set_message(event.message_id, content)
async def reply_message_chunk( async def reply_message_chunk(
self, self,
@@ -364,9 +393,56 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
is_final: bool = False, is_final: bool = False,
): ):
content = await self.message_converter.yiri2target(message) content = await self.message_converter.yiri2target(message)
msg_id = message_source.source_platform_object.message_id
_ws_mode = not self.config.get('enable-webhook', False) _ws_mode = not self.config.get('enable-webhook', False)
# Synthetic events (e.g. button-click triggered form resume) have
# no inbound platform message — no msg_id, no req_id, no stream
# session. The output must go via the proactive-send path instead
# of the stream/reply path.
spo = message_source.source_platform_object
if spo is None:
return await self._handle_synthetic_chunk(message_source, bot_message, content, is_final, _ws_mode)
msg_id = spo.message_id
# Dify human-input pause: when the runner attaches `_form_data` to
# the final chunk, hand the button_interaction card off to the
# underlying client. In webhook mode the card is queued for the
# next followup poll; in ws mode it's sent as a reply frame
# immediately. Falls back to plain text when the bot has no active
# stream session for this msg_id (rare).
form_data = getattr(bot_message, '_form_data', None)
if form_data and is_final:
if hasattr(self.bot, 'push_form_pause'):
ok, stream_id, task_id = await self.bot.push_form_pause(msg_id, form_data)
if ok:
await self.logger.info(
f'WeComBot: pending button_interaction registered '
f'stream_id={stream_id} task_id={task_id} ws_mode={_ws_mode}'
)
return {'stream': True, 'form': True, 'task_id': task_id}
await self.logger.warning(
'WeComBot: cannot register form pause (no active stream session); falling back to plain text'
)
try:
from langbot.pkg.provider.runners.difysvapi import _format_human_input_text
fallback = _format_human_input_text(
form_data.get('node_title', ''),
form_data.get('form_content', ''),
form_data.get('actions', []) or [],
)
except Exception:
fallback = content or '(人工输入)'
if _ws_mode:
event = message_source.source_platform_object
req_id = event.get('req_id', '') if isinstance(event, dict) else getattr(event, 'req_id', '')
if req_id:
await self.bot.reply_text(req_id, fallback)
else:
await self.bot.set_message(msg_id, fallback)
return {'stream': False, 'form': True, 'fallback': True}
if _ws_mode: if _ws_mode:
success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final) success = await self.bot.push_stream_chunk(msg_id, content, is_final=is_final)
if not success and is_final: if not success and is_final:
@@ -385,6 +461,129 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""Whether streaming output is enabled for this bot instance.""" """Whether streaming output is enabled for this bot instance."""
return self.config.get('enable-stream-reply', True) return self.config.get('enable-stream-reply', True)
async def _handle_synthetic_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
content: str,
is_final: bool,
ws_mode: bool,
) -> dict:
"""Handle reply_message_chunk for synthetic events (button clicks).
Synthetic events have no inbound message → no msg_id, no req_id,
no stream session. We can't do incremental streaming, so we
buffer chunks per-conversation and flush on ``is_final`` via the
proactive send path.
Buffer keyed by ``(launcher_type, launcher_id)`` from the
synthetic event itself. Only ws mode has a usable proactive-send
path right now (``ws_client.send_message`` /
``ws_client.send_template_card``); webhook mode requires a
corpid/secret we don't have, so it logs and drops.
"""
if isinstance(message_source, platform_events.GroupMessage):
chat_id = str(message_source.group.id)
else:
chat_id = str(message_source.sender.id)
form_data = getattr(bot_message, '_form_data', None)
# Buffer streaming content until is_final.
buf_key = chat_id
if not hasattr(self, '_synthetic_buffers'):
# Attribute-not-declared trick: pydantic forbids dynamic attrs
# on the model, but plain instance dicts via object.__setattr__
# do work. Lazy-create on first call.
object.__setattr__(self, '_synthetic_buffers', {})
buffers: dict[str, str] = self._synthetic_buffers
if content and not form_data:
buffers[buf_key] = buffers.get(buf_key, '') + content
if not is_final:
return {'stream': True, 'synthetic': True, 'buffered': True}
final_content = buffers.pop(buf_key, '')
if content and final_content.startswith(content):
# is_final chunk re-emitted the full accumulated text — keep
# whichever is longer.
final_content = final_content if len(final_content) >= len(content) else content
elif content and not final_content:
final_content = content
if not ws_mode:
await self.logger.warning(
'WeComBot webhook mode cannot proactively push synthetic-event '
'output (no corpid/secret); the resume reply is dropped. '
f'content_len={len(final_content)} form_data_present={form_data is not None}'
)
return {'stream': False, 'synthetic': True, 'dropped': True}
# ws mode: proactive send.
try:
if form_data:
# Determine user_id / chat_id for the routing context of any
# subsequent click on this card.
if isinstance(message_source, platform_events.GroupMessage):
routing_chat_id = str(message_source.group.id)
routing_user_id = str(message_source.sender.id)
else:
routing_chat_id = ''
routing_user_id = str(message_source.sender.id)
payload = self._build_button_interaction_payload_from_form(
form_data,
user_id=routing_user_id,
chat_id=routing_chat_id,
)
await self.bot.send_template_card(chat_id, payload)
await self.logger.info(
f'WeComBot ws: proactively sent template_card for synthetic event '
f'chat_id={chat_id} form_token={form_data.get("form_token")!r} '
f'workflow_run_id={form_data.get("workflow_run_id")!r}'
)
elif final_content:
await self.bot.send_message(chat_id, final_content)
await self.logger.info(
f'WeComBot ws: proactively sent text for synthetic event chat_id={chat_id} len={len(final_content)}'
)
except Exception:
await self.logger.error(f'WeComBot: synthetic event proactive send failed: {traceback.format_exc()}')
return {'stream': False, 'synthetic': True, 'error': True}
return {'stream': True, 'synthetic': True}
def _build_button_interaction_payload_from_form(
self, form_data: dict, *, user_id: str = '', chat_id: str = ''
) -> dict:
"""Build a button_interaction payload + track task_id for click resolution.
Unlike the inbound-event path (where push_form_pause registers the
task_id with the active stream session), proactive sends still
need the task_id registered so button clicks find pending_form.
For ws mode we stash it directly on the ws_client's pending dict.
"""
from langbot.libs.wecom_ai_bot_api.api import build_button_interaction_payload
import secrets as _secrets
task_id = f'dify-{_secrets.token_hex(12)}'
payload = build_button_interaction_payload(form_data, task_id)
# Register task_id → form_data so the click callback can find it.
# user_id / chat_id are required so _on_card_action can route the
# resulting synthetic query back to the right user. msg_id / req_id
# / stream_id are intentionally empty — synthetic cards have no
# inbound message to anchor on.
if hasattr(self.bot, '_pending_forms_by_task'):
self.bot._pending_forms_by_task[task_id] = {
'form_data': form_data,
'msg_id': '',
'user_id': user_id,
'chat_id': chat_id,
'stream_id': '',
'req_id': '',
}
return payload
async def send_message(self, target_type, target_id, message): async def send_message(self, target_type, target_id, message):
_ws_mode = not self.config.get('enable-webhook', False) _ws_mode = not self.config.get('enable-webhook', False)
if _ws_mode: if _ws_mode:
@@ -531,3 +730,114 @@ class WecomBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
async def is_muted(self, group_id: int) -> bool: async def is_muted(self, group_id: int) -> bool:
pass pass
# ------------------------------------------------------------------
# Dify human-input button-interaction click handling
# ------------------------------------------------------------------
async def _on_card_action(self, session, action_id: str, task_id: str, raw_event: dict) -> None:
"""Translate a button click on a button_interaction card into a
synthetic ``_dify_form_action`` query enqueued on the pool.
Pattern mirrors DingTalk / Lark / Telegram so the runner's
``_merge_pending_form_action`` path resumes the workflow.
"""
import langbot_plugin.api.entities.builtin.provider.session as provider_session
form = session.pending_form or {}
await self.logger.info(
f'WeComBot _on_card_action: task_id={task_id} action_id={action_id!r} '
f'form_token={form.get("form_token")!r} workflow_run_id={form.get("workflow_run_id")!r} '
f'session.user_id={session.user_id!r} session.chat_id={session.chat_id!r}'
)
actions = form.get('actions') or []
clean_action_id = (action_id or '').strip()
action_title = clean_action_id
for a in actions:
if str(a.get('id', '')) == clean_action_id:
action_title = a.get('title') or clean_action_id
break
launcher_id = session.user_id or session.chat_id or ''
sender_user_id = session.user_id or launcher_id
# WeCom AI bot has both single-chat and group-chat; chat_id present
# indicates group context.
if session.chat_id:
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = session.chat_id
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = session.user_id or ''
form_action_data = {
'form_token': form.get('form_token', ''),
'workflow_run_id': form.get('workflow_run_id', ''),
'action_id': clean_action_id,
'action_title': action_title,
'node_title': form.get('node_title', ''),
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain([platform_message.Plain(text=f'[Form Action: {action_title}]')])
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=sender_user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
special_title='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=sender_user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=int(time.time()),
source_platform_object=None,
)
if self.ap is None:
await self.logger.error('WeComBot: ap not injected; cannot enqueue button-click query')
return
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
try:
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
await self.logger.info(f'WeComBot: button-click query enqueued action_id={clean_action_id!r}')
except Exception:
await self.logger.error(f'WeComBot: enqueue button-click query failed: {traceback.format_exc()}')

View File

@@ -2,9 +2,11 @@ from __future__ import annotations
import typing import typing
import json import json
import time
import uuid import uuid
import base64 import base64
import mimetypes import mimetypes
from collections import OrderedDict
from langbot.pkg.provider import runner from langbot.pkg.provider import runner
@@ -16,6 +18,125 @@ from langbot.libs.dify_service_api.v1 import client, errors
import httpx import httpx
# Module-level store for paused-workflow form state, keyed by session key
# (launcher_type_value + "_" + launcher_id). Each session holds an
# insertion-ordered dict of form_token -> form_data, allowing multiple
# Dify workflows to be paused simultaneously for the same session.
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
def _session_key_from_query(query: pipeline_query.Query) -> str:
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
def _prune_pending_forms(now: float | None = None) -> None:
if now is None:
now = time.time()
for session_key in list(_PENDING_FORMS.keys()):
forms = _PENDING_FORMS[session_key]
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
for token in expired_tokens:
forms.pop(token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
_prune_pending_forms()
stored = dict(form_data)
expiration_time = stored.get('expiration_time')
try:
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
except (TypeError, ValueError):
expiration_ts = 0.0
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
form_token = str(stored.get('form_token') or '')
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
# Re-insert at the end so this becomes the "latest" entry
forms.pop(form_token, None)
forms[form_token] = stored
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms or not form_token:
return None
return forms.get(form_token)
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
"""Look up a pending form whose workflow_run_id ends with the given suffix.
Used by adapters (e.g. Telegram) whose callback payload is too small to
carry the full form_token / workflow_run_id.
"""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms or not w_suffix:
return None
for token in reversed(forms):
form = forms[token]
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
return form
return None
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return None
return forms[next(reversed(forms))]
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
"""Iterate pending forms for a session, newest-first."""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
for token in reversed(list(forms.keys())):
yield forms[token]
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
"""Clear one specific pending form (by token) or all forms for the session."""
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
if form_token is None:
_PENDING_FORMS.pop(session_key, None)
return
forms.pop(form_token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
def _format_human_input_text(
node_title: str,
form_content: str,
actions: list[dict[str, typing.Any]],
) -> str:
"""Render a paused-workflow human-input prompt as plain text.
Used by adapters without rich UI (no buttons/cards) so users can reply
with the option number or the option title to resume the workflow.
"""
lines: list[str] = [f'[Human Input Required] {node_title or ""}'.rstrip()]
if form_content:
lines.append('')
lines.append(form_content)
if actions:
lines.append('')
lines.append('Reply with the number or title to continue:')
for idx, action in enumerate(actions, start=1):
title = action.get('title') or action.get('id') or ''
lines.append(f' {idx}. {title}')
return '\n'.join(lines)
@runner.runner_class('dify-service-api') @runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner): class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器""" """Dify Service API 对话请求器"""
@@ -335,11 +456,155 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id'] query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form_blocking(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
inputs = form_action.get('inputs', {})
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
if chunk['event'] == 'workflow_finished':
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
yield provider_message.Message(
role='assistant',
content=content,
)
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
"""Locate the pending form this action targets.
Tries identifiers in order of specificity: form_token, full
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
then falls back to the newest pending form for the session.
"""
form_token = form_action.get('form_token')
if form_token:
form = _get_pending_form_by_token(session_key, form_token)
if form:
return form
workflow_run_id = form_action.get('workflow_run_id')
if workflow_run_id:
for form in _iter_pending_forms(session_key):
if form.get('workflow_run_id') == workflow_run_id:
return form
w_suffix = form_action.get('w_suffix')
if w_suffix:
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
if form:
return form
return _get_latest_pending_form(session_key)
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
"""Backfill resume fields from the matching pending form."""
if not form_action:
return None
merged_action = dict(form_action)
merged_action.pop('w_suffix', None)
pending_form = self._resolve_pending_form(session_key, form_action)
if pending_form:
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
'workflow_run_id', ''
)
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
merged_action.setdefault('user', pending_form.get('user', ''))
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
# Resolve clicked action's display title from the stored actions list
if 'action_title' not in merged_action:
clicked_id = merged_action.get('action_id', '')
for action in pending_form.get('actions', []):
if str(action.get('id', '')) == str(clicked_id):
merged_action['action_title'] = action.get('title', clicked_id)
break
return merged_action
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
"""Match plain text replies against pending Dify form actions.
Resolution order:
1. A pure digit reply (e.g. "1", "2") maps to the 1-indexed action of
the most recent pending form. Lets users on plain-text platforms
pick options without retyping titles.
2. Otherwise, iterate pending forms newest-first and match each
action's title/id case-insensitively. The first hit wins, so when
two forms share a button label the newer one resolves.
"""
normalized_text = user_text.strip().lower()
if not normalized_text:
return None
def _build(pending_form: dict, action: dict) -> dict:
return {
'form_token': pending_form.get('form_token', ''),
'workflow_run_id': pending_form.get('workflow_run_id', ''),
'action_id': action.get('id', ''),
'action_title': action.get('title', action.get('id', '')),
'node_title': pending_form.get('node_title', ''),
'inputs': pending_form.get('inputs', {}),
'user': pending_form.get('user', ''),
}
if normalized_text.isdigit():
position = int(normalized_text)
latest_form = _get_latest_pending_form(session_key)
if latest_form is not None:
actions = latest_form.get('actions', [])
if 1 <= position <= len(actions):
return _build(latest_form, actions[position - 1])
for pending_form in _iter_pending_forms(session_key):
for action in pending_form.get('actions', []):
titles = {
str(action.get('title', '')).strip().lower(),
str(action.get('id', '')).strip().lower(),
}
if normalized_text in titles:
return _build(pending_form, action)
return None
async def _workflow_messages( async def _workflow_messages(
self, query: pipeline_query.Query self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]: ) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流""" """调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
if not query.session.using_conversation.uuid: if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4()) query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -366,6 +631,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
} }
inputs.update(query.variables) inputs.update(query.variables)
human_input_yielded = False
async for chunk in self.dify_client.workflow_run( async for chunk in self.dify_client.workflow_run(
inputs=inputs, inputs=inputs,
@@ -377,6 +643,45 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] in ignored_events: if chunk['event'] in ignored_events:
continue continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': reason.get('inputs', {}),
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
human_input_yielded = True
yield provider_message.Message(
role='assistant',
content=display_text,
)
if chunk['event'] == 'node_started': if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end': if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue continue
@@ -399,6 +704,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg yield msg
elif chunk['event'] == 'workflow_finished': elif chunk['event'] == 'workflow_finished':
if human_input_yielded:
break
if chunk['data']['error']: if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error']) raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary']) content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
@@ -636,11 +943,153 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id'] query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""Submit human input to resume a paused Dify workflow."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
action_title = form_action.get('action_title', '') or action_id
node_title = form_action.get('node_title', '')
inputs = form_action.get('inputs', {})
messsage_idx = 0
is_final = False
think_start = False
think_end = False
workflow_contents = ''
repause_form_data: dict | None = None
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
yield_this_iteration = False
if chunk['event'] == 'workflow_finished':
is_final = True
yield_this_iteration = True
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
# Use a distinct name — `node_title` (the just-resolved step)
# must keep its value so the resume notice on the previous
# card still shows which step the user acted on.
paused_node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
user,
{
'workflow_run_id': new_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': paused_node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': user,
},
)
repause_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': paused_node_title,
'workflow_run_id': new_run_id,
'form_token': reason.get('form_token', ''),
}
# Ensure the final chunk has non-empty content so
# ResponseWrapper (which skips empty-content chunks) lets it
# propagate to SendResponseBackStage. Use a zero-width space
# so neither Lark nor Telegram renders visible noise — the
# adapter substitutes its own card text from _form_data.
if not workflow_contents:
workflow_contents = ''
is_final = True
yield_this_iteration = True
break
if chunk['event'] == 'text_chunk':
messsage_idx += 1
if remove_think:
if '<think>' in chunk['data']['text'] and not think_start:
think_start = True
continue
if '</think>' in chunk['data']['text'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
workflow_contents += content
think_end = True
elif think_end:
workflow_contents += chunk['data']['text']
if think_start:
continue
else:
workflow_contents += chunk['data']['text']
if messsage_idx % 8 == 0:
yield_this_iteration = True
if yield_this_iteration:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents,
is_final=is_final,
)
msg._resume_from_form = True
if action_title:
msg._resume_action_title = action_title
if node_title:
msg._resume_node_title = node_title
if is_final and repause_form_data:
msg._form_data = repause_form_data
msg._open_new_card = True
yield msg
if is_final:
return
async def _workflow_messages_chunk( async def _workflow_messages_chunk(
self, query: pipeline_query.Query self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]: ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流""" """调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
# Resume paused workflow via submit endpoint
async for msg in self._submit_workflow_form(form_action):
yield msg
return
if not query.session.using_conversation.uuid: if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4()) query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -672,6 +1121,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False think_start = False
think_end = False think_end = False
workflow_contents = '' workflow_contents = ''
workflow_run_id = ''
human_input_yielded = False
# Saved form data to attach to the final MessageChunk so the adapter
# can detect it when is_final=True and render buttons.
pending_form_data = None
display_text = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think') remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run( async for chunk in self.dify_client.workflow_run(
@@ -682,7 +1138,61 @@ class DifyServiceAPIRunner(runner.RequestRunner):
): ):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk)) self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events: if chunk['event'] in ignored_events:
if chunk['event'] == 'workflow_started':
workflow_run_id = chunk['data'].get('workflow_run_id', '')
continue continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
# Persist form state in module-level store keyed by session
raw_inputs = reason.get('inputs', {})
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
# Pass form render metadata to downstream stages
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
display_text = _format_human_input_text(node_title, form_content, actions)
workflow_contents += display_text + '\n'
# Save form data to attach to the final chunk later.
# We do NOT yield here — the form content will be sent
# as the final MessageChunk (with is_final=True and
# _form_data) so the adapter can update the card and
# add buttons in one pass.
pending_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': workflow_run_id,
'form_token': reason.get('form_token', ''),
}
human_input_yielded = True
if chunk['event'] == 'workflow_finished': if chunk['event'] == 'workflow_finished':
is_final = True is_final = True
if chunk['data']['error']: if chunk['data']['error']:
@@ -730,11 +1240,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg yield msg
if messsage_idx % 8 == 0 or is_final: if messsage_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk( final_content = workflow_contents if workflow_contents.strip() else ''
msg = provider_message.MessageChunk(
role='assistant', role='assistant',
content=workflow_contents, content=final_content,
is_final=is_final, is_final=is_final,
) )
# Attach form data to the final chunk for the adapter
if is_final and pending_form_data:
msg._form_data = pending_form_data
pending_form_data = None
yield msg
# If the stream ended after workflow_paused without a
# workflow_finished event, yield a final chunk so the adapter
# can update the card and add buttons.
if human_input_yielded and not is_final:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents or display_text,
is_final=True,
)
msg._form_data = pending_form_data
yield msg
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]: async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求""" """运行请求"""

File diff suppressed because one or more lines are too long