Skip to content

Commit

Permalink
Implemented pluggable serializers and msgpack serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Mar 9, 2016
1 parent 0e6fe11 commit b9ab5d8
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 19 deletions.
3 changes: 3 additions & 0 deletions package/lib/autobahn.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var log = require('./log.js');
var session = require('./session.js');
var connection = require('./connection.js');
var configure = require('./configure.js');
var serializer = require('./serializer.js');

var persona = require('./auth/persona.js');
var cra = require('./auth/cra.js');
Expand All @@ -51,6 +52,8 @@ exports.Subscription = session.Subscription;
exports.Registration = session.Registration;
exports.Publication = session.Publication;

exports.serializer = serializer;

exports.auth_persona = persona.auth;
exports.auth_cra = cra;

Expand Down
3 changes: 3 additions & 0 deletions package/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ Connection.prototype._init_transport_factories = function () {
// defaulting to options.url if none is provided
transport_options.url = this._options.url;
}
if (!transport_options.serializers) {
transport_options.serializers = this._options.serializers;
}
if (!transport_options.protocols) {
transport_options.protocols = this._options.protocols;
}
Expand Down
53 changes: 53 additions & 0 deletions package/lib/serializer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
///////////////////////////////////////////////////////////////////////////////
//
// AutobahnJS - http://autobahn.ws, http://wamp.ws
//
// A JavaScript library for WAMP ("The Web Application Messaging Protocol").
//
// Copyright (C) 2011-2016 Tavendo GmbH, http://tavendo.com
//
// Licensed under the MIT License.
// http://www.opensource.org/licenses/mit-license.php
//
///////////////////////////////////////////////////////////////////////////////


function JSONSerializer(replacer, reviver) {
this.replacer = replacer;
this.reviver = reviver;
this.SERIALIZER_ID = 'json';
this.BINARY = false;
}

JSONSerializer.prototype.serialize = function (obj) {
return JSON.stringify(obj, this.replacer);
};

JSONSerializer.prototype.unserialize = function (payload) {
return JSON.parse(payload, this.reviver);
};

exports.JSONSerializer = JSONSerializer;


try {
var msgpack = require('msgpack-lite');

function MsgpackSerializer() {
// https://github.com/mcollina/msgpack5
this.SERIALIZER_ID = 'msgpack';
this.BINARY = true;
}

MsgpackSerializer.prototype.serialize = function (obj) {
return msgpack.encode(obj);
};

MsgpackSerializer.prototype.unserialize = function (payload) {
return msgpack.decode(payload);
};

exports.MsgpackSerializer = MsgpackSerializer;
} catch (err) {
// msgpack-lite not installed
}
2 changes: 1 addition & 1 deletion package/lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var WAMP_FEATURES = {
// generate a WAMP ID
//
function newid () {
return Math.floor(Math.random() * 9007199254740992);
return Math.floor(Math.random() * 2147483648);
}


Expand Down
59 changes: 44 additions & 15 deletions package/lib/transport/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,28 @@

var util = require('../util.js');
var log = require('../log.js');

var serializer = require('../serializer.js');

function Factory (options) {
var self = this;

util.assert(options.url !== undefined, "options.url missing");
util.assert(typeof options.url === "string", "options.url must be a string");

if (!options.serializers) {
options.serializers = [new serializer.JSONSerializer()];
if (serializer.MsgpackSerializer) {
options.serializers.push(new serializer.MsgpackSerializer());
}
} else {
util.assert(Array.isArray(options.serializers), "options.serializers must be an array");
}

if (!options.protocols) {
options.protocols = ['wamp.2.json'];
options.protocols = [];
options.serializers.forEach(function (ser) {
options.protocols.push("wamp.2." + ser.SERIALIZER_ID);
});
} else {
util.assert(Array.isArray(options.protocols), "options.protocols must be an array");
}
Expand All @@ -44,6 +56,7 @@ Factory.prototype.create = function () {

// these will get defined further below
transport.protocol = undefined;
transport.serializer = undefined;
transport.send = undefined;
transport.close = undefined;

Expand All @@ -54,8 +67,8 @@ Factory.prototype.create = function () {

transport.info = {
type: 'websocket',
url: null,
protocol: 'wamp.2.json'
url: self._options.url,
protocol: null
};


Expand Down Expand Up @@ -83,25 +96,31 @@ Factory.prototype.create = function () {
}

transport.send = function (msg) {
var payload = JSON.stringify(msg);
websocket.send(payload, {binary: false});
var payload = transport.serializer.serialize(msg);
websocket.send(payload, {binary: transport.serializer.BINARY});
};

transport.close = function (code, reason) {
websocket.close();
};

websocket.on('open', function () {
var serializer_part = websocket.protocol.split('.')[2];
for (var index in self._options.serializers) {
var serializer = self._options.serializers[index];
if (serializer.SERIALIZER_ID == serializer_part) {
transport.serializer = serializer;
break;
}
}

transport.info.protocol = websocket.protocol;
transport.onopen();
});

websocket.on('message', function (data, flags) {
if (flags.binary) {
// FIXME!
} else {
var msg = JSON.parse(data);
transport.onmessage(msg);
}
var msg = transport.serializer.unserialize(data);
transport.onmessage(msg);
});

// FIXME: improve mapping to WS API for the following
Expand Down Expand Up @@ -143,6 +162,7 @@ Factory.prototype.create = function () {
} else {
websocket = new global.WebSocket(self._options.url);
}
websocket.binaryType = 'arrayBuffer';

// older versions of Firefox prefix the WebSocket object
} else if ("MozWebSocket" in global) {
Expand All @@ -159,12 +179,21 @@ Factory.prototype.create = function () {
websocket.onmessage = function (evt) {
log.debug("WebSocket transport receive", evt.data);

var msg = JSON.parse(evt.data);
var msg = transport.serializer.unserialize(evt.data);
transport.onmessage(msg);
}

websocket.onopen = function () {
transport.info.url = self._options.url;
var serializer_part = websocket.protocol.split('.')[2];
for (var index in self._options.serializers) {
var serializer = self._options.serializers[index];
if (serializer.SERIALIZER_ID == serializer_part) {
transport.serializer = serializer;
break;
}
}

transport.info.protocol = websocket.protocol;
transport.onopen();
}

Expand All @@ -183,7 +212,7 @@ Factory.prototype.create = function () {
//websocket.onerror = websocket.onclose;

transport.send = function (msg) {
var payload = JSON.stringify(msg);
var payload = transport.serializer.serialize(msg);
log.debug("WebSocket transport send", payload);
websocket.send(payload);
}
Expand Down
6 changes: 4 additions & 2 deletions package/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
"description": "An implementation of The Web Application Messaging Protocol (WAMP).",
"main": "index.js",
"browser": {
"lib/transport/rawsocket.js": false
"lib/transport/rawsocket.js": false,
"msgpack-lite": "msgpack-lite/global"
},
"scripts": {
"test": "nodeunit test/test.js"
},
"dependencies": {
"crypto-js": ">= 3.1.5",
"when": ">= 3.7.3",
"ws": ">= 0.8.0"
"ws": ">= 0.8.0",
"msgpack-lite": ">= 0.1.17"
},
"devDependencies": {
"browserify": ">= 11.0.1",
Expand Down
2 changes: 2 additions & 0 deletions package/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// this works via https://github.com/caolan/nodeunit

var connect = require('./test_connect.js');
var msgpack_serialization = require('./test_msgpack_serialization.js');
var rpc_complex = require('./test_rpc_complex.js');
var rpc_arguments = require('./test_rpc_arguments.js');
var rpc_error = require('./test_rpc_error.js');
Expand All @@ -34,6 +35,7 @@ var pubsub_publisher_disclose_me = require('./test_pubsub_publisher_disclose_me.


exports.testConnect = connect.testConnect;
exports.testMsgpackSerialization = msgpack_serialization.testMsgpackSerialization;
exports.testRpcArguments = rpc_arguments.testRpcArguments;
exports.testRpcComplex = rpc_complex.testRpcComplex;
exports.testRpcError = rpc_error.testRpcError;
Expand Down
88 changes: 88 additions & 0 deletions package/test/test_msgpack_serialization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
///////////////////////////////////////////////////////////////////////////////
//
// AutobahnJS - http://autobahn.ws, http://wamp.ws
//
// A JavaScript library for WAMP ("The Web Application Messaging Protocol").
//
// Copyright (C) 2011-2014 Tavendo GmbH, http://tavendo.com
//
// Licensed under the MIT License.
// http://www.opensource.org/licenses/mit-license.php
//
///////////////////////////////////////////////////////////////////////////////

var autobahn = require('./../index.js');
var testutil = require('./testutil.js');


exports.testMsgpackSerialization = function (testcase) {

testcase.expect(1);

var test = new testutil.Testlog("test/test_msgpack_serialization.txt");

var serializer = new autobahn.serializer.MsgpackSerializer();

var config = {
url: testutil.config.url,
realm: testutil.config.realm,
serializers: [serializer]
};
var connection = new autobahn.Connection(config);

connection.onopen = function (session) {

test.log('Connected');

function echo(args) {
return args[0];
}

var endpoints = {
'com.myapp.echo': echo
};

var pl1 = [];

for (var uri in endpoints) {
pl1.push(session.register(uri, endpoints[uri]));
}

autobahn.when.all(pl1).then(
function () {
test.log("All registered.");
test.log("Serializer ID: " + session._socket.serializer.SERIALIZER_ID);

var pl2 = [];

var vals1 = [1.7, "hello", [1, 2, -3], {a: 5, b: "hello2"}, null];

for (var i = 0; i < vals1.length; ++i) {

pl2.push(session.call('com.myapp.echo', [vals1[i]]).then(
function (res) {
test.log("Result:", res);
},
function (err) {
test.log("Error:", err.error, err.args, err.kwargs);
}
));
}

autobahn.when.all(pl2).then(function () {
test.log("All finished.");
connection.close();

var chk = test.check();
testcase.ok(!chk, chk);
testcase.done();
});
},
function () {
test.log("Registration failed!", arguments);
}
);
};

connection.open();
};
9 changes: 9 additions & 0 deletions package/test/test_msgpack_serialization.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
0 "Connected"
1 "All registered."
2 "Serializer ID: msgpack"
3 "Result:" 1.7
4 "Result:" "hello"
5 "Result:" [1,2,-3]
6 "Result:" {"a":5,"b":"hello2"}
7 "Result:" null
8 "All finished."
2 changes: 1 addition & 1 deletion package/test/testutil.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Testlog.prototype.log = function () {

var self = this;

//console.log.apply(this, arguments);
console.log.apply(this, arguments);
self._log.push(arguments);
};

Expand Down

0 comments on commit b9ab5d8

Please sign in to comment.