From 6d788cadbc24348355810e7c7ae2d0b58f2e6124 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Fri, 11 Jul 2025 21:37:31 +0800 Subject: [PATCH] fix: the fucking logger --- pkg/rag/knowledge/mgr.py | 102 ++++++++++++++++++--------------------- 1 file changed, 48 insertions(+), 54 deletions(-) diff --git a/pkg/rag/knowledge/mgr.py b/pkg/rag/knowledge/mgr.py index 585a5075..89e5b393 100644 --- a/pkg/rag/knowledge/mgr.py +++ b/pkg/rag/knowledge/mgr.py @@ -1,6 +1,5 @@ # rag_manager.py from __future__ import annotations -import logging import os import asyncio import uuid @@ -13,9 +12,8 @@ from pkg.core import app class RAGManager: ap: app.Application - def __init__(self, ap: app.Application, logger: logging.Logger = None): + def __init__(self, ap: app.Application): self.ap = ap - self.logger = logger or logging.getLogger(__name__) self.chroma_manager = None self.parser = FileParser() self.chunker = Chunker() @@ -52,20 +50,20 @@ class RAGManager: session.add(new_kb) session.commit() session.refresh(new_kb) - self.logger.info(f"Knowledge Base '{kb_name}' created.") + self.ap.logger.info(f"Knowledge Base '{kb_name}' created.") return new_kb.id else: - self.logger.info(f"Knowledge Base '{kb_name}' already exists.") + self.ap.logger.info(f"Knowledge Base '{kb_name}' already exists.") except Exception as e: session.rollback() - self.logger.error(f"Error in _create_kb_sync for '{kb_name}': {str(e)}", exc_info=True) + self.ap.logger.error(f"Error in _create_kb_sync for '{kb_name}': {str(e)}", exc_info=True) raise finally: session.close() return await asyncio.to_thread(_create_kb_sync) except Exception as e: - self.logger.error(f"Error creating knowledge base '{kb_name}': {str(e)}", exc_info=True) + self.ap.logger.error(f"Error creating knowledge base '{kb_name}': {str(e)}", exc_info=True) raise async def get_all_knowledge_bases(self): @@ -83,7 +81,7 @@ class RAGManager: return await asyncio.to_thread(_get_all_kbs_sync) except Exception as e: - self.logger.error(f'Error retrieving knowledge bases: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error retrieving knowledge bases: {str(e)}', exc_info=True) return [] async def get_knowledge_base_by_id(self, kb_id: str): @@ -101,7 +99,7 @@ class RAGManager: return await asyncio.to_thread(_get_kb_sync, kb_id) except Exception as e: - self.logger.error(f'Error retrieving knowledge base with ID {kb_id}: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error retrieving knowledge base with ID {kb_id}: {str(e)}', exc_info=True) return None async def get_files_by_knowledge_base(self, kb_id: str): @@ -119,7 +117,7 @@ class RAGManager: return await asyncio.to_thread(_get_files_sync, kb_id) except Exception as e: - self.logger.error(f'Error retrieving files for knowledge base ID {kb_id}: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error retrieving files for knowledge base ID {kb_id}: {str(e)}', exc_info=True) return [] async def get_all_files(self): @@ -138,7 +136,7 @@ class RAGManager: return await asyncio.to_thread(_get_all_files_sync) except Exception as e: - self.logger.error(f'Error retrieving all files: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error retrieving all files: {str(e)}', exc_info=True) return [] async def store_data( @@ -148,27 +146,25 @@ class RAGManager: Parses, chunks, embeds, and stores data from a given file into the RAG system. Associates the file with a knowledge base using kb_id in the File table. """ - self.logger.info(f'Starting data storage process for file: {file_path}') + self.ap.logger.info(f'Starting data storage process for file: {file_path}') session = SessionLocal() file_obj = None try: - kb = session.query(KnowledgeBase).filter_by(name=kb_name).first() if not kb: kb = KnowledgeBase(name=kb_name, description=kb_description) session.add(kb) session.commit() session.refresh(kb) - self.logger.info(f"Knowledge Base '{kb_name}' created during store_data.") + self.ap.logger.info(f"Knowledge Base '{kb_name}' created during store_data.") else: - self.logger.info(f"Knowledge Base '{kb_name}' already exists.") + self.ap.logger.info(f"Knowledge Base '{kb_name}' already exists.") - file_name = os.path.basename(file_path) existing_file = session.query(File).filter_by(kb_id=kb.id, file_name=file_name).first() if existing_file: - self.logger.warning( + self.ap.logger.warning( f"File '{file_name}' already exists in knowledge base '{kb_name}'. Skipping storage." ) return @@ -177,32 +173,32 @@ class RAGManager: session.add(file_obj) session.commit() session.refresh(file_obj) - self.logger.info( + self.ap.logger.info( f"File record '{file_name}' added to database with ID: {file_obj.id}, associated with KB ID: {kb.id}" ) - text = await self.parser.parse(file_path) if not text: - self.logger.warning(f'No text extracted from file {file_path}. Deleting file record ID: {file_obj.id}.') + self.ap.logger.warning( + f'No text extracted from file {file_path}. Deleting file record ID: {file_obj.id}.' + ) session.delete(file_obj) session.commit() return - chunks_texts = await self.chunker.chunk(text) - self.logger.info(f"Chunked file '{file_name}' into {len(chunks_texts)} chunks.") + self.ap.logger.info(f"Chunked file '{file_name}' into {len(chunks_texts)} chunks.") await self.embedder.embed_and_store(file_id=file_obj.id, chunks=chunks_texts) - self.logger.info(f'Data storage process completed for file: {file_path}') + self.ap.logger.info(f'Data storage process completed for file: {file_path}') except Exception as e: session.rollback() - self.logger.error(f'Error in store_data for file {file_path}: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error in store_data for file {file_path}: {str(e)}', exc_info=True) if file_obj and file_obj.id: try: await asyncio.to_thread(self.chroma_manager.delete_by_file_id_sync, file_obj.id) except Exception as chroma_e: - self.logger.warning( + self.ap.logger.warning( f'Could not clean up ChromaDB entries for file_id {file_obj.id} after store_data failure: {chroma_e}' ) raise @@ -213,13 +209,13 @@ class RAGManager: """ Retrieves relevant data chunks based on a given query using the configured retriever. """ - self.logger.info(f"Starting data retrieval process for query: '{query}'") + self.ap.logger.info(f"Starting data retrieval process for query: '{query}'") try: retrieved_chunks = await self.retriever.retrieve(query) - self.logger.info(f'Successfully retrieved {len(retrieved_chunks)} chunks for query.') + self.ap.logger.info(f'Successfully retrieved {len(retrieved_chunks)} chunks for query.') return retrieved_chunks except Exception as e: - self.logger.error(f"Error in retrieve_data for query '{query}': {str(e)}", exc_info=True) + self.ap.logger.error(f"Error in retrieve_data for query '{query}': {str(e)}", exc_info=True) return [] async def delete_data_by_file_id(self, file_id: str): @@ -227,32 +223,34 @@ class RAGManager: Deletes all data associated with a specific file ID, including its chunks and vectors, and the file record itself. """ - self.logger.info(f'Starting data deletion process for file_id: {file_id}') + self.ap.logger.info(f'Starting data deletion process for file_id: {file_id}') session = SessionLocal() try: # 1. 从 ChromaDB 删除 embeddings await asyncio.to_thread(self.chroma_manager.delete_by_file_id_sync, file_id) - self.logger.info(f'Deleted embeddings from ChromaDB for file_id: {file_id}') + self.ap.logger.info(f'Deleted embeddings from ChromaDB for file_id: {file_id}') # 2. 删除与文件关联的 chunks 记录 chunks_to_delete = session.query(Chunk).filter_by(file_id=file_id).all() for chunk in chunks_to_delete: session.delete(chunk) - self.logger.info(f'Deleted {len(chunks_to_delete)} chunk records for file_id: {file_id}') + self.ap.logger.info(f'Deleted {len(chunks_to_delete)} chunk records for file_id: {file_id}') # 3. 删除文件记录本身 file_to_delete = session.query(File).filter_by(id=file_id).first() if file_to_delete: session.delete(file_to_delete) - self.logger.info(f'Deleted file record for file_id: {file_id}') + self.ap.logger.info(f'Deleted file record for file_id: {file_id}') else: - self.logger.warning(f'File with ID {file_id} not found in database. Skipping deletion of file record.') + self.ap.logger.warning( + f'File with ID {file_id} not found in database. Skipping deletion of file record.' + ) session.commit() - self.logger.info(f'Successfully completed data deletion for file_id: {file_id}') + self.ap.logger.info(f'Successfully completed data deletion for file_id: {file_id}') except Exception as e: session.rollback() - self.logger.error(f'Error deleting data for file_id {file_id}: {str(e)}', exc_info=True) + self.ap.logger.error(f'Error deleting data for file_id {file_id}: {str(e)}', exc_info=True) raise finally: session.close() @@ -262,45 +260,39 @@ class RAGManager: Deletes a knowledge base and all associated files, chunks, and vectors. This involves querying for associated files and then deleting them. """ - self.logger.info(f'Starting deletion of knowledge base with ID: {kb_id}') - session = SessionLocal() + self.ap.logger.info(f'Starting deletion of knowledge base with ID: {kb_id}') + session = SessionLocal() try: kb_to_delete = session.query(KnowledgeBase).filter_by(id=kb_id).first() if not kb_to_delete: - self.logger.warning(f'Knowledge Base with ID {kb_id} not found.') + self.ap.logger.warning(f'Knowledge Base with ID {kb_id} not found.') return - files_to_delete = session.query(File).filter_by(kb_id=kb_id).all() - session.close() - for file_obj in files_to_delete: try: await self.delete_data_by_file_id(file_obj.id) except Exception as file_del_e: - self.logger.error(f'Failed to delete file ID {file_obj.id} during KB deletion: {file_del_e}') - + self.ap.logger.error(f'Failed to delete file ID {file_obj.id} during KB deletion: {file_del_e}') - session = SessionLocal() try: - kb_final_delete = session.query(KnowledgeBase).filter_by(id=kb_id).first() if kb_final_delete: session.delete(kb_final_delete) session.commit() - self.logger.info(f'Successfully deleted knowledge base with ID: {kb_id}') + self.ap.logger.info(f'Successfully deleted knowledge base with ID: {kb_id}') else: - self.logger.warning( + self.ap.logger.warning( f'Knowledge Base with ID {kb_id} not found after file deletion, skipping KB deletion.' ) except Exception as kb_del_e: session.rollback() - self.logger.error(f'Error deleting KnowledgeBase record for ID {kb_id}: {kb_del_e}', exc_info=True) + self.ap.logger.error(f'Error deleting KnowledgeBase record for ID {kb_id}: {kb_del_e}', exc_info=True) raise finally: session.close() @@ -309,7 +301,9 @@ class RAGManager: # 如果在最初获取 KB 或文件列表时出错 if session.is_active: session.rollback() - self.logger.error(f'Error during overall knowledge base deletion for ID {kb_id}: {str(e)}', exc_info=True) + self.ap.logger.error( + f'Error during overall knowledge base deletion for ID {kb_id}: {str(e)}', exc_info=True + ) raise finally: if session.is_active: @@ -335,29 +329,29 @@ class RAGManager: """ Associates a file with a knowledge base by updating the kb_id in the File table. """ - self.logger.info(f'Associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}') + self.ap.logger.info(f'Associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}') session = SessionLocal() try: # 查询知识库是否存在 kb = session.query(KnowledgeBase).filter_by(id=knowledge_base_uuid).first() if not kb: - self.logger.error(f'Knowledge Base with UUID {knowledge_base_uuid} not found.') + self.ap.logger.error(f'Knowledge Base with UUID {knowledge_base_uuid} not found.') return # 更新文件的 kb_id file_to_update = session.query(File).filter_by(id=file_id).first() if not file_to_update: - self.logger.error(f'File with ID {file_id} not found.') + self.ap.logger.error(f'File with ID {file_id} not found.') return file_to_update.kb_id = kb.id session.commit() - self.logger.info( + self.ap.logger.info( f'Successfully associated file ID {file_id} with knowledge base UUID {knowledge_base_uuid}' ) except Exception as e: session.rollback() - self.logger.error( + self.ap.logger.error( f'Error associating file ID {file_id} with knowledge base UUID {knowledge_base_uuid}: {str(e)}', exc_info=True, )