feat: plugin deletion and upgrade

This commit is contained in:
Junyan Qin
2025-08-17 18:07:51 +08:00
parent a0c42a5f6e
commit b176959836
22 changed files with 336 additions and 956 deletions

View File

@@ -116,6 +116,34 @@ class PluginRuntimeConnector:
if task_context is not None:
task_context.trace(trace)
async def upgrade_plugin(
self, plugin_author: str, plugin_name: str, task_context: taskmgr.TaskContext | None = None
) -> dict[str, Any]:
async for ret in self.handler.upgrade_plugin(plugin_author, plugin_name):
current_action = ret.get('current_action', None)
if current_action is not None:
if task_context is not None:
task_context.set_current_action(current_action)
trace = ret.get('trace', None)
if trace is not None:
if task_context is not None:
task_context.trace(trace)
async def delete_plugin(
self, plugin_author: str, plugin_name: str, task_context: taskmgr.TaskContext | None = None
) -> dict[str, Any]:
async for ret in self.handler.delete_plugin(plugin_author, plugin_name):
current_action = ret.get('current_action', None)
if current_action is not None:
if task_context is not None:
task_context.set_current_action(current_action)
trace = ret.get('trace', None)
if trace is not None:
if task_context is not None:
task_context.trace(trace)
async def list_plugins(self) -> list[dict[str, Any]]:
return await self.handler.list_plugins()

View File

@@ -1,281 +0,0 @@
from __future__ import annotations
import typing
import abc
from ..core import app
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.events as events
def register(
name: str, description: str, version: str, author: str
) -> typing.Callable[[typing.Type[BasePlugin]], typing.Type[BasePlugin]]:
"""注册插件类
使用示例:
@register(
name="插件名称",
description="插件描述",
version="插件版本",
author="插件作者"
)
class MyPlugin(BasePlugin):
pass
"""
pass
def handler(
event: typing.Type[events.BaseEventModel],
) -> typing.Callable[[typing.Callable], typing.Callable]:
"""注册事件监听器
使用示例:
class MyPlugin(BasePlugin):
@handler(NormalMessageResponded)
async def on_normal_message_responded(self, ctx: EventContext):
pass
"""
pass
def llm_func(
name: str = None,
) -> typing.Callable:
"""注册内容函数
使用示例:
class MyPlugin(BasePlugin):
@llm_func("access_the_web_page")
async def _(self, query, url: str, brief_len: int):
\"""Call this function to search about the question before you answer any questions.
- Do not search through google.com at any time.
- If you need to search somthing, visit https://www.sogou.com/web?query=<something>.
- If user ask you to open a url (start with http:// or https://), visit it directly.
- Summary the plain content result by yourself, DO NOT directly output anything in the result you got.
Args:
url(str): url to visit
brief_len(int): max length of the plain text content, recommend 1024-4096, prefer 4096
Returns:
str: plain text content of the web page or error message(starts with 'error:')
\"""
"""
pass
class BasePlugin(metaclass=abc.ABCMeta):
"""插件基类"""
host: APIHost
"""API宿主"""
ap: app.Application
"""应用程序对象"""
config: dict
"""插件配置"""
def __init__(self, host: APIHost):
"""初始化阶段被调用"""
self.host = host
self.config = {}
async def initialize(self):
"""初始化阶段被调用"""
pass
async def destroy(self):
"""释放/禁用插件时被调用"""
pass
def __del__(self):
"""释放/禁用插件时被调用"""
pass
class APIHost:
"""LangBot API 宿主"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
pass
# ========== 插件可调用的 API主程序API ==========
def get_platform_adapters(self) -> list[abstract_platform_adapter.AbstractMessagePlatformAdapter]:
"""获取已启用的消息平台适配器列表
Returns:
list[platform.adapter.MessageSourceAdapter]: 已启用的消息平台适配器列表
"""
return self.ap.platform_mgr.get_running_adapters()
async def send_active_message(
self,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
target_type: str,
target_id: str,
message: platform_message.MessageChain,
):
"""发送主动消息
Args:
adapter (platform.adapter.MessageSourceAdapter): 消息平台适配器对象,调用 host.get_platform_adapters() 获取并取用其中某个
target_type (str): 目标类型,`person`或`group`
target_id (str): 目标ID
message (platform.types.MessageChain): 消息链
"""
await adapter.send_message(
target_type=target_type,
target_id=target_id,
message=message,
)
def require_ver(
self,
ge: str,
le: str = 'v999.999.999',
) -> bool:
"""插件版本要求装饰器
Args:
ge (str): 最低版本要求
le (str, optional): 最高版本要求
Returns:
bool: 是否满足要求, False时为无法获取版本号True时为满足要求报错为不满足要求
"""
langbot_version = ''
try:
langbot_version = self.ap.ver_mgr.get_current_version() # 从updater模块获取版本号
except Exception:
return False
if self.ap.ver_mgr.compare_version_str(langbot_version, ge) < 0 or (
self.ap.ver_mgr.compare_version_str(langbot_version, le) > 0
):
raise Exception(
'LangBot 版本不满足要求,某些功能(可能是由插件提供的)无法正常使用。(要求版本:{}-{},但当前版本:{}'.format(
ge, le, langbot_version
)
)
return True
class EventContext:
"""事件上下文, 保存此次事件运行的信息"""
eid = 0
"""事件编号"""
host: APIHost = None
"""API宿主"""
event: events.BaseEventModel = None
"""此次事件的对象具体类型为handler注册时指定监听的类型可查看events.py中的定义"""
__prevent_default__ = False
"""是否阻止默认行为"""
__prevent_postorder__ = False
"""是否阻止后续插件的执行"""
__return_value__ = {}
""" 返回值
示例:
{
"example": [
'value1',
'value2',
3,
4,
{
'key1': 'value1',
},
['value1', 'value2']
]
}
"""
# ========== 插件可调用的 API ==========
def add_return(self, key: str, ret):
"""添加返回值"""
if key not in self.__return_value__:
self.__return_value__[key] = []
self.__return_value__[key].append(ret)
async def reply(self, message_chain: platform_message.MessageChain):
"""回复此次消息请求
Args:
message_chain (platform.types.MessageChain): 源平台的消息链,若用户使用的不是源平台适配器,程序也能自动转换为目标平台消息链
"""
# TODO 添加 at_sender 和 quote_origin 参数
await self.event.query.adapter.reply_message(
message_source=self.event.query.message_event, message=message_chain
)
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
"""主动发送消息
Args:
target_type (str): 目标类型,`person`或`group`
target_id (str): 目标ID
message (platform.types.MessageChain): 源平台的消息链,若用户使用的不是源平台适配器,程序也能自动转换为目标平台消息链
"""
await self.event.query.adapter.send_message(target_type=target_type, target_id=target_id, message=message)
def prevent_postorder(self):
"""阻止后续插件执行"""
self.__prevent_postorder__ = True
def prevent_default(self):
"""阻止默认行为"""
self.__prevent_default__ = True
# ========== 以下是内部保留方法,插件不应调用 ==========
def get_return(self, key: str) -> list:
"""获取key的所有返回值"""
if key in self.__return_value__:
return self.__return_value__[key]
return None
def get_return_value(self, key: str):
"""获取key的首个返回值"""
if key in self.__return_value__:
return self.__return_value__[key][0]
return None
def is_prevented_default(self):
"""是否阻止默认行为"""
return self.__prevent_default__
def is_prevented_postorder(self):
"""是否阻止后序插件执行"""
return self.__prevent_postorder__
def __init__(self, host: APIHost, event: events.BaseEventModel):
self.eid = EventContext.eid
self.host = host
self.event = event
self.__prevent_default__ = False
self.__prevent_postorder__ = False
self.__return_value__ = {}
EventContext.eid += 1

View File

@@ -1,21 +0,0 @@
from __future__ import annotations
class PluginSystemError(Exception):
message: str
def __init__(self, message: str):
self.message = message
def __str__(self):
return self.message
class PluginNotFoundError(PluginSystemError):
def __init__(self, message: str):
super().__init__(f'未找到插件: {message}')
class PluginInstallerError(PluginSystemError):
def __init__(self, message: str):
super().__init__(f'安装器操作错误: {message}')

View File

@@ -437,6 +437,33 @@ class RuntimeConnectionHandler(handler.Handler):
async for ret in gen:
yield ret
async def upgrade_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]:
"""Upgrade plugin"""
gen = self.call_action_generator(
LangBotToRuntimeAction.UPGRADE_PLUGIN,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
},
timeout=120,
)
async for ret in gen:
yield ret
async def delete_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]:
"""Delete plugin"""
gen = self.call_action_generator(
LangBotToRuntimeAction.DELETE_PLUGIN,
{
'plugin_author': plugin_author,
'plugin_name': plugin_name,
},
)
async for ret in gen:
yield ret
async def list_plugins(self) -> list[dict[str, Any]]:
"""List plugins"""
result = await self.call_action(

View File

@@ -1,9 +0,0 @@
# 此模块已过时
# 请从 pkg.plugin.context 引入 BasePlugin, EventContext 和 APIHost
# 最早将于 v3.4 移除此模块
from .events import *
def emit(*args, **kwargs):
print('插件调用了已弃用的函数 pkg.plugin.host.emit()')

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import abc
from ..core import app, taskmgr
class PluginInstaller(metaclass=abc.ABCMeta):
"""插件安装器抽象类"""
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
pass
@abc.abstractmethod
async def install_plugin(
self,
plugin_source: str,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""安装插件"""
raise NotImplementedError
@abc.abstractmethod
async def uninstall_plugin(
self,
plugin_name: str,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""卸载插件"""
raise NotImplementedError
@abc.abstractmethod
async def update_plugin(
self,
plugin_name: str,
plugin_source: str = None,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""更新插件"""
raise NotImplementedError

View File

@@ -1,143 +0,0 @@
from __future__ import annotations
import re
import os
import zipfile
import ssl
import certifi
import aiohttp
import aiofiles
import aiofiles.os as aiofiles_os
import aioshutil
from .. import installer, errors
from ...utils import pkgmgr
from ...core import taskmgr
class GitHubRepoInstaller(installer.PluginInstaller):
"""GitHub仓库插件安装器"""
def get_github_plugin_repo_label(self, repo_url: str) -> list[str]:
"""获取username, repo"""
repo = re.findall(
r'(?:https?://github\.com/|git@github\.com:)([^/]+/[^/]+?)(?:\.git|/|$)',
repo_url,
)
if len(repo) > 0:
return repo[0].split('/')
else:
return None
async def download_plugin_source_code(
self,
repo_url: str,
target_path: str,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
) -> str:
"""下载插件源码(全异步)"""
repo = self.get_github_plugin_repo_label(repo_url)
if repo is None:
raise errors.PluginInstallerError('仅支持GitHub仓库地址')
target_path += repo[1]
self.ap.logger.debug('正在下载源码...')
task_context.trace('下载源码...', 'download-plugin-source-code')
zipball_url = f'https://api.github.com/repos/{"/".join(repo)}/zipball/HEAD'
zip_resp: bytes = None
# 创建自定义SSL上下文使用certifi提供的根证书
ssl_context = ssl.create_default_context(cafile=certifi.where())
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
url=zipball_url,
timeout=aiohttp.ClientTimeout(total=300),
ssl=ssl_context, # 使用自定义SSL上下文来验证证书
) as resp:
if resp.status != 200:
raise errors.PluginInstallerError(f'下载源码失败: {await resp.text()}')
zip_resp = await resp.read()
if await aiofiles_os.path.exists('temp/' + target_path):
await aioshutil.rmtree('temp/' + target_path)
if await aiofiles_os.path.exists(target_path):
await aioshutil.rmtree(target_path)
await aiofiles_os.makedirs('temp/' + target_path)
async with aiofiles.open('temp/' + target_path + '/source.zip', 'wb') as f:
await f.write(zip_resp)
self.ap.logger.debug('解压中...')
task_context.trace('解压中...', 'unzip-plugin-source-code')
with zipfile.ZipFile('temp/' + target_path + '/source.zip', 'r') as zip_ref:
zip_ref.extractall('temp/' + target_path)
await aiofiles_os.remove('temp/' + target_path + '/source.zip')
import glob
unzip_dir = glob.glob('temp/' + target_path + '/*')[0]
await aioshutil.copytree(unzip_dir, target_path + '/')
await aioshutil.rmtree(unzip_dir)
self.ap.logger.debug('源码下载完成。')
return repo[1]
async def install_requirements(self, path: str):
if os.path.exists(path + '/requirements.txt'):
pkgmgr.install_requirements(path + '/requirements.txt')
async def install_plugin(
self,
plugin_source: str,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""安装插件"""
task_context.trace('下载插件源码...', 'install-plugin')
repo_label = await self.download_plugin_source_code(plugin_source, 'plugins/', task_context)
task_context.trace('安装插件依赖...', 'install-plugin')
await self.install_requirements('plugins/' + repo_label)
task_context.trace('完成.', 'install-plugin')
# Caution: in the v4.0, plugin without manifest will not be able to be updated
# await self.ap.plugin_mgr.setting.record_installed_plugin_source(
# "plugins/" + repo_label + '/', plugin_source
# )
async def uninstall_plugin(
self,
plugin_name: str,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""卸载插件"""
plugin_container = self.ap.plugin_mgr.get_plugin_by_name(plugin_name)
if plugin_container is None:
raise errors.PluginInstallerError('插件不存在或未成功加载')
else:
task_context.trace('删除插件目录...', 'uninstall-plugin')
await aioshutil.rmtree(plugin_container.pkg_path)
task_context.trace('完成, 重新加载以生效.', 'uninstall-plugin')
async def update_plugin(
self,
plugin_name: str,
plugin_source: str = None,
task_context: taskmgr.TaskContext = taskmgr.TaskContext.placeholder(),
):
"""更新插件"""
task_context.trace('更新插件...', 'update-plugin')
plugin_container = self.ap.plugin_mgr.get_plugin_by_name(plugin_name)
if plugin_container is None:
raise errors.PluginInstallerError('插件不存在或未成功加载')
else:
if plugin_container.plugin_repository:
plugin_source = plugin_container.plugin_repository
task_context.trace('转交安装任务.', 'update-plugin')
await self.install_plugin(plugin_source, task_context)
else:
raise errors.PluginInstallerError('插件无源码信息,无法更新')

View File

@@ -1,25 +0,0 @@
from __future__ import annotations
import abc
from ..core import app
from . import context
class PluginLoader(metaclass=abc.ABCMeta):
"""插件加载器抽象类"""
ap: app.Application
plugins: list[context.RuntimeContainer]
def __init__(self, ap: app.Application):
self.ap = ap
self.plugins = []
async def initialize(self):
pass
@abc.abstractmethod
async def load_plugins(self):
pass

View File

@@ -1,199 +0,0 @@
from __future__ import annotations
import typing
import pkgutil
import importlib
import traceback
from .. import loader, context, models
from langbot_plugin.api.entities.builtin.resource import tool as resource_tool
from ...utils import funcschema
from ...discover import engine as discover_engine
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.events as events
class PluginLoader(loader.PluginLoader):
"""加载 plugins/ 目录下的插件"""
_current_pkg_path = ''
_current_module_path = ''
_current_container: context.RuntimeContainer = None
plugins: list[context.RuntimeContainer] = []
def __init__(self, ap):
self.ap = ap
self.plugins = []
self._current_pkg_path = ''
self._current_module_path = ''
self._current_container = None
async def initialize(self):
"""初始化"""
def register(
self, name: str, description: str, version: str, author: str
) -> typing.Callable[[typing.Type[context.BasePlugin]], typing.Type[context.BasePlugin]]:
self.ap.logger.debug(f'注册插件 {name} {version} by {author}')
container = context.RuntimeContainer(
plugin_name=name,
plugin_label=discover_engine.I18nString(en_US=name, zh_Hans=name),
plugin_description=discover_engine.I18nString(en_US=description, zh_Hans=description),
plugin_version=version,
plugin_author=author,
plugin_repository='',
pkg_path=self._current_pkg_path,
main_file=self._current_module_path,
event_handlers={},
tools=[],
)
self._current_container = container
def wrapper(cls: context.BasePlugin) -> typing.Type[context.BasePlugin]:
container.plugin_class = cls
return cls
return wrapper
# 过时
# 最早将于 v3.4 版本移除
def on(self, event: typing.Type[events.BaseEventModel]) -> typing.Callable[[typing.Callable], typing.Callable]:
"""注册过时的事件处理器"""
self.ap.logger.debug(f'注册事件处理器 {event.__name__}')
def wrapper(func: typing.Callable) -> typing.Callable:
async def handler(plugin: context.BasePlugin, ctx: context.EventContext) -> None:
args = {
'host': ctx.host,
'event': ctx,
}
# 把 ctx.event 所有的属性都放到 args 里
# for k, v in ctx.event.dict().items():
# args[k] = v
for attr_name in ctx.event.__dict__.keys():
args[attr_name] = getattr(ctx.event, attr_name)
func(plugin, **args)
self._current_container.event_handlers[event] = handler
return func
return wrapper
# 过时
# 最早将于 v3.4 版本移除
def func(
self,
name: str = None,
) -> typing.Callable:
"""注册过时的内容函数"""
self.ap.logger.debug(f'注册内容函数 {name}')
def wrapper(func: typing.Callable) -> typing.Callable:
function_schema = funcschema.get_func_schema(func)
function_name = self._current_container.plugin_name + '-' + (func.__name__ if name is None else name)
async def handler(plugin: context.BasePlugin, query: pipeline_query.Query, *args, **kwargs):
return func(*args, **kwargs)
llm_function = resource_tool.LLMTool(
name=function_name,
human_desc='',
description=function_schema['description'],
parameters=function_schema['parameters'],
func=handler,
)
self._current_container.tools.append(llm_function)
return func
return wrapper
def handler(self, event: typing.Type[events.BaseEventModel]) -> typing.Callable[[typing.Callable], typing.Callable]:
"""注册事件处理器"""
self.ap.logger.debug(f'注册事件处理器 {event.__name__}')
def wrapper(func: typing.Callable) -> typing.Callable:
if (
self._current_container is None
): # None indicates this plugin is registered through manifest, so ignore it here
return func
self._current_container.event_handlers[event] = func
return func
return wrapper
def llm_func(
self,
name: str = None,
) -> typing.Callable:
"""注册内容函数"""
self.ap.logger.debug(f'注册内容函数 {name}')
def wrapper(func: typing.Callable) -> typing.Callable:
if (
self._current_container is None
): # None indicates this plugin is registered through manifest, so ignore it here
return func
function_schema = funcschema.get_func_schema(func)
function_name = self._current_container.plugin_name + '-' + (func.__name__ if name is None else name)
llm_function = resource_tool.LLMTool(
name=function_name,
human_desc='',
description=function_schema['description'],
parameters=function_schema['parameters'],
func=func,
)
self._current_container.tools.append(llm_function)
return func
return wrapper
async def _walk_plugin_path(self, module, prefix='', path_prefix=''):
"""遍历插件路径"""
for item in pkgutil.iter_modules(module.__path__):
if item.ispkg:
await self._walk_plugin_path(
__import__(module.__name__ + '.' + item.name, fromlist=['']),
prefix + item.name + '.',
path_prefix + item.name + '/',
)
else:
try:
self._current_pkg_path = 'plugins/' + path_prefix
self._current_module_path = 'plugins/' + path_prefix + item.name + '.py'
self._current_container = None
importlib.import_module(module.__name__ + '.' + item.name)
if self._current_container is not None:
self.plugins.append(self._current_container)
self.ap.logger.debug(f'插件 {self._current_container} 已加载')
except Exception:
self.ap.logger.error(f'加载插件模块 {prefix + item.name} 时发生错误')
traceback.print_exc()
async def load_plugins(self):
"""加载插件"""
setattr(models, 'register', self.register)
setattr(models, 'on', self.on)
setattr(models, 'func', self.func)
setattr(context, 'register', self.register)
setattr(context, 'handler', self.handler)
setattr(context, 'llm_func', self.llm_func)
await self._walk_plugin_path(__import__('plugins', fromlist=['']))

View File

@@ -1,97 +0,0 @@
from __future__ import annotations
import typing
import os
import traceback
from ...core import app
from .. import context
from .. import loader
from ...utils import funcschema
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
import langbot_plugin.api.entities.events as events
class PluginManifestLoader(loader.PluginLoader):
"""通过插件清单发现插件"""
_current_container: context.RuntimeContainer = None
def __init__(self, ap: app.Application):
super().__init__(ap)
def handler(self, event: typing.Type[events.BaseEventModel]) -> typing.Callable[[typing.Callable], typing.Callable]:
"""注册事件处理器"""
self.ap.logger.debug(f'注册事件处理器 {event.__name__}')
def wrapper(func: typing.Callable) -> typing.Callable:
self._current_container.event_handlers[event] = func
return func
return wrapper
def llm_func(
self,
name: str = None,
) -> typing.Callable:
"""注册内容函数"""
self.ap.logger.debug(f'注册内容函数 {name}')
def wrapper(func: typing.Callable) -> typing.Callable:
function_schema = funcschema.get_func_schema(func)
function_name = self._current_container.plugin_name + '-' + (func.__name__ if name is None else name)
llm_function = resource_tool.LLMTool(
name=function_name,
human_desc='',
description=function_schema['description'],
parameters=function_schema['parameters'],
func=func,
)
self._current_container.tools.append(llm_function)
return func
return wrapper
async def load_plugins(self):
"""加载插件"""
setattr(context, 'handler', self.handler)
setattr(context, 'llm_func', self.llm_func)
plugin_manifests = self.ap.discover.get_components_by_kind('Plugin')
for plugin_manifest in plugin_manifests:
try:
config_schema = plugin_manifest.spec['config'] if 'config' in plugin_manifest.spec else []
current_plugin_container = context.RuntimeContainer(
plugin_name=plugin_manifest.metadata.name,
plugin_label=plugin_manifest.metadata.label,
plugin_description=plugin_manifest.metadata.description,
plugin_version=plugin_manifest.metadata.version,
plugin_author=plugin_manifest.metadata.author,
plugin_repository=plugin_manifest.metadata.repository,
main_file=os.path.join(plugin_manifest.rel_dir, plugin_manifest.execution.python.path),
pkg_path=plugin_manifest.rel_dir,
config_schema=config_schema,
event_handlers={},
tools=[],
)
self._current_container = current_plugin_container
# extract the plugin class
# this step will load the plugin module,
# so the event handlers and tools will be registered
plugin_class = plugin_manifest.get_python_component_class()
current_plugin_container.plugin_class = plugin_class
# TODO load component extensions
self.plugins.append(current_plugin_container)
except Exception:
self.ap.logger.error(f'加载插件 {plugin_manifest.metadata.name} 时发生错误')
traceback.print_exc()

View File

@@ -1,29 +0,0 @@
# 此模块已过时,请引入 pkg.plugin.context 中的 register, handler 和 llm_func 来注册插件、事件处理函数和内容函数
# 各个事件模型请从 pkg.plugin.events 引入
# 最早将于 v3.4 移除此模块
from __future__ import annotations
import typing
from .context import BasePlugin as Plugin
from .events import *
import langbot_plugin.api.entities.events as events
def register(
name: str, description: str, version: str, author
) -> typing.Callable[[typing.Type[Plugin]], typing.Type[Plugin]]:
pass
def on(
event: typing.Type[events.BaseEventModel],
) -> typing.Callable[[typing.Callable], typing.Callable]:
pass
def func(
name: str = None,
) -> typing.Callable:
pass