Skip to content

Commit

Permalink
handle unobserved errors by unobservedError event
Browse files Browse the repository at this point in the history
  • Loading branch information
JeffreyZhao committed Aug 23, 2012
1 parent aabe73a commit 2d08fe5
Show file tree
Hide file tree
Showing 2 changed files with 365 additions and 76 deletions.
252 changes: 188 additions & 64 deletions src/wind-async.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
(function () {
"use strict";

var supportDefineProperty = (function () {
var i = 0;
var getter = function () {
if (i === 0) {
throw new Error("Execute too soon.");
}

return i;
};

var obj = {};

try {
Object.defineProperty(obj, "value", { get: getter });

i = 1;
return obj.value === 1;
} catch (ex) {
return false;
}
})();

var Wind, _;

var Async = { };
Expand Down Expand Up @@ -95,9 +117,71 @@
Task when helpers
***********************************************************************/

var EventManager = function () {
this._listeners = {};
this._firing = null;
}
EventManager.prototype = {
add: function (name, listener) {
if (this._firing === name) {
var self = this;
setTimeout(function () {
self.add(name, listener);
}, 0);

return;
}

var eventListeners = this._listeners[name];
if (!eventListeners) {
eventListeners = this._listeners[name] = [];
}

eventListeners.push(listener);
},

remove: function (name, listener) {
if (this._firing === name) {
var self = this;
setTimeout(function () {
self.remove(name, listener);
}, 0);

return;
}

var eventListeners = this._listeners[name];
if (!eventListeners) return;

var index = eventListeners.indexOf(listener);
if (index >= 0) {
eventListeners.splice(index, 1);
}
},

fire: function (name, self, args) {
var listeners = this._listeners[name];
if (!listeners) return;

this._firing = name;

for (var i = 0; i < listeners.length; i++) {
try {
listeners[i].call(self, args);
} catch (ex) {
Wind.logger.warn('An error occurred in "' + name + ' listener": ' + ex);
}
}

this._firing = null;
}
};

var taskEventManager = new EventManager();

var Task = Async.Task = function (delegate) {
this._delegate = delegate;
this._listeners = { };
this._eventManager = new EventManager();
this.status = "ready";
}
Task.prototype = {
Expand All @@ -107,85 +191,79 @@
}

this.status = "running";
this._delegate(this);

try {
this._delegate(this);
} catch (ex) {
if (this.status == "running") {
this.complete("failure", ex);
} else {
Wind.logger.warn("An unexpected error occurred after the task is completed: " + ex);
}
}

return this;
},

complete: function (type, value) {
if (type !== "success" && type !== "failure") {
throw new Error("Unsupported type: " + type);
}

if (this.status != "running") {
throw new Error('The "complete" method can only be called in "running" status.');
}

var listeners = this._listeners;
delete this._listeners;
var eventManager = this._eventManager;
this._eventManager = null;

if (type == "success") {

if (type === "success") {
this.result = value;
this.status = "succeeded";
this._notify("success", listeners["success"]);

} else if (type == "failure") {

this.error = value;
eventManager.fire("success", this);
} else {
this._error = value;

if (isCanceledError(value)) {
this.status = "canceled";
} else {
this.status = "faulted";
}

this._notify("failure", listeners["failure"]);

} else {
throw new Error("Unsupported type: " + type);
eventManager.fire("failure", this);
}

this._notify("complete", listeners["complete"]);
eventManager.fire("complete", this);

if (this.error && !listeners["failure"] && !listeners["complete"]) {
Wind.logger.warn("[WARNING] An unhandled error occurred: " + this.error);
}
},

_notify: function (ev, listeners) {
if (!listeners) {
if (type !== "failure") return;

if (!supportDefineProperty) {
this.error = value;
return;
}

for (var i = 0; i < listeners.length; i++) {
listeners[i].call(this);

if (!this._errorObserved) {
var self = this;
this._unobservedTimeoutToken = setTimeout(function () {
self._handleUnobservedError(value);
}, Task.unobservedTimeout);
}
},

_handleUnobservedError: function (error) {
this._unobservedTimeoutToken = null;

addEventListener: function (ev, listener) {
if (!this._listeners) {
return this;
}

if (!this._listeners[ev]) {
this._listeners[ev] = [];
}
var args = {
task: this,
error: error,
observed: false
};

this._listeners[ev].push(listener);
return this;
},

removeEventListener: function (ev, listener) {
if (!this._listeners) {
return this;
}

var evListeners = this._listeners[ev];
if (!evListeners) return this;
taskEventManager.fire("unobservedError", Task, args);

var index = evListeners.indexOf(listener);
if (index >= 0) {
evListeners.splice(index, 1);
if (!args.observed) {
throw error;
}

return this;
},

then: function (nextGenerator) {
Expand All @@ -207,7 +285,7 @@
}

if (nextTask.status == "running") {
nextTask.addEventListener("complete", nextOnComplete);
nextTask.on("complete", nextOnComplete);
} else {
nextOnComplete.call(nextTask);
}
Expand All @@ -233,14 +311,57 @@
}

if (firstTask.status == "running") {
firstTask.addEventListener("complete", firstOnComplete);
firstTask.on("complete", firstOnComplete);
} else {
firstOnComplete.call(firstTask);
}
});
}
};

Task.prototype.on = Task.prototype.addEventListener = function () {
var eventManager = this._eventManager;
if (!eventManager) {
throw new Error("Cannot add event listeners when the task is complete.");
}

eventManager.add.apply(eventManager, arguments);
};

Task.prototype.off = Task.prototype.removeEventListener = function () {
var eventManager = this._eventManager;
if (!eventManager) {
throw new Error("All the event listeners have been removed when the task was complete.");
}

eventManager.remove.apply(eventManager, arguments);
};

if (supportDefineProperty) {
Object.defineProperty(Task.prototype, "error", {
get: function () {
var token = this._unobservedTimeoutToken;
if (token) {
clearTimeout(token);
this._unobservedTimeoutToken = null;
}

this._errorObserved = true;
return this._error;
}
});
}

Task.on = Task.addEventListener = function () {
taskEventManager.add.apply(taskEventManager, arguments);
}

Task.off = Task.removeEventListener = function (name, listener) {
taskEventManager.remove.apply(taskEventManager, arguments);
}

Task.unobservedTimeout = 10 * 1000;

var isTask = Task.isTask = function (t) {
return t && (typeof t.start === "function") && (typeof t.addEventListener) === "function" && (typeof t.removeEventListener) === "function" && (typeof t.complete) === "function";
};
Expand Down Expand Up @@ -268,21 +389,16 @@

return create(function (taskWhenAll) {

var errors;

var done = function () {
var results = _.isArray(inputTasks) ? new Array(inputTasks.length) : { };
var errors = [];

_.each(inputTasks, function (key, task) {
if (task.error) {
errors.push(task.error);
} else {
results[key] = task.result;
}
});

if (errors.length > 0) {
if (errors) {
taskWhenAll.complete("failure", new AggregateError(errors));
} else {
var results = _.map(inputTasks, function (t) {
return t.result;
});

taskWhenAll.complete("success", results);
}
}
Expand All @@ -303,10 +419,18 @@
if (task.status === "running") {
runningNumber++;
task.addEventListener("complete", function () {
if (this.error) {
if (!errors) errors = [];
errors.push(this.error);
}

if (--runningNumber == 0) {
done();
}
});
} else if (task.status === "faulted" || task.status === "canceled") {
if (!errors) errors = [];
errors.push(task.error);
}
});

Expand Down
Loading

0 comments on commit 2d08fe5

Please sign in to comment.