Skip to content

Commit

Permalink
test: adding testing setup (#13)
Browse files Browse the repository at this point in the history
* test: adding testing setup

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch authored Apr 15, 2024
1 parent 6491b1d commit 9b29f16
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 24 deletions.
44 changes: 29 additions & 15 deletions controllers/ensemble/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,27 +93,16 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble(
}

// Are we done? If we might have terminated by the user indicated
// not to, just reconcile for a last time
// not to, just reconcile for a last time, and show results
if decision.Action == algorithm.CompleteAction {
return ctrl.Result{}, err
r.showJobInfo(ctx, c, member, algo, decision)
return ctrl.Result{}, nil
}

// Are we terminating? Note that the next check for updated
// cannot happen at the same time as a termination request
if decision.Action == algorithm.TerminateAction {

// Ask for one more listing of jobs!
in := pb.ActionRequest{
Member: member.Type(),
Algorithm: algo.Name(),
Payload: decision.Payload,
Action: algorithm.JobInfoAction,
}
response, err := c.RequestAction(ctx, &in)
fmt.Println(response.Status)
if response.Payload != "" {
fmt.Println(response.Payload)
}
err = r.showJobInfo(ctx, c, member, algo, decision)
if err != nil {
fmt.Printf(" Error with action request %s\n", err)
return ctrl.Result{}, err
Expand Down Expand Up @@ -150,6 +139,31 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble(
return ctrl.Result{RequeueAfter: ensemble.RequeueAfter()}, nil
}

// getLeaderAddress gets the ipAddress of the lead broker
// In all cases of error we requeue
func (r *EnsembleReconciler) showJobInfo(
ctx context.Context,
c client.Client,
member *api.Member,
algo algorithm.AlgorithmInterface,
decision algorithm.AlgorithmDecision,
) error {

// Ask for one more listing of jobs!
in := pb.ActionRequest{
Member: member.Type(),
Algorithm: algo.Name(),
Payload: decision.Payload,
Action: algorithm.JobInfoAction,
}
response, err := c.RequestAction(ctx, &in)
fmt.Println(response.Status)
if response.Payload != "" {
fmt.Println(response.Payload)
}
return err
}

// getLeaderAddress gets the ipAddress of the lead broker
// In all cases of error we requeue
func (r *EnsembleReconciler) getLeaderAddress(
Expand Down
36 changes: 35 additions & 1 deletion examples/algorithms/workload/experiment/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,39 @@ that nodes hung around ~10 minutes after the queue was essentially empty, so I t
aggressive) might be better. We are going to (as an extra bonus) keep track of the time the cluster takes to go back to the smallest
size when no work is running. I didn't see this was a parameter I could update.

This experiment has been moved to the [converged-computing/ensemble-experiments](https://github.com/converged-computing/ensemble-experiments) repository.
This (larger) experiment has been moved to the [converged-computing/ensemble-experiments](https://github.com/converged-computing/ensemble-experiments) repository. This directory is used for testing components.


## 1. Create Cluster

We want to create a GKE cluster first. It won't have autoscaling enabled, etc.

```bash
GOOGLE_PROJECT=myproject
gcloud container clusters create test-cluster \
--threads-per-core=1 \
--placement-type=COMPACT \
--num-nodes=6 \
--region=us-central1-a \
--project=${GOOGLE_PROJECT} \
--machine-type=c2d-standard-8
```

Install the development operator and flux operator

```bash
make test-deploy-recreate
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
```

And apply, develop!

```bash
kubectl apply -f ensemble.yaml
```

When you are done, clean up.

```bash
gcloud container clusters delete test-cluster --region=us-central1-a
```
58 changes: 58 additions & 0 deletions examples/algorithms/workload/experiment/ensemble.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
apiVersion: ensemble.flux-framework.org/v1alpha1
kind: Ensemble
metadata:
name: ensemble
spec:
members:

# This is how you change the sidcar image, if needed. This is the one
# that I push and use for development. Pull always ensures we get latest
- sidecar:
pullAlways: true
image: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test

# Algorithm and options:
# This is the algorithm run by the operator. The options are passed to
# the running queue to further alter the outcome.
# terminateChecks says to terminate after 2 subsequent inactive status checks
algorithm:
name: workload-demand
options:
terminateChecks: 2
scaleUpChecks: 1
order: "ascending"
disableTermination: "yes"

jobs:
- name: lammps-2
command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
count: 3
nodes: 2
tasks: 6
- name: lammps-4
command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
count: 3
nodes: 4
tasks: 12

minicluster:
spec:
size: 1
minSize: 1
maxSize: 6

# The workers should not fail when they clean up
flux:
completeWorkers: true

# This is a list because a pod can support multiple containers
containers:
- image: ghcr.io/converged-computing/metric-lammps:latest

# You can set the working directory if your container WORKDIR is not correct.
workingDir: /opt/lammps/examples/reaxff/HNS
resources:
limits:
cpu: 3
requests:
cpu: 3
24 changes: 18 additions & 6 deletions python/ensemble_operator/members/minicluster/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,33 @@ def get_node_metrics():

def get_next_jobs():
"""
Get the next 10 jobs in the queue
Get the next 10 jobs in the queue.
"""
jobs = flux.job.job_list(handle)
listing = jobs.get()
next_jobs = []

for i, item in enumerate(listing.get("jobs", [])):
nodes = item["nnodes"]
next_jobs.append(nodes)
count = 0
for item in listing.get("jobs", []):
# We only want jobs that aren't running or inactive
state = flux.job.info.statetostr(item["state"])

# Assume these might need resources.
# If the cluster had enough nodes and they were free,
# it would be running, so we don't include RUN
if state not in ["DEPEND", "SCHED"]:
continue
next_jobs.append(item)

# Arbitrary cutoff
if i == 10:
if count == 10:
break
count += 1

return next_jobs
# Sort by submit time - the ones we submit first should
# go back to the operator first
next_jobs = sorted(next_jobs, key=lambda d: d["t_submit"])
return [j["nnodes"] for j in next_jobs]


def get_waiting_sizes():
Expand Down
4 changes: 2 additions & 2 deletions python/ensemble_operator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ def RequestAction(self, request, context):
try:
infos = member.job_info()
if infos:
print(json.dumps(infos))
response.payload = infos
print(json.dumps(infos, indent=4))
response.payload = json.dumps(infos)
except Exception as e:
print(e)
response.status = ensemble_service_pb2.Response.ResultType.ERROR
Expand Down

0 comments on commit 9b29f16

Please sign in to comment.