Edit k8 script (dgarnitz#96)
* fixed k8 deploy script

* moved retries to a separate queue to de-noise the system

* fixed tests

---------

Co-authored-by: David Garnitz <[email protected]>
dgarnitz and David Garnitz authored Nov 18, 2023
1 parent 016c97a commit 888bd53
Showing 8 changed files with 34 additions and 8 deletions.
README.md (7 changes: 6 additions & 1 deletion)
@@ -260,7 +260,12 @@ TELEMETRY_DISABLED=True
## Kubernetes
You can run VectorFlow locally in Kubernetes with minikube using `./kube/scripts/deploy-local-k8s.sh`, which will apply all the yaml files located in `kube/`. This script will not work if you have not installed docker, minikube and kubectl.

- This script will first build the images locally, then transfer them into minikube.
+ This script will first build the images locally, then transfer them into minikube. If you want to check what images are available in minikube, run the following:
+
+ ```
+ eval $(minikube docker-env)
+ docker images
+ ```

You will need to run `minikube tunnel` to access the resources located in the cluster from your development machine. The setup script will load the images from your local docker context into minikube's.

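For the `minikube tunnel` step mentioned above, a minimal usage sketch (assuming the `vectorflow` namespace that the deploy script targets; `kubectl get svc` just lists the exposed services):

```
minikube tunnel
kubectl get svc -n vectorflow
```

Note that `minikube tunnel` blocks, so keep it running in a separate terminal while you work against the cluster.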
kube/config-map.yaml (1 change: 1 addition & 0 deletions)
@@ -16,6 +16,7 @@ data:
RABBITMQ_HOST: rabbitmq
RABBITMQ_PASSWORD: guest
RABBITMQ_USERNAME: guest
+ RETRY_QUEUE: retry
TELEMETRY_DISABLED: "True"
VDB_UPLOAD_QUEUE: vdb-upload
kind: ConfigMap
kube/scripts/deploy-local-k8s.sh (8 changes: 7 additions & 1 deletion)
@@ -5,7 +5,7 @@ cd src/
docker build --no-cache --file api/Dockerfile -t vectorflow_api:latest .
docker build --no-cache --file worker/Dockerfile -t vectorflow_worker:latest .
docker build --no-cache --file worker/Dockerfile.vdb-upload-worker -t vectorflow_vdb_upload_worker:latest .
- docker build --no-cache --file extractor/Dockerfile -t vectorflow_extractor:latest .
+ docker build --no-cache --file extract/Dockerfile -t vectorflow_extractor:latest .
docker build --no-cache --file scripts/Dockerfile -t vectorflow-db-init:latest .
docker build --no-cache --file scripts/Dockerfile.minio -t vectorflow-minio-init:latest .
docker build --no-cache --file scripts/Dockerfile.local-qdrant -t vectorflow-qdrant-init:latest .
@@ -42,6 +42,12 @@ kubectl apply -f kube/rabbitmq-deployment.yaml
kubectl apply -f kube/minio-deployment.yaml
kubectl apply -f kube/qdrant-deployment.yaml

+ echo "Waiting for deployments to be ready..."
+ kubectl wait -n vectorflow --for=condition=available --timeout=120s deployment/postgres
+ kubectl wait -n vectorflow --for=condition=available --timeout=120s deployment/rabbitmq
+ kubectl wait -n vectorflow --for=condition=available --timeout=120s deployment/minio
+ kubectl wait -n vectorflow --for=condition=available --timeout=120s deployment/qdrant
+
echo "Deploying resources with init containers..."
kubectl apply -f kube/db-init.yaml
kubectl apply -f kube/qdrant-init.yaml
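The new `kubectl wait` block pauses the script until the postgres, rabbitmq, minio and qdrant deployments report available, so the init containers applied afterwards have live services to talk to. If one of the waits times out, a reasonable way to investigate with standard kubectl commands (`rabbitmq` here is just an example target):

```
kubectl get pods -n vectorflow
kubectl describe deployment/rabbitmq -n vectorflow
kubectl logs deployment/rabbitmq -n vectorflow
```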
kube/worker-deployment.yaml (5 changes: 5 additions & 0 deletions)
@@ -42,6 +42,11 @@ spec:
configMapKeyRef:
key: EMBEDDING_QUEUE
name: config-map
+ - name: RETRY_QUEUE
+   valueFrom:
+     configMapKeyRef:
+       key: RETRY_QUEUE
+       name: config-map
- name: VDB_UPLOAD_QUEUE
valueFrom:
configMapKeyRef:
setup.sh (1 change: 1 addition & 0 deletions)
@@ -67,6 +67,7 @@ export RABBITMQ_PASSWORD=guest
export RABBITMQ_HOST=rabbitmq
export EXTRACTION_QUEUE=extraction
export EMBEDDING_QUEUE=embeddings
+ export RETRY_QUEUE=retry
export VDB_UPLOAD_QUEUE=vdb-upload
export LOCAL_VECTOR_DB=qdrant
export API_STORAGE_DIRECTORY=/tmp
src/api/app.py (5 changes: 4 additions & 1 deletion)
@@ -29,6 +29,7 @@
from llama_index import download_loader
from services.minio.minio_service import create_minio_client
from api.posthog import send_telemetry
+ from datetime import datetime

auth = Auth()
pipeline = Pipeline()
@@ -79,6 +80,7 @@ def embed():
vectorflow_request_copy = copy.deepcopy(vectorflow_request)
send_telemetry("SINGLE_FILE_UPLOAD_SUCCESS", vectorflow_request_copy)

+ logging.info(f"{datetime.now()} Successfully created job {job.id} for file {file.filename}")
return jsonify({'message': f"Successfully added {batch_count} batches to the queue", 'JobID': job.id}), 200
else:
return jsonify({'error': 'Uploaded file is not a TXT, PDF, Markdown or DOCX file'}), 400
@@ -152,11 +154,12 @@ def create_jobs():
pipeline.disconnect()

successfully_uploaded_files[file.filename] = job.id
- send_telemetry("MULTI_FILE_UPLOAD_SUCCESS", vectorflow_request)
+ logging.info(f"{datetime.now()} Successfully created job {job.id} for file {file.filename}")
except Exception as e:
print(f"Error uploading file {file.filename} to min.io, creating job or passing vectorflow request to message broker. \nError: {e}\n\n")
failed_uploads.append(file.filename)

+ send_telemetry("MULTI_FILE_UPLOAD_SUCCESS", vectorflow_request)
return jsonify({'message': 'Files processed',
'successful_uploads': successfully_uploaded_files,
'failed_uploads': failed_uploads,
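Beyond the timestamped job-creation logs, note that `send_telemetry("MULTI_FILE_UPLOAD_SUCCESS", vectorflow_request)` moves out of the per-file loop, so a multi-file upload now emits one telemetry event per request instead of one per file.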
src/worker/vdb_upload_worker.py (4 changes: 2 additions & 2 deletions)
@@ -31,8 +31,8 @@
from services.rabbitmq.rabbit_service import create_connection_params
from pika.exceptions import AMQPConnectionError

- logging.basicConfig(filename='./vdb-upload-log.txt', level=logging.INFO)
- logging.basicConfig(filename='./vdb-upload-errors.txt', level=logging.ERROR)
+ logging.basicConfig(filename='./vdb-log.txt', level=logging.INFO)
+ logging.basicConfig(filename='./vdb-errors.txt', level=logging.ERROR)

def upload_batch(batch_id, chunks_with_embeddings):
batch = safe_db_operation(batch_service.get_batch, batch_id)
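A caveat on the renamed log files: `logging.basicConfig` only configures the root logger the first time it runs, so the second call here is a no-op and `./vdb-errors.txt` never receives records. A sketch of one way to actually split the two files using only the stdlib (the handler names are illustrative, not from this repo):

```
import logging

# INFO and above goes to the main log file
info_handler = logging.FileHandler('./vdb-log.txt')
info_handler.setLevel(logging.INFO)

# ERROR and above is duplicated into a separate errors file
error_handler = logging.FileHandler('./vdb-errors.txt')
error_handler.setLevel(logging.ERROR)

logging.basicConfig(level=logging.INFO, handlers=[info_handler, error_handler])
```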
src/worker/worker.py (11 changes: 8 additions & 3 deletions)
@@ -30,6 +30,7 @@
publish_channel = None
connection = None
consume_channel = None
+ retry_channel = None

def process_batch(batch_id, source_data, vector_db_key, embeddings_api_key):
batch = safe_db_operation(batch_service.get_batch, batch_id)
@@ -66,7 +67,7 @@ def process_batch(batch_id, source_data, vector_db_key, embeddings_api_key):
if batch.retries < config.MAX_BATCH_RETRIES:
logging.info(f"Adding Batch {batch.id} of job {batch.job_id} to retry queue.\nCurrent attempt {batch.retries} of {config.MAX_BATCH_RETRIES}")
json_data = json.dumps((batch_id, source_data, vector_db_key, embeddings_api_key))
- publish_message_to_retry_queue(consume_channel, os.getenv('EMBEDDING_QUEUE'), json_data)
+ publish_message_to_retry_queue(retry_channel, os.getenv('RETRY_QUEUE'), json_data)
else:
logging.error(f"Max retries reached for batch {batch.id} for job {batch.job_id}.\nBATCH will be marked permanent as FAILED.")

@@ -77,7 +78,7 @@ def process_batch(batch_id, source_data, vector_db_key, embeddings_api_key):
if batch.retries < config.MAX_BATCH_RETRIES:
logging.info(f"Adding Batch {batch.id} of job {batch.job_id} to retry queue.\nCurrent attempt {batch.retries} of {config.MAX_BATCH_RETRIES}")
json_data = json.dumps((batch_id, source_data, vector_db_key, embeddings_api_key))
- publish_message_to_retry_queue(consume_channel, os.getenv('EMBEDDING_QUEUE'), json_data)
+ publish_message_to_retry_queue(retry_channel, os.getenv('RETRY_QUEUE'), json_data)
else:
logging.error(f"Max retries reached for batch {batch.id} for job {batch.job_id}.\nBATCH will be marked permanent as FAILED.")

@@ -91,7 +92,7 @@ def process_batch(batch_id, source_data, vector_db_key, embeddings_api_key):
if batch.retries < config.MAX_BATCH_RETRIES:
logging.info(f"Adding Batch {batch.id} of job {batch.job_id} to retry queue.\nCurrent attempt {batch.retries} of {config.MAX_BATCH_RETRIES}")
json_data = json.dumps((batch_id, source_data, vector_db_key, embeddings_api_key))
- publish_message_to_retry_queue(consume_channel, os.getenv('EMBEDDING_QUEUE'), json_data)
+ publish_message_to_retry_queue(retry_channel, os.getenv('RETRY_QUEUE'), json_data)
else:
logging.error(f"Max retries reached for batch {batch.id} for job {batch.job_id}.\nBATCH will be marked permanent as FAILED.")
else:
@@ -462,19 +463,23 @@ def start_connection(max_retries=5, retry_delay=5):
global publish_channel
global connection
global consume_channel
+ global retry_channel

for attempt in range(max_retries):
try:
connection_params = create_connection_params()
connection = pika.BlockingConnection(connection_params)
consume_channel = connection.channel()
publish_channel = connection.channel()
+ retry_channel = connection.channel()

consume_queue_name = os.getenv('EMBEDDING_QUEUE')
publish_queue_name = os.getenv('VDB_UPLOAD_QUEUE')
+ retry_queue_name = os.getenv('RETRY_QUEUE')

consume_channel.queue_declare(queue=consume_queue_name)
publish_channel.queue_declare(queue=publish_queue_name)
+ retry_channel.queue_declare(queue=retry_queue_name)

consume_channel.basic_consume(queue=consume_queue_name, on_message_callback=callback)

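The retry path now publishes through a dedicated channel to the queue named by `RETRY_QUEUE`, keeping failed batches out of the main `EMBEDDING_QUEUE` traffic. The `publish_message_to_retry_queue` helper itself is not shown in this diff; a minimal sketch of what it could look like with pika, inferred from the call sites above (the persistent delivery mode is an assumption, not something this commit confirms):

```
import pika

def publish_message_to_retry_queue(channel, queue_name, json_data):
    # Hypothetical sketch: publish to the default exchange, routed by queue name.
    # delivery_mode=2 marks the message persistent so retries survive a broker restart.
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=json_data,
        properties=pika.BasicProperties(delivery_mode=2),
    )
```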
