Skip to content

Commit

Permalink
fix: improved queue processing (chrisleekr#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
uhliksk authored Jan 11, 2023
1 parent b12aa5c commit 98b756e
Show file tree
Hide file tree
Showing 44 changed files with 3,750 additions and 3,400 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"MACD",
"mrkdwn",
"tradingview",
"tulind"
"tulind",
"uuidv"
],
"eslint.format.enable": true,
"eslint.codeActionsOnSave.mode": "all"
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ All notable changes to this project will be documented in this file.
- Fixed sorting symbols open trades first by [@uhliksk](https://github.com/uhliksk) - [#564](https://github.com/chrisleekr/binance-trading-bot/pull/564)
- Fixed the issue that cannot export huge logs - [#561](https://github.com/chrisleekr/binance-trading-bot/pull/561), [#567](https://github.com/chrisleekr/binance-trading-bot/pull/567)
- Fixed the balance calculation to include dust balances by [@uhliksk](https://github.com/uhliksk) - [#571](https://github.com/chrisleekr/binance-trading-bot/pull/571)
- Fixed the open orders to be cancelled when the current price is higher/lower than the order price by [@uhliksk](https://github.com/uhliksk) - [#569](https://github.com/chrisleekr/binance-trading-bot/pull/569)
- Fixed the open orders to be cancelled when the current price is higher/lower than the order price by [@uhliksk](https://github.com/uhliksk) - [#569](https://github.com/chrisleekr/binance-trading-bot/pull/569)
- Improved queue processing by replacing Bull queue to customised queue system by [@uhliksk](https://github.com/uhliksk) - [#562](https://github.com/chrisleekr/binance-trading-bot/pull/562)

Thanks [@uhliksk](https://github.com/uhliksk) for your great contributions. 💯 :heart:

Expand Down
21 changes: 15 additions & 6 deletions app/__tests__/server-binance.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ describe('server-binance', () => {
};
mockQueue = {
init: jest.fn().mockResolvedValue(true),
executeFor: jest.fn().mockResolvedValue(true)
execute: jest.fn().mockResolvedValue(true)
};
mockSlack = {
sendMessage: jest.fn().mockResolvedValue(true)
Expand Down Expand Up @@ -461,23 +461,32 @@ describe('server-binance', () => {
await runBinance(logger);
});

it('does not trigger queue.executeFor', () => {
expect(mockQueue.executeFor).not.toHaveBeenCalled();
it('does not trigger queue.execute', () => {
expect(mockQueue.execute).not.toHaveBeenCalled();
});
});

describe('when open orders not empty', () => {
beforeEach(async () => {
mockCache.hgetall = jest.fn().mockResolvedValue({
BTCUSDT: [{ orderId: 1, symbol: 'BTCUSDT' }]
BTCUSDT: [{ orderId: 1, symbol: 'BTCUSDT' }],
LTCUSDT: [{ orderId: 1, symbol: 'LTCUSDT' }]
});

const { runBinance } = require('../server-binance');
await runBinance(logger);
});

it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(logger, 'BTCUSDT');
it('triggers queue.execute twice', () => {
expect(mockQueue.execute).toHaveBeenCalledTimes(2);
});

it('triggers queue.execute for BTCUSDT', () => {
expect(mockQueue.execute).toHaveBeenCalledWith(logger, 'BTCUSDT');
});

it('triggers queue.execute for LTCUSDT', () => {
expect(mockQueue.execute).toHaveBeenCalledWith(logger, 'LTCUSDT');
});
});
});
Expand Down
13 changes: 0 additions & 13 deletions app/__tests__/server-frontend.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ describe('server-frontend', () => {
let mockConfigureWebServer;
let mockConfigureWebSocket;
let mockConfigureLocalTunnel;
let mockConfigureBullBoard;

let mockRateLimiterRedisGet;
let mockRateLimiterRedis;
Expand Down Expand Up @@ -69,7 +68,6 @@ describe('server-frontend', () => {
mockConfigureWebServer = jest.fn().mockReturnValue(true);
mockConfigureWebSocket = jest.fn().mockReturnValue(true);
mockConfigureLocalTunnel = jest.fn().mockReturnValue(true);
mockConfigureBullBoard = jest.fn().mockReturnValue(true);

mockExpressStatic = jest.fn().mockReturnValue(true);

Expand Down Expand Up @@ -150,10 +148,6 @@ describe('server-frontend', () => {
jest.mock('../frontend/local-tunnel/configure', () => ({
configureLocalTunnel: mockConfigureLocalTunnel
}));

jest.mock('../frontend/bull-board/configure', () => ({
configureBullBoard: mockConfigureBullBoard
}));
});

describe('web server', () => {
Expand Down Expand Up @@ -197,13 +191,6 @@ describe('server-frontend', () => {
});
});

it('triggers configureBullBoard', () => {
expect(mockConfigureBullBoard).toHaveBeenCalledWith(
expect.any(Object),
loggerMock
);
});

it('triggers server.listen', () => {
expect(mockExpressListen).toHaveBeenCalledWith(80);
});
Expand Down
20 changes: 12 additions & 8 deletions app/binance/__tests__/candles.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ describe('candles.js', () => {
let loggerMock;
let mongoMock;

let mockQueue;
let mockExecute;

let mockGetConfiguration;
let mockSaveCandle;
Expand Down Expand Up @@ -55,11 +55,14 @@ describe('candles.js', () => {
return mockWebsocketCandlesClean;
});

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
};
mockExecute = jest.fn((funcLogger, symbol, jobPayload) => {
if (!funcLogger || !symbol || !jobPayload) return false;
return jobPayload.preprocessFn();
});

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
jest.mock('../../cronjob/trailingTradeHelper/queue', () => ({
execute: mockExecute
}));

jest.mock('../../cronjob/trailingTradeHelper/common', () => ({
saveCandle: mockSaveCandle
Expand Down Expand Up @@ -235,9 +238,10 @@ describe('candles.js', () => {
);
});

it('triggers queue.executeFor for ETHBTC', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'ETHBTC', {
correlationId: expect.any(String)
it('triggers queue.execute for ETHBTC', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'ETHBTC', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});
});
Expand Down
50 changes: 26 additions & 24 deletions app/binance/__tests__/tickers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ describe('tickers.js', () => {
let binanceMock;
let loggerMock;
let cacheMock;
let mockQueue;
let mockExecute;

let mockGetAccountInfo;
let mockGetCachedExchangeSymbols;
Expand Down Expand Up @@ -33,11 +33,14 @@ describe('tickers.js', () => {
loggerMock = logger;
cacheMock = cache;

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
};
mockExecute = jest.fn((funcLogger, symbol, jobPayload) => {
if (!funcLogger || !symbol || !jobPayload) return false;
return jobPayload.preprocessFn();
});

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
jest.mock('../../cronjob/trailingTradeHelper/queue', () => ({
execute: mockExecute
}));

mockGetAccountInfo = jest.fn().mockResolvedValue({
balances: [
Expand Down Expand Up @@ -105,19 +108,21 @@ describe('tickers.js', () => {
);
});

it('triggers queue.executeFor twice', () => {
expect(mockQueue.executeFor).toHaveBeenCalledTimes(2);
it('triggers queue.execute 2 times', () => {
expect(mockExecute).toHaveBeenCalledTimes(2);
});

it('triggers queue.executeFor for BTCUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', {
correlationId: expect.any(String)
it('triggers queue.execute for BTCUSDT', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});

it('triggers queue.executeFor for BNBUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(loggerMock, 'BNBUSDT', {
correlationId: expect.any(String)
it('triggers queue.execute for BNBUSDT', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'BNBUSDT', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});

Expand All @@ -136,22 +141,19 @@ describe('tickers.js', () => {
describe('when called again', () => {
beforeEach(async () => {
// Reset mock counter
mockQueue.executeFor.mockClear();
mockExecute.mockClear();
await tickers.setupTickersWebsocket(loggerMock, ['BTCUSDT']);
});

it('triggers queue.executeFor twice', () => {
expect(mockQueue.executeFor).toHaveBeenCalledTimes(1);
it('triggers quque.execute', () => {
expect(mockExecute).toHaveBeenCalledTimes(1);
});

it('triggers queue.executeFor for BTCUSDT', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'BTCUSDT',
{
correlationId: expect.any(String)
}
);
it('triggers queue.execute for BTCUSDT', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'BTCUSDT', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});

it('triggers websocketTickersClean', () => {
Expand Down
47 changes: 24 additions & 23 deletions app/binance/__tests__/user.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
describe('user.js', () => {
let binanceMock;
let loggerMock;
let mockQueue;
let mockExecute;

let mockGetAccountInfoFromAPI;
let mockUpdateAccountInfo;
Expand All @@ -24,11 +24,14 @@ describe('user.js', () => {
binanceMock = binance;
loggerMock = logger;

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
};
mockExecute = jest.fn((funcLogger, symbol, jobPayload) => {
if (!funcLogger || !symbol || !jobPayload) return false;
return jobPayload.preprocessFn();
});

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
jest.mock('../../cronjob/trailingTradeHelper/queue', () => ({
execute: mockExecute
}));
});

describe('when balanceUpdate event received', () => {
Expand Down Expand Up @@ -234,8 +237,8 @@ describe('user.js', () => {
expect(mockUpdateGridTradeLastOrder).not.toHaveBeenCalled();
});

it('does not trigger queue.executeFor', () => {
expect(mockQueue.executeFor).not.toHaveBeenCalled();
it('triggers queue.execute twice', () => {
expect(mockExecute).toHaveBeenCalledTimes(2);
});

it('does not trigger userClean', () => {
Expand Down Expand Up @@ -356,12 +359,11 @@ describe('user.js', () => {
);
});

it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'ETHUSDT',
{ correlationId: expect.any(String) }
);
it('triggers queue.execute', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'ETHUSDT', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});

it('does not trigger userClean', () => {
Expand Down Expand Up @@ -462,8 +464,8 @@ describe('user.js', () => {
expect(mockUpdateGridTradeLastOrder).not.toHaveBeenCalled();
});

it('does not trigger queue.executeFor', () => {
expect(mockQueue.executeFor).not.toHaveBeenCalled();
it('triggers queue.execute twice', () => {
expect(mockExecute).toHaveBeenCalledTimes(2);
});

it('does not trigger userClean', () => {
Expand Down Expand Up @@ -560,8 +562,8 @@ describe('user.js', () => {
expect(mockSaveManualOrder).not.toHaveBeenCalled();
});

it('does not trigger queue.executeFor', () => {
expect(mockQueue.executeFor).not.toHaveBeenCalled();
it('triggers queue.execute twice', () => {
expect(mockExecute).toHaveBeenCalledTimes(2);
});

it('does not trigger userClean', () => {
Expand Down Expand Up @@ -675,12 +677,11 @@ describe('user.js', () => {
);
});

it('triggers queue.executeFor', () => {
expect(mockQueue.executeFor).toHaveBeenCalledWith(
loggerMock,
'ETHUSDT',
{ correlationId: expect.any(String) }
);
it('triggers queue.execute', () => {
expect(mockExecute).toHaveBeenCalledWith(loggerMock, 'ETHUSDT', {
correlationId: expect.any(String),
preprocessFn: expect.any(Function)
});
});

it('does not trigger userClean', () => {
Expand Down
33 changes: 17 additions & 16 deletions app/binance/candles.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,23 @@ const setupCandlesWebsocket = async (logger, symbols) => {
const syncCandles = async (logger, symbols) => {
await Promise.all(
symbols.map(async symbol => {
await mongo.deleteAll(logger, 'trailing-trade-candles', {
key: symbol
});
const getCandles = async () => {
await mongo.deleteAll(logger, 'trailing-trade-candles', {
key: symbol
});

const symbolConfiguration = await getConfiguration(logger, symbol);
const symbolConfiguration = await getConfiguration(logger, symbol);

const {
candles: { interval, limit }
} = symbolConfiguration;
const {
candles: { interval, limit }
} = symbolConfiguration;

// Retrieve candles
logger.info(
{ debug: true, function: 'candles', interval, limit },
`Retrieving candles from API for ${symbol}`
);
// Retrieve candles
logger.info(
{ debug: true, function: 'candles', interval, limit },
`Retrieving candles from API for ${symbol}`
);

const getCandles = async () => {
const candles = await binance.client.candles({
symbol,
interval,
Expand Down Expand Up @@ -112,11 +112,12 @@ const syncCandles = async (logger, symbols) => {
}));

await mongo.bulkWrite(logger, 'trailing-trade-candles', operations);

queue.executeFor(logger, symbol, { correlationId: uuidv4() });
};

return getCandles();
queue.execute(logger, symbol, {
correlationId: uuidv4(),
preprocessFn: getCandles
});
})
);
};
Expand Down
Loading

0 comments on commit 98b756e

Please sign in to comment.