Skip to content

Commit

Permalink
Admin-api to reset cursor by position (apache#785)
Browse files Browse the repository at this point in the history
* Admin-api to reset cursor by position

* Fix: api notes for global topic

* reset cursor using messageId
  • Loading branch information
rdhabalia authored Oct 10, 2017
1 parent 193ea53 commit 34ae881
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.Codec.decode;

import java.io.IOException;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
Expand All @@ -87,8 +89,6 @@
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
Expand All @@ -113,7 +113,6 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

/**
*/
Expand Down Expand Up @@ -906,11 +905,9 @@ public void expireMessagesForAllSubscriptions(@PathParam("property") String prop

@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor/{timestamp}")
@ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "There should not be any active consumers on the subscription.")
@ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Not supported for global and non-persistent topics"),
@ApiResponse(code = 412, message = "Subscription has active consumers") })
@ApiResponse(code = 404, message = "Topic/Subscription does not exist") })
public void resetCursor(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@PathParam("subName") String subName, @PathParam("timestamp") long timestamp,
Expand Down Expand Up @@ -944,8 +941,8 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to time {}", clientAppId(), dn, subName,
timestamp, partitionException);
throw new RestException(Status.PRECONDITION_FAILED, partitionException.getMessage());
} else if (numPartException > 0 && log.isDebugEnabled()) {
log.debug("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", clientAppId(),
} else if (numPartException > 0) {
log.warn("[{}][{}] partial errors for reset cursor on subscription {} to time {} - ", clientAppId(),
destination, subName, timestamp, partitionException);
}

Expand All @@ -954,6 +951,9 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
log.info("[{}][{}] received reset cursor on subscription {} to time {}", clientAppId(), destination,
subName, timestamp);
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
Expand All @@ -967,8 +967,6 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} else if (e instanceof NotAllowedException) {
throw new RestException(Status.METHOD_NOT_ALLOWED, e.getMessage());
} else if (t instanceof SubscriptionBusyException) {
throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for timestamp specified -" + t.getMessage());
Expand All @@ -979,6 +977,56 @@ public void resetCursor(@PathParam("property") String property, @PathParam("clus
}
}

@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
public void resetCursorOnPosition(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@PathParam("subName") String subName,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), destination,
subName, messageId);

PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
destination, authoritative);

if (partitionMetadata.partitions > 0) {
log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), dn, subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for partitioned-topic");
} else {
validateAdminOperationOnDestination(dn, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] successfully reset cursor on subscription {} to position {}", clientAppId(), dn,
subName, messageId);
} catch (Exception e) {
Throwable t = e.getCause();
log.warn("[{}] [{}] Failed to reset cursor on subscription {} to position {}", clientAppId(), dn,
subName, messageId, e);
if (e instanceof NullPointerException) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} else if (t instanceof SubscriptionInvalidCursorPosition) {
throw new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage());
} else {
throw new RestException(e);
}
}
}
}

@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/position/{messagePosition}")
@ApiOperation(value = "Peek nth message on a topic subscription.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;

Expand Down Expand Up @@ -59,6 +59,8 @@ public interface Subscription {
CompletableFuture<Void> skipMessages(int numMessagesToSkip);

CompletableFuture<Void> resetCursor(long timestamp);

CompletableFuture<Void> resetCursor(Position position);

CompletableFuture<Entry> peekNthMessage(int messagePosition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -347,6 +348,11 @@ public void markTopicWithBatchMessagePublished() {
topic.markBatchMessagePublished();
}

@Override
public CompletableFuture<Void> resetCursor(Position position) {
return CompletableFuture.completedFuture(null);
}

private static final Logger log = LoggerFactory.getLogger(NonPersistentSubscription.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ResetCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
Expand Down Expand Up @@ -343,62 +344,7 @@ public void findEntryComplete(Position position, Object ctx) {
} else {
finalPosition = position;
}

if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
return;
}

final CompletableFuture<Void> disconnectFuture;
if (dispatcher != null && dispatcher.isConsumerConnected()) {
disconnectFuture = dispatcher.disconnectAllConsumers();
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}

disconnectFuture.whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}
log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset", topicName, subName);

try {
cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
@Override
public void resetComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName,
timestamp);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp,
exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage()));
} else if (exception instanceof ConcurrentFindCursorPositionException) {
future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
} else {
future.completeExceptionally(new BrokerServiceException(exception));
}
}
});
} catch (Exception e) {
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(new BrokerServiceException(e));
}
});
resetCursor(finalPosition, future);
}

@Override
Expand All @@ -415,6 +361,73 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

@Override
public CompletableFuture<Void> resetCursor(Position position) {
CompletableFuture<Void> future = new CompletableFuture<>();
resetCursor(position, future);
return future;
}

private void resetCursor(Position finalPosition, CompletableFuture<Void> future) {
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
return;
}

final CompletableFuture<Void> disconnectFuture;
if (dispatcher != null && dispatcher.isConsumerConnected()) {
disconnectFuture = dispatcher.disconnectAllConsumers();
} else {
disconnectFuture = CompletableFuture.completedFuture(null);
}

disconnectFuture.whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(
new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}
log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
topicName, subName);

try {
cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
@Override
public void resetComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
finalPosition);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName,
finalPosition, exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage()));
} else if (exception instanceof ConcurrentFindCursorPositionException) {
future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
} else {
future.completeExceptionally(new BrokerServiceException(exception));
}
}
});
} catch (Exception e) {
log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(new BrokerServiceException(e));
}
});
}

@Override
public CompletableFuture<Entry> peekNthMessage(int messagePosition) {
CompletableFuture<Entry> future = new CompletableFuture<>();
Expand Down
Loading

0 comments on commit 34ae881

Please sign in to comment.