Skip to content

Commit

Permalink
rate limit: make context aware (thrasher-corp#731)
Browse files Browse the repository at this point in the history
* rate limits: Make context aware

* binance: rate limit allow for cancellation of reservation when deadline is exceeded

* request: add context.done() before initiating any bulk work.

* binance: update error return for rate limiting

* request: updated dealine check to remove after time.Now procedure as this will obfuscate a deadline which will be limited by the context check on every attempt, so no need to sleep with delay.
  • Loading branch information
shazbert authored Aug 10, 2021
1 parent 4602ade commit 232d6eb
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 198 deletions.
20 changes: 18 additions & 2 deletions exchanges/binance/ratelimit.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package binance

import (
"context"
"fmt"
"time"

"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand Down Expand Up @@ -103,7 +105,7 @@ type RateLimit struct {
}

// Limit executes rate limiting functionality for Binance
func (r *RateLimit) Limit(f request.EndpointLimit) error {
func (r *RateLimit) Limit(ctx context.Context, f request.EndpointLimit) error {
var limiter *rate.Limiter
var tokens int
switch f {
Expand Down Expand Up @@ -214,11 +216,25 @@ func (r *RateLimit) Limit(f request.EndpointLimit) error {
}

var finalDelay time.Duration
var reserves = make([]*rate.Reservation, tokens)
for i := 0; i < tokens; i++ {
// Consume tokens 1 at a time as this avoids needing burst capacity in the limiter,
// which would otherwise allow the rate limit to be exceeded over short periods
finalDelay = limiter.Reserve().Delay()
reserves[i] = limiter.Reserve()
finalDelay = reserves[i].Delay()
}

if dl, ok := ctx.Deadline(); ok && dl.Before(time.Now().Add(finalDelay)) {
// Cancel all potential reservations to free up rate limiter if deadline
// is exceeded.
for x := range reserves {
reserves[x].Cancel()
}
return fmt.Errorf("rate limit delay of %s will exceed deadline: %w",
finalDelay,
context.DeadlineExceeded)
}

time.Sleep(finalDelay)
return nil
}
Expand Down
16 changes: 14 additions & 2 deletions exchanges/binance/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package binance

import (
"context"
"errors"
"testing"
"time"

"github.com/thrasher-corp/gocryptotrader/exchanges/request"
)
Expand All @@ -12,6 +15,7 @@ func TestRateLimit_Limit(t *testing.T) {
testTable := map[string]struct {
Expected request.EndpointLimit
Limit request.EndpointLimit
Deadline time.Time
}{
"All Orderbooks Ticker": {Expected: spotOrderbookTickerAllRate, Limit: bestPriceLimit("")},
"Orderbook Ticker": {Expected: spotDefaultRate, Limit: bestPriceLimit(symbol)},
Expand All @@ -24,6 +28,7 @@ func TestRateLimit_Limit(t *testing.T) {
"Orderbook Depth 500": {Expected: spotOrderbookDepth500Rate, Limit: orderbookLimit(500)},
"Orderbook Depth 1000": {Expected: spotOrderbookDepth1000Rate, Limit: orderbookLimit(1000)},
"Orderbook Depth 5000": {Expected: spotOrderbookDepth5000Rate, Limit: orderbookLimit(5000)},
"Exceeds deadline": {Expected: spotOrderbookDepth5000Rate, Limit: orderbookLimit(5000), Deadline: time.Now().Add(time.Nanosecond)},
}
for name, tt := range testTable {
tt := tt
Expand All @@ -35,8 +40,15 @@ func TestRateLimit_Limit(t *testing.T) {
t.Fatalf("incorrect limit applied.\nexp: %v\ngot: %v", exp, got)
}

ctx := context.Background()
if !tt.Deadline.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, tt.Deadline)
defer cancel()
}

l := SetRateLimit()
if err := l.Limit(tt.Limit); err != nil {
if err := l.Limit(ctx, tt.Limit); err != nil && !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("error applying rate limit: %v", err)
}
})
Expand All @@ -56,7 +68,7 @@ func TestRateLimit_LimitStatic(t *testing.T) {
t.Parallel()

l := SetRateLimit()
if err := l.Limit(tt); err != nil {
if err := l.Limit(context.Background(), tt); err != nil {
t.Fatalf("error applying rate limit: %v", err)
}
})
Expand Down
146 changes: 73 additions & 73 deletions exchanges/bitfinex/ratelimit.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bitfinex

import (
"context"
"errors"
"time"

Expand Down Expand Up @@ -255,156 +256,155 @@ type RateLimit struct {
}

// Limit limits outbound requests
func (r *RateLimit) Limit(f request.EndpointLimit) error {
func (r *RateLimit) Limit(ctx context.Context, f request.EndpointLimit) error {
switch f {
case platformStatus:
time.Sleep(r.PlatformStatus.Reserve().Delay())
return r.PlatformStatus.Wait(ctx)
case tickerBatch:
time.Sleep(r.TickerBatch.Reserve().Delay())
return r.TickerBatch.Wait(ctx)
case tickerFunction:
time.Sleep(r.Ticker.Reserve().Delay())
return r.Ticker.Wait(ctx)
case tradeRateLimit:
time.Sleep(r.Trade.Reserve().Delay())
return r.Trade.Wait(ctx)
case orderbookFunction:
time.Sleep(r.Orderbook.Reserve().Delay())
return r.Orderbook.Wait(ctx)
case stats:
time.Sleep(r.Stats.Reserve().Delay())
return r.Stats.Wait(ctx)
case candle:
time.Sleep(r.Candle.Reserve().Delay())
return r.Candle.Wait(ctx)
case configs:
time.Sleep(r.Configs.Reserve().Delay())
return r.Configs.Wait(ctx)
case status:
time.Sleep(r.Stats.Reserve().Delay())
return r.Stats.Wait(ctx)
case liquid:
time.Sleep(r.Liquid.Reserve().Delay())
return r.Liquid.Wait(ctx)
case leaderBoard:
time.Sleep(r.LeaderBoard.Reserve().Delay())
return r.LeaderBoard.Wait(ctx)
case marketAveragePrice:
time.Sleep(r.MarketAveragePrice.Reserve().Delay())
return r.MarketAveragePrice.Wait(ctx)
case fx:
time.Sleep(r.Fx.Reserve().Delay())
return r.Fx.Wait(ctx)
case accountWalletBalance:
time.Sleep(r.AccountWalletBalance.Reserve().Delay())
return r.AccountWalletBalance.Wait(ctx)
case accountWalletHistory:
time.Sleep(r.AccountWalletHistory.Reserve().Delay())
return r.AccountWalletHistory.Wait(ctx)
case retrieveOrder:
time.Sleep(r.RetrieveOrder.Reserve().Delay())
return r.RetrieveOrder.Wait(ctx)
case submitOrder:
time.Sleep(r.SubmitOrder.Reserve().Delay())
return r.SubmitOrder.Wait(ctx)
case updateOrder:
time.Sleep(r.UpdateOrder.Reserve().Delay())
return r.UpdateOrder.Wait(ctx)
case cancelOrder:
time.Sleep(r.CancelOrder.Reserve().Delay())
return r.CancelOrder.Wait(ctx)
case orderBatch:
time.Sleep(r.OrderBatch.Reserve().Delay())
return r.OrderBatch.Wait(ctx)
case cancelBatch:
time.Sleep(r.CancelBatch.Reserve().Delay())
return r.CancelBatch.Wait(ctx)
case orderHistory:
time.Sleep(r.OrderHistory.Reserve().Delay())
return r.OrderHistory.Wait(ctx)
case getOrderTrades:
time.Sleep(r.GetOrderTrades.Reserve().Delay())
return r.GetOrderTrades.Wait(ctx)
case getTrades:
time.Sleep(r.GetTrades.Reserve().Delay())
return r.GetTrades.Wait(ctx)
case getLedgers:
time.Sleep(r.GetLedgers.Reserve().Delay())
return r.GetLedgers.Wait(ctx)
case getAccountMarginInfo:
time.Sleep(r.GetAccountMarginInfo.Reserve().Delay())
return r.GetAccountMarginInfo.Wait(ctx)
case getActivePositions:
time.Sleep(r.GetActivePositions.Reserve().Delay())
return r.GetActivePositions.Wait(ctx)
case claimPosition:
time.Sleep(r.ClaimPosition.Reserve().Delay())
return r.ClaimPosition.Wait(ctx)
case getPositionHistory:
time.Sleep(r.GetPositionHistory.Reserve().Delay())
return r.GetPositionHistory.Wait(ctx)
case getPositionAudit:
time.Sleep(r.GetPositionAudit.Reserve().Delay())
return r.GetPositionAudit.Wait(ctx)
case updateCollateralOnPosition:
time.Sleep(r.UpdateCollateralOnPosition.Reserve().Delay())
return r.UpdateCollateralOnPosition.Wait(ctx)
case getActiveFundingOffers:
time.Sleep(r.GetActiveFundingOffers.Reserve().Delay())
return r.GetActiveFundingOffers.Wait(ctx)
case submitFundingOffer:
time.Sleep(r.SubmitFundingOffer.Reserve().Delay())
return r.SubmitFundingOffer.Wait(ctx)
case cancelFundingOffer:
time.Sleep(r.CancelFundingOffer.Reserve().Delay())
return r.CancelFundingOffer.Wait(ctx)
case cancelAllFundingOffer:
time.Sleep(r.CancelAllFundingOffer.Reserve().Delay())
return r.CancelAllFundingOffer.Wait(ctx)
case closeFunding:
time.Sleep(r.CloseFunding.Reserve().Delay())
return r.CloseFunding.Wait(ctx)
case fundingAutoRenew:
time.Sleep(r.FundingAutoRenew.Reserve().Delay())
return r.FundingAutoRenew.Wait(ctx)
case keepFunding:
time.Sleep(r.KeepFunding.Reserve().Delay())
return r.KeepFunding.Wait(ctx)
case getOffersHistory:
time.Sleep(r.GetOffersHistory.Reserve().Delay())
return r.GetOffersHistory.Wait(ctx)
case getFundingLoans:
time.Sleep(r.GetFundingLoans.Reserve().Delay())
return r.GetFundingLoans.Wait(ctx)
case getFundingLoanHistory:
time.Sleep(r.GetFundingLoanHistory.Reserve().Delay())
return r.GetFundingLoanHistory.Wait(ctx)
case getFundingCredits:
time.Sleep(r.GetFundingCredits.Reserve().Delay())
return r.GetFundingCredits.Wait(ctx)
case getFundingCreditsHistory:
time.Sleep(r.GetFundingCreditsHistory.Reserve().Delay())
return r.GetFundingCreditsHistory.Wait(ctx)
case getFundingTrades:
time.Sleep(r.GetFundingTrades.Reserve().Delay())
return r.GetFundingTrades.Wait(ctx)
case getFundingInfo:
time.Sleep(r.GetFundingInfo.Reserve().Delay())
return r.GetFundingInfo.Wait(ctx)
case getUserInfo:
time.Sleep(r.GetUserInfo.Reserve().Delay())
return r.GetUserInfo.Wait(ctx)
case transferBetweenWallets:
time.Sleep(r.TransferBetweenWallets.Reserve().Delay())
return r.TransferBetweenWallets.Wait(ctx)
case getDepositAddress:
time.Sleep(r.GetDepositAddress.Reserve().Delay())
return r.GetDepositAddress.Wait(ctx)
case withdrawal:
time.Sleep(r.Withdrawal.Reserve().Delay())
return r.Withdrawal.Wait(ctx)
case getMovements:
time.Sleep(r.GetMovements.Reserve().Delay())
return r.GetMovements.Wait(ctx)
case getAlertList:
time.Sleep(r.GetAlertList.Reserve().Delay())
return r.GetAlertList.Wait(ctx)
case setPriceAlert:
time.Sleep(r.SetPriceAlert.Reserve().Delay())
return r.SetPriceAlert.Wait(ctx)
case deletePriceAlert:
time.Sleep(r.DeletePriceAlert.Reserve().Delay())
return r.DeletePriceAlert.Wait(ctx)
case getBalanceForOrdersOffers:
time.Sleep(r.GetBalanceForOrdersOffers.Reserve().Delay())
return r.GetBalanceForOrdersOffers.Wait(ctx)
case userSettingsWrite:
time.Sleep(r.UserSettingsWrite.Reserve().Delay())
return r.UserSettingsWrite.Wait(ctx)
case userSettingsRead:
time.Sleep(r.UserSettingsRead.Reserve().Delay())
return r.UserSettingsRead.Wait(ctx)
case userSettingsDelete:
time.Sleep(r.UserSettingsDelete.Reserve().Delay())
return r.UserSettingsDelete.Wait(ctx)

// Bitfinex V1 API
case getAccountFees:
time.Sleep(r.GetAccountFees.Reserve().Delay())
return r.GetAccountFees.Wait(ctx)
case getWithdrawalFees:
time.Sleep(r.GetWithdrawalFees.Reserve().Delay())
return r.GetWithdrawalFees.Wait(ctx)
case getAccountSummary:
time.Sleep(r.GetAccountSummary.Reserve().Delay())
return r.GetAccountSummary.Wait(ctx)
case newDepositAddress:
time.Sleep(r.NewDepositAddress.Reserve().Delay())
return r.NewDepositAddress.Wait(ctx)
case getKeyPermissions:
time.Sleep(r.GetKeyPermissions.Reserve().Delay())
return r.GetKeyPermissions.Wait(ctx)
case getMarginInfo:
time.Sleep(r.GetMarginInfo.Reserve().Delay())
return r.GetMarginInfo.Wait(ctx)
case getAccountBalance:
time.Sleep(r.GetAccountBalance.Reserve().Delay())
return r.GetAccountBalance.Wait(ctx)
case walletTransfer:
time.Sleep(r.WalletTransfer.Reserve().Delay())
return r.WalletTransfer.Wait(ctx)
case withdrawV1:
time.Sleep(r.WithdrawV1.Reserve().Delay())
return r.WithdrawV1.Wait(ctx)
case orderV1:
time.Sleep(r.OrderV1.Reserve().Delay())
return r.OrderV1.Wait(ctx)
case orderMulti:
time.Sleep(r.OrderMulti.Reserve().Delay())
return r.OrderMulti.Wait(ctx)
case statsV1:
time.Sleep(r.Stats.Reserve().Delay())
return r.Stats.Wait(ctx)
case fundingbook:
time.Sleep(r.Fundingbook.Reserve().Delay())
return r.Fundingbook.Wait(ctx)
case lends:
time.Sleep(r.Lends.Reserve().Delay())
return r.Lends.Wait(ctx)
default:
return errors.New("endpoint rate limit functionality not found")
}
return nil
}

// SetRateLimit returns the rate limit for the exchange
Expand Down
30 changes: 18 additions & 12 deletions exchanges/bitflyer/ratelimit.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bitflyer

import (
"context"
"time"

"github.com/thrasher-corp/gocryptotrader/exchanges/request"
Expand Down Expand Up @@ -29,24 +30,29 @@ type RateLimit struct {
}

// Limit limits outbound requests
func (r *RateLimit) Limit(f request.EndpointLimit) error {
func (r *RateLimit) Limit(ctx context.Context, f request.EndpointLimit) error {
switch f {
case request.Auth:
time.Sleep(r.Auth.Reserve().Delay())
return r.Auth.Wait(ctx)
case orders:
res := r.Auth.Reserve()
time.Sleep(r.Order.Reserve().Delay())
time.Sleep(res.Delay())
err := r.Auth.Wait(ctx)
if err != nil {
return err
}
return r.Order.Wait(ctx)
case lowVolume:
authShell := r.Auth.Reserve()
orderShell := r.Order.Reserve()
time.Sleep(r.LowVolume.Reserve().Delay())
time.Sleep(orderShell.Delay())
time.Sleep(authShell.Delay())
err := r.LowVolume.Wait(ctx)
if err != nil {
return err
}
err = r.Order.Wait(ctx)
if err != nil {
return err
}
return r.Auth.Wait(ctx)
default:
time.Sleep(r.UnAuth.Reserve().Delay())
return r.UnAuth.Wait(ctx)
}
return nil
}

// SetRateLimit returns the rate limit for the exchange
Expand Down
Loading

0 comments on commit 232d6eb

Please sign in to comment.