Skip to content

Commit

Permalink
Enable event subscription using WebSockets in the REST server (hyperl…
Browse files Browse the repository at this point in the history
…edger-archives#1655)

* Enable event subscription in the LoopBack connector

* Enable event subscription using WebSockets in the REST server
  • Loading branch information
Simon Stone authored Jul 25, 2017
1 parent d4a4473 commit 5274211
Show file tree
Hide file tree
Showing 25 changed files with 1,055 additions and 71 deletions.
13 changes: 9 additions & 4 deletions packages/composer-rest-server/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const yargs = require('yargs')
.option('N', { alias: 'namespaces', describe: 'Use namespaces if conflicting types exist', type: 'string', default: process.env.COMPOSER_NAMESPACES || 'always', choices: ['always', 'required', 'never'] })
.option('P', { alias: 'port', describe: 'The port to serve the REST API on', type: 'number', default: process.env.COMPOSER_PORT || undefined })
.option('S', { alias: 'security', describe: 'Enable security for the REST API', type: 'boolean', default: process.env.COMPOSER_SECURITY || false })
.option('w', { alias: 'websockets', describe: 'Enable event publication over WebSockets', type: 'boolean', default: process.env.COMPOSER_WEBSOCKETS || true })
.alias('v', 'version')
.version(() => {
return getInfo('composer-rest-server')+
Expand Down Expand Up @@ -73,7 +74,8 @@ if (interactive) {
participantId: answers.userid,
participantPwd: answers.secret,
namespaces: answers.namespaces,
security: answers.security
security: answers.security,
websockets: answers.websockets
};
console.log('\nTo restart the REST server using the same options, issue the following command:');
let cmd = [ 'composer-rest-server' ];
Expand All @@ -85,6 +87,7 @@ if (interactive) {
'-N': 'namespaces',
'-P': 'port',
'-S': 'security',
'-w': 'websockets'
};
for (let arg in args) {
const propName = args[arg];
Expand All @@ -109,7 +112,8 @@ if (interactive) {
participantPwd: yargs.s,
namespaces: yargs.N,
port: yargs.P,
security: yargs.S
security: yargs.S,
websockets: yargs.w
});
}
}
Expand All @@ -121,10 +125,11 @@ module.exports = promise.then((composer) => {
return server(composer);

})
.then((app) => {
.then((result) => {

// Start the LoopBack application.
return app.listen(function () {
const app = result.app, server = result.server;
return server.listen(app.get('port'), () => {
app.emit('started');
let baseUrl = app.get('url').replace(/\/$/, '');
console.log('Web server listening at: %s', baseUrl);
Expand Down
6 changes: 6 additions & 0 deletions packages/composer-rest-server/lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class Util {
type: 'confirm',
message: 'Specify if you want the generated REST API to be secured:',
default: false
},
{
name: 'websockets',
type: 'confirm',
message: 'Specify if you want to enable event publication over WebSockets:',
default: true
}
];

Expand Down
1 change: 1 addition & 0 deletions packages/composer-rest-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"serve-favicon": "^2.0.1",
"strong-error-handler": "^1.0.1",
"touch": "^1.0.0",
"ws": "^3.0.0",
"yargs": "^8.0.1"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,15 @@ module.exports = function (app, callback) {

});

// Subscribe to events from the business network.
dataSource.connector.subscribe((event) => {
const wss = app.get('wss');
if (wss) {
const data = JSON.stringify(event);
wss.broadcast(data);
}
});

})
.then(() => {
console.log('Added schemas for all types to Loopback');
Expand Down
38 changes: 35 additions & 3 deletions packages/composer-rest-server/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
const boot = require('loopback-boot');
const bodyParser = require('body-parser');
const cookieParser = require('cookie-parser');
const http = require('http');
const loopback = require('loopback');
const loopbackPassport = require('loopback-component-passport');
const path = require('path');
const session = require('express-session');
const WebSocket = require('ws');

module.exports = function (composer) {

Expand Down Expand Up @@ -133,7 +135,36 @@ module.exports = function (composer) {

}

return app;
// Create the HTTP server.
const server = http.createServer(app);

// The following configuration is only required if WebSockets are enabled.
const websockets = !!composer.websockets;
if (websockets) {

// Create a new WebSocket server that manages clients for us.
const wss = new WebSocket.Server({
server,
clientTracking: true
});

// Add a broadcast method that sends data to all connected clients.
wss.broadcast = (data) => {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
};

// Store the WebSocket server for the boot script to find.
app.set('wss', wss);

}

// Return the application and the server (both are needed to start the thing).
return { app, server };

});

};
Expand All @@ -145,10 +176,11 @@ module.exports = function (composer) {
if (require.main === module) {
const composerConfig = require('./composer.json');
module.exports(composerConfig)
.then((app) => {
.then((result) => {

// Start the LoopBack application.
return app.listen(function () {
const app = result.app, server = result.server;
return server.listen(() => {
app.emit('started');
let baseUrl = app.get('url').replace(/\/$/, '');
console.log('Web server listening at: %s', baseUrl);
Expand Down
4 changes: 2 additions & 2 deletions packages/composer-rest-server/test/assets.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ const bfs_fs = BrowserFS.BFSRequire('fs');
namespaces: namespaces
});
})
.then((app_) => {
app = app_;
.then((result) => {
app = result.app;
businessNetworkConnection = new BusinessNetworkConnection({ fs: bfs_fs });
return businessNetworkConnection.connect('defaultProfile', 'bond-network', 'admin', 'Xurw3yU9zI0l');
})
Expand Down
85 changes: 59 additions & 26 deletions packages/composer-rest-server/test/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ describe('composer-rest-server CLI unit tests', () => {
userid: 'admin',
secret: 'adminpw',
namespaces: 'always',
security: false
security: false,
websockets: true
});
sandbox.stub(process, 'exit');
sandbox.spy(console, 'log');
Expand Down Expand Up @@ -63,10 +64,16 @@ describe('composer-rest-server CLI unit tests', () => {

it('should call inquirer if no arguments specified and start the server', () => {
let listen = sinon.stub();
let get = sinon.stub();
get.withArgs('port').returns(3000);
process.argv = [ process.argv0 ];
const server = sinon.stub();
server.resolves({
listen: listen
const server = sinon.stub().resolves({
app: {
get
},
server: {
listen
}
});
return proxyquire('../cli', {
clear: () => { },
Expand All @@ -82,27 +89,34 @@ describe('composer-rest-server CLI unit tests', () => {
namespaces: 'always',
participantId: 'admin',
participantPwd: 'adminpw',
security: false
security: false,
websockets: true
};
sinon.assert.calledWith(server, settings);
sinon.assert.calledOnce(listen);
listen.args[0][0].should.equal(3000);
listen.args[0][1].should.be.a('function');
});
});

it('should throw an error if command line arguments specified but some are missing', () => {
let listen = sinon.stub();
process.argv = [ process.argv0, 'cli.js', '-n', 'org-acme-biznet' ];
delete require.cache[require.resolve('yargs')];
const server = sinon.stub().resolves({
app: {

},
server: {
listen
}
});
return proxyquire('../cli', {
clear: () => { },
chalk: {
yellow: () => { return ''; }
},
'./server/server': () => {
return Promise.resolve({
listen: listen
});
}
'./server/server': server
}).then(() => {
sinon.assert.notCalled(Util.getConnectionSettings);
sinon.assert.calledOnce(process.exit);
Expand All @@ -113,6 +127,8 @@ describe('composer-rest-server CLI unit tests', () => {

it('should use the argumemts from yargs and start the server', () => {
let listen = sinon.stub();
let get = sinon.stub();
get.withArgs('port').returns(3000);
process.argv = [
process.argv0, 'cli.js',
'-p', 'defaultProfile',
Expand All @@ -121,9 +137,13 @@ describe('composer-rest-server CLI unit tests', () => {
'-s', 'adminpw'
];
delete require.cache[require.resolve('yargs')];
const server = sinon.stub();
server.resolves({
listen: listen
const server = sinon.stub().resolves({
app: {
get
},
server: {
listen
}
});
return proxyquire('../cli', {
clear: () => { },
Expand All @@ -140,17 +160,21 @@ describe('composer-rest-server CLI unit tests', () => {
participantId: 'admin',
participantPwd: 'adminpw',
port: undefined,
security: false
security: false,
websockets: true
};
sinon.assert.calledWith(server, settings);
sinon.assert.calledOnce(listen);
listen.args[0][0].should.equal(3000);
listen.args[0][1].should.be.a('function');
});
});

it('should start and log information when running with explorer', () => {
let listen = sinon.stub();
let emit = sinon.stub();
let get = sinon.stub();
get.withArgs('port').returns(3000);
get.withArgs('url').returns('http://localhost:3000');
get.withArgs('loopback-component-explorer').returns(true);
process.argv = [
Expand All @@ -161,11 +185,14 @@ describe('composer-rest-server CLI unit tests', () => {
'-s', 'adminpw'
];
delete require.cache[require.resolve('yargs')];
const server = sinon.stub();
server.resolves({
listen: listen,
emit: emit,
get: get
const server = sinon.stub().resolves({
app: {
emit,
get
},
server: {
listen
}
});
return proxyquire('../cli', {
clear: () => { },
Expand All @@ -175,7 +202,8 @@ describe('composer-rest-server CLI unit tests', () => {
'./server/server': server
}).then(() => {
sinon.assert.calledOnce(listen);
listen.args[0][0]();
listen.args[0][0].should.equal(3000);
listen.args[0][1]();
sinon.assert.calledOnce(emit);
sinon.assert.calledWith(emit, 'started');
sinon.assert.calledWith(console.log, sinon.match(/Web server listening at/));
Expand All @@ -187,6 +215,7 @@ describe('composer-rest-server CLI unit tests', () => {
let listen = sinon.stub();
let emit = sinon.stub();
let get = sinon.stub();
get.withArgs('port').returns(3000);
get.withArgs('url').returns('http://localhost:3000');
process.argv = [
process.argv0, 'cli.js',
Expand All @@ -196,11 +225,14 @@ describe('composer-rest-server CLI unit tests', () => {
'-s', 'adminpw'
];
delete require.cache[require.resolve('yargs')];
const server = sinon.stub();
server.resolves({
listen: listen,
emit: emit,
get: get
const server = sinon.stub().resolves({
app: {
emit,
get
},
server: {
listen
}
});
return proxyquire('../cli', {
clear: () => { },
Expand All @@ -210,7 +242,8 @@ describe('composer-rest-server CLI unit tests', () => {
'./server/server': server
}).then(() => {
sinon.assert.calledOnce(listen);
listen.args[0][0]();
listen.args[0][0].should.equal(3000);
listen.args[0][1]();
sinon.assert.calledOnce(emit);
sinon.assert.calledWith(emit, 'started');
sinon.assert.calledWith(console.log, sinon.match(/Web server listening at/));
Expand Down
34 changes: 34 additions & 0 deletions packages/composer-rest-server/test/data/bond-network/lib/logic.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,39 @@ function publish(publishBond) {
});
}

/**
* Publish a new bond
* @param {org.acme.bond.EmitBondEvent} emitBondEvent - the publishBond transaction
* @transaction
*/
function bondEventEmitter(emitBondEvent) {
var factory = getFactory();
var bondEvent = factory.newEvent('org.acme.bond', 'BondEvent');
bondEvent.prop1 = 'foo';
bondEvent.prop2 = 'bar';
emit(bondEvent);
}

/**
* Publish a new bond
* @param {org.acme.bond.EmitMultipleBondEvents} emitMultipleBondEvents - the publishBond transaction
* @transaction
*/
function multipleBondEventEmitter(emitMultipleBondEvents) {
var factory = getFactory();
var bondEvent = factory.newEvent('org.acme.bond', 'BondEvent');
bondEvent.prop1 = 'foo';
bondEvent.prop2 = 'bar';
emit(bondEvent);
bondEvent = factory.newEvent('org.acme.bond', 'BondEvent');
bondEvent.prop1 = 'rah';
bondEvent.prop2 = 'car';
emit(bondEvent);
bondEvent = factory.newEvent('org.acme.bond', 'BondEvent');
bondEvent.prop1 = 'zoo';
bondEvent.prop2 = 'moo';
emit(bondEvent);
}

/*eslint-enable no-unused-vars*/
/*eslint-enable no-undef*/
Loading

0 comments on commit 5274211

Please sign in to comment.