Skip to content

Commit

Permalink
request/nonce: Refactor to simplify package and prevent consecutive m…
Browse files Browse the repository at this point in the history
…utex lock calls when accessing/setting nonce values (thrasher-corp#1506)

* improv. timed mutex

* Add all protection back in and jankyness because races. :'(

* Add intial benchmarkeroos

* Add master benchmarks

* goodness me

* what?

* what again?

* glorious: nits

* just a swaperino instead

* clean up package nonce so that we only need to aquire mutex once

* unlock before checking master

* commentary

* wha

* more comment

* ch comment

* nonce: Allow for broad customisation externally with a ~2ns overhead

* glorious: nits maybe works?

---------

Co-authored-by: Ryan O'Hara-Reid <[email protected]>
  • Loading branch information
shazbert and Ryan O'Hara-Reid authored Apr 12, 2024
1 parent 9657a57 commit e823f9e
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 155 deletions.
3 changes: 2 additions & 1 deletion exchanges/alphapoint/alphapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)
Expand Down Expand Up @@ -581,7 +582,7 @@ func (a *Alphapoint) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchan
return err
}

n := a.Requester.GetNonce(true)
n := a.Requester.GetNonce(nonce.UnixNano)

headers := make(map[string]string)
headers["Content-Type"] = "application/json"
Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
Expand Down Expand Up @@ -2092,10 +2093,9 @@ func (b *Bitfinex) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchange

fullPath := ePoint + bitfinexAPIVersion + path
return b.SendPayload(ctx, endpoint, func() (*request.Item, error) {
n := b.Requester.GetNonce(true)
req := make(map[string]interface{})
req["request"] = bitfinexAPIVersion + path
req["nonce"] = n.String()
req["nonce"] = b.Requester.GetNonce(nonce.UnixNano).String()

for key, value := range params {
req[key] = value
Expand Down
3 changes: 2 additions & 1 deletion exchanges/bitstamp/bitstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand Down Expand Up @@ -591,7 +592,7 @@ func (b *Bitstamp) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchange

interim := json.RawMessage{}
err = b.SendPayload(ctx, request.Unset, func() (*request.Item, error) {
n := b.Requester.GetNonce(true).String()
n := b.Requester.GetNonce(nonce.UnixNano).String()

values.Set("key", creds.Key)
values.Set("nonce", n)
Expand Down
3 changes: 2 additions & 1 deletion exchanges/exmo/exmo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)

Expand Down Expand Up @@ -317,7 +318,7 @@ func (e *EXMO) SendAuthenticatedHTTPRequest(ctx context.Context, epath exchange.
path := urlPath + fmt.Sprintf("/v%s/%s", exmoAPIVersion, endpoint)

return e.SendPayload(ctx, request.Unset, func() (*request.Item, error) {
n := e.Requester.GetNonce(true).String()
n := e.Requester.GetNonce(nonce.UnixNano).String()
vals.Set("nonce", n)

payload := vals.Encode()
Expand Down
3 changes: 2 additions & 1 deletion exchanges/gemini/gemini.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)

Expand Down Expand Up @@ -421,7 +422,7 @@ func (g *Gemini) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchange.U
return g.SendPayload(ctx, request.Auth, func() (*request.Item, error) {
req := make(map[string]interface{})
req["request"] = fmt.Sprintf("/v%s/%s", geminiAPIVersion, path)
req["nonce"] = g.Requester.GetNonce(true).String()
req["nonce"] = g.Requester.GetNonce(nonce.UnixNano).String()

for key, value := range params {
req[key] = value
Expand Down
3 changes: 2 additions & 1 deletion exchanges/kraken/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/log"
Expand Down Expand Up @@ -1007,7 +1008,7 @@ func (k *Kraken) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchange.U

interim := json.RawMessage{}
err = k.SendPayload(ctx, request.Unset, func() (*request.Item, error) {
nonce := k.Requester.GetNonce(true).String()
nonce := k.Requester.GetNonce(nonce.UnixNano).String()
params.Set("nonce", nonce)
encoded := params.Encode()
var shasum []byte
Expand Down
40 changes: 18 additions & 22 deletions exchanges/nonce/nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,35 @@ package nonce
import (
"strconv"
"sync"
"time"
)

// UnixNano and Unix are default nonce setters
var (
UnixNano Setter = func() int64 { return time.Now().UnixNano() }
Unix Setter = func() int64 { return time.Now().Unix() }
)

// Setter is a function that returns a nonce start value.
type Setter func() int64

// Nonce struct holds the nonce value
type Nonce struct {
n int64
m sync.Mutex
}

// Get retrieves the nonce value
func (n *Nonce) Get() Value {
n.m.Lock()
defer n.m.Unlock()
return Value(n.n)
}

// GetInc increments and returns the value of the nonce
func (n *Nonce) GetInc() Value {
// GetAndIncrement returns the current nonce value and increments it. If value
// is 0, it will set the value to the current time.
func (n *Nonce) GetAndIncrement(set Setter) Value {
n.m.Lock()
defer n.m.Unlock()
if n.n == 0 {
n.n = set()
}
val := n.n
n.n++
return Value(n.n)
}

// Set sets the nonce value
func (n *Nonce) Set(val int64) {
n.m.Lock()
n.n = val
n.m.Unlock()
}

// String returns a string version of the nonce
func (n *Nonce) String() string {
return n.Get().String()
return Value(val)
}

// Value is a return type for GetValue
Expand Down
72 changes: 23 additions & 49 deletions exchanges/nonce/nonce_test.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,36 @@
package nonce

import (
"sync"
"testing"
)

func TestGet(t *testing.T) {
var nonce Nonce
nonce.Set(112321313)
if expected, result := Value(112321313), nonce.Get(); expected != result {
t.Errorf("Expected %d got %d", expected, result)
}
}

func TestGetInc(t *testing.T) {
var nonce Nonce
nonce.Set(1)
if expected, result := Value(2), nonce.GetInc(); expected != result {
t.Errorf("Expected %d got %d", expected, result)
}
}
"github.com/stretchr/testify/assert"
)

func TestSet(t *testing.T) {
func TestGetAndIncrement(t *testing.T) {
var nonce Nonce
nonce.Set(1)
if result, expected := nonce.Get(), Value(1); expected != result {
t.Errorf("Expected %d got %d", expected, result)
}
n1 := nonce.GetAndIncrement(Unix)
assert.NotZero(t, n1)
n2 := nonce.GetAndIncrement(Unix)
assert.NotZero(t, n2)
assert.NotEqual(t, n1, n2)

var nonce2 Nonce
n3 := nonce2.GetAndIncrement(UnixNano)
assert.NotZero(t, n3)
n4 := nonce2.GetAndIncrement(UnixNano)
assert.NotZero(t, n4)
assert.NotEqual(t, n3, n4)

assert.NotEqual(t, n1, n3)
assert.NotEqual(t, n2, n4)
}

func TestString(t *testing.T) {
var nonce Nonce
nonce.Set(12312313131)
expected := "12312313131"
result := nonce.String()
if expected != result {
t.Errorf("Expected %s got %s", expected, result)
}

v := nonce.Get()
if expected != v.String() {
t.Errorf("Expected %s got %s", expected, result)
}
}

func TestNonceConcurrency(t *testing.T) {
var nonce Nonce
nonce.Set(12312)

var wg sync.WaitGroup
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() { nonce.GetInc(); wg.Done() }()
}

wg.Wait()
nonce.n = 12312313131
got := nonce.GetAndIncrement(Unix)
assert.Equal(t, "12312313131", got.String())

if expected, result := Value(12312+1000), nonce.Get(); expected != result {
t.Errorf("Expected %d got %d", expected, result)
}
got = nonce.GetAndIncrement(Unix)
assert.Equal(t, "12312313132", got.String())
}
3 changes: 2 additions & 1 deletion exchanges/poloniex/poloniex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
"github.com/thrasher-corp/gocryptotrader/exchanges/nonce"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)
Expand Down Expand Up @@ -951,7 +952,7 @@ func (p *Poloniex) SendAuthenticatedHTTPRequest(ctx context.Context, ep exchange
headers := make(map[string]string)
headers["Content-Type"] = "application/x-www-form-urlencoded"
headers["Key"] = creds.Key
values.Set("nonce", p.Requester.GetNonce(true).String())
values.Set("nonce", p.Requester.GetNonce(nonce.UnixNano).String())
values.Set("command", endpoint)

hmac, err := crypto.GetHMAC(crypto.HashSHA512,
Expand Down
54 changes: 8 additions & 46 deletions exchanges/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,7 @@ func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRe
}

if verbose {
log.Errorf(log.RequestSys,
"%s request has failed. Retrying request in %s, attempt %d",
r.name,
delay,
attempt)
log.Errorf(log.RequestSys, "%s request has failed. Retrying request in %s, attempt %d", r.name, delay, attempt)
}

time.Sleep(delay)
Expand Down Expand Up @@ -281,21 +277,12 @@ func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRe

err = resp.Body.Close()
if err != nil {
log.Errorf(log.RequestSys,
"%s failed to close request body %s",
r.name,
err)
log.Errorf(log.RequestSys, "%s failed to close request body %s", r.name, err)
}
if verbose {
log.Debugf(log.RequestSys,
"HTTP status: %s, Code: %v",
resp.Status,
resp.StatusCode)
log.Debugf(log.RequestSys, "HTTP status: %s, Code: %v", resp.Status, resp.StatusCode)
if !p.HTTPDebugging {
log.Debugf(log.RequestSys,
"%s raw response: %s",
r.name,
string(contents))
log.Debugf(log.RequestSys, "%s raw response: %s", r.name, string(contents))
}
}
return unmarshallError
Expand All @@ -304,44 +291,19 @@ func (r *Requester) doRequest(ctx context.Context, endpoint EndpointLimit, newRe

func (r *Requester) drainBody(body io.ReadCloser) {
if _, err := io.Copy(io.Discard, io.LimitReader(body, drainBodyLimit)); err != nil {
log.Errorf(log.RequestSys,
"%s failed to drain request body %s",
r.name,
err)
log.Errorf(log.RequestSys, "%s failed to drain request body %s", r.name, err)
}

if err := body.Close(); err != nil {
log.Errorf(log.RequestSys,
"%s failed to close request body %s",
r.name,
err)
log.Errorf(log.RequestSys, "%s failed to close request body %s", r.name, err)
}
}

// GetNonce returns a nonce for requests. This locks and enforces concurrent
// nonce FIFO on the buffered job channel
func (r *Requester) GetNonce(isNano bool) nonce.Value {
func (r *Requester) GetNonce(set nonce.Setter) nonce.Value {
r.timedLock.LockForDuration()
if r.Nonce.Get() == 0 {
if isNano {
r.Nonce.Set(time.Now().UnixNano())
} else {
r.Nonce.Set(time.Now().Unix())
}
return r.Nonce.Get()
}
return r.Nonce.GetInc()
}

// GetNonceMilli returns a nonce for requests. This locks and enforces concurrent
// nonce FIFO on the buffered job channel this is for millisecond
func (r *Requester) GetNonceMilli() nonce.Value {
r.timedLock.LockForDuration()
if r.Nonce.Get() == 0 {
r.Nonce.Set(time.Now().UnixMilli())
return r.Nonce.Get()
}
return r.Nonce.GetInc()
return r.Nonce.GetAndIncrement(set)
}

// SetProxy sets a proxy address for the client transport
Expand Down
Loading

0 comments on commit e823f9e

Please sign in to comment.