Skip to content

Commit

Permalink
add mutex to studyjob controller (kubeflow#170)
Browse files Browse the repository at this point in the history
* add mutex to studyjob controller

Signed-off-by: YujiOshima <[email protected]>

* use sync.Map

Signed-off-by: YujiOshima <[email protected]>

* update only when the instance was changed

Signed-off-by: YujiOshima <[email protected]>
  • Loading branch information
YujiOshima authored and k8s-ci-robot committed Sep 17, 2018
1 parent 4085701 commit f4887a6
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions pkg/controller/studyjobcontroller/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"log"
"strconv"
"sync"
"text/template"

"github.com/kubeflow/katib/pkg"
Expand Down Expand Up @@ -60,7 +61,7 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler.
//
// The reconciler is built from the manager's client and scheme. muxMap starts
// out empty (the zero value of sync.Map is ready to use); Reconcile fills it
// with one mutex per StudyJob on demand.
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
	return &ReconcileStudyJobController{Client: mgr.GetClient(), scheme: mgr.GetScheme(), muxMap: sync.Map{}}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -114,6 +115,7 @@ var _ reconcile.Reconciler = &ReconcileStudyJobController{}
type ReconcileStudyJobController struct {
// Embedded client; Reconcile calls its Get and Update methods through r.
client.Client
// scheme is populated from mgr.GetScheme() in newReconciler.
scheme *runtime.Scheme
// muxMap holds one *sync.Mutex per StudyJob, keyed by the request's
// NamespacedName string. Reconcile locks the entry while it processes that
// object and deletes the entry once the object is no longer found.
muxMap sync.Map
}

// Reconcile reads that state of the cluster for a StudyJob object and makes changes based on the state read
Expand All @@ -125,42 +127,56 @@ type ReconcileStudyJobController struct {
func (r *ReconcileStudyJobController) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the StudyJob instance
instance := &katibv1alpha1.StudyJob{}
mux := new(sync.Mutex)
if m, loaded := r.muxMap.LoadOrStore(request.NamespacedName.String(), mux); loaded {
mux, _ = m.(*sync.Mutex)
}
mux.Lock()
err := r.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
if _, ok := r.muxMap.Load(request.NamespacedName.String()); ok {
log.Println("%s was deleted. Resouces will be released.", request.NamespacedName.String())
mux.Unlock()
r.muxMap.Delete(request.NamespacedName.String())
}
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
log.Println("No instance")
return reconcile.Result{}, nil
}
log.Printf("Fail to read Object %v", err)
// Error reading the object - requeue the request.
return reconcile.Result{}, err
}
defer mux.Unlock()
var update bool = false
switch instance.Status.Condition {
case katibv1alpha1.ConditionCompleted:
err = r.checkStatus(instance, request.Namespace)
err, update = r.checkStatus(instance, request.Namespace)
case katibv1alpha1.ConditionFailed:
err = r.checkStatus(instance, request.Namespace)
err, update = r.checkStatus(instance, request.Namespace)
case katibv1alpha1.ConditionRunning:
err = r.checkStatus(instance, request.Namespace)
err, update = r.checkStatus(instance, request.Namespace)
default:
err = r.initializeStudy(instance, request.Namespace)
if err != nil {
r.Update(context.TODO(), instance)
log.Printf("Fail to initialize %v", err)
return reconcile.Result{}, err
}
update = true
}
if err != nil {
r.Update(context.TODO(), instance)
log.Printf("Fail to check status %v", err)
return reconcile.Result{}, err
}
err = r.Update(context.TODO(), instance)
if err != nil {
log.Printf("Fail to Update StudyJob %v : %v", instance.Status.StudyId, err)
return reconcile.Result{}, err
if update {
err = r.Update(context.TODO(), instance)
if err != nil {
log.Printf("Fail to Update StudyJob %v : %v", instance.Status.StudyId, err)
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
}
Expand All @@ -175,7 +191,6 @@ func (r *ReconcileStudyJobController) getStudyConf(instance *katibv1alpha1.Study
sconf.Name = instance.Spec.StudyName
sconf.Owner = instance.Spec.Owner
if instance.Spec.OptimizationGoal != nil {

sconf.OptimizationGoal = *instance.Spec.OptimizationGoal
}
sconf.ObjectiveValueName = instance.Spec.ObjectiveValueName
Expand Down Expand Up @@ -351,17 +366,18 @@ func (r *ReconcileStudyJobController) initializeStudy(instance *katibv1alpha1.St
return nil
}

func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJob, ns string) error {
func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJob, ns string) (error, bool) {
nextSuggestionSchedule := true
var cwids []string
var update bool = false
if instance.Status.Condition == katibv1alpha1.ConditionCompleted || instance.Status.Condition == katibv1alpha1.ConditionFailed {
nextSuggestionSchedule = false
}
conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure())
if err != nil {
log.Printf("Connect katib manager error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
return nil
return nil, true
}
defer conn.Close()
c := katibapi.NewManagerClient(conn)
Expand Down Expand Up @@ -391,30 +407,34 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
if ctime.Before(cjob.Status.LastScheduleTime) && len(cjob.Status.Active) == 0 {
r.saveModel(c, instance.Status.StudyId, instance.Status.Trials[i].TrialId, instance.Status.Trials[i].WorkerList[j].WorkerId)
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted
update = true
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerId,
Status: katibapi.State_COMPLETED,
})
if err != nil {
return err
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerId)
return err, false
}
susp := true
cjob.Spec.Suspend = &susp
if err := r.Update(context.TODO(), cjob); err != nil {
return err
return err, false
}
cwids = append(cwids, w.WorkerId)
}
}
}
} else if job.Status.Active > 0 {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionRunning
update = true
if errors.IsNotFound(cjoberr) {
r.spawnMetricsCollector(instance, c, instance.Status.StudyId, t.TrialId, w.WorkerId, ns, instance.Spec.MetricsCollectorSpec)
}
} else if job.Status.Failed > 0 {
update = true
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionFailed
}
}
Expand All @@ -425,6 +445,7 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
if goal {
log.Printf("Study %s reached to the goal. It is completed", instance.Status.StudyId)
instance.Status.Condition = katibv1alpha1.ConditionCompleted
update = true
nextSuggestionSchedule = false
}
if err != nil {
Expand All @@ -434,21 +455,22 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
if nextSuggestionSchedule {
return r.getAndRunSuggestion(instance, c, ns)
} else {
return nil
return nil, update
}
}

func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, ns string) error {
func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, ns string) (error, bool) {
//Check Suggestion Count
sps, err := r.getSuggestionParam(c, instance.Status.SuggestionParameterId)
if err != nil {
return err
return err, false
}
for i := range sps {
if sps[i].Name == "SuggestionCount" {
count, _ := strconv.Atoi(sps[i].Value)
if count >= instance.Status.SuggestionCount+1 {
return fmt.Errorf("Suggestion count mismatched. May be duplicate suggestion request")
// Suggestion count mismatch: this is probably a duplicate suggestion request, so skip it without reporting an error.
return nil, false
}
sps[i].Value = strconv.Itoa(instance.Status.SuggestionCount + 1)
}
Expand All @@ -461,27 +483,27 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
instance.Status.SuggestionParameterId)
if err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
return err
return err, true
}
trials := getSuggestReply.Trials
if len(trials) <= 0 {
log.Printf("Study %s is completed", instance.Status.StudyId)
instance.Status.Condition = katibv1alpha1.ConditionCompleted
return nil
return nil, true
}
log.Printf("Study: %s Suggestions %v", instance.Status.StudyId, getSuggestReply)
wkind, err := r.getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
return err
return err, true
}
for _, t := range trials {
wid, err := r.spawnWorker(instance, c, instance.Status.StudyId, t, instance.Spec.WorkerSpec, wkind, false)
if err != nil {
log.Printf("Spawn worker error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
return err
return err, true
}
instance.Status.Trials = append(
instance.Status.Trials,
Expand All @@ -507,10 +529,10 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
_, err = c.SetSuggestionParameters(context.Background(), sspr)
if err != nil {
log.Printf("Study %s Suggestion Count update Error %v", instance.Status.StudyId, err)
return err
return err, false
}
instance.Status.SuggestionCount += 1
return nil
return nil, true
}

func (r *ReconcileStudyJobController) createStudy(c katibapi.ManagerClient, studyConfig *katibapi.StudyConfig) (string, error) {
Expand Down

0 comments on commit f4887a6

Please sign in to comment.