Skip to content

Commit

Permalink
work call interval improved, fixes thomasdondorf#13
Browse files Browse the repository at this point in the history
Setting sameDomainDelay could lead to a
high CPU usage due to some bad
work() calls
Also had to remove fake timers from the tests
  • Loading branch information
thomasdondorf committed Sep 7, 2018
1 parent b82f361 commit 17ffc3d
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 39 deletions.
38 changes: 26 additions & 12 deletions src/Cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export type TaskFunction = (arg: TaskFunctionArguments) => Promise<void>;

const MONITORING_DISPLAY_INTERVAL = 500;
const CHECK_FOR_WORK_INTERVAL = 100;
const WORK_CALL_INTERVAL_LIMIT = 10;

export default class Cluster extends EventEmitter {

Expand Down Expand Up @@ -192,30 +193,41 @@ export default class Cluster extends EventEmitter {
this.taskFunction = taskFunction;
}

private calledForWork: boolean = false;
private nextWorkCall: number = 0;
private workCallTimeout: NodeJS.Timer|null = null;

// check for new work soon (wait if there will be put more data into the queue, first)
private async work() {
// make sure, there is only one setImmediate call waiting
if (!this.calledForWork) {
this.calledForWork = true;
setImmediate(() => this.doWork());
// make sure, we only call work once every WORK_CALL_INTERVAL_LIMIT (currently: 10ms)
if (this.workCallTimeout === null) {
const now = Date.now();

// calculate when the next work call should happen
this.nextWorkCall = Math.max(
this.nextWorkCall + WORK_CALL_INTERVAL_LIMIT,
now,
);
const timeUntilNextWorkCall = this.nextWorkCall - now;

this.workCallTimeout = setTimeout(
() => {
this.workCallTimeout = null;
this.doWork();
},
timeUntilNextWorkCall,
);
}
}

private async doWork() {
this.calledForWork = false;

// no jobs available
if (this.jobQueue.size() === 0) {
if (this.jobQueue.size() === 0) { // no jobs available
if (this.workersBusy.length === 0) {
this.idleResolvers.forEach(resolve => resolve());
}
return;
}

// no workers available
if (this.workersAvail.length === 0) {
if (this.workersAvail.length === 0) { // no workers available
if (this.allowedToStartWorker()) {
await this.launchWorker();
this.work();
Expand All @@ -227,13 +239,13 @@ export default class Cluster extends EventEmitter {

if (job === undefined) {
// skip, there are items in the queue but they are all delayed
this.work();
return;
}

const url = job.getUrl();
const domain = job.getDomain();

// Check if URL was already crawled (on skipDuplicateUrls)
if (this.options.skipDuplicateUrls
&& url !== undefined && this.duplicateCheckUrls.has(url)) {
// already crawled, just ignore
Expand All @@ -242,6 +254,7 @@ export default class Cluster extends EventEmitter {
return;
}

// Check if the job needs to be delayed due to sameDomainDelay
if (this.options.sameDomainDelay !== 0 && domain !== undefined) {
const lastDomainAccess = this.lastDomainAccesses.get(domain);
if (lastDomainAccess !== undefined
Expand Down Expand Up @@ -356,6 +369,7 @@ export default class Cluster extends EventEmitter {
this.isClosed = true;

clearInterval(this.checkForWorkInterval as NodeJS.Timer);
clearTimeout(this.workCallTimeout as NodeJS.Timer);

// close workers
await Promise.all(this.workers.map(worker => worker.close()));
Expand Down
44 changes: 17 additions & 27 deletions test/Cluster.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Cluster from '../src/Cluster';
import * as http from 'http';
import { timeoutExecute } from '../src/util';

let testServer;

Expand Down Expand Up @@ -174,6 +175,7 @@ describe('options', () => {
});

test('retryDelay = 0', async () => {
expect.assertions(2);
const cluster = await Cluster.launch({
concurrency,
puppeteerOptions: { args: ['--no-sandbox'] },
Expand All @@ -190,34 +192,28 @@ describe('options', () => {
}
});

jest.useFakeTimers();
cluster.queue(ERROR_URL);

const url1 = await cluster.waitForOne();
expect(url1).toBe(ERROR_URL);

jest.advanceTimersByTime(10000);
cluster.queue(TEST_URL);

const url2 = await cluster.waitForOne();
expect(url2).toBe(ERROR_URL);

jest.advanceTimersByTime(100000);
const url3 = await cluster.waitForOne();
expect(url3).toBe(TEST_URL);
await timeoutExecute(200, (async () => {
const url2 = await cluster.waitForOne();
expect(url2).toBe(ERROR_URL);
})());

await cluster.close();

jest.useRealTimers();
});

test('retryDelay > 0', async () => {
expect.assertions(3);

const cluster = await Cluster.launch({
concurrency,
puppeteerOptions: { args: ['--no-sandbox'] },
maxConcurrency: 1,
retryLimit: 1,
retryDelay: 100000,
retryDelay: 250,
});

const ERROR_URL = 'http://example.com/we-are-never-visited-the-page';
Expand All @@ -228,30 +224,26 @@ describe('options', () => {
}
});

jest.useFakeTimers();
cluster.queue(ERROR_URL);

const url1 = await cluster.waitForOne();
expect(url1).toBe(ERROR_URL);

// forward, but no jobs sould be executed
jest.advanceTimersByTime(10000);
cluster.queue(TEST_URL);
try {
await timeoutExecute(200, (async () => {
await cluster.waitForOne(); // should time out!
})());
} catch (err) {
expect(err.message).toMatch(/Timeout/);
}

const url2 = await cluster.waitForOne();
expect(url2).toBe(TEST_URL);

jest.advanceTimersByTime(100000);
const url3 = await cluster.waitForOne();
expect(url3).toBe(ERROR_URL);
expect(url2).toBe(ERROR_URL);

await cluster.close();

jest.useRealTimers();
});

test('sameDomainDelay with one worker', async () => {
jest.useRealTimers();
const cluster = await Cluster.launch({
concurrency,
puppeteerOptions: { args: ['--no-sandbox'] },
Expand Down Expand Up @@ -282,8 +274,6 @@ describe('options', () => {
});

test('sameDomainDelay with multiple workers', async () => {
jest.useRealTimers();

const cluster = await Cluster.launch({
concurrency,
puppeteerOptions: { args: ['--no-sandbox'] },
Expand Down

0 comments on commit 17ffc3d

Please sign in to comment.