From 4c0066452a465d5c14532cfe08709e8bb8dd59ae Mon Sep 17 00:00:00 2001 From: chrisleekr Date: Sat, 10 Dec 2022 07:27:57 +1100 Subject: [PATCH] fix: queue concurrent execution (#551) --- .dockerignore | 1 + .gitignore | 2 + CHANGELOG.md | 1 + app/binance/__tests__/candles.test.js | 4 +- app/binance/__tests__/tickers.test.js | 13 +++- app/binance/__tests__/user.test.js | 6 +- app/binance/candles.js | 3 +- app/binance/tickers.js | 12 +++- app/binance/user.js | 18 +++--- app/cronjob/trailingTrade.js | 4 +- .../step/ensure-grid-trade-order-executed.js | 2 +- .../__tests__/queue.test.js | 20 +++++-- app/cronjob/trailingTradeHelper/queue.js | 28 +++++---- app/frontend/webserver/handlers/auth.js | 4 +- app/frontend/websocket/configure.js | 3 +- .../handlers/__tests__/cancel-order.test.js | 5 +- .../manual-trade-all-symbols.test.js | 7 ++- .../handlers/__tests__/manual-trade.test.js | 5 +- .../__tests__/symbol-enable-action.test.js | 5 +- .../symbol-grid-trade-delete.test.js | 16 +++-- .../__tests__/symbol-setting-delete.test.js | 5 +- .../__tests__/symbol-setting-update.test.js | 5 +- .../__tests__/symbol-trigger-buy.test.js | 5 +- .../__tests__/symbol-trigger-sell.test.js | 5 +- .../symbol-update-last-buy-price.test.js | 13 +++- .../websocket/handlers/cancel-order.js | 5 +- .../handlers/manual-trade-all-symbols.js | 8 ++- .../websocket/handlers/manual-trade.js | 5 +- .../handlers/symbol-enable-action.js | 5 +- .../handlers/symbol-grid-trade-delete.js | 4 +- .../handlers/symbol-setting-delete.js | 5 +- .../handlers/symbol-setting-update.js | 4 +- .../websocket/handlers/symbol-trigger-buy.js | 5 +- .../websocket/handlers/symbol-trigger-sell.js | 5 +- .../handlers/symbol-update-last-buy-price.js | 8 ++- app/scripts/log-analyser.js | 59 +++++++++++++++++++ 36 files changed, 238 insertions(+), 67 deletions(-) create mode 100644 app/scripts/log-analyser.js diff --git a/.dockerignore b/.dockerignore index a9fcf344..42fd9b7e 100644 --- a/.dockerignore +++ b/.dockerignore @@ -33,3 +33,4 @@ docker-compose.server.yml .git tradingview +result.csv diff --git a/.gitignore b/.gitignore index a231614a..b0ab8a21 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,5 @@ log /public/dist/* !/public/dist/.gitkeep +result.csv +.~lock.result.csv# diff --git a/CHANGELOG.md b/CHANGELOG.md index 66889bbe..f5f88333 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file. - Added API error message when using the wrong API key/secret - [#544](https://github.com/chrisleekr/binance-trading-bot/pull/544) - Automatic login when password is autofilled by the browser by [@rando128](https://github.com/rando128) - [#550](https://github.com/chrisleekr/binance-trading-bot/pull/550) +- Fixed the queue concurrency issue - [#551](https://github.com/chrisleekr/binance-trading-bot/pull/551) Thanks [@rando128](https://github.com/rando128) for your great contributions. 💯 :heart: diff --git a/app/binance/__tests__/candles.test.js b/app/binance/__tests__/candles.test.js index ce1c5a77..d81416f2 100644 --- a/app/binance/__tests__/candles.test.js +++ b/app/binance/__tests__/candles.test.js @@ -236,7 +236,9 @@ describe('candles.js', () => { }); it('triggers queue.executeFor for ETHBTC', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'ETHBTC'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'ETHBTC', { + correlationId: expect.any(String) + }); }); }); }); diff --git a/app/binance/__tests__/tickers.test.js b/app/binance/__tests__/tickers.test.js index f78a3eb3..c03570d2 100644 --- a/app/binance/__tests__/tickers.test.js +++ b/app/binance/__tests__/tickers.test.js @@ -110,11 +110,15 @@ describe('tickers.js', () => { }); it('triggers queue.executeFor for BTCUSDT', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', { + correlationId: expect.any(String) + }); }); it('triggers queue.executeFor for BNBUSDT', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BNBUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BNBUSDT', { + correlationId: expect.any(String) + }); }); it('checks websocketTickersClean', () => { @@ -143,7 +147,10 @@ describe('tickers.js', () => { it('triggers queue.executeFor for BTCUSDT', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'BTCUSDT' + 'BTCUSDT', + { + correlationId: expect.any(String) + } ); }); diff --git a/app/binance/__tests__/user.test.js b/app/binance/__tests__/user.test.js index 9781fc59..0610d129 100644 --- a/app/binance/__tests__/user.test.js +++ b/app/binance/__tests__/user.test.js @@ -359,7 +359,8 @@ describe('user.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'ETHUSDT' + 'ETHUSDT', + { correlationId: expect.any(String) } ); }); @@ -677,7 +678,8 @@ describe('user.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'ETHUSDT' + 'ETHUSDT', + { correlationId: expect.any(String) } ); }); diff --git a/app/binance/candles.js b/app/binance/candles.js index 3b1c4fa5..f3c31bed 100644 --- a/app/binance/candles.js +++ b/app/binance/candles.js @@ -1,3 +1,4 @@ +const { v4: uuidv4 } = require('uuid'); const _ = require('lodash'); const queue = require('../cronjob/trailingTradeHelper/queue'); const { binance, mongo } = require('../helpers'); @@ -112,7 +113,7 @@ const syncCandles = async (logger, symbols) => { await mongo.bulkWrite(logger, 'trailing-trade-candles', operations); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { correlationId: uuidv4() }); }; return getCandles(); diff --git a/app/binance/tickers.js b/app/binance/tickers.js index a2eddc78..d0cd120e 100644 --- a/app/binance/tickers.js +++ b/app/binance/tickers.js @@ -1,3 +1,4 @@ +const { v4: uuidv4 } = require('uuid'); const _ = require('lodash'); const { binance, cache } = require('../helpers'); const queue = require('../cronjob/trailingTradeHelper/queue'); @@ -44,8 +45,15 @@ const setupTickersWebsocket = async (logger, symbols) => { monitoringSymbol, ticker => { errorHandlerWrapper(logger, 'Tickers', async () => { + const correlationId = uuidv4(); + const { eventType, eventTime, curDayClose: close, symbol } = ticker; + const symbolLogger = logger.child({ + correlationId, + symbol + }); + // Save latest candle for the symbol await cache.hset( 'trailing-trade-symbols', @@ -60,13 +68,13 @@ const setupTickersWebsocket = async (logger, symbols) => { const canExecuteTrailingTrade = symbols.includes(monitoringSymbol); - logger.info( + symbolLogger.info( { ticker, canExecuteTrailingTrade }, 'Received new ticker' ); if (canExecuteTrailingTrade) { - queue.executeFor(logger, monitoringSymbol); + queue.executeFor(symbolLogger, monitoringSymbol, { correlationId }); } }); } diff --git a/app/binance/user.js b/app/binance/user.js index 10909a38..7aa1e128 100644 --- a/app/binance/user.js +++ b/app/binance/user.js @@ -54,9 +54,9 @@ const setupUserWebsocket = async logger => { orderTime: transactTime // Transaction time } = evt; + const correlationId = uuidv4(); const symbolLogger = logger.child({ - jobName: 'trailingTrade', - correlationId: uuidv4(), + correlationId, symbol }); @@ -67,7 +67,7 @@ const setupUserWebsocket = async logger => { const checkLastOrder = async () => { const lastOrder = await getGridTradeLastOrder( - logger, + symbolLogger, symbol, side.toLowerCase() ); @@ -103,7 +103,7 @@ const setupUserWebsocket = async logger => { }; await updateGridTradeLastOrder( - logger, + symbolLogger, symbol, side.toLowerCase(), updatedOrder @@ -113,17 +113,17 @@ const setupUserWebsocket = async logger => { `The last order has been updated. ${orderId} - ${side} - ${orderStatus}` ); - queue.executeFor(symbolLogger, symbol); + queue.executeFor(symbolLogger, symbol, { correlationId }); } }; checkLastOrder(); const checkManualOrder = async () => { - const manualOrder = await getManualOrder(logger, symbol, orderId); + const manualOrder = await getManualOrder(symbolLogger, symbol, orderId); if (_.isEmpty(manualOrder) === false) { - await saveManualOrder(logger, symbol, orderId, { + await saveManualOrder(symbolLogger, symbol, orderId, { ...manualOrder, status: orderStatus, type: orderType, @@ -137,12 +137,12 @@ const setupUserWebsocket = async logger => { updateTime: eventTime }); - logger.info( + symbolLogger.info( { symbol, manualOrder, saveLog: true }, 'The manual order has been updated.' ); - queue.executeFor(logger, symbol); + queue.executeFor(symbolLogger, symbol, { correlationId }); } }; diff --git a/app/cronjob/trailingTrade.js b/app/cronjob/trailingTrade.js index 5626d38a..647f8996 100644 --- a/app/cronjob/trailingTrade.js +++ b/app/cronjob/trailingTrade.js @@ -27,10 +27,10 @@ const { } = require('./trailingTrade/steps'); const { errorHandlerWrapper } = require('../error-handler'); -const execute = async (rawLogger, symbol) => { +const execute = async (rawLogger, symbol, correlationId = uuidv4()) => { const logger = rawLogger.child({ jobName: 'trailingTrade', - correlationId: uuidv4(), + correlationId, symbol }); diff --git a/app/cronjob/trailingTrade/step/ensure-grid-trade-order-executed.js b/app/cronjob/trailingTrade/step/ensure-grid-trade-order-executed.js index 2cde64b6..085d4219 100644 --- a/app/cronjob/trailingTrade/step/ensure-grid-trade-order-executed.js +++ b/app/cronjob/trailingTrade/step/ensure-grid-trade-order-executed.js @@ -297,7 +297,7 @@ const execute = async (logger, rawData) => { } } - // Ensure buy order executed + // Ensure sell order executed const lastSellOrder = await getGridTradeLastOrder(logger, symbol, 'sell'); if (_.isEmpty(lastSellOrder) === false) { logger.info({ lastSellOrder }, 'Last grid trade sell order found'); diff --git a/app/cronjob/trailingTradeHelper/__tests__/queue.test.js b/app/cronjob/trailingTradeHelper/__tests__/queue.test.js index fc01a0e7..ff8e2f84 100644 --- a/app/cronjob/trailingTradeHelper/__tests__/queue.test.js +++ b/app/cronjob/trailingTradeHelper/__tests__/queue.test.js @@ -16,7 +16,10 @@ describe('queue', () => { jest.mock('config'); mockQueueProcess = jest.fn().mockImplementation((_concurrent, cb) => { - const job = jest.fn(); + const job = { + data: { correlationId: 'correlationId' }, + progress: jest.fn() + }; cb(job); }); @@ -53,8 +56,10 @@ describe('queue', () => { it('triggers new Queue for BTCUSDT', () => { expect(mockQueue).toHaveBeenCalledWith('BTCUSDT', expect.any(String), { prefix: `bull`, - settings: { - guardInterval: 1000 + limiter: { + max: 1, + duration: 10000, + bounceBack: true } }); }); @@ -62,21 +67,24 @@ describe('queue', () => { it('triggers executeTrailingTrade for BTCUSDT', () => { expect(mockExecuteTrailingTrade).toHaveBeenCalledWith( logger, - 'BTCUSDT' + 'BTCUSDT', + 'correlationId' ); }); it('triggers executeTrailingTrade for ETHUSDT', () => { expect(mockExecuteTrailingTrade).toHaveBeenCalledWith( logger, - 'ETHUSDT' + 'ETHUSDT', + 'correlationId' ); }); it('triggers executeTrailingTrade for BNBUSDT', () => { expect(mockExecuteTrailingTrade).toHaveBeenCalledWith( logger, - 'BNBUSDT' + 'BNBUSDT', + 'correlationId' ); }); diff --git a/app/cronjob/trailingTradeHelper/queue.js b/app/cronjob/trailingTradeHelper/queue.js index 63e78df1..a4f69c2d 100644 --- a/app/cronjob/trailingTradeHelper/queue.js +++ b/app/cronjob/trailingTradeHelper/queue.js @@ -15,12 +15,23 @@ const create = (funcLogger, symbol) => { const queue = new Queue(symbol, REDIS_URL, { prefix: `bull`, - settings: { - guardInterval: 1000 // Poll interval for delayed jobs and added jobs. + limiter: { + max: 1, + duration: 10000, // 10 seconds + // bounceBack: When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue + bounceBack: true } }); // Set concurrent for the job - queue.process(1, async _job => executeTrailingTrade(logger, symbol)); + queue.process(1, async job => { + await executeTrailingTrade( + logger, + symbol, + _.get(job.data, 'correlationId') + ); + + job.progress(100); + }); return queue; }; @@ -46,7 +57,7 @@ const init = async (funcLogger, symbols) => { * @param {*} funcLogger * @param {*} symbol */ -const executeFor = async (funcLogger, symbol) => { +const executeFor = async (funcLogger, symbol, jobData = {}) => { const logger = funcLogger.child({ helper: 'queue' }); if (!(symbol in queues)) { @@ -54,12 +65,9 @@ const executeFor = async (funcLogger, symbol) => { return; } - await queues[symbol].add( - {}, - { - removeOnComplete: 100 // number specified the amount of jobs to keep. - } - ); + await queues[symbol].add(jobData, { + removeOnComplete: 100 // number specified the amount of jobs to keep. + }); }; module.exports = { diff --git a/app/frontend/webserver/handlers/auth.js b/app/frontend/webserver/handlers/auth.js index 509b57d9..87892fb2 100644 --- a/app/frontend/webserver/handlers/auth.js +++ b/app/frontend/webserver/handlers/auth.js @@ -1,3 +1,4 @@ +const { v4: uuidv4 } = require('uuid'); const _ = require('lodash'); const bcrypt = require('bcryptjs'); const config = require('config'); @@ -43,9 +44,10 @@ const generateToken = async logger => { }; const handleAuth = async (funcLogger, app, { loginLimiter }) => { - const logger = funcLogger.child({ endpoint: '/auth' }); + const handlerLogger = funcLogger.child({ endpoint: '/auth' }); app.route('/auth').post(async (req, res) => { + const logger = handlerLogger.child({ correlationId: uuidv4() }); const { password: requestedPassword } = req.body; const clientIp = requestIp.getClientIp(req); diff --git a/app/frontend/websocket/configure.js b/app/frontend/websocket/configure.js index 0c35a5ae..a018606c 100644 --- a/app/frontend/websocket/configure.js +++ b/app/frontend/websocket/configure.js @@ -1,3 +1,4 @@ +const { v4: uuidv4 } = require('uuid'); const WebSocket = require('ws'); const config = require('config'); @@ -78,7 +79,7 @@ const configureWebSocket = async (server, funcLogger, { loginLimiter }) => { return; } - const commandLogger = logger.child({ payload }); + const commandLogger = logger.child({ payload, correlationId: uuidv4() }); const commandMaps = { latest: handleLatest, diff --git a/app/frontend/websocket/handlers/__tests__/cancel-order.test.js b/app/frontend/websocket/handlers/__tests__/cancel-order.test.js index 58b37d01..858e7511 100644 --- a/app/frontend/websocket/handlers/__tests__/cancel-order.test.js +++ b/app/frontend/websocket/handlers/__tests__/cancel-order.test.js @@ -35,6 +35,7 @@ describe('cancel-order.js', () => { const { logger } = require('../../../../helpers'); loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; const { handleCancelOrder } = require('../cancel-order'); await handleCancelOrder(loggerMock, mockWebSocketServer, { @@ -63,7 +64,9 @@ describe('cancel-order.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/manual-trade-all-symbols.test.js b/app/frontend/websocket/handlers/__tests__/manual-trade-all-symbols.test.js index 30b09ab4..1723d909 100644 --- a/app/frontend/websocket/handlers/__tests__/manual-trade-all-symbols.test.js +++ b/app/frontend/websocket/handlers/__tests__/manual-trade-all-symbols.test.js @@ -249,6 +249,7 @@ describe('manual-trade-all-symbols.js', () => { const { logger, PubSub } = require('../../../../helpers'); loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; PubSubMock = PubSub; PubSubMock.publish = jest.fn().mockResolvedValue(true); @@ -313,7 +314,8 @@ describe('manual-trade-all-symbols.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - symbol + symbol, + { correlationId: 'correlationId' } ); }); } else { @@ -413,7 +415,8 @@ describe('manual-trade-all-symbols.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); } else { diff --git a/app/frontend/websocket/handlers/__tests__/manual-trade.test.js b/app/frontend/websocket/handlers/__tests__/manual-trade.test.js index b596cf4d..de9d1baa 100644 --- a/app/frontend/websocket/handlers/__tests__/manual-trade.test.js +++ b/app/frontend/websocket/handlers/__tests__/manual-trade.test.js @@ -34,6 +34,7 @@ describe('manual-trade.js', () => { const { logger } = require('../../../../helpers'); loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; const { handleManualTrade } = require('../manual-trade'); await handleManualTrade(loggerMock, mockWebSocketServer, { @@ -63,7 +64,9 @@ describe('manual-trade.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-enable-action.test.js b/app/frontend/websocket/handlers/__tests__/symbol-enable-action.test.js index f1a7f35e..59aef6ec 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-enable-action.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-enable-action.test.js @@ -30,6 +30,7 @@ describe('symbol-enable-action.test.js', () => { beforeEach(async () => { const { logger } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockDeleteDisableAction = jest.fn().mockResolvedValue(true); @@ -53,7 +54,9 @@ describe('symbol-enable-action.test.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-grid-trade-delete.test.js b/app/frontend/websocket/handlers/__tests__/symbol-grid-trade-delete.test.js index 01937284..8df621dd 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-grid-trade-delete.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-grid-trade-delete.test.js @@ -47,6 +47,7 @@ describe('symbol-grid-trade-delete.test.js', () => { beforeEach(async () => { const { logger, slack } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockSlack = slack; mockSlack.sendMessage = jest.fn().mockResolvedValue(true); @@ -105,7 +106,8 @@ describe('symbol-grid-trade-delete.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( mockLogger, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); @@ -123,6 +125,7 @@ describe('symbol-grid-trade-delete.test.js', () => { beforeEach(async () => { const { logger, slack } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockSlack = slack; mockSlack.sendMessage = jest.fn().mockResolvedValue(true); @@ -181,7 +184,8 @@ describe('symbol-grid-trade-delete.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( mockLogger, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); @@ -199,6 +203,7 @@ describe('symbol-grid-trade-delete.test.js', () => { beforeEach(async () => { const { logger, slack } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockSlack = slack; mockSlack.sendMessage = jest.fn().mockResolvedValue(true); @@ -246,7 +251,8 @@ describe('symbol-grid-trade-delete.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( mockLogger, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); @@ -264,6 +270,7 @@ describe('symbol-grid-trade-delete.test.js', () => { beforeEach(async () => { const { logger, slack } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockSlack = slack; mockSlack.sendMessage = jest.fn().mockResolvedValue(true); @@ -308,7 +315,8 @@ describe('symbol-grid-trade-delete.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( mockLogger, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); diff --git a/app/frontend/websocket/handlers/__tests__/symbol-setting-delete.test.js b/app/frontend/websocket/handlers/__tests__/symbol-setting-delete.test.js index d455016c..537f4e00 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-setting-delete.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-setting-delete.test.js @@ -30,6 +30,7 @@ describe('symbol-setting-delete.test.js', () => { beforeEach(async () => { const { logger } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockDeleteSymbolConfiguration = jest.fn().mockResolvedValue(true); @@ -56,7 +57,9 @@ describe('symbol-setting-delete.test.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-setting-update.test.js b/app/frontend/websocket/handlers/__tests__/symbol-setting-update.test.js index 7bcdd8e7..516e95e3 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-setting-update.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-setting-update.test.js @@ -31,6 +31,7 @@ describe('symbol-setting-update.test.js', () => { beforeEach(async () => { const { logger } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; mockGetSymbolConfiguration = jest.fn().mockResolvedValue({ candles: { @@ -267,7 +268,9 @@ describe('symbol-setting-update.test.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-trigger-buy.test.js b/app/frontend/websocket/handlers/__tests__/symbol-trigger-buy.test.js index cec5fe20..d9f4f1cf 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-trigger-buy.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-trigger-buy.test.js @@ -36,6 +36,7 @@ describe('symbol-trigger-buy.test.js', () => { beforeEach(async () => { const { logger } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; const { handleSymbolTriggerBuy } = require('../symbol-trigger-buy'); await handleSymbolTriggerBuy(mockLogger, mockWebSocketServer, { @@ -61,7 +62,9 @@ describe('symbol-trigger-buy.test.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-trigger-sell.test.js b/app/frontend/websocket/handlers/__tests__/symbol-trigger-sell.test.js index 0d84b2c4..ce704d22 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-trigger-sell.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-trigger-sell.test.js @@ -36,6 +36,7 @@ describe('symbol-trigger-sell.test.js', () => { beforeEach(async () => { const { logger } = require('../../../../helpers'); mockLogger = logger; + mockLogger.fields = { correlationId: 'correlationId' }; const { handleSymbolTriggerSell } = require('../symbol-trigger-sell'); await handleSymbolTriggerSell(mockLogger, mockWebSocketServer, { @@ -59,7 +60,9 @@ describe('symbol-trigger-sell.test.js', () => { }); it('triggers queue.executeFor', () => { - expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT'); + expect(mockQueue.executeFor).toHaveBeenCalledWith(mockLogger, 'BTCUSDT', { + correlationId: 'correlationId' + }); }); it('triggers ws.send', () => { diff --git a/app/frontend/websocket/handlers/__tests__/symbol-update-last-buy-price.test.js b/app/frontend/websocket/handlers/__tests__/symbol-update-last-buy-price.test.js index a800603c..f4e68f46 100644 --- a/app/frontend/websocket/handlers/__tests__/symbol-update-last-buy-price.test.js +++ b/app/frontend/websocket/handlers/__tests__/symbol-update-last-buy-price.test.js @@ -35,6 +35,7 @@ describe('symbol-update-last-buy-price.test.js', () => { const { mongo, logger, PubSub } = require('../../../../helpers'); mongoMock = mongo; loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; PubSubMock = PubSub; mongoMock.deleteOne = jest.fn().mockResolvedValue(true); @@ -64,7 +65,8 @@ describe('symbol-update-last-buy-price.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); @@ -102,6 +104,7 @@ describe('symbol-update-last-buy-price.test.js', () => { } = require('../../../../helpers'); mongoMock = mongo; loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; PubSubMock = PubSub; cacheMock = cache; @@ -151,6 +154,7 @@ describe('symbol-update-last-buy-price.test.js', () => { beforeEach(async () => { const { cache, logger, PubSub } = require('../../../../helpers'); loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; PubSubMock = PubSub; cacheMock = cache; @@ -216,7 +220,8 @@ describe('symbol-update-last-buy-price.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); @@ -245,6 +250,7 @@ describe('symbol-update-last-buy-price.test.js', () => { beforeEach(async () => { const { cache, logger, PubSub } = require('../../../../helpers'); loggerMock = logger; + loggerMock.fields = { correlationId: 'correlationId' }; PubSubMock = PubSub; cacheMock = cache; @@ -310,7 +316,8 @@ describe('symbol-update-last-buy-price.test.js', () => { it('triggers queue.executeFor', () => { expect(mockQueue.executeFor).toHaveBeenCalledWith( loggerMock, - 'BTCUSDT' + 'BTCUSDT', + { correlationId: 'correlationId' } ); }); diff --git a/app/frontend/websocket/handlers/cancel-order.js b/app/frontend/websocket/handlers/cancel-order.js index fe869cda..ac4c156d 100644 --- a/app/frontend/websocket/handlers/cancel-order.js +++ b/app/frontend/websocket/handlers/cancel-order.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const moment = require('moment'); const { saveOverrideAction @@ -24,7 +25,9 @@ const handleCancelOrder = async (logger, ws, payload) => { `Cancelling the ${side.toLowerCase()} order action has been received. Wait for cancelling the order.` ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ diff --git a/app/frontend/websocket/handlers/manual-trade-all-symbols.js b/app/frontend/websocket/handlers/manual-trade-all-symbols.js index 763f1708..efaaa051 100644 --- a/app/frontend/websocket/handlers/manual-trade-all-symbols.js +++ b/app/frontend/websocket/handlers/manual-trade-all-symbols.js @@ -62,7 +62,9 @@ const handleManualTradeAllSymbols = async (logger, ws, payload) => { `Order for ${symbol} has been queued.` ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); currentTime = moment(currentTime).add( placeManualOrderInterval, @@ -103,7 +105,9 @@ const handleManualTradeAllSymbols = async (logger, ws, payload) => { `Order for ${symbol} has been queued.` ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); currentTime = moment(currentTime).add( placeManualOrderInterval, diff --git a/app/frontend/websocket/handlers/manual-trade.js b/app/frontend/websocket/handlers/manual-trade.js index e996c415..653f8ae0 100644 --- a/app/frontend/websocket/handlers/manual-trade.js +++ b/app/frontend/websocket/handlers/manual-trade.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const moment = require('moment'); const { saveOverrideAction @@ -23,7 +24,9 @@ const handleManualTrade = async (logger, ws, payload) => { 'The manual order received by the bot. Wait for placing the order.' ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ diff --git a/app/frontend/websocket/handlers/symbol-enable-action.js b/app/frontend/websocket/handlers/symbol-enable-action.js index 81d0fd40..b5df85d9 100644 --- a/app/frontend/websocket/handlers/symbol-enable-action.js +++ b/app/frontend/websocket/handlers/symbol-enable-action.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const { deleteDisableAction } = require('../../../cronjob/trailingTradeHelper/common'); @@ -12,7 +13,9 @@ const handleSymbolEnableAction = async (logger, ws, payload) => { await deleteDisableAction(logger, symbol); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ result: true, type: 'symbol-enable-action-result' }) diff --git a/app/frontend/websocket/handlers/symbol-grid-trade-delete.js b/app/frontend/websocket/handlers/symbol-grid-trade-delete.js index 7406cd87..e9f2ec22 100644 --- a/app/frontend/websocket/handlers/symbol-grid-trade-delete.js +++ b/app/frontend/websocket/handlers/symbol-grid-trade-delete.js @@ -39,7 +39,9 @@ const handleSymbolGridTradeDelete = async (logger, ws, payload) => { await deleteSymbolGridTrade(logger, symbol); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ result: true, type: 'symbol-grid-trade-delete-result' }) diff --git a/app/frontend/websocket/handlers/symbol-setting-delete.js b/app/frontend/websocket/handlers/symbol-setting-delete.js index a54ee483..11ca2f5d 100644 --- a/app/frontend/websocket/handlers/symbol-setting-delete.js +++ b/app/frontend/websocket/handlers/symbol-setting-delete.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const { deleteSymbolConfiguration } = require('../../../cronjob/trailingTradeHelper/configuration'); @@ -12,7 +13,9 @@ const handleSymbolSettingDelete = async (logger, ws, payload) => { await deleteSymbolConfiguration(logger, symbol); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ result: true, type: 'symbol-setting-delete-result' }) diff --git a/app/frontend/websocket/handlers/symbol-setting-update.js b/app/frontend/websocket/handlers/symbol-setting-update.js index 3b480ab4..078677aa 100644 --- a/app/frontend/websocket/handlers/symbol-setting-update.js +++ b/app/frontend/websocket/handlers/symbol-setting-update.js @@ -44,7 +44,9 @@ const handleSymbolSettingUpdate = async (logger, ws, payload) => { await saveSymbolConfiguration(logger, symbol, symbolConfiguration); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send( JSON.stringify({ diff --git a/app/frontend/websocket/handlers/symbol-trigger-buy.js b/app/frontend/websocket/handlers/symbol-trigger-buy.js index f94b536a..449a0f7f 100644 --- a/app/frontend/websocket/handlers/symbol-trigger-buy.js +++ b/app/frontend/websocket/handlers/symbol-trigger-buy.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const moment = require('moment'); const { saveOverrideAction @@ -25,7 +26,9 @@ const handleSymbolTriggerBuy = async (logger, ws, payload) => { 'The buy order received by the bot. Wait for placing the order.' ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send(JSON.stringify({ result: true, type: 'symbol-trigger-buy-result' })); }; diff --git a/app/frontend/websocket/handlers/symbol-trigger-sell.js b/app/frontend/websocket/handlers/symbol-trigger-sell.js index 58a8f7f6..371b7326 100644 --- a/app/frontend/websocket/handlers/symbol-trigger-sell.js +++ b/app/frontend/websocket/handlers/symbol-trigger-sell.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const moment = require('moment'); const { saveOverrideAction @@ -22,7 +23,9 @@ const handleSymbolTriggerSell = async (logger, ws, payload) => { 'The sell order received by the bot. Wait for placing the order.' ); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); ws.send(JSON.stringify({ result: true, type: 'symbol-trigger-sell-result' })); }; diff --git a/app/frontend/websocket/handlers/symbol-update-last-buy-price.js b/app/frontend/websocket/handlers/symbol-update-last-buy-price.js index 68f55b2f..c45dbe78 100644 --- a/app/frontend/websocket/handlers/symbol-update-last-buy-price.js +++ b/app/frontend/websocket/handlers/symbol-update-last-buy-price.js @@ -19,7 +19,9 @@ const deleteLastBuyPrice = async (logger, ws, symbol) => { key: `${symbol}-last-buy-price` }); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); PubSub.publish('frontend-notification', { type: 'success', @@ -97,7 +99,9 @@ const updateLastBuyPrice = async (logger, ws, symbol, lastBuyPrice) => { quantity: baseAssetTotalBalance }); - queue.executeFor(logger, symbol); + queue.executeFor(logger, symbol, { + correlationId: _.get(logger, 'fields.correlationId', '') + }); PubSub.publish('frontend-notification', { type: 'success', diff --git a/app/scripts/log-analyser.js b/app/scripts/log-analyser.js new file mode 100644 index 00000000..820b69b4 --- /dev/null +++ b/app/scripts/log-analyser.js @@ -0,0 +1,59 @@ +/* istanbul ignore file */ +/* eslint-disable no-console */ +const _ = require('lodash'); +const fs = require('fs'); + +const myArgs = process.argv.slice(2); + +const filename = myArgs[0] || ''; +if (filename === '') { + console.log('No file provided.'); + process.exit(1); +} + +const rawdata = fs.readFileSync(filename); +let logs; +try { + logs = JSON.parse(rawdata); +} catch (e) { + console.log('Error while parsing CSV file.'); + process.exit(1); +} + +const csvRows = [ + [ + 'CorrelationID', + 'Logged At', + 'Message', + 'Step Name', + 'Base Balance', + 'Order Status', + 'Executed Qty', + 'New Quantity', + 'New Last Buy Price' + ] +]; + +logs.forEach(log => { + const csvRow = [ + _.get(log, 'data.correlationId', ''), + _.get(log, 'loggedAt', ''), + _.get(log, 'msg', ''), + _.get(log, 'data.stepName', ''), + _.get(log, 'data.data.baseAssetBalance.total', ''), + _.get(log, 'data.evt.status', ''), + _.get(log, 'data.evt.quantity', ''), + _.get(log, 'data.newQuantity', ''), + _.get(log, 'data.newLastBuyPrice', '') + ]; + + csvRows.push(csvRow); +}); + +let csvData = ''; +csvRows.forEach(csvRow => { + csvData += `${csvRow + .map(c => `"${c ? `${c}`.replace(/"/g, "'") : ''}"`) + .join(',')}\r\n`; +}); +fs.writeFileSync('result.csv', csvData);