Skip to content

Commit

Permalink
DEV BUNDLE UPDATE to add websocket-based ddp client for node.
Browse files Browse the repository at this point in the history
  • Loading branch information
gschmidt committed Dec 21, 2012
1 parent 0368441 commit 3732f0e
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 5 deletions.
3 changes: 2 additions & 1 deletion admin/generate-dev-bundle.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -e
set -u

BUNDLE_VERSION=0.2.12
BUNDLE_VERSION=0.2.13
UNAME=$(uname)
ARCH=$(uname -m)

Expand Down Expand Up @@ -99,6 +99,7 @@ npm install [email protected]
npm install [email protected]
npm install [email protected]
npm install [email protected]
npm install [email protected]

# progress 0.1.0 has a regression where it opens stdin and thus does not
# allow the node process to exit cleanly. See
Expand Down
2 changes: 1 addition & 1 deletion meteor
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

BUNDLE_VERSION=0.2.12
BUNDLE_VERSION=0.2.13

# OS Check. Put here because here is where we download the precompiled
# bundles that are arch specific.
Expand Down
2 changes: 1 addition & 1 deletion packages/livedata/livedata_connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Meteor._LivedataConnection = function (url, options) {
if (typeof url === "object") {
self._stream = url;
} else {
self._stream = new Meteor._Stream(url);
self._stream = new Meteor._DdpClientStream(url);
}

self._lastSessionId = null;
Expand Down
2 changes: 1 addition & 1 deletion packages/livedata/livedata_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ Meteor._LivedataServer = function () {

self.sessions = {}; // map from id to session

self.stream_server = new Meteor._StreamServer;
self.stream_server = new Meteor._DdpStreamServer;

self.stream_server.register(function (socket) {
socket.meteor_session = null;
Expand Down
14 changes: 13 additions & 1 deletion packages/livedata/package.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,18 @@ Package.describe({
});

Package.on_use(function (api) {
api.use(['stream', 'uuid']);
api.use(['uuid']);
api.use(['json', 'underscore', 'deps', 'logging'], ['client', 'server']);

// Transport
api.use(['underscore', 'logging', 'uuid', 'json'], ['client', 'server']);
api.use('reload', 'client');
api.add_files(['sockjs-0.3.4.js',
'stream_client_sockjs.js'], 'client');
api.add_files('stream_client_nodejs.js', 'server');
api.add_files('stream_server.js', 'server');


// livedata_connection.js uses a Minimongo collection internally to
// manage the current set of subscriptions.
api.use('minimongo', ['client', 'server']);
Expand Down Expand Up @@ -34,4 +43,7 @@ Package.on_test(function (api) {
api.add_files('livedata_connection_tests.js', ['client']);
api.add_files('livedata_tests.js', ['client', 'server']);
api.add_files('livedata_test_service.js', ['client', 'server']);

api.use('http', 'client');
api.add_files(['stream_tests.js'], 'client');
});
300 changes: 300 additions & 0 deletions packages/livedata/stream_client_nodejs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// WebSocket-Node https://github.com/Worlize/WebSocket-Node
// Chosen because it can run without native components. It has a
// somewhat idiosyncratic API. We may want to use 'ws' instead in the
// future.
var WebSocketClient = __meteor_bootstrap__.require('websocket').client;

// @param endpoint {String} URL to Meteor app
// "http://subdomain.meteor.com/" or "/" or
// "ddp+sockjs://foo-**.meteor.com/sockjs"
//
// -> Unlike the client, we require something of the form 'mysite.com',
// which we will map to 'ws(s)://mysite.com/websocket'
//
// We don't do any heartbeating. (The logic that did this in sockjs
// was removed, because it used a built-in sockjs mechanism. We could
// do it with WebSocket ping frames or with DDP-level messages.)
Meteor._DdpClientStream = function (endpoint) {
var self = this;

self.client = new WebSocketClient;
self.endpoint = endpoint;
self.currentConnection = null;
self.eventCallbacks = {}; // name -> [callback]
self.forceFail = false; // for debugging.

self.client.on('connect', function (connection) {
return self._onConnect(connection);
});

//// Constants

// how long to wait until we declare the connection attempt
// failed.
self.CONNECT_TIMEOUT = 10000;

// time for initial reconnect attempt.
self.RETRY_BASE_TIMEOUT = 1000;
// exponential factor to increase timeout each attempt.
self.RETRY_EXPONENT = 2.2;
// maximum time between reconnects.
self.RETRY_MAX_TIMEOUT = 1800000; // 30min.
// time to wait for the first 2 retries. this helps page reload
// speed during dev mode restarts, but doesn't hurt prod too
// much (due to CONNECT_TIMEOUT)
self.RETRY_MIN_TIMEOUT = 10;
// how many times to try to reconnect 'instantly'
self.RETRY_MIN_COUNT = 2;
// fuzz factor to randomize reconnect times by. avoid reconnect
// storms.
self.RETRY_FUZZ = 0.5; // +- 25%

//// Reactive status
self.currentStatus = {
status: "connecting", connected: false, retryCount: 0
};

self.statusListeners = (Meteor.deps && new Meteor.deps._ContextSet);
self.statusChanged = function () {
if (self.statusListeners)
self.statusListeners.invalidateAll();
};
self.expectingWelcome = false;

//// Retry logic
self.retryTimer = null;
self.connectionTimer = null;

//// Kickoff!
self._launchConnection();
};

_.extend(Meteor._DdpClientStream, {
_endpointToUrl: function (endpoint) {
// XXX should be secure!
// among other problems
return 'ws://' + endpoint + '/websocket';
}
});

_.extend(Meteor._DdpClientStream.prototype, {
// Register for callbacks.
on: function (name, callback) {
var self = this;

if (name !== 'message' && name !== 'reset' && name !== 'update_available')
throw new Error("unknown event type: " + name);

if (!self.eventCallbacks[name])
self.eventCallbacks[name] = [];
self.eventCallbacks[name].push(callback);
},

// data is a utf8 string. Data sent while not connected is dropped on
// the floor, and it is up the user of this API to retransmit lost
// messages on 'reset'
send: function (data) {
var self = this;
if (self.currentStatus.connected) {
self.currentConnection.send(data);
}
},

// Get current status. Reactive.
status: function () {
var self = this;
if (self.statusListeners)
self.statusListeners.addCurrentContext();
return self.currentStatus;
},

// Trigger a reconnect.
reconnect: function (options) {
var self = this;

if (self.currentStatus.connected) {
if (options && options._force) {
// force reconnect.
self._disconnected();
} // else, noop.
return;
}

// if we're mid-connection, stop it.
if (self.currentStatus.status === "connecting") {
self._fakeConnectFailed();
}

if (self.retryTimer)
clearTimeout(self.retryTimer);
self.retryTimer = null;
self.currentStatus.retryCount -= 1; // don't count manual retries
self._retryNow();
},

// Undocumented function for testing -- as long as the flag is set,
// the connection is forced to be disconnected
forceDisconnect: function (flag) {
var self = this;
self.forceFail = flag;
if (flag && self.currentConnection)
self.currentConnection.close();
},

_onConnect: function (connection) {
var self = this;

if (self.currentStatus.connected) {
// We already have a connection. It must have been the case that
// we started two parallel connection attempts (because we
// wanted to 'reconnect now' on a hanging connection and we had
// no way to cancel the connection attempt.) Just ignore/close
// the latecomer.
connection.close();
return;
}

if (self.connectionTimer) {
clearTimeout(self.connectionTimer);
self.connectionTimer = null;
}

connection.on('error', function (error) {
if (self.currentConnection !== this)
return;

Meteor._debug("stream error", error.toString(),
(new Date()).toDateString());
self._disconnected();
});

connection.on('close', function () {
if (self.currentConnection !== this)
return;

self._disconnected();
});

self.expectingWelcome = true;
connection.on('message', function (message) {
if (self.currentConnection !== this)
return; // old connection still emitting messages

if (self.expectingWelcome) {
// Discard the first message that comes across the
// connection. It is the hot code push version identifier and
// is not actually part of DDP.
self.expectingWelcome = false;
return;
}

if (message.type === "utf8") // ignore binary frames
_.each(self.eventCallbacks.message, function (callback) {
callback(message.utf8Data);
});
});

// update status
self.currentConnection = connection;
self.currentStatus.status = "connected";
self.currentStatus.connected = true;
self.currentStatus.retryCount = 0;
self.statusChanged();

// fire resets. This must come after status change so that clients
// can call send from within a reset callback.
_.each(self.eventCallbacks.reset, function (callback) { callback(); });
},

_cleanupConnection: function () {
var self = this;

if (self.currentConnection) {
self.currentConnection.close();
self.currentConnection = null;
}
},

_disconnected: function () {
var self = this;

if (self.connectionTimer) {
clearTimeout(self.connectionTimer);
self.connectionTimer = null;
}
self._cleanupConnection();
self._retryLater(); // sets status. no need to do it here.
},

_fakeConnectFailed: function () {
var self = this;
self._cleanupConnection();
self._disconnected();
},

_retryTimeout: function (count) {
var self = this;

if (count < self.RETRY_MIN_COUNT)
return self.RETRY_MIN_TIMEOUT;

var timeout = Math.min(
self.RETRY_MAX_TIMEOUT,
self.RETRY_BASE_TIMEOUT * Math.pow(self.RETRY_EXPONENT, count));
// fuzz the timeout randomly, to avoid reconnect storms when a
// server goes down.
timeout = timeout * ((Math.random() * self.RETRY_FUZZ) +
(1 - self.RETRY_FUZZ/2));
return timeout;
},

_retryLater: function () {
var self = this;

var timeout = self._retryTimeout(self.currentStatus.retryCount);
if (self.retryTimer)
clearTimeout(self.retryTimer);
self.retryTimer = setTimeout(_.bind(self._retryNow, self), timeout);

self.currentStatus.status = "waiting";
self.currentStatus.connected = false;
self.currentStatus.retryTime = (new Date()).getTime() + timeout;
self.statusChanged();
},

_retryNow: function () {
var self = this;

if (self.forceFail)
return;

self.currentStatus.retryCount += 1;
self.currentStatus.status = "connecting";
self.currentStatus.connected = false;
delete self.currentStatus.retryTime;
self.statusChanged();

self._launchConnection();
},

_launchConnection: function () {
var self = this;
self._cleanupConnection(); // cleanup the old socket, if there was one.

// launch a connect attempt. we have no way to track it. we either
// get an _onConnect event, or we don't.

// we would like to specify 'ddp' as the protocol here, but
// unfortunately WebSocket-Node fails the handshake if we ask for
// a protocol and the server doesn't send one back (and sockjs
// doesn't). also, related: I guess we have to accept that
// 'stream' is ddp-specific
self.client.connect(Meteor._DdpClientStream._endpointToUrl(self.endpoint));

if (self.connectionTimer)
clearTimeout(self.connectionTimer);
self.connectionTimer = setTimeout(
_.bind(self._fakeConnectFailed, self),
self.CONNECT_TIMEOUT);
}
});

0 comments on commit 3732f0e

Please sign in to comment.