Skip to content

Commit

Permalink
[fix][broker] Fix currency bug in BucketDelayedDeliveryTracker#recove…
Browse files Browse the repository at this point in the history
…rBucketSnapshot (apache#19394)
  • Loading branch information
lhotari authored Feb 2, 2023
1 parent cb1a031 commit 35ce526
Showing 1 changed file with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -111,7 +110,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat

private synchronized long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.cursor;
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new ConcurrentHashMap<>();
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>();
cursor.getCursorProperties().keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
Expand All @@ -124,54 +123,62 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
}
});

if (immutableBuckets.asMapOfRanges().isEmpty()) {
Map<Range<Long>, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges();
if (immutableBucketMap.isEmpty()) {
return 0;
}

List<CompletableFuture<Void>> futures = new ArrayList<>(immutableBuckets.asMapOfRanges().size());
for (Map.Entry<Range<Long>, ImmutableBucket> entry :immutableBuckets.asMapOfRanges().entrySet()) {
Map<Range<Long>, CompletableFuture<List<DelayedIndex>>>
futures = new HashMap<>(immutableBucketMap.size());
for (Map.Entry<Range<Long>, ImmutableBucket> entry : immutableBucketMap.entrySet()) {
Range<Long> key = entry.getKey();
ImmutableBucket immutableBucket = entry.getValue();
CompletableFuture<Void> future =
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime).thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
// Delete bucket snapshot if indexList is empty
toBeDeletedBucketMap.put(key, immutableBucket);
return;
}
DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1);
synchronized (this.snapshotSegmentLastIndexTable) {
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), immutableBucket);
}
synchronized (this.sharedBucketPriorityQueue) {
for (DelayedIndex index : indexList) {
this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
});
futures.add(future);
futures.put(key, immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime));
}

try {
FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
toBeDeletedBucketMap.forEach((k, immutableBucket) -> {
immutableBuckets.asMapOfRanges().remove(k);
immutableBucket.asyncDeleteBucketSnapshot();
});
}).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(e);
}

for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) {
Range<Long> key = entry.getKey();
// the future will always be completed since it was waited for above
List<DelayedIndex> indexList = entry.getValue().getNow(null);
ImmutableBucket immutableBucket = immutableBucketMap.get(key);
if (CollectionUtils.isEmpty(indexList)) {
// Delete bucket snapshot if indexList is empty
toBeDeletedBucketMap.put(key, immutableBucket);
} else {
DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), immutableBucket);
for (DelayedIndex index : indexList) {
this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
}

for (Map.Entry<Range<Long>, ImmutableBucket> mapEntry : toBeDeletedBucketMap.entrySet()) {
Range<Long> key = mapEntry.getKey();
ImmutableBucket immutableBucket = mapEntry.getValue();
immutableBucketMap.remove(key);
// delete asynchronously without waiting for completion
immutableBucket.asyncDeleteBucketSnapshot();
}

MutableLong numberDelayedMessages = new MutableLong(0);
immutableBuckets.asMapOfRanges().values().forEach(bucket -> {
immutableBucketMap.values().forEach(bucket -> {
numberDelayedMessages.add(bucket.numberBucketDelayedMessages);
});

log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}",
dispatcher.getName(), immutableBuckets.asMapOfRanges().size(), numberDelayedMessages.getValue());
dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue());

return numberDelayedMessages.getValue();
}
Expand Down

0 comments on commit 35ce526

Please sign in to comment.