Skip to content

Commit

Permalink
dispatcher: Use int32 for atomic operations to prevent crash on ARM 3…
Browse files Browse the repository at this point in the history
…2bit systems (thrasher-corp#370)

See golang/go#599
  • Loading branch information
thrasher- authored Oct 23, 2019
1 parent ccfcdf2 commit 596be31
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
18 changes: 9 additions & 9 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func init() {
}

// Start starts the dispatch system by spawning workers and allocating memory
func Start(workers int64) error {
func Start(workers int) error {
if dispatcher == nil {
return errors.New(errNotInitialised)
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func SpawnWorker() error {

// start compares atomic running value, sets defaults, overides with
// configuration, then spawns workers
func (d *Dispatcher) start(workers int64) error {
func (d *Dispatcher) start(workers int) error {
if atomic.LoadUint32(&d.running) == 1 {
return errors.New(errAlreadyStarted)
}
Expand All @@ -89,14 +89,14 @@ func (d *Dispatcher) start(workers int64) error {
workers = DefaultMaxWorkers
}

d.maxWorkers = workers
d.maxWorkers = int32(workers)
d.shutdown = make(chan *sync.WaitGroup)

if atomic.LoadInt64(&d.count) != 0 {
if atomic.LoadInt32(&d.count) != 0 {
return errors.New("dispatcher leaked workers found")
}

for i := int64(0); i < d.maxWorkers; i++ {
for i := int32(0); i < d.maxWorkers; i++ {
err := d.spawnWorker()
if err != nil {
return err
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *Dispatcher) dropWorker() {

// spawnWorker allocates a new worker for job processing
func (d *Dispatcher) spawnWorker() error {
if atomic.LoadInt64(&d.count) >= d.maxWorkers {
if atomic.LoadInt32(&d.count) >= d.maxWorkers {
return errors.New("dispatcher cannot spawn more workers; ceiling reached")
}
var spawnWg sync.WaitGroup
Expand All @@ -174,7 +174,7 @@ func (d *Dispatcher) spawnWorker() error {

// Relayer routine relays communications across the defined routes
func (d *Dispatcher) relayer(i *sync.WaitGroup) {
atomic.AddInt64(&d.count, 1)
atomic.AddInt32(&d.count, 1)
d.wg.Add(1)
timeout := time.NewTimer(0)
i.Done()
Expand Down Expand Up @@ -219,7 +219,7 @@ func (d *Dispatcher) relayer(i *sync.WaitGroup) {
default:
}
}
atomic.AddInt64(&d.count, -1)
atomic.AddInt32(&d.count, -1)
if v != nil {
v.Done()
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error {
default:
return fmt.Errorf("dispatcher buffer at max capacity [%d] current worker count [%d], spawn more workers via --dispatchworkers=x",
len(d.jobs),
atomic.LoadInt64(&d.count))
atomic.LoadInt32(&d.count))
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions dispatch/dispatch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ type Dispatcher struct {
outbound sync.Pool

// MaxWorkers defines max worker ceiling
maxWorkers int64
maxWorkers int32

// Atomic values -----------------------
// Worker counter
count int64
count int32
// Dispatch status
running uint32

Expand Down
2 changes: 1 addition & 1 deletion engine/engine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,5 @@ type Settings struct {

// Dispatch system settings
EnableDispatcher bool
DispatchMaxWorkerAmount int64
DispatchMaxWorkerAmount int
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
flag.DurationVar(&settings.EventManagerDelay, "eventmanagerdelay", time.Duration(0), "sets the event managers sleep delay between event checking")
flag.BoolVar(&settings.EnableNTPClient, "ntpclient", true, "enables the NTP client to check system clock drift")
flag.BoolVar(&settings.EnableDispatcher, "dispatch", true, "enables the dispatch system")
flag.Int64Var(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit")
flag.IntVar(&settings.DispatchMaxWorkerAmount, "dispatchworkers", dispatch.DefaultMaxWorkers, "sets the dispatch package max worker generation limit")

// Forex provider settings
flag.BoolVar(&settings.EnableCurrencyConverter, "currencyconverter", false, "overrides config and sets up foreign exchange Currency Converter")
Expand Down

0 comments on commit 596be31

Please sign in to comment.