AutoQueue Parallelism (subquery#2000)
* Fix parallelism with auto-queue and utilise it to speed up single-threaded performance

* Fix auto queue flushing

* Tidy up

* Improve error handling and add timeout to autoqueue

* Update changelog

* Bring back smart batch size functionality. Make AutoQueue parallelism changeable
stwiname authored Sep 12, 2023
1 parent 593fa60 commit 97d3903
Showing 5 changed files with 272 additions and 88 deletions.
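
For orientation before the diffs: the shape of `AutoQueue` as this commit uses it, inferred from the call sites and tests below. This is a sketch, not the real class; any member or signature beyond what is exercised here is an assumption.

```ts
// Inferred shape of AutoQueue (a sketch, not the actual implementation).
type Task<T> = () => Promise<T>;

interface AutoQueueLike<T> {
  size: number;                    // buffered task count (see the BlockQueueSize event)
  freeSpace?: number;              // remaining capacity; defined when a capacity is set
  concurrency: number;             // adjustable at runtime (see the last spec test)
  put(task: Task<T>): Promise<T>;  // results resolve in insertion order
  putMany(tasks: Task<T>[]): void; // used by the old dispatcher code; return type assumed
  flush(): void;                   // discards queued tasks; flushed results never resolve
}

// Constructor usage seen below; the third argument appears to be a timeout
// in seconds for results held up by an earlier, slower task:
//   new AutoQueue<B>(capacity)
//   new AutoQueue<B>(capacity, concurrency)
//   new AutoQueue<number>(10, 2, 0.2)
```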
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
@@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Move more code from node to node-core. Including configure module, workers (#1797)
 - Update api service generics to support multiple block types (#1968)
 - UnfinalizedBlocksService: make private methods protected to allow custom fork detection (#2009)
+- Update fetching blocks to use moving window rather than batches (#2000)
 
 ### Added
 - Project upgrades feature and many other changes to support it (#1797)
129 changes: 65 additions & 64 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Expand Up @@ -30,6 +30,7 @@ export abstract class BlockDispatcher<B, DS>
extends BaseBlockDispatcher<Queue<number>, DS>
implements OnApplicationShutdown
{
private fetchQueue: AutoQueue<B>;
private processQueue: AutoQueue<void>;

private fetchBlocksBatches: BatchBlockFetcher<B>;
@@ -67,6 +68,7 @@ export abstract class BlockDispatcher<B, DS>
       dynamicDsService
     );
     this.processQueue = new AutoQueue(nodeConfig.batchSize * 3);
+    this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize);
 
     if (this.nodeConfig.profiler) {
       this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
@@ -97,6 +99,7 @@
 
   flushQueue(height: number): void {
     super.flushQueue(height);
+    this.fetchQueue.flush();
     this.processQueue.flush();
   }
 
@@ -106,93 +109,91 @@
   private async fetchBlocksFromQueue(): Promise<void> {
     if (this.fetching || this.isShutdown) return;
-    // Process queue is full, no point in fetching more blocks
-    // if (this.processQueue.freeSpace < this.nodeConfig.batchSize) return;
 
     this.fetching = true;
 
     try {
       while (!this.isShutdown) {
-        // We know processQueue will have freeSpace defined because we define a capacity
+        // Wait for blocks or capacity to fetch blocks
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const blockNums = this.queue.takeMany(Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace!));
-        // Used to compare before and after as a way to check if queue was flushed
-        const bufferedHeight = this._latestBufferedHeight;
-
-        // Queue is empty
-        if (!blockNums.length) {
-          // The process queue might be full so no block nums were taken, wait and try again
-          if (this.queue.size) {
-            await delay(1);
-            continue;
-          }
-          break;
-        }
-
-        if (this.memoryleft() < 0) {
-          //stop fetching until memory is freed
-          await waitForBatchSize(this.minimumHeapLimit);
-        }
+        if (!this.queue.size || !this.fetchQueue.freeSpace!) {
+          await delay(1);
+          continue;
+        }
+
+        const blockNum = this.queue.take();
+
+        // This shouldn't happen but if it does it would get caught above
+        if (!blockNum) {
+          continue;
+        }
+
+        // Used to compare before and after as a way to check if queue was flushed
+        const bufferedHeight = this._latestBufferedHeight;
+
+        if (this.memoryleft() < 0) {
+          //stop fetching until memory is freed
+          await waitForBatchSize(this.minimumHeapLimit);
+        }
 
-        logger.info(
-          `fetch block [${blockNums[0]},${blockNums[blockNums.length - 1]}], total ${blockNums.length} blocks`
-        );
-
-        // If specVersion not changed, a known overallSpecVer will be pass in
-        // Otherwise use api to fetch runtimes
-        if (memoryLock.isLocked()) {
-          await memoryLock.waitForUnlock();
-        }
-
-        const blocks = await this.fetchBlocksBatches(blockNums);
-
-        this.smartBatchService.addToSizeBuffer(blocks);
-
-        // Check if the queues have been flushed between queue.takeMany and fetchBlocksBatches resolving
-        // Peeking the queue is because the latestBufferedHeight could have regrown since fetching block
-        const peeked = this.queue.peek();
-        if (bufferedHeight > this._latestBufferedHeight || (peeked && peeked < Math.min(...blockNums))) {
-          logger.info(`Queue was reset for new DS, discarding fetched blocks`);
-          continue;
-        }
-
-        const blockTasks = blocks.map((block) => async () => {
-          const height = this.getBlockHeight(block);
-          try {
-            await this.preProcessBlock(height);
-            // Inject runtimeVersion here to enhance api.at preparation
-            const processBlockResponse = await this.indexBlock(block);
-
-            await this.postProcessBlock(height, processBlockResponse);
-
-            //set block to null for garbage collection
-            (block as any) = null;
-          } catch (e: any) {
-            // TODO discard any cache changes from this block height
-            if (this.isShutdown) {
-              return;
-            }
-            logger.error(
-              e,
-              `failed to index block at height ${height} ${e.handler ? `${e.handler}(${e.stack ?? ''})` : ''}`
-            );
-            throw e;
-          }
-        });
-
-        // There can be enough of a delay after fetching blocks that shutdown could now be true
-        if (this.isShutdown) break;
-
-        this.processQueue.putMany(blockTasks);
+        void this.fetchQueue
+          .put(async () => {
+            if (memoryLock.isLocked()) {
+              await memoryLock.waitForUnlock();
+            }
+            const [block] = await this.fetchBlocksBatches([blockNum]);
+
+            this.smartBatchService.addToSizeBuffer([block]);
+            return block;
+          })
+          .catch((e) => {
+            logger.error(e, `Failed to fetch block ${blockNum}.`);
+            throw e;
+          })
+          .then((block) => {
+            const height = this.getBlockHeight(block);
+
+            return this.processQueue.put(async () => {
+              // Check if the queues have been flushed between queue.takeMany and fetchBlocksBatches resolving
+              // Peeking the queue is because the latestBufferedHeight could have regrown since fetching block
+              const peeked = this.queue.peek();
+              if (bufferedHeight > this._latestBufferedHeight || (peeked && peeked < blockNum)) {
+                logger.info(`Queue was reset for new DS, discarding fetched blocks`);
+                return;
+              }
+
+              try {
+                await this.preProcessBlock(height);
+                // Inject runtimeVersion here to enhance api.at preparation
+                const processBlockResponse = await this.indexBlock(block);
+
+                await this.postProcessBlock(height, processBlockResponse);
+
+                //set block to null for garbage collection
+                (block as any) = null;
+              } catch (e: any) {
+                // TODO discard any cache changes from this block height
+                if (this.isShutdown) {
+                  return;
+                }
+                logger.error(
+                  e,
+                  `Failed to index block at height ${height} ${e.handler ? `${e.handler}(${e.stack ?? ''})` : ''}`
+                );
+                throw e;
+              }
+            });
+          })
+          .catch((e) => {
+            process.exit(1);
+          });
 
         this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
           value: this.processQueue.size,
         });
       }
     } catch (e: any) {
-      logger.error(e, 'Failed to fetch blocks from queue');
+      logger.error(e, 'Failed to process blocks from queue');
       if (!this.isShutdown) {
         process.exit(1);
       }
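
Taken together, the dispatcher change replaces batch fetching with a two-queue moving window: heights are taken one at a time, up to `batchSize` fetches run concurrently on `fetchQueue`, and each result is chained into `processQueue`, whose results resolve in insertion order so blocks are still indexed sequentially. A minimal sketch of that flow, with hypothetical `fetchBlock`/`indexBlock` stand-ins for `fetchBlocksBatches` and the pre/index/postProcessBlock sequence:

```ts
// Sketch of the moving-window pipeline (simplified; not the actual source).
type Block = {height: number};

interface Queueish {
  put<T>(task: () => Promise<T>): Promise<T>; // resolves in insertion order
}

function dispatch(
  heights: number[],
  fetchQueue: Queueish,   // high concurrency: many fetches in flight
  processQueue: Queueish, // effectively serial: blocks indexed in order
  fetchBlock: (height: number) => Promise<Block>, // stand-in for fetchBlocksBatches([h])
  indexBlock: (block: Block) => Promise<void>     // stand-in for pre/index/postProcessBlock
): void {
  for (const height of heights) {
    // Fetches start immediately and overlap; because put() resolves in
    // insertion order, indexing still happens in ascending height order.
    void fetchQueue
      .put(() => fetchBlock(height))
      .then((block) => processQueue.put(() => indexBlock(block)))
      .catch(() => process.exit(1)); // mirrors the dispatcher's fatal error path
  }
}
```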
121 changes: 121 additions & 0 deletions packages/node-core/src/utils/autoQueue.spec.ts
@@ -0,0 +1,121 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {AutoQueue} from './autoQueue';

describe('AutoQueue', () => {
it('resolves promises in the order they are pushed', async () => {
const autoQueue = new AutoQueue<number>(10, 5);
const results: number[] = [];

const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map((v) => async () => {
const randomTime = Math.floor(Math.random() * 1000);
await new Promise((resolve) => setTimeout(resolve, randomTime));

return v;
});

await Promise.all(
tasks.map(async (t) => {
const r = await autoQueue.put(t);

results.push(r);
})
);

expect(results).toEqual([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('does not resolve tasks if flush is called', async () => {
const autoQueue = new AutoQueue<number>(10, 2);
const results: number[] = [];

const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map((v) => async () => {
await new Promise((resolve) => setTimeout(resolve, 100));

return v;
});

tasks.map((t) => {
void autoQueue.put(t).then((r) => {
results.push(r);
});
});

// Wait for some tasks to complete
await new Promise((resolve) => setTimeout(resolve, 110));

autoQueue.flush();

expect(autoQueue.size).toBe(0);

// Wait for any other tasks to be completed
await new Promise((resolve) => setTimeout(resolve, 1000));

expect(results).toEqual([1, 2]);
});

it('has a cap on the number of out of order tasks', async () => {
const autoQueue = new AutoQueue<number>(10, 2, 0.2);
const results: number[] = [];

const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map((v) => async () => {
// Set a large timeout for the first task to simulate blocking
const randomTime = v === 1 ? 400 : 100;
await new Promise((resolve) => setTimeout(resolve, randomTime));

return v;
});

await expect(
Promise.all(
tasks.map(async (t) => {
const r = await autoQueue.put(t);

results.push(r);
})
)
).rejects.toEqual(new Error('timeout'));
});

it('can adjust the parallelism while running', async () => {
const autoQueue = new AutoQueue<number>(10, 2);
const results: number[] = [];

const tasks = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map((v) => async () => {
await new Promise((resolve) => setTimeout(resolve, 100));

return v;
});

const start1 = new Date();
await Promise.all(
tasks.map(async (t) => {
const r = await autoQueue.put(t);
results.push(r);
})
);

const end1 = new Date();
expect(end1.getTime() - start1.getTime()).toBeGreaterThanOrEqual(500);

// Update the concurrency, the next batch should complete much quicker
autoQueue.concurrency = 5;

const tasks2 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20].map((v) => async () => {
await new Promise((resolve) => setTimeout(resolve, 100));

return v;
});

const start2 = new Date();
await Promise.all(
tasks2.map(async (t) => {
const r = await autoQueue.put(t);
results.push(r);
})
);
const end2 = new Date();
expect(end2.getTime() - start2.getTime()).toBeLessThanOrEqual(300);
});
});
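
For reference, a usage sketch that ties the tests together; the third constructor argument is assumed (from the out-of-order test above) to be a timeout in seconds for results held up by a slower earlier task:

```ts
import {AutoQueue} from './autoQueue';

async function example(): Promise<void> {
  // Capacity 10, two tasks in parallel; a completed result that waits more
  // than 0.5s behind an unfinished earlier task is rejected with 'timeout'
  // (assumed semantics, based on the out-of-order test).
  const queue = new AutoQueue<string>(10, 2, 0.5);

  const a = queue.put(async () => 'a');
  const b = queue.put(async () => 'b');

  // put() preserves insertion order, so 'a' always resolves before 'b'.
  console.log(await a, await b);

  // Parallelism can be raised while tasks are running, as the last test shows.
  queue.concurrency = 5;

  // flush() empties the queue; promises for flushed tasks never resolve.
  queue.flush();
}
```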