mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-07 22:36:02 +00:00
style: restrict line-length
This commit is contained in:
@@ -59,9 +59,7 @@ class AnnouncementManager:
|
||||
|
||||
async def fetch_saved(self) -> list[Announcement]:
|
||||
if not os.path.exists('data/labels/announcement_saved.json'):
|
||||
with open(
|
||||
'data/labels/announcement_saved.json', 'w', encoding='utf-8'
|
||||
) as f:
|
||||
with open('data/labels/announcement_saved.json', 'w', encoding='utf-8') as f:
|
||||
f.write('[]')
|
||||
|
||||
with open('data/labels/announcement_saved.json', 'r', encoding='utf-8') as f:
|
||||
@@ -74,11 +72,7 @@ class AnnouncementManager:
|
||||
|
||||
async def write_saved(self, content: list[Announcement]):
|
||||
with open('data/labels/announcement_saved.json', 'w', encoding='utf-8') as f:
|
||||
f.write(
|
||||
json.dumps(
|
||||
[item.to_dict() for item in content], indent=4, ensure_ascii=False
|
||||
)
|
||||
)
|
||||
f.write(json.dumps([item.to_dict() for item in content], indent=4, ensure_ascii=False))
|
||||
|
||||
async def fetch_new(self) -> list[Announcement]:
|
||||
"""获取新公告"""
|
||||
|
||||
@@ -57,9 +57,7 @@ async def get_gewechat_image_base64(
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
# print(response)
|
||||
raise Exception(
|
||||
f'获取gewechat图片下载失败: {await response.text()}'
|
||||
)
|
||||
raise Exception(f'获取gewechat图片下载失败: {await response.text()}')
|
||||
|
||||
resp_data = await response.json()
|
||||
if resp_data.get('ret') != 200:
|
||||
@@ -79,9 +77,7 @@ async def get_gewechat_image_base64(
|
||||
try:
|
||||
async with session.get(download_url) as img_response:
|
||||
if img_response.status != 200:
|
||||
raise Exception(
|
||||
f'下载图片失败: {await img_response.text()}, URL: {download_url}'
|
||||
)
|
||||
raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}')
|
||||
|
||||
image_data = await img_response.read()
|
||||
|
||||
@@ -128,9 +124,7 @@ async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]:
|
||||
return image_base64, image_format
|
||||
|
||||
|
||||
async def get_qq_official_image_base64(
|
||||
pic_url: str, content_type: str
|
||||
) -> tuple[str, str]:
|
||||
async def get_qq_official_image_base64(pic_url: str, content_type: str) -> tuple[str, str]:
|
||||
"""
|
||||
下载QQ官方图片,
|
||||
并且转换为base64格式
|
||||
|
||||
@@ -29,9 +29,7 @@ def import_dir(path: str):
|
||||
for file in os.listdir(path):
|
||||
if file.endswith('.py') and file != '__init__.py':
|
||||
full_path = os.path.join(path, file)
|
||||
rel_path = full_path.replace(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), ''
|
||||
)
|
||||
rel_path = full_path.replace(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '')
|
||||
rel_path = rel_path[1:]
|
||||
rel_path = rel_path.replace('/', '.')[:-3]
|
||||
importlib.import_module(rel_path)
|
||||
|
||||
@@ -3,9 +3,7 @@ import aiohttp
|
||||
|
||||
async def get_myip() -> str:
|
||||
try:
|
||||
async with aiohttp.ClientSession(
|
||||
timeout=aiohttp.ClientTimeout(total=10)
|
||||
) as session:
|
||||
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
|
||||
async with session.get('https://ip.useragentinfo.com/myip') as response:
|
||||
return await response.text()
|
||||
except Exception:
|
||||
|
||||
@@ -23,20 +23,10 @@ class ProxyManager:
|
||||
'https://': os.getenv('HTTPS_PROXY') or os.getenv('https_proxy'),
|
||||
}
|
||||
|
||||
if (
|
||||
'http' in self.ap.instance_config.data['proxy']
|
||||
and self.ap.instance_config.data['proxy']['http']
|
||||
):
|
||||
self.forward_proxies['http://'] = self.ap.instance_config.data['proxy'][
|
||||
'http'
|
||||
]
|
||||
if (
|
||||
'https' in self.ap.instance_config.data['proxy']
|
||||
and self.ap.instance_config.data['proxy']['https']
|
||||
):
|
||||
self.forward_proxies['https://'] = self.ap.instance_config.data['proxy'][
|
||||
'https'
|
||||
]
|
||||
if 'http' in self.ap.instance_config.data['proxy'] and self.ap.instance_config.data['proxy']['http']:
|
||||
self.forward_proxies['http://'] = self.ap.instance_config.data['proxy']['http']
|
||||
if 'https' in self.ap.instance_config.data['proxy'] and self.ap.instance_config.data['proxy']['https']:
|
||||
self.forward_proxies['https://'] = self.ap.instance_config.data['proxy']['https']
|
||||
|
||||
# 设置到环境变量
|
||||
os.environ['HTTP_PROXY'] = self.forward_proxies['http://'] or ''
|
||||
|
||||
@@ -60,18 +60,14 @@ class VersionManager:
|
||||
latest_rls = rls
|
||||
self.ap.logger.info('更新日志: {}'.format(rls_notes))
|
||||
|
||||
if latest_rls == {} and not self.is_newer(
|
||||
latest_tag_name, current_tag
|
||||
): # 没有新版本
|
||||
if latest_rls == {} and not self.is_newer(latest_tag_name, current_tag): # 没有新版本
|
||||
return False
|
||||
|
||||
# 下载最新版本的zip到temp目录
|
||||
self.ap.logger.info('开始下载最新版本: {}'.format(latest_rls['zipball_url']))
|
||||
|
||||
zip_url = latest_rls['zipball_url']
|
||||
zip_resp = requests.get(
|
||||
url=zip_url, proxies=self.ap.proxy_mgr.get_forward_proxies()
|
||||
)
|
||||
zip_resp = requests.get(url=zip_url, proxies=self.ap.proxy_mgr.get_forward_proxies())
|
||||
zip_data = zip_resp.content
|
||||
|
||||
# 检查temp/updater目录
|
||||
@@ -82,11 +78,7 @@ class VersionManager:
|
||||
with open('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'wb') as f:
|
||||
f.write(zip_data)
|
||||
|
||||
self.ap.logger.info(
|
||||
'下载最新版本完成: {}'.format(
|
||||
'temp/updater/{}.zip'.format(latest_rls['tag_name'])
|
||||
)
|
||||
)
|
||||
self.ap.logger.info('下载最新版本完成: {}'.format('temp/updater/{}.zip'.format(latest_rls['tag_name'])))
|
||||
|
||||
# 解压zip到temp/updater/<tag_name>/
|
||||
import zipfile
|
||||
@@ -97,17 +89,13 @@ class VersionManager:
|
||||
|
||||
shutil.rmtree('temp/updater/{}'.format(latest_rls['tag_name']))
|
||||
os.mkdir('temp/updater/{}'.format(latest_rls['tag_name']))
|
||||
with zipfile.ZipFile(
|
||||
'temp/updater/{}.zip'.format(latest_rls['tag_name']), 'r'
|
||||
) as zip_ref:
|
||||
with zipfile.ZipFile('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'r') as zip_ref:
|
||||
zip_ref.extractall('temp/updater/{}'.format(latest_rls['tag_name']))
|
||||
|
||||
# 覆盖源码
|
||||
source_root = ''
|
||||
# 找到temp/updater/<tag_name>/中的第一个子目录路径
|
||||
for root, dirs, files in os.walk(
|
||||
'temp/updater/{}'.format(latest_rls['tag_name'])
|
||||
):
|
||||
for root, dirs, files in os.walk('temp/updater/{}'.format(latest_rls['tag_name'])):
|
||||
if root != 'temp/updater/{}'.format(latest_rls['tag_name']):
|
||||
source_root = root
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user