fix: prevent memory overflow from excessive logging in streaming and query processing (#1879)

* Initial plan

* fix: reduce excessive logging to prevent memory overflow

- Add log file rotation (10MB max per file, 5 backups)
- Reduce streaming response logging (every 10th chunk instead of every chunk)
- Remove debug logging from controller tight loop
- Add summary logging after streaming completes

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* refactor: address code review feedback

- Extract log rotation config to module-level constants
- Keep first streaming chunk at INFO level for connection debugging
- Use DEBUG level for subsequent chunks

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* style: fix code formatting whitespace

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
This commit is contained in:
Copilot
2025-12-22 18:25:24 +08:00
committed by GitHub
parent 88ef9cd6ae
commit 90a22d894d
3 changed files with 37 additions and 5 deletions

View File

@@ -1,4 +1,5 @@
import logging
import logging.handlers
import sys
import time
@@ -15,6 +16,10 @@ log_colors_config = {
'CRITICAL': 'cyan',
}
# Log rotation configuration to prevent unbounded log file growth
LOG_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10MB per file
LOG_FILE_BACKUP_COUNT = 5 # Keep 5 backup files (total ~50MB max)
async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging.Logger:
# Remove all existing loggers
@@ -43,9 +48,17 @@ async def init_logging(extra_handlers: list[logging.Handler] = None) -> logging.
# stream_handler.setFormatter(color_formatter)
stream_handler.stream = open(sys.stdout.fileno(), mode='w', encoding='utf-8', buffering=1)
# Use RotatingFileHandler to prevent unbounded log file growth
rotating_file_handler = logging.handlers.RotatingFileHandler(
log_file_name,
encoding='utf-8',
maxBytes=LOG_FILE_MAX_BYTES,
backupCount=LOG_FILE_BACKUP_COUNT,
)
log_handlers: list[logging.Handler] = [
stream_handler,
logging.FileHandler(log_file_name, encoding='utf-8'),
rotating_file_handler,
]
log_handlers += extra_handlers if extra_handlers is not None else []

View File

@@ -33,11 +33,14 @@ class Controller:
for query in queries:
session = await self.ap.sess_mgr.get_session(query)
self.ap.logger.debug(f'Checking query {query} session {session}')
# Debug logging removed from tight loop to prevent excessive log generation
# that can cause memory overflow in high-traffic scenarios
if not session._semaphore.locked():
selected_query = query
await session._semaphore.acquire()
# Only log when actually selecting a query
self.ap.logger.debug(f'Selected query {query.query_id} for processing')
break

View File

@@ -79,6 +79,7 @@ class ChatMessageHandler(handler.MessageHandler):
raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}')
if is_stream:
resp_message_id = uuid.uuid4()
chunk_count = 0 # Track streaming chunks to reduce excessive logging
async for result in runner.run(query):
result.resp_message_id = str(resp_message_id)
@@ -91,15 +92,30 @@ class ChatMessageHandler(handler.MessageHandler):
await query.adapter.create_message_card(str(resp_message_id), query.message_event)
is_create_card = True
query.resp_messages.append(result)
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming Response: {self.cut_str(result.readable_str())}'
)
chunk_count += 1
# Only log every 10th chunk to reduce excessive logging during streaming
# This prevents memory overflow from thousands of log entries per conversation
# First chunk uses INFO level to confirm connection establishment
if chunk_count == 1:
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming started: {self.cut_str(result.readable_str())}'
)
elif chunk_count % 10 == 0:
self.ap.logger.debug(
f'Conversation({query.query_id}) Streaming chunk {chunk_count}: {self.cut_str(result.readable_str())}'
)
if result.content is not None:
text_length += len(result.content)
yield entities.StageProcessResult(result_type=entities.ResultType.CONTINUE, new_query=query)
# Log final summary after streaming completes
self.ap.logger.info(
f'Conversation({query.query_id}) Streaming completed: {chunk_count} chunks, {text_length} chars'
)
else:
async for result in runner.run(query):
query.resp_messages.append(result)