From 19f417174ce2ee56a0530226d21f77fa55540597 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 29 Dec 2025 22:43:19 +0800 Subject: [PATCH] feat: implement SpaceService for OAuth handling and user management, refactor UserService to utilize new service methods --- .../pkg/api/http/controller/groups/user.py | 11 +- src/langbot/pkg/api/http/service/space.py | 172 ++++++++++++++++++ src/langbot/pkg/api/http/service/user.py | 126 ++----------- src/langbot/pkg/core/app.py | 3 + src/langbot/pkg/core/stages/build_app.py | 4 + src/langbot/pkg/entity/persistence/user.py | 1 + .../dbm014_space_account_support.py | 12 ++ 7 files changed, 219 insertions(+), 110 deletions(-) create mode 100644 src/langbot/pkg/api/http/service/space.py diff --git a/src/langbot/pkg/api/http/controller/groups/user.py b/src/langbot/pkg/api/http/controller/groups/user.py index 91aa4f47..086f9c28 100644 --- a/src/langbot/pkg/api/http/controller/groups/user.py +++ b/src/langbot/pkg/api/http/controller/groups/user.py @@ -105,7 +105,7 @@ class UserRouterGroup(group.RouterGroup): return self.fail(1, 'Missing redirect_uri parameter') try: - authorize_url = self.ap.user_service.get_space_oauth_authorize_url(redirect_uri, state) + authorize_url = self.ap.space_service.get_oauth_authorize_url(redirect_uri, state) return self.success(data={'authorize_url': authorize_url}) except Exception as e: return self.fail(1, str(e)) @@ -121,15 +121,18 @@ class UserRouterGroup(group.RouterGroup): try: # Exchange code for tokens - token_data = await self.ap.user_service.exchange_space_oauth_code(code) + token_data = await self.ap.space_service.exchange_oauth_code(code) access_token = token_data.get('access_token') refresh_token = token_data.get('refresh_token') + expires_in = token_data.get('expires_in', 0) if not access_token: return self.fail(1, 'Failed to get access token from Space') # Authenticate and create/update local user - jwt_token, user_obj = await self.ap.user_service.authenticate_space_user(access_token, refresh_token) + jwt_token, user_obj = await self.ap.user_service.authenticate_space_user( + access_token, refresh_token, expires_in + ) return self.success( data={ @@ -161,7 +164,7 @@ class UserRouterGroup(group.RouterGroup): @self.route('/space-credits', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) async def _(user_email: str) -> str: """Get Space credits balance for current user""" - credits = await self.ap.user_service.get_space_credits(user_email) + credits = await self.ap.space_service.get_credits(user_email) return self.success(data={'credits': credits}) @self.route('/account-info', methods=['GET'], auth_type=group.AuthType.NONE) diff --git a/src/langbot/pkg/api/http/service/space.py b/src/langbot/pkg/api/http/service/space.py new file mode 100644 index 00000000..1961dd4c --- /dev/null +++ b/src/langbot/pkg/api/http/service/space.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import aiohttp +import typing +import datetime +import time +import sqlalchemy + +from ....core import app +from ....entity.persistence import user + + +class SpaceService: + """Service for interacting with LangBot Space API""" + + ap: app.Application + _credits_cache: typing.Dict[str, typing.Tuple[int, float]] # {user_email: (credits, timestamp)} + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + self._credits_cache = {} + + def _get_space_config(self) -> typing.Dict[str, str]: + """Get Space configuration from config file""" + space_config = self.ap.instance_config.data.get('space', {}) + return { + 'url': space_config.get('url', 'https://space.langbot.app'), + 'oauth_authorize_url': space_config.get('oauth_authorize_url', 'https://space.langbot.app/auth/authorize'), + } + + async def _get_user_by_email(self, user_email: str) -> user.User | None: + result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(user.User).where(user.User.user == user_email) + ) + result_list = result.all() + return result_list[0] if result_list else None + + async def _ensure_valid_token(self, user_email: str) -> str | None: + """Ensure access token is valid, refresh if expired. Returns valid access_token or None.""" + user_obj = await self._get_user_by_email(user_email) + if not user_obj or user_obj.account_type != 'space': + return None + + if not user_obj.space_access_token: + return None + + # Check if token is expired (with 60s buffer) + if user_obj.space_access_token_expires_at: + if datetime.datetime.now() >= user_obj.space_access_token_expires_at - datetime.timedelta(seconds=60): + # Token expired, try to refresh + if user_obj.space_refresh_token: + try: + new_token = await self._refresh_and_save_token(user_obj) + return new_token + except Exception: + return None + return None + + return user_obj.space_access_token + + async def _refresh_and_save_token(self, user_obj: user.User) -> str: + """Refresh token and save to database""" + token_data = await self.refresh_token(user_obj.space_refresh_token) + access_token = token_data.get('access_token') + expires_in = token_data.get('expires_in', 0) + + if not access_token: + raise ValueError('Failed to refresh token') + + expires_at = datetime.datetime.now() + datetime.timedelta(seconds=expires_in) if expires_in > 0 else None + + await self.ap.persistence_mgr.execute_async( + sqlalchemy.update(user.User) + .where(user.User.user == user_obj.user) + .values( + space_access_token=access_token, + space_access_token_expires_at=expires_at, + ) + ) + + return access_token + + # === Raw API calls (no token validation) === + + def get_oauth_authorize_url(self, redirect_uri: str, state: str = '') -> str: + """Get the Space OAuth authorization URL for redirect""" + space_config = self._get_space_config() + authorize_url = space_config['oauth_authorize_url'] + params = f'redirect_uri={redirect_uri}' + if state: + params += f'&state={state}' + return f'{authorize_url}?{params}' + + async def exchange_oauth_code(self, code: str) -> typing.Dict: + """Exchange OAuth authorization code for tokens""" + from langbot.pkg.utils import constants + + space_config = self._get_space_config() + space_url = space_config['url'] + + async with aiohttp.ClientSession() as session: + async with session.post( + f'{space_url}/api/v1/accounts/oauth/token', + json={'code': code, 'instance_id': constants.instance_id}, + ) as response: + if response.status != 200: + raise ValueError(f'Failed to exchange OAuth code: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}') + return data.get('data', {}) + + async def refresh_token(self, refresh_token: str) -> typing.Dict: + """Refresh Space access token""" + space_config = self._get_space_config() + space_url = space_config['url'] + + async with aiohttp.ClientSession() as session: + async with session.post( + f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token} + ) as response: + if response.status != 200: + raise ValueError(f'Failed to refresh token: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to refresh token: {data.get("msg")}') + return data.get('data', {}) + + async def get_user_info_raw(self, access_token: str) -> typing.Dict: + """Get user info from Space using access token (no validation)""" + space_config = self._get_space_config() + space_url = space_config['url'] + + async with aiohttp.ClientSession() as session: + async with session.get( + f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'} + ) as response: + if response.status != 200: + raise ValueError(f'Failed to get user info: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to get user info: {data.get("msg")}') + return data.get('data', {}) + + # === API calls with token validation === + + async def get_user_info(self, user_email: str) -> typing.Dict | None: + """Get user info from Space (with token validation)""" + access_token = await self._ensure_valid_token(user_email) + if not access_token: + return None + return await self.get_user_info_raw(access_token) + + async def get_credits(self, user_email: str, force_refresh: bool = False) -> int | None: + """Get Space credits for user with caching (60s TTL)""" + cache_ttl = 60 + + if not force_refresh and user_email in self._credits_cache: + credits, ts = self._credits_cache[user_email] + if time.time() - ts < cache_ttl: + return credits + + try: + info = await self.get_user_info(user_email) + if info is None: + return None + credits = info.get('credits') + if credits is not None: + self._credits_cache[user_email] = (credits, time.time()) + return credits + except Exception: + return self._credits_cache.get(user_email, (None, 0))[0] diff --git a/src/langbot/pkg/api/http/service/user.py b/src/langbot/pkg/api/http/service/user.py index 46410e9b..2b37d794 100644 --- a/src/langbot/pkg/api/http/service/user.py +++ b/src/langbot/pkg/api/http/service/user.py @@ -4,7 +4,6 @@ import sqlalchemy import argon2 import jwt import datetime -import aiohttp import typing import asyncio @@ -16,21 +15,10 @@ from ....utils import constants class UserService: ap: app.Application _create_user_lock: asyncio.Lock - _space_credits_cache: typing.Dict[str, typing.Tuple[int, float]] # {user_email: (credits, timestamp)} def __init__(self, ap: app.Application) -> None: self.ap = ap self._create_user_lock = asyncio.Lock() - self._space_credits_cache = {} - - def _get_space_config(self) -> typing.Dict[str, str]: - """Get Space configuration from config file""" - space_config = self.ap.instance_config.data.get('space', {}) - return { - 'url': space_config.get('url', 'https://space.langbot.app'), - 'models_gateway_api_url': space_config.get('models_gateway_api_url', 'https://api.langbot.cloud'), - 'oauth_authorize_url': space_config.get('oauth_authorize_url', 'https://space.langbot.app/auth/authorize'), - } async def is_initialized(self) -> bool: result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(user.User).limit(1)) @@ -131,94 +119,7 @@ class UserService: sqlalchemy.update(user.User).where(user.User.user == user_email).values(password=hashed_password) ) - # Space OAuth methods (redirect flow) - - def get_space_oauth_authorize_url(self, redirect_uri: str, state: str = '') -> str: - """Get the Space OAuth authorization URL for redirect""" - space_config = self._get_space_config() - authorize_url = space_config['oauth_authorize_url'] - - # Build the authorization URL with redirect_uri - params = f'redirect_uri={redirect_uri}' - if state: - params += f'&state={state}' - - return f'{authorize_url}?{params}' - - async def exchange_space_oauth_code(self, code: str) -> typing.Dict: - """Exchange OAuth authorization code for tokens""" - from langbot.pkg.utils import constants - - space_config = self._get_space_config() - space_url = space_config['url'] - - async with aiohttp.ClientSession() as session: - async with session.post( - f'{space_url}/api/v1/accounts/oauth/token', - json={'code': code, 'instance_id': constants.instance_id}, - ) as response: - if response.status != 200: - raise ValueError(f'Failed to exchange OAuth code: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}') - return data.get('data', {}) - - async def get_space_user_info(self, access_token: str) -> typing.Dict: - """Get user info from Space using access token""" - space_config = self._get_space_config() - space_url = space_config['url'] - - async with aiohttp.ClientSession() as session: - async with session.get( - f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'} - ) as response: - if response.status != 200: - raise ValueError(f'Failed to get user info: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to get user info: {data.get("msg")}') - return data.get('data', {}) - - async def get_space_credits(self, user_email: str, force_refresh: bool = False) -> int | None: - """Get Space credits for user with caching (60s TTL)""" - import time - - cache_ttl = 60 - - if not force_refresh and user_email in self._space_credits_cache: - credits, ts = self._space_credits_cache[user_email] - if time.time() - ts < cache_ttl: - return credits - - user_obj = await self.get_user_by_email(user_email) - if not user_obj or user_obj.account_type != 'space' or not user_obj.space_access_token: - return None - - try: - info = await self.get_space_user_info(user_obj.space_access_token) - credits = info.get('credits') - if credits is not None: - self._space_credits_cache[user_email] = (credits, time.time()) - return credits - except Exception: - return self._space_credits_cache.get(user_email, (None, 0))[0] - - async def refresh_space_token(self, refresh_token: str) -> typing.Dict: - """Refresh Space access token""" - space_config = self._get_space_config() - space_url = space_config['url'] - - async with aiohttp.ClientSession() as session: - async with session.post( - f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token} - ) as response: - if response.status != 200: - raise ValueError(f'Failed to refresh token: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to refresh token: {data.get("msg")}') - return data.get('data', {}) + # Space user management async def create_or_update_space_user( self, @@ -227,8 +128,11 @@ class UserService: access_token: str, refresh_token: str, api_key: str, + expires_in: int = 0, ) -> user.User: """Create or update a Space user account (only if system not initialized or user exists)""" + expires_at = datetime.datetime.now() + datetime.timedelta(seconds=expires_in) if expires_in > 0 else None + async with self._create_user_lock: # Check if user with this Space UUID already exists existing_user = await self.get_user_by_space_account_uuid(space_account_uuid) @@ -242,6 +146,7 @@ class UserService: space_access_token=access_token, space_refresh_token=refresh_token, space_api_key=api_key, + space_access_token_expires_at=expires_at, ) ) return await self.get_user_by_space_account_uuid(space_account_uuid) @@ -259,6 +164,7 @@ class UserService: space_access_token=access_token, space_refresh_token=refresh_token, space_api_key=api_key, + space_access_token_expires_at=expires_at, ) ) return await self.get_user_by_email(email) @@ -280,15 +186,18 @@ class UserService: space_access_token=access_token, space_refresh_token=refresh_token, space_api_key=api_key, + space_access_token_expires_at=expires_at, ) ) return await self.get_user_by_space_account_uuid(space_account_uuid) - async def authenticate_space_user(self, access_token: str, refresh_token: str) -> typing.Tuple[str, user.User]: + async def authenticate_space_user( + self, access_token: str, refresh_token: str, expires_in: int = 0 + ) -> typing.Tuple[str, user.User]: """Authenticate with Space and return JWT token""" - # Get user info from Space - user_info = await self.get_space_user_info(access_token) + # Get user info from Space using raw API (token just obtained, no need to validate) + user_info = await self.ap.space_service.get_user_info_raw(access_token) account = user_info.get('account', {}) api_key = user_info.get('api_key', '') @@ -306,6 +215,7 @@ class UserService: access_token=access_token, refresh_token=refresh_token, api_key=api_key, + expires_in=expires_in, ) # Generate JWT token @@ -342,15 +252,18 @@ class UserService: async def bind_space_account(self, user_email: str, code: str) -> user.User: """Bind Space account to existing local account""" # Exchange code for tokens - token_data = await self.exchange_space_oauth_code(code) + token_data = await self.ap.space_service.exchange_oauth_code(code) access_token = token_data.get('access_token') refresh_token = token_data.get('refresh_token') + expires_in = token_data.get('expires_in', 0) if not access_token: raise ValueError('Failed to get access token from Space') - # Get Space user info - user_info = await self.get_space_user_info(access_token) + expires_at = datetime.datetime.now() + datetime.timedelta(seconds=expires_in) if expires_in > 0 else None + + # Get Space user info (token just obtained, use raw API) + user_info = await self.ap.space_service.get_user_info_raw(access_token) account = user_info.get('account', {}) api_key = user_info.get('api_key', '') @@ -376,6 +289,7 @@ class UserService: space_access_token=access_token, space_refresh_token=refresh_token, space_api_key=api_key, + space_access_token_expires_at=expires_at, ) ) diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 1397f233..4dad80a3 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -19,6 +19,7 @@ from ..utils import version as version_mgr, proxy as proxy_mgr from ..persistence import mgr as persistencemgr from ..api.http.controller import main as http_controller from ..api.http.service import user as user_service +from ..api.http.service import space as space_service from ..api.http.service import model as model_service from ..api.http.service import provider as provider_service from ..api.http.service import pipeline as pipeline_service @@ -117,6 +118,8 @@ class Application: user_service: user_service.UserService = None + space_service: space_service.SpaceService = None + llm_model_service: model_service.LLMModelsService = None embedding_models_service: model_service.EmbeddingModelsService = None diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index b2a054ed..b1695fd5 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -16,6 +16,7 @@ from ...platform.webhook_pusher import WebhookPusher from ...persistence import mgr as persistencemgr from ...api.http.controller import main as http_controller from ...api.http.service import user as user_service +from ...api.http.service import space as space_service from ...api.http.service import model as model_service from ...api.http.service import provider as provider_service from ...api.http.service import pipeline as pipeline_service @@ -109,6 +110,9 @@ class BuildAppStage(stage.BootingStage): user_service_inst = user_service.UserService(ap) ap.user_service = user_service_inst + space_service_inst = space_service.SpaceService(ap) + ap.space_service = space_service_inst + llm_model_service_inst = model_service.LLMModelsService(ap) ap.llm_model_service = llm_model_service_inst diff --git a/src/langbot/pkg/entity/persistence/user.py b/src/langbot/pkg/entity/persistence/user.py index 6e834f0b..00ea7380 100644 --- a/src/langbot/pkg/entity/persistence/user.py +++ b/src/langbot/pkg/entity/persistence/user.py @@ -17,6 +17,7 @@ class User(Base): space_account_uuid = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) space_access_token = sqlalchemy.Column(sqlalchemy.Text, nullable=True) space_refresh_token = sqlalchemy.Column(sqlalchemy.Text, nullable=True) + space_access_token_expires_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True) space_api_key = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) created_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, server_default=sqlalchemy.func.now()) diff --git a/src/langbot/pkg/persistence/migrations/dbm014_space_account_support.py b/src/langbot/pkg/persistence/migrations/dbm014_space_account_support.py index 5e594a75..ea93e930 100644 --- a/src/langbot/pkg/persistence/migrations/dbm014_space_account_support.py +++ b/src/langbot/pkg/persistence/migrations/dbm014_space_account_support.py @@ -66,6 +66,18 @@ class DBMigrateSpaceAccountSupport(migration.DBMigration): sqlalchemy.text('ALTER TABLE users ADD COLUMN space_refresh_token TEXT') ) + # Add space_access_token_expires_at column + if 'space_access_token_expires_at' not in columns: + if self.ap.persistence_mgr.db.name == 'postgresql': + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('ALTER TABLE users ADD COLUMN space_access_token_expires_at TIMESTAMP') + ) + + else: + await self.ap.persistence_mgr.execute_async( + sqlalchemy.text('ALTER TABLE users ADD COLUMN space_access_token_expires_at DATETIME') + ) + # Add space_api_key column if 'space_api_key' not in columns: if self.ap.persistence_mgr.db.name == 'postgresql':