Skip to content

Commit

Permalink
Handle unsubscription safer while client closes (#454)
Browse files Browse the repository at this point in the history
* Handle unsubscription safer while client closes

* Added test

* Added assertion

* Update README.md
  • Loading branch information
gnought authored Mar 3, 2020
1 parent 209efd0 commit 05d2ee3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 12 deletions.
3 changes: 2 additions & 1 deletion docs/Client.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ Subscribe client to the list of topics.
## client.unsubscribe (unsubscriptions, [callback])

- `unsubscriptions` `<object>`
- `callback` `<Function>`
- `callback` `<Function>` `(error) => void`
- error `<Error>` | `null`

Unsubscribe client to the list of topics.

Expand Down
1 change: 0 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ Client.prototype.close = function (done) {
handleUnsubscribe(
this,
{
close: true,
unsubscriptions: Object.keys(this.subscriptions)
},
finish)
Expand Down
21 changes: 11 additions & 10 deletions lib/handlers/unsubscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
const write = require('../write')
const { validateTopic } = require('../utils')

function UnSubAck (packet) {
this.cmd = 'unsuback'
this.messageId = packet.messageId // [MQTT-3.10.4-4]
}

function UnsubscribeState (client, packet, finish) {
this.client = client
this.packet = packet
Expand Down Expand Up @@ -71,20 +76,16 @@ function completeUnsubscribe (err) {
const packet = this.packet
const done = this.finish

if ((!packet.close || client.clean === true) && packet.unsubscriptions.length > 0) {
client.broker.emit('unsubscribe', packet.unsubscriptions, client)
}

if (packet.messageId) {
const response = {
cmd: 'unsuback',
messageId: packet.messageId
}

write(client, response, done)
write(client, new UnSubAck(packet),
done)
} else {
done()
}

if ((!client.closed || client.clean === true) && packet.unsubscriptions.length > 0) {
client.broker.emit('unsubscribe', packet.unsubscriptions, client)
}
}

module.exports = handleUnsubscribe
22 changes: 22 additions & 0 deletions test/meta.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ test('emit unsubscribe event', function (t) {
})
})

test('emit unsubscribe event if unrecognized params in unsubscribe packet structure', function (t) {
t.plan(3)

const broker = aedes()
t.tearDown(broker.close.bind(broker))

const s = noError(connect(setup(broker)))
const unsubs = [{ topic: 'hello', qos: 0 }]

broker.on('unsubscribe', function (unsubscriptions, client) {
t.equal(unsubscriptions, unsubs)
t.deepEqual(client, s.client)
})

s.client.unsubscribe({
unsubscriptions: unsubs,
close: true
}, function (err) {
t.error(err)
})
})

test('dont emit unsubscribe event on client close', function (t) {
t.plan(3)

Expand Down

0 comments on commit 05d2ee3

Please sign in to comment.