Files
LangBot/pkg/platform/sources/line.py
fdc310 8050ea1ffb Feat/lineadapter (#1637)
* feat:line adapter and config

* fix:After receiving the message, decode it and handle it as "message_chain"

* feat:add line-bot-sdk

* del print

* feat: add image to base64

* fix: download image to base64

* del Convert binary data to a base64 string

* del print

* perf: i18n specs for zh_Hant and ja_JP

* fix:line adapter  Plugin system

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>
2025-09-15 23:11:39 +08:00

287 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import typing
import quart
import traceback
import typing
import asyncio
import re
import base64
import uuid
import json
import datetime
import hashlib
from Crypto.Cipher import AES
from ...core import app
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
from ..logger import EventLogger
from linebot.v3 import (
WebhookHandler
)
from linebot.v3.exceptions import (
InvalidSignatureError
)
from linebot.v3.messaging import (
Configuration,
ApiClient,
MessagingApi,
ReplyMessageRequest,
TextMessage,
ImageMessage
)
from linebot.v3.webhooks import (
MessageEvent,
TextMessageContent,
ImageMessageContent,
VideoMessageContent,
AudioMessageContent,
FileMessageContent,
LocationMessageContent,
StickerMessageContent
)
# from linebot import WebhookParser
from linebot.v3.webhook import WebhookParser
from linebot.v3.messaging import MessagingApiBlob
class LINEMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def yiri2target(
message_chain: platform_message.MessageChain, api_client: ApiClient
) -> typing.Tuple[list]:
content_list = []
for component in message_chain:
if isinstance(component, platform_message.At):
content_list.append({'type': 'at', 'target': component.target})
elif isinstance(component, platform_message.Plain):
content_list.append({'type': 'text', 'content': component.text})
elif isinstance(component, platform_message.Image):
if not component.url:
pass
content_list.append({'type': 'image', 'image': component.url})
elif isinstance(component, platform_message.Voice):
content_list.append({'type': 'voice', 'url': component.url, 'length': component.length})
return content_list
@staticmethod
async def target2yiri(
message,
bot_client
) -> platform_message.MessageChain:
lb_msg_list = []
msg_create_time = datetime.datetime.fromtimestamp(int(message.timestamp) / 1000)
lb_msg_list.append(platform_message.Source(id=message.webhook_event_id, time=msg_create_time))
if isinstance(message.message, TextMessageContent):
lb_msg_list.append(platform_message.Plain(text=message.message.text))
elif isinstance(message.message, AudioMessageContent):
pass
elif isinstance(message.message, VideoMessageContent):
pass
elif isinstance(message.message, ImageMessageContent):
message_content = MessagingApiBlob(bot_client).get_message_content(message.message.id)
base64_string = base64.b64encode(message_content).decode('utf-8')
# 如果需要Data URI格式用于直接嵌入HTML等
# 首先需要知道图片类型LINE图片通常是JPEG
data_uri = f"data:image/jpeg;base64,{base64_string}"
lb_msg_list.append(platform_message.Image(base64 = data_uri))
return platform_message.MessageChain(lb_msg_list)
class LINEEventConverter(abstract_platform_adapter.AbstractEventConverter):
@staticmethod
async def yiri2target(
event: platform_events.MessageEvent,
) -> MessageEvent:
pass
@staticmethod
async def target2yiri(
event,
bot_client
) -> platform_events.Event:
message_chain = await LINEMessageConverter.target2yiri(event, bot_client)
if event.source.type== 'user':
return platform_events.FriendMessage(
sender=platform_entities.Friend(
id=event.message.id,
nickname=event.source.user_id,
remark='',
),
message_chain=message_chain,
time=event.timestamp,
source_platform_object=event,
)
else:
return platform_events.GroupMessage(
sender=platform_entities.GroupMember(
id=event.event.sender.sender_id.open_id,
member_name=event.event.sender.sender_id.union_id,
permission=platform_entities.Permission.Member,
group=platform_entities.Group(
id=event.message.id,
name='',
permission=platform_entities.Permission.Member,
),
special_title='',
join_timestamp=0,
last_speak_timestamp=0,
mute_time_remaining=0,
),
message_chain=message_chain,
time=event.timestamp,
source_platform_object=event,
)
class LINEAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
bot: MessagingApi
api_client: ApiClient
bot_account_id: str # 用于在流水线中识别at是否是本bot直接以bot_name作为标识
message_converter: LINEMessageConverter
event_converter: LINEEventConverter
listeners: typing.Dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
]
config: dict
quart_app: quart.Quart
card_id_dict: dict[str, str] # 消息id到卡片id的映射便于创建卡片后的发送消息到指定卡片
seq: int # 用于在发送卡片消息中识别消息顺序直接以seq作为标识
def __init__(self, config: dict, logger: EventLogger):
configuration = Configuration(access_token=config['channel_access_token'])
line_webhook = WebhookHandler(config['channel_secret'])
parser = WebhookParser(config['channel_secret'])
api_client = ApiClient(configuration)
bot_account_id = config.get('bot_account_id', 'langbot')
super().__init__(
config = config,
logger = logger,
quart_app = quart.Quart(__name__),
listeners = {},
card_id_dict = {},
seq = 1,
event_converter = LINEEventConverter(),
message_converter = LINEMessageConverter(),
line_webhook = line_webhook,
parser = parser,
configuration=configuration,
api_client = api_client,
bot = MessagingApi(api_client),
bot_account_id = bot_account_id,
)
@self.quart_app.route('/line/callback', methods=['POST'])
async def line_callback():
try:
signature = quart.request.headers.get('X-Line-Signature')
body = await quart.request.get_data(as_text=True)
events = parser.parse(body, signature) # 解密解析消息
try:
# print(events)
lb_event = await self.event_converter.target2yiri(events[0], self.api_client)
if lb_event.__class__ in self.listeners:
await self.listeners[lb_event.__class__](lb_event, self)
except InvalidSignatureError:
self.logger.info(f"Invalid signature. Please check your channel access token/channel secret.{traceback.format_exc()}")
return quart.Response('Invalid signature', status=400)
return {'code': 200, 'message': 'ok'}
except Exception:
await self.logger.error(f'Error in LINE callback: {traceback.format_exc()}')
return {'code': 500, 'message': 'error'}
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
pass
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
content_list = await self.message_converter.yiri2target(message, self.api_client)
for content in content_list:
if content['type'] == 'text':
self.bot.reply_message_with_http_info(
ReplyMessageRequest(
reply_token=message_source.source_platform_object.reply_token,
messages=[TextMessage(text=content['content'])]
)
)
elif content['type'] == 'image':
self.bot.reply_message_with_http_info(
ReplyMessageRequest(
reply_token=message_source.source_platform_object.reply_token,
messages=[ImageMessage(text=content['content'])]
)
)
async def is_muted(self, group_id: int) -> bool:
return False
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
):
self.listeners.pop(event_type)
async def run_async(self):
port = self.config['port']
async def shutdown_trigger_placeholder():
while True:
await asyncio.sleep(1)
await self.quart_app.run_task(
host='0.0.0.0',
port=port,
shutdown_trigger=shutdown_trigger_placeholder,
)
async def kill(self) -> bool:
pass