Skip to content

Commit

Permalink
dispatcher: prevent job limit errors with no subscribers (thrasher-co…
Browse files Browse the repository at this point in the history
…rp#1196)

* prevent dispatch error limit

* fixes typo, update benchmark

* big simplification on concept

* fix tests

* can't stop finding leftovers
  • Loading branch information
gloriousCode authored Jul 6, 2023
1 parent 81a8b4a commit 2a6581b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 13 deletions.
26 changes: 19 additions & 7 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -166,14 +167,23 @@ func (d *Dispatcher) relayer() {
for {
select {
case j := <-d.jobs:
if j.ID.IsNil() {
// empty jobs from `channelCapacity` length are sent upon shutdown
// every real job created has an ID set
continue
}
d.rMtx.RLock()
if pipes, ok := d.routes[j.ID]; ok {
for i := range pipes {
select {
case pipes[i] <- j.Data:
default:
// no receiver; don't wait. This limits complexity.
}
pipes, ok := d.routes[j.ID]
if !ok {
log.Warnf(log.DispatchMgr, "%v: %v\n", errDispatcherUUIDNotFoundInRouteList, j.ID)
d.rMtx.RUnlock()
continue
}
for i := range pipes {
select {
case pipes[i] <- j.Data:
default:
// no receiver; don't wait. This limits complexity.
}
}
d.rMtx.RUnlock()
Expand Down Expand Up @@ -247,6 +257,7 @@ func (d *Dispatcher) subscribe(id uuid.UUID) (chan interface{}, error) {
}

d.routes[id] = append(d.routes[id], ch)
atomic.AddInt32(&d.subscriberCount, 1)
return ch, nil
}

Expand Down Expand Up @@ -287,6 +298,7 @@ func (d *Dispatcher) unsubscribe(id uuid.UUID, usedChan chan interface{}) error
pipes[i] = pipes[len(pipes)-1]
pipes[len(pipes)-1] = nil
d.routes[id] = pipes[:len(pipes)-1]
atomic.AddInt32(&d.subscriberCount, -1)

// Drain and put the used chan back in pool; only if it is not closed.
select {
Expand Down
56 changes: 50 additions & 6 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestPublish(t *testing.T) {

d = NewDispatcher()

err = d.publish(nonEmptyUUID, "lol")
err = d.publish(nonEmptyUUID, "test")
if !errors.Is(err, nil) { // If not running, don't send back an error.
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}
Expand All @@ -266,15 +266,17 @@ func TestPublish(t *testing.T) {
t.Fatalf("received: '%v' but expected: '%v'", err, errNoData)
}

// max out worker processing
for x := 0; x < 100; x++ {
err2 := d.publish(nonEmptyUUID, "lol")
// demonstrate job limit error
d.routes[nonEmptyUUID] = []chan interface{}{
make(chan interface{}),
}
for x := 0; x < 200; x++ {
err2 := d.publish(nonEmptyUUID, "test")
if !errors.Is(err2, nil) {
err = err2
break
}
}

if !errors.Is(err, errDispatcherJobsAtLimit) {
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit)
}
Expand Down Expand Up @@ -492,6 +494,19 @@ func TestMuxPublish(t *testing.T) {
t.Fatal(err)
}

// demonstrate that jobs do not get published when the limit should be reached
// but there is no listener associated with job
for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
break
}
}
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}

pipe, err := mux.Subscribe(itemID)
if err != nil {
t.Error(err)
Expand All @@ -508,14 +523,43 @@ func TestMuxPublish(t *testing.T) {

<-pipe.Channel()

// demonstrate that jobs can be limited when subscribed
for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
break
}
}
if !errors.Is(err, errDispatcherJobsAtLimit) {
t.Fatalf("received: '%v' but expected: '%v'", err, errDispatcherJobsAtLimit)
}

// demonstrate that jobs go back to not being sent after unsubscribing
err = mux.Unsubscribe(itemID, pipe.C)
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}

for x := 0; x < 200; x++ {
err2 := mux.Publish("test", itemID)
if !errors.Is(err2, nil) {
err = err2
break
}
}
if !errors.Is(err, nil) {
t.Fatalf("received: '%v' but expected: '%v'", err, nil)
}

// Shut down dispatch system
err = d.stop()
if err != nil {
t.Fatal(err)
}
}

// 2363419 468.7 ns/op 142 B/op 1 allocs/op
// 13636467 84.26 ns/op 141 B/op 1 allocs/op
func BenchmarkSubscribe(b *testing.B) {
d := NewDispatcher()
err := d.start(0, 0)
Expand Down
3 changes: 3 additions & 0 deletions dispatch/dispatch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Dispatcher struct {

// dispatcher write protection
m sync.RWMutex
// subscriberCount atomically stores the amount of subscription endpoints
// to verify whether to send out any jobs
subscriberCount int32
}

// job defines a relaying job associated with a ticket which allows routing to
Expand Down
4 changes: 4 additions & 0 deletions dispatch/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dispatch

import (
"errors"
"sync/atomic"

"github.com/gofrs/uuid"
)
Expand Down Expand Up @@ -63,6 +64,9 @@ func (m *Mux) Publish(data interface{}, ids ...uuid.UUID) error {
if len(ids) == 0 {
return errNoIDs
}
if atomic.LoadInt32(&m.d.subscriberCount) == 0 {
return nil
}

for i := range ids {
err := m.d.publish(ids[i], data)
Expand Down

0 comments on commit 2a6581b

Please sign in to comment.