Skip to content

Commit

Permalink
(Engine) Bugfix: Unlocking an unlocked mutex PANIC + Increase dispatc…
Browse files Browse the repository at this point in the history
…her job capacity via commandline (thrasher-corp#371)

* Removes lock unlock timer and instead sets unlocks between getting a nonce and sending a payload. Increases dispatch channel buffer to deal with len(enabledCurrencies) > ~100

* Adds additional comments to help explain the situation

* Fixes bug that could unlock mutex too early

* Fixes LIES where Gemini gets a nonce and then proceeds to declare it doesn't get a nonce causing an unrecoverable lock

* Fun new concept! The creation of a tested timed mutex. Unlocking an unlocked mutex cannot occur and response can be checked to verify whether the mutex was unlocked from timeout or command.

* Adds new cmd parameter "dispatchjobbuffer"

* Expands comments and renames benchmark. Makes `Timer` property private

* Happy little linters

* Renames jobBuffer and all related instances to jobs limit

* Tiny error message update

* Grammatical fix and setting dispatch.Start to use defaults
  • Loading branch information
gloriousCode authored and thrasher- committed Oct 29, 2019
1 parent 1805c40 commit 242b02c
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 139 deletions.
77 changes: 77 additions & 0 deletions common/timedmutex/timed_mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package timedmutex

import (
"sync"
"time"
)

// NewTimedMutex creates a new timed mutex with a
// specified duration
func NewTimedMutex(length time.Duration) *TimedMutex {
return &TimedMutex{
duration: length,
}
}

// LockForDuration will start a timer, lock the mutex,
// then allow the caller to continue
// After the duration, the mutex will be unlocked
func (t *TimedMutex) LockForDuration() {
var wg sync.WaitGroup
wg.Add(1)
go t.lockAndSetTimer(&wg)
wg.Wait()
}

func (t *TimedMutex) lockAndSetTimer(wg *sync.WaitGroup) {
t.mtx.Lock()
t.setTimer()
wg.Done()
}

// UnlockIfLocked will unlock the mutex if its currently locked
// Will return true if successfully unlocked
func (t *TimedMutex) UnlockIfLocked() bool {
if t.isTimerNil() {
return false
}

if !t.stopTimer() {
return false
}
t.mtx.Unlock()
return true
}

// stopTimer will return true if timer has been stopped by this command
// If the timer has expired, clear the channel
func (t *TimedMutex) stopTimer() bool {
t.timerLock.Lock()
defer t.timerLock.Unlock()
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
}
return false
}
return true
}

// isTimerNil safely read locks to detect nil
func (t *TimedMutex) isTimerNil() bool {
t.timerLock.RLock()
defer t.timerLock.RUnlock()
return t.timer == nil
}

// setTimer safely locks and sets a timer
// which will automatically execute a mutex unlock
// once timer expires
func (t *TimedMutex) setTimer() {
t.timerLock.Lock()
t.timer = time.AfterFunc(t.duration, func() {
t.mtx.Unlock()
})
t.timerLock.Unlock()
}
86 changes: 86 additions & 0 deletions common/timedmutex/timed_mutex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package timedmutex

import (
"testing"
"time"
)

func BenchmarkTimedMutexTime(b *testing.B) {
tm := NewTimedMutex(20 * time.Millisecond)
for i := 0; i < b.N; i++ {
tm.LockForDuration()
}
}

func TestConsistencyOfPanicFreeUnlock(t *testing.T) {
t.Parallel()
duration := 20 * time.Millisecond
tm := NewTimedMutex(duration)
for i := 1; i <= 50; i++ {
testUnlockTime := time.Duration(i) * time.Millisecond
tm.LockForDuration()
time.Sleep(testUnlockTime)
tm.UnlockIfLocked()
}
}

func TestUnlockAfterTimeout(t *testing.T) {
t.Parallel()
tm := NewTimedMutex(time.Second)
tm.LockForDuration()
time.Sleep(2 * time.Second)
wasUnlocked := tm.UnlockIfLocked()
if wasUnlocked {
t.Error("Mutex should have been unlocked by timeout, not command")
}
}

func TestUnlockBeforeTimeout(t *testing.T) {
t.Parallel()
tm := NewTimedMutex(2 * time.Second)
tm.LockForDuration()
time.Sleep(time.Second)
wasUnlocked := tm.UnlockIfLocked()
if !wasUnlocked {
t.Error("Mutex should have been unlocked by command, not timeout")
}
}

// TestUnlockAtSameTimeAsTimeout this test ensures
// that even if the timeout and the command occur at
// the same time, no panics occur. The result of the
// 'who' unlocking this doesn't matter, so long as
// the unlock occurs without this test panicking
func TestUnlockAtSameTimeAsTimeout(t *testing.T) {
t.Parallel()
duration := time.Second
tm := NewTimedMutex(duration)
tm.LockForDuration()
time.Sleep(duration)
tm.UnlockIfLocked()
}

func TestMultipleUnlocks(t *testing.T) {
t.Parallel()
tm := NewTimedMutex(10 * time.Second)
tm.LockForDuration()
wasUnlocked := tm.UnlockIfLocked()
if !wasUnlocked {
t.Error("Mutex should have been unlocked by command, not timeout")
}
wasUnlocked = tm.UnlockIfLocked()
if wasUnlocked {
t.Error("Mutex should have been already unlocked by command")
}
wasUnlocked = tm.UnlockIfLocked()
if wasUnlocked {
t.Error("Mutex should have been already unlocked by command")
}
}

func TestJustWaitItOut(t *testing.T) {
t.Parallel()
tm := NewTimedMutex(1 * time.Second)
tm.LockForDuration()
time.Sleep(2 * time.Second)
}
15 changes: 15 additions & 0 deletions common/timedmutex/timed_mutex_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package timedmutex

import (
"sync"
"time"
)

// TimedMutex is a blocking mutex which will unlock
// after a specified time
type TimedMutex struct {
mtx sync.Mutex
timerLock sync.RWMutex
timer *time.Timer
duration time.Duration
}
19 changes: 12 additions & 7 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
func init() {
dispatcher = &Dispatcher{
routes: make(map[uuid.UUID][]chan interface{}),
jobs: make(chan *job, DefaultJobBuffer),
outbound: sync.Pool{
New: func() interface{} {
// Create unbuffered channel for data pass
Expand All @@ -25,14 +24,14 @@ func init() {
}

// Start starts the dispatch system by spawning workers and allocating memory
func Start(workers int) error {
func Start(workers, jobsLimit int) error {
if dispatcher == nil {
return errors.New(errNotInitialised)
}

mtx.Lock()
defer mtx.Unlock()
return dispatcher.start(workers)
return dispatcher.start(workers, jobsLimit)
}

// Stop attempts to stop the dispatch service, this will close all pipe channels
Expand Down Expand Up @@ -78,17 +77,22 @@ func SpawnWorker() error {

// start compares atomic running value, sets defaults, overides with
// configuration, then spawns workers
func (d *Dispatcher) start(workers int) error {
func (d *Dispatcher) start(workers, channelCapacity int) error {
if atomic.LoadUint32(&d.running) == 1 {
return errors.New(errAlreadyStarted)
}

if workers < 1 {
log.Warn(log.DispatchMgr,
"Dispatcher: workers cannot be zero using default values")
"Dispatcher: workers cannot be zero, using default values")
workers = DefaultMaxWorkers
}

if channelCapacity < 1 {
log.Warn(log.DispatchMgr,
"Dispatcher: jobs limit cannot be zero, using default values")
channelCapacity = DefaultJobsLimit
}
d.jobs = make(chan *job, channelCapacity)
d.maxWorkers = int32(workers)
d.shutdown = make(chan *sync.WaitGroup)

Expand Down Expand Up @@ -253,7 +257,8 @@ func (d *Dispatcher) publish(id uuid.UUID, data interface{}) error {
select {
case d.jobs <- newJob:
default:
return fmt.Errorf("dispatcher buffer at max capacity [%d] current worker count [%d], spawn more workers via --dispatchworkers=x",
return fmt.Errorf("dispatcher jobs at limit [%d] current worker count [%d]. Spawn more workers via --dispatchworkers=x"+
", or increase the jobs limit via --dispatchjobslimit=x",
len(d.jobs),
atomic.LoadInt32(&d.count))
}
Expand Down
19 changes: 11 additions & 8 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var mux *Mux

func TestMain(m *testing.M) {
err := Start(DefaultMaxWorkers)
err := Start(DefaultMaxWorkers, 0)
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand All @@ -34,11 +34,10 @@ func TestDispatcher(t *testing.T) {
t.Error("error cannot be nil")
}

err = Start(10)
err = Start(10, 0)
if err == nil {
t.Error("error cannot be nil")
}

if IsRunning() {
t.Error("should be false")
}
Expand All @@ -59,7 +58,7 @@ func TestDispatcher(t *testing.T) {
t.Error("should be true")
}

err = Start(10)
err = Start(10, 0)
if err == nil {
t.Error("error cannot be nil")
}
Expand Down Expand Up @@ -99,11 +98,13 @@ func TestDispatcher(t *testing.T) {
t.Error("error cannot be nil")
}

err = Start(0)
err = Start(0, 20)
if err != nil {
t.Error(err)
}

if cap(dispatcher.jobs) != 20 {
t.Errorf("Expected jobs limit to be %v, is %v", 20, cap(dispatcher.jobs))
}
payload := "something"

err = dispatcher.publish(uuid.UUID{}, &payload)
Expand Down Expand Up @@ -141,11 +142,13 @@ func TestDispatcher(t *testing.T) {
t.Error("error cannot be nil")
}

err = dispatcher.start(10)
err = dispatcher.start(10, -1)
if err != nil {
t.Error(err)
}

if cap(dispatcher.jobs) != DefaultJobsLimit {
t.Errorf("Expected jobs limit to be %v, is %v", DefaultJobsLimit, cap(dispatcher.jobs))
}
someID, err := uuid.NewV4()
if err != nil {
t.Error(err)
Expand Down
5 changes: 2 additions & 3 deletions dispatch/dispatch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

const (
// DefaultJobBuffer defines a maxiumum amount of jobs allowed in channel
DefaultJobBuffer = 100
// DefaultJobsLimit defines a maxiumum amount of jobs allowed in channel
DefaultJobsLimit = 100

// DefaultMaxWorkers is the package default worker ceiling amount
DefaultMaxWorkers = 10
Expand Down Expand Up @@ -47,7 +47,6 @@ type Dispatcher struct {

// MaxWorkers defines max worker ceiling
maxWorkers int32

// Atomic values -----------------------
// Worker counter
count int32
Expand Down
4 changes: 3 additions & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func ValidateSettings(b *Engine, s *Settings) {

b.Settings.GlobalHTTPProxy = s.GlobalHTTPProxy
b.Settings.DispatchMaxWorkerAmount = s.DispatchMaxWorkerAmount
b.Settings.DispatchJobsLimit = s.DispatchJobsLimit
}

// PrintSettings returns the engine settings
Expand Down Expand Up @@ -237,6 +238,7 @@ func PrintSettings(s *Settings) {
log.Debugf(log.Global, "\t Enable Database manager: %v", s.EnableDatabaseManager)
log.Debugf(log.Global, "\t Enable dispatcher: %v", s.EnableDispatcher)
log.Debugf(log.Global, "\t Dispatch package max worker amount: %d", s.DispatchMaxWorkerAmount)
log.Debugf(log.Global, "\t Dispatch package jobs limit: %d", s.DispatchJobsLimit)
log.Debugf(log.Global, "- FOREX SETTINGS:")
log.Debugf(log.Global, "\t Enable currency conveter: %v", s.EnableCurrencyConverter)
log.Debugf(log.Global, "\t Enable currency layer: %v", s.EnableCurrencyLayer)
Expand Down Expand Up @@ -274,7 +276,7 @@ func (e *Engine) Start() error {
}

if e.Settings.EnableDispatcher {
if err := dispatch.Start(e.Settings.DispatchMaxWorkerAmount); err != nil {
if err := dispatch.Start(e.Settings.DispatchMaxWorkerAmount, e.Settings.DispatchJobsLimit); err != nil {
log.Errorf(log.DispatchMgr, "Dispatcher unable to start: %v", err)
}
}
Expand Down
1 change: 1 addition & 0 deletions engine/engine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ type Settings struct {
// Dispatch system settings
EnableDispatcher bool
DispatchMaxWorkerAmount int
DispatchJobsLimit int
}
2 changes: 1 addition & 1 deletion engine/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func SetSubsystem(subsys string, enable bool) error {
Bot.ExchangeCurrencyPairManager.Stop()
case "dispatch":
if enable {
return dispatch.Start(Bot.Settings.DispatchMaxWorkerAmount)
return dispatch.Start(Bot.Settings.DispatchMaxWorkerAmount, Bot.Settings.DispatchJobsLimit)
}
return dispatch.Stop()
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gemini/gemini.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (g *Gemini) SendAuthenticatedHTTPRequest(method, path string, params map[st
nil,
result,
true,
false,
true,
g.Verbose,
g.HTTPDebugging,
g.HTTPRecording)
Expand Down
2 changes: 1 addition & 1 deletion exchanges/orderbook/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestMain(m *testing.M) {
err := dispatch.Start(1)
err := dispatch.Start(1, dispatch.DefaultJobsLimit)
if err != nil {
log.Fatal(err)
}
Expand Down
Loading

0 comments on commit 242b02c

Please sign in to comment.