diff --git a/src/langbot/pkg/platform/adapters/telegram/api_impl.py b/src/langbot/pkg/platform/adapters/telegram/api_impl.py index 30a9b924..f058d54e 100644 --- a/src/langbot/pkg/platform/adapters/telegram/api_impl.py +++ b/src/langbot/pkg/platform/adapters/telegram/api_impl.py @@ -40,6 +40,7 @@ class TelegramAPIMixin: text = component['text'] if self.config.get('markdown_card', False): import telegramify_markdown + text = telegramify_markdown.markdownify(content=text) args = { 'chat_id': chat_id, @@ -76,7 +77,7 @@ class TelegramAPIMixin: ) return platform_events.MessageResult( message_id=result.message_id, - raw={"message_id": result.message_id}, + raw={'message_id': result.message_id}, ) async def get_group_info( @@ -87,7 +88,7 @@ class TelegramAPIMixin: chat = await self.bot.get_chat(chat_id=group_id) return platform_entities.UserGroup( id=chat.id, - name=chat.title or "", + name=chat.title or '', description=chat.description or None, member_count=await self._get_member_count(group_id), ) @@ -118,17 +119,19 @@ class TelegramAPIMixin: elif admin.status == 'administrator': role = platform_entities.MemberRole.ADMIN - members.append(platform_entities.UserGroupMember( - user=platform_entities.User( - id=admin.user.id, - nickname=admin.user.first_name or "", - username=admin.user.username, - is_bot=admin.user.is_bot, - ), - group_id=group_id, - role=role, - display_name=admin.custom_title if hasattr(admin, 'custom_title') else None, - )) + members.append( + platform_entities.UserGroupMember( + user=platform_entities.User( + id=admin.user.id, + nickname=admin.user.first_name or '', + username=admin.user.username, + is_bot=admin.user.is_bot, + ), + group_id=group_id, + role=role, + display_name=admin.custom_title if hasattr(admin, 'custom_title') else None, + ) + ) return members async def get_group_member_info( @@ -148,7 +151,7 @@ class TelegramAPIMixin: return platform_entities.UserGroupMember( user=platform_entities.User( id=member.user.id, - nickname=member.user.first_name or "", + nickname=member.user.first_name or '', username=member.user.username, is_bot=member.user.is_bot, ), @@ -165,7 +168,7 @@ class TelegramAPIMixin: chat = await self.bot.get_chat(chat_id=user_id) return platform_entities.User( id=chat.id, - nickname=chat.first_name or "", + nickname=chat.first_name or '', username=chat.username, ) @@ -180,7 +183,8 @@ class TelegramAPIMixin: part of messages. This method raises NotSupportedError. """ from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError - raise NotSupportedError("upload_file") + + raise NotSupportedError('upload_file') async def get_file_url( self, @@ -198,6 +202,7 @@ class TelegramAPIMixin: ) -> None: """Mute a group member.""" import datetime + permissions = telegram.ChatPermissions(can_send_messages=False) kwargs = { 'chat_id': group_id, @@ -216,9 +221,14 @@ class TelegramAPIMixin: """Unmute a group member.""" permissions = telegram.ChatPermissions( can_send_messages=True, - can_send_media_messages=True, can_send_other_messages=True, can_add_web_page_previews=True, + can_send_audios=True, + can_send_documents=True, + can_send_photos=True, + can_send_videos=True, + can_send_video_notes=True, + can_send_voice_notes=True, ) await self.bot.restrict_chat_member( chat_id=group_id, diff --git a/tests/e2e/live_telegram_eba_probe.py b/tests/e2e/live_telegram_eba_probe.py index 760d1622..38753772 100644 --- a/tests/e2e/live_telegram_eba_probe.py +++ b/tests/e2e/live_telegram_eba_probe.py @@ -5,12 +5,15 @@ import asyncio import base64 import json import os +import re from pathlib import Path +from typing import Any import telegram from langbot.pkg.platform.adapters.telegram.adapter import TelegramAdapter from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities from langbot_plugin.api.entities.builtin.platform import events as platform_events from langbot_plugin.api.entities.builtin.platform import message as platform_message @@ -54,10 +57,62 @@ def summarize_event(event: platform_events.EBAEvent) -> dict: if hasattr(value, 'value'): value = value.value data[field] = value + if hasattr(event, 'member') and event.member is not None: + data['member'] = event.member.model_dump() + if hasattr(event, 'group') and event.group is not None: + data['group'] = event.group.model_dump() + if hasattr(event, 'operator') and event.operator is not None: + data['operator'] = event.operator.model_dump() return data -async def run_probe(token: str, log_path: Path, timeout: int): +def chat_type_value(chat_type: platform_entities.ChatType | str) -> str: + return chat_type.value if hasattr(chat_type, 'value') else str(chat_type) + + +def record_api(results: list[dict[str, Any]], name: str, ok: bool, result: Any = None, error: Exception | None = None): + entry = {'name': name, 'ok': ok} + if result is not None: + entry['result'] = redact_sensitive(result) + if error is not None: + entry['error'] = repr(error) + results.append(entry) + print('TELEGRAM_EBA_API', json.dumps(entry, ensure_ascii=False, default=str)) + + +def redact_sensitive(value: Any) -> Any: + if isinstance(value, str): + return re.sub(r'bot\d+:[A-Za-z0-9_-]+', 'bot', value) + if isinstance(value, dict): + return {key: redact_sensitive(item) for key, item in value.items()} + if isinstance(value, list): + return [redact_sensitive(item) for item in value] + if isinstance(value, int | float | bool) or value is None: + return value + return redact_sensitive(str(value)) + + +async def run_api(results: list[dict[str, Any]], name: str, func): + try: + result = await func() + record_api(results, name, True, result) + return result + except Exception as exc: + record_api(results, name, False, error=exc) + return None + + +async def run_probe( + token: str, + log_path: Path, + timeout: int, + group_chat_id: str | None, + moderation_user_id: str | None, + kick_user_id: str | None, + allow_group_mutation: bool, + allow_unpin_all: bool, + allow_leave_group: bool, +): adapter = TelegramAdapter( { 'token': token, @@ -67,9 +122,15 @@ async def run_probe(token: str, log_path: Path, timeout: int): ProbeLogger(), ) events: list[platform_events.EBAEvent] = [] + api_results: list[dict[str, Any]] = [] first_message = asyncio.Event() + callback_event = asyncio.Event() + callback_query_id: str | None = None + callback_probe_message_id: int | None = None + awaiting_callback = False async def listener(event, adapter): + nonlocal callback_query_id events.append(event) log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open('a', encoding='utf-8') as fp: @@ -77,6 +138,11 @@ async def run_probe(token: str, log_path: Path, timeout: int): print('TELEGRAM_EBA_EVENT', json.dumps(summarize_event(event), ensure_ascii=False)) if isinstance(event, platform_events.MessageReceivedEvent): first_message.set() + if isinstance(event, platform_events.PlatformSpecificEvent) and event.action == 'callback_query': + message_id = event.data.get('message_id') + if awaiting_callback and message_id == callback_probe_message_id: + callback_query_id = event.data.get('callback_query_id') + callback_event.set() adapter.register_listener(platform_events.EBAEvent, listener) await adapter.run_async() @@ -85,63 +151,381 @@ async def run_probe(token: str, log_path: Path, timeout: int): print('READY: send a private or group message to the Telegram test bot now.') await asyncio.wait_for(first_message.wait(), timeout=timeout) source = next(event for event in events if isinstance(event, platform_events.MessageReceivedEvent)) + source_chat_type = chat_type_value(source.chat_type) + group_id = group_chat_id or (str(source.chat_id) if source_chat_type == 'group' else None) + actor_user_id = str(source.sender.id) + target_chat_id = str(source.chat_id) - await adapter.reply_message( - source, - platform_message.MessageChain( - [ - platform_message.Plain(text='EBA live reply: text'), - platform_message.Image(base64=base64.b64encode(PNG_1X1).decode()), - platform_message.File( - name='eba-live.txt', - size=8, - base64='data:text/plain;base64,' + base64.b64encode(b'eba-live').decode(), + await run_api( + api_results, + 'reply_message:text_image_file', + lambda: adapter.reply_message( + source, + platform_message.MessageChain( + [ + platform_message.Plain(text='EBA live reply: text'), + platform_message.Image(base64=base64.b64encode(PNG_1X1).decode()), + platform_message.File( + name='eba-live.txt', + size=8, + base64='data:text/plain;base64,' + base64.b64encode(b'eba-live').decode(), + ), + ] + ), + quote_origin=True, + ), + ) + await run_api( + api_results, + 'send_message:text_image_file', + lambda: adapter.send_message( + source_chat_type, + target_chat_id, + platform_message.MessageChain( + [ + platform_message.Plain(text='EBA live send_message OK'), + platform_message.Image(base64=base64.b64encode(PNG_1X1).decode()), + platform_message.File( + name='eba-send-live.txt', + size=13, + base64='data:text/plain;base64,' + base64.b64encode(b'eba-send-live').decode(), + ), + ] + ), + ), + ) + + edit_probe = await run_api( + api_results, + 'bot.send_message:edit_delete_probe', + lambda: adapter.bot.send_message(chat_id=target_chat_id, text='EBA edit/delete probe'), + ) + if edit_probe: + await run_api( + api_results, + 'edit_message', + lambda: adapter.edit_message( + source_chat_type, + target_chat_id, + edit_probe.message_id, + platform_message.MessageChain([platform_message.Plain(text='EBA edit probe edited')]), + ), + ) + await run_api( + api_results, + 'delete_message', + lambda: adapter.delete_message(source_chat_type, target_chat_id, edit_probe.message_id), + ) + + forward_probe = await run_api( + api_results, + 'bot.send_message:forward_probe', + lambda: adapter.bot.send_message(chat_id=target_chat_id, text='EBA forward probe'), + ) + if forward_probe: + forwarded = await run_api( + api_results, + 'forward_message', + lambda: adapter.forward_message( + source_chat_type, + target_chat_id, + forward_probe.message_id, + source_chat_type, + target_chat_id, + ), + ) + if forwarded: + await run_api( + api_results, + 'delete_message:forwarded_cleanup', + lambda: adapter.delete_message(source_chat_type, target_chat_id, forwarded.message_id), + ) + await run_api( + api_results, + 'delete_message:forward_source_cleanup', + lambda: adapter.delete_message(source_chat_type, target_chat_id, forward_probe.message_id), + ) + + document_probe = await run_api( + api_results, + 'bot.send_document:get_file_url_probe', + lambda: adapter.bot.send_document( + chat_id=target_chat_id, + document=telegram.InputFile(b'eba-file-url', filename='eba-file-url.txt'), + ), + ) + if document_probe and document_probe.document: + await run_api( + api_results, + 'get_file_url', + lambda: adapter.get_file_url(document_probe.document.file_id), + ) + + await run_api( + api_results, + 'get_user_info', + lambda: adapter.get_user_info(actor_user_id), + ) + await run_api( + api_results, + 'call_platform_api:send_chat_action', + lambda: adapter.call_platform_api('send_chat_action', {'chat_id': target_chat_id, 'action': 'typing'}), + ) + + callback_probe = await run_api( + api_results, + 'bot.send_message:callback_probe', + lambda: adapter.bot.send_message( + chat_id=target_chat_id, + text='EBA callback probe', + reply_markup=telegram.InlineKeyboardMarkup( + [[telegram.InlineKeyboardButton('callback probe', callback_data='eba-callback-probe')]] + ), + ), + ) + if callback_probe: + callback_probe_message_id = callback_probe.message_id + awaiting_callback = True + callback_event.clear() + print('READY: click the callback button to test answer_callback_query.') + try: + await asyncio.wait_for(callback_event.wait(), timeout=max(15, timeout // 3)) + except asyncio.TimeoutError: + record_api( + api_results, + 'call_platform_api:answer_callback_query', + False, + error=TimeoutError('callback button was not clicked before timeout'), + ) + else: + await run_api( + api_results, + 'call_platform_api:answer_callback_query', + lambda: adapter.call_platform_api( + 'answer_callback_query', + {'callback_query_id': callback_query_id, 'text': 'EBA callback answered'}, ), - ] - ), - quote_origin=True, - ) - await adapter.send_message( - source.chat_type.value if hasattr(source.chat_type, 'value') else str(source.chat_type), - source.chat_id, - platform_message.MessageChain([platform_message.Plain(text='EBA live send_message OK')]), - ) + ) + finally: + awaiting_callback = False - edit_probe = await adapter.bot.send_message(chat_id=source.chat_id, text='EBA edit/delete probe') - await adapter.edit_message( - str(source.chat_type), - source.chat_id, - edit_probe.message_id, - platform_message.MessageChain([platform_message.Plain(text='EBA edit probe edited')]), - ) - await adapter.delete_message(str(source.chat_type), source.chat_id, edit_probe.message_id) + if group_id: + await run_api(api_results, 'get_group_info', lambda: adapter.get_group_info(group_id)) + await run_api(api_results, 'get_group_member_list', lambda: adapter.get_group_member_list(group_id)) + await run_api( + api_results, + 'get_group_member_info', + lambda: adapter.get_group_member_info(group_id, actor_user_id), + ) + await run_api( + api_results, + 'call_platform_api:get_chat_administrators', + lambda: adapter.call_platform_api('get_chat_administrators', {'chat_id': group_id}), + ) + await run_api( + api_results, + 'call_platform_api:get_chat_member_count', + lambda: adapter.call_platform_api('get_chat_member_count', {'chat_id': group_id}), + ) + await run_api( + api_results, + 'call_platform_api:create_chat_invite_link', + lambda: adapter.call_platform_api( + 'create_chat_invite_link', {'chat_id': group_id, 'name': 'eba-probe'} + ), + ) - await adapter.bot.send_message( - chat_id=source.chat_id, - text='EBA callback probe', - reply_markup=telegram.InlineKeyboardMarkup( - [[telegram.InlineKeyboardButton('callback probe', callback_data='eba-callback-probe')]] - ), - ) + pin_probe = await run_api( + api_results, + 'bot.send_message:pin_probe', + lambda: adapter.bot.send_message(chat_id=group_id, text='EBA pin probe'), + ) + if pin_probe: + await run_api( + api_results, + 'call_platform_api:pin_message', + lambda: adapter.call_platform_api( + 'pin_message', + {'chat_id': group_id, 'message_id': pin_probe.message_id, 'disable_notification': True}, + ), + ) + await run_api( + api_results, + 'call_platform_api:unpin_message', + lambda: adapter.call_platform_api( + 'unpin_message', + {'chat_id': group_id, 'message_id': pin_probe.message_id}, + ), + ) + await run_api( + api_results, + 'delete_message:pin_probe_cleanup', + lambda: adapter.delete_message('group', group_id, pin_probe.message_id), + ) - if str(source.chat_type) == 'ChatType.GROUP' or getattr(source.chat_type, 'value', '') == 'group': - group_info = await adapter.get_group_info(source.chat_id) - print('GROUP_INFO', group_info.model_dump()) - members = await adapter.get_group_member_list(source.chat_id) - print('GROUP_MEMBER_LIST_COUNT', len(members)) - await adapter.call_platform_api('send_chat_action', {'chat_id': source.chat_id, 'action': 'typing'}) - count = await adapter.call_platform_api('get_chat_member_count', {'chat_id': source.chat_id}) - print('GROUP_MEMBER_COUNT', count) + if allow_unpin_all: + await run_api( + api_results, + 'call_platform_api:unpin_all_messages', + lambda: adapter.call_platform_api('unpin_all_messages', {'chat_id': group_id}), + ) + else: + record_api(api_results, 'call_platform_api:unpin_all_messages', False, error=RuntimeError('skipped')) - print('READY: click the callback button or react to a bot message if you want live callback/reaction events.') - await asyncio.sleep(max(5, timeout // 3)) + if allow_group_mutation: + chat = await adapter.bot.get_chat(chat_id=group_id) + original_title = chat.title or 'EBA Probe Group' + original_description = chat.description or '' + await run_api( + api_results, + 'call_platform_api:set_chat_title', + lambda: adapter.call_platform_api( + 'set_chat_title', + {'chat_id': group_id, 'title': f'{original_title} EBA'}, + ), + ) + await run_api( + api_results, + 'call_platform_api:set_chat_title:restore', + lambda: adapter.call_platform_api('set_chat_title', {'chat_id': group_id, 'title': original_title}), + ) + await run_api( + api_results, + 'call_platform_api:set_chat_description', + lambda: adapter.call_platform_api( + 'set_chat_description', + {'chat_id': group_id, 'description': 'EBA probe temporary description'}, + ), + ) + await run_api( + api_results, + 'call_platform_api:set_chat_description:restore', + lambda: adapter.call_platform_api( + 'set_chat_description', + {'chat_id': group_id, 'description': original_description}, + ), + ) + else: + record_api(api_results, 'call_platform_api:set_chat_title', False, error=RuntimeError('skipped')) + record_api(api_results, 'call_platform_api:set_chat_description', False, error=RuntimeError('skipped')) + + if moderation_user_id: + await run_api( + api_results, + 'mute_member', + lambda: adapter.mute_member(group_id, moderation_user_id, duration=30), + ) + await run_api( + api_results, + 'unmute_member', + lambda: adapter.unmute_member(group_id, moderation_user_id), + ) + else: + record_api(api_results, 'mute_member', False, error=RuntimeError('skipped')) + record_api(api_results, 'unmute_member', False, error=RuntimeError('skipped')) + + if kick_user_id: + await run_api(api_results, 'kick_member', lambda: adapter.kick_member(group_id, kick_user_id)) + else: + record_api(api_results, 'kick_member', False, error=RuntimeError('skipped')) + + if allow_leave_group: + await run_api(api_results, 'leave_group', lambda: adapter.leave_group(group_id)) + else: + record_api(api_results, 'leave_group', False, error=RuntimeError('skipped')) + else: + for name in ( + 'get_group_info', + 'get_group_member_list', + 'get_group_member_info', + 'mute_member', + 'unmute_member', + 'kick_member', + 'leave_group', + 'call_platform_api:get_chat_administrators', + 'call_platform_api:get_chat_member_count', + 'call_platform_api:create_chat_invite_link', + 'call_platform_api:pin_message', + 'call_platform_api:unpin_message', + 'call_platform_api:unpin_all_messages', + 'call_platform_api:set_chat_title', + 'call_platform_api:set_chat_description', + ): + record_api(api_results, name, False, error=RuntimeError('skipped: no group chat id')) + + await asyncio.sleep(3) finally: await adapter.kill() summary = { 'events': [summarize_event(event) for event in events], 'event_types': [event.type for event in events], + 'api_results': api_results, + 'api_passed': [result['name'] for result in api_results if result['ok']], + 'api_failed': [result for result in api_results if not result['ok']], } - print('SUMMARY', json.dumps(summary, ensure_ascii=False)) + print('SUMMARY', json.dumps(summary, ensure_ascii=False, default=str)) + + +async def run_callback_probe(token: str, chat_id: str, timeout: int): + adapter = TelegramAdapter( + { + 'token': token, + 'markdown_card': False, + 'enable-stream-reply': False, + }, + ProbeLogger(), + ) + api_results: list[dict[str, Any]] = [] + + callback_probe = await adapter.bot.send_message( + chat_id=chat_id, + text='EBA callback-only probe', + reply_markup=telegram.InlineKeyboardMarkup( + [[telegram.InlineKeyboardButton('callback probe', callback_data='eba-callback-only-probe')]] + ), + ) + deadline = asyncio.get_running_loop().time() + timeout + offset: int | None = None + try: + print('READY: click the callback-only probe button.') + callback_query_id: str | None = None + while asyncio.get_running_loop().time() < deadline and callback_query_id is None: + updates = await adapter.bot.get_updates( + offset=offset, + timeout=2, + allowed_updates=['callback_query'], + ) + for update in updates: + offset = update.update_id + 1 + callback_query = update.callback_query + if callback_query is None or callback_query.message is None: + continue + if callback_query.message.message_id == callback_probe.message_id: + callback_query_id = callback_query.id + break + if callback_query_id is None: + raise TimeoutError('callback button was not clicked before timeout') + await run_api( + api_results, + 'call_platform_api:answer_callback_query', + lambda: adapter.call_platform_api( + 'answer_callback_query', + {'callback_query_id': callback_query_id, 'text': 'EBA callback answered'}, + ), + ) + finally: + print( + 'SUMMARY', + json.dumps( + { + 'api_results': api_results, + 'api_passed': [result['name'] for result in api_results if result['ok']], + 'api_failed': [result for result in api_results if not result['ok']], + }, + ensure_ascii=False, + default=str, + ), + ) def main(): @@ -149,6 +533,13 @@ def main(): parser.add_argument('--token', default=os.getenv('TELEGRAM_BOT_TOKEN', '')) parser.add_argument('--log', default='data/temp/live_telegram_eba_probe.jsonl') parser.add_argument('--timeout', type=int, default=90) + parser.add_argument('--group-chat-id', default=os.getenv('TELEGRAM_EBA_GROUP_CHAT_ID')) + parser.add_argument('--moderation-user-id', default=os.getenv('TELEGRAM_EBA_MODERATION_USER_ID')) + parser.add_argument('--kick-user-id', default=os.getenv('TELEGRAM_EBA_KICK_USER_ID')) + parser.add_argument('--allow-group-mutation', action='store_true') + parser.add_argument('--allow-unpin-all', action='store_true') + parser.add_argument('--allow-leave-group', action='store_true') + parser.add_argument('--callback-chat-id', default=os.getenv('TELEGRAM_EBA_CALLBACK_CHAT_ID')) args = parser.parse_args() if not args.token: @@ -157,7 +548,22 @@ def main(): log_path = Path(args.log) if log_path.exists(): log_path.unlink() - asyncio.run(run_probe(args.token, log_path, args.timeout)) + if args.callback_chat_id: + asyncio.run(run_callback_probe(args.token, args.callback_chat_id, args.timeout)) + return + asyncio.run( + run_probe( + args.token, + log_path, + args.timeout, + args.group_chat_id, + args.moderation_user_id, + args.kick_user_id, + args.allow_group_mutation, + args.allow_unpin_all, + args.allow_leave_group, + ) + ) if __name__ == '__main__': diff --git a/tests/unit_tests/platform/test_telegram_eba_adapter.py b/tests/unit_tests/platform/test_telegram_eba_adapter.py index cf624254..806ef8de 100644 --- a/tests/unit_tests/platform/test_telegram_eba_adapter.py +++ b/tests/unit_tests/platform/test_telegram_eba_adapter.py @@ -402,6 +402,20 @@ async def test_telegram_platform_apis_call_underlying_bot_methods(): assert await PLATFORM_API_MAP['answer_callback_query'](bot, {'callback_query_id': 'cb'}) == {'ok': True} +@pytest.mark.asyncio +async def test_telegram_unmute_member_uses_current_chat_permissions_fields(): + adapter = make_adapter() + bot = SimpleNamespace(restrict_chat_member=AsyncMock()) + object.__setattr__(adapter, 'bot', bot) + + await adapter.unmute_member(group_id=-1001, user_id=123) + + permissions = bot.restrict_chat_member.await_args.kwargs['permissions'] + assert permissions.can_send_messages is True + assert permissions.can_send_photos is True + assert permissions.can_send_documents is True + + def test_runtime_bot_maps_telegram_eba_events_to_plugin_events(): group = platform_entities.UserGroup(id='group-1', name='Group') user = platform_entities.User(id='user-1', nickname='User')