chore: Add PyPI package support for uvx/pip installation (#1764)

* Initial plan

* Add package structure and resource path utilities

- Created langbot/ package with __init__.py and __main__.py entry point
- Added paths utility to find frontend and resource files from package installation
- Updated config loading to use resource paths
- Updated frontend serving to use resource paths
- Added MANIFEST.in for package data inclusion
- Updated pyproject.toml with build system and entry points

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add PyPI publishing workflow and update license

- Created GitHub Actions workflow to build frontend and publish to PyPI
- Added license field to pyproject.toml to fix deprecation warning
- Updated .gitignore to exclude build artifacts
- Tested package building successfully

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add PyPI installation documentation

- Created PYPI_INSTALLATION.md with detailed installation and usage instructions
- Updated README.md to feature uvx/pip installation as recommended method
- Updated README_EN.md with same changes for English documentation

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Address code review feedback

- Made package-data configuration more specific to langbot package only
- Improved path detection with caching to avoid repeated file I/O
- Removed sys.path searching which was incorrect for package data
- Removed interactive input() call for non-interactive environment compatibility
- Simplified error messages for version check

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Fix code review issues

- Use specific exception types instead of bare except
- Fix misleading comments about directory levels
- Remove redundant existence check before makedirs with exist_ok=True
- Use context manager for file opening to ensure proper cleanup

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Simplify package configuration and document behavioral differences

- Removed redundant package-data configuration, relying on MANIFEST.in
- Added documentation about behavioral differences between package and source installation
- Clarified that include-package-data=true uses MANIFEST.in for data files

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* chore: update pyproject.toml

* chore: try pack templates in langbot/

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update

* chore: adjust dir structure

* chore: fix imports

* fix: read default-pipeline-config.json

* fix: read default-pipeline-config.json

* fix: tests

* ci: publish pypi

* chore: bump version 4.6.0-beta.1 for testing

* chore: add templates/**

* fix: send adapters and requesters icons

* chore: bump version 4.6.0b2 for testing

* chore: add platform field for docker-compose.yaml

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
Copilot
2025-11-16 19:53:01 +08:00
committed by GitHub
parent 6a24c951e0
commit e642ffa5b3
477 changed files with 1001 additions and 1002 deletions

View File

View File

@@ -0,0 +1,8 @@
semantic_version = 'v4.6.0-beta.2'
required_database_version = 11
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
debug_mode = False
edition = 'community'

View File

@@ -0,0 +1,114 @@
import re
import inspect
import typing
def get_func_schema(function: typing.Callable) -> dict:
"""
Return the data schema of a function.
{
"function": function,
"description": "function description",
"parameters": {
"type": "object",
"properties": {
"parameter_a": {
"type": "str",
"description": "parameter_a description"
},
"parameter_b": {
"type": "int",
"description": "parameter_b description"
},
"parameter_c": {
"type": "str",
"description": "parameter_c description",
"enum": ["a", "b", "c"]
},
},
"required": ["parameter_a", "parameter_b"]
}
}
"""
func_doc = function.__doc__
# Google Style Docstring
if func_doc is None:
raise Exception('Function {} has no docstring.'.format(function.__name__))
func_doc = func_doc.strip().replace(' ', '').replace('\t', '')
# extract doc of args from docstring
doc_spt = func_doc.split('\n\n')
desc = doc_spt[0]
args = doc_spt[1] if len(doc_spt) > 1 else ''
# returns = doc_spt[2] if len(doc_spt) > 2 else ""
# extract args
# delete the first line of args
arg_lines = args.split('\n')[1:]
# arg_doc_list = re.findall(r'(\w+)(\((\w+)\))?:\s*(.*)', args)
args_doc = {}
for arg_line in arg_lines:
doc_tuple = re.findall(r'(\w+)(\(([\w\[\]]+)\))?:\s*(.*)', arg_line)
if len(doc_tuple) == 0:
continue
args_doc[doc_tuple[0][0]] = doc_tuple[0][3]
# extract returns
# return_doc_list = re.findall(r'(\w+):\s*(.*)', returns)
params = enumerate(inspect.signature(function).parameters.values())
parameters = {
'type': 'object',
'required': [],
'properties': {},
}
for i, param in params:
# 排除 self, query
if param.name in ['self', 'query']:
continue
param_type = param.annotation.__name__
type_name_mapping = {
'str': 'string',
'int': 'integer',
'float': 'number',
'bool': 'boolean',
'list': 'array',
'dict': 'object',
}
if param_type in type_name_mapping:
param_type = type_name_mapping[param_type]
parameters['properties'][param.name] = {
'type': param_type,
'description': args_doc[param.name],
}
# add schema for array
if param_type == 'array':
# extract type of array, the int of list[int]
# use re
array_type_tuple = re.findall(r'list\[(\w+)\]', str(param.annotation))
array_type = 'string'
if len(array_type_tuple) > 0:
array_type = array_type_tuple[0]
if array_type in type_name_mapping:
array_type = type_name_mapping[array_type]
parameters['properties'][param.name]['items'] = {
'type': array_type,
}
if param.default is inspect.Parameter.empty:
parameters['required'].append(param.name)
return {
'function': function,
'description': desc,
'parameters': parameters,
}

View File

@@ -0,0 +1,212 @@
import base64
import typing
import io
from urllib.parse import urlparse, parse_qs
import ssl
import aiohttp
import PIL.Image
import httpx
import asyncio
async def get_gewechat_image_base64(
gewechat_url: str,
gewechat_file_url: str,
app_id: str,
xml_content: str,
token: str,
image_type: int = 2,
) -> typing.Tuple[str, str]:
"""从gewechat服务器获取图片并转换为base64格式
Args:
gewechat_url (str): gewechat服务器地址用于获取图片URL
gewechat_file_url (str): gewechat文件下载服务地址
app_id (str): gewechat应用ID
xml_content (str): 图片的XML内容
token (str): Gewechat API Token
image_type (int, optional): 图片类型. Defaults to 2.
Returns:
typing.Tuple[str, str]: (base64编码, 图片格式)
Raises:
aiohttp.ClientTimeout: 请求超时15秒或连接超时2秒
Exception: 其他错误
"""
headers = {'X-GEWE-TOKEN': token, 'Content-Type': 'application/json'}
# 设置超时
timeout = aiohttp.ClientTimeout(
total=15.0, # 总超时时间15秒
connect=2.0, # 连接超时2秒
sock_connect=2.0, # socket连接超时2秒
sock_read=15.0, # socket读取超时15秒
)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
# 获取图片下载链接
try:
async with session.post(
f'{gewechat_url}/v2/api/message/downloadImage',
headers=headers,
json={'appId': app_id, 'type': image_type, 'xml': xml_content},
) as response:
if response.status != 200:
# print(response)
raise Exception(f'获取gewechat图片下载失败: {await response.text()}')
resp_data = await response.json()
if resp_data.get('ret') != 200:
raise Exception(f'获取gewechat图片下载链接失败: {resp_data}')
file_url = resp_data['data']['fileUrl']
except asyncio.TimeoutError:
raise Exception('获取图片下载链接超时')
except aiohttp.ClientError as e:
raise Exception(f'获取图片下载链接网络错误: {str(e)}')
# 解析原始URL并替换端口
base_url = gewechat_file_url
download_url = f'{base_url}/download/{file_url}'
# 下载图片
try:
async with session.get(download_url) as img_response:
if img_response.status != 200:
raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}')
image_data = await img_response.read()
content_type = img_response.headers.get('Content-Type', '')
if content_type:
image_format = content_type.split('/')[-1]
else:
image_format = file_url.split('.')[-1]
base64_str = base64.b64encode(image_data).decode('utf-8')
return base64_str, image_format
except asyncio.TimeoutError:
raise Exception(f'下载图片超时, URL: {download_url}')
except aiohttp.ClientError as e:
raise Exception(f'下载图片网络错误: {str(e)}, URL: {download_url}')
except Exception as e:
raise Exception(f'获取图片失败: {str(e)}') from e
async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]:
"""
下载企业微信图片并转换为 base64
:param pic_url: 企业微信图片URL
:return: (base64_str, image_format)
"""
async with aiohttp.ClientSession() as session:
async with session.get(pic_url) as response:
if response.status != 200:
raise Exception(f'Failed to download image: {response.status}')
# 读取图片数据
image_data = await response.read()
# 获取图片格式
content_type = response.headers.get('Content-Type', '')
image_format = content_type.split('/')[-1] # 例如 'image/jpeg' -> 'jpeg'
# 转换为 base64
import base64
image_base64 = base64.b64encode(image_data).decode('utf-8')
return image_base64, image_format
async def get_qq_official_image_base64(pic_url: str, content_type: str) -> tuple[str, str]:
"""
下载QQ官方图片
并且转换为base64格式
"""
async with httpx.AsyncClient() as client:
response = await client.get(pic_url)
response.raise_for_status() # 确保请求成功
image_data = response.content
base64_data = base64.b64encode(image_data).decode('utf-8')
return f'data:{content_type};base64,{base64_data}'
def get_qq_image_downloadable_url(image_url: str) -> tuple[str, dict]:
"""获取QQ图片的下载链接"""
parsed = urlparse(image_url)
query = parse_qs(parsed.query)
return f'http://{parsed.netloc}{parsed.path}', query
async def get_qq_image_bytes(image_url: str, query: dict = {}) -> tuple[bytes, str]:
"""[弃用]获取QQ图片的bytes"""
image_url, query_in_url = get_qq_image_downloadable_url(image_url)
query = {**query, **query_in_url}
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
async with aiohttp.ClientSession(trust_env=False) as session:
async with session.get(image_url, params=query, ssl=ssl_context) as resp:
resp.raise_for_status()
file_bytes = await resp.read()
content_type = resp.headers.get('Content-Type')
if not content_type:
image_format = 'jpeg'
elif not content_type.startswith('image/'):
pil_img = PIL.Image.open(io.BytesIO(file_bytes))
image_format = pil_img.format.lower()
else:
image_format = content_type.split('/')[-1]
return file_bytes, image_format
async def qq_image_url_to_base64(image_url: str) -> typing.Tuple[str, str]:
"""[弃用]将QQ图片URL转为base64并返回图片格式
Args:
image_url (str): QQ图片URL
Returns:
typing.Tuple[str, str]: base64编码和图片格式
"""
image_url, query = get_qq_image_downloadable_url(image_url)
# Flatten the query dictionary
query = {k: v[0] for k, v in query.items()}
file_bytes, image_format = await get_qq_image_bytes(image_url, query)
base64_str = base64.b64encode(file_bytes).decode()
return base64_str, image_format
async def extract_b64_and_format(image_base64_data: str) -> typing.Tuple[str, str]:
"""提取base64编码和图片格式
data:image/jpeg;base64,xxx
提取出base64编码和图片格式
"""
base64_str = image_base64_data.split(',')[-1]
image_format = image_base64_data.split(':')[-1].split(';')[0].split('/')[-1]
return base64_str, image_format
async def get_slack_image_to_base64(pic_url: str, bot_token: str):
headers = {'Authorization': f'Bearer {bot_token}'}
try:
async with aiohttp.ClientSession() as session:
async with session.get(pic_url, headers=headers) as resp:
mime_type = resp.headers.get('Content-Type', 'application/octet-stream')
file_bytes = await resp.read()
base64_str = base64.b64encode(file_bytes).decode('utf-8')
return f'data:{mime_type};base64,{base64_str}'
except Exception as e:
raise (e)

View File

@@ -0,0 +1,49 @@
import importlib
import importlib.resources
import os
import typing
def import_modules_in_pkg(pkg: typing.Any) -> None:
"""
导入一个包内的所有模块
Args:
pkg: 要导入的包对象
"""
pkg_path = os.path.dirname(pkg.__file__)
import_dir(pkg_path)
def import_modules_in_pkgs(pkgs: typing.List) -> None:
for pkg in pkgs:
import_modules_in_pkg(pkg)
def import_dot_style_dir(dot_sep_path: str):
sec = dot_sep_path.split('.')
return import_dir(os.path.join(*sec))
def import_dir(path: str, path_prefix: str = 'langbot.'):
for file in os.listdir(path):
if file.endswith('.py') and file != '__init__.py':
full_path = os.path.join(path, file)
rel_path = full_path.replace(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '')
rel_path = rel_path[1:]
rel_path = rel_path.replace('/', '.')[:-3]
rel_path = rel_path.replace('\\', '.')
importlib.import_module(f'{path_prefix}{rel_path}')
def read_resource_file(resource_path: str) -> str:
with importlib.resources.files('langbot').joinpath(resource_path).open('r', encoding='utf-8') as f:
return f.read()
def read_resource_file_bytes(resource_path: str) -> bytes:
return importlib.resources.files('langbot').joinpath(resource_path).read_bytes()
def list_resource_files(resource_path: str) -> list[str]:
return [f.name for f in importlib.resources.files('langbot').joinpath(resource_path).iterdir()]

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
LOG_PAGE_SIZE = 20
MAX_CACHED_PAGES = 10
class LogPage:
"""日志页"""
number: int
"""页码"""
logs: list[str]
def __init__(self, number: int):
self.number = number
self.logs = []
def add_log(self, log: str) -> bool:
"""添加日志
Returns:
bool: 是否已满
"""
self.logs.append(log)
return len(self.logs) >= LOG_PAGE_SIZE
class LogCache:
"""由于 logger 是同步的,但实例中的数据库操作是异步的;
同时,持久化的日志信息已经写入文件了,故做一个缓存来为前端提供日志查询服务"""
log_pages: list[LogPage] = []
"""从前到后,越新的日志页越靠后"""
def __init__(self):
self.log_pages = []
self.log_pages.append(LogPage(number=0))
def add_log(self, log: str):
"""添加日志"""
if self.log_pages[-1].add_log(log):
self.log_pages.append(LogPage(number=self.log_pages[-1].number + 1))
if len(self.log_pages) > MAX_CACHED_PAGES:
self.log_pages.pop(0)
def get_log_by_pointer(
self,
start_page_number: int,
start_offset: int,
) -> tuple[str, int, int]:
"""获取指定页码和偏移量的日志"""
final_logs_str = ''
for page in self.log_pages:
if page.number == start_page_number:
final_logs_str += '\n'.join(page.logs[start_offset:])
elif page.number > start_page_number:
final_logs_str += '\n'.join(page.logs)
return final_logs_str, page.number, len(page.logs)

View File

@@ -0,0 +1,92 @@
"""Utility functions for finding package resources"""
import os
from pathlib import Path
_is_source_install = None
def _check_if_source_install() -> bool:
"""
Check if we're running from source directory or an installed package.
Cached to avoid repeated file I/O.
"""
global _is_source_install
if _is_source_install is not None:
return _is_source_install
# Check if main.py exists in current directory with LangBot marker
if os.path.exists('main.py'):
try:
with open('main.py', 'r', encoding='utf-8') as f:
# Only read first 500 chars to check for marker
content = f.read(500)
if 'LangBot/main.py' in content:
_is_source_install = True
return True
except (IOError, OSError, UnicodeDecodeError):
# If we can't read the file, assume not a source install
pass
_is_source_install = False
return False
def get_frontend_path() -> str:
"""
Get the path to the frontend build files.
Returns the path to web/out directory, handling both:
- Development mode: running from source directory
- Package mode: installed via pip/uvx
"""
# First, check if we're running from source directory
if _check_if_source_install() and os.path.exists('web/out'):
return 'web/out'
# Second, check current directory for web/out (in case user is in source dir)
if os.path.exists('web/out'):
return 'web/out'
# Third, find it relative to the package installation
# Get the directory where this file is located
# paths.py is in pkg/utils/, so parent.parent goes up to pkg/, then parent again goes up to the package root
pkg_dir = Path(__file__).parent.parent.parent
frontend_path = pkg_dir / 'web' / 'out'
if frontend_path.exists():
return str(frontend_path)
# Return the default path (will be checked by caller)
return 'web/out'
def get_resource_path(resource: str) -> str:
"""
Get the path to a resource file.
Args:
resource: Relative path to resource (e.g., 'templates/config.yaml')
Returns:
Absolute path to the resource
"""
# First, check if resource exists in current directory (source install)
if _check_if_source_install() and os.path.exists(resource):
return resource
# Second, check current directory anyway
if os.path.exists(resource):
return resource
# Third, find it relative to package directory
# Get the directory where this file is located
# paths.py is in pkg/utils/, so parent.parent goes up to pkg/, then parent again goes up to the package root
pkg_dir = Path(__file__).parent.parent.parent
resource_path = pkg_dir / resource
if resource_path.exists():
return str(resource_path)
# Return the original path
return resource

View File

@@ -0,0 +1,38 @@
from pip._internal import main as pipmain
def install(package):
pipmain(['install', package])
def install_upgrade(package):
pipmain(
[
'install',
'--upgrade',
package,
'-i',
'https://pypi.tuna.tsinghua.edu.cn/simple',
'--trusted-host',
'pypi.tuna.tsinghua.edu.cn',
]
)
def run_pip(params: list):
pipmain(params)
def install_requirements(file, extra_params: list = []):
pipmain(
[
'install',
'-r',
file,
'-i',
'https://pypi.tuna.tsinghua.edu.cn/simple',
'--trusted-host',
'pypi.tuna.tsinghua.edu.cn',
]
+ extra_params
)

View File

@@ -0,0 +1,22 @@
import os
import sys
def get_platform() -> str:
"""获取当前平台"""
# 检查是不是在 docker 里
DOCKER_ENV = os.environ.get('DOCKER_ENV', 'false')
if os.path.exists('/.dockerenv') or DOCKER_ENV == 'true':
return 'docker'
return sys.platform
standalone_runtime = False
def use_websocket_to_connect_plugin_runtime() -> bool:
"""是否使用 websocket 连接插件运行时"""
return standalone_runtime

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
import os
from ..core import app
class ProxyManager:
"""代理管理器"""
ap: app.Application
forward_proxies: dict[str, str]
def __init__(self, ap: app.Application):
self.ap = ap
self.forward_proxies = {}
async def initialize(self):
self.forward_proxies = {
'http://': os.getenv('HTTP_PROXY') or os.getenv('http_proxy'),
'https://': os.getenv('HTTPS_PROXY') or os.getenv('https_proxy'),
}
if 'http' in self.ap.instance_config.data['proxy'] and self.ap.instance_config.data['proxy']['http']:
self.forward_proxies['http://'] = self.ap.instance_config.data['proxy']['http']
if 'https' in self.ap.instance_config.data['proxy'] and self.ap.instance_config.data['proxy']['https']:
self.forward_proxies['https://'] = self.ap.instance_config.data['proxy']['https']
# 设置到环境变量
os.environ['HTTP_PROXY'] = self.forward_proxies['http://'] or ''
os.environ['HTTPS_PROXY'] = self.forward_proxies['https://'] or ''
def get_forward_proxies(self) -> dict:
return self.forward_proxies.copy()

View File

@@ -0,0 +1,211 @@
from __future__ import annotations
import os
import typing
import logging
import requests
from ..core import app
from . import constants
class VersionManager:
"""版本管理器"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
pass
def get_current_version(self) -> str:
current_tag = constants.semantic_version
return current_tag
async def get_release_list(self) -> list:
"""获取发行列表"""
try:
rls_list_resp = requests.get(
url='https://api.github.com/repos/langbot-app/LangBot/releases',
proxies=self.ap.proxy_mgr.get_forward_proxies(),
timeout=5,
)
rls_list_resp.raise_for_status() # 检查请求是否成功
rls_list = rls_list_resp.json()
return rls_list
except Exception as e:
self.ap.logger.warning(f'获取发行列表失败: {e}')
pass
return []
async def update_all(self):
"""检查更新并下载源码"""
current_tag = self.get_current_version()
rls_list = await self.get_release_list()
latest_rls = {}
rls_notes = []
latest_tag_name = ''
for rls in rls_list:
rls_notes.append(rls['name']) # 使用发行名称作为note
if latest_tag_name == '':
latest_tag_name = rls['tag_name']
if rls['tag_name'] == current_tag:
break
if latest_rls == {}:
latest_rls = rls
self.ap.logger.info('更新日志: {}'.format(rls_notes))
if latest_rls == {} and not self.is_newer(latest_tag_name, current_tag): # 没有新版本
return False
# 下载最新版本的zip到temp目录
self.ap.logger.info('开始下载最新版本: {}'.format(latest_rls['zipball_url']))
zip_url = latest_rls['zipball_url']
zip_resp = requests.get(url=zip_url, proxies=self.ap.proxy_mgr.get_forward_proxies())
zip_data = zip_resp.content
# 检查temp/updater目录
if not os.path.exists('temp'):
os.mkdir('temp')
if not os.path.exists('temp/updater'):
os.mkdir('temp/updater')
with open('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'wb') as f:
f.write(zip_data)
self.ap.logger.info('下载最新版本完成: {}'.format('temp/updater/{}.zip'.format(latest_rls['tag_name'])))
# 解压zip到temp/updater/<tag_name>/
import zipfile
# 检查目标文件夹
if os.path.exists('temp/updater/{}'.format(latest_rls['tag_name'])):
import shutil
shutil.rmtree('temp/updater/{}'.format(latest_rls['tag_name']))
os.mkdir('temp/updater/{}'.format(latest_rls['tag_name']))
with zipfile.ZipFile('temp/updater/{}.zip'.format(latest_rls['tag_name']), 'r') as zip_ref:
zip_ref.extractall('temp/updater/{}'.format(latest_rls['tag_name']))
# 覆盖源码
source_root = ''
# 找到temp/updater/<tag_name>/中的第一个子目录路径
for root, dirs, files in os.walk('temp/updater/{}'.format(latest_rls['tag_name'])):
if root != 'temp/updater/{}'.format(latest_rls['tag_name']):
source_root = root
break
# 覆盖源码
import shutil
for root, dirs, files in os.walk(source_root):
# 覆盖所有子文件子目录
for file in files:
src = os.path.join(root, file)
dst = src.replace(source_root, '.')
if os.path.exists(dst):
os.remove(dst)
# 检查目标文件夹是否存在
if not os.path.exists(os.path.dirname(dst)):
os.makedirs(os.path.dirname(dst))
# 检查目标文件是否存在
if not os.path.exists(dst):
# 创建目标文件
open(dst, 'w').close()
shutil.copy(src, dst)
# 把current_tag写入文件
current_tag = latest_rls['tag_name']
with open('current_tag', 'w') as f:
f.write(current_tag)
# TODO statistics
async def is_new_version_available(self) -> bool:
"""检查是否有新版本"""
# 从github获取release列表
rls_list = await self.get_release_list()
if rls_list is None:
return False
# 获取当前版本
current_tag = self.get_current_version()
# 检查是否有新版本
latest_tag_name = ''
for rls in rls_list:
if latest_tag_name == '':
latest_tag_name = rls['tag_name']
break
return self.is_newer(latest_tag_name, current_tag)
def is_newer(self, new_tag: str, old_tag: str):
"""判断版本是否更新,忽略第四位版本和第一位版本"""
if new_tag == old_tag:
return False
new_tag = new_tag.split('.')
old_tag = old_tag.split('.')
# 判断主版本是否相同
if new_tag[0] != old_tag[0]:
return False
if len(new_tag) < 4:
return True
# 合成前三段,判断是否相同
new_tag = '.'.join(new_tag[:3])
old_tag = '.'.join(old_tag[:3])
return new_tag != old_tag
def compare_version_str(v0: str, v1: str) -> int:
"""比较两个版本号"""
# 删除版本号前的v
if v0.startswith('v'):
v0 = v0[1:]
if v1.startswith('v'):
v1 = v1[1:]
v0: list = v0.split('.')
v1: list = v1.split('.')
# 如果两个版本号节数不同把短的后面用0补齐
if len(v0) < len(v1):
v0.extend(['0'] * (len(v1) - len(v0)))
elif len(v0) > len(v1):
v1.extend(['0'] * (len(v0) - len(v1)))
# 从高位向低位比较
for i in range(len(v0)):
if int(v0[i]) > int(v1[i]):
return 1
elif int(v0[i]) < int(v1[i]):
return -1
return 0
async def show_version_update(self) -> typing.Tuple[str, int]:
try:
if await self.ap.ver_mgr.is_new_version_available():
return (
'New version available:\n有新版本可用,根据文档更新: \nhttps://docs.langbot.app/zh/deploy/update.html',
logging.INFO,
)
except Exception as e:
return f'Error checking version update: {e}', logging.WARNING