Compare commits

..

2 Commits

Author SHA1 Message Date
RockChinQ
4054ba2a76 docs(issue-template): add deployment version selector 2026-06-01 23:31:29 -04:00
Dongchuan Fu
c7cb42bd79 feat(lark): add domain configuration options for Lark adapter (#2220) 2026-05-27 15:34:35 +08:00
11 changed files with 177 additions and 1535 deletions

View File

@@ -10,6 +10,15 @@ body:
placeholder: 例如v3.3.0、CentOS x64 Python 3.10.3、Docker
validations:
required: true
- type: dropdown
attributes:
label: 部署版本
description: 请选择您使用的 LangBot 部署版本。
options:
- 社区版
- 云服务
validations:
required: true
- type: textarea
attributes:
label: 异常情况

View File

@@ -10,6 +10,15 @@ body:
placeholder: "For example: v3.3.0, CentOS x64 Python 3.10.3, Docker"
validations:
required: true
- type: dropdown
attributes:
label: Deployment version
description: Please select the LangBot deployment version you are using.
options:
- Community Edition
- Cloud Service
validations:
required: true
- type: textarea
attributes:
label: Exception

View File

@@ -109,61 +109,6 @@ class AsyncDifyServiceClient:
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
async def workflow_submit(
self,
form_token: str,
workflow_run_id: str,
inputs: dict[str, typing.Any],
user: str,
action: str = '',
timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""Submit human input to resume a paused workflow, then stream events.
1. POST /form/human_input/{form_token} to submit the form
2. GET /workflow/{task_id}/events to stream the resumed workflow events
"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
}
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
# Step 1: Submit the form
payload: dict[str, typing.Any] = {
'inputs': inputs if isinstance(inputs, dict) else {},
'user': user,
'action': action,
}
submit_resp = await client.post(
f'/form/human_input/{form_token}',
headers=headers,
json=payload,
)
if submit_resp.status_code != 200:
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
# Step 2: Stream resumed workflow events
async with client.stream(
'GET',
f'/workflow/{workflow_run_id}/events',
headers={'Authorization': f'Bearer {self.api_key}'},
params={'user': user},
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f'{r.status_code} {chunk}')
if chunk.strip() == '':
continue
if chunk.startswith('data:'):
yield json.loads(chunk[5:])
async def upload_file(
self,
file: httpx._types.FileTypes,

View File

@@ -157,7 +157,7 @@ class RuntimePipeline:
bot_message=query.resp_messages[-1],
message=result.user_notice,
quote_origin=query.pipeline_config['output']['misc']['quote-origin'],
is_final=[msg.is_final for msg in query.resp_messages][-1],
is_final=[msg.is_final for msg in query.resp_messages][0],
)
else:
await query.adapter.reply_message(

View File

@@ -42,13 +42,9 @@ class QueryPool:
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
routed_by_rule: bool = False,
variables: typing.Optional[dict[str, typing.Any]] = None,
) -> pipeline_query.Query:
async with self.condition:
query_id = self.query_id_counter
initial_variables: dict[str, typing.Any] = {'_routed_by_rule': routed_by_rule}
if variables:
initial_variables.update(variables)
query = pipeline_query.Query(
bot_uuid=bot_uuid,
query_id=query_id,
@@ -57,7 +53,7 @@ class QueryPool:
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
variables=initial_variables,
variables={'_routed_by_rule': routed_by_rule},
resp_messages=[],
resp_message_chain=[],
adapter=adapter,

View File

@@ -40,7 +40,7 @@ class SendResponseBackStage(stage.PipelineStage):
has_chunks = any(isinstance(msg, provider_message.MessageChunk) for msg in query.resp_messages)
# TODO 命令与流式的兼容性问题
if await query.adapter.is_stream_output_supported() and has_chunks:
is_final = [msg.is_final for msg in query.resp_messages][-1]
is_final = [msg.is_final for msg in query.resp_messages][0]
await query.adapter.reply_message_chunk(
message_source=query.message_event,
bot_message=query.resp_messages[-1],

View File

@@ -501,8 +501,6 @@ class PlatformManager:
bot_entity.adapter_config,
logger,
)
if hasattr(adapter_inst, 'ap'):
adapter_inst.ap = self.ap
# 如果 adapter 支持 set_bot_uuid 方法,设置 bot_uuid用于统一 webhook
if hasattr(adapter_inst, 'set_bot_uuid'):

View File

@@ -31,7 +31,6 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
import langbot_plugin.api.entities.builtin.provider.session as provider_session
class AESCipher(object):
@@ -771,7 +770,6 @@ CARD_ID_CACHE_MAX_LIFETIME = 20 * 60 # 20分钟
class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识
lark_tenant_key: str = pydantic.Field(exclude=True, default='') # 飞书企业key
@@ -794,16 +792,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
pending_monitoring_msg: dict[str, str]
# Final: reply Lark message ID → (monitoring_message_id, timestamp) (used by feedback callbacks)
reply_to_monitoring_msg: dict[str, tuple[str, float]]
reply_message_card_ids: dict[str, str]
card_sequence_dict: dict[str, int]
# card_id → set of source message ids registered against it (for cleanup)
card_id_to_source_ids: dict[str, set[str]]
# card_id → current streaming_txt content cache (needed for full aupdate during resume transition)
card_streaming_text: dict[str, str]
# card_id → pre-pause streaming_txt text (captured when resume first chunk arrives)
card_pre_pause_text: dict[str, str]
# set of card_ids that have already transitioned from "buttons visible" to "resume layout"
card_resume_transitioned: set[str]
_MONITORING_MAPPING_TTL = 600 # 10 minutes
seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识
@@ -824,134 +812,11 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
asyncio.create_task(on_message(event))
def schedule_on_app_loop(coro):
"""Run a coroutine on the application event loop from sync callbacks."""
return asyncio.run_coroutine_threadsafe(coro, self.ap.event_loop)
def sync_on_card_action(event):
try:
action_value_raw = getattr(getattr(event.event, 'action', None), 'value', {})
# Parse JSON string values (from form action buttons)
if isinstance(action_value_raw, str):
try:
action_value_obj = json.loads(action_value_raw)
except (json.JSONDecodeError, TypeError):
action_value_obj = {}
else:
action_value_obj = action_value_raw if isinstance(action_value_raw, dict) else {}
action_value_obj = getattr(getattr(event.event, 'action', None), 'value', {})
action_value = action_value_obj.get('feedback', '') if isinstance(action_value_obj, dict) else ''
# Handle Dify form action button clicks
if isinstance(action_value_obj, dict) and action_value_obj.get('form_action'):
form_token = action_value_obj.get('form_token', '')
workflow_run_id = action_value_obj.get('workflow_run_id', '')
action_id = action_value_obj.get('action_id', '')
session_key = action_value_obj.get('session_key', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
# Find the bot entity to get bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for bot in self.ap.platform_mgr.bots:
if bot.adapter is self:
bot_uuid = bot.bot_entity.uuid
pipeline_uuid = bot.bot_entity.use_pipeline_uuid
break
form_action_data = {
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
context = getattr(event.event, 'context', None)
open_message_id = getattr(context, 'open_message_id', None)
source_time = datetime.datetime.now()
event_time = source_time.timestamp()
action_text = action_value_obj.get('action_id', 'confirm')
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_text}]')]
)
if open_message_id:
message_chain.insert(
0,
platform_message.Source(
id=open_message_id,
time=source_time,
),
)
operator = getattr(event.event, 'operator', None)
user_id = (
getattr(operator, 'open_id', None) or getattr(operator, 'user_id', None) or str(launcher_id)
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
time=event_time,
source_platform_object=event,
)
async def add_form_action_query():
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
schedule_on_app_loop(add_form_action_query())
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '操作成功'}})
if action_value == '有帮助':
feedback_type = 1
elif action_value == '无帮助':
@@ -992,14 +857,17 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
)
if platform_events.FeedbackEvent in self.listeners:
schedule_on_app_loop(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
else:
loop.run_until_complete(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
except Exception:
traceback.print_exc()
schedule_on_app_loop(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
asyncio.create_task(self.logger.error(f'Error in lark card action callback: {traceback.format_exc()}'))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'error', 'content': '反馈处理失败'}})
@@ -1013,7 +881,8 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot_account_id = config['bot_name']
bot = lark_oapi.ws.Client(config['app_id'], config['app_secret'], event_handler=event_handler)
domain = self._resolve_domain(config)
bot = lark_oapi.ws.Client(config['app_id'], config['app_secret'], event_handler=event_handler, domain=domain)
api_client = self.build_api_client(config)
cipher = AESCipher(config.get('encrypt-key', ''))
self.request_app_ticket(api_client, config)
@@ -1025,12 +894,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
card_id_dict={},
pending_monitoring_msg={},
reply_to_monitoring_msg={},
reply_message_card_ids={},
card_sequence_dict={},
card_id_to_source_ids={},
card_streaming_text={},
card_pre_pause_text={},
card_resume_transitioned=set(),
seq=1,
listeners={},
quart_app=quart_app,
@@ -1152,13 +1015,28 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return None
@staticmethod
def _resolve_domain(config) -> str:
domain = config.get('domain', lark_oapi.FEISHU_DOMAIN)
if domain == 'custom':
domain = config.get('custom_domain', '')
if not domain:
raise ValueError('Custom domain is required when domain is set to "custom"')
return domain.rstrip('/')
def build_api_client(self, config):
app_id = config['app_id']
app_secret = config['app_secret']
api_client = lark_oapi.Client.builder().app_id(app_id).app_secret(app_secret).build()
domain = self._resolve_domain(config)
api_client = lark_oapi.Client.builder().app_id(app_id).app_secret(app_secret).domain(domain).build()
if 'isv' == config.get('app_type', 'self'):
api_client = (
lark_oapi.Client.builder().app_id(app_id).app_secret(app_secret).app_type(lark_oapi.AppType.ISV).build()
lark_oapi.Client.builder()
.app_id(app_id)
.app_secret(app_secret)
.app_type(lark_oapi.AppType.ISV)
.domain(domain)
.build()
)
return api_client
@@ -1270,33 +1148,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
for k in expired:
del self.reply_to_monitoring_msg[k]
def _next_card_sequence(self, card_id: str, suggested: int = 1) -> int:
"""Return the next strictly increasing sequence for a card update."""
current = self.card_sequence_dict.get(card_id, 0)
next_seq = max(current + 1, suggested)
self.card_sequence_dict[card_id] = next_seq
return next_seq
def _register_card_for_source(self, card_id: str, *source_ids: str) -> None:
"""Register a card_id under one or more source message ids."""
bucket = self.card_id_to_source_ids.setdefault(card_id, set())
for sid in source_ids:
if not sid:
continue
self.reply_message_card_ids[sid] = card_id
bucket.add(sid)
def _drop_card_state(self, card_id: str) -> None:
"""Pop all per-card state for the given card_id."""
if not card_id:
return
for sid in self.card_id_to_source_ids.pop(card_id, set()):
self.reply_message_card_ids.pop(sid, None)
self.card_sequence_dict.pop(card_id, None)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
async def create_card_id(self, message_id):
try:
# self.logger.debug('飞书支持stream输出,创建卡片......')
@@ -1492,7 +1343,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
self.card_id_dict[message_id] = response.data.card_id
card_id = response.data.card_id
self.card_sequence_dict[card_id] = 0
return card_id
except Exception as e:
@@ -1505,12 +1355,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
"""
# message_id = event.message_chain.message_id
source_message_id = str(event.message_chain.message_id)
existing_card_id = self.reply_message_card_ids.get(source_message_id)
if existing_card_id:
self.card_id_dict[message_id] = existing_card_id
return True
card_id = await self.create_card_id(message_id)
content = {
'type': 'card',
@@ -1549,16 +1393,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
user_msg_id = event.message_chain.message_id
reply_msg_id = getattr(response.data, 'message_id', None)
monitoring_msg_id = self.pending_monitoring_msg.pop(user_msg_id, None)
# Register the card under both the user-incoming msg id (so a
# second reply_message_first_chunk for the same user message
# reuses this card) AND the bot-reply msg id (so a synthetic
# event from a form-button callback — whose Source.id equals
# the bot's card message id — hits the same card and renders
# the resume content into it).
if reply_msg_id:
self._register_card_for_source(card_id, str(user_msg_id), str(reply_msg_id))
else:
self._register_card_for_source(card_id, str(user_msg_id))
if reply_msg_id and monitoring_msg_id:
self.reply_to_monitoring_msg[reply_msg_id] = (monitoring_msg_id, time.time())
self._cleanup_monitoring_mapping()
@@ -1567,93 +1401,6 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return True
async def _open_new_form_card(
self,
message_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
) -> str | None:
"""Spawn a fresh card to host a re-paused human-input prompt.
Creates a new card_id (rebinding ``self.card_id_dict[message_id]``),
replies it to the current incoming message so it appears as the next
step in the chat, registers the new reply_msg_id so subsequent button
callbacks resolve back to it, and renders the prompt + buttons on it.
Returns the new card_id, or ``None`` if creation failed (caller is
responsible for falling back to in-place update so the workflow
remains continuable).
"""
source_message_id = getattr(message_source.message_chain, 'message_id', None)
if not source_message_id:
await self.logger.error('Cannot open new form card: source message_id missing')
return None
try:
new_card_id = await self.create_card_id(message_id)
except Exception:
await self.logger.error(f'Failed to create new form card: {traceback.format_exc()}')
return None
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
content = {
'type': 'card',
'data': {'card_id': new_card_id, 'template_variable': {'content': ''}},
}
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(str(source_message_id))
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(content))
.msg_type('interactive')
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
try:
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(request, req_opt)
except Exception:
await self.logger.error(f'Failed to send new form card: {traceback.format_exc()}')
return None
if not response.success():
await self.logger.error(
f'Failed to send new form card: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}'
)
return None
reply_msg_id = getattr(response.data, 'message_id', None)
if reply_msg_id:
self._register_card_for_source(new_card_id, str(source_message_id), str(reply_msg_id))
sequence = self._next_card_sequence(new_card_id, 1)
await self._update_card_layout(
card_id=new_card_id,
message_source=message_source,
text_message='',
sequence=sequence,
form_data=form_data,
show_form_prompt=True,
)
return new_card_id
async def reply_message(
self,
message_source: platform_events.MessageEvent,
@@ -1773,492 +1520,45 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
):
"""
回复消息变成更新卡片消息
Supports Dify form-action resume: when the runner yields a chunk with
``_resume_from_form=True``, the card transitions from buttons to a
grey "已选择" notice and a new ``streaming_txt_resume`` element is added
for subsequent resume chunks to stream into.
When ``_open_new_card=True`` on the final chunk, the existing card is
left as-is and the pipeline will create a new card (with fresh form
buttons) for the re-pause.
"""
# self.seq += 1
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
if msg_seq % 8 == 0 or is_final:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
form_data = getattr(bot_message, '_form_data', None)
resume_from = getattr(bot_message, '_resume_from_form', False)
action_title = getattr(bot_message, '_resume_action_title', '')
resume_node_title = getattr(bot_message, '_resume_node_title', '')
open_new_card = getattr(bot_message, '_open_new_card', False)
if action_title:
if resume_node_title:
selected_notice = f'**{resume_node_title}**\n已选择:{action_title}'
else:
selected_notice = f'**已选择**{action_title}'
else:
selected_notice = ''
text_message = ''
if text_elements:
parts = []
for paragraph in text_elements:
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
# ── decide whether this chunk needs a card update ────────────────────
card_id = self.card_id_dict.get(message_id)
if not card_id:
return
# content = {
# 'type': 'card_json',
# 'data': {'card_id': self.card_id_dict[message_id], 'elements': {'content': text_message}},
# }
# ── convert message chain → text ─────────────────────────────────────
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = ''
if text_elements:
parts = []
for paragraph in text_elements:
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
tenant_key = (
message_source.source_platform_object.header.tenant_key if message_source.source_platform_object else None
)
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
req_opt: RequestOption = (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)
card_sequence = self._next_card_sequence(card_id, msg_seq)
# ── RESUME: first chunk after button click ───────────────────────────
if resume_from and card_id not in self.card_resume_transitioned:
# Transition the card from the form state into resume mode.
# Preserve the text that was shown before the pause, and seed the
# resume placeholder with the current resume content if we already
# have any on the first yielded chunk.
pre_pause_text = self.card_pre_pause_text.get(card_id) or self.card_streaming_text.get(card_id, '')
initial_resume_text = text_message or '\u200b'
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=initial_resume_text,
)
self.card_resume_transitioned.add(card_id)
self.card_pre_pause_text[card_id] = pre_pause_text
self.card_streaming_text[card_id] = text_message
if not is_final:
return
# ── RESUME: subsequent chunks → full card update ─────────────────────
if resume_from and card_id in self.card_resume_transitioned:
cached = self.card_streaming_text.get(card_id, '')
if text_message != cached:
self.card_streaming_text[card_id] = text_message
pre_pause_text = self.card_pre_pause_text.get(card_id, '')
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause_text,
sequence=card_sequence,
form_data=None,
notice_text=selected_notice,
resume_placeholder_text=text_message,
)
if not is_final:
return
# ── NORMAL streaming (non-resume): update streaming_txt in-place ──────
if not resume_from and (msg_seq % 8 == 0 or is_final):
cached = self.card_streaming_text.get(card_id)
if text_message != cached:
self.card_streaming_text[card_id] = text_message
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(card_id)
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder().content(text_message).sequence(card_sequence).build()
)
request: ContentCardElementRequest = (
ContentCardElementRequest.builder()
.card_id(self.card_id_dict[message_id])
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder()
# .uuid("a0d69e20-1dd1-458b-k525-dfeca4015204")
.content(text_message)
.sequence(msg_seq)
.build()
)
response: ContentCardElementResponse = await self.api_client.cardkit.v1.card_element.acontent(
request, req_opt
)
if not response.success():
raise Exception(
f'client.cardkit.v1.card_element.acontent failed, code: {response.code}, '
f'msg: {response.msg}, log_id: {response.get_log_id()}, '
f'resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
# ── FINAL chunk: full card layout update ─────────────────────────────
if is_final:
final_seq = self._next_card_sequence(card_id, card_sequence + 1)
pre_pause = self.card_pre_pause_text.get(card_id, text_message)
resume_cached = self.card_streaming_text.get(card_id, '')
if form_data:
if open_new_card:
# The old card has already been laid out into resume mode
# by the resume-transition block above (notice + resume
# placeholder). Finalise it as a frozen step snapshot and
# spawn a brand-new card to host the next human-input
# prompt — each step stays visible as its own card in the
# chat history.
new_card_id = await self._open_new_form_card(message_id, message_source, form_data)
if new_card_id is None:
# Fallback: keep the existing in-place behaviour so the
# workflow remains continuable even if creating the
# new card failed.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=form_data,
resume_placeholder_text=resume_cached,
show_form_prompt=True,
)
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
else:
# The old card is now a frozen snapshot; let go of its
# streaming-side state but keep its source registrations
# intact (no _drop_card_state) so historical button
# callbacks aimed at it can still be matched if needed.
self.card_streaming_text.pop(card_id, None)
self.card_pre_pause_text.pop(card_id, None)
self.card_resume_transitioned.discard(card_id)
else:
# Initial pause path: render prompt + buttons in place on
# the current card.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=final_seq,
form_data=form_data,
show_form_prompt=True,
)
# The human-input prompt itself is rendered as buttons only
# on Lark, so do not keep the hidden fallback text around;
# otherwise it will resurface after the button click.
self.card_streaming_text[card_id] = ''
self.card_pre_pause_text[card_id] = ''
else:
# Normal finish: keep pre-pause + resume content visible,
# remove buttons/notice, drop the resume placeholder.
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=pre_pause,
sequence=final_seq,
form_data=None,
notice_text=selected_notice if resume_from else '',
resume_placeholder_text=resume_cached,
)
self._drop_card_state(card_id)
self.card_id_dict.pop(message_id, None)
# ── media (images / files) appended at the end ───────────────────────
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def _add_form_buttons_to_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
form_data: dict,
text_message: str = '',
sequence: int = 1,
):
"""Update the entire card to include form action buttons.
Uses card.aupdate to replace the card JSON with a template that
includes the streaming text content plus interactive buttons.
"""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=form_data,
)
async def _remove_form_buttons_from_card(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
):
"""Replace the human-input card layout with the plain final layout."""
await self._update_card_layout(
card_id=card_id,
message_source=message_source,
text_message=text_message,
sequence=sequence,
form_data=None,
)
async def _update_card_layout(
self,
card_id: str,
message_source: platform_events.MessageEvent,
text_message: str = '',
sequence: int = 1,
form_data: dict | None = None,
notice_text: str = '',
resume_placeholder_text: str = '',
show_form_prompt: bool = True,
):
"""Update the entire card layout.
• form_data → show interactive buttons (initial Dify pause)
• notice_text → replace buttons with a grey "已选择" notice (resume transition)
• resume_placeholder_text → add a streaming_txt_resume markdown element
"""
form_data = form_data or {}
actions = form_data.get('actions', [])
form_token = form_data.get('form_token', '')
workflow_run_id = form_data.get('workflow_run_id', '')
node_title = form_data.get('node_title', '') or 'Human Input Required'
form_content = form_data.get('form_content', '')
# When form_data is set, the visible content is rendered inside the
# interactive container, so the top streaming text should stay empty
# to avoid duplicate text above the action area.
#
# For resume notice state, keep the existing text visible in the card
# and only add the grey "selected" notice below it.
if form_data:
render_text_message = ''
else:
render_text_message = text_message
# Determine session key from message source
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'group_{message_source.group.id}'
else:
session_key = f'person_{message_source.sender.id}'
# Build button elements matching the existing card template's thumbsup/down format
action_buttons = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
button_style = action.get('button_style', 'default')
if button_style == 'primary':
lark_button_type = 'primary'
elif button_style == 'danger':
lark_button_type = 'danger'
else:
lark_button_type = 'default'
action_buttons.append(
{
'tag': 'button',
'text': {'tag': 'plain_text', 'content': action_title},
'type': lark_button_type,
'width': 'fill',
'size': 'medium',
'hover_tips': {'tag': 'plain_text', 'content': action_title},
'behaviors': [
{
'type': 'callback',
'value': {
'form_action': True,
'form_token': form_token,
'workflow_run_id': workflow_run_id,
'action_id': action_id,
'session_key': session_key,
},
}
],
'margin': '0px 0px 0px 0px',
}
.build()
)
interactive_elements = []
if form_data:
if show_form_prompt:
interactive_elements = [
{
'tag': 'markdown',
'content': f'**[Human Input Required] {node_title}**',
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 4px 0px',
}
]
if form_content:
interactive_elements.append(
{
'tag': 'markdown',
'content': form_content,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 8px 0px',
}
)
interactive_elements.append(
{
'tag': 'column_set',
'horizontal_spacing': '8px',
'horizontal_align': 'left',
'margin': '0px 0px 0px 0px',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [btn],
'padding': '0px 0px 0px 0px',
}
for btn in action_buttons
],
}
)
if is_final and bot_message.tool_calls is None:
# self.seq = 1 # 消息回复结束之后重置seq
self.card_id_dict.pop(message_id) # 清理已经使用过的卡片
# Build the full card JSON with buttons, same structure as create_card_id
# ── mid_section: either form buttons, resume notice, or empty ──
mid_section_elements = []
if form_data:
mid_section_elements = [
{
'tag': 'interactive_container',
'margin': '12px 0px 8px 0px',
'padding': '12px 12px 12px 12px',
'has_border': True,
'elements': interactive_elements,
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
elif notice_text:
mid_section_elements = [
{
'tag': 'markdown',
'content': notice_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '8px 0px 4px 0px',
'text_color': 'grey',
},
{'tag': 'hr', 'margin': '0px 0px 0px 0px'},
]
# ── resume placeholder element (empty, filled via acontent on each chunk) ──
resume_elements = []
if resume_placeholder_text:
resume_elements = [
{
'tag': 'markdown',
'content': resume_placeholder_text,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt_resume',
},
]
card_data = {
'schema': '2.0',
'config': {
'update_multi': True,
'streaming_mode': False,
},
'body': {
'direction': 'vertical',
'padding': '12px 12px 12px 12px',
'elements': [
{
'tag': 'div',
'text': {
'tag': 'plain_text',
'content': 'LangBot',
'text_size': 'normal',
'text_align': 'left',
'text_color': 'default',
},
'icon': {
'tag': 'custom_icon',
'img_key': 'img_v3_02p3_05c65d5d-9bad-440a-a2fb-c89571bfd5bg',
},
},
{
'tag': 'markdown',
'content': render_text_message,
'text_align': 'left',
'text_size': 'normal',
'margin': '0px 0px 0px 0px',
'element_id': 'streaming_txt',
},
*mid_section_elements,
*resume_elements,
{
'tag': 'column_set',
'horizontal_spacing': '12px',
'horizontal_align': 'right',
'columns': [
{
'tag': 'column',
'width': 'weighted',
'elements': [
{
'tag': 'markdown',
'content': '<font color="grey-600">以上内容由 AI 生成,仅供参考。更多详细、准确信息可点击引用链接查看</font>',
'text_align': 'left',
'text_size': 'notation',
'margin': '4px 0px 0px 0px',
'icon': {
'tag': 'standard_icon',
'token': 'robot_outlined',
'color': 'grey',
},
}
],
'padding': '0px 0px 0px 0px',
'direction': 'vertical',
'horizontal_spacing': '8px',
'vertical_spacing': '8px',
'horizontal_align': 'left',
'vertical_align': 'top',
'margin': '0px 0px 0px 0px',
'weight': 1,
}
],
'margin': '0px 0px 4px 0px',
},
],
},
}
try:
tenant_key = (
message_source.source_platform_object.header.tenant_key
if message_source.source_platform_object
@@ -2274,27 +1574,39 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
.tenant_access_token(tenant_access_token)
.build()
)
# 发起请求
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(request, req_opt)
request: UpdateCardRequest = (
UpdateCardRequest.builder()
.card_id(card_id)
.request_body(
UpdateCardRequestBody.builder()
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.card(Card.builder().type('card_json').data(json.dumps(card_data)).build())
.build()
)
.build()
)
response: UpdateCardResponse = await self.api_client.cardkit.v1.card.aupdate(request, req_opt)
# 处理失败返回
if not response.success():
await self.logger.error(
f'Failed to update lark card with form buttons: code={response.code}, msg={response.msg}, '
f'log_id={response.get_log_id()}, resp={getattr(getattr(response, "raw", None), "content", None)}'
raise Exception(
f'client.im.v1.message.patch failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
except Exception:
await self.logger.error(f'Error updating lark card with form buttons: {traceback.format_exc()}')
return
# Send media messages when streaming is done
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def is_muted(self, group_id: int) -> bool:
return False

View File

@@ -23,6 +23,57 @@ spec:
en: https://link.langbot.app/en/platforms/lark
ja: https://link.langbot.app/ja/platforms/lark
config:
- name: domain
label:
en_US: Platform Domain
zh_Hans: 平台域名
zh_Hant: 平台域名
ja_JP: プラットフォームドメイン
description:
en_US: Select the open platform domain. Use Feishu for Chinese mainland, Lark for international
zh_Hans: 选择开放平台域名,国内使用飞书,海外使用 Lark
zh_Hant: 選擇開放平台域名,國內使用飛書,海外使用 Lark
ja_JP: オープンプラットフォームのドメインを選択。中国国内は飛書、海外は Lark を使用
type: select
options:
- name: https://open.feishu.cn
label:
en_US: Feishu (open.feishu.cn)
zh_Hans: 飞书 (open.feishu.cn)
zh_Hant: 飛書 (open.feishu.cn)
ja_JP: 飛書 (open.feishu.cn)
- name: https://open.larksuite.com
label:
en_US: Lark (open.larksuite.com)
zh_Hans: Lark (open.larksuite.com)
zh_Hant: Lark (open.larksuite.com)
ja_JP: Lark (open.larksuite.com)
- name: custom
label:
en_US: Custom
zh_Hans: 自定义
zh_Hant: 自定義
ja_JP: カスタム
required: false
default: https://open.feishu.cn
- name: custom_domain
label:
en_US: Custom Domain
zh_Hans: 自定义域名
zh_Hant: 自定義域名
ja_JP: カスタムドメイン
description:
en_US: "Enter the full domain URL, e.g. https://open.example.com"
zh_Hans: "输入完整的域名 URL例如 https://open.example.com"
zh_Hant: "輸入完整的域名 URL例如 https://open.example.com"
ja_JP: "完全なドメイン URL を入力(例: https://open.example.com"
type: string
required: false
default: ""
show_if:
field: domain
operator: eq
value: custom
- name: one-click-create
label:
en_US: One-Click Create App
@@ -140,10 +191,10 @@ spec:
zh_Hant: 應用類型
ja_JP: アプリタイプ
description:
en_US: Default to self-built application, refer to https://open.feishu.cn/document/platform-overveiw/overview
zh_Hans: 默认为企业自建应用,参考 https://open.feishu.cn/document/platform-overveiw/overview
zh_Hant: 預設為企業自建應用,參考 https://open.feishu.cn/document/platform-overveiw/overview
ja_JP: デフォルトはカスタムアプリです。詳細は https://open.feishu.cn/document/platform-overveiw/overview を参照してください
en_US: "Default to self-built application, refer to https://open.feishu.cn/document/platform-overveiw/overview"
zh_Hans: "默认为企业自建应用,参考 https://open.feishu.cn/document/platform-overveiw/overview"
zh_Hant: "預設為企業自建應用,參考 https://open.feishu.cn/document/platform-overveiw/overview"
ja_JP: "デフォルトはカスタムアプリです。詳細は https://open.feishu.cn/document/platform-overveiw/overview を参照してください"
type: select
options:
- name: self

View File

@@ -1,14 +1,14 @@
from __future__ import annotations
import time
import telegram
import telegram.ext
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, CallbackQueryHandler, filters
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
import telegramify_markdown
import typing
import traceback
import json
import base64
import pydantic
@@ -189,7 +189,6 @@ class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True)
ap: typing.Any = pydantic.Field(exclude=True, default=None)
message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter()
@@ -225,102 +224,6 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
telegram_callback,
)
)
async def callback_query_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
try:
data = json.loads(query.data)
if data.get('form_action') or data.get('f'):
import langbot_plugin.api.entities.builtin.provider.session as provider_session
workflow_run_id = data.get('workflow_run_id', '')
w_suffix = data.get('w', '')
action_id = data.get('action_id') or data.get('a', '')
session_key = data.get('session_key') or data.get('s', '')
if session_key.startswith('group_') or session_key.startswith('g:'):
launcher_type = provider_session.LauncherTypes.GROUP
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('g:')
else session_key[len('group_') :]
)
else:
launcher_type = provider_session.LauncherTypes.PERSON
launcher_id = (
session_key.split(':', 1)[1]
if session_key.startswith('p:')
else session_key[len('person_') :]
)
user_id = str(query.from_user.id)
# Find bot_uuid and pipeline_uuid
bot_uuid = ''
pipeline_uuid = None
for b in self.ap.platform_mgr.bots:
if b.adapter is self:
bot_uuid = b.bot_entity.uuid
pipeline_uuid = b.bot_entity.use_pipeline_uuid
break
form_action_data = {
'workflow_run_id': workflow_run_id,
'w_suffix': w_suffix,
'action_id': action_id,
'user': f'{launcher_type.value}_{launcher_id}',
'inputs': {},
}
message_chain = platform_message.MessageChain(
[platform_message.Plain(text=f'[Form Action: {action_id}]')]
)
if launcher_type == provider_session.LauncherTypes.GROUP:
synthetic_event = platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=user_id,
member_name='',
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=launcher_id,
name='',
permission=platform_entities.Permission.Member,
),
),
message_chain=message_chain,
source_platform_object=update,
)
else:
synthetic_event = platform_events.FriendMessage(
sender=platform_entities.Friend(
id=user_id,
nickname='',
remark='',
),
message_chain=message_chain,
source_platform_object=update,
)
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=user_id,
message_event=synthetic_event,
message_chain=message_chain,
adapter=self,
pipeline_uuid=pipeline_uuid,
variables={
'_dify_form_action': form_action_data,
'_routed_by_rule': True,
},
)
except Exception:
await self.logger.error(f'Error in telegram callback query: {traceback.format_exc()}')
application.add_handler(CallbackQueryHandler(callback_query_handler))
super().__init__(
config=config,
logger=logger,
@@ -416,19 +319,14 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
update = event.source_platform_object
chat_id = update.effective_chat.id
chat_type = update.effective_chat.type
effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
message_thread_id = update.message.message_thread_id
if chat_type == 'private':
import time as _time
draft_id = int(_time.time() * 1000)
draft_id = int(time.time() * 1000)
self.msg_stream_id[message_id] = ('private', draft_id)
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
try:
await self.bot.send_message_draft(**args)
except (telegram.error.RetryAfter, telegram.error.BadRequest):
pass
await self.bot.send_message_draft(**args)
else:
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
send_msg = await self.bot.send_message(**args)
@@ -449,13 +347,12 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
assert isinstance(message_source.source_platform_object, Update)
update = message_source.source_platform_object
chat_id = update.effective_chat.id
effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
message_thread_id = update.message.message_thread_id
if message_id not in self.msg_stream_id:
return
chat_mode, stream_id = self.msg_stream_id[message_id]
chat_mode, draft_id = self.msg_stream_id[message_id]
components = await TelegramMessageConverter.yiri2target(message, self.bot)
if not components or components[0]['type'] != 'text':
@@ -464,42 +361,16 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
return
content = components[0]['text']
form_data = getattr(bot_message, '_form_data', None)
if form_data and is_final:
self.msg_stream_id.pop(message_id, None)
await self._send_form_action_buttons(message_source, form_data)
return
if chat_mode == 'private':
# Streaming via draft (ephemeral preview in the chat input area)
if (msg_seq - 1) % 8 == 0 or is_final:
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=stream_id)
try:
await self.bot.send_message_draft(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
try:
await self.bot.send_message_draft(**args)
except telegram.error.RetryAfter:
pass
else:
pass # Ignore other draft errors (cosmetic)
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id)
await self.bot.send_message_draft(**args)
if is_final and bot_message.tool_calls is None:
# Finalise: send the real message, discard the draft
args = self._build_message_args(chat_id, content, message_thread_id)
try:
await self.bot.send_message(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = content[:4000] + '\n\n… (truncated)'
await self.bot.send_message(**args)
else:
raise
del args['draft_id']
await self.bot.send_message(**args)
self.msg_stream_id.pop(message_id)
else:
# Streaming via edit_message_text (persistent message)
stream_id = draft_id
if (msg_seq - 1) % 8 == 0 or is_final:
args = {
'message_id': stream_id,
@@ -508,68 +379,11 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
}
if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2'
try:
await self.bot.edit_message_text(**args)
except telegram.error.BadRequest as exc:
if 'Message_too_long' in str(exc):
args['text'] = self._process_markdown(content[:4000] + '\n\n… (truncated)')
await self.bot.edit_message_text(**args)
else:
raise
await self.bot.edit_message_text(**args)
if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id)
async def _send_form_action_buttons(
self,
message_source: platform_events.MessageEvent,
form_data: dict,
):
"""Send inline keyboard buttons for Dify human_input_required form actions."""
actions = form_data.get('actions', [])
node_title = form_data.get('node_title', '')
form_content = form_data.get('form_content', '')
workflow_run_id = form_data.get('workflow_run_id', '')
# Telegram callback_data is capped at 64 bytes, so we identify the
# paused workflow by the last 8 chars of workflow_run_id (unique
# within a session with overwhelming probability).
w_suffix = workflow_run_id[-8:] if workflow_run_id else ''
if isinstance(message_source, platform_events.GroupMessage):
session_key = f'g:{message_source.group.id}'
else:
session_key = f'p:{message_source.sender.id}'
keyboard = []
for action in actions:
action_id = action.get('id', '')
action_title = action.get('title', action_id)
callback_payload = {'f': 1, 'a': action_id, 's': session_key}
if w_suffix:
callback_payload['w'] = w_suffix
callback_data = json.dumps(callback_payload, separators=(',', ':'))
keyboard.append([InlineKeyboardButton(action_title, callback_data=callback_data)])
reply_markup = InlineKeyboardMarkup(keyboard)
update = message_source.source_platform_object
chat_id = update.effective_chat.id
effective_message = update.effective_message
message_thread_id = getattr(effective_message, 'message_thread_id', None) if effective_message else None
text_lines = [f'[{node_title}] Please select an action:']
if form_content:
text_lines.insert(0, form_content)
args = {
'chat_id': chat_id,
'text': '\n\n'.join(text_lines),
'reply_markup': reply_markup,
}
if message_thread_id:
args['message_thread_id'] = message_thread_id
await self.bot.send_message(**args)
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update):
return None

View File

@@ -2,11 +2,9 @@ from __future__ import annotations
import typing
import json
import time
import uuid
import base64
import mimetypes
from collections import OrderedDict
from langbot.pkg.provider import runner
@@ -18,102 +16,6 @@ from langbot.libs.dify_service_api.v1 import client, errors
import httpx
# Module-level store for paused-workflow form state, keyed by session key
# (launcher_type_value + "_" + launcher_id). Each session holds an
# insertion-ordered dict of form_token -> form_data, allowing multiple
# Dify workflows to be paused simultaneously for the same session.
_PENDING_FORMS: dict[str, 'OrderedDict[str, dict[str, typing.Any]]'] = {}
_PENDING_FORM_DEFAULT_TTL = 30 * 60 # 30 minutes safety cap
def _session_key_from_query(query: pipeline_query.Query) -> str:
return f'{query.session.launcher_type.value}_{query.session.launcher_id}'
def _prune_pending_forms(now: float | None = None) -> None:
if now is None:
now = time.time()
for session_key in list(_PENDING_FORMS.keys()):
forms = _PENDING_FORMS[session_key]
expired_tokens = [token for token, data in forms.items() if data.get('_expires_at', 0) <= now]
for token in expired_tokens:
forms.pop(token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
def _set_pending_form(session_key: str, form_data: dict[str, typing.Any]) -> None:
_prune_pending_forms()
stored = dict(form_data)
expiration_time = stored.get('expiration_time')
try:
expiration_ts = float(expiration_time) if expiration_time is not None else 0.0
except (TypeError, ValueError):
expiration_ts = 0.0
stored['_expires_at'] = expiration_ts or (time.time() + _PENDING_FORM_DEFAULT_TTL)
form_token = str(stored.get('form_token') or '')
forms = _PENDING_FORMS.setdefault(session_key, OrderedDict())
# Re-insert at the end so this becomes the "latest" entry
forms.pop(form_token, None)
forms[form_token] = stored
def _get_pending_form_by_token(session_key: str, form_token: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms or not form_token:
return None
return forms.get(form_token)
def _get_pending_form_by_w_suffix(session_key: str, w_suffix: str) -> dict[str, typing.Any] | None:
"""Look up a pending form whose workflow_run_id ends with the given suffix.
Used by adapters (e.g. Telegram) whose callback payload is too small to
carry the full form_token / workflow_run_id.
"""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms or not w_suffix:
return None
for token in reversed(forms):
form = forms[token]
if str(form.get('workflow_run_id', '')).endswith(w_suffix):
return form
return None
def _get_latest_pending_form(session_key: str) -> dict[str, typing.Any] | None:
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return None
return forms[next(reversed(forms))]
def _iter_pending_forms(session_key: str) -> typing.Iterator[dict[str, typing.Any]]:
"""Iterate pending forms for a session, newest-first."""
_prune_pending_forms()
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
for token in reversed(list(forms.keys())):
yield forms[token]
def _clear_pending_form(session_key: str, form_token: str | None = None) -> None:
"""Clear one specific pending form (by token) or all forms for the session."""
forms = _PENDING_FORMS.get(session_key)
if not forms:
return
if form_token is None:
_PENDING_FORMS.pop(session_key, None)
return
forms.pop(form_token, None)
if not forms:
_PENDING_FORMS.pop(session_key, None)
@runner.runner_class('dify-service-api')
class DifyServiceAPIRunner(runner.RequestRunner):
"""Dify Service API 对话请求器"""
@@ -433,140 +335,11 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form_blocking(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""Submit human input to resume a paused Dify workflow (non-streaming)."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
inputs = form_action.get('inputs', {})
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
if chunk['event'] == 'workflow_finished':
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
yield provider_message.Message(
role='assistant',
content=content,
)
def _resolve_pending_form(self, session_key: str, form_action: dict) -> dict | None:
"""Locate the pending form this action targets.
Tries identifiers in order of specificity: form_token, full
workflow_run_id, workflow_run_id suffix (Telegram-style compact id),
then falls back to the newest pending form for the session.
"""
form_token = form_action.get('form_token')
if form_token:
form = _get_pending_form_by_token(session_key, form_token)
if form:
return form
workflow_run_id = form_action.get('workflow_run_id')
if workflow_run_id:
for form in _iter_pending_forms(session_key):
if form.get('workflow_run_id') == workflow_run_id:
return form
w_suffix = form_action.get('w_suffix')
if w_suffix:
form = _get_pending_form_by_w_suffix(session_key, w_suffix)
if form:
return form
return _get_latest_pending_form(session_key)
def _merge_pending_form_action(self, session_key: str, form_action: dict | None) -> dict | None:
"""Backfill resume fields from the matching pending form."""
if not form_action:
return None
merged_action = dict(form_action)
merged_action.pop('w_suffix', None)
pending_form = self._resolve_pending_form(session_key, form_action)
if pending_form:
merged_action['form_token'] = merged_action.get('form_token') or pending_form.get('form_token', '')
merged_action['workflow_run_id'] = merged_action.get('workflow_run_id') or pending_form.get(
'workflow_run_id', ''
)
merged_action.setdefault('inputs', pending_form.get('inputs', {}))
merged_action.setdefault('user', pending_form.get('user', ''))
merged_action.setdefault('node_title', pending_form.get('node_title', ''))
# Resolve clicked action's display title from the stored actions list
if 'action_title' not in merged_action:
clicked_id = merged_action.get('action_id', '')
for action in pending_form.get('actions', []):
if str(action.get('id', '')) == str(clicked_id):
merged_action['action_title'] = action.get('title', clicked_id)
break
return merged_action
def _match_pending_form_action(self, session_key: str, user_text: str) -> dict | None:
"""Match plain text replies against pending Dify form actions.
Iterates all pending forms newest-first; the first action whose
title/id matches the text wins. This means when multiple forms are
pending with the same button label, the most recent one resolves.
"""
normalized_text = user_text.strip().lower()
if not normalized_text:
return None
for pending_form in _iter_pending_forms(session_key):
for action in pending_form.get('actions', []):
titles = {
str(action.get('title', '')).strip().lower(),
str(action.get('id', '')).strip().lower(),
}
if normalized_text in titles:
return {
'form_token': pending_form.get('form_token', ''),
'workflow_run_id': pending_form.get('workflow_run_id', ''),
'action_id': action.get('id', ''),
'action_title': action.get('title', action.get('id', '')),
'node_title': pending_form.get('node_title', ''),
'inputs': pending_form.get('inputs', {}),
'user': pending_form.get('user', ''),
}
return None
async def _workflow_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
async for msg in self._submit_workflow_form_blocking(form_action):
yield msg
return
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -593,7 +366,6 @@ class DifyServiceAPIRunner(runner.RequestRunner):
}
inputs.update(query.variables)
human_input_yielded = False
async for chunk in self.dify_client.workflow_run(
inputs=inputs,
@@ -605,46 +377,6 @@ class DifyServiceAPIRunner(runner.RequestRunner):
if chunk['event'] in ignored_events:
continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', '')
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': reason.get('inputs', {}),
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
human_input_yielded = True
yield provider_message.Message(
role='assistant',
content=display_text,
)
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
continue
@@ -667,8 +399,6 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
elif chunk['event'] == 'workflow_finished':
if human_input_yielded:
break
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
content, _ = self._process_thinking_content(chunk['data']['outputs']['summary'])
@@ -906,153 +636,11 @@ class DifyServiceAPIRunner(runner.RequestRunner):
query.session.using_conversation.uuid = chunk['conversation_id']
async def _submit_workflow_form(
self, form_action: dict
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""Submit human input to resume a paused Dify workflow."""
form_token = form_action['form_token']
workflow_run_id = form_action['workflow_run_id']
user = form_action['user']
action_id = form_action.get('action_id', '')
action_title = form_action.get('action_title', '') or action_id
node_title = form_action.get('node_title', '')
inputs = form_action.get('inputs', {})
messsage_idx = 0
is_final = False
think_start = False
think_end = False
workflow_contents = ''
repause_form_data: dict | None = None
remove_think = self.pipeline_config['output'].get('misc', {}).get('remove-think')
async for chunk in self.dify_client.workflow_submit(
form_token=form_token,
workflow_run_id=workflow_run_id,
inputs=inputs,
user=user,
action=action_id,
timeout=120,
):
self.ap.logger.debug('dify-workflow-submit-chunk: ' + str(chunk))
yield_this_iteration = False
if chunk['event'] == 'workflow_finished':
is_final = True
yield_this_iteration = True
if chunk['data'].get('error'):
raise errors.DifyAPIError(chunk['data']['error'])
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
new_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') != 'human_input_required':
continue
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
# Use a distinct name — `node_title` (the just-resolved step)
# must keep its value so the resume notice on the previous
# card still shows which step the user acted on.
paused_node_title = reason.get('node_title', '')
raw_inputs = reason.get('inputs', {})
_set_pending_form(
user,
{
'workflow_run_id': new_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': paused_node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': user,
},
)
repause_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': paused_node_title,
'workflow_run_id': new_run_id,
'form_token': reason.get('form_token', ''),
}
# Ensure the final chunk has non-empty content so
# ResponseWrapper (which skips empty-content chunks) lets it
# propagate to SendResponseBackStage. Use a zero-width space
# so neither Lark nor Telegram renders visible noise — the
# adapter substitutes its own card text from _form_data.
if not workflow_contents:
workflow_contents = ''
is_final = True
yield_this_iteration = True
break
if chunk['event'] == 'text_chunk':
messsage_idx += 1
if remove_think:
if '<think>' in chunk['data']['text'] and not think_start:
think_start = True
continue
if '</think>' in chunk['data']['text'] and not think_end:
import re
content = re.sub(r'^\n</think>', '', chunk['data']['text'])
workflow_contents += content
think_end = True
elif think_end:
workflow_contents += chunk['data']['text']
if think_start:
continue
else:
workflow_contents += chunk['data']['text']
if messsage_idx % 8 == 0:
yield_this_iteration = True
if yield_this_iteration:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents,
is_final=is_final,
)
msg._resume_from_form = True
if action_title:
msg._resume_action_title = action_title
if node_title:
msg._resume_node_title = node_title
if is_final and repause_form_data:
msg._form_data = repause_form_data
msg._open_new_card = True
yield msg
if is_final:
return
async def _workflow_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用工作流"""
# Check if this is a form action resume (button click or text match)
form_action_raw = query.variables.get('_dify_form_action')
session_key = _session_key_from_query(query)
if form_action_raw:
form_action = self._merge_pending_form_action(session_key, form_action_raw)
else:
form_action = self._match_pending_form_action(session_key, str(query.message_chain))
if form_action:
_clear_pending_form(session_key, form_action.get('form_token') or None)
# Resume paused workflow via submit endpoint
async for msg in self._submit_workflow_form(form_action):
yield msg
return
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
@@ -1084,13 +672,6 @@ class DifyServiceAPIRunner(runner.RequestRunner):
think_start = False
think_end = False
workflow_contents = ''
workflow_run_id = ''
human_input_yielded = False
# Saved form data to attach to the final MessageChunk so the adapter
# can detect it when is_final=True and render buttons.
pending_form_data = None
display_text = ''
remove_think = self.pipeline_config['output'].get('misc', '').get('remove-think')
async for chunk in self.dify_client.workflow_run(
@@ -1101,62 +682,7 @@ class DifyServiceAPIRunner(runner.RequestRunner):
):
self.ap.logger.debug('dify-workflow-chunk: ' + str(chunk))
if chunk['event'] in ignored_events:
if chunk['event'] == 'workflow_started':
workflow_run_id = chunk['data'].get('workflow_run_id', '')
continue
if chunk['event'] == 'workflow_paused':
reasons = chunk['data'].get('reasons', [])
workflow_run_id = chunk['data'].get('workflow_run_id', workflow_run_id)
for reason in reasons:
if reason.get('TYPE') == 'human_input_required':
form_content = reason.get('form_content', '')
actions = reason.get('actions', [])
node_title = reason.get('node_title', '')
# Persist form state in module-level store keyed by session
raw_inputs = reason.get('inputs', {})
_set_pending_form(
_session_key_from_query(query),
{
'workflow_run_id': workflow_run_id,
'form_id': reason.get('form_id'),
'form_token': reason.get('form_token'),
'node_id': reason.get('node_id'),
'node_title': node_title,
'form_content': form_content,
'inputs': raw_inputs if isinstance(raw_inputs, dict) else {},
'actions': actions,
'expiration_time': reason.get('expiration_time'),
'user': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
},
)
# Pass form render metadata to downstream stages
query.variables['_dify_form_render'] = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
}
action_lines = '\n'.join(f'- [{a.get("title", a.get("id", ""))}]' for a in actions)
display_text = f'[Human Input Required] {node_title}\n{form_content}\n{action_lines}'
workflow_contents += display_text + '\n'
# Save form data to attach to the final chunk later.
# We do NOT yield here — the form content will be sent
# as the final MessageChunk (with is_final=True and
# _form_data) so the adapter can update the card and
# add buttons in one pass.
pending_form_data = {
'form_content': form_content,
'actions': actions,
'node_title': node_title,
'workflow_run_id': workflow_run_id,
'form_token': reason.get('form_token', ''),
}
human_input_yielded = True
if chunk['event'] == 'workflow_finished':
is_final = True
if chunk['data']['error']:
@@ -1204,29 +730,11 @@ class DifyServiceAPIRunner(runner.RequestRunner):
yield msg
if messsage_idx % 8 == 0 or is_final:
final_content = workflow_contents if workflow_contents.strip() else ''
msg = provider_message.MessageChunk(
yield provider_message.MessageChunk(
role='assistant',
content=final_content,
content=workflow_contents,
is_final=is_final,
)
# Attach form data to the final chunk for the adapter
if is_final and pending_form_data:
msg._form_data = pending_form_data
pending_form_data = None
yield msg
# If the stream ended after workflow_paused without a
# workflow_finished event, yield a final chunk so the adapter
# can update the card and add buttons.
if human_input_yielded and not is_final:
msg = provider_message.MessageChunk(
role='assistant',
content=workflow_contents or display_text,
is_final=True,
)
msg._form_data = pending_form_data
yield msg
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""