fix: the fucking logger

This commit is contained in:
Junyan Qin
2025-07-11 21:37:31 +08:00
parent a79a22a74d
commit 6d788cadbc

View File

@@ -1,6 +1,5 @@
# rag_manager.py
from __future__ import annotations
import logging
import os
import asyncio
import uuid
@@ -13,9 +12,8 @@ from pkg.core import app
class RAGManager:
ap: app.Application
def __init__(self, ap: app.Application, logger: logging.Logger = None):
def __init__(self, ap: app.Application):
self.ap = ap
self.logger = logger or logging.getLogger(__name__)
self.chroma_manager = None
self.parser = FileParser()
self.chunker = Chunker()
@@ -52,20 +50,20 @@ class RAGManager:
session.add(new_kb)
session.commit()
session.refresh(new_kb)
self.logger.info(f"Knowledge Base '{kb_name}' created.")
self.ap.logger.info(f"Knowledge Base '{kb_name}' created.")
return new_kb.id
else:
self.logger.info(f"Knowledge Base '{kb_name}' already exists.")
self.ap.logger.info(f"Knowledge Base '{kb_name}' already exists.")
except Exception as e:
session.rollback()
self.logger.error(f"Error in _create_kb_sync for '{kb_name}': {str(e)}", exc_info=True)
self.ap.logger.error(f"Error in _create_kb_sync for '{kb_name}': {str(e)}", exc_info=True)
raise
finally:
session.close()
return await asyncio.to_thread(_create_kb_sync)
except Exception as e:
self.logger.error(f"Error creating knowledge base '{kb_name}': {str(e)}", exc_info=True)
self.ap.logger.error(f"Error creating knowledge base '{kb_name}': {str(e)}", exc_info=True)
raise
async def get_all_knowledge_bases(self):
@@ -83,7 +81,7 @@ class RAGManager:
return await asyncio.to_thread(_get_all_kbs_sync)
except Exception as e:
self.logger.error(f'Error retrieving knowledge bases: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error retrieving knowledge bases: {str(e)}', exc_info=True)
return []
async def get_knowledge_base_by_id(self, kb_id: str):
@@ -101,7 +99,7 @@ class RAGManager:
return await asyncio.to_thread(_get_kb_sync, kb_id)
except Exception as e:
self.logger.error(f'Error retrieving knowledge base with ID {kb_id}: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error retrieving knowledge base with ID {kb_id}: {str(e)}', exc_info=True)
return None
async def get_files_by_knowledge_base(self, kb_id: str):
@@ -119,7 +117,7 @@ class RAGManager:
return await asyncio.to_thread(_get_files_sync, kb_id)
except Exception as e:
self.logger.error(f'Error retrieving files for knowledge base ID {kb_id}: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error retrieving files for knowledge base ID {kb_id}: {str(e)}', exc_info=True)
return []
async def get_all_files(self):
@@ -138,7 +136,7 @@ class RAGManager:
return await asyncio.to_thread(_get_all_files_sync)
except Exception as e:
self.logger.error(f'Error retrieving all files: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error retrieving all files: {str(e)}', exc_info=True)
return []
async def store_data(
@@ -148,27 +146,25 @@ class RAGManager:
Parses, chunks, embeds, and stores data from a given file into the RAG system.
Associates the file with a knowledge base using kb_id in the File table.
"""
self.logger.info(f'Starting data storage process for file: {file_path}')
self.ap.logger.info(f'Starting data storage process for file: {file_path}')
session = SessionLocal()
file_obj = None
try:
kb = session.query(KnowledgeBase).filter_by(name=kb_name).first()
if not kb:
kb = KnowledgeBase(name=kb_name, description=kb_description)
session.add(kb)
session.commit()
session.refresh(kb)
self.logger.info(f"Knowledge Base '{kb_name}' created during store_data.")
self.ap.logger.info(f"Knowledge Base '{kb_name}' created during store_data.")
else:
self.logger.info(f"Knowledge Base '{kb_name}' already exists.")
self.ap.logger.info(f"Knowledge Base '{kb_name}' already exists.")
file_name = os.path.basename(file_path)
existing_file = session.query(File).filter_by(kb_id=kb.id, file_name=file_name).first()
if existing_file:
self.logger.warning(
self.ap.logger.warning(
f"File '{file_name}' already exists in knowledge base '{kb_name}'. Skipping storage."
)
return
@@ -177,32 +173,32 @@ class RAGManager:
session.add(file_obj)
session.commit()
session.refresh(file_obj)
self.logger.info(
self.ap.logger.info(
f"File record '{file_name}' added to database with ID: {file_obj.id}, associated with KB ID: {kb.id}"
)
text = await self.parser.parse(file_path)
if not text:
self.logger.warning(f'No text extracted from file {file_path}. Deleting file record ID: {file_obj.id}.')
self.ap.logger.warning(
f'No text extracted from file {file_path}. Deleting file record ID: {file_obj.id}.'
)
session.delete(file_obj)
session.commit()
return
chunks_texts = await self.chunker.chunk(text)
self.logger.info(f"Chunked file '{file_name}' into {len(chunks_texts)} chunks.")
self.ap.logger.info(f"Chunked file '{file_name}' into {len(chunks_texts)} chunks.")
await self.embedder.embed_and_store(file_id=file_obj.id, chunks=chunks_texts)
self.logger.info(f'Data storage process completed for file: {file_path}')
self.ap.logger.info(f'Data storage process completed for file: {file_path}')
except Exception as e:
session.rollback()
self.logger.error(f'Error in store_data for file {file_path}: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error in store_data for file {file_path}: {str(e)}', exc_info=True)
if file_obj and file_obj.id:
try:
await asyncio.to_thread(self.chroma_manager.delete_by_file_id_sync, file_obj.id)
except Exception as chroma_e:
self.logger.warning(
self.ap.logger.warning(
f'Could not clean up ChromaDB entries for file_id {file_obj.id} after store_data failure: {chroma_e}'
)
raise
@@ -213,13 +209,13 @@ class RAGManager:
"""
Retrieves relevant data chunks based on a given query using the configured retriever.
"""
self.logger.info(f"Starting data retrieval process for query: '{query}'")
self.ap.logger.info(f"Starting data retrieval process for query: '{query}'")
try:
retrieved_chunks = await self.retriever.retrieve(query)
self.logger.info(f'Successfully retrieved {len(retrieved_chunks)} chunks for query.')
self.ap.logger.info(f'Successfully retrieved {len(retrieved_chunks)} chunks for query.')
return retrieved_chunks
except Exception as e:
self.logger.error(f"Error in retrieve_data for query '{query}': {str(e)}", exc_info=True)
self.ap.logger.error(f"Error in retrieve_data for query '{query}': {str(e)}", exc_info=True)
return []
async def delete_data_by_file_id(self, file_id: str):
@@ -227,32 +223,34 @@ class RAGManager:
Deletes all data associated with a specific file ID, including its chunks and vectors,
and the file record itself.
"""
self.logger.info(f'Starting data deletion process for file_id: {file_id}')
self.ap.logger.info(f'Starting data deletion process for file_id: {file_id}')
session = SessionLocal()
try:
# 1. 从 ChromaDB 删除 embeddings
await asyncio.to_thread(self.chroma_manager.delete_by_file_id_sync, file_id)
self.logger.info(f'Deleted embeddings from ChromaDB for file_id: {file_id}')
self.ap.logger.info(f'Deleted embeddings from ChromaDB for file_id: {file_id}')
# 2. 删除与文件关联的 chunks 记录
chunks_to_delete = session.query(Chunk).filter_by(file_id=file_id).all()
for chunk in chunks_to_delete:
session.delete(chunk)
self.logger.info(f'Deleted {len(chunks_to_delete)} chunk records for file_id: {file_id}')
self.ap.logger.info(f'Deleted {len(chunks_to_delete)} chunk records for file_id: {file_id}')
# 3. 删除文件记录本身
file_to_delete = session.query(File).filter_by(id=file_id).first()
if file_to_delete:
session.delete(file_to_delete)
self.logger.info(f'Deleted file record for file_id: {file_id}')
self.ap.logger.info(f'Deleted file record for file_id: {file_id}')
else:
self.logger.warning(f'File with ID {file_id} not found in database. Skipping deletion of file record.')
self.ap.logger.warning(
f'File with ID {file_id} not found in database. Skipping deletion of file record.'
)
session.commit()
self.logger.info(f'Successfully completed data deletion for file_id: {file_id}')
self.ap.logger.info(f'Successfully completed data deletion for file_id: {file_id}')
except Exception as e:
session.rollback()
self.logger.error(f'Error deleting data for file_id {file_id}: {str(e)}', exc_info=True)
self.ap.logger.error(f'Error deleting data for file_id {file_id}: {str(e)}', exc_info=True)
raise
finally:
session.close()
@@ -262,45 +260,39 @@ class RAGManager:
Deletes a knowledge base and all associated files, chunks, and vectors.
This involves querying for associated files and then deleting them.
"""
self.logger.info(f'Starting deletion of knowledge base with ID: {kb_id}')
session = SessionLocal()
self.ap.logger.info(f'Starting deletion of knowledge base with ID: {kb_id}')
session = SessionLocal()
try:
kb_to_delete = session.query(KnowledgeBase).filter_by(id=kb_id).first()
if not kb_to_delete:
self.logger.warning(f'Knowledge Base with ID {kb_id} not found.')
self.ap.logger.warning(f'Knowledge Base with ID {kb_id} not found.')
return
files_to_delete = session.query(File).filter_by(kb_id=kb_id).all()
session.close()
for file_obj in files_to_delete:
try:
await self.delete_data_by_file_id(file_obj.id)
except Exception as file_del_e:
self.logger.error(f'Failed to delete file ID {file_obj.id} during KB deletion: {file_del_e}')
self.ap.logger.error(f'Failed to delete file ID {file_obj.id} during KB deletion: {file_del_e}')
session = SessionLocal()
try:
kb_final_delete = session.query(KnowledgeBase).filter_by(id=kb_id).first()
if kb_final_delete:
session.delete(kb_final_delete)
session.commit()
self.logger.info(f'Successfully deleted knowledge base with ID: {kb_id}')
self.ap.logger.info(f'Successfully deleted knowledge base with ID: {kb_id}')
else:
self.logger.warning(
self.ap.logger.warning(
f'Knowledge Base with ID {kb_id} not found after file deletion, skipping KB deletion.'
)
except Exception as kb_del_e:
session.rollback()
self.logger.error(f'Error deleting KnowledgeBase record for ID {kb_id}: {kb_del_e}', exc_info=True)
self.ap.logger.error(f'Error deleting KnowledgeBase record for ID {kb_id}: {kb_del_e}', exc_info=True)
raise
finally:
session.close()
@@ -309,7 +301,9 @@ class RAGManager:
# 如果在最初获取 KB 或文件列表时出错
if session.is_active:
session.rollback()
self.logger.error(f'Error during overall knowledge base deletion for ID {kb_id}: {str(e)}', exc_info=True)
self.ap.logger.error(
f'Error during overall knowledge base deletion for ID {kb_id}: {str(e)}', exc_info=True
)
raise
finally:
if session.is_active:
@@ -335,29 +329,29 @@ class RAGManager:
"""
Associates a file with a knowledge base by updating the kb_id in the File table.
"""
self.logger.info(f'Associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}')
self.ap.logger.info(f'Associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}')
session = SessionLocal()
try:
# 查询知识库是否存在
kb = session.query(KnowledgeBase).filter_by(id=knowledge_base_uuid).first()
if not kb:
self.logger.error(f'Knowledge Base with UUID {knowledge_base_uuid} not found.')
self.ap.logger.error(f'Knowledge Base with UUID {knowledge_base_uuid} not found.')
return
# 更新文件的 kb_id
file_to_update = session.query(File).filter_by(id=file_id).first()
if not file_to_update:
self.logger.error(f'File with ID {file_id} not found.')
self.ap.logger.error(f'File with ID {file_id} not found.')
return
file_to_update.kb_id = kb.id
session.commit()
self.logger.info(
self.ap.logger.info(
f'Successfully associated file ID {file_id} with knowledge base UUID {knowledge_base_uuid}'
)
except Exception as e:
session.rollback()
self.logger.error(
self.ap.logger.error(
f'Error associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}: {str(e)}',
exc_info=True,
)