From c151665419f65bd23c203b52a42c65ad041796dc Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 22 Oct 2024 18:09:18 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/api/http/controller/main.py | 10 ++++- pkg/audit/center/apigroup.py | 7 ++-- pkg/core/app.py | 40 ++++++++---------- pkg/core/boot.py | 10 +++++ pkg/core/bootutils/log.py | 4 +- pkg/core/bootutils/misc.py | 0 pkg/core/stages/build_app.py | 2 + pkg/core/taskmgr.py | 73 +++++++++++++++++++++++++++++++++ pkg/pipeline/controller.py | 5 ++- pkg/platform/manager.py | 6 +-- 10 files changed, 121 insertions(+), 36 deletions(-) delete mode 100644 pkg/core/bootutils/misc.py create mode 100644 pkg/core/taskmgr.py diff --git a/pkg/api/http/controller/main.py b/pkg/api/http/controller/main.py index 99eb28e2..1e48b094 100644 --- a/pkg/api/http/controller/main.py +++ b/pkg/api/http/controller/main.py @@ -30,12 +30,18 @@ class HTTPController: while True: await asyncio.sleep(1) - task = asyncio.create_task(self.quart_app.run_task( + # task = asyncio.create_task(self.quart_app.run_task( + # host=self.ap.system_cfg.data['http-api']['host'], + # port=self.ap.system_cfg.data['http-api']['port'], + # shutdown_trigger=shutdown_trigger_placeholder + # )) + # self.ap.asyncio_tasks.append(task) + self.ap.task_mgr.create_task(self.quart_app.run_task( host=self.ap.system_cfg.data['http-api']['host'], port=self.ap.system_cfg.data['http-api']['port'], shutdown_trigger=shutdown_trigger_placeholder )) - self.ap.asyncio_tasks.append(task) + async def register_routes(self) -> None: diff --git a/pkg/audit/center/apigroup.py b/pkg/audit/center/apigroup.py index 6ae8be5d..fe7ab4f6 100644 --- a/pkg/audit/center/apigroup.py +++ b/pkg/audit/center/apigroup.py @@ -69,11 +69,12 @@ class APIGroup(metaclass=abc.ABCMeta): **kwargs ) -> asyncio.Task: """执行请求""" - task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs)) + # task = asyncio.create_task(self._do(method, path, data, params, headers, **kwargs)) - self.ap.asyncio_tasks.append(task) + # self.ap.asyncio_tasks.append(task) - return task + + return self.ap.task_mgr.create_task(self._do(method, path, data, params, headers, **kwargs)).task def gen_rid( self diff --git a/pkg/core/app.py b/pkg/core/app.py index c67cb082..c7364f30 100644 --- a/pkg/core/app.py +++ b/pkg/core/app.py @@ -2,6 +2,7 @@ from __future__ import annotations import logging import asyncio +import threading import traceback from ..platform import manager as im_mgr @@ -21,6 +22,7 @@ from ..utils import version as version_mgr, proxy as proxy_mgr, announce as anno from ..persistence import mgr as persistencemgr from ..api.http.controller import main as http_controller from ..utils import logcache +from . import taskmgr class Application: @@ -28,7 +30,8 @@ class Application: event_loop: asyncio.AbstractEventLoop = None - asyncio_tasks: list[asyncio.Task] = [] + # asyncio_tasks: list[asyncio.Task] = [] + task_mgr: taskmgr.AsyncTaskManager = None platform_mgr: im_mgr.PlatformManager = None @@ -103,8 +106,6 @@ class Application: async def run(self): await self.plugin_mgr.initialize_plugins() - tasks = [] - try: # 后续可能会允许动态重启其他任务 @@ -113,28 +114,19 @@ class Application: while True: await asyncio.sleep(1) - tasks = [ - asyncio.create_task(self.platform_mgr.run()), # 消息平台 - asyncio.create_task(self.ctrl.run()), # 消息处理循环 - asyncio.create_task(self.http_ctrl.run()), # http 接口服务 - asyncio.create_task(never_ending()) - ] - self.asyncio_tasks.extend(tasks) + # tasks = [ + # asyncio.create_task(self.platform_mgr.run()), # 消息平台 + # asyncio.create_task(self.ctrl.run()), # 消息处理循环 + # asyncio.create_task(self.http_ctrl.run()), # http 接口服务 + # asyncio.create_task(never_ending()) + # ] + # self.asyncio_tasks.extend(tasks) + self.task_mgr.create_task(self.platform_mgr.run()) + self.task_mgr.create_task(self.ctrl.run()) + self.task_mgr.create_task(self.http_ctrl.run()) + self.task_mgr.create_task(never_ending()) - # 挂系统信号处理 - import signal - - def signal_handler(sig, frame): - for task in self.asyncio_tasks: - task.cancel() - self.logger.info("程序退出.") - # 结束当前事件循环 - self.event_loop.stop() - exit(0) - - signal.signal(signal.SIGINT, signal_handler) - - await asyncio.gather(*tasks, return_exceptions=True) + await self.task_mgr.wait_all() except asyncio.CancelledError: pass except Exception as e: diff --git a/pkg/core/boot.py b/pkg/core/boot.py index 0f0ab3ae..dff772d9 100644 --- a/pkg/core/boot.py +++ b/pkg/core/boot.py @@ -49,6 +49,16 @@ async def make_app(loop: asyncio.AbstractEventLoop) -> app.Application: async def main(loop: asyncio.AbstractEventLoop): try: + + # 挂系统信号处理 + import signal + + def signal_handler(sig, frame): + print("[Signal] 程序退出.") + os._exit(0) + + signal.signal(signal.SIGINT, signal_handler) + app_inst = await make_app(loop) await app_inst.run() except Exception as e: diff --git a/pkg/core/bootutils/log.py b/pkg/core/bootutils/log.py index 36bc51a0..dea961b5 100644 --- a/pkg/core/bootutils/log.py +++ b/pkg/core/bootutils/log.py @@ -27,8 +27,8 @@ async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging. if constants.debug_mode: level = logging.DEBUG - log_file_name = "data/logs/qcg-%s.log" % time.strftime( - "%Y-%m-%d-%H-%M-%S", time.localtime() + log_file_name = "data/logs/langbot-%s.log" % time.strftime( + "%Y-%m-%d", time.localtime() ) qcg_logger = logging.getLogger("qcg") diff --git a/pkg/core/bootutils/misc.py b/pkg/core/bootutils/misc.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index 8c63f7dc..bc169b17 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -18,6 +18,7 @@ from ...platform import manager as im_mgr from ...persistence import mgr as persistencemgr from ...api.http.controller import main as http_controller from ...utils import logcache +from .. import taskmgr @stage.stage_class("BuildAppStage") @@ -28,6 +29,7 @@ class BuildAppStage(stage.BootingStage): async def run(self, ap: app.Application): """构建app对象的各个组件对象并初始化 """ + ap.task_mgr = taskmgr.AsyncTaskManager(ap) proxy_mgr = proxy.ProxyManager(ap) await proxy_mgr.initialize() diff --git a/pkg/core/taskmgr.py b/pkg/core/taskmgr.py new file mode 100644 index 00000000..ffe15e50 --- /dev/null +++ b/pkg/core/taskmgr.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import asyncio +import typing + +from . import app + + +class TaskContext: + """任务跟踪上下文""" + + current_action: str + """当前正在执行的动作""" + + log: str + """记录日志""" + + def __init__(self): + self.current_action = "" + self.log = "" + + def log(self, msg: str): + self.log += msg + "\n" + + def set_current_action(self, action: str): + self.current_action = action + + +class TaskWrapper: + """任务包装器""" + + task_type: str = "system" # 任务类型: system 或 user + """任务类型""" + + task_context: TaskContext + """任务上下文""" + + task: asyncio.Task + """任务""" + + ap: app.Application + """应用实例""" + + def __init__(self, ap: app.Application, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None): + self.ap = ap + self.task_context = context or TaskContext() + self.task = self.ap.event_loop.create_task(coro) + self.task_type = task_type + + +class AsyncTaskManager: + """保存app中的所有异步任务 + 包含系统级的和用户级(插件安装、更新等由用户直接发起的)的""" + + ap: app.Application + + tasks: list[TaskWrapper] + """所有任务""" + + def __init__(self, ap: app.Application): + self.ap = ap + self.tasks = [] + + def create_task(self, coro: typing.Coroutine, task_type: str = "system", context: TaskContext = None) -> TaskWrapper: + wrapper = TaskWrapper(self.ap, coro, task_type, context) + self.tasks.append(wrapper) + return wrapper + + async def wait_all(self): + await asyncio.gather(*[t.task for t in self.tasks], return_exceptions=True) + + def get_all_tasks(self) -> list[TaskWrapper]: + return self.tasks diff --git a/pkg/pipeline/controller.py b/pkg/pipeline/controller.py index 3b4e2c64..29d53d71 100644 --- a/pkg/pipeline/controller.py +++ b/pkg/pipeline/controller.py @@ -60,8 +60,9 @@ class Controller: # 通知其他协程,有新的请求可以处理了 self.ap.query_pool.condition.notify_all() - task = asyncio.create_task(_process_query(selected_query)) - self.ap.asyncio_tasks.append(task) + # task = asyncio.create_task(_process_query(selected_query)) + # self.ap.asyncio_tasks.append(task) + self.ap.task_mgr.create_task(_process_query(selected_query)) except Exception as e: # traceback.print_exc() diff --git a/pkg/platform/manager.py b/pkg/platform/manager.py index 4b22d5c9..47822c28 100644 --- a/pkg/platform/manager.py +++ b/pkg/platform/manager.py @@ -184,10 +184,10 @@ class PlatformManager: tasks.append(exception_wrapper(adapter)) for task in tasks: - async_task = asyncio.create_task(task) - self.ap.asyncio_tasks.append(async_task) + # async_task = asyncio.create_task(task) + # self.ap.asyncio_tasks.append(async_task) + self.ap.task_mgr.create_task(task) except Exception as e: self.ap.logger.error('平台适配器运行出错: ' + str(e)) self.ap.logger.debug(f"Traceback: {traceback.format_exc()}") - From a4589327a6ef50e6f0d3e0af9f324a29d1699cdc Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Tue, 22 Oct 2024 18:17:09 +0800 Subject: [PATCH 2/5] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20python=20?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/main.py b/main.py index 273c8579..baf339c6 100644 --- a/main.py +++ b/main.py @@ -54,6 +54,13 @@ async def main_entry(loop: asyncio.AbstractEventLoop): if __name__ == '__main__': import os + import sys + + # 必须大于 3.10.1 + if sys.version_info < (3, 10, 1): + print("需要 Python 3.10.1 及以上版本,当前 Python 版本为:", sys.version) + input("按任意键退出...") + exit(1) # 检查本目录是否有main.py,且包含QChatGPT字符串 invalid_pwd = False From d5e31203509df46df9c0b9228f8ed81455d67c47 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Thu, 24 Oct 2024 14:26:18 +0800 Subject: [PATCH 3/5] =?UTF-8?q?chore:=20=E7=A1=AE=E4=BF=9D=20pydantic<2.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 389f172f..1f808177 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ Pillow tiktoken PyYaml aiohttp -pydantic +pydantic<2.0 websockets urllib3 psutil @@ -18,4 +18,4 @@ ollama quart sqlalchemy[asyncio] aiosqlite -quart-cors \ No newline at end of file +quart-cors From 2f05f5b4560aee29c9b4812e9096825b08361d14 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Thu, 24 Oct 2024 18:28:57 +0800 Subject: [PATCH 4/5] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=88=97=E8=A1=A8=E6=A1=86=E6=9E=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/src/App.vue | 39 ++++++++++++++++++++++- web/src/components/TaskDialog.vue | 51 +++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 web/src/components/TaskDialog.vue diff --git a/web/src/App.vue b/web/src/App.vue index 54763a00..94d0b51c 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -34,6 +34,23 @@