Skip to content

Commit

Permalink
[improve][client] PIP-313 Support force unsubscribe using consumer api (
Browse files Browse the repository at this point in the history
apache#21687)

Co-authored-by: Jiwe Guo <[email protected]>
  • Loading branch information
rdhabalia and Technoboy- authored Dec 19, 2023
1 parent f970534 commit 631b13a
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ public void disconnect(boolean isResetCursor, Optional<BrokerLookupData> assigne
}
}

public void doUnsubscribe(final long requestId) {
subscription.doUnsubscribe(this).thenAccept(v -> {
public void doUnsubscribe(final long requestId, boolean force) {
subscription.doUnsubscribe(this, force).thenAccept(v -> {
log.info("Unsubscribed successfully from {}", subscription);
cnx.removedConsumer(this);
cnx.getCommandSender().sendSuccessResponse(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
CompletableFuture<Consumer> consumerFuture = consumers.get(unsubscribe.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(), unsubscribe.isForce());
} else {
commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError,
"Consumer not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Void> doUnsubscribe(Consumer consumer);

CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean forcefully);

CompletableFuture<Void> clearBacklog();

CompletableFuture<Void> skipMessages(int numMessagesToSkip);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,24 @@ private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
return doUnsubscribe(consumer, false);
}

/**
* Handle unsubscribe command from the client API Check with the dispatcher is this consumer can proceed with
* unsubscribe.
*
* @param consumer consumer object that is initiating the unsubscribe operation
* @param force unsubscribe forcefully by disconnecting consumers and closing subscription
* @return CompletableFuture indicating the completion of ubsubscribe operation
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean force) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
if (dispatcher.canUnsubscribe(consumer)) {
if (force || dispatcher.canUnsubscribe(consumer)) {
consumer.close();
return delete();
return delete(force);
}
future.completeExceptionally(
new ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,11 +1074,27 @@ private CompletableFuture<Void> delete(boolean closeIfConsumersConnected) {
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
return doUnsubscribe(consumer, false);
}

/**
* Handle unsubscribe command from the client API Check with the dispatcher is this consumer can proceed with
* unsubscribe.
*
* @param consumer consumer object that is initiating the unsubscribe operation
* @param force unsubscribe forcefully by disconnecting consumers and closing subscription
* @return CompletableFuture indicating the completion of unsubscribe operation
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer, boolean force) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
if (dispatcher.canUnsubscribe(consumer)) {
if (force || dispatcher.canUnsubscribe(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] unsubscribing forcefully {}-{}", topicName, subName, consumer.consumerName());
}
consumer.close();
return delete();
return delete(force);
}
future.completeExceptionally(
new ServerMetadataException("Unconnected or shared consumer attempting to unsubscribe"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1073,4 +1073,41 @@ public void testManagedLedgerLazyCursorLedgerCreation() throws Exception {
});
}

@Test
public void testSharedConsumerUnsubscribe() throws Exception {
String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
String sub = "my-subscriber-name";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName(sub).subscribe();
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionType(SubscriptionType.Shared)
.subscriptionName(sub).subscribe();
try {
consumer1.unsubscribe();
fail("should have failed as consumer-2 is already connected");
} catch (Exception e) {
// Ok
}

consumer1.unsubscribe(true);
try {
consumer2.unsubscribe(true);
} catch (PulsarClientException.NotConnectedException e) {
// Ok. consumer-2 is already disconnected with force unsubscription
}
assertFalse(consumer1.isConnected());
assertFalse(consumer2.isConnected());
}

@Test(dataProvider = "subType")
public void testUnsubscribeForce(SubscriptionType type) throws Exception {
String topic = "persistent://my-property/my-ns/sharedUnsubscribe";
String sub = "my-subscriber-name";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionType(type)
.subscriptionName(sub).subscribe();
consumer1.unsubscribe(true);
assertFalse(consumer1.isConnected());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@ public interface Consumer<T> extends Closeable, MessageAcknowledger {
*/
CompletableFuture<Void> unsubscribeAsync();


/**
* Unsubscribe the consumer.
*
* <p>This call blocks until the consumer is unsubscribed.
*
* <p>Unsubscribing will the subscription to be deleted and all the
* data retained can potentially be deleted as well.
*
* <p>The operation will fail when performed on a shared subscription
* where multiple consumers are currently connected.
*
* @param force forcefully unsubscribe by disconnecting connected consumers.
* @throws PulsarClientException if the operation fails
*/
void unsubscribe(boolean force) throws PulsarClientException;

/**
* Asynchronously unsubscribe the consumer.
*
* @see Consumer#unsubscribe()
* @param force forcefully unsubscribe by disconnecting connected consumers.
* @return {@link CompletableFuture} to track the operation
*/
CompletableFuture<Void> unsubscribeAsync(boolean force);
/**
* Receives a single message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ public NotConnectedException() {
public NotConnectedException(long sequenceId) {
super("Not connected to broker", sequenceId);
}

public NotConnectedException(String msg) {
super(msg);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,13 @@ public void negativeAcknowledge(Messages<?> messages) {

@Override
public void unsubscribe() throws PulsarClientException {
unsubscribe(false);
}

@Override
public void unsubscribe(boolean force) throws PulsarClientException {
try {
unsubscribeAsync().get();
unsubscribeAsync(force).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
Expand All @@ -722,7 +727,12 @@ public void unsubscribe() throws PulsarClientException {
}

@Override
public abstract CompletableFuture<Void> unsubscribeAsync();
public CompletableFuture<Void> unsubscribeAsync() {
return unsubscribeAsync(false);
}

@Override
public abstract CompletableFuture<Void> unsubscribeAsync(boolean force);

@Override
public void close() throws PulsarClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ NegativeAcksTracker getNegativeAcksTracker() {
}

@Override
public CompletableFuture<Void> unsubscribeAsync() {
public CompletableFuture<Void> unsubscribeAsync(boolean force) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
Expand All @@ -413,7 +413,7 @@ public CompletableFuture<Void> unsubscribeAsync() {
if (isConnected()) {
setState(State.Closing);
long requestId = client.newRequestId();
ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId);
ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId, force);
ClientCnx cnx = cnx();
cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
closeConsumerTasks();
Expand All @@ -433,7 +433,7 @@ public CompletableFuture<Void> unsubscribeAsync() {
});
} else {
unsubscribeFuture.completeExceptionally(
new PulsarClientException(
new PulsarClientException.NotConnectedException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ public void negativeAcknowledge(Message<?> message) {
}

@Override
public CompletableFuture<Void> unsubscribeAsync() {
public CompletableFuture<Void> unsubscribeAsync(boolean force) {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
Expand All @@ -568,7 +568,7 @@ public CompletableFuture<Void> unsubscribeAsync() {

CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
.map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
.map(c -> c.unsubscribeAsync(force)).collect(Collectors.toList());

FutureUtil.waitForAll(futureList)
.thenComposeAsync((r) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,12 @@ private static KeySharedMode convertKeySharedMode(org.apache.pulsar.client.api.K
}
}

public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
public static ByteBuf newUnsubscribe(long consumerId, long requestId, boolean force) {
BaseCommand cmd = localCmd(Type.UNSUBSCRIBE);
cmd.setUnsubscribe()
.setConsumerId(consumerId)
.setRequestId(requestId);
.setRequestId(requestId)
.setForce(force);
return serializeWithSize(cmd);
}

Expand Down
1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ message CommandFlow {
message CommandUnsubscribe {
required uint64 consumer_id = 1;
required uint64 request_id = 2;
optional bool force = 3 [default = false];
}

// Reset an existing consumer to a particular message id
Expand Down

0 comments on commit 631b13a

Please sign in to comment.