diff --git a/src/langbot/pkg/core/bootutils/log.py b/src/langbot/pkg/core/bootutils/log.py index 631b05e2..75ed5a92 100644 --- a/src/langbot/pkg/core/bootutils/log.py +++ b/src/langbot/pkg/core/bootutils/log.py @@ -1,4 +1,5 @@ import logging +import logging.handlers import sys import time @@ -15,6 +16,10 @@ log_colors_config = { 'CRITICAL': 'cyan', } +# Log rotation configuration to prevent unbounded log file growth +LOG_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10MB per file +LOG_FILE_BACKUP_COUNT = 5 # Keep 5 backup files (~60MB max total, counting the active log file) + async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging.Logger: # Remove all existing loggers @@ -43,9 +48,17 @@ async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging. # stream_handler.setFormatter(color_formatter) stream_handler.stream = open(sys.stdout.fileno(), mode='w', encoding='utf-8', buffering=1) + # Use RotatingFileHandler to prevent unbounded log file growth + rotating_file_handler = logging.handlers.RotatingFileHandler( + log_file_name, + encoding='utf-8', + maxBytes=LOG_FILE_MAX_BYTES, + backupCount=LOG_FILE_BACKUP_COUNT, + ) + log_handlers: list[logging.Handler] = [ stream_handler, - logging.FileHandler(log_file_name, encoding='utf-8'), + rotating_file_handler, ] log_handlers += extra_handlers if extra_handlers is not None else [] diff --git a/src/langbot/pkg/pipeline/controller.py b/src/langbot/pkg/pipeline/controller.py index b1dde4a6..988306cf 100644 --- a/src/langbot/pkg/pipeline/controller.py +++ b/src/langbot/pkg/pipeline/controller.py @@ -33,11 +33,14 @@ class Controller: for query in queries: session = await self.ap.sess_mgr.get_session(query) - self.ap.logger.debug(f'Checking query {query} session {session}') + # Debug logging removed from tight loop to prevent excessive log generation + # that can cause memory overflow in high-traffic scenarios if not session._semaphore.locked(): selected_query = query await session._semaphore.acquire() + # Only log when 
actually selecting a query + self.ap.logger.debug(f'Selected query {query.query_id} for processing') break diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index e4337742..b76b7a2b 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -79,6 +79,7 @@ class ChatMessageHandler(handler.MessageHandler): raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}') if is_stream: resp_message_id = uuid.uuid4() + chunk_count = 0 # Track streaming chunks to reduce excessive logging async for result in runner.run(query): result.resp_message_id = str(resp_message_id) @@ -91,15 +92,30 @@ class ChatMessageHandler(handler.MessageHandler): await query.adapter.create_message_card(str(resp_message_id), query.message_event) is_create_card = True query.resp_messages.append(result) - self.ap.logger.info( - f'Conversation({query.query_id}) Streaming Response: {self.cut_str(result.readable_str())}' - ) + + chunk_count += 1 + # Only log every 10th chunk to reduce excessive logging during streaming + # This prevents memory overflow from thousands of log entries per conversation + # First chunk uses INFO level to confirm connection establishment + if chunk_count == 1: + self.ap.logger.info( + f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}' + ) + elif chunk_count % 10 == 0: + self.ap.logger.debug( + f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}' + ) if result.content is not None: text_length += len(result.content) yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query) + # Log final summary after streaming completes + self.ap.logger.info( + f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars' + ) + else: async for result in runner.run(query): 
query.resp_messages.append(result)