From 7dcc44b4fcb00d24d34f40335c9cdc29fddd482a Mon Sep 17 00:00:00 2001 From: wangcham Date: Thu, 13 Feb 2025 03:47:45 -0500 Subject: [PATCH] feat: add support for dingtalk --- libs/dingtalk_api/EchoHandler.py | 29 +++++ libs/dingtalk_api/__init__.py | 0 libs/dingtalk_api/api.py | 174 +++++++++++++++++++++++++++ libs/dingtalk_api/dingtalkevent.py | 64 ++++++++++ libs/wecom_api/api.py | 1 - pkg/platform/manager.py | 2 +- pkg/platform/sources/dingtalk.py | 182 +++++++++++++++++++++++++++++ requirements.txt | 2 +- templates/platform.json | 8 ++ 9 files changed, 459 insertions(+), 3 deletions(-) create mode 100644 libs/dingtalk_api/EchoHandler.py create mode 100644 libs/dingtalk_api/__init__.py create mode 100644 libs/dingtalk_api/api.py create mode 100644 libs/dingtalk_api/dingtalkevent.py create mode 100644 pkg/platform/sources/dingtalk.py diff --git a/libs/dingtalk_api/EchoHandler.py b/libs/dingtalk_api/EchoHandler.py new file mode 100644 index 00000000..00ff6c72 --- /dev/null +++ b/libs/dingtalk_api/EchoHandler.py @@ -0,0 +1,29 @@ +import asyncio +import dingtalk_stream +from dingtalk_stream import AckMessage + +class EchoTextHandler(dingtalk_stream.ChatbotHandler): + def __init__(self, client): + self.msg_id = '' + self.incoming_message = None + self.client = client # 用于更新 DingTalkClient 中的 incoming_message + + """处理钉钉消息""" + async def process(self, callback: dingtalk_stream.CallbackMessage): + incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + if incoming_message.message_id != self.msg_id: + self.msg_id = incoming_message.message_id + + await self.client.update_incoming_message(incoming_message) + + return AckMessage.STATUS_OK, 'OK' + + async def get_incoming_message(self): + """异步等待消息的到来""" + while self.incoming_message is None: + await asyncio.sleep(0.1) # 异步等待,避免阻塞 + return self.incoming_message + +async def get_dingtalk_client(client_id, client_secret): + from api import DingTalkClient # 延迟导入,避免循环导入 + return DingTalkClient(client_id, client_secret) diff --git a/libs/dingtalk_api/__init__.py b/libs/dingtalk_api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libs/dingtalk_api/api.py b/libs/dingtalk_api/api.py new file mode 100644 index 00000000..905557a6 --- /dev/null +++ b/libs/dingtalk_api/api.py @@ -0,0 +1,174 @@ +import base64 +import time +from typing import Callable +import dingtalk_stream +from .EchoHandler import EchoTextHandler +from .dingtalkevent import DingTalkEvent +import httpx +import traceback + + +class DingTalkClient: + def __init__(self, client_id: str, client_secret: str,robot_name:str,robot_code:str): + """初始化 WebSocket 连接并自动启动""" + self.credential = dingtalk_stream.Credential(client_id, client_secret) + self.client = dingtalk_stream.DingTalkStreamClient(self.credential) + self.key = client_id + self.secret = client_secret + # 在 DingTalkClient 中传入自己作为参数,避免循环导入 + self.EchoTextHandler = EchoTextHandler(self) + self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler) + self._message_handlers = { + "example":[], + } + self.access_token = '' + self.robot_name = robot_name + self.robot_code = robot_code + self.access_token_expiry_time = '' + + + + async def get_access_token(self): + url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" + headers = { + "Content-Type": "application/json" + } + data = { + "appKey": self.key, + "appSecret": self.secret + } + async with httpx.AsyncClient() as client: + try: + response = await client.post(url,json=data,headers=headers) + if response.status_code == 200: + response_data = response.json() + self.access_token = response_data.get("accessToken") + expires_in = int(response_data.get("expireIn",7200)) + self.access_token_expiry_time = time.time() + expires_in - 60 + except Exception as e: + raise Exception(e) + + + async def is_token_expired(self): + """检查token是否过期""" + if self.access_token_expiry_time is None: + return True + return time.time() > self.access_token_expiry_time + + async def check_access_token(self): + if not self.access_token or await self.is_token_expired(): + return False + return bool(self.access_token and self.access_token.strip()) + + async def download_image(self,download_code:str): + if not await self.check_access_token(): + await self.get_access_token() + url = 'https://api.dingtalk.com/v1.0/robot/messageFiles/download' + params = { + "downloadCode":download_code, + "robotCode":self.robot_code + } + headers ={ + "x-acs-dingtalk-access-token": self.access_token + } + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, json=params) + if response.status_code == 200: + result = response.json() + download_url = result.get("downloadUrl") + else: + raise Exception(f"Error: {response.status_code}, {response.text}") + + if download_url: + return await self.download_url_to_base64(download_url) + + async def download_url_to_base64(self,download_url): + async with httpx.AsyncClient() as client: + response = await client.get(download_url) + + if response.status_code == 200: + + file_bytes = response.content + base64_str = base64.b64encode(file_bytes).decode('utf-8') # 返回字符串格式 + return base64_str + else: + raise Exception("获取图片失败") + + async def update_incoming_message(self, message): + """异步更新 DingTalkClient 中的 incoming_message""" + message_data = await self.get_message(message) + if message_data: + event = DingTalkEvent.from_payload(message_data) + if event: + await self._handle_message(event) + + + async def send_message(self,content:str,incoming_message): + self.EchoTextHandler.reply_text(content,incoming_message) + + + async def get_incoming_message(self): + """获取收到的消息""" + return await self.EchoTextHandler.get_incoming_message() + + + + def on_message(self, msg_type: str): + def decorator(func: Callable[[DingTalkEvent], None]): + if msg_type not in self._message_handlers: + self._message_handlers[msg_type] = [] + self._message_handlers[msg_type].append(func) + return func + return decorator + + async def _handle_message(self, event: DingTalkEvent): + """ + 处理消息事件。 + """ + msg_type = event.conversation + if msg_type in self._message_handlers: + for handler in self._message_handlers[msg_type]: + await handler(event) + + + async def get_message(self,incoming_message:dingtalk_stream.chatbot.ChatbotMessage): + try: + message_data = { + "IncomingMessage":incoming_message, + } + if str(incoming_message.conversation_type) == '1': + message_data["conversation_type"] = 'FriendMessage' + elif str(incoming_message.conversation_type) == '2': + message_data["conversation_type"] = 'GroupMessage' + + + if incoming_message.message_type == 'richText': + + data = incoming_message.rich_text_content.to_dict() + for item in data['richText']: + if 'text' in item: + message_data["Content"] = item['text'] + if incoming_message.get_image_list()[0]: + message_data["Picture"] = await self.download_image(incoming_message.get_image_list()[0]) + message_data["Type"] = 'text' + + elif incoming_message.message_type == 'text': + message_data['Content'] = incoming_message.get_text_list()[0] + + message_data["Type"] = 'text' + elif incoming_message.message_type == 'picture': + message_data['Picture'] = await self.download_image(incoming_message.get_image_list()[0]) + + message_data['Type'] = 'image' + + # 删掉开头的@消息 + if message_data["Content"].startswith("@"+self.robot_name): + message_data["Content"][len("@"+self.robot_name):] + except Exception: + traceback.print_exc() + + return message_data + + async def start(self): + """启动 WebSocket 连接,监听消息""" + await self.client.start() diff --git a/libs/dingtalk_api/dingtalkevent.py b/libs/dingtalk_api/dingtalkevent.py new file mode 100644 index 00000000..8f6bdfc1 --- /dev/null +++ b/libs/dingtalk_api/dingtalkevent.py @@ -0,0 +1,64 @@ +from typing import Dict, Any, Optional + +class DingTalkEvent(dict): + @staticmethod + def from_payload(payload: Dict[str, Any]) -> Optional["DingTalkEvent"]: + try: + event = DingTalkEvent(payload) + return event + except KeyError: + return None + + + @property + def content(self): + return self.get("Content","") + + @property + def incoming_message(self): + return self.get("IncomingMessage") + + @property + def type(self): + return self.get("Type","") + + @property + def picture(self): + return self.get("Picture","") + + @property + def conversation(self): + return self.get("conversation_type","") + + + + def __getattr__(self, key: str) -> Optional[Any]: + """ + 允许通过属性访问数据中的任意字段。 + + Args: + key (str): 字段名。 + + Returns: + Optional[Any]: 字段值。 + """ + return self.get(key) + + def __setattr__(self, key: str, value: Any) -> None: + """ + 允许通过属性设置数据中的任意字段。 + + Args: + key (str): 字段名。 + value (Any): 字段值。 + """ + self[key] = value + + def __repr__(self) -> str: + """ + 生成事件对象的字符串表示。 + + Returns: + str: 字符串表示。 + """ + return f"" \ No newline at end of file diff --git a/libs/wecom_api/api.py b/libs/wecom_api/api.py index 8aecca04..d0068be7 100644 --- a/libs/wecom_api/api.py +++ b/libs/wecom_api/api.py @@ -129,7 +129,6 @@ class WecomClient(): self.access_token = await self.get_access_token(self.secret) url = self.base_url+'/message/send?access_token='+self.access_token - async with httpx.AsyncClient() as client: params={ "touser" : user_id, diff --git a/pkg/platform/manager.py b/pkg/platform/manager.py index c70417d2..75d6f789 100644 --- a/pkg/platform/manager.py +++ b/pkg/platform/manager.py @@ -39,7 +39,7 @@ class PlatformManager: async def initialize(self): - from .sources import nakuru, aiocqhttp, qqbotpy, qqofficial, wecom, lark, discord, gewechat, officialaccount + from .sources import nakuru, aiocqhttp, qqbotpy, qqofficial, wecom, lark, discord, gewechat, officialaccount,dingtalk async def on_friend_message(event: platform_events.FriendMessage, adapter: msadapter.MessageSourceAdapter): diff --git a/pkg/platform/sources/dingtalk.py b/pkg/platform/sources/dingtalk.py new file mode 100644 index 00000000..b70489df --- /dev/null +++ b/pkg/platform/sources/dingtalk.py @@ -0,0 +1,182 @@ + +import traceback +import typing +from libs.dingtalk_api.dingtalkevent import DingTalkEvent +from pkg.platform.types import message as platform_message +from pkg.platform.adapter import MessageSourceAdapter +from pkg.platform.types import events as platform_events, message as platform_message +from pkg.core import app +from .. import adapter +from ...pipeline.longtext.strategies import forward +from ...core import app +from ..types import message as platform_message +from ..types import events as platform_events +from ..types import entities as platform_entities +from ...command.errors import ParamNotEnoughError +from libs.dingtalk_api.api import DingTalkClient +import datetime + + +class DingTalkMessageConverter(adapter.MessageConverter): + + @staticmethod + async def yiri2target( + message_chain:platform_message.MessageChain + ): + for msg in message_chain: + if type(msg) is platform_message.Plain: + return msg.text + + @staticmethod + async def target2yiri(event:DingTalkEvent): + yiri_msg_list = [] + yiri_msg_list.append( + platform_message.Source(id = '0',time=datetime.datetime.now()) + ) + + if event.content: + yiri_msg_list.append(platform_message.Plain(text=event.content)) + if event.picture: + yiri_msg_list.append(platform_message.Image(base64=event.picture)) + + chain = platform_message.MessageChain(yiri_msg_list) + + return chain + + +class DingTalkEventConverter(adapter.EventConverter): + + @staticmethod + async def yiri2target( + event:platform_events.MessageEvent + ): + return event.source_platform_object + + @staticmethod + async def target2yiri( + event:DingTalkEvent + ): + + message_chain = await DingTalkMessageConverter.target2yiri(event) + + + if event.conversation == 'FriendMessage': + + return platform_events.FriendMessage( + sender=platform_entities.Friend( + id= 0, + nickname ='nickname', + remark="" + ), + message_chain = message_chain, + time = datetime.datetime.now(), + source_platform_object=event, + ) + elif event.conversation == 'GroupMessage': + message_chain.insert(0, platform_message.At(target="justbot")) + sender = platform_entities.GroupMember( + id = 111, + member_name="name", + permission= 'MEMBER', + group = platform_entities.Group( + id = 111, + name = 'MEMBER', + permission=platform_entities.Permission.Member + ), + special_title='', + join_timestamp=0, + last_speak_timestamp=0, + mute_time_remaining=0 + ) + time = datetime.datetime.now(), + return platform_events.GroupMessage( + sender =sender, + message_chain = message_chain, + time = time, + source_platform_object=event + ) + +@adapter.adapter_class("dingtalk") +class DingTalkAdapter(adapter.MessageSourceAdapter): + bot: DingTalkClient + ap: app.Application + bot_account_id: str + message_converter: DingTalkMessageConverter = DingTalkMessageConverter() + event_converter: DingTalkEventConverter = DingTalkEventConverter() + config: dict + + def __init__(self,config:dict,ap:app.Application): + self.config = config + self.ap = ap + required_keys = [ + "client_id", + "client_secret", + "robot_name", + "robot_code", + ] + missing_keys = [key for key in required_keys if key not in config] + if missing_keys: + raise ParamNotEnoughError("钉钉缺少相关配置项,请查看文档或联系管理员") + + self.bot = DingTalkClient( + client_id=config["client_id"], + client_secret=config["client_secret"], + robot_name = config["robot_name"], + robot_code=config["robot_code"] + ) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ): + event = await DingTalkEventConverter.yiri2target( + message_source, + ) + incoming_message = event.incoming_message + + content = await DingTalkMessageConverter.yiri2target(message) + await self.bot.send_message(content,incoming_message) + + + async def send_message( + self, target_type: str, target_id: str, message: platform_message.MessageChain + ): + pass + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, adapter.MessageSourceAdapter], None + ], + ): + async def on_message(event: DingTalkEvent): + self.bot_account_id = 'justbot' + try: + return await callback( + await self.event_converter.target2yiri(event), self + ) + except: + traceback.print_exc() + + if event_type == platform_events.FriendMessage: + self.bot.on_message("FriendMessage")(on_message) + elif event_type == platform_events.GroupMessage: + self.bot.on_message("GroupMessage")(on_message) + + async def run_async(self): + + await self.bot.start() + + async def kill(self) -> bool: + return False + + async def unregister_listener( + self, + event_type: type, + callback: typing.Callable[[platform_events.Event, MessageSourceAdapter], None], + ): + return super().unregister_listener(event_type, callback) + diff --git a/requirements.txt b/requirements.txt index 27349f64..b6a3f066 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,6 @@ lark-oapi discord.py cryptography gewechat-client - +dingtalk_stream # indirect taskgroup==0.0.0a4 \ No newline at end of file diff --git a/templates/platform.json b/templates/platform.json index 5a1fca77..2efdf521 100644 --- a/templates/platform.json +++ b/templates/platform.json @@ -75,6 +75,14 @@ "AppID":"", "host": "0.0.0.0", "port": 2287 + }, + { + "adapter":"dingtalk", + "enable": false, + "client_id":"", + "client_secret":"", + "robot_code":"", + "robot_name":"" } ], "track-function-calls": true,