Skip to content

Commit

Permalink
Catch exceptions thrown from wrapped function.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjrv committed May 4, 2016
1 parent 7e0048e commit bcc3b2b
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 47 deletions.
46 changes: 37 additions & 9 deletions src/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,52 @@ export interface Promisy<PromiseType> {

export interface PromisyClass<PromiseType> {
new(handler: any): PromiseType;

resolve(result: any): PromiseType;
reject(err: any): PromiseType;
}

/** Call func and return a promise for its result.
* Optionally call given resolve or reject handler when the promise settles. */

export function tryFinally<PromiseType extends Promisy<PromiseType>>(
func: () => PromiseType,
onFinish: () => void,
Promise: PromisyClass<PromiseType>,
resolve?: (result: any) => void,
reject?: (err: any) => void
) {
try {
var promise = func();

// Ensure func return value is a promise.
if(typeof(promise.then) != 'function') {
promise = Promise.resolve(promise);
}
} catch(err) {
// If func threw an error, return a rejected promise.
promise = Promise.reject(err);
}

promise.then(onFinish, onFinish);
if(resolve) promise.then(resolve, reject);

return(promise);
}

/** Task wraps a promise, delaying it until some resource gets less busy. */

export class Task<PromiseType extends Promisy<PromiseType>> {
constructor(func: () => PromiseType) {
constructor(func: () => PromiseType, Promise: PromisyClass<PromiseType>) {
this.func = func;
this.Promise = Promise;
}

/** Wrap task result in a new promise so it can be resolved later. */

delay(Promise: PromisyClass<PromiseType>) {
delay() {
if(!this.promise) {
this.promise = new Promise((resolve: any, reject: any) => {
this.promise = new this.Promise((resolve: any, reject: any) => {
this.resolve = resolve;
this.reject = reject;
});
Expand All @@ -36,15 +68,11 @@ export class Task<PromiseType extends Promisy<PromiseType>> {
/** Start the task and call onFinish when done. */

resume(onFinish: () => void) {
var result = this.func();

result.then(onFinish, onFinish);
if(this.resolve) result.then(this.resolve, this.reject);

return(result);
return(tryFinally(this.func, onFinish, this.Promise, this.resolve, this.reject));
}

private func: () => PromiseType;
private Promise: PromisyClass<PromiseType>;

private promise: PromiseType;
private resolve: (result: any) => void;
Expand Down
12 changes: 4 additions & 8 deletions src/TaskQueue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// This file is part of cwait, copyright (c) 2015-2016 BusFaster Ltd.
// Released under the MIT license, see LICENSE.

import {Task, Promisy, PromisyClass} from './Task'
import {Task, Promisy, PromisyClass, tryFinally} from './Task'

export class TaskQueue<PromiseType extends Promisy<PromiseType>> {
constructor(Promise: PromisyClass<PromiseType>, concurrency: number) {
Expand All @@ -19,20 +19,16 @@ export class TaskQueue<PromiseType extends Promisy<PromiseType>> {

++this.busyCount;

var result = func();

result.then(this.nextBound, this.nextBound);

return(result);
return(tryFinally(func, this.nextBound, this.Promise));
} else {
// Schedule the task and return a promise resolving
// to the result of task.start().

var task = new Task(func);
var task = new Task(func, this.Promise);

this.backlog.push(task);

return(task.delay(this.Promise));
return(task.delay());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cwait.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// This file is part of cwait, copyright (c) 2015-2016 BusFaster Ltd.
// Released under the MIT license, see LICENSE.

export {Task, Promisy} from './Task';
export {Task, Promisy, PromisyClass} from './Task';
export {TaskQueue} from './TaskQueue';
6 changes: 3 additions & 3 deletions test/bluebird-shim.d.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
declare module 'bluebird' {
import {TaskQueue, Promisy} from 'cwait';
import {Promisy, PromisyClass} from 'cwait';

var _Promise: { new(): Promisy<any> };
var _Promise: PromisyClass<Promisy<any>>;

class Promise<ReturnType> extends _Promise implements Promisy<Promise<ReturnType>> {
constructor(handler: (resolve: any, reject: any) => any);

then<ReturnType>(handler: () => ReturnType): Promise<ReturnType>;
then<ReturnType>(resolved: (result?: any) => ReturnType, rejected?: (err?: any) => any): Promise<ReturnType>;

static try<ReturnType>(value: ReturnType): Promise<ReturnType>;
static delay<ReturnType>(milliseconds: number, value: ReturnType): Promise<ReturnType>;
Expand Down
96 changes: 70 additions & 26 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,88 @@ declare var process: any;
import * as Promise from 'bluebird';
import {TaskQueue} from 'cwait';

/** Queue allowing 3 concurrent function calls. */
var queue = new TaskQueue(Promise, 3);
function equals(a: any, b: any) {
if(a != b) {
console.log('ERROR ' + a + ' != ' + b);
process.exit(1);
}

var running = 0;
console.log('OK ' + a);
}

/** Test function returning a promise with a slight delay
* and tracking concurrent executions. */
var testCount = 0;

function run() {
if(++running > maxRunning) maxRunning = running;
function test1() {
/** Queue allowing 3 concurrent function calls. */
var queue = new TaskQueue(Promise, 3);

return(Promise.delay(100, true).then(() => --running));
}
var running = 0;
var maxRunning = 0;

function equals(a: number, b: number) {
if(a != b) {
console.log('ERROR ' + a + ' != ' + b);
process.exit(1);
/** Test function returning a promise with a slight delay
* and tracking concurrent executions. */

function run(item: string) {
if(++running > maxRunning) maxRunning = running;

return(Promise.delay(100, true).then(() => {
--running;
return(item);
}));
}

console.log('OK ' + a);
/** List of 6 items to loop through. */
var list = '123456'.split('');

// Run test without limiting concurrency.

Promise.map(list, run).then((result: string[]) => {
++testCount;

equals(result.join(''), '123456');
equals(maxRunning, 6);

maxRunning = 0;

// Run test and limit concurrency.

return(Promise.map(list, queue.wrap(run)))
}).then((result: string[]) => {
++testCount;

equals(result.join(''), '123456');
equals(maxRunning, 3);
})
}

/** List of 6 items to loop through. */
var list = '123456'.split('');
function test2() {
function throws() {
if(1) throw(new Error('Beep'));

var maxRunning = 0;
return(Promise.resolve(true));
}

var queue = new TaskQueue(Promise, 1);

queue.wrap(throws)().then(null, (err: any) => ++testCount);
queue.wrap(throws)().then(null, (err: any) => ++testCount);
}

// Run test without limiting concurrency.
function test3() {
function rejects() {
return(Promise.reject(new Error('Beep')));
}

Promise.map(list, run).then(() => {
equals(maxRunning, 6);
var queue = new TaskQueue(Promise, 1);

maxRunning = 0;
queue.wrap(rejects)().then(null, (err: any) => ++testCount);
queue.wrap(rejects)().then(null, (err: any) => ++testCount);
}

// Run test and limit concurrency.
test1();
test2();
test3();

return(Promise.map(list, queue.wrap(run)))
}).then(() => {
equals(maxRunning, 3);
})
setTimeout(() => {
equals(testCount, 6);
}, 1000);

0 comments on commit bcc3b2b

Please sign in to comment.