🐛 send select request supabase in batches with multiprocessing (Quivr…
gozineb authored Jul 5, 2023
1 parent d51d4a1 commit 01ea71a
Showing 2 changed files with 69 additions and 37 deletions.
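The commit moves get_unique_files_from_vector_ids out of the Brain model into backend/utils/vectors.py and rewrites it to fan the Supabase select out over batches of five vector IDs through a multiprocessing pool, instead of issuing a single select filtered on the full ID tuple.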
38 changes: 2 additions & 36 deletions backend/models/brains.py
@@ -5,6 +5,7 @@
 from models.settings import CommonsDep, common_dependencies
 from models.users import User
 from pydantic import BaseModel
+from utils.vectors import get_unique_files_from_vector_ids
 
 
 class Brain(BaseModel):
@@ -191,46 +192,11 @@ def get_unique_brain_files(self):
         if len(vector_ids) == 0:
             return []
 
-        self.files = self.get_unique_files_from_vector_ids(vector_ids)
+        self.files = get_unique_files_from_vector_ids(vector_ids)
         print("unique_files", self.files)
 
         return self.files
 
-    def get_unique_files_from_vector_ids(self, vectors_ids: List[int]):
-        # Move into Vectors class
-        """
-        Retrieve unique user data vectors.
-        """
-        print("vectors_ids", vectors_ids)
-        print("tuple(vectors_ids)", tuple(vectors_ids))
-        if len(vectors_ids) == 1:
-            vectors_response = (
-                self.commons["supabase"]
-                .table("vectors")
-                .select(
-                    "name:metadata->>file_name, size:metadata->>file_size",
-                    count="exact",
-                )
-                .filter("id", "eq", vectors_ids[0])
-                .execute()
-            )
-        else:
-            vectors_response = (
-                self.commons["supabase"]
-                .table("vectors")
-                .select(
-                    "name:metadata->>file_name, size:metadata->>file_size",
-                    count="exact",
-                )
-                .filter("id", "in", tuple(vectors_ids))
-                .execute()
-            )
-
-        documents = vectors_response.data  # Access the data from the response
-        # Convert each dictionary to a tuple of items, then to a set to remove duplicates, and then back to a dictionary
-        unique_files = [dict(t) for t in set(tuple(d.items()) for d in documents)]
-        return unique_files
-
     def delete_file_from_brain(self, file_name: str):
         # First, get the vector_ids associated with the file_name
         vector_response = (
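Note that the relocation also removes the helper's dependence on self.commons: in the new implementation each worker process builds its own Supabase client via common_dependencies(), which presumably avoids having to ship the Brain instance to the pool workers when process_batch is dispatched.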
68 changes: 67 additions & 1 deletion backend/utils/vectors.py
@@ -1,8 +1,11 @@
+import multiprocessing as mp
 from typing import List
 
 from langchain.embeddings.openai import OpenAIEmbeddings
 from langchain.schema import Document
 from llm.utils.summarization import llm_summerize
 from logger import get_logger
-from models.settings import BrainSettings, CommonsDep
+from models.settings import BrainSettings, CommonsDep, common_dependencies
 from pydantic import BaseModel
 
 logger = get_logger(__name__)
@@ -58,3 +61,66 @@ def create_summary(commons: CommonsDep, document_id, content, metadata):
         commons["supabase"].table("summaries").update(
             {"document_id": document_id}
         ).match({"id": sids[0]}).execute()
+
+
+def error_callback(exception):
+    print('An exception occurred:', exception)
+
+
+def process_batch(batch_ids):
+    commons = common_dependencies()
+    if len(batch_ids) == 1:
+        return (
+            commons["supabase"]
+            .table("vectors")
+            .select(
+                "name:metadata->>file_name, size:metadata->>file_size",
+                count="exact",
+            )
+            .filter("id", "eq", batch_ids[0])
+            .execute()
+        ).data
+    else:
+        return (
+            commons["supabase"]
+            .table("vectors")
+            .select(
+                "name:metadata->>file_name, size:metadata->>file_size",
+                count="exact",
+            )
+            .filter("id", "in", tuple(batch_ids))
+            .execute()
+        ).data
+
+
+def get_unique_files_from_vector_ids(vectors_ids: List[int]):
+    # Move into Vectors class
+    """
+    Retrieve unique user data vectors.
+    """
+    print("vectors_ids", vectors_ids)
+
+    manager = mp.Manager()
+    vectors_responses = manager.list()
+
+    # constants
+    BATCH_SIZE = 5
+
+    # if __name__ == '__main__':
+    # multiprocessing pool initialization
+
+    pool = mp.Pool()
+    results = []
+    for i in range(0, len(vectors_ids), BATCH_SIZE):
+        batch_ids = vectors_ids[i:i + BATCH_SIZE]
+        result = pool.apply_async(process_batch, args=(batch_ids,), error_callback=error_callback)
+        results.append(result)
+    # Retrieve the results
+    vectors_responses = [result.get() for result in results]
+    pool.close()
+    pool.join()
+
+    documents = [item for sublist in vectors_responses for item in sublist]
+    print('document', documents)
+    unique_files = [dict(t) for t in set(tuple(d.items()) for d in documents)]
+    return unique_files
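The added get_unique_files_from_vector_ids replaces the single `in`-filtered select with one Supabase request per batch of five IDs, each dispatched through mp.Pool.apply_async. Below is a minimal, self-contained sketch of the same pattern; fetch_batch and its fake rows are stand-ins for the commit's process_batch and its Supabase response, not code from the repository:

```python
import multiprocessing as mp

BATCH_SIZE = 5  # same constant the commit hard-codes


def fetch_batch(batch_ids):
    # Stub for process_batch's Supabase select; the real code uses an
    # "eq" filter for a single id and an "in" filter for several.
    return [{"name": f"file_{i % 3}.txt", "size": str(10 * (i % 3))} for i in batch_ids]


if __name__ == "__main__":
    vectors_ids = list(range(12))  # 12 ids -> batches of 5, 5 and 2
    with mp.Pool() as pool:
        results = [
            pool.apply_async(fetch_batch, args=(vectors_ids[i:i + BATCH_SIZE],))
            for i in range(0, len(vectors_ids), BATCH_SIZE)
        ]
        responses = [r.get() for r in results]  # blocks until every batch returns
    # Flatten the per-batch lists, then de-duplicate rows exactly as the commit does
    documents = [row for batch in responses for row in batch]
    unique_files = [dict(t) for t in set(tuple(d.items()) for d in documents)]
    print(unique_files)
```

One observation on the committed code: the mp.Manager().list() assigned to vectors_responses is never used, since the variable is immediately rebound to the list of apply_async results; the sketch keeps only the path that is actually exercised.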
