Skip to content

Commit

Permalink
OAK-1927: TarMK compaction delays journal updates
Browse files Browse the repository at this point in the history
Refactor the existing flush thread functionality to a new BackgroundThread class
Add a new TarMK compaction background thread

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1607185 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jukka committed Jul 1, 2014
1 parent ed48b54 commit df3d85e
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.plugins.segment.file;

import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A daemon background thread that runs its target {@link Runnable} either
 * periodically (non-negative interval) or only on demand via
 * {@link #trigger()}. The thread starts itself from the constructor and
 * stops when {@link #close()} is called.
 * <p>
 * Thread-safety: the {@code backlog} counter is the only shared mutable
 * state and is guarded by the monitor of this object.
 */
class BackgroundThread extends Thread {

    /** Logger instance */
    private static final Logger log =
        LoggerFactory.getLogger(BackgroundThread.class);

    /** Base thread name; execution statistics are appended to it for dumps. */
    private final String name;

    /**
     * Interval in milliseconds between automatic iterations, or a negative
     * value if iterations should happen only when explicitly triggered.
     * NOTE(review): an interval of exactly 0 would make {@code wait(0)}
     * block indefinitely, i.e. behave like a negative interval — callers
     * currently pass either a positive value or -1.
     */
    private final long interval;

    /**
     * Number of pending {@link #trigger()} calls not yet processed, or
     * -1 once {@link #close()} has been requested. Guarded by this object's
     * monitor.
     */
    private long backlog = 0;

    /** Duration in seconds of the most recent iteration. */
    private long lastDuration = 0;

    /** Longest iteration duration in seconds observed so far. */
    private long maxDuration = 0;

    /**
     * Creates and starts the background thread.
     *
     * @param name thread name
     * @param interval interval in milliseconds between automatic iterations,
     *                 or negative to run only when explicitly triggered
     * @param target the task to execute on each iteration
     */
    BackgroundThread(String name, long interval, Runnable target) {
        super(target, name);

        this.name = name;
        this.interval = interval;

        setDaemon(true);
        setPriority(MIN_PRIORITY);
        // NOTE(review): starting a thread from its own constructor leaks a
        // not-fully-constructed this-reference; tolerable here only because
        // all fields are assigned above and this class is package-private
        // with no subclasses.
        start();
    }

    @Override
    public void run() {
        try {
            while (waitUntilNextIteration()) {
                long start = nanoTime();
                super.run(); // executes the target passed to the constructor
                long seconds = SECONDS.convert(nanoTime() - start, NANOSECONDS);

                if (lastDuration != seconds) {
                    lastDuration = seconds;
                    if (maxDuration < seconds) {
                        maxDuration = seconds;
                    }
                    // make execution statistics visible in thread dumps
                    setName(name + " " + lastDuration + "/" + maxDuration);
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt status so any enclosing code can
            // observe the interruption (consistent with close() below)
            Thread.currentThread().interrupt();
            log.error(name + " interrupted", e);
        }
    }

    /**
     * Requests one extra iteration of the target task. Multiple triggers
     * accumulate in the backlog and are processed one by one.
     */
    void trigger() {
        trigger(false);
    }

    /**
     * Signals the thread to stop after draining is skipped (any pending
     * backlog is discarded) and waits for it to terminate.
     */
    void close() {
        try {
            trigger(true);
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(name + " join interrupted", e);
        }
    }

    /**
     * Records a trigger or a close request and wakes the background thread.
     * A close request (-1) overrides and discards any pending backlog.
     */
    private synchronized void trigger(boolean close) {
        if (close) {
            backlog = -1;
        } else if (backlog >= 0) {
            backlog++;
        }
        // only the background thread itself ever waits on this monitor,
        // so notify() is sufficient
        notify();
    }

    /**
     * Blocks until the next iteration is due.
     *
     * @return {@code true} if the target should run, {@code false} if the
     *         thread has been closed and should terminate
     * @throws InterruptedException if the wait is interrupted
     */
    private synchronized boolean waitUntilNextIteration()
            throws InterruptedException {
        if (interval < 0) {
            // trigger-only mode: loop to guard against spurious wakeups,
            // which would otherwise cause an unrequested iteration
            while (backlog == 0) {
                wait();
            }
        } else if (backlog == 0) {
            // periodic mode: a timeout (or spurious wakeup) simply starts
            // the next periodic iteration, so no loop is needed here
            wait(interval);
        }

        if (backlog > 0) {
            backlog--;
        }

        return backlog >= 0;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;

import java.io.File;
Expand All @@ -43,7 +42,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -119,29 +117,24 @@ public class FileStore implements SegmentStore {
* The background flush thread. Automatically flushes the TarMK state
* once every five seconds.
*/
private final Thread flushThread;
private final BackgroundThread flushThread;

/**
* Flag to request revision cleanup during the next flush.
* The background compaction thread. Compacts the TarMK contents whenever
* triggered by the {@link #gc()} method.
*/
private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);
private final BackgroundThread compactionThread;

/**
* Flag to request segment compaction during the next flush.
* Flag to request revision cleanup during the next flush.
*/
private final AtomicBoolean compactNeeded = new AtomicBoolean(false);
private final AtomicBoolean cleanupNeeded = new AtomicBoolean(false);

/**
* List of old tar file generations that are waiting to be removed.
*/
private final LinkedList<File> toBeRemoved = newLinkedList();

/**
* Synchronization aid used by the background flush thread to stop itself
* as soon as the {@link #close()} method is called.
*/
private final CountDownLatch timeToClose = new CountDownLatch(1);

public FileStore(BlobStore blobStore, File directory, int maxFileSizeMB, boolean memoryMapping)
throws IOException {
this(blobStore, directory, EMPTY_NODE, maxFileSizeMB, 0, memoryMapping);
Expand Down Expand Up @@ -233,32 +226,27 @@ public FileStore(
persistedHead = new AtomicReference<RecordId>(null);
}

this.flushThread = new Thread(new Runnable() {
@Override
public void run() {
try {
timeToClose.await(1, SECONDS);
while (timeToClose.getCount() > 0) {
long start = System.nanoTime();
this.flushThread = new BackgroundThread(
"TarMK flush thread [" + directory + "]", 5000, // 5s interval
new Runnable() {
@Override
public void run() {
try {
flush();
} catch (IOException e) {
log.warn("Failed to flush the TarMK at" +
directory, e);
}
long time = SECONDS.convert(
System.nanoTime() - start, NANOSECONDS);
timeToClose.await(Math.max(5, 2 * time), SECONDS);
}
} catch (InterruptedException e) {
log.warn("TarMK flush thread interrupted");
}
}
});
flushThread.setName("TarMK flush thread: " + directory);
flushThread.setDaemon(true);
flushThread.setPriority(Thread.MIN_PRIORITY);
flushThread.start();
});
this.compactionThread = new BackgroundThread(
"TarMK compaction thread [" + directory + "]", -1,
new Runnable() {
@Override
public void run() {
compact();
}
});

log.info("TarMK opened: {} (mmap={})", directory, memoryMapping);
}
Expand Down Expand Up @@ -344,10 +332,6 @@ public synchronized long size() throws IOException {
}

public void flush() throws IOException {
if (compactNeeded.getAndSet(false)) {
compact();
}

synchronized (persistedHead) {
RecordId before = persistedHead.get();
RecordId after = head.get();
Expand Down Expand Up @@ -482,17 +466,13 @@ public boolean setHead(SegmentNodeState base, SegmentNodeState head) {

@Override
public void close() {
try {
// avoid deadlocks while joining the flush thread
timeToClose.countDown();
try {
flushThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while joining the TarMK flush thread", e);
}
// avoid deadlocks by closing (and joining) the background
// threads before acquiring the synchronization lock
compactionThread.close();
flushThread.close();

synchronized (this) {
synchronized (this) {
try {
flush();

writer.close();
Expand All @@ -505,14 +485,14 @@ public void close() {

journalLock.release();
journalFile.close();

System.gc(); // for any memory-mappings that are no longer used
} catch (IOException e) {
throw new RuntimeException(
"Failed to close the TarMK at " + directory, e);
}
} catch (IOException e) {
throw new RuntimeException(
"Failed to close the TarMK at " + directory, e);
}

System.gc(); // for any memory-mappings that are no longer used

log.info("TarMK closed: {}", directory);
}

Expand Down Expand Up @@ -641,7 +621,7 @@ public BlobStore getBlobStore() {

@Override
public void gc() {
compactNeeded.set(true);
compactionThread.trigger();
}

public Map<String, Set<UUID>> getTarReaderIndex() {
Expand Down

0 comments on commit df3d85e

Please sign in to comment.