diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index dfeb4afcc29b5..2c94966aa3f0f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -217,6 +217,7 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, Set messageSet = Sets.newHashSet(); for (int i = 0; i < numMessages; i++) { future_msg = consumer.receiveAsync(); + Thread.sleep(10); msg = future_msg.get(); String receivedMessage = new String(msg.getData()); log.info("Received message: [{}]", receivedMessage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index fb47c5052d841..d38ca35f01170 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -25,7 +25,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -2843,50 +2842,4 @@ public void received(Consumer consumer, Message message) assertEquals(latch.getCount(), 1); consumer.close(); } - - /** - * Ack timeout message is redelivered on time. - * Related github issue #2584 - */ - @Test - public void testAckTimeoutRedeliver() throws Exception { - log.info("-- Starting {} test --", methodName); - - // create consumer and producer - ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/ack-timeout-topic") - .subscriptionName("subscriber-1") - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionType(SubscriptionType.Shared) - .acknowledgmentGroupTime(0, TimeUnit.SECONDS) - .subscribe(); - - Producer producer = pulsarClient.newProducer() - .topic("persistent://my-property/my-ns/ack-timeout-topic") - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); - - // (1) Produced one Message - String content = "my-message-will-ack-timeout"; - producer.send(content.getBytes()); - - // (2) consumer to receive messages, and not ack - Message message = consumer.receive(); - - // (3) should be re-delivered once ack-timeout. - Thread.sleep(1000); - message = consumer.receive(200, TimeUnit.MILLISECONDS); - assertNotNull(message); - - Thread.sleep(1000); - message = consumer.receive(200, TimeUnit.MILLISECONDS); - assertNotNull(message); - - assertEquals(content, new String(message.getData())); - - producer.close(); - consumer.close(); - log.info("-- Exiting {} test --", methodName); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index e178febdcb532..da53760d2b947 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -235,17 +235,16 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { private static int receiveAllMessage(Consumer consumer, boolean ackMessages) throws Exception { int messagesReceived = 0; - Message msg = consumer.receive(200, TimeUnit.MILLISECONDS); + Message msg = consumer.receive(1, TimeUnit.SECONDS); while (msg != null) { ++messagesReceived; - log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData())); + log.info("Consumer received {}", new String(msg.getData())); if (ackMessages) { consumer.acknowledge(msg); - log.info("Consumer {} acknowledged {}", consumer.getConsumerName(), new String(msg.getData())); } - msg = consumer.receive(200, TimeUnit.MILLISECONDS); + msg = consumer.receive(1, TimeUnit.SECONDS); } return messagesReceived; @@ -284,31 +283,56 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception { } // 4. Receive messages + Message message1 = consumer1.receive(); + Message message2 = consumer2.receive(); int messageCount1 = 0; int messageCount2 = 0; - - messageCount1 += receiveAllMessage(consumer1, false); - messageCount2 += receiveAllMessage(consumer2, true); - + int ackCount1 = 0; + int ackCount2 = 0; + do { + if (message1 != null) { + log.info("Consumer1 received " + new String(message1.getData())); + messageCount1 += 1; + } + if (message2 != null) { + log.info("Consumer2 received " + new String(message2.getData())); + messageCount2 += 1; + consumer2.acknowledge(message2); + ackCount2 += 1; + } + message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); + message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); + } while (message1 != null || message2 != null); log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); - + log.info(key + " ackCount1 = " + ackCount1); + log.info(key + " ackCount2 = " + ackCount2); assertEquals(messageCount1 + messageCount2, totalMessages); - Thread.sleep((int) (ackTimeOutMillis * 1.1)); - // 5. Check if Messages redelivered again // Since receive is a blocking call hoping that timeout will kick in log.info(key + " Timeout should be triggered now"); + message1 = consumer1.receive(); messageCount1 = 0; - - messageCount1 += receiveAllMessage(consumer1, true); - messageCount2 += receiveAllMessage(consumer2, false); - + do { + if (message1 != null) { + log.info("Consumer1 received " + new String(message1.getData())); + messageCount1 += 1; + consumer1.acknowledge(message1); + ackCount1 += 1; + } + if (message2 != null) { + log.info("Consumer2 received " + new String(message2.getData())); + messageCount2 += 1; + } + message1 = consumer1.receive(500, TimeUnit.MILLISECONDS); + message2 = consumer2.receive(500, TimeUnit.MILLISECONDS); + } while (message1 != null || message2 != null); log.info(key + " messageCount1 = " + messageCount1); log.info(key + " messageCount2 = " + messageCount2); - - assertEquals(messageCount1 + messageCount2, totalMessages); + log.info(key + " ackCount1 = " + ackCount1); + log.info(key + " ackCount2 = " + ackCount2); + assertEquals(ackCount1 + messageCount2, totalMessages); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 504de14fcff6b..266eb3b9db7bd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -87,7 +87,6 @@ public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ac timeout = client.timer().newTimeout(new TimerTask() { @Override public void run(Timeout t) throws Exception { - toggle(); if (isAckTimeout()) { log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size()); Set messageIds = new HashSet<>(); @@ -95,6 +94,7 @@ public void run(Timeout t) throws Exception { oldOpenSet.clear(); consumerBase.redeliverUnacknowledgedMessages(messageIds); } + toggle(); timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS); } }, ackTimeoutMillis, TimeUnit.MILLISECONDS);