From 197e1179003e92555b42fff88dcab521c7002f2a Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 11 May 2026 12:00:24 +0800 Subject: [PATCH] feat(platform): add lark eba adapter --- docs/event-based-agents/adapters/00-index.md | 1 + .../adapters/acceptance-report.md | 71 +- docs/event-based-agents/adapters/lark.md | 135 ++++ .../pkg/platform/adapters/lark/__init__.py | 1 + .../pkg/platform/adapters/lark/adapter.py | 680 ++++++++++++++++++ .../pkg/platform/adapters/lark/api_impl.py | 103 +++ .../platform/adapters/lark/event_converter.py | 205 ++++++ .../pkg/platform/adapters/lark/lark.svg | 1 + .../pkg/platform/adapters/lark/manifest.yaml | 185 +++++ .../adapters/lark/message_converter.py | 405 +++++++++++ .../platform/adapters/lark/platform_api.py | 96 +++ .../pkg/platform/adapters/lark/types.py | 3 + .../platform/test_lark_eba_adapter.py | 326 +++++++++ 13 files changed, 2179 insertions(+), 33 deletions(-) create mode 100644 docs/event-based-agents/adapters/lark.md create mode 100644 src/langbot/pkg/platform/adapters/lark/__init__.py create mode 100644 src/langbot/pkg/platform/adapters/lark/adapter.py create mode 100644 src/langbot/pkg/platform/adapters/lark/api_impl.py create mode 100644 src/langbot/pkg/platform/adapters/lark/event_converter.py create mode 100644 src/langbot/pkg/platform/adapters/lark/lark.svg create mode 100644 src/langbot/pkg/platform/adapters/lark/manifest.yaml create mode 100644 src/langbot/pkg/platform/adapters/lark/message_converter.py create mode 100644 src/langbot/pkg/platform/adapters/lark/platform_api.py create mode 100644 src/langbot/pkg/platform/adapters/lark/types.py create mode 100644 tests/unit_tests/platform/test_lark_eba_adapter.py diff --git a/docs/event-based-agents/adapters/00-index.md b/docs/event-based-agents/adapters/00-index.md index a7bc1346..cc51b1c0 100644 --- a/docs/event-based-agents/adapters/00-index.md +++ b/docs/event-based-agents/adapters/00-index.md @@ -19,6 +19,7 @@ Current acceptance report: [EBA Adapter Acceptance Report](./acceptance-report.m | Discord | Migrated; partial plugin E2E, media-inbound gaps remain | [Discord](./discord.md) | | OneBot v11 / aiocqhttp | Migrated; Matcha UI plus protocol-level multi-component coverage | [OneBot v11 / aiocqhttp](./aiocqhttp.md) | | DingTalk | Migrated; partial plugin E2E, real UI inbound image/file verified; group gap remains | [DingTalk](./dingtalk.md) | +| Lark / Feishu | Migrated; partial live text E2E, media-inbound gap remains | [Lark / Feishu](./lark.md) | ## Documentation Checklist diff --git a/docs/event-based-agents/adapters/acceptance-report.md b/docs/event-based-agents/adapters/acceptance-report.md index 420fbd7e..d503d876 100644 --- a/docs/event-based-agents/adapters/acceptance-report.md +++ b/docs/event-based-agents/adapters/acceptance-report.md @@ -8,6 +8,7 @@ Scope: - `discord-eba` - `aiocqhttp-eba` - `dingtalk-eba` +- `lark-eba` This report follows `acceptance-checklist.md`. Evidence levels are intentionally strict: @@ -26,6 +27,7 @@ This report follows `acceptance-checklist.md`. Evidence levels are intentionally | Discord | Partial EBA acceptance | Real Discord UI covered group text, outbound image/file/quote/mention components, safe SDK APIs, and safe Discord platform APIs. Real UI inbound attachment/image/file/reply/mention was not completed. A later UI retry was blocked because the Discord client kept the send button disabled. | | OneBot v11 / aiocqhttp | Partial EBA acceptance | Matcha UI covered real group text and outbound supported components/APIs. Multi-component inbound `Source/Plain/At/Face/Image/Voice/File/Quote` was verified through the real OneBot reverse WebSocket adapter endpoint, but not through Matcha UI upload/send. Matcha blocks file-send and merged-forward APIs. | | DingTalk | Partial EBA acceptance | Real DingTalk UI covered private text, emoji-as-text inbound, private inbound image/file, outbound image/file/quote/mention fallback components, safe SDK APIs, and safe DingTalk platform APIs. Real UI inbound voice/quote and group trigger were not completed. | +| Lark / Feishu | Partial EBA acceptance | EBA adapter structure, self-built/store app config, WebSocket/Webhook mode handling, converters, common APIs, platform APIs, and unit tests are in place. One real LangBot organization WebSocket private text event reached `EBAEventProbe`; outbound component sweep was visible in Feishu. Latest real UI image/file sends did not reach local plugin evidence, so media receive remains blocked. | Telegram and DingTalk now have real user-side UI image/file upload evidence in plugin JSONL. Discord and aiocqhttp do not yet have real UI inbound image/file evidence. @@ -41,6 +43,8 @@ Telegram and DingTalk now have real user-side UI image/file upload evidence in p | aiocqhttp protocol | OneBot reverse WebSocket endpoint `127.0.0.1:2280/ws` | `data/temp/aiocqhttp-plugin-e2e-20260510-multiformat.jsonl` | | DingTalk | DingTalk Mac, `LangBot Team` org private chat | `data/temp/dingtalk-plugin-e2e-20260510-rerun.jsonl` | | DingTalk private media | DingTalk Mac, `LangBot Team` org private chat | `data/temp/dingtalk-plugin-e2e-media-ui.jsonl` | +| Lark / Feishu unit | local mocked Feishu SDK/client paths | `tests/unit_tests/platform/test_lark_eba_adapter.py` | +| Lark / Feishu partial live | Feishu Mac, LangBot organization `LangBotDev` private chat | `data/temp/lark-plugin-e2e-ws.jsonl` | All plugin runs used SDK standalone runtime ports `5400/5401`, LangBot `--standalone-runtime`, and the real plugin at `langbot-plugin-demo/EBAEventProbe`. @@ -48,44 +52,44 @@ All plugin runs used SDK standalone runtime ports `5400/5401`, LangBot `--standa All four adapters deliver common SDK entities to plugins before LangBot core/plugin logic handles the event. -| Requirement | Telegram | Discord | aiocqhttp | DingTalk | -|-------------|----------|---------|-----------|----------| -| `bot_uuid` filled | plugin-e2e | plugin-e2e | plugin-e2e | plugin-e2e | -| `adapter_name` filled | `telegram` | `discord` | `aiocqhttp` | `dingtalk` | -| common `MessageChain` delivered | `Plain`, group `At + Plain`, private `Image`, private `File` | `Source + Plain` | UI `Source + Plain`; protocol `Source + Plain + At + Face + Image + Voice + File + Quote + Plain` | `Source + Plain`, private `Source + Image`, private `Source + File` | -| common user/group entities | plugin-e2e | plugin-e2e | plugin-e2e | plugin-e2e private user; group not completed | -| raw native object isolation | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | +| Requirement | Telegram | Discord | aiocqhttp | DingTalk | Lark / Feishu | +|-------------|----------|---------|-----------|----------|---------------| +| `bot_uuid` filled | plugin-e2e | plugin-e2e | plugin-e2e | plugin-e2e | live plugin-e2e pending | +| `adapter_name` filled | `telegram` | `discord` | `aiocqhttp` | `dingtalk` | `lark-eba` in current unit/code; older live text evidence recorded `lark` before the naming fix | +| common `MessageChain` delivered | `Plain`, group `At + Plain`, private `Image`, private `File` | `Source + Plain` | UI `Source + Plain`; protocol `Source + Plain + At + Face + Image + Voice + File + Quote + Plain` | `Source + Plain`, private `Source + Image`, private `Source + File` | live private `Source + Plain`; unit `Source + Plain + At/Image/File`; latest live image/file blocked | +| common user/group entities | plugin-e2e | plugin-e2e | plugin-e2e | plugin-e2e private user; group not completed | live private user; unit private/group | +| raw native object isolation | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | raw data stays in `source_platform_object` | ## Message Receive Components -| Component | Telegram | Discord | aiocqhttp | DingTalk | -|-----------|----------|---------|-----------|----------| -| `Source` | design gap: event has message id but chain omits `Source` | plugin-e2e-ui | plugin-e2e-ui/protocol | plugin-e2e-ui | -| `Plain` | plugin-e2e-ui private/group | plugin-e2e-ui | plugin-e2e-ui/protocol | plugin-e2e-ui | -| `At` | plugin-e2e-ui group mention | unit; real UI mention not completed in latest run | plugin-e2e-protocol; unit | unit; group trigger not completed | -| `AtAll` | not-supported | unit only | unit only | unit/send fallback only | -| `Image` | plugin-e2e-ui private | converter/unit; real UI attachment not completed | plugin-e2e-protocol, not Matcha UI | plugin-e2e-ui private | -| `Voice` | converter/unit; real UI inbound not completed | not-supported as native voice; audio is attachment/file | plugin-e2e-protocol, not Matcha UI | converter/unit; real UI inbound not completed | -| `File` | plugin-e2e-ui private | converter/unit; real UI attachment not completed | plugin-e2e-protocol, not Matcha UI | plugin-e2e-ui private | -| `Quote` | converter/unit; real UI reply not completed | unit; real UI reply not completed | plugin-e2e-protocol | converter/unit; real UI quote not completed | -| `Face` | not-supported as common `Face` | not-supported as common `Face` | plugin-e2e-protocol | UI emoji becomes `Plain` (`[smile]` text), not `Face` | -| `Forward` | not-supported inbound | not-supported inbound | unit; Matcha forward UI/action blocked | not-supported inbound | -| Mixed chain | group `At + Plain`; media tested as separate messages | not completed inbound | plugin-e2e-protocol | media tested as separate messages; mixed inbound not completed | +| Component | Telegram | Discord | aiocqhttp | DingTalk | Lark / Feishu | +|-----------|----------|---------|-----------|----------|---------------| +| `Source` | design gap: event has message id but chain omits `Source` | plugin-e2e-ui | plugin-e2e-ui/protocol | plugin-e2e-ui | plugin-e2e-ui private text | +| `Plain` | plugin-e2e-ui private/group | plugin-e2e-ui | plugin-e2e-ui/protocol | plugin-e2e-ui | plugin-e2e-ui private text | +| `At` | plugin-e2e-ui group mention | unit; real UI mention not completed in latest run | plugin-e2e-protocol; unit | unit; group trigger not completed | unit; group trigger not completed | +| `AtAll` | not-supported | unit only | unit only | unit/send fallback only | unit only | +| `Image` | plugin-e2e-ui private | converter/unit; real UI attachment not completed | plugin-e2e-protocol, not Matcha UI | plugin-e2e-ui private | unit; real UI image sent but not observed in plugin evidence | +| `Voice` | converter/unit; real UI inbound not completed | not-supported as native voice; audio is attachment/file | plugin-e2e-protocol, not Matcha UI | converter/unit; real UI inbound not completed | unit; real UI inbound not completed | +| `File` | plugin-e2e-ui private | converter/unit; real UI attachment not completed | plugin-e2e-protocol, not Matcha UI | plugin-e2e-ui private | unit; real UI file sent but not observed in plugin evidence | +| `Quote` | converter/unit; real UI reply not completed | unit; real UI reply not completed | plugin-e2e-protocol | converter/unit; real UI quote not completed | unit/API-backed quote lookup; real UI quote not completed | +| `Face` | not-supported as common `Face` | not-supported as common `Face` | plugin-e2e-protocol | UI emoji becomes `Plain` (`[smile]` text), not `Face` | not-supported as common `Face` | +| `Forward` | not-supported inbound | not-supported inbound | unit; Matcha forward UI/action blocked | not-supported inbound | not-supported inbound | +| Mixed chain | group `At + Plain`; media tested as separate messages | not completed inbound | plugin-e2e-protocol | media tested as separate messages; mixed inbound not completed | unit only | ## Message Send Components -| Component | Telegram | Discord | aiocqhttp | DingTalk | -|-----------|----------|---------|-----------|----------| -| `Plain` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | -| `At` | plugin-e2e-outbound equivalent | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback/equivalent | -| `AtAll` | plugin-e2e-outbound fallback | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback | -| `Image` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | -| `Voice` | not-supported in current send converter | not-supported as native voice | converter path; not completed against Matcha UI | fallback as file/text depending DingTalk media support | -| `File` | plugin-e2e-outbound | plugin-e2e-outbound | blocked by Matcha endpoint error | plugin-e2e-outbound | -| `Quote` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback | -| `Face` | not-supported | not-supported | plugin-e2e-outbound attempted in mixed chain | fallback text | -| `Forward` | flattened fallback | flattened fallback | blocked by Matcha unsupported action | flattened fallback | -| Mixed chain | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound except blocked file/forward | plugin-e2e-outbound | +| Component | Telegram | Discord | aiocqhttp | DingTalk | Lark / Feishu | +|-----------|----------|---------|-----------|----------|---------------| +| `Plain` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | +| `At` | plugin-e2e-outbound equivalent | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback/equivalent | plugin-e2e-outbound | +| `AtAll` | plugin-e2e-outbound fallback | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback | unit; group live not completed | +| `Image` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | +| `Voice` | not-supported in current send converter | not-supported as native voice | converter path; not completed against Matcha UI | fallback as file/text depending DingTalk media support | converter path; live not completed | +| `File` | plugin-e2e-outbound | plugin-e2e-outbound | blocked by Matcha endpoint error | plugin-e2e-outbound | plugin-e2e-outbound | +| `Quote` | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound fallback | plugin-e2e-outbound fallback | +| `Face` | not-supported | not-supported | plugin-e2e-outbound attempted in mixed chain | fallback text | not-supported | +| `Forward` | flattened fallback | flattened fallback | blocked by Matcha unsupported action | flattened fallback | plugin-e2e-outbound flattened fallback | +| Mixed chain | plugin-e2e-outbound | plugin-e2e-outbound | plugin-e2e-outbound except blocked file/forward | plugin-e2e-outbound | plugin-e2e-outbound | ## Event Acceptance @@ -141,9 +145,10 @@ The probe logs set `ok=true` when the sweep completed with only expected unsuppo - Discord still requires real UI inbound image/file upload evidence before it can be called media-complete. - aiocqhttp has rich inbound component evidence only at the OneBot reverse WebSocket boundary; Matcha UI did not provide image/file upload coverage. - DingTalk group trigger remains unclosed; current evidence is private chat only. +- Lark / Feishu requires a clean follow-up live pass: the latest LangBot organization WebSocket run connected, but UI-sent text/image/file after the loop-scheduling fix did not append plugin events. - Discord UI retry on May 10, 2026 was blocked by the client keeping the send button disabled even after text was entered. - Destructive moderation and leave APIs are intentionally blocked until disposable users/groups are available. ## Conclusion -The EBA conversion path is implemented and partially proven for all four adapters. Telegram and DingTalk now have real UI private-chat image/file inbound evidence. Discord and aiocqhttp still have explicit UI-level media gaps, so the overall adapter set remains partial acceptance rather than production-complete media acceptance. +The EBA conversion path is implemented and partially proven for the migrated adapters. Telegram and DingTalk now have real UI private-chat image/file inbound evidence. Discord, aiocqhttp, and Lark / Feishu still have explicit UI-level media gaps, so the overall adapter set remains partial acceptance rather than production-complete media acceptance. diff --git a/docs/event-based-agents/adapters/lark.md b/docs/event-based-agents/adapters/lark.md new file mode 100644 index 00000000..c162284b --- /dev/null +++ b/docs/event-based-agents/adapters/lark.md @@ -0,0 +1,135 @@ +# Lark / Feishu EBA Adapter Migration Record + +Status: migrated with unit coverage and partial live plugin E2E. WebSocket text reached the standalone runtime once in the LangBot organization test app, but the latest real UI image/file inbound attempts did not reach the local adapter log, so media receive is not release-complete yet. + +Adapter directory: `src/langbot/pkg/platform/adapters/lark/` + +## What Changed + +The Lark/Feishu adapter now has an Event-Based Agents adapter package with: + +- `manifest.yaml` for adapter metadata, configuration, events, common APIs, platform-specific APIs, app type, and communication mode. +- `adapter.py` for self-built/store app token handling, WebSocket long connection startup, Webhook callback handling, card feedback, streaming-card replies, and EBA dispatch. +- `event_converter.py` for native Feishu events to common EBA events. +- `message_converter.py` for Feishu text/post/image/file/audio payloads to/from common `MessageChain` components. +- `api_impl.py` for common EBA API implementations. +- `platform_api.py` for Feishu-specific `call_platform_api` actions. + +The legacy `lark` adapter remains available while the EBA adapter is registered separately as `lark-eba`. + +## Configuration + +| Field | Required | Notes | +|-------|----------|-------| +| `app_id` | yes | Feishu/Lark application App ID. | +| `app_secret` | yes | Feishu/Lark application App Secret. | +| `bot_name` | yes | Must match the bot name so group mentions can be recognized. | +| `enable-webhook` | yes | `false` uses WebSocket long connection; `true` uses Request URL/Webhook callbacks. | +| `webhook_url` | no | Generated callback URL for Webhook mode. | +| `encrypt-key` | no | Webhook decrypt key when event encryption is enabled. | +| `enable-stream-reply` | yes | Enables streaming replies through an updating Feishu card. | +| `app_type` | no | `self` for self-built apps; `isv` for store apps. | +| `bot_added_welcome` | no | Optional group welcome message sent after bot-added events. | + +## Application And Communication Modes + +| Mode | Support | Implementation | +|------|---------|----------------| +| Self-built application | implemented | Uses standard app credentials and tenant token behavior from the Feishu SDK client. | +| Store application | implemented | Builds an ISV client, requests app tickets, and resolves app/tenant access tokens with per-tenant caching. | +| WebSocket long connection | implemented | Registers `im.message.receive_v1` and card-action callbacks through `lark_oapi.ws.Client`. | +| Webhook Request URL | implemented | Handles URL verification, encrypted payloads, message events, app-ticket events, bot-added events, and card-action feedback. | + +## Supported Events + +| Event | Support | Evidence | +|-------|---------|----------| +| `message.received` | implemented | Unit coverage for private and group native events to common EBA events. | +| `bot.invited_to_group` | implemented | Webhook bot-added event maps to common bot invite event and optional welcome send. | +| `platform.specific` | implemented | Unknown callback events are preserved as `platform.specific`. | +| `FeedbackEvent` | compatibility event | Card button feedback is still dispatched through the existing SDK `FeedbackEvent` type. | + +## Receive Components + +| Component | Support | Evidence | +|-----------|---------|----------| +| `Source` | supported | Unit coverage; live private text evidence. | +| `Plain` | supported | Text and post payloads convert to common text; live private text evidence. | +| `At` | supported | Feishu mentions map to common `At` with user ID and display name. | +| `AtAll` | supported | `user_id=all` maps to common `AtAll`. | +| `Image` | supported | Image payloads download through message resource API and map to common `Image`; real UI image send attempted, but not observed in local plugin evidence yet. | +| `Voice` | supported | Audio payloads download through message resource API and map to common `Voice`. | +| `File` | supported | File payloads download through message resource API and map to common `File`; real UI file send attempted, but not observed in local plugin evidence yet. | +| `Quote` | supported | Parent/thread reply lookup maps quoted content into common `Quote`. | +| `Face` | not native common mapping | Feishu emoji/stickers are not exposed as a portable common `Face` component here. | +| `Forward` | not-supported inbound | Feishu does not expose a portable structured forward event in this adapter. | + +## Send Components + +| Component | Support | Evidence | +|-----------|---------|----------| +| `Plain` | supported | Unit coverage; sends Feishu `text`. | +| `At` | supported | Unit coverage; sends Feishu `post` at element. | +| `AtAll` | supported | Unit coverage; sends Feishu `post` at-all element. | +| `Image` | supported | Uploads image resource and sends Feishu `image`. | +| `Voice` | supported | Uploads OPUS/audio resource and sends Feishu `audio`. | +| `File` | supported | Uploads file resource and sends Feishu `file`. | +| `Quote` | supported/fallback | Sends quote marker plus origin content. | +| `Face` | not-supported | No portable send mapping. | +| `Forward` | flattened fallback | Flattens forward nodes into text/media messages. | + +## Common APIs + +| API | Support | Notes | +|-----|---------|-------| +| `send_message` | supported | Supports private/open_id and group/chat_id targets; live plugin outbound component sweep produced visible Feishu messages. | +| `reply_message` | supported | Replies to the source Feishu message; fixed to recover the native Feishu message ID from legacy-wrapped source events. | +| `get_message` | cache-backed/API-backed | Returns cached inbound event where possible and converts uncached Feishu message API items into common `MessageReceivedEvent`. | +| `get_group_info` | supported | Uses cached group or Feishu chat metadata. | +| `get_group_member_info` | limited | Uses cached user data when available. | +| `get_user_info` | limited | Uses cached user data when available. | +| `get_file_url` | limited | Returns `file://` paths from downloaded inbound resources; remote Feishu resource download uses platform-specific API params. | +| `call_platform_api` | supported | See below. | + +## Platform-Specific APIs + +| Action | Support | Evidence | +|--------|---------|----------| +| `check_tenant_access_token` | supported | Unit coverage. | +| `refresh_app_access_token` | supported | Store-app token path implemented. | +| `refresh_tenant_access_token` | supported | Store-app tenant token path implemented. | +| `get_chat` | supported | Feishu chat metadata API wrapper. | +| `get_message` | supported | Feishu message API wrapper with JSON-safe return values for plugin calls. | +| `get_message_resource` | supported | Feishu message resource download wrapper. | + +## End-to-End Evidence + +Current code-level evidence: + +- `tests/unit_tests/platform/test_lark_eba_adapter.py` +- `PYTHONPATH=../langbot-plugin-sdk/src uv run pytest tests/unit_tests/platform/test_lark_eba_adapter.py -q` + +Live evidence collected on May 11, 2026: + +- Standalone runtime: `uv run lbp rt --ws-control-port 5400 --ws-debug-port 5401 --skip-deps-check` +- LangBot: `uv run main.py --standalone-runtime --debug` +- Plugin: `LangBot__EBAEventProbe` +- Feishu org/app: LangBot organization, `LangBotDev` private chat. +- Observed plugin JSONL: one private `MessageReceived` event with `Source + Plain`; plugin API probe then exercised bot discovery, bot info, `send_message`, outbound component sweep, storage/list APIs, and safe platform API calls. +- Real UI sends attempted after the fixes: private text, local file, and image/video image upload. These appeared in the Feishu client but did not append new `EBAEventProbe` records in the local JSONL during this run. +- Fixes from live testing: reply path now extracts the native Feishu `message_id` from legacy-wrapped source events; WebSocket callbacks are scheduled onto the adapter event loop instead of assuming the SDK callback has a running asyncio loop; platform API results are converted to JSON-safe values. + +Live E2E items still required before marking release-complete: + +- WebSocket self-built app in LangBot organization: repeat private text after callback-loop fix, plus private image/file/audio and group mention message received by `EBAEventProbe`. +- Webhook self-built app in LangBot organization: URL verification plus text/image/file message received by `EBAEventProbe`. +- Store app token path: at least token acquisition/tenant-token safe API through `call_platform_api`; full message E2E if a LangBot organization store-app fixture is available. +- Outbound component sweep: text, mention, at-all, image, file, voice where Feishu accepts the fixture, quote/fallback, and forward/fallback. +- Safe platform API sweep: token check, chat metadata, message lookup, and message resource download using real inbound IDs. + +## Known Limits + +- Store-app live E2E requires a real ISV app ticket/tenant installation fixture. +- Current LangBot organization WebSocket run connected successfully but did not deliver the latest UI-sent image/file attempts to local plugin evidence; this blocks release-complete media acceptance. +- Feishu native emoji/sticker semantics are not represented as common `Face`. +- Destructive org or chat mutations are not declared in this adapter. diff --git a/src/langbot/pkg/platform/adapters/lark/__init__.py b/src/langbot/pkg/platform/adapters/lark/__init__.py new file mode 100644 index 00000000..1b39cde8 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/__init__.py @@ -0,0 +1 @@ +"""Lark/Feishu EBA platform adapter.""" diff --git a/src/langbot/pkg/platform/adapters/lark/adapter.py b/src/langbot/pkg/platform/adapters/lark/adapter.py new file mode 100644 index 00000000..03e6a570 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/adapter.py @@ -0,0 +1,680 @@ +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import json +import time +import traceback +import typing +import uuid + +from Crypto.Cipher import AES +import lark_oapi +from lark_oapi.api.auth.v3 import ( + CreateAppAccessTokenRequest, + CreateAppAccessTokenRequestBody, + CreateAppAccessTokenResponse, + CreateTenantAccessTokenRequest, + CreateTenantAccessTokenRequestBody, + CreateTenantAccessTokenResponse, + ResendAppTicketRequest, + ResendAppTicketRequestBody, + ResendAppTicketResponse, +) +from lark_oapi.api.cardkit.v1 import ( + ContentCardElementRequest, + ContentCardElementRequestBody, + ContentCardElementResponse, + CreateCardRequest, + CreateCardRequestBody, + CreateCardResponse, +) +from lark_oapi.api.im.v1 import ( + CreateMessageRequest, + CreateMessageRequestBody, + CreateMessageResponse, + EventMessage, + EventSender, + P2ImMessageReceiveV1, + P2ImMessageReceiveV1Data, + ReplyMessageRequest, + ReplyMessageRequestBody, + ReplyMessageResponse, +) +import lark_oapi.ws.exception +import pydantic +import quart + +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger +from langbot.pkg.platform.adapters.lark.api_impl import LarkAPIMixin +from langbot.pkg.platform.adapters.lark.event_converter import LarkEventConverter +from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter +from langbot.pkg.platform.adapters.lark.platform_api import PLATFORM_API_MAP +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class AESCipher: + def __init__(self, key: str): + self.key = hashlib.sha256(self.str_to_bytes(key)).digest() + + @staticmethod + def str_to_bytes(data): + if isinstance(data, str): + return data.encode('utf8') + return data + + @staticmethod + def _unpad(value: bytes) -> bytes: + return value[: -value[len(value) - 1]] + + def decrypt_string(self, encrypted: str) -> str: + encrypted_bytes = base64.b64decode(encrypted) + iv = encrypted_bytes[: AES.block_size] + cipher = AES.new(self.key, AES.MODE_CBC, iv) + return self._unpad(cipher.decrypt(encrypted_bytes[AES.block_size :])).decode('utf8') + + +class LarkAdapter(LarkAPIMixin, abstract_platform_adapter.AbstractPlatformAdapter): + bot: lark_oapi.ws.Client = pydantic.Field(exclude=True) + api_client: lark_oapi.Client = pydantic.Field(exclude=True) + quart_app: quart.Quart = pydantic.Field(exclude=True) + cipher: AESCipher = pydantic.Field(exclude=True) + + config: dict + lark_tenant_key: str = pydantic.Field(exclude=True, default='') + app_ticket: str | None = None + app_access_token: str | None = None + app_access_token_expire_at: int | None = None + tenant_access_tokens: dict[str, dict[str, typing.Any]] = pydantic.Field(default_factory=dict) + bot_uuid: str | None = None + event_loop: asyncio.AbstractEventLoop | None = pydantic.Field(exclude=True, default=None) + + message_converter: LarkMessageConverter = LarkMessageConverter() + event_converter: LarkEventConverter = LarkEventConverter() + listeners: dict[ + typing.Type[platform_events.Event], + typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], + ] = pydantic.Field(default_factory=dict) + card_id_dict: dict[str, str] = pydantic.Field(default_factory=dict) + pending_monitoring_msg: dict[str, str] = pydantic.Field(default_factory=dict) + reply_to_monitoring_msg: dict[str, tuple[str, float]] = pydantic.Field(default_factory=dict) + _message_cache: dict[str, platform_events.MessageReceivedEvent] = pydantic.PrivateAttr(default_factory=dict) + _user_cache: dict[str, platform_entities.User] = pydantic.PrivateAttr(default_factory=dict) + _group_cache: dict[str, platform_entities.UserGroup] = pydantic.PrivateAttr(default_factory=dict) + _monitoring_mapping_ttl: int = 600 + + class Config: + arbitrary_types_allowed = True + + def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs): + required_keys = ['app_id', 'app_secret', 'bot_name'] + missing_keys = [key for key in required_keys if not config.get(key)] + if missing_keys: + raise ValueError(f'Lark missing required config: {", ".join(missing_keys)}') + + api_client = self.build_api_client(config) + event_handler = self._build_event_handler() + bot = lark_oapi.ws.Client(config['app_id'], config['app_secret'], event_handler=event_handler) + cipher = AESCipher(config.get('encrypt-key', '')) + + super().__init__( + config=config, + logger=logger, + lark_tenant_key=config.get('lark_tenant_key', ''), + bot_account_id=config['bot_name'], + bot=bot, + api_client=api_client, + quart_app=quart.Quart(__name__), + cipher=cipher, + listeners={}, + card_id_dict={}, + pending_monitoring_msg={}, + reply_to_monitoring_msg={}, + event_loop=None, + **kwargs, + ) + self._message_cache = {} + self._user_cache = {} + self._group_cache = {} + self.request_app_ticket() + + def _build_event_handler(self): + async def on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): + await self._handle_message_event(event) + + def sync_on_message(event: lark_oapi.im.v1.P2ImMessageReceiveV1): + self._submit_coro(on_message(event)) + + def sync_on_card_action(event): + return self._handle_card_action_sync(event) + + return ( + lark_oapi.EventDispatcherHandler.builder('', '') + .register_p2_im_message_receive_v1(sync_on_message) + .register_p2_card_action_trigger(sync_on_card_action) + .build() + ) + + def get_supported_events(self) -> list[str]: + return ['message.received', 'bot.invited_to_group', 'platform.specific'] + + def get_supported_apis(self) -> list[str]: + return [ + 'send_message', + 'reply_message', + 'get_message', + 'get_group_info', + 'get_group_member_info', + 'get_user_info', + 'get_file_url', + 'call_platform_api', + ] + + def build_api_client(self, config: dict) -> lark_oapi.Client: + builder = lark_oapi.Client.builder().app_id(config['app_id']).app_secret(config['app_secret']) + if config.get('app_type', 'self') == 'isv': + builder = builder.app_type(lark_oapi.AppType.ISV) + return builder.build() + + def request_app_ticket(self): + if self.config.get('app_type', 'self') != 'isv': + return + request = ( + ResendAppTicketRequest.builder() + .request_body( + ResendAppTicketRequestBody.builder() + .app_id(self.config['app_id']) + .app_secret(self.config['app_secret']) + .build() + ) + .build() + ) + response: ResendAppTicketResponse = self.api_client.auth.v3.app_ticket.resend(request) + if not response.success(): + raise RuntimeError(f'Lark app_ticket resend failed: {response.code} {response.msg}') + + def request_app_access_token(self): + if self.config.get('app_type', 'self') != 'isv': + return + request = ( + CreateAppAccessTokenRequest.builder() + .request_body( + CreateAppAccessTokenRequestBody.builder() + .app_id(self.config['app_id']) + .app_secret(self.config['app_secret']) + .app_ticket(self.app_ticket) + .build() + ) + .build() + ) + response: CreateAppAccessTokenResponse = self.api_client.auth.v3.app_access_token.create(request) + if not response.success(): + raise RuntimeError(f'Lark app_access_token failed: {response.code} {response.msg}') + content = json.loads(response.raw.content) + self.app_access_token = content['app_access_token'] + self.app_access_token_expire_at = int(time.time()) + content['expire'] - 300 + + def get_app_access_token(self): + if self.config.get('app_type', 'self') != 'isv': + return None + if ( + self.app_access_token is None + or self.app_access_token_expire_at is None + or int(time.time()) >= self.app_access_token_expire_at + ): + self.request_app_access_token() + return self.app_access_token + + def request_tenant_access_token(self, tenant_key: str): + if self.config.get('app_type', 'self') != 'isv': + return + request = ( + CreateTenantAccessTokenRequest.builder() + .request_body( + CreateTenantAccessTokenRequestBody.builder() + .app_access_token(self.get_app_access_token()) + .tenant_key(tenant_key) + .build() + ) + .build() + ) + response: CreateTenantAccessTokenResponse = self.api_client.auth.v3.tenant_access_token.create(request) + if not response.success(): + raise RuntimeError(f'Lark tenant_access_token failed: {response.code} {response.msg}') + content = json.loads(response.raw.content) + self.tenant_access_tokens[tenant_key] = { + 'token': content['tenant_access_token'], + 'expire_at': int(time.time()) + content['expire'] - 300, + } + + def get_tenant_access_token(self, tenant_key: str | None): + if self.config.get('app_type', 'self') != 'isv' or not tenant_key: + return None + cached = self.tenant_access_tokens.get(tenant_key) + if cached is None or int(time.time()) >= cached['expire_at']: + self.request_tenant_access_token(tenant_key) + return self.tenant_access_tokens.get(tenant_key, {}).get('token') + + async def send_message( + self, + target_type: str, + target_id: str, + message: platform_message.MessageChain, + ) -> platform_events.MessageResult: + text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client) + receive_id_type = 'chat_id' if target_type == 'group' else 'open_id' + message_ids: list[str] = [] + + for msg_type, content in self._outbound_payloads(text_elements, media_items): + request = ( + CreateMessageRequest.builder() + .receive_id_type(receive_id_type) + .request_body( + CreateMessageRequestBody.builder() + .receive_id(str(target_id)) + .content(json.dumps(content, ensure_ascii=False)) + .msg_type(msg_type) + .uuid(str(uuid.uuid4())) + .build() + ) + .build() + ) + response: CreateMessageResponse = await self.api_client.im.v1.message.acreate(request) + if not response.success(): + raise RuntimeError(f'Lark send_message failed: {response.code} {response.msg}') + message_ids.append(getattr(response.data, 'message_id', '')) + + return platform_events.MessageResult( + message_id=message_ids[-1] if message_ids else '', raw={'message_ids': message_ids} + ) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ) -> platform_events.MessageResult: + text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client) + tenant_key = self._tenant_key_from_source(message_source) + message_ids: list[str] = [] + + for msg_type, content in self._outbound_payloads(text_elements, media_items): + request = ( + ReplyMessageRequest.builder() + .message_id(self._message_id_from_source(message_source)) + .request_body( + ReplyMessageRequestBody.builder() + .content(json.dumps(content, ensure_ascii=False)) + .msg_type(msg_type) + .reply_in_thread(False) + .uuid(str(uuid.uuid4())) + .build() + ) + .build() + ) + response: ReplyMessageResponse = await self.api_client.im.v1.message.areply( + request, self.request_option(tenant_key) + ) + if not response.success(): + raise RuntimeError(f'Lark reply_message failed: {response.code} {response.msg}') + message_ids.append(getattr(response.data, 'message_id', '')) + + return platform_events.MessageResult( + message_id=message_ids[-1] if message_ids else '', raw={'message_ids': message_ids} + ) + + def _outbound_payloads(self, text_elements: list[list[dict]], media_items: list[dict]) -> list[tuple[str, dict]]: + payloads: list[tuple[str, dict]] = [] + if text_elements: + needs_post = any(ele.get('tag') == 'at' for paragraph in text_elements for ele in paragraph) + if needs_post: + payloads.append(('post', {'zh_Hans': {'title': '', 'content': text_elements}})) + else: + parts = [] + for paragraph in text_elements: + text = ''.join(ele.get('text', '') for ele in paragraph) + if text: + parts.append(text) + payloads.append(('text', {'text': '\n\n'.join(parts)})) + for media in media_items: + payloads.append((media['msg_type'], media['content'])) + return payloads + + async def is_stream_output_supported(self) -> bool: + return bool(self.config.get('enable-stream-reply', False)) + + async def on_monitoring_message_created(self, query, monitoring_message_id: str): + user_msg_id = getattr(query.message_event, 'message_id', None) + if user_msg_id: + self.pending_monitoring_msg[str(user_msg_id)] = monitoring_message_id + + async def create_message_card(self, message_id, event) -> bool: + card_id = await self.create_card_id(message_id) + content = {'type': 'card', 'data': {'card_id': card_id, 'template_variable': {'content': 'Thinking...'}}} + request = ( + ReplyMessageRequest.builder() + .message_id(self._message_id_from_source(event)) + .request_body( + ReplyMessageRequestBody.builder().content(json.dumps(content)).msg_type('interactive').build() + ) + .build() + ) + response: ReplyMessageResponse = await self.api_client.im.v1.message.areply( + request, self.request_option(self._tenant_key_from_source(event)) + ) + if not response.success(): + raise RuntimeError(f'Lark create_message_card failed: {response.code} {response.msg}') + return True + + async def create_card_id(self, message_id) -> str: + card_data = { + 'schema': '2.0', + 'config': {'update_multi': True, 'streaming_mode': True}, + 'body': { + 'direction': 'vertical', + 'elements': [{'tag': 'markdown', 'content': '', 'element_id': 'streaming_txt'}], + }, + } + request = ( + CreateCardRequest.builder() + .request_body(CreateCardRequestBody.builder().type('card_json').data(json.dumps(card_data)).build()) + .build() + ) + response: CreateCardResponse = self.api_client.cardkit.v1.card.create(request) + if not response.success(): + raise RuntimeError(f'Lark create_card failed: {response.code} {response.msg}') + self.card_id_dict[str(message_id)] = response.data.card_id + return response.data.card_id + + async def reply_message_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message, + message: platform_message.MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ): + if bot_message.msg_sequence % 8 != 0 and not is_final: + return + text_elements, _ = await self.message_converter.yiri2target(message, self.api_client) + content = '\n\n'.join( + ''.join(ele.get('text', '') for ele in paragraph if ele.get('tag') in {'text', 'md'}) + for paragraph in text_elements + ) + request = ( + ContentCardElementRequest.builder() + .card_id(self.card_id_dict[bot_message.resp_message_id]) + .element_id('streaming_txt') + .request_body( + ContentCardElementRequestBody.builder().content(content).sequence(bot_message.msg_sequence).build() + ) + .build() + ) + response: ContentCardElementResponse = self.api_client.cardkit.v1.card_element.content( + request, self.request_option(self._tenant_key_from_source(message_source)) + ) + if not response.success(): + raise RuntimeError(f'Lark card_element update failed: {response.code} {response.msg}') + if is_final and bot_message.tool_calls is None: + self.card_id_dict.pop(bot_message.resp_message_id, None) + + async def call_platform_api(self, action: str, params: dict = {}) -> dict: + handler = PLATFORM_API_MAP.get(action) + if handler is None: + raise NotSupportedError(f'call_platform_api:{action}') + return await handler(self, params) + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + self.listeners[event_type] = callback + + def unregister_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None + ], + ): + if self.listeners.get(event_type) is callback: + self.listeners.pop(event_type, None) + + def set_bot_uuid(self, bot_uuid: str): + self.bot_uuid = bot_uuid + + def get_launcher_id(self, event: platform_events.MessageEvent) -> str | None: + source_event = getattr(event.source_platform_object, 'event', None) + message = getattr(source_event, 'message', None) if source_event else None + thread_id = getattr(message, 'thread_id', None) + if thread_id and isinstance(event, platform_events.MessageReceivedEvent) and event.group: + return f'{event.group.id}_{thread_id}' + return None + + async def handle_unified_webhook(self, bot_uuid: str, path: str, request): + try: + data = await request.json + if 'encrypt' in data: + data = json.loads(self.cipher.decrypt_string(data['encrypt'])) + event_type = self.get_event_type(data) + if event_type == 'url_verification': + return {'challenge': data.get('challenge')} + if event_type == 'app_ticket': + self.app_ticket = self._webhook_event(data).get('app_ticket') + return {'code': 200, 'message': 'ok'} + if event_type == 'im.message.receive_v1': + p2v1 = P2ImMessageReceiveV1() + p2v1.header = self._webhook_header(data) + event_data = P2ImMessageReceiveV1Data() + raw_event = self._webhook_event(data) + event_data.message = EventMessage(raw_event['message']) + event_data.sender = EventSender(raw_event['sender']) + p2v1.event = event_data + p2v1.schema = data.get('schema', '2.0') + await self._handle_message_event(p2v1) + return {'code': 200, 'message': 'ok'} + if event_type == 'im.chat.member.bot.added_v1': + raw_event = self._webhook_event(data) + header = self._webhook_header(data) + chat_id = raw_event.get('chat_id', '') + await self._send_bot_added_welcome(chat_id, getattr(header, 'tenant_key', None)) + await self._dispatch_eba_event(LarkEventConverter.bot_invited_to_group(data, chat_id)) + return {'code': 200, 'message': 'ok'} + if event_type == 'card.action.trigger': + feedback_event = self._feedback_event_from_webhook(data) + if feedback_event and platform_events.FeedbackEvent in self.listeners: + await self.listeners[platform_events.FeedbackEvent](feedback_event, self) + return {'toast': {'type': 'success', 'content': '感谢您的反馈'}} + await self._dispatch_eba_event(LarkEventConverter.platform_specific(data, event_type, data)) + return {'code': 200, 'message': 'ok'} + except Exception: + await self.logger.error(f'Error in lark webhook: {traceback.format_exc()}') + return {'code': 500, 'message': 'error'} + + def get_event_type(self, data: dict) -> str: + schema = data.get('schema', '1.0') + if schema == '2.0': + return data.get('header', {}).get('event_type', '') + if 'event' in data: + return data['event'].get('type', '') + return data.get('type', '') + + def _webhook_event(self, data: dict) -> dict: + return data.get('event', {}) + + def _webhook_header(self, data: dict): + return type('LarkWebhookHeader', (), data.get('header', {}))() + + async def run_async(self): + self.event_loop = asyncio.get_running_loop() + if not self.config.get('enable-webhook', False): + try: + await self.bot._connect() + except lark_oapi.ws.exception.ClientException: + raise + except Exception: + await self.bot._disconnect() + if self.bot._auto_reconnect: + await self.bot._reconnect() + else: + raise + else: + while True: + await asyncio.sleep(1) + + async def kill(self) -> bool: + self.bot._auto_reconnect = False + await self.bot._disconnect() + return True + + async def is_muted(self, group_id: int | None = None) -> bool: + return False + + async def _handle_message_event(self, event: lark_oapi.im.v1.P2ImMessageReceiveV1): + try: + if platform_events.FriendMessage in self.listeners or platform_events.GroupMessage in self.listeners: + legacy_event = await self.event_converter.target2legacy(event, self.api_client) + if legacy_event and type(legacy_event) in self.listeners: + await self.listeners[type(legacy_event)](legacy_event, self) + eba_event = await self.event_converter.target2yiri(event, self.api_client) + if eba_event: + self._cache_event(eba_event) + await self._dispatch_eba_event(eba_event) + except Exception: + await self.logger.error(f'Error in lark message event: {traceback.format_exc()}') + + async def _dispatch_eba_event(self, event: platform_events.Event): + for event_type in (type(event), platform_events.EBAEvent, platform_events.Event): + callback = self.listeners.get(event_type) + if callback: + await callback(event, self) + return + + def _cache_event(self, event: platform_events.Event): + if not isinstance(event, platform_events.MessageReceivedEvent): + return + self._message_cache[str(event.message_id)] = event + self._user_cache[str(event.sender.id)] = event.sender + if event.group: + self._group_cache[str(event.group.id)] = event.group + + def _handle_card_action_sync(self, event): + feedback_event = self._feedback_event_from_callback(event) + if feedback_event and platform_events.FeedbackEvent in self.listeners: + self._submit_coro(self.listeners[platform_events.FeedbackEvent](feedback_event, self)) + from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse + + return P2CardActionTriggerResponse({'toast': {'type': 'success', 'content': '感谢您的反馈'}}) + + def _submit_coro(self, coro): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = self.event_loop + if loop and loop.is_running(): + asyncio.run_coroutine_threadsafe(coro, loop) + return + coro.close() + raise + else: + loop.create_task(coro) + + def _feedback_event_from_callback(self, event) -> platform_events.FeedbackEvent | None: + value = getattr(getattr(event.event, 'action', None), 'value', {}) or {} + return self._feedback_event( + raw=event, + feedback_id=getattr(event.header, 'event_id', str(uuid.uuid4())), + feedback_value=value.get('feedback', ''), + user_id=getattr(getattr(event.event, 'operator', None), 'open_id', None), + chat_id=getattr(getattr(event.event, 'context', None), 'open_chat_id', None), + message_id=getattr(getattr(event.event, 'context', None), 'open_message_id', None), + ) + + def _feedback_event_from_webhook(self, data: dict) -> platform_events.FeedbackEvent | None: + event = data.get('event', {}) + value = event.get('action', {}).get('value', {}) or {} + operator = event.get('operator', {}) + context = event.get('context', {}) + return self._feedback_event( + raw=data, + feedback_id=data.get('header', {}).get('event_id', str(uuid.uuid4())), + feedback_value=value.get('feedback', ''), + user_id=operator.get('open_id') or operator.get('user_id'), + chat_id=context.get('open_chat_id'), + message_id=context.get('open_message_id'), + ) + + def _feedback_event( + self, + raw, + feedback_id: str, + feedback_value: str, + user_id: str | None, + chat_id: str | None, + message_id: str | None, + ) -> platform_events.FeedbackEvent | None: + if feedback_value == '有帮助': + feedback_type = 1 + elif feedback_value == '无帮助': + feedback_type = 2 + else: + return None + return platform_events.FeedbackEvent( + feedback_id=feedback_id, + feedback_type=feedback_type, + feedback_content=feedback_value, + user_id=user_id, + session_id=f'group_{chat_id}' if chat_id else (f'person_{user_id}' if user_id else None), + message_id=message_id, + stream_id=self.reply_to_monitoring_msg.get(message_id, (None, 0))[0] if message_id else None, + source_platform_object=raw, + ) + + async def _send_bot_added_welcome(self, chat_id: str, tenant_key: str | None): + welcome = self.config.get('bot_added_welcome', '') + if not welcome or not chat_id: + return + content = {'zh_Hans': {'title': '', 'content': [[{'tag': 'md', 'text': welcome}]]}} + request = ( + CreateMessageRequest.builder() + .receive_id_type('chat_id') + .request_body( + CreateMessageRequestBody.builder() + .receive_id(chat_id) + .content(json.dumps(content, ensure_ascii=False)) + .msg_type('post') + .uuid(str(uuid.uuid4())) + .build() + ) + .build() + ) + response: CreateMessageResponse = await self.api_client.im.v1.message.acreate( + request, self.request_option(tenant_key) + ) + if not response.success(): + await self.logger.warning(f'Lark bot_added_welcome failed: {response.code} {response.msg}') + + def _tenant_key_from_source(self, event: platform_events.Event) -> str | None: + source = getattr(event, 'source_platform_object', None) + header = getattr(source, 'header', None) + return getattr(header, 'tenant_key', None) + + def _message_id_from_source(self, event: platform_events.Event) -> str: + message_id = getattr(event, 'message_id', None) + if message_id: + return str(message_id) + source = getattr(event, 'source_platform_object', None) + source_event = getattr(source, 'event', None) + message = getattr(source_event, 'message', None) if source_event else None + message_id = getattr(message, 'message_id', None) + if message_id: + return str(message_id) + raise RuntimeError('Lark message source does not contain message_id') diff --git a/src/langbot/pkg/platform/adapters/lark/api_impl.py b/src/langbot/pkg/platform/adapters/lark/api_impl.py new file mode 100644 index 00000000..9bcf11af --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/api_impl.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import typing + +from lark_oapi.api.im.v1 import GetChatRequest, GetMessageRequest +from lark_oapi.core.model import RequestOption + +from langbot.pkg.platform.adapters.lark.event_converter import LarkEventConverter +from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform.errors import NotSupportedError + + +class LarkAPIMixin: + _message_cache: dict[str, platform_events.MessageReceivedEvent] + _user_cache: dict[str, platform_entities.User] + _group_cache: dict[str, platform_entities.UserGroup] + + async def get_message( + self, + chat_type: str, + chat_id: typing.Union[int, str], + message_id: typing.Union[int, str], + ) -> platform_events.MessageReceivedEvent: + cached = self._message_cache.get(str(message_id)) + if cached: + return cached + request = GetMessageRequest.builder().message_id(str(message_id)).build() + response = await self.api_client.im.v1.message.aget(request, self.request_option(None)) + if not response.success(): + raise NotSupportedError(f'get_message:{message_id}') + items = getattr(response.data, 'items', None) or [] + if not items: + raise NotSupportedError(f'get_message:{message_id}') + event_message = LarkEventConverter._build_event_message_from_message_item(items[0]) + if event_message is None: + raise NotSupportedError(f'get_message:{message_id}') + message_chain = await LarkMessageConverter.target2yiri(event_message, self.api_client) + event = platform_events.MessageReceivedEvent( + type='message.received', + adapter_name='lark-eba', + message_id=str(message_id), + message_chain=message_chain, + sender=platform_entities.User(id=''), + chat_type=platform_entities.ChatType.GROUP if chat_type == 'group' else platform_entities.ChatType.PRIVATE, + chat_id=chat_id, + group=platform_entities.UserGroup(id=chat_id, name='') if chat_type == 'group' else None, + timestamp=0, + source_platform_object=items[0], + ) + self._message_cache[str(message_id)] = event + return event + + async def get_group_info(self, group_id: typing.Union[int, str]) -> platform_entities.UserGroup: + cached = self._group_cache.get(str(group_id)) + if cached: + return cached + request = GetChatRequest.builder().chat_id(str(group_id)).build() + response = await self.api_client.im.v1.chat.aget(request, self.request_option(None)) + if not response.success(): + raise NotSupportedError(f'get_group_info:{group_id}') + data = response.data + group = platform_entities.UserGroup( + id=getattr(data, 'chat_id', group_id), + name=getattr(data, 'name', '') or '', + description=getattr(data, 'description', None), + avatar_url=getattr(data, 'avatar', None), + owner_id=getattr(data, 'owner_id', None), + ) + self._group_cache[str(group.id)] = group + return group + + async def get_group_member_info( + self, + group_id: typing.Union[int, str], + user_id: typing.Union[int, str], + ) -> platform_entities.UserGroupMember: + user = self._user_cache.get(str(user_id)) or platform_entities.User(id=user_id) + return platform_entities.UserGroupMember(user=user, group_id=group_id, role=platform_entities.MemberRole.MEMBER) + + async def get_user_info(self, user_id: typing.Union[int, str]) -> platform_entities.User: + cached = self._user_cache.get(str(user_id)) + if cached: + return cached + return platform_entities.User(id=user_id) + + async def get_file_url(self, file_id: str) -> str: + if str(file_id).startswith('file://'): + return str(file_id) + raise NotSupportedError('get_file_url requires a file:// path or platform-specific resource download params') + + def request_option(self, tenant_key: str | None) -> RequestOption: + app_access_token = self.get_app_access_token() + tenant_access_token = self.get_tenant_access_token(tenant_key) + return ( + RequestOption.builder() + .app_ticket(self.app_ticket) + .tenant_key(tenant_key) + .app_access_token(app_access_token) + .tenant_access_token(tenant_access_token) + .build() + ) diff --git a/src/langbot/pkg/platform/adapters/lark/event_converter.py b/src/langbot/pkg/platform/adapters/lark/event_converter.py new file mode 100644 index 00000000..d76a03ab --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/event_converter.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +import time +import typing + +import lark_oapi +from lark_oapi.api.im.v1 import EventMessage, GetMessageRequest, Message + +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter +from langbot.pkg.platform.adapters.lark.types import ADAPTER_NAME +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class LarkEventConverter(abstract_platform_adapter.AbstractEventConverter): + _processed_thread_quote_cache: typing.ClassVar[dict[str, float]] = {} + _processed_thread_quote_cache_max_size: typing.ClassVar[int] = 4096 + _processed_thread_quote_cache_ttl_seconds: typing.ClassVar[int] = 86400 + + @staticmethod + async def yiri2target(event: platform_events.Event): + return getattr(event, 'source_platform_object', None) + + @staticmethod + async def target2yiri( + event: lark_oapi.im.v1.P2ImMessageReceiveV1, + api_client: lark_oapi.Client, + ) -> platform_events.Event | None: + return await LarkEventConverter.message_to_eba(event, api_client) + + @staticmethod + async def target2legacy( + event: lark_oapi.im.v1.P2ImMessageReceiveV1, + api_client: lark_oapi.Client, + ) -> platform_events.FriendMessage | platform_events.GroupMessage | None: + eba_event = await LarkEventConverter.message_to_eba(event, api_client) + if eba_event: + return eba_event.to_legacy_event() + return None + + @staticmethod + async def message_to_eba( + event: lark_oapi.im.v1.P2ImMessageReceiveV1, + api_client: lark_oapi.Client, + ) -> platform_events.MessageReceivedEvent: + message = event.event.message + message_chain = await LarkMessageConverter.target2yiri(message, api_client) + await LarkEventConverter._append_quote_content(message, message_chain, api_client) + + sender = LarkEventConverter.user_from_event(event) + chat_type = platform_entities.ChatType.PRIVATE + chat_id = LarkEventConverter.sender_id(event) + group = None + if getattr(message, 'chat_type', '') == 'group': + chat_type = platform_entities.ChatType.GROUP + chat_id = getattr(message, 'chat_id', '') or chat_id + group = platform_entities.UserGroup(id=chat_id, name='') + + return platform_events.MessageReceivedEvent( + type='message.received', + adapter_name=ADAPTER_NAME, + message_id=getattr(message, 'message_id', ''), + message_chain=message_chain, + sender=sender, + chat_type=chat_type, + chat_id=chat_id, + group=group, + timestamp=LarkEventConverter._timestamp(getattr(message, 'create_time', None)), + source_platform_object=event, + ) + + @staticmethod + def user_from_event(event: lark_oapi.im.v1.P2ImMessageReceiveV1) -> platform_entities.User: + sender_id = getattr(getattr(event.event.sender, 'sender_id', None), 'open_id', '') or '' + union_id = getattr(getattr(event.event.sender, 'sender_id', None), 'union_id', '') or '' + return platform_entities.User(id=sender_id, nickname=union_id) + + @staticmethod + def sender_id(event: lark_oapi.im.v1.P2ImMessageReceiveV1) -> str: + return getattr(getattr(event.event.sender, 'sender_id', None), 'open_id', '') or '' + + @staticmethod + def bot_invited_to_group( + raw_event: typing.Any, + chat_id: str, + operator_id: str | None = None, + ) -> platform_events.BotInvitedToGroupEvent: + return platform_events.BotInvitedToGroupEvent( + type='bot.invited_to_group', + adapter_name=ADAPTER_NAME, + group=platform_entities.UserGroup(id=chat_id, name=''), + inviter=platform_entities.User(id=operator_id) if operator_id else None, + timestamp=time.time(), + source_platform_object=raw_event, + ) + + @staticmethod + def platform_specific( + raw_event: typing.Any, action: str, data: dict | None = None + ) -> platform_events.PlatformSpecificEvent: + return platform_events.PlatformSpecificEvent( + type='platform.specific', + adapter_name=ADAPTER_NAME, + action=action, + data=data or {}, + timestamp=time.time(), + source_platform_object=raw_event, + ) + + @classmethod + def _prune_processed_thread_quote_cache(cls, now: float | None = None) -> None: + if now is None: + now = time.time() + expire_before = now - cls._processed_thread_quote_cache_ttl_seconds + while cls._processed_thread_quote_cache: + oldest_key, oldest_ts = next(iter(cls._processed_thread_quote_cache.items())) + if oldest_ts >= expire_before: + break + cls._processed_thread_quote_cache.pop(oldest_key, None) + while len(cls._processed_thread_quote_cache) > cls._processed_thread_quote_cache_max_size: + cls._processed_thread_quote_cache.pop(next(iter(cls._processed_thread_quote_cache)), None) + + @classmethod + def _extract_quote_message_id(cls, message: EventMessage) -> str | None: + parent_id = getattr(message, 'parent_id', None) + if not parent_id or parent_id == getattr(message, 'message_id', None): + return None + thread_id = getattr(message, 'thread_id', None) + if thread_id: + cls._prune_processed_thread_quote_cache() + if thread_id in cls._processed_thread_quote_cache: + return None + cls._processed_thread_quote_cache[thread_id] = time.time() + return parent_id + + @staticmethod + async def _append_quote_content( + message: EventMessage, + message_chain: platform_message.MessageChain, + api_client: lark_oapi.Client, + ) -> None: + quote_message_id = LarkEventConverter._extract_quote_message_id(message) + if not quote_message_id: + return + quote_chain = await LarkEventConverter._fetch_quoted_message(quote_message_id, api_client) + if not quote_chain: + return + origin = platform_message.MessageChain( + [comp for comp in quote_chain if not isinstance(comp, platform_message.Source)] + ) + message_chain.append( + platform_message.Quote( + id=quote_message_id, + group_id=getattr(message, 'chat_id', None), + target_id=getattr(message, 'chat_id', None), + origin=origin, + ) + ) + + @staticmethod + async def _fetch_quoted_message( + quote_message_id: str, + api_client: lark_oapi.Client, + ) -> platform_message.MessageChain | None: + request = GetMessageRequest.builder().message_id(quote_message_id).build() + response = await api_client.im.v1.message.aget(request) + if not response.success() or not getattr(response.data, 'items', None): + return None + event_message = LarkEventConverter._build_event_message_from_message_item(response.data.items[0]) + if event_message is None: + return None + return await LarkMessageConverter.target2yiri(event_message, api_client) + + @staticmethod + def _build_event_message_from_message_item(message_item: Message) -> EventMessage | None: + body = getattr(message_item, 'body', None) + content = getattr(body, 'content', None) if body else None + if not content: + return None + event_data = { + 'message_id': message_item.message_id, + 'message_type': message_item.msg_type, + 'content': content, + 'create_time': message_item.create_time, + 'mentions': getattr(message_item, 'mentions', []) or [], + } + for key in ('parent_id', 'root_id', 'thread_id', 'chat_id'): + value = getattr(message_item, key, None) + if value: + event_data[key] = value + return EventMessage(event_data) + + @staticmethod + def _timestamp(value: typing.Any) -> float: + if isinstance(value, (int, float, str)): + try: + timestamp = float(value) + return timestamp / 1000 if timestamp > 10_000_000_000 else timestamp + except ValueError: + pass + if hasattr(value, 'timestamp'): + return float(value.timestamp()) + return 0.0 diff --git a/src/langbot/pkg/platform/adapters/lark/lark.svg b/src/langbot/pkg/platform/adapters/lark/lark.svg new file mode 100644 index 00000000..bf3c202a --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/lark.svg @@ -0,0 +1 @@ + diff --git a/src/langbot/pkg/platform/adapters/lark/manifest.yaml b/src/langbot/pkg/platform/adapters/lark/manifest.yaml new file mode 100644 index 00000000..84770209 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/manifest.yaml @@ -0,0 +1,185 @@ +apiVersion: v1 +kind: MessagePlatformAdapter + +metadata: + name: lark-eba + label: + en_US: Lark / Feishu (EBA) + zh_Hans: 飞书 (EBA) + zh_Hant: 飛書 (EBA) + ja_JP: Lark (EBA) + description: + en_US: Lark/Feishu adapter (EBA architecture), supporting self-built/store apps and WebSocket/Webhook modes. + zh_Hans: 飞书适配器(EBA 架构版本),支持自建/商店应用和长连接/Webhook 两种通信模式。 + zh_Hant: 飛書適配器(EBA 架構版本),支援自建/商店應用和長連線/Webhook 兩種通訊模式。 + ja_JP: Lark アダプター(EBA アーキテクチャ)、カスタム/ストアアプリと WebSocket/Webhook モードをサポートします。 + icon: lark.svg + +spec: + categories: + - popular + - china + - global + help_links: + zh: https://link.langbot.app/zh/platforms/lark + en: https://link.langbot.app/en/platforms/lark + ja: https://link.langbot.app/ja/platforms/lark + config: + - name: app_id + label: + en_US: App ID + zh_Hans: 应用ID + zh_Hant: 應用ID + ja_JP: アプリ ID + type: string + required: true + default: "" + - name: app_secret + label: + en_US: App Secret + zh_Hans: 应用密钥 + zh_Hant: 應用密鑰 + ja_JP: アプリシークレット + type: string + required: true + default: "" + - name: bot_name + label: + en_US: Bot Name + zh_Hans: 机器人名称 + zh_Hant: 機器人名稱 + ja_JP: ボット名 + description: + en_US: Must match the Lark bot name so group mentions can be recognized. + zh_Hans: 必须与飞书机器人名称一致,否则机器人将无法在群内正常识别 @。 + zh_Hant: 必須與飛書機器人名稱一致,否則機器人將無法在群組內正常識別 @。 + ja_JP: グループメンションを認識するには Lark のボット名と一致する必要があります。 + type: string + required: true + default: "" + - name: enable-webhook + label: + en_US: Enable Webhook Mode + zh_Hans: 启用 Webhook 模式 + zh_Hant: 啟用 Webhook 模式 + ja_JP: Webhook モードを有効化 + description: + en_US: Enable request URL callback mode. Disable it to use WebSocket long connection mode. + zh_Hans: 启用 Request URL 回调模式。关闭时使用 WebSocket 长连接模式。 + zh_Hant: 啟用 Request URL 回調模式。關閉時使用 WebSocket 長連線模式。 + ja_JP: Request URL コールバックモードを有効化します。無効時は WebSocket 長期接続を使用します。 + type: boolean + required: true + default: false + - name: webhook_url + label: + en_US: Webhook Callback URL + zh_Hans: Webhook 回调地址 + zh_Hant: Webhook 回調地址 + ja_JP: Webhook コールバック URL + description: + en_US: Copy this URL to the Lark app event subscription request URL. + zh_Hans: 复制此地址并粘贴到飞书应用事件订阅的 Request URL 中。 + zh_Hant: 複製此地址並貼到飛書應用事件訂閱的 Request URL 中。 + ja_JP: この URL を Lark アプリのイベント購読 Request URL に貼り付けてください。 + type: webhook-url + required: false + default: "" + show_if: + field: enable-webhook + operator: eq + value: true + - name: encrypt-key + label: + en_US: Encrypt Key + zh_Hans: 加密密钥 + zh_Hant: 加密密鑰 + ja_JP: 暗号化キー + type: string + required: false + default: "" + show_if: + field: enable-webhook + operator: eq + value: true + - name: enable-stream-reply + label: + en_US: Enable Stream Reply Mode + zh_Hans: 启用飞书流式回复模式 + zh_Hant: 啟用飛書串流回覆模式 + ja_JP: ストリーミング返信モードを有効化 + description: + en_US: If enabled, replies are rendered through an updating Lark card. + zh_Hans: 如果启用,将使用可更新的飞书卡片进行流式回复。 + zh_Hant: 如果啟用,將使用可更新的飛書卡片進行串流回覆。 + ja_JP: 有効にすると、更新可能な Lark カードでストリーミング返信します。 + type: boolean + required: true + default: false + - name: app_type + label: + en_US: App Type + zh_Hans: 应用类型 + zh_Hant: 應用類型 + ja_JP: アプリタイプ + type: select + options: + - name: self + label: + en_US: Self-built Application + zh_Hans: 自建应用 + zh_Hant: 自建應用 + ja_JP: カスタムアプリ + - name: isv + label: + en_US: Store Application + zh_Hans: 商店应用 + zh_Hant: 商店應用 + ja_JP: ストアアプリ + required: false + default: self + - name: bot_added_welcome + label: + en_US: Bot Welcome Message + zh_Hans: 机器人进群欢迎语 + zh_Hant: 機器人進群歡迎語 + ja_JP: ボット参加時のウェルカムメッセージ + type: text + required: false + default: "" + + supported_events: + - message.received + - bot.invited_to_group + - platform.specific + + supported_apis: + required: + - send_message + - reply_message + optional: + - get_message + - get_group_info + - get_group_member_info + - get_user_info + - get_file_url + - call_platform_api + + platform_specific_apis: + - action: check_tenant_access_token + description: { en_US: "Check whether the tenant access token can be obtained", zh_Hans: "检查 tenant access token 是否可获取" } + - action: refresh_app_access_token + description: { en_US: "Refresh store-app app access token", zh_Hans: "刷新商店应用 app access token" } + - action: refresh_tenant_access_token + description: { en_US: "Refresh store-app tenant access token", zh_Hans: "刷新商店应用 tenant access token" } + - action: get_chat + description: { en_US: "Get Lark chat metadata", zh_Hans: "获取飞书会话信息" } + - action: get_message + description: { en_US: "Get a Lark message", zh_Hans: "获取飞书消息" } + - action: get_message_resource + description: { en_US: "Download message image/file resource", zh_Hans: "下载消息图片/文件资源" } + +execution: + python: + path: ./adapter.py + attr: LarkAdapter diff --git a/src/langbot/pkg/platform/adapters/lark/message_converter.py b/src/langbot/pkg/platform/adapters/lark/message_converter.py new file mode 100644 index 00000000..63dc64f0 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/message_converter.py @@ -0,0 +1,405 @@ +from __future__ import annotations + +import base64 +import datetime +import json +import mimetypes +import os +import re +import tempfile +import traceback + +import lark_oapi +from lark_oapi.api.im.v1 import ( + CreateFileRequest, + CreateFileRequestBody, + CreateImageRequest, + CreateImageRequestBody, + EventMessage, + GetMessageResourceRequest, + GetMessageResourceResponse, +) + +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +from langbot.pkg.utils import httpclient +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter): + @staticmethod + async def upload_image_to_lark(msg: platform_message.Image, api_client: lark_oapi.Client) -> str | None: + image_bytes = await LarkMessageConverter._get_component_bytes(msg) + if image_bytes is None: + return None + + temp_file_path = '' + try: + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(image_bytes) + temp_file.flush() + temp_file_path = temp_file.name + + request = ( + CreateImageRequest.builder() + .request_body( + CreateImageRequestBody.builder().image_type('message').image(open(temp_file_path, 'rb')).build() + ) + .build() + ) + response = await api_client.im.v1.image.acreate(request) + if not response.success(): + return None + return response.data.image_key + except Exception: + traceback.print_exc() + return None + finally: + if temp_file_path: + try: + os.unlink(temp_file_path) + except FileNotFoundError: + pass + + @staticmethod + async def upload_file_to_lark( + file_bytes: bytes, + api_client: lark_oapi.Client, + file_type: str, + file_name: str = 'file', + duration: int | None = None, + ) -> str | None: + temp_file_path = '' + try: + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(file_bytes) + temp_file.flush() + temp_file_path = temp_file.name + + body_builder = ( + CreateFileRequestBody.builder() + .file_type(file_type) + .file_name(file_name) + .file(open(temp_file_path, 'rb')) + ) + if duration is not None: + body_builder = body_builder.duration(duration) + + request = CreateFileRequest.builder().request_body(body_builder.build()).build() + response = await api_client.im.v1.file.acreate(request) + if not response.success(): + return None + return response.data.file_key + except Exception: + traceback.print_exc() + return None + finally: + if temp_file_path: + try: + os.unlink(temp_file_path) + except FileNotFoundError: + pass + + @staticmethod + async def _get_component_bytes( + msg: platform_message.Image | platform_message.Voice | platform_message.File, + ) -> bytes | None: + if getattr(msg, 'base64', None): + try: + base64_data = msg.base64 + if ',' in base64_data: + base64_data = base64_data.split(',', 1)[1] + return base64.b64decode(base64_data) + except Exception: + return None + if getattr(msg, 'url', None): + try: + if str(msg.url).startswith('file://'): + with open(str(msg.url)[7:], 'rb') as f: + return f.read() + session = httpclient.get_session() + async with session.get(msg.url) as response: + if response.status == 200: + return await response.read() + except Exception: + return None + if getattr(msg, 'path', None): + try: + with open(msg.path, 'rb') as f: + return f.read() + except Exception: + return None + return None + + @staticmethod + def _lark_file_type(file_name: str) -> str: + ext = os.path.splitext(file_name)[1].lstrip('.').lower() + return { + 'opus': 'opus', + 'mp4': 'mp4', + 'pdf': 'pdf', + 'doc': 'doc', + 'docx': 'doc', + 'xls': 'xls', + 'xlsx': 'xls', + 'ppt': 'ppt', + 'pptx': 'ppt', + }.get(ext, 'stream') + + @staticmethod + async def yiri2target( + message_chain: platform_message.MessageChain, + api_client: lark_oapi.Client, + ) -> tuple[list[list[dict]], list[dict]]: + message_elements: list[list[dict]] = [] + media_items: list[dict] = [] + pending_paragraph: list[dict] = [] + markdown_image_pattern = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)') + + async def process_text_with_images(text: str) -> tuple[str, list[str]]: + matches = list(markdown_image_pattern.finditer(text)) + if not matches: + return text, [] + cleaned_text = text + extracted_urls: list[str] = [] + for match in reversed(matches): + extracted_urls.insert(0, match.group(2)) + cleaned_text = cleaned_text[: match.start()] + cleaned_text[match.end() :] + cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text).strip() + return cleaned_text, extracted_urls + + for msg in message_chain: + if isinstance(msg, platform_message.Source): + continue + if isinstance(msg, platform_message.Plain): + cleaned_text, extracted_urls = await process_text_with_images(msg.text) + if cleaned_text: + segments = re.split(r'\n\s*\n', cleaned_text) + for i, segment in enumerate(segments): + segment = segment.strip() + if not segment: + continue + if i > 0 and pending_paragraph: + message_elements.append(pending_paragraph) + pending_paragraph = [] + pending_paragraph.append({'tag': 'md', 'text': segment}) + for url in extracted_urls: + image_key = await LarkMessageConverter.upload_image_to_lark( + platform_message.Image(url=url), api_client + ) + if image_key: + media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}}) + elif isinstance(msg, platform_message.At): + pending_paragraph.append({'tag': 'at', 'user_id': str(msg.target), 'style': []}) + elif isinstance(msg, platform_message.AtAll): + pending_paragraph.append({'tag': 'at', 'user_id': 'all', 'style': []}) + elif isinstance(msg, platform_message.Image): + image_key = await LarkMessageConverter.upload_image_to_lark(msg, api_client) + if image_key: + media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}}) + elif isinstance(msg, platform_message.Voice): + data = await LarkMessageConverter._get_component_bytes(msg) + if data: + duration = int(msg.length * 1000) if msg.length else None + file_key = await LarkMessageConverter.upload_file_to_lark( + data, api_client, file_type='opus', file_name='voice.opus', duration=duration + ) + if file_key: + media_items.append({'msg_type': 'audio', 'content': {'file_key': file_key}}) + elif isinstance(msg, platform_message.File): + data = await LarkMessageConverter._get_component_bytes(msg) + if data: + file_name = msg.name or 'file' + file_key = await LarkMessageConverter.upload_file_to_lark( + data, + api_client, + file_type=LarkMessageConverter._lark_file_type(file_name), + file_name=file_name, + ) + if file_key: + media_items.append({'msg_type': 'file', 'content': {'file_key': file_key}}) + elif isinstance(msg, platform_message.Quote): + if msg.id: + pending_paragraph.append({'tag': 'md', 'text': f'[引用消息 {msg.id}] '}) + if msg.origin: + sub_elements, sub_media = await LarkMessageConverter.yiri2target(msg.origin, api_client) + message_elements.extend(sub_elements) + media_items.extend(sub_media) + elif isinstance(msg, platform_message.Forward): + for node in msg.node_list: + if node.sender_name or node.sender_id: + pending_paragraph.append({'tag': 'md', 'text': f'\n[{node.sender_name or node.sender_id}] '}) + sub_elements, sub_media = await LarkMessageConverter.yiri2target(node.message_chain, api_client) + message_elements.extend(sub_elements) + media_items.extend(sub_media) + + if pending_paragraph: + message_elements.append(pending_paragraph) + + return message_elements, media_items + + @staticmethod + async def target2yiri( + message: EventMessage, + api_client: lark_oapi.Client, + ) -> platform_message.MessageChain: + message_content = json.loads(message.content or '{}') + create_time = LarkMessageConverter._message_time(message) + components: list[platform_message.MessageComponent] = [ + platform_message.Source(id=message.message_id, time=create_time) + ] + + normalized = LarkMessageConverter._normalize_inbound_content(message, message_content) + for ele in normalized: + tag = ele.get('tag') + if tag in {'text', 'md'}: + text = ele.get('text') or '' + if text: + components.append(platform_message.Plain(text=text)) + elif tag == 'at': + user_id = ele.get('user_id') or ele.get('user_name') or '' + display = ele.get('user_name') or user_id + if user_id == 'all': + components.append(platform_message.AtAll()) + else: + components.append(platform_message.At(target=user_id, display=display)) + elif tag == 'img': + image_key = ele.get('image_key') or '' + image = await LarkMessageConverter._download_resource( + api_client, message.message_id, image_key, 'image' + ) + components.append(platform_message.Image(image_id=image_key, **image)) + elif tag == 'audio': + file_key = ele.get('file_key') or '' + audio = await LarkMessageConverter._download_resource(api_client, message.message_id, file_key, 'file') + components.append( + platform_message.Voice( + voice_id=file_key, + length=(ele.get('duration', 0) // 1000) if ele.get('duration') else None, + **audio, + ) + ) + elif tag == 'file': + file_key = ele.get('file_key') or '' + file_name = ele.get('file_name') or 'file' + file_data = await LarkMessageConverter._download_resource( + api_client, message.message_id, file_key, 'file' + ) + components.append( + platform_message.File( + id=file_key, + name=file_name, + size=file_data.pop('size', 0), + **file_data, + ) + ) + + return platform_message.MessageChain(components) + + @staticmethod + def _normalize_inbound_content(message: EventMessage, content: dict) -> list[dict]: + if message.message_type == 'text': + text = content.get('text', '') + return LarkMessageConverter._split_text_mentions(text, getattr(message, 'mentions', []) or []) + if message.message_type == 'post': + post_content = content.get('content', []) + flattened: list[dict] = [] + for ele in post_content: + if isinstance(ele, dict): + flattened.append(ele) + elif isinstance(ele, list): + flattened.extend(item for item in ele if isinstance(item, dict)) + return flattened + if message.message_type == 'image': + return [{'tag': 'img', 'image_key': content.get('image_key', ''), 'style': []}] + if message.message_type == 'file': + return [ + { + 'tag': 'file', + 'file_key': content.get('file_key', ''), + 'file_name': content.get('file_name', 'file'), + } + ] + if message.message_type == 'audio': + return [ + { + 'tag': 'audio', + 'file_key': content.get('file_key', ''), + 'duration': content.get('duration', 0), + } + ] + return [{'tag': 'text', 'text': json.dumps(content, ensure_ascii=False), 'style': []}] + + @staticmethod + def _split_text_mentions(text: str, mentions: list) -> list[dict]: + if not text: + return [] + mention_by_key = {getattr(m, 'key', ''): m for m in mentions} + pattern = re.compile(r'@_user_\d+') + result: list[dict] = [] + pos = 0 + for match in pattern.finditer(text): + if match.start() > pos: + result.append({'tag': 'text', 'text': text[pos : match.start()], 'style': []}) + mention = mention_by_key.get(match.group(0)) + if mention: + result.append( + { + 'tag': 'at', + 'user_id': getattr(mention, 'id', None) + or getattr(mention, 'open_id', None) + or getattr(mention, 'user_id', None) + or getattr(mention, 'key', match.group(0)), + 'user_name': getattr(mention, 'name', ''), + 'style': [], + } + ) + else: + result.append({'tag': 'text', 'text': match.group(0), 'style': []}) + pos = match.end() + if pos < len(text): + result.append({'tag': 'text', 'text': text[pos:], 'style': []}) + return result + + @staticmethod + async def _download_resource( + api_client: lark_oapi.Client, + message_id: str, + file_key: str, + resource_type: str, + ) -> dict: + if not file_key: + return {} + request = ( + GetMessageResourceRequest.builder().message_id(message_id).file_key(file_key).type(resource_type).build() + ) + response: GetMessageResourceResponse = await api_client.im.v1.message_resource.aget(request) + if not response.success(): + return {} + data = response.file.read() + content_type = response.raw.headers.get('content-type', 'application/octet-stream') + base64_data = base64.b64encode(data).decode() + ext = mimetypes.guess_extension(content_type.split(';')[0].strip()) or '.bin' + temp_path = os.path.join(tempfile.gettempdir(), f'lark_{file_key}{ext}') + with open(temp_path, 'wb') as f: + f.write(data) + return { + 'url': f'file://{temp_path}', + 'path': temp_path, + 'base64': f'data:{content_type};base64,{base64_data}', + 'size': len(data), + } + + @staticmethod + def _message_time(message: EventMessage) -> datetime.datetime: + value = getattr(message, 'create_time', None) + if isinstance(value, datetime.datetime): + return value + if isinstance(value, (int, float, str)): + try: + timestamp = float(value) + if timestamp > 10_000_000_000: + timestamp = timestamp / 1000 + return datetime.datetime.fromtimestamp(timestamp) + except ValueError: + pass + return datetime.datetime.now() diff --git a/src/langbot/pkg/platform/adapters/lark/platform_api.py b/src/langbot/pkg/platform/adapters/lark/platform_api.py new file mode 100644 index 00000000..e739caff --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/platform_api.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import json + +from lark_oapi.api.im.v1 import GetChatRequest, GetMessageRequest, GetMessageResourceRequest + + +async def check_tenant_access_token(adapter, params: dict) -> dict: + tenant_key = params.get('tenant_key') or getattr(adapter, 'lark_tenant_key', None) + token = adapter.get_tenant_access_token(tenant_key) + return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'} + + +async def refresh_app_access_token(adapter, params: dict) -> dict: + adapter.app_access_token = None + adapter.app_access_token_expire_at = None + token = adapter.get_app_access_token() + return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'} + + +async def refresh_tenant_access_token(adapter, params: dict) -> dict: + tenant_key = params.get('tenant_key') or getattr(adapter, 'lark_tenant_key', None) + if tenant_key: + adapter.tenant_access_tokens.pop(tenant_key, None) + token = adapter.get_tenant_access_token(tenant_key) + return {'ok': bool(token) or adapter.config.get('app_type', 'self') != 'isv'} + + +async def get_chat(adapter, params: dict) -> dict: + request = GetChatRequest.builder().chat_id(params['chat_id']).build() + response = await adapter.api_client.im.v1.chat.aget(request, adapter.request_option(params.get('tenant_key'))) + return _response_to_dict(response) + + +async def get_message(adapter, params: dict) -> dict: + request = GetMessageRequest.builder().message_id(params['message_id']).build() + response = await adapter.api_client.im.v1.message.aget(request, adapter.request_option(params.get('tenant_key'))) + return _response_to_dict(response) + + +async def get_message_resource(adapter, params: dict) -> dict: + request = ( + GetMessageResourceRequest.builder() + .message_id(params['message_id']) + .file_key(params['file_key']) + .type(params.get('type', 'file')) + .build() + ) + response = await adapter.api_client.im.v1.message_resource.aget( + request, adapter.request_option(params.get('tenant_key')) + ) + if not response.success(): + return _response_to_dict(response) + content_type = response.raw.headers.get('content-type', 'application/octet-stream') + data = response.file.read() + return {'ok': True, 'content_type': content_type, 'size': len(data)} + + +def _response_to_dict(response) -> dict: + if not response.success(): + return {'ok': False, 'code': response.code, 'msg': response.msg, 'log_id': response.get_log_id()} + data = getattr(response, 'data', None) + if hasattr(data, 'to_json'): + data = data.to_json() + return {'ok': True, 'data': _jsonable(data)} + + +def _jsonable(value): + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, bytes): + return {'bytes': len(value)} + if isinstance(value, (list, tuple, set)): + return [_jsonable(item) for item in value] + if isinstance(value, dict): + return {str(key): _jsonable(item) for key, item in value.items()} + if isinstance(value, str): + return value + try: + return json.loads(value) + except Exception: + pass + raw = getattr(value, '__dict__', None) + if raw: + return {key: _jsonable(item) for key, item in raw.items() if not key.startswith('_')} + return str(value) + + +PLATFORM_API_MAP = { + 'check_tenant_access_token': check_tenant_access_token, + 'refresh_app_access_token': refresh_app_access_token, + 'refresh_tenant_access_token': refresh_tenant_access_token, + 'get_chat': get_chat, + 'get_message': get_message, + 'get_message_resource': get_message_resource, +} diff --git a/src/langbot/pkg/platform/adapters/lark/types.py b/src/langbot/pkg/platform/adapters/lark/types.py new file mode 100644 index 00000000..bce1e876 --- /dev/null +++ b/src/langbot/pkg/platform/adapters/lark/types.py @@ -0,0 +1,3 @@ +from __future__ import annotations + +ADAPTER_NAME = 'lark-eba' diff --git a/tests/unit_tests/platform/test_lark_eba_adapter.py b/tests/unit_tests/platform/test_lark_eba_adapter.py new file mode 100644 index 00000000..10b5ed2d --- /dev/null +++ b/tests/unit_tests/platform/test_lark_eba_adapter.py @@ -0,0 +1,326 @@ +from __future__ import annotations + +import pathlib +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest +import yaml + +from langbot.pkg.platform.adapters.lark.adapter import LarkAdapter +from langbot.pkg.platform.adapters.lark.event_converter import LarkEventConverter +from langbot.pkg.platform.adapters.lark.message_converter import LarkMessageConverter +from langbot.pkg.platform.adapters.lark.platform_api import PLATFORM_API_MAP +from langbot_plugin.api.definition.abstract.platform.event_logger import AbstractEventLogger +from langbot_plugin.api.entities.builtin.platform import entities as platform_entities +from langbot_plugin.api.entities.builtin.platform import events as platform_events +from langbot_plugin.api.entities.builtin.platform import message as platform_message + + +class DummyLogger(AbstractEventLogger): + async def info(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def debug(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def warning(self, text, images=None, message_session_id=None, no_throw=True): + pass + + async def error(self, text, images=None, message_session_id=None, no_throw=True): + pass + + +class DummyResponse: + def __init__(self, data=None, ok=True): + self.data = data or SimpleNamespace(message_id='reply-msg') + self.code = 0 if ok else 1 + self.msg = 'ok' if ok else 'failed' + self.raw = SimpleNamespace(content='{}', headers={'content-type': 'text/plain'}) + self.file = SimpleNamespace(read=lambda: b'data') + self._ok = ok + + def success(self): + return self._ok + + def get_log_id(self): + return 'log-id' + + +def message_item(message_id='msg-remote'): + return SimpleNamespace( + message_id=message_id, + msg_type='text', + create_time=1_714_000_000_000, + body=SimpleNamespace(content='{"text":"remote"}'), + mentions=[], + chat_id='chat-1', + ) + + +class DummyAPIClient: + def __init__(self): + self.im = SimpleNamespace( + v1=SimpleNamespace( + message=SimpleNamespace( + acreate=AsyncMock(return_value=DummyResponse()), + areply=AsyncMock(return_value=DummyResponse()), + aget=AsyncMock(return_value=DummyResponse(SimpleNamespace(items=[]))), + ), + chat=SimpleNamespace( + aget=AsyncMock(return_value=DummyResponse(SimpleNamespace(chat_id='chat-1', name='LangBot Team'))) + ), + image=SimpleNamespace( + acreate=AsyncMock(return_value=DummyResponse(SimpleNamespace(image_key='img-key'))) + ), + file=SimpleNamespace( + acreate=AsyncMock(return_value=DummyResponse(SimpleNamespace(file_key='file-key'))) + ), + message_resource=SimpleNamespace(aget=AsyncMock(return_value=DummyResponse())), + ) + ) + self.auth = SimpleNamespace( + v3=SimpleNamespace( + app_ticket=SimpleNamespace(resend=AsyncMock(return_value=DummyResponse())), + app_access_token=SimpleNamespace(create=AsyncMock(return_value=DummyResponse())), + tenant_access_token=SimpleNamespace(create=AsyncMock(return_value=DummyResponse())), + ) + ) + self.cardkit = SimpleNamespace( + v1=SimpleNamespace( + card=SimpleNamespace(create=AsyncMock(return_value=DummyResponse(SimpleNamespace(card_id='card-id')))), + card_element=SimpleNamespace(content=AsyncMock(return_value=DummyResponse())), + ) + ) + + +class DummyWSClient: + def __init__(self): + self._auto_reconnect = True + self._connect = AsyncMock() + self._disconnect = AsyncMock() + self._reconnect = AsyncMock() + + +def manifest() -> dict: + path = ( + pathlib.Path(__file__).parents[3] + / 'src' + / 'langbot' + / 'pkg' + / 'platform' + / 'adapters' + / 'lark' + / 'manifest.yaml' + ) + return yaml.safe_load(path.read_text()) + + +def make_adapter(config: dict | None = None) -> LarkAdapter: + adapter = LarkAdapter( + { + 'app_id': 'cli_xxx', + 'app_secret': 'secret', + 'bot_name': 'LangBotDev', + 'enable-webhook': False, + 'enable-stream-reply': False, + 'app_type': 'self', + **(config or {}), + }, + DummyLogger(), + ) + adapter.api_client = DummyAPIClient() + adapter.bot = DummyWSClient() + return adapter + + +def lark_event(chat_type='group', message_type='text', content=None): + message = SimpleNamespace( + message_id='msg-1', + message_type=message_type, + content=content or '{"text":"hello @_user_1"}', + create_time=1_714_000_000_000, + mentions=[SimpleNamespace(key='@_user_1', id='user-mention', name='Alice')], + chat_type=chat_type, + chat_id='chat-1', + parent_id=None, + thread_id=None, + ) + sender = SimpleNamespace(sender_id=SimpleNamespace(open_id='user-1', union_id='Alice Union')) + header = SimpleNamespace(tenant_key='tenant-1') + return SimpleNamespace(event=SimpleNamespace(message=message, sender=sender), header=header, schema='2.0') + + +def test_lark_supported_events_match_manifest(): + assert make_adapter().get_supported_events() == manifest()['spec']['supported_events'] + + +def test_lark_supported_apis_match_manifest(): + supported_apis = make_adapter().get_supported_apis() + manifest_apis = manifest()['spec']['supported_apis'] + + assert supported_apis == manifest_apis['required'] + manifest_apis['optional'] + + +def test_lark_platform_api_map_matches_manifest(): + manifest_actions = {item['action'] for item in manifest()['spec']['platform_specific_apis']} + + assert set(PLATFORM_API_MAP) == manifest_actions + + +@pytest.mark.asyncio +async def test_lark_message_converter_maps_outbound_components(): + with ( + patch.object(LarkMessageConverter, 'upload_image_to_lark', AsyncMock(return_value='img-key')), + patch.object(LarkMessageConverter, 'upload_file_to_lark', AsyncMock(return_value='file-key')), + patch.object(LarkMessageConverter, '_get_component_bytes', AsyncMock(return_value=b'data')), + ): + text_elements, media_items = await LarkMessageConverter.yiri2target( + platform_message.MessageChain( + [ + platform_message.Plain(text='hello\n\nsecond'), + platform_message.At(target='ou_user', display='Alice'), + platform_message.AtAll(), + platform_message.Image(base64='ZGF0YQ=='), + platform_message.Voice(base64='ZGF0YQ==', length=2), + platform_message.File(name='doc.txt', base64='ZGF0YQ=='), + platform_message.Quote( + id='origin', origin=platform_message.MessageChain([platform_message.Plain(text='quoted')]) + ), + platform_message.Forward( + node_list=[ + platform_message.ForwardMessageNode( + sender_id='user-2', + sender_name='Bob', + message_chain=platform_message.MessageChain([platform_message.Plain(text='node')]), + ) + ] + ), + ] + ), + DummyAPIClient(), + ) + + assert any(ele.get('text') == 'hello' for paragraph in text_elements for ele in paragraph) + assert any(ele.get('user_id') == 'ou_user' for paragraph in text_elements for ele in paragraph) + assert any(ele.get('user_id') == 'all' for paragraph in text_elements for ele in paragraph) + assert {'msg_type': 'image', 'content': {'image_key': 'img-key'}} in media_items + assert {'msg_type': 'audio', 'content': {'file_key': 'file-key'}} in media_items + assert {'msg_type': 'file', 'content': {'file_key': 'file-key'}} in media_items + + +@pytest.mark.asyncio +async def test_lark_message_converter_maps_inbound_components(): + with patch.object( + LarkMessageConverter, + '_download_resource', + AsyncMock( + return_value={ + 'url': 'file:///tmp/file', + 'path': '/tmp/file', + 'base64': 'data:text/plain;base64,ZGF0YQ==', + 'size': 4, + } + ), + ): + text_chain = await LarkMessageConverter.target2yiri(lark_event().event.message, DummyAPIClient()) + image_chain = await LarkMessageConverter.target2yiri( + lark_event(message_type='image', content='{"image_key":"img-key"}').event.message, DummyAPIClient() + ) + file_chain = await LarkMessageConverter.target2yiri( + lark_event(message_type='file', content='{"file_key":"file-key","file_name":"doc.txt"}').event.message, + DummyAPIClient(), + ) + + assert isinstance(text_chain[0], platform_message.Source) + assert isinstance(text_chain[1], platform_message.Plain) + assert isinstance(text_chain[2], platform_message.At) + assert text_chain[2].target == 'user-mention' + assert isinstance(image_chain[1], platform_message.Image) + assert image_chain[1].image_id == 'img-key' + assert isinstance(file_chain[1], platform_message.File) + assert file_chain[1].name == 'doc.txt' + + +@pytest.mark.asyncio +async def test_lark_event_converter_maps_group_and_private_message(): + group_event = await LarkEventConverter.target2yiri(lark_event('group'), DummyAPIClient()) + + assert isinstance(group_event, platform_events.MessageReceivedEvent) + assert group_event.adapter_name == 'lark-eba' + assert group_event.chat_type == platform_entities.ChatType.GROUP + assert group_event.chat_id == 'chat-1' + assert group_event.group.id == 'chat-1' + assert group_event.sender.id == 'user-1' + + private_event = await LarkEventConverter.target2yiri( + lark_event('p2p', content='{"text":"hello"}'), + DummyAPIClient(), + ) + assert private_event.chat_type == platform_entities.ChatType.PRIVATE + assert private_event.chat_id == 'user-1' + assert private_event.group is None + + +@pytest.mark.asyncio +async def test_lark_adapter_dispatches_and_caches_message_event(): + adapter = make_adapter() + calls: list[platform_events.Event] = [] + + async def listener(event, adapter): + calls.append(event) + + adapter.register_listener(platform_events.MessageReceivedEvent, listener) + await adapter._handle_message_event(lark_event()) + + assert len(calls) == 1 + received = calls[0] + assert isinstance(received, platform_events.MessageReceivedEvent) + assert await adapter.get_message('group', 'chat-1', 'msg-1') == received + assert (await adapter.get_group_info('chat-1')).name == '' + assert (await adapter.get_user_info('user-1')).nickname == 'Alice Union' + + +@pytest.mark.asyncio +async def test_lark_get_message_fetches_uncached_message(): + adapter = make_adapter() + adapter.api_client.im.v1.message.aget = AsyncMock( + return_value=DummyResponse(SimpleNamespace(items=[message_item('msg-remote')])) + ) + + event = await adapter.get_message('group', 'chat-1', 'msg-remote') + + assert event.adapter_name == 'lark-eba' + assert event.message_id == 'msg-remote' + assert event.chat_type == platform_entities.ChatType.GROUP + assert isinstance(event.message_chain[1], platform_message.Plain) + assert event.message_chain[1].text == 'remote' + + +@pytest.mark.asyncio +async def test_lark_send_reply_platform_api_and_modes(): + adapter = make_adapter() + message = platform_message.MessageChain([platform_message.Plain(text='hello')]) + + sent = await adapter.send_message('group', 'chat-1', message) + assert sent.raw['message_ids'] == ['reply-msg'] + adapter.api_client.im.v1.message.acreate.assert_awaited_once() + + source_event = await LarkEventConverter.target2yiri(lark_event(), adapter.api_client) + replied = await adapter.reply_message(source_event, message) + assert replied.raw['message_ids'] == ['reply-msg'] + adapter.api_client.im.v1.message.areply.assert_awaited_once() + + assert await adapter.call_platform_api('check_tenant_access_token', {}) == {'ok': True} + + await adapter.run_async() + adapter.bot._connect.assert_awaited_once() + + webhook_adapter = make_adapter({'enable-webhook': True}) + task = asyncio.create_task(webhook_adapter.run_async()) + await asyncio.sleep(0) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + webhook_adapter.bot._connect.assert_not_awaited()