From ed2905af12f353a89c3f45ea2ebbaf813fcf7bcb Mon Sep 17 00:00:00 2001 From: k1LoW Date: Sat, 11 Mar 2023 11:41:51 +0900 Subject: [PATCH] Add `concurrency:` for ensuring that only a single runbook using the same group will run at a time. --- book.go | 1 + go.mod | 1 + go.sum | 2 ++ operator.go | 15 ++++++++++----- operator_test.go | 1 + option.go | 1 + runbook.go | 39 +++++++++++++++++++++------------------ 7 files changed, 37 insertions(+), 23 deletions(-) diff --git a/book.go b/book.go index 1a1a9f10..88dc9a02 100644 --- a/book.go +++ b/book.go @@ -38,6 +38,7 @@ type book struct { intervalStr string interval time.Duration loop *Loop + concurrency string useMap bool t *testing.T included bool diff --git a/go.mod b/go.mod index 499c7272..a1ac7fb8 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/jhump/protoreflect v1.14.1 github.com/juliangruber/go-intersect v1.1.0 + github.com/k1LoW/concgroup v1.0.0 github.com/k1LoW/curlreq v0.3.2 github.com/k1LoW/duration v1.2.0 github.com/k1LoW/exec v0.2.0 diff --git a/go.sum b/go.sum index 231ee614..104aac77 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/juliangruber/go-intersect v1.1.0 h1:sc+y5dCjMMx0pAdYk/N6KBm00tD/f3tq+Iox7dYDUrY= github.com/juliangruber/go-intersect v1.1.0/go.mod h1:WMau+1kAmnlQnKiikekNJbtGtfmILU/mMU6H7AgKbWQ= +github.com/k1LoW/concgroup v1.0.0 h1:iVD/xulNDLsHQwFQ93w8A6QakO6YA2dJLJ6H4M+L2No= +github.com/k1LoW/concgroup v1.0.0/go.mod h1:cD18DUBcac9rl5mf8aK5MwITLeSoodExDG4dI7vp79k= github.com/k1LoW/curlreq v0.3.2 h1:EpFL5X9WiMv2y/CrSeitITEbwCGeaHbquV3pPpDhtz4= github.com/k1LoW/curlreq v0.3.2/go.mod h1:kh0wTrg3kahGoGnAREXK5gYOcTXW7W/TnL8qbHe5Ppc= github.com/k1LoW/duration v1.2.0 h1:qq1gWtPh7YROFyerBufVP+ATR11mOOHDInrcC/Xe/6A= diff --git a/operator.go b/operator.go index e687668a..1da45795 100644 --- a/operator.go +++ b/operator.go @@ -17,11 +17,11 @@ import ( "github.com/fatih/color" "github.com/goccy/go-json" + "github.com/k1LoW/concgroup" "github.com/k1LoW/stopw" "github.com/rs/xid" "github.com/ryo-yamaoka/otchkiss" "go.uber.org/multierr" - "golang.org/x/sync/errgroup" ) var ( @@ -46,6 +46,7 @@ type operator struct { profile bool interval time.Duration loop *Loop + concurrency string root string t *testing.T thisT *testing.T @@ -187,6 +188,7 @@ func New(opts ...Option) (*operator, error) { profile: bk.profile, interval: bk.interval, loop: bk.loop, + concurrency: bk.concurrency, t: bk.t, thisT: bk.t, failFast: bk.failFast, @@ -207,6 +209,9 @@ func New(opts ...Option) (*operator, error) { if o.debug { o.capturers = append(o.capturers, NewDebugger(o.stderr)) } + if o.concurrency == "" { + o.concurrency = o.id + } root, err := bk.generateOperatorRoot() if err != nil { @@ -1207,8 +1212,8 @@ func (ops *operators) runN(ctx context.Context) (*runNResult, error) { } defer ops.sw.Start().Stop() defer ops.Close() - eg, cctx := errgroup.WithContext(ctx) - eg.SetLimit(int(ops.concmax)) + cg, cctx := concgroup.WithContext(ctx) + cg.SetLimit(int(ops.concmax)) selected, err := ops.SelectedOperators() if err != nil { return result, err @@ -1216,7 +1221,7 @@ func (ops *operators) runN(ctx context.Context) (*runNResult, error) { result.Total.Add(int64(len(selected))) for _, o := range selected { o := o - eg.Go(func() error { + cg.Go(o.concurrency, func() error { select { case <-cctx.Done(): return errors.New("context canceled") @@ -1243,7 +1248,7 @@ func (ops *operators) runN(ctx context.Context) (*runNResult, error) { return nil }) } - if err := eg.Wait(); err != nil { + if err := cg.Wait(); err != nil { return result, err } return result, nil diff --git a/operator_test.go b/operator_test.go index a9033e6d..4ca4d250 100644 --- a/operator_test.go +++ b/operator_test.go @@ -526,6 +526,7 @@ func TestShard(t *testing.T) { cmpopts.IgnoreUnexported(ignore...), cmpopts.IgnoreFields(stopw.Span{}, "ID"), cmpopts.IgnoreFields(operator{}, "id"), + cmpopts.IgnoreFields(operator{}, "concurrency"), cmpopts.IgnoreFields(cdpRunner{}, "ctx"), cmpopts.IgnoreFields(cdpRunner{}, "cancel"), cmpopts.IgnoreFields(sshRunner{}, "client"), diff --git a/option.go b/option.go index 087fff5b..c263b96a 100644 --- a/option.go +++ b/option.go @@ -82,6 +82,7 @@ func Overlay(path string) Option { bk.debug = loaded.debug bk.skipTest = loaded.skipTest bk.loop = loaded.loop + bk.concurrency = loaded.concurrency bk.grpcNoTLS = loaded.grpcNoTLS bk.interval = loaded.interval return nil diff --git a/runbook.go b/runbook.go index a7bfb30a..12f42491 100644 --- a/runbook.go +++ b/runbook.go @@ -16,30 +16,32 @@ import ( ) type runbook struct { - Desc string `yaml:"desc"` - Runners map[string]interface{} `yaml:"runners,omitempty"` - Vars map[string]interface{} `yaml:"vars,omitempty"` - Steps []yaml.MapSlice `yaml:"steps"` - Debug bool `yaml:"debug,omitempty"` - Interval string `yaml:"interval,omitempty"` - If string `yaml:"if,omitempty"` - SkipTest bool `yaml:"skipTest,omitempty"` - Loop interface{} `yaml:"loop,omitempty"` + Desc string `yaml:"desc"` + Runners map[string]interface{} `yaml:"runners,omitempty"` + Vars map[string]interface{} `yaml:"vars,omitempty"` + Steps []yaml.MapSlice `yaml:"steps"` + Debug bool `yaml:"debug,omitempty"` + Interval string `yaml:"interval,omitempty"` + If string `yaml:"if,omitempty"` + SkipTest bool `yaml:"skipTest,omitempty"` + Loop interface{} `yaml:"loop,omitempty"` + Concurrency string `yaml:"concurrency,omitempty"` useMap bool stepKeys []string } type runbookMapped struct { - Desc string `yaml:"desc,omitempty"` - Runners map[string]interface{} `yaml:"runners,omitempty"` - Vars map[string]interface{} `yaml:"vars,omitempty"` - Steps yaml.MapSlice `yaml:"steps,omitempty"` - Debug bool `yaml:"debug,omitempty"` - Interval string `yaml:"interval,omitempty"` - If string `yaml:"if,omitempty"` - SkipTest bool `yaml:"skipTest,omitempty"` - Loop interface{} `yaml:"loop,omitempty"` + Desc string `yaml:"desc,omitempty"` + Runners map[string]interface{} `yaml:"runners,omitempty"` + Vars map[string]interface{} `yaml:"vars,omitempty"` + Steps yaml.MapSlice `yaml:"steps,omitempty"` + Debug bool `yaml:"debug,omitempty"` + Interval string `yaml:"interval,omitempty"` + If string `yaml:"if,omitempty"` + SkipTest bool `yaml:"skipTest,omitempty"` + Loop interface{} `yaml:"loop,omitempty"` + Concurrency string `yaml:"concurrency,omitempty"` } func NewRunbook(desc string) *runbook { @@ -347,6 +349,7 @@ func (rb *runbook) toBook() (*book, error) { return nil, err } } + bk.concurrency = rb.Concurrency bk.useMap = rb.useMap bk.stepKeys = rb.stepKeys