diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py
index 3ce4280c..c115243b 100644
--- a/src/langbot/pkg/platform/sources/lark.py
+++ b/src/langbot/pkg/platform/sources/lark.py
@@ -575,6 +575,151 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
 
 
 class LarkEventConverter(abstract_platform_adapter.AbstractEventConverter):
+    # Thread IDs that already had their first reply quoted, mapped to the time.time()
+    # value recorded when they were marked. Entries are kept oldest-first (dict insertion
+    # order), which is what the pruning loops rely on.
+    _processed_thread_quote_cache: typing.ClassVar[dict[str, float]] = {}
+    # Maximum number of thread IDs kept in the cache.
+    _processed_thread_quote_cache_max_size: typing.ClassVar[int] = 4096
+    # Entries older than this many seconds are evicted.
+    _processed_thread_quote_cache_ttl_seconds: typing.ClassVar[int] = 86400
+
+    @classmethod
+    def _prune_processed_thread_quote_cache(cls, now: typing.Optional[float] = None) -> None:
+        """
+        Evict expired entries, then trim the cache down to its maximum size.
+
+        Both loops pop from the front of the dict, so they assume entries are stored
+        oldest-first.
+
+        Args:
+            now: Reference timestamp in seconds; defaults to time.time().
+        """
+        if now is None:
+            now = time.time()
+
+        expire_before = now - cls._processed_thread_quote_cache_ttl_seconds
+        while cls._processed_thread_quote_cache:
+            oldest_key, oldest_ts = next(iter(cls._processed_thread_quote_cache.items()))
+            if oldest_ts >= expire_before:
+                break
+            cls._processed_thread_quote_cache.pop(oldest_key, None)
+
+        while len(cls._processed_thread_quote_cache) > cls._processed_thread_quote_cache_max_size:
+            oldest_key = next(iter(cls._processed_thread_quote_cache))
+            cls._processed_thread_quote_cache.pop(oldest_key, None)
+
+    @classmethod
+    def _mark_thread_quote_processed(cls, thread_id: str) -> None:
+        """
+        Record thread_id as processed at the current time.
+        """
+        now = time.time()
+        # Drop any existing entry first so the key is re-inserted at the newest position;
+        # a plain assignment would refresh the timestamp but keep the old position.
+        cls._processed_thread_quote_cache.pop(thread_id, None)
+        cls._processed_thread_quote_cache[thread_id] = now
+        # Prune after inserting so the size bound also covers the new entry.
+        cls._prune_processed_thread_quote_cache(now)
+
+    @classmethod
+    def _extract_quote_message_id(cls, message: EventMessage) -> typing.Optional[str]:
+        """
+        Extract the message ID to quote from the given message.
+
+        Rules:
+        - First thread reply in a topic: return parent_id and mark topic as processed
+        - Follow-up thread replies in the same topic: return None
+        - Non-thread message: return parent_id if valid (non-empty, different from message_id)
+
+        Thread reply state is kept in a bounded TTL cache to avoid unbounded memory growth.
+        """
+        parent_id = getattr(message, 'parent_id', None)
+        if not parent_id:
+            return None
+
+        message_id = getattr(message, 'message_id', None)
+        if parent_id == message_id:
+            return None
+
+        thread_id = getattr(message, 'thread_id', None)
+        if thread_id:
+            # Evict expired entries first so a stale topic is treated as new again.
+            cls._prune_processed_thread_quote_cache()
+            if thread_id in cls._processed_thread_quote_cache:
+                return None
+            cls._mark_thread_quote_processed(thread_id)
+
+        return parent_id
+
+    @staticmethod
+    def _build_event_message_from_message_item(message_item: Message) -> typing.Optional[EventMessage]:
+        """
+        Build EventMessage from SDK typed Message item.
+
+        Returns None if body or content is missing.
+        """
+        body = getattr(message_item, 'body', None)
+        if not body:
+            return None
+
+        content = getattr(body, 'content', None)
+        if not content:
+            return None
+
+        event_data = {
+            'message_id': message_item.message_id,
+            'message_type': message_item.msg_type,
+            'content': content,
+            'create_time': message_item.create_time,
+            'mentions': getattr(message_item, 'mentions', []) or [],
+        }
+
+        # Preserve thread-related fields
+        if hasattr(message_item, 'parent_id') and message_item.parent_id:
+            event_data['parent_id'] = message_item.parent_id
+        if hasattr(message_item, 'root_id') and message_item.root_id:
+            event_data['root_id'] = message_item.root_id
+        if hasattr(message_item, 'thread_id') and message_item.thread_id:
+            event_data['thread_id'] = message_item.thread_id
+        if hasattr(message_item, 'chat_id') and message_item.chat_id:
+            event_data['chat_id'] = message_item.chat_id
+
+        return EventMessage(event_data)
+
+    @staticmethod
+    async def _fetch_quoted_message(
+        quote_message_id: str,
+        api_client: lark_oapi.Client,
+    ) -> typing.Optional[platform_message.MessageChain]:
+        """
+        Fetch the quoted message and convert to MessageChain.
+
+        Returns None if:
+        - The API response reports failure
+        - Response items is empty
+        - Message item normalization fails
+
+        Exceptions raised by the API client or the converter are not caught here.
+        """
+        request = GetMessageRequest.builder().message_id(quote_message_id).build()
+        response = await api_client.im.v1.message.aget(request)
+
+        if not response.success():
+            return None
+
+        items = getattr(response.data, 'items', None)
+        if not items:
+            return None
+
+        message_item = items[0]
+        event_message = LarkEventConverter._build_event_message_from_message_item(message_item)
+        if event_message is None:
+            return None
+
+        quote_chain = await LarkMessageConverter.target2yiri(event_message, api_client)
+        return quote_chain
+
     @staticmethod
     async def yiri2target(
         event: platform_events.MessageEvent,
@@ -587,6 +732,23 @@ class LarkEventConverter(abstract_platform_adapter.AbstractEventConverter):
     ) -> platform_events.Event:
         message_chain = await LarkMessageConverter.target2yiri(event.event.message, api_client)
 
+        # Check for quote/reply message
+        quote_message_id = LarkEventConverter._extract_quote_message_id(event.event.message)
+        if quote_message_id:
+            quote_chain = await LarkEventConverter._fetch_quoted_message(quote_message_id, api_client)
+            if quote_chain:
+                # Filter out Source component from quoted chain, keep only content
+                quote_origin = platform_message.MessageChain(
+                    [comp for comp in quote_chain if not isinstance(comp, platform_message.Source)]
+                )
+                if quote_origin:
+                    message_chain.append(
+                        platform_message.Quote(
+                            message_id=quote_message_id,
+                            origin=quote_origin,
+                        )
+                    )
+
         if event.event.message.chat_type == 'p2p':
             return platform_events.FriendMessage(
                 sender=platform_entities.Friend(
@@ -770,6 +932,32 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
             self.request_tenant_access_token(tenant_key)
         return self.tenant_access_tokens.get(tenant_key)['token'] if self.tenant_access_tokens.get(tenant_key) else None
 
+    def get_launcher_id(self, event: platform_events.MessageEvent) -> typing.Optional[str]:
+        """
+        Get topic-scoped launcher_id for thread-aware session isolation.
+
+        For group thread messages, returns "{group_id}_{thread_id}"
+        to ensure conversation context stays stable per topic.
+
+        Returns None for non-thread messages or P2P messages.
+        """
+        source_event = getattr(event.source_platform_object, 'event', None)
+        if not source_event:
+            return None
+
+        message = getattr(source_event, 'message', None)
+        if not message:
+            return None
+
+        thread_id = getattr(message, 'thread_id', None)
+        if not thread_id:
+            return None
+
+        if isinstance(event, platform_events.GroupMessage):
+            return f'{event.group.id}_{thread_id}'
+
+        return None
+
     def build_api_client(self, config):
         app_id = config['app_id']
         app_secret = config['app_secret']