From 1b4107a90a4686044be3b48db31122e0709f243e Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Fri, 17 Apr 2026 22:37:46 +0800 Subject: [PATCH] refactor: use Space API for release checks and stop idle polling - version.py: switch release list API from GitHub to space.langbot.app, remove unused in-place update logic (update_all, compare_version_str), translate all comments/logs to English - PluginInstallTaskContext: only poll when active install tasks exist --- src/langbot/pkg/utils/version.py | 187 ++++++------------------------- 1 file changed, 32 insertions(+), 155 deletions(-) diff --git a/src/langbot/pkg/utils/version.py b/src/langbot/pkg/utils/version.py index 23440e4a..1e19420d 100644 --- a/src/langbot/pkg/utils/version.py +++ b/src/langbot/pkg/utils/version.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import typing import logging @@ -11,7 +10,7 @@ from . import constants class VersionManager: - """版本管理器""" + """Version manager""" ap: app.Application @@ -22,190 +21,68 @@ class VersionManager: pass def get_current_version(self) -> str: - current_tag = constants.semantic_version - - return current_tag + return constants.semantic_version async def get_release_list(self) -> list: - """获取发行列表""" + """Fetch release list from Space API (cached GitHub releases).""" try: rls_list_resp = requests.get( - url='https://api.github.com/repos/langbot-app/LangBot/releases', + url='https://space.langbot.app/api/v1/dist/info/releases', proxies=self.ap.proxy_mgr.get_forward_proxies(), - timeout=5, + timeout=10, ) - rls_list_resp.raise_for_status() # 检查请求是否成功 - rls_list = rls_list_resp.json() - return rls_list + rls_list_resp.raise_for_status() + resp_json = rls_list_resp.json() + if resp_json.get('code') == 0 and isinstance(resp_json.get('data'), list): + return resp_json['data'] + self.ap.logger.warning(f'Failed to fetch release list: unexpected response: {resp_json.get("msg", "")}') + return [] except Exception as e: - self.ap.logger.warning(f'获取发行列表失败: {e}') - pass + self.ap.logger.warning(f'Failed to fetch release list: {e}') return [] - async def update_all(self): - """检查更新并下载源码""" - - current_tag = self.get_current_version() - - rls_list = await self.get_release_list() - - latest_rls = {} - rls_notes = [] - latest_tag_name = '' - for rls in rls_list: - rls_notes.append(rls['name']) # 使用发行名称作为note - if latest_tag_name == '': - latest_tag_name = rls['tag_name'] - - if rls['tag_name'] == current_tag: - break - - if latest_rls == {}: - latest_rls = rls - self.ap.logger.info('更新日志: {}'.format(rls_notes)) - - if latest_rls == {} and not self.is_newer(latest_tag_name, current_tag): # 没有新版本 - return False - - # 下载最新版本的zip到temp目录 - self.ap.logger.info('开始下载最新版本: {}'.format(latest_rls['zipball_url'])) - - zip_url = latest_rls['zipball_url'] - zip_resp = requests.get(url=zip_url, proxies=self.ap.proxy_mgr.get_forward_proxies()) - zip_data = zip_resp.content - - # 检查temp/updater目录 - if not os.path.exists('temp'): - os.mkdir('temp') - if not os.path.exists('temp/updater'): - os.mkdir('temp/updater') - with open('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'wb') as f: - f.write(zip_data) - - self.ap.logger.info('下载最新版本完成: {}'.format('temp/updater/{}.zip'.format(latest_rls['tag_name']))) - - # 解压zip到temp/updater// - import zipfile - - # 检查目标文件夹 - if os.path.exists('temp/updater/{}'.format(latest_rls['tag_name'])): - import shutil - - shutil.rmtree('temp/updater/{}'.format(latest_rls['tag_name'])) - os.mkdir('temp/updater/{}'.format(latest_rls['tag_name'])) - with zipfile.ZipFile('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'r') as zip_ref: - zip_ref.extractall('temp/updater/{}'.format(latest_rls['tag_name'])) - - # 覆盖源码 - source_root = '' - # 找到temp/updater//中的第一个子目录路径 - for root, dirs, files in os.walk('temp/updater/{}'.format(latest_rls['tag_name'])): - if root != 'temp/updater/{}'.format(latest_rls['tag_name']): - source_root = root - break - - # 覆盖源码 - import shutil - - for root, dirs, files in os.walk(source_root): - # 覆盖所有子文件子目录 - for file in files: - src = os.path.join(root, file) - dst = src.replace(source_root, '.') - if os.path.exists(dst): - os.remove(dst) - - # 检查目标文件夹是否存在 - if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) - # 检查目标文件是否存在 - if not os.path.exists(dst): - # 创建目标文件 - open(dst, 'w').close() - - shutil.copy(src, dst) - - # 把current_tag写入文件 - current_tag = latest_rls['tag_name'] - with open('current_tag', 'w') as f: - f.write(current_tag) - - # TODO statistics - async def is_new_version_available(self) -> bool: - """检查是否有新版本""" - # 从github获取release列表 + """Check whether a newer version is available.""" rls_list = await self.get_release_list() - if rls_list is None: + if not rls_list: return False - # 获取当前版本 current_tag = self.get_current_version() - # 检查是否有新版本 latest_tag_name = '' for rls in rls_list: - if latest_tag_name == '': - latest_tag_name = rls['tag_name'] - break + latest_tag_name = rls.get('tag_name', '') + break - return self.is_newer(latest_tag_name, current_tag) + return self._is_newer(latest_tag_name, current_tag) - def is_newer(self, new_tag: str, old_tag: str): - """判断版本是否更新,忽略第四位版本和第一位版本""" - if new_tag == old_tag: + def _is_newer(self, new_tag: str, old_tag: str) -> bool: + """Check if new_tag is a newer version than old_tag. + + Compares the first three segments (major.minor.patch) only. + Returns False if the major version differs (breaking change boundary). + """ + if not new_tag or not old_tag or new_tag == old_tag: return False - new_tag = new_tag.split('.') - old_tag = old_tag.split('.') + new_parts = new_tag.split('.') + old_parts = old_tag.split('.') - # 判断主版本是否相同 - if new_tag[0] != old_tag[0]: + # Different major version — not considered an upgrade + if new_parts[0] != old_parts[0]: return False - if len(new_tag) < 4: + if len(new_parts) < 4: return True - # 合成前三段,判断是否相同 - new_tag = '.'.join(new_tag[:3]) - old_tag = '.'.join(old_tag[:3]) - - return new_tag != old_tag - - def compare_version_str(v0: str, v1: str) -> int: - """比较两个版本号""" - - # 删除版本号前的v - if v0.startswith('v'): - v0 = v0[1:] - if v1.startswith('v'): - v1 = v1[1:] - - v0: list = v0.split('.') - v1: list = v1.split('.') - - # 如果两个版本号节数不同,把短的后面用0补齐 - if len(v0) < len(v1): - v0.extend(['0'] * (len(v1) - len(v0))) - elif len(v0) > len(v1): - v1.extend(['0'] * (len(v0) - len(v1))) - - # 从高位向低位比较 - for i in range(len(v0)): - if int(v0[i]) > int(v1[i]): - return 1 - elif int(v0[i]) < int(v1[i]): - return -1 - - return 0 + return '.'.join(new_parts[:3]) != '.'.join(old_parts[:3]) async def show_version_update(self) -> typing.Tuple[str, int]: try: - if await self.ap.ver_mgr.is_new_version_available(): + if await self.is_new_version_available(): return ( - 'New version available:\n有新版本可用,根据文档更新: \nhttps://link.langbot.app/zh/docs/update', + 'New version available. Update guide: https://link.langbot.app/en/docs/update', logging.INFO, ) - except Exception as e: return f'Error checking version update: {e}', logging.WARNING