feat: add wechatpad for personal wechat

* 更新了wechatpad接口,以及适配器

* 更新了wechatpad接口,以及适配器

* 修复一些细节问题,比如at回复,以及启动登录和启动ws长连接的线程同步

* importutil中修复了在wi上启动替换斜杠问题,login中加上了一个login,暂时没啥用。wechatpad中做出了一些细节修改

* 更新了wechatpad接口,以及适配器

* 怎加了处理图片链接转换为image_base64发送

* feat(wechatpad): 调整日志+bugfix

* feat(wechatpad): fix typo

* 修正了发送语音api参数错误,添加了发送链接处理为base64数据(好像只有一部分链接可以)

* 修复了部分手抽的typo错误

* chore: remove manager.py

---------

Co-authored-by: shinelin <shinelinxx@gmail.com>
Co-authored-by: Junyan Qin (Chin) <rockchinq@gmail.com>
This commit is contained in:
fdc310
2025-05-14 21:18:08 +08:00
committed by GitHub
parent 248d4beed1
commit 20a62fcf69
17 changed files with 1611 additions and 1 deletions

View File

View File

@@ -0,0 +1,14 @@
from libs.wechatpad_api.util.http_util import async_request, post_json
class ChatRoomApi:
def __init__(self, base_url, token):
self.base_url = base_url
self.token = token
def get_chatroom_member_detail(self, chatroom_name):
params = {
"ChatRoomName": chatroom_name
}
url = self.base_url + '/group/GetChatroomMemberDetail'
return post_json(url, token=self.token, data=params)

View File

@@ -0,0 +1,39 @@
from libs.wechatpad_api.util.http_util import async_request, post_json
import httpx
import base64
class DownloadApi:
def __init__(self, base_url, token):
self.base_url = base_url
self.token = token
def send_download(self, aeskey, file_type, file_url):
json_data = {
"AesKey": aeskey,
"FileType": file_type,
"FileURL": file_url
}
url = self.base_url + "/message/SendCdnDownload"
return post_json(url, token=self.token, data=json_data)
def get_msg_voice(self,buf_id, length, new_msgid):
json_data = {
"Bufid": buf_id,
"Length": length,
"NewMsgId": new_msgid,
"ToUserName": ""
}
url = self.base_url + "/message/GetMsgVoice"
return post_json(url, token=self.token, data=json_data)
async def download_url_to_base64(self, download_url):
async with httpx.AsyncClient() as client:
response = await client.get(download_url)
if response.status_code == 200:
file_bytes = response.content
base64_str = base64.b64encode(file_bytes).decode('utf-8') # 返回字符串格式
return base64_str
else:
raise Exception('获取文件失败')

View File

@@ -0,0 +1,11 @@
from libs.wechatpad_api.util.http_util import post_json,async_request
from typing import List, Dict, Any, Optional
class FriendApi:
"""联系人API类处理所有与联系人相关的操作"""
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.token = token

View File

@@ -0,0 +1,102 @@
from libs.wechatpad_api.util.http_util import async_request,post_json,get_json
class LoginApi:
def __init__(self, base_url: str, token: str = None, admin_key: str = None):
'''
Args:
base_url: 原始路径
token: token
admin_key: 管理员key
'''
self.base_url = base_url
self.token = token
# self.admin_key = admin_key
def get_token(self, admin_key, day: int=365):
# 获取普通token
url = f"{self.base_url}/admin/GenAuthKey1"
json_data = {
"Count": 1,
"Days": day
}
return post_json(base_url=url, token=admin_key, data=json_data)
def get_login_qr(self, Proxy: str = ""):
'''
Args:
Proxy:异地使用时代理
Returns:json数据
'''
"""
{
"Code": 200,
"Data": {
"Key": "3141312",
"QrCodeUrl": "https://1231x/g6bMlv2dX8zwNbqE6-Zs",
"Txt": "建议返回data=之后内容自定义生成二维码",
"baseResp": {
"ret": 0,
"errMsg": {}
}
},
"Text": ""
}
"""
#获取登录二维码
url = f"{self.base_url}/login/GetLoginQrCodeNew"
check = False
if Proxy != "":
check = True
json_data = {
"Check": check,
"Proxy": Proxy
}
return post_json(base_url=url, token=self.token, data=json_data)
def get_login_status(self):
# 获取登录状态
url = f'{self.base_url}/login/GetLoginStatus'
return get_json(base_url=url, token=self.token)
def logout(self):
# 退出登录
url = f'{self.base_url}/login/LogOut'
return post_json(base_url=url, token=self.token)
def wake_up_login(self, Proxy: str = ""):
# 唤醒登录
url = f'{self.base_url}/login/WakeUpLogin'
check = False
if Proxy != "":
check = True
json_data = {
"Check": check,
"Proxy": ""
}
return post_json(base_url=url, token=self.token, data=json_data)
def login(self,admin_key):
login_status = self.get_login_status()
if login_status["Code"] == 300 and login_status["Text"] == "你已退出微信":
print("token已经失效重新获取")
token_data = self.get_token(admin_key)
self.token = token_data["Data"][0]

View File

@@ -0,0 +1,123 @@
from libs.wechatpad_api.util.http_util import async_request, post_json
class MessageApi:
def __init__(self, base_url, token):
self.base_url = base_url
self.token = token
def post_text(self, to_wxid, content, ats: list= []):
'''
Args:
app_id: 微信id
to_wxid: 发送方的微信id
content: 内容
ats: at
Returns:
'''
url = self.base_url + "/message/SendTextMessage"
"""发送文字消息"""
json_data = {
"MsgItem": [
{
"AtWxIDList": ats,
"ImageContent": "",
"MsgType": 0,
"TextContent": content,
"ToUserName": to_wxid
}
]
}
return post_json(base_url=url, token=self.token, data=json_data)
def post_image(self, to_wxid, img_url, ats: list= []):
"""发送图片消息"""
# 这里好像可以尝试发送多个暂时未测试
json_data = {
"MsgItem": [
{
"AtWxIDList": ats,
"ImageContent": img_url,
"MsgType": 0,
"TextContent": '',
"ToUserName": to_wxid
}
]
}
url = self.base_url + "/message/SendImageMessage"
return post_json(base_url=url, token=self.token, data=json_data)
def post_voice(self, to_wxid, voice_data, voice_forma, voice_duration):
"""发送语音消息"""
json_data = {
"ToUserName": to_wxid,
"VoiceData": voice_data,
"VoiceFormat": voice_forma,
"VoiceSecond": voice_duration
}
url = self.base_url + "/message/SendVoice"
return post_json(base_url=url, token=self.token, data=json_data)
def post_name_card(self, alias, to_wxid, nick_name, name_card_wxid, flag):
"""发送名片消息"""
param = {
"CardAlias": alias,
"CardFlag": flag,
"CardNickName": nick_name,
"CardWxId": name_card_wxid,
"ToUserName": to_wxid
}
url = f"{self.base_url}/message/ShareCardMessage"
return post_json(base_url=url, token=self.token, data=param)
def post_emoji(self, to_wxid, emoji_md5, emoji_size:int=0):
"""发送emoji消息"""
json_data = {
"EmojiList": [
{
"EmojiMd5": emoji_md5,
"EmojiSize": emoji_size,
"ToUserName": to_wxid
}
]
}
url = f"{self.base_url}/message/SendEmojiMessage"
return post_json(base_url=url, token=self.token, data=json_data)
def post_app_msg(self, to_wxid,xml_data, contenttype:int=0):
"""发送appmsg消息"""
json_data = {
"AppList": [
{
"ContentType": contenttype,
"ContentXML": xml_data,
"ToUserName": to_wxid
}
]
}
url = f"{self.base_url}/message/SendAppMessage"
return post_json(base_url=url, token=self.token, data=json_data)
def revoke_msg(self, to_wxid, msg_id, new_msg_id, create_time):
"""撤回消息"""
param = {
"ClientMsgId": msg_id,
"CreateTime": create_time,
"NewMsgId": new_msg_id,
"ToUserName": to_wxid
}
url = f"{self.base_url}/message/RevokeMsg"
return post_json(base_url=url, token=self.token, data=param)

View File

@@ -0,0 +1,37 @@
from libs.wechatpad_api.util.http_util import post_json, async_request, get_json
class UserApi:
def __init__(self, base_url, token):
self.base_url = base_url
self.token = token
def get_profile(self):
"""获取个人资料"""
url = f'{self.base_url}/user/GetProfile'
return get_json(base_url=url, token=self.token)
def get_qr_code(self, recover:bool=True, style:int=8):
"""获取自己的二维码"""
param = {
"Recover": recover,
"Style": style
}
url = f'{self.base_url}/user/GetMyQRCode'
return post_json(base_url=url, token=self.token, data=param)
def get_safety_info(self):
"""获取设备记录"""
url = f'{self.base_url}/equipment/GetSafetyInfo'
return post_json(base_url=url, token=self.token)
async def update_head_img(self, head_img_base64):
"""修改头像"""
param = {
"Base64": head_img_base64
}
url = f'{self.base_url}/user/UploadHeadImage'
return await async_request(base_url=url, token_key=self.token, json=param)