diff --git a/src/langbot/pkg/api/http/controller/groups/system.py b/src/langbot/pkg/api/http/controller/groups/system.py index 8f211537..0cc5c990 100644 --- a/src/langbot/pkg/api/http/controller/groups/system.py +++ b/src/langbot/pkg/api/http/controller/groups/system.py @@ -136,6 +136,10 @@ class SystemRouterGroup(group.RouterGroup): return self.success(data=task.to_dict()) + @self.route('/storage-analysis', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) + async def _() -> str: + return self.success(data=await self.ap.maintenance_service.get_storage_analysis()) + @self.route('/debug/exec', methods=['POST'], auth_type=group.AuthType.USER_TOKEN) async def _() -> str: if not constants.debug_mode: diff --git a/src/langbot/pkg/api/http/service/maintenance.py b/src/langbot/pkg/api/http/service/maintenance.py new file mode 100644 index 00000000..e755800e --- /dev/null +++ b/src/langbot/pkg/api/http/service/maintenance.py @@ -0,0 +1,309 @@ +from __future__ import annotations + +import datetime +import os +import re +from pathlib import Path +from typing import Any + +import sqlalchemy + +from ....core import app +from ....entity.persistence import bstorage as persistence_bstorage +from ....entity.persistence import monitoring as persistence_monitoring + + +LOG_FILE_PATTERN = re.compile(r'^langbot-(\d{4}-\d{2}-\d{2})\.log(?:\.\d+)?$') +DEFAULT_UPLOAD_FILE_RETENTION_DAYS = 7 +DEFAULT_LOG_RETENTION_DAYS = 3 + + +class MaintenanceService: + """Storage maintenance and diagnostics.""" + + ap: app.Application + + def __init__(self, ap: app.Application) -> None: + self.ap = ap + + async def cleanup_expired_files(self) -> dict[str, int]: + cleanup_cfg = self.ap.instance_config.data.get('storage', {}).get('cleanup', {}) + upload_retention_days = self._positive_int( + cleanup_cfg.get('uploaded_file_retention_days'), + DEFAULT_UPLOAD_FILE_RETENTION_DAYS, + 'storage.cleanup.uploaded_file_retention_days', + ) + log_retention_days = self._positive_int( + cleanup_cfg.get('log_retention_days'), + DEFAULT_LOG_RETENTION_DAYS, + 'storage.cleanup.log_retention_days', + ) + + return { + 'uploaded_files': await self._cleanup_expired_uploaded_files(upload_retention_days), + 'log_files': self._cleanup_expired_log_files(log_retention_days), + } + + async def get_storage_analysis(self) -> dict[str, Any]: + cleanup_cfg = self.ap.instance_config.data.get('storage', {}).get('cleanup', {}) + upload_retention_days = self._positive_int( + cleanup_cfg.get('uploaded_file_retention_days'), + DEFAULT_UPLOAD_FILE_RETENTION_DAYS, + 'storage.cleanup.uploaded_file_retention_days', + ) + log_retention_days = self._positive_int( + cleanup_cfg.get('log_retention_days'), + DEFAULT_LOG_RETENTION_DAYS, + 'storage.cleanup.log_retention_days', + ) + + database_cfg = self.ap.instance_config.data.get('database', {}) + database_type = database_cfg.get('use', 'sqlite') + database_path = ( + Path(database_cfg.get('sqlite', {}).get('path', 'data/langbot.db')) if database_type == 'sqlite' else None + ) + roots: list[tuple[str, Path | None]] = [ + ('database', database_path), + ('logs', Path('data/logs')), + ('storage', Path('data/storage')), + ('vector_store', Path('data/chroma')), + ('plugins', Path('data/plugins')), + ('mcp', Path('data/mcp')), + ('temp', Path('data/temp')), + ] + + sections = [] + for key, path in roots: + sections.append( + { + 'key': key, + 'path': str(path) if path else '', + 'exists': path.exists() if path else False, + 'size_bytes': self._path_size(path) if path else 0, + 'file_count': self._file_count(path) if path else 0, + } + ) + + monitoring_counts = await self._monitoring_counts() + binary_storage = await self._binary_storage_stats() + upload_candidates = await self._expired_uploaded_candidates(upload_retention_days) + log_candidates = self._expired_log_candidates(log_retention_days) + + return { + 'generated_at': datetime.datetime.now(datetime.timezone.utc).isoformat(), + 'cleanup_policy': { + 'uploaded_file_retention_days': upload_retention_days, + 'log_retention_days': log_retention_days, + }, + 'sections': sections, + 'database': { + 'type': database_type, + 'monitoring_counts': monitoring_counts, + 'binary_storage': binary_storage, + }, + 'cleanup_candidates': { + 'uploaded_files': upload_candidates, + 'log_files': log_candidates, + }, + 'tasks': self.ap.task_mgr.get_stats() if self.ap.task_mgr else {}, + } + + async def _cleanup_expired_uploaded_files(self, retention_days: int) -> int: + provider = self.ap.storage_mgr.storage_provider + provider_name = provider.__class__.__name__ + if provider_name == 'LocalStorageProvider': + candidates = self._expired_local_upload_candidates(retention_days, include_paths=True) + deleted = 0 + for item in candidates: + try: + os.remove(item['path']) + deleted += 1 + except FileNotFoundError: + pass + except Exception as e: + self.ap.logger.warning(f'Failed to delete expired uploaded file {item["key"]}: {e}') + return deleted + + if provider_name == 'S3StorageProvider': + return await self._cleanup_expired_s3_uploaded_files(retention_days) + + return 0 + + async def _expired_uploaded_candidates(self, retention_days: int) -> list[dict[str, Any]]: + provider_name = self.ap.storage_mgr.storage_provider.__class__.__name__ + if provider_name == 'LocalStorageProvider': + return self._expired_local_upload_candidates(retention_days) + if provider_name == 'S3StorageProvider': + return await self._expired_s3_upload_candidates(retention_days) + return [] + + async def _cleanup_expired_s3_uploaded_files(self, retention_days: int) -> int: + provider = self.ap.storage_mgr.storage_provider + candidates = await self._expired_s3_upload_candidates(retention_days) + deleted = 0 + for item in candidates: + await provider.delete(item['key']) + deleted += 1 + return deleted + + async def _expired_s3_upload_candidates(self, retention_days: int) -> list[dict[str, Any]]: + provider = self.ap.storage_mgr.storage_provider + cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=retention_days) + candidates = [] + paginator = provider.s3_client.get_paginator('list_objects_v2') + + for page in paginator.paginate(Bucket=provider.bucket_name): + for obj in page.get('Contents', []): + key = obj.get('Key', '') + last_modified = obj.get('LastModified') + if not self._is_uploaded_file_key(key): + continue + if last_modified and last_modified < cutoff: + candidates.append( + { + 'key': key, + 'size_bytes': obj.get('Size', 0), + 'modified_at': last_modified.isoformat(), + } + ) + + return candidates + + def _cleanup_expired_log_files(self, retention_days: int) -> int: + deleted = 0 + for item in self._expired_log_candidates(retention_days, include_paths=True): + try: + os.remove(item['path']) + deleted += 1 + except FileNotFoundError: + pass + except Exception as e: + self.ap.logger.warning(f'Failed to delete expired log file {item["name"]}: {e}') + return deleted + + def _expired_local_upload_candidates( + self, retention_days: int, include_paths: bool = False + ) -> list[dict[str, Any]]: + storage_root = Path('data/storage') + if not storage_root.exists(): + return [] + + cutoff = datetime.datetime.now().timestamp() - retention_days * 86400 + candidates = [] + for entry in storage_root.iterdir(): + if not entry.is_file() or not self._is_uploaded_file_key(entry.name): + continue + stat = entry.stat() + if stat.st_mtime >= cutoff: + continue + item = { + 'key': entry.name, + 'size_bytes': stat.st_size, + 'modified_at': datetime.datetime.fromtimestamp(stat.st_mtime, datetime.timezone.utc).isoformat(), + } + if include_paths: + item['path'] = str(entry) + candidates.append(item) + return candidates + + def _expired_log_candidates(self, retention_days: int, include_paths: bool = False) -> list[dict[str, Any]]: + log_root = Path('data/logs') + if not log_root.exists(): + return [] + + cutoff_date = datetime.date.today() - datetime.timedelta(days=retention_days - 1) + candidates = [] + for entry in log_root.iterdir(): + if not entry.is_file(): + continue + match = LOG_FILE_PATTERN.match(entry.name) + if not match: + continue + try: + file_date = datetime.date.fromisoformat(match.group(1)) + except ValueError: + continue + if file_date >= cutoff_date: + continue + stat = entry.stat() + item = { + 'name': entry.name, + 'date': file_date.isoformat(), + 'size_bytes': stat.st_size, + } + if include_paths: + item['path'] = str(entry) + candidates.append(item) + return candidates + + def _is_uploaded_file_key(self, key: str) -> bool: + return '/' not in key and not key.startswith('plugin_config_') + + async def _monitoring_counts(self) -> dict[str, int]: + tables = { + 'messages': persistence_monitoring.MonitoringMessage.id, + 'llm_calls': persistence_monitoring.MonitoringLLMCall.id, + 'embedding_calls': persistence_monitoring.MonitoringEmbeddingCall.id, + 'errors': persistence_monitoring.MonitoringError.id, + 'sessions': persistence_monitoring.MonitoringSession.session_id, + 'feedback': persistence_monitoring.MonitoringFeedback.id, + } + counts: dict[str, int] = {} + for key, column in tables.items(): + result = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(sqlalchemy.func.count(column))) + counts[key] = result.scalar() or 0 + return counts + + async def _binary_storage_stats(self) -> dict[str, Any]: + count_result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(sqlalchemy.func.count(persistence_bstorage.BinaryStorage.unique_key)) + ) + size_bytes = None + try: + size_result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(sqlalchemy.func.sum(sqlalchemy.func.length(persistence_bstorage.BinaryStorage.value))) + ) + size_bytes = size_result.scalar() or 0 + except Exception as e: + self.ap.logger.warning(f'Failed to estimate binary storage size: {e}') + + return { + 'count': count_result.scalar() or 0, + 'size_bytes': size_bytes, + } + + def _path_size(self, path: Path) -> int: + if not path.exists(): + return 0 + if path.is_file(): + return path.stat().st_size + total = 0 + for root, _, files in os.walk(path): + for file_name in files: + file_path = Path(root) / file_name + try: + total += file_path.stat().st_size + except FileNotFoundError: + pass + return total + + def _file_count(self, path: Path) -> int: + if not path.exists(): + return 0 + if path.is_file(): + return 1 + count = 0 + for _, _, files in os.walk(path): + count += len(files) + return count + + def _positive_int(self, value: Any, default: int, name: str) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + self.ap.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + if parsed < 1: + self.ap.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + return parsed diff --git a/src/langbot/pkg/api/http/service/monitoring.py b/src/langbot/pkg/api/http/service/monitoring.py index e1c60fec..1ba66482 100644 --- a/src/langbot/pkg/api/http/service/monitoring.py +++ b/src/langbot/pkg/api/http/service/monitoring.py @@ -18,55 +18,119 @@ class MonitoringService: # ========== Cleanup Methods ========== - async def cleanup_expired_records(self, retention_days: int) -> dict[str, int]: + async def cleanup_expired_records(self, retention_days: int, batch_size: int = 1000) -> dict[str, int]: """Delete monitoring records older than the specified retention period. Args: retention_days: Number of days to retain records. + batch_size: Maximum rows to delete per table batch. Returns: A dict mapping table name to the number of deleted rows. """ + if retention_days < 1: + raise ValueError('retention_days must be >= 1') + if batch_size < 1: + raise ValueError('batch_size must be >= 1') + cutoff = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta( days=retention_days ) - tables_and_columns: list[tuple[str, type, sqlalchemy.Column]] = [ + tables_and_columns: list[tuple[str, type, sqlalchemy.Column, sqlalchemy.Column]] = [ ( 'monitoring_messages', persistence_monitoring.MonitoringMessage, persistence_monitoring.MonitoringMessage.timestamp, + persistence_monitoring.MonitoringMessage.id, ), ( 'monitoring_llm_calls', persistence_monitoring.MonitoringLLMCall, persistence_monitoring.MonitoringLLMCall.timestamp, + persistence_monitoring.MonitoringLLMCall.id, ), ( 'monitoring_embedding_calls', persistence_monitoring.MonitoringEmbeddingCall, persistence_monitoring.MonitoringEmbeddingCall.timestamp, + persistence_monitoring.MonitoringEmbeddingCall.id, ), ( 'monitoring_errors', persistence_monitoring.MonitoringError, persistence_monitoring.MonitoringError.timestamp, + persistence_monitoring.MonitoringError.id, ), ( 'monitoring_sessions', persistence_monitoring.MonitoringSession, persistence_monitoring.MonitoringSession.last_activity, + persistence_monitoring.MonitoringSession.session_id, + ), + ( + 'monitoring_feedback', + persistence_monitoring.MonitoringFeedback, + persistence_monitoring.MonitoringFeedback.timestamp, + persistence_monitoring.MonitoringFeedback.id, ), ] deleted_counts: dict[str, int] = {} - for table_name, model_cls, ts_column in tables_and_columns: - result = await self.ap.persistence_mgr.execute_async(sqlalchemy.delete(model_cls).where(ts_column < cutoff)) - deleted_counts[table_name] = result.rowcount + for table_name, model_cls, ts_column, pk_column in tables_and_columns: + deleted_counts[table_name] = await self._delete_expired_in_batches( + model_cls=model_cls, + ts_column=ts_column, + pk_column=pk_column, + cutoff=cutoff, + batch_size=batch_size, + ) + + if sum(deleted_counts.values()) > 0: + await self._release_sqlite_space() return deleted_counts + async def _delete_expired_in_batches( + self, + model_cls: type, + ts_column: sqlalchemy.Column, + pk_column: sqlalchemy.Column, + cutoff: datetime.datetime, + batch_size: int, + ) -> int: + deleted_total = 0 + + while True: + select_result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.select(pk_column).where(ts_column < cutoff).limit(batch_size) + ) + pk_values = list(select_result.scalars().all()) + if not pk_values: + break + + delete_result = await self.ap.persistence_mgr.execute_async( + sqlalchemy.delete(model_cls).where(pk_column.in_(pk_values)) + ) + deleted = delete_result.rowcount or 0 + deleted_total += deleted + + if len(pk_values) < batch_size: + break + + return deleted_total + + async def _release_sqlite_space(self) -> None: + database_type = self.ap.instance_config.data.get('database', {}).get('use', 'sqlite') + if database_type != 'sqlite': + return + + async with self.ap.persistence_mgr.get_db_engine().connect() as conn: + autocommit_conn = await conn.execution_options(isolation_level='AUTOCOMMIT') + await autocommit_conn.execute(sqlalchemy.text('PRAGMA wal_checkpoint(TRUNCATE)')) + await autocommit_conn.execute(sqlalchemy.text('VACUUM')) + # ========== Recording Methods ========== async def record_message( diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index aa1acd61..7e5386cf 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -31,6 +31,7 @@ from ..api.http.service import mcp as mcp_service from ..api.http.service import apikey as apikey_service from ..api.http.service import webhook as webhook_service from ..api.http.service import monitoring as monitoring_service +from ..api.http.service import maintenance as maintenance_service from ..discover import engine as discover_engine from ..storage import mgr as storagemgr @@ -155,6 +156,8 @@ class Application: monitoring_service: monitoring_service.MonitoringService = None + maintenance_service: maintenance_service.MaintenanceService = None + def __init__(self): pass @@ -194,14 +197,30 @@ class Application: monitoring_cfg = self.instance_config.data.get('monitoring', {}) auto_cleanup_cfg = monitoring_cfg.get('auto_cleanup', {}) if auto_cleanup_cfg.get('enabled', True): - retention_days = auto_cleanup_cfg.get('retention_days', 30) - check_interval_hours = auto_cleanup_cfg.get('check_interval_hours', 1) + retention_days = self._get_positive_int_config( + auto_cleanup_cfg.get('retention_days', 30), + default=30, + name='monitoring.auto_cleanup.retention_days', + ) + delete_batch_size = self._get_positive_int_config( + auto_cleanup_cfg.get('delete_batch_size', 1000), + default=1000, + name='monitoring.auto_cleanup.delete_batch_size', + ) + check_interval_hours = self._get_positive_float_config( + auto_cleanup_cfg.get('check_interval_hours', 1), + default=1, + name='monitoring.auto_cleanup.check_interval_hours', + ) async def monitoring_cleanup_loop(): check_interval_seconds = check_interval_hours * 3600 while True: try: - deleted = await self.monitoring_service.cleanup_expired_records(retention_days) + deleted = await self.monitoring_service.cleanup_expired_records( + retention_days, + batch_size=delete_batch_size, + ) total_deleted = sum(deleted.values()) if total_deleted > 0: self.logger.info( @@ -218,6 +237,33 @@ class Application: scopes=[core_entities.LifecycleControlScope.APPLICATION], ) + # Start storage/log maintenance task if enabled + storage_cleanup_cfg = self.instance_config.data.get('storage', {}).get('cleanup', {}) + if storage_cleanup_cfg.get('enabled', True) and self.maintenance_service is not None: + check_interval_hours = self._get_positive_float_config( + storage_cleanup_cfg.get('check_interval_hours', 1), + default=1, + name='storage.cleanup.check_interval_hours', + ) + + async def storage_cleanup_loop(): + check_interval_seconds = check_interval_hours * 3600 + while True: + try: + deleted = await self.maintenance_service.cleanup_expired_files() + total_deleted = sum(deleted.values()) + if total_deleted > 0: + self.logger.info(f'Storage maintenance: deleted expired files: {deleted}') + except Exception as e: + self.logger.warning(f'Storage maintenance error: {e}') + await asyncio.sleep(check_interval_seconds) + + self.task_mgr.create_task( + storage_cleanup_loop(), + name='storage-maintenance', + scopes=[core_entities.LifecycleControlScope.APPLICATION], + ) + self.task_mgr.create_task( never_ending(), name='never-ending-task', @@ -232,6 +278,28 @@ class Application: self.logger.error(f'Application runtime fatal exception: {e}') self.logger.debug(f'Traceback: {traceback.format_exc()}') + def _get_positive_int_config(self, value, default: int, name: str) -> int: + try: + parsed = int(value) + except (TypeError, ValueError): + self.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + if parsed < 1: + self.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + return parsed + + def _get_positive_float_config(self, value, default: float, name: str) -> float: + try: + parsed = float(value) + except (TypeError, ValueError): + self.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + if parsed <= 0: + self.logger.warning(f'Invalid {name}: {value!r}, using {default}') + return default + return parsed + def dispose(self): self.plugin_connector.dispose() diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 71ff4262..3bb5ffd7 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -28,6 +28,7 @@ from ...api.http.service import mcp as mcp_service from ...api.http.service import apikey as apikey_service from ...api.http.service import webhook as webhook_service from ...api.http.service import monitoring as monitoring_service +from ...api.http.service import maintenance as maintenance_service from ...discover import engine as discover_engine from ...storage import mgr as storagemgr from ...utils import logcache @@ -167,6 +168,9 @@ class BuildAppStage(stage.BootingStage): monitoring_service_inst = monitoring_service.MonitoringService(ap) ap.monitoring_service = monitoring_service_inst + maintenance_service_inst = maintenance_service.MaintenanceService(ap) + ap.maintenance_service = maintenance_service_inst + async def runtime_disconnect_callback(connector: plugin_connector.PluginRuntimeConnector) -> None: await asyncio.sleep(3) await plugin_connector_inst.initialize() diff --git a/src/langbot/pkg/core/taskmgr.py b/src/langbot/pkg/core/taskmgr.py index c6846594..8bf8784a 100644 --- a/src/langbot/pkg/core/taskmgr.py +++ b/src/langbot/pkg/core/taskmgr.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import typing import datetime +import time from . import app from . import entities as core_entities @@ -119,6 +120,7 @@ class TaskWrapper: self.label = label if label != '' else name self.task.set_name(name) self.scopes = scopes + self.created_at = time.time() def assume_exception(self): try: @@ -154,6 +156,7 @@ class TaskWrapper: 'name': self.name, 'label': self.label, 'scopes': [scope.value for scope in self.scopes], + 'created_at': self.created_at, 'task_context': self.task_context.to_dict(), 'runtime': { 'done': self.task.done(), @@ -193,6 +196,8 @@ class AsyncTaskManager: ) -> TaskWrapper: wrapper = TaskWrapper(self.ap, coro, task_type, kind, name, label, context, scopes) self.tasks.append(wrapper) + wrapper.task.add_done_callback(lambda _: self._prune_completed_tasks()) + self._prune_completed_tasks() return wrapper def create_user_task( @@ -226,6 +231,15 @@ class AsyncTaskManager: 'id_index': TaskWrapper._id_index, } + def get_stats(self) -> dict: + completed = sum(1 for t in self.tasks if t.task.done()) + return { + 'total': len(self.tasks), + 'running': len(self.tasks) - completed, + 'completed': completed, + 'id_index': TaskWrapper._id_index, + } + def get_task_by_id(self, id: int) -> TaskWrapper | None: for t in self.tasks: if t.id == id: @@ -243,3 +257,27 @@ class AsyncTaskManager: if not wrapper.task.done(): wrapper.task.cancel() return + + def _prune_completed_tasks(self): + completed_limit = ( + self.ap.instance_config.data.get('system', {}) + .get('task_retention', {}) + .get( + 'completed_limit', + 200, + ) + ) + try: + completed_limit = int(completed_limit) + except (TypeError, ValueError): + completed_limit = 200 + if completed_limit < 1: + completed_limit = 1 + + completed_tasks = [wrapper for wrapper in self.tasks if wrapper.task.done()] + overflow = len(completed_tasks) - completed_limit + if overflow <= 0: + return + + remove_ids = {wrapper.id for wrapper in completed_tasks[:overflow]} + self.tasks = [wrapper for wrapper in self.tasks if wrapper.id not in remove_ids] diff --git a/src/langbot/pkg/plugin/handler.py b/src/langbot/pkg/plugin/handler.py index 58d6a4aa..60922003 100644 --- a/src/langbot/pkg/plugin/handler.py +++ b/src/langbot/pkg/plugin/handler.py @@ -367,6 +367,22 @@ class RuntimeConnectionHandler(handler.Handler): owner_type = data['owner_type'] owner = data['owner'] value = base64.b64decode(data['value_base64']) + max_value_bytes = ( + self.ap.instance_config.data.get('plugin', {}) + .get('binary_storage', {}) + .get( + 'max_value_bytes', + 10 * 1024 * 1024, + ) + ) + try: + max_value_bytes = int(max_value_bytes) + except (TypeError, ValueError): + max_value_bytes = 10 * 1024 * 1024 + if max_value_bytes >= 0 and len(value) > max_value_bytes: + return handler.ActionResponse.error( + message=f'Binary storage value exceeds limit ({len(value)} > {max_value_bytes} bytes)', + ) result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage) diff --git a/src/langbot/pkg/rag/knowledge/kbmgr.py b/src/langbot/pkg/rag/knowledge/kbmgr.py index 8fadc341..cd37994c 100644 --- a/src/langbot/pkg/rag/knowledge/kbmgr.py +++ b/src/langbot/pkg/rag/knowledge/kbmgr.py @@ -148,52 +148,60 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface): supported_extensions = {'txt', 'pdf', 'docx', 'md', 'html'} stored_file_tasks = [] - # use utf-8 encoding - with zipfile.ZipFile(io.BytesIO(zip_bytes), 'r', metadata_encoding='utf-8') as zip_ref: - for file_info in zip_ref.filelist: - # skip directories and hidden files - if file_info.is_dir() or file_info.filename.startswith('.'): - continue - - _, file_ext = os.path.splitext(file_info.filename) - file_extension = file_ext.lstrip('.').lower() - if file_extension not in supported_extensions: - self.ap.logger.debug(f'Skipping unsupported file in ZIP: {file_info.filename}') - continue - - try: - file_content = zip_ref.read(file_info.filename) - - base_name = file_info.filename.replace('/', '_').replace('\\', '_') - file_stem, file_ext = os.path.splitext(base_name) - extension = file_ext.lstrip('.') - - if file_stem.startswith('__MACOSX'): + try: + # use utf-8 encoding + with zipfile.ZipFile(io.BytesIO(zip_bytes), 'r', metadata_encoding='utf-8') as zip_ref: + for file_info in zip_ref.filelist: + # skip directories and hidden files + if file_info.is_dir() or file_info.filename.startswith('.'): continue - extracted_file_id = file_stem + '_' + str(uuid.uuid4())[:8] + '.' + extension - # save file to storage + _, file_ext = os.path.splitext(file_info.filename) + file_extension = file_ext.lstrip('.').lower() + if file_extension not in supported_extensions: + self.ap.logger.debug(f'Skipping unsupported file in ZIP: {file_info.filename}') + continue - await self.ap.storage_mgr.storage_provider.save(extracted_file_id, file_content) + try: + file_content = zip_ref.read(file_info.filename) - task_id = await self.store_file(extracted_file_id, parser_plugin_id=parser_plugin_id) - stored_file_tasks.append(task_id) + base_name = file_info.filename.replace('/', '_').replace('\\', '_') + file_stem, file_ext = os.path.splitext(base_name) + extension = file_ext.lstrip('.') - self.ap.logger.info( - f'Extracted and stored file from ZIP: {file_info.filename} -> {extracted_file_id}' - ) + if file_stem.startswith('__MACOSX'): + continue - except Exception as e: - self.ap.logger.warning(f'Failed to extract file {file_info.filename} from ZIP: {e}') - continue + extracted_file_id = file_stem + '_' + str(uuid.uuid4())[:8] + '.' + extension + # save file to storage - if not stored_file_tasks: - raise Exception('No supported files found in ZIP archive') + await self.ap.storage_mgr.storage_provider.save(extracted_file_id, file_content) - self.ap.logger.info(f'Successfully processed ZIP file {zip_file_id}, extracted {len(stored_file_tasks)} files') - await self.ap.storage_mgr.storage_provider.delete(zip_file_id) + task_id = await self.store_file(extracted_file_id, parser_plugin_id=parser_plugin_id) + stored_file_tasks.append(task_id) - return stored_file_tasks[0] if stored_file_tasks else '' + self.ap.logger.info( + f'Extracted and stored file from ZIP: {file_info.filename} -> {extracted_file_id}' + ) + + except Exception as e: + self.ap.logger.warning(f'Failed to extract file {file_info.filename} from ZIP: {e}') + continue + + if not stored_file_tasks: + raise Exception('No supported files found in ZIP archive') + + self.ap.logger.info( + f'Successfully processed ZIP file {zip_file_id}, extracted {len(stored_file_tasks)} files' + ) + return stored_file_tasks[0] if stored_file_tasks else '' + finally: + try: + await self.ap.storage_mgr.storage_provider.delete(zip_file_id) + except FileNotFoundError: + pass + except Exception as e: + self.ap.logger.warning(f'Failed to cleanup ZIP file {zip_file_id}: {e}') async def retrieve(self, query: str, settings: dict | None = None) -> list[rag_context.RetrievalResultEntry]: # Merge stored retrieval_settings with per-request overrides diff --git a/src/langbot/templates/config.yaml b/src/langbot/templates/config.yaml index 7b7e59fe..5cf9b98b 100644 --- a/src/langbot/templates/config.yaml +++ b/src/langbot/templates/config.yaml @@ -25,6 +25,9 @@ system: max_bots: -1 max_pipelines: -1 max_extensions: -1 + task_retention: + # Keep at most this many completed async task records in memory + completed_limit: 200 jwt: expire: 604800 secret: '' @@ -68,6 +71,15 @@ vdb: password: 'postgres' storage: use: local + cleanup: + # Enable periodic cleanup of local/S3 uploaded files and old log files + enabled: true + # Cleanup check interval in hours + check_interval_hours: 1 + # Root-level uploaded files older than this will be deleted + uploaded_file_retention_days: 7 + # LangBot log files older than this many days will be deleted + log_retention_days: 3 s3: endpoint_url: '' access_key_id: '' @@ -79,6 +91,9 @@ plugin: runtime_ws_url: 'ws://langbot_plugin_runtime:5400/control/ws' enable_marketplace: true display_plugin_debug_url: 'ws://localhost:5401/plugin/debug/ws' + binary_storage: + # Max bytes for a single plugin binary storage value + max_value_bytes: 10485760 monitoring: auto_cleanup: # Enable automatic cleanup of expired monitoring records @@ -87,6 +102,8 @@ monitoring: retention_days: 30 # Cleanup check interval in hours check_interval_hours: 1 + # Number of expired rows to delete per table batch + delete_batch_size: 1000 space: # Space service URL for OAuth and API url: 'https://space.langbot.app' diff --git a/web/src/app/home/components/home-sidebar/sidbarConfigList.tsx b/web/src/app/home/components/home-sidebar/sidbarConfigList.tsx index 12e28121..71b5cece 100644 --- a/web/src/app/home/components/home-sidebar/sidbarConfigList.tsx +++ b/web/src/app/home/components/home-sidebar/sidbarConfigList.tsx @@ -1,5 +1,6 @@ import { SidebarChildVO } from '@/app/home/components/home-sidebar/HomeSidebarChild'; import i18n from '@/i18n'; +import { HardDrive } from 'lucide-react'; const t = (key: string) => { return i18n.t(key); @@ -51,6 +52,18 @@ export const sidebarConfigList = [ }, section: 'home', }), + new SidebarChildVO({ + id: 'storage-analysis', + name: t('storageAnalysis.title'), + icon: , + route: '/home/storage-analysis', + description: t('storageAnalysis.description'), + helpLink: { + en_US: '', + zh_Hans: '', + }, + section: 'home', + }), new SidebarChildVO({ id: 'bots', name: t('bots.title'), diff --git a/web/src/app/home/storage-analysis/page.tsx b/web/src/app/home/storage-analysis/page.tsx new file mode 100644 index 00000000..270caff1 --- /dev/null +++ b/web/src/app/home/storage-analysis/page.tsx @@ -0,0 +1,297 @@ +'use client'; + +import { + type ReactNode, + useCallback, + useEffect, + useMemo, + useState, +} from 'react'; +import { useTranslation } from 'react-i18next'; +import { RefreshCw, HardDrive, Database, FileWarning } from 'lucide-react'; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, +} from '@/components/ui/dialog'; +import { Button } from '@/components/ui/button'; +import { backendClient } from '@/app/infra/http'; + +interface StorageSection { + key: string; + path: string; + exists: boolean; + size_bytes: number; + file_count: number; +} + +interface CleanupCandidate { + key?: string; + name?: string; + size_bytes: number; +} + +interface StorageAnalysis { + generated_at: string; + cleanup_policy: { + uploaded_file_retention_days: number; + log_retention_days: number; + }; + sections: StorageSection[]; + database: { + type: string; + monitoring_counts: Record; + binary_storage: { + count: number; + size_bytes: number | null; + }; + }; + cleanup_candidates: { + uploaded_files: CleanupCandidate[]; + log_files: CleanupCandidate[]; + }; + tasks: { + total?: number; + running?: number; + completed?: number; + }; +} + +function formatBytes(bytes: number | null | undefined): string { + if (bytes === null || bytes === undefined) { + return '-'; + } + if (bytes < 1024) { + return `${bytes} B`; + } + const units = ['KB', 'MB', 'GB', 'TB']; + let value = bytes / 1024; + let unitIndex = 0; + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + return `${value.toFixed(value >= 10 ? 1 : 2)} ${units[unitIndex]}`; +} + +export default function StorageAnalysisPage() { + const { t } = useTranslation(); + const [open, setOpen] = useState(true); + const [analysis, setAnalysis] = useState(null); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const loadAnalysis = useCallback(async () => { + setLoading(true); + setError(null); + try { + const result = await backendClient.get( + '/api/v1/system/storage-analysis', + ); + setAnalysis(result); + } catch (err) { + setError(err instanceof Error ? err.message : String(err)); + } finally { + setLoading(false); + } + }, []); + + useEffect(() => { + loadAnalysis(); + }, [loadAnalysis]); + + const totalBytes = useMemo(() => { + return ( + analysis?.sections.reduce((sum, item) => sum + item.size_bytes, 0) ?? 0 + ); + }, [analysis]); + + const uploadedCandidateBytes = useMemo(() => { + return ( + analysis?.cleanup_candidates.uploaded_files.reduce( + (sum, item) => sum + item.size_bytes, + 0, + ) ?? 0 + ); + }, [analysis]); + + const logCandidateBytes = useMemo(() => { + return ( + analysis?.cleanup_candidates.log_files.reduce( + (sum, item) => sum + item.size_bytes, + 0, + ) ?? 0 + ); + }, [analysis]); + + return ( +
+
+
+

+ {t('storageAnalysis.title')} +

+

+ {t('storageAnalysis.description')} +

+
+ +
+ + + + + + + {t('storageAnalysis.dialogTitle')} + + + +
+
+ {analysis + ? t('storageAnalysis.generatedAt', { + time: new Date(analysis.generated_at).toLocaleString(), + }) + : t('storageAnalysis.loading')} +
+ +
+ + {error && ( +
+ {error} +
+ )} + + {analysis && ( +
+
+ } + /> + } + /> + } + /> + } + /> +
+ +
+

+ {t('storageAnalysis.sections')} +

+
+ {analysis.sections.map((section) => ( +
+
+
+ {t(`storageAnalysis.sectionNames.${section.key}`)} +
+
+ {section.path} +
+
+
+ {formatBytes(section.size_bytes)} +
+
+ {section.file_count} +
+
+ ))} +
+
+ +
+
+

+ {t('storageAnalysis.monitoringTables')} +

+ +
+
+

+ {t('storageAnalysis.runtimeTasks')} +

+ +
+
+
+ )} +
+
+
+ ); +} + +function SummaryItem({ + label, + value, + icon, +}: { + label: string; + value: string; + icon: ReactNode; +}) { + return ( +
+
+ {icon} + {label} +
+
{value}
+
+ ); +} + +function KeyValueList({ + values, +}: { + values: Record; +}) { + return ( +
+ {Object.entries(values).map(([key, value]) => ( +
+ {key} + {value ?? '-'} +
+ ))} +
+ ); +} diff --git a/web/src/i18n/locales/en-US.ts b/web/src/i18n/locales/en-US.ts index 082f8d8f..0e502c6d 100644 --- a/web/src/i18n/locales/en-US.ts +++ b/web/src/i18n/locales/en-US.ts @@ -1225,6 +1225,31 @@ const enUS = { feedback: 'User Feedback', }, }, + storageAnalysis: { + title: 'Storage Analysis', + description: 'Inspect storage usage and cleanup candidates', + openDialog: 'View Analysis', + dialogTitle: 'Storage Analysis', + generatedAt: 'Generated at {{time}}', + loading: 'Loading...', + refresh: 'Refresh', + totalSize: 'Total size', + binaryStorage: 'Binary storage', + uploadCleanup: 'Expired uploads', + logCleanup: 'Expired logs', + sections: 'Storage sections', + monitoringTables: 'Monitoring tables', + runtimeTasks: 'Runtime tasks', + sectionNames: { + database: 'Database', + logs: 'Logs', + storage: 'Uploaded files', + vector_store: 'Vector store', + plugins: 'Plugins', + mcp: 'MCP', + temp: 'Temporary files', + }, + }, limitation: { maxBotsReached: 'Maximum number of bots ({{max}}) reached. Please remove an existing bot before creating a new one.', diff --git a/web/src/i18n/locales/es-ES.ts b/web/src/i18n/locales/es-ES.ts index 39c45b72..add448c7 100644 --- a/web/src/i18n/locales/es-ES.ts +++ b/web/src/i18n/locales/es-ES.ts @@ -1259,6 +1259,32 @@ const esES = { feedback: 'Comentarios de usuarios', }, }, + storageAnalysis: { + title: 'Análisis de almacenamiento', + description: + 'Inspecciona el uso de almacenamiento y los candidatos de limpieza', + openDialog: 'Ver análisis', + dialogTitle: 'Análisis de almacenamiento', + generatedAt: 'Generado el {{time}}', + loading: 'Cargando...', + refresh: 'Actualizar', + totalSize: 'Tamaño total', + binaryStorage: 'Almacenamiento binario de plugins', + uploadCleanup: 'Subidas caducadas', + logCleanup: 'Registros caducados', + sections: 'Secciones de almacenamiento', + monitoringTables: 'Tablas de monitoreo', + runtimeTasks: 'Tareas en ejecución', + sectionNames: { + database: 'Base de datos', + logs: 'Registros', + storage: 'Archivos subidos', + vector_store: 'Almacén vectorial', + plugins: 'Plugins', + mcp: 'MCP', + temp: 'Archivos temporales', + }, + }, limitation: { maxBotsReached: 'Se ha alcanzado el número máximo de Bots ({{max}}). Por favor, elimina un Bot existente antes de crear uno nuevo.', diff --git a/web/src/i18n/locales/ja-JP.ts b/web/src/i18n/locales/ja-JP.ts index 54c9e5d5..5025a683 100644 --- a/web/src/i18n/locales/ja-JP.ts +++ b/web/src/i18n/locales/ja-JP.ts @@ -1230,6 +1230,31 @@ feedback: 'ユーザーフィードバック', }, }, + storageAnalysis: { + title: 'ストレージ分析', + description: 'ストレージ使用量とクリーンアップ候補を確認します', + openDialog: '分析を表示', + dialogTitle: 'ストレージ分析', + generatedAt: '生成日時 {{time}}', + loading: '読み込み中...', + refresh: '更新', + totalSize: '合計サイズ', + binaryStorage: 'プラグインバイナリストレージ', + uploadCleanup: '期限切れアップロード', + logCleanup: '期限切れログ', + sections: 'ストレージセクション', + monitoringTables: '監視テーブル', + runtimeTasks: '実行タスク', + sectionNames: { + database: 'データベース', + logs: 'ログ', + storage: 'アップロードファイル', + vector_store: 'ベクターストア', + plugins: 'プラグイン', + mcp: 'MCP', + temp: '一時ファイル', + }, + }, limitation: { maxBotsReached: 'ボット数が上限({{max}}個)に達しました。新しいボットを作成するには、既存のボットを削除してください。', diff --git a/web/src/i18n/locales/ru-RU.ts b/web/src/i18n/locales/ru-RU.ts index e8e34b29..122cfb0b 100644 --- a/web/src/i18n/locales/ru-RU.ts +++ b/web/src/i18n/locales/ru-RU.ts @@ -1234,6 +1234,31 @@ const ruRU = { feedback: 'Отзывы пользователей', }, }, + storageAnalysis: { + title: 'Анализ хранилища', + description: 'Проверьте использование хранилища и кандидатов на очистку', + openDialog: 'Открыть анализ', + dialogTitle: 'Анализ хранилища', + generatedAt: 'Создано {{time}}', + loading: 'Загрузка...', + refresh: 'Обновить', + totalSize: 'Общий размер', + binaryStorage: 'Бинарное хранилище плагинов', + uploadCleanup: 'Просроченные загрузки', + logCleanup: 'Просроченные журналы', + sections: 'Разделы хранилища', + monitoringTables: 'Таблицы мониторинга', + runtimeTasks: 'Задачи runtime', + sectionNames: { + database: 'База данных', + logs: 'Журналы', + storage: 'Загруженные файлы', + vector_store: 'Векторное хранилище', + plugins: 'Плагины', + mcp: 'MCP', + temp: 'Временные файлы', + }, + }, limitation: { maxBotsReached: 'Достигнуто максимальное количество ботов ({{max}}). Удалите существующего бота перед созданием нового.', diff --git a/web/src/i18n/locales/th-TH.ts b/web/src/i18n/locales/th-TH.ts index ff1c3799..2c3a78a8 100644 --- a/web/src/i18n/locales/th-TH.ts +++ b/web/src/i18n/locales/th-TH.ts @@ -1205,6 +1205,31 @@ const thTH = { feedback: 'ความคิดเห็นผู้ใช้', }, }, + storageAnalysis: { + title: 'วิเคราะห์พื้นที่จัดเก็บ', + description: 'ตรวจสอบการใช้พื้นที่จัดเก็บและรายการที่สามารถล้างได้', + openDialog: 'ดูการวิเคราะห์', + dialogTitle: 'วิเคราะห์พื้นที่จัดเก็บ', + generatedAt: 'สร้างเมื่อ {{time}}', + loading: 'กำลังโหลด...', + refresh: 'รีเฟรช', + totalSize: 'ขนาดรวม', + binaryStorage: 'พื้นที่จัดเก็บไบนารีของปลั๊กอิน', + uploadCleanup: 'ไฟล์อัปโหลดที่หมดอายุ', + logCleanup: 'บันทึกที่หมดอายุ', + sections: 'ส่วนพื้นที่จัดเก็บ', + monitoringTables: 'ตารางการตรวจสอบ', + runtimeTasks: 'งาน runtime', + sectionNames: { + database: 'ฐานข้อมูล', + logs: 'บันทึก', + storage: 'ไฟล์อัปโหลด', + vector_store: 'คลังเวกเตอร์', + plugins: 'ปลั๊กอิน', + mcp: 'MCP', + temp: 'ไฟล์ชั่วคราว', + }, + }, limitation: { maxBotsReached: 'จำนวน Bot สูงสุด ({{max}}) ถึงขีดจำกัดแล้ว กรุณาลบ Bot ที่มีอยู่ก่อนสร้างใหม่', diff --git a/web/src/i18n/locales/vi-VN.ts b/web/src/i18n/locales/vi-VN.ts index 601ddb55..155dbe7d 100644 --- a/web/src/i18n/locales/vi-VN.ts +++ b/web/src/i18n/locales/vi-VN.ts @@ -1227,6 +1227,31 @@ const viVN = { feedback: 'Phản hồi người dùng', }, }, + storageAnalysis: { + title: 'Phân tích lưu trữ', + description: 'Kiểm tra dung lượng lưu trữ và các mục có thể dọn dẹp', + openDialog: 'Xem phân tích', + dialogTitle: 'Phân tích lưu trữ', + generatedAt: 'Tạo lúc {{time}}', + loading: 'Đang tải...', + refresh: 'Làm mới', + totalSize: 'Tổng dung lượng', + binaryStorage: 'Lưu trữ nhị phân plugin', + uploadCleanup: 'Tệp tải lên hết hạn', + logCleanup: 'Nhật ký hết hạn', + sections: 'Khu vực lưu trữ', + monitoringTables: 'Bảng giám sát', + runtimeTasks: 'Tác vụ runtime', + sectionNames: { + database: 'Cơ sở dữ liệu', + logs: 'Nhật ký', + storage: 'Tệp tải lên', + vector_store: 'Kho vector', + plugins: 'Plugin', + mcp: 'MCP', + temp: 'Tệp tạm', + }, + }, limitation: { maxBotsReached: 'Đã đạt số lượng Bot tối đa ({{max}}). Vui lòng xóa một Bot hiện có trước khi tạo mới.', diff --git a/web/src/i18n/locales/zh-Hans.ts b/web/src/i18n/locales/zh-Hans.ts index 93b673ed..e1dd2361 100644 --- a/web/src/i18n/locales/zh-Hans.ts +++ b/web/src/i18n/locales/zh-Hans.ts @@ -1171,6 +1171,31 @@ const zhHans = { feedback: '用户反馈', }, }, + storageAnalysis: { + title: '存储分析', + description: '查看存储占用和可清理文件', + openDialog: '查看分析', + dialogTitle: '存储分析', + generatedAt: '生成时间 {{time}}', + loading: '加载中...', + refresh: '刷新', + totalSize: '总占用', + binaryStorage: '插件二进制存储', + uploadCleanup: '过期上传文件', + logCleanup: '过期日志', + sections: '存储分区', + monitoringTables: '监控表', + runtimeTasks: '运行任务', + sectionNames: { + database: '数据库', + logs: '日志', + storage: '上传文件', + vector_store: '向量库', + plugins: '插件', + mcp: 'MCP', + temp: '临时文件', + }, + }, limitation: { maxBotsReached: '已达到机器人数量上限({{max}}个)。请先删除已有机器人后再创建新的。', diff --git a/web/src/i18n/locales/zh-Hant.ts b/web/src/i18n/locales/zh-Hant.ts index fd56f406..7396100e 100644 --- a/web/src/i18n/locales/zh-Hant.ts +++ b/web/src/i18n/locales/zh-Hant.ts @@ -1171,6 +1171,31 @@ const zhHant = { feedback: '使用者回饋', }, }, + storageAnalysis: { + title: '儲存分析', + description: '查看儲存占用和可清理檔案', + openDialog: '查看分析', + dialogTitle: '儲存分析', + generatedAt: '生成時間 {{time}}', + loading: '載入中...', + refresh: '重新整理', + totalSize: '總占用', + binaryStorage: '插件二進位儲存', + uploadCleanup: '過期上傳檔案', + logCleanup: '過期日誌', + sections: '儲存分區', + monitoringTables: '監控表', + runtimeTasks: '執行任務', + sectionNames: { + database: '資料庫', + logs: '日誌', + storage: '上傳檔案', + vector_store: '向量庫', + plugins: '插件', + mcp: 'MCP', + temp: '暫存檔案', + }, + }, limitation: { maxBotsReached: '已達到機器人數量上限({{max}}個)。請先刪除已有機器人後再建立新的。',