Skip to content

Commit

Permalink
fix: queue concurrent execution (chrisleekr#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisleekr authored Dec 9, 2022
1 parent 5572574 commit 4c00664
Show file tree
Hide file tree
Showing 36 changed files with 238 additions and 67 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ docker-compose.server.yml
.git

tradingview
result.csv
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ log
/public/dist/*
!/public/dist/.gitkeep

result.csv
.~lock.result.csv#
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.

- Added API error message when using the wrong API key/secret - [#544](https://github.com/chrisleekr/binance-trading-bot/pull/544)
- Automatic login when password is autofilled by the browser by [@rando128](https://github.com/rando128) - [#550](https://github.com/chrisleekr/binance-trading-bot/pull/550)
- Fixed the queue concurrency issue - [#551](https://github.com/chrisleekr/binance-trading-bot/pull/551)

Thanks [@rando128](https://github.com/rando128) for your great contributions. 💯 :heart:

Expand Down
4 changes: 3 additions & 1 deletion app/binance/__tests__/candles.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ describe('candles.js', () => {
});

it('triggers queue.executeFor for ETHBTC', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'ETHBTC');
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'ETHBTC', {
correlationId: expect.any(String)
});
});
});
});
13 changes: 10 additions & 3 deletions app/binance/__tests__/tickers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,15 @@ describe('tickers.js', () => {
});

it('triggers queue.executeFor for BTCUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT');
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', {
correlationId: expect.any(String)
});
});

it('triggers queue.executeFor for BNBUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BNBUSDT');
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BNBUSDT', {
correlationId: expect.any(String)
});
});

it('checks websocketTickersClean', () => {
Expand Down Expand Up @@ -143,7 +147,10 @@ describe('tickers.js', () => {
it('triggers queue.executeFor for BTCUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'BTCUSDT'
'BTCUSDT',
{
correlationId: expect.any(String)
}
);
});

Expand Down
6 changes: 4 additions & 2 deletions app/binance/__tests__/user.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ describe('user.js', () => {
it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'ETHUSDT'
'ETHUSDT',
{ correlationId: expect.any(String) }
);
});

Expand Down Expand Up @@ -677,7 +678,8 @@ describe('user.js', () => {
it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'ETHUSDT'
'ETHUSDT',
{ correlationId: expect.any(String) }
);
});

Expand Down
3 changes: 2 additions & 1 deletion app/binance/candles.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { v4: uuidv4 } = require('uuid');
const _ = require('lodash');
const queue = require('../cronjob/trailingTradeHelper/queue');
const { binance, mongo } = require('../helpers');
Expand Down Expand Up @@ -112,7 +113,7 @@ const syncCandles = async (logger, symbols) => {

await mongo.bulkWrite(logger, 'trailing-trade-candles', operations);

queue.executeFor(logger, symbol);
queue.executeFor(logger, symbol, { correlationId: uuidv4() });
};

return getCandles();
Expand Down
12 changes: 10 additions & 2 deletions app/binance/tickers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { v4: uuidv4 } = require('uuid');
const _ = require('lodash');
const { binance, cache } = require('../helpers');
const queue = require('../cronjob/trailingTradeHelper/queue');
Expand Down Expand Up @@ -44,8 +45,15 @@ const setupTickersWebsocket = async (logger, symbols) => {
monitoringSymbol,
ticker => {
errorHandlerWrapper(logger, 'Tickers', async () => {
const correlationId = uuidv4();

const { eventType, eventTime, curDayClose: close, symbol } = ticker;

const symbolLogger = logger.child({
correlationId,
symbol
});

// Save latest candle for the symbol
await cache.hset(
'trailing-trade-symbols',
Expand All @@ -60,13 +68,13 @@ const setupTickersWebsocket = async (logger, symbols) => {

const canExecuteTrailingTrade = symbols.includes(monitoringSymbol);

logger.info(
symbolLogger.info(
{ ticker, canExecuteTrailingTrade },
'Received new ticker'
);

if (canExecuteTrailingTrade) {
queue.executeFor(logger, monitoringSymbol);
queue.executeFor(symbolLogger, monitoringSymbol, { correlationId });
}
});
}
Expand Down
18 changes: 9 additions & 9 deletions app/binance/user.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ const setupUserWebsocket = async logger => {
orderTime: transactTime // Transaction time
} = evt;

const correlationId = uuidv4();
const symbolLogger = logger.child({
jobName: 'trailingTrade',
correlationId: uuidv4(),
correlationId,
symbol
});

Expand All @@ -67,7 +67,7 @@ const setupUserWebsocket = async logger => {

const checkLastOrder = async () => {
const lastOrder = await getGridTradeLastOrder(
logger,
symbolLogger,
symbol,
side.toLowerCase()
);
Expand Down Expand Up @@ -103,7 +103,7 @@ const setupUserWebsocket = async logger => {
};

await updateGridTradeLastOrder(
logger,
symbolLogger,
symbol,
side.toLowerCase(),
updatedOrder
Expand All @@ -113,17 +113,17 @@ const setupUserWebsocket = async logger => {
`The last order has been updated. ${orderId} - ${side} - ${orderStatus}`
);

queue.executeFor(symbolLogger, symbol);
queue.executeFor(symbolLogger, symbol, { correlationId });
}
};

checkLastOrder();

const checkManualOrder = async () => {
const manualOrder = await getManualOrder(logger, symbol, orderId);
const manualOrder = await getManualOrder(symbolLogger, symbol, orderId);

if (_.isEmpty(manualOrder) === false) {
await saveManualOrder(logger, symbol, orderId, {
await saveManualOrder(symbolLogger, symbol, orderId, {
...manualOrder,
status: orderStatus,
type: orderType,
Expand All @@ -137,12 +137,12 @@ const setupUserWebsocket = async logger => {
updateTime: eventTime
});

logger.info(
symbolLogger.info(
{ symbol, manualOrder, saveLog: true },
'The manual order has been updated.'
);

queue.executeFor(logger, symbol);
queue.executeFor(symbolLogger, symbol, { correlationId });
}
};

Expand Down
4 changes: 2 additions & 2 deletions app/cronjob/trailingTrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ const {
} = require('./trailingTrade/steps');
const { errorHandlerWrapper } = require('../error-handler');

const execute = async (rawLogger, symbol) => {
const execute = async (rawLogger, symbol, correlationId = uuidv4()) => {
const logger = rawLogger.child({
jobName: 'trailingTrade',
correlationId: uuidv4(),
correlationId,
symbol
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ const execute = async (logger, rawData) => {
}
}

// Ensure buy order executed
// Ensure sell order executed
const lastSellOrder = await getGridTradeLastOrder(logger, symbol, 'sell');
if (_.isEmpty(lastSellOrder) === false) {
logger.info({ lastSellOrder }, 'Last grid trade sell order found');
Expand Down
20 changes: 14 additions & 6 deletions app/cronjob/trailingTradeHelper/__tests__/queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ describe('queue', () => {
jest.mock('config');

mockQueueProcess = jest.fn().mockImplementation((_concurrent, cb) => {
const job = jest.fn();
const job = {
data: { correlationId: 'correlationId' },
progress: jest.fn()
};
cb(job);
});

Expand Down Expand Up @@ -53,30 +56,35 @@ describe('queue', () => {
it('triggers new Queue for BTCUSDT', () => {
expect(mockQueue).toHaveBeenCalledWith('BTCUSDT', expect.any(String), {
prefix: `bull`,
settings: {
guardInterval: 1000
limiter: {
max: 1,
duration: 10000,
bounceBack: true
}
});
});

it('triggers executeTrailingTrade for BTCUSDT', () => {
expect(mockExecuteTrailingTrade).toHaveBeenCalledWith(
logger,
'BTCUSDT'
'BTCUSDT',
'correlationId'
);
});

it('triggers executeTrailingTrade for ETHUSDT', () => {
expect(mockExecuteTrailingTrade).toHaveBeenCalledWith(
logger,
'ETHUSDT'
'ETHUSDT',
'correlationId'
);
});

it('triggers executeTrailingTrade for BNBUSDT', () => {
expect(mockExecuteTrailingTrade).toHaveBeenCalledWith(
logger,
'BNBUSDT'
'BNBUSDT',
'correlationId'
);
});

Expand Down
28 changes: 18 additions & 10 deletions app/cronjob/trailingTradeHelper/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,23 @@ const create = (funcLogger, symbol) => {

const queue = new Queue(symbol, REDIS_URL, {
prefix: `bull`,
settings: {
guardInterval: 1000 // Poll interval for delayed jobs and added jobs.
limiter: {
max: 1,
duration: 10000, // 10 seconds
// bounceBack: When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
bounceBack: true
}
});
// Set concurrent for the job
queue.process(1, async _job => executeTrailingTrade(logger, symbol));
queue.process(1, async job => {
await executeTrailingTrade(
logger,
symbol,
_.get(job.data, 'correlationId')
);

job.progress(100);
});

return queue;
};
Expand All @@ -46,20 +57,17 @@ const init = async (funcLogger, symbols) => {
* @param {*} funcLogger
* @param {*} symbol
*/
const executeFor = async (funcLogger, symbol) => {
const executeFor = async (funcLogger, symbol, jobData = {}) => {
const logger = funcLogger.child({ helper: 'queue' });

if (!(symbol in queues)) {
logger.error({ symbol }, `No queue created for ${symbol}`);
return;
}

await queues[symbol].add(
{},
{
removeOnComplete: 100 // number specified the amount of jobs to keep.
}
);
await queues[symbol].add(jobData, {
removeOnComplete: 100 // number specified the amount of jobs to keep.
});
};

module.exports = {
Expand Down
4 changes: 3 additions & 1 deletion app/frontend/webserver/handlers/auth.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { v4: uuidv4 } = require('uuid');
const _ = require('lodash');
const bcrypt = require('bcryptjs');
const config = require('config');
Expand Down Expand Up @@ -43,9 +44,10 @@ const generateToken = async logger => {
};

const handleAuth = async (funcLogger, app, { loginLimiter }) => {
const logger = funcLogger.child({ endpoint: '/auth' });
const handlerLogger = funcLogger.child({ endpoint: '/auth' });

app.route('/auth').post(async (req, res) => {
const logger = handlerLogger.child({ correlationId: uuidv4() });
const { password: requestedPassword } = req.body;
const clientIp = requestIp.getClientIp(req);

Expand Down
3 changes: 2 additions & 1 deletion app/frontend/websocket/configure.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const { v4: uuidv4 } = require('uuid');
const WebSocket = require('ws');
const config = require('config');

Expand Down Expand Up @@ -78,7 +79,7 @@ const configureWebSocket = async (server, funcLogger, { loginLimiter }) => {
return;
}

const commandLogger = logger.child({ payload });
const commandLogger = logger.child({ payload, correlationId: uuidv4() });

const commandMaps = {
latest: handleLatest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe('cancel-order.js', () => {
const { logger } = require('../../../../helpers');

loggerMock = logger;
loggerMock.fields = { correlationId: 'correlationId' };

const { handleCancelOrder } = require('../cancel-order');
await handleCancelOrder(loggerMock, mockWebSocketServer, {
Expand Down Expand Up @@ -63,7 +64,9 @@ describe('cancel-order.js', () => {
});

it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT');
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', {
correlationId: 'correlationId'
});
});

it('triggers ws.send', () => {
Expand Down
Loading

0 comments on commit 4c00664

Please sign in to comment.