diff --git a/conf/broker.conf b/conf/broker.conf index c1d1f9192a233..6ba9882f94f47 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -281,6 +281,9 @@ maxUnackedMessagesPerBroker=0 # limit/2 messages maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 +# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled) +unblockStuckSubscriptionEnabled=false + # Tick time to schedule task that checks topic publish rate limiting across all topics # Reducing to lower value can give more accuracy while throttling publish but # it uses more CPU to perform frequent check. (Disable publish throttling with value 0) diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index b47ec671be627..5a600d6e812da 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -233,6 +233,9 @@ maxUnackedMessagesPerBroker=0 # limit/2 messages maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 +# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled) +unblockStuckSubscriptionEnabled=false + # Tick time to schedule task that checks topic publish rate limiting across all topics # Reducing to lower value can give more accuracy while throttling publish but # it uses more CPU to perform frequent check. (Disable publish throttling with value 0) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index d39af92008b3b..be898f3465a44 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -683,4 +683,10 @@ Set asyncReplayEntries( */ ManagedCursorMXBean getStats(); + /** + * Checks if read position changed since this method was called last time. + * + * @return if read position changed + */ + boolean checkAndUpdateReadPositionChanged(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 211d7ea7e717d..81e8399777704 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor { protected static final AtomicReferenceFieldUpdater READ_POSITION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition"); protected volatile PositionImpl readPosition; + // keeps sample of last read-position for validation and monitoring if read-position is not moving forward. + protected volatile PositionImpl statsLastReadPosition; protected static final AtomicReferenceFieldUpdater LAST_MARK_DELETE_ENTRY_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry"); @@ -2970,5 +2972,15 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { return Math.min(maxEntriesBasedOnSize, maxEntries); } + @Override + public boolean checkAndUpdateReadPositionChanged() { + PositionImpl lastEntry = ledger.lastConfirmedEntry; + boolean isReadPositionOnTail = lastEntry == null || readPosition == null + || !(lastEntry.compareTo(readPosition) > 0); + boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition); + statsLastReadPosition = readPosition; + return isReadPositionOnTail || isReadPositionChanged; + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java index 50382f9fb8807..92baf42236e04 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java @@ -124,7 +124,6 @@ public boolean equals(Object obj) { PositionImpl other = (PositionImpl) obj; return ledgerId == other.ledgerId && entryId == other.entryId; } - return false; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 60043502023cb..baf1c9f06553b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -364,6 +364,11 @@ public List readEntriesOrWait(int maxEntries, long maxSizeBytes) throws InterruptedException, ManagedLedgerException { return null; } + + @Override + public boolean checkAndUpdateReadPositionChanged() { + return false; + } } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 2f3d3bc39f4e1..1d325f6c03d39 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3465,12 +3465,71 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } finally { factory2.shutdown(); - } + } }); factory1.shutdown(); dirtyFactory.shutdown(); } + @Test + public void testCursorCheckReadPositionChanged() throws Exception { + ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + ManagedCursor c1 = ledger.openCursor("c1"); + + // check empty ledger + assertTrue(c1.checkAndUpdateReadPositionChanged()); + assertTrue(c1.checkAndUpdateReadPositionChanged()); + + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + ledger.addEntry("dummy-entry-1".getBytes(Encoding)); + + // read-position has not been moved + assertFalse(c1.checkAndUpdateReadPositionChanged()); + + List entries = c1.readEntries(2); + entries.forEach(e -> { + try { + c1.markDelete(e.getPosition()); + e.release(); + } catch (Exception e1) { + // Ok + } + }); + + // read-position is moved + assertTrue(c1.checkAndUpdateReadPositionChanged()); + // read-position has not been moved since last read + assertFalse(c1.checkAndUpdateReadPositionChanged()); + + c1.close(); + ledger.close(); + + ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); + // recover cursor + ManagedCursor c2 = ledger.openCursor("c1"); + assertTrue(c2.checkAndUpdateReadPositionChanged()); + assertFalse(c2.checkAndUpdateReadPositionChanged()); + + entries = c2.readEntries(2); + entries.forEach(e -> { + try { + c2.markDelete(e.getPosition()); + e.release(); + } catch (Exception e1) { + // Ok + } + }); + + assertTrue(c2.checkAndUpdateReadPositionChanged()); + // returns true because read-position is on tail + assertTrue(c2.checkAndUpdateReadPositionChanged()); + assertTrue(c2.checkAndUpdateReadPositionChanged()); + + ledger.close(); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 526c414f9d2ea..b723705a202d5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -554,6 +554,13 @@ public class ServiceConfiguration implements PulsarConfiguration { + " unacked messages than this percentage limit and subscription will not receive any new messages " + " until that subscription acks back `limit/2` messages") private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. " + + "(Default is disabled)" + ) + private boolean unblockStuckSubscriptionEnabled = false; @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 0d3f3f4df708f..7dab8b6745665 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -124,4 +124,11 @@ default void markDeletePositionMoveForward() { // No-op } + /** + * Checks if dispatcher is stuck and unblocks the dispatch if needed. + */ + default boolean checkAndUnblockIfStuck() { + return false; + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c6caa702f7a8d..eec0f939dec7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -853,6 +853,21 @@ public void addMessageToReplay(long ledgerId, long entryId) { this.messagesToRedeliver.add(ledgerId, entryId); } + @Override + public boolean checkAndUnblockIfStuck() { + if (cursor.checkAndUpdateReadPositionChanged()) { + return false; + } + // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read + if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead + && cursor.getNumberOfEntriesInBacklog(false) > 0) { + log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); + readMoreEntries(); + return true; + } + return false; + } + public PersistentTopic getTopic() { return topic; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 4bc0728199e62..6e28a3cf47efb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -576,5 +576,21 @@ public CompletableFuture close() { return disconnectAllConsumers(); } + @Override + public boolean checkAndUnblockIfStuck() { + if (cursor.checkAndUpdateReadPositionChanged()) { + return false; + } + Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this); + int totalAvailablePermits = consumer.getAvailablePermits(); + // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read + if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) { + log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name); + readMoreEntries(consumer); + return true; + } + return false; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a11e87502087f..7c5681fedf077 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1092,5 +1092,9 @@ public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) { return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position); } + public boolean checkAndUnblockIfStuck() { + return dispatcher.checkAndUnblockIfStuck(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0951ec55baa0b..a7391d530121c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1621,6 +1621,10 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsHelper.aggMsgRateOut += subMsgRateOut; topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut; nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false); + // check stuck subscription + if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) { + subscription.checkAndUnblockIfStuck(); + } } catch (Exception e) { log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName, e.getMessage(), e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 9961f3ac88390..278e05bdadec0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -18,20 +18,29 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import java.lang.reflect.Field; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.testng.annotations.AfterMethod; @@ -84,4 +93,67 @@ public void testCleanFailedUnloadTopic() throws Exception { producer.close(); } + + /** + * Test validates if topic's dispatcher is stuck then broker can doscover and unblock it. + * + * @throws Exception + */ + @Test + public void testUnblockStuckSubscription() throws Exception { + final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic"; + final String sharedSubName = "shared"; + final String failoverSubName = "failOver"; + + Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe(); + Consumer consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentSubscription sharedSub = topic.getSubscription(sharedSubName); + PersistentSubscription failOverSub = topic.getSubscription(failoverSubName); + + PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub + .getDispatcher(); + PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub + .getDispatcher(); + + // build backlog + consumer1.close(); + consumer2.close(); + + // block sub to read messages + sharedDispatcher.havePendingRead = true; + failOverDispatcher.havePendingRead = true; + + producer.newMessage().value("test").eventTime(5).send(); + producer.newMessage().value("test").eventTime(5).send(); + + consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName(sharedSubName).subscribe(); + consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover) + .subscriptionName(failoverSubName).subscribe(); + Message msg = consumer1.receive(2, TimeUnit.SECONDS); + assertNull(msg); + msg = consumer2.receive(2, TimeUnit.SECONDS); + assertNull(msg); + + // allow reads but dispatchers are still blocked + sharedDispatcher.havePendingRead = false; + failOverDispatcher.havePendingRead = false; + + // run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will + // unblock the dispatcher read because read-position has not been moved since last iteration. + sharedSub.checkAndUnblockIfStuck(); + failOverDispatcher.checkAndUnblockIfStuck(); + assertTrue(sharedSub.checkAndUnblockIfStuck()); + assertTrue(failOverDispatcher.checkAndUnblockIfStuck()); + + msg = consumer1.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + msg = consumer2.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + } } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index 8308f62830038..117e30d7b8590 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -484,6 +484,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github |maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer. |200000| | maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages allowed per broker. Once this limit reaches, the broker stops dispatching messages to all shared subscriptions which has a higher number of unacknowledged messages until subscriptions start acknowledging messages back and unacknowledged messages count reaches to limit/2. When the value is set to 0, unacknowledged message limit check is disabled and broker does not block dispatchers. | 0 | | maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher unacknowledged messages than this percentage limit and subscription does not receive any new messages until that subscription acknowledges messages back. | 0.16 | +| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false| |maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0| |zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy, when ZooKeeper session expired happens, the broker is shutdown. If it is set to "reconnect" policy, the broker tries to reconnect to ZooKeeper server and re-register metadata to ZooKeeper. Note: the "reconnect" policy is an experiment feature.|shutdown| | topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that checks topic publish rate limiting across all topics. A lower value can improve accuracy while throttling publish but it uses more CPU to perform frequent check. (Disable publish throttling with value 0) | 10|