Skip to content

Commit

Permalink
Fix the batch index ack persistent issue. (apache#9504)
Browse files Browse the repository at this point in the history
### Motivation

Fix the batch index ack persistent issue. Currently, the `batchDeletedIndexInfoBuilder` been reused for generating the batch index ack data, but the delete set does not been cleared before adding the delete set.

### Modifications

Clear the delete set before add the new delete set.

### Verifying this change

Test added. Without this fix, the test failed.
  • Loading branch information
codelipenghui authored Feb 7, 2021
1 parent a958ee9 commit 7916666
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,7 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
for (long l : array) {
deleteSet.add(l);
}
batchDeletedIndexInfoBuilder.clearDeleteSet();
batchDeletedIndexInfoBuilder.addAllDeleteSet(deleteSet);
result.add(batchDeletedIndexInfoBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3155,7 +3155,10 @@ public void testBatchIndexDelete() throws ManagedLedgerException, InterruptedExc

@Test
public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerException, InterruptedException {
ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent");
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
// Make sure the cursor metadata updated by the cursor ledger ID.
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(-1);
ManagedLedger ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
ManagedCursor cursor = ledger.openCursor("c1");

final int totalEntries = 100;
Expand All @@ -3165,15 +3168,19 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
positions[i] = ledger.addEntry(("entry-" + i).getBytes(Encoding));
}
assertEquals(cursor.getNumberOfEntries(), totalEntries);
deleteBatchIndex(cursor, positions[6], 10, Lists.newArrayList(new IntRange().setStart(1).setEnd(3)));
deleteBatchIndex(cursor, positions[5], 10, Lists.newArrayList(new IntRange().setStart(3).setEnd(6)));
deleteBatchIndex(cursor, positions[0], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[1], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[2], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));

ledger = factory.open("test_batch_indexes_deletion_persistent");
cursor.close();
ledger.close();
ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
cursor = ledger.openCursor("c1");

List<IntRange> deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
Assert.assertEquals(deletedIndexes.size(), 1);
Assert.assertEquals(deletedIndexes.get(0).getStart(), 3);
Expand All @@ -3183,6 +3190,11 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[5]), 10);
Assert.assertNull(deletedIndexes);
Assert.assertEquals(cursor.getMarkDeletedPosition(), positions[5]);

deletedIndexes = getAckedIndexRange(cursor.getDeletedBatchIndexesAsLongArray((PositionImpl) positions[6]), 10);
Assert.assertEquals(deletedIndexes.size(), 1);
Assert.assertEquals(deletedIndexes.get(0).getStart(), 1);
Assert.assertEquals(deletedIndexes.get(0).getEnd(), 3);
}

private void deleteBatchIndex(ManagedCursor cursor, Position position, int batchSize,
Expand Down

0 comments on commit 7916666

Please sign in to comment.