Skip to content

Commit

Permalink
Cleanup old compacted topic ledgers when a new one is available (apac…
Browse files Browse the repository at this point in the history
…he#1302)

When a broker is notified of a new compacted ledger for a topic, and
after it has been persisted to the compaction cursor, delete the old
compacted topic ledger. The deletion is best effort. If it fails it
doesn't affect reading from the newer ledger.

This isn't foolproof by any means. There's still plenty of
opportunities for ledgers to be orphaned. We'll need some sort of GC
based on ledger metadata to handle all cases, but that needs a newer
bookkeeper (for the metadata properties changes).
  • Loading branch information
ivankelly authored and merlimat committed Feb 28, 2018
1 parent 518120d commit 3a6b0fb
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
*/
package org.apache.pulsar.compaction;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;

public interface CompactedTopic {
void newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor, int numberOfEntriesToRead,
ReadEntriesCallback callback, Object ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,20 @@ public CompactedTopicImpl(BookKeeper bk) {
}

@Override
public void newCompactedLedger(Position p, long compactedLedgerId) {
public CompletableFuture<?> newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
compactionHorizon = (PositionImpl)p;

CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);

// delete the ledger from the old context once the new one is open
if (previousContext != null) {
return compactedTopicContext.thenCompose((res) -> previousContext)
.thenCompose((res) -> tryDeleteCompactedLedger(bk, res.ledger.getId()));
} else {
return compactedTopicContext;
}
}
}

Expand Down Expand Up @@ -182,6 +192,21 @@ private static CompletableFuture<CompactedTopicContext> openCompactedLedger(Book
ledger, createCache(ledger, DEFAULT_STARTPOINT_CACHE_SIZE)));
}

private static CompletableFuture<Void> tryDeleteCompactedLedger(BookKeeper bk, long id) {
CompletableFuture<Void> promise = new CompletableFuture<>();
bk.asyncDeleteLedger(id,
(rc, ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("Error deleting compacted topic ledger {}",
id, BKException.create(rc));
} else {
log.debug("Compacted topic ledger deleted successfully");
}
promise.complete(null); // don't propagate any error
}, null);
return promise;
}

private static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, long to) {
CompletableFuture<Enumeration<LedgerEntry>> promise = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,31 @@
*/
package org.apache.pulsar.compaction;

import static org.apache.pulsar.client.impl.RawReaderTest.extractKey;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;


import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.impl.RawMessageImpl;

Expand All @@ -73,6 +55,7 @@

public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
private static final Logger log = LoggerFactory.getLogger(CompactedTopicTest.class);
private static final ByteBuf emptyBuffer = Unpooled.buffer(0);

@BeforeMethod
@Override
Expand Down Expand Up @@ -105,7 +88,6 @@ public void cleanup() throws Exception {
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
List<Pair<MessageIdData,Long>> positions = new ArrayList<>();
List<Pair<MessageIdData,Long>> idsInGaps = new ArrayList<>();
ByteBuf emptyBuffer = Unpooled.buffer(0);

AtomicLong ledgerIds = new AtomicLong(10L);
AtomicLong entryIds = new AtomicLong(0L);
Expand Down Expand Up @@ -211,4 +193,47 @@ public void testEntryLookup() throws Exception {
Long.valueOf(gap.getRight()));
}
}

@Test
public void testCleanupOldCompactedTopicLedger() throws Exception {
BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
this.conf, null);

LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
oldCompactedLedger.close();
LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
newCompactedLedger.close();

// set the compacted topic ledger
CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);
compactedTopic.newCompactedLedger(new PositionImpl(1,2), oldCompactedLedger.getId()).get();

// ensure both ledgers still exist, can be opened
bk.openLedger(oldCompactedLedger.getId(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
bk.openLedger(newCompactedLedger.getId(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();

// update the compacted topic ledger
compactedTopic.newCompactedLedger(new PositionImpl(1,2), newCompactedLedger.getId()).get();

// old ledger should be deleted, new still there
try {
bk.openLedger(oldCompactedLedger.getId(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
Assert.fail("Should have failed to open old ledger");
} catch (BKException.BKNoSuchLedgerExistsException e) {
// correct, expected behaviour
}
bk.openLedger(newCompactedLedger.getId(),
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
}
}

0 comments on commit 3a6b0fb

Please sign in to comment.