mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-03 20:44:36 +00:00
refactor: 独立resprule为单独的包
This commit is contained in:
@@ -18,7 +18,7 @@ from ..plugin import host as plugin_host
|
||||
from ..plugin import models as plugin_models
|
||||
import tips as tips_custom
|
||||
from ..qqbot import adapter as msadapter
|
||||
from . import resprule
|
||||
from .resprule import resprule
|
||||
from .bansess import bansess
|
||||
from .cntfilter import cntfilter
|
||||
from .longtext import longtext
|
||||
@@ -34,11 +34,6 @@ class QQBotManager:
|
||||
|
||||
bot_account_id: int = 0
|
||||
|
||||
enable_banlist = False
|
||||
|
||||
enable_private = True
|
||||
enable_group = True
|
||||
|
||||
ban_person = []
|
||||
ban_group = []
|
||||
|
||||
@@ -48,6 +43,7 @@ class QQBotManager:
|
||||
bansess_mgr: bansess.SessionBanManager = None
|
||||
cntfilter_mgr: cntfilter.ContentFilterManager = None
|
||||
longtext_pcs: longtext.LongTextProcessor = None
|
||||
resprule_chkr: resprule.GroupRespondRuleChecker = None
|
||||
|
||||
def __init__(self, first_time_init=True, ap: app.Application = None):
|
||||
config = context.get_config_manager().data
|
||||
@@ -56,6 +52,7 @@ class QQBotManager:
|
||||
self.bansess_mgr = bansess.SessionBanManager(ap)
|
||||
self.cntfilter_mgr = cntfilter.ContentFilterManager(ap)
|
||||
self.longtext_pcs = longtext.LongTextProcessor(ap)
|
||||
self.resprule_chkr = resprule.GroupRespondRuleChecker(ap)
|
||||
|
||||
self.timeout = config['process_message_timeout']
|
||||
self.retry = config['retry_times']
|
||||
@@ -64,6 +61,7 @@ class QQBotManager:
|
||||
await self.bansess_mgr.initialize()
|
||||
await self.cntfilter_mgr.initialize()
|
||||
await self.longtext_pcs.initialize()
|
||||
await self.resprule_chkr.initialize()
|
||||
|
||||
config = context.get_config_manager().data
|
||||
|
||||
@@ -251,17 +249,13 @@ class QQBotManager:
|
||||
async def on_person_message(self, event: MessageEvent):
|
||||
reply = ''
|
||||
|
||||
if not self.enable_private:
|
||||
logging.debug("已在banlist.py中禁用所有私聊")
|
||||
|
||||
else:
|
||||
reply = await self.common_process(
|
||||
launcher_type="person",
|
||||
launcher_id=event.sender.id,
|
||||
text_message=str(event.message_chain),
|
||||
message_chain=event.message_chain,
|
||||
sender_id=event.sender.id
|
||||
)
|
||||
reply = await self.common_process(
|
||||
launcher_type="person",
|
||||
launcher_id=event.sender.id,
|
||||
text_message=str(event.message_chain),
|
||||
message_chain=event.message_chain,
|
||||
sender_id=event.sender.id
|
||||
)
|
||||
|
||||
if reply:
|
||||
await self.send(event, reply, check_quote=False, check_at_sender=False)
|
||||
@@ -269,39 +263,25 @@ class QQBotManager:
|
||||
# 群消息处理
|
||||
async def on_group_message(self, event: GroupMessage):
|
||||
reply = ''
|
||||
|
||||
if not self.enable_group:
|
||||
logging.debug("已在banlist.py中禁用所有群聊")
|
||||
|
||||
else:
|
||||
do_req = False
|
||||
text = str(event.message_chain).strip()
|
||||
if At(self.bot_account_id) in event.message_chain and resprule.response_at(event.group.id):
|
||||
# 直接调用
|
||||
# reply = await process()
|
||||
event.message_chain.remove(At(self.bot_account_id))
|
||||
text = str(event.message_chain).strip()
|
||||
do_req = True
|
||||
else:
|
||||
check, result = resprule.check_response_rule(event.group.id, str(event.message_chain).strip())
|
||||
text = str(event.message_chain).strip()
|
||||
|
||||
if check:
|
||||
do_req = True
|
||||
text = result.strip()
|
||||
# 检查是否随机响应
|
||||
elif resprule.random_responding(event.group.id):
|
||||
logging.info("随机响应group_{}消息".format(event.group.id))
|
||||
# reply = await process()
|
||||
do_req = True
|
||||
rule_check_res = await self.resprule_chkr.check(
|
||||
text,
|
||||
event.message_chain,
|
||||
event.group.id,
|
||||
event.sender.id
|
||||
)
|
||||
|
||||
if do_req:
|
||||
reply = await self.common_process(
|
||||
launcher_type="group",
|
||||
launcher_id=event.group.id,
|
||||
text_message=text,
|
||||
message_chain=event.message_chain,
|
||||
sender_id=event.sender.id
|
||||
)
|
||||
if rule_check_res.matching:
|
||||
text = str(rule_check_res.replacement).strip()
|
||||
reply = await self.common_process(
|
||||
launcher_type="group",
|
||||
launcher_id=event.group.id,
|
||||
text_message=text,
|
||||
message_chain=rule_check_res.replacement,
|
||||
sender_id=event.sender.id
|
||||
)
|
||||
|
||||
if reply:
|
||||
await self.send(event, reply)
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
from ..utils import context
|
||||
|
||||
|
||||
# 检查消息是否符合泛响应匹配机制
|
||||
def check_response_rule(group_id:int, text: str):
|
||||
config = context.get_config_manager().data
|
||||
|
||||
rules = config['response_rules']
|
||||
|
||||
# 检查是否有特定规则
|
||||
if 'prefix' not in config['response_rules']:
|
||||
if str(group_id) in config['response_rules']:
|
||||
rules = config['response_rules'][str(group_id)]
|
||||
else:
|
||||
rules = config['response_rules']['default']
|
||||
|
||||
# 检查前缀匹配
|
||||
if 'prefix' in rules:
|
||||
for rule in rules['prefix']:
|
||||
if text.startswith(rule):
|
||||
return True, text.replace(rule, "", 1)
|
||||
|
||||
# 检查正则表达式匹配
|
||||
if 'regexp' in rules:
|
||||
for rule in rules['regexp']:
|
||||
import re
|
||||
match = re.match(rule, text)
|
||||
if match:
|
||||
return True, text
|
||||
|
||||
return False, ""
|
||||
|
||||
|
||||
def response_at(group_id: int):
|
||||
config = context.get_config_manager().data
|
||||
|
||||
use_response_rule = config['response_rules']
|
||||
|
||||
# 检查是否有特定规则
|
||||
if 'prefix' not in config['response_rules']:
|
||||
if str(group_id) in config['response_rules']:
|
||||
use_response_rule = config['response_rules'][str(group_id)]
|
||||
else:
|
||||
use_response_rule = config['response_rules']['default']
|
||||
|
||||
if 'at' not in use_response_rule:
|
||||
return True
|
||||
|
||||
return use_response_rule['at']
|
||||
|
||||
|
||||
def random_responding(group_id):
|
||||
config = context.get_config_manager().data
|
||||
|
||||
use_response_rule = config['response_rules']
|
||||
|
||||
# 检查是否有特定规则
|
||||
if 'prefix' not in config['response_rules']:
|
||||
if str(group_id) in config['response_rules']:
|
||||
use_response_rule = config['response_rules'][str(group_id)]
|
||||
else:
|
||||
use_response_rule = config['response_rules']['default']
|
||||
|
||||
if 'random_rate' in use_response_rule:
|
||||
import random
|
||||
return random.random() < use_response_rule['random_rate']
|
||||
return False
|
||||
0
pkg/qqbot/resprule/__init__.py
Normal file
0
pkg/qqbot/resprule/__init__.py
Normal file
10
pkg/qqbot/resprule/entities.py
Normal file
10
pkg/qqbot/resprule/entities.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import pydantic
|
||||
import mirai
|
||||
|
||||
|
||||
class RuleJudgeResult(pydantic.BaseModel):
|
||||
|
||||
matching: bool = False
|
||||
|
||||
replacement: mirai.MessageChain = None
|
||||
|
||||
58
pkg/qqbot/resprule/resprule.py
Normal file
58
pkg/qqbot/resprule/resprule.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import mirai
|
||||
|
||||
from ...boot import app
|
||||
from . import entities, rule
|
||||
from .rules import atbot, prefix, regexp, random
|
||||
|
||||
|
||||
class GroupRespondRuleChecker:
|
||||
"""群组响应规则检查器
|
||||
"""
|
||||
|
||||
ap: app.Application
|
||||
|
||||
rule_matchers: list[rule.GroupRespondRule]
|
||||
|
||||
def __init__(self, ap: app.Application):
|
||||
self.ap = ap
|
||||
|
||||
async def initialize(self):
|
||||
"""初始化检查器
|
||||
"""
|
||||
self.rule_matchers = [
|
||||
atbot.AtBotRule(self.ap),
|
||||
prefix.PrefixRule(self.ap),
|
||||
regexp.RegExpRule(self.ap),
|
||||
random.RandomRespRule(self.ap),
|
||||
]
|
||||
|
||||
for rule_matcher in self.rule_matchers:
|
||||
await rule_matcher.initialize()
|
||||
|
||||
async def check(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
launcher_id: int,
|
||||
sender_id: int,
|
||||
) -> entities.RuleJudgeResult:
|
||||
"""检查消息是否匹配规则
|
||||
"""
|
||||
rules = self.ap.cfg_mgr.data['response_rules']
|
||||
|
||||
use_rule = rules['default']
|
||||
|
||||
if str(launcher_id) in use_rule:
|
||||
use_rule = use_rule[str(launcher_id)]
|
||||
|
||||
for rule_matcher in self.rule_matchers:
|
||||
res = await rule_matcher.match(message_text, message_chain, use_rule)
|
||||
if res.matching:
|
||||
return res
|
||||
|
||||
return entities.RuleJudgeResult(
|
||||
matching=False,
|
||||
replacement=message_chain
|
||||
)
|
||||
31
pkg/qqbot/resprule/rule.py
Normal file
31
pkg/qqbot/resprule/rule.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from __future__ import annotations
|
||||
import abc
|
||||
|
||||
import mirai
|
||||
|
||||
from ...boot import app
|
||||
from . import entities
|
||||
|
||||
|
||||
class GroupRespondRule(metaclass=abc.ABCMeta):
|
||||
"""群组响应规则的抽象类
|
||||
"""
|
||||
|
||||
ap: app.Application
|
||||
|
||||
def __init__(self, ap: app.Application):
|
||||
self.ap = ap
|
||||
|
||||
async def initialize(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def match(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
rule_dict: dict
|
||||
) -> entities.RuleJudgeResult:
|
||||
"""判断消息是否匹配规则
|
||||
"""
|
||||
raise NotImplementedError
|
||||
0
pkg/qqbot/resprule/rules/__init__.py
Normal file
0
pkg/qqbot/resprule/rules/__init__.py
Normal file
28
pkg/qqbot/resprule/rules/atbot.py
Normal file
28
pkg/qqbot/resprule/rules/atbot.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import mirai
|
||||
|
||||
from .. import rule as rule_model
|
||||
from .. import entities
|
||||
|
||||
|
||||
class AtBotRule(rule_model.GroupRespondRule):
|
||||
|
||||
async def match(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
rule_dict: dict
|
||||
) -> entities.RuleJudgeResult:
|
||||
|
||||
if message_chain.has(mirai.At(self.ap.im_mgr.bot_account_id)) and rule_dict['at']:
|
||||
message_chain.remove(mirai.At(self.ap.im_mgr.bot_account_id))
|
||||
return entities.RuleJudgeResult(
|
||||
matching=True,
|
||||
replacement=message_chain,
|
||||
)
|
||||
|
||||
return entities.RuleJudgeResult(
|
||||
matching=False,
|
||||
replacement = message_chain
|
||||
)
|
||||
29
pkg/qqbot/resprule/rules/prefix.py
Normal file
29
pkg/qqbot/resprule/rules/prefix.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import mirai
|
||||
|
||||
from .. import rule as rule_model
|
||||
from .. import entities
|
||||
|
||||
|
||||
class PrefixRule(rule_model.GroupRespondRule):
|
||||
|
||||
async def match(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
rule_dict: dict
|
||||
) -> entities.RuleJudgeResult:
|
||||
prefixes = rule_dict['prefix']
|
||||
|
||||
for prefix in prefixes:
|
||||
if message_text.startswith(prefix):
|
||||
return entities.RuleJudgeResult(
|
||||
matching=True,
|
||||
replacement=mirai.MessageChain([
|
||||
mirai.Plain(message_text[len(prefix):])
|
||||
]),
|
||||
)
|
||||
|
||||
return entities.RuleJudgeResult(
|
||||
matching=False,
|
||||
replacement=message_chain
|
||||
)
|
||||
22
pkg/qqbot/resprule/rules/random.py
Normal file
22
pkg/qqbot/resprule/rules/random.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import random
|
||||
|
||||
import mirai
|
||||
|
||||
from .. import rule as rule_model
|
||||
from .. import entities
|
||||
|
||||
|
||||
class RandomRespRule(rule_model.GroupRespondRule):
|
||||
|
||||
async def match(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
rule_dict: dict
|
||||
) -> entities.RuleJudgeResult:
|
||||
random_rate = rule_dict['random_rate']
|
||||
|
||||
return entities.RuleJudgeResult(
|
||||
matching=random.random() < random_rate,
|
||||
replacement=message_chain
|
||||
)
|
||||
31
pkg/qqbot/resprule/rules/regexp.py
Normal file
31
pkg/qqbot/resprule/rules/regexp.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import re
|
||||
|
||||
import mirai
|
||||
|
||||
from .. import rule as rule_model
|
||||
from .. import entities
|
||||
|
||||
|
||||
class RegExpRule(rule_model.GroupRespondRule):
|
||||
|
||||
async def match(
|
||||
self,
|
||||
message_text: str,
|
||||
message_chain: mirai.MessageChain,
|
||||
rule_dict: dict
|
||||
) -> entities.RuleJudgeResult:
|
||||
regexps = rule_dict['regexp']
|
||||
|
||||
for regexp in regexps:
|
||||
match = re.match(regexp, message_text)
|
||||
|
||||
if match:
|
||||
return entities.RuleJudgeResult(
|
||||
matching=True,
|
||||
replacement=message_chain,
|
||||
)
|
||||
|
||||
return entities.RuleJudgeResult(
|
||||
matching=False,
|
||||
replacement=message_chain
|
||||
)
|
||||
Reference in New Issue
Block a user