feat(qqofficial): implement one-click QR binding and enhance localization support

This commit is contained in:
fdc310
2026-06-17 00:31:29 +08:00
parent e6cfee541f
commit b55f073e62
11 changed files with 394 additions and 2 deletions
@@ -5,6 +5,29 @@ from ... import group
from langbot.pkg.utils import importutil
def _decrypt_qqofficial_secret(encrypted_b64: str, key: bytes) -> str:
"""Decrypt the AppSecret returned by the QQ Official QR binding endpoint.
The base64 payload is laid out as `nonce (12 B) | ciphertext | tag (16 B)`.
`key` is the 32-byte AES-256 key locally generated when the bind task
was created and submitted as `key` to `q.qq.com/lite/create_bind_task`.
"""
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
try:
raw = base64.b64decode(encrypted_b64)
except Exception as exc:
raise ValueError('Malformed encrypted credential') from exc
if len(key) != 32 or len(raw) <= 28:
raise ValueError('Invalid encrypted credential layout')
nonce, ciphertext, tag = raw[:12], raw[12:-16], raw[-16:]
try:
return AESGCM(key).decrypt(nonce, ciphertext + tag, None).decode('utf-8')
except Exception as exc:
raise ValueError('Failed to decrypt credential') from exc
@group.group_class('adapters', '/api/v1/platform/adapters')
class AdaptersRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@@ -650,3 +673,220 @@ class AdaptersRouterGroup(group.RouterGroup):
if session and session.get('task') and not session['task'].done():
session['task'].cancel()
return self.success(data={})
# -----------------------------------------------------------------------
# QQ Official QR Binding
# -----------------------------------------------------------------------
_qqofficial_sessions: dict = {}
_QQOFFICIAL_SESSION_TTL = 300 # 5 minutes (QQ bind QR validity window)
def _cleanup_expired_qqofficial_sessions():
import time
now = time.time()
expired = [
sid for sid, s in _qqofficial_sessions.items() if now - s.get('created_at', 0) > _QQOFFICIAL_SESSION_TTL
]
for sid in expired:
session = _qqofficial_sessions.pop(sid, None)
if session and session.get('task') and not session['task'].done():
session['task'].cancel()
@self.route('/qqofficial/bind', methods=['POST'])
async def _() -> str:
"""Start QQ Official QR binding. Returns session_id + QR URL.
Flow: generate a local AES-256 key, register it with
`q.qq.com/lite/create_bind_task`, then poll
`q.qq.com/lite/poll_bind_result` until the user authorizes the
bind inside the QQ Bot Assistant on mobile QQ. The encrypted
AppSecret returned by the poll endpoint is decrypted with the
same key. The key never leaves this process.
"""
import uuid
import time
import secrets
import base64
import aiohttp
QQ_BIND_BASE = 'https://q.qq.com'
_cleanup_expired_qqofficial_sessions()
bind_key_bytes = secrets.token_bytes(32)
bind_key = base64.b64encode(bind_key_bytes).decode('ascii')
session_id = str(uuid.uuid4())
session = {
'status': 'pending',
'qr_url': None,
'expire_at': None,
'appid': None,
'secret': None,
'user_openid': None,
'error': None,
'created_at': time.time(),
'task_id': None,
'bind_key_bytes': bind_key_bytes,
'interval': 2,
}
_qqofficial_sessions[session_id] = session
async def run_qr_binding():
try:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as http:
# Step 1: create_bind_task — register our AES key, get task_id
async with http.post(
f'{QQ_BIND_BASE}/lite/create_bind_task',
json={'key': bind_key},
headers={'Accept': 'application/json'},
) as resp:
try:
data = await resp.json(content_type=None)
except (aiohttp.ContentTypeError, ValueError):
session['status'] = 'error'
session['error'] = 'Invalid response from QQ bind service'
return
if int(data.get('retcode', -1)) != 0:
session['status'] = 'error'
session['error'] = (
data.get('msg') or data.get('message') or 'Failed to create bind task'
)
return
task_id = str((data.get('data') or {}).get('task_id') or '').strip()
if not task_id:
session['status'] = 'error'
session['error'] = 'Missing task_id in QQ response'
return
# The QR encodes a URL that mobile QQ opens inside the QQ Bot Assistant.
# `source=langbot` is a courtesy attribution parameter so Tencent
# can see LangBot adoption metrics, matching the convention used by
# other third-party integrations (e.g. hermes-agent uses `source=hermes`).
qr_url = f'{QQ_BIND_BASE}/qqbot/openclaw/connect.html?task_id={task_id}&_wv=2&source=langbot'
session['task_id'] = task_id
session['qr_url'] = qr_url
session['expire_at'] = time.time() + _QQOFFICIAL_SESSION_TTL
session['status'] = 'waiting'
# Step 2: poll_bind_result until completed (status=2) or expired (3).
deadline = time.time() + _QQOFFICIAL_SESSION_TTL
while time.time() < deadline:
await asyncio.sleep(session['interval'])
async with http.post(
f'{QQ_BIND_BASE}/lite/poll_bind_result',
json={'task_id': task_id},
headers={'Accept': 'application/json'},
) as poll_resp:
try:
poll_data = await poll_resp.json(content_type=None)
except (aiohttp.ContentTypeError, ValueError):
continue
if int(poll_data.get('retcode', -1)) != 0:
session['status'] = 'error'
session['error'] = poll_data.get('msg') or poll_data.get('message') or 'Poll failed'
return
payload = poll_data.get('data') or {}
try:
raw_status = int(payload.get('status', 0))
except (TypeError, ValueError):
raw_status = 0
if raw_status == 2:
appid = str(payload.get('bot_appid') or '').strip()
encrypted = str(payload.get('bot_encrypt_secret') or '').strip()
if not appid or not encrypted:
session['status'] = 'error'
session['error'] = 'Incomplete credential payload'
return
try:
session['secret'] = _decrypt_qqofficial_secret(
encrypted,
bind_key_bytes,
)
except ValueError as exc:
session['status'] = 'error'
session['error'] = str(exc)
return
session['appid'] = appid
# The scanner's OpenID is returned alongside the credentials —
# surfaced to the dashboard for audit / "bound by" display.
session['user_openid'] = str(payload.get('user_openid') or '').strip() or None
session['status'] = 'success'
return
if raw_status == 3:
session['status'] = 'expired'
session['error'] = 'QR code expired'
return
# status 0 / 1: still pending, continue polling
session['status'] = 'expired'
session['error'] = 'QR code expired'
except asyncio.CancelledError:
return
except Exception as e:
session['status'] = 'error'
session['error'] = str(e)
task = asyncio.create_task(run_qr_binding())
session['task'] = task
# Wait up to 10s for the QR URL to be ready before responding.
for _ in range(20):
if session['qr_url'] or session['error']:
break
await asyncio.sleep(0.5)
if session['error']:
task.cancel()
return self.http_status(502, -1, session['error'])
if not session['qr_url']:
task.cancel()
session['status'] = 'error'
session['error'] = 'Timeout waiting for QR code'
return self.http_status(504, -1, 'Timeout waiting for QR code')
return self.success(
data={
'session_id': session_id,
'qr_url': session['qr_url'],
'expire_at': session['expire_at'],
}
)
@self.route('/qqofficial/bind/status/<session_id>', methods=['GET'])
async def _(session_id: str) -> str:
"""Poll QQ Official QR binding status."""
_cleanup_expired_qqofficial_sessions()
session = _qqofficial_sessions.get(session_id)
if not session:
return self.http_status(404, -1, 'Session not found')
data = {'status': session['status']}
if session['status'] == 'success':
data['appid'] = session['appid']
data['secret'] = session['secret']
if session.get('user_openid'):
data['user_openid'] = session['user_openid']
_qqofficial_sessions.pop(session_id, None)
elif session['status'] in ('error', 'expired'):
data['error'] = session['error']
_qqofficial_sessions.pop(session_id, None)
return self.success(data=data)
@self.route('/qqofficial/bind/<session_id>', methods=['DELETE'])
async def _(session_id: str) -> str:
"""Cancel and clean up a QQ Official QR binding session."""
session = _qqofficial_sessions.pop(session_id, None)
if session and session.get('task') and not session['task'].done():
session['task'].cancel()
return self.success(data={})