refactor: 独立长消息处理为longtext包

This commit is contained in:
RockChinQ
2024-01-25 17:05:09 +08:00
parent a9a798b19d
commit ea9ae85428
11 changed files with 344 additions and 316 deletions

View File

@@ -71,11 +71,6 @@ async def make_app() -> app.Application:
"tips-custom-template.py"
)
# 初始化文字转图片
from pkg.utils import text2img
# TODO make it async
text2img.initialize()
# 检查管理员QQ号
if cfg_mgr.data['admin_qq'] == 0:
qcg_logger.warning("未设置管理员QQ号将无法使用管理员命令请在 config.py 中修改 admin_qq")

View File

@@ -1,100 +0,0 @@
# 长消息处理相关
import os
import time
import base64
import typing
from mirai.models.message import MessageComponent, MessageChain, Image
from mirai.models.message import ForwardMessageNode
from mirai.models.base import MiraiBaseModel
from ..utils import text2img
from ..utils import context
class ForwardMessageDiaplay(MiraiBaseModel):
title: str = "群聊的聊天记录"
brief: str = "[聊天记录]"
source: str = "聊天记录"
preview: typing.List[str] = []
summary: str = "查看x条转发消息"
class Forward(MessageComponent):
"""合并转发。"""
type: str = "Forward"
"""消息组件类型。"""
display: ForwardMessageDiaplay
"""显示信息"""
node_list: typing.List[ForwardMessageNode]
"""转发消息节点列表。"""
def __init__(self, *args, **kwargs):
if len(args) == 1:
self.node_list = args[0]
super().__init__(**kwargs)
super().__init__(*args, **kwargs)
def __str__(self):
return '[聊天记录]'
def text_to_image(text: str) -> MessageComponent:
"""将文本转换成图片"""
# 检查temp文件夹是否存在
if not os.path.exists('temp'):
os.mkdir('temp')
img_path = text2img.text_to_image(text_str=text, save_as='temp/{}.png'.format(int(time.time())))
compressed_path, size = text2img.compress_image(img_path, outfile="temp/{}_compressed.png".format(int(time.time())))
# 读取图片转换成base64
with open(compressed_path, 'rb') as f:
img = f.read()
b64 = base64.b64encode(img)
# 删除图片
os.remove(img_path)
# 判断compressed_path是否存在
if os.path.exists(compressed_path):
os.remove(compressed_path)
# 返回图片
return Image(base64=b64.decode('utf-8'))
def check_text(text: str) -> list:
"""检查文本是否为长消息,并转换成该使用的消息链组件"""
config = context.get_config_manager().data
if len(text) > config['blob_message_threshold']:
# logging.info("长消息: {}".format(text))
if config['blob_message_strategy'] == 'image':
# 转换成图片
return [text_to_image(text)]
elif config['blob_message_strategy'] == 'forward':
# 包装转发消息
display = ForwardMessageDiaplay(
title='群聊的聊天记录',
brief='[聊天记录]',
source='聊天记录',
preview=["bot: "+text],
summary="查看1条转发消息"
)
node = ForwardMessageNode(
sender_id=config['mirai_http_api_config']['qq'],
sender_name='bot',
message_chain=MessageChain([text])
)
forward = Forward(
display=display,
node_list=[node]
)
return [forward]
else:
return [text]

View File

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
import os
import traceback
from PIL import Image, ImageDraw, ImageFont
from mirai.models.message import MessageComponent, Plain
from ...boot import app
from . import strategy
from .strategies import image, forward
class LongTextProcessor:
ap: app.Application
strategy_impl: strategy.LongTextStrategy
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
config = self.ap.cfg_mgr.data
if self.ap.cfg_mgr.data['blob_message_strategy'] == 'image':
use_font = config['font_path']
try:
# 检查是否存在
if not os.path.exists(use_font):
# 若是windows系统使用微软雅黑
if os.name == "nt":
use_font = "C:/Windows/Fonts/msyh.ttc"
if not os.path.exists(use_font):
self.ap.logger.warn("未找到字体文件且无法使用Windows自带字体更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。")
config['blob_message_strategy'] = "forward"
else:
self.ap.logger.info("使用Windows自带字体" + use_font)
self.ap.cfg_mgr.data['font_path'] = use_font
else:
self.ap.logger.warn("未找到字体文件且无法使用系统自带字体更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。")
self.ap.cfg_mgr.data['blob_message_strategy'] = "forward"
except:
traceback.print_exc()
self.ap.logger.error("加载字体文件失败({})更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。".format(use_font))
self.ap.cfg_mgr.data['blob_message_strategy'] = "forward"
if self.ap.cfg_mgr.data['blob_message_strategy'] == 'image':
self.strategy_impl = image.Text2ImageStrategy(self.ap)
elif self.ap.cfg_mgr.data['blob_message_strategy'] == 'forward':
self.strategy_impl = forward.ForwardComponentStrategy(self.ap)
await self.strategy_impl.initialize()
async def check_and_process(self, message: str) -> list[MessageComponent]:
if len(message) > self.ap.cfg_mgr.data['blob_message_threshold']:
return await self.strategy_impl.process(message)
else:
return [Plain(message)]

View File

@@ -0,0 +1,62 @@
# 转发消息组件
from __future__ import annotations
import typing
from mirai.models import MessageChain
from mirai.models.message import MessageComponent, ForwardMessageNode
from mirai.models.base import MiraiBaseModel
from .. import strategy as strategy_model
class ForwardMessageDiaplay(MiraiBaseModel):
title: str = "群聊的聊天记录"
brief: str = "[聊天记录]"
source: str = "聊天记录"
preview: typing.List[str] = []
summary: str = "查看x条转发消息"
class Forward(MessageComponent):
"""合并转发。"""
type: str = "Forward"
"""消息组件类型。"""
display: ForwardMessageDiaplay
"""显示信息"""
node_list: typing.List[ForwardMessageNode]
"""转发消息节点列表。"""
def __init__(self, *args, **kwargs):
if len(args) == 1:
self.node_list = args[0]
super().__init__(**kwargs)
super().__init__(*args, **kwargs)
def __str__(self):
return '[聊天记录]'
class ForwardComponentStrategy(strategy_model.LongTextStrategy):
async def process(self, message: str) -> list[MessageComponent]:
display = ForwardMessageDiaplay(
title="群聊的聊天记录",
brief="[聊天记录]",
source="聊天记录",
preview=["QQ用户: "+message],
summary="查看1条转发消息"
)
node_list = [
ForwardMessageNode(
sender_id=self.ap.im_mgr.bot_account_id,
sender_name='QQ用户',
message_chain=MessageChain([message])
)
]
forward = Forward(
display=display,
node_list=node_list
)
return [forward]

View File

@@ -0,0 +1,197 @@
from __future__ import annotations
import typing
import os
import base64
import time
import re
from PIL import Image, ImageDraw, ImageFont
from mirai.models import MessageChain, Image as ImageComponent
from mirai.models.message import MessageComponent
from .. import strategy as strategy_model
class Text2ImageStrategy(strategy_model.LongTextStrategy):
text_render_font: ImageFont.FreeTypeFont
async def initialize(self):
self.text_render_font = ImageFont.truetype(self.ap.cfg_mgr.data['font_path'], 32, encoding="utf-8")
async def process(self, message: str) -> list[MessageComponent]:
img_path = self.text_to_image(
text_str=message,
save_as='temp/{}.png'.format(int(time.time()))
)
compressed_path, size = self.compress_image(
img_path,
outfile="temp/{}_compressed.png".format(int(time.time()))
)
with open(compressed_path, 'rb') as f:
img = f.read()
b64 = base64.b64encode(img)
# 删除图片
os.remove(img_path)
if os.path.exists(compressed_path):
os.remove(compressed_path)
return [
ImageComponent(
base64=b64.decode('utf-8'),
)
]
def indexNumber(self, path=''):
"""
查找字符串中数字所在串中的位置
:param path:目标字符串
:return:<class 'list'>: <class 'list'>: [['1', 16], ['2', 35], ['1', 51]]
"""
kv = []
nums = []
beforeDatas = re.findall('[\d]+', path)
for num in beforeDatas:
indexV = []
times = path.count(num)
if times > 1:
if num not in nums:
indexs = re.finditer(num, path)
for index in indexs:
iV = []
i = index.span()[0]
iV.append(num)
iV.append(i)
kv.append(iV)
nums.append(num)
else:
index = path.find(num)
indexV.append(num)
indexV.append(index)
kv.append(indexV)
# 根据数字位置排序
indexSort = []
resultIndex = []
for vi in kv:
indexSort.append(vi[1])
indexSort.sort()
for i in indexSort:
for v in kv:
if i == v[1]:
resultIndex.append(v)
return resultIndex
def get_size(self, file):
# 获取文件大小:KB
size = os.path.getsize(file)
return size / 1024
def get_outfile(self, infile, outfile):
if outfile:
return outfile
dir, suffix = os.path.splitext(infile)
outfile = '{}-out{}'.format(dir, suffix)
return outfile
def compress_image(self, infile, outfile='', kb=100, step=20, quality=90):
"""不改变图片尺寸压缩到指定大小
:param infile: 压缩源文件
:param outfile: 压缩文件保存地址
:param mb: 压缩目标,KB
:param step: 每次调整的压缩比率
:param quality: 初始压缩比率
:return: 压缩文件地址,压缩文件大小
"""
o_size = self.get_size(infile)
if o_size <= kb:
return infile, o_size
outfile = self.get_outfile(infile, outfile)
while o_size > kb:
im = Image.open(infile)
im.save(outfile, quality=quality)
if quality - step < 0:
break
quality -= step
o_size = self.get_size(outfile)
return outfile, self.get_size(outfile)
def text_to_image(self, text_str: str, save_as="temp.png", width=800):
text_str = text_str.replace("\t", " ")
# 分行
lines = text_str.split('\n')
# 计算并分割
final_lines = []
text_width = width-80
self.ap.logger.debug("lines: {}, text_width: {}".format(lines, text_width))
for line in lines:
# 如果长了就分割
line_width = self.text_render_font.getlength(line)
self.ap.logger.debug("line_width: {}".format(line_width))
if line_width < text_width:
final_lines.append(line)
continue
else:
rest_text = line
while True:
# 分割最前面的一行
point = int(len(rest_text) * (text_width / line_width))
# 检查断点是否在数字中间
numbers = self.indexNumber(rest_text)
for number in numbers:
if number[1] < point < number[1] + len(number[0]) and number[1] != 0:
point = number[1]
break
final_lines.append(rest_text[:point])
rest_text = rest_text[point:]
line_width = self.text_render_font.getlength(rest_text)
if line_width < text_width:
final_lines.append(rest_text)
break
else:
continue
# 准备画布
img = Image.new('RGBA', (width, max(280, len(final_lines) * 35 + 65)), (255, 255, 255, 255))
draw = ImageDraw.Draw(img, mode='RGBA')
self.ap.logger.debug("正在绘制图片...")
# 绘制正文
line_number = 0
offset_x = 20
offset_y = 30
for final_line in final_lines:
draw.text((offset_x, offset_y + 35 * line_number), final_line, fill=(0, 0, 0), font=self.text_render_font)
# 遍历此行,检查是否有emoji
idx_in_line = 0
for ch in final_line:
# 检查字符占位宽
char_code = ord(ch)
if char_code >= 127:
idx_in_line += 1
else:
idx_in_line += 0.5
line_number += 1
self.ap.logger.debug("正在保存图片...")
img.save(save_as)
return save_as

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
import abc
import typing
import mirai
from mirai.models.message import MessageComponent
from ...boot import app
class LongTextStrategy(metaclass=abc.ABCMeta):
ap: app.Application
def __init__(self, ap: app.Application):
self.ap = ap
async def initialize(self):
pass
@abc.abstractmethod
async def process(self, message: str) -> list[MessageComponent]:
return []

View File

@@ -21,6 +21,7 @@ from ..qqbot import adapter as msadapter
from . import resprule
from .bansess import bansess
from .cntfilter import cntfilter
from .longtext import longtext
from ..boot import app
@@ -46,6 +47,7 @@ class QQBotManager:
bansess_mgr: bansess.SessionBanManager = None
cntfilter_mgr: cntfilter.ContentFilterManager = None
longtext_pcs: longtext.LongTextProcessor = None
def __init__(self, first_time_init=True, ap: app.Application = None):
config = context.get_config_manager().data
@@ -53,6 +55,7 @@ class QQBotManager:
self.ap = ap
self.bansess_mgr = bansess.SessionBanManager(ap)
self.cntfilter_mgr = cntfilter.ContentFilterManager(ap)
self.longtext_pcs = longtext.LongTextProcessor(ap)
self.timeout = config['process_message_timeout']
self.retry = config['retry_times']
@@ -60,6 +63,7 @@ class QQBotManager:
async def initialize(self):
await self.bansess_mgr.initialize()
await self.cntfilter_mgr.initialize()
await self.longtext_pcs.initialize()
config = context.get_config_manager().data
@@ -149,6 +153,7 @@ class QQBotManager:
await self.on_group_message(event)
asyncio.create_task(group_message_handler(event))
self.adapter.register_listener(
GroupMessage,
on_group_message

View File

@@ -14,7 +14,6 @@ from ..utils import context
from ..plugin import host as plugin_host
from ..plugin import models as plugin_models
from ..qqbot import blob
import tips as tips_custom
from ..boot import app
from .cntfilter import entities
@@ -158,8 +157,8 @@ async def process_message(launcher_type: str, launcher_id: int, text_message: st
return []
else:
reply = [cntfilter_res.replacement]
reply = blob.check_text(reply[0])
reply = await mgr.longtext_pcs.check_and_process(reply[0])
else:
logging.info("回复[{}]消息".format(session_name))

View File

@@ -1,208 +0,0 @@
import logging
import re
import os
import traceback
from PIL import Image, ImageDraw, ImageFont
from ..utils import context
text_render_font: ImageFont = None
def initialize():
global text_render_font
logging.debug("初始化文字转图片模块...")
config = context.get_config_manager().data
if config['blob_message_strategy'] == "image": # 仅在启用了image时才加载字体
use_font = config['font_path']
try:
# 检查是否存在
if not os.path.exists(use_font):
# 若是windows系统使用微软雅黑
if os.name == "nt":
use_font = "C:/Windows/Fonts/msyh.ttc"
if not os.path.exists(use_font):
logging.warn("未找到字体文件且无法使用Windows自带字体更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。")
config['blob_message_strategy'] = "forward"
else:
logging.info("使用Windows自带字体" + use_font)
text_render_font = ImageFont.truetype(use_font, 32, encoding="utf-8")
else:
logging.warn("未找到字体文件且无法使用Windows自带字体更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。")
config['blob_message_strategy'] = "forward"
else:
text_render_font = ImageFont.truetype(use_font, 32, encoding="utf-8")
except:
traceback.print_exc()
logging.error("加载字体文件失败({})更换为转发消息组件以发送长消息您可以在config.py中调整相关设置。".format(use_font))
config['blob_message_strategy'] = "forward"
logging.debug("字体文件加载完成。")
def indexNumber(path=''):
"""
查找字符串中数字所在串中的位置
:param path:目标字符串
:return:<class 'list'>: <class 'list'>: [['1', 16], ['2', 35], ['1', 51]]
"""
kv = []
nums = []
beforeDatas = re.findall('[\d]+', path)
for num in beforeDatas:
indexV = []
times = path.count(num)
if times > 1:
if num not in nums:
indexs = re.finditer(num, path)
for index in indexs:
iV = []
i = index.span()[0]
iV.append(num)
iV.append(i)
kv.append(iV)
nums.append(num)
else:
index = path.find(num)
indexV.append(num)
indexV.append(index)
kv.append(indexV)
# 根据数字位置排序
indexSort = []
resultIndex = []
for vi in kv:
indexSort.append(vi[1])
indexSort.sort()
for i in indexSort:
for v in kv:
if i == v[1]:
resultIndex.append(v)
return resultIndex
def get_size(file):
# 获取文件大小:KB
size = os.path.getsize(file)
return size / 1024
def get_outfile(infile, outfile):
if outfile:
return outfile
dir, suffix = os.path.splitext(infile)
outfile = '{}-out{}'.format(dir, suffix)
return outfile
def compress_image(infile, outfile='', kb=100, step=20, quality=90):
"""不改变图片尺寸压缩到指定大小
:param infile: 压缩源文件
:param outfile: 压缩文件保存地址
:param mb: 压缩目标,KB
:param step: 每次调整的压缩比率
:param quality: 初始压缩比率
:return: 压缩文件地址,压缩文件大小
"""
o_size = get_size(infile)
if o_size <= kb:
return infile, o_size
outfile = get_outfile(infile, outfile)
while o_size > kb:
im = Image.open(infile)
im.save(outfile, quality=quality)
if quality - step < 0:
break
quality -= step
o_size = get_size(outfile)
return outfile, get_size(outfile)
def text_to_image(text_str: str, save_as="temp.png", width=800):
global text_render_font
logging.debug("正在将文本转换为图片...")
text_str = text_str.replace("\t", " ")
# 分行
lines = text_str.split('\n')
# 计算并分割
final_lines = []
text_width = width-80
logging.debug("lines: {}, text_width: {}".format(lines, text_width))
for line in lines:
logging.debug(type(text_render_font))
# 如果长了就分割
line_width = text_render_font.getlength(line)
logging.debug("line_width: {}".format(line_width))
if line_width < text_width:
final_lines.append(line)
continue
else:
rest_text = line
while True:
# 分割最前面的一行
point = int(len(rest_text) * (text_width / line_width))
# 检查断点是否在数字中间
numbers = indexNumber(rest_text)
for number in numbers:
if number[1] < point < number[1] + len(number[0]) and number[1] != 0:
point = number[1]
break
final_lines.append(rest_text[:point])
rest_text = rest_text[point:]
line_width = text_render_font.getlength(rest_text)
if line_width < text_width:
final_lines.append(rest_text)
break
else:
continue
# 准备画布
img = Image.new('RGBA', (width, max(280, len(final_lines) * 35 + 65)), (255, 255, 255, 255))
draw = ImageDraw.Draw(img, mode='RGBA')
logging.debug("正在绘制图片...")
# 绘制正文
line_number = 0
offset_x = 20
offset_y = 30
for final_line in final_lines:
draw.text((offset_x, offset_y + 35 * line_number), final_line, fill=(0, 0, 0), font=text_render_font)
# 遍历此行,检查是否有emoji
idx_in_line = 0
for ch in final_line:
# if self.is_emoji(ch):
# emoji_img_valid = ensure_emoji(hex(ord(ch))[2:])
# if emoji_img_valid: # emoji图像可用,绘制到指定位置
# emoji_image = Image.open("emojis/{}.png".format(hex(ord(ch))[2:]), mode='r').convert('RGBA')
# emoji_image = emoji_image.resize((32, 32))
# x, y = emoji_image.size
# final_emoji_img = Image.new('RGBA', emoji_image.size, (255, 255, 255))
# final_emoji_img.paste(emoji_image, (0, 0, x, y), emoji_image)
# img.paste(final_emoji_img, box=(int(offset_x + idx_in_line * 32), offset_y + 35 * line_number))
# 检查字符占位宽
char_code = ord(ch)
if char_code >= 127:
idx_in_line += 1
else:
idx_in_line += 0.5
line_number += 1
logging.debug("正在保存图片...")
img.save(save_as)
return save_as