Compare commits

...

20 Commits

Author SHA1 Message Date
Rock Chin
9270dc2c52 Release v2.2.5 2023-03-20 14:02:38 +00:00
Rock Chin
14aec251b4 Merge pull request #315 from RockChinQ/impl-312
[Feat] 访问GitHub API时使用openai_config中设置的代理地址
2023-03-20 21:49:33 +08:00
Rock Chin
d2a7a57245 feat: 为GitHub API的访问使用代理 (#312) 2023-03-20 13:40:23 +00:00
Rock Chin
1964fc76c8 doc: 完善wiki指引 2023-03-20 13:25:02 +00:00
Rock Chin
b8d4b490ce doc: 添加部署说明 2023-03-20 13:12:25 +00:00
Rock Chin
76891e4855 doc: 添加指令说明指引 2023-03-20 13:09:05 +00:00
Rock Chin
3d868b3a39 Merge pull request #308 from RockChinQ/plugin-ctrl-cmd
[Feat] 解耦指令处理、完善插件管理指令
2023-03-20 21:04:06 +08:00
Rock Chin
7b56bcf7a9 feat: 添加插件启用禁用指令 2023-03-20 13:02:30 +00:00
Rock Chin
f96ae56bce feat: 支持指令删除插件 (#286) 2023-03-20 12:50:25 +00:00
Rock Chin
d52108f4e1 doc: 完善README.md 2023-03-20 12:49:18 +00:00
Rock Chin
5f07b7ad1f refactor: 完成所有指令重构 2023-03-20 12:06:02 +00:00
Rock Chin
cda10cf1a6 Update 漏洞反馈.md 2023-03-20 19:17:53 +08:00
Rock Chin
d226b8ebc5 doc: 完善文档 (#310) 2023-03-20 14:46:39 +08:00
Rock Chin
d08794579c feat: 现有指令占位 2023-03-19 14:33:01 +00:00
Rock Chin
7450494741 Update pull_request_template.md 2023-03-19 20:33:23 +08:00
Rock Chin
36dca7ae2f feat: 添加指令抽象类 2023-03-19 12:27:21 +00:00
Rock Chin
5dae777e79 doc: 添加wiki为submodule 2023-03-19 09:43:45 +00:00
Rock Chin
e518d172d7 Merge pull request #304 from RockChinQ/bd-check-exception
[Perf] 百度云审核的异常处理
2023-03-19 17:13:37 +08:00
Rock Chin
af29277acd feat: 长消息检查函数不再检查敏感词 2023-03-19 09:06:32 +00:00
Rock Chin
79bfa0792d feat: 删除print调试信息 2023-03-19 08:45:54 +00:00
21 changed files with 798 additions and 368 deletions

View File

@@ -1,6 +1,6 @@
--- ---
name: 漏洞反馈 name: 漏洞反馈
about: 报错或漏洞请使用这个模板创建 about: 报错或漏洞请使用这个模板创建不使用此模板创建的异常、漏洞相关issue将被直接关闭
title: "[BUG]" title: "[BUG]"
labels: 'bug' labels: 'bug'
assignees: '' assignees: ''

View File

@@ -1,4 +1,4 @@
### 概述 ## 概述
实现/解决/优化的内容: 实现/解决/优化的内容:
@@ -7,10 +7,13 @@
- [ ] 已阅读仓库[贡献指引](../CONTRIBUTING.md) - [ ] 已阅读仓库[贡献指引](../CONTRIBUTING.md)
- [ ] 已与维护者在issues或其他平台沟通此PR大致内容 - [ ] 已与维护者在issues或其他平台沟通此PR大致内容
## 以下内容可在起草PR后、合并PR前逐步完成
### 功能 ### 功能
- [ ] 已编写完善的配置文件字段说明(若有新增) - [ ] 已编写完善的配置文件字段说明(若有新增)
- [ ]测试新功能 - [ ]编写面向用户的新功能说明(若有必要)
- [ ] 已测试新功能或更改
### 兼容性 ### 兼容性

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "QChatGPT.wiki"]
path = QChatGPT.wiki
url = https://github.com/RockChinQ/QChatGPT.wiki.git

1
QChatGPT.wiki Submodule

Submodule QChatGPT.wiki added at 68c4ef5d24

View File

@@ -6,7 +6,7 @@
> 2023/3/3 现已在主线支持官方ChatGPT接口使用方法查看[#195](https://github.com/RockChinQ/QChatGPT/issues/195) > 2023/3/3 现已在主线支持官方ChatGPT接口使用方法查看[#195](https://github.com/RockChinQ/QChatGPT/issues/195)
- 到[项目Wiki](https://github.com/RockChinQ/QChatGPT/wiki)可了解项目详细信息 - 到[项目Wiki](https://github.com/RockChinQ/QChatGPT/wiki)可了解项目详细信息
- 由bilibili TheLazy制作的[视频教程](https://www.bilibili.com/video/BV15v4y1X7aP) - ~~由bilibili TheLazy制作的[视频教程](https://www.bilibili.com/video/BV15v4y1X7aP)~~(寄了,求大佬做个新的)
- 交流、答疑群: ~~204785790~~(已满)、~~691226829~~已满、656285629 - 交流、答疑群: ~~204785790~~(已满)、~~691226829~~已满、656285629
- **进群提问前请您`确保`已经找遍文档和issue均无法解决** - **进群提问前请您`确保`已经找遍文档和issue均无法解决**
- QQ频道机器人见[QQChannelChatGPT](https://github.com/Soulter/QQChannelChatGPT) - QQ频道机器人见[QQChannelChatGPT](https://github.com/Soulter/QQChannelChatGPT)
@@ -15,6 +15,9 @@
## 🍺模型适配一览 ## 🍺模型适配一览
<details>
<summary>点击此处展开</summary>
### 文字对话 ### 文字对话
- OpenAI GPT-3.5模型(ChatGPT API), 本项目原生支持, 默认使用 - OpenAI GPT-3.5模型(ChatGPT API), 本项目原生支持, 默认使用
@@ -38,6 +41,9 @@
- TTS+VITS, 由[插件](https://github.com/dominoar/QChatPlugins)接入 - TTS+VITS, 由[插件](https://github.com/dominoar/QChatPlugins)接入
- Plachta/VITS-Umamusume-voice-synthesizer, 由[插件](https://github.com/oliverkirk-sudo/chat_voice)接入 - Plachta/VITS-Umamusume-voice-synthesizer, 由[插件](https://github.com/oliverkirk-sudo/chat_voice)接入
</details>
## ✅功能 ## ✅功能
<details> <details>
@@ -126,6 +132,8 @@
### - 注册OpenAI账号 ### - 注册OpenAI账号
> 若您要直接使用非OpenAI的模型如New Bing可跳过此步骤直接进行之后的部署完成后按照相关插件的文档进行配置即可
参考以下文章自行注册 参考以下文章自行注册
> [国内注册ChatGPT的方法(100%可用)](https://www.pythonthree.com/register-openai-chatgpt/) > [国内注册ChatGPT的方法(100%可用)](https://www.pythonthree.com/register-openai-chatgpt/)
@@ -204,7 +212,8 @@ python3 main.py
## 🚀使用 ## 🚀使用
查看[Wiki功能使用页](https://github.com/RockChinQ/QChatGPT/wiki/%E5%8A%9F%E8%83%BD%E4%BD%BF%E7%94%A8#%E4%BD%BF%E7%94%A8%E6%96%B9%E5%BC%8F) **部署完成后必看: [指令说明](https://github.com/RockChinQ/QChatGPT/wiki/%E5%8A%9F%E8%83%BD%E4%BD%BF%E7%94%A8#%E6%9C%BA%E5%99%A8%E4%BA%BA%E6%8C%87%E4%BB%A4)**
所有功能查看[Wiki功能使用页](https://github.com/RockChinQ/QChatGPT/wiki/%E5%8A%9F%E8%83%BD%E4%BD%BF%E7%94%A8#%E4%BD%BF%E7%94%A8%E6%96%B9%E5%BC%8F)
## 🧩插件生态 ## 🧩插件生态
@@ -212,6 +221,9 @@ python3 main.py
详见[Wiki插件使用页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E4%BD%BF%E7%94%A8) 详见[Wiki插件使用页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E4%BD%BF%E7%94%A8)
开发教程见[Wiki插件开发页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E5%BC%80%E5%8F%91) 开发教程见[Wiki插件开发页](https://github.com/RockChinQ/QChatGPT/wiki/%E6%8F%92%E4%BB%B6%E5%BC%80%E5%8F%91)
<details>
<summary>查看插件列表</summary>
### 示例插件 ### 示例插件
`tests/plugin_examples`目录下,将其整个目录复制到`plugins`目录下即可使用 `tests/plugin_examples`目录下,将其整个目录复制到`plugins`目录下即可使用
@@ -226,11 +238,12 @@ python3 main.py
- [revLibs](https://github.com/RockChinQ/revLibs) - 将ChatGPT网页版接入此项目关于[官方接口和网页版有什么区别](https://github.com/RockChinQ/QChatGPT/wiki/%E5%AE%98%E6%96%B9%E6%8E%A5%E5%8F%A3%E4%B8%8EChatGPT%E7%BD%91%E9%A1%B5%E7%89%88) - [revLibs](https://github.com/RockChinQ/revLibs) - 将ChatGPT网页版接入此项目关于[官方接口和网页版有什么区别](https://github.com/RockChinQ/QChatGPT/wiki/%E5%AE%98%E6%96%B9%E6%8E%A5%E5%8F%A3%E4%B8%8EChatGPT%E7%BD%91%E9%A1%B5%E7%89%88)
- [hello_plugin](https://github.com/RockChinQ/hello_plugin) - `hello_plugin` 的储存库形式,插件开发模板 - [hello_plugin](https://github.com/RockChinQ/hello_plugin) - `hello_plugin` 的储存库形式,插件开发模板
- [dominoar/QChatPlugins](https://github.com/dominoar/QchatPlugins) - dominoar编写的诸多新功能插件输出、Ranimg、屏蔽词规则等 - [dominoar/QChatPlugins](https://github.com/dominoar/QchatPlugins) - dominoar编写的诸多新功能插件输出、Ranimg、屏蔽词规则等
- [dominoar/QCP-NovelAi](https://github.com/dominoar/QCP-NovelAi) - NovelAI 故事叙述与绘画 - [dominoar/QCP-NovelAi](https://github.com/dominoar/QCP-NovelAi) - NovelAI 故事叙述与绘画
- [oliverkirk-sudo/chat_voice](https://github.com/oliverkirk-sudo/chat_voice) - 文字转语音输出使用HuggingFace上的[VITS-Umamusume-voice-synthesizer模型](https://huggingface.co/spaces/Plachta/VITS-Umamusume-voice-synthesizer) - [oliverkirk-sudo/chat_voice](https://github.com/oliverkirk-sudo/chat_voice) - 文字转语音输出使用HuggingFace上的[VITS-Umamusume-voice-synthesizer模型](https://huggingface.co/spaces/Plachta/VITS-Umamusume-voice-synthesizer)
- [RockChinQ/WaitYiYan](https://github.com/RockChinQ/WaitYiYan) - 实时获取百度`文心一言`等待列表人数 - [RockChinQ/WaitYiYan](https://github.com/RockChinQ/WaitYiYan) - 实时获取百度`文心一言`等待列表人数
- [QChartGPT_Emoticon_Plugin](https://github.com/chordfish-k/QChartGPT_Emoticon_Plugin) - 使机器人根据回复内容发送表情包 - [QChartGPT_Emoticon_Plugin](https://github.com/chordfish-k/QChartGPT_Emoticon_Plugin) - 使机器人根据回复内容发送表情包
</details>
## 😘致谢 ## 😘致谢
@@ -242,6 +255,6 @@ python3 main.py
以及所有[贡献者](https://github.com/RockChinQ/QChatGPT/graphs/contributors)和其他为本项目提供支持的朋友们。 以及所有[贡献者](https://github.com/RockChinQ/QChatGPT/graphs/contributors)和其他为本项目提供支持的朋友们。
## 👍赞赏 <!-- ## 👍赞赏
<img alt="赞赏码" src="res/mm_reward_qrcode_1672840549070.png" width="400" height="400"/> <img alt="赞赏码" src="res/mm_reward_qrcode_1672840549070.png" width="400" height="400"/> -->

View File

@@ -183,7 +183,7 @@ prompt_submit_length = 2048
# 'text-ada-001' # 'text-ada-001'
# #
# 具体请查看OpenAI的文档: https://beta.openai.com/docs/api-reference/completions/create # 具体请查看OpenAI的文档: https://beta.openai.com/docs/api-reference/completions/create
# 请将内容修改到config.py中请勿修改此文件 # 请将内容修改到config.py中请勿修改config-template.py
completion_api_params = { completion_api_params = {
"model": "gpt-3.5-turbo", "model": "gpt-3.5-turbo",
"temperature": 0.9, # 数值越低得到的回答越理性,取值范围[0, 1] "temperature": 0.9, # 数值越低得到的回答越理性,取值范围[0, 1]

View File

@@ -46,7 +46,7 @@ class DataGatherer:
config = pkg.utils.context.get_config() config = pkg.utils.context.get_config()
if hasattr(config, "report_usage") and not config.report_usage: if hasattr(config, "report_usage") and not config.report_usage:
return return
res = requests.get("http://rockchin.top:18989/usage?service_name=qchatgpt.{}&version={}&count={}".format(subservice_name, self.version_str, count)) res = requests.get("http://reports.rockchin.top:18989/usage?service_name=qchatgpt.{}&version={}&count={}".format(subservice_name, self.version_str, count))
if res.status_code != 200 or res.text != "ok": if res.status_code != 200 or res.text != "ok":
logging.warning("report to server failed, status_code: {}, text: {}".format(res.status_code, res.text)) logging.warning("report to server failed, status_code: {}, text: {}".format(res.status_code, res.text))
except: except:

View File

@@ -287,8 +287,6 @@ class Session:
packed_tokens = 0 packed_tokens = 0
print(self.prompt)
while changable_index >= 0 and token_count_index >= 0: while changable_index >= 0 and token_count_index >= 0:
if packed_tokens + self.token_counts[token_count_index] > max_tokens: if packed_tokens + self.token_counts[token_count_index] > max_tokens:
break break
@@ -304,8 +302,6 @@ class Session:
# 将default_prompt和changable_prompts合并 # 将default_prompt和changable_prompts合并
result_prompt = self.default_prompt + changable_prompts result_prompt = self.default_prompt + changable_prompts
print(changable_prompts)
# 添加当前问题 # 添加当前问题
result_prompt.append( result_prompt.append(
{ {

View File

@@ -5,6 +5,7 @@ import importlib
import os import os
import pkgutil import pkgutil
import sys import sys
import shutil
import traceback import traceback
import pkg.utils.context as context import pkg.utils.context as context
@@ -160,6 +161,22 @@ def install_plugin(repo_url: str):
main.reset_logging() main.reset_logging()
def uninstall_plugin(plugin_name: str) -> str:
""" 卸载插件 """
if plugin_name not in __plugins__:
raise Exception("插件不存在")
# 获取文件夹路径
plugin_path = __plugins__[plugin_name]['path'].replace("\\", "/")
# 剪切路径为plugins/插件名
plugin_path = plugin_path.split("plugins/")[1].split("/")[0]
# 删除文件夹
shutil.rmtree("plugins/"+plugin_path)
return "plugins/"+plugin_path
class EventContext: class EventContext:
""" 事件上下文 """ """ 事件上下文 """
eid = 0 eid = 0

View File

@@ -76,8 +76,6 @@ def check_text(text: str) -> list:
# 转换成图片 # 转换成图片
return [text_to_image(text)] return [text_to_image(text)]
elif config.blob_message_strategy == 'forward': elif config.blob_message_strategy == 'forward':
# 敏感词屏蔽
text = context.get_qqbot_manager().reply_filter.process(text)
# 包装转发消息 # 包装转发消息
display = ForwardMessageDiaplay( display = ForwardMessageDiaplay(

View File

36
pkg/qqbot/cmds/func.py Normal file
View File

@@ -0,0 +1,36 @@
from pkg.qqbot.cmds.model import command
import logging
from mirai import Image
import config
import pkg.openai.session
@command(
"draw",
"使用DALL·E模型作画",
"!draw <图片提示语>",
[],
False
)
def cmd_draw(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""使用DALL·E模型作画"""
reply = []
if len(params) == 0:
reply = ["[bot]err:请输入图片描述文字"]
else:
session = pkg.openai.session.get_session(session_name)
res = session.draw_image(" ".join(params))
logging.debug("draw_image result:{}".format(res))
reply = [Image(url=res['data'][0]['url'])]
if not (hasattr(config, 'include_image_description')
and not config.include_image_description):
reply.append(" ".join(params))
return reply

45
pkg/qqbot/cmds/model.py Normal file
View File

@@ -0,0 +1,45 @@
# 指令模型
import logging
commands = []
"""已注册的指令类
{
"name": "指令名",
"description": "指令描述",
"usage": "指令用法",
"aliases": ["别名1", "别名2"],
"admin_only": "是否仅管理员可用",
"func": "指令执行函数"
}
"""
def command(name: str, description: str, usage: str, aliases: list = None, admin_only: bool = False):
"""指令装饰器"""
def wrapper(fun):
commands.append({
"name": name,
"description": description,
"usage": usage,
"aliases": aliases,
"admin_only": admin_only,
"func": fun
})
return fun
return wrapper
def search(cmd: str) -> dict:
"""查找指令"""
for command in commands:
if (command["name"] == cmd) or (cmd in command["aliases"]):
return command
return None
import pkg.qqbot.cmds.func
import pkg.qqbot.cmds.system
import pkg.qqbot.cmds.session
import pkg.qqbot.cmds.plugin

129
pkg/qqbot/cmds/plugin.py Normal file
View File

@@ -0,0 +1,129 @@
from pkg.qqbot.cmds.model import command
import pkg.utils.context
import pkg.plugin.switch as plugin_switch
import os
import threading
import logging
def plugin_operation(cmd, params, is_admin):
reply = []
import pkg.plugin.host as plugin_host
import pkg.utils.updater as updater
plugin_list = plugin_host.__plugins__
if len(params) == 0:
reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__))
idx = 0
for key in plugin_host.iter_plugins_name():
plugin = plugin_list[key]
reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\
.format((idx+1), plugin['name'],
"[已禁用]" if not plugin['enabled'] else "",
plugin['description'],
plugin['version'], plugin['author'])
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1]))
if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT":
reply_str += "源码: "+remote_url+"\n"
idx += 1
reply = [reply_str]
elif params[0] == 'update':
# 更新所有插件
if is_admin:
def closure():
import pkg.utils.context
updated = []
for key in plugin_list:
plugin = plugin_list[key]
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1]))
if success:
updated.append(plugin['name'])
# 检查是否有requirements.txt
pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...")
for key in plugin_list:
plugin = plugin_list[key]
if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"):
logging.info("{}检测到requirements.txt安装依赖".format(plugin['name']))
import pkg.utils.pkgmgr
pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt")
import main
main.reset_logging()
pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated)))
threading.Thread(target=closure).start()
reply = ["[bot]正在更新所有插件,请勿重复发起..."]
else:
reply = ["[bot]err:权限不足"]
elif params[0] == 'del' or params[0] == 'delete':
if is_admin:
if len(params) < 2:
reply = ["[bot]err:未指定插件名"]
else:
plugin_name = params[1]
if plugin_name in plugin_list:
unin_path = plugin_host.uninstall_plugin(plugin_name)
reply = ["[bot]已删除插件: {} ({}), 请发送 !reload 重载插件".format(plugin_name, unin_path)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
elif params[0] == 'on' or params[0] == 'off' :
new_status = params[0] == 'on'
if is_admin:
if len(params) < 2:
reply = ["[bot]err:未指定插件名"]
else:
plugin_name = params[1]
if plugin_name in plugin_list:
plugin_list[plugin_name]['enabled'] = new_status
plugin_switch.dump_switch()
reply = ["[bot]已{}插件: {}".format("启用" if new_status else "禁用", plugin_name)]
else:
reply = ["[bot]err:未找到插件: {}, 请使用!plugin指令查看插件列表".format(plugin_name)]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
elif params[0].startswith("http"):
if is_admin:
def closure():
try:
plugin_host.install_plugin(params[0])
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件")
except Exception as e:
logging.error("插件安装失败:{}".format(e))
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e))
threading.Thread(target=closure, args=()).start()
reply = ["[bot]正在安装插件..."]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
else:
reply = ["[bot]err:未知参数: {}".format(params)]
return reply
@command(
"plugin",
"插件相关操作",
"!plugin\n!plugin <插件仓库地址>\!plugin update\n!plugin del <插件名>\n!plugin on <插件名>\n!plugin off <插件名>",
[],
False
)
def cmd_plugin(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""插件相关操作"""
reply = plugin_operation(cmd, params, is_admin)
return reply

282
pkg/qqbot/cmds/session.py Normal file
View File

@@ -0,0 +1,282 @@
# 会话管理相关指令
import datetime
import json
from pkg.qqbot.cmds.model import command
import pkg.openai.session
import pkg.utils.context
import config
@command(
"reset",
"重置当前会话",
"!reset\n!reset [使用情景预设名称]",
[],
False
)
def cmd_reset(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""重置会话"""
reply = []
if len(params) == 0:
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = ["[bot]会话已重置"]
else:
pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0])
reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])]
return reply
@command(
"last",
"切换到前一次会话",
"!last",
[],
False
)
def cmd_last(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""切换到前一次会话"""
reply = []
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = ["[bot]没有前一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)]
return reply
@command(
"next",
"切换到后一次会话",
"!next",
[],
False
)
def cmd_next(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: int, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""切换到后一次会话"""
reply = []
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = ["[bot]没有后一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)]
return reply
@command(
"prompt",
"获取当前会话的前文",
"!prompt",
[],
False
)
def cmd_prompt(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取当前会话的前文"""
reply = []
msgs = ""
session:list = pkg.openai.session.get_session(session_name).prompt
for msg in session:
if len(params) != 0 and params[0] in ['-all', '-a']:
msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content'])
elif len(msg['content']) > 30:
msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30])
else:
msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content'])
reply = ["[bot]当前对话所有内容:\n{}".format(msgs)]
return reply
@command(
"list",
"列出当前会话的所有历史记录",
"!list\n!list [页数]",
[],
False
)
def cmd_list(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""列出当前会话的所有历史记录"""
reply = []
pkg.openai.session.get_session(session_name).persistence()
page = 0
if len(params) > 0:
try:
page = int(params[0])
except ValueError:
pass
results = pkg.openai.session.get_session(session_name).list_history(page=page)
if len(results) == 0:
reply = ["[bot]第{}页没有历史会话".format(page)]
else:
reply_str = "[bot]历史会话 第{}页:\n".format(page)
current = -1
for i in range(len(results)):
# 时间(使用create_timestamp转换) 序号 部分内容
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp'])
msg = ""
try:
msg = json.loads(results[i]['prompt'])
except json.decoder.JSONDecodeError:
msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt'])
# 持久化
pkg.openai.session.get_session(session_name).persistence()
if len(msg) >= 2:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
msg[0]['content'])
else:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
"无内容")
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
reply_str += "\n以上信息倒序排列"
if current != -1:
reply_str += ",当前会话是 #{}\n".format(current)
else:
reply_str += ",当前处于全新会话或不在此页"
reply = [reply_str]
return reply
@command(
"resend",
"重新获取上一次问题的回复",
"!resend",
[],
False
)
def cmd_resend(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""重新获取上一次问题的回复"""
reply = []
session = pkg.openai.session.get_session(session_name)
to_send = session.undo()
mgr = pkg.utils.context.get_qqbot_manager()
reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config,
launcher_type, launcher_id, sender_id)
return reply
@command(
"del",
"删除当前会话的历史记录",
"!del <序号>\n!del all",
[],
False
)
def cmd_del(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""删除当前会话的历史记录"""
reply = []
if len(params) == 0:
reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"]
else:
if params[0] == 'all':
pkg.openai.session.get_session(session_name).delete_all_history()
reply = ["[bot]已删除所有历史会话"]
elif params[0].isdigit():
if pkg.openai.session.get_session(session_name).delete_history(int(params[0])):
reply = ["[bot]已删除历史会话 #{}".format(params[0])]
else:
reply = ["[bot]没有历史会话 #{}".format(params[0])]
else:
reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"]
return reply
@command(
"default",
"操作情景预设",
"!default\n!default [指定情景预设为默认]",
[],
False
)
def cmd_default(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""操作情景预设"""
reply = []
if len(params) == 0:
# 输出目前所有情景预设
import pkg.openai.dprompt as dprompt
reply_str = "[bot]当前所有情景预设:\n\n"
for key,value in dprompt.get_prompt_dict().items():
reply_str += " - {}: {}\n".format(key,value)
reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current())
reply_str += "请使用!default <情景预设>来设置默认情景预设"
reply = [reply_str]
elif len(params) >0 and is_admin:
# 设置默认情景
import pkg.openai.dprompt as dprompt
try:
dprompt.set_current(params[0])
reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())]
except KeyError:
reply = ["[bot]err: 未找到情景预设:{}".format(params[0])]
else:
reply = ["[bot]err: 仅管理员可设置默认情景预设"]
return reply
@command(
"delhst",
"删除指定会话的所有历史记录",
"!delhst <会话名称>\n!delhst all",
[],
True
)
def cmd_delhst(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""删除指定会话的所有历史记录"""
reply = []
if len(params) == 0:
reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_<QQ号>, 或使用 !delhst all 删除所有会话的历史记录"]
else:
if params[0] == "all":
pkg.utils.context.get_database_manager().delete_all_session_history()
reply = ["[bot]已删除所有会话的历史记录"]
else:
if pkg.utils.context.get_database_manager().delete_all_history(params[0]):
reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])]
else:
reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])]
return reply

216
pkg/qqbot/cmds/system.py Normal file
View File

@@ -0,0 +1,216 @@
from pkg.qqbot.cmds.model import command
import pkg.utils.context
import pkg.utils.updater
import pkg.utils.credit as credit
import config
import logging
import os
import threading
import traceback
import json
@command(
"help",
"获取帮助信息",
"!help",
[],
False
)
def cmd_help(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取帮助信息"""
return ["[bot]" + config.help_message]
@command(
"usage",
"获取使用情况",
"!usage",
[],
False
)
def cmd_usage(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""获取使用情况"""
reply = []
reply_str = "[bot]各api-key使用情况:\n\n"
api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key
for key_name in api_keys:
text_length = pkg.utils.context.get_openai_manager().audit_mgr \
.get_text_length_of_key(api_keys[key_name])
image_count = pkg.utils.context.get_openai_manager().audit_mgr \
.get_image_count_of_key(api_keys[key_name])
reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length),
int(image_count))
# 获取此key的额度
try:
http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None
credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy)
reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted'])
except Exception as e:
logging.warning("获取额度失败:{}".format(e))
reply = [reply_str]
return reply
@command(
"version",
"查看版本信息",
"!version",
[],
False
)
def cmd_version(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""查看版本信息"""
reply = []
reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info())
try:
if pkg.utils.updater.is_new_version_available():
reply_str += "\n有新版本可用,请使用命令 !update 进行更新"
except:
pass
reply = [reply_str]
return reply
@command(
"reload",
"执行热重载",
"!reload",
[],
True
)
def cmd_reload(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""执行热重载"""
import pkg.utils.reloader
def reload_task():
pkg.utils.reloader.reload_all()
threading.Thread(target=reload_task, daemon=True).start()
@command(
"update",
"更新程序",
"!update",
[],
True
)
def cmd_update(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""更新程序"""
reply = []
import pkg.utils.updater
import pkg.utils.reloader
import pkg.utils.context
def update_task():
try:
if pkg.utils.updater.update_all():
pkg.utils.reloader.reload_all(notify=False)
pkg.utils.context.get_qqbot_manager().notify_admin("更新完成")
else:
pkg.utils.context.get_qqbot_manager().notify_admin("无新版本")
except Exception as e0:
traceback.print_exc()
pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0))
return
threading.Thread(target=update_task, daemon=True).start()
reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."]
def config_operation(cmd, params):
reply = []
config = pkg.utils.context.get_config()
reply_str = ""
if len(params) == 0:
reply = ["[bot]err:请输入配置项"]
else:
cfg_name = params[0]
if cfg_name == 'all':
reply_str = "[bot]所有配置项:\n\n"
for cfg in dir(config):
if not cfg.startswith('__') and not cfg == 'logging':
# 根据配置项类型进行格式化如果是字典则转换为json并格式化
if isinstance(getattr(config, cfg), str):
reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg))
elif isinstance(getattr(config, cfg), dict):
# 不进行unicode转义并格式化
reply_str += "{}: {}\n".format(cfg,
json.dumps(getattr(config, cfg),
ensure_ascii=False, indent=4))
else:
reply_str += "{}: {}\n".format(cfg, getattr(config, cfg))
reply = [reply_str]
elif cfg_name in dir(config):
if len(params) == 1:
# 按照配置项类型进行格式化
if isinstance(getattr(config, cfg_name), str):
reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name))
elif isinstance(getattr(config, cfg_name), dict):
reply_str = "[bot]配置项{}: {}\n".format(cfg_name,
json.dumps(getattr(config, cfg_name),
ensure_ascii=False, indent=4))
else:
reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name))
reply = [reply_str]
else:
cfg_value = " ".join(params[1:])
# 类型转换如果是json则转换为字典
if cfg_value == 'true':
cfg_value = True
elif cfg_value == 'false':
cfg_value = False
elif cfg_value.isdigit():
cfg_value = int(cfg_value)
elif cfg_value.startswith('{') and cfg_value.endswith('}'):
cfg_value = json.loads(cfg_value)
else:
try:
cfg_value = float(cfg_value)
except ValueError:
pass
# 检查类型是否匹配
if isinstance(getattr(config, cfg_name), type(cfg_value)):
setattr(config, cfg_name, cfg_value)
pkg.utils.context.set_config(config)
reply = ["[bot]配置项{}修改成功".format(cfg_name)]
else:
reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)]
else:
reply = ["[bot]err:未找到配置项 {}".format(cfg_name)]
return reply
@command(
"cfg",
"配置文件相关操作",
"!cfg all\n!cfg <配置项名称>\n!cfg <配置项名称> <配置项新值>",
[],
True
)
def cmd_cfg(cmd: str, params: list, session_name: str,
text_message: str, launcher_type: str, launcher_id: int,
sender_id: int, is_admin: bool) -> list:
"""配置文件相关操作"""
reply = config_operation(cmd, params)
return reply

View File

@@ -13,151 +13,11 @@ import pkg.utils.updater
import pkg.utils.context import pkg.utils.context
import pkg.qqbot.message import pkg.qqbot.message
import pkg.utils.credit as credit import pkg.utils.credit as credit
import pkg.qqbot.cmds.model as cmdmodel
from mirai import Image from mirai import Image
def config_operation(cmd, params):
reply = []
config = pkg.utils.context.get_config()
reply_str = ""
if len(params) == 0:
reply = ["[bot]err:请输入配置项"]
else:
cfg_name = params[0]
if cfg_name == 'all':
reply_str = "[bot]所有配置项:\n\n"
for cfg in dir(config):
if not cfg.startswith('__') and not cfg == 'logging':
# 根据配置项类型进行格式化如果是字典则转换为json并格式化
if isinstance(getattr(config, cfg), str):
reply_str += "{}: \"{}\"\n".format(cfg, getattr(config, cfg))
elif isinstance(getattr(config, cfg), dict):
# 不进行unicode转义并格式化
reply_str += "{}: {}\n".format(cfg,
json.dumps(getattr(config, cfg),
ensure_ascii=False, indent=4))
else:
reply_str += "{}: {}\n".format(cfg, getattr(config, cfg))
reply = [reply_str]
elif cfg_name in dir(config):
if len(params) == 1:
# 按照配置项类型进行格式化
if isinstance(getattr(config, cfg_name), str):
reply_str = "[bot]配置项{}: \"{}\"\n".format(cfg_name, getattr(config, cfg_name))
elif isinstance(getattr(config, cfg_name), dict):
reply_str = "[bot]配置项{}: {}\n".format(cfg_name,
json.dumps(getattr(config, cfg_name),
ensure_ascii=False, indent=4))
else:
reply_str = "[bot]配置项{}: {}\n".format(cfg_name, getattr(config, cfg_name))
reply = [reply_str]
else:
cfg_value = " ".join(params[1:])
# 类型转换如果是json则转换为字典
if cfg_value == 'true':
cfg_value = True
elif cfg_value == 'false':
cfg_value = False
elif cfg_value.isdigit():
cfg_value = int(cfg_value)
elif cfg_value.startswith('{') and cfg_value.endswith('}'):
cfg_value = json.loads(cfg_value)
else:
try:
cfg_value = float(cfg_value)
except ValueError:
pass
# 检查类型是否匹配
if isinstance(getattr(config, cfg_name), type(cfg_value)):
setattr(config, cfg_name, cfg_value)
pkg.utils.context.set_config(config)
reply = ["[bot]配置项{}修改成功".format(cfg_name)]
else:
reply = ["[bot]err:配置项{}类型不匹配".format(cfg_name)]
else:
reply = ["[bot]err:未找到配置项 {}".format(cfg_name)]
return reply
def plugin_operation(cmd, params, is_admin):
reply = []
import pkg.plugin.host as plugin_host
import pkg.utils.updater as updater
plugin_list = plugin_host.__plugins__
if len(params) == 0:
reply_str = "[bot]所有插件({}):\n".format(len(plugin_host.__plugins__))
idx = 0
for key in plugin_host.iter_plugins_name():
plugin = plugin_list[key]
reply_str += "\n#{} {} {}\n{}\nv{}\n作者: {}\n"\
.format((idx+1), plugin['name'],
"[已禁用]" if not plugin['enabled'] else "",
plugin['description'],
plugin['version'], plugin['author'])
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
remote_url = updater.get_remote_url("/".join(plugin['path'].split('/')[:-1]))
if remote_url != "https://github.com/RockChinQ/QChatGPT" and remote_url != "https://gitee.com/RockChin/QChatGPT":
reply_str += "源码: "+remote_url+"\n"
idx += 1
reply = [reply_str]
elif params[0] == 'update':
# 更新所有插件
if is_admin:
def closure():
import pkg.utils.context
updated = []
for key in plugin_list:
plugin = plugin_list[key]
if updater.is_repo("/".join(plugin['path'].split('/')[:-1])):
success = updater.pull_latest("/".join(plugin['path'].split('/')[:-1]))
if success:
updated.append(plugin['name'])
# 检查是否有requirements.txt
pkg.utils.context.get_qqbot_manager().notify_admin("正在安装依赖...")
for key in plugin_list:
plugin = plugin_list[key]
if os.path.exists("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt"):
logging.info("{}检测到requirements.txt安装依赖".format(plugin['name']))
import pkg.utils.pkgmgr
pkg.utils.pkgmgr.install_requirements("/".join(plugin['path'].split('/')[:-1])+"/requirements.txt")
import main
main.reset_logging()
pkg.utils.context.get_qqbot_manager().notify_admin("已更新插件: {}".format(", ".join(updated)))
threading.Thread(target=closure).start()
reply = ["[bot]正在更新所有插件,请勿重复发起..."]
else:
reply = ["[bot]err:权限不足"]
elif params[0].startswith("http"):
if is_admin:
def closure():
try:
plugin_host.install_plugin(params[0])
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装成功,请发送 !reload 指令重载插件")
except Exception as e:
logging.error("插件安装失败:{}".format(e))
pkg.utils.context.get_qqbot_manager().notify_admin("插件安装失败:{}".format(e))
threading.Thread(target=closure, args=()).start()
reply = ["[bot]正在安装插件..."]
else:
reply = ["[bot]err:权限不足,请使用管理员账号私聊发起"]
return reply
def process_command(session_name: str, text_message: str, mgr, config, def process_command(session_name: str, text_message: str, mgr, config,
launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list: launcher_type: str, launcher_id: int, sender_id: int, is_admin: bool) -> list:
@@ -170,216 +30,30 @@ def process_command(session_name: str, text_message: str, mgr, config,
cmd = text_message[1:].strip().split(' ')[0] cmd = text_message[1:].strip().split(' ')[0]
params = text_message[1:].strip().split(' ')[1:] params = text_message[1:].strip().split(' ')[1:]
if cmd == 'help':
reply = ["[bot]" + config.help_message]
elif cmd == 'reset':
if len(params) == 0:
pkg.openai.session.get_session(session_name).reset(explicit=True)
reply = ["[bot]会话已重置"]
else:
pkg.openai.session.get_session(session_name).reset(explicit=True, use_prompt=params[0])
reply = ["[bot]会话已重置,使用场景预设:{}".format(params[0])]
elif cmd == 'last':
result = pkg.openai.session.get_session(session_name).last_session()
if result is None:
reply = ["[bot]没有前一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到前一次的对话:\n创建时间:{}\n".format(datetime_str)]
elif cmd == 'next':
result = pkg.openai.session.get_session(session_name).next_session()
if result is None:
reply = ["[bot]没有后一次的对话"]
else:
datetime_str = datetime.datetime.fromtimestamp(result.create_timestamp).strftime(
'%Y-%m-%d %H:%M:%S')
reply = ["[bot]已切换到后一次的对话:\n创建时间:{}\n".format(datetime_str)]
elif cmd == 'prompt':
msgs = ""
session:list = pkg.openai.session.get_session(session_name).prompt
for msg in session:
if len(params) != 0 and params[0] in ['-all', '-a']:
msgs = msgs + "{}: {}\n\n".format(msg['role'], msg['content'])
elif len(msg['content']) > 30:
msgs = msgs + "[{}]: {}...\n\n".format(msg['role'], msg['content'][:30])
else:
msgs = msgs + "[{}]: {}\n\n".format(msg['role'], msg['content'])
reply = ["[bot]当前对话所有内容:\n{}".format(msgs)]
elif cmd == 'list':
pkg.openai.session.get_session(session_name).persistence()
page = 0
if len(params) > 0: # 把!~开头的转换成!cfg
try: if cmd.startswith('~'):
page = int(params[0]) params = [cmd[1:]] + params
except ValueError: cmd = 'cfg'
pass
results = pkg.openai.session.get_session(session_name).list_history(page=page) # 选择指令处理函数
if len(results) == 0: cmd_obj = cmdmodel.search(cmd)
reply = ["[bot]第{}页没有历史会话".format(page)] if cmd_obj is not None and (cmd_obj['admin_only'] is False or is_admin):
else: cmd_func = cmd_obj['func']
reply_str = "[bot]历史会话 第{}页:\n".format(page) reply = cmd_func(
current = -1 cmd=cmd,
for i in range(len(results)): params=params,
# 时间(使用create_timestamp转换) 序号 部分内容 session_name=session_name,
datetime_obj = datetime.datetime.fromtimestamp(results[i]['create_timestamp']) text_message=text_message,
msg = "" launcher_type=launcher_type,
try: launcher_id=launcher_id,
msg = json.loads(results[i]['prompt']) sender_id=sender_id,
except json.decoder.JSONDecodeError: is_admin=is_admin,
msg = pkg.openai.session.reset_session_prompt(session_name, results[i]['prompt']) )
# 持久化
pkg.openai.session.get_session(session_name).persistence()
if len(msg) >= 2:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
msg[0]['content'])
else:
reply_str += "#{} 创建:{} {}\n".format(i + page * 10,
datetime_obj.strftime("%Y-%m-%d %H:%M:%S"),
"无内容")
if results[i]['create_timestamp'] == pkg.openai.session.get_session(
session_name).create_timestamp:
current = i + page * 10
reply_str += "\n以上信息倒序排列"
if current != -1:
reply_str += ",当前会话是 #{}\n".format(current)
else:
reply_str += ",当前处于全新会话或不在此页"
reply = [reply_str]
elif cmd == 'resend':
session = pkg.openai.session.get_session(session_name)
to_send = session.undo()
reply = pkg.qqbot.message.process_normal_message(to_send, mgr, config,
launcher_type, launcher_id, sender_id)
elif cmd == 'del': # 删除指定会话历史记录
if len(params) == 0:
reply = ["[bot]参数不足, 格式: !del <序号>\n可以通过!list查看序号"]
else:
if params[0] == 'all':
pkg.openai.session.get_session(session_name).delete_all_history()
reply = ["[bot]已删除所有历史会话"]
elif params[0].isdigit():
if pkg.openai.session.get_session(session_name).delete_history(int(params[0])):
reply = ["[bot]已删除历史会话 #{}".format(params[0])]
else:
reply = ["[bot]没有历史会话 #{}".format(params[0])]
else:
reply = ["[bot]参数错误, 格式: !del <序号>\n可以通过!list查看序号"]
elif cmd == 'usage':
reply_str = "[bot]各api-key使用情况:\n\n"
api_keys = pkg.utils.context.get_openai_manager().key_mgr.api_key
for key_name in api_keys:
text_length = pkg.utils.context.get_openai_manager().audit_mgr \
.get_text_length_of_key(api_keys[key_name])
image_count = pkg.utils.context.get_openai_manager().audit_mgr \
.get_image_count_of_key(api_keys[key_name])
reply_str += "{}:\n - 文本长度:{}\n - 图片数量:{}\n".format(key_name, int(text_length),
int(image_count))
# 获取此key的额度
try:
http_proxy = config.openai_config["http_proxy"] if "http_proxy" in config.openai_config else None
credit_data = credit.fetch_credit_data(api_keys[key_name], http_proxy)
reply_str += " - 使用额度:{:.2f}/{:.2f}\n".format(credit_data['total_used'],credit_data['total_granted'])
except Exception as e:
logging.warning("获取额度失败:{}".format(e))
reply = [reply_str]
elif cmd == 'draw':
if len(params) == 0:
reply = ["[bot]err:请输入图片描述文字"]
else:
session = pkg.openai.session.get_session(session_name)
res = session.draw_image(" ".join(params))
logging.debug("draw_image result:{}".format(res))
reply = [Image(url=res['data'][0]['url'])]
if not (hasattr(config, 'include_image_description')
and not config.include_image_description):
reply.append(" ".join(params))
elif cmd == 'version':
reply_str = "[bot]当前版本:\n{}\n".format(pkg.utils.updater.get_current_version_info())
try:
if pkg.utils.updater.is_new_version_available():
reply_str += "\n有新版本可用,请使用命令 !update 进行更新"
except:
pass
reply = [reply_str]
elif cmd == 'plugin':
reply = plugin_operation(cmd, params, is_admin)
elif cmd == 'default':
if len(params) == 0:
# 输出目前所有情景预设
import pkg.openai.dprompt as dprompt
reply_str = "[bot]当前所有情景预设:\n\n"
for key,value in dprompt.get_prompt_dict().items():
reply_str += " - {}: {}\n".format(key,value)
reply_str += "\n当前默认情景预设:{}\n".format(dprompt.get_current())
reply_str += "请使用!default <情景预设>来设置默认情景预设"
reply = [reply_str]
elif len(params) >0 and is_admin:
# 设置默认情景
import pkg.openai.dprompt as dprompt
try:
dprompt.set_current(params[0])
reply = ["[bot]已设置默认情景预设为:{}".format(dprompt.get_current())]
except KeyError:
reply = ["[bot]err: 未找到情景预设:{}".format(params[0])]
else:
reply = ["[bot]err: 仅管理员可设置默认情景预设"]
elif cmd == "delhst" and is_admin:
if len(params) == 0:
reply = ["[bot]err:请输入要删除的会话名: group_<群号> 或者 person_<QQ号>, 或使用 !delhst all 删除所有会话的历史记录"]
else:
if params[0] == "all":
pkg.utils.context.get_database_manager().delete_all_session_history()
reply = ["[bot]已删除所有会话的历史记录"]
else:
if pkg.utils.context.get_database_manager().delete_all_history(params[0]):
reply = ["[bot]已删除会话 {} 的所有历史记录".format(params[0])]
else:
reply = ["[bot]未找到会话 {} 的历史记录".format(params[0])]
elif cmd == 'reload' and is_admin:
def reload_task():
pkg.utils.reloader.reload_all()
threading.Thread(target=reload_task, daemon=True).start()
elif cmd == 'update' and is_admin:
def update_task():
try:
if pkg.utils.updater.update_all():
pkg.utils.reloader.reload_all(notify=False)
pkg.utils.context.get_qqbot_manager().notify_admin("更新完成")
else:
pkg.utils.context.get_qqbot_manager().notify_admin("无新版本")
except Exception as e0:
traceback.print_exc()
pkg.utils.context.get_qqbot_manager().notify_admin("更新失败:{}".format(e0))
return
threading.Thread(target=update_task, daemon=True).start()
reply = ["[bot]正在更新,请耐心等待,请勿重复发起更新..."]
elif cmd == 'cfg' and is_admin:
reply = config_operation(cmd, params)
else: else:
if cmd.startswith("~") and is_admin: reply = ["[bot]err:未知的指令或权限不足: " + cmd]
config_item = cmd[1:]
params = [config_item] + params return reply
reply = config_operation("cfg", params)
else:
reply = ["[bot]err:未知的指令或权限不足: " + cmd]
except Exception as e: except Exception as e:
mgr.notify_admin("{}指令执行失败:{}".format(session_name, e)) mgr.notify_admin("{}指令执行失败:{}".format(session_name, e))
logging.exception(e) logging.exception(e)

View File

@@ -3,10 +3,13 @@ import os
import requests import requests
import pkg.utils.network as network
def read_latest() -> str: def read_latest() -> str:
resp = requests.get( resp = requests.get(
url="https://api.github.com/repos/RockChinQ/QChatGPT/contents/res/announcement", url="https://api.github.com/repos/RockChinQ/QChatGPT/contents/res/announcement",
proxies=network.wrapper_proxies()
) )
obj_json = resp.json() obj_json = resp.json()
b64_content = obj_json["content"] b64_content = obj_json["content"]

File diff suppressed because one or more lines are too long

9
pkg/utils/network.py Normal file
View File

@@ -0,0 +1,9 @@
def wrapper_proxies() -> dict:
"""获取代理"""
import config
return {
"http": config.openai_config['proxy'],
"https": config.openai_config['proxy']
} if 'proxy' in config.openai_config and (config.openai_config['proxy'] is not None) else None

View File

@@ -6,6 +6,7 @@ import requests
import json import json
import pkg.utils.constants import pkg.utils.constants
import pkg.utils.network as network
def check_dulwich_closure(): def check_dulwich_closure():
@@ -36,7 +37,8 @@ def pull_latest(repo_path: str) -> bool:
def get_release_list() -> list: def get_release_list() -> list:
"""获取发行列表""" """获取发行列表"""
rls_list_resp = requests.get( rls_list_resp = requests.get(
url="https://api.github.com/repos/RockChinQ/QChatGPT/releases" url="https://api.github.com/repos/RockChinQ/QChatGPT/releases",
proxies=network.wrapper_proxies()
) )
rls_list = rls_list_resp.json() rls_list = rls_list_resp.json()
@@ -83,7 +85,10 @@ def update_all(cli: bool = False) -> bool:
else: else:
print("开始下载最新版本: {}".format(latest_rls['zipball_url'])) print("开始下载最新版本: {}".format(latest_rls['zipball_url']))
zip_url = latest_rls['zipball_url'] zip_url = latest_rls['zipball_url']
zip_resp = requests.get(url=zip_url) zip_resp = requests.get(
url=zip_url,
proxies=network.wrapper_proxies()
)
zip_data = zip_resp.content zip_data = zip_resp.content
# 检查temp/updater目录 # 检查temp/updater目录