Skip to content

Commit

Permalink
Add concurrency: for ensuring that only a single runbook using the …
Browse files Browse the repository at this point in the history
…same group will run at a time.
  • Loading branch information
k1LoW committed Mar 11, 2023
1 parent 0149aa9 commit ed2905a
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 23 deletions.
1 change: 1 addition & 0 deletions book.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type book struct {
intervalStr string
interval time.Duration
loop *Loop
concurrency string
useMap bool
t *testing.T
included bool
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/jhump/protoreflect v1.14.1
github.com/juliangruber/go-intersect v1.1.0
github.com/k1LoW/concgroup v1.0.0
github.com/k1LoW/curlreq v0.3.2
github.com/k1LoW/duration v1.2.0
github.com/k1LoW/exec v0.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/juliangruber/go-intersect v1.1.0 h1:sc+y5dCjMMx0pAdYk/N6KBm00tD/f3tq+Iox7dYDUrY=
github.com/juliangruber/go-intersect v1.1.0/go.mod h1:WMau+1kAmnlQnKiikekNJbtGtfmILU/mMU6H7AgKbWQ=
github.com/k1LoW/concgroup v1.0.0 h1:iVD/xulNDLsHQwFQ93w8A6QakO6YA2dJLJ6H4M+L2No=
github.com/k1LoW/concgroup v1.0.0/go.mod h1:cD18DUBcac9rl5mf8aK5MwITLeSoodExDG4dI7vp79k=
github.com/k1LoW/curlreq v0.3.2 h1:EpFL5X9WiMv2y/CrSeitITEbwCGeaHbquV3pPpDhtz4=
github.com/k1LoW/curlreq v0.3.2/go.mod h1:kh0wTrg3kahGoGnAREXK5gYOcTXW7W/TnL8qbHe5Ppc=
github.com/k1LoW/duration v1.2.0 h1:qq1gWtPh7YROFyerBufVP+ATR11mOOHDInrcC/Xe/6A=
Expand Down
15 changes: 10 additions & 5 deletions operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (

"github.com/fatih/color"
"github.com/goccy/go-json"
"github.com/k1LoW/concgroup"
"github.com/k1LoW/stopw"
"github.com/rs/xid"
"github.com/ryo-yamaoka/otchkiss"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -46,6 +46,7 @@ type operator struct {
profile bool
interval time.Duration
loop *Loop
concurrency string
root string
t *testing.T
thisT *testing.T
Expand Down Expand Up @@ -187,6 +188,7 @@ func New(opts ...Option) (*operator, error) {
profile: bk.profile,
interval: bk.interval,
loop: bk.loop,
concurrency: bk.concurrency,
t: bk.t,
thisT: bk.t,
failFast: bk.failFast,
Expand All @@ -207,6 +209,9 @@ func New(opts ...Option) (*operator, error) {
if o.debug {
o.capturers = append(o.capturers, NewDebugger(o.stderr))
}
if o.concurrency == "" {
o.concurrency = o.id
}

root, err := bk.generateOperatorRoot()
if err != nil {
Expand Down Expand Up @@ -1207,16 +1212,16 @@ func (ops *operators) runN(ctx context.Context) (*runNResult, error) {
}
defer ops.sw.Start().Stop()
defer ops.Close()
eg, cctx := errgroup.WithContext(ctx)
eg.SetLimit(int(ops.concmax))
cg, cctx := concgroup.WithContext(ctx)
cg.SetLimit(int(ops.concmax))
selected, err := ops.SelectedOperators()
if err != nil {
return result, err
}
result.Total.Add(int64(len(selected)))
for _, o := range selected {
o := o
eg.Go(func() error {
cg.Go(o.concurrency, func() error {
select {
case <-cctx.Done():
return errors.New("context canceled")
Expand All @@ -1243,7 +1248,7 @@ func (ops *operators) runN(ctx context.Context) (*runNResult, error) {
return nil
})
}
if err := eg.Wait(); err != nil {
if err := cg.Wait(); err != nil {
return result, err
}
return result, nil
Expand Down
1 change: 1 addition & 0 deletions operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func TestShard(t *testing.T) {
cmpopts.IgnoreUnexported(ignore...),
cmpopts.IgnoreFields(stopw.Span{}, "ID"),
cmpopts.IgnoreFields(operator{}, "id"),
cmpopts.IgnoreFields(operator{}, "concurrency"),
cmpopts.IgnoreFields(cdpRunner{}, "ctx"),
cmpopts.IgnoreFields(cdpRunner{}, "cancel"),
cmpopts.IgnoreFields(sshRunner{}, "client"),
Expand Down
1 change: 1 addition & 0 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func Overlay(path string) Option {
bk.debug = loaded.debug
bk.skipTest = loaded.skipTest
bk.loop = loaded.loop
bk.concurrency = loaded.concurrency
bk.grpcNoTLS = loaded.grpcNoTLS
bk.interval = loaded.interval
return nil
Expand Down
39 changes: 21 additions & 18 deletions runbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,32 @@ import (
)

type runbook struct {
Desc string `yaml:"desc"`
Runners map[string]interface{} `yaml:"runners,omitempty"`
Vars map[string]interface{} `yaml:"vars,omitempty"`
Steps []yaml.MapSlice `yaml:"steps"`
Debug bool `yaml:"debug,omitempty"`
Interval string `yaml:"interval,omitempty"`
If string `yaml:"if,omitempty"`
SkipTest bool `yaml:"skipTest,omitempty"`
Loop interface{} `yaml:"loop,omitempty"`
Desc string `yaml:"desc"`
Runners map[string]interface{} `yaml:"runners,omitempty"`
Vars map[string]interface{} `yaml:"vars,omitempty"`
Steps []yaml.MapSlice `yaml:"steps"`
Debug bool `yaml:"debug,omitempty"`
Interval string `yaml:"interval,omitempty"`
If string `yaml:"if,omitempty"`
SkipTest bool `yaml:"skipTest,omitempty"`
Loop interface{} `yaml:"loop,omitempty"`
Concurrency string `yaml:"concurrency,omitempty"`

useMap bool
stepKeys []string
}

type runbookMapped struct {
Desc string `yaml:"desc,omitempty"`
Runners map[string]interface{} `yaml:"runners,omitempty"`
Vars map[string]interface{} `yaml:"vars,omitempty"`
Steps yaml.MapSlice `yaml:"steps,omitempty"`
Debug bool `yaml:"debug,omitempty"`
Interval string `yaml:"interval,omitempty"`
If string `yaml:"if,omitempty"`
SkipTest bool `yaml:"skipTest,omitempty"`
Loop interface{} `yaml:"loop,omitempty"`
Desc string `yaml:"desc,omitempty"`
Runners map[string]interface{} `yaml:"runners,omitempty"`
Vars map[string]interface{} `yaml:"vars,omitempty"`
Steps yaml.MapSlice `yaml:"steps,omitempty"`
Debug bool `yaml:"debug,omitempty"`
Interval string `yaml:"interval,omitempty"`
If string `yaml:"if,omitempty"`
SkipTest bool `yaml:"skipTest,omitempty"`
Loop interface{} `yaml:"loop,omitempty"`
Concurrency string `yaml:"concurrency,omitempty"`
}

func NewRunbook(desc string) *runbook {
Expand Down Expand Up @@ -347,6 +349,7 @@ func (rb *runbook) toBook() (*book, error) {
return nil, err
}
}
bk.concurrency = rb.Concurrency
bk.useMap = rb.useMap
bk.stepKeys = rb.stepKeys

Expand Down

0 comments on commit ed2905a

Please sign in to comment.