diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 2c94966aa3f0f..dfeb4afcc29b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -217,7 +217,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, Set messageSet = Sets.newHashSet(); for (int i = 0; i < numMessages; i++) { future_msg = consumer.receiveAsync(); - Thread.sleep(10); msg = future_msg.get(); String receivedMessage = new String(msg.getData()); log.info("Received message: [{}]", receivedMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 61bdad034ad1f..df383b532fd87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -2662,4 +2663,50 @@ public void received(Consumer consumer, Message message) assertEquals(latch.getCount(), 1); consumer.close(); } + + /** + * Ack timeout message is redelivered on time. + * Related github issue #2584 + */ + @Test + public void testAckTimeoutRedeliver() throws Exception { + log.info("-- Starting {} test --", methodName); + + // create consumer and producer + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer() + .topic("persistent://my-property/my-ns/ack-timeout-topic") + .subscriptionName("subscriber-1") + .ackTimeout(1, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + + Producer producer = pulsarClient.newProducer() + .topic("persistent://my-property/my-ns/ack-timeout-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + // (1) Produced one Message + String content = "my-message-will-ack-timeout"; + producer.send(content.getBytes()); + + // (2) consumer to receive messages, and not ack + Message message = consumer.receive(); + + // (3) should be re-delivered once ack-timeout. + Thread.sleep(1000); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + assertNotNull(message); + + Thread.sleep(1000); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + assertNotNull(message); + + assertEquals(content, new String(message.getData())); + + producer.close(); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index da53760d2b947..e178febdcb532 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -235,16 +235,17 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { private static int receiveAllMessage(Consumer consumer, boolean ackMessages) throws Exception { int messagesReceived = 0; - Message msg = consumer.receive(1, TimeUnit.SECONDS); + Message msg = consumer.receive(200, TimeUnit.MILLISECONDS); while (msg != null) { ++messagesReceived; - log.info("Consumer received {}", new String(msg.getData())); + log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData())); if (ackMessages) { consumer.acknowledge(msg); + log.info("Consumer {} acknowledged {}", consumer.getConsumerName(), new String(msg.getData())); } - msg = consumer.receive(1, TimeUnit.SECONDS); + msg = consumer.receive(200, TimeUnit.MILLISECONDS); } return messagesReceived; @@ -283,56 +284,31 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception { } // 4. Receive messages - Message message1 = consumer1.receive(); - Message message2 = consumer2.receive(); int messageCount1 = 0; int messageCount2 = 0; - int ackCount1 = 0; - int ackCount2 = 0; - do { - if (message1 != null) { - log.info("Consumer1 received " + new String(message1.getData())); - messageCount1 += 1; - } - if (message2 != null) { - log.info("Consumer2 received " + new String(message2.getData())); - messageCount2 += 1; - consumer2.acknowledge(message2); - ackCount2 += 1; - } - message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); - message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); - } while (message1 != null || message2 != null); + + messageCount1 += receiveAllMessage(consumer1, false); + messageCount2 += receiveAllMessage(consumer2, true); + log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); - log.info(key + " ackCount1 = " + ackCount1); - log.info(key + " ackCount2 = " + ackCount2); + assertEquals(messageCount1 + messageCount2, totalMessages); + Thread.sleep((int) (ackTimeOutMillis * 1.1)); + // 5. Check if Messages redelivered again // Since receive is a blocking call hoping that timeout will kick in log.info(key + " Timeout should be triggered now"); - message1 = consumer1.receive(); messageCount1 = 0; - do { - if (message1 != null) { - log.info("Consumer1 received " + new String(message1.getData())); - messageCount1 += 1; - consumer1.acknowledge(message1); - ackCount1 += 1; - } - if (message2 != null) { - log.info("Consumer2 received " + new String(message2.getData())); - messageCount2 += 1; - } - message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); - message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); - } while (message1 != null || message2 != null); + + messageCount1 += receiveAllMessage(consumer1, true); + messageCount2 += receiveAllMessage(consumer2, false); + log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); - log.info(key + " ackCount1 = " + ackCount1); - log.info(key + " ackCount2 = " + ackCount2); - assertEquals(ackCount1 + messageCount2, totalMessages); + + assertEquals(messageCount1 + messageCount2, totalMessages); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 266eb3b9db7bd..504de14fcff6b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -87,6 +87,7 @@ public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ac timeout = client.timer().newTimeout(new TimerTask() { @Override public void run(Timeout t) throws Exception { + toggle(); if (isAckTimeout()) { log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size()); Set messageIds = new HashSet<>(); @@ -94,7 +95,6 @@ public void run(Timeout t) throws Exception { oldOpenSet.clear(); consumerBase.redeliverUnacknowledgedMessages(messageIds); } - toggle(); timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS); } }, ackTimeoutMillis, TimeUnit.MILLISECONDS);