From 195b694ecca1b50b4d4226f196e263f38d0b7640 Mon Sep 17 00:00:00 2001 From: Tiankai Ma Date: Mon, 19 Jan 2026 23:42:17 +0800 Subject: [PATCH] feat(telegram): threaded mode support (#1920) * feat(telegram): reply in threaded mode * feat(telegram): thread-level isolation --- src/langbot/pkg/platform/botmgr.py | 18 ++++++++++++-- src/langbot/pkg/platform/sources/telegram.py | 26 ++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/langbot/pkg/platform/botmgr.py b/src/langbot/pkg/platform/botmgr.py index b9d7a5fe..43b8a7ab 100644 --- a/src/langbot/pkg/platform/botmgr.py +++ b/src/langbot/pkg/platform/botmgr.py @@ -75,10 +75,17 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + launcher_id = event.sender.id + + if hasattr(adapter, 'get_launcher_id'): + custom_launcher_id = adapter.get_launcher_id(event) + if custom_launcher_id: + launcher_id = custom_launcher_id + await self.ap.query_pool.add_query( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.PERSON, - launcher_id=event.sender.id, + launcher_id=launcher_id, sender_id=event.sender.id, message_event=event, message_chain=event.message_chain, @@ -111,10 +118,17 @@ class RuntimeBot: # Only add to query pool if no webhook requested to skip pipeline if not skip_pipeline: + launcher_id = event.group.id + + if hasattr(adapter, 'get_launcher_id'): + custom_launcher_id = adapter.get_launcher_id(event) + if custom_launcher_id: + launcher_id = custom_launcher_id + await self.ap.query_pool.add_query( bot_uuid=self.bot_entity.uuid, launcher_type=provider_session.LauncherTypes.GROUP, - launcher_id=event.group.id, + launcher_id=launcher_id, sender_id=event.sender.id, message_event=event, message_chain=event.message_chain, diff --git a/src/langbot/pkg/platform/sources/telegram.py b/src/langbot/pkg/platform/sources/telegram.py index cfdbe75c..79a959fa 100644 --- a/src/langbot/pkg/platform/sources/telegram.py +++ b/src/langbot/pkg/platform/sources/telegram.py @@ -197,6 +197,10 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): } if self.config['markdown_card'] is True: args['parse_mode'] = 'MarkdownV2' + + if message_source.source_platform_object.message.message_thread_id: + args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id + if quote_origin: args['reply_to_message_id'] = message_source.source_platform_object.message.id @@ -231,8 +235,12 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): 'chat_id': message_source.source_platform_object.effective_chat.id, 'text': content, } + if message_source.source_platform_object.message.message_thread_id: + args['message_thread_id'] = message_source.source_platform_object.message.message_thread_id + if quote_origin: args['reply_to_message_id'] = message_source.source_platform_object.message.id + if self.config['markdown_card'] is True: args['parse_mode'] = 'MarkdownV2' @@ -260,6 +268,24 @@ class TelegramAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): # self.seq = 1 # 消息回复结束之后重置seq self.msg_stream_id.pop(message_id) # 消息回复结束之后删除流式消息id + def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: + if not isinstance(event.source_platform_object, Update): + return None + + message = event.source_platform_object.message + if not message: + return None + + # specifically handle telegram forum topic and private thread(not supported by official client yet but supported by bot api) + if message.message_thread_id: + # check if it is a group + if isinstance(event, platform_events.GroupMessage): + return f'{event.group.id}#{message.message_thread_id}' + elif isinstance(event, platform_events.FriendMessage): + return f'{event.sender.id}#{message.message_thread_id}' + + return None + async def is_stream_output_supported(self) -> bool: is_stream = False if self.config.get('enable-stream-reply', None):