diff --git a/scripts/start_celery_and_flower.sh b/scripts/start_celery_and_flower.sh index 816d7ba..cc05ea6 100755 --- a/scripts/start_celery_and_flower.sh +++ b/scripts/start_celery_and_flower.sh @@ -1,22 +1,43 @@ #!/bin/bash -# Fetch the Rabbitmq IP address by directly invoking the get_amqp_ip function -AMQP_IP=$(python -c 'from poc_celery.get_container_ip import get_amqp_ip; print(get_amqp_ip())') +# Wait for a service to be available on a specific port +wait_for_service() { + local host=$1 + local port=$2 + local timeout=$3 + + echo "Waiting for service on $host:$port..." + for ((i=0; i /dev/null 2>&1; then + echo "Service on $host:$port is available." + return 0 + fi + sleep 1 + done + echo "Timed out waiting for service on $host:$port." + exit 1 +} + +# Check for required commands +command -v nc >/dev/null 2>&1 || { echo >&2 "This script requires 'nc' but it's not installed. Aborting."; exit 1; } + +# Get RabbitMQ IP from a Python utility, assuming the function is reliable and necessary +AMQP_IP=$(python -c 'from src.poc_celery.get_container_ip import get_amqp_ip; print(get_amqp_ip())') # Validate the fetched IP -if [ -z "$AMQP_IP" ]; then - echo "Failed to get Rabbitmq IP address." +if [[ -z "$AMQP_IP" ]]; then + echo "Failed to get RabbitMQ IP address." exit 1 fi -echo "Rabbitmq IP: $AMQP_IP" +# Wait for RabbitMQ to be fully operational +wait_for_service $AMQP_IP 5672 60 -# Start the Celery worker +# Start Celery and Flower using the RabbitMQ IP echo "Starting Celery worker..." -celery -A poc_celery.celery_app worker --loglevel=INFO & +celery -A src.poc_celery.celery_app worker --loglevel=INFO & -# Start Flower -echo "Starting Flower with Rabbitmq at $AMQP_IP..." -celery -A poc_celery.celery_app flower --broker=amqp://guest:guest@{AMQP_IP}:5672 & +echo "Starting Flower for monitoring Celery..." +celery -A src.poc_celery.celery_app flower & echo "Celery and Flower have been started." diff --git a/scripts/stop_celery_and_flower.sh b/scripts/stop_celery_and_flower.sh new file mode 100755 index 0000000..d603562 --- /dev/null +++ b/scripts/stop_celery_and_flower.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +echo "Forcefully killing Celery and Flower processes..." + +# Function to forcefully kill a process by its name +force_kill_process() { + local process_name=$1 + echo "Searching for processes named $process_name to kill..." + + # Use pgrep to find all process IDs matching the process name and kill them + pids=$(pgrep -f "$process_name") + if [ ! -z "$pids" ]; then + echo "Found processes with IDs: $pids. Force killing..." + echo $pids | xargs kill -9 + echo "$process_name processes have been forcefully terminated." + else + echo "No $process_name processes found running." + fi +} + +# Specific names or part of the command that was used to start Celery and Flower +force_kill_process "celery -A src.poc_celery.celery_app worker" +force_kill_process "celery -A src.poc_celery.celery_app flower" + +echo "All relevant Celery and Flower processes have been forcefully killed." diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..d9a5396 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +from src import * diff --git a/src/poc_celery/__init__.py b/src/poc_celery/__init__.py index f2d66a9..ff6170d 100644 --- a/src/poc_celery/__init__.py +++ b/src/poc_celery/__init__.py @@ -1,3 +1,3 @@ -from poc_celery.celery_app import app as celery_app +# from poc_celery.celery_app import app as celery_app -__all__ = ("celery_app",) +# __all__ = ("celery_app",) diff --git a/src/poc_celery/celery_app.py b/src/poc_celery/celery_app.py index a5b0067..d021215 100644 --- a/src/poc_celery/celery_app.py +++ b/src/poc_celery/celery_app.py @@ -1,6 +1,6 @@ from celery import Celery -from poc_celery.get_container_ip import get_amqp_ip, get_redis_ip +from src.poc_celery.get_container_ip import get_amqp_ip, get_redis_ip # Get the Rabbitmq container IP address AMQP_IP = get_amqp_ip() @@ -12,8 +12,8 @@ broker=f"amqp://guest:guest@{AMQP_IP}:5672", backend=f"redis://{REDIS_IP}:6379/0", include=[ - "poc_celery.tasks_async", - "poc_celery.tasks_collectors", + "src.poc_celery.tasks_collectors", + "src.poc_celery.tasks_async", ], ) diff --git a/src/poc_celery/tasks_async.py b/src/poc_celery/tasks_async.py index 7526bdf..322942d 100644 --- a/src/poc_celery/tasks_async.py +++ b/src/poc_celery/tasks_async.py @@ -1,6 +1,6 @@ from pathlib import Path -from poc_celery.celery_app import app +from src.poc_celery.celery_app import app # app = Celery('tasks', broker='your_broker_url', backend='your_backend_url') DATA_DIR = Path(__file__).parent.parent / "data" diff --git a/src/poc_celery/tasks_collectors.py b/src/poc_celery/tasks_collectors.py index 78ad3c9..22c5a74 100644 --- a/src/poc_celery/tasks_collectors.py +++ b/src/poc_celery/tasks_collectors.py @@ -3,7 +3,7 @@ from celery import chord, group -from poc_celery.celery_app import app +from src.poc_celery.celery_app import app def generate_collector_request(topic: str) -> str: