diff --git a/libs/dingtalk_api/EchoHandler.py b/libs/dingtalk_api/EchoHandler.py new file mode 100644 index 00000000..00ff6c72 --- /dev/null +++ b/libs/dingtalk_api/EchoHandler.py @@ -0,0 +1,29 @@ +import asyncio +import dingtalk_stream +from dingtalk_stream import AckMessage + +class EchoTextHandler(dingtalk_stream.ChatbotHandler): + def __init__(self, client): + self.msg_id = '' + self.incoming_message = None + self.client = client # 用于更新 DingTalkClient 中的 incoming_message + + """处理钉钉消息""" + async def process(self, callback: dingtalk_stream.CallbackMessage): + incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) + if incoming_message.message_id != self.msg_id: + self.msg_id = incoming_message.message_id + + await self.client.update_incoming_message(incoming_message) + + return AckMessage.STATUS_OK, 'OK' + + async def get_incoming_message(self): + """异步等待消息的到来""" + while self.incoming_message is None: + await asyncio.sleep(0.1) # 异步等待,避免阻塞 + return self.incoming_message + +async def get_dingtalk_client(client_id, client_secret): + from api import DingTalkClient # 延迟导入,避免循环导入 + return DingTalkClient(client_id, client_secret) diff --git a/libs/dingtalk_api/__init__.py b/libs/dingtalk_api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libs/dingtalk_api/api.py b/libs/dingtalk_api/api.py new file mode 100644 index 00000000..905557a6 --- /dev/null +++ b/libs/dingtalk_api/api.py @@ -0,0 +1,174 @@ +import base64 +import time +from typing import Callable +import dingtalk_stream +from .EchoHandler import EchoTextHandler +from .dingtalkevent import DingTalkEvent +import httpx +import traceback + + +class DingTalkClient: + def __init__(self, client_id: str, client_secret: str,robot_name:str,robot_code:str): + """初始化 WebSocket 连接并自动启动""" + self.credential = dingtalk_stream.Credential(client_id, client_secret) + self.client = dingtalk_stream.DingTalkStreamClient(self.credential) + self.key = client_id + self.secret = client_secret + # 在 DingTalkClient 中传入自己作为参数,避免循环导入 + self.EchoTextHandler = EchoTextHandler(self) + self.client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self.EchoTextHandler) + self._message_handlers = { + "example":[], + } + self.access_token = '' + self.robot_name = robot_name + self.robot_code = robot_code + self.access_token_expiry_time = '' + + + + async def get_access_token(self): + url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" + headers = { + "Content-Type": "application/json" + } + data = { + "appKey": self.key, + "appSecret": self.secret + } + async with httpx.AsyncClient() as client: + try: + response = await client.post(url,json=data,headers=headers) + if response.status_code == 200: + response_data = response.json() + self.access_token = response_data.get("accessToken") + expires_in = int(response_data.get("expireIn",7200)) + self.access_token_expiry_time = time.time() + expires_in - 60 + except Exception as e: + raise Exception(e) + + + async def is_token_expired(self): + """检查token是否过期""" + if self.access_token_expiry_time is None: + return True + return time.time() > self.access_token_expiry_time + + async def check_access_token(self): + if not self.access_token or await self.is_token_expired(): + return False + return bool(self.access_token and self.access_token.strip()) + + async def download_image(self,download_code:str): + if not await self.check_access_token(): + await self.get_access_token() + url = 'https://api.dingtalk.com/v1.0/robot/messageFiles/download' + params = { + "downloadCode":download_code, + "robotCode":self.robot_code + } + headers ={ + "x-acs-dingtalk-access-token": self.access_token + } + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, json=params) + if response.status_code == 200: + result = response.json() + download_url = result.get("downloadUrl") + else: + raise Exception(f"Error: {response.status_code}, {response.text}") + + if download_url: + return await self.download_url_to_base64(download_url) + + async def download_url_to_base64(self,download_url): + async with httpx.AsyncClient() as client: + response = await client.get(download_url) + + if response.status_code == 200: + + file_bytes = response.content + base64_str = base64.b64encode(file_bytes).decode('utf-8') # 返回字符串格式 + return base64_str + else: + raise Exception("获取图片失败") + + async def update_incoming_message(self, message): + """异步更新 DingTalkClient 中的 incoming_message""" + message_data = await self.get_message(message) + if message_data: + event = DingTalkEvent.from_payload(message_data) + if event: + await self._handle_message(event) + + + async def send_message(self,content:str,incoming_message): + self.EchoTextHandler.reply_text(content,incoming_message) + + + async def get_incoming_message(self): + """获取收到的消息""" + return await self.EchoTextHandler.get_incoming_message() + + + + def on_message(self, msg_type: str): + def decorator(func: Callable[[DingTalkEvent], None]): + if msg_type not in self._message_handlers: + self._message_handlers[msg_type] = [] + self._message_handlers[msg_type].append(func) + return func + return decorator + + async def _handle_message(self, event: DingTalkEvent): + """ + 处理消息事件。 + """ + msg_type = event.conversation + if msg_type in self._message_handlers: + for handler in self._message_handlers[msg_type]: + await handler(event) + + + async def get_message(self,incoming_message:dingtalk_stream.chatbot.ChatbotMessage): + try: + message_data = { + "IncomingMessage":incoming_message, + } + if str(incoming_message.conversation_type) == '1': + message_data["conversation_type"] = 'FriendMessage' + elif str(incoming_message.conversation_type) == '2': + message_data["conversation_type"] = 'GroupMessage' + + + if incoming_message.message_type == 'richText': + + data = incoming_message.rich_text_content.to_dict() + for item in data['richText']: + if 'text' in item: + message_data["Content"] = item['text'] + if incoming_message.get_image_list()[0]: + message_data["Picture"] = await self.download_image(incoming_message.get_image_list()[0]) + message_data["Type"] = 'text' + + elif incoming_message.message_type == 'text': + message_data['Content'] = incoming_message.get_text_list()[0] + + message_data["Type"] = 'text' + elif incoming_message.message_type == 'picture': + message_data['Picture'] = await self.download_image(incoming_message.get_image_list()[0]) + + message_data['Type'] = 'image' + + # 删掉开头的@消息 + if message_data["Content"].startswith("@"+self.robot_name): + message_data["Content"][len("@"+self.robot_name):] + except Exception: + traceback.print_exc() + + return message_data + + async def start(self): + """启动 WebSocket 连接,监听消息""" + await self.client.start() diff --git a/libs/dingtalk_api/dingtalkevent.py b/libs/dingtalk_api/dingtalkevent.py new file mode 100644 index 00000000..8f6bdfc1 --- /dev/null +++ b/libs/dingtalk_api/dingtalkevent.py @@ -0,0 +1,64 @@ +from typing import Dict, Any, Optional + +class DingTalkEvent(dict): + @staticmethod + def from_payload(payload: Dict[str, Any]) -> Optional["DingTalkEvent"]: + try: + event = DingTalkEvent(payload) + return event + except KeyError: + return None + + + @property + def content(self): + return self.get("Content","") + + @property + def incoming_message(self): + return self.get("IncomingMessage") + + @property + def type(self): + return self.get("Type","") + + @property + def picture(self): + return self.get("Picture","") + + @property + def conversation(self): + return self.get("conversation_type","") + + + + def __getattr__(self, key: str) -> Optional[Any]: + """ + 允许通过属性访问数据中的任意字段。 + + Args: + key (str): 字段名。 + + Returns: + Optional[Any]: 字段值。 + """ + return self.get(key) + + def __setattr__(self, key: str, value: Any) -> None: + """ + 允许通过属性设置数据中的任意字段。 + + Args: + key (str): 字段名。 + value (Any): 字段值。 + """ + self[key] = value + + def __repr__(self) -> str: + """ + 生成事件对象的字符串表示。 + + Returns: + str: 字符串表示。 + """ + return f"" \ No newline at end of file diff --git a/libs/wecom_api/api.py b/libs/wecom_api/api.py index 8aecca04..d0068be7 100644 --- a/libs/wecom_api/api.py +++ b/libs/wecom_api/api.py @@ -129,7 +129,6 @@ class WecomClient(): self.access_token = await self.get_access_token(self.secret) url = self.base_url+'/message/send?access_token='+self.access_token - async with httpx.AsyncClient() as client: params={ "touser" : user_id, diff --git a/pkg/core/bootutils/deps.py b/pkg/core/bootutils/deps.py index 83a9b2a7..7d779271 100644 --- a/pkg/core/bootutils/deps.py +++ b/pkg/core/bootutils/deps.py @@ -30,6 +30,7 @@ required_deps = { "discord": "discord.py", "cryptography": "cryptography", "gewechat_client": "gewechat-client", + "dingtalk_stream": "dingtalk_stream", "dashscope": "dashscope", } diff --git a/pkg/core/migrations/m031_dingtalk_config.py b/pkg/core/migrations/m031_dingtalk_config.py new file mode 100644 index 00000000..a25d2359 --- /dev/null +++ b/pkg/core/migrations/m031_dingtalk_config.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from .. import migration + + +@migration.migration_class("dingtalk-config", 31) +class DingTalkConfigMigration(migration.Migration): + """迁移""" + + async def need_migrate(self) -> bool: + """判断当前环境是否需要运行此迁移""" + + for adapter in self.ap.platform_cfg.data['platform-adapters']: + if adapter['adapter'] == 'dingtalk': + return False + + return True + + async def run(self): + """执行迁移""" + self.ap.platform_cfg.data['platform-adapters'].append({ + "adapter": "dingtalk", + "enable": False, + "client_id": "", + "client_secret": "", + "robot_code": "", + "robot_name": "" + }) + + await self.ap.platform_cfg.dump_config() diff --git a/pkg/core/stages/migrate.py b/pkg/core/stages/migrate.py index afe60078..5fab90c1 100644 --- a/pkg/core/stages/migrate.py +++ b/pkg/core/stages/migrate.py @@ -10,7 +10,7 @@ from ..migrations import m010_ollama_requester_config, m011_command_prefix_confi from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params, m018_xai_config, m019_zhipuai_config from ..migrations import m020_wecom_config, m021_lark_config, m022_lmstudio_config, m023_siliconflow_config, m024_discord_config, m025_gewechat_config from ..migrations import m026_qqofficial_config, m027_wx_official_account_config, m028_aliyun_requester_config -from ..migrations import m029_dashscope_app_api_config, m030_lark_config_cmpl +from ..migrations import m029_dashscope_app_api_config, m030_lark_config_cmpl, m031_dingtalk_config @stage.stage_class("MigrationStage") diff --git a/pkg/platform/manager.py b/pkg/platform/manager.py index c70417d2..75d6f789 100644 --- a/pkg/platform/manager.py +++ b/pkg/platform/manager.py @@ -39,7 +39,7 @@ class PlatformManager: async def initialize(self): - from .sources import nakuru, aiocqhttp, qqbotpy, qqofficial, wecom, lark, discord, gewechat, officialaccount + from .sources import nakuru, aiocqhttp, qqbotpy, qqofficial, wecom, lark, discord, gewechat, officialaccount,dingtalk async def on_friend_message(event: platform_events.FriendMessage, adapter: msadapter.MessageSourceAdapter): diff --git a/pkg/platform/sources/dingtalk.py b/pkg/platform/sources/dingtalk.py new file mode 100644 index 00000000..b70489df --- /dev/null +++ b/pkg/platform/sources/dingtalk.py @@ -0,0 +1,182 @@ + +import traceback +import typing +from libs.dingtalk_api.dingtalkevent import DingTalkEvent +from pkg.platform.types import message as platform_message +from pkg.platform.adapter import MessageSourceAdapter +from pkg.platform.types import events as platform_events, message as platform_message +from pkg.core import app +from .. import adapter +from ...pipeline.longtext.strategies import forward +from ...core import app +from ..types import message as platform_message +from ..types import events as platform_events +from ..types import entities as platform_entities +from ...command.errors import ParamNotEnoughError +from libs.dingtalk_api.api import DingTalkClient +import datetime + + +class DingTalkMessageConverter(adapter.MessageConverter): + + @staticmethod + async def yiri2target( + message_chain:platform_message.MessageChain + ): + for msg in message_chain: + if type(msg) is platform_message.Plain: + return msg.text + + @staticmethod + async def target2yiri(event:DingTalkEvent): + yiri_msg_list = [] + yiri_msg_list.append( + platform_message.Source(id = '0',time=datetime.datetime.now()) + ) + + if event.content: + yiri_msg_list.append(platform_message.Plain(text=event.content)) + if event.picture: + yiri_msg_list.append(platform_message.Image(base64=event.picture)) + + chain = platform_message.MessageChain(yiri_msg_list) + + return chain + + +class DingTalkEventConverter(adapter.EventConverter): + + @staticmethod + async def yiri2target( + event:platform_events.MessageEvent + ): + return event.source_platform_object + + @staticmethod + async def target2yiri( + event:DingTalkEvent + ): + + message_chain = await DingTalkMessageConverter.target2yiri(event) + + + if event.conversation == 'FriendMessage': + + return platform_events.FriendMessage( + sender=platform_entities.Friend( + id= 0, + nickname ='nickname', + remark="" + ), + message_chain = message_chain, + time = datetime.datetime.now(), + source_platform_object=event, + ) + elif event.conversation == 'GroupMessage': + message_chain.insert(0, platform_message.At(target="justbot")) + sender = platform_entities.GroupMember( + id = 111, + member_name="name", + permission= 'MEMBER', + group = platform_entities.Group( + id = 111, + name = 'MEMBER', + permission=platform_entities.Permission.Member + ), + special_title='', + join_timestamp=0, + last_speak_timestamp=0, + mute_time_remaining=0 + ) + time = datetime.datetime.now(), + return platform_events.GroupMessage( + sender =sender, + message_chain = message_chain, + time = time, + source_platform_object=event + ) + +@adapter.adapter_class("dingtalk") +class DingTalkAdapter(adapter.MessageSourceAdapter): + bot: DingTalkClient + ap: app.Application + bot_account_id: str + message_converter: DingTalkMessageConverter = DingTalkMessageConverter() + event_converter: DingTalkEventConverter = DingTalkEventConverter() + config: dict + + def __init__(self,config:dict,ap:app.Application): + self.config = config + self.ap = ap + required_keys = [ + "client_id", + "client_secret", + "robot_name", + "robot_code", + ] + missing_keys = [key for key in required_keys if key not in config] + if missing_keys: + raise ParamNotEnoughError("钉钉缺少相关配置项,请查看文档或联系管理员") + + self.bot = DingTalkClient( + client_id=config["client_id"], + client_secret=config["client_secret"], + robot_name = config["robot_name"], + robot_code=config["robot_code"] + ) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ): + event = await DingTalkEventConverter.yiri2target( + message_source, + ) + incoming_message = event.incoming_message + + content = await DingTalkMessageConverter.yiri2target(message) + await self.bot.send_message(content,incoming_message) + + + async def send_message( + self, target_type: str, target_id: str, message: platform_message.MessageChain + ): + pass + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, adapter.MessageSourceAdapter], None + ], + ): + async def on_message(event: DingTalkEvent): + self.bot_account_id = 'justbot' + try: + return await callback( + await self.event_converter.target2yiri(event), self + ) + except: + traceback.print_exc() + + if event_type == platform_events.FriendMessage: + self.bot.on_message("FriendMessage")(on_message) + elif event_type == platform_events.GroupMessage: + self.bot.on_message("GroupMessage")(on_message) + + async def run_async(self): + + await self.bot.start() + + async def kill(self) -> bool: + return False + + async def unregister_listener( + self, + event_type: type, + callback: typing.Callable[[platform_events.Event, MessageSourceAdapter], None], + ): + return super().unregister_listener(event_type, callback) + diff --git a/requirements.txt b/requirements.txt index dbee8256..63f5947f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ lark-oapi discord.py cryptography gewechat-client +dingtalk_stream dashscope # indirect diff --git a/templates/platform.json b/templates/platform.json index 703a2ad4..0c2b778e 100644 --- a/templates/platform.json +++ b/templates/platform.json @@ -78,6 +78,14 @@ "AppSecret":"", "host": "0.0.0.0", "port": 2287 + }, + { + "adapter":"dingtalk", + "enable": false, + "client_id":"", + "client_secret":"", + "robot_code":"", + "robot_name":"" } ], "track-function-calls": true, diff --git a/templates/schema/platform.json b/templates/schema/platform.json index f0224e61..8d2f436c 100644 --- a/templates/schema/platform.json +++ b/templates/schema/platform.json @@ -391,6 +391,47 @@ "description": "监听的端口" } } + }, + { + "title": "钉钉适配器", + "description": "用于接入钉钉", + "properties": { + "adapter": { + "type": "string", + "const": "dingtalk" + }, + "enable": { + "type": "boolean", + "default": false, + "description": "是否启用此适配器", + "layout": { + "comp": "switch", + "props": { + "color": "primary" + } + } + }, + "client_id": { + "type": "string", + "default": "", + "description": "钉钉的client_id" + }, + "client_secret": { + "type": "string", + "default": "", + "description": "钉钉的client_secret" + }, + "robot_code": { + "type": "string", + "default": "", + "description": "钉钉的robot_code" + }, + "robot_name": { + "type": "string", + "default": "", + "description": "钉钉的robot_name" + } + } } ] }