Skip to content

Commit

Permalink
Not send retained msg in restored subscriptions (moscajs#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought authored Sep 13, 2019
1 parent 02913a3 commit 56aa327
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 29 deletions.
2 changes: 2 additions & 0 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ function doConnack (arg, done) {
})
}

// push any queued messages (included retained messages) at the disconnected time
// when QoS > 0 and session is true
function emptyQueue (arg, done) {
var client = this.client
var persistence = client.broker.persistence
Expand Down
39 changes: 22 additions & 17 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,29 @@ function completeSubscribe (err) {
done()
}

var persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
// Conform to MQTT 3.1.1 section 3.1.2.4
// Restored sessions should not contain any retained message.
// Retained message should be only fetched from SUBSCRIBE.
if (!packet.restore) {
var persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliverQoS(packet, cb)
}))
}

function nop () {}
Expand Down
109 changes: 97 additions & 12 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) {
})
})

test('not clean and retain messages with QoS 1', function (t) {
t.plan(10)
test('disconnect and retain messages with QoS 1 [clean=false]', function (t) {
t.plan(8)

var broker = aedes()
var publisher
Expand Down Expand Up @@ -529,22 +529,107 @@ test('not clean and retain messages with QoS 1', function (t) {
})

subscriber.outStream.once('data', function (packet) {
// receive any queued messages (no matter they are retained messages) at the disconnected time
t.notEqual(packet.messageId, 42, 'messageId must differ')
var prevId = packet.messageId
delete packet.messageId
packet.length = 14
t.deepEqual(packet, expected, 'packet must match')

// message is duplicated
subscriber.outStream.once('data', function (packet2) {
var curId = packet2.messageId
t.notOk(curId === prevId, 'messageId must differ')
subscriber.inStream.write({
cmd: 'puback',
messageId: curId
})
delete packet2.messageId
// there should be no messages come from restored subscriptions
subscriber.outStream.once('data', function (packet) {
t.fail('should not receive any more messages')
})
})
})
})
broker.on('closed', t.end.bind(t))
})

test('disconnect and two retain messages with QoS 1 [clean=false]', function (t) {
t.plan(17)

var broker = aedes()
var publisher
var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })
var expected = {
cmd: 'publish',
topic: 'hello',
qos: 1,
dup: false,
length: 14,
retain: true
}

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.inStream.write({
cmd: 'disconnect'
})

subscriber.outStream.on('data', function (packet) {
console.log('original', packet)
})

publisher = connect(setup(broker))

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 41,
retain: true
})

publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world2',
qos: 1,
messageId: 42,
retain: true
})

publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.equal(err.message, 'connection closed')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
t.equal(connect.sessionPresent, true, 'session present is set to true')
})

subscriber.outStream.once('data', function (packet) {
// receive any queued messages (included retained messages) at the disconnected time
t.notEqual(packet.messageId, 41, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world')
t.deepEqual(packet, expected, 'packet must match')

// receive any queued messages (included retained messages) at the disconnected time
subscriber.outStream.once('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world2')
t.deepEqual(packet, expected, 'packet must match')

// should get the last retained message when we do a subscribe
subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.on('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
delete packet.messageId
packet.length = 14
expected.payload = Buffer.from('world2')
t.deepEqual(packet, expected, 'packet must match')
})
})
})
})
})
})
Expand Down

0 comments on commit 56aa327

Please sign in to comment.