Skip to content

Commit

Permalink
Redis connection lock (openhab#413)
Browse files Browse the repository at this point in the history
* Logging updates

Signed-off-by: Dan Cunningham <[email protected]>

* Use redis locking for connections

Signed-off-by: Dan Cunningham <[email protected]>

* Add cleanup if lock is no longer present on a active connection

Signed-off-by: Dan Cunningham <[email protected]>

* Simplify lock value

Signed-off-by: Dan Cunningham <[email protected]>

* PR feedback changes

Signed-off-by: Dan Cunningham <[email protected]>

---------

Signed-off-by: Dan Cunningham <[email protected]>
  • Loading branch information
digitaldan authored Jan 29, 2023
1 parent 1e885bf commit 1b5eacd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 36 deletions.
117 changes: 81 additions & 36 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,9 @@ function sendIosNotifications(iosDeviceTokens, notification) {
}
}

// In case of polling transport set poll duration to 300 seconds
//io.set('polling duration', 300);

// Socket.io Routes
io.use(function (socket, next) {
var handshakeData = socket.handshake;
logger.info('openHAB-cloud: Authorizing incoming openHAB connection');
handshakeData.uuid = handshakeData.query['uuid'];
handshakeData.openhabVersion = handshakeData.query['openhabversion'];
handshakeData.clientVersion = handshakeData.query['clientVersion'];
Expand All @@ -406,6 +403,7 @@ io.use(function (socket, next) {
if (!handshakeData.clientVersion) {
handshakeData.clientVersion = 'unknown';
}
logger.info('openHAB-cloud: Authorizing incoming openHAB connection for ' + handshakeData.uuid );
Openhab.findOne({
uuid: handshakeData.uuid,
secret: handshakeSecret
Expand All @@ -425,15 +423,47 @@ io.use(function (socket, next) {
});
});

io.use(function (socket, next) {
logger.info('openHAB-cloud: obtaining lock for connection for uuid ' + socket.handshake.uuid);
socket.connectionId = uuid.v1(); //we will check this when we handle disconnects
//set a lock so only one connection from the same client can connect, avoids split brain when clients reconnect
socket.redisLockKey = 'connection:' + socket.handshake.uuid;
redis.set(socket.redisLockKey, socket.connectionId, 'NX', 'EX', system.getConnectionLockTimeSeconds(), (err, result) => {
if(err) {
logger.info('openHAB-cloud: error attaining connection lock for uuid ' + socket.handshake.uuid + ' connectionId ' + socket.connectionId + ' ' + err);
return next(new Error('connection lock error'));
}

if(!result){
//this key already exists, which means another connection exists
logger.info('openHAB-cloud: another connection has lock for uuid ' + socket.handshake.uuid + ' my connectionId ' + socket.connectionId);
return next(new Error('already connected'));
}
next();
});
});

io.sockets.on('connection', function (socket) {
logger.info('openHAB-cloud: Incoming openHAB connection for uuid ' + socket.handshake.uuid);
if(!socket.openhab){
logger.info('openHAB-cloud: openhab schema not found on socket for uuid' + socket.handshake.uuid);
socket.disconnect();
return;
}
logger.info('openHAB-cloud: connection for uuid ' + socket.handshake.uuid + + ' connectionId ' + socket.connectionId);
socket.join(socket.handshake.uuid);
socket.connectionId = uuid.v1(); //we will check this when we handle disconnects
//listen for pings from the client
socket.conn.on('packet', function (packet) {
if (packet.type === 'ping') {
//reset the expire time
redis.expire(socket.redisLockKey, system.getConnectionLockTimeSeconds(), (error, number) => {
if(error){
logger.error('openHAB-cloud: error updating lock expire for uuid ' + socket.handshake.uuid + + ' connectionId ' + socket.connectionId + " " + error);
return;
}
if(number === 0){
logger.error('openHAB-cloud: lock no longer present for for uuid ' + socket.handshake.uuid + + ' connectionId ' + socket.connectionId + " " + error);
//we have lost our lock, something has gone wrong, lets cleanup
socket.disconnect();
return;
}
});
};
});

const openhab = socket.openhab;
const lastOnline = openhab.last_online;
Expand All @@ -453,7 +483,7 @@ io.sockets.on('connection', function (socket) {
socket.disconnect();
return;
}
logger.info('openHAB-cloud: connect success uuid ' + openhab.uuid + ' prevous address ' + lastServerAddress + " my address " + internalAddress);
logger.info('openHAB-cloud: connect success uuid ' + openhab.uuid + ' connectionId ' + socket.connectionId + ' prevous address ' + lastServerAddress + " my address " + internalAddress);

socket.openhabId = openhab.id;

Expand All @@ -477,35 +507,50 @@ io.sockets.on('connection', function (socket) {
}
}
);

socket.on('disconnect', function () {
logger.info('openHAB-cloud: Disconnected uuid ' + socket.handshake.uuid + ' connectionId ' + socket.connectionId);
Openhab.setOffline(socket.connectionId, function (error, openhab) {
if (error) {
logger.error('openHAB-cloud: Error saving openHAB disconnect: ' + error);

//remove our lock, but make sure its our connection id, and not a healthy reconnection from the same client
redis.get(socket.redisLockKey, (err, reply) => {
if (err) {
logger.info('openHAB-cloud: error removing connection lock for uuid ' + openhab.uuid + ' connectionId ' + socket.connectionId + ' ' + err);
return;
}
if(openhab) {
//we will try and notifiy users of being offline
offlineOpenhabs[openhab.uuid] = {
date: Date.now(),
connectionId: socket.connectionId
}

var disconnectevent = new Event({
openhab: openhab.id,
source: 'openhab',
status: 'offline',
color: 'bad'
});

disconnectevent.save(function (error) {
//check if either null, in which case the lock is gone and we should clean up,
//or check if its's there and belongs to this connectionId (and not a reconnect)
if (!reply || reply === socket.connectionId) {
redis.del(socket.redisLockKey); //just ignore if the key does not exist
Openhab.setOffline(socket.connectionId, function (error, openhab) {
if (error) {
logger.error('openHAB-cloud: Error saving disconnect event: ' + error);
logger.error('openHAB-cloud: Error saving openHAB disconnect: ' + error);
return;
}
if(openhab) {
//we will try and notifiy users of being offline
offlineOpenhabs[openhab.uuid] = {
date: Date.now(),
connectionId: socket.connectionId
}

var disconnectevent = new Event({
openhab: openhab.id,
source: 'openhab',
status: 'offline',
color: 'bad'
});

disconnectevent.save(function (error) {
if (error) {
logger.error('openHAB-cloud: Error saving disconnect event: ' + error);
}
});
} else {
logger.warn(`openHAB-cloud: ${socket.handshake.uuid} Did not mark as offline, another instance is connected`);
}
});
} else {
logger.warn(`openHAB-cloud: ${socket.handshake.uuid} Did not mark as offline, another instance is connected`);
logger.info('openHAB-cloud: will not delete lock, another connection has lock for uuid ' + socket.handshake.uuid + ' my connectionId ' + socket.connectionId + ' their info: ' + reply);
}
});
});
Expand All @@ -519,7 +564,7 @@ io.sockets.on('connection', function (socket) {
if (self.handshake.uuid === request.openhab.uuid && !request.headersSent) {
request.writeHead(data.responseStatusCode, data.responseStatusText, data.headers);
} else {
logger.warn('openHAB-cloud: ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own');
logger.warn('openHAB-cloud: responseHeader ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own ' + request.openhab.uuid + " or headers have already been sent");
}
} else {
self.emit('cancel', {
Expand All @@ -536,7 +581,7 @@ io.sockets.on('connection', function (socket) {
if (self.handshake.uuid === request.openhab.uuid) {
request.write(data.body);
} else {
logger.warn('openHAB-cloud: ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own');
logger.warn('openHAB-cloud: responseContentBinary ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own ' + request.openhab.uuid);
}
} else {
self.emit('cancel', {
Expand All @@ -553,7 +598,7 @@ io.sockets.on('connection', function (socket) {
if (self.handshake.uuid === request.openhab.uuid) {
request.end();
} else {
logger.warn('openHAB-cloud: ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own');
logger.warn('openHAB-cloud: responseFinished ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own' + request.openhab.uuid);
}
}
});
Expand All @@ -566,7 +611,7 @@ io.sockets.on('connection', function (socket) {
if (self.handshake.uuid === request.openhab.uuid) {
request.send(500, data.responseStatusText);
} else {
logger.warn('openHAB-cloud: ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own');
logger.warn('openHAB-cloud: responseError ' + self.handshake.uuid + ' tried to respond to request which it doesn\'t own' + request.openhab.uuid);
}
}
});
Expand Down
12 changes: 12 additions & 0 deletions system/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ System.prototype.getMuteNotifications = function() {
}
};

/**
* Returns the amount of seconds for openhab connection locks to persist in redis
* @returns {Number}
*/
System.prototype.getConnectionLockTimeSeconds = function() {
try {
return this.getConfig(['system', 'connectionLockTimeSeconds']);
} catch (e) {
return 70;
}
};

/**
* Checks, if new user registration should be enabled or not.
*
Expand Down

0 comments on commit 1b5eacd

Please sign in to comment.