Refactored launch-mpi-jobs command
vanderlei-filho committed Dec 19, 2023
1 parent c440aaa commit ca0449a
Showing 38 changed files with 276 additions and 5,460 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -19,5 +19,5 @@ create-cluster: ## create a cluster using the cluster_config.yaml configuration
destroy-cluster: ## destroy the test_cluster
python manage.py destroy_cluster

mpi-launch: ## launch an MPI job
python manage.py mpi_launch --cluster-config-id test_cluster
launch-mpi-jobs: ## launch all MPI jobs configured in mpi_run.yaml
python manage.py launch_mpi_jobs --cluster-config-id test_cluster
4 changes: 3 additions & 1 deletion README.md
@@ -87,7 +87,7 @@ and rename it to `mpi_run.yaml`. Edit the file as you like, and then run
from the command-line:

```shell
make mpi-launch
make launch-mpi-jobs
```
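
For context, the new `launch_mpi_jobs` command introduced by this commit reads a handful of fields from `mpi_run.yaml`. A minimal sketch of the parsed structure it expects, written as the Python object returned by `load_yaml`; the field names come from the command's code, while every value below is an illustrative placeholder, not taken from the repository:

```python
# Illustrative only: field names match what launch_mpi_jobs reads; all values
# (labels, commands, retry count) are hypothetical placeholders.
mpi_run_config = {
    "delete_cluster_after": True,  # destroy the cluster once all jobs finish
    "mpi_jobs": [
        {
            "label": "my-first-job",  # hypothetical job name
            "setup_command": "make -C /var/nfs_dir/my_files",  # hypothetical
            "run_command": "mpirun --hostfile /var/nfs_dir/my_files/hostfile ./my_app",  # hypothetical
            "restore_command": "./restore.sh",  # optional; used only after a failure (hypothetical)
            "maximum_retries": 3,
        },
    ],
}
```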

# Publications
@@ -96,6 +96,8 @@ Please find below the list of publications related to the HPC@Cloud Toolkit. I

- Vanderlei Munhoz, Márcio Castro, Odorico Mendizabal. *Strategies for Fault-Tolerant Tightly-coupled HPC Workloads Running on Low-Budget Spot Cloud Infrastructures*. **International Symposium on Computer Architecture and High Performance Computing (SBAC-PAD)**. Bordeaux, France: IEEE Computer Society, 2022. [[link]](https://doi.org/10.1109/SBAC-PAD55451.2022.00037) [[bib]](http://www.inf.ufsc.br/~marcio.castro/bibs/2022_sbacpad.bib)

- Vanderlei Munhoz, Márcio Castro. *Enabling the execution of HPC applications on public clouds with HPC@Cloud toolkit*. **Concurrency and Computation: Practice and Experience (CCPE)**, 2023. [[link]](https://doi.org/10.1002/cpe.7976)

- Vanderlei Munhoz, Márcio Castro. *HPC@Cloud: A Provider-Agnostic Software Framework for Enabling HPC in Public Cloud Platforms*. **Simpósio em Sistemas Computacionais de Alto Desempenho (WSCAD)**. Florianópolis, Brazil: SBC, 2022. [[link]](https://doi.org/10.5753/wscad.2022.226528) [[bib]](http://www.inf.ufsc.br/~marcio.castro/bibs/2022_wscad.bib)

- Daniel Cordeiro, Emilio Francesquini, Marcos Amaris, Márcio Castro, Alexandro Baldassin, João Vicente Lima. *Green Cloud Computing: Challenges and Opportunities*. **Simpósio Brasileiro de Sistemas de Informação (SBSI)**. Maceió, Brazil: SBC, 2023. [[link]](http://dx.doi.org/10.5753/sbsi_estendido.2023.229291)
131 changes: 55 additions & 76 deletions hpcc_api/clusters/management/commands/create_cluster.py
@@ -7,6 +7,7 @@
from minio import Minio

from hpcc_api.clusters.models import ClusterConfiguration
from hpcc_api.utils.files import generate_hostfile, transfer_folder_over_ssh


TF_DIR = "./tmp_terraform_dir"
@@ -65,92 +66,70 @@ class Command(BaseCommand):
help = "Spawns a Cluster from a previously created ClusterConfiguration."

def add_arguments(self, parser):
parser.add_argument(
"config_label",
type=str,
help="The ClusterConfiguration label",
)
parser.add_argument("config_label", type=str)

def print_success(self, message):
self.stdout.write(self.style.SUCCESS(message))

def print_error(self, message):
self.stdout.write(self.style.ERROR(message))

def handle(self, *args, **options):
config_label = options["config_label"]

# Read ClusterConfiguration
cluster_config = ClusterConfiguration.objects.get(label=config_label)

# Launch cluster
master_node_ip = create_cluster(cluster_config)
# Update cluster `entrypoint_ip`:
cluster_config.entrypoint_ip = master_node_ip[0]
cluster_config.save()
ip = cluster_config.entrypoint_ip
user = cluster_config.username
self.print_success(
f"Successfully spawned a Cluster using the `{cluster_config.label} ClusterConfiguration`!"
)

if len(master_node_ip) == 0:
subprocess.run(
["terraform", "destroy", "-auto-approve"], cwd=TF_DIR, check=True
)
self.stdout.write(
self.style.ERROR(
f"Failed spawning a cluster based on ClusterConfig `{cluster_config}`.\n"
"All resources DESTROYED!"
)
# Generate hostfile
ppn = round(cluster_config.vcpus/cluster_config.nodes)
generate_hostfile(
number_of_nodes=cluster_config.nodes,
processes_per_node=ppn,
hostfile_path="./my_files/hostfile",
)
self.print_success(f"Successfully generated hostfile for OpenMPI!")

# Copy everything inside `my_files` to the shared dir inside the Cluster
shared_dir_path = None
if cluster_config.fsx:
shared_dir_path = "/fsx"
elif cluster_config.nfs:
shared_dir_path = "/var/nfs_dir"

if shared_dir_path:
self.print_success(f"Transfering `/my_files` to `{shared_dir_path}`...")
transfer_folder_over_ssh(
local_folder_path="./my_files",
remote_destination_path=shared_dir_path,
ip=ip,
user=user,
)
self.print_success(f"Sucessfully transferred files to cluster!")
else:
# Update `entrypoint_ip`:
cluster_config.entrypoint_ip = master_node_ip[0]
cluster_config.save()

ip = cluster_config.entrypoint_ip
user = cluster_config.username

self.stdout.write(
self.style.SUCCESS(
f"Successfully spawned a Cluster using the `{cluster_config.label} ClusterConfiguration`!"
)
self.print_error(
f"Files in `/my_files` won't be transferred to the cluster (no shared directory)"
)

ppn = round(cluster_config.vcpus/cluster_config.nodes)
self.print_success(
textwrap.dedent(
f"""
To access your cluster over the command-line, use SSH:
ssh {user}@{ip}
# Generate hostfile
self.stdout.write(
self.style.SUCCESS("Generating hostfile...")
)
base_ip = "10.0.0."
hostfile_path = "./my_files/hostfile"
if os.path.exists(hostfile_path):
os.remove(hostfile_path)
with open(hostfile_path, "w") as file:
for i in range(10, 10 + cluster_config.nodes):
file.write(f"{base_ip}{i} slots={ppn}\n")

# Copy everything inside `my_files` to the shared dir inside the Cluster
shared_dir_path = None
if cluster_config.fsx:
shared_dir_path = "/fsx"
elif cluster_config.nfs:
shared_dir_path = "/var/nfs_dir"

if shared_dir_path:
self.stdout.write(
self.style.SUCCESS(f"Transfering `/my_files` to `{shared_dir_path}`...")
)
subprocess.run(
[
"scp",
"-r",
"./my_files",
f"{user}@{ip}:/var/nfs_dir/my_files",
],
check=True,
)
else:
self.stdout.write(
self.style.ERROR(f"Files in `/my_files` won't be transferred to the cluster (no shared directory)")
)

self.stdout.write(
self.style.SUCCESS(
textwrap.dedent(
f"""
To access your cluster over the command-line, use SSH:
ssh {user}@{ip}
Total Nodes: {cluster_config.nodes}
Total vCPU cores: {cluster_config.vcpus}
Maximum MPI ranks per node: {ppn}
"""
)
)
Total Nodes: {cluster_config.nodes}
Total vCPU cores: {cluster_config.vcpus}
Maximum MPI ranks per node: {ppn}
"""
)
)
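
The refactored command above delegates to two helpers imported from `hpcc_api.utils.files`, whose implementations are not shown in this excerpt. A minimal sketch, reconstructed from the inline logic they replace (the hard-coded `10.0.0.x` addressing and the `scp` call) and therefore an assumption rather than the file's actual contents:

```python
# Hypothetical sketch of the helpers in hpcc_api/utils/files.py, inferred from
# the inline code they replace; the actual implementations are not shown here.
import os
import subprocess


def generate_hostfile(number_of_nodes, processes_per_node, hostfile_path):
    """Write an OpenMPI hostfile with one `<ip> slots=<ppn>` line per node."""
    base_ip = "10.0.0."  # private addresses assigned by the Terraform templates
    if os.path.exists(hostfile_path):
        os.remove(hostfile_path)
    with open(hostfile_path, "w") as hostfile:
        for i in range(10, 10 + number_of_nodes):
            hostfile.write(f"{base_ip}{i} slots={processes_per_node}\n")


def transfer_folder_over_ssh(local_folder_path, remote_destination_path, ip, user):
    """Recursively copy a local folder to the cluster entrypoint via scp."""
    subprocess.run(
        ["scp", "-r", local_folder_path, f"{user}@{ip}:{remote_destination_path}"],
        check=True,
    )
```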
10 changes: 7 additions & 3 deletions hpcc_api/clusters/management/commands/destroy_cluster.py
@@ -16,9 +16,13 @@ def destroy_cluster():
class Command(BaseCommand):
help = "Spawns a Cluster from a previously created ClusterConfiguration."

def print_success(self, message):
self.stdout.write(self.style.SUCCESS(message))

def print_error(self, message):
self.stdout.write(self.style.ERROR(message))

def handle(self, *args, **options):
destroy_cluster()

self.stdout.write(
self.style.SUCCESS(f"Successfully DESTROYED all created cloud resources.")
)
self.print_error(f"All cluster cloud resources were deleted.")
112 changes: 112 additions & 0 deletions hpcc_api/experiments/management/commands/launch_mpi_jobs.py
@@ -0,0 +1,112 @@
import json
import sys
import time

from django.core.management.base import BaseCommand

from hpcc_api.clusters.models import ClusterConfiguration
from hpcc_api.clusters.management.commands.create_cluster import create_cluster
from hpcc_api.clusters.management.commands.destroy_cluster import destroy_cluster
from hpcc_api.utils.files import load_yaml
from hpcc_api.utils.process import launch_over_ssh


class Command(BaseCommand):
help = "Run an MPI workload."

def add_arguments(self, parser):
parser.add_argument("--cluster-config-id", type=str)

def print_success(self, message):
self.stdout.write(self.style.SUCCESS(message))

def print_error(self, message):
self.stdout.write(self.style.ERROR(message))

def handle(self, *args, **options):
cluster_config_id = options["cluster_config_id"]

try:
mpi_run_yaml = load_yaml("./mpi_run.yaml")

cluster_config = ClusterConfiguration.objects.get(
label=cluster_config_id,
)
ip = cluster_config.entrypoint_ip
user = cluster_config.username

mpi_jobs_logs = []
mpi_jobs = mpi_run_yaml["mpi_jobs"]

# Launch each configured MPI job in sequence
self.print_success(f"{len(mpi_jobs)} MPI jobs read from `mpi_run.yaml`, starting...")

for n, mpi_job in enumerate(mpi_jobs):
self.print_success(f"Starting MPI job {n+1} of {len(mpi_jobs)}: {mpi_job['label']}")

# Setup
setup_start = time.time()

# Make sure cluster is healthy before launching MPI job
self.print_success("Checking Cluster health...")
time.sleep(25) # wait while the provider updates the terraform state
create_cluster(cluster_config)

setup_status = launch_over_ssh(mpi_job['setup_command'], ip=ip, user=user, track_output=True)
setup_end = time.time()
setup_dt = setup_end - setup_start

run_failures = 0
run_start = time.time()
run_status = launch_over_ssh(mpi_job['run_command'], ip=ip, user=user, track_output=True)

if run_status == 0: # successful execution in first try
run_end = time.time()
run_dt = run_end - run_start
elif mpi_job.get("restore_command") is not None:
self.print_error("Failure occurred during MPI Job! Running restore command...")
# Retry until success or after maximum retries are reached
while run_status != 0 and run_failures < mpi_job['maximum_retries']:
time.sleep(25) # wait while the provider updates the terraform state

# Make sure cluster is healthy before launching MPI job
self.print_success("Checking Cluster health...")
create_cluster(cluster_config)

run_status = launch_over_ssh(mpi_job['restore_command'], ip=ip, user=user, track_output=True)
if run_status != 0:
run_failures += 1 # increase failure counter
run_end = time.time()
run_dt = run_end - run_start
else:
run_end = time.time()
run_dt = run_end - run_start
run_status = -1
run_failures = 1

# Append timing results
mpi_jobs_logs.append(
{
"label": mpi_job["label"],
"setup_status": "SUCCESS" if setup_status == 0 else "FAILURE",
"setup_duration": setup_dt,
"run_status": "SUCCESS" if run_status == 0 else "FAILURE",
"run_duration": run_dt,
"run_failures": run_failures,
}
)

except Exception as error:
self.print_error(f"CommandError: {error}")
sys.exit(1)

else:
self.print_success(f"Successfully executed {len(mpi_jobs)} MPI jobs defined in `mpi_run.yaml`.")
for n, log in enumerate(mpi_jobs_logs):
print(json.dumps(log, indent=4))

finally:
if mpi_run_yaml["delete_cluster_after"]:
destroy_cluster()
else:
self.print_error(f"\n !!! Remember to destroy your Cluster !!!\n")