From 144bec371c319b2c5f06bc987b29707a233fb90d Mon Sep 17 00:00:00 2001 From: Junyan Chin Date: Mon, 22 Jun 2026 13:38:00 +0800 Subject: [PATCH] feat(platform): standalone HTTP Bot adapter (server-to-server) (#2274) * docs(platform): add HTTP Bot adapter design (RFC) Standalone server-to-server HTTP adapter for driving a pipeline from external systems (LangBot Space ticketing et al). Inbound via the existing unified webhook route; outbound via signed callback POSTs. Preserves pipeline-native N->1 aggregation and 1->M multi-reply without a long-lived WebSocket. No core changes required (router/aggregator/pipeline untouched). * feat(platform): add standalone HTTP Bot adapter A first-class, vendor-neutral message-platform adapter (http_bot) for server-to-server integrations (LangBot Space ticketing et al). Drives a pipeline over plain HTTP with no long-lived connection: - Inbound: signed POST to the existing unified webhook route /bots/, carrying a caller-defined session_id mapped to the LangBot launcher id via get_launcher_id -> per-session isolation. Preserves pipeline-native N->1 aggregation for free. - Outbound: each reply_message / reply_message_chunk becomes one signed callback POST to the config-only callback_url, delivered in per-session sequence order with retry/backoff -> 1->M multi-reply. - Sub-paths: /reset (drop a session) and /sync (block for the collapsed reply). - Auth: symmetric HMAC-SHA256 both directions (timestamp + replay window), no JWT/Turnstile, no socket. Decisions: callback URL is config-only (SSRF closed); reset + sync shipped; Python + TS reference clients shipped (signing verified byte-identical 3-way). No core changes: the unified webhook router, aggregator, query pool and pipeline are untouched. Adapter is auto-discovered from platform/sources/. Adds: src/langbot/pkg/platform/sources/http_bot.{py,yaml,svg} src/langbot/pkg/platform/sources/http_bot_signing.py docs/platforms/http-bot.md, docs/http-bot-openapi.json examples/http-bot/{client.py,client.ts,README.md} Updates docs/HTTP_BOT_ADAPTER_DESIGN.md (status: implemented). * docs(examples): add interactive HTTP Bot playground (browser debug console) A single-file aiohttp web app (examples/http-bot/playground.py) that lets you chat with a RUNNING http_bot bot from the browser and watch the protocol live: signed inbound POST -> 202 ack -> 1->M signed callbacks streamed back via SSE, with a debug panel showing the signature, HTTP status, and per-callback sequence/verification. Light LangBot-styled UI. On startup it reads the API key + http_bot bot from data/langbot.db and points the bot's callback_url + secrets back at itself via the LangBot API (live reload, no restart). README updated with a playground section. * docs(examples): add Chinese README for http-bot reference clients * style(platform): use code icon for http_bot adapter logo * docs(examples): point http-bot guide links to docs.langbot.app * style(platform): make http_bot icon a transparent monochrome so WebUI tints it like other adapters * Revert to colorful badge for http_bot icon (WebUI renders it as-is) --- docs/HTTP_BOT_ADAPTER_DESIGN.md | 575 ++++++++++++++++++ docs/http-bot-openapi.json | 198 ++++++ docs/platforms/http-bot.md | 256 ++++++++ examples/http-bot/README.md | 75 +++ examples/http-bot/README.zh.md | 71 +++ examples/http-bot/client.py | 167 +++++ examples/http-bot/client.ts | 123 ++++ examples/http-bot/playground.py | 349 +++++++++++ src/langbot/pkg/platform/sources/http_bot.py | 509 ++++++++++++++++ src/langbot/pkg/platform/sources/http_bot.svg | 9 + .../pkg/platform/sources/http_bot.yaml | 153 +++++ .../pkg/platform/sources/http_bot_signing.py | 95 +++ 12 files changed, 2580 insertions(+) create mode 100644 docs/HTTP_BOT_ADAPTER_DESIGN.md create mode 100644 docs/http-bot-openapi.json create mode 100644 docs/platforms/http-bot.md create mode 100644 examples/http-bot/README.md create mode 100644 examples/http-bot/README.zh.md create mode 100644 examples/http-bot/client.py create mode 100644 examples/http-bot/client.ts create mode 100644 examples/http-bot/playground.py create mode 100644 src/langbot/pkg/platform/sources/http_bot.py create mode 100644 src/langbot/pkg/platform/sources/http_bot.svg create mode 100644 src/langbot/pkg/platform/sources/http_bot.yaml create mode 100644 src/langbot/pkg/platform/sources/http_bot_signing.py diff --git a/docs/HTTP_BOT_ADAPTER_DESIGN.md b/docs/HTTP_BOT_ADAPTER_DESIGN.md new file mode 100644 index 000000000..31e9a4861 --- /dev/null +++ b/docs/HTTP_BOT_ADAPTER_DESIGN.md @@ -0,0 +1,575 @@ +# HTTP Bot Adapter — Design Document + +> Status: **Implemented** · Branch: `feat/http-bot-adapter` · Author: LangBot core +> +> A first-class, **standalone** message-platform adapter (`http_bot`) that lets +> any external system (e.g. LangBot Space ticketing, an internal back-office, a +> CRM, a custom web app) talk to a LangBot pipeline over plain HTTP — **inbound** +> by POSTing messages in, **outbound** by receiving replies on a callback URL — +> with full support for the pipeline's native N→1 aggregation and 1→M +> multi-reply semantics, and **without** holding a long-lived WebSocket +> connection. +> +> **Shipped in this branch:** +> - `src/langbot/pkg/platform/sources/http_bot.yaml` — adapter manifest (auto-discovered) +> - `src/langbot/pkg/platform/sources/http_bot.py` — `HttpBotAdapter` +> - `src/langbot/pkg/platform/sources/http_bot_signing.py` — HMAC helpers +> - `src/langbot/pkg/platform/sources/http_bot.svg` — icon +> - `docs/platforms/http-bot.md` — integration guide +> - `docs/http-bot-openapi.json` — machine-readable contract +> - `examples/http-bot/` — Python + TypeScript reference clients +> +> **Final decisions (resolving the original open questions):** +> 1. Callback URL is **config-only** — never accepted per-message (SSRF closed). +> 2. **Session reset is provided** — `POST /bots//reset` keyed by `session_id`. +> 3. Reference **clients are provided** — `examples/http-bot/client.py` + `client.ts`. +> 4. **Sync convenience mode is included** — `POST /bots//sync` (opt-in, lossy). + +--- + +## 1. Background & Motivation + +### 1.1 The concrete need + +LangBot Space wants to use a LangBot pipeline as the brain for **ticket +handling**. The integration is **server-to-server**: Space's backend pushes a +user's ticket messages into LangBot and renders LangBot's replies back into the +ticket thread. + +This interaction is **not** request/response shaped: + +- **N → 1**: a user may fire several messages in a row ("the app crashed" … + "when I click export" … "here's a screenshot"). The pipeline's + **message aggregation** feature should debounce and merge these into one turn. +- **1 → N**: a single turn may yield **multiple** outbound messages — a tool/ + function call narrating progress, a plugin emitting several cards, a streamed + answer split into chunks. + +### 1.2 Why the existing options don't fit + +LangBot today exposes exactly one externally-reachable way to drive a pipeline +that is **not** tied to a specific IM vendor: the **WebSocket** path +(`/api/v1/pipelines//ws/connect` for dashboard debug, and +`/api/v1/embed//ws/connect` for the embeddable web widget). + +For a server-to-server integration the WebSocket path has real friction: + +| Problem | Detail | +|---|---| +| Long-lived connection | Caller must maintain a socket, heartbeats, and reconnect logic for what is fundamentally a fire-and-collect workload. | +| Session identity | Inbound messages are keyed by the transient `connection_id` (`websocket_{connection_id}`); the caller **cannot supply a stable, business-meaningful session id** (e.g. a ticket number). Multi-ticket isolation is not expressible. | +| Auth mismatch | The debug socket is gated by the **dashboard JWT** (must not be handed to an external service); the embed socket is gated by **Cloudflare Turnstile** (a *browser* human-check that a backend cannot satisfy). Neither is a server-to-server credential. | +| In-memory, single-process state | Session history lives in process memory and is lost on restart. | + +> **Key realisation.** The N→1 / 1→M behaviour the caller wants is **not** +> provided by WebSocket — it is provided by the **pipeline** (aggregation + +> the adapter being free to call `reply_message` any number of times). It is +> therefore **transport-independent**. We can deliver the exact same semantics +> over a far lighter HTTP transport. + +### 1.3 Why a *new, standalone* adapter (not a refactor of an existing one) + +The brief is explicit: **do not reuse / fork an existing vendor adapter.** The +vendor adapters (`lark`, `wecom`, `qqofficial`, `slack`, …) carry vendor-specific +signature schemes, payload shapes, and message-segment mappings. Bending one of +them into a "generic" mode would couple a public integration surface to one +vendor's quirks and make the developer experience worse for everyone. + +Instead we ship `http_bot` as a clean, independent adapter whose **entire +contract is LangBot's own** — documented, versioned, and designed front-to-back +around *integrator* developer experience. + +--- + +## 2. Goals & Non-Goals + +### Goals + +- **G1** A standalone `http_bot` adapter, selectable like any other platform + adapter in the dashboard, with its own config schema and docs. +- **G2** **Inbound**: external systems POST messages to a stable LangBot URL, + carrying a **caller-defined `session_id`** that maps 1:1 to a LangBot session. +- **G3** **Outbound**: LangBot delivers each reply by POSTing to a + caller-configured **callback URL**; one turn may produce **many** callbacks. +- **G4** Preserve pipeline-native **N→1 aggregation** and **1→M multi-reply**. +- **G5** Server-to-server **auth**: shared-secret HMAC request signing both + directions (no JWT, no Turnstile, no long-lived socket). +- **G6** **Great DX**: copy-pasteable curl, a tiny reference client, an OpenAPI + fragment, idempotency, clear error envelope, and a local echo-server recipe. + +### Non-Goals + +- Not replacing or deprecating the WebSocket / embed widget path (that remains + the right tool for *browser*, real-time, streaming chat UIs). +- Not a synchronous "one request → one response" RPC (explicitly rejected: it + cannot express 1→M; see §9 for the optional sync convenience mode). +- No built-in message **persistence/replay** in v1 (callbacks are at-least-once + best-effort; durability is the caller's responsibility — see §8). +- No multi-tenant API-key management UI in v1 (one secret per bot; see §11). + +--- + +## 3. How LangBot routes a message (the parts we plug into) + +Understanding the existing flow is what makes this adapter cheap. A message +flows through these stages (verified against current `master`): + +``` + INBOUND OUTBOUND + external POST ─┐ ┌─ reply_message() + ▼ │ reply_message_chunk() + POST /bots/ (unified webhook router, AuthType.NONE) + │ webhooks.py → adapter.handle_unified_webhook(bot_uuid, path, request) + ▼ │ + HttpBotAdapter.handle_unified_webhook │ (called 0..N times + • verify HMAC signature │ per turn by the + • parse {session_id, message[]} │ pipeline / plugins) + • build FriendMessage / GroupMessage │ + • fire registered listener ───────────────┐ │ + │ │ │ + ▼ ▼ │ + botmgr.on_friend_message / on_group_message │ + • (optional) webhook_pusher fan-out │ + • msg_aggregator.add_message(...) ── N→1 debounce ──►│ + │ │ + ▼ │ + query_pool → pipeline.run() ─── invokes adapter ─────┘ + reply methods 1..M times +``` + +Two framework facts we rely on: + +1. **N→1 aggregation is free.** `botmgr` hands every inbound event to + `self.ap.msg_aggregator.add_message(...)`, which debounces per + `session_id` and merges consecutive messages into one pipeline turn + (`pkg/pipeline/aggregator.py`). The adapter does nothing special. + +2. **1→M is free.** The pipeline (and any plugin in the chain) calls + `adapter.reply_message()` / `reply_message_chunk()` **as many times as it + wants** per turn. The adapter's only job is to deliver each call outward. + For `http_bot` that means: **one outbound callback POST per call.** + +3. **A unified inbound route already exists.** `WebhookRouterGroup` + (`pkg/api/http/controller/groups/webhooks.py`) maps + `POST /bots/[/]` (auth `NONE`) to + `adapter.handle_unified_webhook(bot_uuid, path, request)`. `http_bot` + implements that method and is reachable **without registering any new + route** — it does its own signature verification, exactly like the vendor + webhook adapters do. + +> Net new code is essentially: one `http_bot.py` adapter, one `http_bot.yaml` +> schema, signing helpers, and docs. No router, aggregator, or pipeline changes. + +--- + +## 4. Architecture Overview + +``` +┌────────────────────┐ (1) inbound: POST signed message +│ External system │ ──────────────────────────────────────────────► ┌──────────────────────┐ +│ (LangBot Space, │ POST /bots/ │ LangBot │ +│ CRM, web app …) │ X-LB-Signature, X-LB-Timestamp │ │ +│ │ { session_id, message:[...] } │ HttpBotAdapter │ +│ - callback server │ ◄────────────────────────────────────────────── │ (platform/sources) │ +│ (receives │ (4) outbound: POST signed reply(s) │ │ +│ replies) │ POST │ pipeline + aggregator│ +└────────────────────┘ X-LB-Signature, X-LB-Timestamp └──────────────────────┘ + { session_id, sequence, is_final, + message:[...] } (sent 1..M times) +``` + +- The adapter is **stateless across requests** at the HTTP layer; session + continuity is carried by `session_id` and resolved by LangBot's normal + session manager. +- **Inbound** and **outbound** are **independent HTTP exchanges**. LangBot does + not answer the inbound POST with the pipeline result; it `202 Accepts` it and + later POSTs the reply(s) to the callback URL. This is what makes 1→M natural. + +--- + +## 5. Configuration Schema (`http_bot.yaml`) + +Follows the existing `MessagePlatformAdapter` manifest convention (cf. +`slack.yaml`). Fields: + +| field | type | required | purpose | +|---|---|---|---| +| `inbound_secret` | string (secret) | yes | HMAC key the **caller** uses to sign inbound POSTs; LangBot verifies. | +| `callback_url` | string (url) | no* | Where LangBot POSTs replies. *Optional if the caller supplies `callback_url` per-message (see §6.1); a static default lives here. | +| `outbound_secret` | string (secret) | no | HMAC key LangBot uses to sign outbound callbacks; caller verifies. Defaults to `inbound_secret` if empty. | +| `default_session_type` | enum `person`/`group` | no | Default when a message omits `session_type`. Default `person`. | +| `signature_required` | bool | no | If `false`, skip inbound signature check (dev only; logs a warning). Default `true`. | +| `callback_timeout` | int (seconds) | no | Per-callback HTTP timeout. Default `15`. | +| `callback_max_retries` | int | no | Retries on 5xx/timeout with backoff. Default `3`. | +| `webhook_url` | webhook-url (display) | — | Read-only field rendering the inbound URL `…/bots/` for copy-paste, like other webhook adapters. | + +Manifest sketch (i18n labels elided for brevity): + +```yaml +apiVersion: v1 +kind: MessagePlatformAdapter +metadata: + name: http_bot + label: { en_US: "HTTP Bot", zh_Hans: "HTTP 通用接入" } + description: + en_US: "Integrate any backend over plain HTTP. Push messages in, receive replies on a callback URL. Server-to-server, no long-lived connection." + zh_Hans: "通过 HTTP 接入任意后端系统。推入消息、在回调地址接收回复。面向服务间集成,无需长连接。" + icon: http_bot.svg +spec: + categories: [popular, global] + help_links: + zh: https://docs.langbot.app/zh/platforms/http-bot + en: https://docs.langbot.app/en/platforms/http-bot + config: + - { name: inbound_secret, type: string, required: true, default: "" } + - { name: callback_url, type: string, required: false, default: "" } + - { name: outbound_secret, type: string, required: false, default: "" } + - { name: default_session_type, type: select, required: false, default: "person", + options: [person, group] } + - { name: signature_required, type: boolean, required: false, default: true } + - { name: callback_timeout, type: integer, required: false, default: 15 } + - { name: callback_max_retries, type: integer, required: false, default: 3 } + - { name: webhook_url, type: webhook-url, required: false, default: "" } +execution: + python: + path: ./http_bot.py + attr: HttpBotAdapter +``` + +--- + +## 6. The HTTP Contract (this is the DX surface) + +### 6.1 Inbound — push a message into LangBot + +``` +POST /bots/{bot_uuid} +Content-Type: application/json +X-LB-Timestamp: 1718000000 +X-LB-Signature: sha256= +X-LB-Idempotency-Key: # optional, dedup window +``` + +Body: + +```jsonc +{ + "session_id": "ticket-10293", // REQUIRED. Caller-defined. Maps 1:1 to a LangBot session. + "session_type": "person", // optional, "person" | "group"; default from config + "sender": { // optional metadata, surfaced to pipeline/plugins + "id": "user-5567", + "name": "Alice" + }, + "message": [ // REQUIRED. A LangBot MessageChain (list of segments). + { "type": "Plain", "text": "Export keeps failing on the dashboard." }, + { "type": "Image", "url": "https://.../screenshot.png" } + ] +} +``` + +Response (LangBot does **not** block on the pipeline): + +```jsonc +// 202 Accepted +{ + "code": 0, + "msg": "accepted", + "data": { + "session_id": "ticket-10293", + "accepted_message_id": "in_01H....", // server-assigned id for this inbound message + "aggregating": true // true if buffered by the aggregator + } +} +``` + +**N→1 in practice.** Fire three POSTs with the same `session_id` inside the +aggregation window → the pipeline runs **once** with the three messages merged. +No special flag needed; this is the aggregator's default behaviour when enabled +on the pipeline. + +### 6.2 Outbound — LangBot delivers replies to your callback + +For each `reply_message` / `reply_message_chunk` the pipeline emits, LangBot +POSTs to `callback_url`: + +``` +POST {callback_url} +Content-Type: application/json +X-LB-Timestamp: 1718000001 +X-LB-Signature: sha256= +``` + +Body: + +```jsonc +{ + "session_id": "ticket-10293", // echoes the inbound session + "reply_to": "in_01H....", // the inbound message id this answers + "sequence": 1, // 1-based ordinal within this turn (for 1→M ordering) + "is_final": false, // false for intermediate/streamed parts + "stream": false, // true when this is a streamed chunk + "message": [ + { "type": "Plain", "text": "Looking into it — checking your export logs…" } + ], + "timestamp": "2026-06-22T09:00:01Z" +} +``` + +**1→M in practice.** A turn that fires a function call then a final answer +produces e.g.: + +``` +POST callback → { sequence: 1, is_final: false, message: ["Checking logs…"] } +POST callback → { sequence: 2, is_final: false, message: ["Found 2 failed exports."] } +POST callback → { sequence: 3, is_final: true, message: ["Fixed. Try again now."] } +``` + +The caller stitches by `session_id` + `sequence`, and knows the turn is complete +when `is_final: true` arrives. + +Your callback endpoint should return `200` quickly. A non-2xx triggers retry +with backoff (`callback_max_retries`). + +### 6.3 Error envelope (inbound) + +Consistent, machine-readable; never leak internals: + +```jsonc +{ "code": 40101, "msg": "invalid signature", "data": null } +``` + +| HTTP | code | meaning | +|---|---|---| +| 202 | 0 | accepted | +| 400 | 40001 | malformed body / missing `session_id` or `message` | +| 401 | 40101 | bad/expired signature | +| 403 | 40301 | bot disabled | +| 404 | 40401 | bot_uuid not found / not an `http_bot` adapter | +| 409 | 40901 | duplicate idempotency key (already accepted) | +| 413 | 41301 | message too large | +| 500 | 50001 | internal error | + +--- + +## 7. Signing scheme (both directions) + +Symmetric, dependency-free HMAC-SHA256 — trivial to implement in any language. + +``` +signing_string = "{timestamp}.{raw_request_body}" +signature = "sha256=" + hex(HMAC_SHA256(secret, signing_string)) +``` + +Verification rules: + +- Reject if `|now - timestamp| > 300s` (replay window). +- Constant-time compare (`hmac.compare_digest`). +- Inbound verified with `inbound_secret`; outbound signed with + `outbound_secret` (falls back to `inbound_secret`). +- `signature_required: false` bypasses verification **and logs a warning** — + intended only for local development behind a trusted network. + +Reference (Python, ~6 lines): + +```python +import hmac, hashlib, time + +def sign(secret: str, body: bytes, ts: int | None = None) -> tuple[str, str]: + ts = ts or int(time.time()) + mac = hmac.new(secret.encode(), f"{ts}.".encode() + body, hashlib.sha256) + return str(ts), "sha256=" + mac.hexdigest() +``` + +--- + +## 8. Delivery semantics & reliability + +- **Inbound**: `202 Accepted` means *queued*, not *processed*. Use + `X-LB-Idempotency-Key` to make client retries safe (dedup window, e.g. 10 min). +- **Outbound**: **at-least-once**, best-effort. Retries on timeout/5xx with + exponential backoff up to `callback_max_retries`. Callbacks for one + `session_id` are delivered **in `sequence` order** (serialised per session); + across sessions they may interleave. +- **No persistence in v1**: if LangBot restarts mid-turn, in-flight callbacks + may be lost. Durable replay is deferred (see §13). Callers needing exactly-once + should dedup on `(session_id, reply_to, sequence)`. +- **Backpressure**: the adapter must not block the pipeline on slow callbacks — + outbound POSTs run on a per-session ordered queue with the configured timeout. + +--- + +## 9. Optional: synchronous convenience mode (v1.1, behind a flag) + +Some simple callers genuinely want "POST a message, get the reply in the HTTP +response" and don't care about streaming/multi-part. We can offer an **opt-in** +sync endpoint that internally waits for `is_final` and **collapses** all 1→M +parts into one array: + +``` +POST /bots/{bot_uuid}/sync → 200 { session_id, message: [ ...all parts concatenated... ] } +``` + +Implemented by attaching a per-request future that resolves on the final reply, +with a hard timeout. This is a **convenience wrapper** over the same machinery, +explicitly documented as lossy for streaming/ordering. Not in v1 core. + +--- + +## 10. Adapter implementation sketch (`platform/sources/http_bot.py`) + +Implements `AbstractMessagePlatformAdapter`. Key methods: + +```python +class HttpBotAdapter(AbstractMessagePlatformAdapter): + listeners: dict = pydantic.Field(default_factory=dict, exclude=True) + + # --- inbound ------------------------------------------------------- + async def handle_unified_webhook(self, bot_uuid, path, request): + body = await request.get_body() + if self.config.get("signature_required", True): + if not self._verify(request, body): + return jsonify({"code": 40101, "msg": "invalid signature"}), 401 + data = json.loads(body) + session_id = data["session_id"] # caller-defined identity + session_type = data.get("session_type", self.config.get("default_session_type", "person")) + chain = MessageChain.model_validate(data["message"]) + event = self._build_event(session_type, session_id, data.get("sender"), chain) + # remember where to send replies for this session + self._callback_for[session_id] = data.get("callback_url") or self.config.get("callback_url") + # fire the registered listener → botmgr → msg_aggregator (N→1) → pipeline + if type(event) in self.listeners: + asyncio.create_task(self.listeners[type(event)](event, self)) + return jsonify({"code": 0, "msg": "accepted", + "data": {"session_id": session_id, "accepted_message_id": event.message_id}}), 202 + + # --- outbound (called 1..M times per turn by the pipeline) --------- + async def reply_message(self, message_source, message, quote_origin=False): + return await self._post_callback(message_source, message, is_final=True, stream=False) + + async def reply_message_chunk(self, message_source, bot_message, message, + quote_origin=False, is_final=False): + return await self._post_callback(message_source, message, is_final=is_final, stream=True) + + async def is_stream_output_supported(self) -> bool: + return True + + def register_listener(self, event_type, func): self.listeners[event_type] = func + def unregister_listener(self, event_type, func): self.listeners.pop(event_type, None) + async def run_async(self): pass # nothing to poll; purely webhook-driven + async def kill(self): pass +``` + +`_post_callback` resolves the session's callback URL, assigns the next +`sequence`, signs the body, and enqueues an ordered, retrying POST. + +Session→callback mapping is kept in a small in-memory dict keyed by +`session_id` (acceptable for v1; a turn's callback URL is captured at inbound +time so replies always have a destination even if config later changes). + +--- + +## 11. Security considerations + +- **Inbound route is `AuthType.NONE`** at the framework level (same as all + webhook adapters) — the adapter **must** enforce HMAC itself. Default + `signature_required: true`. +- **Timestamp window** (±300s) + idempotency key blunt replay. +- **SSRF on callback_url**: validate scheme (`https` in prod), and consider an + allow-list / block of private CIDRs since LangBot initiates the POST. Document + this; enforce in code where feasible. +- **Secret storage**: secrets live in the bot's `adapter_config` like every + other adapter credential; surfaced as `type: string`/secret in the dashboard. +- **One secret per bot** in v1. Per-caller key rotation / multiple keys is a + future enhancement (§13). + +--- + +## 12. Developer Experience (explicit deliverables) + +The whole point of a standalone adapter is that **integrating is pleasant**. v1 +ships: + +1. **`docs/platforms/http-bot.md`** — task-oriented integration guide: + create the bot → copy inbound URL → set secret → stand up a callback + endpoint → send first message → handle 1→M. +2. **Copy-paste curl** for the first message (with a working signing one-liner). +3. **Reference clients** (≤50 LOC each) in `examples/http-bot/`: + `client.py` (push + a Flask/Quart callback receiver) and `client.ts`. +4. **OpenAPI fragment** `docs/http-bot-openapi.json` describing inbound + + callback shapes, so integrators can codegen. +5. **Local echo recipe**: a one-command callback server that prints every + reply, so a developer sees N→1 and 1→M working in under five minutes. +6. **Postman/Hoppscotch collection** (nice-to-have). + +DX acceptance check: *a developer who has never seen LangBot can, from the docs +alone, push a message and observe a multi-part reply on their callback within +10 minutes.* + +### Quickstart (curl) + +```bash +BOT=https://your-langbot/bots/2f1c.... +SECRET=supersecret +BODY='{"session_id":"ticket-10293","message":[{"type":"Plain","text":"hello"}]}' +TS=$(date +%s) +SIG="sha256=$(printf '%s.%s' "$TS" "$BODY" | openssl dgst -sha256 -hmac "$SECRET" -r | cut -d' ' -f1)" +curl -sS -X POST "$BOT" \ + -H "Content-Type: application/json" \ + -H "X-LB-Timestamp: $TS" \ + -H "X-LB-Signature: $SIG" \ + -d "$BODY" +``` + +--- + +## 13. Future work + +- **Durable outbound queue** (persist + replay across restarts; exactly-once). +- **Per-caller API keys** with rotation and scopes (multi-tenant Space usage). +- **Sync convenience endpoint** (§9) once core is stable. +- **Server-Sent Events outbound option** for callers that *do* want a stream but + not a full duplex socket — single GET, server pushes chunks. +- **Dashboard "test console"** for `http_bot` (send a message, watch callbacks) + mirroring the existing WebSocket debug panel. + +--- + +## 14. Rollout / task breakdown + +| # | Task | Touches | +|---|---|---| +| 1 | `http_bot.yaml` manifest + icon | `platform/sources/` | +| 2 | `HttpBotAdapter` (inbound verify, event build, outbound queue) | `platform/sources/http_bot.py` | +| 3 | Signing helper module (shared) | `platform/sources/` or `utils/` | +| 4 | i18n strings (en/zh/ja) | adapter yaml + web locale | +| 5 | Integration docs `docs/platforms/http-bot.md` | `docs/` | +| 6 | OpenAPI fragment + reference clients | `docs/`, `examples/http-bot/` | +| 7 | Tests: signature verify, N→1 aggregation, 1→M ordering, retry | `tests/` | +| 8 | (opt) SSRF guard for callback_url | adapter | + +No changes required to: the unified webhook router, the aggregator, the query +pool, or the pipeline. That is the design's main payoff. + +--- + +## 15. Resolved decisions + +1. **Callback URL trust** — **config-only.** The inbound message may not carry a + `callback_url`; replies always go to the bot-config URL. Closes the SSRF + vector where a leaked inbound secret could redirect replies. +2. **Session lifecycle** — **`POST /bots//reset`** (body `{session_id, + session_type?}`) drops the matching session from the session manager; the + next message starts a fresh conversation. Implemented via sub-path routing in + `handle_unified_webhook`. +3. **Group semantics** — for `session_type: group`, `session_id` is the group/ + launcher id; `sender.id` (and optional `sender.group_name`) identify the + member. A Space ticket maps to one `session_id`. +4. **Backpressure** — bounded per-session outbound queue (maxlen 1000); on + overflow the oldest reply is dropped and a warning logged, so a persistently + down callback can never exhaust memory. + +### Still open / deferred (see §13) + +- Durable outbound queue (persist + replay across restarts). +- Per-caller API keys with rotation/scopes for multi-tenant Space usage. +- SSE outbound option and a dashboard test console. diff --git a/docs/http-bot-openapi.json b/docs/http-bot-openapi.json new file mode 100644 index 000000000..2cac6e439 --- /dev/null +++ b/docs/http-bot-openapi.json @@ -0,0 +1,198 @@ +{ + "openapi": "3.0.3", + "info": { + "title": "LangBot HTTP Bot Adapter", + "version": "1.0.0", + "description": "Server-to-server HTTP integration for a LangBot pipeline. Inbound messages are POSTed to the unified webhook route; replies are delivered to a configured callback URL (one POST per reply part). All requests are HMAC-SHA256 signed. See docs/platforms/http-bot.md." + }, + "paths": { + "/bots/{bot_uuid}": { + "post": { + "summary": "Push a message into the pipeline (fire-and-collect)", + "description": "Returns 202 immediately. Replies arrive asynchronously on the configured callback URL. Reuse the same session_id within the aggregation window to merge multiple messages into one turn (N->1).", + "parameters": [ + { "$ref": "#/components/parameters/BotUuid" }, + { "$ref": "#/components/parameters/Timestamp" }, + { "$ref": "#/components/parameters/Signature" }, + { "$ref": "#/components/parameters/Idempotency" } + ], + "requestBody": { + "required": true, + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InboundMessage" } } } + }, + "responses": { + "202": { + "description": "Accepted (queued for the pipeline)", + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/AcceptedResponse" } } } + }, + "400": { "$ref": "#/components/responses/Error" }, + "401": { "$ref": "#/components/responses/Error" }, + "409": { "$ref": "#/components/responses/Error" }, + "413": { "$ref": "#/components/responses/Error" } + } + } + }, + "/bots/{bot_uuid}/sync": { + "post": { + "summary": "Push a message and wait for the collapsed reply", + "description": "Blocking convenience mode. Waits for is_final and returns all reply parts collapsed into one array. Lossy (no sequence/streaming). One in-flight sync per session_id.", + "parameters": [ + { "$ref": "#/components/parameters/BotUuid" }, + { "$ref": "#/components/parameters/Timestamp" }, + { "$ref": "#/components/parameters/Signature" } + ], + "requestBody": { + "required": true, + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InboundMessage" } } } + }, + "responses": { + "200": { + "description": "The collapsed reply", + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/SyncResponse" } } } + }, + "400": { "$ref": "#/components/responses/Error" }, + "401": { "$ref": "#/components/responses/Error" } + } + } + }, + "/bots/{bot_uuid}/reset": { + "post": { + "summary": "Reset a session's conversation", + "parameters": [ + { "$ref": "#/components/parameters/BotUuid" }, + { "$ref": "#/components/parameters/Timestamp" }, + { "$ref": "#/components/parameters/Signature" } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "type": "object", + "required": ["session_id"], + "properties": { + "session_id": { "type": "string" }, + "session_type": { "type": "string", "enum": ["person", "group"] } + } + } + } + } + }, + "responses": { + "200": { "description": "Reset done" }, + "400": { "$ref": "#/components/responses/Error" }, + "401": { "$ref": "#/components/responses/Error" } + } + } + } + }, + "components": { + "parameters": { + "BotUuid": { + "name": "bot_uuid", "in": "path", "required": true, + "schema": { "type": "string", "format": "uuid" } + }, + "Timestamp": { + "name": "X-LB-Timestamp", "in": "header", "required": true, + "description": "Unix seconds; rejected if more than +/-300s from server time.", + "schema": { "type": "string" } + }, + "Signature": { + "name": "X-LB-Signature", "in": "header", "required": true, + "description": "sha256= of HMAC-SHA256(secret, \"{timestamp}.\" + raw_body).", + "schema": { "type": "string" } + }, + "Idempotency": { + "name": "X-LB-Idempotency-Key", "in": "header", "required": false, + "description": "Dedup key; a repeat within the dedup window returns 409.", + "schema": { "type": "string" } + } + }, + "schemas": { + "Segment": { + "type": "object", + "required": ["type"], + "properties": { + "type": { "type": "string", "enum": ["Plain", "Image", "Voice", "File", "At", "Quote"] }, + "text": { "type": "string", "description": "For type=Plain." }, + "url": { "type": "string", "description": "For media types." }, + "base64": { "type": "string", "description": "For media types (data URI or raw base64)." } + } + }, + "InboundMessage": { + "type": "object", + "required": ["session_id", "message"], + "properties": { + "session_id": { "type": "string", "description": "Caller-defined; maps 1:1 to a LangBot session." }, + "session_type": { "type": "string", "enum": ["person", "group"], "default": "person" }, + "sender": { + "type": "object", + "properties": { + "id": { "type": "string" }, + "name": { "type": "string" }, + "group_name": { "type": "string", "description": "For session_type=group." } + } + }, + "message": { "type": "array", "items": { "$ref": "#/components/schemas/Segment" } } + } + }, + "AcceptedResponse": { + "type": "object", + "properties": { + "code": { "type": "integer", "example": 0 }, + "msg": { "type": "string", "example": "accepted" }, + "data": { + "type": "object", + "properties": { + "session_id": { "type": "string" }, + "accepted_message_id": { "type": "string", "example": "in_01H..." }, + "aggregating": { "type": "boolean" } + } + } + } + }, + "SyncResponse": { + "type": "object", + "properties": { + "code": { "type": "integer", "example": 0 }, + "msg": { "type": "string", "example": "ok" }, + "data": { + "type": "object", + "properties": { + "session_id": { "type": "string" }, + "reply_to": { "type": "string" }, + "message": { "type": "array", "items": { "$ref": "#/components/schemas/Segment" } } + } + } + } + }, + "Callback": { + "type": "object", + "description": "Delivered by LangBot to your callback_url, one POST per reply part. Signed with the outbound secret.", + "properties": { + "session_id": { "type": "string" }, + "reply_to": { "type": "string", "description": "The accepted_message_id this answers." }, + "sequence": { "type": "integer", "description": "1-based ordinal within the turn." }, + "is_final": { "type": "boolean", "description": "True on the last part of the turn." }, + "stream": { "type": "boolean" }, + "message": { "type": "array", "items": { "$ref": "#/components/schemas/Segment" } }, + "timestamp": { "type": "string", "format": "date-time" } + } + }, + "ErrorEnvelope": { + "type": "object", + "properties": { + "code": { "type": "integer", "example": 40101 }, + "msg": { "type": "string", "example": "invalid signature: signature_mismatch" }, + "data": { "nullable": true } + } + } + }, + "responses": { + "Error": { + "description": "Error envelope", + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/ErrorEnvelope" } } } + } + } + } +} diff --git a/docs/platforms/http-bot.md b/docs/platforms/http-bot.md new file mode 100644 index 000000000..71fbc6799 --- /dev/null +++ b/docs/platforms/http-bot.md @@ -0,0 +1,256 @@ +# HTTP Bot Adapter — Integration Guide + +Integrate **any backend system** with a LangBot pipeline over plain HTTP. Push +messages in via a signed webhook; receive replies on a callback URL. No +long-lived connection, full support for message **aggregation** (many inbound +messages merged into one turn) and **multi-part replies** (one turn → many +outbound messages). + +This is the right adapter for **server-to-server** integrations — ticketing +systems, CRMs, internal tools, custom web backends. (For an in-browser, +real-time chat widget, use the embeddable Web Page Bot instead.) + +> **5-minute goal:** stand up a callback receiver, send a message, and watch a +> multi-part reply arrive — using the reference client in +> [`examples/http-bot/`](../../examples/http-bot/). + +--- + +## 1. Mental model + +``` +Your backend ──(1) POST signed message──► LangBot /bots/ + (pipeline runs: aggregate → think → reply) +Your callback ◄─(2) POST signed reply(s)── LangBot one POST per reply part +``` + +- **(1) Inbound** is *fire-and-collect*: LangBot answers `202 Accepted` + immediately and does **not** return the pipeline result on that response. +- **(2) Outbound** replies arrive later as separate signed POSTs to your + `callback_url`. A single turn may produce **several** callbacks (e.g. a tool + call narration followed by the final answer). +- Everything is keyed by a **`session_id` you choose** (e.g. a ticket number). + Each `session_id` maps to one isolated LangBot conversation. + +--- + +## 2. Create the bot + +1. In the LangBot dashboard, add a bot and choose the **HTTP Bot** platform. +2. Fill in the config: + + | Field | Required | Notes | + |---|---|---| + | **Inbound Signing Secret** | yes | Your backend signs inbound requests with this. | + | **Outbound Callback URL** | yes | Where LangBot POSTs replies. **Config-only** — cannot be overridden per message (SSRF protection). | + | **Outbound Signing Secret** | no | LangBot signs callbacks with this; defaults to the inbound secret. | + | **Default Session Type** | no | `person` (default) or `group`. | + | **Require Inbound Signature** | no | Keep `true` in production. | + | **Callback Timeout / Max Retries** | no | Defaults: 15s, 3 retries. | + +3. Bind the bot to a **pipeline** and **enable** it. +4. Copy the **Inbound Webhook URL** shown in the config — it looks like + `https://your-langbot/bots/`. + +--- + +## 3. The signature scheme + +Both directions use the same dependency-free HMAC-SHA256 scheme: + +``` +signing_string = "{timestamp}." + raw_body_bytes +signature = "sha256=" + hex(HMAC_SHA256(secret, signing_string)) +``` + +Sent as headers: + +| Header | Meaning | +|---|---| +| `X-LB-Timestamp` | Unix seconds. Rejected if more than **±300s** from server time. | +| `X-LB-Signature` | `sha256=` over `"{timestamp}." + body`. | +| `X-LB-Idempotency-Key` | *(optional, inbound)* dedup key; retries with the same key return `409`. | + +Verify outbound callbacks the same way, using the **outbound** secret (or the +inbound secret if you left it blank). + +A six-line reference implementation is in `examples/http-bot/client.py` +(`sign()` / `verify()`); a Node/TS version is in `client.ts`. + +--- + +## 4. Send your first message (curl) + +```bash +BOT="https://your-langbot/bots/" +SECRET="your-inbound-secret" +BODY='{"session_id":"ticket-10293","message":[{"type":"Plain","text":"Export keeps failing on the dashboard."}]}' +TS=$(date +%s) +SIG="sha256=$(printf '%s.%s' "$TS" "$BODY" | openssl dgst -sha256 -hmac "$SECRET" -r | cut -d' ' -f1)" + +curl -sS -X POST "$BOT" \ + -H "Content-Type: application/json" \ + -H "X-LB-Timestamp: $TS" \ + -H "X-LB-Signature: $SIG" \ + -d "$BODY" +# -> 202 {"code":0,"msg":"accepted","data":{"session_id":"ticket-10293","accepted_message_id":"in_...","aggregating":true}} +``` + +The reply(s) will be POSTed to your configured callback URL shortly after. + +--- + +## 5. Inbound request format + +`POST /bots/{bot_uuid}` + +```jsonc +{ + "session_id": "ticket-10293", // REQUIRED. Your stable id. Maps 1:1 to a LangBot session. + "session_type": "person", // optional: "person" | "group"; default from config + "sender": { // optional metadata, surfaced to the pipeline/plugins + "id": "user-5567", + "name": "Alice" + }, + "message": [ // REQUIRED. A LangBot MessageChain (array of segments). + { "type": "Plain", "text": "Export keeps failing on the dashboard." }, + { "type": "Image", "url": "https://example.com/screenshot.png" } + ] +} +``` + +**Message segments.** Text uses `{"type":"Plain","text":"..."}`. Images use +`{"type":"Image","url":"..."}` (or `base64`). Other supported types: `Voice`, +`File`, `At`, `Quote`. + +> Note: the callback URL is **not** accepted in the body — it is taken only from +> bot config. This is deliberate (prevents an attacker who obtains the inbound +> secret from redirecting replies to an arbitrary host). + +### Aggregation (N → 1) + +If your pipeline has **message aggregation** enabled, send several messages with +the **same `session_id`** within the aggregation window and they are merged into +**one** pipeline turn. No special flag — just reuse the `session_id`. + +--- + +## 6. Outbound callback format + +LangBot POSTs each reply part to your `callback_url`: + +```jsonc +{ + "session_id": "ticket-10293", // echoes the inbound session + "reply_to": "in_01H...", // the accepted_message_id this answers + "sequence": 1, // 1-based ordinal within this turn + "is_final": false, // true on the last part of the turn + "stream": false, // true for streamed chunks + "message": [ { "type": "Plain", "text": "Looking into it…" } ], + "timestamp": "2026-06-22T09:00:01Z" +} +``` + +Your endpoint should return `2xx` quickly. Non-2xx / timeout → LangBot retries +with exponential backoff (up to `callback_max_retries`). + +### Multi-part replies (1 → M) + +One turn may emit multiple callbacks, delivered **in `sequence` order** for a +given session: + +``` +seq=1 is_final=false "Checking your export logs…" +seq=2 is_final=false "Found 2 failed exports." +seq=3 is_final=true "Fixed — please try again." +``` + +Stitch by `session_id` + `sequence`; the turn is complete when +`is_final: true` arrives. + +--- + +## 7. Reset a session + +Start a fresh conversation for a `session_id` (drops history): + +``` +POST /bots/{bot_uuid}/reset +{ "session_id": "ticket-10293", "session_type": "person" } +→ 200 { "code":0, "msg":"reset", "data": { "session_id":"ticket-10293", "removed": true } } +``` + +Signed exactly like an inbound message. + +--- + +## 8. Synchronous convenience mode + +If you don't need streaming/multi-part and just want one reply back on the same +HTTP call, POST to `/sync`. LangBot waits for the turn to finish and returns all +parts **collapsed** into one array: + +``` +POST /bots/{bot_uuid}/sync +{ "session_id": "ticket-10293", "message": [ { "type":"Plain", "text":"hi" } ] } +→ 200 { "code":0, "msg":"ok", + "data": { "session_id":"ticket-10293", "reply_to":"in_...", + "message": [ {"type":"Plain","text":"..."}, ... ] } } +``` + +This is **lossy** (you lose `sequence` / streaming boundaries) and blocks up to +`callback_timeout × 4` seconds. Prefer the callback model for anything +real-time or multi-part. Only one in-flight `/sync` per `session_id`. + +--- + +## 9. Error envelope + +```jsonc +{ "code": 40101, "msg": "invalid signature: signature_mismatch", "data": null } +``` + +| HTTP | code | meaning | +|---|---|---| +| 202 | 0 | accepted | +| 400 | 40001 | malformed body / missing `session_id` or `message` | +| 401 | 40101 | bad/expired signature | +| 409 | 40901 | duplicate idempotency key | +| 413 | 41301 | message too large (>1 MiB) | +| 500 | 50001 | internal error | + +--- + +## 10. Try it end-to-end in 5 minutes + +```bash +cd examples/http-bot +pip install flask requests + +# Terminal 1 — your callback receiver (point the bot's callback_url here, e.g. via a tunnel): +python client.py serve --port 8900 --secret SHARED_SECRET + +# Terminal 2 — push a message: +python client.py push \ + --url https://your-langbot/bots/ \ + --secret SHARED_SECRET \ + --session ticket-1 \ + --text "hello" +``` + +Watch Terminal 1 print each reply part (`[part ]` / `[FINAL]`) with its +sequence number — that's 1→M working, signatures verified. + +A machine-readable contract is in +[`docs/http-bot-openapi.json`](../http-bot-openapi.json). + +--- + +## 11. Security checklist + +- Keep **Require Inbound Signature** on in production. +- Use **HTTPS** callback URLs; the URL is config-only (no per-message override). +- Treat the secrets like passwords; rotate via the dashboard. +- The inbound route is unauthenticated at the framework level **by design** — + security comes entirely from the HMAC signature, so never disable it on a + public deployment. diff --git a/examples/http-bot/README.md b/examples/http-bot/README.md new file mode 100644 index 000000000..b04387d78 --- /dev/null +++ b/examples/http-bot/README.md @@ -0,0 +1,75 @@ +# HTTP Bot Adapter — Reference Clients + +> English | [中文](./README.zh.md) + +Minimal, dependency-light clients for the LangBot **HTTP Bot** platform adapter. +They show the whole loop: signing a request, pushing a message, and receiving +multi-part replies on a callback endpoint. + +Full guide: [docs.langbot.app — HTTP Bot](https://docs.langbot.app/en/usage/platforms/http-bot). +Machine-readable contract: [`docs/http-bot-openapi.json`](../../docs/http-bot-openapi.json). + +## Files + +| File | What it is | +|---|---| +| `playground.py` | **Interactive browser debug console** — a single-file web app you open in a browser to chat with a running `http_bot` bot and watch signing / 202 / callbacks live. Zero extra deps. | +| `client.py` | Python client + Flask callback receiver (`pip install flask requests`). | +| `client.ts` | TypeScript/Node 18+ client + callback receiver, **zero deps** (`npx tsx client.ts`). | + +All three implement the identical HMAC-SHA256 scheme +(`sha256=hex(HMAC(secret, "{timestamp}." + body))`) — verified byte-for-byte +against the adapter. + +## Interactive playground (recommended first run) + +A self-contained web console: type a message in your browser, it is signed and +POSTed to a **running** `http_bot` bot, and the bot's replies stream back into +the page — with a debug panel showing the signature, the `202` ack, and each +callback's `sequence` / signature-verification. + +```bash +# From the LangBot repo root, with the backend already running: +PUBLIC_IP= ./.venv/bin/python examples/http-bot/playground.py +# then open http://:8920/ +``` + +On startup it reads the LangBot API key + the `http_bot` bot from +`data/langbot.db`, and configures that bot (inbound/outbound secret + +`callback_url`) to point back at itself via the LangBot API — the bot reloads +live, no restart needed. Requirements: an enabled `http_bot` bot bound to a +working pipeline, and port `8920` reachable from your browser. + +Env knobs: `PUBLIC_IP` (default `127.0.0.1`), `PLAYGROUND_PORT` (default `8920`). + +## Headless clients + +```bash +# Python — Terminal 1: callback receiver (your callback_url target) +python client.py serve --port 8900 --secret SHARED_SECRET + +# Python — Terminal 2: push a message +python client.py push --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 --text "hello" + +# blocking sync mode +python client.py sync --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 --text "hello" + +# reset a session +python client.py reset --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 +``` + +```bash +# TypeScript (Node 18+) +npx tsx client.ts serve 8900 SHARED_SECRET +npx tsx client.ts push https://your-langbot/bots/ SHARED_SECRET ticket-1 "hello" +``` + +When the bot replies, the receiver prints each part with its `sequence` and an +`[FINAL]` marker on the last one — that's the 1→M multi-reply model in action. + +> The bot's `callback_url` must be reachable from LangBot. For local testing, +> expose your receiver with a tunnel (cloudflared / ngrok) and set that URL in +> the bot config. diff --git a/examples/http-bot/README.zh.md b/examples/http-bot/README.zh.md new file mode 100644 index 000000000..1baf81272 --- /dev/null +++ b/examples/http-bot/README.zh.md @@ -0,0 +1,71 @@ +# HTTP Bot 适配器 —— 参考客户端 + +> [English](./README.md) | 中文 + +面向 LangBot **HTTP Bot** 平台适配器的极简、低依赖客户端示例。 +它们完整展示了整条链路:对请求签名、推送一条消息、在回调端点接收 +1→M 的多段回复。 + +完整指南:[docs.langbot.app —— HTTP Bot](https://docs.langbot.app/zh/usage/platforms/http-bot)。 +机器可读的接口契约:[`docs/http-bot-openapi.json`](../../docs/http-bot-openapi.json)。 + +## 文件清单 + +| 文件 | 是什么 | +|---|---| +| `playground.py` | **浏览器交互式调试台** —— 单文件 Web 应用,在浏览器里和一个运行中的 `http_bot` bot 对话,实时观察签名 / 202 / 回调。零额外依赖。 | +| `client.py` | Python 客户端 + Flask 回调接收端(`pip install flask requests`)。 | +| `client.ts` | TypeScript/Node 18+ 客户端 + 回调接收端,**零依赖**(`npx tsx client.ts`)。 | + +三者实现完全一致的 HMAC-SHA256 签名方案 +(`sha256=hex(HMAC(secret, "{timestamp}." + body))`)—— 已与适配器逐字节比对验证。 + +## 交互式 playground(推荐先跑这个) + +一个自包含的 Web 控制台:在浏览器里输入消息,它会被签名并 POST 给一个 +**运行中**的 `http_bot` bot,bot 的回复会流式回到页面上 —— 调试面板会显示 +签名、`202` 确认,以及每条回调的 `sequence` / 签名验证结果。 + +```bash +# 在 LangBot 仓库根目录、后端已启动的前提下: +PUBLIC_IP=<你的主机IP> ./.venv/bin/python examples/http-bot/playground.py +# 然后打开 http://<你的主机IP>:8920/ +``` + +启动时它会从 `data/langbot.db` 读取 LangBot API key 和 `http_bot` bot, +并通过 LangBot API 把该 bot 配好(入站/出站密钥 + `callback_url`)指回自己 —— +bot 会热加载,无需重启。前提:有一个已启用、绑定了可用 pipeline 的 +`http_bot` bot,且端口 `8920` 能从你的浏览器访问到。 + +可调环境变量:`PUBLIC_IP`(默认 `127.0.0.1`)、`PLAYGROUND_PORT`(默认 `8920`)。 + +## 无头客户端 + +```bash +# Python —— 终端 1:回调接收端(你的 callback_url 指向它) +python client.py serve --port 8900 --secret SHARED_SECRET + +# Python —— 终端 2:推送一条消息 +python client.py push --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 --text "hello" + +# 阻塞式同步模式 +python client.py sync --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 --text "hello" + +# 重置一个会话 +python client.py reset --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-1 +``` + +```bash +# TypeScript(Node 18+) +npx tsx client.ts serve 8900 SHARED_SECRET +npx tsx client.ts push https://your-langbot/bots/ SHARED_SECRET ticket-1 "hello" +``` + +当 bot 回复时,接收端会逐条打印,带上各自的 `sequence`,并在最后一条标记 +`[FINAL]` —— 这就是 1→M 多段回复模型的实际效果。 + +> bot 的 `callback_url` 必须能从 LangBot 访问到。本地测试时,可用隧道 +> (cloudflared / ngrok)把你的接收端暴露出去,并把那个 URL 填进 bot 配置。 diff --git a/examples/http-bot/client.py b/examples/http-bot/client.py new file mode 100644 index 000000000..cbf9eff43 --- /dev/null +++ b/examples/http-bot/client.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +"""LangBot HTTP Bot adapter — reference client (Python). + +Two things in one file: + +1. ``push()`` / ``push_sync()`` — send a message into a LangBot ``http_bot`` bot. +2. A tiny Flask callback receiver that verifies signatures and prints replies, + so you can watch N->1 aggregation and 1->M multi-reply working live. + +Usage +----- + pip install flask requests + + # Terminal 1 — start the callback receiver (this is your callback_url): + python client.py serve --port 8900 --secret SHARED_SECRET + + # Terminal 2 — push a message (async; reply lands on the receiver): + python client.py push \ + --url https://your-langbot/bots/ \ + --secret SHARED_SECRET \ + --session ticket-10293 \ + --text "Export keeps failing on the dashboard." + + # Or push and block for the collapsed reply (sync convenience mode): + python client.py sync --url https://your-langbot/bots/ \ + --secret SHARED_SECRET --session ticket-10293 --text "hi" + +The signing scheme is HMAC-SHA256 over ``"{timestamp}." + raw_body``; see +``sign()`` below — it is intentionally tiny and easy to port. +""" + +from __future__ import annotations + +import argparse +import hashlib +import hmac +import json +import sys +import time +import uuid + +HEADER_TIMESTAMP = 'X-LB-Timestamp' +HEADER_SIGNATURE = 'X-LB-Signature' +HEADER_IDEMPOTENCY = 'X-LB-Idempotency-Key' +REPLAY_WINDOW = 300 + + +def sign(secret: str, body: bytes, timestamp: int | None = None) -> tuple[str, str]: + """Return (timestamp, signature) for *body*.""" + ts = str(timestamp if timestamp is not None else int(time.time())) + mac = hmac.new(secret.encode(), f'{ts}.'.encode() + body, hashlib.sha256) + return ts, 'sha256=' + mac.hexdigest() + + +def verify(secret: str, body: bytes, timestamp: str | None, signature: str | None) -> bool: + """Verify an inbound signature (used by the callback receiver).""" + if not timestamp or not signature: + return False + try: + if abs(int(time.time()) - int(float(timestamp))) > REPLAY_WINDOW: + return False + except ValueError: + return False + _, expected = sign(secret, body, int(float(timestamp))) + return hmac.compare_digest(expected, signature) + + +def _post(url: str, secret: str, payload: dict, idempotency: bool = True): + import requests + + body = json.dumps(payload, ensure_ascii=False).encode() + ts, sig = sign(secret, body) + headers = { + 'Content-Type': 'application/json', + HEADER_TIMESTAMP: ts, + HEADER_SIGNATURE: sig, + } + if idempotency: + headers[HEADER_IDEMPOTENCY] = uuid.uuid4().hex + resp = requests.post(url, data=body, headers=headers, timeout=30) + print(f'-> {resp.status_code} {resp.text}') + return resp + + +def push(url: str, secret: str, session: str, text: str, session_type: str = 'person'): + """Fire-and-collect: returns 202 immediately; reply arrives on your callback.""" + payload = { + 'session_id': session, + 'session_type': session_type, + 'message': [{'type': 'Plain', 'text': text}], + } + return _post(url.rstrip('/'), secret, payload) + + +def push_sync(url: str, secret: str, session: str, text: str, session_type: str = 'person'): + """Blocking convenience: POST to /sync and get the collapsed reply back.""" + payload = { + 'session_id': session, + 'session_type': session_type, + 'message': [{'type': 'Plain', 'text': text}], + } + resp = _post(url.rstrip('/') + '/sync', secret, payload, idempotency=False) + return resp + + +def reset(url: str, secret: str, session: str, session_type: str = 'person'): + """Reset a session's conversation (next message starts fresh).""" + payload = {'session_id': session, 'session_type': session_type} + return _post(url.rstrip('/') + '/reset', secret, payload, idempotency=False) + + +def serve(port: int, secret: str): + """Run a callback receiver that verifies signatures and prints replies.""" + from flask import Flask, request + + app = Flask(__name__) + + @app.route('/', methods=['POST']) + def recv(): + raw = request.get_data() + ok = verify(secret, raw, request.headers.get(HEADER_TIMESTAMP), request.headers.get(HEADER_SIGNATURE)) + if not ok: + print('!! signature verification FAILED — rejecting') + return {'error': 'bad signature'}, 401 + data = json.loads(raw) + text_parts = [c.get('text', '') for c in data.get('message', []) if c.get('type') == 'Plain'] + marker = 'FINAL' if data.get('is_final') else 'part ' + print( + f'[{marker}] session={data["session_id"]} seq={data["sequence"]} ' + f'reply_to={data.get("reply_to")}: {" ".join(text_parts)}' + ) + return {'ok': True} + + print(f'callback receiver listening on http://0.0.0.0:{port}/ (Ctrl-C to stop)') + app.run(host='0.0.0.0', port=port) + + +def main(argv=None): + p = argparse.ArgumentParser(description='LangBot HTTP Bot reference client') + sub = p.add_subparsers(dest='cmd', required=True) + + sp = sub.add_parser('serve', help='run the callback receiver') + sp.add_argument('--port', type=int, default=8900) + sp.add_argument('--secret', required=True) + + for name in ('push', 'sync', 'reset'): + c = sub.add_parser(name) + c.add_argument('--url', required=True, help='https://host/bots/') + c.add_argument('--secret', required=True) + c.add_argument('--session', required=True) + c.add_argument('--session-type', default='person', choices=['person', 'group']) + if name != 'reset': + c.add_argument('--text', required=True) + + args = p.parse_args(argv) + if args.cmd == 'serve': + serve(args.port, args.secret) + elif args.cmd == 'push': + push(args.url, args.secret, args.session, args.text, args.session_type) + elif args.cmd == 'sync': + push_sync(args.url, args.secret, args.session, args.text, args.session_type) + elif args.cmd == 'reset': + reset(args.url, args.secret, args.session, args.session_type) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/examples/http-bot/client.ts b/examples/http-bot/client.ts new file mode 100644 index 000000000..ec121823c --- /dev/null +++ b/examples/http-bot/client.ts @@ -0,0 +1,123 @@ +/** + * LangBot HTTP Bot adapter — reference client (TypeScript / Node 18+). + * + * Zero runtime dependencies (uses global `fetch`, `crypto`, and `http`). + * + * - `push()` : fire-and-collect; reply lands on your callback URL. + * - `pushSync()` : POST /sync and await the collapsed reply. + * - `reset()` : reset a session's conversation. + * - `startReceiver()` : a callback server that verifies signatures and logs + * replies, so you can watch N->1 and 1->M live. + * + * Run the demos: + * npx tsx client.ts serve 8900 SHARED_SECRET + * npx tsx client.ts push https://host/bots/ SHARED_SECRET ticket-1 "hello" + * npx tsx client.ts sync https://host/bots/ SHARED_SECRET ticket-1 "hello" + * npx tsx client.ts reset https://host/bots/ SHARED_SECRET ticket-1 + */ + +import { createHmac, randomUUID, timingSafeEqual } from 'node:crypto'; +import { createServer } from 'node:http'; + +const HEADER_TIMESTAMP = 'X-LB-Timestamp'; +const HEADER_SIGNATURE = 'X-LB-Signature'; +const HEADER_IDEMPOTENCY = 'X-LB-Idempotency-Key'; +const REPLAY_WINDOW = 300; + +/** Compute the `sha256=` signature over `"{ts}." + body`. */ +export function sign(secret: string, body: Buffer | string, timestamp?: number): [string, string] { + const ts = String(timestamp ?? Math.floor(Date.now() / 1000)); + const buf = typeof body === 'string' ? Buffer.from(body) : body; + const mac = createHmac('sha256', secret).update(Buffer.concat([Buffer.from(`${ts}.`), buf])).digest('hex'); + return [ts, `sha256=${mac}`]; +} + +/** Verify an inbound signature (used by the callback receiver). */ +export function verify(secret: string, body: Buffer, timestamp?: string, signature?: string): boolean { + if (!timestamp || !signature) return false; + if (Math.abs(Math.floor(Date.now() / 1000) - Number(timestamp)) > REPLAY_WINDOW) return false; + const [, expected] = sign(secret, body, Number(timestamp)); + const a = Buffer.from(expected); + const b = Buffer.from(signature); + return a.length === b.length && timingSafeEqual(a, b); +} + +interface Segment { type: string; text?: string; url?: string; [k: string]: unknown } + +async function post(url: string, secret: string, payload: object, idempotency = true) { + const body = Buffer.from(JSON.stringify(payload)); + const [ts, sig] = sign(secret, body); + const headers: Record = { + 'Content-Type': 'application/json', + [HEADER_TIMESTAMP]: ts, + [HEADER_SIGNATURE]: sig, + }; + if (idempotency) headers[HEADER_IDEMPOTENCY] = randomUUID(); + const resp = await fetch(url, { method: 'POST', headers, body }); + const text = await resp.text(); + console.log(`-> ${resp.status} ${text}`); + return { status: resp.status, text }; +} + +/** Fire-and-collect: 202 now, reply later on your callback URL. */ +export function push(url: string, secret: string, session: string, text: string, sessionType = 'person') { + return post(url.replace(/\/$/, ''), secret, { + session_id: session, + session_type: sessionType, + message: [{ type: 'Plain', text }] as Segment[], + }); +} + +/** Blocking convenience: POST /sync, get the collapsed reply. */ +export function pushSync(url: string, secret: string, session: string, text: string, sessionType = 'person') { + return post(`${url.replace(/\/$/, '')}/sync`, secret, { + session_id: session, + session_type: sessionType, + message: [{ type: 'Plain', text }] as Segment[], + }, false); +} + +/** Reset a session's conversation. */ +export function reset(url: string, secret: string, session: string, sessionType = 'person') { + return post(`${url.replace(/\/$/, '')}/reset`, secret, { session_id: session, session_type: sessionType }, false); +} + +/** Run a callback receiver that verifies signatures and prints replies. */ +export function startReceiver(port: number, secret: string) { + const server = createServer((req, res) => { + if (req.method !== 'POST') { res.writeHead(405).end(); return; } + const chunks: Buffer[] = []; + req.on('data', (c) => chunks.push(c)); + req.on('end', () => { + const raw = Buffer.concat(chunks); + const ok = verify(secret, raw, req.headers[HEADER_TIMESTAMP.toLowerCase()] as string, + req.headers[HEADER_SIGNATURE.toLowerCase()] as string); + if (!ok) { + console.log('!! signature verification FAILED — rejecting'); + res.writeHead(401, { 'Content-Type': 'application/json' }).end(JSON.stringify({ error: 'bad signature' })); + return; + } + const data = JSON.parse(raw.toString()); + const parts = (data.message as Segment[]).filter((c) => c.type === 'Plain').map((c) => c.text).join(' '); + const marker = data.is_final ? 'FINAL' : 'part '; + console.log(`[${marker}] session=${data.session_id} seq=${data.sequence} reply_to=${data.reply_to}: ${parts}`); + res.writeHead(200, { 'Content-Type': 'application/json' }).end(JSON.stringify({ ok: true })); + }); + }); + server.listen(port, () => console.log(`callback receiver listening on http://0.0.0.0:${port}/ (Ctrl-C to stop)`)); +} + +// --- CLI --- +const [cmd, ...rest] = process.argv.slice(2); +if (cmd === 'serve') { + startReceiver(Number(rest[0] ?? 8900), rest[1] ?? 'SHARED_SECRET'); +} else if (cmd === 'push') { + push(rest[0], rest[1], rest[2], rest[3]); +} else if (cmd === 'sync') { + pushSync(rest[0], rest[1], rest[2], rest[3]); +} else if (cmd === 'reset') { + reset(rest[0], rest[1], rest[2]); +} else if (cmd) { + console.error(`unknown command: ${cmd}`); + process.exit(1); +} diff --git a/examples/http-bot/playground.py b/examples/http-bot/playground.py new file mode 100644 index 000000000..ea77d26b9 --- /dev/null +++ b/examples/http-bot/playground.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 +"""LangBot HTTP Bot — interactive playground (public, browser-based). + +This is a REAL end-to-end demo against the RUNNING LangBot instance on this +host. It is NOT a mock and NOT an in-process import: every message you type in +the browser is signed and POSTed to the live `http_bot` bot at +http://127.0.0.1:5300/bots/, and the bot's replies come back to this +server's /callback endpoint over real HTTP, then stream to your browser via SSE. + +What it does on startup: + 1. Reads the LangBot API key + the http_bot bot from data/langbot.db. + 2. Configures the bot via the LangBot API (PUT /api/v1/platform/bots/): + sets inbound_secret + outbound_secret + callback_url to point back here. + (LangBot reloads the bot live — no server restart needed.) + 3. Serves a chat page on 0.0.0.0: so you can open it from the internet. + +Run: ./.venv/bin/python examples/http-bot/playground.py +Then open: http://:/ +""" + +from __future__ import annotations + +import asyncio +import json +import os +import sqlite3 +import sys + +REPO = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.insert(0, os.path.join(REPO, 'src')) + +from aiohttp import web # noqa: E402 +import aiohttp # noqa: E402 + +from langbot.pkg.platform.sources import http_bot_signing as sg # noqa: E402 + +# ---- config ----------------------------------------------------------------- +LANGBOT_BASE = 'http://127.0.0.1:5300' +DB_PATH = os.path.join(REPO, 'data', 'langbot.db') +PUBLIC_IP = os.environ.get('PUBLIC_IP', '127.0.0.1') +PORT = int(os.environ.get('PLAYGROUND_PORT', '8920')) +SECRET = 'playground-shared-secret' + +# SSE subscribers: list of asyncio.Queue +subscribers: list[asyncio.Queue] = [] + + +def db_lookup() -> tuple[str, str]: + """Return (api_key, http_bot_uuid) from the LangBot DB.""" + db = sqlite3.connect(DB_PATH) + db.row_factory = sqlite3.Row + api_key = db.execute('SELECT key FROM api_keys LIMIT 1').fetchone()['key'] + bot = db.execute("SELECT uuid FROM bots WHERE adapter='http_bot' LIMIT 1").fetchone() + if not bot: + raise SystemExit('No http_bot bot found. Create one in the WebUI first.') + return api_key, bot['uuid'] + + +async def configure_bot(api_key: str, bot_uuid: str, callback_url: str): + """Point the live bot at this playground via the LangBot API. + + update_bot() runs a raw SQL UPDATE with whatever keys we send, so we send a + MINIMAL payload: only adapter_config (built from scratch, not read back — + the GET masks secrets). LangBot reloads + reruns the bot live. + """ + cfg = { + 'inbound_secret': SECRET, + 'outbound_secret': SECRET, + 'callback_url': callback_url, + 'signature_required': True, + 'default_session_type': 'person', + 'callback_timeout': 15, + 'callback_max_retries': 3, + } + async with aiohttp.ClientSession() as s: + async with s.put( + f'{LANGBOT_BASE}/api/v1/platform/bots/{bot_uuid}', + headers={'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json'}, + json={'adapter_config': cfg}, + ) as r: + txt = await r.text() + print(f'[configure] PUT adapter_config -> {r.status} {txt[:200]}') + return r.status < 400 + + +async def broadcast(event: dict): + for q in list(subscribers): + try: + q.put_nowait(event) + except Exception: + pass + + +# ---- HTTP handlers ---------------------------------------------------------- +async def index(request: web.Request): + return web.Response(text=PAGE, content_type='text/html') + + +async def send(request: web.Request): + """Browser -> here -> signed POST -> live LangBot bot.""" + body_in = await request.json() + session_id = body_in.get('session_id') or 'playground-1' + text = body_in.get('text', '') + bot_uuid = request.app['bot_uuid'] + + payload = { + 'session_id': session_id, + 'sender': {'id': 'browser-user', 'name': 'You'}, + 'message': [{'type': 'Plain', 'text': text}], + } + raw = json.dumps(payload, ensure_ascii=False).encode() + ts, sig = sg.sign(SECRET, raw) + url = f'{LANGBOT_BASE}/bots/{bot_uuid}' + + # echo what we send to the browser timeline + await broadcast( + {'dir': 'out', 'kind': 'request', 'session_id': session_id, 'text': text, 'url': url, 'sig': sig[:24] + '…'} + ) + + async with aiohttp.ClientSession() as s: + async with s.post( + url, + data=raw, + headers={ + 'Content-Type': 'application/json', + sg.HEADER_TIMESTAMP: ts, + sg.HEADER_SIGNATURE: sig, + }, + ) as r: + status = r.status + try: + jr = await r.json() + except Exception: + jr = {'raw': await r.text()} + await broadcast({'dir': 'in', 'kind': 'ack', 'status': status, 'data': jr}) + return web.json_response({'status': status, 'data': jr}) + + +async def callback(request: web.Request): + """Live LangBot bot -> here. Verify signature, stream to browser.""" + raw = await request.read() + ok, why = sg.verify(SECRET, raw, request.headers.get(sg.HEADER_TIMESTAMP), request.headers.get(sg.HEADER_SIGNATURE)) + data = json.loads(raw) + text = ' '.join(c.get('text', '') for c in data.get('message', []) if c.get('type') == 'Plain') + await broadcast( + { + 'dir': 'in', + 'kind': 'reply', + 'session_id': data.get('session_id'), + 'sequence': data.get('sequence'), + 'is_final': data.get('is_final'), + 'sig_ok': ok, + 'sig_why': why, + 'text': text, + } + ) + return web.json_response({'ok': True}) + + +async def events(request: web.Request): + """SSE stream to the browser.""" + resp = web.StreamResponse( + headers={ + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Access-Control-Allow-Origin': '*', + } + ) + await resp.prepare(request) + q: asyncio.Queue = asyncio.Queue() + subscribers.append(q) + try: + await resp.write(b': connected\n\n') + while True: + try: + ev = await asyncio.wait_for(q.get(), timeout=15) + await resp.write(f'data: {json.dumps(ev, ensure_ascii=False)}\n\n'.encode()) + except asyncio.TimeoutError: + await resp.write(b': ping\n\n') + except (asyncio.CancelledError, ConnectionResetError): + pass + finally: + if q in subscribers: + subscribers.remove(q) + return resp + + +PAGE = r""" + + +LangBot HTTP Bot · 调试台 + + +
+ + HTTP Bot 调试台examples/http-bot + 连接中… +
+
+ +
+

对话 · 真实发往运行中的 http_bot

+
+
+ + +
+
+ +
+

调试信息

+
入站地址
/bots/<uuid>
+
签名 HMAC-SHA256 · X-LB-Signature
+
+ 会话 + + +
+
+
+
+ +""" + + +async def main(): + api_key, bot_uuid = db_lookup() + callback_url = f'http://{PUBLIC_IP}:{PORT}/callback' + print(f'[init] http_bot uuid = {bot_uuid}') + print(f'[init] callback_url = {callback_url}') + ok = await configure_bot(api_key, bot_uuid, callback_url) + if not ok: + print('[warn] bot config update failed; check the API key / payload shape') + + app = web.Application() + app['bot_uuid'] = bot_uuid + app.router.add_get('/', index) + app.router.add_post('/send', send) + app.router.add_post('/callback', callback) + app.router.add_get('/events', events) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, '0.0.0.0', PORT) + await site.start() + print(f'\n ▶ 打开: http://{PUBLIC_IP}:{PORT}/\n') + while True: + await asyncio.sleep(3600) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/langbot/pkg/platform/sources/http_bot.py b/src/langbot/pkg/platform/sources/http_bot.py new file mode 100644 index 000000000..16a891991 --- /dev/null +++ b/src/langbot/pkg/platform/sources/http_bot.py @@ -0,0 +1,509 @@ +"""HTTP Bot adapter — standalone server-to-server platform adapter. + +Lets any external backend drive a LangBot pipeline over plain HTTP: + +* **Inbound** — the backend POSTs a signed message to the unified webhook + route ``POST /bots/``; this adapter verifies the signature, builds + a platform event carrying the caller-defined ``session_id`` as the launcher + id, and fires it into the normal pipeline (so message aggregation, N->1, + works for free). +* **Outbound** — every ``reply_message`` / ``reply_message_chunk`` the pipeline + emits is delivered as a signed POST to the configured ``callback_url``. A + single turn may emit many replies (1->M); each is one callback, ordered per + session via a small worker queue. + +Design notes: + +* The callback URL is taken **only** from adapter config (never from the + inbound message) to keep the SSRF surface closed. +* Replies for one ``session_id`` are delivered in ``sequence`` order; the + caller knows a turn is complete when ``is_final: true`` arrives. +* No new HTTP route is registered — the existing unified webhook dispatcher + (``pkg/api/http/controller/groups/webhooks.py``) calls + ``handle_unified_webhook`` on this adapter. + +See docs/platforms/http-bot.md for the full integration guide. +""" + +from __future__ import annotations + +import asyncio +import json +import time +import typing +import uuid +from datetime import datetime + +import aiohttp +import pydantic +import quart + +import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter +import langbot_plugin.api.entities.builtin.platform.message as platform_message +import langbot_plugin.api.entities.builtin.platform.events as platform_events +import langbot_plugin.api.entities.builtin.platform.entities as platform_entities +import langbot_plugin.api.definition.abstract.platform.event_logger as abstract_platform_logger + +from . import http_bot_signing as signing +from ...utils import httpclient + + +# Error envelope codes (HTTP status -> body code), documented in the design doc. +_ERR = { + 'bad_request': (400, 40001), + 'bad_signature': (401, 40101), + 'duplicate': (409, 40901), + 'too_large': (413, 41301), + 'internal': (500, 50001), +} + +# Max accepted inbound body size (bytes). +_MAX_BODY = 1 * 1024 * 1024 + +# Idempotency dedup window (seconds) and cap. +_IDEMPOTENCY_TTL = 600 +_IDEMPOTENCY_MAX = 4096 + + +class _SessionOutbound: + """Per-session outbound state: ordered delivery queue + sequence counter.""" + + def __init__(self) -> None: + self.queue: asyncio.Queue = asyncio.Queue(maxsize=1000) + self.worker: asyncio.Task | None = None + self.sequence: int = 0 + self.last_was_final: bool = True # so the first reply of a turn starts at seq 1 + + +class _SyncCollector: + """Collects reply parts for a /sync request and resolves when the turn ends.""" + + def __init__(self) -> None: + self.parts: list = [] + self.done: asyncio.Event = asyncio.Event() + + +class HttpBotAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): + """Standalone HTTP adapter (inbound webhook + outbound callbacks).""" + + bot_uuid: str = pydantic.Field(default='', exclude=True) + + listeners: dict[ + typing.Type[platform_events.Event], + typing.Callable[[platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], None], + ] = pydantic.Field(default_factory=dict, exclude=True) + + # session_id -> outbound state + outbound_states: dict[str, _SessionOutbound] = pydantic.Field(default_factory=dict, exclude=True) + # idempotency key -> accepted-at epoch + idempotency_cache: dict[str, float] = pydantic.Field(default_factory=dict, exclude=True) + # session_id -> sync collector (set while a /sync request is awaiting a turn) + sync_waiters: dict[str, '_SyncCollector'] = pydantic.Field(default_factory=dict, exclude=True) + + model_config = pydantic.ConfigDict(arbitrary_types_allowed=True) + + def __init__(self, config: dict, logger: abstract_platform_logger.AbstractEventLogger, **kwargs): + super().__init__(config=config, logger=logger, **kwargs) + self.bot_account_id = 'http_bot' + self.outbound_states = {} + self.idempotency_cache = {} + self.sync_waiters = {} + + # -- framework hooks ------------------------------------------------------ + + def set_bot_uuid(self, bot_uuid: str) -> None: + """Called by the bot manager so the adapter knows its own bot uuid.""" + object.__setattr__(self, 'bot_uuid', bot_uuid) + + def get_launcher_id(self, event: platform_events.MessageEvent) -> str: + """Map an inbound event to a LangBot launcher id. + + We return the caller-defined ``session_id`` (stashed on the sender / + group id at inbound time) so that each external session maps 1:1 to an + isolated LangBot session. + """ + if isinstance(event, platform_events.GroupMessage): + return str(event.sender.group.id) + return str(event.sender.id) + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + func: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], typing.Awaitable[None] + ], + ): + self.listeners[event_type] = func + + def unregister_listener( + self, + event_type: typing.Type[platform_events.Event], + func: typing.Callable[ + [platform_events.Event, abstract_platform_adapter.AbstractMessagePlatformAdapter], typing.Awaitable[None] + ], + ): + self.listeners.pop(event_type, None) + + async def is_muted(self, group_id: int) -> bool: + return False + + async def is_stream_output_supported(self) -> bool: + return True + + async def run_async(self): + # Purely webhook-driven; nothing to poll. Stay alive. + while True: + await asyncio.sleep(3600) + + async def kill(self): + # Cancel any outbound workers. + for state in self.outbound_states.values(): + if state.worker and not state.worker.done(): + state.worker.cancel() + return True + + # -- inbound -------------------------------------------------------------- + + def _err(self, kind: str, detail: str = ''): + status, code = _ERR[kind] + return quart.jsonify({'code': code, 'msg': detail or kind, 'data': None}), status + + def _prune_idempotency(self) -> None: + now = time.time() + if len(self.idempotency_cache) > _IDEMPOTENCY_MAX: + self.idempotency_cache.clear() + return + expired = [k for k, ts in self.idempotency_cache.items() if now - ts > _IDEMPOTENCY_TTL] + for k in expired: + self.idempotency_cache.pop(k, None) + + async def handle_unified_webhook(self, bot_uuid: str, path: str, request): + """Handle an inbound POST from the unified webhook dispatcher. + + Sub-path routing: + (no path) -> push a message + "reset" -> reset a session's conversation (body: {session_id, session_type?}) + "sync" -> push a message and wait for the final reply (collapses 1->M) + """ + object.__setattr__(self, 'bot_uuid', bot_uuid) + + if path == 'reset': + return await self._handle_reset(request) + if path == 'sync': + return await self._handle_inbound(request, sync=True) + if path in ('', None): + return await self._handle_inbound(request, sync=False) + return self._err('bad_request', f'unknown sub-path: {path}') + + async def _read_and_verify(self, request) -> tuple[dict | None, typing.Any]: + """Read body, enforce size + signature. Returns (data, error_response).""" + body = await request.get_data() + if body and len(body) > _MAX_BODY: + return None, self._err('too_large', 'message too large') + + if self.config.get('signature_required', True): + ok, reason = signing.verify( + secret=self.config.get('inbound_secret', ''), + body=body, + timestamp=request.headers.get(signing.HEADER_TIMESTAMP), + signature=request.headers.get(signing.HEADER_SIGNATURE), + ) + if not ok: + await self.logger.warning(f'http_bot inbound signature rejected: {reason}') + return None, self._err('bad_signature', f'invalid signature: {reason}') + + try: + data = json.loads(body) + except (json.JSONDecodeError, ValueError): + return None, self._err('bad_request', 'body is not valid JSON') + if not isinstance(data, dict): + return None, self._err('bad_request', 'body must be a JSON object') + return data, None + + def _build_event(self, data: dict) -> tuple[platform_events.MessageEvent, str, str, str]: + """Build a platform event from inbound data. + + Returns (event, session_id, session_type, message_id). + """ + session_id = str(data['session_id']) + session_type = data.get('session_type') or self.config.get('default_session_type', 'person') + sender_meta = data.get('sender') or {} + sender_name = str(sender_meta.get('name', 'User')) + + message_id = 'in_' + uuid.uuid4().hex + chain = platform_message.MessageChain.model_validate(data['message']) + # Carry the inbound message id + timestamp as the Source component. + chain.insert(0, platform_message.Source(id=message_id, time=datetime.now())) + + if session_type == 'group': + group = platform_entities.Group( + id=session_id, + name=str(sender_meta.get('group_name', session_id)), + permission=platform_entities.Permission.Member, + ) + sender = platform_entities.GroupMember( + id=str(sender_meta.get('id', session_id)), + member_name=sender_name, + group=group, + permission=platform_entities.Permission.Member, + ) + event = platform_events.GroupMessage(sender=sender, message_chain=chain, time=datetime.now().timestamp()) + else: + sender = platform_entities.Friend(id=session_id, nickname=sender_name, remark=sender_name) + event = platform_events.FriendMessage(sender=sender, message_chain=chain, time=datetime.now().timestamp()) + return event, session_id, session_type, message_id + + async def _handle_inbound(self, request, sync: bool): + data, err = await self._read_and_verify(request) + if err is not None: + return err + + if 'session_id' not in data or 'message' not in data: + return self._err('bad_request', 'session_id and message are required') + + # Idempotency. + idem = request.headers.get(signing.HEADER_IDEMPOTENCY) + if idem: + self._prune_idempotency() + if idem in self.idempotency_cache: + return self._err('duplicate', 'idempotency key already accepted') + self.idempotency_cache[idem] = time.time() + + try: + event, session_id, session_type, message_id = self._build_event(data) + except Exception as e: # noqa: BLE001 + return self._err('bad_request', f'failed to parse message: {e}') + + listener = self.listeners.get(type(event)) + if listener is None: + return self._err('internal', 'no listener registered for event type') + + if sync: + return await self._run_sync(event, listener, session_id, message_id) + + # Fire-and-collect: kick the pipeline, return 202 immediately. + asyncio.create_task(listener(event, self)) + return quart.jsonify( + { + 'code': 0, + 'msg': 'accepted', + 'data': { + 'session_id': session_id, + 'accepted_message_id': message_id, + 'aggregating': True, + }, + } + ), 202 + + async def _handle_reset(self, request): + data, err = await self._read_and_verify(request) + if err is not None: + return err + if 'session_id' not in data: + return self._err('bad_request', 'session_id is required') + + session_id = str(data['session_id']) + session_type = data.get('session_type') or self.config.get('default_session_type', 'person') + launcher_type = 'group' if session_type == 'group' else 'person' + + removed = await self._reset_session(launcher_type, session_id) + return quart.jsonify({'code': 0, 'msg': 'reset', 'data': {'session_id': session_id, 'removed': removed}}), 200 + + async def _reset_session(self, launcher_type: str, launcher_id: str) -> bool: + """Drop the matching session so the next message starts a fresh conversation.""" + sess_mgr = self.ap.sess_mgr + before = len(sess_mgr.session_list) + sess_mgr.session_list = [ + s + for s in sess_mgr.session_list + if not ( + str(s.launcher_type.value if hasattr(s.launcher_type, 'value') else s.launcher_type) == launcher_type + and str(s.launcher_id) == launcher_id + ) + ] + return len(sess_mgr.session_list) < before + + # -- outbound ------------------------------------------------------------- + + @staticmethod + def _extract_session_id(message_source: platform_events.MessageEvent) -> str: + if isinstance(message_source, platform_events.GroupMessage): + return str(message_source.sender.group.id) + return str(message_source.sender.id) + + @staticmethod + def _extract_reply_to(message_source: platform_events.MessageEvent) -> str: + for comp in message_source.message_chain: + if isinstance(comp, platform_message.Source): + return str(comp.id) + return '' + + def _next_sequence(self, session_id: str, is_final: bool) -> int: + state = self.outbound_states.setdefault(session_id, _SessionOutbound()) + if state.last_was_final: + state.sequence = 1 + else: + state.sequence += 1 + state.last_was_final = is_final + return state.sequence + + async def _enqueue_callback(self, session_id: str, payload: dict) -> None: + state = self.outbound_states.setdefault(session_id, _SessionOutbound()) + if state.worker is None or state.worker.done(): + state.worker = asyncio.create_task(self._outbound_worker(session_id, state)) + try: + state.queue.put_nowait(payload) + except asyncio.QueueFull: + # Drop oldest to bound memory, then enqueue (best-effort, at-least-once). + try: + state.queue.get_nowait() + except asyncio.QueueEmpty: + pass + await self.logger.warning(f'http_bot outbound queue full for session {session_id}; dropped oldest') + state.queue.put_nowait(payload) + + async def _outbound_worker(self, session_id: str, state: _SessionOutbound) -> None: + while True: + payload = await state.queue.get() + try: + await self._deliver_callback(payload) + except Exception as e: # noqa: BLE001 + await self.logger.error(f'http_bot callback delivery failed for {session_id}: {e}') + finally: + state.queue.task_done() + + async def _deliver_callback(self, payload: dict) -> None: + callback_url = self.config.get('callback_url', '') + if not callback_url: + await self.logger.warning('http_bot has no callback_url configured; dropping reply') + return + + body = json.dumps(payload, ensure_ascii=False).encode() + secret = self.config.get('outbound_secret') or self.config.get('inbound_secret', '') + ts, sig = signing.sign(secret, body) + headers = { + 'Content-Type': 'application/json', + signing.HEADER_TIMESTAMP: ts, + signing.HEADER_SIGNATURE: sig, + } + timeout = aiohttp.ClientTimeout(total=int(self.config.get('callback_timeout', 15))) + max_retries = int(self.config.get('callback_max_retries', 3)) + + session = httpclient.get_session() + attempt = 0 + while True: + attempt += 1 + try: + async with session.post(callback_url, data=body, headers=headers, timeout=timeout) as resp: + if resp.status < 400: + return + if resp.status < 500 or attempt > max_retries: + await self.logger.warning(f'http_bot callback {callback_url} -> {resp.status}, giving up') + return + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + if attempt > max_retries: + await self.logger.warning(f'http_bot callback {callback_url} failed after {attempt} tries: {e}') + return + await asyncio.sleep(min(2 ** (attempt - 1), 30)) + + async def _emit_reply( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + is_final: bool, + stream: bool, + ) -> dict: + session_id = self._extract_session_id(message_source) + reply_to = self._extract_reply_to(message_source) + sequence = self._next_sequence(session_id, is_final) + parts = [c.model_dump() if hasattr(c, 'model_dump') else c.__dict__ for c in message] + payload = { + 'session_id': session_id, + 'reply_to': reply_to, + 'sequence': sequence, + 'is_final': is_final, + 'stream': stream, + 'message': parts, + 'timestamp': datetime.now().isoformat(), + } + + # If a /sync request is awaiting this session, collect instead of POSTing. + collector = self.sync_waiters.get(session_id) + if collector is not None: + collector.parts.extend(parts) + if is_final: + collector.done.set() + return payload + + await self._enqueue_callback(session_id, payload) + return payload + + async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain) -> dict: + """Proactively push a message to a session (target_id == session_id).""" + sequence = self._next_sequence(str(target_id), is_final=True) + payload = { + 'session_id': str(target_id), + 'reply_to': '', + 'sequence': sequence, + 'is_final': True, + 'stream': False, + 'message': [c.model_dump() if hasattr(c, 'model_dump') else c.__dict__ for c in message], + 'timestamp': datetime.now().isoformat(), + } + await self._enqueue_callback(str(target_id), payload) + return payload + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ) -> dict: + return await self._emit_reply(message_source, message, is_final=True, stream=False) + + async def reply_message_chunk( + self, + message_source: platform_events.MessageEvent, + bot_message, + message: platform_message.MessageChain, + quote_origin: bool = False, + is_final: bool = False, + ) -> dict: + message_is_final = is_final and getattr(bot_message, 'tool_calls', None) is None + return await self._emit_reply(message_source, message, is_final=message_is_final, stream=True) + + # -- sync convenience mode ------------------------------------------------ + + async def _run_sync(self, event, listener, session_id: str, message_id: str): + """Push a message and wait for the final reply, collapsing 1->M parts. + + Lossy by design (drops streaming/ordering nuance); documented as such. + Concurrency-safe: routing is via the per-session ``_sync_waiters`` + registry that ``_emit_reply`` consults, not by patching methods. + """ + if session_id in self.sync_waiters: + return self._err('duplicate', 'a sync request is already in flight for this session') + + collector = _SyncCollector() + self.sync_waiters[session_id] = collector + try: + asyncio.create_task(listener(event, self)) + timeout = int(self.config.get('callback_timeout', 15)) * 4 + try: + await asyncio.wait_for(collector.done.wait(), timeout=timeout) + except asyncio.TimeoutError: + await self.logger.warning(f'http_bot sync wait timed out for session {session_id}') + finally: + self.sync_waiters.pop(session_id, None) + + return quart.jsonify( + { + 'code': 0, + 'msg': 'ok', + 'data': { + 'session_id': session_id, + 'reply_to': message_id, + 'message': collector.parts, + }, + } + ), 200 diff --git a/src/langbot/pkg/platform/sources/http_bot.svg b/src/langbot/pkg/platform/sources/http_bot.svg new file mode 100644 index 000000000..e092e755f --- /dev/null +++ b/src/langbot/pkg/platform/sources/http_bot.svg @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/src/langbot/pkg/platform/sources/http_bot.yaml b/src/langbot/pkg/platform/sources/http_bot.yaml new file mode 100644 index 000000000..56ef57f26 --- /dev/null +++ b/src/langbot/pkg/platform/sources/http_bot.yaml @@ -0,0 +1,153 @@ +apiVersion: v1 +kind: MessagePlatformAdapter +metadata: + name: http_bot + label: + en_US: HTTP Bot + zh_Hans: HTTP 通用接入 + zh_Hant: HTTP 通用接入 + ja_JP: HTTP ボット + description: + en_US: Integrate any backend over plain HTTP. Push messages in via a signed webhook, receive replies on a callback URL. Server-to-server, no long-lived connection. Preserves message aggregation (N->1) and multi-part replies (1->M). + zh_Hans: 通过 HTTP 接入任意后端系统。以签名 Webhook 推入消息,在回调地址接收回复。面向服务间集成,无需长连接。完整保留消息聚合(多条合一)与多段回复(一条问、多条回)能力。 + zh_Hant: 透過 HTTP 接入任意後端系統。以簽名 Webhook 推入訊息,在回調地址接收回覆。面向服務間整合,無需長連線。完整保留訊息聚合(多條合一)與多段回覆(一條問、多條回)能力。 + ja_JP: 任意のバックエンドを HTTP で接続。署名付き Webhook でメッセージを送信し、コールバック URL で返信を受信します。サーバー間連携、長時間接続不要。メッセージ集約(N→1)とマルチパート返信(1→M)に対応。 + icon: http_bot.svg +spec: + categories: + - popular + - global + help_links: + zh: https://docs.langbot.app/zh/platforms/http-bot + en: https://docs.langbot.app/en/platforms/http-bot + ja: https://docs.langbot.app/ja/platforms/http-bot + config: + - name: webhook_url + label: + en_US: Inbound Webhook URL + zh_Hans: 入站 Webhook 地址 + zh_Hant: 入站 Webhook 地址 + ja_JP: 受信 Webhook URL + description: + en_US: Copy this URL. Your backend POSTs messages here (signed with the inbound secret). + zh_Hans: 复制此地址。你的后端将消息以签名方式 POST 到这里。 + zh_Hant: 複製此地址。你的後端將訊息以簽名方式 POST 到這裡。 + ja_JP: この URL をコピーしてください。バックエンドは署名付きでここにメッセージを POST します。 + type: webhook-url + required: false + default: "" + - name: inbound_secret + label: + en_US: Inbound Signing Secret + zh_Hans: 入站签名密钥 + zh_Hant: 入站簽名密鑰 + ja_JP: 受信署名シークレット + description: + en_US: HMAC-SHA256 secret your backend uses to sign inbound requests. LangBot verifies every inbound POST with it. + zh_Hans: 你的后端用于对入站请求做 HMAC-SHA256 签名的密钥;LangBot 据此校验每个入站 POST。 + zh_Hant: 你的後端用於對入站請求做 HMAC-SHA256 簽名的密鑰;LangBot 據此校驗每個入站 POST。 + ja_JP: バックエンドが受信リクエストの署名に使う HMAC-SHA256 シークレット。LangBot は受信 POST ごとに検証します。 + type: string + required: true + default: "" + - name: callback_url + label: + en_US: Outbound Callback URL + zh_Hans: 出站回调地址 + zh_Hant: 出站回調地址 + ja_JP: 送信コールバック URL + description: + en_US: Where LangBot POSTs replies. One turn may trigger multiple callbacks (1->M). For security the callback URL is taken ONLY from this config and cannot be overridden per-message. + zh_Hans: LangBot 将回复 POST 到此地址。一轮对话可能触发多次回调(一问多答)。出于安全考虑,回调地址只取自此配置,不允许逐条消息覆盖。 + zh_Hant: LangBot 將回覆 POST 到此地址。一輪對話可能觸發多次回調(一問多答)。出於安全考慮,回調地址只取自此配置,不允許逐條訊息覆蓋。 + ja_JP: LangBot が返信を POST する先。1 ターンで複数回のコールバック(1→M)が発生し得ます。セキュリティ上、コールバック URL はこの設定からのみ取得し、メッセージ単位で上書きできません。 + type: string + required: true + default: "" + - name: outbound_secret + label: + en_US: Outbound Signing Secret + zh_Hans: 出站签名密钥 + zh_Hant: 出站簽名密鑰 + ja_JP: 送信署名シークレット + description: + en_US: HMAC-SHA256 secret LangBot uses to sign outbound callbacks so your receiver can verify them. Falls back to the inbound secret when empty. + zh_Hans: LangBot 用于对出站回调签名的密钥,供你的接收端校验。留空时回退使用入站密钥。 + zh_Hant: LangBot 用於對出站回調簽名的密鑰,供你的接收端校驗。留空時回退使用入站密鑰。 + ja_JP: LangBot が送信コールバックの署名に使う HMAC-SHA256 シークレット。受信側で検証できます。空の場合は受信シークレットを使用します。 + type: string + required: false + default: "" + - name: default_session_type + label: + en_US: Default Session Type + zh_Hans: 默认会话类型 + zh_Hant: 預設會話類型 + ja_JP: デフォルトセッションタイプ + description: + en_US: Session type used when an inbound message omits session_type. + zh_Hans: 入站消息未携带 session_type 时使用的会话类型。 + zh_Hant: 入站訊息未攜帶 session_type 時使用的會話類型。 + ja_JP: 受信メッセージに session_type がない場合に使用するセッションタイプ。 + type: select + options: + - name: person + label: + en_US: Person (1-on-1) + zh_Hans: 个人(一对一) + zh_Hant: 個人(一對一) + ja_JP: 個人(1 対 1) + - name: group + label: + en_US: Group + zh_Hans: 群组 + zh_Hant: 群組 + ja_JP: グループ + required: false + default: person + - name: signature_required + label: + en_US: Require Inbound Signature + zh_Hans: 强制入站签名校验 + zh_Hant: 強制入站簽名校驗 + ja_JP: 受信署名を必須にする + description: + en_US: When enabled (recommended), every inbound POST must carry a valid signature. Disable ONLY for local development behind a trusted network. + zh_Hans: 开启(推荐)后,每个入站 POST 都必须带有效签名。仅在受信任内网的本地开发时关闭。 + zh_Hant: 開啟(推薦)後,每個入站 POST 都必須帶有效簽名。僅在受信任內網的本地開發時關閉。 + ja_JP: 有効(推奨)にすると、すべての受信 POST に有効な署名が必要です。信頼できるネットワーク内のローカル開発時のみ無効化してください。 + type: boolean + required: false + default: true + - name: callback_timeout + label: + en_US: Callback Timeout (seconds) + zh_Hans: 回调超时(秒) + zh_Hant: 回調逾時(秒) + ja_JP: コールバックタイムアウト(秒) + description: + en_US: Per-callback HTTP timeout. + zh_Hans: 单次回调的 HTTP 超时时间。 + zh_Hant: 單次回調的 HTTP 逾時時間。 + ja_JP: コールバックごとの HTTP タイムアウト。 + type: integer + required: false + default: 15 + - name: callback_max_retries + label: + en_US: Callback Max Retries + zh_Hans: 回调最大重试次数 + zh_Hant: 回調最大重試次數 + ja_JP: コールバック最大リトライ回数 + description: + en_US: Retries on timeout or 5xx, with exponential backoff. + zh_Hans: 超时或 5xx 时按指数退避重试的次数。 + zh_Hant: 逾時或 5xx 時按指數退避重試的次數。 + ja_JP: タイムアウトまたは 5xx 時に指数バックオフでリトライする回数。 + type: integer + required: false + default: 3 +execution: + python: + path: ./http_bot.py + attr: HttpBotAdapter diff --git a/src/langbot/pkg/platform/sources/http_bot_signing.py b/src/langbot/pkg/platform/sources/http_bot_signing.py new file mode 100644 index 000000000..0578721c4 --- /dev/null +++ b/src/langbot/pkg/platform/sources/http_bot_signing.py @@ -0,0 +1,95 @@ +"""HMAC signing utilities for the HTTP Bot adapter. + +A dependency-free, symmetric HMAC-SHA256 scheme used in *both* directions: + + signing_string = "{timestamp}." + raw_body_bytes + signature = "sha256=" + hex(HMAC_SHA256(secret, signing_string)) + +Inbound requests are signed by the caller and verified here; outbound +callbacks are signed here and verified by the caller. The scheme is trivial to +reproduce in any language (see docs/platforms/http-bot.md for JS/curl). +""" + +from __future__ import annotations + +import hashlib +import hmac +import time + +# Header names (kept here so adapter + clients agree on a single source). +HEADER_TIMESTAMP = 'X-LB-Timestamp' +HEADER_SIGNATURE = 'X-LB-Signature' +HEADER_IDEMPOTENCY = 'X-LB-Idempotency-Key' + +# Maximum allowed clock skew between signer and verifier (seconds). +DEFAULT_REPLAY_WINDOW = 300 + + +def compute_signature(secret: str, body: bytes, timestamp: str | int) -> str: + """Compute the ``sha256=`` signature for *body* at *timestamp*. + + Args: + secret: Shared HMAC secret. + body: Raw request body bytes (exactly as sent on the wire). + timestamp: Unix timestamp (seconds) as str or int. + + Returns: + The signature string, e.g. ``sha256=ab12...``. + """ + signing_string = f'{timestamp}.'.encode() + body + digest = hmac.new(secret.encode(), signing_string, hashlib.sha256).hexdigest() + return f'sha256={digest}' + + +def sign(secret: str, body: bytes, timestamp: int | None = None) -> tuple[str, str]: + """Produce ``(timestamp, signature)`` for an outbound request. + + Args: + secret: Shared HMAC secret. + body: Raw request body bytes. + timestamp: Optional fixed timestamp; defaults to ``int(time.time())``. + + Returns: + ``(timestamp_str, signature_str)``. + """ + ts = str(timestamp if timestamp is not None else int(time.time())) + return ts, compute_signature(secret, body, ts) + + +def verify( + secret: str, + body: bytes, + timestamp: str | None, + signature: str | None, + replay_window: int = DEFAULT_REPLAY_WINDOW, +) -> tuple[bool, str]: + """Verify an inbound signature. + + Args: + secret: Shared HMAC secret. + body: Raw request body bytes. + timestamp: Value of the timestamp header. + signature: Value of the signature header. + replay_window: Max allowed skew in seconds. + + Returns: + ``(ok, reason)``. ``reason`` is empty when ``ok`` is True, otherwise a + short machine-friendly cause (``missing_headers`` / ``bad_timestamp`` / + ``expired`` / ``signature_mismatch``). + """ + if not timestamp or not signature: + return False, 'missing_headers' + + try: + ts_int = int(float(timestamp)) + except (ValueError, TypeError): + return False, 'bad_timestamp' + + if abs(int(time.time()) - ts_int) > replay_window: + return False, 'expired' + + expected = compute_signature(secret, body, timestamp) + if not hmac.compare_digest(expected, signature): + return False, 'signature_mismatch' + + return True, ''