From 7c3557e94355fc1dfd02f35d2d332e82764e3455 Mon Sep 17 00:00:00 2001 From: RockChinQ <1010553892@qq.com> Date: Fri, 11 Oct 2024 22:27:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8C=81=E4=B9=85=E5=8C=96=E5=92=8C=20?= =?UTF-8?q?web=20=E6=8E=A5=E5=8F=A3=E5=9F=BA=E7=A1=80=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 7 +- main.py | 13 +-- pkg/api/__init__.py | 0 pkg/api/http/__init__.py | 0 pkg/api/http/controller/__init__.py | 0 pkg/api/http/controller/group.py | 91 +++++++++++++++++++++ pkg/api/http/controller/groups/__init__.py | 0 pkg/api/http/controller/groups/log.py | 21 +++++ pkg/api/http/controller/main.py | 48 +++++++++++ pkg/api/http/service/__init__.py | 0 pkg/core/app.py | 30 +++++-- pkg/core/boot.py | 9 +- pkg/core/bootutils/deps.py | 3 + pkg/core/bootutils/log.py | 5 +- pkg/core/migrations/m013_http_api_config.py | 30 +++++++ pkg/core/stages/build_app.py | 14 ++++ pkg/core/stages/migrate.py | 2 +- pkg/core/stages/setup_logger.py | 36 +++++++- pkg/persistence/__init__.py | 0 pkg/persistence/database.py | 40 +++++++++ pkg/persistence/databases/__init__.py | 0 pkg/persistence/databases/sqlite.py | 13 +++ pkg/persistence/mgr.py | 55 +++++++++++++ pkg/utils/logcache.py | 49 +++++++++++ requirements.txt | 5 +- templates/system.json | 13 ++- 26 files changed, 462 insertions(+), 22 deletions(-) create mode 100644 pkg/api/__init__.py create mode 100644 pkg/api/http/__init__.py create mode 100644 pkg/api/http/controller/__init__.py create mode 100644 pkg/api/http/controller/group.py create mode 100644 pkg/api/http/controller/groups/__init__.py create mode 100644 pkg/api/http/controller/groups/log.py create mode 100644 pkg/api/http/controller/main.py create mode 100644 pkg/api/http/service/__init__.py create mode 100644 pkg/core/migrations/m013_http_api_config.py create mode 100644 pkg/persistence/__init__.py create mode 100644 pkg/persistence/database.py create mode 100644 pkg/persistence/databases/__init__.py create mode 100644 pkg/persistence/databases/sqlite.py create mode 100644 pkg/persistence/mgr.py create mode 100644 pkg/utils/logcache.py diff --git a/.gitignore b/.gitignore index 56282ca8..65ef5e47 100644 --- a/.gitignore +++ b/.gitignore @@ -4,8 +4,8 @@ __pycache__/ database.db qchatgpt.log /banlist.py -plugins/ -!plugins/__init__.py +/plugins/ +!/plugins/__init__.py /revcfg.py prompts/ logs/ @@ -34,4 +34,5 @@ bard.json res/instance_id.json .DS_Store /data -botpy.log* \ No newline at end of file +botpy.log* +/poc \ No newline at end of file diff --git a/main.py b/main.py index 0ad87bd2..273c8579 100644 --- a/main.py +++ b/main.py @@ -13,7 +13,10 @@ asciiart = r""" """ -async def main_entry(): +import asyncio + + +async def main_entry(loop: asyncio.AbstractEventLoop): print(asciiart) import sys @@ -46,7 +49,7 @@ async def main_entry(): sys.exit(0) from pkg.core import boot - await boot.main() + await boot.main(loop) if __name__ == '__main__': @@ -65,8 +68,8 @@ if __name__ == '__main__': if invalid_pwd: print("请在QChatGPT项目根目录下以命令形式运行此程序。") input("按任意键退出...") - exit(0) + exit(1) - import asyncio + loop = asyncio.new_event_loop() - asyncio.run(main_entry()) + loop.run_until_complete(main_entry(loop)) diff --git a/pkg/api/__init__.py b/pkg/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/api/http/__init__.py b/pkg/api/http/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/api/http/controller/__init__.py b/pkg/api/http/controller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/api/http/controller/group.py b/pkg/api/http/controller/group.py new file mode 100644 index 00000000..b9533567 --- /dev/null +++ b/pkg/api/http/controller/group.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import abc +import typing +import quart +from quart.typing import RouteCallable + +from ....core import app + + +preregistered_groups: list[type[RouterGroup]] = [] +"""RouterGroup 的预注册列表""" + +def group_class(name: str, path: str) -> None: + """注册一个 RouterGroup""" + + def decorator(cls: typing.Type[RouterGroup]) -> typing.Type[RouterGroup]: + cls.name = name + cls.path = path + preregistered_groups.append(cls) + return cls + + return decorator + + +class RouterGroup(abc.ABC): + + name: str + + path: str + + ap: app.Application + + quart_app: quart.Quart + + def __init__(self, ap: app.Application, quart_app: quart.Quart) -> None: + self.ap = ap + self.quart_app = quart_app + + @abc.abstractmethod + async def initialize(self) -> None: + pass + + def route(self, rule: str, **options: typing.Any) -> typing.Callable[[RouteCallable], RouteCallable]: # decorator + """注册一个路由""" + def decorator(f: RouteCallable) -> RouteCallable: + nonlocal rule + rule = self.path + rule + + async def handler_error(*args, **kwargs): + try: + return await f(*args, **kwargs) + except Exception as e: # 自动 500 + return self.http_status(500, -2, str(e)) + + new_f = handler_error + new_f.__name__ = f.__name__ + new_f.__doc__ = f.__doc__ + + self.quart_app.route(rule, **options)(new_f) + return f + + return decorator + + def _cors(self, response: quart.Response) -> quart.Response: + # Quart-Cors 似乎很久没维护了,所以自己写 + response.headers['Access-Control-Allow-Origin'] = '*' + response.headers['Access-Control-Allow-Headers'] = '*' + response.headers['Access-Control-Allow-Methods'] = '*' + response.headers['Access-Control-Allow-Credentials'] = 'true' + return response + + def success(self, data: typing.Any = None) -> quart.Response: + """返回一个 200 响应""" + return self._cors(quart.jsonify({ + 'code': 0, + 'msg': 'ok', + 'data': data, + })) + + def fail(self, code: int, msg: str) -> quart.Response: + """返回一个异常响应""" + + return self._cors(quart.jsonify({ + 'code': code, + 'msg': msg, + })) + + def http_status(self, status: int, code: int, msg: str) -> quart.Response: + """返回一个指定状态码的响应""" + return self.fail(code, msg), status diff --git a/pkg/api/http/controller/groups/__init__.py b/pkg/api/http/controller/groups/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/api/http/controller/groups/log.py b/pkg/api/http/controller/groups/log.py new file mode 100644 index 00000000..fe6de631 --- /dev/null +++ b/pkg/api/http/controller/groups/log.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import traceback + +import quart + +from .....core import app +from .. import group + + +@group.group_class('log', '/api/v1/log') +class LogRouterGroup(group.RouterGroup): + + async def initialize(self) -> None: + @self.route('', methods=['GET']) + async def _() -> str: + return self.success( + data={ + "logs": self.ap.log_cache.get_all_logs() + } + ) diff --git a/pkg/api/http/controller/main.py b/pkg/api/http/controller/main.py new file mode 100644 index 00000000..83e86696 --- /dev/null +++ b/pkg/api/http/controller/main.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import asyncio + +import quart + +from ....core import app +from .groups import log +from . import group + + +class HTTPController: + + ap: app.Application + + quart_app: quart.Quart + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + self.quart_app = quart.Quart(__name__) + + async def initialize(self) -> None: + await self.register_routes() + + async def run(self) -> None: + if self.ap.system_cfg.data['http-api']['enable']: + async def shutdown_trigger_placeholder(): + while True: + await asyncio.sleep(1) + + asyncio.create_task(self.quart_app.run_task( + host=self.ap.system_cfg.data['http-api']['host'], + port=self.ap.system_cfg.data['http-api']['port'], + shutdown_trigger=shutdown_trigger_placeholder + )) + + async def register_routes(self) -> None: + + @self.quart_app.route('/healthz') + async def healthz(): + return { + "code": 0, + "msg": "ok" + } + + for g in group.preregistered_groups: + ginst = g(self.ap, self.quart_app) + await ginst.initialize() diff --git a/pkg/api/http/service/__init__.py b/pkg/api/http/service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/core/app.py b/pkg/core/app.py index e6e25ea0..2f0e0340 100644 --- a/pkg/core/app.py +++ b/pkg/core/app.py @@ -17,11 +17,18 @@ from ..plugin import manager as plugin_mgr from ..pipeline import pool from ..pipeline import controller, stagemgr from ..utils import version as version_mgr, proxy as proxy_mgr, announce as announce_mgr +from ..persistence import mgr as persistencemgr +from ..api.http.controller import main as http_controller +from ..utils import logcache class Application: """运行时应用对象和上下文""" + event_loop: asyncio.AbstractEventLoop = None + + asyncio_tasks: list[asyncio.Task] = [] + platform_mgr: im_mgr.PlatformManager = None cmd_mgr: cmdmgr.CommandManager = None @@ -78,6 +85,12 @@ class Application: logger: logging.Logger = None + persistence_mgr: persistencemgr.PersistenceManager = None + + http_ctrl: http_controller.HTTPController = None + + log_cache: logcache.LogCache = None + def __init__(self): pass @@ -91,13 +104,21 @@ class Application: try: + # 后续可能会允许动态重启其他任务 + # 故为了防止程序在非 Ctrl-C 情况下退出,这里创建一个不会结束的协程 + async def never_ending(): + while True: + await asyncio.sleep(1) + tasks = [ - asyncio.create_task(self.platform_mgr.run()), - asyncio.create_task(self.ctrl.run()) + asyncio.create_task(self.platform_mgr.run()), # 消息平台 + asyncio.create_task(self.ctrl.run()), # 消息处理循环 + asyncio.create_task(self.http_ctrl.run()), # http 接口服务 + asyncio.create_task(never_ending()) ] + self.asyncio_tasks.extend(tasks) - # 挂信号处理 - + # 挂系统信号处理 import signal def signal_handler(sig, frame): @@ -109,7 +130,6 @@ class Application: signal.signal(signal.SIGINT, signal_handler) await asyncio.gather(*tasks, return_exceptions=True) - except asyncio.CancelledError: pass except Exception as e: diff --git a/pkg/core/boot.py b/pkg/core/boot.py index 5663c2ef..1e5f1052 100644 --- a/pkg/core/boot.py +++ b/pkg/core/boot.py @@ -1,6 +1,7 @@ from __future__ import print_function import traceback +import asyncio from . import app from ..audit import identifier @@ -19,13 +20,15 @@ stage_order = [ ] -async def make_app() -> app.Application: +async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application: # 生成标识符 identifier.init() ap = app.Application() + ap.event_loop = loop + # 执行启动阶段 for stage_name in stage_order: stage_cls = stage.preregistered_stages[stage_name] @@ -38,9 +41,9 @@ async def make_app() -> app.Application: return ap -async def main(): +async def main(loop: asyncio.AbstractEventLoop): try: - app_inst = await make_app() + app_inst = await make_app(loop) await app_inst.run() except Exception as e: traceback.print_exc() diff --git a/pkg/core/bootutils/deps.py b/pkg/core/bootutils/deps.py index c392f800..8c0127e5 100644 --- a/pkg/core/bootutils/deps.py +++ b/pkg/core/bootutils/deps.py @@ -15,6 +15,9 @@ required_deps = { "psutil": "psutil", "async_lru": "async-lru", "ollama": "ollama", + "quart": "quart", + "sqlalchemy": "sqlalchemy[asyncio]", + "aiosqlite": "aiosqlite", } diff --git a/pkg/core/bootutils/log.py b/pkg/core/bootutils/log.py index d7b6da0a..ad8252df 100644 --- a/pkg/core/bootutils/log.py +++ b/pkg/core/bootutils/log.py @@ -15,7 +15,7 @@ log_colors_config = { } -async def init_logging() -> logging.Logger: +async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging.Logger: # 删除所有现有的logger for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) @@ -41,7 +41,8 @@ async def init_logging() -> logging.Logger: stream_handler = logging.StreamHandler(sys.stdout) - log_handlers: logging.Handler = [stream_handler, logging.FileHandler(log_file_name)] + log_handlers: list[logging.Handler] = [stream_handler, logging.FileHandler(log_file_name)] + log_handlers += extra_handlers if extra_handlers is not None else [] for handler in log_handlers: handler.setLevel(level) diff --git a/pkg/core/migrations/m013_http_api_config.py b/pkg/core/migrations/m013_http_api_config.py new file mode 100644 index 00000000..8d7c453d --- /dev/null +++ b/pkg/core/migrations/m013_http_api_config.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from .. import migration + + +@migration.migration_class("http-api-config", 13) +class HttpApiConfigMigration(migration.Migration): + """迁移""" + + async def need_migrate(self) -> bool: + """判断当前环境是否需要运行此迁移""" + return 'http-api' not in self.ap.system_cfg.data or "persistence" not in self.ap.system_cfg.data + + async def run(self): + """执行迁移""" + + self.ap.system_cfg.data['http-api'] = { + "enable": True, + "host": "0.0.0.0", + "port": 5300 + } + + self.ap.system_cfg.data['persistence'] = { + "sqlite": { + "path": "data/persistence.db" + }, + "use": "sqlite" + } + + await self.ap.system_cfg.dump_config() diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index c0e731ce..8c63f7dc 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -15,6 +15,10 @@ from ...provider.sysprompt import sysprompt as llm_prompt_mgr from ...provider.tools import toolmgr as llm_tool_mgr from ...provider import runnermgr from ...platform import manager as im_mgr +from ...persistence import mgr as persistencemgr +from ...api.http.controller import main as http_controller +from ...utils import logcache + @stage.stage_class("BuildAppStage") class BuildAppStage(stage.BootingStage): @@ -58,6 +62,13 @@ class BuildAppStage(stage.BootingStage): ap.query_pool = pool.QueryPool() + log_cache = logcache.LogCache() + ap.log_cache = log_cache + + persistence_mgr_inst = persistencemgr.PersistenceManager(ap) + await persistence_mgr_inst.initialize() + ap.persistence_mgr = persistence_mgr_inst + plugin_mgr_inst = plugin_mgr.PluginManager(ap) await plugin_mgr_inst.initialize() ap.plugin_mgr = plugin_mgr_inst @@ -95,6 +106,9 @@ class BuildAppStage(stage.BootingStage): await stage_mgr.initialize() ap.stage_mgr = stage_mgr + http_ctrl = http_controller.HTTPController(ap) + await http_ctrl.initialize() + ap.http_ctrl = http_ctrl ctrl = controller.Controller(ap) ap.ctrl = ctrl diff --git a/pkg/core/stages/migrate.py b/pkg/core/stages/migrate.py index 92735c90..a4e57e78 100644 --- a/pkg/core/stages/migrate.py +++ b/pkg/core/stages/migrate.py @@ -6,7 +6,7 @@ from .. import stage, app from .. import migration from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg -from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config +from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config @stage.stage_class("MigrationStage") diff --git a/pkg/core/stages/setup_logger.py b/pkg/core/stages/setup_logger.py index 02446b85..8f385d1f 100644 --- a/pkg/core/stages/setup_logger.py +++ b/pkg/core/stages/setup_logger.py @@ -1,9 +1,38 @@ from __future__ import annotations +import logging +import asyncio +from datetime import datetime + from .. import stage, app from ..bootutils import log +class PersistenceHandler(logging.Handler, object): + """ + 保存日志到数据库 + """ + ap: app.Application + + def __init__(self, name, ap: app.Application): + logging.Handler.__init__(self) + self.ap = ap + + def emit(self, record): + """ + emit函数为自定义handler类时必重写的函数,这里可以根据需要对日志消息做一些处理,比如发送日志到服务器 + + 发出记录(Emit a record) + """ + try: + msg = self.format(record) + if self.ap.log_cache is not None: + self.ap.log_cache.add_log(msg) + + except Exception: + self.handleError(record) + + @stage.stage_class("SetupLoggerStage") class SetupLoggerStage(stage.BootingStage): """设置日志器阶段 @@ -12,4 +41,9 @@ class SetupLoggerStage(stage.BootingStage): async def run(self, ap: app.Application): """启动 """ - ap.logger = await log.init_logging() + persistence_handler = PersistenceHandler('LoggerHandler', ap) + + extra_handlers = [] + extra_handlers = [persistence_handler] + + ap.logger = await log.init_logging(extra_handlers) diff --git a/pkg/persistence/__init__.py b/pkg/persistence/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/persistence/database.py b/pkg/persistence/database.py new file mode 100644 index 00000000..0dd82817 --- /dev/null +++ b/pkg/persistence/database.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import abc + +import sqlalchemy.ext.asyncio as sqlalchemy_asyncio + +from ..core import app + + +preregistered_managers: list[type[BaseDatabaseManager]] = [] + +def manager_class(name: str) -> None: + """注册一个数据库管理类""" + + def decorator(cls: type[BaseDatabaseManager]) -> type[BaseDatabaseManager]: + cls.name = name + preregistered_managers.append(cls) + return cls + + return decorator + + +class BaseDatabaseManager(abc.ABC): + """基础数据库管理类""" + + name: str + + ap: app.Application + + engine: sqlalchemy_asyncio.AsyncEngine + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + + @abc.abstractmethod + async def initialize(self) -> None: + pass + + def get_engine(self) -> sqlalchemy_asyncio.AsyncEngine: + return self.engine diff --git a/pkg/persistence/databases/__init__.py b/pkg/persistence/databases/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/persistence/databases/sqlite.py b/pkg/persistence/databases/sqlite.py new file mode 100644 index 00000000..14f89092 --- /dev/null +++ b/pkg/persistence/databases/sqlite.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +import sqlalchemy.ext.asyncio as sqlalchemy_asyncio + +from .. import database + + +@database.manager_class("sqlite") +class SQLiteDatabaseManager(database.BaseDatabaseManager): + """SQLite 数据库管理类""" + + async def initialize(self) -> None: + self.engine = sqlalchemy_asyncio.create_async_engine(f"sqlite+aiosqlite:///{self.ap.system_cfg.data['persistence']['sqlite']['path']}") diff --git a/pkg/persistence/mgr.py b/pkg/persistence/mgr.py new file mode 100644 index 00000000..d0c1fa26 --- /dev/null +++ b/pkg/persistence/mgr.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +import asyncio +import datetime + +import sqlalchemy.ext.asyncio as sqlalchemy_asyncio +import sqlalchemy + +from . import database +from ..core import app +from .databases import sqlite + + +class PersistenceManager: + """持久化模块管理器""" + + ap: app.Application + + db: database.BaseDatabaseManager + """数据库管理器""" + + meta: sqlalchemy.MetaData + + def __init__(self, ap: app.Application): + self.ap = ap + self.meta = sqlalchemy.MetaData() + + async def initialize(self): + + for manager in database.preregistered_managers: + self.db = manager(self.ap) + await self.db.initialize() + + await self.create_tables() + + async def create_tables(self): + # TODO: 对扩展友好 + + # 日志 + async with self.get_db_engine().connect() as conn: + await conn.run_sync(self.meta.create_all) + + await conn.commit() + + async def execute_async( + self, + *args, + **kwargs + ): + async with self.get_db_engine().connect() as conn: + await conn.execute(*args, **kwargs) + await conn.commit() + + def get_db_engine(self) -> sqlalchemy_asyncio.AsyncEngine: + return self.db.get_engine() diff --git a/pkg/utils/logcache.py b/pkg/utils/logcache.py new file mode 100644 index 00000000..0fe419f1 --- /dev/null +++ b/pkg/utils/logcache.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import pydantic + + +LOG_PAGE_SIZE = 20 +MAX_CACHED_PAGES = 10 + + +class LogPage(pydantic.BaseModel): + """日志页""" + + cached_count: int = 0 + + logs: str = "" + + def add_log(self, log: str) -> bool: + """添加日志 + + Returns: + bool: 是否已满 + """ + self.logs += log + self.cached_count += 1 + return self.cached_count >= LOG_PAGE_SIZE + + +class LogCache: + """由于 logger 是同步的,但实例中的数据库操作是异步的; + 同时,持久化的日志信息已经写入文件了,故做一个缓存来为前端提供日志查询服务""" + + log_pages: list[LogPage] = [] + """从前到后,越新的日志页越靠后""" + + def __init__(self): + self.log_pages = [] + self.log_pages.append(LogPage()) + + def add_log(self, log: str): + """添加日志""" + if self.log_pages[-1].add_log(log): + self.log_pages.append(LogPage()) + + if len(self.log_pages) > MAX_CACHED_PAGES: + self.log_pages.pop(0) + + def get_all_logs(self) -> str: + """获取所有日志""" + return "".join([page.logs for page in self.log_pages]) diff --git a/requirements.txt b/requirements.txt index 7bf257ed..f001e6af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,4 +14,7 @@ websockets urllib3 psutil async-lru -ollama \ No newline at end of file +ollama +quart +sqlalchemy[asyncio] +aiosqlite \ No newline at end of file diff --git a/templates/system.json b/templates/system.json index 0f6669fe..94a8ad16 100644 --- a/templates/system.json +++ b/templates/system.json @@ -11,5 +11,16 @@ }, "pipeline-concurrency": 20, "qcg-center-url": "https://api.qchatgpt.rockchin.top/api/v2", - "help-message": "QChatGPT - 😎高稳定性、🧩支持插件、🌏实时联网的 ChatGPT QQ 机器人🤖\n链接:https://q.rkcn.top" + "help-message": "QChatGPT - 😎高稳定性、🧩支持插件、🌏实时联网的 ChatGPT QQ 机器人🤖\n链接:https://q.rkcn.top", + "http-api": { + "enable": true, + "host": "0.0.0.0", + "port": 5300 + }, + "persistence": { + "sqlite": { + "path": "data/persistence.db" + }, + "use": "sqlite" + } } \ No newline at end of file