[fgi] ensure specific cluster free
Closes: diem#8982
rustielin authored and bors-libra committed Aug 23, 2021
1 parent bf8b2b2 commit b9dd7cb
Showing 3 changed files with 41 additions and 45 deletions.
24 changes: 10 additions & 14 deletions scripts/fgi/kube.py
@@ -85,14 +85,10 @@ def get_cluster_context(cluster_name):
 def get_cluster_name_from_context(context):
     return context.split("/")[1]
 
-# randomly select a cluster that is free based on its pod status:
-# - no other forge pods currently Running or Pending
-# - all monitoring pods are ready
-def kube_select_cluster():
-    shuffled_clusters = random.sample(FORGE_K8S_CLUSTERS, len(FORGE_K8S_CLUSTERS))
+def kube_ensure_cluster(clusters):
     attempts = 360
     for attempt in range(attempts):
-        for cluster in shuffled_clusters:
+        for cluster in clusters:
             context = get_cluster_context(cluster)
             running_pods = get_forge_pods_by_phase(context, "Running")
             pending_pods = get_forge_pods_by_phase(context, "Pending")
@@ -103,14 +99,7 @@ def kube_select_cluster():
             num_pending_pods = len(pending_pods["items"])
             for pod in monitoring_pods["items"]:
                 pod_name = pod["metadata"]["name"]
-                healthy = all(
-                    list(
-                        map(
-                            lambda container: container["ready"],
-                            pod["status"]["containerStatuses"],
-                        )
-                    )
-                )
+                healthy = pod["status"]["phase"] == "Running"
                 if not healthy:
                     print(
                         f"{cluster} has an unhealthy monitoring pod {pod_name}. Skipping."
@@ -131,6 +120,13 @@ def kube_select_cluster():
     print("Failed to schedule forge pod. All clusters are busy")
     return None
 
+# randomly select a cluster that is free based on its pod status:
+# - no other forge pods currently Running or Pending
+# - all monitoring pods are ready
+def kube_select_cluster():
+    shuffled_clusters = random.sample(FORGE_K8S_CLUSTERS, len(FORGE_K8S_CLUSTERS))
+    return kube_ensure_cluster(shuffled_clusters)
+
 
 def kube_wait_job(job_name, context):
     attempts = 360
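For reference, a minimal usage sketch of the refactored helpers, not part of the commit: kube_select_cluster keeps its original behavior of shuffling all Forge clusters before polling, while the new kube_ensure_cluster can be handed an explicit list of candidate clusters, for example a single required one. The helper names and FORGE_K8S_CLUSTERS come from the diff above; the cluster name "forge-1" is hypothetical, and the assumption that the helpers return the acquired cluster name (falsy on failure) is based on how scripts/fgi/run checks their results.

    # Sketch only: pick any free Forge cluster (original, randomized behavior).
    cluster = kube_select_cluster()

    # Or poll until one specific cluster is free (new behavior).
    # "forge-1" is a hypothetical cluster name, not from the commit.
    cluster = kube_ensure_cluster(["forge-1"])
    if not cluster:
        print("forge-1 did not become free within the polling window")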
6 changes: 6 additions & 0 deletions scripts/fgi/run
@@ -8,6 +8,7 @@ import getpass, os, subprocess, sys
 from kube import (
     kube_init_context,
     kube_select_cluster,
+    kube_ensure_cluster,
     get_cluster_context,
     kube_wait_job,
     create_forge_job,
@@ -129,6 +130,11 @@ if not args.workspace:
     if not workspace:
         print(f"{FAIL}Failed to select forge testnet cluster{RESTORE}")
         sys.exit(1)
+else:
+    ret = kube_ensure_cluster([workspace])
+    if not ret:
+        print(f"{FAIL}Failed to acquire specified forge testnet cluster {workspace}{RESTORE}")
+        sys.exit(1)
 context = get_cluster_context(workspace)
 print(f"Running experiments on cluster: {workspace}")
 grafana_url = get_grafana_url(workspace)
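Taken together, the selection logic in scripts/fgi/run now reads roughly like the condensed sketch below: with no workspace given, a free cluster is picked at random; with a workspace given, that exact cluster must be verified free before any Forge job is scheduled. The names args, workspace, FAIL and RESTORE are those used in the diff; everything else in the script is elided.

    # Condensed sketch of the cluster-selection branch after this change.
    if not args.workspace:
        # No cluster requested: pick any free Forge cluster at random.
        workspace = kube_select_cluster()
        if not workspace:
            print(f"{FAIL}Failed to select forge testnet cluster{RESTORE}")
            sys.exit(1)
    else:
        # A specific cluster was requested: poll until that exact cluster is free.
        if not kube_ensure_cluster([workspace]):
            print(f"{FAIL}Failed to acquire specified forge testnet cluster {workspace}{RESTORE}")
            sys.exit(1)
    context = get_cluster_context(workspace)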
56 changes: 25 additions & 31 deletions testsuite/forge/src/backend/k8s/cluster_helper.rs
@@ -413,7 +413,7 @@ pub fn set_eks_nodegroup_size(
 ) -> Result<()> {
     // https://github.com/lucdew/rusoto-example/blob/master/src/client.rs
     // Create rusoto client through an http proxy
-    let eks_client = create_eks_client(auth_with_k8s_env).unwrap();
+    let eks_client = create_eks_client(auth_with_k8s_env)?;
     println!("Created rusoto http client");
 
     // nodegroup scaling factors
@@ -440,31 +440,25 @@ pub fn set_eks_nodegroup_size(
     };
 
     // submit the scaling requests
-    let rt = Runtime::new().unwrap();
-    let validators_update_id = rt
-        .block_on(submit_update_nodegroup_config_request(
-            &eks_client,
-            &cluster_name,
-            "validators",
-            validator_scaling,
-        ))
-        .unwrap();
-    let utilities_update_id = rt
-        .block_on(submit_update_nodegroup_config_request(
-            &eks_client,
-            &cluster_name,
-            "utilities",
-            utilities_scaling,
-        ))
-        .unwrap();
-    let trusted_update_id = rt
-        .block_on(submit_update_nodegroup_config_request(
-            &eks_client,
-            &cluster_name,
-            "trusted",
-            trusted_scaling,
-        ))
-        .unwrap();
+    let rt = Runtime::new()?;
+    let validators_update_id = rt.block_on(submit_update_nodegroup_config_request(
+        &eks_client,
+        &cluster_name,
+        "validators",
+        validator_scaling,
+    ))?;
+    let utilities_update_id = rt.block_on(submit_update_nodegroup_config_request(
+        &eks_client,
+        &cluster_name,
+        "utilities",
+        utilities_scaling,
+    ))?;
+    let trusted_update_id = rt.block_on(submit_update_nodegroup_config_request(
+        &eks_client,
+        &cluster_name,
+        "trusted",
+        trusted_scaling,
+    ))?;
 
     // wait for nodegroup updates
     let updates: Vec<(&str, &str)> = vec![
@@ -492,9 +486,8 @@
                 .unwrap()
                 .update
                 .unwrap();
-            let stat = describe_update.status;
-            match stat {
-                Some(s) => match s.as_str() {
+            if let Some(s) = describe_update.status {
+                match s.as_str() {
                     "Failed" => bail!("Nodegroup update failed"),
                     "Successful" => {
                         println!(
@@ -510,8 +503,9 @@
                         );
                         bail!("Waiting for valid update status")
                     }
-                },
-                None => bail!("Failed to describe nodegroup update"),
+                }
+            } else {
+                bail!("Failed to describe nodegroup update")
             }
         })
     })
