OAK-1932: TarMK compaction can create mixed segments
Allow a different SegmentWriter to be used with the compactor
Adjust compaction and cleanup code to make it easier to test and monitor

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1607077 13f79535-47bb-0310-9956-ffa450edef68
jukka committed Jul 1, 2014
1 parent 1dac310 commit df034a5
Showing 6 changed files with 175 additions and 74 deletions.
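
The core of the change is a reworked Compactor API: the compactor now takes any SegmentWriter in its constructor, exposes compact(before, after), and hands back its CompactionMap via getCompactionMap(). The sketch below mirrors the new FileStore.compact() further down in this diff; the wrapper class and method name are illustrative and not part of the commit.

import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;

import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;

class CompactionSketch { // illustrative wrapper, not part of the commit

    static void compactOnce(FileStore store) {
        // A dedicated SegmentWriter keeps compacted records out of the
        // segments used by concurrent commits (the OAK-1932 mixed-segment issue).
        SegmentWriter writer = new SegmentWriter(store, store.getTracker());
        Compactor compactor = new Compactor(writer);

        SegmentNodeState before = store.getHead();
        SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
        while (!store.setHead(before, after)) {
            // Concurrent changes were committed; rebase (and compact) them
            // on top of the compacted state before retrying.
            SegmentNodeState head = store.getHead();
            after = compactor.compact(before, head);
            before = head;
        }
        store.getTracker().setCompactionMap(compactor.getCompactionMap());
    }
}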
@@ -64,7 +64,7 @@
* average, the amortized size of each entry in this mapping is about
* {@code 20/n + 8} bytes, assuming compressed pointers.
*/
class CompactionMap {
public class CompactionMap {

private final int compressInterval;
private final Map<RecordId, RecordId> recent = newHashMap();
@@ -52,33 +52,6 @@ public class Compactor {
/** Logger instance */
private static final Logger log = LoggerFactory.getLogger(Compactor.class);

public static CompactionMap compact(SegmentStore store) {
SegmentWriter writer = store.getTracker().getWriter();
Compactor compactor = new Compactor(writer);

log.debug("TarMK compaction");

SegmentNodeBuilder builder = writer.writeNode(EMPTY_NODE).builder();
SegmentNodeState before = store.getHead();
EmptyNodeState.compareAgainstEmptyState(
before, compactor.newCompactDiff(builder));

SegmentNodeState after = builder.getNodeState();
while (!store.setHead(before, after)) {
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
SegmentNodeState head = store.getHead();
head.compareAgainstBaseState(
before, compactor.newCompactDiff(builder));
before = head;
after = builder.getNodeState();
}

compactor.map.compress();
return compactor.map;
}

/**
* Locks down the RecordId persistence structure
*/
@@ -89,6 +62,8 @@ static long[] recordAsKey(RecordId r) {

private final SegmentWriter writer;

private final SegmentNodeBuilder builder;

private CompactionMap map = new CompactionMap(100000);

/**
@@ -98,12 +73,19 @@ static long[] recordAsKey(RecordId r) {
*/
private final Map<String, List<RecordId>> binaries = newHashMap();

private Compactor(SegmentWriter writer) {
public Compactor(SegmentWriter writer) {
this.writer = writer;
this.builder = writer.writeNode(EMPTY_NODE).builder();
}

public SegmentNodeState compact(NodeState before, NodeState after) {
after.compareAgainstBaseState(before, new CompactDiff(builder));
return builder.getNodeState();
}

private CompactDiff newCompactDiff(NodeBuilder builder) {
return new CompactDiff(builder);
public CompactionMap getCompactionMap() {
map.compress();
return map;
}

private class CompactDiff extends ApplyDiff {
@@ -58,6 +58,7 @@
import org.apache.jackrabbit.oak.plugins.segment.SegmentTracker;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentStore;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -239,7 +240,6 @@ public void run() {
timeToClose.await(1, SECONDS);
while (timeToClose.getCount() > 0) {
long start = System.nanoTime();
compact();
try {
flush();
} catch (IOException e) {
@@ -335,7 +335,19 @@ static Map<Integer, Map<Character, File>> collectFiles(File directory)
return dataFiles;
}

public synchronized long size() throws IOException {
long size = writeFile.length();
for (TarReader reader : readers) {
size += reader.size();
}
return size;
}

public void flush() throws IOException {
if (compactNeeded.getAndSet(false)) {
compact();
}

synchronized (persistedHead) {
RecordId before = persistedHead.get();
RecordId after = head.get();
@@ -356,39 +368,7 @@ public void flush() throws IOException {
persistedHead.set(after);

if (cleanup) {
long start = System.nanoTime();

// Suggest to the JVM that now would be a good time
// to clear stale weak references in the SegmentTracker
System.gc();

Set<UUID> ids = newHashSet();
for (SegmentId id : tracker.getReferencedSegmentIds()) {
ids.add(new UUID(
id.getMostSignificantBits(),
id.getLeastSignificantBits()));
}
writer.cleanup(ids);

List<TarReader> list =
newArrayListWithCapacity(readers.size());
for (TarReader reader : readers) {
TarReader cleaned = reader.cleanup(ids);
if (cleaned == reader) {
list.add(reader);
} else {
if (cleaned != null) {
list.add(cleaned);
}
File file = reader.close();
log.info("TarMK GC: Cleaned up file {}", file);
toBeRemoved.addLast(file);
}
}
readers = list;

log.debug("TarMK GC: Completed in {}ms",
MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
cleanup();
}
}

@@ -405,15 +385,65 @@ public void flush() throws IOException {
}
}

synchronized void cleanup() throws IOException {
long start = System.nanoTime();
log.info("TarMK revision cleanup started");

// Suggest to the JVM that now would be a good time
// to clear stale weak references in the SegmentTracker
System.gc();

Set<UUID> ids = newHashSet();
for (SegmentId id : tracker.getReferencedSegmentIds()) {
ids.add(new UUID(
id.getMostSignificantBits(),
id.getLeastSignificantBits()));
}
writer.cleanup(ids);

List<TarReader> list =
newArrayListWithCapacity(readers.size());
for (TarReader reader : readers) {
TarReader cleaned = reader.cleanup(ids);
if (cleaned == reader) {
list.add(reader);
} else {
if (cleaned != null) {
list.add(cleaned);
}
File file = reader.close();
log.info("TarMK revision cleanup reclaiming {}", file.getName());
toBeRemoved.addLast(file);
}
}
readers = list;

log.info("TarMK revision cleanup completed in {}ms",
MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS));
}

public void compact() {
if (compactNeeded.getAndSet(false)) {
long start = System.nanoTime();
tracker.getWriter().dropCache();
tracker.setCompactionMap(Compactor.compact(this));
log.info("TarMK Compaction: Completed in {}ms", MILLISECONDS
.convert(System.nanoTime() - start, NANOSECONDS));
cleanupNeeded.set(true);
long start = System.nanoTime();
log.info("TarMK compaction started");

SegmentWriter writer = new SegmentWriter(this, tracker);
Compactor compactor = new Compactor(writer);

SegmentNodeState before = getHead();
SegmentNodeState after = compactor.compact(EMPTY_NODE, before);
while (!setHead(before, after)) {
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
SegmentNodeState head = getHead();
after = compactor.compact(before, head);
before = head;
}
tracker.setCompactionMap(compactor.getCompactionMap());

log.info("TarMK compaction completed in {}ms", MILLISECONDS
.convert(System.nanoTime() - start, NANOSECONDS));
cleanupNeeded.set(true);
}

public synchronized Iterable<SegmentId> getSegmentIds() {
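
The new public size() method, together with compact() and the cleanup triggered from flush(), makes the reclaimed space observable from outside the store; the new test further below relies on this. A minimal sketch, assuming flush() still runs the scheduled cleanup after a compaction; the class name and argument handling are illustrative only.

import java.io.File;
import java.io.IOException;

import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;

class CompactionSizeCheck { // illustrative, not part of the commit

    public static void main(String[] args) throws IOException {
        FileStore store = new FileStore(new File(args[0]), 256, false);
        try {
            long before = store.size();   // size() is new in this commit
            store.compact();              // rewrite the head into fresh segments
            store.flush();                // persist the head and run the scheduled cleanup
            long after = store.size();
            System.out.println("reclaimed ~" + (before - after) + " bytes");
        } finally {
            store.close();
        }
    }
}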
@@ -481,6 +481,10 @@ private TarReader(File file, FileAccess access, ByteBuffer index)
this.graph = loadGraph(file, access, index);
}

long size() {
return file.length();
}

Set<UUID> getUUIDs() {
Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
int position = index.position();
@@ -31,8 +31,12 @@
import java.util.Random;

import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.SegmentBlob;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentWriter;
import org.junit.Before;
import org.junit.Test;

@@ -83,6 +87,88 @@ public void testRestartAndGC(boolean memoryMapping) throws IOException {
store.close();
}

@Test
public void testCompaction() throws IOException {
int largeBinarySize = 10 * 1024 * 1024;

FileStore store = new FileStore(directory, 1, false);
SegmentWriter writer = store.getTracker().getWriter();

SegmentNodeState base = store.getHead();
SegmentNodeBuilder builder = base.builder();
byte[] data = new byte[largeBinarySize];
new Random().nextBytes(data);
SegmentBlob blob = writer.writeStream(new ByteArrayInputStream(data));
builder.setProperty("foo", blob);
builder.getNodeState(); // write the blob reference to the segment
builder.setProperty("foo", "bar");
SegmentNodeState head = builder.getNodeState();
assertTrue(store.setHead(base, head));
assertEquals("bar", store.getHead().getString("foo"));
store.close();

// First simulate the case where during compaction a reference to the
// older segments is added to a segment that the compactor is writing
store = new FileStore(directory, 1, false);
head = store.getHead();
assertTrue(store.size() > largeBinarySize);
Compactor compactor = new Compactor(writer);
SegmentNodeState compacted =
compactor.compact(EmptyNodeState.EMPTY_NODE, head);
builder = head.builder();
builder.setChildNode("old", head); // reference to pre-compacted state
builder.getNodeState();
assertTrue(store.setHead(head, compacted));
store.close();

// In this case the revision cleanup is unable to reclaim the old data
store = new FileStore(directory, 1, false);
assertTrue(store.size() > largeBinarySize);
store.cleanup();
assertTrue(store.size() > largeBinarySize);
store.close();

// Now we do the same thing, but let the compactor use a different
// SegmentWriter
store = new FileStore(directory, 1, false);
head = store.getHead();
assertTrue(store.size() > largeBinarySize);
writer = new SegmentWriter(store, store.getTracker());
compactor = new Compactor(writer);
compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
builder = head.builder();
builder.setChildNode("old", head); // reference to pre-compacted state
builder.getNodeState();
writer.flush();
assertTrue(store.setHead(head, compacted));
store.close();

// Revision cleanup is still unable to reclaim extra space (OAK-1932)
store = new FileStore(directory, 1, false);
assertTrue(store.size() > largeBinarySize);
store.cleanup();
assertTrue(store.size() > largeBinarySize); // FIXME: should be <
store.close();

// Finally do the compaction without concurrent writes
store = new FileStore(directory, 1, false);
head = store.getHead();
assertTrue(store.size() > largeBinarySize);
writer = new SegmentWriter(store, store.getTracker());
compactor = new Compactor(writer);
compacted = compactor.compact(EmptyNodeState.EMPTY_NODE, head);
writer.flush();
assertTrue(store.setHead(head, compacted));
store.close();

// Now the revision cleanup is able to reclaim the extra space
store = new FileStore(directory, 1, false);
assertTrue(store.size() > largeBinarySize);
store.cleanup();
assertTrue(store.size() < largeBinarySize);
store.close();
}

@Test
public void testRecovery() throws IOException {
FileStore store = new FileStore(directory, 1, false);
@@ -61,7 +61,6 @@
import org.apache.jackrabbit.oak.plugins.backup.FileStoreRestore;
import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.segment.Compactor;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentId;
@@ -216,7 +215,7 @@ private static void compact(String[] args) throws IOException {
System.out.println(" -> compacting");
FileStore store = new FileStore(directory, 256, false);
try {
Compactor.compact(store);
store.compact();
} finally {
store.close();
}