Skip to content

Commit

Permalink
websocket/gateio: Add request functions for websocket multi-connectio…
Browse files Browse the repository at this point in the history
…n [SPOT] (thrasher-corp#1598)

* gateio: Add multi asset websocket support WIP.

* meow

* Add tests and shenanigans

* integrate flushing and for enabling/disabling pairs from rpc shenanigans

* some changes

* linter: fixes strikes again.

* Change name ConnectionAssociation -> ConnectionCandidate for better clarity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails.

* Add subscription tests (state functional)

* glorious:nits + proxy handling

* Spelling

* linter: fixerino

* instead of nil, dont do nil.

* clean up nils

* cya nils

* don't need to set URL or check if its running

* stream match update

* update tests

* linter: fix

* glorious: nits + handle context cancellations

* stop ping handler routine leak

* * Fix bug where reader routine on error that is not a disconnection error but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled.
* Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle.
* Dial now uses code from DialContext but just calls context.Background()
* Don't allow reader to return on parse binary response error. Just output error and return a non nil response

* Allow rollback on connect on any error across all connections

* fix shadow jutsu

* glorious/gk: nitters - adds in ws mock server

* linter: fix

* fix deadlock on connection as the previous channel had no reader and would hang connection reader for eternity.

* glorious: whooops

* gk: nits

* Leak issue and edge case

* Websocket: Add SendMessageReturnResponses

* whooooooopsie

* gk: nitssssss

* Update exchanges/stream/stream_match.go

Co-authored-by: Gareth Kirwan <[email protected]>

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Gareth Kirwan <[email protected]>

* linter: appease the linter gods

* gk: nits

* gk: drain brain

* started

* more changes before merge match pr

* gateio: still building out

* gateio: finish spot

* fix up tests in gateio

* Add tests for stream package

* rm unused field

* glorious: nits

* rn files, specifically set function names to asset and offload routing to websocket type.

* linter: fix

* glorious: nits

* add counter and update gateio

* fix collision issue

* Update exchanges/stream/websocket.go

Co-authored-by: Scott <[email protected]>

* glorious: nits

* add tests

* linter: fix

* After merge

* Add error connection info

* upgrade to upstream merge

* Fix edge case where it does not reconnect made by an already closed connection

* stream coverage

* glorious: nits

* glorious: nits removed asset error handling in stream package

* linter: fix

* rm block

* Add basic readme

* fix asset enabled flush cycle for multi connection

* spella: fix

* linter: fix

* Add glorious suggestions, fix some race thing

* reinstate name before any routine gets spawned

* stop on error in mock tests

* glorious: nits

* glorious: nits found in CI build

* Add test for drain, bumped wait times as there seems to be something happening on macos CI builds, used context.WithTimeout because its instant.

* mutex across shutdown and connect for protection

* lint: fix

* test time withoffset, reinstate stop

* fix whoops

* const trafficCheckInterval; rm testmain

* y

* fix lint

* bump time check window

* stream: fix intermittant test failures while testing routines and remove code that is not needed.

* spells

* cant do what I did

* protect race due to routine.

* update testURL

* use mock websocket connection instead of test URL's

* linter: fix

* remove url because its throwing errors on CI builds

* connections drop all the time, don't need to worry about not being able to echo back ws data as it can be easily reviewed _test file side.

* remove another superfluous url thats not really set up for this

* spawn overwatch routine when there is no errors, inline checker instead of waiting for a time period, add sleep inline with echo handler as this is really quick and wanted to ensure that latency is handing correctly

* linter: fixerino uperino

* glorious: panix

* linter: things

* whoops

* dont need to make consecutive Unix() calls

* websocket: fix potential panic on error and no responses and adding waitForResponses

* rm json parser and handle in json package instead

* linter: fix

* linter: fix again

* * change field name OutboundRequestSignature to WrapperDefinedConnectionSignature for agnostic inbound and outbound connections.
* change method name GetOutboundConnection to GetConnection for agnostic inbound and outbound connections.
* drop outbound field map for improved performance just using a range and field check (less complex as well)
* change field name connections to connectionToWrapper for better clarity

* spells and magic and wands

* glorious: nits

* comparable check for signature

* mv err var

* glorious: nits and stuff

* attempt to fix race

* glorious: nits

* gk: nits; engine log cleanup

* gk: nits; OCD

* gk: nits; move function change file names

* gk: nits; 🚀

* gk: nits; convert variadic function and message inspection to interface and include a specific function for that handling so as to not need nil on every call

* gk: nits; continued

* gk: engine nits; rm loaded exchange

* gk: nits; drop WebsocketLoginResponse

* stream: Add match method EnsureMatchWithData

* gk: nits; rn Inspect to IsFinal

* gk: nits; rn to MessageFilter

* linter: fix

* gateio: update rate limit definitions (cherry-pick)

* Add test and missing

* Shared REST rate limit definitions with Websocket service, set lookup item to nil for systems that do not require rate limiting; add glorious nit

* integrate rate limits for websocket trading spot

* bitstamp: fix issue

* glorious: nits

* ch name and commentary

* fix bug add test

* rm a thing

* fix test

* Update engine/engine.go

Co-authored-by: Adrian Gallagher <[email protected]>

* thrasher: nits

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Adrian Gallagher <[email protected]>

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Adrian Gallagher <[email protected]>

* GK: nits rn websocket functions

* explicit function names for single to multi outbound orders

* linter: fix

---------

Co-authored-by: shazbert <[email protected]>
Co-authored-by: Gareth Kirwan <[email protected]>
Co-authored-by: Scott <[email protected]>
Co-authored-by: Adrian Gallagher <[email protected]>
  • Loading branch information
5 people authored Dec 20, 2024
1 parent 143e336 commit 50448ec
Show file tree
Hide file tree
Showing 23 changed files with 951 additions and 103 deletions.
46 changes: 13 additions & 33 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,17 +791,11 @@ func (bot *Engine) LoadExchange(name string) error {

localWG.Wait()
if !bot.Settings.EnableExchangeHTTPRateLimiter {
gctlog.Warnf(gctlog.ExchangeSys,
"Loaded exchange %s rate limiting has been turned off.\n",
exch.GetName(),
)
err = exch.DisableRateLimiter()
if err != nil {
gctlog.Errorf(gctlog.ExchangeSys,
"Loaded exchange %s rate limiting cannot be turned off: %s.\n",
exch.GetName(),
err,
)
gctlog.Errorf(gctlog.ExchangeSys, "%s error disabling rate limiter: %v", exch.GetName(), err)
} else {
gctlog.Warnf(gctlog.ExchangeSys, "%s rate limiting has been turned off", exch.GetName())
}
}

Expand All @@ -820,29 +814,18 @@ func (bot *Engine) LoadExchange(name string) error {
return err
}

base := exch.GetBase()
if base.API.AuthenticatedSupport ||
base.API.AuthenticatedWebsocketSupport {
assetTypes := base.GetAssetTypes(false)
var useAsset asset.Item
for a := range assetTypes {
err = base.CurrencyPairs.IsAssetEnabled(assetTypes[a])
if err != nil {
continue
}
useAsset = assetTypes[a]
break
}
err = exch.ValidateAPICredentials(context.TODO(), useAsset)
b := exch.GetBase()
if b.API.AuthenticatedSupport || b.API.AuthenticatedWebsocketSupport {
err = exch.ValidateAPICredentials(context.TODO(), asset.Spot)
if err != nil {
gctlog.Warnf(gctlog.ExchangeSys,
"%s: Cannot validate credentials, authenticated support has been disabled, Error: %s\n",
base.Name,
err)
base.API.AuthenticatedSupport = false
base.API.AuthenticatedWebsocketSupport = false
gctlog.Warnf(gctlog.ExchangeSys, "%s: Error validating credentials: %v", b.Name, err)
b.API.AuthenticatedSupport = false
b.API.AuthenticatedWebsocketSupport = false
exchCfg.API.AuthenticatedSupport = false
exchCfg.API.AuthenticatedWebsocketSupport = false
if b.Websocket != nil {
b.Websocket.SetCanUseAuthenticatedEndpoints(false)
}
}
}

Expand All @@ -854,10 +837,7 @@ func (bot *Engine) dryRunParamInteraction(param string) {
return
}

gctlog.Warnf(gctlog.Global,
"Command line argument '-%s' induces dry run mode."+
" Set -dryrun=false if you wish to override this.",
param)
gctlog.Warnf(gctlog.Global, "Command line argument '-%s' induces dry run mode. Set -dryrun=false if you wish to override this.", param)

if !bot.Settings.EnableDryRun {
bot.Settings.EnableDryRun = true
Expand Down
24 changes: 13 additions & 11 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,17 +456,20 @@ func (b *Bitfinex) handleWSEvent(respRaw []byte) error {
if err != nil {
return fmt.Errorf("%w 'chanId': %w from message: %s", errParsingWSField, err, respRaw)
}
if !b.Websocket.Match.IncomingWithData("unsubscribe:"+chanID, respRaw) {
return fmt.Errorf("%w: unsubscribe:%v", stream.ErrNoMessageListener, chanID)
err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw)
if err != nil {
return fmt.Errorf("%w: unsubscribe:%v", err, chanID)
}
case wsEventError:
if subID, err := jsonparser.GetUnsafeString(respRaw, "subId"); err == nil {
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%w: subscribe:%v", stream.ErrNoMessageListener, subID)
err = b.Websocket.Match.RequireMatchWithData("subscribe:"+subID, respRaw)
if err != nil {
return fmt.Errorf("%w: subscribe:%v", err, subID)
}
} else if chanID, err := jsonparser.GetUnsafeString(respRaw, "chanId"); err == nil {
if !b.Websocket.Match.IncomingWithData("unsubscribe:"+chanID, respRaw) {
return fmt.Errorf("%w: unsubscribe:%v", stream.ErrNoMessageListener, chanID)
err = b.Websocket.Match.RequireMatchWithData("unsubscribe:"+chanID, respRaw)
if err != nil {
return fmt.Errorf("%w: unsubscribe:%v", err, chanID)
}
} else {
return fmt.Errorf("unknown channel error; Message: %s", respRaw)
Expand Down Expand Up @@ -531,17 +534,16 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
c.Key = int(chanID)

// subscribeToChan removes the old subID keyed Subscription
if err := b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, c); err != nil {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, c)
if err != nil {
return fmt.Errorf("%w: %w subID: %s", stream.ErrSubscriptionFailure, err, subID)
}

if b.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s ChannelID: %d\n", b.Name, c.Channel, c.Pairs, chanID)
}
if !b.Websocket.Match.IncomingWithData("subscribe:"+subID, respRaw) {
return fmt.Errorf("%w: subscribe:%v", stream.ErrNoMessageListener, subID)
}
return nil

return b.Websocket.Match.RequireMatchWithData("subscribe:"+subID, respRaw)
}

func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
Expand Down
5 changes: 3 additions & 2 deletions exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ func (b *Bitmex) wsHandleData(respRaw []byte) error {
if e2 != nil {
return fmt.Errorf("%w parsing stream", e2)
}
if !b.Websocket.Match.IncomingWithData(op+":"+streamID, msg) {
return fmt.Errorf("%w: %s:%s", stream.ErrNoMessageListener, op, streamID)
err = b.Websocket.Match.RequireMatchWithData(op+":"+streamID, msg)
if err != nil {
return fmt.Errorf("%w: %s:%s", err, op, streamID)
}
return nil
}
Expand Down
5 changes: 1 addition & 4 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error {
return fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
}
event = strings.TrimSuffix(event, "scription_succeeded")
if !b.Websocket.Match.IncomingWithData(event+":"+channel, respRaw) {
return fmt.Errorf("%w: %s", stream.ErrNoMessageListener, event+":"+channel)
}
return nil
return b.Websocket.Match.RequireMatchWithData(event+":"+channel, respRaw)
}

func (b *Bitstamp) handleWSTrade(msg []byte) error {
Expand Down
8 changes: 2 additions & 6 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,7 @@ func (b *Base) SupportsAsset(a asset.Item) bool {
// PrintEnabledPairs prints the exchanges enabled asset pairs
func (b *Base) PrintEnabledPairs() {
for k, v := range b.CurrencyPairs.Pairs {
log.Infof(log.ExchangeSys, "%s Asset type %v:\n\t Enabled pairs: %v",
b.Name, strings.ToUpper(k.String()), v.Enabled)
log.Infof(log.ExchangeSys, "%s Asset type %v:\n\t Enabled pairs: %v", b.Name, strings.ToUpper(k.String()), v.Enabled)
}
}

Expand All @@ -987,10 +986,7 @@ func (b *Base) GetBase() *Base { return b }
// for validation of API credentials
func (b *Base) CheckTransientError(err error) error {
if _, ok := err.(net.Error); ok {
log.Warnf(log.ExchangeSys,
"%s net error captured, will not disable authentication %s",
b.Name,
err)
log.Warnf(log.ExchangeSys, "%s net error captured, will not disable authentication %s", b.Name, err)
return nil
}
return err
Expand Down
13 changes: 7 additions & 6 deletions exchanges/gateio/gateio_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,12 +2008,13 @@ type WsEventResponse struct {

// WsResponse represents generalized websocket push data from the server.
type WsResponse struct {
ID int64 `json:"id"`
Time types.Time `json:"time"`
TimeMs types.Time `json:"time_ms"`
Channel string `json:"channel"`
Event string `json:"event"`
Result json.RawMessage `json:"result"`
ID int64 `json:"id"`
Time types.Time `json:"time"`
TimeMs types.Time `json:"time_ms"`
Channel string `json:"channel"`
Event string `json:"event"`
Result json.RawMessage `json:"result"`
RequestID string `json:"request_id"`
}

// WsTicker websocket ticker information.
Expand Down
72 changes: 65 additions & 7 deletions exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,64 @@ func (g *Gateio) WsConnectSpot(ctx context.Context, conn stream.Connection) erro
return nil
}

// authenticateSpot sends an authentication message to the websocket connection
func (g *Gateio) authenticateSpot(ctx context.Context, conn stream.Connection) error {
return g.websocketLogin(ctx, conn, "spot.login")
}

// websocketLogin authenticates the websocket connection
func (g *Gateio) websocketLogin(ctx context.Context, conn stream.Connection, channel string) error {
if conn == nil {
return fmt.Errorf("%w: %T", common.ErrNilPointer, conn)
}

if channel == "" {
return errChannelEmpty
}

creds, err := g.GetCredentials(ctx)
if err != nil {
return err
}

tn := time.Now().Unix()
msg := "api\n" + channel + "\n" + "\n" + strconv.FormatInt(tn, 10)
mac := hmac.New(sha512.New, []byte(creds.Secret))
if _, err = mac.Write([]byte(msg)); err != nil {
return err
}
signature := hex.EncodeToString(mac.Sum(nil))

payload := WebsocketPayload{
RequestID: strconv.FormatInt(conn.GenerateMessageID(false), 10),
APIKey: creds.Key,
Signature: signature,
Timestamp: strconv.FormatInt(tn, 10),
}

req := WebsocketRequest{Time: tn, Channel: channel, Event: "api", Payload: payload}

resp, err := conn.SendMessageReturnResponse(ctx, websocketRateLimitNotNeededEPL, req.Payload.RequestID, req)
if err != nil {
return err
}

var inbound WebsocketAPIResponse
if err := json.Unmarshal(resp, &inbound); err != nil {
return err
}

if inbound.Header.Status != "200" {
var wsErr WebsocketErrors
if err := json.Unmarshal(inbound.Data, &wsErr.Errors); err != nil {
return err
}
return fmt.Errorf("%s: %s", wsErr.Errors.Label, wsErr.Errors.Message)
}

return nil
}

func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (string, error) {
msg := "channel=" + channel + "&event=" + event + "&time=" + strconv.FormatInt(t, 10)
mac := hmac.New(sha512.New, []byte(secret))
Expand All @@ -109,21 +167,21 @@ func (g *Gateio) generateWsSignature(secret, event, channel string, t int64) (st
// WsHandleSpotData handles spot data
func (g *Gateio) WsHandleSpotData(_ context.Context, respRaw []byte) error {
var push WsResponse
err := json.Unmarshal(respRaw, &push)
if err != nil {
if err := json.Unmarshal(respRaw, &push); err != nil {
return err
}

if push.RequestID != "" {
return g.Websocket.Match.RequireMatchWithData(push.RequestID, respRaw)
}

if push.Event == subscribeEvent || push.Event == unsubscribeEvent {
if !g.Websocket.Match.IncomingWithData(push.ID, respRaw) {
return fmt.Errorf("couldn't match subscription message with ID: %d", push.ID)
}
return nil
return g.Websocket.Match.RequireMatchWithData(push.ID, respRaw)
}

switch push.Channel { // TODO: Convert function params below to only use push.Result
case spotTickerChannel:
return g.processTicker(push.Result, push.Time.Time())
return g.processTicker(push.Result, push.TimeMs.Time())
case spotTradesChannel:
return g.processTrades(push.Result)
case spotCandlesticksChannel:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
)
Expand Down Expand Up @@ -55,7 +54,7 @@ func (g *Gateio) WsDeliveryFuturesConnect(ctx context.Context, conn stream.Conne
if err != nil {
return err
}
conn.SetupPingHandler(request.Unset, stream.PingHandler{
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, stream.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: websocket.PingMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
Expand Down Expand Up @@ -76,7 +75,7 @@ func (g *Gateio) WsFuturesConnect(ctx context.Context, conn stream.Connection) e
if err != nil {
return err
}
conn.SetupPingHandler(request.Unset, stream.PingHandler{
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, stream.PingHandler{
Websocket: true,
MessageType: websocket.PingMessage,
Delay: time.Second * 15,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/kline"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/request"
"github.com/thrasher-corp/gocryptotrader/exchanges/stream"
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
"github.com/thrasher-corp/gocryptotrader/exchanges/ticker"
Expand Down Expand Up @@ -85,7 +84,7 @@ func (g *Gateio) WsOptionsConnect(ctx context.Context, conn stream.Connection) e
if err != nil {
return err
}
conn.SetupPingHandler(request.Unset, stream.PingHandler{
conn.SetupPingHandler(websocketRateLimitNotNeededEPL, stream.PingHandler{
Websocket: true,
Delay: time.Second * 5,
MessageType: websocket.PingMessage,
Expand Down
Loading

0 comments on commit 50448ec

Please sign in to comment.