
Commit

Remove unnecessary connectors (dgarnitz#102)
* removed unused VDB connectors; removed retry logic that was unnecessary

* updated README
dgarnitz authored Mar 29, 2024
1 parent acce58c commit 7db7dd9
Showing 4 changed files with 31 additions and 273 deletions.
README.md: 16 changes (7 additions & 9 deletions)
@@ -72,7 +72,7 @@ RABBITMQ_PASSWORD=guest
RABBITMQ_HOST=rabbitmq
EMBEDDING_QUEUE=embeddings
VDB_UPLOAD_QUEUE=vdb-upload
LOCAL_VECTOR_DB=qdrant | milvus | weaviate
LOCAL_VECTOR_DB=qdrant | weaviate
API_STORAGE_DIRECTORY=/tmp
MINIO_ACCESS_KEY=minio99
MINIO_SECRET_KEY=minio123
@@ -89,7 +89,7 @@ Make sure you pull Rabbit MQ, Postgres, Min.io into your local docker repo. We a
```
docker pull rabbitmq
docker pull postgres
docker pull qdrant/qdrant | docker pull milvusdb/milvus | docker pull semitechnologies/weaviate
docker pull qdrant/qdrant | docker pull semitechnologies/weaviate
docker pull minio/minio
```

@@ -122,9 +122,7 @@ To use VectorFlow for development, make an HTTP request to your API's URL - for

All requests require an HTTP header with an `Authorization` key, which is the same as the `INTERNAL_API_KEY` env var that you defined before (see above). You must pass your vector database API key with the HTTP header `X-VectorDB-Key` if you are connecting to a cloud-based instance of a vector DB, and the embedding API key with `X-EmbeddingAPI-Key` if you are using OpenAI. HuggingFace Sentence Transformer embeddings do not require an API key, but you must follow the above steps to run the container with the model you need.
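For example, assuming a local deployment on port 8000 and placeholder shell variables standing in for your actual keys, the headers attach like so (a real `/embed` call also needs the multipart form fields shown below):

```
curl -X POST http://localhost:8000/embed \
  -H "Authorization: $INTERNAL_API_KEY" \
  -H "X-VectorDB-Key: $VECTOR_DB_KEY" \
  -H "X-EmbeddingAPI-Key: $OPENAI_API_KEY"
```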

VectorFlow currently supports Pinecone, Qdrant, Weaviate, Milvus, Redis, MongoDB and LanceDB vector databases.

If you are using MongoDB, please note that for the `environment` variable, you will need to pass your MongoDB connection URI, which you can find in your Atlas Console under `Database Deployments->Connect->Drivers`. Your URI should look like `mongodb+srv://<username>:<password>@cluster.mongodb.net/?retryWrites=true&w=majority`, where you will replace `<username>` with your username, and pass the string as the environment variable keeping `<password>` as is in the string. For the password, you pass it in the `X-VectorDB-Key` header option.
VectorFlow currently supports Pinecone, Qdrant and Weaviate vector databases.

#### Embed a Single File
To submit a single file for embedding, make a `POST` request to the `/embed` endpoint with a file attached, the `'Content-Type: multipart/form-data'` header and the following payload:
@@ -141,7 +139,7 @@ To submit a single file for embedding, make a `POST` request to the `/embed` end
"hugging_face_model_name": "sentence-transformer-model-name-here"
}'
'VectorDBMetadata={
"vector_db_type": "PINECONE | QDRANT | WEAVIATE | MILVUS | REDIS | LANCEDB | MONGODB",
"vector_db_type": "PINECONE | QDRANT | WEAVIATE",
"index_name": "index_name",
"environment": "env_name"
}'
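
Assembled into a full request, a single-file embed might look like the sketch below; the URL, file path, model name, and index values are placeholders, and the `SourceData` form-field name and `embeddings_type` value are illustrative assumptions rather than confirmed API details:

```
curl -X POST http://localhost:8000/embed \
  -H 'Content-Type: multipart/form-data' \
  -H "Authorization: $INTERNAL_API_KEY" \
  -H "X-VectorDB-Key: $VECTOR_DB_KEY" \
  -F 'SourceData=@./example-document.txt' \
  -F 'EmbeddingsMetadata={"embeddings_type": "HUGGING_FACE", "hugging_face_model_name": "sentence-transformers/all-MiniLM-L6-v2"}' \
  -F 'VectorDBMetadata={"vector_db_type": "QDRANT", "index_name": "test-index", "environment": "qdrant"}'
```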
@@ -304,7 +302,7 @@ We also recommend you add verification evidence, such as screenshots, that show

- [ ] Support for multi-file, directory data ingestion from sources such as Salesforce, Google Drive, etc
- [ ] Retry mechanism
- [ ] Langchain & Llama Index integrations
- [ ] Langchain integrations
- [ ] Support callbacks for writing object metadata to a separate store
- [ ] Dynamically configurable vector DB schemas
- [ ] Deduplication capabilities
@@ -321,7 +319,7 @@ Note that the `TESTING_ENV` variable is the equivalent of the `environment` fiel
The `testing_clients` directory has sample scripts you can follow to run vectorflow. Add your embedding and database keys to the `env_scrips/env_vars.sh` script that was generated and set the `filepath` variable in `testing_clients/standard_upload_client.py` to point to the file you want to embed. Then run:
```
source env_scrips/env_vars.sh
python clients/standard_upload_client.py
python testing-clients/standard_upload_client.py
```

To upload multiple files at once, use the `testing_clients/streaming_upload_client.py` script; a sample invocation follows below.
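
Assuming the same `env_scrips/env_vars.sh` setup as above:

```
source env_scrips/env_vars.sh
python testing_clients/streaming_upload_client.py
```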
@@ -357,7 +355,7 @@ To perform a search, send a `POST` request to `/images/search` endpoint with an
'ReturnVectors': boolean,
'TopK': integer, less than 1000,
'VectorDBMetadata={
"vector_db_type": "PINECONE | QDRANT | WEAVIATE | MILVUS | REDIS",
"vector_db_type": "PINECONE | QDRANT | WEAVIATE",
"index_name": "index_name",
"environment": "env_name"
}'
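
As a sketch, assuming a local deployment on port 8000 and a placeholder query image (the `SourceData` field name is likewise an illustrative assumption):

```
curl -X POST http://localhost:8000/images/search \
  -H 'Content-Type: multipart/form-data' \
  -H "Authorization: $INTERNAL_API_KEY" \
  -H "X-VectorDB-Key: $VECTOR_DB_KEY" \
  -F 'SourceData=@./query-image.png' \
  -F 'ReturnVectors=false' \
  -F 'TopK=10' \
  -F 'VectorDBMetadata={"vector_db_type": "QDRANT", "index_name": "image-index", "environment": "qdrant"}'
```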
src/worker/requirements.txt: 13 changes (3 additions & 10 deletions)
@@ -10,27 +10,23 @@ cachetools==5.3.1
certifi==2023.7.22
cffi==1.15.1
charset-normalizer==3.2.0
click==8.1.7
cryptography==41.0.3
decorator==5.1.1
deprecation==2.1.0
dnspython==2.4.1
environs==9.5.0
exceptiongroup==1.1.2
frozenlist==1.4.0
googleapis-common-protos==1.60.0
greenlet==3.0.0
grpc-gateway-protoc-gen-openapiv2==0.1.0
grpcio
grpcio-tools
grpcio==1.56.0
grpcio-tools==1.48.2
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==0.17.3
httpx==0.24.1
hyperframe==6.0.1
idna==3.4
lancedb
loguru==0.7.0
lz4==4.3.2
Mako==1.2.4
@@ -51,16 +47,13 @@ pyarrow==13.0.0
pycparser==2.21
pydantic==2.1.1
pydantic_core==2.4.0
pylance
pymilvus==2.2.15
pymongo==4.5.0
pylance==0.8.1
python-dateutil==2.8.2
python-dotenv==1.0.0
pytz==2023.3
PyYAML==6.0.1
qdrant-client==1.4.0
ratelimiter==1.2.0.post0
redis==5.0.0
regex==2023.10.3
requests==2.31.0
retry==0.9.2
src/worker/vdb_upload_worker.py: 201 changes (0 additions & 201 deletions)
@@ -10,11 +10,6 @@
import pinecone
import logging
import weaviate
import redis
import lancedb
import pymongo
import pyarrow as pa
import numpy as np
import worker.config as config
import services.database.batch_service as batch_service
import services.database.job_service as job_service
@@ -23,7 +18,6 @@
from shared.batch_status import BatchStatus
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from pymilvus import Collection, connections
from shared.embeddings_type import EmbeddingsType
from shared.vector_db_type import VectorDBType
from shared.utils import generate_uuid_from_tuple
@@ -64,70 +58,9 @@ def write_embeddings_to_vector_db(chunks, vector_db_metadata, batch_id, job_id):
        return write_embeddings_to_qdrant(upsert_list, vector_db_metadata)
    elif vector_db_metadata.vector_db_type == VectorDBType.WEAVIATE:
        return write_embeddings_to_weaviate(text_embeddings_list, vector_db_metadata, batch_id, job_id, source_filename)
    elif vector_db_metadata.vector_db_type == VectorDBType.MILVUS:
        upsert_list = create_milvus_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename)
        return write_embeddings_to_milvus(upsert_list, vector_db_metadata)
    elif vector_db_metadata.vector_db_type == VectorDBType.REDIS:
        upsert_list = create_redis_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename)
        return write_embeddings_to_redis(upsert_list, vector_db_metadata)
    elif vector_db_metadata.vector_db_type == VectorDBType.LANCEDB:
        upsert_list = create_lancedb_source_chunks(text_embeddings_list, batch_id, job_id, source_filename)
        return write_embeddings_to_lancedb(upsert_list, batch_id)
    elif vector_db_metadata.vector_db_type == VectorDBType.MONGODB:
        upsert_list = create_mongodb_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename)
        return write_embeddings_to_mongodb(upsert_list, vector_db_metadata)
    else:
        logging.error('Unsupported vector DB type: %s', vector_db_metadata.vector_db_type.value)

def create_mongodb_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename):
    upsert_list = []
    for i, (source_text, embedding) in enumerate(text_embeddings_list):
        upsert_list.append(
            {"_id": generate_uuid_from_tuple((job_id, batch_id, i)),
             "values": embedding,
             "source_text": source_text,
             "source_document": source_filename
             })
    return upsert_list

def write_embeddings_to_mongodb(upsert_list, vector_db_metadata):
    mongo_conn_uri = vector_db_metadata.environment
    mongo_password = quote_plus(os.getenv('VECTOR_DB_KEY'))
    mongo_conn_uri = mongo_conn_uri.replace("<password>", mongo_password)

    mongo_client = pymongo.MongoClient(mongo_conn_uri)
    db_name, collection = vector_db_metadata.index_name.split(".")
    db = mongo_client[db_name]

    try:
        db.command("ping")
    except Exception as e:
        logging.error(f"Error connecting to MongoDB via python client: {e}")
        return None

    if collection not in db.list_collection_names():
        logging.error(f"Index {vector_db_metadata.index_name} does not exist in environment {vector_db_metadata.environment}")
        return None

    index = db.get_collection(collection)

    logging.info(f"Starting MongoDB upsert for {len(upsert_list)} vectors")

    batch_size = config.PINECONE_BATCH_SIZE
    vectors_uploaded = 0

    for i in range(0,len(upsert_list), batch_size):
        try:
            upsert_batch = upsert_list[i:i+batch_size]
            upsert_response = index.insert_many(upsert_batch)
            vectors_uploaded += len(upsert_batch)
        except Exception as e:
            logging.error('Error writing embeddings to Mongo:', e)
            return None

    logging.info(f"Successfully uploaded {vectors_uploaded} vectors to MongoDB")
    return vectors_uploaded

def create_pinecone_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename):
    upsert_list = []
    for i, (source_text, embedding) in enumerate(text_embeddings_list):
@@ -161,45 +94,6 @@ def write_embeddings_to_pinecone(upsert_list, vector_db_metadata):
logging.info(f"Successfully uploaded {vectors_uploaded} vectors to pinecone")
return vectors_uploaded

def create_redis_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename):
    ids = []
    source_texts = []
    source_documents = []
    embeddings = []

    for i, (source_text, embedding) in enumerate(text_embeddings_list):
        ids.append(generate_uuid_from_tuple((job_id, batch_id, i)))
        source_texts.append(source_text)
        embeddings.append(embedding)
        source_documents.append(source_filename)

    return [ids, source_texts, embeddings, source_documents]

def write_embeddings_to_redis(upsert_list, vector_db_metadata):
    redis_client = redis.from_url(url=vector_db_metadata.environment, password=os.getenv('VECTOR_DB_KEY'), decode_responses=True)

    try:
        redis_client.ft(vector_db_metadata.index_name).info()
    except redis.exceptions.ResponseError as e:
        if "Unknown Index name" in str(e):
            logging.error(f"Index {vector_db_metadata.index_name} does not exist at redis URL {vector_db_metadata.environment}")
            return None

    logging.info(f"Starting redis upsert for {len(upsert_list)} vectors")

    redis_pipeline = redis_client.pipeline()

    for i in range(0,len(upsert_list[0])):
        key = f'{vector_db_metadata.collection}:{upsert_list[0][i]}'
        obj = {"source_data": upsert_list[1][i], "embeddings": np.array(upsert_list[2][i]).tobytes(), "source_document": upsert_list[3][i]}

        redis_pipeline.hset(key, mapping=obj)

    res = redis_pipeline.execute()

    logging.info(f"Successfully uploaded {len(res)} vectors to redis")
    return len(res)

def create_qdrant_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename):
    upsert_list = []
    for i, (source_text, embedding) in enumerate(text_embeddings_list):
@@ -277,101 +171,6 @@ def write_embeddings_to_weaviate(text_embeddings_list, vector_db_metadata, batc
logging.info(f"Successfully uploaded {len(text_embeddings_list)} vectors to Weaviate")
return len(text_embeddings_list)

def create_milvus_source_chunk_dict(text_embeddings_list, batch_id, job_id, source_filename):
    ids = []
    source_texts = []
    embeddings = []
    source_filenames = []
    for i, (source_text, embedding) in enumerate(text_embeddings_list):
        ids.append(generate_uuid_from_tuple((job_id, batch_id, i)))
        source_texts.append(source_text)
        embeddings.append(embedding)
        source_filenames.append(source_filename)
    return [ids, source_texts, embeddings, source_filenames]

def write_embeddings_to_milvus(upsert_list, vector_db_metadata):
    if vector_db_metadata.environment != os.getenv('LOCAL_VECTOR_DB'):
        connections.connect("default",
            uri = vector_db_metadata.environment,
            token = os.getenv('VECTOR_DB_KEY')
        )
    else:
        connections.connect("default",
            host = vector_db_metadata.environment
        )

    collection = Collection(vector_db_metadata.index_name)
    if not collection:
        logging.error(f"Index {vector_db_metadata.index_name} does not exist in environment {vector_db_metadata.environment}")
        return None

    logging.info(f"Starting Milvus insert for {len(upsert_list)} vectors")
    batch_size = config.PINECONE_BATCH_SIZE
    vectors_uploaded = 0

    for i in range(0,len(upsert_list), batch_size):
        try:
            insert_response = collection.insert(upsert_list[i:i+batch_size])
            vectors_uploaded += insert_response.insert_count
        except Exception as e:
            logging.error('Error writing embeddings to milvus: %s', e)
            return None

    logging.info(f"Successfully uploaded {vectors_uploaded} vectors to milvus")
    return vectors_uploaded

def create_lancedb_source_chunks(text_embeddings_list, batch_id, job_id, source_filename):
    upsert_list = []
    for i, (source_text, embedding) in enumerate(text_embeddings_list):
        upsert_list.append(
            {
                "id": generate_uuid_from_tuple((job_id, batch_id, i)),
                "vector": embedding,
                "source_text": source_text,
                "source_document": source_filename
            }
        )
    return upsert_list

def write_embeddings_to_lancedb(upsert_list, batch_id):
    # right now only local connection, since its serverless and their cloud is in beta
    batch = safe_db_operation(batch_service.get_batch, batch_id)
    db = lancedb.connect(batch.vector_db_metadata.environment)
    try:
        table = db.open_table(batch.vector_db_metadata.index_name)
    except FileNotFoundError as e:
        logging.info(f"Table {batch.vector_db_metadata.index_name} does not exist in environment {batch.vector_db_metadata.environment}.")

        if batch.embeddings_metadata.embeddings_type == EmbeddingsType.OPEN_AI:
            schema = pa.schema(
                [
                    pa.field("id", pa.string()),
                    pa.field("vector", pa.list_(pa.float32(), 1536)),
                    pa.field("source_text", pa.string()),
                    pa.field("source_document", pa.string()),
                ])
            table = db.create_table(batch.vector_db_metadata.index_name, schema=schema)
            logging.info(f"Created table {batch.vector_db_metadata.index_name} in environment {batch.vector_db_metadata.environment}.")
        else:
            logging.error(f"Embeddings type {batch.embeddings_metadata.embeddings_type} not supported for LanceDB. Only Open AI")
            return None

    logging.info(f"Starting LanceDB upsert for {len(upsert_list)} vectors")

    batch_size = config.PINECONE_BATCH_SIZE
    vectors_uploaded = 0

    for i in range(0,len(upsert_list), batch_size):
        try:
            table.add(data=upsert_list[i:i+batch_size])
            vectors_uploaded += batch_size
        except Exception as e:
            logging.error('Error writing embeddings to lance db:', e)
            return None

    logging.info(f"Successfully uploaded {vectors_uploaded} vectors to lance db")
    return vectors_uploaded

# TODO: refactor into utils
def update_batch_and_job_status(job_id, batch_status, batch_id):
    try:
