Skip to content

Commit

Permalink
[pulsar-broker] Fix bug that message delivery stops after resetting c…
Browse files Browse the repository at this point in the history
…ursor for failover subscription (apache#5185)

### Motivation

Resetting the cursor for a subscription in Failover mode may cause message delivery to stop. This can be reproduced with the following procedure:

1. Connect multiple consumers to a subscription in Failover mode
1. Reset the subscription cursor to a past position
1. Close some consumers
1. The remaining consumers may not receive new messages from the topic

At this time, the active consumer is already closed one:

```js
"subscriptions" : {
  "sub1" : {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "msgBacklog" : 57604,
    "blockedSubscriptionOnUnackedMsgs" : false,
    "unackedMessages" : 0,
    "type" : "Failover",
    "activeConsumerName" : "04b6c", // This consumer is already closed!
    "msgRateExpired" : 0.0,
    "consumers" : [ {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "06317b",
      "availablePermits" : 564,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:36968"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "37edc",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.77+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38392"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "822f0",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:27.769+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38380"
    }, {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "consumerName" : "b91282",
      "availablePermits" : 1000,
      "unackedMessages" : 0,
      "blockedConsumerOnUnackedMsgs" : false,
      "metadata" : { },
      "connectedSince" : "2019-09-11T18:56:25.413+09:00",
      "clientVersion" : "2.3.2",
      "address" : "/xxx.xxx.xxx.xxx:38408"
    } ]
  }
},
```

This is because `AbstractDispatcherSingleActiveConsumer#closeFuture` is not null, so `pickAndScheduleActiveConsumer()` is not called and the active consumer does not change.
https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L181-L184

`closeFuture` becomes non-null when `disconnectAllConsumers()` is called. And once a value is assigned, it will never return to null.
https://github.com/apache/pulsar/blob/8c3445ad6746df93fef80d2c661374cdab00bc38/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L217-L218

`disconnectAllConsumers()` is called when unloading or deleting a topic, as well as when resetting the cursor.

### Modifications

Added `resetCloseFuture()` method to the Dispatcher classes to return `closeFuture` to null when resetting cursor is completed.

(cherry picked from commit 499069e)
  • Loading branch information
Masahiro Sakamoto authored and wolfstudy committed Nov 19, 2019
1 parent f19d2f5 commit 10bc14f
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,8 @@ private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf hea
return;
}
}

public void resetCloseFuture() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topicName, consumer);
consumer.disconnect();
}

if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
Expand Down Expand Up @@ -215,7 +216,13 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers() {
return closeFuture;
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}

public void reset() {
resetCloseFuture();
IS_CLOSED_UPDATER.set(this, FALSE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public interface Dispatcher {
*/
CompletableFuture<Void> disconnectAllConsumers();

void resetCloseFuture();

/**
* mark dispatcher open to serve new incoming requests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,14 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers() {
return closeFuture;
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}

@Override
public void reset() {
resetCloseFuture();
IS_CLOSED_UPDATER.set(this, FALSE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,14 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers() {
return closeFuture;
}

@Override
public synchronized void resetCloseFuture() {
closeFuture = null;
}

@Override
public void reset() {
resetCloseFuture();
IS_CLOSED_UPDATER.set(this, FALSE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,18 @@ private void resetCursor(Position finalPosition, CompletableFuture<Void> future)
}

disconnectFuture.whenComplete((aVoid, throwable) -> {
if (dispatcher != null) {
dispatcher.resetCloseFuture();
}

if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(
new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}

log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
topicName, subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,73 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep
admin.topics().delete(topicName);
}

@Test
public void persistentTopicsCursorResetAndFailover() throws Exception {
final String namespace = "prop-xyz/ns1";
final String topicName = "persistent://" + namespace + "/reset-cursor-and-failover";
final String subName = "sub1";

admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));

// Create consumer and failover subscription
Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.consumerName("consumerA").subscriptionType(SubscriptionType.Failover)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

publishMessagesOnPersistentTopic(topicName, 5, 0);

// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long messageTimestamp = System.currentTimeMillis();

publishMessagesOnPersistentTopic(topicName, 5, 5);

// Currently the active consumer is consumerA
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
}

admin.topics().resetCursor(topicName, subName, messageTimestamp);

// In v2.5 or later, the first connected consumer is active.
// So consumerB connected later will not be active.
// cf. https://github.com/apache/pulsar/pull/4604
Thread.sleep(1000);
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.consumerName("consumerB").subscriptionType(SubscriptionType.Failover)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();

int receivedAfterReset = 0;
for (int i = 4; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 6);

// Closing consumerA activates consumerB
consumerA.close();

publishMessagesOnPersistentTopic(topicName, 5, 10);

int receivedAfterFailover = 0;
for (int i = 10; i < 15; i++) {
Message<byte[]> message = consumerB.receive(5, TimeUnit.SECONDS);
consumerB.acknowledge(message);
++receivedAfterFailover;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterFailover, 5);

consumerB.close();
admin.topics().deleteSubscription(topicName, subName);
admin.topics().delete(topicName);
}

@Test(dataProvider = "topicName")
public void partitionedTopicsCursorReset(String topicName) throws Exception {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
Expand Down

0 comments on commit 10bc14f

Please sign in to comment.