diff --git a/pkg/plugin/models.py b/pkg/plugin/models.py index 19580309..c10e09a3 100644 --- a/pkg/plugin/models.py +++ b/pkg/plugin/models.py @@ -1 +1,26 @@ -from .context import BasePlugin as Plugin \ No newline at end of file +from __future__ import annotations + +import typing + +from .context import BasePlugin as Plugin +from . import events + +def register( + name: str, + description: str, + version: str, + author +) -> typing.Callable[[typing.Type[Plugin]], typing.Type[Plugin]]: + pass + + +def on( + event: typing.Type[events.BaseEventModel] +) -> typing.Callable[[typing.Callable], typing.Callable]: + pass + + +def func( + name: str=None, +) -> typing.Callable: + pass diff --git a/pkg/provider/modelmgr.py b/pkg/provider/modelmgr.py deleted file mode 100644 index f4bf69dd..00000000 --- a/pkg/provider/modelmgr.py +++ /dev/null @@ -1,139 +0,0 @@ -"""OpenAI 接口底层封装 - -目前使用的对话接口有: -ChatCompletion - gpt-3.5-turbo 等模型 -Completion - text-davinci-003 等模型 -此模块封装此两个接口的请求实现,为上层提供统一的调用方式 -""" -import tiktoken -import openai - -from ..provider.api import model as api_model -from ..provider.api import completion as api_completion -from ..provider.api import chat_completion as api_chat_completion - -COMPLETION_MODELS = { - "gpt-3.5-turbo-instruct", -} - -CHAT_COMPLETION_MODELS = { - # GPT 4 系列 - "gpt-4-1106-preview", - "gpt-4-vision-preview", - "gpt-4", - "gpt-4-32k", - "gpt-4-0613", - "gpt-4-32k-0613", - "gpt-4-0314", # legacy - "gpt-4-32k-0314", # legacy - # GPT 3.5 系列 - "gpt-3.5-turbo-1106", - "gpt-3.5-turbo", - "gpt-3.5-turbo-16k", - "gpt-3.5-turbo-0613", # legacy - "gpt-3.5-turbo-16k-0613", # legacy - "gpt-3.5-turbo-0301", # legacy - # One-API 接入 - "SparkDesk", - "chatglm_pro", - "chatglm_std", - "chatglm_lite", - "qwen-v1", - "qwen-plus-v1", - "ERNIE-Bot", - "ERNIE-Bot-turbo", - "gemini-pro", -} - -EDIT_MODELS = { - -} - -IMAGE_MODELS = { - -} - - -def select_request_cls(client: openai.Client, model_name: str, messages: list, args: dict) -> api_model.RequestBase: - if model_name in CHAT_COMPLETION_MODELS: - return api_chat_completion.ChatCompletionRequest(client, model_name, messages, **args) - elif model_name in COMPLETION_MODELS: - return api_completion.CompletionRequest(client, model_name, messages, **args) - raise ValueError("不支持模型[{}],请检查配置文件".format(model_name)) - - -def count_chat_completion_tokens(messages: list, model: str) -> int: - """Return the number of tokens used by a list of messages.""" - try: - encoding = tiktoken.encoding_for_model(model) - except KeyError: - print("Warning: model not found. Using cl100k_base encoding.") - encoding = tiktoken.get_encoding("cl100k_base") - if model in { - "gpt-3.5-turbo-0613", - "gpt-3.5-turbo-16k-0613", - "gpt-4-0314", - "gpt-4-32k-0314", - "gpt-4-0613", - "gpt-4-32k-0613", - "SparkDesk", - "chatglm_pro", - "chatglm_std", - "chatglm_lite", - "qwen-v1", - "qwen-plus-v1", - "ERNIE-Bot", - "ERNIE-Bot-turbo", - "gemini-pro", - }: - tokens_per_message = 3 - tokens_per_name = 1 - elif model == "gpt-3.5-turbo-0301": - tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n - tokens_per_name = -1 # if there's a name, the role is omitted - elif "gpt-3.5-turbo" in model: - # print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.") - return count_chat_completion_tokens(messages, model="gpt-3.5-turbo-0613") - elif "gpt-4" in model: - # print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.") - return count_chat_completion_tokens(messages, model="gpt-4-0613") - else: - raise NotImplementedError( - f"""count_chat_completion_tokens() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""" - ) - num_tokens = 0 - for message in messages: - num_tokens += tokens_per_message - for key, value in message.items(): - num_tokens += len(encoding.encode(value)) - if key == "name": - num_tokens += tokens_per_name - num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> - return num_tokens - - -def count_completion_tokens(messages: list, model: str) -> int: - - try: - encoding = tiktoken.encoding_for_model(model) - except KeyError: - print("Warning: model not found. Using cl100k_base encoding.") - encoding = tiktoken.get_encoding("cl100k_base") - - text = "" - - for message in messages: - text += message['role'] + message['content'] + "\n" - - text += "assistant: " - - return len(encoding.encode(text)) - - -def count_tokens(messages: list, model: str): - - if model in CHAT_COMPLETION_MODELS: - return count_chat_completion_tokens(messages, model) - elif model in COMPLETION_MODELS: - return count_completion_tokens(messages, model) - raise ValueError("不支持模型[{}],请检查配置文件".format(model)) diff --git a/pkg/utils/log.py b/pkg/utils/log.py deleted file mode 100644 index 6be28b5b..00000000 --- a/pkg/utils/log.py +++ /dev/null @@ -1,67 +0,0 @@ -import os -import time -import logging -import shutil - -from . import context - - -log_file_name = "qchatgpt.log" - - -log_colors_config = { - 'DEBUG': 'green', # cyan white - 'INFO': 'white', - 'WARNING': 'yellow', - 'ERROR': 'red', - 'CRITICAL': 'cyan', -} - - -def init_runtime_log_file(): - """为此次运行生成日志文件 - 格式: qchatgpt-yyyy-MM-dd-HH-mm-ss.log - """ - global log_file_name - - # 检查logs目录是否存在 - if not os.path.exists("logs"): - os.mkdir("logs") - - log_file_name = "logs/qchatgpt-%s.log" % time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime()) - - -def reset_logging(): - global log_file_name - - import pkg.utils.context - import colorlog - - if pkg.utils.context.context['logger_handler'] is not None: - logging.getLogger().removeHandler(pkg.utils.context.context['logger_handler']) - - for handler in logging.getLogger().handlers: - logging.getLogger().removeHandler(handler) - - config_mgr = context.get_config_manager() - - logging_level = logging.INFO if config_mgr is None else config_mgr.data['logging_level'] - - logging.basicConfig(level=logging_level, # 设置日志输出格式 - filename=log_file_name, # log日志输出的文件位置和文件名 - format="[%(asctime)s.%(msecs)03d] %(pathname)s (%(lineno)d) - [%(levelname)s] :\n%(message)s", - # 日志输出的格式 - # -8表示占位符,让输出左对齐,输出长度都为8位 - datefmt="%Y-%m-%d %H:%M:%S" # 时间输出的格式 - ) - sh = logging.StreamHandler() - sh.setLevel(logging_level) - sh.setFormatter(colorlog.ColoredFormatter( - fmt="%(log_color)s[%(asctime)s.%(msecs)03d] %(filename)s (%(lineno)d) - [%(levelname)s] : " - "%(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - log_colors=log_colors_config - )) - logging.getLogger().addHandler(sh) - pkg.utils.context.context['logger_handler'] = sh - return sh diff --git a/pkg/utils/updater.py b/pkg/utils/updater.py deleted file mode 100644 index f2d357f2..00000000 --- a/pkg/utils/updater.py +++ /dev/null @@ -1,174 +0,0 @@ -from __future__ import annotations - -import datetime -import logging -import os.path -import time - -import requests - -from . import constants - - -def is_newer(new_tag: str, old_tag: str): - """判断版本是否更新,忽略第四位版本和第一位版本""" - if new_tag == old_tag: - return False - - new_tag = new_tag.split(".") - old_tag = old_tag.split(".") - - # 判断主版本是否相同 - if new_tag[0] != old_tag[0]: - return False - - if len(new_tag) < 4: - return True - - # 合成前三段,判断是否相同 - new_tag = ".".join(new_tag[:3]) - old_tag = ".".join(old_tag[:3]) - - return new_tag != old_tag - - -def compare_version_str(v0: str, v1: str) -> int: - """比较两个版本号""" - - # 删除版本号前的v - if v0.startswith("v"): - v0 = v0[1:] - if v1.startswith("v"): - v1 = v1[1:] - - v0:list = v0.split(".") - v1:list = v1.split(".") - - # 如果两个版本号节数不同,把短的后面用0补齐 - if len(v0) < len(v1): - v0.extend(["0"]*(len(v1)-len(v0))) - elif len(v0) > len(v1): - v1.extend(["0"]*(len(v0)-len(v1))) - - # 从高位向低位比较 - for i in range(len(v0)): - if int(v0[i]) > int(v1[i]): - return 1 - elif int(v0[i]) < int(v1[i]): - return -1 - - return 0 - - -def update_all(cli: bool = False) -> bool: - """检查更新并下载源码""" - start_time = time.time() - - current_tag = get_current_tag() - old_tag = current_tag - - rls_list = get_release_list() - - latest_rls = {} - rls_notes = [] - latest_tag_name = "" - for rls in rls_list: - rls_notes.append(rls['name']) # 使用发行名称作为note - if latest_tag_name == "": - latest_tag_name = rls['tag_name'] - - if rls['tag_name'] == current_tag: - break - - if latest_rls == {}: - latest_rls = rls - if not cli: - logging.info("更新日志: {}".format(rls_notes)) - else: - print("更新日志: {}".format(rls_notes)) - - if latest_rls == {} and not is_newer(latest_tag_name, current_tag): # 没有新版本 - return False - - # 下载最新版本的zip到temp目录 - if not cli: - logging.info("开始下载最新版本: {}".format(latest_rls['zipball_url'])) - else: - print("开始下载最新版本: {}".format(latest_rls['zipball_url'])) - zip_url = latest_rls['zipball_url'] - zip_resp = requests.get( - url=zip_url, - proxies=network.wrapper_proxies() - ) - zip_data = zip_resp.content - - # 检查temp/updater目录 - if not os.path.exists("temp"): - os.mkdir("temp") - if not os.path.exists("temp/updater"): - os.mkdir("temp/updater") - with open("temp/updater/{}.zip".format(latest_rls['tag_name']), "wb") as f: - f.write(zip_data) - - if not cli: - logging.info("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name']))) - else: - print("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name']))) - - # 解压zip到temp/updater// - import zipfile - # 检查目标文件夹 - if os.path.exists("temp/updater/{}".format(latest_rls['tag_name'])): - import shutil - shutil.rmtree("temp/updater/{}".format(latest_rls['tag_name'])) - os.mkdir("temp/updater/{}".format(latest_rls['tag_name'])) - with zipfile.ZipFile("temp/updater/{}.zip".format(latest_rls['tag_name']), 'r') as zip_ref: - zip_ref.extractall("temp/updater/{}".format(latest_rls['tag_name'])) - - # 覆盖源码 - source_root = "" - # 找到temp/updater//中的第一个子目录路径 - for root, dirs, files in os.walk("temp/updater/{}".format(latest_rls['tag_name'])): - if root != "temp/updater/{}".format(latest_rls['tag_name']): - source_root = root - break - - # 覆盖源码 - import shutil - for root, dirs, files in os.walk(source_root): - # 覆盖所有子文件子目录 - for file in files: - src = os.path.join(root, file) - dst = src.replace(source_root, ".") - if os.path.exists(dst): - os.remove(dst) - - # 检查目标文件夹是否存在 - if not os.path.exists(os.path.dirname(dst)): - os.makedirs(os.path.dirname(dst)) - # 检查目标文件是否存在 - if not os.path.exists(dst): - # 创建目标文件 - open(dst, "w").close() - - shutil.copy(src, dst) - - # 把current_tag写入文件 - current_tag = latest_rls['tag_name'] - with open("current_tag", "w") as f: - f.write(current_tag) - - context.get_center_v2_api().main.post_update_record( - spent_seconds=int(time.time()-start_time), - infer_reason="update", - old_version=old_tag, - new_version=current_tag, - ) - - # 通知管理员 - if not cli: - import pkg.utils.context - pkg.utils.context.get_qqbot_manager().notify_admin("已更新到最新版本: {}\n更新日志:\n{}\n完整的更新日志请前往 https://github.com/RockChinQ/QChatGPT/releases 查看。\n请手动重启程序以使用新版本。".format(current_tag, "\n".join(rls_notes[:-1]))) - else: - print("已更新到最新版本: {}\n更新日志:\n{}\n完整的更新日志请前往 https://github.com/RockChinQ/QChatGPT/releases 查看。请手动重启程序以使用新版本。".format(current_tag, "\n".join(rls_notes[:-1]))) - return True