feat: Telegram EBA adapter - full implementation

- TelegramAdapter inherits AbstractPlatformAdapter with all capabilities
- TelegramEventConverter handles all Update types: message, edited_message,
  chat_member, my_chat_member, callback_query, message_reaction
- TelegramAPIMixin implements: edit_message, delete_message, forward_message,
  get_group_info, get_group_member_list/info, get_user_info, get_file_url,
  mute/unmute/kick_member, leave_group
- PLATFORM_API_MAP for call_platform_api: pin/unpin message, set chat title/desc,
  get admins, send chat action, create invite link, answer callback query
- Full backward compat: legacy FriendMessage/GroupMessage listeners still work
- Preserves all existing functionality: stream output, markdown card, forum topics
- Old sources/telegram.py untouched for gradual migration
This commit is contained in:
RockChinQ
2026-03-22 22:32:27 +08:00
committed by Junyan Qin
parent 9f23f4c572
commit d1b7d56392
9 changed files with 1414 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
from langbot.pkg.platform.adapters.telegram.adapter import TelegramAdapter
__all__ = ["TelegramAdapter"]

View File

@@ -0,0 +1,387 @@
"""Telegram adapter main class (EBA version).
Inherits AbstractPlatformAdapter, integrating all modules.
Preserves all existing functionality (messaging, streaming output, markdown card, forum topics, etc.).
"""
from __future__ import annotations
import time
import typing
import traceback
import telegram
import telegram.ext
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters
import telegramify_markdown
import pydantic
from langbot.pkg.utils import httpclient
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger
from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter
from langbot.pkg.platform.adapters.telegram.event_converter import TelegramEventConverter, LegacyEventConverter
from langbot.pkg.platform.adapters.telegram.api_impl import TelegramAPIMixin
from langbot.pkg.platform.adapters.telegram.platform_api import PLATFORM_API_MAP
class TelegramAdapter(TelegramAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter):
"""Telegram adapter (EBA version)."""
bot: telegram.Bot = pydantic.Field(exclude=True)
application: telegram.ext.Application = pydantic.Field(exclude=True)
message_converter: TelegramMessageConverter = TelegramMessageConverter()
event_converter: TelegramEventConverter = TelegramEventConverter()
legacy_event_converter: LegacyEventConverter = LegacyEventConverter()
config: dict
msg_stream_id: dict
"""Stream message ID map. Key: stream message ID, value: first message source ID."""
seq: int
"""Sequence number for message ordering."""
listeners: typing.Dict[
typing.Type[platform_events.Event],
typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None],
] = {}
class Config:
arbitrary_types_allowed = True
def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger):
async def telegram_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
if not update.message and not update.edited_message and not update.chat_member \
and not update.my_chat_member and not update.callback_query and not update.message_reaction:
return
# Skip messages from the bot itself
if update.message and update.message.from_user and update.message.from_user.is_bot:
return
try:
# Legacy event type callbacks (compat with existing botmgr FriendMessage / GroupMessage listeners)
if update.message and (platform_events.FriendMessage in self.listeners
or platform_events.GroupMessage in self.listeners):
legacy_event = await self.legacy_event_converter.target2yiri(update, self.bot, self.bot_account_id)
if legacy_event and type(legacy_event) in self.listeners:
await self.listeners[type(legacy_event)](legacy_event, self)
# EBA wildcard event callback (Event base class registered as wildcard)
if platform_events.Event in self.listeners:
eba_event = await self.event_converter.target2yiri(update, self.bot, self.bot_account_id)
if eba_event:
await self.listeners[platform_events.Event](eba_event, self)
# EBA specific event type callback
if platform_events.EBAEvent in self.listeners:
eba_event = await self.event_converter.target2yiri(update, self.bot, self.bot_account_id)
if eba_event:
await self.listeners[platform_events.EBAEvent](eba_event, self)
except Exception:
await self.logger.error(f'Error in telegram callback: {traceback.format_exc()}')
application = ApplicationBuilder().token(config['token']).build()
bot = application.bot
# Register handler for all common update types
application.add_handler(
MessageHandler(
filters.TEXT | (filters.COMMAND) | filters.PHOTO | filters.VOICE | filters.Document.ALL,
telegram_callback,
)
)
# Register edited message handler
application.add_handler(
MessageHandler(
filters.UpdateType.EDITED_MESSAGE,
telegram_callback,
)
)
super().__init__(
config=config,
logger=logger,
msg_stream_id={},
seq=1,
bot=bot,
application=application,
bot_account_id='',
listeners={},
)
# ---- Capability Declaration ----
def get_supported_events(self) -> list[str]:
return [
"message.received",
"message.edited",
"message.deleted",
"message.reaction",
"group.member_joined",
"group.member_left",
"group.member_banned",
"group.info_updated",
"bot.invited_to_group",
"bot.removed_from_group",
]
def get_supported_apis(self) -> list[str]:
return [
"send_message",
"reply_message",
"edit_message",
"delete_message",
"forward_message",
"get_group_info",
"get_group_member_list",
"get_group_member_info",
"get_user_info",
"get_file_url",
"mute_member",
"unmute_member",
"kick_member",
"leave_group",
"call_platform_api",
]
# ---- Message Send / Reply (preserving original logic) ----
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
components = await TelegramMessageConverter.yiri2target(message, self.bot)
chat_id_str, _, thread_id_str = str(target_id).partition('#')
chat_id: int | str = int(chat_id_str) if chat_id_str.lstrip('-').isdigit() else chat_id_str
message_thread_id = int(thread_id_str) if thread_id_str and thread_id_str.isdigit() else None
for component in components:
component_type = component.get('type')
args = {'chat_id': chat_id}
if message_thread_id is not None:
args['message_thread_id'] = message_thread_id
if component_type == 'text':
text = component.get('text', '')
if self.config['markdown_card'] is True:
text = telegramify_markdown.markdownify(content=text)
args['parse_mode'] = 'MarkdownV2'
args['text'] = text
await self.bot.send_message(**args)
elif component_type == 'photo':
photo = component.get('photo')
if photo is None:
continue
args['photo'] = telegram.InputFile(photo)
await self.bot.send_photo(**args)
elif component_type == 'document':
doc = component.get('document')
if doc is None:
continue
filename = component.get('filename', 'file')
args['document'] = telegram.InputFile(doc, filename=filename)
await self.bot.send_document(**args)
async def reply_message(
self,
message_source: platform_events.MessageEvent,
message: platform_message.MessageChain,
quote_origin: bool = False,
):
assert isinstance(message_source.source_platform_object, Update)
components = await TelegramMessageConverter.yiri2target(message, self.bot)
for component in components:
if component['type'] == 'text':
if self.config['markdown_card'] is True:
content = telegramify_markdown.markdownify(
content=component['text'],
)
else:
content = component['text']
args = {
'chat_id': message_source.source_platform_object.effective_chat.id,
'text': content,
}
if self.config['markdown_card'] is True:
args['parse_mode'] = 'MarkdownV2'
if message_source.source_platform_object.message.message_thread_id:
args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id
if quote_origin:
args['reply_to_message_id'] = message_source.source_platform_object.message.id
await self.bot.send_message(**args)
# ---- Streaming Output (preserving original logic) ----
def _process_markdown(self, text: str) -> str:
if self.config.get('markdown_card', False):
return telegramify_markdown.markdownify(content=text)
return text
def _build_message_args(self, chat_id: int, text: str, message_thread_id: int = None, **extra_args) -> dict:
args = {'chat_id': chat_id, 'text': self._process_markdown(text), **extra_args}
if message_thread_id:
args['message_thread_id'] = message_thread_id
if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2'
return args
async def create_message_card(self, message_id, event):
assert isinstance(event.source_platform_object, Update)
update = event.source_platform_object
chat_id = update.effective_chat.id
chat_type = update.effective_chat.type
message_thread_id = update.message.message_thread_id
if chat_type == 'private':
draft_id = int(time.time() * 1000)
self.msg_stream_id[message_id] = ('private', draft_id)
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id)
await self.bot.send_message_draft(**args)
else:
args = self._build_message_args(chat_id, 'Thinking...', message_thread_id)
send_msg = await self.bot.send_message(**args)
self.msg_stream_id[message_id] = ('group', send_msg.message_id)
return True
async def reply_message_chunk(
self,
message_source: platform_events.MessageEvent,
bot_message,
message: platform_message.MessageChain,
quote_origin: bool = False,
is_final: bool = False,
):
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
assert isinstance(message_source.source_platform_object, Update)
update = message_source.source_platform_object
chat_id = update.effective_chat.id
message_thread_id = update.message.message_thread_id
if message_id not in self.msg_stream_id:
return
chat_mode, draft_id = self.msg_stream_id[message_id]
components = await TelegramMessageConverter.yiri2target(message, self.bot)
if not components or components[0]['type'] != 'text':
if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id)
return
content = components[0]['text']
if chat_mode == 'private':
args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id)
await self.bot.send_message_draft(**args)
if is_final and bot_message.tool_calls is None:
del args['draft_id']
await self.bot.send_message(**args)
self.msg_stream_id.pop(message_id)
else:
stream_id = draft_id
if (msg_seq - 1) % 8 == 0 or is_final:
args = {
'message_id': stream_id,
'chat_id': chat_id,
'text': self._process_markdown(content),
}
if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2'
await self.bot.edit_message_text(**args)
if is_final and bot_message.tool_calls is None:
self.msg_stream_id.pop(message_id)
# ---- Forum Topic / Custom launcher_id (preserving original logic) ----
def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None:
if not isinstance(event.source_platform_object, Update):
return None
message = event.source_platform_object.message
if not message:
return None
if message.message_thread_id:
if isinstance(event, platform_events.GroupMessage):
return f'{event.group.id}#{message.message_thread_id}'
elif isinstance(event, platform_events.FriendMessage):
return f'{event.sender.id}#{message.message_thread_id}'
return None
# ---- Stream Output Support Check ----
async def is_stream_output_supported(self) -> bool:
is_stream = False
if self.config.get('enable-stream-reply', None):
is_stream = True
return is_stream
async def is_muted(self, group_id: int) -> bool:
return False
# ---- Event Listeners ----
def register_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners[event_type] = callback
def unregister_listener(
self,
event_type: typing.Type[platform_events.Event],
callback: typing.Callable[
[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None
],
):
self.listeners.pop(event_type, None)
# ---- Pass-through API ----
async def call_platform_api(
self,
action: str,
params: dict = {},
) -> dict:
"""Call a Telegram-specific platform API."""
handler = PLATFORM_API_MAP.get(action)
if handler is None:
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
raise NotSupportedError(f"call_platform_api:{action}")
return await handler(self.bot, params)
# ---- Lifecycle ----
async def run_async(self):
await self.application.initialize()
self.bot_account_id = (await self.bot.get_me()).username
await self.application.updater.start_polling(allowed_updates=Update.ALL_TYPES)
await self.application.start()
await self.logger.info('Telegram adapter running')
async def kill(self) -> bool:
if self.application.running:
await self.application.stop()
if self.application.updater:
await self.application.updater.stop()
await self.logger.info('Telegram adapter stopped')
return True

View File

@@ -0,0 +1,242 @@
"""Telegram universal API implementation (EBA version).
Implements optional API methods defined in AbstractPlatformAdapter.
"""
from __future__ import annotations
import typing
import telegram
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter
class TelegramAPIMixin:
"""Telegram universal API implementation mixin.
Used via multiple inheritance in TelegramAdapter.
Requires self.bot: telegram.Bot and self.config: dict attributes.
"""
bot: telegram.Bot
async def edit_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
new_content: platform_message.MessageChain,
) -> None:
"""Edit a previously sent message."""
components = await TelegramMessageConverter.yiri2target(new_content, self.bot)
for component in components:
if component['type'] == 'text':
text = component['text']
if self.config.get('markdown_card', False):
import telegramify_markdown
text = telegramify_markdown.markdownify(content=text)
args = {
'chat_id': chat_id,
'message_id': message_id,
'text': text,
}
if self.config.get('markdown_card', False):
args['parse_mode'] = 'MarkdownV2'
await self.bot.edit_message_text(**args)
return
async def delete_message(
self,
chat_type: str,
chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
) -> None:
"""Delete / recall a message."""
await self.bot.delete_message(chat_id=chat_id, message_id=message_id)
async def forward_message(
self,
from_chat_type: str,
from_chat_id: typing.Union[int, str],
message_id: typing.Union[int, str],
to_chat_type: str,
to_chat_id: typing.Union[int, str],
) -> platform_events.MessageResult:
"""Forward a message to another chat."""
result = await self.bot.forward_message(
chat_id=to_chat_id,
from_chat_id=from_chat_id,
message_id=message_id,
)
return platform_events.MessageResult(
message_id=result.message_id,
raw={"message_id": result.message_id},
)
async def get_group_info(
self,
group_id: typing.Union[int, str],
) -> platform_entities.UserGroup:
"""Get group information."""
chat = await self.bot.get_chat(chat_id=group_id)
return platform_entities.UserGroup(
id=chat.id,
name=chat.title or "",
description=chat.description or None,
member_count=await self._get_member_count(group_id),
)
async def _get_member_count(self, group_id: typing.Union[int, str]) -> typing.Optional[int]:
"""Get group member count."""
try:
return await self.bot.get_chat_member_count(chat_id=group_id)
except Exception:
return None
async def get_group_member_list(
self,
group_id: typing.Union[int, str],
) -> list[platform_entities.UserGroupMember]:
"""Get group member list.
Note: Telegram Bot API only supports fetching the admin list
(get_chat_administrators), not the full member list.
This method returns the admin list.
"""
admins = await self.bot.get_chat_administrators(chat_id=group_id)
members = []
for admin in admins:
role = platform_entities.MemberRole.MEMBER
if admin.status == 'creator':
role = platform_entities.MemberRole.OWNER
elif admin.status == 'administrator':
role = platform_entities.MemberRole.ADMIN
members.append(platform_entities.UserGroupMember(
user=platform_entities.User(
id=admin.user.id,
nickname=admin.user.first_name or "",
username=admin.user.username,
is_bot=admin.user.is_bot,
),
group_id=group_id,
role=role,
display_name=admin.custom_title if hasattr(admin, 'custom_title') else None,
))
return members
async def get_group_member_info(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> platform_entities.UserGroupMember:
"""Get information about a specific group member."""
member = await self.bot.get_chat_member(chat_id=group_id, user_id=user_id)
role = platform_entities.MemberRole.MEMBER
if member.status == 'creator':
role = platform_entities.MemberRole.OWNER
elif member.status == 'administrator':
role = platform_entities.MemberRole.ADMIN
return platform_entities.UserGroupMember(
user=platform_entities.User(
id=member.user.id,
nickname=member.user.first_name or "",
username=member.user.username,
is_bot=member.user.is_bot,
),
group_id=group_id,
role=role,
display_name=member.custom_title if hasattr(member, 'custom_title') else None,
)
async def get_user_info(
self,
user_id: typing.Union[int, str],
) -> platform_entities.User:
"""Get user information."""
chat = await self.bot.get_chat(chat_id=user_id)
return platform_entities.User(
id=chat.id,
nickname=chat.first_name or "",
username=chat.username,
)
async def upload_file(
self,
file_data: bytes,
filename: str,
) -> str:
"""Upload a file.
Telegram does not support standalone file uploads; files are sent as
part of messages. This method raises NotSupportedError.
"""
from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError
raise NotSupportedError("upload_file")
async def get_file_url(
self,
file_id: str,
) -> str:
"""Get file download URL."""
file = await self.bot.get_file(file_id)
return file.file_path
async def mute_member(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
duration: int = 0,
) -> None:
"""Mute a group member."""
import datetime
permissions = telegram.ChatPermissions(can_send_messages=False)
kwargs = {
'chat_id': group_id,
'user_id': user_id,
'permissions': permissions,
}
if duration > 0:
kwargs['until_date'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=duration)
await self.bot.restrict_chat_member(**kwargs)
async def unmute_member(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> None:
"""Unmute a group member."""
permissions = telegram.ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_other_messages=True,
can_add_web_page_previews=True,
)
await self.bot.restrict_chat_member(
chat_id=group_id,
user_id=user_id,
permissions=permissions,
)
async def kick_member(
self,
group_id: typing.Union[int, str],
user_id: typing.Union[int, str],
) -> None:
"""Kick a member from the group."""
await self.bot.ban_chat_member(chat_id=group_id, user_id=user_id)
async def leave_group(
self,
group_id: typing.Union[int, str],
) -> None:
"""Make the bot leave a group."""
await self.bot.leave_chat(chat_id=group_id)

View File

@@ -0,0 +1,404 @@
"""Telegram event converter (EBA version).
Converts all Telegram Update types to unified EBA events, not just messages.
"""
from __future__ import annotations
import typing
import telegram
from telegram import Update
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.platform.entities as platform_entities
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter
def _make_user(tg_user: telegram.User) -> platform_entities.User:
"""Convert a Telegram User to a unified User entity."""
return platform_entities.User(
id=tg_user.id,
nickname=tg_user.first_name or "",
username=tg_user.username,
is_bot=tg_user.is_bot,
)
def _make_user_group(tg_chat: telegram.Chat) -> platform_entities.UserGroup:
"""Convert a Telegram Chat to a unified UserGroup entity."""
return platform_entities.UserGroup(
id=tg_chat.id,
name=tg_chat.title or tg_chat.first_name or "",
description=tg_chat.description if hasattr(tg_chat, 'description') else None,
)
def _chat_type(tg_chat: telegram.Chat) -> platform_entities.ChatType:
"""Map Telegram Chat type to unified ChatType."""
if tg_chat.type == 'private':
return platform_entities.ChatType.PRIVATE
return platform_entities.ChatType.GROUP
class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter):
"""Telegram event converter (EBA version)."""
@staticmethod
async def yiri2target(event: platform_events.Event, bot: telegram.Bot):
"""Convert a unified event to a raw Telegram event (generally not needed)."""
if hasattr(event, 'source_platform_object'):
return event.source_platform_object
return None
@staticmethod
async def target2yiri(
update: Update,
bot: telegram.Bot,
bot_account_id: str,
) -> typing.Optional[platform_events.EBAEvent]:
"""Convert a Telegram Update to a unified EBA event.
Supports: message, edited_message, chat_member, my_chat_member,
callback_query, message_reaction, etc.
Unmappable events are wrapped as PlatformSpecificEvent.
"""
import time
# ---- Message event ----
if update.message and update.message.text is not None or (
update.message and (update.message.photo or update.message.voice or update.message.document)
):
return await TelegramEventConverter._convert_message(update, bot, bot_account_id)
# ---- Edited message event ----
if update.edited_message:
return await TelegramEventConverter._convert_edited_message(update, bot, bot_account_id)
# ---- Member change event (chat_member) ----
if update.chat_member:
return TelegramEventConverter._convert_chat_member(update)
# ---- Bot's own member status change (my_chat_member) ----
if update.my_chat_member:
return TelegramEventConverter._convert_my_chat_member(update)
# ---- Callback query (button clicks, etc.) ----
if update.callback_query:
return platform_events.PlatformSpecificEvent(
type="platform.specific",
timestamp=time.time(),
adapter_name="telegram",
action="callback_query",
data={
"callback_query_id": update.callback_query.id,
"data": update.callback_query.data,
"from_user_id": update.callback_query.from_user.id if update.callback_query.from_user else None,
"message_id": update.callback_query.message.message_id if update.callback_query.message else None,
},
source_platform_object=update,
)
# ---- Message reaction ----
if update.message_reaction:
return TelegramEventConverter._convert_reaction(update)
# ---- Fallback: wrap as PlatformSpecificEvent ----
return platform_events.PlatformSpecificEvent(
type="platform.specific",
timestamp=time.time(),
adapter_name="telegram",
action="unknown_update",
data={"update_id": update.update_id},
source_platform_object=update,
)
@staticmethod
async def _convert_message(
update: Update, bot: telegram.Bot, bot_account_id: str,
) -> platform_events.MessageReceivedEvent:
"""Convert a Telegram message to MessageReceivedEvent."""
message = update.message
lb_message = await TelegramMessageConverter.target2yiri(message, bot, bot_account_id)
sender = _make_user(message.from_user) if message.from_user else platform_entities.User(id="")
chat = message.chat
ct = _chat_type(chat)
group = None
if ct == platform_entities.ChatType.GROUP:
group = _make_user_group(chat)
return platform_events.MessageReceivedEvent(
type="message.received",
timestamp=message.date.timestamp() if message.date else 0.0,
adapter_name="telegram",
message_id=message.message_id,
message_chain=lb_message,
sender=sender,
chat_type=ct,
chat_id=chat.id,
group=group,
source_platform_object=update,
)
@staticmethod
async def _convert_edited_message(
update: Update, bot: telegram.Bot, bot_account_id: str,
) -> platform_events.MessageEditedEvent:
"""Convert a Telegram edited message to MessageEditedEvent."""
message = update.edited_message
lb_message = await TelegramMessageConverter.target2yiri(message, bot, bot_account_id)
editor = _make_user(message.from_user) if message.from_user else platform_entities.User(id="")
chat = message.chat
ct = _chat_type(chat)
group = None
if ct == platform_entities.ChatType.GROUP:
group = _make_user_group(chat)
return platform_events.MessageEditedEvent(
type="message.edited",
timestamp=message.edit_date.timestamp() if message.edit_date else 0.0,
adapter_name="telegram",
message_id=message.message_id,
new_content=lb_message,
editor=editor,
chat_type=ct,
chat_id=chat.id,
group=group,
source_platform_object=update,
)
@staticmethod
def _convert_chat_member(update: Update) -> typing.Optional[platform_events.EBAEvent]:
"""Convert a chat_member update to MemberJoinedEvent / MemberLeftEvent / etc."""
import time
cm = update.chat_member
chat = cm.chat
group = _make_user_group(chat)
member = _make_user(cm.new_chat_member.user) if cm.new_chat_member else platform_entities.User(id="")
inviter = _make_user(cm.from_user) if cm.from_user else None
old_status = cm.old_chat_member.status if cm.old_chat_member else None
new_status = cm.new_chat_member.status if cm.new_chat_member else None
# Member joined
if old_status in (None, 'left', 'kicked') and new_status in ('member', 'administrator', 'creator', 'restricted'):
return platform_events.MemberJoinedEvent(
type="group.member_joined",
timestamp=cm.date.timestamp() if cm.date else time.time(),
adapter_name="telegram",
group=group,
member=member,
inviter=inviter,
join_type="invite" if inviter and inviter.id != member.id else "direct",
source_platform_object=update,
)
# Member left / kicked
if old_status in ('member', 'administrator', 'creator', 'restricted') and new_status in ('left', 'kicked'):
is_kicked = new_status == 'kicked'
return platform_events.MemberLeftEvent(
type="group.member_left",
timestamp=cm.date.timestamp() if cm.date else time.time(),
adapter_name="telegram",
group=group,
member=member,
is_kicked=is_kicked,
operator=inviter if is_kicked else None,
source_platform_object=update,
)
# Member muted (restricted with can_send_messages == False)
if new_status == 'restricted' and cm.new_chat_member:
restricted = cm.new_chat_member
if hasattr(restricted, 'can_send_messages') and not restricted.can_send_messages:
duration = None
if hasattr(restricted, 'until_date') and restricted.until_date:
duration = int(restricted.until_date.timestamp() - time.time())
return platform_events.MemberBannedEvent(
type="group.member_banned",
timestamp=cm.date.timestamp() if cm.date else time.time(),
adapter_name="telegram",
group=group,
member=member,
operator=inviter,
duration=duration,
source_platform_object=update,
)
# Other chat_member changes -> PlatformSpecificEvent
return platform_events.PlatformSpecificEvent(
type="platform.specific",
timestamp=cm.date.timestamp() if cm.date else time.time(),
adapter_name="telegram",
action="chat_member_updated",
data={
"old_status": old_status,
"new_status": new_status,
"chat_id": chat.id,
"user_id": member.id,
},
source_platform_object=update,
)
@staticmethod
def _convert_my_chat_member(update: Update) -> typing.Optional[platform_events.EBAEvent]:
"""Convert a my_chat_member update to bot status events."""
import time
mcm = update.my_chat_member
chat = mcm.chat
group = _make_user_group(chat)
inviter = _make_user(mcm.from_user) if mcm.from_user else None
old_status = mcm.old_chat_member.status if mcm.old_chat_member else None
new_status = mcm.new_chat_member.status if mcm.new_chat_member else None
# Bot invited to group
if old_status in (None, 'left', 'kicked') and new_status in ('member', 'administrator'):
return platform_events.BotInvitedToGroupEvent(
type="bot.invited_to_group",
timestamp=mcm.date.timestamp() if mcm.date else time.time(),
adapter_name="telegram",
group=group,
inviter=inviter,
source_platform_object=update,
)
# Bot removed from group
if old_status in ('member', 'administrator', 'creator') and new_status in ('left', 'kicked'):
return platform_events.BotRemovedFromGroupEvent(
type="bot.removed_from_group",
timestamp=mcm.date.timestamp() if mcm.date else time.time(),
adapter_name="telegram",
group=group,
operator=inviter,
source_platform_object=update,
)
# Bot muted
if new_status == 'restricted' and mcm.new_chat_member:
restricted = mcm.new_chat_member
if hasattr(restricted, 'can_send_messages') and not restricted.can_send_messages:
duration = None
if hasattr(restricted, 'until_date') and restricted.until_date:
duration = int(restricted.until_date.timestamp() - time.time())
return platform_events.BotMutedEvent(
type="bot.muted",
timestamp=mcm.date.timestamp() if mcm.date else time.time(),
adapter_name="telegram",
group=group,
operator=inviter,
duration=duration,
source_platform_object=update,
)
return platform_events.PlatformSpecificEvent(
type="platform.specific",
timestamp=mcm.date.timestamp() if mcm.date else time.time(),
adapter_name="telegram",
action="my_chat_member_updated",
data={
"old_status": old_status,
"new_status": new_status,
"chat_id": chat.id,
},
source_platform_object=update,
)
@staticmethod
def _convert_reaction(update: Update) -> platform_events.MessageReactionEvent:
"""Convert a Telegram message_reaction to MessageReactionEvent."""
import time
reaction = update.message_reaction
chat = reaction.chat
# Extract newly added emojis
new_emojis = []
if reaction.new_reaction:
for r in reaction.new_reaction:
if hasattr(r, 'emoji'):
new_emojis.append(r.emoji)
elif hasattr(r, 'custom_emoji_id'):
new_emojis.append(str(r.custom_emoji_id))
user = platform_entities.User(id="")
if reaction.user:
user = _make_user(reaction.user)
ct = _chat_type(chat)
group = _make_user_group(chat) if ct == platform_entities.ChatType.GROUP else None
return platform_events.MessageReactionEvent(
type="message.reaction",
timestamp=reaction.date.timestamp() if reaction.date else time.time(),
adapter_name="telegram",
message_id=reaction.message_id,
user=user,
reaction=new_emojis[0] if new_emojis else "",
is_add=len(new_emojis) > 0,
chat_type=ct,
chat_id=chat.id,
group=group,
source_platform_object=update,
)
class LegacyEventConverter(abstract_platform_adapter.AbstractEventConverter):
"""Legacy event converter (compatibility layer).
Converts Telegram Updates to the old FriendMessage / GroupMessage format.
Used during the transition period to maintain backward compatibility.
"""
@staticmethod
async def yiri2target(event: platform_events.MessageEvent, bot: telegram.Bot):
return event.source_platform_object
@staticmethod
async def target2yiri(event: Update, bot: telegram.Bot, bot_account_id: str):
"""Convert to legacy format (FriendMessage / GroupMessage)."""
import langbot_plugin.api.entities.builtin.platform.events as legacy_events
import langbot_plugin.api.entities.builtin.platform.entities as legacy_entities
if not event.message:
return None
lb_message = await TelegramMessageConverter.target2yiri(event.message, bot, bot_account_id)
if event.effective_chat.type == 'private':
return legacy_events.FriendMessage(
sender=legacy_entities.Friend(
id=event.effective_chat.id,
nickname=event.effective_chat.first_name,
remark=str(event.effective_chat.id),
),
message_chain=lb_message,
time=event.message.date.timestamp(),
source_platform_object=event,
)
else:
return legacy_events.GroupMessage(
sender=legacy_entities.GroupMember(
id=event.effective_chat.id,
member_name=event.effective_chat.title,
permission=legacy_entities.Permission.Member,
group=legacy_entities.Group(
id=event.effective_chat.id,
name=event.effective_chat.title,
permission=legacy_entities.Permission.Member,
),
special_title='',
),
message_chain=lb_message,
time=event.message.date.timestamp(),
source_platform_object=event,
)

View File

@@ -0,0 +1,97 @@
apiVersion: v1
kind: MessagePlatformAdapter
metadata:
name: telegram-eba
label:
en_US: Telegram (EBA)
zh_Hans: 电报 (EBA)
description:
en_US: Telegram Bot adapter (EBA architecture)
zh_Hans: 电报 Bot 适配器EBA 架构版本)
icon: telegram.svg
spec:
config:
- name: token
label:
en_US: Token
zh_Hans: 令牌
type: string
required: true
default: ""
- name: markdown_card
label:
en_US: Markdown Card
zh_Hans: 是否使用 Markdown 卡片
type: boolean
required: false
default: true
- name: enable-stream-reply
label:
en_US: Enable Stream Reply Mode
zh_Hans: 启用电报流式回复模式
description:
en_US: If enabled, the bot will use the stream of telegram reply mode
zh_Hans: 如果启用,将使用电报流式方式来回复内容
type: boolean
required: true
default: false
supported_events:
- message.received
- message.edited
- message.deleted
- message.reaction
- group.member_joined
- group.member_left
- group.member_banned
- group.info_updated
- bot.invited_to_group
- bot.removed_from_group
supported_apis:
required:
- send_message
- reply_message
optional:
- edit_message
- delete_message
- forward_message
- get_group_info
- get_group_member_list
- get_group_member_info
- get_user_info
- get_file_url
- mute_member
- unmute_member
- kick_member
- leave_group
- call_platform_api
platform_specific_apis:
- action: pin_message
description: { en_US: "Pin a message", zh_Hans: "置顶消息" }
- action: unpin_message
description: { en_US: "Unpin a message", zh_Hans: "取消置顶" }
- action: unpin_all_messages
description: { en_US: "Unpin all messages", zh_Hans: "取消所有置顶" }
- action: get_chat_administrators
description: { en_US: "Get chat admins", zh_Hans: "获取群管理员列表" }
- action: set_chat_title
description: { en_US: "Set chat title", zh_Hans: "修改群名称" }
- action: set_chat_description
description: { en_US: "Set chat description", zh_Hans: "修改群描述" }
- action: get_chat_member_count
description: { en_US: "Get member count", zh_Hans: "获取群成员数量" }
- action: send_chat_action
description: { en_US: "Send chat action (typing, etc.)", zh_Hans: "发送聊天动作" }
- action: create_chat_invite_link
description: { en_US: "Create invite link", zh_Hans: "创建邀请链接" }
- action: answer_callback_query
description: { en_US: "Answer callback query", zh_Hans: "应答回调查询" }
execution:
python:
path: pkg/platform/adapters/telegram/adapter.py
attr: TelegramAdapter

View File

@@ -0,0 +1,143 @@
"""Telegram message chain converter.
Migrated from the original sources/telegram.py TelegramMessageConverter. Logic unchanged.
"""
from __future__ import annotations
import base64
import typing
import telegram
from langbot.pkg.utils import httpclient
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
import langbot_plugin.api.entities.builtin.platform.message as platform_message
class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
@staticmethod
async def yiri2target(message_chain: platform_message.MessageChain, bot: telegram.Bot) -> list[dict]:
"""Convert a LangBot MessageChain to a list of Telegram-sendable components."""
components = []
for component in message_chain:
if isinstance(component, platform_message.Plain):
components.append({'type': 'text', 'text': component.text})
elif isinstance(component, platform_message.Image):
photo_bytes = None
if component.base64:
photo_bytes = base64.b64decode(component.base64)
elif component.url:
session = httpclient.get_session()
async with session.get(component.url) as response:
photo_bytes = await response.read()
elif component.path:
with open(component.path, 'rb') as f:
photo_bytes = f.read()
components.append({'type': 'photo', 'photo': photo_bytes})
elif isinstance(component, platform_message.File):
file_bytes = None
if component.base64:
b64_data = component.base64
if ';base64,' in b64_data:
b64_data = b64_data.split(';base64,', 1)[1]
file_bytes = base64.b64decode(b64_data)
elif component.url:
session = httpclient.get_session()
async with session.get(component.url) as response:
file_bytes = await response.read()
elif component.path:
with open(component.path, 'rb') as f:
file_bytes = f.read()
file_name = getattr(component, 'name', None) or 'file'
components.append({'type': 'document', 'document': file_bytes, 'filename': file_name})
elif isinstance(component, platform_message.Forward):
for node in component.node_list:
components.extend(await TelegramMessageConverter.yiri2target(node.message_chain, bot))
return components
@staticmethod
async def target2yiri(message: telegram.Message, bot: telegram.Bot, bot_account_id: str):
"""Convert a Telegram Message to a LangBot MessageChain."""
message_components = []
def parse_message_text(text: str) -> list[platform_message.MessageComponent]:
msg_components = []
if f'@{bot_account_id}' in text:
msg_components.append(platform_message.At(target=bot_account_id))
text = text.replace(f'@{bot_account_id}', '')
msg_components.append(platform_message.Plain(text=text))
return msg_components
if message.text:
message_text = message.text
message_components.extend(parse_message_text(message_text))
if message.photo:
if message.caption:
message_components.extend(parse_message_text(message.caption))
file = await message.photo[-1].get_file()
file_bytes = None
file_format = ''
async with httpclient.get_session(trust_env=True).get(file.file_path) as response:
file_bytes = await response.read()
file_format = 'image/jpeg'
message_components.append(
platform_message.Image(
base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}'
)
)
if message.voice:
if message.caption:
message_components.extend(parse_message_text(message.caption))
file = await message.voice.get_file()
file_bytes = None
file_format = message.voice.mime_type or 'audio/ogg'
async with httpclient.get_session(trust_env=True).get(file.file_path) as response:
file_bytes = await response.read()
message_components.append(
platform_message.Voice(
base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}',
length=message.voice.duration,
)
)
if message.document:
if message.caption:
message_components.extend(parse_message_text(message.caption))
file = await message.document.get_file()
file_name = message.document.file_name or 'document'
file_size = message.document.file_size or 0
file_format = message.document.mime_type or 'application/octet-stream'
file_bytes = None
async with httpclient.get_session(trust_env=True).get(file.file_path) as response:
file_bytes = await response.read()
message_components.append(
platform_message.File(
name=file_name,
size=file_size,
base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}',
)
)
return platform_message.MessageChain(message_components)

View File

@@ -0,0 +1,125 @@
"""Telegram platform-specific API dispatch table for call_platform_api."""
from __future__ import annotations
import typing
import telegram
async def pin_message(bot: telegram.Bot, params: dict) -> dict:
"""Pin a message in a chat."""
await bot.pin_chat_message(
chat_id=params['chat_id'],
message_id=params['message_id'],
disable_notification=params.get('disable_notification', False),
)
return {"ok": True}
async def unpin_message(bot: telegram.Bot, params: dict) -> dict:
"""Unpin a message in a chat."""
await bot.unpin_chat_message(
chat_id=params['chat_id'],
message_id=params.get('message_id'),
)
return {"ok": True}
async def unpin_all_messages(bot: telegram.Bot, params: dict) -> dict:
"""Unpin all messages in a chat."""
await bot.unpin_all_chat_messages(chat_id=params['chat_id'])
return {"ok": True}
async def get_chat_administrators(bot: telegram.Bot, params: dict) -> dict:
"""Get chat administrator list."""
admins = await bot.get_chat_administrators(chat_id=params['chat_id'])
return {
"administrators": [
{
"user_id": a.user.id,
"username": a.user.username,
"first_name": a.user.first_name,
"status": a.status,
"custom_title": getattr(a, 'custom_title', None),
}
for a in admins
]
}
async def set_chat_title(bot: telegram.Bot, params: dict) -> dict:
"""Set chat title."""
await bot.set_chat_title(
chat_id=params['chat_id'],
title=params['title'],
)
return {"ok": True}
async def set_chat_description(bot: telegram.Bot, params: dict) -> dict:
"""Set chat description."""
await bot.set_chat_description(
chat_id=params['chat_id'],
description=params.get('description', ''),
)
return {"ok": True}
async def get_chat_member_count(bot: telegram.Bot, params: dict) -> dict:
"""Get chat member count."""
count = await bot.get_chat_member_count(chat_id=params['chat_id'])
return {"count": count}
async def send_chat_action(bot: telegram.Bot, params: dict) -> dict:
"""Send a chat action (e.g. typing)."""
await bot.send_chat_action(
chat_id=params['chat_id'],
action=params.get('action', 'typing'),
)
return {"ok": True}
async def create_chat_invite_link(bot: telegram.Bot, params: dict) -> dict:
"""Create a chat invite link."""
link = await bot.create_chat_invite_link(
chat_id=params['chat_id'],
name=params.get('name'),
expire_date=params.get('expire_date'),
member_limit=params.get('member_limit'),
)
return {
"invite_link": link.invite_link,
"name": link.name,
"is_primary": link.is_primary,
"is_revoked": link.is_revoked,
}
async def answer_callback_query(bot: telegram.Bot, params: dict) -> dict:
"""Answer a callback query."""
await bot.answer_callback_query(
callback_query_id=params['callback_query_id'],
text=params.get('text'),
show_alert=params.get('show_alert', False),
url=params.get('url'),
)
return {"ok": True}
# ---- Action dispatch table ----
PLATFORM_API_MAP: dict[str, typing.Callable[[telegram.Bot, dict], typing.Awaitable[dict]]] = {
"pin_message": pin_message,
"unpin_message": unpin_message,
"unpin_all_messages": unpin_all_messages,
"get_chat_administrators": get_chat_administrators,
"set_chat_title": set_chat_title,
"set_chat_description": set_chat_description,
"get_chat_member_count": get_chat_member_count,
"send_chat_action": send_chat_action,
"create_chat_invite_link": create_chat_invite_link,
"answer_callback_query": answer_callback_query,
}

View File

@@ -0,0 +1,13 @@
"""Telegram platform-specific type definitions."""
from __future__ import annotations
from enum import Enum
class TelegramChatType(str, Enum):
"""Telegram chat type."""
PRIVATE = "private"
GROUP = "group"
SUPERGROUP = "supergroup"
CHANNEL = "channel"