diff --git a/backend/models/brains.py b/backend/models/brains.py
index 1ee4ac344f0b..8ad07f35495e 100644
--- a/backend/models/brains.py
+++ b/backend/models/brains.py
@@ -5,6 +5,7 @@
 from models.settings import CommonsDep, common_dependencies
 from models.users import User
 from pydantic import BaseModel
+from utils.vectors import get_unique_files_from_vector_ids
 
 
 class Brain(BaseModel):
@@ -191,46 +192,11 @@ def get_unique_brain_files(self):
         if len(vector_ids) == 0:
             return []
 
-        self.files = self.get_unique_files_from_vector_ids(vector_ids)
+        self.files = get_unique_files_from_vector_ids(vector_ids)
         print("unique_files", self.files)
         return self.files
 
 
-    def get_unique_files_from_vector_ids(self, vectors_ids: List[int]):
-        # Move into Vectors class
-        """
-        Retrieve unique user data vectors.
-        """
-        print("vectors_ids", vectors_ids)
-        print("tuple(vectors_ids)", tuple(vectors_ids))
-        if len(vectors_ids) == 1:
-            vectors_response = (
-                self.commons["supabase"]
-                .table("vectors")
-                .select(
-                    "name:metadata->>file_name, size:metadata->>file_size",
-                    count="exact",
-                )
-                .filter("id", "eq", vectors_ids[0])
-                .execute()
-            )
-        else:
-            vectors_response = (
-                self.commons["supabase"]
-                .table("vectors")
-                .select(
-                    "name:metadata->>file_name, size:metadata->>file_size",
-                    count="exact",
-                )
-                .filter("id", "in", tuple(vectors_ids))
-                .execute()
-            )
-
-        documents = vectors_response.data  # Access the data from the response
-        # Convert each dictionary to a tuple of items, then to a set to remove duplicates, and then back to a dictionary
-        unique_files = [dict(t) for t in set(tuple(d.items()) for d in documents)]
-        return unique_files
-
     def delete_file_from_brain(self, file_name: str):
         # First, get the vector_ids associated with the file_name
         vector_response = (
diff --git a/backend/utils/vectors.py b/backend/utils/vectors.py
index 5ddc6db98dbb..775e8688d8bf 100644
--- a/backend/utils/vectors.py
+++ b/backend/utils/vectors.py
@@ -1,8 +1,11 @@
+import multiprocessing as mp
+from typing import List
+
 from langchain.embeddings.openai import OpenAIEmbeddings
 from langchain.schema import Document
 from llm.utils.summarization import llm_summerize
 from logger import get_logger
-from models.settings import BrainSettings, CommonsDep
+from models.settings import BrainSettings, CommonsDep, common_dependencies
 from pydantic import BaseModel
 
 logger = get_logger(__name__)
@@ -58,3 +61,66 @@ def create_summary(commons: CommonsDep, document_id, content, metadata):
     commons["supabase"].table("summaries").update(
         {"document_id": document_id}
     ).match({"id": sids[0]}).execute()
+
+
+def error_callback(exception):
+    print('An exception occurred:', exception)
+
+
+def process_batch(batch_ids):
+    commons = common_dependencies()
+    if len(batch_ids) == 1:
+        return (
+            commons["supabase"]
+            .table("vectors")
+            .select(
+                "name:metadata->>file_name, size:metadata->>file_size",
+                count="exact",
+            )
+            .filter("id", "eq", batch_ids[0])
+            .execute()
+        ).data
+    else:
+        return (
+            commons["supabase"]
+            .table("vectors")
+            .select(
+                "name:metadata->>file_name, size:metadata->>file_size",
+                count="exact",
+            )
+            .filter("id", "in", tuple(batch_ids))
+            .execute()
+        ).data
+
+
+def get_unique_files_from_vector_ids(vectors_ids: List[int]):
+    # Move into Vectors class
+    """
+    Retrieve unique user data vectors.
+ """ + print("vectors_ids", vectors_ids) + + manager = mp.Manager() + vectors_responses = manager.list() + + # constants + BATCH_SIZE = 5 + + # if __name__ == '__main__': + # multiprocessing pool initialization + + pool = mp.Pool() + results = [] + for i in range(0, len(vectors_ids), BATCH_SIZE): + batch_ids = vectors_ids[i:i + BATCH_SIZE] + result = pool.apply_async(process_batch, args=(batch_ids,), error_callback=error_callback) + results.append(result) + # Retrieve the results + vectors_responses = [result.get() for result in results] + pool.close() + pool.join() + + documents = [item for sublist in vectors_responses for item in sublist] + print('document', documents) + unique_files = [dict(t) for t in set(tuple(d.items()) for d in documents)] + return unique_files