mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-03 20:44:36 +00:00
perf: 优化代码声明
This commit is contained in:
@@ -1 +1,26 @@
|
||||
from .context import BasePlugin as Plugin
|
||||
from __future__ import annotations
|
||||
|
||||
import typing
|
||||
|
||||
from .context import BasePlugin as Plugin
|
||||
from . import events
|
||||
|
||||
def register(
|
||||
name: str,
|
||||
description: str,
|
||||
version: str,
|
||||
author
|
||||
) -> typing.Callable[[typing.Type[Plugin]], typing.Type[Plugin]]:
|
||||
pass
|
||||
|
||||
|
||||
def on(
|
||||
event: typing.Type[events.BaseEventModel]
|
||||
) -> typing.Callable[[typing.Callable], typing.Callable]:
|
||||
pass
|
||||
|
||||
|
||||
def func(
|
||||
name: str=None,
|
||||
) -> typing.Callable:
|
||||
pass
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
"""OpenAI 接口底层封装
|
||||
|
||||
目前使用的对话接口有:
|
||||
ChatCompletion - gpt-3.5-turbo 等模型
|
||||
Completion - text-davinci-003 等模型
|
||||
此模块封装此两个接口的请求实现,为上层提供统一的调用方式
|
||||
"""
|
||||
import tiktoken
|
||||
import openai
|
||||
|
||||
from ..provider.api import model as api_model
|
||||
from ..provider.api import completion as api_completion
|
||||
from ..provider.api import chat_completion as api_chat_completion
|
||||
|
||||
COMPLETION_MODELS = {
|
||||
"gpt-3.5-turbo-instruct",
|
||||
}
|
||||
|
||||
CHAT_COMPLETION_MODELS = {
|
||||
# GPT 4 系列
|
||||
"gpt-4-1106-preview",
|
||||
"gpt-4-vision-preview",
|
||||
"gpt-4",
|
||||
"gpt-4-32k",
|
||||
"gpt-4-0613",
|
||||
"gpt-4-32k-0613",
|
||||
"gpt-4-0314", # legacy
|
||||
"gpt-4-32k-0314", # legacy
|
||||
# GPT 3.5 系列
|
||||
"gpt-3.5-turbo-1106",
|
||||
"gpt-3.5-turbo",
|
||||
"gpt-3.5-turbo-16k",
|
||||
"gpt-3.5-turbo-0613", # legacy
|
||||
"gpt-3.5-turbo-16k-0613", # legacy
|
||||
"gpt-3.5-turbo-0301", # legacy
|
||||
# One-API 接入
|
||||
"SparkDesk",
|
||||
"chatglm_pro",
|
||||
"chatglm_std",
|
||||
"chatglm_lite",
|
||||
"qwen-v1",
|
||||
"qwen-plus-v1",
|
||||
"ERNIE-Bot",
|
||||
"ERNIE-Bot-turbo",
|
||||
"gemini-pro",
|
||||
}
|
||||
|
||||
EDIT_MODELS = {
|
||||
|
||||
}
|
||||
|
||||
IMAGE_MODELS = {
|
||||
|
||||
}
|
||||
|
||||
|
||||
def select_request_cls(client: openai.Client, model_name: str, messages: list, args: dict) -> api_model.RequestBase:
|
||||
if model_name in CHAT_COMPLETION_MODELS:
|
||||
return api_chat_completion.ChatCompletionRequest(client, model_name, messages, **args)
|
||||
elif model_name in COMPLETION_MODELS:
|
||||
return api_completion.CompletionRequest(client, model_name, messages, **args)
|
||||
raise ValueError("不支持模型[{}],请检查配置文件".format(model_name))
|
||||
|
||||
|
||||
def count_chat_completion_tokens(messages: list, model: str) -> int:
|
||||
"""Return the number of tokens used by a list of messages."""
|
||||
try:
|
||||
encoding = tiktoken.encoding_for_model(model)
|
||||
except KeyError:
|
||||
print("Warning: model not found. Using cl100k_base encoding.")
|
||||
encoding = tiktoken.get_encoding("cl100k_base")
|
||||
if model in {
|
||||
"gpt-3.5-turbo-0613",
|
||||
"gpt-3.5-turbo-16k-0613",
|
||||
"gpt-4-0314",
|
||||
"gpt-4-32k-0314",
|
||||
"gpt-4-0613",
|
||||
"gpt-4-32k-0613",
|
||||
"SparkDesk",
|
||||
"chatglm_pro",
|
||||
"chatglm_std",
|
||||
"chatglm_lite",
|
||||
"qwen-v1",
|
||||
"qwen-plus-v1",
|
||||
"ERNIE-Bot",
|
||||
"ERNIE-Bot-turbo",
|
||||
"gemini-pro",
|
||||
}:
|
||||
tokens_per_message = 3
|
||||
tokens_per_name = 1
|
||||
elif model == "gpt-3.5-turbo-0301":
|
||||
tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
|
||||
tokens_per_name = -1 # if there's a name, the role is omitted
|
||||
elif "gpt-3.5-turbo" in model:
|
||||
# print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
|
||||
return count_chat_completion_tokens(messages, model="gpt-3.5-turbo-0613")
|
||||
elif "gpt-4" in model:
|
||||
# print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
|
||||
return count_chat_completion_tokens(messages, model="gpt-4-0613")
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f"""count_chat_completion_tokens() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
|
||||
)
|
||||
num_tokens = 0
|
||||
for message in messages:
|
||||
num_tokens += tokens_per_message
|
||||
for key, value in message.items():
|
||||
num_tokens += len(encoding.encode(value))
|
||||
if key == "name":
|
||||
num_tokens += tokens_per_name
|
||||
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
|
||||
return num_tokens
|
||||
|
||||
|
||||
def count_completion_tokens(messages: list, model: str) -> int:
|
||||
|
||||
try:
|
||||
encoding = tiktoken.encoding_for_model(model)
|
||||
except KeyError:
|
||||
print("Warning: model not found. Using cl100k_base encoding.")
|
||||
encoding = tiktoken.get_encoding("cl100k_base")
|
||||
|
||||
text = ""
|
||||
|
||||
for message in messages:
|
||||
text += message['role'] + message['content'] + "\n"
|
||||
|
||||
text += "assistant: "
|
||||
|
||||
return len(encoding.encode(text))
|
||||
|
||||
|
||||
def count_tokens(messages: list, model: str):
|
||||
|
||||
if model in CHAT_COMPLETION_MODELS:
|
||||
return count_chat_completion_tokens(messages, model)
|
||||
elif model in COMPLETION_MODELS:
|
||||
return count_completion_tokens(messages, model)
|
||||
raise ValueError("不支持模型[{}],请检查配置文件".format(model))
|
||||
@@ -1,67 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import shutil
|
||||
|
||||
from . import context
|
||||
|
||||
|
||||
log_file_name = "qchatgpt.log"
|
||||
|
||||
|
||||
log_colors_config = {
|
||||
'DEBUG': 'green', # cyan white
|
||||
'INFO': 'white',
|
||||
'WARNING': 'yellow',
|
||||
'ERROR': 'red',
|
||||
'CRITICAL': 'cyan',
|
||||
}
|
||||
|
||||
|
||||
def init_runtime_log_file():
|
||||
"""为此次运行生成日志文件
|
||||
格式: qchatgpt-yyyy-MM-dd-HH-mm-ss.log
|
||||
"""
|
||||
global log_file_name
|
||||
|
||||
# 检查logs目录是否存在
|
||||
if not os.path.exists("logs"):
|
||||
os.mkdir("logs")
|
||||
|
||||
log_file_name = "logs/qchatgpt-%s.log" % time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
|
||||
|
||||
|
||||
def reset_logging():
|
||||
global log_file_name
|
||||
|
||||
import pkg.utils.context
|
||||
import colorlog
|
||||
|
||||
if pkg.utils.context.context['logger_handler'] is not None:
|
||||
logging.getLogger().removeHandler(pkg.utils.context.context['logger_handler'])
|
||||
|
||||
for handler in logging.getLogger().handlers:
|
||||
logging.getLogger().removeHandler(handler)
|
||||
|
||||
config_mgr = context.get_config_manager()
|
||||
|
||||
logging_level = logging.INFO if config_mgr is None else config_mgr.data['logging_level']
|
||||
|
||||
logging.basicConfig(level=logging_level, # 设置日志输出格式
|
||||
filename=log_file_name, # log日志输出的文件位置和文件名
|
||||
format="[%(asctime)s.%(msecs)03d] %(pathname)s (%(lineno)d) - [%(levelname)s] :\n%(message)s",
|
||||
# 日志输出的格式
|
||||
# -8表示占位符,让输出左对齐,输出长度都为8位
|
||||
datefmt="%Y-%m-%d %H:%M:%S" # 时间输出的格式
|
||||
)
|
||||
sh = logging.StreamHandler()
|
||||
sh.setLevel(logging_level)
|
||||
sh.setFormatter(colorlog.ColoredFormatter(
|
||||
fmt="%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s (%(lineno)d) - [%(levelname)s] : "
|
||||
"%(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
log_colors=log_colors_config
|
||||
))
|
||||
logging.getLogger().addHandler(sh)
|
||||
pkg.utils.context.context['logger_handler'] = sh
|
||||
return sh
|
||||
@@ -1,174 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import os.path
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from . import constants
|
||||
|
||||
|
||||
def is_newer(new_tag: str, old_tag: str):
|
||||
"""判断版本是否更新,忽略第四位版本和第一位版本"""
|
||||
if new_tag == old_tag:
|
||||
return False
|
||||
|
||||
new_tag = new_tag.split(".")
|
||||
old_tag = old_tag.split(".")
|
||||
|
||||
# 判断主版本是否相同
|
||||
if new_tag[0] != old_tag[0]:
|
||||
return False
|
||||
|
||||
if len(new_tag) < 4:
|
||||
return True
|
||||
|
||||
# 合成前三段,判断是否相同
|
||||
new_tag = ".".join(new_tag[:3])
|
||||
old_tag = ".".join(old_tag[:3])
|
||||
|
||||
return new_tag != old_tag
|
||||
|
||||
|
||||
def compare_version_str(v0: str, v1: str) -> int:
|
||||
"""比较两个版本号"""
|
||||
|
||||
# 删除版本号前的v
|
||||
if v0.startswith("v"):
|
||||
v0 = v0[1:]
|
||||
if v1.startswith("v"):
|
||||
v1 = v1[1:]
|
||||
|
||||
v0:list = v0.split(".")
|
||||
v1:list = v1.split(".")
|
||||
|
||||
# 如果两个版本号节数不同,把短的后面用0补齐
|
||||
if len(v0) < len(v1):
|
||||
v0.extend(["0"]*(len(v1)-len(v0)))
|
||||
elif len(v0) > len(v1):
|
||||
v1.extend(["0"]*(len(v0)-len(v1)))
|
||||
|
||||
# 从高位向低位比较
|
||||
for i in range(len(v0)):
|
||||
if int(v0[i]) > int(v1[i]):
|
||||
return 1
|
||||
elif int(v0[i]) < int(v1[i]):
|
||||
return -1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def update_all(cli: bool = False) -> bool:
|
||||
"""检查更新并下载源码"""
|
||||
start_time = time.time()
|
||||
|
||||
current_tag = get_current_tag()
|
||||
old_tag = current_tag
|
||||
|
||||
rls_list = get_release_list()
|
||||
|
||||
latest_rls = {}
|
||||
rls_notes = []
|
||||
latest_tag_name = ""
|
||||
for rls in rls_list:
|
||||
rls_notes.append(rls['name']) # 使用发行名称作为note
|
||||
if latest_tag_name == "":
|
||||
latest_tag_name = rls['tag_name']
|
||||
|
||||
if rls['tag_name'] == current_tag:
|
||||
break
|
||||
|
||||
if latest_rls == {}:
|
||||
latest_rls = rls
|
||||
if not cli:
|
||||
logging.info("更新日志: {}".format(rls_notes))
|
||||
else:
|
||||
print("更新日志: {}".format(rls_notes))
|
||||
|
||||
if latest_rls == {} and not is_newer(latest_tag_name, current_tag): # 没有新版本
|
||||
return False
|
||||
|
||||
# 下载最新版本的zip到temp目录
|
||||
if not cli:
|
||||
logging.info("开始下载最新版本: {}".format(latest_rls['zipball_url']))
|
||||
else:
|
||||
print("开始下载最新版本: {}".format(latest_rls['zipball_url']))
|
||||
zip_url = latest_rls['zipball_url']
|
||||
zip_resp = requests.get(
|
||||
url=zip_url,
|
||||
proxies=network.wrapper_proxies()
|
||||
)
|
||||
zip_data = zip_resp.content
|
||||
|
||||
# 检查temp/updater目录
|
||||
if not os.path.exists("temp"):
|
||||
os.mkdir("temp")
|
||||
if not os.path.exists("temp/updater"):
|
||||
os.mkdir("temp/updater")
|
||||
with open("temp/updater/{}.zip".format(latest_rls['tag_name']), "wb") as f:
|
||||
f.write(zip_data)
|
||||
|
||||
if not cli:
|
||||
logging.info("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name'])))
|
||||
else:
|
||||
print("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name'])))
|
||||
|
||||
# 解压zip到temp/updater/<tag_name>/
|
||||
import zipfile
|
||||
# 检查目标文件夹
|
||||
if os.path.exists("temp/updater/{}".format(latest_rls['tag_name'])):
|
||||
import shutil
|
||||
shutil.rmtree("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
os.mkdir("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
with zipfile.ZipFile("temp/updater/{}.zip".format(latest_rls['tag_name']), 'r') as zip_ref:
|
||||
zip_ref.extractall("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
|
||||
# 覆盖源码
|
||||
source_root = ""
|
||||
# 找到temp/updater/<tag_name>/中的第一个子目录路径
|
||||
for root, dirs, files in os.walk("temp/updater/{}".format(latest_rls['tag_name'])):
|
||||
if root != "temp/updater/{}".format(latest_rls['tag_name']):
|
||||
source_root = root
|
||||
break
|
||||
|
||||
# 覆盖源码
|
||||
import shutil
|
||||
for root, dirs, files in os.walk(source_root):
|
||||
# 覆盖所有子文件子目录
|
||||
for file in files:
|
||||
src = os.path.join(root, file)
|
||||
dst = src.replace(source_root, ".")
|
||||
if os.path.exists(dst):
|
||||
os.remove(dst)
|
||||
|
||||
# 检查目标文件夹是否存在
|
||||
if not os.path.exists(os.path.dirname(dst)):
|
||||
os.makedirs(os.path.dirname(dst))
|
||||
# 检查目标文件是否存在
|
||||
if not os.path.exists(dst):
|
||||
# 创建目标文件
|
||||
open(dst, "w").close()
|
||||
|
||||
shutil.copy(src, dst)
|
||||
|
||||
# 把current_tag写入文件
|
||||
current_tag = latest_rls['tag_name']
|
||||
with open("current_tag", "w") as f:
|
||||
f.write(current_tag)
|
||||
|
||||
context.get_center_v2_api().main.post_update_record(
|
||||
spent_seconds=int(time.time()-start_time),
|
||||
infer_reason="update",
|
||||
old_version=old_tag,
|
||||
new_version=current_tag,
|
||||
)
|
||||
|
||||
# 通知管理员
|
||||
if not cli:
|
||||
import pkg.utils.context
|
||||
pkg.utils.context.get_qqbot_manager().notify_admin("已更新到最新版本: {}\n更新日志:\n{}\n完整的更新日志请前往 https://github.com/RockChinQ/QChatGPT/releases 查看。\n请手动重启程序以使用新版本。".format(current_tag, "\n".join(rls_notes[:-1])))
|
||||
else:
|
||||
print("已更新到最新版本: {}\n更新日志:\n{}\n完整的更新日志请前往 https://github.com/RockChinQ/QChatGPT/releases 查看。请手动重启程序以使用新版本。".format(current_tag, "\n".join(rls_notes[:-1])))
|
||||
return True
|
||||
Reference in New Issue
Block a user