diff --git a/pkg/boot/boot.py b/pkg/boot/boot.py index a2b2d7c7..b7cc0f38 100644 --- a/pkg/boot/boot.py +++ b/pkg/boot/boot.py @@ -71,11 +71,6 @@ async def make_app() -> app.Application: "tips-custom-template.py" ) - # 初始化文字转图片 - from pkg.utils import text2img - # TODO make it async - text2img.initialize() - # 检查管理员QQ号 if cfg_mgr.data['admin_qq'] == 0: qcg_logger.warning("未设置管理员QQ号,将无法使用管理员命令,请在 config.py 中修改 admin_qq") diff --git a/pkg/qqbot/blob.py b/pkg/qqbot/blob.py deleted file mode 100644 index d8373cd8..00000000 --- a/pkg/qqbot/blob.py +++ /dev/null @@ -1,100 +0,0 @@ -# 长消息处理相关 -import os -import time -import base64 -import typing - -from mirai.models.message import MessageComponent, MessageChain, Image -from mirai.models.message import ForwardMessageNode -from mirai.models.base import MiraiBaseModel - -from ..utils import text2img -from ..utils import context - - -class ForwardMessageDiaplay(MiraiBaseModel): - title: str = "群聊的聊天记录" - brief: str = "[聊天记录]" - source: str = "聊天记录" - preview: typing.List[str] = [] - summary: str = "查看x条转发消息" - - -class Forward(MessageComponent): - """合并转发。""" - type: str = "Forward" - """消息组件类型。""" - display: ForwardMessageDiaplay - """显示信息""" - node_list: typing.List[ForwardMessageNode] - """转发消息节点列表。""" - def __init__(self, *args, **kwargs): - if len(args) == 1: - self.node_list = args[0] - super().__init__(**kwargs) - super().__init__(*args, **kwargs) - - def __str__(self): - return '[聊天记录]' - - -def text_to_image(text: str) -> MessageComponent: - """将文本转换成图片""" - # 检查temp文件夹是否存在 - if not os.path.exists('temp'): - os.mkdir('temp') - img_path = text2img.text_to_image(text_str=text, save_as='temp/{}.png'.format(int(time.time()))) - - compressed_path, size = text2img.compress_image(img_path, outfile="temp/{}_compressed.png".format(int(time.time()))) - # 读取图片,转换成base64 - with open(compressed_path, 'rb') as f: - img = f.read() - - b64 = base64.b64encode(img) - - # 删除图片 - os.remove(img_path) - - # 判断compressed_path是否存在 - if os.path.exists(compressed_path): - os.remove(compressed_path) - # 返回图片 - return Image(base64=b64.decode('utf-8')) - - -def check_text(text: str) -> list: - """检查文本是否为长消息,并转换成该使用的消息链组件""" - - config = context.get_config_manager().data - - if len(text) > config['blob_message_threshold']: - - # logging.info("长消息: {}".format(text)) - if config['blob_message_strategy'] == 'image': - # 转换成图片 - return [text_to_image(text)] - elif config['blob_message_strategy'] == 'forward': - - # 包装转发消息 - display = ForwardMessageDiaplay( - title='群聊的聊天记录', - brief='[聊天记录]', - source='聊天记录', - preview=["bot: "+text], - summary="查看1条转发消息" - ) - - node = ForwardMessageNode( - sender_id=config['mirai_http_api_config']['qq'], - sender_name='bot', - message_chain=MessageChain([text]) - ) - - forward = Forward( - display=display, - node_list=[node] - ) - - return [forward] - else: - return [text] \ No newline at end of file diff --git a/pkg/qqbot/longtext/__init__.py b/pkg/qqbot/longtext/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/qqbot/longtext/longtext.py b/pkg/qqbot/longtext/longtext.py new file mode 100644 index 00000000..21267880 --- /dev/null +++ b/pkg/qqbot/longtext/longtext.py @@ -0,0 +1,56 @@ +from __future__ import annotations +import os +import traceback + +from PIL import Image, ImageDraw, ImageFont +from mirai.models.message import MessageComponent, Plain + +from ...boot import app +from . import strategy +from .strategies import image, forward + + +class LongTextProcessor: + + ap: app.Application + + strategy_impl: strategy.LongTextStrategy + + def __init__(self, ap: app.Application): + self.ap = ap + + async def initialize(self): + config = self.ap.cfg_mgr.data + if self.ap.cfg_mgr.data['blob_message_strategy'] == 'image': + use_font = config['font_path'] + try: + # 检查是否存在 + if not os.path.exists(use_font): + # 若是windows系统,使用微软雅黑 + if os.name == "nt": + use_font = "C:/Windows/Fonts/msyh.ttc" + if not os.path.exists(use_font): + self.ap.logger.warn("未找到字体文件,且无法使用Windows自带字体,更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。") + config['blob_message_strategy'] = "forward" + else: + self.ap.logger.info("使用Windows自带字体:" + use_font) + self.ap.cfg_mgr.data['font_path'] = use_font + else: + self.ap.logger.warn("未找到字体文件,且无法使用系统自带字体,更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。") + self.ap.cfg_mgr.data['blob_message_strategy'] = "forward" + except: + traceback.print_exc() + self.ap.logger.error("加载字体文件失败({}),更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。".format(use_font)) + self.ap.cfg_mgr.data['blob_message_strategy'] = "forward" + + if self.ap.cfg_mgr.data['blob_message_strategy'] == 'image': + self.strategy_impl = image.Text2ImageStrategy(self.ap) + elif self.ap.cfg_mgr.data['blob_message_strategy'] == 'forward': + self.strategy_impl = forward.ForwardComponentStrategy(self.ap) + await self.strategy_impl.initialize() + + async def check_and_process(self, message: str) -> list[MessageComponent]: + if len(message) > self.ap.cfg_mgr.data['blob_message_threshold']: + return await self.strategy_impl.process(message) + else: + return [Plain(message)] \ No newline at end of file diff --git a/pkg/qqbot/longtext/strategies/__init__.py b/pkg/qqbot/longtext/strategies/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pkg/qqbot/longtext/strategies/forward.py b/pkg/qqbot/longtext/strategies/forward.py new file mode 100644 index 00000000..d1b5c36c --- /dev/null +++ b/pkg/qqbot/longtext/strategies/forward.py @@ -0,0 +1,62 @@ +# 转发消息组件 +from __future__ import annotations +import typing + +from mirai.models import MessageChain +from mirai.models.message import MessageComponent, ForwardMessageNode +from mirai.models.base import MiraiBaseModel + +from .. import strategy as strategy_model + + +class ForwardMessageDiaplay(MiraiBaseModel): + title: str = "群聊的聊天记录" + brief: str = "[聊天记录]" + source: str = "聊天记录" + preview: typing.List[str] = [] + summary: str = "查看x条转发消息" + + +class Forward(MessageComponent): + """合并转发。""" + type: str = "Forward" + """消息组件类型。""" + display: ForwardMessageDiaplay + """显示信息""" + node_list: typing.List[ForwardMessageNode] + """转发消息节点列表。""" + def __init__(self, *args, **kwargs): + if len(args) == 1: + self.node_list = args[0] + super().__init__(**kwargs) + super().__init__(*args, **kwargs) + + def __str__(self): + return '[聊天记录]' + + +class ForwardComponentStrategy(strategy_model.LongTextStrategy): + + async def process(self, message: str) -> list[MessageComponent]: + display = ForwardMessageDiaplay( + title="群聊的聊天记录", + brief="[聊天记录]", + source="聊天记录", + preview=["QQ用户: "+message], + summary="查看1条转发消息" + ) + + node_list = [ + ForwardMessageNode( + sender_id=self.ap.im_mgr.bot_account_id, + sender_name='QQ用户', + message_chain=MessageChain([message]) + ) + ] + + forward = Forward( + display=display, + node_list=node_list + ) + + return [forward] diff --git a/pkg/qqbot/longtext/strategies/image.py b/pkg/qqbot/longtext/strategies/image.py new file mode 100644 index 00000000..4f789098 --- /dev/null +++ b/pkg/qqbot/longtext/strategies/image.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +import typing +import os +import base64 +import time +import re + +from PIL import Image, ImageDraw, ImageFont + +from mirai.models import MessageChain, Image as ImageComponent +from mirai.models.message import MessageComponent + +from .. import strategy as strategy_model + + +class Text2ImageStrategy(strategy_model.LongTextStrategy): + + text_render_font: ImageFont.FreeTypeFont + + async def initialize(self): + self.text_render_font = ImageFont.truetype(self.ap.cfg_mgr.data['font_path'], 32, encoding="utf-8") + + async def process(self, message: str) -> list[MessageComponent]: + img_path = self.text_to_image( + text_str=message, + save_as='temp/{}.png'.format(int(time.time())) + ) + + compressed_path, size = self.compress_image( + img_path, + outfile="temp/{}_compressed.png".format(int(time.time())) + ) + + with open(compressed_path, 'rb') as f: + img = f.read() + + b64 = base64.b64encode(img) + + # 删除图片 + os.remove(img_path) + + if os.path.exists(compressed_path): + os.remove(compressed_path) + + return [ + ImageComponent( + base64=b64.decode('utf-8'), + ) + ] + + def indexNumber(self, path=''): + """ + 查找字符串中数字所在串中的位置 + :param path:目标字符串 + :return:: : [['1', 16], ['2', 35], ['1', 51]] + """ + kv = [] + nums = [] + beforeDatas = re.findall('[\d]+', path) + for num in beforeDatas: + indexV = [] + times = path.count(num) + if times > 1: + if num not in nums: + indexs = re.finditer(num, path) + for index in indexs: + iV = [] + i = index.span()[0] + iV.append(num) + iV.append(i) + kv.append(iV) + nums.append(num) + else: + index = path.find(num) + indexV.append(num) + indexV.append(index) + kv.append(indexV) + # 根据数字位置排序 + indexSort = [] + resultIndex = [] + for vi in kv: + indexSort.append(vi[1]) + indexSort.sort() + for i in indexSort: + for v in kv: + if i == v[1]: + resultIndex.append(v) + return resultIndex + + + def get_size(self, file): + # 获取文件大小:KB + size = os.path.getsize(file) + return size / 1024 + + + def get_outfile(self, infile, outfile): + if outfile: + return outfile + dir, suffix = os.path.splitext(infile) + outfile = '{}-out{}'.format(dir, suffix) + return outfile + + + def compress_image(self, infile, outfile='', kb=100, step=20, quality=90): + """不改变图片尺寸压缩到指定大小 + :param infile: 压缩源文件 + :param outfile: 压缩文件保存地址 + :param mb: 压缩目标,KB + :param step: 每次调整的压缩比率 + :param quality: 初始压缩比率 + :return: 压缩文件地址,压缩文件大小 + """ + o_size = self.get_size(infile) + if o_size <= kb: + return infile, o_size + outfile = self.get_outfile(infile, outfile) + while o_size > kb: + im = Image.open(infile) + im.save(outfile, quality=quality) + if quality - step < 0: + break + quality -= step + o_size = self.get_size(outfile) + return outfile, self.get_size(outfile) + + + def text_to_image(self, text_str: str, save_as="temp.png", width=800): + + text_str = text_str.replace("\t", " ") + + # 分行 + lines = text_str.split('\n') + + # 计算并分割 + final_lines = [] + + text_width = width-80 + + self.ap.logger.debug("lines: {}, text_width: {}".format(lines, text_width)) + for line in lines: + # 如果长了就分割 + line_width = self.text_render_font.getlength(line) + self.ap.logger.debug("line_width: {}".format(line_width)) + if line_width < text_width: + final_lines.append(line) + continue + else: + rest_text = line + while True: + # 分割最前面的一行 + point = int(len(rest_text) * (text_width / line_width)) + + # 检查断点是否在数字中间 + numbers = self.indexNumber(rest_text) + + for number in numbers: + if number[1] < point < number[1] + len(number[0]) and number[1] != 0: + point = number[1] + break + + final_lines.append(rest_text[:point]) + rest_text = rest_text[point:] + line_width = self.text_render_font.getlength(rest_text) + if line_width < text_width: + final_lines.append(rest_text) + break + else: + continue + # 准备画布 + img = Image.new('RGBA', (width, max(280, len(final_lines) * 35 + 65)), (255, 255, 255, 255)) + draw = ImageDraw.Draw(img, mode='RGBA') + + self.ap.logger.debug("正在绘制图片...") + # 绘制正文 + line_number = 0 + offset_x = 20 + offset_y = 30 + for final_line in final_lines: + draw.text((offset_x, offset_y + 35 * line_number), final_line, fill=(0, 0, 0), font=self.text_render_font) + # 遍历此行,检查是否有emoji + idx_in_line = 0 + for ch in final_line: + # 检查字符占位宽 + char_code = ord(ch) + if char_code >= 127: + idx_in_line += 1 + else: + idx_in_line += 0.5 + + line_number += 1 + + self.ap.logger.debug("正在保存图片...") + img.save(save_as) + + return save_as diff --git a/pkg/qqbot/longtext/strategy.py b/pkg/qqbot/longtext/strategy.py new file mode 100644 index 00000000..ef4cc1a5 --- /dev/null +++ b/pkg/qqbot/longtext/strategy.py @@ -0,0 +1,22 @@ +from __future__ import annotations +import abc +import typing + +import mirai +from mirai.models.message import MessageComponent + +from ...boot import app + + +class LongTextStrategy(metaclass=abc.ABCMeta): + ap: app.Application + + def __init__(self, ap: app.Application): + self.ap = ap + + async def initialize(self): + pass + + @abc.abstractmethod + async def process(self, message: str) -> list[MessageComponent]: + return [] diff --git a/pkg/qqbot/manager.py b/pkg/qqbot/manager.py index a801229d..7c295f9d 100644 --- a/pkg/qqbot/manager.py +++ b/pkg/qqbot/manager.py @@ -21,6 +21,7 @@ from ..qqbot import adapter as msadapter from . import resprule from .bansess import bansess from .cntfilter import cntfilter +from .longtext import longtext from ..boot import app @@ -46,6 +47,7 @@ class QQBotManager: bansess_mgr: bansess.SessionBanManager = None cntfilter_mgr: cntfilter.ContentFilterManager = None + longtext_pcs: longtext.LongTextProcessor = None def __init__(self, first_time_init=True, ap: app.Application = None): config = context.get_config_manager().data @@ -53,6 +55,7 @@ class QQBotManager: self.ap = ap self.bansess_mgr = bansess.SessionBanManager(ap) self.cntfilter_mgr = cntfilter.ContentFilterManager(ap) + self.longtext_pcs = longtext.LongTextProcessor(ap) self.timeout = config['process_message_timeout'] self.retry = config['retry_times'] @@ -60,6 +63,7 @@ class QQBotManager: async def initialize(self): await self.bansess_mgr.initialize() await self.cntfilter_mgr.initialize() + await self.longtext_pcs.initialize() config = context.get_config_manager().data @@ -149,6 +153,7 @@ class QQBotManager: await self.on_group_message(event) asyncio.create_task(group_message_handler(event)) + self.adapter.register_listener( GroupMessage, on_group_message diff --git a/pkg/qqbot/process.py b/pkg/qqbot/process.py index fedaddbd..f6379c71 100644 --- a/pkg/qqbot/process.py +++ b/pkg/qqbot/process.py @@ -14,7 +14,6 @@ from ..utils import context from ..plugin import host as plugin_host from ..plugin import models as plugin_models -from ..qqbot import blob import tips as tips_custom from ..boot import app from .cntfilter import entities @@ -158,8 +157,8 @@ async def process_message(launcher_type: str, launcher_id: int, text_message: st return [] else: reply = [cntfilter_res.replacement] - - reply = blob.check_text(reply[0]) + + reply = await mgr.longtext_pcs.check_and_process(reply[0]) else: logging.info("回复[{}]消息".format(session_name)) diff --git a/pkg/utils/text2img.py b/pkg/utils/text2img.py deleted file mode 100644 index 5be723ed..00000000 --- a/pkg/utils/text2img.py +++ /dev/null @@ -1,208 +0,0 @@ -import logging -import re -import os -import traceback - -from PIL import Image, ImageDraw, ImageFont - -from ..utils import context - - -text_render_font: ImageFont = None - -def initialize(): - global text_render_font - logging.debug("初始化文字转图片模块...") - config = context.get_config_manager().data - - if config['blob_message_strategy'] == "image": # 仅在启用了image时才加载字体 - use_font = config['font_path'] - try: - - # 检查是否存在 - if not os.path.exists(use_font): - # 若是windows系统,使用微软雅黑 - if os.name == "nt": - use_font = "C:/Windows/Fonts/msyh.ttc" - if not os.path.exists(use_font): - logging.warn("未找到字体文件,且无法使用Windows自带字体,更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。") - config['blob_message_strategy'] = "forward" - else: - logging.info("使用Windows自带字体:" + use_font) - text_render_font = ImageFont.truetype(use_font, 32, encoding="utf-8") - else: - logging.warn("未找到字体文件,且无法使用Windows自带字体,更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。") - config['blob_message_strategy'] = "forward" - else: - text_render_font = ImageFont.truetype(use_font, 32, encoding="utf-8") - except: - traceback.print_exc() - logging.error("加载字体文件失败({}),更换为转发消息组件以发送长消息,您可以在config.py中调整相关设置。".format(use_font)) - config['blob_message_strategy'] = "forward" - - logging.debug("字体文件加载完成。") - - -def indexNumber(path=''): - """ - 查找字符串中数字所在串中的位置 - :param path:目标字符串 - :return:: : [['1', 16], ['2', 35], ['1', 51]] - """ - kv = [] - nums = [] - beforeDatas = re.findall('[\d]+', path) - for num in beforeDatas: - indexV = [] - times = path.count(num) - if times > 1: - if num not in nums: - indexs = re.finditer(num, path) - for index in indexs: - iV = [] - i = index.span()[0] - iV.append(num) - iV.append(i) - kv.append(iV) - nums.append(num) - else: - index = path.find(num) - indexV.append(num) - indexV.append(index) - kv.append(indexV) - # 根据数字位置排序 - indexSort = [] - resultIndex = [] - for vi in kv: - indexSort.append(vi[1]) - indexSort.sort() - for i in indexSort: - for v in kv: - if i == v[1]: - resultIndex.append(v) - return resultIndex - - -def get_size(file): - # 获取文件大小:KB - size = os.path.getsize(file) - return size / 1024 - - -def get_outfile(infile, outfile): - if outfile: - return outfile - dir, suffix = os.path.splitext(infile) - outfile = '{}-out{}'.format(dir, suffix) - return outfile - - -def compress_image(infile, outfile='', kb=100, step=20, quality=90): - """不改变图片尺寸压缩到指定大小 - :param infile: 压缩源文件 - :param outfile: 压缩文件保存地址 - :param mb: 压缩目标,KB - :param step: 每次调整的压缩比率 - :param quality: 初始压缩比率 - :return: 压缩文件地址,压缩文件大小 - """ - o_size = get_size(infile) - if o_size <= kb: - return infile, o_size - outfile = get_outfile(infile, outfile) - while o_size > kb: - im = Image.open(infile) - im.save(outfile, quality=quality) - if quality - step < 0: - break - quality -= step - o_size = get_size(outfile) - return outfile, get_size(outfile) - - -def text_to_image(text_str: str, save_as="temp.png", width=800): - global text_render_font - - logging.debug("正在将文本转换为图片...") - - text_str = text_str.replace("\t", " ") - - # 分行 - lines = text_str.split('\n') - - # 计算并分割 - final_lines = [] - - text_width = width-80 - - logging.debug("lines: {}, text_width: {}".format(lines, text_width)) - for line in lines: - logging.debug(type(text_render_font)) - # 如果长了就分割 - line_width = text_render_font.getlength(line) - logging.debug("line_width: {}".format(line_width)) - if line_width < text_width: - final_lines.append(line) - continue - else: - rest_text = line - while True: - # 分割最前面的一行 - point = int(len(rest_text) * (text_width / line_width)) - - # 检查断点是否在数字中间 - numbers = indexNumber(rest_text) - - for number in numbers: - if number[1] < point < number[1] + len(number[0]) and number[1] != 0: - point = number[1] - break - - final_lines.append(rest_text[:point]) - rest_text = rest_text[point:] - line_width = text_render_font.getlength(rest_text) - if line_width < text_width: - final_lines.append(rest_text) - break - else: - continue - # 准备画布 - img = Image.new('RGBA', (width, max(280, len(final_lines) * 35 + 65)), (255, 255, 255, 255)) - draw = ImageDraw.Draw(img, mode='RGBA') - - logging.debug("正在绘制图片...") - # 绘制正文 - line_number = 0 - offset_x = 20 - offset_y = 30 - for final_line in final_lines: - draw.text((offset_x, offset_y + 35 * line_number), final_line, fill=(0, 0, 0), font=text_render_font) - # 遍历此行,检查是否有emoji - idx_in_line = 0 - for ch in final_line: - # if self.is_emoji(ch): - # emoji_img_valid = ensure_emoji(hex(ord(ch))[2:]) - # if emoji_img_valid: # emoji图像可用,绘制到指定位置 - # emoji_image = Image.open("emojis/{}.png".format(hex(ord(ch))[2:]), mode='r').convert('RGBA') - # emoji_image = emoji_image.resize((32, 32)) - - # x, y = emoji_image.size - - # final_emoji_img = Image.new('RGBA', emoji_image.size, (255, 255, 255)) - # final_emoji_img.paste(emoji_image, (0, 0, x, y), emoji_image) - - # img.paste(final_emoji_img, box=(int(offset_x + idx_in_line * 32), offset_y + 35 * line_number)) - - # 检查字符占位宽 - char_code = ord(ch) - if char_code >= 127: - idx_in_line += 1 - else: - idx_in_line += 0.5 - - line_number += 1 - - logging.debug("正在保存图片...") - img.save(save_as) - - return save_as