feat(platform): add dingtalk eba adapter

This commit is contained in:
Junyan Qin
2026-05-10 19:52:36 +08:00
parent 3ed35593e9
commit 950da65797
18 changed files with 1256 additions and 151 deletions

View File

@@ -438,8 +438,13 @@ class DingTalkClient:
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data)
try:
body = response.json()
except Exception:
body = {'text': response.text}
if response.status_code == 200:
return
return body
raise Exception(f'Error: {response.status_code}, {body}')
except Exception:
await self.logger.error(f'failed to send proactive massage to person: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to person: {traceback.format_exc()}')
@@ -464,8 +469,13 @@ class DingTalkClient:
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=data)
try:
body = response.json()
except Exception:
body = {'text': response.text}
if response.status_code == 200:
return
return body
raise Exception(f'Error: {response.status_code}, {body}')
except Exception:
await self.logger.error(f'failed to send proactive massage to group: {traceback.format_exc()}')
raise Exception(f'failed to send proactive massage to group: {traceback.format_exc()}')

View File

@@ -0,0 +1 @@
"""DingTalk EBA platform adapter."""

View File

@@ -0,0 +1,235 @@
from __future__ import annotations
import traceback
import typing
import pydantic
from langbot.libs.dingtalk_api.api import DingTalkClient
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
from langbot.pkg.platform.adapters.dingtalk.api_impl import DingTalkAPIMixin
from langbot.pkg.platform.adapters.dingtalk.event_converter import DingTalkEventConverter
from langbot.pkg.platform.adapters.dingtalk.message_converter import DingTalkMessageConverter
from langbot.pkg.platform.adapters.dingtalk.platform_api import PLATFORM_API_MAP
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class DingTalkAdapter(DingTalkAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter):
bot: DingTalkClient = pydantic.Field(exclude=True)
message_converter: DingTalkMessageConverter = DingTalkMessageConverter()
event_converter: DingTalkEventConverter = DingTalkEventConverter()
config: dict
listeners: dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
] = {}
card_instance_id_dict: dict = {}
_message_cache: dict[str, platform_events.MessageReceivedEvent] = {}
_user_cache: dict[str, platform_entities.User] = {}
_group_cache: dict[str, platform_entities.UserGroup] = {}
class Config:
arbitrary_types_allowed = True
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger):
required_keys = ['client_id', 'client_secret', 'robot_name', 'robot_code']
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
raise Exception('钉钉缺少相关配置项,请查看文档或联系管理员')
bot = DingTalkClient(
client_id=config['client_id'],
client_secret=config['client_secret'],
robot_name=config['robot_name'],
robot_code=config['robot_code'],
markdown_card=config.get('markdown_card', True),
logger=logger,
)
super().__init__(
config=config,
logger=logger,
card_instance_id_dict={},
bot_account_id=config['robot_name'],
bot=bot,
listeners={},
_message_cache={},
_user_cache={},
_group_cache={},
)
self._register_native_handlers()
def get_supported_events(self) -> list[str]:
return [
'message.received',
'platform.specific',
]
def get_supported_apis(self) -> list[str]:
return [
'send_message',
'reply_message',
'get_message',
'get_group_info',
'get_group_list',
'get_group_member_info',
'get_user_info',
'get_friend_list',
'get_file_url',
'call_platform_api',
]
async def send_message(
self,
target_type: str,
target_id: str,
message: platform_message.MessageChain,
) -> platform_events.MessageResult:
markdown_enabled = self.config.get('markdown_card', False)
content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
if target_type in ('person', 'private'):
raw = await self.bot.send_proactive_message_to_one(target_id, content)
elif target_type == 'group':
raw = await self.bot.send_proactive_message_to_group(target_id, content)
else:
raise ValueError(f'Unsupported dingtalk target_type: {target_type}')
return platform_events.MessageResult(raw=raw if isinstance(raw, dict) else {'result': raw})
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
) -> platform_events.MessageResult:
assert isinstance(message_source.source_platform_object, DingTalkEvent)
incoming_message = message_source.source_platform_object.incoming_message
markdown_enabled = self.config.get('markdown_card', False)
content, at = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
raw = await self.bot.send_message(content, incoming_message, at)
return platform_events.MessageResult(
message_id=getattr(incoming_message, 'message_id', None),
raw=raw if isinstance(raw, dict) else {'result': raw},
)
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
):
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
if (msg_seq - 1) % 8 != 0 and not is_final:
return
markdown_enabled = self.config.get('markdown_card', False)
content, _ = await DingTalkMessageConverter.yiri2target(message, markdown_enabled)
card_instance, card_instance_id = self.card_instance_id_dict[message_id]
if not content and bot_message.content:
content = bot_message.content
if content:
await self.bot.send_card_message(card_instance, card_instance_id, content, is_final)
if is_final and bot_message.tool_calls is None:
self.card_instance_id_dict.pop(message_id)
async def create_message_card(self, message_id, event):
card_template_id = self.config['card_template_id']
incoming_message = event.source_platform_object.incoming_message
card_auto_layout = self.config.get('card_auto_layout', False)
card_instance, card_instance_id = await self.bot.create_and_card(
card_template_id,
incoming_message,
card_auto_layout=card_auto_layout,
)
self.card_instance_id_dict[message_id] = (card_instance, card_instance_id)
return True
async def is_stream_output_supported(self) -> bool:
return bool(self.config.get('enable-stream-reply', False))
async def call_platform_api(self, action: str, params: dict = {}) -> dict:
handler = PLATFORM_API_MAP.get(action)
if handler is None:
raise NotSupportedError(f'call_platform_api:{action}')
return await handler(self.bot, params)
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
registered = self.listeners.get(event_type)
if registered is callback:
self.listeners.pop(event_type, None)
async def run_async(self):
await self.logger.info('DingTalk EBA adapter starting')
await self.bot.start()
async def kill(self) -> bool:
await self.bot.stop()
return True
async def is_muted(self, group_id: int | None = None) -> bool:
return False
def _register_native_handlers(self):
async def on_message(event: DingTalkEvent):
await self._handle_native_event(event)
self.bot.on_message('FriendMessage')(on_message)
self.bot.on_message('GroupMessage')(on_message)
async def _handle_native_event(self, event: DingTalkEvent):
try:
await self.logger.debug(
'DingTalk EBA event received: '
f'conversation={event.conversation}, message_id={getattr(event.incoming_message, "message_id", None)}'
)
if platform_events.FriendMessage in self.listeners or platform_events.GroupMessage in self.listeners:
legacy_event = await self.event_converter.target2legacy(event, self.config['robot_name'])
if legacy_event:
callback = self.listeners.get(type(legacy_event))
if callback:
await callback(legacy_event, self)
eba_event = await self.event_converter.target2yiri(event, self.config['robot_name'])
if eba_event:
self._cache_event(eba_event)
await self._dispatch_eba_event(eba_event)
except Exception:
await self.logger.error(f'Error in dingtalk native event: {traceback.format_exc()}')
async def _dispatch_eba_event(self, event: platform_events.EBAEvent):
for event_type in (type(event), platform_events.EBAEvent, platform_events.Event):
callback = self.listeners.get(event_type)
if callback:
await callback(event, self)
return
def _cache_event(self, event: platform_events.Event):
if not isinstance(event, platform_events.MessageReceivedEvent):
return
self._message_cache[str(event.message_id)] = event
self._user_cache[str(event.sender.id)] = event.sender
if event.group:
self._group_cache[str(event.group.id)] = event.group

View File

@@ -0,0 +1,65 @@
from __future__ import annotations
import typing
from langbot.libs.dingtalk_api.api import DingTalkClient
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class DingTalkAPIMixin:
bot: DingTalkClient
_message_cache: dict[str, platform_events.MessageReceivedEvent]
_user_cache: dict[str, platform_entities.User]
_group_cache: dict[str, platform_entities.UserGroup]
async def get_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> platform_events.MessageReceivedEvent:
event = self._message_cache.get(str(message_id))
if event is None:
raise NotSupportedError('get_message:message_not_cached')
return event
async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup:
return self._group_cache.get(str(group_id)) or platform_entities.UserGroup(id=group_id, name='')
async def get_group_list(self) -> list[platform_entities.UserGroup]:
return list(self._group_cache.values())
async def get_group_member_list(
self,
group_id: typing.Union[int, str],
) -> list[platform_entities.UserGroupMember]:
raise NotSupportedError('get_group_member_list')
async def get_group_member_info(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> platform_entities.UserGroupMember:
user = self._user_cache.get(str(user_id))
if user is None:
raise NotSupportedError('get_group_member_info:user_not_cached')
return platform_entities.UserGroupMember(
user=user,
group_id=group_id,
role=platform_entities.MemberRole.MEMBER,
display_name=user.nickname,
)
async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User:
return self._user_cache.get(str(user_id)) or platform_entities.User(id=user_id, nickname='')
async def get_friend_list(self) -> list[platform_entities.User]:
return list(self._user_cache.values())
async def upload_file(self, file_data: bytes, filename: str) -> str:
raise NotSupportedError('upload_file')
async def get_file_url(self, file_id: str) -> str:
return await self.bot.get_file_url(file_id)

View File

@@ -0,0 +1,7 @@
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<!-- Uploaded to: SVG Repo, www.svgrepo.com, Transformed by: SVG Repo Mixer Tools -->
<svg fill="#4aa4f8" width="800px" height="800px" viewBox="0 0 1024 1024" xmlns="http://www.w3.org/2000/svg" class="icon" stroke="#4aa4f8">
<g id="SVGRepo_bgCarrier" stroke-width="0"/>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
import typing
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot.pkg.platform.adapters.dingtalk.message_converter import DingTalkMessageConverter
from langbot.pkg.platform.adapters.dingtalk.types import ADAPTER_NAME
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
class DingTalkEventConverter(abstract_platform_adapter.AbstractEventConverter):
@staticmethod
async def yiri2target(event: platform_events.Event):
return getattr(event, 'source_platform_object', None)
@staticmethod
async def target2yiri(event: DingTalkEvent, bot_name: str) -> platform_events.Event | None:
if event.conversation in {'FriendMessage', 'GroupMessage'}:
return await DingTalkEventConverter.message_to_eba(event, bot_name)
return DingTalkEventConverter.platform_specific(event, f'message.{event.conversation or "unknown"}')
@staticmethod
async def target2legacy(
event: DingTalkEvent,
bot_name: str,
) -> platform_events.FriendMessage | platform_events.GroupMessage | None:
eba_event = await DingTalkEventConverter.message_to_eba(event, bot_name)
if eba_event:
return eba_event.to_legacy_event()
return None
@staticmethod
async def message_to_eba(event: DingTalkEvent, bot_name: str) -> platform_events.MessageReceivedEvent:
incoming_message = event.incoming_message
message_chain = await DingTalkMessageConverter.target2yiri(event, bot_name)
sender = DingTalkEventConverter.user_from_event(event)
chat_type = platform_entities.ChatType.PRIVATE
chat_id = getattr(incoming_message, 'sender_staff_id', '')
group = None
if event.conversation == 'GroupMessage':
chat_type = platform_entities.ChatType.GROUP
chat_id = getattr(incoming_message, 'conversation_id', '')
group = DingTalkEventConverter.group_from_event(event)
return platform_events.MessageReceivedEvent(
type='message.received',
adapter_name=ADAPTER_NAME,
message_id=getattr(incoming_message, 'message_id', ''),
message_chain=message_chain,
sender=sender,
chat_type=chat_type,
chat_id=chat_id,
group=group,
timestamp=DingTalkEventConverter._timestamp(incoming_message),
source_platform_object=event,
)
@staticmethod
def user_from_event(event: DingTalkEvent) -> platform_entities.User:
incoming_message = event.incoming_message
return platform_entities.User(
id=getattr(incoming_message, 'sender_staff_id', ''),
nickname=getattr(incoming_message, 'sender_nick', '') or '',
)
@staticmethod
def group_from_event(event: DingTalkEvent) -> platform_entities.UserGroup:
incoming_message = event.incoming_message
return platform_entities.UserGroup(
id=getattr(incoming_message, 'conversation_id', ''),
name=getattr(incoming_message, 'conversation_title', '') or '',
)
@staticmethod
def platform_specific(event: DingTalkEvent, action: str) -> platform_events.PlatformSpecificEvent:
return platform_events.PlatformSpecificEvent(
type='platform.specific',
adapter_name=ADAPTER_NAME,
action=action,
data={
key: value for key, value in dict(event).items() if key not in {'IncomingMessage', 'Picture', 'Audio'}
},
timestamp=DingTalkEventConverter._timestamp(event.incoming_message),
source_platform_object=event,
)
@staticmethod
def _timestamp(incoming_message: typing.Any) -> float:
value = getattr(incoming_message, 'create_at', None)
if isinstance(value, (int, float)):
timestamp = float(value)
return timestamp / 1000 if timestamp > 10_000_000_000 else timestamp
if hasattr(value, 'timestamp'):
return float(value.timestamp())
return 0.0

View File

@@ -0,0 +1,126 @@
apiVersion: v1
kind: MessagePlatformAdapter
metadata:
name: dingtalk-eba
label:
en_US: DingTalk (EBA)
zh_Hans: 钉钉 (EBA)
zh_Hant: 釘釘 (EBA)
description:
en_US: DingTalk adapter (EBA architecture)
zh_Hans: 钉钉适配器EBA 架构版本)
zh_Hant: 釘釘適配器EBA 架構版本)
icon: dingtalk.svg
spec:
categories:
- china
help_links:
zh: https://link.langbot.app/zh/platforms/dingtalk
en: https://link.langbot.app/en/platforms/dingtalk
ja: https://link.langbot.app/ja/platforms/dingtalk
config:
- name: client_id
label:
en_US: Client ID
zh_Hans: 客户端ID
zh_Hant: 用戶端ID
type: string
required: true
default: ""
- name: client_secret
label:
en_US: Client Secret
zh_Hans: 客户端密钥
zh_Hant: 用戶端密鑰
type: string
required: true
default: ""
- name: robot_code
label:
en_US: Robot Code
zh_Hans: 机器人代码
zh_Hant: 機器人代碼
type: string
required: true
default: ""
- name: robot_name
label:
en_US: Robot Name
zh_Hans: 机器人名称
zh_Hant: 機器人名稱
type: string
required: true
default: ""
- name: markdown_card
label:
en_US: Markdown Card
zh_Hans: 是否使用 Markdown 卡片
zh_Hant: 是否使用 Markdown 卡片
type: boolean
required: false
default: true
- name: enable-stream-reply
label:
en_US: Enable Stream Reply Mode
zh_Hans: 启用钉钉卡片流式回复模式
zh_Hant: 啟用釘釘卡片串流回覆模式
description:
en_US: If enabled, the bot will use DingTalk card streaming replies.
zh_Hans: 如果启用,将使用钉钉卡片流式方式来回复内容
zh_Hant: 如果啟用,將使用釘釘卡片串流方式來回覆內容
type: boolean
required: true
default: false
- name: card_auto_layout
label:
en_US: Card Auto Layout
zh_Hans: 卡片宽屏自动布局
zh_Hant: 卡片寬螢幕自動佈局
type: boolean
required: false
default: false
- name: card_template_id
label:
en_US: Card Template ID
zh_Hans: 卡片模板ID
zh_Hant: 卡片範本ID
type: string
required: true
default: "填写你的卡片template_id"
supported_events:
- message.received
- platform.specific
supported_apis:
required:
- send_message
- reply_message
optional:
- get_message
- get_group_info
- get_group_list
- get_group_member_info
- get_user_info
- get_friend_list
- get_file_url
- call_platform_api
platform_specific_apis:
- action: check_access_token
description: { en_US: "Check whether the current DingTalk access token is usable", zh_Hans: "检查当前钉钉 access token 是否可用" }
- action: refresh_access_token
description: { en_US: "Refresh the DingTalk access token", zh_Hans: "刷新钉钉 access token" }
- action: get_file_url
description: { en_US: "Resolve a DingTalk download code to a file URL", zh_Hans: "将钉钉 downloadCode 解析为文件 URL" }
- action: get_audio_base64
description: { en_US: "Download DingTalk audio as base64 by download code", zh_Hans: "通过 downloadCode 下载钉钉语音并转为 base64" }
- action: download_image_base64
description: { en_US: "Download DingTalk image as base64 by download code", zh_Hans: "通过 downloadCode 下载钉钉图片并转为 base64" }
execution:
python:
path: ./adapter.py
attr: DingTalkAdapter

View File

@@ -0,0 +1,177 @@
from __future__ import annotations
import datetime
import typing
from langbot.libs.dingtalk_api.dingtalkevent import DingTalkEvent
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class DingTalkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
def _format_image_as_markdown(msg: platform_message.Image) -> str:
if msg.url:
return f'\n![image]({msg.url})\n'
if msg.base64:
if msg.base64.startswith('data:'):
return f'\n![image]({msg.base64})\n'
return f'\n![image](data:image/png;base64,{msg.base64})\n'
return ''
@staticmethod
def _component_text_fallback(component: platform_message.MessageComponent) -> str:
if isinstance(component, platform_message.At):
return f'@{component.display or component.target}'
if isinstance(component, platform_message.AtAll):
return '@所有人'
if isinstance(component, platform_message.File):
if component.url:
return f'\n[{component.name or "file"}]({component.url})\n'
return f'\n[File]{component.name or component.id or "file"}\n'
if isinstance(component, platform_message.Voice):
return component.url or '[Voice]'
if isinstance(component, platform_message.Face):
return str(component)
if isinstance(component, platform_message.Unknown):
return component.text
return str(component)
@staticmethod
async def yiri2target(
message_chain: platform_message.MessageChain,
markdown_enabled: bool = True,
) -> tuple[str, bool]:
content = ''
at = False
for msg in message_chain:
if isinstance(msg, platform_message.Source):
continue
if isinstance(msg, platform_message.Plain):
content += msg.text
elif isinstance(msg, platform_message.At):
at = True
content += DingTalkMessageConverter._component_text_fallback(msg)
elif isinstance(msg, platform_message.AtAll):
content += DingTalkMessageConverter._component_text_fallback(msg)
elif isinstance(msg, platform_message.Image):
if markdown_enabled:
content += DingTalkMessageConverter._format_image_as_markdown(msg)
else:
content += '[Image]'
elif isinstance(msg, platform_message.File):
content += DingTalkMessageConverter._component_text_fallback(msg)
elif isinstance(msg, platform_message.Voice):
content += DingTalkMessageConverter._component_text_fallback(msg)
elif isinstance(msg, platform_message.Quote):
if msg.id is not None:
content += f'[引用消息 {msg.id}] '
if msg.origin:
quote_content, quote_at = await DingTalkMessageConverter.yiri2target(msg.origin, markdown_enabled)
content += quote_content
at = at or quote_at
elif isinstance(msg, platform_message.Forward):
for node in msg.node_list:
sender = node.sender_name or node.sender_id or ''
if sender:
content += f'\n[{sender}] '
if node.message_chain:
forwarded_content, forwarded_at = await DingTalkMessageConverter.yiri2target(
node.message_chain, markdown_enabled
)
content += forwarded_content
at = at or forwarded_at
else:
content += DingTalkMessageConverter._component_text_fallback(msg)
return content, at
@staticmethod
async def target2yiri(event: DingTalkEvent, bot_name: str) -> platform_message.MessageChain:
incoming_message = event.incoming_message
components: list[platform_message.MessageComponent] = [
platform_message.Source(
id=getattr(incoming_message, 'message_id', ''),
time=DingTalkMessageConverter._message_time(incoming_message),
)
]
for at_user in getattr(incoming_message, 'at_users', []) or []:
if getattr(at_user, 'dingtalk_id', None) == getattr(incoming_message, 'chatbot_user_id', None):
components.append(platform_message.At(target=bot_name, display=bot_name))
rich_content = event.rich_content
if rich_content:
for element in rich_content.get('Elements') or []:
if element.get('Type') == 'text':
text = DingTalkMessageConverter._strip_bot_mention(element.get('Content', ''), bot_name)
if text.strip():
components.append(platform_message.Plain(text=text))
elif element.get('Type') == 'image' and element.get('Picture'):
components.append(platform_message.Image(base64=element['Picture']))
else:
if event.content and event.type != 'audio':
components.append(
platform_message.Plain(
text=DingTalkMessageConverter._strip_bot_mention(event.content, bot_name),
)
)
if event.picture:
components.append(platform_message.Image(base64=event.picture))
if event.file:
components.append(platform_message.File(url=event.file, name=event.name or 'file'))
if event.audio:
if event.content and event.type == 'audio':
components.append(platform_message.Plain(text=event.content))
else:
components.append(platform_message.Voice(base64=event.audio))
quote = DingTalkMessageConverter._quote_component(event)
if quote:
components.append(quote)
return platform_message.MessageChain(components)
@staticmethod
def _quote_component(event: DingTalkEvent) -> platform_message.Quote | None:
quote_info = event.quoted_message
if not quote_info:
return None
origin_components: list[platform_message.MessageComponent] = []
msg_type = quote_info.get('msg_type', '')
if msg_type == 'file' and quote_info.get('file_url'):
origin_components.append(
platform_message.File(url=quote_info['file_url'], name=quote_info.get('file_name', 'file'))
)
elif msg_type == 'picture' and quote_info.get('picture'):
origin_components.append(platform_message.Image(base64=quote_info['picture']))
elif msg_type == 'audio' and quote_info.get('audio'):
origin_components.append(platform_message.Voice(base64=quote_info['audio']))
elif quote_info.get('content'):
origin_components.append(platform_message.Plain(text=str(quote_info['content'])))
incoming_message = event.incoming_message
return platform_message.Quote(
id=quote_info.get('message_id') or None,
group_id=getattr(incoming_message, 'conversation_id', None),
sender_id=quote_info.get('sender_id') or None,
target_id=getattr(incoming_message, 'conversation_id', None)
or getattr(incoming_message, 'sender_staff_id', None),
origin=platform_message.MessageChain(origin_components),
)
@staticmethod
def _strip_bot_mention(text: str, bot_name: str) -> str:
return text.replace('@' + bot_name, '')
@staticmethod
def _message_time(incoming_message: typing.Any) -> datetime.datetime:
value = getattr(incoming_message, 'create_at', None)
if isinstance(value, datetime.datetime):
return value
if isinstance(value, (int, float)):
timestamp = float(value)
if timestamp > 10_000_000_000:
timestamp = timestamp / 1000
return datetime.datetime.fromtimestamp(timestamp)
return datetime.datetime.now()

View File

@@ -0,0 +1,44 @@
from __future__ import annotations
import typing
from langbot.libs.dingtalk_api.api import DingTalkClient
async def check_access_token(bot: DingTalkClient, params: dict) -> dict:
return {'valid': await bot.check_access_token()}
async def refresh_access_token(bot: DingTalkClient, params: dict) -> dict:
await bot.get_access_token()
return {'ok': bool(bot.access_token)}
async def get_file_url(bot: DingTalkClient, params: dict) -> dict:
download_code = params.get('download_code') or params.get('downloadCode') or params.get('file_id')
if not download_code:
raise ValueError('download_code is required')
return {'url': await bot.get_file_url(str(download_code))}
async def get_audio_base64(bot: DingTalkClient, params: dict) -> dict:
download_code = params.get('download_code') or params.get('downloadCode') or params.get('file_id')
if not download_code:
raise ValueError('download_code is required')
return {'base64': await bot.get_audio_url(str(download_code))}
async def download_image_base64(bot: DingTalkClient, params: dict) -> dict:
download_code = params.get('download_code') or params.get('downloadCode') or params.get('file_id')
if not download_code:
raise ValueError('download_code is required')
return {'base64': await bot.download_image(str(download_code))}
PLATFORM_API_MAP: dict[str, typing.Callable[[DingTalkClient, dict], typing.Awaitable[dict]]] = {
'check_access_token': check_access_token,
'refresh_access_token': refresh_access_token,
'get_file_url': get_file_url,
'get_audio_base64': get_audio_base64,
'download_image_base64': download_image_base64,
}

View File

@@ -0,0 +1,3 @@
from __future__ import annotations
ADAPTER_NAME = 'dingtalk'