Skip to content

Commit

Permalink
[Master] event logging and channel closure (hyperledger-archives#4441)
Browse files Browse the repository at this point in the history
closes #4425

Signed-off-by: Dave Kelsey <[email protected]>
  • Loading branch information
Dave Kelsey authored Oct 4, 2018
1 parent 384fe26 commit 6140782
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 12 deletions.
18 changes: 11 additions & 7 deletions packages/composer-connector-hlfv1/lib/hlfconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,18 @@ class HLFConnection extends Connection {
}

this.eventHubs.forEach((eventHub) => {
if (eventHub.isconnected()) {
eventHub.disconnect();
try {
if (eventHub.isconnected()) {
eventHub.disconnect();
}
} catch(error) {
// log an error but don't stop
LOG.error(method, `failed to disconnect from eventhub on ${eventHub.getPeerAddr()}`);
LOG.error(method, error);
}
});
this.channel.close();
LOG.exit(method);
})
.catch((error) => {
const newError = new Error('Error trying disconnect. ' + error);
LOG.error(method, newError);
throw newError;
});
}

Expand Down Expand Up @@ -694,6 +696,7 @@ class HLFConnection extends Connection {
const proposal = proposalResponse[1];
const eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, transactionId.getTransactionID(), this.commitTimeout);
eventHandler.startListening();
LOG.debug(method, 'TxEventHandler started listening, sending valid responses to the orderer');
const response = await this.channel.sendTransaction({
proposalResponses: validResponses,
proposal: proposal
Expand Down Expand Up @@ -963,6 +966,7 @@ class HLFConnection extends Connection {
this._checkCCListener();
eventHandler = HLFConnection.createTxEventHandler(this.eventHubs, txId.getTransactionID(), this.commitTimeout);
eventHandler.startListening();
LOG.debug(method, 'TxEventHandler started listening, sending valid responses to the orderer');
const response = await this.channel.sendTransaction({
proposalResponses: validResponses,
proposal: proposal,
Expand Down
8 changes: 6 additions & 2 deletions packages/composer-connector-hlfv1/lib/hlftxeventhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ class HLFTxEventHandler {
eh.unregisterTxEvent(this.txId);

// We reject to let the application know that the commit did not complete within the timeout
reject(new Error(`Failed to receive commit notification from ${eh.getPeerAddr()} for transaction '${this.txId}' within the timeout period`));
const timeoutMsg = `Failed to receive commit notification from ${eh.getPeerAddr()} for transaction '${this.txId}' within the timeout period`;
LOG.error(method, timeoutMsg);
reject(new Error(timeoutMsg));
}, this.timeout);

eh.registerTxEvent(this.txId,
(tx, code) => {
clearTimeout(handle);
eh.unregisterTxEvent(this.txId);
if (code !== 'VALID') {
reject(new Error(`Peer ${eh.getPeerAddr()} has rejected transaction '${this.txId}' with code ${code}`));
const rejectMsg = `Peer ${eh.getPeerAddr()} has rejected transaction '${this.txId}' with code ${code}`;
LOG.error(rejectMsg);
reject(new Error(rejectMsg));
} else {
resolve();
}
Expand Down
43 changes: 40 additions & 3 deletions packages/composer-connector-hlfv1/test/hlfconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ describe('HLFConnection', () => {
let mockPeer1, mockPeer2, mockPeer3, mockEventHub1, mockEventHub2, mockEventHub3, mockQueryHandler;
let connectOptions;
let connection;
let mockTransactionID, logWarnSpy;
let mockTransactionID, logWarnSpy, logErrorSpy;

beforeEach(() => {
sandbox = sinon.sandbox.create();
clock = sinon.useFakeTimers();
const LOG = Logger.getLog('HLFConnection');
logWarnSpy = sandbox.spy(LOG, 'warn');
logErrorSpy = sandbox.spy(LOG, 'error');
mockConnectionManager = sinon.createStubInstance(HLFConnectionManager);
mockChannel = sinon.createStubInstance(Channel);
mockClient = sinon.createStubInstance(Client);
Expand Down Expand Up @@ -396,6 +397,7 @@ describe('HLFConnection', () => {
await connection.disconnect();
clock.tick(100);
sinon.assert.calledOnce(stub);
sinon.assert.calledOnce(mockChannel.close);
});

it('should unregister the exit listener', () => {
Expand All @@ -410,6 +412,7 @@ describe('HLFConnection', () => {
.then(() => {
sinon.assert.calledOnce(stubRemove);
sinon.assert.calledWith(stubRemove, exitListener);
sinon.assert.calledOnce(mockChannel.close);
});


Expand All @@ -421,6 +424,7 @@ describe('HLFConnection', () => {
return connection.disconnect()
.then(() => {
sinon.assert.notCalled(mockEventHub1.unregisterChaincodeEvent);
sinon.assert.calledOnce(mockChannel.close);
});
});

Expand All @@ -433,6 +437,7 @@ describe('HLFConnection', () => {
.then(() => {
sinon.assert.calledOnce(mockEventHub1.unregisterChaincodeEvent);
sinon.assert.calledWith(mockEventHub1.unregisterChaincodeEvent, 'handle');
sinon.assert.calledOnce(mockChannel.close);
});
});

Expand All @@ -442,6 +447,7 @@ describe('HLFConnection', () => {
return connection.disconnect()
.then(() => {
sinon.assert.calledOnce(mockEventHub1.disconnect);
sinon.assert.calledOnce(mockChannel.close);
});
});

Expand All @@ -451,15 +457,46 @@ describe('HLFConnection', () => {
return connection.disconnect()
.then(() => {
sinon.assert.notCalled(mockEventHub1.disconnect);
sinon.assert.calledOnce(mockChannel.close);
});
});

it('should handle an error disconnecting from the event hub', () => {
mockChannel.getPeers.returns([mockPeer1, mockPeer2]);
mockPeer2.isInRole = sinon.stub().withArgs(FABRIC_CONSTANTS.NetworkConfig.EVENT_SOURCE_ROLE).returns(true);
mockChannel.newChannelEventHub.withArgs(mockPeer2).returns(mockEventHub2);

sandbox.stub(process, 'on').withArgs('exit').yields();
connection._connectToEventHubs();
mockEventHub1.isconnected.throws(new Error('such error'));
const err = new Error('isconnected error');
mockEventHub1.isconnected.throws(err);
mockEventHub2.isconnected.returns(true);
return connection.disconnect()
.should.be.rejectedWith(/such error/);
.then(() => {
sinon.assert.notCalled(mockEventHub1.disconnect);
sinon.assert.calledOnce(mockEventHub2.disconnect);
sinon.assert.calledOnce(mockChannel.close);
sinon.assert.calledWith(logErrorSpy, 'disconnect', err);
});
});

it('should handle an error disconnecting from the event hub #2', () => {
mockChannel.getPeers.returns([mockPeer1, mockPeer2]);
mockPeer2.isInRole = sinon.stub().withArgs(FABRIC_CONSTANTS.NetworkConfig.EVENT_SOURCE_ROLE).returns(true);
mockChannel.newChannelEventHub.withArgs(mockPeer2).returns(mockEventHub2);
sandbox.stub(process, 'on').withArgs('exit').yields();
connection._connectToEventHubs();
mockEventHub1.isconnected.returns(true);
mockEventHub2.isconnected.returns(true);
const err = new Error('disconnect error');
mockEventHub1.disconnect.throws(err);
return connection.disconnect()
.then(() => {
sinon.assert.calledOnce(mockEventHub1.disconnect);
sinon.assert.calledOnce(mockEventHub2.disconnect);
sinon.assert.calledOnce(mockChannel.close);
sinon.assert.calledWith(logErrorSpy, 'disconnect', err);
});
});

it('should handle being called twice', () => {
Expand Down

0 comments on commit 6140782

Please sign in to comment.