diff --git a/pkg/api/http/controller/groups/pipelines.py b/pkg/api/http/controller/groups/pipelines.py new file mode 100644 index 00000000..de077b66 --- /dev/null +++ b/pkg/api/http/controller/groups/pipelines.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import quart + +from .. import group +from .....entity.persistence import pipeline + + +@group.group_class('pipelines', '/api/v1/pipelines') +class PipelinesRouterGroup(group.RouterGroup): + + async def initialize(self) -> None: + + @self.route('', methods=['GET', 'POST']) + async def _() -> str: + if quart.request.method == 'GET': + return self.success(data={ + 'pipelines': await self.ap.pipeline_service.get_pipelines() + }) + elif quart.request.method == 'POST': + json_data = await quart.request.json + + await self.ap.pipeline_service.create_pipeline(json_data) + + return self.success() + + @self.route('/_/metadata', methods=['GET']) + async def _() -> str: + return self.success(data={ + 'configs': await self.ap.pipeline_service.get_pipeline_metadata() + }) + + @self.route('/', methods=['GET', 'DELETE']) + async def _(pipeline_uuid: str) -> str: + if quart.request.method == 'GET': + pipeline = await self.ap.pipeline_service.get_pipeline(pipeline_uuid) + + if pipeline is None: + return self.http_status(404, -1, 'pipeline not found') + + return self.success(data={ + 'pipeline': pipeline + }) + elif quart.request.method == 'DELETE': + await self.ap.pipeline_service.delete_pipeline(pipeline_uuid) + + return self.success() + diff --git a/pkg/api/http/controller/main.py b/pkg/api/http/controller/main.py index 9aced3cf..0f110c89 100644 --- a/pkg/api/http/controller/main.py +++ b/pkg/api/http/controller/main.py @@ -8,7 +8,7 @@ import quart_cors from ....core import app, entities as core_entities -from .groups import logs, system, settings, plugins, stats, user +from .groups import logs, system, settings, plugins, stats, user, pipelines from .groups.provider import models, requesters from . import group diff --git a/pkg/api/http/service/pipeline.py b/pkg/api/http/service/pipeline.py new file mode 100644 index 00000000..68cfb7ca --- /dev/null +++ b/pkg/api/http/service/pipeline.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import uuid +import datetime +import sqlalchemy + +from ....core import app +from ....pipeline import stagemgr +from ....entity.persistence import pipeline as persistence_pipeline + + +class PipelineService: + ap: app.Application + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + + async def get_pipeline_metadata(self) -> dict: + return [ + self.ap.pipeline_config_meta_trigger.data, + self.ap.pipeline_config_meta_safety.data, + self.ap.pipeline_config_meta_ai.data, + self.ap.pipeline_config_meta_output.data + ] + + async def get_pipelines(self) -> list[dict]: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_pipeline.LegacyPipeline) + ) + + pipelines = result.all() + return [ + self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline) + for pipeline in pipelines + ] + + async def get_pipeline(self, pipeline_uuid: str) -> dict | None: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(persistence_pipeline.LegacyPipeline).where(persistence_pipeline.LegacyPipeline.uuid == pipeline_uuid) + ) + + pipeline = result.first() + + if pipeline is None: + return None + + return self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline) + + async def create_pipeline(self, pipeline_data: dict) -> None: + pipeline_data['uuid'] = str(uuid.uuid4()) + pipeline_data['for_version'] = self.ap.ver_mgr.get_current_version() + pipeline_data['stages'] = stagemgr.stage_order.copy() + + # TODO: 检查pipeline config是否完整 + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.insert(persistence_pipeline.LegacyPipeline).values(**pipeline_data) + ) + # TODO: 更新到pipeline manager + + async def delete_pipeline(self, pipeline_uuid: str) -> None: + await self.ap.persistence_mgr.execute_async( + sqlalchemy.delete(persistence_pipeline.LegacyPipeline).where(persistence_pipeline.LegacyPipeline.uuid == pipeline_uuid) + ) + # TODO: 更新到pipeline manager diff --git a/pkg/core/app.py b/pkg/core/app.py index 9f359239..520d954f 100644 --- a/pkg/core/app.py +++ b/pkg/core/app.py @@ -26,6 +26,7 @@ from ..persistence import mgr as persistencemgr from ..api.http.controller import main as http_controller from ..api.http.service import user as user_service from ..api.http.service import model as model_service +from ..api.http.service import pipeline as pipeline_service from ..discover import engine as discover_engine from ..utils import logcache, ip from . import taskmgr @@ -83,6 +84,11 @@ class Application: instance_secret_meta: config_mgr.ConfigManager = None + pipeline_config_meta_trigger: config_mgr.ConfigManager = None + pipeline_config_meta_safety: config_mgr.ConfigManager = None + pipeline_config_meta_ai: config_mgr.ConfigManager = None + pipeline_config_meta_output: config_mgr.ConfigManager = None + # ========================= ctr_mgr: center_mgr.V2CenterAPI = None @@ -115,6 +121,8 @@ class Application: model_service: model_service.ModelsService = None + pipeline_service: pipeline_service.PipelineService = None + def __init__(self): pass diff --git a/pkg/core/bootutils/config.py b/pkg/core/bootutils/config.py index 940a6132..794c329a 100644 --- a/pkg/core/bootutils/config.py +++ b/pkg/core/bootutils/config.py @@ -8,3 +8,4 @@ from ...config.impls import pymodule load_python_module_config = config_mgr.load_python_module_config load_json_config = config_mgr.load_json_config +load_yaml_config = config_mgr.load_yaml_config \ No newline at end of file diff --git a/pkg/core/stages/build_app.py b/pkg/core/stages/build_app.py index 067044e5..5752d16d 100644 --- a/pkg/core/stages/build_app.py +++ b/pkg/core/stages/build_app.py @@ -19,6 +19,7 @@ from ...persistence import mgr as persistencemgr from ...api.http.controller import main as http_controller from ...api.http.service import user as user_service from ...api.http.service import model as model_service +from ...api.http.service import pipeline as pipeline_service from ...discover import engine as discover_engine from ...utils import logcache from .. import taskmgr @@ -127,5 +128,8 @@ class BuildAppStage(stage.BootingStage): model_service_inst = model_service.ModelsService(ap) ap.model_service = model_service_inst + pipeline_service_inst = pipeline_service.PipelineService(ap) + ap.pipeline_service = pipeline_service_inst + ctrl = controller.Controller(ap) ap.ctrl = ctrl diff --git a/pkg/core/stages/load_config.py b/pkg/core/stages/load_config.py index cc154a7c..9476240a 100644 --- a/pkg/core/stages/load_config.py +++ b/pkg/core/stages/load_config.py @@ -82,3 +82,8 @@ class LoadConfigStage(stage.BootingStage): 'jwt_secret': secrets.token_hex(16) }) await ap.instance_secret_meta.dump_config() + + ap.pipeline_config_meta_trigger = await config.load_yaml_config("templates/metadata/pipeline/trigger.yaml", "templates/metadata/pipeline/trigger.yaml") + ap.pipeline_config_meta_safety = await config.load_yaml_config("templates/metadata/pipeline/safety.yaml", "templates/metadata/pipeline/safety.yaml") + ap.pipeline_config_meta_ai = await config.load_yaml_config("templates/metadata/pipeline/ai.yaml", "templates/metadata/pipeline/ai.yaml") + ap.pipeline_config_meta_output = await config.load_yaml_config("templates/metadata/pipeline/output.yaml", "templates/metadata/pipeline/output.yaml") diff --git a/pkg/entity/persistence/pipeline.py b/pkg/entity/persistence/pipeline.py new file mode 100644 index 00000000..6d1b499d --- /dev/null +++ b/pkg/entity/persistence/pipeline.py @@ -0,0 +1,32 @@ +import sqlalchemy + +from .base import Base + + +class LegacyPipeline(Base): + """旧版流水线""" + __tablename__ = 'legacy_pipelines' + + uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + description = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now()) + updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now(), onupdate=sqlalchemy.func.now()) + for_version = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + + stages = sqlalchemy.Column(sqlalchemy.JSON, nullable=False) + config = sqlalchemy.Column(sqlalchemy.JSON, nullable=False) + + +class PipelineRunRecord(Base): + """流水线运行记录""" + __tablename__ = 'pipeline_run_records' + + uuid = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True) + pipeline_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + status = sqlalchemy.Column(sqlalchemy.String(255), nullable=False) + created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now()) + updated_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now(), onupdate=sqlalchemy.func.now()) + started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) + finished_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False) + result = sqlalchemy.Column(sqlalchemy.JSON, nullable=False)