Skip to content

Commit

Permalink
Support truncate topic (apache#10326)
Browse files Browse the repository at this point in the history
Fixes apache#9597 

### Motivation

Add support for truncate all data of the topic without disconnect the producers and consumers.

### Modifications

Add a API to truncate data of the topic which can delete all inactive ledgers even if the data retention is enabled. The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers. The truncate operation will not rollover the current ledger, so the active ledger will not been deleted. So for this cases, if users want to delete all data from the topic and currently no new data writes to the topic, all the data will be deleted after the ledger rollover triggered.

And, in the future we can add a separate command for rollover the ledger manually. Which can decouple with the truncate api. So that from the admin side, we can combine the rollover API and the truncate API to delete both the current active ledger and inactive ledgers.
  • Loading branch information
jangwind authored May 12, 2021
1 parent 0cc3301 commit 99ccd1d
Show file tree
Hide file tree
Showing 17 changed files with 468 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,4 +600,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* will got null if corresponding ledger not exists.
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);

/**
* Truncate ledgers
* The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.
*/
CompletableFuture<Void> asyncTruncate();
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -2152,8 +2153,12 @@ public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
}

private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);
public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100, TimeUnit.MILLISECONDS);
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
Expand Down Expand Up @@ -2260,9 +2265,13 @@ private boolean isOffloadedNeedsDelete(OffloadContext offload) {
* @throws Exception
*/
void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
internalTrimLedgers(false, promise);
}

void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
// Ensure only one trimming operation is active
if (!trimmerMutex.tryLock()) {
scheduleDeferredTrimming(promise);
scheduleDeferredTrimming(isTruncate, promise);
return;
}

Expand Down Expand Up @@ -2316,11 +2325,11 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
break;
} else if (expired) {
log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
} else if (expired || isTruncate) {
log.debug("[{}] Ledger {} has expired or be truncated, expired is {}, isTruncate is {}, ts {}", name, ls.getLedgerId(), expired, isTruncate, ls.getTimestamp());
ledgersToDelete.add(ls);
} else if (overRetentionQuota) {
log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
} else if (overRetentionQuota || isTruncate) {
log.debug("[{}] Ledger {} is over quota or be truncated, overRetentionQuota is {}, isTruncate is {}", name, ls.getLedgerId(), overRetentionQuota, isTruncate);
ledgersToDelete.add(ls);
} else {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
Expand All @@ -2343,7 +2352,7 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {

if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(promise);
scheduleDeferredTrimming(isTruncate, promise);
trimmerMutex.unlock();
return;
}
Expand Down Expand Up @@ -3738,6 +3747,35 @@ public void setEntriesAddedCounter(long count) {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

@Override
public CompletableFuture<Void> asyncTruncate() {

final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (ManagedCursor cursor : cursors) {
final CompletableFuture<Void> future = new CompletableFuture<>();
cursor.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback() {
@Override
public void clearBacklogComplete(Object ctx) {
future.complete(null);
}

@Override
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
futures.add(future);
}
CompletableFuture<Void> future = new CompletableFuture();
FutureUtil.waitForAll(futures).thenAccept(p -> {
internalTrimLedgers(true, future);
}).exceptionally(e -> {
future.completeExceptionally(e);
return null;
});
return future;
}

public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
LedgerInfo ledgerInfo = ledgers.get(ledgerId);
if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -2917,4 +2916,59 @@ public void testManagedLedgerRollOverIfFull() throws Exception {
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
Assert.assertEquals(ledger.getTotalSize(), 0);
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(50);
config.setRetentionTime(1, TimeUnit.DAYS);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
ManagedCursor cursor = ledger.openCursor("test-cursor");
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));

ledger.close();
ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
ledger2.addEntry("test-entry-2".getBytes(Encoding));


CompletableFuture<Void> future = ledger2.asyncTruncate();
future.get();

assertTrue(ledger2.getLedgersInfoAsList().size() <= 1);
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerSlowestCursor() throws Exception {

ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig config = new ManagedLedgerConfig();

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
ManagedCursor cursor = ledger.openCursor("test-cursor");
ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));
ledger.addEntry("test-entry-1".getBytes(Encoding));

ledger.close();
ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)factory.open("truncate_ledger", config);
ledger2.addEntry("test-entry-2".getBytes(Encoding));
ManagedCursor cursor3 = ledger2.openCursor("test-cursor");
cursor3.resetCursor(new PositionImpl(ledger2.getLastPosition()));

CompletableFuture<Void> future = ledger2.asyncTruncate();
future.get();

assertTrue(ledger2.getLedgersInfoAsList().size() == 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3915,4 +3915,76 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
}
}
}

protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
Topic topic;
try {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
topic = getTopicReference(topicName);
} catch (Exception e) {
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
CompletableFuture<Void> future = topic.truncate();
future.thenAccept(a -> {
asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
Response.Status.NO_CONTENT.getReasonPhrase()));
}).exceptionally(e -> {
asyncResponse.resume(e);
return null;
});
}

protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {

// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
if (meta.partitions > 0) {
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.truncateAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
asyncResponse.resume(new RestException(e));
return;
}
}
FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable th = exception.getCause();
if (th instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
} else if (th instanceof WebApplicationException) {
asyncResponse.resume(th);
} else {
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
asyncResponse.resume(new RestException(exception));
}
} else {
asyncResponse.resume(Response.noContent().build());
}
return null;
});
} else {
internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
}
}).exceptionally(t -> {
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, t);
if (t instanceof WebApplicationException) {
asyncResponse.resume(t);
} else {
asyncResponse.resume(new RestException(t));
}
return null;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -378,6 +379,32 @@ public void getListFromBundle(
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/truncate")
@ApiOperation(value = "Truncate a topic.",
notes = "NonPersistentTopic does not support truncate.")
@ApiResponses(value = {
@ApiResponse(code = 412, message = "NonPersistentTopic does not support truncate.")
})
public void truncateTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED.getStatusCode(),
"unsupport truncate"));
}

protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
}

private Topic getTopicReference(TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3103,5 +3103,31 @@ public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/truncate")
@ApiOperation(value = "Truncate a topic.",
notes = "The truncate operation will move all cursors to the end of the topic "
+ "and delete all inactive ledgers.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void truncateTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
validateTopicName(tenant, namespace, encodedTopic);
internalTruncateTopic(asyncResponse, authoritative);

}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,11 @@ default boolean isSystemTopic() {
*/
CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark);

/**
* Truncate a topic.
* The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers.
* @return
*/
CompletableFuture<Void> truncate();

}
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,11 @@ public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterM
new Exception("Unsupported operation endTxn in non-persistent topic."));
}

@Override
public CompletableFuture<Void> truncate() {
return FutureUtil.failedFuture(new NotAllowedException("Unsupported truncate"));
}

protected boolean isTerminated() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2837,6 +2837,11 @@ public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterM
});
}

@Override
public CompletableFuture<Void> truncate() {
return ledger.asyncTruncate();
}

public long getDelayedDeliveryTickTimeMillis() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
Expand Down
Loading

0 comments on commit 99ccd1d

Please sign in to comment.