diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 3945bed57c897..e73c437e81b49 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -22,9 +22,18 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.beust.jcommander.internal.Maps; import com.google.common.collect.Sets; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -42,20 +51,13 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.net.URL; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - @Test(groups = "broker") public class BacklogQuotaManagerTest { PulsarService pulsar; @@ -66,7 +68,7 @@ public class BacklogQuotaManagerTest { LocalBookkeeperEnsemble bkEnsemble; - private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5; + private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 3; private static final int MAX_ENTRIES_PER_LEDGER = 5; @BeforeMethod @@ -145,14 +147,14 @@ public void testBacklogQuotaWithReader() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", - new BacklogQuota(10 * 1024, 5, BacklogQuota.RetentionPolicy.producer_exception)); + new BacklogQuota(10 * 1024, TIME_TO_CHECK_BACKLOG_QUOTA, BacklogQuota.RetentionPolicy.producer_exception)); try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { final String topic1 = "persistent://prop/ns-quota/topic1"; final int numMsgs = 20; Reader reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { @@ -185,19 +187,21 @@ public void testBacklogQuotaWithReader() throws Exception { fail("Should not have gotten exception: " + ce.getMessage()); } - // make sure ledgers are trimmed - PersistentTopicInternalStats internalStats = - admin.topics().getInternalStats(topic1, false); - - // check there is only one ledger left // TODO in theory there shouldn't be any ledgers left if we are using readers. // However, trimming of ledgers are piggy packed onto ledger operations. // So if there isn't new data coming in, trimming never occurs. // We need to trigger trimming on a schedule to actually delete all remaining ledgers - assertEquals(internalStats.ledgers.size(), 1); + Awaitility.await().untilAsserted(() -> { + // make sure ledgers are trimmed + PersistentTopicInternalStats internalStats = + admin.topics().getInternalStats(topic1, false); + + // check there is only one ledger left + assertEquals(internalStats.ledgers.size(), 1); - // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER + assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + }); // check reader can still read with out error @@ -216,18 +220,18 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", - new BacklogQuota(10 * 1024, 2, BacklogQuota.RetentionPolicy.producer_exception)); + new BacklogQuota(10 * 1024, TIME_TO_CHECK_BACKLOG_QUOTA, BacklogQuota.RetentionPolicy.producer_exception)); try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { final String topic1 = "persistent://prop/ns-quota/topic1" + UUID.randomUUID(); final int numMsgs = 20; Reader reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { content[0] = (byte) (content[0] + 1); producer.send(content); } - Thread.sleep(2 * 1000); + Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000); admin.brokers().backlogQuotaCheck(); rolloverStats(); TopicStats stats = admin.topics().getStats(topic1); @@ -248,15 +252,16 @@ public void testTriggerBacklogQuotaSizeWithReader() throws Exception { fail("Should not have gotten exception: " + ce.getMessage()); } - // make sure ledgers are trimmed - PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); + Awaitility.await().untilAsserted(() -> { + // make sure ledgers are trimmed + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); - // check there is only one ledger left - assertEquals(internalStats.ledgers.size(), 1); - - // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + // check there is only one ledger left + assertEquals(internalStats.ledgers.size(), 1); + // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER + assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); + }); // check reader can still read with out error while (true) { @@ -281,18 +286,18 @@ public void testTriggerBacklogTimeQuotaWithReader() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", - new BacklogQuota(10 * 1024, 5, BacklogQuota.RetentionPolicy.producer_exception)); + new BacklogQuota(10 * 1024, TIME_TO_CHECK_BACKLOG_QUOTA, BacklogQuota.RetentionPolicy.producer_exception)); try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();) { final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); final int numMsgs = 9; Reader reader = client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create(); - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { content[0] = (byte) (content[0] + 1); producer.send(content); } - Thread.sleep(5 * 1000); + Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000); admin.brokers().backlogQuotaCheck(); rolloverStats(); TopicStats stats = admin.topics().getStats(topic1); @@ -304,7 +309,18 @@ public void testTriggerBacklogTimeQuotaWithReader() throws Exception { // non-durable subscription won't trigger the check for time based backlog quota // and cause back pressure action to be token. Since broker don't keep track consuming position for reader. assertEquals(nonDurableSubscriptionBacklog, numMsgs, - "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ; + "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); + + Awaitility.await() + .pollDelay(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA)) + .pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + // make sure ledgers are trimmed + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); + + // check that there are 2 ledgers + assertEquals(internalStats.ledgers.size(), 2); + }); + try { // try to send over backlog quota and make sure it fails for (int i = 0; i < numMsgs; i++) { @@ -315,15 +331,6 @@ public void testTriggerBacklogTimeQuotaWithReader() throws Exception { fail("Should not have gotten exception: " + ce.getMessage()); } - // make sure ledgers are trimmed - PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic1, false); - - // check there is only one ledger left - assertEquals(internalStats.ledgers.size(), 2); - - // check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER - assertEquals(internalStats.ledgers.get(0).ledgerId, (2 * numMsgs / MAX_ENTRIES_PER_LEDGER) - 1); - // check reader can still read without error while (true) { Message msg = reader.readNext(5, TimeUnit.SECONDS); @@ -354,7 +361,7 @@ public void testConsumerBacklogEvictionSizeQuota() throws Exception { Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -386,7 +393,7 @@ public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception { Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -424,7 +431,7 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -462,7 +469,7 @@ public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception { Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -494,7 +501,7 @@ public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exceptio Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { @@ -529,12 +536,23 @@ public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exceptio client.close(); } + private Producer createProducer(PulsarClient client, String topic) + throws PulsarClientException { + return client.newProducer() + .enableBatching(false) + .sendTimeout(2, TimeUnit.SECONDS) + .topic(topic) + .create(); + } + @Test public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", - new BacklogQuota(10 * 1024, TIME_TO_CHECK_BACKLOG_QUOTA, BacklogQuota.RetentionPolicy.consumer_backlog_eviction)); + new BacklogQuota(20 * 1024, 2 * TIME_TO_CHECK_BACKLOG_QUOTA, + BacklogQuota.RetentionPolicy.consumer_backlog_eviction)); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); final String topic1 = "persistent://prop/ns-quota/topic12"; @@ -544,40 +562,56 @@ public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception { Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1).create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; + List> messagesToAcknowledge = new ArrayList<>(); + for (int i = 0; i < numMsgs; i++) { producer.send(content); - consumer1.receive(); + messagesToAcknowledge.add(consumer1.receive()); consumer2.receive(); } - TopicStats stats = admin.topics().getStats(topic1); - assertEquals(stats.subscriptions.get(subName1).msgBacklog, 14); - assertEquals(stats.subscriptions.get(subName2).msgBacklog, 14); + { + TopicStats stats = admin.topics().getStats(topic1); + assertEquals(stats.subscriptions.get(subName1).msgBacklog, 14); + assertEquals(stats.subscriptions.get(subName2).msgBacklog, 14); + } - consumer1.redeliverUnacknowledgedMessages(); for (int i = 0; i < numMsgs; i++) { + // pause before acknowledging the 11. message so that 2 first ledgers (5 msgs/ledger) will expire before the + // last ledger + if (i == 10) { + Thread.sleep(TIME_TO_CHECK_BACKLOG_QUOTA * 1000L); + } // only one consumer acknowledges the message - consumer1.acknowledge(consumer1.receive()); + consumer1.acknowledge(messagesToAcknowledge.get(i)); } - Thread.sleep(1000); - rolloverStats(); - stats = admin.topics().getStats(topic1); - // sub1 has empty backlog as it acked all messages - assertEquals(stats.subscriptions.get(subName1).msgBacklog, 0); - assertEquals(stats.subscriptions.get(subName2).msgBacklog, 14); - - Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000); - rolloverStats(); - - stats = admin.topics().getStats(topic1); - // Messages on first 2 ledgers should be expired, backlog is number of - // message in current ledger which should be 4. - assertEquals(stats.subscriptions.get(subName2).msgBacklog, 4); - client.close(); + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .untilAsserted(() -> { + rolloverStats(); + TopicStats stats = admin.topics().getStats(topic1); + // sub1 has empty backlog as it acked all messages + assertEquals(stats.subscriptions.get(subName1).msgBacklog, 0); + assertEquals(stats.subscriptions.get(subName2).msgBacklog, 14); + }); + + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(4 * TIME_TO_CHECK_BACKLOG_QUOTA)) + .untilAsserted(() -> { + // Messages on first 2 ledgers should be expired, backlog is number of + // message in current ledger which should be 4. + long msgBacklog = admin.topics().getStats(topic1).subscriptions.get(subName2).msgBacklog; + // TODO: for some reason the backlog size is sometimes off by one + // Internally there's a method `long getNumberOfEntriesInBacklog(boolean getPreciseBacklog)` + // on org.apache.pulsar.broker.service.Subscription interface + // the `boolean getPreciseBacklog` parameter indicates that the backlog size isn't accurate + assertEquals(msgBacklog, 4, 1); + }); } @Test @@ -608,8 +642,7 @@ public void testConcurrentAckAndEviction() throws Exception { public void run() { try { barrier.await(); - org.apache.pulsar.client.api.Producer producer = client.newProducer().topic(topic1) - .create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -623,7 +656,7 @@ public void run() { } }; - Thread ConsumerThread = new Thread() { + Thread consumerThread = new Thread() { public void run() { try { barrier.await(); @@ -641,7 +674,7 @@ public void run() { }; producerThread.start(); - ConsumerThread.start(); + consumerThread.start(); // test hangs without timeout since there is nothing to consume due to eviction counter.await(20, TimeUnit.SECONDS); @@ -682,8 +715,7 @@ public void testNoEviction() throws Exception { public void run() { try { barrier.await(); - org.apache.pulsar.client.api.Producer producer = client2.newProducer().topic(topic1) - .create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client2, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -697,7 +729,7 @@ public void run() { } }; - Thread ConsumerThread = new Thread() { + Thread consumerThread = new Thread() { public void run() { try { barrier.await(); @@ -714,7 +746,7 @@ public void run() { }; producerThread.start(); - ConsumerThread.start(); + consumerThread.start(); counter.await(); assertFalse(gotException.get()); } @@ -751,8 +783,7 @@ public void testEvictionMulti() throws Exception { public void run() { try { barrier.await(); - org.apache.pulsar.client.api.Producer producer = client2.newProducer().topic(topic1) - .create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client2, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -770,8 +801,7 @@ public void run() { public void run() { try { barrier.await(); - org.apache.pulsar.client.api.Producer producer = client3.newProducer().topic(topic1) - .create(); + org.apache.pulsar.client.api.Producer producer = createProducer(client3, topic1); byte[] content = new byte[1024]; for (int i = 0; i < numMsgs; i++) { producer.send(content); @@ -785,7 +815,7 @@ public void run() { } }; - Thread ConsumerThread1 = new Thread() { + Thread consumerThread1 = new Thread() { public void run() { try { barrier.await(); @@ -800,7 +830,7 @@ public void run() { } }; - Thread ConsumerThread2 = new Thread() { + Thread consumerThread2 = new Thread() { public void run() { try { barrier.await(); @@ -817,8 +847,8 @@ public void run() { producerThread1.start(); producerThread2.start(); - ConsumerThread1.start(); - ConsumerThread2.start(); + consumerThread1.start(); + consumerThread2.start(); counter.await(20, TimeUnit.SECONDS); assertFalse(gotException.get()); Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); @@ -844,7 +874,7 @@ public void testAheadProducerOnHold() throws Exception { Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i <= numMsgs; i++) { try { producer.send(content); @@ -883,7 +913,7 @@ public void testAheadProducerOnHoldTimeout() throws Exception { client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i < 10; i++) { producer.send(content); } @@ -918,7 +948,7 @@ public void testProducerException() throws Exception { client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i < 10; i++) { producer.send(content); } @@ -955,7 +985,7 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i < 10; i++) { producer.send(content); } @@ -1015,7 +1045,7 @@ public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Excepti Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i < numMsgs; i++) { producer.send(content); } @@ -1077,7 +1107,7 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); byte[] content = new byte[1024]; - Producer producer = client.newProducer().topic(topic1).sendTimeout(2, TimeUnit.SECONDS).create(); + Producer producer = createProducer(client, topic1); for (int i = 0; i < numMsgs; i++) { producer.send(content); }