style: introduce ruff as linter and formatter (#1356)

* style: remove necessary imports

* style: fix F841

* style: fix F401

* style: fix F811

* style: fix E402

* style: fix E721

* style: fix E722

* style: fix E722

* style: fix F541

* style: ruff format

* style: all passed

* style: add ruff in deps

* style: more ignores in ruff.toml

* style: add pre-commit
This commit is contained in:
Junyan Qin (Chin)
2025-04-29 17:24:07 +08:00
committed by GitHub
parent 09e70d70e9
commit 209f16af76
240 changed files with 5307 additions and 4689 deletions

View File

@@ -12,41 +12,33 @@ from . import constants
class VersionManager:
"""版本管理器
"""
"""版本管理器"""
ap: app.Application
def __init__(
self,
ap: app.Application
):
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(
self
):
async def initialize(self):
pass
def get_current_version(
self
) -> str:
def get_current_version(self) -> str:
current_tag = constants.semantic_version
return current_tag
async def get_release_list(self) -> list:
"""获取发行列表"""
rls_list_resp = requests.get(
url="https://api.github.com/repos/RockChinQ/LangBot/releases",
url='https://api.github.com/repos/RockChinQ/LangBot/releases',
proxies=self.ap.proxy_mgr.get_forward_proxies(),
timeout=5
timeout=5,
)
rls_list = rls_list_resp.json()
return rls_list
async def update_all(self):
"""检查更新并下载源码"""
start_time = time.time()
@@ -58,10 +50,10 @@ class VersionManager:
latest_rls = {}
rls_notes = []
latest_tag_name = ""
latest_tag_name = ''
for rls in rls_list:
rls_notes.append(rls['name']) # 使用发行名称作为note
if latest_tag_name == "":
if latest_tag_name == '':
latest_tag_name = rls['tag_name']
if rls['tag_name'] == current_tag:
@@ -69,56 +61,68 @@ class VersionManager:
if latest_rls == {}:
latest_rls = rls
self.ap.logger.info("更新日志: {}".format(rls_notes))
self.ap.logger.info('更新日志: {}'.format(rls_notes))
if latest_rls == {} and not self.is_newer(latest_tag_name, current_tag): # 没有新版本
if latest_rls == {} and not self.is_newer(
latest_tag_name, current_tag
): # 没有新版本
return False
# 下载最新版本的zip到temp目录
self.ap.logger.info("开始下载最新版本: {}".format(latest_rls['zipball_url']))
self.ap.logger.info('开始下载最新版本: {}'.format(latest_rls['zipball_url']))
zip_url = latest_rls['zipball_url']
zip_resp = requests.get(
url=zip_url,
proxies=self.ap.proxy_mgr.get_forward_proxies()
url=zip_url, proxies=self.ap.proxy_mgr.get_forward_proxies()
)
zip_data = zip_resp.content
# 检查temp/updater目录
if not os.path.exists("temp"):
os.mkdir("temp")
if not os.path.exists("temp/updater"):
os.mkdir("temp/updater")
with open("temp/updater/{}.zip".format(latest_rls['tag_name']), "wb") as f:
if not os.path.exists('temp'):
os.mkdir('temp')
if not os.path.exists('temp/updater'):
os.mkdir('temp/updater')
with open('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'wb') as f:
f.write(zip_data)
self.ap.logger.info("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name'])))
self.ap.logger.info(
'下载最新版本完成: {}'.format(
'temp/updater/{}.zip'.format(latest_rls['tag_name'])
)
)
# 解压zip到temp/updater/<tag_name>/
import zipfile
# 检查目标文件夹
if os.path.exists("temp/updater/{}".format(latest_rls['tag_name'])):
if os.path.exists('temp/updater/{}'.format(latest_rls['tag_name'])):
import shutil
shutil.rmtree("temp/updater/{}".format(latest_rls['tag_name']))
os.mkdir("temp/updater/{}".format(latest_rls['tag_name']))
with zipfile.ZipFile("temp/updater/{}.zip".format(latest_rls['tag_name']), 'r') as zip_ref:
zip_ref.extractall("temp/updater/{}".format(latest_rls['tag_name']))
shutil.rmtree('temp/updater/{}'.format(latest_rls['tag_name']))
os.mkdir('temp/updater/{}'.format(latest_rls['tag_name']))
with zipfile.ZipFile(
'temp/updater/{}.zip'.format(latest_rls['tag_name']), 'r'
) as zip_ref:
zip_ref.extractall('temp/updater/{}'.format(latest_rls['tag_name']))
# 覆盖源码
source_root = ""
source_root = ''
# 找到temp/updater/<tag_name>/中的第一个子目录路径
for root, dirs, files in os.walk("temp/updater/{}".format(latest_rls['tag_name'])):
if root != "temp/updater/{}".format(latest_rls['tag_name']):
for root, dirs, files in os.walk(
'temp/updater/{}'.format(latest_rls['tag_name'])
):
if root != 'temp/updater/{}'.format(latest_rls['tag_name']):
source_root = root
break
# 覆盖源码
import shutil
for root, dirs, files in os.walk(source_root):
# 覆盖所有子文件子目录
for file in files:
src = os.path.join(root, file)
dst = src.replace(source_root, ".")
dst = src.replace(source_root, '.')
if os.path.exists(dst):
os.remove(dst)
@@ -128,18 +132,18 @@ class VersionManager:
# 检查目标文件是否存在
if not os.path.exists(dst):
# 创建目标文件
open(dst, "w").close()
open(dst, 'w').close()
shutil.copy(src, dst)
# 把current_tag写入文件
current_tag = latest_rls['tag_name']
with open("current_tag", "w") as f:
with open('current_tag', 'w') as f:
f.write(current_tag)
await self.ap.ctr_mgr.main.post_update_record(
spent_seconds=int(time.time()-start_time),
infer_reason="update",
spent_seconds=int(time.time() - start_time),
infer_reason='update',
old_version=old_tag,
new_version=current_tag,
)
@@ -155,23 +159,22 @@ class VersionManager:
current_tag = self.get_current_version()
# 检查是否有新版本
latest_tag_name = ""
latest_tag_name = ''
for rls in rls_list:
if latest_tag_name == "":
if latest_tag_name == '':
latest_tag_name = rls['tag_name']
break
return self.is_newer(latest_tag_name, current_tag)
def is_newer(self, new_tag: str, old_tag: str):
"""判断版本是否更新,忽略第四位版本和第一位版本"""
if new_tag == old_tag:
return False
new_tag = new_tag.split(".")
old_tag = old_tag.split(".")
new_tag = new_tag.split('.')
old_tag = old_tag.split('.')
# 判断主版本是否相同
if new_tag[0] != old_tag[0]:
return False
@@ -180,29 +183,28 @@ class VersionManager:
return True
# 合成前三段,判断是否相同
new_tag = ".".join(new_tag[:3])
old_tag = ".".join(old_tag[:3])
new_tag = '.'.join(new_tag[:3])
old_tag = '.'.join(old_tag[:3])
return new_tag != old_tag
def compare_version_str(v0: str, v1: str) -> int:
"""比较两个版本号"""
# 删除版本号前的v
if v0.startswith("v"):
if v0.startswith('v'):
v0 = v0[1:]
if v1.startswith("v"):
if v1.startswith('v'):
v1 = v1[1:]
v0:list = v0.split(".")
v1:list = v1.split(".")
v0: list = v0.split('.')
v1: list = v1.split('.')
# 如果两个版本号节数不同把短的后面用0补齐
if len(v0) < len(v1):
v0.extend(["0"]*(len(v1)-len(v0)))
v0.extend(['0'] * (len(v1) - len(v0)))
elif len(v0) > len(v1):
v1.extend(["0"]*(len(v0)-len(v1)))
v1.extend(['0'] * (len(v0) - len(v1)))
# 从高位向低位比较
for i in range(len(v0)):
@@ -210,16 +212,16 @@ class VersionManager:
return 1
elif int(v0[i]) < int(v1[i]):
return -1
return 0
async def show_version_update(
self
) -> typing.Tuple[str, int]:
async def show_version_update(self) -> typing.Tuple[str, int]:
try:
if await self.ap.ver_mgr.is_new_version_available():
return "有新版本可用根据文档更新https://docs.langbot.app/deploy/update.html", logging.INFO
return (
'有新版本可用根据文档更新https://docs.langbot.app/deploy/update.html',
logging.INFO,
)
except Exception as e:
return f"检查版本更新时出错: {e}", logging.WARNING
return f'检查版本更新时出错: {e}', logging.WARNING