Skip to content

Commit

Permalink
Use incrementing IDs for session requests (crossbario#377)
Browse files Browse the repository at this point in the history
* Use incrementing IDs for session requests.

Fixes crossbario#160.

* Remove global scope random ID generation.

We also weren't doing anything serializer specific. Relates to
crossbario#160.

* Restore global scope ID generation function to the util module.

Per request of @oberstet in crossbario#377. Also make it
inclusive of the max by adding 1 to the floor. Relates to
crossbario#160.
  • Loading branch information
JustinTArthur authored and oberstet committed Sep 15, 2018
1 parent 09333e6 commit e5a1f14
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 24 deletions.
17 changes: 0 additions & 17 deletions lib/serializer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,11 @@
var log = require('./log.js');


// generate a WAMP ID: this might be serializer specific, as
// we need to enforce encoding into an integer, not float
// eg we need to do some extra stuff for msgpack (json and
// cbor are fine "as is")
function newid () {
return Math.floor(Math.random() * 9007199254740992);
}


function JSONSerializer(replacer, reviver) {
this.replacer = replacer;
this.reviver = reviver;
this.SERIALIZER_ID = 'json';
this.BINARY = false;

// JSON encoder does not need anything special here
this.newid = newid;
}

JSONSerializer.prototype.serialize = function (obj) {
Expand Down Expand Up @@ -63,8 +51,6 @@ try {
function MsgpackSerializer() {
this.SERIALIZER_ID = 'msgpack';
this.BINARY = true;

this.newid = newid;
}

MsgpackSerializer.prototype.serialize = function (obj) {
Expand Down Expand Up @@ -100,9 +86,6 @@ try {
function CBORSerializer() {
this.SERIALIZER_ID = 'cbor';
this.BINARY = true;

// CBOR encoder does not need anything special here
this.newid = newid;
}

CBORSerializer.prototype.serialize = function (obj) {
Expand Down
21 changes: 15 additions & 6 deletions lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ var Session = function (socket, defer, onchallenge, on_user_error, on_internal_e
self._MESSAGE_MAP = {};
self._MESSAGE_MAP[MSG_TYPE.ERROR] = {};

var next_req_id = 0;
self._new_request_id = function(){
next_req_id += 1;
if (next_req_id > 9007199254740992) {
next_req_id = 1;
}
return next_req_id;
};


self._process_SUBSCRIBED = function (msg) {
//
Expand Down Expand Up @@ -1220,7 +1229,7 @@ Session.prototype.call = function (procedure, args, kwargs, options) {
// create and remember new CALL request
//
var d = self._defer();
var request = self._socket.serializer.newid();
var request = self._new_request_id();
self._call_reqs[request] = [d, options];

// construct CALL message
Expand Down Expand Up @@ -1293,7 +1302,7 @@ Session.prototype.publish = function (topic, args, kwargs, options) {
// create and remember new PUBLISH request
//
var d = null;
var request = self._socket.serializer.newid();
var request = self._new_request_id();
if (options.acknowledge) {
d = self._defer();
self._publish_reqs[request] = [d, options];
Expand Down Expand Up @@ -1341,7 +1350,7 @@ Session.prototype.subscribe = function (topic, handler, options) {

// create an remember new SUBSCRIBE request
//
var request = self._socket.serializer.newid();
var request = self._new_request_id();
var d = self._defer();
self._subscribe_reqs[request] = [d, topic, handler, options];

Expand Down Expand Up @@ -1382,7 +1391,7 @@ Session.prototype.register = function (procedure, endpoint, options) {

// create an remember new REGISTER request
//
var request = self._socket.serializer.newid();
var request = self._new_request_id();
var d = self._defer();
self._register_reqs[request] = [d, procedure, endpoint, options];

Expand Down Expand Up @@ -1446,7 +1455,7 @@ Session.prototype.unsubscribe = function (subscription) {

// create and remember new UNSUBSCRIBE request
//
var request = self._socket.serializer.newid();
var request = self._new_request_id();
self._unsubscribe_reqs[request] = [d, subscription.id];

// construct UNSUBSCRIBE message
Expand Down Expand Up @@ -1483,7 +1492,7 @@ Session.prototype.unregister = function (registration) {

// create and remember new UNREGISTER request
//
var request = self._socket.serializer.newid();
var request = self._new_request_id();
var d = self._defer();
self._unregister_reqs[request] = [d, registration];

Expand Down
12 changes: 11 additions & 1 deletion lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,20 @@ var handle_error = function(handler, error, error_message) {
} else {
console.error(error_message || 'Unhandled exception raised: ', error);
}
}
};

/**
* Generate a new ID to identify a WAMP global scope entity, such as a session or a publication.
* Represented as a JavaScript Number (double float), so ensure that an appropriate serialization
* for an integer is used for use in transported WAMP protocol messages.
*/
var new_global_id = function() {
return Math.floor(Math.random() * 9007199254740992) + 1;
};

exports.handle_error = handle_error;
exports.rand_normal = rand_normal;
exports.assert = assert;
exports.http_post = http_post;
exports.defaults = defaults;
exports.new_global_id = new_global_id;
2 changes: 2 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var rpc_options = require('./test_rpc_options.js');
var rpc_progress = require('./test_rpc_progress.js');
var rpc_slowsquare = require('./test_rpc_slowsquare.js');
var rpc_routing = require('./test_rpc_routing.js');
var rpc_request_id_sequence = require('./test_rpc_request_id_sequence.js');

var pubsub_basic = require('./test_pubsub_basic.js');
var pubsub_complex = require('./test_pubsub_complex.js');
Expand Down Expand Up @@ -60,6 +61,7 @@ exports.testRpcOptions = rpc_options.testRpcOptions;
exports.testRpcProgress = rpc_progress.testRpcProgress;
exports.testRpcSlowsquare = rpc_slowsquare.testRpcSlowsquare;
exports.testRpcRouting = rpc_routing.testRpcRouting;
exports.testRpcRequestIdSequence = rpc_request_id_sequence.testRpcRequestIdSequence;

exports.testPubsubBasic = pubsub_basic.testPubsubBasic;
exports.testPubsubComplex = pubsub_complex.testPubsubComplex;
Expand Down
83 changes: 83 additions & 0 deletions test/test_rpc_request_id_sequence.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
///////////////////////////////////////////////////////////////////////////////
//
// AutobahnJS - http://autobahn.ws, http://wamp.ws
//
// A JavaScript library for WAMP ("The Web Application Messaging Protocol").
//
// Copyright (c) Crossbar.io Technologies GmbH and contributors
//
// Licensed under the MIT License.
// http://www.opensource.org/licenses/mit-license.php
//
///////////////////////////////////////////////////////////////////////////////

var autobahn = require('./../index.js');
var testutil = require('./testutil.js');

var REGISTER_MSG_TYPE = 64;
var CALL_MSG_TYPE = 48;

exports.testRpcRequestIdSequence = function (testcase) {

testcase.expect(2);

var test = new testutil.Testlog("test/test_rpc_request_id_sequence.txt");

var connection = new autobahn.Connection(testutil.config);

connection.onopen = function (session) {

test.log('Connected');

// Hijack invocation processing to collect request IDs of incoming calls.
var sent_request_ids = [];
var original_send_wamp = session._send_wamp;
session._send_wamp = function(msg) {
if ((msg[0] === CALL_MSG_TYPE) || ((msg[0] === REGISTER_MSG_TYPE))) {
var requestId = msg[1];
test.log('' + 'Sent call with id ' + requestId);
sent_request_ids.push(requestId);
}
return original_send_wamp(msg)
};

function noop() { return null; }

session.register('com.myapp.do_nothing', noop).then(
function () {
test.log("Procedure registered.");

// Enforce sequential execution of RPCs to get "stable" test results
var d = session.call('com.myapp.do_nothing');
d = d.then(function (res) {
test.log('Received response.');
return session.call('com.myapp.do_nothing')
});
d = d.then(function (res) {
test.log('Received response.');
return session.call('com.myapp.do_nothing')
});
d = d.then(function (res) {
test.log('Received response.');
});

d.then(function () {
test.log("All calls made.");
testcase.deepEqual(sent_request_ids, [1, 2, 3, 4]);

session._send_wamp = original_send_wamp;
connection.close();

var chk = test.check();
testcase.ok(!chk, chk);
testcase.done();
});
},
function () {
test.log("Registration failed!", arguments);
}
);
};

connection.open();
};
10 changes: 10 additions & 0 deletions test/test_rpc_request_id_sequence.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0 "Connected"
1 "Sent call with id 1"
2 "Procedure registered."
3 "Sent call with id 2"
4 "Received response."
5 "Sent call with id 3"
6 "Received response."
7 "Sent call with id 4"
8 "Received response."
9 "All calls made."

0 comments on commit e5a1f14

Please sign in to comment.