mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 03:55:55 +00:00
Compare commits
5 Commits
master
...
feat/card_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0f65b17ec | ||
|
|
2b533c4a00 | ||
|
|
f663d87a60 | ||
|
|
60e5b873ee | ||
|
|
b96f209b98 |
@@ -109,6 +109,61 @@ class AsyncDifyServiceClient:
|
||||
if chunk.startswith('data:'):
|
||||
yield json.loads(chunk[5:])
|
||||
|
||||
async def workflow_submit(
|
||||
self,
|
||||
form_token: str,
|
||||
workflow_run_id: str,
|
||||
inputs: dict[str, typing.Any],
|
||||
user: str,
|
||||
action: str = '',
|
||||
timeout: float = 120.0,
|
||||
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
||||
"""Submit human input to resume a paused workflow, then stream events.
|
||||
|
||||
1. POST /form/human_input/{form_token} to submit the form
|
||||
2. GET /workflow/{task_id}/events to stream the resumed workflow events
|
||||
"""
|
||||
|
||||
headers = {
|
||||
'Authorization': f'Bearer {self.api_key}',
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
base_url=self.base_url,
|
||||
trust_env=True,
|
||||
timeout=timeout,
|
||||
) as client:
|
||||
# Step 1: Submit the form
|
||||
payload: dict[str, typing.Any] = {
|
||||
'inputs': inputs if isinstance(inputs, dict) else {},
|
||||
'user': user,
|
||||
'action': action,
|
||||
}
|
||||
|
||||
submit_resp = await client.post(
|
||||
f'/form/human_input/{form_token}',
|
||||
headers=headers,
|
||||
json=payload,
|
||||
)
|
||||
if submit_resp.status_code != 200:
|
||||
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
|
||||
|
||||
# Step 2: Stream resumed workflow events
|
||||
async with client.stream(
|
||||
'GET',
|
||||
f'/workflow/{workflow_run_id}/events',
|
||||
headers={'Authorization': f'Bearer {self.api_key}'},
|
||||
params={'user': user},
|
||||
) as r:
|
||||
async for chunk in r.aiter_lines():
|
||||
if r.status_code != 200:
|
||||
raise DifyAPIError(f'{r.status_code} {chunk}')
|
||||
if chunk.strip() == '':
|
||||
continue
|
||||
if chunk.startswith('data:'):
|
||||
yield json.loads(chunk[5:])
|
||||
|
||||
async def upload_file(
|
||||
self,
|
||||
file: httpx._types.FileTypes,
|
||||
|
||||
@@ -157,7 +157,7 @@ class RuntimePipeline:
|
||||
bot_message=query.resp_messages[-1],
|
||||
message=result.user_notice,
|
||||
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
|
||||
is_final=[msg.is_final for msg in query.resp_messages][0],
|
||||
is_final=[msg.is_final for msg in query.resp_messages][-1],
|
||||
)
|
||||
else:
|
||||
await query.adapter.reply_message(
|
||||
|
||||
@@ -42,9 +42,13 @@ class QueryPool:
|
||||
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
|
||||
pipeline_uuid: typing.Optional[str] = None,
|
||||
routed_by_rule: bool = False,
|
||||
variables: typing.Optional[dict[str, typing.Any]] = None,
|
||||
) -> pipeline_query.Query:
|
||||
async with self.condition:
|
||||
query_id = self.query_id_counter
|
||||
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
|
||||
if variables:
|
||||
initial_variables.update(variables)
|
||||
query = pipeline_query.Query(
|
||||
bot_uuid=bot_uuid,
|
||||
query_id=query_id,
|
||||
@@ -53,7 +57,7 @@ class QueryPool:
|
||||
sender_id=sender_id,
|
||||
message_event=message_event,
|
||||
message_chain=message_chain,
|
||||
variables={'_routed_by_rule': routed_by_rule},
|
||||
variables=initial_variables,
|
||||
resp_messages=[],
|
||||
resp_message_chain=[],
|
||||
adapter=adapter,
|
||||
|
||||
@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
|
||||
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
|
||||
# TODO 命令与流式的兼容性问题
|
||||
if await query.adapter.is_stream_output_supported() and has_chunks:
|
||||
is_final = [msg.is_final for msg in query.resp_messages][0]
|
||||
is_final = [msg.is_final for msg in query.resp_messages][-1]
|
||||
await query.adapter.reply_message_chunk(
|
||||
message_source=query.message_event,
|
||||
bot_message=query.resp_messages[-1],
|
||||
|
||||
@@ -501,6 +501,8 @@ class PlatformManager:
|
||||
bot_entity.adapter_config,
|
||||
logger,
|
||||
)
|
||||
if hasattr(adapter_inst, 'ap'):
|
||||
adapter_inst.ap = self.ap
|
||||
|
||||
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid(用于统一 webhook)
|
||||
if hasattr(adapter_inst, 'set_bot_uuid'):
|
||||
|
||||
@@ -31,6 +31,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
|
||||
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
|
||||
|
||||
class AESCipher(object):
|
||||
@@ -770,6 +771,7 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
|
||||
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
|
||||
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
|
||||
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||
|
||||
bot_account_id: str # 用于在流水线中识别at是否是本bot,直接以bot_name作为标识
|
||||
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
|
||||
@@ -792,6 +794,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
pending_monitoring_msg: dict[str, str]
|
||||
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
|
||||
reply_to_monitoring_msg: dict[str, tuple[str, float]]
|
||||
reply_message_card_ids: dict[str, str]
|
||||
card_sequence_dict: dict[str, int]
|
||||
# card_id → set of source message ids registered against it (for cleanup)
|
||||
card_id_to_source_ids: dict[str, set[str]]
|
||||
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
|
||||
card_streaming_text: dict[str, str]
|
||||
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
|
||||
card_pre_pause_text: dict[str, str]
|
||||
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
|
||||
card_resume_transitioned: set[str]
|
||||
_MONITORING_MAPPING_TTL = 600 # 10 minutes
|
||||
|
||||
seq: int # 用于在发送卡片消息中识别消息顺序,直接以seq作为标识
|
||||
@@ -812,11 +824,134 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
|
||||
asyncio.create_task(on_message(event))
|
||||
|
||||
def schedule_on_app_loop(coro):
|
||||
"""Run a coroutine on the application event loop from sync callbacks."""
|
||||
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
|
||||
|
||||
def sync_on_card_action(event):
|
||||
try:
|
||||
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
|
||||
action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
|
||||
# Parse JSON string values (from form action buttons)
|
||||
if isinstance(action_value_raw, str):
|
||||
try:
|
||||
action_value_obj = json.loads(action_value_raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
action_value_obj = {}
|
||||
else:
|
||||
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
|
||||
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
|
||||
|
||||
# Handle Dify form action button clicks
|
||||
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
|
||||
form_token = action_value_obj.get('form_token', '')
|
||||
workflow_run_id = action_value_obj.get('workflow_run_id', '')
|
||||
action_id = action_value_obj.get('action_id', '')
|
||||
session_key = action_value_obj.get('session_key', '')
|
||||
|
||||
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||
launcher_type = provider_session.LauncherTypes.GROUP
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('g:')
|
||||
else session_key[len('group_') :]
|
||||
)
|
||||
else:
|
||||
launcher_type = provider_session.LauncherTypes.PERSON
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('p:')
|
||||
else session_key[len('person_') :]
|
||||
)
|
||||
|
||||
# Find the bot entity to get bot_uuid and pipeline_uuid
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
for bot in self.ap.platform_mgr.bots:
|
||||
if bot.adapter is self:
|
||||
bot_uuid = bot.bot_entity.uuid
|
||||
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
form_action_data = {
|
||||
'form_token': form_token,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'action_id': action_id,
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
context = getattr(event.event, 'context', None)
|
||||
open_message_id = getattr(context, 'open_message_id', None)
|
||||
source_time = datetime.datetime.now()
|
||||
event_time = source_time.timestamp()
|
||||
action_text = action_value_obj.get('action_id', 'confirm')
|
||||
message_chain = platform_message.MessageChain(
|
||||
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
|
||||
)
|
||||
if open_message_id:
|
||||
message_chain.insert(
|
||||
0,
|
||||
platform_message.Source(
|
||||
id=open_message_id,
|
||||
time=source_time,
|
||||
),
|
||||
)
|
||||
|
||||
operator = getattr(event.event, 'operator', None)
|
||||
user_id = (
|
||||
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
|
||||
)
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=event_time,
|
||||
source_platform_object=event,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
time=event_time,
|
||||
source_platform_object=event,
|
||||
)
|
||||
|
||||
async def add_form_action_query():
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
|
||||
schedule_on_app_loop(add_form_action_query())
|
||||
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
|
||||
|
||||
if action_value == '有帮助':
|
||||
feedback_type = 1
|
||||
elif action_value == '无帮助':
|
||||
@@ -857,17 +992,14 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
)
|
||||
|
||||
if platform_events.FeedbackEvent in self.listeners:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
else:
|
||||
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
|
||||
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
|
||||
except Exception:
|
||||
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
||||
traceback.print_exc()
|
||||
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
|
||||
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
||||
|
||||
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
|
||||
@@ -893,6 +1025,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
card_id_dict={},
|
||||
pending_monitoring_msg={},
|
||||
reply_to_monitoring_msg={},
|
||||
reply_message_card_ids={},
|
||||
card_sequence_dict={},
|
||||
card_id_to_source_ids={},
|
||||
card_streaming_text={},
|
||||
card_pre_pause_text={},
|
||||
card_resume_transitioned=set(),
|
||||
seq=1,
|
||||
listeners={},
|
||||
quart_app=quart_app,
|
||||
@@ -1132,6 +1270,33 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
for k in expired:
|
||||
del self.reply_to_monitoring_msg[k]
|
||||
|
||||
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
|
||||
"""Return the next strictly increasing sequence for a card update."""
|
||||
current = self.card_sequence_dict.get(card_id, 0)
|
||||
next_seq = max(current + 1, suggested)
|
||||
self.card_sequence_dict[card_id] = next_seq
|
||||
return next_seq
|
||||
|
||||
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
|
||||
"""Register a card_id under one or more source message ids."""
|
||||
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
|
||||
for sid in source_ids:
|
||||
if not sid:
|
||||
continue
|
||||
self.reply_message_card_ids[sid] = card_id
|
||||
bucket.add(sid)
|
||||
|
||||
def _drop_card_state(self, card_id: str) -> None:
|
||||
"""Pop all per-card state for the given card_id."""
|
||||
if not card_id:
|
||||
return
|
||||
for sid in self.card_id_to_source_ids.pop(card_id, set()):
|
||||
self.reply_message_card_ids.pop(sid, None)
|
||||
self.card_sequence_dict.pop(card_id, None)
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
self.card_resume_transitioned.discard(card_id)
|
||||
|
||||
async def create_card_id(self, message_id):
|
||||
try:
|
||||
# self.logger.debug('飞书支持stream输出,创建卡片......')
|
||||
@@ -1327,6 +1492,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
self.card_id_dict[message_id] = response.data.card_id
|
||||
|
||||
card_id = response.data.card_id
|
||||
self.card_sequence_dict[card_id] = 0
|
||||
return card_id
|
||||
|
||||
except Exception as e:
|
||||
@@ -1339,6 +1505,12 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
"""
|
||||
# message_id = event.message_chain.message_id
|
||||
|
||||
source_message_id = str(event.message_chain.message_id)
|
||||
existing_card_id = self.reply_message_card_ids.get(source_message_id)
|
||||
if existing_card_id:
|
||||
self.card_id_dict[message_id] = existing_card_id
|
||||
return True
|
||||
|
||||
card_id = await self.create_card_id(message_id)
|
||||
content = {
|
||||
'type': 'card',
|
||||
@@ -1377,6 +1549,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
user_msg_id = event.message_chain.message_id
|
||||
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
|
||||
# Register the card under both the user-incoming msg id (so a
|
||||
# second reply_message_first_chunk for the same user message
|
||||
# reuses this card) AND the bot-reply msg id (so a synthetic
|
||||
# event from a form-button callback — whose Source.id equals
|
||||
# the bot's card message id — hits the same card and renders
|
||||
# the resume content into it).
|
||||
if reply_msg_id:
|
||||
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
|
||||
else:
|
||||
self._register_card_for_source(card_id, str(user_msg_id))
|
||||
if reply_msg_id and monitoring_msg_id:
|
||||
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
|
||||
self._cleanup_monitoring_mapping()
|
||||
@@ -1385,6 +1567,93 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
return True
|
||||
|
||||
async def _open_new_form_card(
|
||||
self,
|
||||
message_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
) -> str | None:
|
||||
"""Spawn a fresh card to host a re-paused human-input prompt.
|
||||
|
||||
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
|
||||
replies it to the current incoming message so it appears as the next
|
||||
step in the chat, registers the new reply_msg_id so subsequent button
|
||||
callbacks resolve back to it, and renders the prompt + buttons on it.
|
||||
|
||||
Returns the new card_id, or ``None`` if creation failed (caller is
|
||||
responsible for falling back to in-place update so the workflow
|
||||
remains continuable).
|
||||
"""
|
||||
source_message_id = getattr(message_source.message_chain, 'message_id', None)
|
||||
if not source_message_id:
|
||||
await self.logger.error('Cannot open new form card: source message_id missing')
|
||||
return None
|
||||
|
||||
try:
|
||||
new_card_id = await self.create_card_id(message_id)
|
||||
except Exception:
|
||||
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
|
||||
return None
|
||||
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||
)
|
||||
app_access_token = self.get_app_access_token()
|
||||
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||
req_opt: RequestOption = (
|
||||
RequestOption.builder()
|
||||
.app_ticket(self.app_ticket)
|
||||
.tenant_key(tenant_key)
|
||||
.app_access_token(app_access_token)
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
|
||||
content = {
|
||||
'type': 'card',
|
||||
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
|
||||
}
|
||||
request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(str(source_message_id))
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(content))
|
||||
.msg_type('interactive')
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
|
||||
try:
|
||||
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
|
||||
except Exception:
|
||||
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
|
||||
return None
|
||||
|
||||
if not response.success():
|
||||
await self.logger.error(
|
||||
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
|
||||
f'log_id={response.get_log_id()}'
|
||||
)
|
||||
return None
|
||||
|
||||
reply_msg_id = getattr(response.data, 'message_id', None)
|
||||
if reply_msg_id:
|
||||
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
|
||||
|
||||
sequence = self._next_card_sequence(new_card_id, 1)
|
||||
await self._update_card_layout(
|
||||
card_id=new_card_id,
|
||||
message_source=message_source,
|
||||
text_message='',
|
||||
sequence=sequence,
|
||||
form_data=form_data,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
return new_card_id
|
||||
|
||||
async def reply_message(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
@@ -1504,45 +1773,492 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
):
|
||||
"""
|
||||
回复消息变成更新卡片消息
|
||||
|
||||
Supports Dify form-action resume: when the runner yields a chunk with
|
||||
``_resume_from_form=True``, the card transitions from buttons to a
|
||||
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
|
||||
for subsequent resume chunks to stream into.
|
||||
|
||||
When ``_open_new_card=True`` on the final chunk, the existing card is
|
||||
left as-is and the pipeline will create a new card (with fresh form
|
||||
buttons) for the re-pause.
|
||||
"""
|
||||
# self.seq += 1
|
||||
message_id = bot_message.resp_message_id
|
||||
msg_seq = bot_message.msg_sequence
|
||||
if msg_seq % 8 == 0 or is_final:
|
||||
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
||||
|
||||
text_message = ''
|
||||
if text_elements:
|
||||
parts = []
|
||||
for paragraph in text_elements:
|
||||
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
|
||||
if para_text:
|
||||
parts.append(para_text)
|
||||
text_message = '\n\n'.join(parts)
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
resume_from = getattr(bot_message, '_resume_from_form', False)
|
||||
action_title = getattr(bot_message, '_resume_action_title', '')
|
||||
resume_node_title = getattr(bot_message, '_resume_node_title', '')
|
||||
open_new_card = getattr(bot_message, '_open_new_card', False)
|
||||
if action_title:
|
||||
if resume_node_title:
|
||||
selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
|
||||
else:
|
||||
selected_notice = f'**已选择**:{action_title}'
|
||||
else:
|
||||
selected_notice = ''
|
||||
|
||||
# content = {
|
||||
# 'type': 'card_json',
|
||||
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
|
||||
# }
|
||||
# ── decide whether this chunk needs a card update ────────────────────
|
||||
card_id = self.card_id_dict.get(message_id)
|
||||
if not card_id:
|
||||
return
|
||||
|
||||
request: ContentCardElementRequest = (
|
||||
ContentCardElementRequest.builder()
|
||||
.card_id(self.card_id_dict[message_id])
|
||||
.element_id('streaming_txt')
|
||||
.request_body(
|
||||
ContentCardElementRequestBody.builder()
|
||||
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
|
||||
.content(text_message)
|
||||
.sequence(msg_seq)
|
||||
# ── convert message chain → text ─────────────────────────────────────
|
||||
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
|
||||
|
||||
text_message = ''
|
||||
if text_elements:
|
||||
parts = []
|
||||
for paragraph in text_elements:
|
||||
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
|
||||
if para_text:
|
||||
parts.append(para_text)
|
||||
text_message = '\n\n'.join(parts)
|
||||
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
|
||||
)
|
||||
app_access_token = self.get_app_access_token()
|
||||
tenant_access_token = self.get_tenant_access_token(tenant_key)
|
||||
req_opt: RequestOption = (
|
||||
RequestOption.builder()
|
||||
.app_ticket(self.app_ticket)
|
||||
.tenant_key(tenant_key)
|
||||
.app_access_token(app_access_token)
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
|
||||
card_sequence = self._next_card_sequence(card_id, msg_seq)
|
||||
|
||||
# ── RESUME: first chunk after button click ───────────────────────────
|
||||
if resume_from and card_id not in self.card_resume_transitioned:
|
||||
# Transition the card from the form state into resume mode.
|
||||
# Preserve the text that was shown before the pause, and seed the
|
||||
# resume placeholder with the current resume content if we already
|
||||
# have any on the first yielded chunk.
|
||||
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
|
||||
initial_resume_text = text_message or '\u200b'
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause_text,
|
||||
sequence=card_sequence,
|
||||
form_data=None,
|
||||
notice_text=selected_notice,
|
||||
resume_placeholder_text=initial_resume_text,
|
||||
)
|
||||
self.card_resume_transitioned.add(card_id)
|
||||
self.card_pre_pause_text[card_id] = pre_pause_text
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
if not is_final:
|
||||
return
|
||||
|
||||
# ── RESUME: subsequent chunks → full card update ─────────────────────
|
||||
if resume_from and card_id in self.card_resume_transitioned:
|
||||
cached = self.card_streaming_text.get(card_id, '')
|
||||
if text_message != cached:
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause_text,
|
||||
sequence=card_sequence,
|
||||
form_data=None,
|
||||
notice_text=selected_notice,
|
||||
resume_placeholder_text=text_message,
|
||||
)
|
||||
if not is_final:
|
||||
return
|
||||
|
||||
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
|
||||
if not resume_from and (msg_seq % 8 == 0 or is_final):
|
||||
cached = self.card_streaming_text.get(card_id)
|
||||
if text_message != cached:
|
||||
self.card_streaming_text[card_id] = text_message
|
||||
request: ContentCardElementRequest = (
|
||||
ContentCardElementRequest.builder()
|
||||
.card_id(card_id)
|
||||
.element_id('streaming_txt')
|
||||
.request_body(
|
||||
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
|
||||
request, req_opt
|
||||
)
|
||||
if not response.success():
|
||||
raise Exception(
|
||||
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
|
||||
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
|
||||
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
||||
)
|
||||
|
||||
# ── FINAL chunk: full card layout update ─────────────────────────────
|
||||
if is_final:
|
||||
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
|
||||
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
|
||||
resume_cached = self.card_streaming_text.get(card_id, '')
|
||||
if form_data:
|
||||
if open_new_card:
|
||||
# The old card has already been laid out into resume mode
|
||||
# by the resume-transition block above (notice + resume
|
||||
# placeholder). Finalise it as a frozen step snapshot and
|
||||
# spawn a brand-new card to host the next human-input
|
||||
# prompt — each step stays visible as its own card in the
|
||||
# chat history.
|
||||
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
|
||||
if new_card_id is None:
|
||||
# Fallback: keep the existing in-place behaviour so the
|
||||
# workflow remains continuable even if creating the
|
||||
# new card failed.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause,
|
||||
sequence=final_seq,
|
||||
form_data=form_data,
|
||||
resume_placeholder_text=resume_cached,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
else:
|
||||
# The old card is now a frozen snapshot; let go of its
|
||||
# streaming-side state but keep its source registrations
|
||||
# intact (no _drop_card_state) so historical button
|
||||
# callbacks aimed at it can still be matched if needed.
|
||||
self.card_streaming_text.pop(card_id, None)
|
||||
self.card_pre_pause_text.pop(card_id, None)
|
||||
self.card_resume_transitioned.discard(card_id)
|
||||
else:
|
||||
# Initial pause path: render prompt + buttons in place on
|
||||
# the current card.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=final_seq,
|
||||
form_data=form_data,
|
||||
show_form_prompt=True,
|
||||
)
|
||||
# The human-input prompt itself is rendered as buttons only
|
||||
# on Lark, so do not keep the hidden fallback text around;
|
||||
# otherwise it will resurface after the button click.
|
||||
self.card_streaming_text[card_id] = ''
|
||||
self.card_pre_pause_text[card_id] = ''
|
||||
else:
|
||||
# Normal finish: keep pre-pause + resume content visible,
|
||||
# remove buttons/notice, drop the resume placeholder.
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=pre_pause,
|
||||
sequence=final_seq,
|
||||
form_data=None,
|
||||
notice_text=selected_notice if resume_from else '',
|
||||
resume_placeholder_text=resume_cached,
|
||||
)
|
||||
self._drop_card_state(card_id)
|
||||
self.card_id_dict.pop(message_id, None)
|
||||
|
||||
# ── media (images / files) appended at the end ───────────────────────
|
||||
if is_final and media_items:
|
||||
for media in media_items:
|
||||
media_request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(message_source.message_chain.message_id)
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(media['content']))
|
||||
.msg_type(media['msg_type'])
|
||||
.reply_in_thread(False)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
|
||||
media_request, req_opt
|
||||
)
|
||||
if not media_response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
||||
)
|
||||
|
||||
async def _add_form_buttons_to_card(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
):
|
||||
"""Update the entire card to include form action buttons.
|
||||
|
||||
Uses card.aupdate to replace the card JSON with a template that
|
||||
includes the streaming text content plus interactive buttons.
|
||||
"""
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=sequence,
|
||||
form_data=form_data,
|
||||
)
|
||||
|
||||
async def _remove_form_buttons_from_card(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
):
|
||||
"""Replace the human-input card layout with the plain final layout."""
|
||||
await self._update_card_layout(
|
||||
card_id=card_id,
|
||||
message_source=message_source,
|
||||
text_message=text_message,
|
||||
sequence=sequence,
|
||||
form_data=None,
|
||||
)
|
||||
|
||||
async def _update_card_layout(
|
||||
self,
|
||||
card_id: str,
|
||||
message_source: platform_events.MessageEvent,
|
||||
text_message: str = '',
|
||||
sequence: int = 1,
|
||||
form_data: dict | None = None,
|
||||
notice_text: str = '',
|
||||
resume_placeholder_text: str = '',
|
||||
show_form_prompt: bool = True,
|
||||
):
|
||||
"""Update the entire card layout.
|
||||
|
||||
• form_data → show interactive buttons (initial Dify pause)
|
||||
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
|
||||
• resume_placeholder_text → add a streaming_txt_resume markdown element
|
||||
"""
|
||||
form_data = form_data or {}
|
||||
actions = form_data.get('actions', [])
|
||||
form_token = form_data.get('form_token', '')
|
||||
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||
node_title = form_data.get('node_title', '') or 'Human Input Required'
|
||||
form_content = form_data.get('form_content', '')
|
||||
|
||||
# When form_data is set, the visible content is rendered inside the
|
||||
# interactive container, so the top streaming text should stay empty
|
||||
# to avoid duplicate text above the action area.
|
||||
#
|
||||
# For resume notice state, keep the existing text visible in the card
|
||||
# and only add the grey "selected" notice below it.
|
||||
if form_data:
|
||||
render_text_message = ''
|
||||
else:
|
||||
render_text_message = text_message
|
||||
|
||||
# Determine session key from message source
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_key = f'group_{message_source.group.id}'
|
||||
else:
|
||||
session_key = f'person_{message_source.sender.id}'
|
||||
|
||||
# Build button elements matching the existing card template's thumbsup/down format
|
||||
action_buttons = []
|
||||
for action in actions:
|
||||
action_id = action.get('id', '')
|
||||
action_title = action.get('title', action_id)
|
||||
button_style = action.get('button_style', 'default')
|
||||
|
||||
if button_style == 'primary':
|
||||
lark_button_type = 'primary'
|
||||
elif button_style == 'danger':
|
||||
lark_button_type = 'danger'
|
||||
else:
|
||||
lark_button_type = 'default'
|
||||
|
||||
action_buttons.append(
|
||||
{
|
||||
'tag': 'button',
|
||||
'text': {'tag': 'plain_text', 'content': action_title},
|
||||
'type': lark_button_type,
|
||||
'width': 'fill',
|
||||
'size': 'medium',
|
||||
'hover_tips': {'tag': 'plain_text', 'content': action_title},
|
||||
'behaviors': [
|
||||
{
|
||||
'type': 'callback',
|
||||
'value': {
|
||||
'form_action': True,
|
||||
'form_token': form_token,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'action_id': action_id,
|
||||
'session_key': session_key,
|
||||
},
|
||||
}
|
||||
],
|
||||
'margin': '0px 0px 0px 0px',
|
||||
}
|
||||
)
|
||||
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
# self.seq = 1 # 消息回复结束之后重置seq
|
||||
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
|
||||
interactive_elements = []
|
||||
if form_data:
|
||||
if show_form_prompt:
|
||||
interactive_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': f'**[Human Input Required] {node_title}**',
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 4px 0px',
|
||||
}
|
||||
]
|
||||
if form_content:
|
||||
interactive_elements.append(
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': form_content,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 8px 0px',
|
||||
}
|
||||
)
|
||||
interactive_elements.append(
|
||||
{
|
||||
'tag': 'column_set',
|
||||
'horizontal_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'columns': [
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': 'weighted',
|
||||
'elements': [btn],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
}
|
||||
for btn in action_buttons
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
# Build the full card JSON with buttons, same structure as create_card_id
|
||||
# ── mid_section: either form buttons, resume notice, or empty ──
|
||||
mid_section_elements = []
|
||||
if form_data:
|
||||
mid_section_elements = [
|
||||
{
|
||||
'tag': 'interactive_container',
|
||||
'margin': '12px 0px 8px 0px',
|
||||
'padding': '12px 12px 12px 12px',
|
||||
'has_border': True,
|
||||
'elements': interactive_elements,
|
||||
},
|
||||
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||
]
|
||||
elif notice_text:
|
||||
mid_section_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': notice_text,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '8px 0px 4px 0px',
|
||||
'text_color': 'grey',
|
||||
},
|
||||
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
|
||||
]
|
||||
|
||||
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
|
||||
resume_elements = []
|
||||
if resume_placeholder_text:
|
||||
resume_elements = [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': resume_placeholder_text,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'element_id': 'streaming_txt_resume',
|
||||
},
|
||||
]
|
||||
|
||||
card_data = {
|
||||
'schema': '2.0',
|
||||
'config': {
|
||||
'update_multi': True,
|
||||
'streaming_mode': False,
|
||||
},
|
||||
'body': {
|
||||
'direction': 'vertical',
|
||||
'padding': '12px 12px 12px 12px',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'div',
|
||||
'text': {
|
||||
'tag': 'plain_text',
|
||||
'content': 'LangBot',
|
||||
'text_size': 'normal',
|
||||
'text_align': 'left',
|
||||
'text_color': 'default',
|
||||
},
|
||||
'icon': {
|
||||
'tag': 'custom_icon',
|
||||
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
|
||||
},
|
||||
},
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': render_text_message,
|
||||
'text_align': 'left',
|
||||
'text_size': 'normal',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'element_id': 'streaming_txt',
|
||||
},
|
||||
*mid_section_elements,
|
||||
*resume_elements,
|
||||
{
|
||||
'tag': 'column_set',
|
||||
'horizontal_spacing': '12px',
|
||||
'horizontal_align': 'right',
|
||||
'columns': [
|
||||
{
|
||||
'tag': 'column',
|
||||
'width': 'weighted',
|
||||
'elements': [
|
||||
{
|
||||
'tag': 'markdown',
|
||||
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
|
||||
'text_align': 'left',
|
||||
'text_size': 'notation',
|
||||
'margin': '4px 0px 0px 0px',
|
||||
'icon': {
|
||||
'tag': 'standard_icon',
|
||||
'token': 'robot_outlined',
|
||||
'color': 'grey',
|
||||
},
|
||||
}
|
||||
],
|
||||
'padding': '0px 0px 0px 0px',
|
||||
'direction': 'vertical',
|
||||
'horizontal_spacing': '8px',
|
||||
'vertical_spacing': '8px',
|
||||
'horizontal_align': 'left',
|
||||
'vertical_align': 'top',
|
||||
'margin': '0px 0px 0px 0px',
|
||||
'weight': 1,
|
||||
}
|
||||
],
|
||||
'margin': '0px 0px 4px 0px',
|
||||
},
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
try:
|
||||
tenant_key = (
|
||||
message_source.source_platform_object.header.tenant_key
|
||||
if message_source.source_platform_object
|
||||
@@ -1558,39 +2274,27 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
.tenant_access_token(tenant_access_token)
|
||||
.build()
|
||||
)
|
||||
# 发起请求
|
||||
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
|
||||
|
||||
# 处理失败返回
|
||||
if not response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
|
||||
request: UpdateCardRequest = (
|
||||
UpdateCardRequest.builder()
|
||||
.card_id(card_id)
|
||||
.request_body(
|
||||
UpdateCardRequestBody.builder()
|
||||
.sequence(sequence)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
|
||||
.build()
|
||||
)
|
||||
return
|
||||
|
||||
# Send media messages when streaming is done
|
||||
if is_final and media_items:
|
||||
for media in media_items:
|
||||
media_request: ReplyMessageRequest = (
|
||||
ReplyMessageRequest.builder()
|
||||
.message_id(message_source.message_chain.message_id)
|
||||
.request_body(
|
||||
ReplyMessageRequestBody.builder()
|
||||
.content(json.dumps(media['content']))
|
||||
.msg_type(media['msg_type'])
|
||||
.reply_in_thread(False)
|
||||
.uuid(str(uuid.uuid4()))
|
||||
.build()
|
||||
)
|
||||
.build()
|
||||
)
|
||||
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
|
||||
media_request, req_opt
|
||||
)
|
||||
if not media_response.success():
|
||||
raise Exception(
|
||||
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
|
||||
)
|
||||
.build()
|
||||
)
|
||||
response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
|
||||
if not response.success():
|
||||
await self.logger.error(
|
||||
f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
|
||||
f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
|
||||
|
||||
async def is_muted(self, group_id: int) -> bool:
|
||||
return False
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
from __future__ import annotations
|
||||
import time
|
||||
|
||||
|
||||
import telegram
|
||||
import telegram.ext
|
||||
from telegram import Update
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
|
||||
import telegramify_markdown
|
||||
import typing
|
||||
import traceback
|
||||
import json
|
||||
import base64
|
||||
import pydantic
|
||||
|
||||
@@ -189,6 +189,7 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
|
||||
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
bot: telegram.Bot = pydantic.Field(exclude=True)
|
||||
application: telegram.ext.Application = pydantic.Field(exclude=True)
|
||||
ap: typing.Any = pydantic.Field(exclude=True, default=None)
|
||||
|
||||
message_converter: TelegramMessageConverter = TelegramMessageConverter()
|
||||
event_converter: TelegramEventConverter = TelegramEventConverter()
|
||||
@@ -224,6 +225,102 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
telegram_callback,
|
||||
)
|
||||
)
|
||||
|
||||
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
query = update.callback_query
|
||||
await query.answer()
|
||||
try:
|
||||
data = json.loads(query.data)
|
||||
if data.get('form_action') or data.get('f'):
|
||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||
|
||||
workflow_run_id = data.get('workflow_run_id', '')
|
||||
w_suffix = data.get('w', '')
|
||||
action_id = data.get('action_id') or data.get('a', '')
|
||||
session_key = data.get('session_key') or data.get('s', '')
|
||||
|
||||
if session_key.startswith('group_') or session_key.startswith('g:'):
|
||||
launcher_type = provider_session.LauncherTypes.GROUP
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('g:')
|
||||
else session_key[len('group_') :]
|
||||
)
|
||||
else:
|
||||
launcher_type = provider_session.LauncherTypes.PERSON
|
||||
launcher_id = (
|
||||
session_key.split(':', 1)[1]
|
||||
if session_key.startswith('p:')
|
||||
else session_key[len('person_') :]
|
||||
)
|
||||
|
||||
user_id = str(query.from_user.id)
|
||||
|
||||
# Find bot_uuid and pipeline_uuid
|
||||
bot_uuid = ''
|
||||
pipeline_uuid = None
|
||||
for b in self.ap.platform_mgr.bots:
|
||||
if b.adapter is self:
|
||||
bot_uuid = b.bot_entity.uuid
|
||||
pipeline_uuid = b.bot_entity.use_pipeline_uuid
|
||||
break
|
||||
|
||||
form_action_data = {
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'w_suffix': w_suffix,
|
||||
'action_id': action_id,
|
||||
'user': f'{launcher_type.value}_{launcher_id}',
|
||||
'inputs': {},
|
||||
}
|
||||
|
||||
message_chain = platform_message.MessageChain(
|
||||
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
|
||||
)
|
||||
|
||||
if launcher_type == provider_session.LauncherTypes.GROUP:
|
||||
synthetic_event = platform_events.GroupMessage(
|
||||
sender=platform_entities.GroupMember(
|
||||
id=user_id,
|
||||
member_name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
group=platform_entities.Group(
|
||||
id=launcher_id,
|
||||
name='',
|
||||
permission=platform_entities.Permission.Member,
|
||||
),
|
||||
),
|
||||
message_chain=message_chain,
|
||||
source_platform_object=update,
|
||||
)
|
||||
else:
|
||||
synthetic_event = platform_events.FriendMessage(
|
||||
sender=platform_entities.Friend(
|
||||
id=user_id,
|
||||
nickname='',
|
||||
remark='',
|
||||
),
|
||||
message_chain=message_chain,
|
||||
source_platform_object=update,
|
||||
)
|
||||
|
||||
await self.ap.query_pool.add_query(
|
||||
bot_uuid=bot_uuid,
|
||||
launcher_type=launcher_type,
|
||||
launcher_id=launcher_id,
|
||||
sender_id=user_id,
|
||||
message_event=synthetic_event,
|
||||
message_chain=message_chain,
|
||||
adapter=self,
|
||||
pipeline_uuid=pipeline_uuid,
|
||||
variables={
|
||||
'_dify_form_action': form_action_data,
|
||||
'_routed_by_rule': True,
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
|
||||
|
||||
application.add_handler(CallbackQueryHandler(callback_query_handler))
|
||||
super().__init__(
|
||||
config=config,
|
||||
logger=logger,
|
||||
@@ -319,14 +416,19 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
update = event.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
chat_type = update.effective_chat.type
|
||||
message_thread_id = update.message.message_thread_id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
if chat_type == 'private':
|
||||
draft_id = int(time.time() * 1000)
|
||||
self.msg_stream_id[message_id] = ('private', draft_id)
|
||||
import time as _time
|
||||
|
||||
draft_id = int(_time.time() * 1000)
|
||||
self.msg_stream_id[message_id] = ('private', draft_id)
|
||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
|
||||
await self.bot.send_message_draft(**args)
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except (telegram.error.RetryAfter, telegram.error.BadRequest):
|
||||
pass
|
||||
else:
|
||||
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
|
||||
send_msg = await self.bot.send_message(**args)
|
||||
@@ -347,12 +449,13 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
assert isinstance(message_source.source_platform_object, Update)
|
||||
update = message_source.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
message_thread_id = update.message.message_thread_id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
if message_id not in self.msg_stream_id:
|
||||
return
|
||||
|
||||
chat_mode, draft_id = self.msg_stream_id[message_id]
|
||||
chat_mode, stream_id = self.msg_stream_id[message_id]
|
||||
components = await TelegramMessageConverter.yiri2target(message, self.bot)
|
||||
|
||||
if not components or components[0]['type'] != 'text':
|
||||
@@ -361,16 +464,42 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
return
|
||||
|
||||
content = components[0]['text']
|
||||
form_data = getattr(bot_message, '_form_data', None)
|
||||
|
||||
if form_data and is_final:
|
||||
self.msg_stream_id.pop(message_id, None)
|
||||
await self._send_form_action_buttons(message_source, form_data)
|
||||
return
|
||||
|
||||
if chat_mode == 'private':
|
||||
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id)
|
||||
await self.bot.send_message_draft(**args)
|
||||
# Streaming via draft (ephemeral preview in the chat input area)
|
||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||
try:
|
||||
await self.bot.send_message_draft(**args)
|
||||
except telegram.error.RetryAfter:
|
||||
pass
|
||||
else:
|
||||
pass # Ignore other draft errors (cosmetic)
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
del args['draft_id']
|
||||
await self.bot.send_message(**args)
|
||||
# Finalise: send the real message, discard the draft
|
||||
args = self._build_message_args(chat_id, content, message_thread_id)
|
||||
try:
|
||||
await self.bot.send_message(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = content[:4000] + '\n\n… (truncated)'
|
||||
await self.bot.send_message(**args)
|
||||
else:
|
||||
raise
|
||||
self.msg_stream_id.pop(message_id)
|
||||
else:
|
||||
stream_id = draft_id
|
||||
# Streaming via edit_message_text (persistent message)
|
||||
if (msg_seq - 1) % 8 == 0 or is_final:
|
||||
args = {
|
||||
'message_id': stream_id,
|
||||
@@ -379,11 +508,68 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
}
|
||||
if self.config.get('markdown_card', False):
|
||||
args['parse_mode'] = 'MarkdownV2'
|
||||
await self.bot.edit_message_text(**args)
|
||||
try:
|
||||
await self.bot.edit_message_text(**args)
|
||||
except telegram.error.BadRequest as exc:
|
||||
if 'Message_too_long' in str(exc):
|
||||
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
|
||||
await self.bot.edit_message_text(**args)
|
||||
else:
|
||||
raise
|
||||
|
||||
if is_final and bot_message.tool_calls is None:
|
||||
self.msg_stream_id.pop(message_id)
|
||||
|
||||
async def _send_form_action_buttons(
|
||||
self,
|
||||
message_source: platform_events.MessageEvent,
|
||||
form_data: dict,
|
||||
):
|
||||
"""Send inline keyboard buttons for Dify human_input_required form actions."""
|
||||
actions = form_data.get('actions', [])
|
||||
node_title = form_data.get('node_title', '')
|
||||
form_content = form_data.get('form_content', '')
|
||||
workflow_run_id = form_data.get('workflow_run_id', '')
|
||||
# Telegram callback_data is capped at 64 bytes, so we identify the
|
||||
# paused workflow by the last 8 chars of workflow_run_id (unique
|
||||
# within a session with overwhelming probability).
|
||||
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
|
||||
|
||||
if isinstance(message_source, platform_events.GroupMessage):
|
||||
session_key = f'g:{message_source.group.id}'
|
||||
else:
|
||||
session_key = f'p:{message_source.sender.id}'
|
||||
|
||||
keyboard = []
|
||||
for action in actions:
|
||||
action_id = action.get('id', '')
|
||||
action_title = action.get('title', action_id)
|
||||
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
|
||||
if w_suffix:
|
||||
callback_payload['w'] = w_suffix
|
||||
callback_data = json.dumps(callback_payload, separators=(',', ':'))
|
||||
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
|
||||
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
|
||||
update = message_source.source_platform_object
|
||||
chat_id = update.effective_chat.id
|
||||
effective_message = update.effective_message
|
||||
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
|
||||
|
||||
text_lines = [f'[{node_title}] Please select an action:']
|
||||
if form_content:
|
||||
text_lines.insert(0, form_content)
|
||||
args = {
|
||||
'chat_id': chat_id,
|
||||
'text': '\n\n'.join(text_lines),
|
||||
'reply_markup': reply_markup,
|
||||
}
|
||||
if message_thread_id:
|
||||
args['message_thread_id'] = message_thread_id
|
||||
|
||||
await self.bot.send_message(**args)
|
||||
|
||||
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
|
||||
if not isinstance(event.source_platform_object, Update):
|
||||
return None
|
||||
|
||||
@@ -2,9 +2,11 @@ from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
import base64
|
||||
import mimetypes
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
from langbot.pkg.provider import runner
|
||||
@@ -16,6 +18,102 @@ from langbot.libs.dify_service_api.v1 import client, errors
|
||||
import httpx
|
||||
|
||||
|
||||
# Module-level store for paused-workflow form state, keyed by session key
|
||||
# (launcher_type_value + "_" + launcher_id). Each session holds an
|
||||
# insertion-ordered dict of form_token -> form_data, allowing multiple
|
||||
# Dify workflows to be paused simultaneously for the same session.
|
||||
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
|
||||
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
|
||||
|
||||
|
||||
def _session_key_from_query(query: pipeline_query.Query) -> str:
|
||||
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
|
||||
|
||||
|
||||
def _prune_pending_forms(now: float | None = None) -> None:
|
||||
if now is None:
|
||||
now = time.time()
|
||||
for session_key in list(_PENDING_FORMS.keys()):
|
||||
forms = _PENDING_FORMS[session_key]
|
||||
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
|
||||
for token in expired_tokens:
|
||||
forms.pop(token, None)
|
||||
if not forms:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
|
||||
|
||||
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
|
||||
_prune_pending_forms()
|
||||
stored = dict(form_data)
|
||||
expiration_time = stored.get('expiration_time')
|
||||
try:
|
||||
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
|
||||
except (TypeError, ValueError):
|
||||
expiration_ts = 0.0
|
||||
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
|
||||
form_token = str(stored.get('form_token') or '')
|
||||
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
|
||||
# Re-insert at the end so this becomes the "latest" entry
|
||||
forms.pop(form_token, None)
|
||||
forms[form_token] = stored
|
||||
|
||||
|
||||
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms or not form_token:
|
||||
return None
|
||||
return forms.get(form_token)
|
||||
|
||||
|
||||
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
|
||||
"""Look up a pending form whose workflow_run_id ends with the given suffix.
|
||||
|
||||
Used by adapters (e.g. Telegram) whose callback payload is too small to
|
||||
carry the full form_token / workflow_run_id.
|
||||
"""
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms or not w_suffix:
|
||||
return None
|
||||
for token in reversed(forms):
|
||||
form = forms[token]
|
||||
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
|
||||
return form
|
||||
return None
|
||||
|
||||
|
||||
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return None
|
||||
return forms[next(reversed(forms))]
|
||||
|
||||
|
||||
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
|
||||
"""Iterate pending forms for a session, newest-first."""
|
||||
_prune_pending_forms()
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return
|
||||
for token in reversed(list(forms.keys())):
|
||||
yield forms[token]
|
||||
|
||||
|
||||
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
|
||||
"""Clear one specific pending form (by token) or all forms for the session."""
|
||||
forms = _PENDING_FORMS.get(session_key)
|
||||
if not forms:
|
||||
return
|
||||
if form_token is None:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
return
|
||||
forms.pop(form_token, None)
|
||||
if not forms:
|
||||
_PENDING_FORMS.pop(session_key, None)
|
||||
|
||||
|
||||
@runner.runner_class('dify-service-api')
|
||||
class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
"""Dify Service API 对话请求器"""
|
||||
@@ -335,11 +433,140 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
|
||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||
|
||||
async def _submit_workflow_form_blocking(
|
||||
self, form_action: dict
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
|
||||
|
||||
form_token = form_action['form_token']
|
||||
workflow_run_id = form_action['workflow_run_id']
|
||||
user = form_action['user']
|
||||
action_id = form_action.get('action_id', '')
|
||||
inputs = form_action.get('inputs', {})
|
||||
|
||||
async for chunk in self.dify_client.workflow_submit(
|
||||
form_token=form_token,
|
||||
workflow_run_id=workflow_run_id,
|
||||
inputs=inputs,
|
||||
user=user,
|
||||
action=action_id,
|
||||
timeout=120,
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
if chunk['data'].get('error'):
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=content,
|
||||
)
|
||||
|
||||
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
|
||||
"""Locate the pending form this action targets.
|
||||
|
||||
Tries identifiers in order of specificity: form_token, full
|
||||
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
|
||||
then falls back to the newest pending form for the session.
|
||||
"""
|
||||
form_token = form_action.get('form_token')
|
||||
if form_token:
|
||||
form = _get_pending_form_by_token(session_key, form_token)
|
||||
if form:
|
||||
return form
|
||||
|
||||
workflow_run_id = form_action.get('workflow_run_id')
|
||||
if workflow_run_id:
|
||||
for form in _iter_pending_forms(session_key):
|
||||
if form.get('workflow_run_id') == workflow_run_id:
|
||||
return form
|
||||
|
||||
w_suffix = form_action.get('w_suffix')
|
||||
if w_suffix:
|
||||
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
|
||||
if form:
|
||||
return form
|
||||
|
||||
return _get_latest_pending_form(session_key)
|
||||
|
||||
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
|
||||
"""Backfill resume fields from the matching pending form."""
|
||||
if not form_action:
|
||||
return None
|
||||
|
||||
merged_action = dict(form_action)
|
||||
merged_action.pop('w_suffix', None)
|
||||
pending_form = self._resolve_pending_form(session_key, form_action)
|
||||
if pending_form:
|
||||
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
|
||||
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
|
||||
'workflow_run_id', ''
|
||||
)
|
||||
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
|
||||
merged_action.setdefault('user', pending_form.get('user', ''))
|
||||
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
|
||||
|
||||
# Resolve clicked action's display title from the stored actions list
|
||||
if 'action_title' not in merged_action:
|
||||
clicked_id = merged_action.get('action_id', '')
|
||||
for action in pending_form.get('actions', []):
|
||||
if str(action.get('id', '')) == str(clicked_id):
|
||||
merged_action['action_title'] = action.get('title', clicked_id)
|
||||
break
|
||||
|
||||
return merged_action
|
||||
|
||||
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
|
||||
"""Match plain text replies against pending Dify form actions.
|
||||
|
||||
Iterates all pending forms newest-first; the first action whose
|
||||
title/id matches the text wins. This means when multiple forms are
|
||||
pending with the same button label, the most recent one resolves.
|
||||
"""
|
||||
normalized_text = user_text.strip().lower()
|
||||
if not normalized_text:
|
||||
return None
|
||||
|
||||
for pending_form in _iter_pending_forms(session_key):
|
||||
for action in pending_form.get('actions', []):
|
||||
titles = {
|
||||
str(action.get('title', '')).strip().lower(),
|
||||
str(action.get('id', '')).strip().lower(),
|
||||
}
|
||||
if normalized_text in titles:
|
||||
return {
|
||||
'form_token': pending_form.get('form_token', ''),
|
||||
'workflow_run_id': pending_form.get('workflow_run_id', ''),
|
||||
'action_id': action.get('id', ''),
|
||||
'action_title': action.get('title', action.get('id', '')),
|
||||
'node_title': pending_form.get('node_title', ''),
|
||||
'inputs': pending_form.get('inputs', {}),
|
||||
'user': pending_form.get('user', ''),
|
||||
}
|
||||
|
||||
return None
|
||||
|
||||
async def _workflow_messages(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""调用工作流"""
|
||||
|
||||
# Check if this is a form action resume (button click or text match)
|
||||
form_action_raw = query.variables.get('_dify_form_action')
|
||||
session_key = _session_key_from_query(query)
|
||||
|
||||
if form_action_raw:
|
||||
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||
else:
|
||||
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||
|
||||
if form_action:
|
||||
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||
async for msg in self._submit_workflow_form_blocking(form_action):
|
||||
yield msg
|
||||
return
|
||||
|
||||
if not query.session.using_conversation.uuid:
|
||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||
|
||||
@@ -366,6 +593,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
}
|
||||
|
||||
inputs.update(query.variables)
|
||||
human_input_yielded = False
|
||||
|
||||
async for chunk in self.dify_client.workflow_run(
|
||||
inputs=inputs,
|
||||
@@ -377,6 +605,46 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
if chunk['event'] in ignored_events:
|
||||
continue
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') == 'human_input_required':
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
node_title = reason.get('node_title', '')
|
||||
|
||||
_set_pending_form(
|
||||
_session_key_from_query(query),
|
||||
{
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': reason.get('inputs', {}),
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
},
|
||||
)
|
||||
|
||||
query.variables['_dify_form_render'] = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
}
|
||||
|
||||
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
|
||||
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
|
||||
|
||||
human_input_yielded = True
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=display_text,
|
||||
)
|
||||
|
||||
if chunk['event'] == 'node_started':
|
||||
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
|
||||
continue
|
||||
@@ -399,6 +667,8 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
yield msg
|
||||
|
||||
elif chunk['event'] == 'workflow_finished':
|
||||
if human_input_yielded:
|
||||
break
|
||||
if chunk['data']['error']:
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
|
||||
@@ -636,11 +906,153 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
|
||||
query.session.using_conversation.uuid = chunk['conversation_id']
|
||||
|
||||
async def _submit_workflow_form(
|
||||
self, form_action: dict
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""Submit human input to resume a paused Dify workflow."""
|
||||
|
||||
form_token = form_action['form_token']
|
||||
workflow_run_id = form_action['workflow_run_id']
|
||||
user = form_action['user']
|
||||
action_id = form_action.get('action_id', '')
|
||||
action_title = form_action.get('action_title', '') or action_id
|
||||
node_title = form_action.get('node_title', '')
|
||||
inputs = form_action.get('inputs', {})
|
||||
|
||||
messsage_idx = 0
|
||||
is_final = False
|
||||
think_start = False
|
||||
think_end = False
|
||||
workflow_contents = ''
|
||||
repause_form_data: dict | None = None
|
||||
|
||||
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
|
||||
async for chunk in self.dify_client.workflow_submit(
|
||||
form_token=form_token,
|
||||
workflow_run_id=workflow_run_id,
|
||||
inputs=inputs,
|
||||
user=user,
|
||||
action=action_id,
|
||||
timeout=120,
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
|
||||
|
||||
yield_this_iteration = False
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
is_final = True
|
||||
yield_this_iteration = True
|
||||
if chunk['data'].get('error'):
|
||||
raise errors.DifyAPIError(chunk['data']['error'])
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') != 'human_input_required':
|
||||
continue
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
# Use a distinct name — `node_title` (the just-resolved step)
|
||||
# must keep its value so the resume notice on the previous
|
||||
# card still shows which step the user acted on.
|
||||
paused_node_title = reason.get('node_title', '')
|
||||
raw_inputs = reason.get('inputs', {})
|
||||
|
||||
_set_pending_form(
|
||||
user,
|
||||
{
|
||||
'workflow_run_id': new_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': paused_node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': user,
|
||||
},
|
||||
)
|
||||
|
||||
repause_form_data = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': paused_node_title,
|
||||
'workflow_run_id': new_run_id,
|
||||
'form_token': reason.get('form_token', ''),
|
||||
}
|
||||
# Ensure the final chunk has non-empty content so
|
||||
# ResponseWrapper (which skips empty-content chunks) lets it
|
||||
# propagate to SendResponseBackStage. Use a zero-width space
|
||||
# so neither Lark nor Telegram renders visible noise — the
|
||||
# adapter substitutes its own card text from _form_data.
|
||||
if not workflow_contents:
|
||||
workflow_contents = ''
|
||||
is_final = True
|
||||
yield_this_iteration = True
|
||||
break
|
||||
|
||||
if chunk['event'] == 'text_chunk':
|
||||
messsage_idx += 1
|
||||
if remove_think:
|
||||
if '<think>' in chunk['data']['text'] and not think_start:
|
||||
think_start = True
|
||||
continue
|
||||
if '</think>' in chunk['data']['text'] and not think_end:
|
||||
import re
|
||||
|
||||
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
|
||||
workflow_contents += content
|
||||
think_end = True
|
||||
elif think_end:
|
||||
workflow_contents += chunk['data']['text']
|
||||
if think_start:
|
||||
continue
|
||||
else:
|
||||
workflow_contents += chunk['data']['text']
|
||||
if messsage_idx % 8 == 0:
|
||||
yield_this_iteration = True
|
||||
|
||||
if yield_this_iteration:
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents,
|
||||
is_final=is_final,
|
||||
)
|
||||
msg._resume_from_form = True
|
||||
if action_title:
|
||||
msg._resume_action_title = action_title
|
||||
if node_title:
|
||||
msg._resume_node_title = node_title
|
||||
if is_final and repause_form_data:
|
||||
msg._form_data = repause_form_data
|
||||
msg._open_new_card = True
|
||||
yield msg
|
||||
if is_final:
|
||||
return
|
||||
|
||||
async def _workflow_messages_chunk(
|
||||
self, query: pipeline_query.Query
|
||||
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
|
||||
"""调用工作流"""
|
||||
|
||||
# Check if this is a form action resume (button click or text match)
|
||||
form_action_raw = query.variables.get('_dify_form_action')
|
||||
session_key = _session_key_from_query(query)
|
||||
|
||||
if form_action_raw:
|
||||
form_action = self._merge_pending_form_action(session_key, form_action_raw)
|
||||
else:
|
||||
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
|
||||
|
||||
if form_action:
|
||||
_clear_pending_form(session_key, form_action.get('form_token') or None)
|
||||
# Resume paused workflow via submit endpoint
|
||||
async for msg in self._submit_workflow_form(form_action):
|
||||
yield msg
|
||||
return
|
||||
|
||||
if not query.session.using_conversation.uuid:
|
||||
query.session.using_conversation.uuid = str(uuid.uuid4())
|
||||
|
||||
@@ -672,6 +1084,13 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
think_start = False
|
||||
think_end = False
|
||||
workflow_contents = ''
|
||||
workflow_run_id = ''
|
||||
human_input_yielded = False
|
||||
|
||||
# Saved form data to attach to the final MessageChunk so the adapter
|
||||
# can detect it when is_final=True and render buttons.
|
||||
pending_form_data = None
|
||||
display_text = ''
|
||||
|
||||
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
|
||||
async for chunk in self.dify_client.workflow_run(
|
||||
@@ -682,7 +1101,62 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
):
|
||||
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
|
||||
if chunk['event'] in ignored_events:
|
||||
if chunk['event'] == 'workflow_started':
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', '')
|
||||
continue
|
||||
|
||||
if chunk['event'] == 'workflow_paused':
|
||||
reasons = chunk['data'].get('reasons', [])
|
||||
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
|
||||
for reason in reasons:
|
||||
if reason.get('TYPE') == 'human_input_required':
|
||||
form_content = reason.get('form_content', '')
|
||||
actions = reason.get('actions', [])
|
||||
node_title = reason.get('node_title', '')
|
||||
|
||||
# Persist form state in module-level store keyed by session
|
||||
raw_inputs = reason.get('inputs', {})
|
||||
_set_pending_form(
|
||||
_session_key_from_query(query),
|
||||
{
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_id': reason.get('form_id'),
|
||||
'form_token': reason.get('form_token'),
|
||||
'node_id': reason.get('node_id'),
|
||||
'node_title': node_title,
|
||||
'form_content': form_content,
|
||||
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
|
||||
'actions': actions,
|
||||
'expiration_time': reason.get('expiration_time'),
|
||||
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
},
|
||||
)
|
||||
|
||||
# Pass form render metadata to downstream stages
|
||||
query.variables['_dify_form_render'] = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
}
|
||||
|
||||
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
|
||||
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
|
||||
workflow_contents += display_text + '\n'
|
||||
|
||||
# Save form data to attach to the final chunk later.
|
||||
# We do NOT yield here — the form content will be sent
|
||||
# as the final MessageChunk (with is_final=True and
|
||||
# _form_data) so the adapter can update the card and
|
||||
# add buttons in one pass.
|
||||
pending_form_data = {
|
||||
'form_content': form_content,
|
||||
'actions': actions,
|
||||
'node_title': node_title,
|
||||
'workflow_run_id': workflow_run_id,
|
||||
'form_token': reason.get('form_token', ''),
|
||||
}
|
||||
human_input_yielded = True
|
||||
|
||||
if chunk['event'] == 'workflow_finished':
|
||||
is_final = True
|
||||
if chunk['data']['error']:
|
||||
@@ -730,11 +1204,29 @@ class DifyServiceAPIRunner(runner.RequestRunner):
|
||||
yield msg
|
||||
|
||||
if messsage_idx % 8 == 0 or is_final:
|
||||
yield provider_message.MessageChunk(
|
||||
final_content = workflow_contents if workflow_contents.strip() else ''
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents,
|
||||
content=final_content,
|
||||
is_final=is_final,
|
||||
)
|
||||
# Attach form data to the final chunk for the adapter
|
||||
if is_final and pending_form_data:
|
||||
msg._form_data = pending_form_data
|
||||
pending_form_data = None
|
||||
yield msg
|
||||
|
||||
# If the stream ended after workflow_paused without a
|
||||
# workflow_finished event, yield a final chunk so the adapter
|
||||
# can update the card and add buttons.
|
||||
if human_input_yielded and not is_final:
|
||||
msg = provider_message.MessageChunk(
|
||||
role='assistant',
|
||||
content=workflow_contents or display_text,
|
||||
is_final=True,
|
||||
)
|
||||
msg._form_data = pending_form_data
|
||||
yield msg
|
||||
|
||||
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
|
||||
"""运行请求"""
|
||||
|
||||
Reference in New Issue
Block a user