Skip to content

Commit

Permalink
Added test case for issue moquette-io#415
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Jun 26, 2018
1 parent bb52d2c commit a6f133e
Showing 1 changed file with 248 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Copyright (c) 2012-2018 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.moquette.server;

import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import io.moquette.spi.impl.security.AcceptAllAuthenticator;
import io.moquette.spi.impl.subscriptions.Topic;
import io.moquette.spi.security.IAuthorizator;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Properties;

import static io.moquette.spi.impl.ProtocolProcessorTest.EMPTY_OBSERVERS;
import static io.moquette.spi.impl.ProtocolProcessorTest.NO_OBSERVERS_INTERCEPTOR;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.*;

public class ServerIntegrationPahoCanPublishOnReadBlockedTopicTest {

private static final Logger LOG = LoggerFactory.getLogger(ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.class);

static MqttClientPersistence s_dataStore;
static MqttClientPersistence s_pubDataStore;

Server m_server;
IMqttClient m_client;
IMqttClient m_publisher;
MessageCollector m_messagesCollector;
IConfig m_config;

@BeforeClass
public static void beforeTests() {
String tmpDir = System.getProperty("java.io.tmpdir");
s_dataStore = new MqttDefaultFilePersistence(tmpDir);
s_pubDataStore = new MqttDefaultFilePersistence(tmpDir + File.separator + "publisher");
}

protected void startServer() throws IOException {
m_server = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties();
m_config = new MemoryConfig(configProps);

final IAuthorizator authorizator = new IAuthorizator() {
@Override
public boolean canWrite(Topic topic, String user, String client) {
return true;
}

@Override
public boolean canRead(Topic topic, String user, String client) {
return false;
}
};

m_server.startServer(m_config, EMPTY_OBSERVERS, null, new AcceptAllAuthenticator(), authorizator);
}

@Before
public void setUp() throws Exception {
startServer();

m_client = new MqttClient("tcp://localhost:1883", "TestClient", s_dataStore);
m_messagesCollector = new MessageCollector();
m_client.setCallback(m_messagesCollector);

m_publisher = new MqttClient("tcp://localhost:1883", "Publisher", s_pubDataStore);
}

@After
public void tearDown() throws Exception {
if (m_client != null && m_client.isConnected()) {
m_client.disconnect();
}

if (m_publisher != null && m_publisher.isConnected()) {
m_publisher.disconnect();
}

stopServer();
}

private void stopServer() {
m_server.stopServer();
}

@Test
public void shouldNotInternalPublishOnReadBlockedSubscriptionTopic() throws Exception {
LOG.info("*** shouldNotInternalPublishOnReadBlockedSubscriptionTopic ***");

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
m_client.connect(options);
try {
m_client.subscribe("/topic", 0);
fail();
} catch (MqttException mex) {
// it's OK, the subscribed should fail with error code 128
}

// Exercise
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName("/topic")
.retained(true)
.qos(MqttQoS.AT_MOST_ONCE)
// .qQos(MqttQoS.AT_LEAST_ONCE)
// .qos(MqttQoS.EXACTLY_ONCE)
.payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
.build();

m_server.internalPublish(message, "INTRLPUB");

// TODO verify the message is not published
final MqttMessage mqttMessage = m_messagesCollector.waitMessage(1);
assertNull(mqttMessage);
}


// @Ignore("This test hasn't any meaning using in memory storage service")
// @Test
// public void testCleanSession_maintainClientSubscriptions_withServerRestart() throws Exception {
// LOG.info("*** testCleanSession_maintainClientSubscriptions_withServerRestart ***");
// MqttConnectOptions options = new MqttConnectOptions();
// options.setCleanSession(false);
// m_client.connect(options);
// m_client.subscribe("/topic", 0);
// m_client.disconnect();
//
// m_server.stopServer();
//
// m_server.startServer(IntegrationUtils.prepareTestProperties());
//
// // reconnect and publish
// m_client.connect(options);
// m_client.publish("/topic", "Test my payload".getBytes(UTF_8), 0, false);
//
// assertEquals("/topic", m_messagesCollector.getTopic());
// }
//
// /**
// * subscriber A connect and subscribe on "a/b" QoS 1 subscriber B connect and subscribe on "a/+"
// * BUT with QoS 2 publisher connects and send a message "hello" on "a/b" subscriber A must
// * receive a notification with QoS1 subscriber B must receive a notification with QoS2
// */
// @Test
// public void checkSubscribersGetCorrectQosNotifications() throws Exception {
// LOG.info("*** checkSubscribersGetCorrectQosNotifications ***");
// String tmpDir = System.getProperty("java.io.tmpdir");
//
// MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence(tmpDir + File.separator + "subscriberA");
//
// MqttClient subscriberA = new MqttClient("tcp://localhost:1883", "SubscriberA", dsSubscriberA);
// MessageCollector cbSubscriberA = new MessageCollector();
// subscriberA.setCallback(cbSubscriberA);
// subscriberA.connect();
// subscriberA.subscribe("a/b", 1);
//
// MqttClientPersistence dsSubscriberB = new MqttDefaultFilePersistence(tmpDir + File.separator + "subscriberB");
//
// MqttClient subscriberB = new MqttClient("tcp://localhost:1883", "SubscriberB", dsSubscriberB);
// MessageCollector cbSubscriberB = new MessageCollector();
// subscriberB.setCallback(cbSubscriberB);
// subscriberB.connect();
// subscriberB.subscribe("a/+", 2);
//
// m_client.connect();
// m_client.publish("a/b", "Hello world MQTT!!".getBytes(UTF_8), 2, false);
//
// MqttMessage messageOnA = cbSubscriberA.waitMessage(1);
// assertEquals("Hello world MQTT!!", new String(messageOnA.getPayload(), UTF_8));
// assertEquals(1, messageOnA.getQos());
// subscriberA.disconnect();
//
// MqttMessage messageOnB = cbSubscriberB.waitMessage(1);
// assertEquals("Hello world MQTT!!", new String(messageOnB.getPayload(), UTF_8));
// assertEquals(2, messageOnB.getQos());
// subscriberB.disconnect();
// }
//
// @Test
// public void testSubcriptionDoesntStayActiveAfterARestart() throws Exception {
// LOG.info("*** testSubcriptionDoesntStayActiveAfterARestart ***");
// String tmpDir = System.getProperty("java.io.tmpdir");
// // clientForSubscribe1 connect and subscribe to /topic QoS2
// MqttClientPersistence dsSubscriberA = new MqttDefaultFilePersistence(
// tmpDir + File.separator + "clientForSubscribe1");
//
// MqttClient clientForSubscribe1 = new MqttClient("tcp://localhost:1883", "clientForSubscribe1", dsSubscriberA);
// MessageCollector cbSubscriber1 = new MessageCollector();
// clientForSubscribe1.setCallback(cbSubscriber1);
// clientForSubscribe1.connect();
// clientForSubscribe1.subscribe("topic", 0);
//
// // server stop
// m_server.stopServer();
// System.out.println("\n\n SEVER REBOOTING \n\n");
// // server start
// startServer();
//
// // clientForSubscribe2 connect and subscribe to /topic QoS2
// MqttClientPersistence dsSubscriberB = new MqttDefaultFilePersistence(
// tmpDir + File.separator + "clientForSubscribe2");
// MqttClient clientForSubscribe2 = new MqttClient("tcp://localhost:1883", "clientForSubscribe2", dsSubscriberB);
// MessageCollector cbSubscriber2 = new MessageCollector();
// clientForSubscribe2.setCallback(cbSubscriber2);
// clientForSubscribe2.connect();
// clientForSubscribe2.subscribe("topic", 0);
//
// // clientForPublish publish on /topic with QoS2 a message
// MqttClientPersistence dsSubscriberPUB = new MqttDefaultFilePersistence(
// tmpDir + File.separator + "clientForPublish");
// MqttClient clientForPublish = new MqttClient("tcp://localhost:1883", "clientForPublish", dsSubscriberPUB);
// clientForPublish.connect();
// clientForPublish.publish("topic", "Hello".getBytes(UTF_8), 2, true);
//
// // verify clientForSubscribe1 doesn't receive a notification but clientForSubscribe2 yes
// LOG.info("Before waiting to receive 1 sec from {}", clientForSubscribe1.getClientId());
// assertFalse(clientForSubscribe1.isConnected());
// assertTrue(clientForSubscribe2.isConnected());
// LOG.info("Waiting to receive 1 sec from {}", clientForSubscribe2.getClientId());
// MqttMessage messageOnB = cbSubscriber2.waitMessage(1);
// assertEquals("Hello", new String(messageOnB.getPayload(), UTF_8));
// }
}

0 comments on commit a6f133e

Please sign in to comment.