Skip to content

Commit

Permalink
Support socket closure (hyperledger-archives#4149)
Browse files Browse the repository at this point in the history
The proxy connector creates a single socket to the proxy server shared by all connections. However it never closed that socket which was ok for playground but not much good for anything else.

closes #4097

Signed-off-by: Dave Kelsey <[email protected]>
  • Loading branch information
Dave Kelsey authored Jun 13, 2018
1 parent cd368f7 commit 2c166e4
Show file tree
Hide file tree
Showing 12 changed files with 295 additions and 128 deletions.
2 changes: 1 addition & 1 deletion packages/composer-common/lib/idcard.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ class IdCard {
*/
static fromDirectory(cardDirectory, fs) {
const method = 'fromDirectory';
LOG.entry(method, cardDirectory, fs);
LOG.entry(method, cardDirectory);

if (!fs) {
fs = nodeFs;
Expand Down
1 change: 1 addition & 0 deletions packages/composer-connector-proxy/lib/proxyconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ProxyConnection extends Connection {
})
.then(() => {
this.socket.removeListener('events', () => {});
this.connectionManager.disconnect(this.connectionID);
LOG.exit(method);
});
}
Expand Down
197 changes: 134 additions & 63 deletions packages/composer-connector-proxy/lib/proxyconnectionmanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ const ProxyConnection = require('./proxyconnection');
const ProxyUtil = require('./proxyutil');
const socketIOClient = require('socket.io-client');
const Logger = require('composer-common').Logger;
const uuid = require('uuid');

const LOG = Logger.getLog('ProxyConnectionManager');

let connectorServerURL = 'http://localhost:15699';

let connectorStrategy = {
closeOnDisconnect: true
};

/**
* Base class representing a connection manager that establishes and manages
* connections to one or more business networks.
Expand All @@ -40,6 +45,29 @@ class ProxyConnectionManager extends ConnectionManager {
connectorServerURL = url;
}

/**
* @typedef Strategy
* @property {boolean} closeOnDisconnect true to close socket when all connections are disconnected, false to always leave open
*/

/**
* Set the connector strategy
* @param {Strategy} strategy the connector strategy
*/
static setConnectorStrategy(strategy) {
if (strategy) {
connectorStrategy = strategy;
}
}

/**
* get the connector strategy
* @returns {Strategy} strategy the connector strategy
*/
static getConnectorStrategy() {
return connectorStrategy;
}

/**
* Create a connection for ease of unit testing
* @param {ProxyConnectionManager} _this The ConnectionManaget
Expand All @@ -61,14 +89,25 @@ class ProxyConnectionManager extends ConnectionManager {
*/
constructor(connectionProfileManager) {
super(connectionProfileManager);
this.connections = new Set();
this.connected = false;
this.socket = socketIOClient(connectorServerURL);
this.socket.on('connect', () => {
this.connected = true;
});
this.socket.on('disconnect', () => {
this.connected = false;
});
}

/**
* notification of a connection managed by this instance has disconnected.
* @param {string} connectionID the connectionID that was disconnected.
*/
disconnect(connectionID) {
if (this.connections.has(connectionID)) {
this.connections.delete(connectionID);
if (this.connections.size === 0 && connectorStrategy.closeOnDisconnect) {
this.socket.close();

// throw away the socket rather than try to wait for a close and reconnect it if a
// connection is required as it's easier
this.socket = null;
}
}
}

/**
Expand All @@ -78,13 +117,34 @@ class ProxyConnectionManager extends ConnectionManager {
* error.
*/
ensureConnected() {
if (this.connected) {
const method = 'ensureConnected';
LOG.entry(method);

if (this.socket && this.connected) {
LOG.exit(method, 'socket already connected');
return Promise.resolve();
}

LOG.debug(method, 'no socket or socket not connected, creating a new socket');
this.socket = socketIOClient(connectorServerURL);
this.socket.once('disconnect', () => {
LOG.debug(method, 'socket disconnect received');
this.connected = false;
});

// return a promise that will be fulfilled when connection is established
return new Promise((resolve, reject) => {
this.socket.once('connect', () => {
LOG.debug(method, 'socket connect received');
this.connected = true;
resolve();
});
this.socket.once('connect_error', (error) => {
LOG.exit(method, 'socket connect failed', error);
// socket connect failed, so shouldn't be connected so will create a new
// socket when a request to connect is made again.
reject(error);
});
});
}

Expand All @@ -97,21 +157,22 @@ class ProxyConnectionManager extends ConnectionManager {
* @param {string} privateKey the private key
* @returns {Promise} a promise
*/
importIdentity(connectionProfile, connectionOptions, id, certificate, privateKey) {
async importIdentity(connectionProfile, connectionOptions, id, certificate, privateKey) {
const method = 'importIdentity';
LOG.entry(method, connectionProfile, connectionOptions, id, certificate);
return this.ensureConnected()
.then(() => {
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerImportIdentity', connectionProfile, connectionOptions, id, certificate, privateKey, (error) => {
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method);
resolve();
});
});
let tempConnection = uuid.v4();
this.connections.add(tempConnection);
await this.ensureConnected();
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerImportIdentity', connectionProfile, connectionOptions, id, certificate, privateKey, (error) => {
this.disconnect(tempConnection);
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method);
resolve();
});
});
}

/**
Expand All @@ -121,21 +182,22 @@ class ProxyConnectionManager extends ConnectionManager {
* @param {string} id the id to associate with the identity
* @returns {Promise} a promise
*/
removeIdentity(connectionProfile, connectionOptions, id) {
async removeIdentity(connectionProfile, connectionOptions, id) {
const method = 'importIdentity';
LOG.entry(method, connectionProfile, connectionOptions, id);
return this.ensureConnected()
.then(() => {
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerRemoveIdentity', connectionProfile, connectionOptions, id, (error) => {
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method);
resolve();
});
});
let tempConnection = uuid.v4();
this.connections.add(tempConnection);
await this.ensureConnected();
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerRemoveIdentity', connectionProfile, connectionOptions, id, (error) => {
this.disconnect(tempConnection);
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method);
resolve();
});
});
}


Expand All @@ -146,21 +208,22 @@ class ProxyConnectionManager extends ConnectionManager {
* @param {String} id - Name of the identity.
* @return {Promise} Resolves to credentials in the form <em>{ certificate: String, privateKey: String }</em>.
*/
exportIdentity(connectionProfileName, connectionOptions, id) {
async exportIdentity(connectionProfileName, connectionOptions, id) {
const method = 'exportIdentity';
LOG.entry(method, connectionProfileName, connectionOptions, id);
return this.ensureConnected()
.then(() => {
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerExportIdentity', connectionProfileName, connectionOptions, id, (error, credentials) => {
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method, credentials);
resolve(credentials);
});
});
let tempConnection = uuid.v4();
this.connections.add(tempConnection);
await this.ensureConnected();
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerExportIdentity', connectionProfileName, connectionOptions, id, (error, credentials) => {
this.disconnect(tempConnection);
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
LOG.exit(method, credentials);
resolve(credentials);
});
});
}

/**
Expand All @@ -171,29 +234,37 @@ class ProxyConnectionManager extends ConnectionManager {
* @return {Promise} A promise that is resolved with a {@link Connection}
* object once the connection is established, or rejected with a connection error.
*/
connect(connectionProfile, businessNetworkIdentifier, connectionOptions) {
async connect(connectionProfile, businessNetworkIdentifier, connectionOptions) {
const method = 'connect';
LOG.entry(method, connectionProfile, businessNetworkIdentifier, connectionOptions);
return this.ensureConnected()
.then(() => {
return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerConnect', connectionProfile, businessNetworkIdentifier, connectionOptions, (error, connectionID) => {
if (error) {
return reject(ProxyUtil.inflaterr(error));
}
let connection = ProxyConnectionManager.createConnection(this, connectionProfile, businessNetworkIdentifier, this.socket, connectionID);
// Only emit when client
this.socket.on('events', (myConnectionID, events) => {
LOG.debug(method, events);
if (myConnectionID === connectionID) {
connection.emit('events', events);
}
});
LOG.exit(method, connection);
resolve(connection);
});

// create a temporary connection to avoid a problem with a disconnect breaking a connect
// setup due to timing.
let tempConnection = uuid.v4();
this.connections.add(tempConnection);
await this.ensureConnected();

return new Promise((resolve, reject) => {
this.socket.emit('/api/connectionManagerConnect', connectionProfile, businessNetworkIdentifier, connectionOptions, (error, connectionID) => {
if (error) {
this.disconnect(tempConnection);
return reject(ProxyUtil.inflaterr(error));
}
let connection = ProxyConnectionManager.createConnection(this, connectionProfile, businessNetworkIdentifier, this.socket, connectionID);
this.connections.add(connectionID);
// remove the temp connection now the real one has been added
this.disconnect(tempConnection);
// Only emit when client
this.socket.on('events', (myConnectionID, events) => {
LOG.debug(method, events);
if (myConnectionID === connectionID) {
connection.emit('events', events);
}
});
LOG.exit(method, connection);
resolve(connection);
});
});
}

}
Expand Down
3 changes: 2 additions & 1 deletion packages/composer-connector-proxy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@
"watchify": "3.7.0"
},
"dependencies": {
"socket.io-client": "1.7.3"
"socket.io-client": "1.7.3",
"uuid": "3.0.1"
},
"nyc": {
"exclude": [
Expand Down
6 changes: 4 additions & 2 deletions packages/composer-connector-proxy/test/proxyconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
'use strict';

const { BusinessNetworkDefinition } = require('composer-common');
const ConnectionManager = require('composer-common').ConnectionManager;
const ProxyConnectionManager = require('../lib/proxyconnectionmanager');
const ProxyConnection = require('../lib/proxyconnection');
const ProxySecurityContext = require('../lib/proxysecuritycontext');
const serializerr = require('serializerr');
Expand All @@ -42,7 +42,7 @@ describe('ProxyConnection', () => {
let mockSecurityContext;

beforeEach(() => {
mockConnectionManager = sinon.createStubInstance(ConnectionManager);
mockConnectionManager = sinon.createStubInstance(ProxyConnectionManager);
mockSocket = {
emit: sinon.stub(),
removeListener: sinon.stub()
Expand All @@ -60,6 +60,8 @@ describe('ProxyConnection', () => {
.then(() => {
sinon.assert.calledOnce(mockSocket.emit);
sinon.assert.calledWith(mockSocket.emit, '/api/connectionDisconnect', connectionID, sinon.match.func);
sinon.assert.calledOnce(mockConnectionManager.disconnect);
sinon.assert.calledWith(mockConnectionManager.disconnect, connection.connectionID);
mockSocket.removeListener.withArgs('events', sinon.match.func).yield();
});
});
Expand Down
Loading

0 comments on commit 2c166e4

Please sign in to comment.