Skip to content

Commit

Permalink
Add a read lock when traversing batchDeletedIndexes (apache#10763)
Browse files Browse the repository at this point in the history
### Motivation
We will add a write lock when writing batchDeletedIndexes.
The traversal read operation is not atomic, and read lock need to be added

### Modifications
add read lock in  `buildBatchEntryDeletionIndexInfoList`
  • Loading branch information
315157973 authored Jun 1, 2021
1 parent e6b35d9 commit 76d3426
Showing 1 changed file with 28 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2519,30 +2519,35 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
}

private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletionIndexInfoList() {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
lock.readLock().lock();
try {
if (!config.isDeletionAtBatchIndexLevelEnabled() || batchDeletedIndexes == null || batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
deleteSet.add(l);
}
batchDeletedIndexInfoBuilder.clearDeleteSet();
batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
result.add(batchDeletedIndexInfoBuilder.build());
}
return result;
} finally {
lock.readLock().unlock();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
MLDataFormats.BatchedEntryDeletionIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchedEntryDeletionIndexInfo
.newBuilder();
List<MLDataFormats.BatchedEntryDeletionIndexInfo> result = Lists.newArrayList();
Iterator<Map.Entry<PositionImpl, BitSetRecyclable>> iterator = batchDeletedIndexes.entrySet().iterator();
while (iterator.hasNext() && result.size() < config.getMaxBatchDeletedIndexToPersist()) {
Map.Entry<PositionImpl, BitSetRecyclable> entry = iterator.next();
nestedPositionBuilder.setLedgerId(entry.getKey().getLedgerId());
nestedPositionBuilder.setEntryId(entry.getKey().getEntryId());
batchDeletedIndexInfoBuilder.setPosition(nestedPositionBuilder.build());
long[] array = entry.getValue().toLongArray();
List<Long> deleteSet = new ArrayList<>(array.length);
for (long l : array) {
deleteSet.add(l);
}
batchDeletedIndexInfoBuilder.clearDeleteSet();
batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
result.add(batchDeletedIndexInfoBuilder.build());
}
return result;
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
Expand Down

0 comments on commit 76d3426

Please sign in to comment.