Skip to content

Commit

Permalink
improve the list run query (kubeflow#687)
Browse files Browse the repository at this point in the history
* optimize list run query

* update list job
  • Loading branch information
IronPan authored and k8s-ci-robot committed Jan 15, 2019
1 parent ce91c08 commit c2834c3
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
35 changes: 20 additions & 15 deletions backend/src/apiserver/storage/job_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ func (s *JobStore) ListJobs(
return nil, "", util.NewInternalServerError(err, "Failed to list jobs: %v", err)
}

sqlBuilder := s.selectJob()
// Add filter condition
sqlBuilder, err := s.toFilteredQuery(sqlBuilder, filterContext)
filteredSelectBuilder, err := s.toFilteredQuery(filterContext)
if err != nil {
return errorF(err)
}

sqlBuilder := s.selectJob(filteredSelectBuilder)
if err != nil {
return errorF(err)
}
Expand Down Expand Up @@ -77,25 +81,26 @@ func (s *JobStore) ListJobs(
return jobs[:opts.PageSize], npt, err
}

func (s *JobStore) toFilteredQuery(selectBuilder sq.SelectBuilder, filterContext *common.FilterContext) (sq.SelectBuilder, error) {
sql, args, err := selectBuilder.ToSql()
if err != nil {
return selectBuilder, util.NewInternalServerError(err, "Failed to append filter condition to list job: %v",
err.Error())
}
func (s *JobStore) toFilteredQuery(filterContext *common.FilterContext) (sq.SelectBuilder, error) {
selectBuilder := sq.Select("*").From("jobs")
if filterContext.ReferenceKey != nil {
selectBuilder = sq.Select("list_job.*").
From("resource_references AS rf").
Join(fmt.Sprintf("(%s) as list_job on list_job.UUID=rf.ResourceUUID", sql), args...).
resourceReferenceFilter, args, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
sq.Eq{"rf.ResourceType": common.Job},
sq.Eq{"rf.ReferenceUUID": filterContext.ID},
sq.Eq{"rf.ReferenceType": filterContext.Type}})
sq.Eq{"rf.ReferenceType": filterContext.Type}}).ToSql()
if err != nil {
return selectBuilder, util.NewInternalServerError(
err, "Failed to create subquery to filter by resource reference: %v", err.Error())
}
return selectBuilder.Where(fmt.Sprintf("UUID in (%s)", resourceReferenceFilter), args...), nil
}
return selectBuilder, nil
}

func (s *JobStore) GetJob(id string) (*model.Job, error) {
sql, args, err := s.selectJob().
sql, args, err := s.selectJob(sq.Select("*").From("jobs")).
Where(sq.Eq{"uuid": id}).
Limit(1).
ToSql()
Expand All @@ -119,11 +124,11 @@ func (s *JobStore) GetJob(id string) (*model.Job, error) {
return jobs[0], nil
}

func (s *JobStore) selectJob() sq.SelectBuilder {
func (s *JobStore) selectJob(filteredSelectBuilder sq.SelectBuilder) sq.SelectBuilder {
resourceRefConcatQuery := s.db.Concat([]string{`"["`, s.db.GroupConcat("r.Payload", ","), `"]"`}, "")
return sq.
Select("jobs.*", resourceRefConcatQuery+" AS refs").
From("jobs").
FromSelect(filteredSelectBuilder, "jobs").
// Append all the resource references for the run as a json column
LeftJoin("(select * from resource_references where ResourceType='Job') AS r ON jobs.UUID=r.ResourceUUID").
GroupBy("jobs.UUID")
Expand Down
35 changes: 20 additions & 15 deletions backend/src/apiserver/storage/run_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ func (s *RunStore) ListRuns(
return nil, "", util.NewInternalServerError(err, "Failed to list runs: %v", err)
}

sqlBuilder := s.selectRunDetails()
// Add filter condition
sqlBuilder, err := s.toFilteredQuery(sqlBuilder, filterContext)
filteredSelectBuilder, err := s.toFilteredQuery(filterContext)
if err != nil {
return errorF(err)
}

sqlBuilder := s.selectRunDetails(filteredSelectBuilder)
if err != nil {
return errorF(err)
}
Expand Down Expand Up @@ -104,26 +108,27 @@ func (s *RunStore) ListRuns(
return runs[:opts.PageSize], npt, err
}

func (s *RunStore) toFilteredQuery(selectBuilder sq.SelectBuilder, filterContext *common.FilterContext) (sq.SelectBuilder, error) {
sql, args, err := selectBuilder.ToSql()
if err != nil {
return selectBuilder, util.NewInternalServerError(err, "Failed to append filter condition to list run: %v",
err.Error())
}
func (s *RunStore) toFilteredQuery(filterContext *common.FilterContext) (sq.SelectBuilder, error) {
selectBuilder := sq.Select("*").From("run_details")
if filterContext.ReferenceKey != nil {
selectBuilder = sq.Select("list_run.*").
From("resource_references AS rf").
Join(fmt.Sprintf("(%s) as list_run on list_run.UUID=rf.ResourceUUID", sql), args...).
resourceReferenceFilter, args, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
sq.Eq{"rf.ResourceType": common.Run},
sq.Eq{"rf.ReferenceUUID": filterContext.ID},
sq.Eq{"rf.ReferenceType": filterContext.Type}})
sq.Eq{"rf.ReferenceType": filterContext.Type}}).ToSql()
if err != nil {
return selectBuilder, util.NewInternalServerError(
err, "Failed to create subquery to filter by resource reference: %v", err.Error())
}
return selectBuilder.Where(fmt.Sprintf("UUID in (%s)", resourceReferenceFilter), args...), nil
}
return selectBuilder, nil
}

// GetRun Get the run manifest from Workflow CRD
func (s *RunStore) GetRun(runId string) (*model.RunDetail, error) {
sql, args, err := s.selectRunDetails().
sql, args, err := s.selectRunDetails(sq.Select("*").From("run_details")).
Where(sq.Eq{"UUID": runId}).
Limit(1).
ToSql()
Expand Down Expand Up @@ -151,11 +156,11 @@ func (s *RunStore) GetRun(runId string) (*model.RunDetail, error) {
return runs[0], nil
}

func (s *RunStore) selectRunDetails() sq.SelectBuilder {
func (s *RunStore) selectRunDetails(filteredSelectBuilder sq.SelectBuilder) sq.SelectBuilder {
metricConcatQuery := s.db.Concat([]string{`"["`, s.db.GroupConcat("m.Payload", ","), `"]"`}, "")
subQ := sq.
Select("rd.*", metricConcatQuery+" AS metrics").
From("run_details AS rd").
FromSelect(filteredSelectBuilder, "rd").
LeftJoin("run_metrics AS m ON rd.UUID=m.RunUUID").
GroupBy("rd.UUID")

Expand Down

0 comments on commit c2834c3

Please sign in to comment.