Skip to content

Commit

Permalink
Implement pause/resume in Minimongo. Uses a pessimal diff algorithm.
Browse files Browse the repository at this point in the history
  • Loading branch information
n1mmy committed Apr 4, 2012
1 parent b0f7e18 commit ee1d25f
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 34 deletions.
76 changes: 55 additions & 21 deletions packages/livedata/livedata_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@ Meteor._LivedataConnection = function (url, restart_on_update) {
self.stores = {}; // name -> object with methods
self.method_handlers = {}; // name -> func
self.next_method_id = 1;
// waiting for results of method
self.outstanding_methods = []; // each item has keys: msg, callback
// waiting for data from method
self.unsatisfied_methods = {}; // map from method_id -> true
// sub was ready, is no longer (due to reconnect)
self.unready_subscriptions = {}; // map from sub._id -> true
// messages from the server that have not been applied
self.pending_data = []; // array of pending data messages
self.queued = {}; // name -> updates for (yet to be created) collection
self.quiesce_callbacks = [];
self.retry_migrate = null; // if we're blocking a migration, the retry func
// name -> updates for (yet to be created) collection
self.queued = {};
// if we're blocking a migration, the retry func
self.retry_migrate = null;

// metadata for subscriptions
self.subs = new LocalCollection;
// keyed by subs._id. value is unset or an array. if set, sub is not
// yet ready.
self.sub_ready_callbacks = {};

// just for testing
self.quiesce_callbacks = [];


// Setup auto-reload persistence.
var reload_key = "Server-" + url;
var reload_data = Meteor._reload.migration_data(reload_key);
Expand Down Expand Up @@ -333,7 +344,7 @@ _.extend(Meteor._LivedataConnection.prototype, {

// If we're using the default callback on the server,
// synchronously return the result from the remote host.
if (future) {
if (future) { // XXX should this be typeof !== undefined?
var outcome = future.wait();
if (outcome[0])
throw outcome[0];
Expand Down Expand Up @@ -384,22 +395,23 @@ _.extend(Meteor._LivedataConnection.prototype, {
// successful reconnection -- pick up where we left off.
return;

// clear out the local database!

// XXX this causes flicker ("database flap") and needs to be
// rewritten. we need to put a reset message in pending_data
// (optionally clearing pending_data and queued first, as an
// optimization), and defer processing pending_data until all of
// the subscriptions that we previously told the user were ready,
// are now once again ready. then, when we do go to process the
// messages, we need to do it in one atomic batch (the reset and
// the redeliveries together) so that livequeries don't observe
// spurious 'added' and 'removed' messages, which would cause, eg,
// DOM elements to fail to get semantically matched, leading to a
// loss of focus/input state.
_.each(self.stores, function (s) { s.reset(); });
self.pending_data = [];
// Server doesn't have our data any more. Re-sync a new session.

// Put a reset message into the pending data queue and discard any
// previous messages (they are unimportant now).
self.pending_data = ["reset"];
self.queued = {};

// Mark all currently ready subscriptions as 'unready'.
var all_subs = self.subs.find({}).fetch();
self.unready_subscriptions = {};
_.each(all_subs, function (sub) {
if (!self.sub_ready_callbacks[sub._id])
self.unready_subscriptions[sub._id] = true;
});

// Do not remove the database here. That happens once all the subs
// are re-ready and we process pending_data.
},

_livedata_data: function (msg) {
Expand All @@ -408,19 +420,41 @@ _.extend(Meteor._LivedataConnection.prototype, {
// Add the data message to the queue
self.pending_data.push(msg);

// If there are still method invocations in flight, stop
// Process satisfied methods and subscriptions.
// NOTE: does not fire callbacks here, that happens when
// the data message is processed for real. This is just for
// quiescing.
_.each(msg.methods || [], function (method_id) {
delete self.unsatisfied_methods[method_id];
});
_.each(msg.subs || [], function (sub_id) {
delete self.unready_subscriptions[sub_id];
});

// If there are still method invocations in flight, stop
for (var method_id in self.unsatisfied_methods)
return;
// If there are still uncomplete subscriptions, stop
for (var sub_id in self.unready_subscriptions)
return;

// All methods have landed. Blow away local changes and replace
// We have quiesced. Blow away local changes and replace
// with authoritative changes from server.

_.each(self.stores, function (s) { s.beginUpdate(); });

_.each(self.pending_data, function (msg) {
// Reset message from reconnect. Blow away everything.
//
// XXX instead of reset message, we could have a flag, and pass
// that to beginUpdate. This would be more efficient since we don't
// have to restore a snapshot if we're just going to blow away the
// db.
if (msg === "reset") {
_.each(self.stores, function (s) { s.reset(); });
return;
}

if (msg.collection && msg.id) {
var store = self.stores[msg.collection];

Expand Down
14 changes: 14 additions & 0 deletions packages/minimongo/diff.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// old_result: array of documents.
// new_result: array of documents.
// observer: object with 'added', 'changed', 'moved', 'removed' functions
LocalCollection._diffQuery = function (old_result, new_result, observer) {
// XXX implement
// console.log("_diffQuery", arguments);

// Pessimal, but simple, implementation.
for (var i = old_result.length - 1; i >= 0; i--)
observer.removed(old_result[i]._id, i);

for (var i = 0; i < new_result.length; i++)
observer.added(LocalCollection._deepcopy(new_result[i]), i);
};
84 changes: 74 additions & 10 deletions packages/minimongo/minimongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@ LocalCollection = function () {

this.next_qid = 1; // live query id generator

// qid -> live query object. keys: results, selector_f, sort_f, cursor, (callbacks)
// qid -> live query object. keys:
// results: array of current results
// results_snapshot: snapshot of results. null if not paused.
// cursor: Cursor object for the query.
// selector_f, sort_f, (callbacks): functions
this.queries = {};

// when we have a snapshot, this will contain a deep copy of 'docs'.
this.current_snapshot = null;

// True when observers are paused and we should not send callbacks.
this.paused = false;
};

// options may include sort, skip, limit, reactive
Expand Down Expand Up @@ -173,15 +180,27 @@ LocalCollection.Cursor.prototype.observe = function (options) {
selector_f: self.selector_f, // not fast pathed
sort_f: self.sort_f,
results: [],
results_snapshot: self.collection.paused ? [] : null,
cursor: this
};
query.results = self._getRawObjects();

query.added = options.added || function () {};
query.changed = options.changed || function () {};
query.moved = options.moved || function () {};
query.removed = options.removed || function () {};
if (!options._suppress_initial)
// wrap callbacks we were passed. callbacks only fire when not paused
// and are never undefined.
var if_not_paused = function (f) {
if (!f)
return function () {};
return function (/*args*/) {
if (!self.collection.paused)
f.apply(this, arguments);
};
};
query.added = if_not_paused(options.added);
query.changed = if_not_paused(options.changed);
query.moved = if_not_paused(options.moved);
query.removed = if_not_paused(options.removed);

if (!options._suppress_initial && !self.collection.paused)
for (var i = 0; i < query.results.length; i++)
query.added(LocalCollection._deepcopy(query.results[i]), i);

Expand Down Expand Up @@ -454,12 +473,57 @@ LocalCollection.prototype.restore = function () {
// tell what changed)
for (var qid in this.queries) {
var query = this.queries[qid];
for (var i = query.results.length - 1; i >= 0; i--)
query.removed(query.results[i]._id, i);
if (!this.paused)
for (var i = query.results.length - 1; i >= 0; i--)
query.removed(query.results[i]._id, i);

query.results = query.cursor._getRawObjects();

for (var i = 0; i < query.results.length; i++)
query.added(LocalCollection._deepcopy(query.results[i]), i);
if (!this.paused)
for (var i = 0; i < query.results.length; i++)
query.added(LocalCollection._deepcopy(query.results[i]), i);
}
};


// Pause the observers. No callbacks from observers will fire until
// 'resumeObservers' is called.
LocalCollection.prototype.pauseObservers = function () {
// No-op if already paused.
if (this.paused)
return;

// Set the 'paused' flag such that new observer messages don't fire.
this.paused = true;

// Take a snapshot of the query results for each query.
for (var qid in this.queries) {
var query = this.queries[qid];

query.results_snapshot = LocalCollection._deepcopy(query.results);
}
};

// Resume the observers. Observers immediately receive change
// notifications to bring them to the current state of the
// database. Note that this is not just replaying all the changes that
// happened during the pause, it is a smarter 'coalesced' diff.
LocalCollection.prototype.resumeObservers = function () {
// No-op if not paused.
if (!this.paused)
return;

// Unset the 'paused' flag. Make sure to do this first, otherwise
// observer methods won't actually fire when we trigger them.
this.paused = false;

for (var qid in this.queries) {
var query = this.queries[qid];
// Diff the current results against the snapshot and send to observers.
// pass the query object for its observer callbacks.
LocalCollection._diffQuery(query.results_snapshot, query.results, query);
query.results_snapshot = null;
}

};

3 changes: 2 additions & 1 deletion packages/minimongo/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ Package.on_use(function (api, where) {
'selector.js',
'sort.js',
'uuid.js',
'modify.js'
'modify.js',
'diff.js'
], where);
});

Expand Down
8 changes: 6 additions & 2 deletions packages/mongo-livedata/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Meteor.Collection = function (name, manager, driver) {
// to start by backing out any local writes and returning to the
// last state delivered by the server.
beginUpdate: function () {
// pause observers so users don't see flicker.
self._collection.pauseObservers();

// restore db snapshot
if (self._was_snapshot) {
self._collection.restore();
self._was_snapshot = false;
Expand Down Expand Up @@ -66,9 +70,9 @@ Meteor.Collection = function (name, manager, driver) {
}
},

// Called at the end of a batch of updates, just for symmetry,
// or in case some future database driver needs it.
// Called at the end of a batch of updates.
endUpdate: function () {
self._collection.resumeObservers();
},

// Reset the collection to its original, empty state.
Expand Down

0 comments on commit ee1d25f

Please sign in to comment.