Skip to content

Commit

Permalink
Intro program invariant for prematurely closing RunChannel
Browse files Browse the repository at this point in the history
- Our test suite is so highly parallelized that it can start
and stop before a goroutine starts. The demultiplexStarterWg ensures
it has started before unblocking.
  • Loading branch information
dimroc committed Sep 25, 2018
1 parent 996d957 commit c507cc9
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion services/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type jobRunner struct {
workerMutex sync.RWMutex
workers map[string]chan store.RunRequest
wg sync.WaitGroup
demultiplexStarterWg sync.WaitGroup
demultiplexStopperWg sync.WaitGroup
}

Expand All @@ -52,7 +53,10 @@ func (rm *jobRunner) Start() error {
rm.done = make(chan struct{})
rm.started = true

rm.demultiplexStarterWg.Add(1)
go rm.demultiplexRuns()
rm.demultiplexStarterWg.Wait()

return rm.resumeSleepingRuns()
}

Expand Down Expand Up @@ -83,6 +87,7 @@ func (rm *jobRunner) resumeSleepingRuns() error {
}

func (rm *jobRunner) demultiplexRuns() {
rm.demultiplexStarterWg.Done()
rm.demultiplexStopperWg.Add(1)
defer rm.demultiplexStopperWg.Done()
for {
Expand All @@ -92,7 +97,7 @@ func (rm *jobRunner) demultiplexRuns() {
return
case rr, ok := <-rm.store.RunChannel.Receive():
if !ok {
logger.Error("JobRunner RunChannel closed, demultiplexing of job runs finished")
logger.Panic("RunChannel closed before JobRunner, can no longer demultiplexing job runs")
return
}
rm.channelForRun(rr.ID) <- rr
Expand Down

0 comments on commit c507cc9

Please sign in to comment.