refactor: 重构openai包基础组件架构

This commit is contained in:
RockChinQ
2024-01-27 00:06:38 +08:00
parent 411034902a
commit 850a4eeb7c
35 changed files with 779 additions and 59 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import typing
import traceback
from . import app, entities
@@ -24,25 +25,115 @@ class Controller:
async def consumer(self):
"""事件处理循环
"""
while True:
selected_query: entities.Query = None
try:
while True:
selected_query: entities.Query = None
# 取请求
async with self.ap.query_pool:
queries: list[entities.Query] = self.ap.query_pool.queries
# 取请求
async with self.ap.query_pool:
queries: list[entities.Query] = self.ap.query_pool.queries
if queries:
selected_query = queries.pop(0) # FCFS
else:
await self.ap.query_pool.condition.wait()
continue
for query in queries:
session = await self.ap.sess_mgr.get_session(query)
self.ap.logger.debug(f"Checking query {query} session {session}")
if selected_query:
async def _process_query(selected_query):
async with self.semaphore:
await self.process_query(selected_query)
asyncio.create_task(_process_query(selected_query))
if not session.semaphore.locked():
selected_query = query
await session.semaphore.acquire()
break
if selected_query: # 找到了
queries.remove(selected_query)
else: # 没找到 说明:没有请求 或者 所有query对应的session都已达到并发上限
await self.ap.query_pool.condition.wait()
continue
if selected_query:
async def _process_query(selected_query):
async with self.semaphore: # 总并发上限
await self.process_query(selected_query)
async with self.ap.query_pool:
(await self.ap.sess_mgr.get_session(selected_query)).semaphore.release()
# 通知其他协程,有新的请求可以处理了
self.ap.query_pool.condition.notify_all()
asyncio.create_task(_process_query(selected_query))
except Exception as e:
self.ap.logger.error(f"事件处理循环出错: {e}")
traceback.print_exc()
async def _check_output(self, result: pipeline_entities.StageProcessResult):
"""检查输出
"""
if result.user_notice:
await self.ap.im_mgr.send(
result.user_notice
)
if result.debug_notice:
self.ap.logger.debug(result.debug_notice)
if result.console_notice:
self.ap.logger.info(result.console_notice)
async def _execute_from_stage(
self,
stage_index: int,
query: entities.Query,
):
"""从指定阶段开始执行
如何看懂这里为什么这么写?
去问 GPT-4:
Q1: 现在有一个责任链其中有多个stagequery对象在其中传递stage.process可能返回Result也有可能返回typing.AsyncGenerator[Result, None]
如果返回的是生成器需要挨个生成result检查是否result中是否要求继续如果要求继续就进行下一个stage。如果此次生成器产生的result处理完了就继续生成下一个result
调用后续的stage直到该生成器全部生成完。责任链中可能有多个stage会返回生成器
Q2: 不是这样的你可能理解有误。如果我们责任链上有这些Stage
A B C D E F G
如果所有的stage都返回Result且所有Result都要求继续那么执行顺序是
A B C D E F G
现在假设C返回的是AsyncGenerator那么执行顺序是
A B C D E F G C D E F G C D E F G ...
Q3: 但是如果不止一个stage会返回生成器呢
"""
i = stage_index
while i < len(self.ap.stage_mgr.stage_containers):
stage_container = self.ap.stage_mgr.stage_containers[i]
result = await stage_container.inst.process(query, stage_container.inst_name)
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
self.ap.logger.debug(f"Stage {stage_container.inst_name} processed query {query} res {result}")
await self._check_output(result)
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f"Stage {stage_container.inst_name} interrupted query {query}")
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器
self.ap.logger.debug(f"Stage {stage_container.inst_name} processed query {query} gen")
async for sub_result in result:
self.ap.logger.debug(f"Stage {stage_container.inst_name} processed query {query} res {sub_result}")
await self._check_output(sub_result)
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f"Stage {stage_container.inst_name} interrupted query {query}")
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
await self._execute_from_stage(i + 1, query)
break
i += 1
async def process_query(self, query: entities.Query):
"""处理请求
@@ -50,28 +141,7 @@ class Controller:
self.ap.logger.debug(f"Processing query {query}")
try:
for stage_container in self.ap.stage_mgr.stage_containers:
res = await stage_container.inst.process(query, stage_container.inst_name)
self.ap.logger.debug(f"Stage {stage_container.inst_name} res {res}")
if res.user_notice:
await self.ap.im_mgr.send(
query.message_event,
res.user_notice
)
if res.debug_notice:
self.ap.logger.debug(res.debug_notice)
if res.console_notice:
self.ap.logger.info(res.console_notice)
if res.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f"Stage {stage_container.inst_name} interrupted query {query}")
break
elif res.result_type == pipeline_entities.ResultType.CONTINUE:
query = res.new_query
continue
await self._execute_from_stage(0, query)
except Exception as e:
self.ap.logger.error(f"处理请求时出错 {query}: {e}")
traceback.print_exc()