Skip to content

Commit

Permalink
Dispatch: Fix race during stop (thrasher-corp#1443)
Browse files Browse the repository at this point in the history
* Dispatch: Assertify tests

* Dispatch: Fix race during stop

If we have blocking writers, then we need to synchronise them exiting
before closing off their channels.

* Dispatch: Rename Routes mutex for clarity
  • Loading branch information
gbjk authored Jan 22, 2024
1 parent 0c40f90 commit 45d65c4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 298 deletions.
57 changes: 33 additions & 24 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,27 +128,31 @@ func (d *Dispatcher) stop() error {
// Release finished workers
close(d.shutdown)

d.rMtx.Lock()
ch := make(chan struct{}, 1)
go func(ch chan<- struct{}) {
d.wg.Wait()
ch <- struct{}{}
}(ch)

select {
case <-ch:
case <-time.After(time.Second):
return errDispatchShutdown
}

// Wait for all relayers to have exited, including any blocking channel writes, before closing channels
d.routesMtx.Lock()
for key, pipes := range d.routes {
for i := range pipes {
// Boot off receivers waiting on pipes.
close(pipes[i])
}
// Flush all pipes, re-subscription will need to occur.
d.routes[key] = nil
}
d.rMtx.Unlock()
d.routesMtx.Unlock()

ch := make(chan struct{})
timer := time.NewTimer(time.Second)
go func(ch chan<- struct{}) { d.wg.Wait(); ch <- struct{}{} }(ch)
select {
case <-ch:
log.Debugln(log.DispatchMgr, "Dispatch manager shutdown.")
return nil
case <-timer.C:
return errDispatchShutdown
}
log.Debugln(log.DispatchMgr, "Dispatch manager shutdown")

return nil
}

// isRunning returns if the dispatch system is running
Expand All @@ -172,19 +176,24 @@ func (d *Dispatcher) relayer() {
// every real job created has an ID set
continue
}
d.rMtx.RLock()
d.routesMtx.Lock()
pipes, ok := d.routes[j.ID]
if !ok {
log.Warnf(log.DispatchMgr, "%v: %v\n", errDispatcherUUIDNotFoundInRouteList, j.ID)
d.rMtx.RUnlock()
d.routesMtx.Unlock()
continue
}
for i := range pipes {
d.wg.Add(1)
go func(p chan any) {
p <- j.Data
defer d.wg.Done()
select {
case p <- j.Data:
case <-d.shutdown: // Avoids race on blocking consumer when we go to stop
}
}(pipes[i])
}
d.rMtx.RUnlock()
d.routesMtx.Unlock()
case <-d.shutdown:
d.wg.Done()
return
Expand Down Expand Up @@ -242,8 +251,8 @@ func (d *Dispatcher) subscribe(id uuid.UUID) (chan interface{}, error) {
return nil, ErrNotRunning
}

d.rMtx.Lock()
defer d.rMtx.Unlock()
d.routesMtx.Lock()
defer d.routesMtx.Unlock()
if _, ok := d.routes[id]; !ok {
return nil, errDispatcherUUIDNotFoundInRouteList
}
Expand Down Expand Up @@ -281,8 +290,8 @@ func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan chan interface{}) error
return nil
}

d.rMtx.Lock()
defer d.rMtx.Unlock()
d.routesMtx.Lock()
defer d.routesMtx.Unlock()
pipes, ok := d.routes[id]
if !ok {
return errDispatcherUUIDNotFoundInRouteList
Expand Down Expand Up @@ -334,8 +343,8 @@ func (d *Dispatcher) getNewID(genFn func() (uuid.UUID, error)) (uuid.UUID, error
return uuid.Nil, err
}

d.rMtx.Lock()
defer d.rMtx.Unlock()
d.routesMtx.Lock()
defer d.routesMtx.Unlock()
// Check to see if it already exists
if _, ok := d.routes[newID]; ok {
return uuid.Nil, errUUIDCollision
Expand Down
Loading

0 comments on commit 45d65c4

Please sign in to comment.