diff --git a/src/langbot/pkg/platform/adapters/__init__.py b/src/langbot/pkg/platform/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/langbot/pkg/platform/adapters/telegram/__init__.py b/src/langbot/pkg/platform/adapters/telegram/__init__.py new file mode 100644 index 00000000..f4d2d73d --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/__init__.py @@ -0,0 +1,3 @@ +from langbot.pkg.platform.adapters.telegram.adapter import TelegramAdapter + +__all__ = ["TelegramAdapter"] diff --git a/src/langbot/pkg/platform/adapters/telegram/adapter.py b/src/langbot/pkg/platform/adapters/telegram/adapter.py new file mode 100644 index 00000000..074bbf53 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/adapter.py @@ -0,0 +1,387 @@ +"""Telegram adapter main class (EBA version). + +Inherits AbstractPlatformAdapter, integrating all modules. +Preserves all existing functionality (messaging, streaming output, markdown card, forum topics, etc.). +""" + +from __future__ import annotations + +import time +import typing +import traceback + +import telegram +import telegram.ext +from telegram import Update +from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters +import telegramify_markdown +import pydantic + +from langbot.pkg.utils import httpclient +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.entities.builtin.platform.message as platform_message +import langbot_plugin.api.entities.builtin.platform.events as platform_events +import langbot_plugin.api.entities.builtin.platform.entities as platform_entities +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger + +from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter +from langbot.pkg.platform.adapters.telegram.event_converter import TelegramEventConverter, LegacyEventConverter +from langbot.pkg.platform.adapters.telegram.api_impl import TelegramAPIMixin +from langbot.pkg.platform.adapters.telegram.platform_api import PLATFORM_API_MAP + + +class TelegramAdapter(TelegramAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter): + """Telegram adapter (EBA version).""" + + bot: telegram.Bot = pydantic.Field(exclude=True) + application: telegram.ext.Application = pydantic.Field(exclude=True) + + message_converter: TelegramMessageConverter = TelegramMessageConverter() + event_converter: TelegramEventConverter = TelegramEventConverter() + legacy_event_converter: LegacyEventConverter = LegacyEventConverter() + + config: dict + + msg_stream_id: dict + """Stream message ID map. Key: stream message ID, value: first message source ID.""" + + seq: int + """Sequence number for message ordering.""" + + listeners: typing.Dict[ + typing.Type[platform_events.Event], + typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], + ] = {} + + class Config: + arbitrary_types_allowed = True + + def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger): + async def telegram_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): + if not update.message and not update.edited_message and not update.chat_member \ + and not update.my_chat_member and not update.callback_query and not update.message_reaction: + return + + # Skip messages from the bot itself + if update.message and update.message.from_user and update.message.from_user.is_bot: + return + + try: + # Legacy event type callbacks (compat with existing botmgr FriendMessage / GroupMessage listeners) + if update.message and (platform_events.FriendMessage in self.listeners + or platform_events.GroupMessage in self.listeners): + legacy_event = await self.legacy_event_converter.target2yiri(update, self.bot, self.bot_account_id) + if legacy_event and type(legacy_event) in self.listeners: + await self.listeners[type(legacy_event)](legacy_event, self) + + # EBA wildcard event callback (Event base class registered as wildcard) + if platform_events.Event in self.listeners: + eba_event = await self.event_converter.target2yiri(update, self.bot, self.bot_account_id) + if eba_event: + await self.listeners[platform_events.Event](eba_event, self) + + # EBA specific event type callback + if platform_events.EBAEvent in self.listeners: + eba_event = await self.event_converter.target2yiri(update, self.bot, self.bot_account_id) + if eba_event: + await self.listeners[platform_events.EBAEvent](eba_event, self) + + except Exception: + await self.logger.error(f'Error in telegram callback: {traceback.format_exc()}') + + application = ApplicationBuilder().token(config['token']).build() + bot = application.bot + + # Register handler for all common update types + application.add_handler( + MessageHandler( + filters.TEXT | (filters.COMMAND) | filters.PHOTO | filters.VOICE | filters.Document.ALL, + telegram_callback, + ) + ) + # Register edited message handler + application.add_handler( + MessageHandler( + filters.UpdateType.EDITED_MESSAGE, + telegram_callback, + ) + ) + + super().__init__( + config=config, + logger=logger, + msg_stream_id={}, + seq=1, + bot=bot, + application=application, + bot_account_id='', + listeners={}, + ) + + # ---- Capability Declaration ---- + + def get_supported_events(self) -> list[str]: + return [ + "message.received", + "message.edited", + "message.deleted", + "message.reaction", + "group.member_joined", + "group.member_left", + "group.member_banned", + "group.info_updated", + "bot.invited_to_group", + "bot.removed_from_group", + ] + + def get_supported_apis(self) -> list[str]: + return [ + "send_message", + "reply_message", + "edit_message", + "delete_message", + "forward_message", + "get_group_info", + "get_group_member_list", + "get_group_member_info", + "get_user_info", + "get_file_url", + "mute_member", + "unmute_member", + "kick_member", + "leave_group", + "call_platform_api", + ] + + # ---- Message Send / Reply (preserving original logic) ---- + + async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain): + components = await TelegramMessageConverter.yiri2target(message, self.bot) + + chat_id_str, _, thread_id_str = str(target_id).partition('#') + chat_id: int | str = int(chat_id_str) if chat_id_str.lstrip('-').isdigit() else chat_id_str + message_thread_id = int(thread_id_str) if thread_id_str and thread_id_str.isdigit() else None + + for component in components: + component_type = component.get('type') + args = {'chat_id': chat_id} + if message_thread_id is not None: + args['message_thread_id'] = message_thread_id + + if component_type == 'text': + text = component.get('text', '') + if self.config['markdown_card'] is True: + text = telegramify_markdown.markdownify(content=text) + args['parse_mode'] = 'MarkdownV2' + args['text'] = text + await self.bot.send_message(**args) + elif component_type == 'photo': + photo = component.get('photo') + if photo is None: + continue + args['photo'] = telegram.InputFile(photo) + await self.bot.send_photo(**args) + elif component_type == 'document': + doc = component.get('document') + if doc is None: + continue + filename = component.get('filename', 'file') + args['document'] = telegram.InputFile(doc, filename=filename) + await self.bot.send_document(**args) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ): + assert isinstance(message_source.source_platform_object, Update) + components = await TelegramMessageConverter.yiri2target(message, self.bot) + + for component in components: + if component['type'] == 'text': + if self.config['markdown_card'] is True: + content = telegramify_markdown.markdownify( + content=component['text'], + ) + else: + content = component['text'] + args = { + 'chat_id': message_source.source_platform_object.effective_chat.id, + 'text': content, + } + if self.config['markdown_card'] is True: + args['parse_mode'] = 'MarkdownV2' + + if message_source.source_platform_object.message.message_thread_id: + args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id + + if quote_origin: + args['reply_to_message_id'] = message_source.source_platform_object.message.id + + await self.bot.send_message(**args) + + # ---- Streaming Output (preserving original logic) ---- + + def _process_markdown(self, text: str) -> str: + if self.config.get('markdown_card', False): + return telegramify_markdown.markdownify(content=text) + return text + + def _build_message_args(self, chat_id: int, text: str, message_thread_id: int = None, **extra_args) -> dict: + args = {'chat_id': chat_id, 'text': self._process_markdown(text), **extra_args} + if message_thread_id: + args['message_thread_id'] = message_thread_id + if self.config.get('markdown_card', False): + args['parse_mode'] = 'MarkdownV2' + return args + + async def create_message_card(self, message_id, event): + assert isinstance(event.source_platform_object, Update) + update = event.source_platform_object + chat_id = update.effective_chat.id + chat_type = update.effective_chat.type + message_thread_id = update.message.message_thread_id + + if chat_type == 'private': + draft_id = int(time.time() * 1000) + self.msg_stream_id[message_id] = ('private', draft_id) + + args = self._build_message_args(chat_id, 'Thinking...', message_thread_id, draft_id=draft_id) + await self.bot.send_message_draft(**args) + else: + args = self._build_message_args(chat_id, 'Thinking...', message_thread_id) + send_msg = await self.bot.send_message(**args) + self.msg_stream_id[message_id] = ('group', send_msg.message_id) + + return True + + async def reply_message_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message, + message: platform_message.MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ): + message_id = bot_message.resp_message_id + msg_seq = bot_message.msg_sequence + assert isinstance(message_source.source_platform_object, Update) + update = message_source.source_platform_object + chat_id = update.effective_chat.id + message_thread_id = update.message.message_thread_id + + if message_id not in self.msg_stream_id: + return + + chat_mode, draft_id = self.msg_stream_id[message_id] + components = await TelegramMessageConverter.yiri2target(message, self.bot) + + if not components or components[0]['type'] != 'text': + if is_final and bot_message.tool_calls is None: + self.msg_stream_id.pop(message_id) + return + + content = components[0]['text'] + + if chat_mode == 'private': + args = self._build_message_args(chat_id, content, message_thread_id, draft_id=draft_id) + await self.bot.send_message_draft(**args) + if is_final and bot_message.tool_calls is None: + del args['draft_id'] + await self.bot.send_message(**args) + self.msg_stream_id.pop(message_id) + else: + stream_id = draft_id + if (msg_seq - 1) % 8 == 0 or is_final: + args = { + 'message_id': stream_id, + 'chat_id': chat_id, + 'text': self._process_markdown(content), + } + if self.config.get('markdown_card', False): + args['parse_mode'] = 'MarkdownV2' + await self.bot.edit_message_text(**args) + + if is_final and bot_message.tool_calls is None: + self.msg_stream_id.pop(message_id) + + # ---- Forum Topic / Custom launcher_id (preserving original logic) ---- + + def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: + if not isinstance(event.source_platform_object, Update): + return None + + message = event.source_platform_object.message + if not message: + return None + + if message.message_thread_id: + if isinstance(event, platform_events.GroupMessage): + return f'{event.group.id}#{message.message_thread_id}' + elif isinstance(event, platform_events.FriendMessage): + return f'{event.sender.id}#{message.message_thread_id}' + + return None + + # ---- Stream Output Support Check ---- + + async def is_stream_output_supported(self) -> bool: + is_stream = False + if self.config.get('enable-stream-reply', None): + is_stream = True + return is_stream + + async def is_muted(self, group_id: int) -> bool: + return False + + # ---- Event Listeners ---- + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + self.listeners[event_type] = callback + + def unregister_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + self.listeners.pop(event_type, None) + + # ---- Pass-through API ---- + + async def call_platform_api( + self, + action: str, + params: dict = {}, + ) -> dict: + """Call a Telegram-specific platform API.""" + handler = PLATFORM_API_MAP.get(action) + if handler is None: + from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + raise NotSupportedError(f"call_platform_api:{action}") + return await handler(self.bot, params) + + # ---- Lifecycle ---- + + async def run_async(self): + await self.application.initialize() + self.bot_account_id = (await self.bot.get_me()).username + await self.application.updater.start_polling(allowed_updates=Update.ALL_TYPES) + await self.application.start() + await self.logger.info('Telegram adapter running') + + async def kill(self) -> bool: + if self.application.running: + await self.application.stop() + if self.application.updater: + await self.application.updater.stop() + await self.logger.info('Telegram adapter stopped') + return True diff --git a/src/langbot/pkg/platform/adapters/telegram/api_impl.py b/src/langbot/pkg/platform/adapters/telegram/api_impl.py new file mode 100644 index 00000000..30a9b924 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/api_impl.py @@ -0,0 +1,242 @@ +"""Telegram universal API implementation (EBA version). + +Implements optional API methods defined in AbstractPlatformAdapter. +""" + +from __future__ import annotations + +import typing + +import telegram + +import langbot_plugin.api.entities.builtin.platform.entities as platform_entities +import langbot_plugin.api.entities.builtin.platform.message as platform_message +import langbot_plugin.api.entities.builtin.platform.events as platform_events + +from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter + + +class TelegramAPIMixin: + """Telegram universal API implementation mixin. + + Used via multiple inheritance in TelegramAdapter. + Requires self.bot: telegram.Bot and self.config: dict attributes. + """ + + bot: telegram.Bot + + async def edit_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + new_content: platform_message.MessageChain, + ) -> None: + """Edit a previously sent message.""" + components = await TelegramMessageConverter.yiri2target(new_content, self.bot) + + for component in components: + if component['type'] == 'text': + text = component['text'] + if self.config.get('markdown_card', False): + import telegramify_markdown + text = telegramify_markdown.markdownify(content=text) + args = { + 'chat_id': chat_id, + 'message_id': message_id, + 'text': text, + } + if self.config.get('markdown_card', False): + args['parse_mode'] = 'MarkdownV2' + await self.bot.edit_message_text(**args) + return + + async def delete_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> None: + """Delete / recall a message.""" + await self.bot.delete_message(chat_id=chat_id, message_id=message_id) + + async def forward_message( + self, + from_chat_type: str, + from_chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + to_chat_type: str, + to_chat_id: typing.Union[int, str], + ) -> platform_events.MessageResult: + """Forward a message to another chat.""" + result = await self.bot.forward_message( + chat_id=to_chat_id, + from_chat_id=from_chat_id, + message_id=message_id, + ) + return platform_events.MessageResult( + message_id=result.message_id, + raw={"message_id": result.message_id}, + ) + + async def get_group_info( + self, + group_id: typing.Union[int, str], + ) -> platform_entities.UserGroup: + """Get group information.""" + chat = await self.bot.get_chat(chat_id=group_id) + return platform_entities.UserGroup( + id=chat.id, + name=chat.title or "", + description=chat.description or None, + member_count=await self._get_member_count(group_id), + ) + + async def _get_member_count(self, group_id: typing.Union[int, str]) -> typing.Optional[int]: + """Get group member count.""" + try: + return await self.bot.get_chat_member_count(chat_id=group_id) + except Exception: + return None + + async def get_group_member_list( + self, + group_id: typing.Union[int, str], + ) -> list[platform_entities.UserGroupMember]: + """Get group member list. + + Note: Telegram Bot API only supports fetching the admin list + (get_chat_administrators), not the full member list. + This method returns the admin list. + """ + admins = await self.bot.get_chat_administrators(chat_id=group_id) + members = [] + for admin in admins: + role = platform_entities.MemberRole.MEMBER + if admin.status == 'creator': + role = platform_entities.MemberRole.OWNER + elif admin.status == 'administrator': + role = platform_entities.MemberRole.ADMIN + + members.append(platform_entities.UserGroupMember( + user=platform_entities.User( + id=admin.user.id, + nickname=admin.user.first_name or "", + username=admin.user.username, + is_bot=admin.user.is_bot, + ), + group_id=group_id, + role=role, + display_name=admin.custom_title if hasattr(admin, 'custom_title') else None, + )) + return members + + async def get_group_member_info( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> platform_entities.UserGroupMember: + """Get information about a specific group member.""" + member = await self.bot.get_chat_member(chat_id=group_id, user_id=user_id) + + role = platform_entities.MemberRole.MEMBER + if member.status == 'creator': + role = platform_entities.MemberRole.OWNER + elif member.status == 'administrator': + role = platform_entities.MemberRole.ADMIN + + return platform_entities.UserGroupMember( + user=platform_entities.User( + id=member.user.id, + nickname=member.user.first_name or "", + username=member.user.username, + is_bot=member.user.is_bot, + ), + group_id=group_id, + role=role, + display_name=member.custom_title if hasattr(member, 'custom_title') else None, + ) + + async def get_user_info( + self, + user_id: typing.Union[int, str], + ) -> platform_entities.User: + """Get user information.""" + chat = await self.bot.get_chat(chat_id=user_id) + return platform_entities.User( + id=chat.id, + nickname=chat.first_name or "", + username=chat.username, + ) + + async def upload_file( + self, + file_data: bytes, + filename: str, + ) -> str: + """Upload a file. + + Telegram does not support standalone file uploads; files are sent as + part of messages. This method raises NotSupportedError. + """ + from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + raise NotSupportedError("upload_file") + + async def get_file_url( + self, + file_id: str, + ) -> str: + """Get file download URL.""" + file = await self.bot.get_file(file_id) + return file.file_path + + async def mute_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + duration: int = 0, + ) -> None: + """Mute a group member.""" + import datetime + permissions = telegram.ChatPermissions(can_send_messages=False) + kwargs = { + 'chat_id': group_id, + 'user_id': user_id, + 'permissions': permissions, + } + if duration > 0: + kwargs['until_date'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=duration) + await self.bot.restrict_chat_member(**kwargs) + + async def unmute_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """Unmute a group member.""" + permissions = telegram.ChatPermissions( + can_send_messages=True, + can_send_media_messages=True, + can_send_other_messages=True, + can_add_web_page_previews=True, + ) + await self.bot.restrict_chat_member( + chat_id=group_id, + user_id=user_id, + permissions=permissions, + ) + + async def kick_member( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> None: + """Kick a member from the group.""" + await self.bot.ban_chat_member(chat_id=group_id, user_id=user_id) + + async def leave_group( + self, + group_id: typing.Union[int, str], + ) -> None: + """Make the bot leave a group.""" + await self.bot.leave_chat(chat_id=group_id) diff --git a/src/langbot/pkg/platform/adapters/telegram/event_converter.py b/src/langbot/pkg/platform/adapters/telegram/event_converter.py new file mode 100644 index 00000000..41ff40e6 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/event_converter.py @@ -0,0 +1,404 @@ +"""Telegram event converter (EBA version). + +Converts all Telegram Update types to unified EBA events, not just messages. +""" + +from __future__ import annotations + +import typing + +import telegram +from telegram import Update + +import langbot_plugin.api.entities.builtin.platform.events as platform_events +import langbot_plugin.api.entities.builtin.platform.entities as platform_entities +import langbot_plugin.api.entities.builtin.platform.message as platform_message +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter + +from langbot.pkg.platform.adapters.telegram.message_converter import TelegramMessageConverter + + +def _make_user(tg_user: telegram.User) -> platform_entities.User: + """Convert a Telegram User to a unified User entity.""" + return platform_entities.User( + id=tg_user.id, + nickname=tg_user.first_name or "", + username=tg_user.username, + is_bot=tg_user.is_bot, + ) + + +def _make_user_group(tg_chat: telegram.Chat) -> platform_entities.UserGroup: + """Convert a Telegram Chat to a unified UserGroup entity.""" + return platform_entities.UserGroup( + id=tg_chat.id, + name=tg_chat.title or tg_chat.first_name or "", + description=tg_chat.description if hasattr(tg_chat, 'description') else None, + ) + + +def _chat_type(tg_chat: telegram.Chat) -> platform_entities.ChatType: + """Map Telegram Chat type to unified ChatType.""" + if tg_chat.type == 'private': + return platform_entities.ChatType.PRIVATE + return platform_entities.ChatType.GROUP + + +class TelegramEventConverter(abstract_platform_adapter.AbstractEventConverter): + """Telegram event converter (EBA version).""" + + @staticmethod + async def yiri2target(event: platform_events.Event, bot: telegram.Bot): + """Convert a unified event to a raw Telegram event (generally not needed).""" + if hasattr(event, 'source_platform_object'): + return event.source_platform_object + return None + + @staticmethod + async def target2yiri( + update: Update, + bot: telegram.Bot, + bot_account_id: str, + ) -> typing.Optional[platform_events.EBAEvent]: + """Convert a Telegram Update to a unified EBA event. + + Supports: message, edited_message, chat_member, my_chat_member, + callback_query, message_reaction, etc. + Unmappable events are wrapped as PlatformSpecificEvent. + """ + import time + + # ---- Message event ---- + if update.message and update.message.text is not None or ( + update.message and (update.message.photo or update.message.voice or update.message.document) + ): + return await TelegramEventConverter._convert_message(update, bot, bot_account_id) + + # ---- Edited message event ---- + if update.edited_message: + return await TelegramEventConverter._convert_edited_message(update, bot, bot_account_id) + + # ---- Member change event (chat_member) ---- + if update.chat_member: + return TelegramEventConverter._convert_chat_member(update) + + # ---- Bot's own member status change (my_chat_member) ---- + if update.my_chat_member: + return TelegramEventConverter._convert_my_chat_member(update) + + # ---- Callback query (button clicks, etc.) ---- + if update.callback_query: + return platform_events.PlatformSpecificEvent( + type="platform.specific", + timestamp=time.time(), + adapter_name="telegram", + action="callback_query", + data={ + "callback_query_id": update.callback_query.id, + "data": update.callback_query.data, + "from_user_id": update.callback_query.from_user.id if update.callback_query.from_user else None, + "message_id": update.callback_query.message.message_id if update.callback_query.message else None, + }, + source_platform_object=update, + ) + + # ---- Message reaction ---- + if update.message_reaction: + return TelegramEventConverter._convert_reaction(update) + + # ---- Fallback: wrap as PlatformSpecificEvent ---- + return platform_events.PlatformSpecificEvent( + type="platform.specific", + timestamp=time.time(), + adapter_name="telegram", + action="unknown_update", + data={"update_id": update.update_id}, + source_platform_object=update, + ) + + @staticmethod + async def _convert_message( + update: Update, bot: telegram.Bot, bot_account_id: str, + ) -> platform_events.MessageReceivedEvent: + """Convert a Telegram message to MessageReceivedEvent.""" + message = update.message + lb_message = await TelegramMessageConverter.target2yiri(message, bot, bot_account_id) + + sender = _make_user(message.from_user) if message.from_user else platform_entities.User(id="") + chat = message.chat + ct = _chat_type(chat) + + group = None + if ct == platform_entities.ChatType.GROUP: + group = _make_user_group(chat) + + return platform_events.MessageReceivedEvent( + type="message.received", + timestamp=message.date.timestamp() if message.date else 0.0, + adapter_name="telegram", + message_id=message.message_id, + message_chain=lb_message, + sender=sender, + chat_type=ct, + chat_id=chat.id, + group=group, + source_platform_object=update, + ) + + @staticmethod + async def _convert_edited_message( + update: Update, bot: telegram.Bot, bot_account_id: str, + ) -> platform_events.MessageEditedEvent: + """Convert a Telegram edited message to MessageEditedEvent.""" + message = update.edited_message + lb_message = await TelegramMessageConverter.target2yiri(message, bot, bot_account_id) + + editor = _make_user(message.from_user) if message.from_user else platform_entities.User(id="") + chat = message.chat + ct = _chat_type(chat) + + group = None + if ct == platform_entities.ChatType.GROUP: + group = _make_user_group(chat) + + return platform_events.MessageEditedEvent( + type="message.edited", + timestamp=message.edit_date.timestamp() if message.edit_date else 0.0, + adapter_name="telegram", + message_id=message.message_id, + new_content=lb_message, + editor=editor, + chat_type=ct, + chat_id=chat.id, + group=group, + source_platform_object=update, + ) + + @staticmethod + def _convert_chat_member(update: Update) -> typing.Optional[platform_events.EBAEvent]: + """Convert a chat_member update to MemberJoinedEvent / MemberLeftEvent / etc.""" + import time + + cm = update.chat_member + chat = cm.chat + group = _make_user_group(chat) + member = _make_user(cm.new_chat_member.user) if cm.new_chat_member else platform_entities.User(id="") + inviter = _make_user(cm.from_user) if cm.from_user else None + + old_status = cm.old_chat_member.status if cm.old_chat_member else None + new_status = cm.new_chat_member.status if cm.new_chat_member else None + + # Member joined + if old_status in (None, 'left', 'kicked') and new_status in ('member', 'administrator', 'creator', 'restricted'): + return platform_events.MemberJoinedEvent( + type="group.member_joined", + timestamp=cm.date.timestamp() if cm.date else time.time(), + adapter_name="telegram", + group=group, + member=member, + inviter=inviter, + join_type="invite" if inviter and inviter.id != member.id else "direct", + source_platform_object=update, + ) + + # Member left / kicked + if old_status in ('member', 'administrator', 'creator', 'restricted') and new_status in ('left', 'kicked'): + is_kicked = new_status == 'kicked' + return platform_events.MemberLeftEvent( + type="group.member_left", + timestamp=cm.date.timestamp() if cm.date else time.time(), + adapter_name="telegram", + group=group, + member=member, + is_kicked=is_kicked, + operator=inviter if is_kicked else None, + source_platform_object=update, + ) + + # Member muted (restricted with can_send_messages == False) + if new_status == 'restricted' and cm.new_chat_member: + restricted = cm.new_chat_member + if hasattr(restricted, 'can_send_messages') and not restricted.can_send_messages: + duration = None + if hasattr(restricted, 'until_date') and restricted.until_date: + duration = int(restricted.until_date.timestamp() - time.time()) + return platform_events.MemberBannedEvent( + type="group.member_banned", + timestamp=cm.date.timestamp() if cm.date else time.time(), + adapter_name="telegram", + group=group, + member=member, + operator=inviter, + duration=duration, + source_platform_object=update, + ) + + # Other chat_member changes -> PlatformSpecificEvent + return platform_events.PlatformSpecificEvent( + type="platform.specific", + timestamp=cm.date.timestamp() if cm.date else time.time(), + adapter_name="telegram", + action="chat_member_updated", + data={ + "old_status": old_status, + "new_status": new_status, + "chat_id": chat.id, + "user_id": member.id, + }, + source_platform_object=update, + ) + + @staticmethod + def _convert_my_chat_member(update: Update) -> typing.Optional[platform_events.EBAEvent]: + """Convert a my_chat_member update to bot status events.""" + import time + + mcm = update.my_chat_member + chat = mcm.chat + group = _make_user_group(chat) + inviter = _make_user(mcm.from_user) if mcm.from_user else None + + old_status = mcm.old_chat_member.status if mcm.old_chat_member else None + new_status = mcm.new_chat_member.status if mcm.new_chat_member else None + + # Bot invited to group + if old_status in (None, 'left', 'kicked') and new_status in ('member', 'administrator'): + return platform_events.BotInvitedToGroupEvent( + type="bot.invited_to_group", + timestamp=mcm.date.timestamp() if mcm.date else time.time(), + adapter_name="telegram", + group=group, + inviter=inviter, + source_platform_object=update, + ) + + # Bot removed from group + if old_status in ('member', 'administrator', 'creator') and new_status in ('left', 'kicked'): + return platform_events.BotRemovedFromGroupEvent( + type="bot.removed_from_group", + timestamp=mcm.date.timestamp() if mcm.date else time.time(), + adapter_name="telegram", + group=group, + operator=inviter, + source_platform_object=update, + ) + + # Bot muted + if new_status == 'restricted' and mcm.new_chat_member: + restricted = mcm.new_chat_member + if hasattr(restricted, 'can_send_messages') and not restricted.can_send_messages: + duration = None + if hasattr(restricted, 'until_date') and restricted.until_date: + duration = int(restricted.until_date.timestamp() - time.time()) + return platform_events.BotMutedEvent( + type="bot.muted", + timestamp=mcm.date.timestamp() if mcm.date else time.time(), + adapter_name="telegram", + group=group, + operator=inviter, + duration=duration, + source_platform_object=update, + ) + + return platform_events.PlatformSpecificEvent( + type="platform.specific", + timestamp=mcm.date.timestamp() if mcm.date else time.time(), + adapter_name="telegram", + action="my_chat_member_updated", + data={ + "old_status": old_status, + "new_status": new_status, + "chat_id": chat.id, + }, + source_platform_object=update, + ) + + @staticmethod + def _convert_reaction(update: Update) -> platform_events.MessageReactionEvent: + """Convert a Telegram message_reaction to MessageReactionEvent.""" + import time + + reaction = update.message_reaction + chat = reaction.chat + + # Extract newly added emojis + new_emojis = [] + if reaction.new_reaction: + for r in reaction.new_reaction: + if hasattr(r, 'emoji'): + new_emojis.append(r.emoji) + elif hasattr(r, 'custom_emoji_id'): + new_emojis.append(str(r.custom_emoji_id)) + + user = platform_entities.User(id="") + if reaction.user: + user = _make_user(reaction.user) + + ct = _chat_type(chat) + group = _make_user_group(chat) if ct == platform_entities.ChatType.GROUP else None + + return platform_events.MessageReactionEvent( + type="message.reaction", + timestamp=reaction.date.timestamp() if reaction.date else time.time(), + adapter_name="telegram", + message_id=reaction.message_id, + user=user, + reaction=new_emojis[0] if new_emojis else "", + is_add=len(new_emojis) > 0, + chat_type=ct, + chat_id=chat.id, + group=group, + source_platform_object=update, + ) + + +class LegacyEventConverter(abstract_platform_adapter.AbstractEventConverter): + """Legacy event converter (compatibility layer). + + Converts Telegram Updates to the old FriendMessage / GroupMessage format. + Used during the transition period to maintain backward compatibility. + """ + + @staticmethod + async def yiri2target(event: platform_events.MessageEvent, bot: telegram.Bot): + return event.source_platform_object + + @staticmethod + async def target2yiri(event: Update, bot: telegram.Bot, bot_account_id: str): + """Convert to legacy format (FriendMessage / GroupMessage).""" + import langbot_plugin.api.entities.builtin.platform.events as legacy_events + import langbot_plugin.api.entities.builtin.platform.entities as legacy_entities + + if not event.message: + return None + + lb_message = await TelegramMessageConverter.target2yiri(event.message, bot, bot_account_id) + + if event.effective_chat.type == 'private': + return legacy_events.FriendMessage( + sender=legacy_entities.Friend( + id=event.effective_chat.id, + nickname=event.effective_chat.first_name, + remark=str(event.effective_chat.id), + ), + message_chain=lb_message, + time=event.message.date.timestamp(), + source_platform_object=event, + ) + else: + return legacy_events.GroupMessage( + sender=legacy_entities.GroupMember( + id=event.effective_chat.id, + member_name=event.effective_chat.title, + permission=legacy_entities.Permission.Member, + group=legacy_entities.Group( + id=event.effective_chat.id, + name=event.effective_chat.title, + permission=legacy_entities.Permission.Member, + ), + special_title='', + ), + message_chain=lb_message, + time=event.message.date.timestamp(), + source_platform_object=event, + ) diff --git a/src/langbot/pkg/platform/adapters/telegram/manifest.yaml b/src/langbot/pkg/platform/adapters/telegram/manifest.yaml new file mode 100644 index 00000000..5ca602cd --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/manifest.yaml @@ -0,0 +1,97 @@ +apiVersion: v1 +kind: MessagePlatformAdapter + +metadata: + name: telegram-eba + label: + en_US: Telegram (EBA) + zh_Hans: 电报 (EBA) + description: + en_US: Telegram Bot adapter (EBA architecture) + zh_Hans: 电报 Bot 适配器(EBA 架构版本) + icon: telegram.svg + +spec: + config: + - name: token + label: + en_US: Token + zh_Hans: 令牌 + type: string + required: true + default: "" + - name: markdown_card + label: + en_US: Markdown Card + zh_Hans: 是否使用 Markdown 卡片 + type: boolean + required: false + default: true + - name: enable-stream-reply + label: + en_US: Enable Stream Reply Mode + zh_Hans: 启用电报流式回复模式 + description: + en_US: If enabled, the bot will use the stream of telegram reply mode + zh_Hans: 如果启用,将使用电报流式方式来回复内容 + type: boolean + required: true + default: false + + supported_events: + - message.received + - message.edited + - message.deleted + - message.reaction + - group.member_joined + - group.member_left + - group.member_banned + - group.info_updated + - bot.invited_to_group + - bot.removed_from_group + + supported_apis: + required: + - send_message + - reply_message + optional: + - edit_message + - delete_message + - forward_message + - get_group_info + - get_group_member_list + - get_group_member_info + - get_user_info + - get_file_url + - mute_member + - unmute_member + - kick_member + - leave_group + - call_platform_api + + platform_specific_apis: + - action: pin_message + description: { en_US: "Pin a message", zh_Hans: "置顶消息" } + - action: unpin_message + description: { en_US: "Unpin a message", zh_Hans: "取消置顶" } + - action: unpin_all_messages + description: { en_US: "Unpin all messages", zh_Hans: "取消所有置顶" } + - action: get_chat_administrators + description: { en_US: "Get chat admins", zh_Hans: "获取群管理员列表" } + - action: set_chat_title + description: { en_US: "Set chat title", zh_Hans: "修改群名称" } + - action: set_chat_description + description: { en_US: "Set chat description", zh_Hans: "修改群描述" } + - action: get_chat_member_count + description: { en_US: "Get member count", zh_Hans: "获取群成员数量" } + - action: send_chat_action + description: { en_US: "Send chat action (typing, etc.)", zh_Hans: "发送聊天动作" } + - action: create_chat_invite_link + description: { en_US: "Create invite link", zh_Hans: "创建邀请链接" } + - action: answer_callback_query + description: { en_US: "Answer callback query", zh_Hans: "应答回调查询" } + +execution: + python: + path: pkg/platform/adapters/telegram/adapter.py + attr: TelegramAdapter diff --git a/src/langbot/pkg/platform/adapters/telegram/message_converter.py b/src/langbot/pkg/platform/adapters/telegram/message_converter.py new file mode 100644 index 00000000..90ce7655 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/message_converter.py @@ -0,0 +1,143 @@ +"""Telegram message chain converter. + +Migrated from the original sources/telegram.py TelegramMessageConverter. Logic unchanged. +""" + +from __future__ import annotations + +import base64 +import typing + +import telegram + +from langbot.pkg.utils import httpclient +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.entities.builtin.platform.message as platform_message + + +class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverter): + @staticmethod + async def yiri2target(message_chain: platform_message.MessageChain, bot: telegram.Bot) -> list[dict]: + """Convert a LangBot MessageChain to a list of Telegram-sendable components.""" + components = [] + + for component in message_chain: + if isinstance(component, platform_message.Plain): + components.append({'type': 'text', 'text': component.text}) + elif isinstance(component, platform_message.Image): + photo_bytes = None + + if component.base64: + photo_bytes = base64.b64decode(component.base64) + elif component.url: + session = httpclient.get_session() + async with session.get(component.url) as response: + photo_bytes = await response.read() + elif component.path: + with open(component.path, 'rb') as f: + photo_bytes = f.read() + + components.append({'type': 'photo', 'photo': photo_bytes}) + elif isinstance(component, platform_message.File): + file_bytes = None + + if component.base64: + b64_data = component.base64 + if ';base64,' in b64_data: + b64_data = b64_data.split(';base64,', 1)[1] + file_bytes = base64.b64decode(b64_data) + elif component.url: + session = httpclient.get_session() + async with session.get(component.url) as response: + file_bytes = await response.read() + elif component.path: + with open(component.path, 'rb') as f: + file_bytes = f.read() + + file_name = getattr(component, 'name', None) or 'file' + components.append({'type': 'document', 'document': file_bytes, 'filename': file_name}) + elif isinstance(component, platform_message.Forward): + for node in component.node_list: + components.extend(await TelegramMessageConverter.yiri2target(node.message_chain, bot)) + + return components + + @staticmethod + async def target2yiri(message: telegram.Message, bot: telegram.Bot, bot_account_id: str): + """Convert a Telegram Message to a LangBot MessageChain.""" + message_components = [] + + def parse_message_text(text: str) -> list[platform_message.MessageComponent]: + msg_components = [] + + if f'@{bot_account_id}' in text: + msg_components.append(platform_message.At(target=bot_account_id)) + text = text.replace(f'@{bot_account_id}', '') + msg_components.append(platform_message.Plain(text=text)) + + return msg_components + + if message.text: + message_text = message.text + message_components.extend(parse_message_text(message_text)) + + if message.photo: + if message.caption: + message_components.extend(parse_message_text(message.caption)) + + file = await message.photo[-1].get_file() + + file_bytes = None + file_format = '' + + async with httpclient.get_session(trust_env=True).get(file.file_path) as response: + file_bytes = await response.read() + file_format = 'image/jpeg' + + message_components.append( + platform_message.Image( + base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}' + ) + ) + + if message.voice: + if message.caption: + message_components.extend(parse_message_text(message.caption)) + + file = await message.voice.get_file() + + file_bytes = None + file_format = message.voice.mime_type or 'audio/ogg' + + async with httpclient.get_session(trust_env=True).get(file.file_path) as response: + file_bytes = await response.read() + + message_components.append( + platform_message.Voice( + base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}', + length=message.voice.duration, + ) + ) + + if message.document: + if message.caption: + message_components.extend(parse_message_text(message.caption)) + + file = await message.document.get_file() + file_name = message.document.file_name or 'document' + file_size = message.document.file_size or 0 + file_format = message.document.mime_type or 'application/octet-stream' + + file_bytes = None + async with httpclient.get_session(trust_env=True).get(file.file_path) as response: + file_bytes = await response.read() + + message_components.append( + platform_message.File( + name=file_name, + size=file_size, + base64=f'data:{file_format};base64,{base64.b64encode(file_bytes).decode("utf-8")}', + ) + ) + + return platform_message.MessageChain(message_components) diff --git a/src/langbot/pkg/platform/adapters/telegram/platform_api.py b/src/langbot/pkg/platform/adapters/telegram/platform_api.py new file mode 100644 index 00000000..ccd81950 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/platform_api.py @@ -0,0 +1,125 @@ +"""Telegram platform-specific API dispatch table for call_platform_api.""" + +from __future__ import annotations + +import typing + +import telegram + + +async def pin_message(bot: telegram.Bot, params: dict) -> dict: + """Pin a message in a chat.""" + await bot.pin_chat_message( + chat_id=params['chat_id'], + message_id=params['message_id'], + disable_notification=params.get('disable_notification', False), + ) + return {"ok": True} + + +async def unpin_message(bot: telegram.Bot, params: dict) -> dict: + """Unpin a message in a chat.""" + await bot.unpin_chat_message( + chat_id=params['chat_id'], + message_id=params.get('message_id'), + ) + return {"ok": True} + + +async def unpin_all_messages(bot: telegram.Bot, params: dict) -> dict: + """Unpin all messages in a chat.""" + await bot.unpin_all_chat_messages(chat_id=params['chat_id']) + return {"ok": True} + + +async def get_chat_administrators(bot: telegram.Bot, params: dict) -> dict: + """Get chat administrator list.""" + admins = await bot.get_chat_administrators(chat_id=params['chat_id']) + return { + "administrators": [ + { + "user_id": a.user.id, + "username": a.user.username, + "first_name": a.user.first_name, + "status": a.status, + "custom_title": getattr(a, 'custom_title', None), + } + for a in admins + ] + } + + +async def set_chat_title(bot: telegram.Bot, params: dict) -> dict: + """Set chat title.""" + await bot.set_chat_title( + chat_id=params['chat_id'], + title=params['title'], + ) + return {"ok": True} + + +async def set_chat_description(bot: telegram.Bot, params: dict) -> dict: + """Set chat description.""" + await bot.set_chat_description( + chat_id=params['chat_id'], + description=params.get('description', ''), + ) + return {"ok": True} + + +async def get_chat_member_count(bot: telegram.Bot, params: dict) -> dict: + """Get chat member count.""" + count = await bot.get_chat_member_count(chat_id=params['chat_id']) + return {"count": count} + + +async def send_chat_action(bot: telegram.Bot, params: dict) -> dict: + """Send a chat action (e.g. typing).""" + await bot.send_chat_action( + chat_id=params['chat_id'], + action=params.get('action', 'typing'), + ) + return {"ok": True} + + +async def create_chat_invite_link(bot: telegram.Bot, params: dict) -> dict: + """Create a chat invite link.""" + link = await bot.create_chat_invite_link( + chat_id=params['chat_id'], + name=params.get('name'), + expire_date=params.get('expire_date'), + member_limit=params.get('member_limit'), + ) + return { + "invite_link": link.invite_link, + "name": link.name, + "is_primary": link.is_primary, + "is_revoked": link.is_revoked, + } + + +async def answer_callback_query(bot: telegram.Bot, params: dict) -> dict: + """Answer a callback query.""" + await bot.answer_callback_query( + callback_query_id=params['callback_query_id'], + text=params.get('text'), + show_alert=params.get('show_alert', False), + url=params.get('url'), + ) + return {"ok": True} + + +# ---- Action dispatch table ---- + +PLATFORM_API_MAP: dict[str, typing.Callable[[telegram.Bot, dict], typing.Awaitable[dict]]] = { + "pin_message": pin_message, + "unpin_message": unpin_message, + "unpin_all_messages": unpin_all_messages, + "get_chat_administrators": get_chat_administrators, + "set_chat_title": set_chat_title, + "set_chat_description": set_chat_description, + "get_chat_member_count": get_chat_member_count, + "send_chat_action": send_chat_action, + "create_chat_invite_link": create_chat_invite_link, + "answer_callback_query": answer_callback_query, +} diff --git a/src/langbot/pkg/platform/adapters/telegram/types.py b/src/langbot/pkg/platform/adapters/telegram/types.py new file mode 100644 index 00000000..d36239ff --- /dev/null +++ b/src/langbot/pkg/platform/adapters/telegram/types.py @@ -0,0 +1,13 @@ +"""Telegram platform-specific type definitions.""" + +from __future__ import annotations + +from enum import Enum + + +class TelegramChatType(str, Enum): + """Telegram chat type.""" + PRIVATE = "private" + GROUP = "group" + SUPERGROUP = "supergroup" + CHANNEL = "channel"