feat(platform): add lark eba adapter

This commit is contained in:
Junyan Qin
2026-05-11 12:00:24 +08:00
parent 417b83d3aa
commit 197e117900
13 changed files with 2179 additions and 33 deletions

View File

@@ -0,0 +1 @@
"""Lark/Feishu EBA platform adapter."""

View File

@@ -0,0 +1,680 @@
from __future__ import annotations
import asyncio
import base64
import hashlib
import json
import time
import traceback
import typing
import uuid
from Crypto.Cipher import AES
import lark_oapi
from lark_oapi.api.auth.v3 import (
CreateAppAccessTokenRequest,
CreateAppAccessTokenRequestBody,
CreateAppAccessTokenResponse,
CreateTenantAccessTokenRequest,
CreateTenantAccessTokenRequestBody,
CreateTenantAccessTokenResponse,
ResendAppTicketRequest,
ResendAppTicketRequestBody,
ResendAppTicketResponse,
)
from lark_oapi.api.cardkit.v1 import (
ContentCardElementRequest,
ContentCardElementRequestBody,
ContentCardElementResponse,
CreateCardRequest,
CreateCardRequestBody,
CreateCardResponse,
)
from lark_oapi.api.im.v1 import (
CreateMessageRequest,
CreateMessageRequestBody,
CreateMessageResponse,
EventMessage,
EventSender,
P2ImMessageReceiveV1,
P2ImMessageReceiveV1Data,
ReplyMessageRequest,
ReplyMessageRequestBody,
ReplyMessageResponse,
)
import lark_oapi.ws.exception
import pydantic
import quart
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
from langbot.pkg.platform.adapters.lark.api_impl import LarkAPIMixin
from langbot.pkg.platform.adapters.lark.event_converter import LarkEventConverter
from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter
from langbot.pkg.platform.adapters.lark.platform_api import PLATFORM_API_MAP
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class AESCipher:
def __init__(self, key: str):
self.key = hashlib.sha256(self.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
if isinstance(data, str):
return data.encode('utf8')
return data
@staticmethod
def _unpad(value: bytes) -> bytes:
return value[: -value[len(value) - 1]]
def decrypt_string(self, encrypted: str) -> str:
encrypted_bytes = base64.b64decode(encrypted)
iv = encrypted_bytes[: AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(encrypted_bytes[AES.block_size :])).decode('utf8')
class LarkAdapter(LarkAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter):
bot: lark_oapi.ws.Client = pydantic.Field(exclude=True)
api_client: lark_oapi.Client = pydantic.Field(exclude=True)
quart_app: quart.Quart = pydantic.Field(exclude=True)
cipher: AESCipher = pydantic.Field(exclude=True)
config: dict
lark_tenant_key: str = pydantic.Field(exclude=True, default='')
app_ticket: str | None = None
app_access_token: str | None = None
app_access_token_expire_at: int | None = None
tenant_access_tokens: dict[str, dict[str, typing.Any]] = pydantic.Field(default_factory=dict)
bot_uuid: str | None = None
event_loop: asyncio.AbstractEventLoop | None = pydantic.Field(exclude=True, default=None)
message_converter: LarkMessageConverter = LarkMessageConverter()
event_converter: LarkEventConverter = LarkEventConverter()
listeners: dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
] = pydantic.Field(default_factory=dict)
card_id_dict: dict[str, str] = pydantic.Field(default_factory=dict)
pending_monitoring_msg: dict[str, str] = pydantic.Field(default_factory=dict)
reply_to_monitoring_msg: dict[str, tuple[str, float]] = pydantic.Field(default_factory=dict)
_message_cache: dict[str, platform_events.MessageReceivedEvent] = pydantic.PrivateAttr(default_factory=dict)
_user_cache: dict[str, platform_entities.User] = pydantic.PrivateAttr(default_factory=dict)
_group_cache: dict[str, platform_entities.UserGroup] = pydantic.PrivateAttr(default_factory=dict)
_monitoring_mapping_ttl: int = 600
class Config:
arbitrary_types_allowed = True
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs):
required_keys = ['app_id', 'app_secret', 'bot_name']
missing_keys = [key for key in required_keys if not config.get(key)]
if missing_keys:
raise ValueError(f'Lark missing required config: {", ".join(missing_keys)}')
api_client = self.build_api_client(config)
event_handler = self._build_event_handler()
bot = lark_oapi.ws.Client(config['app_id'], config['app_secret'], event_handler=event_handler)
cipher = AESCipher(config.get('encrypt-key', ''))
super().__init__(
config=config,
logger=logger,
lark_tenant_key=config.get('lark_tenant_key', ''),
bot_account_id=config['bot_name'],
bot=bot,
api_client=api_client,
quart_app=quart.Quart(__name__),
cipher=cipher,
listeners={},
card_id_dict={},
pending_monitoring_msg={},
reply_to_monitoring_msg={},
event_loop=None,
**kwargs,
)
self._message_cache = {}
self._user_cache = {}
self._group_cache = {}
self.request_app_ticket()
def _build_event_handler(self):
async def on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
await self._handle_message_event(event)
def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1):
self._submit_coro(on_message(event))
def sync_on_card_action(event):
return self._handle_card_action_sync(event)
return (
lark_oapi.EventDispatcherHandler.builder('', '')
.register_p2_im_message_receive_v1(sync_on_message)
.register_p2_card_action_trigger(sync_on_card_action)
.build()
)
def get_supported_events(self) -> list[str]:
return ['message.received', 'bot.invited_to_group', 'platform.specific']
def get_supported_apis(self) -> list[str]:
return [
'send_message',
'reply_message',
'get_message',
'get_group_info',
'get_group_member_info',
'get_user_info',
'get_file_url',
'call_platform_api',
]
def build_api_client(self, config: dict) -> lark_oapi.Client:
builder = lark_oapi.Client.builder().app_id(config['app_id']).app_secret(config['app_secret'])
if config.get('app_type', 'self') == 'isv':
builder = builder.app_type(lark_oapi.AppType.ISV)
return builder.build()
def request_app_ticket(self):
if self.config.get('app_type', 'self') != 'isv':
return
request = (
ResendAppTicketRequest.builder()
.request_body(
ResendAppTicketRequestBody.builder()
.app_id(self.config['app_id'])
.app_secret(self.config['app_secret'])
.build()
)
.build()
)
response: ResendAppTicketResponse = self.api_client.auth.v3.app_ticket.resend(request)
if not response.success():
raise RuntimeError(f'Lark app_ticket resend failed: {response.code} {response.msg}')
def request_app_access_token(self):
if self.config.get('app_type', 'self') != 'isv':
return
request = (
CreateAppAccessTokenRequest.builder()
.request_body(
CreateAppAccessTokenRequestBody.builder()
.app_id(self.config['app_id'])
.app_secret(self.config['app_secret'])
.app_ticket(self.app_ticket)
.build()
)
.build()
)
response: CreateAppAccessTokenResponse = self.api_client.auth.v3.app_access_token.create(request)
if not response.success():
raise RuntimeError(f'Lark app_access_token failed: {response.code} {response.msg}')
content = json.loads(response.raw.content)
self.app_access_token = content['app_access_token']
self.app_access_token_expire_at = int(time.time()) + content['expire'] - 300
def get_app_access_token(self):
if self.config.get('app_type', 'self') != 'isv':
return None
if (
self.app_access_token is None
or self.app_access_token_expire_at is None
or int(time.time()) >= self.app_access_token_expire_at
):
self.request_app_access_token()
return self.app_access_token
def request_tenant_access_token(self, tenant_key: str):
if self.config.get('app_type', 'self') != 'isv':
return
request = (
CreateTenantAccessTokenRequest.builder()
.request_body(
CreateTenantAccessTokenRequestBody.builder()
.app_access_token(self.get_app_access_token())
.tenant_key(tenant_key)
.build()
)
.build()
)
response: CreateTenantAccessTokenResponse = self.api_client.auth.v3.tenant_access_token.create(request)
if not response.success():
raise RuntimeError(f'Lark tenant_access_token failed: {response.code} {response.msg}')
content = json.loads(response.raw.content)
self.tenant_access_tokens[tenant_key] = {
'token': content['tenant_access_token'],
'expire_at': int(time.time()) + content['expire'] - 300,
}
def get_tenant_access_token(self, tenant_key: str | None):
if self.config.get('app_type', 'self') != 'isv' or not tenant_key:
return None
cached = self.tenant_access_tokens.get(tenant_key)
if cached is None or int(time.time()) >= cached['expire_at']:
self.request_tenant_access_token(tenant_key)
return self.tenant_access_tokens.get(tenant_key, {}).get('token')
async def send_message(
self,
target_type: str,
target_id: str,
message: platform_message.MessageChain,
) -> platform_events.MessageResult:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
receive_id_type = 'chat_id' if target_type == 'group' else 'open_id'
message_ids: list[str] = []
for msg_type, content in self._outbound_payloads(text_elements, media_items):
request = (
CreateMessageRequest.builder()
.receive_id_type(receive_id_type)
.request_body(
CreateMessageRequestBody.builder()
.receive_id(str(target_id))
.content(json.dumps(content, ensure_ascii=False))
.msg_type(msg_type)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
response: CreateMessageResponse = await self.api_client.im.v1.message.acreate(request)
if not response.success():
raise RuntimeError(f'Lark send_message failed: {response.code} {response.msg}')
message_ids.append(getattr(response.data, 'message_id', ''))
return platform_events.MessageResult(
message_id=message_ids[-1] if message_ids else '', raw={'message_ids': message_ids}
)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
) -> platform_events.MessageResult:
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
tenant_key = self._tenant_key_from_source(message_source)
message_ids: list[str] = []
for msg_type, content in self._outbound_payloads(text_elements, media_items):
request = (
ReplyMessageRequest.builder()
.message_id(self._message_id_from_source(message_source))
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(content, ensure_ascii=False))
.msg_type(msg_type)
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
request, self.request_option(tenant_key)
)
if not response.success():
raise RuntimeError(f'Lark reply_message failed: {response.code} {response.msg}')
message_ids.append(getattr(response.data, 'message_id', ''))
return platform_events.MessageResult(
message_id=message_ids[-1] if message_ids else '', raw={'message_ids': message_ids}
)
def _outbound_payloads(self, text_elements: list[list[dict]], media_items: list[dict]) -> list[tuple[str, dict]]:
payloads: list[tuple[str, dict]] = []
if text_elements:
needs_post = any(ele.get('tag') == 'at' for paragraph in text_elements for ele in paragraph)
if needs_post:
payloads.append(('post', {'zh_Hans': {'title': '', 'content': text_elements}}))
else:
parts = []
for paragraph in text_elements:
text = ''.join(ele.get('text', '') for ele in paragraph)
if text:
parts.append(text)
payloads.append(('text', {'text': '\n\n'.join(parts)}))
for media in media_items:
payloads.append((media['msg_type'], media['content']))
return payloads
async def is_stream_output_supported(self) -> bool:
return bool(self.config.get('enable-stream-reply', False))
async def on_monitoring_message_created(self, query, monitoring_message_id: str):
user_msg_id = getattr(query.message_event, 'message_id', None)
if user_msg_id:
self.pending_monitoring_msg[str(user_msg_id)] = monitoring_message_id
async def create_message_card(self, message_id, event) -> bool:
card_id = await self.create_card_id(message_id)
content = {'type': 'card', 'data': {'card_id': card_id, 'template_variable': {'content': 'Thinking...'}}}
request = (
ReplyMessageRequest.builder()
.message_id(self._message_id_from_source(event))
.request_body(
ReplyMessageRequestBody.builder().content(json.dumps(content)).msg_type('interactive').build()
)
.build()
)
response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
request, self.request_option(self._tenant_key_from_source(event))
)
if not response.success():
raise RuntimeError(f'Lark create_message_card failed: {response.code} {response.msg}')
return True
async def create_card_id(self, message_id) -> str:
card_data = {
'schema': '2.0',
'config': {'update_multi': True, 'streaming_mode': True},
'body': {
'direction': 'vertical',
'elements': [{'tag': 'markdown', 'content': '', 'element_id': 'streaming_txt'}],
},
}
request = (
CreateCardRequest.builder()
.request_body(CreateCardRequestBody.builder().type('card_json').data(json.dumps(card_data)).build())
.build()
)
response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request)
if not response.success():
raise RuntimeError(f'Lark create_card failed: {response.code} {response.msg}')
self.card_id_dict[str(message_id)] = response.data.card_id
return response.data.card_id
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
):
if bot_message.msg_sequence % 8 != 0 and not is_final:
return
text_elements, _ = await self.message_converter.yiri2target(message, self.api_client)
content = '\n\n'.join(
''.join(ele.get('text', '') for ele in paragraph if ele.get('tag') in {'text', 'md'})
for paragraph in text_elements
)
request = (
ContentCardElementRequest.builder()
.card_id(self.card_id_dict[bot_message.resp_message_id])
.element_id('streaming_txt')
.request_body(
ContentCardElementRequestBody.builder().content(content).sequence(bot_message.msg_sequence).build()
)
.build()
)
response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content(
request, self.request_option(self._tenant_key_from_source(message_source))
)
if not response.success():
raise RuntimeError(f'Lark card_element update failed: {response.code} {response.msg}')
if is_final and bot_message.tool_calls is None:
self.card_id_dict.pop(bot_message.resp_message_id, None)
async def call_platform_api(self, action: str, params: dict = {}) -> dict:
handler = PLATFORM_API_MAP.get(action)
if handler is None:
raise NotSupportedError(f'call_platform_api:{action}')
return await handler(self, params)
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
if self.listeners.get(event_type) is callback:
self.listeners.pop(event_type, None)
def set_bot_uuid(self, bot_uuid: str):
self.bot_uuid = bot_uuid
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
source_event = getattr(event.source_platform_object, 'event', None)
message = getattr(source_event, 'message', None) if source_event else None
thread_id = getattr(message, 'thread_id', None)
if thread_id and isinstance(event, platform_events.MessageReceivedEvent) and event.group:
return f'{event.group.id}_{thread_id}'
return None
async def handle_unified_webhook(self, bot_uuid: str, path: str, request):
try:
data = await request.json
if 'encrypt' in data:
data = json.loads(self.cipher.decrypt_string(data['encrypt']))
event_type = self.get_event_type(data)
if event_type == 'url_verification':
return {'challenge': data.get('challenge')}
if event_type == 'app_ticket':
self.app_ticket = self._webhook_event(data).get('app_ticket')
return {'code': 200, 'message': 'ok'}
if event_type == 'im.message.receive_v1':
p2v1 = P2ImMessageReceiveV1()
p2v1.header = self._webhook_header(data)
event_data = P2ImMessageReceiveV1Data()
raw_event = self._webhook_event(data)
event_data.message = EventMessage(raw_event['message'])
event_data.sender = EventSender(raw_event['sender'])
p2v1.event = event_data
p2v1.schema = data.get('schema', '2.0')
await self._handle_message_event(p2v1)
return {'code': 200, 'message': 'ok'}
if event_type == 'im.chat.member.bot.added_v1':
raw_event = self._webhook_event(data)
header = self._webhook_header(data)
chat_id = raw_event.get('chat_id', '')
await self._send_bot_added_welcome(chat_id, getattr(header, 'tenant_key', None))
await self._dispatch_eba_event(LarkEventConverter.bot_invited_to_group(data, chat_id))
return {'code': 200, 'message': 'ok'}
if event_type == 'card.action.trigger':
feedback_event = self._feedback_event_from_webhook(data)
if feedback_event and platform_events.FeedbackEvent in self.listeners:
await self.listeners[platform_events.FeedbackEvent](feedback_event, self)
return {'toast': {'type': 'success', 'content': '感谢您的反馈'}}
await self._dispatch_eba_event(LarkEventConverter.platform_specific(data, event_type, data))
return {'code': 200, 'message': 'ok'}
except Exception:
await self.logger.error(f'Error in lark webhook: {traceback.format_exc()}')
return {'code': 500, 'message': 'error'}
def get_event_type(self, data: dict) -> str:
schema = data.get('schema', '1.0')
if schema == '2.0':
return data.get('header', {}).get('event_type', '')
if 'event' in data:
return data['event'].get('type', '')
return data.get('type', '')
def _webhook_event(self, data: dict) -> dict:
return data.get('event', {})
def _webhook_header(self, data: dict):
return type('LarkWebhookHeader', (), data.get('header', {}))()
async def run_async(self):
self.event_loop = asyncio.get_running_loop()
if not self.config.get('enable-webhook', False):
try:
await self.bot._connect()
except lark_oapi.ws.exception.ClientException:
raise
except Exception:
await self.bot._disconnect()
if self.bot._auto_reconnect:
await self.bot._reconnect()
else:
raise
else:
while True:
await asyncio.sleep(1)
async def kill(self) -> bool:
self.bot._auto_reconnect = False
await self.bot._disconnect()
return True
async def is_muted(self, group_id: int | None = None) -> bool:
return False
async def _handle_message_event(self, event: lark_oapi.im.v1.P2ImMessageReceiveV1):
try:
if platform_events.FriendMessage in self.listeners or platform_events.GroupMessage in self.listeners:
legacy_event = await self.event_converter.target2legacy(event, self.api_client)
if legacy_event and type(legacy_event) in self.listeners:
await self.listeners[type(legacy_event)](legacy_event, self)
eba_event = await self.event_converter.target2yiri(event, self.api_client)
if eba_event:
self._cache_event(eba_event)
await self._dispatch_eba_event(eba_event)
except Exception:
await self.logger.error(f'Error in lark message event: {traceback.format_exc()}')
async def _dispatch_eba_event(self, event: platform_events.Event):
for event_type in (type(event), platform_events.EBAEvent, platform_events.Event):
callback = self.listeners.get(event_type)
if callback:
await callback(event, self)
return
def _cache_event(self, event: platform_events.Event):
if not isinstance(event, platform_events.MessageReceivedEvent):
return
self._message_cache[str(event.message_id)] = event
self._user_cache[str(event.sender.id)] = event.sender
if event.group:
self._group_cache[str(event.group.id)] = event.group
def _handle_card_action_sync(self, event):
feedback_event = self._feedback_event_from_callback(event)
if feedback_event and platform_events.FeedbackEvent in self.listeners:
self._submit_coro(self.listeners[platform_events.FeedbackEvent](feedback_event, self))
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}})
def _submit_coro(self, coro):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = self.event_loop
if loop and loop.is_running():
asyncio.run_coroutine_threadsafe(coro, loop)
return
coro.close()
raise
else:
loop.create_task(coro)
def _feedback_event_from_callback(self, event) -> platform_events.FeedbackEvent | None:
value = getattr(getattr(event.event, 'action', None), 'value', {}) or {}
return self._feedback_event(
raw=event,
feedback_id=getattr(event.header, 'event_id', str(uuid.uuid4())),
feedback_value=value.get('feedback', ''),
user_id=getattr(getattr(event.event, 'operator', None), 'open_id', None),
chat_id=getattr(getattr(event.event, 'context', None), 'open_chat_id', None),
message_id=getattr(getattr(event.event, 'context', None), 'open_message_id', None),
)
def _feedback_event_from_webhook(self, data: dict) -> platform_events.FeedbackEvent | None:
event = data.get('event', {})
value = event.get('action', {}).get('value', {}) or {}
operator = event.get('operator', {})
context = event.get('context', {})
return self._feedback_event(
raw=data,
feedback_id=data.get('header', {}).get('event_id', str(uuid.uuid4())),
feedback_value=value.get('feedback', ''),
user_id=operator.get('open_id') or operator.get('user_id'),
chat_id=context.get('open_chat_id'),
message_id=context.get('open_message_id'),
)
def _feedback_event(
self,
raw,
feedback_id: str,
feedback_value: str,
user_id: str | None,
chat_id: str | None,
message_id: str | None,
) -> platform_events.FeedbackEvent | None:
if feedback_value == '有帮助':
feedback_type = 1
elif feedback_value == '无帮助':
feedback_type = 2
else:
return None
return platform_events.FeedbackEvent(
feedback_id=feedback_id,
feedback_type=feedback_type,
feedback_content=feedback_value,
user_id=user_id,
session_id=f'group_{chat_id}' if chat_id else (f'person_{user_id}' if user_id else None),
message_id=message_id,
stream_id=self.reply_to_monitoring_msg.get(message_id, (None, 0))[0] if message_id else None,
source_platform_object=raw,
)
async def _send_bot_added_welcome(self, chat_id: str, tenant_key: str | None):
welcome = self.config.get('bot_added_welcome', '')
if not welcome or not chat_id:
return
content = {'zh_Hans': {'title': '', 'content': [[{'tag': 'md', 'text': welcome}]]}}
request = (
CreateMessageRequest.builder()
.receive_id_type('chat_id')
.request_body(
CreateMessageRequestBody.builder()
.receive_id(chat_id)
.content(json.dumps(content, ensure_ascii=False))
.msg_type('post')
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
response: CreateMessageResponse = await self.api_client.im.v1.message.acreate(
request, self.request_option(tenant_key)
)
if not response.success():
await self.logger.warning(f'Lark bot_added_welcome failed: {response.code} {response.msg}')
def _tenant_key_from_source(self, event: platform_events.Event) -> str | None:
source = getattr(event, 'source_platform_object', None)
header = getattr(source, 'header', None)
return getattr(header, 'tenant_key', None)
def _message_id_from_source(self, event: platform_events.Event) -> str:
message_id = getattr(event, 'message_id', None)
if message_id:
return str(message_id)
source = getattr(event, 'source_platform_object', None)
source_event = getattr(source, 'event', None)
message = getattr(source_event, 'message', None) if source_event else None
message_id = getattr(message, 'message_id', None)
if message_id:
return str(message_id)
raise RuntimeError('Lark message source does not contain message_id')

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
import typing
from lark_oapi.api.im.v1 import GetChatRequest, GetMessageRequest
from lark_oapi.core.model import RequestOption
from langbot.pkg.platform.adapters.lark.event_converter import LarkEventConverter
from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
class LarkAPIMixin:
_message_cache: dict[str, platform_events.MessageReceivedEvent]
_user_cache: dict[str, platform_entities.User]
_group_cache: dict[str, platform_entities.UserGroup]
async def get_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> platform_events.MessageReceivedEvent:
cached = self._message_cache.get(str(message_id))
if cached:
return cached
request = GetMessageRequest.builder().message_id(str(message_id)).build()
response = await self.api_client.im.v1.message.aget(request, self.request_option(None))
if not response.success():
raise NotSupportedError(f'get_message:{message_id}')
items = getattr(response.data, 'items', None) or []
if not items:
raise NotSupportedError(f'get_message:{message_id}')
event_message = LarkEventConverter._build_event_message_from_message_item(items[0])
if event_message is None:
raise NotSupportedError(f'get_message:{message_id}')
message_chain = await LarkMessageConverter.target2yiri(event_message, self.api_client)
event = platform_events.MessageReceivedEvent(
type='message.received',
adapter_name='lark-eba',
message_id=str(message_id),
message_chain=message_chain,
sender=platform_entities.User(id=''),
chat_type=platform_entities.ChatType.GROUP if chat_type == 'group' else platform_entities.ChatType.PRIVATE,
chat_id=chat_id,
group=platform_entities.UserGroup(id=chat_id, name='') if chat_type == 'group' else None,
timestamp=0,
source_platform_object=items[0],
)
self._message_cache[str(message_id)] = event
return event
async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup:
cached = self._group_cache.get(str(group_id))
if cached:
return cached
request = GetChatRequest.builder().chat_id(str(group_id)).build()
response = await self.api_client.im.v1.chat.aget(request, self.request_option(None))
if not response.success():
raise NotSupportedError(f'get_group_info:{group_id}')
data = response.data
group = platform_entities.UserGroup(
id=getattr(data, 'chat_id', group_id),
name=getattr(data, 'name', '') or '',
description=getattr(data, 'description', None),
avatar_url=getattr(data, 'avatar', None),
owner_id=getattr(data, 'owner_id', None),
)
self._group_cache[str(group.id)] = group
return group
async def get_group_member_info(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> platform_entities.UserGroupMember:
user = self._user_cache.get(str(user_id)) or platform_entities.User(id=user_id)
return platform_entities.UserGroupMember(user=user, group_id=group_id, role=platform_entities.MemberRole.MEMBER)
async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User:
cached = self._user_cache.get(str(user_id))
if cached:
return cached
return platform_entities.User(id=user_id)
async def get_file_url(self, file_id: str) -> str:
if str(file_id).startswith('file://'):
return str(file_id)
raise NotSupportedError('get_file_url requires a file:// path or platform-specific resource download params')
def request_option(self, tenant_key: str | None) -> RequestOption:
app_access_token = self.get_app_access_token()
tenant_access_token = self.get_tenant_access_token(tenant_key)
return (
RequestOption.builder()
.app_ticket(self.app_ticket)
.tenant_key(tenant_key)
.app_access_token(app_access_token)
.tenant_access_token(tenant_access_token)
.build()
)

View File

@@ -0,0 +1,205 @@
from __future__ import annotations
import time
import typing
import lark_oapi
from lark_oapi.api.im.v1 import EventMessage, GetMessageRequest, Message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter
from langbot.pkg.platform.adapters.lark.types import ADAPTER_NAME
from langbot_plugin.api.entities.builtin.platform import entities as platform_entities
from langbot_plugin.api.entities.builtin.platform import events as platform_events
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class LarkEventConverter(abstract_platform_adapter.AbstractEventConverter):
_processed_thread_quote_cache: typing.ClassVar[dict[str, float]] = {}
_processed_thread_quote_cache_max_size: typing.ClassVar[int] = 4096
_processed_thread_quote_cache_ttl_seconds: typing.ClassVar[int] = 86400
@staticmethod
async def yiri2target(event: platform_events.Event):
return getattr(event, 'source_platform_object', None)
@staticmethod
async def target2yiri(
event: lark_oapi.im.v1.P2ImMessageReceiveV1,
api_client: lark_oapi.Client,
) -> platform_events.Event | None:
return await LarkEventConverter.message_to_eba(event, api_client)
@staticmethod
async def target2legacy(
event: lark_oapi.im.v1.P2ImMessageReceiveV1,
api_client: lark_oapi.Client,
) -> platform_events.FriendMessage | platform_events.GroupMessage | None:
eba_event = await LarkEventConverter.message_to_eba(event, api_client)
if eba_event:
return eba_event.to_legacy_event()
return None
@staticmethod
async def message_to_eba(
event: lark_oapi.im.v1.P2ImMessageReceiveV1,
api_client: lark_oapi.Client,
) -> platform_events.MessageReceivedEvent:
message = event.event.message
message_chain = await LarkMessageConverter.target2yiri(message, api_client)
await LarkEventConverter._append_quote_content(message, message_chain, api_client)
sender = LarkEventConverter.user_from_event(event)
chat_type = platform_entities.ChatType.PRIVATE
chat_id = LarkEventConverter.sender_id(event)
group = None
if getattr(message, 'chat_type', '') == 'group':
chat_type = platform_entities.ChatType.GROUP
chat_id = getattr(message, 'chat_id', '') or chat_id
group = platform_entities.UserGroup(id=chat_id, name='')
return platform_events.MessageReceivedEvent(
type='message.received',
adapter_name=ADAPTER_NAME,
message_id=getattr(message, 'message_id', ''),
message_chain=message_chain,
sender=sender,
chat_type=chat_type,
chat_id=chat_id,
group=group,
timestamp=LarkEventConverter._timestamp(getattr(message, 'create_time', None)),
source_platform_object=event,
)
@staticmethod
def user_from_event(event: lark_oapi.im.v1.P2ImMessageReceiveV1) -> platform_entities.User:
sender_id = getattr(getattr(event.event.sender, 'sender_id', None), 'open_id', '') or ''
union_id = getattr(getattr(event.event.sender, 'sender_id', None), 'union_id', '') or ''
return platform_entities.User(id=sender_id, nickname=union_id)
@staticmethod
def sender_id(event: lark_oapi.im.v1.P2ImMessageReceiveV1) -> str:
return getattr(getattr(event.event.sender, 'sender_id', None), 'open_id', '') or ''
@staticmethod
def bot_invited_to_group(
raw_event: typing.Any,
chat_id: str,
operator_id: str | None = None,
) -> platform_events.BotInvitedToGroupEvent:
return platform_events.BotInvitedToGroupEvent(
type='bot.invited_to_group',
adapter_name=ADAPTER_NAME,
group=platform_entities.UserGroup(id=chat_id, name=''),
inviter=platform_entities.User(id=operator_id) if operator_id else None,
timestamp=time.time(),
source_platform_object=raw_event,
)
@staticmethod
def platform_specific(
raw_event: typing.Any, action: str, data: dict | None = None
) -> platform_events.PlatformSpecificEvent:
return platform_events.PlatformSpecificEvent(
type='platform.specific',
adapter_name=ADAPTER_NAME,
action=action,
data=data or {},
timestamp=time.time(),
source_platform_object=raw_event,
)
@classmethod
def _prune_processed_thread_quote_cache(cls, now: float | None = None) -> None:
if now is None:
now = time.time()
expire_before = now - cls._processed_thread_quote_cache_ttl_seconds
while cls._processed_thread_quote_cache:
oldest_key, oldest_ts = next(iter(cls._processed_thread_quote_cache.items()))
if oldest_ts >= expire_before:
break
cls._processed_thread_quote_cache.pop(oldest_key, None)
while len(cls._processed_thread_quote_cache) > cls._processed_thread_quote_cache_max_size:
cls._processed_thread_quote_cache.pop(next(iter(cls._processed_thread_quote_cache)), None)
@classmethod
def _extract_quote_message_id(cls, message: EventMessage) -> str | None:
parent_id = getattr(message, 'parent_id', None)
if not parent_id or parent_id == getattr(message, 'message_id', None):
return None
thread_id = getattr(message, 'thread_id', None)
if thread_id:
cls._prune_processed_thread_quote_cache()
if thread_id in cls._processed_thread_quote_cache:
return None
cls._processed_thread_quote_cache[thread_id] = time.time()
return parent_id
@staticmethod
async def _append_quote_content(
message: EventMessage,
message_chain: platform_message.MessageChain,
api_client: lark_oapi.Client,
) -> None:
quote_message_id = LarkEventConverter._extract_quote_message_id(message)
if not quote_message_id:
return
quote_chain = await LarkEventConverter._fetch_quoted_message(quote_message_id, api_client)
if not quote_chain:
return
origin = platform_message.MessageChain(
[comp for comp in quote_chain if not isinstance(comp, platform_message.Source)]
)
message_chain.append(
platform_message.Quote(
id=quote_message_id,
group_id=getattr(message, 'chat_id', None),
target_id=getattr(message, 'chat_id', None),
origin=origin,
)
)
@staticmethod
async def _fetch_quoted_message(
quote_message_id: str,
api_client: lark_oapi.Client,
) -> platform_message.MessageChain | None:
request = GetMessageRequest.builder().message_id(quote_message_id).build()
response = await api_client.im.v1.message.aget(request)
if not response.success() or not getattr(response.data, 'items', None):
return None
event_message = LarkEventConverter._build_event_message_from_message_item(response.data.items[0])
if event_message is None:
return None
return await LarkMessageConverter.target2yiri(event_message, api_client)
@staticmethod
def _build_event_message_from_message_item(message_item: Message) -> EventMessage | None:
body = getattr(message_item, 'body', None)
content = getattr(body, 'content', None) if body else None
if not content:
return None
event_data = {
'message_id': message_item.message_id,
'message_type': message_item.msg_type,
'content': content,
'create_time': message_item.create_time,
'mentions': getattr(message_item, 'mentions', []) or [],
}
for key in ('parent_id', 'root_id', 'thread_id', 'chat_id'):
value = getattr(message_item, key, None)
if value:
event_data[key] = value
return EventMessage(event_data)
@staticmethod
def _timestamp(value: typing.Any) -> float:
if isinstance(value, (int, float, str)):
try:
timestamp = float(value)
return timestamp / 1000 if timestamp > 10_000_000_000 else timestamp
except ValueError:
pass
if hasattr(value, 'timestamp'):
return float(value.timestamp())
return 0.0

View File

@@ -0,0 +1 @@
<?xml version="1.0" standalone="no"?><!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"><svg t="1711946937387" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="5208" xmlns:xlink="http://www.w3.org/1999/xlink" width="200" height="200"><path d="M262.339048 243.809524h326.070857s91.672381 84.504381 91.672381 200.655238l-152.81981 105.569524S445.781333 359.960381 262.339048 243.809524z" fill="#00DAB8" p-id="5209"></path><path d="M853.333333 423.350857s-112.103619-42.276571-183.393523-10.581333c-71.338667 31.695238-101.912381 73.923048-132.486096 105.618286-40.71619 42.22781-112.054857 116.150857-173.202285 73.923047-61.147429-42.276571 244.540952 147.846095 244.540952 147.846095s127.463619-71.631238 173.202286-190.122666C822.759619 444.464762 853.333333 423.350857 853.333333 423.350857z" fill="#0C3AA0" p-id="5210"></path><path d="M170.666667 402.236952v316.757334s112.298667 138.142476 376.978285 63.390476c112.103619-31.695238 203.824762-179.541333 203.824762-179.541333S618.934857 824.612571 170.666667 402.285714z" fill="#296DFF" p-id="5211"></path></svg>

After

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -0,0 +1,185 @@
apiVersion: v1
kind: MessagePlatformAdapter
metadata:
name: lark-eba
label:
en_US: Lark / Feishu (EBA)
zh_Hans: 飞书 (EBA)
zh_Hant: 飛書 (EBA)
ja_JP: Lark (EBA)
description:
en_US: Lark/Feishu adapter (EBA architecture), supporting self-built/store apps and WebSocket/Webhook modes.
zh_Hans: 飞书适配器EBA 架构版本),支持自建/商店应用和长连接/Webhook 两种通信模式。
zh_Hant: 飛書適配器EBA 架構版本),支援自建/商店應用和長連線/Webhook 兩種通訊模式。
ja_JP: Lark アダプターEBA アーキテクチャ)、カスタム/ストアアプリと WebSocket/Webhook モードをサポートします。
icon: lark.svg
spec:
categories:
- popular
- china
- global
help_links:
zh: https://link.langbot.app/zh/platforms/lark
en: https://link.langbot.app/en/platforms/lark
ja: https://link.langbot.app/ja/platforms/lark
config:
- name: app_id
label:
en_US: App ID
zh_Hans: 应用ID
zh_Hant: 應用ID
ja_JP: アプリ ID
type: string
required: true
default: ""
- name: app_secret
label:
en_US: App Secret
zh_Hans: 应用密钥
zh_Hant: 應用密鑰
ja_JP: アプリシークレット
type: string
required: true
default: ""
- name: bot_name
label:
en_US: Bot Name
zh_Hans: 机器人名称
zh_Hant: 機器人名稱
ja_JP: ボット名
description:
en_US: Must match the Lark bot name so group mentions can be recognized.
zh_Hans: 必须与飞书机器人名称一致,否则机器人将无法在群内正常识别 @。
zh_Hant: 必須與飛書機器人名稱一致,否則機器人將無法在群組內正常識別 @。
ja_JP: グループメンションを認識するには Lark のボット名と一致する必要があります。
type: string
required: true
default: ""
- name: enable-webhook
label:
en_US: Enable Webhook Mode
zh_Hans: 启用 Webhook 模式
zh_Hant: 啟用 Webhook 模式
ja_JP: Webhook モードを有効化
description:
en_US: Enable request URL callback mode. Disable it to use WebSocket long connection mode.
zh_Hans: 启用 Request URL 回调模式。关闭时使用 WebSocket 长连接模式。
zh_Hant: 啟用 Request URL 回調模式。關閉時使用 WebSocket 長連線模式。
ja_JP: Request URL コールバックモードを有効化します。無効時は WebSocket 長期接続を使用します。
type: boolean
required: true
default: false
- name: webhook_url
label:
en_US: Webhook Callback URL
zh_Hans: Webhook 回调地址
zh_Hant: Webhook 回調地址
ja_JP: Webhook コールバック URL
description:
en_US: Copy this URL to the Lark app event subscription request URL.
zh_Hans: 复制此地址并粘贴到飞书应用事件订阅的 Request URL 中。
zh_Hant: 複製此地址並貼到飛書應用事件訂閱的 Request URL 中。
ja_JP: この URL を Lark アプリのイベント購読 Request URL に貼り付けてください。
type: webhook-url
required: false
default: ""
show_if:
field: enable-webhook
operator: eq
value: true
- name: encrypt-key
label:
en_US: Encrypt Key
zh_Hans: 加密密钥
zh_Hant: 加密密鑰
ja_JP: 暗号化キー
type: string
required: false
default: ""
show_if:
field: enable-webhook
operator: eq
value: true
- name: enable-stream-reply
label:
en_US: Enable Stream Reply Mode
zh_Hans: 启用飞书流式回复模式
zh_Hant: 啟用飛書串流回覆模式
ja_JP: ストリーミング返信モードを有効化
description:
en_US: If enabled, replies are rendered through an updating Lark card.
zh_Hans: 如果启用,将使用可更新的飞书卡片进行流式回复。
zh_Hant: 如果啟用,將使用可更新的飛書卡片進行串流回覆。
ja_JP: 有効にすると、更新可能な Lark カードでストリーミング返信します。
type: boolean
required: true
default: false
- name: app_type
label:
en_US: App Type
zh_Hans: 应用类型
zh_Hant: 應用類型
ja_JP: アプリタイプ
type: select
options:
- name: self
label:
en_US: Self-built Application
zh_Hans: 自建应用
zh_Hant: 自建應用
ja_JP: カスタムアプリ
- name: isv
label:
en_US: Store Application
zh_Hans: 商店应用
zh_Hant: 商店應用
ja_JP: ストアアプリ
required: false
default: self
- name: bot_added_welcome
label:
en_US: Bot Welcome Message
zh_Hans: 机器人进群欢迎语
zh_Hant: 機器人進群歡迎語
ja_JP: ボット参加時のウェルカムメッセージ
type: text
required: false
default: ""
supported_events:
- message.received
- bot.invited_to_group
- platform.specific
supported_apis:
required:
- send_message
- reply_message
optional:
- get_message
- get_group_info
- get_group_member_info
- get_user_info
- get_file_url
- call_platform_api
platform_specific_apis:
- action: check_tenant_access_token
description: { en_US: "Check whether the tenant access token can be obtained", zh_Hans: "检查 tenant access token 是否可获取" }
- action: refresh_app_access_token
description: { en_US: "Refresh store-app app access token", zh_Hans: "刷新商店应用 app access token" }
- action: refresh_tenant_access_token
description: { en_US: "Refresh store-app tenant access token", zh_Hans: "刷新商店应用 tenant access token" }
- action: get_chat
description: { en_US: "Get Lark chat metadata", zh_Hans: "获取飞书会话信息" }
- action: get_message
description: { en_US: "Get a Lark message", zh_Hans: "获取飞书消息" }
- action: get_message_resource
description: { en_US: "Download message image/file resource", zh_Hans: "下载消息图片/文件资源" }
execution:
python:
path: ./adapter.py
attr: LarkAdapter

View File

@@ -0,0 +1,405 @@
from __future__ import annotations
import base64
import datetime
import json
import mimetypes
import os
import re
import tempfile
import traceback
import lark_oapi
from lark_oapi.api.im.v1 import (
CreateFileRequest,
CreateFileRequestBody,
CreateImageRequest,
CreateImageRequestBody,
EventMessage,
GetMessageResourceRequest,
GetMessageResourceResponse,
)
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot.pkg.utils import httpclient
from langbot_plugin.api.entities.builtin.platform import message as platform_message
class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def upload_image_to_lark(msg: platform_message.Image, api_client: lark_oapi.Client) -> str | None:
image_bytes = await LarkMessageConverter._get_component_bytes(msg)
if image_bytes is None:
return None
temp_file_path = ''
try:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(image_bytes)
temp_file.flush()
temp_file_path = temp_file.name
request = (
CreateImageRequest.builder()
.request_body(
CreateImageRequestBody.builder().image_type('message').image(open(temp_file_path, 'rb')).build()
)
.build()
)
response = await api_client.im.v1.image.acreate(request)
if not response.success():
return None
return response.data.image_key
except Exception:
traceback.print_exc()
return None
finally:
if temp_file_path:
try:
os.unlink(temp_file_path)
except FileNotFoundError:
pass
@staticmethod
async def upload_file_to_lark(
file_bytes: bytes,
api_client: lark_oapi.Client,
file_type: str,
file_name: str = 'file',
duration: int | None = None,
) -> str | None:
temp_file_path = ''
try:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(file_bytes)
temp_file.flush()
temp_file_path = temp_file.name
body_builder = (
CreateFileRequestBody.builder()
.file_type(file_type)
.file_name(file_name)
.file(open(temp_file_path, 'rb'))
)
if duration is not None:
body_builder = body_builder.duration(duration)
request = CreateFileRequest.builder().request_body(body_builder.build()).build()
response = await api_client.im.v1.file.acreate(request)
if not response.success():
return None
return response.data.file_key
except Exception:
traceback.print_exc()
return None
finally:
if temp_file_path:
try:
os.unlink(temp_file_path)
except FileNotFoundError:
pass
@staticmethod
async def _get_component_bytes(
msg: platform_message.Image | platform_message.Voice | platform_message.File,
) -> bytes | None:
if getattr(msg, 'base64', None):
try:
base64_data = msg.base64
if ',' in base64_data:
base64_data = base64_data.split(',', 1)[1]
return base64.b64decode(base64_data)
except Exception:
return None
if getattr(msg, 'url', None):
try:
if str(msg.url).startswith('file://'):
with open(str(msg.url)[7:], 'rb') as f:
return f.read()
session = httpclient.get_session()
async with session.get(msg.url) as response:
if response.status == 200:
return await response.read()
except Exception:
return None
if getattr(msg, 'path', None):
try:
with open(msg.path, 'rb') as f:
return f.read()
except Exception:
return None
return None
@staticmethod
def _lark_file_type(file_name: str) -> str:
ext = os.path.splitext(file_name)[1].lstrip('.').lower()
return {
'opus': 'opus',
'mp4': 'mp4',
'pdf': 'pdf',
'doc': 'doc',
'docx': 'doc',
'xls': 'xls',
'xlsx': 'xls',
'ppt': 'ppt',
'pptx': 'ppt',
}.get(ext, 'stream')
@staticmethod
async def yiri2target(
message_chain: platform_message.MessageChain,
api_client: lark_oapi.Client,
) -> tuple[list[list[dict]], list[dict]]:
message_elements: list[list[dict]] = []
media_items: list[dict] = []
pending_paragraph: list[dict] = []
markdown_image_pattern = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)')
async def process_text_with_images(text: str) -> tuple[str, list[str]]:
matches = list(markdown_image_pattern.finditer(text))
if not matches:
return text, []
cleaned_text = text
extracted_urls: list[str] = []
for match in reversed(matches):
extracted_urls.insert(0, match.group(2))
cleaned_text = cleaned_text[: match.start()] + cleaned_text[match.end() :]
cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text).strip()
return cleaned_text, extracted_urls
for msg in message_chain:
if isinstance(msg, platform_message.Source):
continue
if isinstance(msg, platform_message.Plain):
cleaned_text, extracted_urls = await process_text_with_images(msg.text)
if cleaned_text:
segments = re.split(r'\n\s*\n', cleaned_text)
for i, segment in enumerate(segments):
segment = segment.strip()
if not segment:
continue
if i > 0 and pending_paragraph:
message_elements.append(pending_paragraph)
pending_paragraph = []
pending_paragraph.append({'tag': 'md', 'text': segment})
for url in extracted_urls:
image_key = await LarkMessageConverter.upload_image_to_lark(
platform_message.Image(url=url), api_client
)
if image_key:
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
elif isinstance(msg, platform_message.At):
pending_paragraph.append({'tag': 'at', 'user_id': str(msg.target), 'style': []})
elif isinstance(msg, platform_message.AtAll):
pending_paragraph.append({'tag': 'at', 'user_id': 'all', 'style': []})
elif isinstance(msg, platform_message.Image):
image_key = await LarkMessageConverter.upload_image_to_lark(msg, api_client)
if image_key:
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
elif isinstance(msg, platform_message.Voice):
data = await LarkMessageConverter._get_component_bytes(msg)
if data:
duration = int(msg.length * 1000) if msg.length else None
file_key = await LarkMessageConverter.upload_file_to_lark(
data, api_client, file_type='opus', file_name='voice.opus', duration=duration
)
if file_key:
media_items.append({'msg_type': 'audio', 'content': {'file_key': file_key}})
elif isinstance(msg, platform_message.File):
data = await LarkMessageConverter._get_component_bytes(msg)
if data:
file_name = msg.name or 'file'
file_key = await LarkMessageConverter.upload_file_to_lark(
data,
api_client,
file_type=LarkMessageConverter._lark_file_type(file_name),
file_name=file_name,
)
if file_key:
media_items.append({'msg_type': 'file', 'content': {'file_key': file_key}})
elif isinstance(msg, platform_message.Quote):
if msg.id:
pending_paragraph.append({'tag': 'md', 'text': f'[引用消息 {msg.id}] '})
if msg.origin:
sub_elements, sub_media = await LarkMessageConverter.yiri2target(msg.origin, api_client)
message_elements.extend(sub_elements)
media_items.extend(sub_media)
elif isinstance(msg, platform_message.Forward):
for node in msg.node_list:
if node.sender_name or node.sender_id:
pending_paragraph.append({'tag': 'md', 'text': f'\n[{node.sender_name or node.sender_id}] '})
sub_elements, sub_media = await LarkMessageConverter.yiri2target(node.message_chain, api_client)
message_elements.extend(sub_elements)
media_items.extend(sub_media)
if pending_paragraph:
message_elements.append(pending_paragraph)
return message_elements, media_items
@staticmethod
async def target2yiri(
message: EventMessage,
api_client: lark_oapi.Client,
) -> platform_message.MessageChain:
message_content = json.loads(message.content or '{}')
create_time = LarkMessageConverter._message_time(message)
components: list[platform_message.MessageComponent] = [
platform_message.Source(id=message.message_id, time=create_time)
]
normalized = LarkMessageConverter._normalize_inbound_content(message, message_content)
for ele in normalized:
tag = ele.get('tag')
if tag in {'text', 'md'}:
text = ele.get('text') or ''
if text:
components.append(platform_message.Plain(text=text))
elif tag == 'at':
user_id = ele.get('user_id') or ele.get('user_name') or ''
display = ele.get('user_name') or user_id
if user_id == 'all':
components.append(platform_message.AtAll())
else:
components.append(platform_message.At(target=user_id, display=display))
elif tag == 'img':
image_key = ele.get('image_key') or ''
image = await LarkMessageConverter._download_resource(
api_client, message.message_id, image_key, 'image'
)
components.append(platform_message.Image(image_id=image_key, **image))
elif tag == 'audio':
file_key = ele.get('file_key') or ''
audio = await LarkMessageConverter._download_resource(api_client, message.message_id, file_key, 'file')
components.append(
platform_message.Voice(
voice_id=file_key,
length=(ele.get('duration', 0) // 1000) if ele.get('duration') else None,
**audio,
)
)
elif tag == 'file':
file_key = ele.get('file_key') or ''
file_name = ele.get('file_name') or 'file'
file_data = await LarkMessageConverter._download_resource(
api_client, message.message_id, file_key, 'file'
)
components.append(
platform_message.File(
id=file_key,
name=file_name,
size=file_data.pop('size', 0),
**file_data,
)
)
return platform_message.MessageChain(components)
@staticmethod
def _normalize_inbound_content(message: EventMessage, content: dict) -> list[dict]:
if message.message_type == 'text':
text = content.get('text', '')
return LarkMessageConverter._split_text_mentions(text, getattr(message, 'mentions', []) or [])
if message.message_type == 'post':
post_content = content.get('content', [])
flattened: list[dict] = []
for ele in post_content:
if isinstance(ele, dict):
flattened.append(ele)
elif isinstance(ele, list):
flattened.extend(item for item in ele if isinstance(item, dict))
return flattened
if message.message_type == 'image':
return [{'tag': 'img', 'image_key': content.get('image_key', ''), 'style': []}]
if message.message_type == 'file':
return [
{
'tag': 'file',
'file_key': content.get('file_key', ''),
'file_name': content.get('file_name', 'file'),
}
]
if message.message_type == 'audio':
return [
{
'tag': 'audio',
'file_key': content.get('file_key', ''),
'duration': content.get('duration', 0),
}
]
return [{'tag': 'text', 'text': json.dumps(content, ensure_ascii=False), 'style': []}]
@staticmethod
def _split_text_mentions(text: str, mentions: list) -> list[dict]:
if not text:
return []
mention_by_key = {getattr(m, 'key', ''): m for m in mentions}
pattern = re.compile(r'@_user_\d+')
result: list[dict] = []
pos = 0
for match in pattern.finditer(text):
if match.start() > pos:
result.append({'tag': 'text', 'text': text[pos : match.start()], 'style': []})
mention = mention_by_key.get(match.group(0))
if mention:
result.append(
{
'tag': 'at',
'user_id': getattr(mention, 'id', None)
or getattr(mention, 'open_id', None)
or getattr(mention, 'user_id', None)
or getattr(mention, 'key', match.group(0)),
'user_name': getattr(mention, 'name', ''),
'style': [],
}
)
else:
result.append({'tag': 'text', 'text': match.group(0), 'style': []})
pos = match.end()
if pos < len(text):
result.append({'tag': 'text', 'text': text[pos:], 'style': []})
return result
@staticmethod
async def _download_resource(
api_client: lark_oapi.Client,
message_id: str,
file_key: str,
resource_type: str,
) -> dict:
if not file_key:
return {}
request = (
GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(resource_type).build()
)
response: GetMessageResourceResponse = await api_client.im.v1.message_resource.aget(request)
if not response.success():
return {}
data = response.file.read()
content_type = response.raw.headers.get('content-type', 'application/octet-stream')
base64_data = base64.b64encode(data).decode()
ext = mimetypes.guess_extension(content_type.split(';')[0].strip()) or '.bin'
temp_path = os.path.join(tempfile.gettempdir(), f'lark_{file_key}{ext}')
with open(temp_path, 'wb') as f:
f.write(data)
return {
'url': f'file://{temp_path}',
'path': temp_path,
'base64': f'data:{content_type};base64,{base64_data}',
'size': len(data),
}
@staticmethod
def _message_time(message: EventMessage) -> datetime.datetime:
value = getattr(message, 'create_time', None)
if isinstance(value, datetime.datetime):
return value
if isinstance(value, (int, float, str)):
try:
timestamp = float(value)
if timestamp > 10_000_000_000:
timestamp = timestamp / 1000
return datetime.datetime.fromtimestamp(timestamp)
except ValueError:
pass
return datetime.datetime.now()

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
import json
from lark_oapi.api.im.v1 import GetChatRequest, GetMessageRequest, GetMessageResourceRequest
async def check_tenant_access_token(adapter, params: dict) -> dict:
tenant_key = params.get('tenant_key') or getattr(adapter, 'lark_tenant_key', None)
token = adapter.get_tenant_access_token(tenant_key)
return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'}
async def refresh_app_access_token(adapter, params: dict) -> dict:
adapter.app_access_token = None
adapter.app_access_token_expire_at = None
token = adapter.get_app_access_token()
return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'}
async def refresh_tenant_access_token(adapter, params: dict) -> dict:
tenant_key = params.get('tenant_key') or getattr(adapter, 'lark_tenant_key', None)
if tenant_key:
adapter.tenant_access_tokens.pop(tenant_key, None)
token = adapter.get_tenant_access_token(tenant_key)
return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'}
async def get_chat(adapter, params: dict) -> dict:
request = GetChatRequest.builder().chat_id(params['chat_id']).build()
response = await adapter.api_client.im.v1.chat.aget(request, adapter.request_option(params.get('tenant_key')))
return _response_to_dict(response)
async def get_message(adapter, params: dict) -> dict:
request = GetMessageRequest.builder().message_id(params['message_id']).build()
response = await adapter.api_client.im.v1.message.aget(request, adapter.request_option(params.get('tenant_key')))
return _response_to_dict(response)
async def get_message_resource(adapter, params: dict) -> dict:
request = (
GetMessageResourceRequest.builder()
.message_id(params['message_id'])
.file_key(params['file_key'])
.type(params.get('type', 'file'))
.build()
)
response = await adapter.api_client.im.v1.message_resource.aget(
request, adapter.request_option(params.get('tenant_key'))
)
if not response.success():
return _response_to_dict(response)
content_type = response.raw.headers.get('content-type', 'application/octet-stream')
data = response.file.read()
return {'ok': True, 'content_type': content_type, 'size': len(data)}
def _response_to_dict(response) -> dict:
if not response.success():
return {'ok': False, 'code': response.code, 'msg': response.msg, 'log_id': response.get_log_id()}
data = getattr(response, 'data', None)
if hasattr(data, 'to_json'):
data = data.to_json()
return {'ok': True, 'data': _jsonable(data)}
def _jsonable(value):
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, bytes):
return {'bytes': len(value)}
if isinstance(value, (list, tuple, set)):
return [_jsonable(item) for item in value]
if isinstance(value, dict):
return {str(key): _jsonable(item) for key, item in value.items()}
if isinstance(value, str):
return value
try:
return json.loads(value)
except Exception:
pass
raw = getattr(value, '__dict__', None)
if raw:
return {key: _jsonable(item) for key, item in raw.items() if not key.startswith('_')}
return str(value)
PLATFORM_API_MAP = {
'check_tenant_access_token': check_tenant_access_token,
'refresh_app_access_token': refresh_app_access_token,
'refresh_tenant_access_token': refresh_tenant_access_token,
'get_chat': get_chat,
'get_message': get_message,
'get_message_resource': get_message_resource,
}

View File

@@ -0,0 +1,3 @@
from __future__ import annotations
ADAPTER_NAME = 'lark-eba'