Skip to content

Commit

Permalink
Mirror Strategy: track orders triggered on backingExchange by trades …
Browse files Browse the repository at this point in the history
…on primaryExchange (closes stellar-deprecated#503) (stellar-deprecated#504)

* 1 - create strategy_mirror_trade_triggers table

* 2 - thread db instance through to mirror strategy

* 3 - thread marketID and backingMarketID through to mirrorStrategy

* 4 - insert trade trigger when offsetting trades in HandleFill

* 5 - update TestTradeUpgradeScripts to validate upgrade script changes from (1)

* 6 - register backingMarketID in the db

* 7 - construct fill tracker for backing exchange and trigger in HandleFills to confirm taker orders consumed

* 8 - trigger fill tracking on backing exchange at creation time

* 9 - fixed bug, shadowing of backingFillTracker, causing a nil dereference everywhere!

* 10 - update strategy_mirror_trade_triggers schema and PKEY to replace backing_txid with backing_order_id, and remove backing_market_id and backing_order_id from PKEY so we don't offset trades more than once

* 11 - add order_id to trades table along with logic to update it

* 12 - extract orderID from FetchMyTrades call of ccxtExchange for binance

* 13 - query StrategyMirrorTradeTriggerExists to ensure we don't re-offset trades in error scenarios

* 14 - fix bug in StrategyMirrorTradeTriggerExists, row.Scan needs inputs even if unused
  • Loading branch information
nikhilsaraf authored Sep 9, 2020
1 parent 1f8b66b commit b4c802d
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 94 deletions.
6 changes: 3 additions & 3 deletions cmd/exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ func init() {
checkInitRootFlags()
// call sdk.GetExchangeList() here so we pre-load exchanges before displaying the table
sdk.GetExchangeList()
fmt.Printf(" Exchange\t\t\tTested\t\tTrading\t\tAtomic Post-Only\t\tDescription\n")
fmt.Printf(" --------------------------------------------------------------------------------\n")
fmt.Printf(" Exchange\t\t\tTested\t\tTrading\t\tAtomic Post-Only\tTrade Has OrderID\t\tDescription\n")
fmt.Printf(" -----------------------------------------------------------------------------------------------------------------------------\n")
exchanges := plugins.Exchanges()
for _, name := range sortedExchangeKeys(exchanges) {
fmt.Printf(" %-24s\t%v\t\t%v\t\t%v\t\t%s\n", name, exchanges[name].Tested, exchanges[name].TradeEnabled, exchanges[name].AtomicPostOnly, exchanges[name].Description)
fmt.Printf(" %-24s\t%v\t\t%v\t\t%v\t\t\t%v\t\t%s\n", name, exchanges[name].Tested, exchanges[name].TradeEnabled, exchanges[name].AtomicPostOnly, exchanges[name].TradeHasOrderId, exchanges[name].Description)
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ var upgradeScripts = []*database.UpgradeScript{
kelpdb.SqlTradesTableAlter1,
kelpdb.SqlTradesIndexCreate3,
),
database.MakeUpgradeScript(6,
kelpdb.SqlStrategyMirrorTradeTriggersTableCreate,
kelpdb.SqlTradesTableAlter2,
),
}

const tradeExamples = ` kelp trade --botConf ./path/trader.cfg --strategy buysell --stratConf ./path/buysell.cfg
Expand Down Expand Up @@ -320,11 +324,13 @@ func makeStrategy(
exchangeShim api.ExchangeShim,
assetBase hProtocol.Asset,
assetQuote hProtocol.Asset,
marketID string,
ieif *plugins.IEIF,
tradingPair *model.TradingPair,
filterFactory *plugins.FilterFactory,
options inputs,
threadTracker *multithreading.ThreadTracker,
db *sql.DB,
) api.Strategy {
// setting the temp hack variables for the sdex price feeds
e := plugins.SetPrivateSdexHack(client, plugins.MakeIEIF(true), network)
Expand All @@ -343,11 +349,13 @@ func makeStrategy(
tradingPair,
&assetBase,
&assetQuote,
marketID,
*options.strategy,
*options.stratConfigPath,
*options.simMode,
botConfig.IsTradingSdex(),
filterFactory,
db,
)
if e != nil {
l.Info("")
Expand Down Expand Up @@ -583,6 +591,15 @@ func runTradeCmd(options inputs) {
QuoteAsset: assetQuote,
DB: db,
}
baseString, e := assetDisplayFn(tradingPair.Base)
if e != nil {
logger.Fatal(l, fmt.Errorf("could not convert base trading pair to string: %s", e))
}
quoteString, e := assetDisplayFn(tradingPair.Quote)
if e != nil {
logger.Fatal(l, fmt.Errorf("could not convert quote trading pair to string: %s", e))
}
marketID := plugins.MakeMarketID(botConfig.TradingExchangeName(), baseString, quoteString)
strategy := makeStrategy(
l,
network,
Expand All @@ -592,11 +609,13 @@ func runTradeCmd(options inputs) {
exchangeShim,
assetBase,
assetQuote,
marketID,
ieif,
tradingPair,
filterFactory,
options,
threadTracker,
db,
)
fillTracker := makeFillTracker(
l,
Expand Down
62 changes: 58 additions & 4 deletions cmd/trade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ func TestTradeUpgradeScripts(t *testing.T) {
}

// assert current state of the database
assert.Equal(t, 3, database.GetNumTablesInDb(db))
assert.Equal(t, 4, database.GetNumTablesInDb(db))
assert.True(t, database.CheckTableExists(db, "db_version"))
assert.True(t, database.CheckTableExists(db, "markets"))
assert.True(t, database.CheckTableExists(db, "trades"))
assert.True(t, database.CheckTableExists(db, "strategy_mirror_trade_triggers"))

// check schema of db_version table
var columns []database.TableColumn
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestTradeUpgradeScripts(t *testing.T) {

// check schema of trades table
columns = database.GetTableSchema(db, "trades")
assert.Equal(t, 10, len(columns), fmt.Sprintf("%v", columns))
assert.Equal(t, 11, len(columns), fmt.Sprintf("%v", columns))
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "market_id",
OrdinalPosition: 1,
Expand Down Expand Up @@ -200,30 +201,83 @@ func TestTradeUpgradeScripts(t *testing.T) {
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[9])
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "order_id",
OrdinalPosition: 11,
ColumnDefault: nil,
IsNullable: "YES",
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[10])
// check indexes of trades table
indexes = database.GetTableIndexes(db, "trades")
assert.Equal(t, 3, len(indexes))
database.AssertIndex(t, "trades", "trades_pkey", "CREATE UNIQUE INDEX trades_pkey ON public.trades USING btree (market_id, txid)", indexes)
database.AssertIndex(t, "trades", "trades_mdd", "CREATE INDEX trades_mdd ON public.trades USING btree (market_id, date(date_utc), date_utc)", indexes)
database.AssertIndex(t, "trades", "trades_amt", "CREATE UNIQUE INDEX trades_amt ON public.trades USING btree (account_id, market_id, txid)", indexes)

// check schema of strategy_mirror_trade_triggers table
columns = database.GetTableSchema(db, "strategy_mirror_trade_triggers")
assert.Equal(t, 4, len(columns), fmt.Sprintf("%v", columns))
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "market_id",
OrdinalPosition: 1,
ColumnDefault: nil,
IsNullable: "NO",
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[0])
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "txid",
OrdinalPosition: 2,
ColumnDefault: nil,
IsNullable: "NO",
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[1])
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "backing_market_id",
OrdinalPosition: 3,
ColumnDefault: nil,
IsNullable: "NO",
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[2])
database.AssertTableColumnsEqual(t, &database.TableColumn{
ColumnName: "backing_order_id",
OrdinalPosition: 4,
ColumnDefault: nil,
IsNullable: "NO",
DataType: "text",
CharacterMaximumLength: nil,
}, &columns[3])
// check indexes of strategy_mirror_trade_triggers table
indexes = database.GetTableIndexes(db, "strategy_mirror_trade_triggers")
assert.Equal(t, 1, len(indexes))
database.AssertIndex(t, "strategy_mirror_trade_triggers", "strategy_mirror_trade_triggers_pkey", "CREATE UNIQUE INDEX strategy_mirror_trade_triggers_pkey ON public.strategy_mirror_trade_triggers USING btree (market_id, txid)", indexes)

// check entries of db_version table
var allRows [][]interface{}
allRows = database.QueryAllRows(db, "db_version")
assert.Equal(t, 5, len(allRows))
assert.Equal(t, 6, len(allRows))
// first three code_version_string is nil becuase the field was not supported at the time when the upgrade script was run, and only in version 4 of
// the database do we add the field. See upgradeScripts and RunUpgradeScripts() for more details
database.ValidateDBVersionRow(t, allRows[0], 1, time.Now(), 1, 50, nil)
database.ValidateDBVersionRow(t, allRows[1], 2, time.Now(), 3, 150, nil)
database.ValidateDBVersionRow(t, allRows[2], 3, time.Now(), 2, 100, nil)
database.ValidateDBVersionRow(t, allRows[3], 4, time.Now(), 1, 50, &codeVersionString)
database.ValidateDBVersionRow(t, allRows[4], 5, time.Now(), 2, 100, &codeVersionString)
database.ValidateDBVersionRow(t, allRows[5], 6, time.Now(), 2, 100, &codeVersionString)

// check entries of markets table
allRows = database.QueryAllRows(db, "markets")
assert.Equal(t, 0, len(allRows))

// check entries of markets table
// check entries of trades table
allRows = database.QueryAllRows(db, "trades")
assert.Equal(t, 0, len(allRows))

// check entries of strategy_mirror_trade_triggers table
allRows = database.QueryAllRows(db, "strategy_mirror_trade_triggers")
assert.Equal(t, 0, len(allRows))
}
7 changes: 7 additions & 0 deletions examples/configs/trader/sample_mirror.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ PER_LEVEL_SPREAD=0.005
# set to true if you want the bot to offset your trades onto the backing exchange to realize the per_level_spread against each trade
# requires you to specify the EXCHANGE_API_KEYS below
#OFFSET_TRADES=true
# this is the account_id in the trades table of the database. This is required if you enable the OFFSET_TRADES field above.
# This account_id is for the backing exchange, which is different from the account_id specified in the trader.cfg file when using OFFSET_TRADES
# see sample_trader.cfg for more details on this field.
#BACKING_DB_OVERRIDE__ACCOUNT_ID="account1"
# uncomment if we want to override what is used as the last trade cursor when loading filled trades for the backing exchange
#BACKING_FILL_TRACKER_LAST_TRADE_CURSOR_OVERRIDE="1570415431000"

# you can use multiple API keys to overcome rate limit concerns
#[[EXCHANGE_API_KEYS]]
#KEY=""
Expand Down
7 changes: 6 additions & 1 deletion kelpdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package kelpdb
const SqlMarketsTableCreate = "CREATE TABLE IF NOT EXISTS markets (market_id TEXT PRIMARY KEY, exchange_name TEXT NOT NULL, base TEXT NOT NULL, quote TEXT NOT NULL)"
const SqlTradesTableCreate = "CREATE TABLE IF NOT EXISTS trades (market_id TEXT NOT NULL, txid TEXT NOT NULL, date_utc TIMESTAMP WITHOUT TIME ZONE NOT NULL, action TEXT NOT NULL, type TEXT NOT NULL, counter_price DOUBLE PRECISION NOT NULL, base_volume DOUBLE PRECISION NOT NULL, counter_cost DOUBLE PRECISION NOT NULL, fee DOUBLE PRECISION NOT NULL, PRIMARY KEY (market_id, txid))"
const SqlTradesTableAlter1 = "ALTER TABLE trades ADD COLUMN account_id TEXT"
const SqlStrategyMirrorTradeTriggersTableCreate = "CREATE TABLE IF NOT EXISTS strategy_mirror_trade_triggers (market_id TEXT NOT NULL, txid TEXT NOT NULL, backing_market_id TEXT NOT NULL, backing_order_id TEXT NOT NULL, PRIMARY KEY (market_id, txid))"
const SqlTradesTableAlter2 = "ALTER TABLE trades ADD COLUMN order_id TEXT"

/*
indexes
Expand All @@ -26,7 +28,10 @@ const SqlTradesIndexCreate3 = "CREATE UNIQUE INDEX IF NOT EXISTS trades_amt ON t
const SqlMarketsInsertTemplate = "INSERT INTO markets (market_id, exchange_name, base, quote) VALUES ('%s', '%s', '%s', '%s')"

// SqlTradesInsertTemplate inserts into the trades table
const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee, account_id) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f, '%s')"
const SqlTradesInsertTemplate = "INSERT INTO trades (market_id, txid, date_utc, action, type, counter_price, base_volume, counter_cost, fee, account_id, order_id) VALUES ('%s', '%s', '%s', '%s', '%s', %.15f, %.15f, %.15f, %.15f, '%s', '%s')"

// SqlStrategyMirrorTradeTriggersInsertTemplate inserts into the strategy_mirror_trade_triggers table
const SqlStrategyMirrorTradeTriggersInsertTemplate = "INSERT INTO strategy_mirror_trade_triggers (market_id, txid, backing_market_id, backing_order_id) VALUES ('%s', '%s', '%s', '%s')"

/*
queries
Expand Down
4 changes: 3 additions & 1 deletion model/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (r CancelOrderResult) String() string {
type Trade struct {
Order
TransactionID *TransactionID
OrderID string
Cost *Number
Fee *Number
}
Expand Down Expand Up @@ -264,8 +265,9 @@ func (t TradesByTsID) Less(i int, j int) bool {
}

func (t Trade) String() string {
return fmt.Sprintf("Trade[txid: %s, ts: %s, pair: %s, action: %s, type: %s, counterPrice: %s, baseVolume: %s, counterCost: %s, fee: %s]",
return fmt.Sprintf("Trade[txid: %s, orderId: %s, ts: %s, pair: %s, action: %s, type: %s, counterPrice: %s, baseVolume: %s, counterCost: %s, fee: %s]",
utils.CheckedString(t.TransactionID),
t.OrderID,
utils.CheckedString(t.Timestamp),
*t.Pair,
t.OrderAction,
Expand Down
21 changes: 21 additions & 0 deletions plugins/ccxtExchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var _ api.Exchange = ccxtExchange{}
// ccxtExchangeSpecificParamFactory knows how to create the exchange-specific params for each exchange
type ccxtExchangeSpecificParamFactory interface {
getParamsForAddOrder(submitMode api.SubmitMode) interface{}
getParamsForGetTradeHistory() interface{}
}

// ccxtExchange is the implementation for the CCXT REST library that supports many exchanges (https://github.com/franz-see/ccxt-rest, https://github.com/ccxt/ccxt/)
Expand Down Expand Up @@ -228,13 +229,32 @@ func (c ccxtExchange) GetTradeHistory(pair model.TradingPair, maybeCursorStart i
return nil, fmt.Errorf("error while fetching trade history for trading pair '%s': %s", pairString, e)
}

var maybeExchangeSpecificParams interface{}
if c.esParamFactory != nil {
maybeExchangeSpecificParams = c.esParamFactory.getParamsForGetTradeHistory()
}

trades := []model.Trade{}
for _, raw := range tradesRaw {
var t *model.Trade
t, e = c.readTrade(&pair, pairString, raw)
if e != nil {
return nil, fmt.Errorf("error while reading trade: %s", e)
}

orderID := ""
if maybeExchangeSpecificParams != nil {
paramsMap := maybeExchangeSpecificParams.(map[string]interface{})
if oidRes, ok := paramsMap["order_id"]; ok {
oidFn := oidRes.(func(info interface{}) (string, error))
orderID, e = oidFn(raw.Info)
if e != nil {
return nil, fmt.Errorf("error while reading 'order_id' from raw.Info for exchange with specific params: %s", e)
}
}
}
t.OrderID = orderID

trades = append(trades, *t)
}

Expand Down Expand Up @@ -320,6 +340,7 @@ func (c ccxtExchange) readTrade(pair *model.TradingPair, pairString string, rawT
TransactionID: model.MakeTransactionID(rawTrade.ID),
Cost: model.NumberFromFloat(rawTrade.Cost, feecCostPrecision),
Fee: model.NumberFromFloat(rawTrade.Fee.Cost, feecCostPrecision),
// OrderID read by calling function depending on override set for exchange params in "orderId" field of Info object
}

if rawTrade.Side == "sell" {
Expand Down
53 changes: 53 additions & 0 deletions plugins/ccxtExchangeSpecificParamFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package plugins

import (
"fmt"
"strconv"

"github.com/stellar/kelp/api"
)

type ccxtExchangeSpecificParamFactoryCoinbasepro struct{}

func (f *ccxtExchangeSpecificParamFactoryCoinbasepro) getParamsForAddOrder(submitMode api.SubmitMode) interface{} {
if submitMode == api.SubmitModeMakerOnly {
return map[string]interface{}{
"post_only": true,
}
}
return nil
}

func (f *ccxtExchangeSpecificParamFactoryCoinbasepro) getParamsForGetTradeHistory() interface{} {
return nil
}

var _ ccxtExchangeSpecificParamFactory = &ccxtExchangeSpecificParamFactoryCoinbasepro{}

type ccxtExchangeSpecificParamFactoryBinance struct{}

func (f *ccxtExchangeSpecificParamFactoryBinance) getParamsForAddOrder(submitMode api.SubmitMode) interface{} {
return nil
}

func (f *ccxtExchangeSpecificParamFactoryBinance) getParamsForGetTradeHistory() interface{} {
return map[string]interface{}{
"order_id": func(info interface{}) (string, error) {
rawInfo, ok := info.(map[string]interface{})
if !ok {
return "", fmt.Errorf("unable to convert input 'info' to a map[string]interface{}: %+v (type=%T)", rawInfo, rawInfo)
}

orderIDFloat64, ok := rawInfo["orderId"].(float64)
if !ok {
return "", fmt.Errorf("unable to parse info[\"orderId\"] as a float64: %+v (type=%T)", rawInfo["orderId"], rawInfo["orderId"])
}

orderIDInt64 := int64(orderIDFloat64)
orderID := strconv.FormatInt(orderIDInt64, 10)
return orderID, nil
},
}
}

var _ ccxtExchangeSpecificParamFactory = &ccxtExchangeSpecificParamFactoryBinance{}
18 changes: 0 additions & 18 deletions plugins/ccxtExchangeSpecificParamFactoryCoinbasepro.go

This file was deleted.

Loading

0 comments on commit b4c802d

Please sign in to comment.