diff --git a/pkg/config/migrations/m008_ad_fixwin_config_migrate.py b/pkg/config/migrations/m008_ad_fixwin_config_migrate.py new file mode 100644 index 00000000..ccd6fbd7 --- /dev/null +++ b/pkg/config/migrations/m008_ad_fixwin_config_migrate.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from .. import migration + + +@migration.migration_class("ad-fixwin-cfg-migration", 8) +class AdFixwinConfigMigration(migration.Migration): + """迁移""" + + async def need_migrate(self) -> bool: + """判断当前环境是否需要运行此迁移""" + return isinstance( + self.ap.pipeline_cfg.data["rate-limit"]["fixwin"]["default"], + int + ) + + async def run(self): + """执行迁移""" + + for session_name in self.ap.pipeline_cfg.data["rate-limit"]["fixwin"]: + + temp_dict = { + "window-size": 60, + "limit": self.ap.pipeline_cfg.data["rate-limit"]["fixwin"][session_name] + } + + self.ap.pipeline_cfg.data["rate-limit"]["fixwin"][session_name] = temp_dict + + await self.ap.pipeline_cfg.dump_config() \ No newline at end of file diff --git a/pkg/core/stages/migrate.py b/pkg/core/stages/migrate.py index 69745fc1..68a87d4e 100644 --- a/pkg/core/stages/migrate.py +++ b/pkg/core/stages/migrate.py @@ -5,7 +5,7 @@ import importlib from .. import stage, app from ...config import migration from ...config.migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion -from ...config.migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url +from ...config.migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate @stage.stage_class("MigrationStage") diff --git a/pkg/pipeline/ratelimit/algos/fixedwin.py b/pkg/pipeline/ratelimit/algos/fixedwin.py index aa380291..3bf8a5e5 100644 --- a/pkg/pipeline/ratelimit/algos/fixedwin.py +++ b/pkg/pipeline/ratelimit/algos/fixedwin.py @@ -1,18 +1,15 @@ -# 固定窗口算法 from __future__ import annotations - import asyncio import time - from .. import algo - +# 固定窗口算法 class SessionContainer: wait_lock: asyncio.Lock records: dict[int, int] - """访问记录,key为每分钟的起始时间戳,value为访问次数""" + """访问记录,key为每窗口长度的起始时间戳,value为访问次数""" def __init__(self): self.wait_lock = asyncio.Lock() @@ -47,30 +44,34 @@ class FixedWindowAlgo(algo.ReteLimitAlgo): # 等待锁 async with container.wait_lock: + + # 获取窗口大小和限制 + window_size = self.ap.pipeline_cfg.data['rate-limit']['fixwin']['default']['window-size'] + limitation = self.ap.pipeline_cfg.data['rate-limit']['fixwin']['default']['limit'] + + if session_name in self.ap.pipeline_cfg.data['rate-limit']['fixwin']: + window_size = self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name]['window-size'] + limitation = self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name]['limit'] + # 获取当前时间戳 now = int(time.time()) - # 获取当前分钟的起始时间戳 - now = now - now % 60 + # 获取当前窗口的起始时间戳 + now = now - now % window_size - # 获取当前分钟的访问次数 + # 获取当前窗口的访问次数 count = container.records.get(now, 0) - limitation = self.ap.pipeline_cfg.data['rate-limit']['fixwin']['default'] - - if session_name in self.ap.pipeline_cfg.data['rate-limit']['fixwin']: - limitation = self.ap.pipeline_cfg.data['rate-limit']['fixwin'][session_name] - # 如果访问次数超过了限制 if count >= limitation: if self.ap.pipeline_cfg.data['rate-limit']['strategy'] == 'drop': return False elif self.ap.pipeline_cfg.data['rate-limit']['strategy'] == 'wait': - # 等待下一分钟 - await asyncio.sleep(60 - time.time() % 60) + # 等待下一窗口 + await asyncio.sleep(window_size - time.time() % window_size) now = int(time.time()) - now = now - now % 60 + now = now - now % window_size if now not in container.records: container.records = {} diff --git a/templates/pipeline.json b/templates/pipeline.json index 975c88b3..aefc195c 100644 --- a/templates/pipeline.json +++ b/templates/pipeline.json @@ -29,7 +29,10 @@ "strategy": "drop", "algo": "fixwin", "fixwin": { - "default": 60 + "default": { + "window-size": 60, + "limit": 60 + } } } } \ No newline at end of file