[pulsar-broker] Allow broker to discover and unblock stuck subscription (apache#9789)

### Motivation
We have frequently seen subscriptions get stuck on various topics: the broker stops dispatching messages even though the consumer has available permits and there are no pending reads (for example, apache#9788). This can be caused by a regression or by an unknown issue when expiry runs. One workaround is to manually unload and reload the topic, which is not feasible when this happens frequently across many topics. Instead, the broker should be able to discover such stuck subscriptions and unblock them.
The example below shows a stuck subscription: available permits > 0, no pending reads, and a cursor read position that does not move forward, so the backlog keeps growing until the topic is unloaded. This happens frequently for an unknown reason:
```
STATS-INTERNAL:
"sub1" : {
      "markDeletePosition" : "11111111:15520",
      "readPosition" : "11111111:15521",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 115521,
      "cursorLedger" : 585099247,
      "cursorLedgerLastEntry" : 597,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-02-25T19:55:50.357Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,

STATS:
"sub1" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 30350,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "C1",
        "availablePermits" : 723,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2021-02-25T19:55:50.358285Z",

```
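
The symptom in the stats above can be reduced to a simple predicate. The following is a minimal sketch, not code from this commit: the class `StuckSubscriptionHeuristic` and its parameter names are hypothetical and only mirror the JSON fields shown (`availablePermits`, `pendingReadOps`, `waitingReadOp`, `msgBacklog`, `msgRateOut`).

```java
/** Hypothetical helper (not part of Pulsar): flags stats that match the stuck pattern. */
final class StuckSubscriptionHeuristic {

    private StuckSubscriptionHeuristic() {
    }

    /**
     * Returns true when a consumer can accept messages and a backlog exists,
     * yet the cursor has no read in flight and nothing is being dispatched.
     */
    static boolean looksStuck(int availablePermits, int pendingReadOps, boolean waitingReadOp,
                              long msgBacklog, double msgRateOut) {
        boolean consumerReady = availablePermits > 0;
        boolean noReadInFlight = pendingReadOps == 0 && !waitingReadOp;
        boolean hasBacklog = msgBacklog > 0;
        boolean idle = msgRateOut == 0.0;
        return consumerReady && noReadInFlight && hasBacklog && idle;
    }
}
```

With the values from the dump, `looksStuck(723, 0, false, 30350, 0.0)` evaluates to true. A single stats sample is only a hint, since a healthy subscription can briefly look like this between reads.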

![image](https://user-images.githubusercontent.com/2898254/109894631-ab62d980-7c42-11eb-8dcc-a1a5f4f5d14e.png)


### Modification
Add the capability for the broker to periodically check whether a subscription is stuck and unblock it if needed. The check is controlled by a flag (`unblockStuckSubscriptionEnabled`); it is disabled by default in the initial release and can be enabled by default in a later release.
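
As a rough illustration of a flag-gated periodic check, here is a minimal sketch. The names `StuckCheckable` and `StuckSubscriptionSweep` are hypothetical and do not exist in Pulsar; the sketch only assumes that some existing periodic task calls `run` and that the flag is re-read on every run so a dynamic configuration change takes effect.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for a subscription that can check and unblock itself. */
interface StuckCheckable {
    boolean checkAndUnblockIfStuck();
}

/** Hypothetical sweep, meant to be invoked from an existing periodic task. */
final class StuckSubscriptionSweep {
    // Read on every run so that a dynamic flag change is picked up.
    private final BooleanSupplier enabled;

    StuckSubscriptionSweep(BooleanSupplier enabled) {
        this.enabled = enabled;
    }

    /** Returns how many subscriptions reported that they were unblocked in this run. */
    int run(List<? extends StuckCheckable> subscriptions) {
        if (!enabled.getAsBoolean()) {
            return 0; // feature disabled: skip the check entirely
        }
        int unblocked = 0;
        for (StuckCheckable sub : subscriptions) {
            try {
                if (sub.checkAndUnblockIfStuck()) {
                    unblocked++;
                }
            } catch (RuntimeException e) {
                // One failing subscription must not stop the sweep for the others.
            }
        }
        return unblocked;
    }
}
```

Gating on a supplier rather than a cached boolean keeps the sketch consistent with a flag that can be toggled at runtime.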


### Result
It helps the broker handle stuck subscriptions and logs a message for later debugging.
rdhabalia authored Mar 10, 2021
1 parent e246458 commit 8d9a2ab
Showing 15 changed files with 215 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
@@ -281,6 +281,9 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
3 changes: 3 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
@@ -233,6 +233,9 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
@@ -683,4 +683,10 @@ Set<? extends Position> asyncReplayEntries(
*/
ManagedCursorMXBean getStats();

/**
* Checks whether the read position has changed since the last call to this method.
*
* @return true if the read position changed or the cursor is already on the tail of the ledger
*/
boolean checkAndUpdateReadPositionChanged();
}
@@ -119,6 +119,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, PositionImpl> READ_POSITION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, PositionImpl.class, "readPosition");
protected volatile PositionImpl readPosition;
// keeps sample of last read-position for validation and monitoring if read-position is not moving forward.
protected volatile PositionImpl statsLastReadPosition;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, MarkDeleteEntry> LAST_MARK_DELETE_ENTRY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, MarkDeleteEntry.class, "lastMarkDeleteEntry");
@@ -2970,5 +2972,15 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

@Override
public boolean checkAndUpdateReadPositionChanged() {
PositionImpl lastEntry = ledger.lastConfirmedEntry;
boolean isReadPositionOnTail = lastEntry == null || readPosition == null
|| !(lastEntry.compareTo(readPosition) > 0);
boolean isReadPositionChanged = readPosition != null && !readPosition.equals(statsLastReadPosition);
statsLastReadPosition = readPosition;
return isReadPositionOnTail || isReadPositionChanged;
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
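
The two-sample logic of `checkAndUpdateReadPositionChanged()` can be tried in isolation. The following is a simplified sketch under stated assumptions, not the `ManagedCursorImpl` code: `ReadPositionTracker` and `Pos` are hypothetical names, and positions are plain `(ledgerId, entryId)` pairs set directly by the caller.

```java
/** Hypothetical, simplified model of the two-sample read-position check. */
final class ReadPositionTracker {

    /** Minimal (ledgerId, entryId) position, ordered by ledger and then by entry. */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Pos && compareTo((Pos) obj) == 0;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
        }
    }

    volatile Pos lastConfirmedEntry; // last entry written to the ledger, null if unknown
    volatile Pos readPosition;       // next entry the cursor will read
    private volatile Pos lastSample; // read position seen by the previous check

    boolean checkAndUpdateReadPositionChanged() {
        Pos read = readPosition;
        Pos last = lastConfirmedEntry;
        // Treated as "on tail" when the read position has reached or passed the last confirmed entry.
        boolean onTail = last == null || read == null || last.compareTo(read) <= 0;
        boolean changed = read != null && !read.equals(lastSample);
        lastSample = read; // remember this sample for the next call
        return onTail || changed;
    }
}
```

A `false` result therefore means two things at once: entries remain to be read, and the read position is the same as at the previous call.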
@@ -124,7 +124,6 @@ public boolean equals(Object obj) {
PositionImpl other = (PositionImpl) obj;
return ledgerId == other.ledgerId && entryId == other.entryId;
}

return false;
}

@@ -364,6 +364,11 @@ public List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
throws InterruptedException, ManagedLedgerException {
return null;
}

@Override
public boolean checkAndUpdateReadPositionChanged() {
return false;
}
}

@Test
@@ -3465,12 +3465,71 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {

} finally {
factory2.shutdown();
}
}
});

factory1.shutdown();
dirtyFactory.shutdown();
}

@Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
ManagedCursor c1 = ledger.openCursor("c1");

// check empty ledger
assertTrue(c1.checkAndUpdateReadPositionChanged());
assertTrue(c1.checkAndUpdateReadPositionChanged());

ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-1".getBytes(Encoding));

// read-position has not been moved
assertFalse(c1.checkAndUpdateReadPositionChanged());

List<Entry> entries = c1.readEntries(2);
entries.forEach(e -> {
try {
c1.markDelete(e.getPosition());
e.release();
} catch (Exception e1) {
// Ok
}
});

// read-position is moved
assertTrue(c1.checkAndUpdateReadPositionChanged());
// read-position has not been moved since last read
assertFalse(c1.checkAndUpdateReadPositionChanged());

c1.close();
ledger.close();

ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
// recover cursor
ManagedCursor c2 = ledger.openCursor("c1");
assertTrue(c2.checkAndUpdateReadPositionChanged());
assertFalse(c2.checkAndUpdateReadPositionChanged());

entries = c2.readEntries(2);
entries.forEach(e -> {
try {
c2.markDelete(e.getPosition());
e.release();
} catch (Exception e1) {
// Ok
}
});

assertTrue(c2.checkAndUpdateReadPositionChanged());
// returns true because read-position is on tail
assertTrue(c2.checkAndUpdateReadPositionChanged());
assertTrue(c2.checkAndUpdateReadPositionChanged());

ledger.close();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
@@ -554,6 +554,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
+ " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Broker periodically checks if subscription is stuck and unblock if flag is enabled. "
+ "(Default is disabled)"
)
private boolean unblockStuckSubscriptionEnabled = false;
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
@@ -124,4 +124,11 @@ default void markDeletePositionMoveForward() {
// No-op
}

/**
* Checks if dispatcher is stuck and unblocks the dispatch if needed.
*/
default boolean checkAndUnblockIfStuck() {
return false;
}

}
@@ -853,6 +853,21 @@ public void addMessageToReplay(long ledgerId, long entryId) {
this.messagesToRedeliver.add(ledgerId, entryId);
}

@Override
public boolean checkAndUnblockIfStuck() {
if (cursor.checkAndUpdateReadPositionChanged()) {
return false;
}
// consider dispatch stuck if the dispatcher has backlog and available permits but no pending read
if (totalAvailablePermits > 0 && !havePendingReplayRead && !havePendingRead
&& cursor.getNumberOfEntriesInBacklog(false) > 0) {
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
readMoreEntries();
return true;
}
return false;
}

public PersistentTopic getTopic() {
return topic;
}
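
The order of the checks in `checkAndUnblockIfStuck()` matters: the read-position check runs first, and only when it reports no progress are permits, pending reads and backlog examined. Below is a minimal sketch of that decision with plain fields instead of broker state. `StuckDispatchModel` is a hypothetical name, and the counter `readsIssued` merely stands in for the call to `readMoreEntries()`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the dispatcher-side decision; not broker code. */
final class StuckDispatchModel {
    int totalAvailablePermits;
    boolean havePendingRead;
    boolean havePendingReplayRead;
    long entriesInBacklog;
    int readsIssued; // counts how often the model "issued a read"

    // Stands in for cursor.checkAndUpdateReadPositionChanged().
    private final BooleanSupplier readPositionChanged;

    StuckDispatchModel(BooleanSupplier readPositionChanged) {
        this.readPositionChanged = readPositionChanged;
    }

    boolean checkAndUnblockIfStuck() {
        // Progress since the previous check (or cursor on tail): not stuck.
        if (readPositionChanged.getAsBoolean()) {
            return false;
        }
        boolean noReadInFlight = !havePendingRead && !havePendingReplayRead;
        if (totalAvailablePermits > 0 && noReadInFlight && entriesInBacklog > 0) {
            readsIssued++; // placeholder for issuing a new read
            return true;
        }
        return false;
    }
}
```

Because the read-position check compares against the sample taken by the previous call, a stuck dispatcher is reported on the second consecutive check at the earliest.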
@@ -576,5 +576,21 @@ public CompletableFuture<Void> close() {
return disconnectAllConsumers();
}

@Override
public boolean checkAndUnblockIfStuck() {
Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
return false;
}
int totalAvailablePermits = consumer.getAvailablePermits();
// consider dispatch stuck if the dispatcher has backlog and available permits but no pending read
if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
readMoreEntries(consumer);
return true;
}
return false;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
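
A single-active-consumer dispatcher may have no active consumer at the moment the check runs, for example when all consumers have disconnected. The following is a minimal sketch of a check that tolerates this case. It is an assumption-laden model, not broker code: `SingleActiveStuckModel` and `ConsumerView` are hypothetical names, and the boolean `readPositionChanged` stands in for the cursor call.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model, not broker code: tolerates a missing active consumer. */
final class SingleActiveStuckModel {

    /** Hypothetical consumer view exposing only what the check needs. */
    interface ConsumerView {
        int getAvailablePermits();
    }

    final AtomicReference<ConsumerView> activeConsumer = new AtomicReference<>();
    boolean havePendingRead;
    boolean readPositionChanged; // stands in for cursor.checkAndUpdateReadPositionChanged()
    long entriesInBacklog;

    boolean checkAndUnblockIfStuck() {
        ConsumerView consumer = activeConsumer.get();
        // Assumption: with no active consumer there is nobody to dispatch to, so nothing is stuck.
        if (consumer == null || readPositionChanged) {
            return false;
        }
        return consumer.getAvailablePermits() > 0 && !havePendingRead && entriesInBacklog > 0;
    }
}
```

Reading the reference once into a local variable avoids acting on two different consumers if the active consumer changes during the check.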
@@ -1092,5 +1092,9 @@ public boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position) {
return this.pendingAckHandle.checkIsCanDeleteConsumerPendingAck(position);
}

public boolean checkAndUnblockIfStuck() {
return dispatcher.checkAndUnblockIfStuck();
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
@@ -1621,6 +1621,10 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsHelper.aggMsgRateOut += subMsgRateOut;
topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
// check stuck subscription
if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
subscription.checkAndUnblockIfStuck();
}
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
@@ -18,20 +18,29 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.AfterMethod;
@@ -84,4 +93,67 @@ public void testCleanFailedUnloadTopic() throws Exception {

producer.close();
}

/**
* Validates that when a topic's dispatcher is stuck, the broker can discover and unblock it.
*
* @throws Exception
*/
@Test
public void testUnblockStuckSubscription() throws Exception {
final String topicName = "persistent://prop/ns-abc/stuckSubscriptionTopic";
final String sharedSubName = "shared";
final String failoverSubName = "failOver";

Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Shared).subscriptionName(sharedSubName).subscribe();
Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionType(SubscriptionType.Failover).subscriptionName(failoverSubName).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription sharedSub = topic.getSubscription(sharedSubName);
PersistentSubscription failOverSub = topic.getSubscription(failoverSubName);

PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
.getDispatcher();
PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
.getDispatcher();

// build backlog
consumer1.close();
consumer2.close();

// block sub to read messages
sharedDispatcher.havePendingRead = true;
failOverDispatcher.havePendingRead = true;

producer.newMessage().value("test").eventTime(5).send();
producer.newMessage().value("test").eventTime(5).send();

consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Shared)
.subscriptionName(sharedSubName).subscribe();
consumer2 = pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionType(SubscriptionType.Failover)
.subscriptionName(failoverSubName).subscribe();
Message<String> msg = consumer1.receive(2, TimeUnit.SECONDS);
assertNull(msg);
msg = consumer2.receive(2, TimeUnit.SECONDS);
assertNull(msg);

// allow reads but dispatchers are still blocked
sharedDispatcher.havePendingRead = false;
failOverDispatcher.havePendingRead = false;

// run task to unblock stuck dispatcher: first iteration sets the lastReadPosition and next iteration will
// unblock the dispatcher read because read-position has not been moved since last iteration.
sharedSub.checkAndUnblockIfStuck();
failOverDispatcher.checkAndUnblockIfStuck();
assertTrue(sharedSub.checkAndUnblockIfStuck());
assertTrue(failOverDispatcher.checkAndUnblockIfStuck());

msg = consumer1.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
msg = consumer2.receive(5, TimeUnit.SECONDS);
assertNotNull(msg);
}
}
1 change: 1 addition & 0 deletions site2/docs/reference-configuration.md
@@ -484,6 +484,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|maxUnackedMessagesPerSubscription| The same as above, except per subscription rather than per consumer. |200000|
| maxUnackedMessagesPerBroker | Maximum number of unacknowledged messages allowed per broker. Once this limit reaches, the broker stops dispatching messages to all shared subscriptions which has a higher number of unacknowledged messages until subscriptions start acknowledging messages back and unacknowledged messages count reaches to limit/2. When the value is set to 0, unacknowledged message limit check is disabled and broker does not block dispatchers. | 0 |
| maxUnackedMessagesPerSubscriptionOnBrokerBlocked | Once the broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which have higher unacknowledged messages than this percentage limit and subscription does not receive any new messages until that subscription acknowledges messages back. | 0.16 |
| unblockStuckSubscriptionEnabled|Broker periodically checks if subscription is stuck and unblock if flag is enabled.|false|
|maxNumPartitionsPerPartitionedTopic|Max number of partitions per partitioned topic. Use 0 or negative number to disable the check|0|
|zookeeperSessionExpiredPolicy|There are two policies when ZooKeeper session expired happens, "shutdown" and "reconnect". If it is set to "shutdown" policy, when ZooKeeper session expired happens, the broker is shutdown. If it is set to "reconnect" policy, the broker tries to reconnect to ZooKeeper server and re-register metadata to ZooKeeper. Note: the "reconnect" policy is an experiment feature.|shutdown|
| topicPublisherThrottlingTickTimeMillis | Tick time to schedule task that checks topic publish rate limiting across all topics. A lower value can improve accuracy while throttling publish but it uses more CPU to perform frequent check. (Disable publish throttling with value 0) | 10|