Skip to content

Commit

Permalink
Fixed leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Nov 10, 2018
1 parent 11bb0c6 commit 9ceb263
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void processPublish(MqttPublishMessage msg) {
}
switch (qos) {
case AT_MOST_ONCE:
postOffice.receivedPublishQos0(topic, username, clientId, payload, retain);
postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
break;
case AT_LEAST_ONCE: {
final int messageID = msg.variableHeader().packetId();
Expand Down
5 changes: 1 addition & 4 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
mqttConnection.sendUnsubAckMessage(topics, clientID, messageId);
}

void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf payload, boolean retain) {
void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf payload, boolean retain, MqttPublishMessage msg) {
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic: {}", clientID, topic);
return;
Expand All @@ -155,9 +155,6 @@ void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf
retainedRepository.cleanRetained(topic);
}

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic.toString(), 0);
MqttPublishMessage msg = new MqttPublishMessage(fixedHeader, varHeader, payload.retainedDuplicate());
interceptor.notifyTopicPublished(msg, clientID, username);
}

Expand Down
6 changes: 4 additions & 2 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.moquette.spi.impl.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.*;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -335,12 +336,13 @@ void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
qos2Receiving.put(messageID, msg);
msg.retain();
msg.retain(); // retain to put in the inflight map
mqttConnection.sendPublishReceived(messageID);
}

public void receivedPubRelQos2(int messageID) {
qos2Receiving.remove(messageID);
final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID);
ReferenceCountUtil.release(removedMsg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,18 @@ public class MQTTMessageLogger extends ChannelDuplexHandler {

@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
logMQTTMessage(ctx, message, "C->B");
logMQTTMessageRead(ctx, message);
ctx.fireChannelRead(message);
}

private void logMQTTMessageRead(ChannelHandlerContext ctx, Object message) throws Exception {
logMQTTMessage(ctx, message, "C->B");
}

private void logMQTTMessageWrite(ChannelHandlerContext ctx, Object message) throws Exception {
logMQTTMessage(ctx, message, "C<-B");
}

private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String direction) throws Exception {
if (!(message instanceof MqttMessage)) {
return;
Expand Down Expand Up @@ -106,7 +114,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
logMQTTMessage(ctx, msg, "C<-B");
logMQTTMessageWrite(ctx, msg);
ctx.write(msg, promise).addListener(CLOSE_ON_FAILURE);
}
}
28 changes: 24 additions & 4 deletions broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ public void testPublishQoS0ToItself() {

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

// Verify
ConnectionTestUtils.verifyReceivePublish(channel, NEWS_TOPIC, "Hello world!");
Expand Down Expand Up @@ -187,7 +192,12 @@ public void testPublishToMultipleSubscribers() {

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

// Verify
ConnectionTestUtils.verifyReceivePublish(channel1, NEWS_TOPIC, "Hello world!");
Expand All @@ -206,7 +216,12 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, anyPayload, true);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, anyPayload, true,
MqttMessageBuilders.publish()
.payload(anyPayload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

// Verify
assertTrue("QoS0 MUST clean retained message for topic", retainedRepository.isEmtpy());
Expand Down Expand Up @@ -363,7 +378,12 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() {
// publish a QoS0 retained message
// Exercise
final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), qos0Payload, true);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), qos0Payload, true,
MqttMessageBuilders.publish()
.payload(qos0Payload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

// Verify
assertTrue("Retained message for topic /news must be cleared", retainedRepository.isEmtpy());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.moquette.broker;

import io.moquette.persistence.MemoryStorageService;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.impl.MockAuthenticator;
import io.moquette.spi.impl.security.PermitAllAuthorizatorPolicy;
import io.moquette.spi.impl.subscriptions.CTrieSubscriptionDirectory;
Expand Down Expand Up @@ -225,7 +223,12 @@ public void testCleanSession_maintainClientSubscriptions() {
assertEquals("After a reconnect, subscription MUST be still present", 1, subscriptions.size());

final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyPublishIsReceived(anotherChannel, AT_MOST_ONCE, "Hello world!");
}
Expand Down Expand Up @@ -265,7 +268,12 @@ public void testCleanSession_correctlyClientSubscriptions() {

// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false,
MqttMessageBuilders.publish()
.payload(payload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

// verify no publish is fired
ConnectionTestUtils.verifyNoPublishIsReceived(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,25 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn

// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyPublishIsReceived(channel, AT_MOST_ONCE, "Hello world!");

unsubscribeAndVerifyAck(NEWS_TOPIC);

// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false,
MqttMessageBuilders.publish()
.payload(payload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyNoPublishIsReceived(channel);
}
Expand All @@ -152,7 +162,12 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn
subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyPublishIsReceived(channel, AT_MOST_ONCE, "Hello world!");

Expand All @@ -167,7 +182,12 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn

// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false,
MqttMessageBuilders.publish()
.payload(payload2)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyNoPublishIsReceived(anotherChannel);
}
Expand Down Expand Up @@ -242,7 +262,12 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() {
subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyPublishIsReceived((EmbeddedChannel) connection.channel, AT_MOST_ONCE, "Hello world!");

Expand All @@ -257,7 +282,12 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() {
subscribe(subscriberConnection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world2!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false);
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false,
MqttMessageBuilders.publish()
.payload(payload2.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());

ConnectionTestUtils.verifyPublishIsReceived(subscriberChannel, AT_MOST_ONCE, "Hello world2!");

Expand Down
Loading

0 comments on commit 9ceb263

Please sign in to comment.