Skip to content

Commit

Permalink
Merge pull request yahoo#41 from bellofreedom/halodb_thread_safe
Browse files Browse the repository at this point in the history
Enhance the thread safety when tombstone file merging running
  • Loading branch information
bellofreedom authored Oct 18, 2019
2 parents b96d796 + f4ef922 commit 4e617b4
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions src/main/java/com/oath/halodb/HaloDBInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class HaloDBInternal {

private volatile TombstoneFile currentTombstoneFile;

private volatile Thread tombstoneMergeThread;

private Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();

HaloDBOptions options;
Expand Down Expand Up @@ -79,7 +81,7 @@ class HaloDBInternal {

private HaloDBInternal() {}

static synchronized HaloDBInternal open(File directory, HaloDBOptions options) throws HaloDBException, IOException {
static HaloDBInternal open(File directory, HaloDBOptions options) throws HaloDBException, IOException {
checkIfOptionsAreCorrect(options);

HaloDBInternal dbInternal = new HaloDBInternal();
Expand Down Expand Up @@ -137,8 +139,8 @@ static synchronized HaloDBInternal open(File directory, HaloDBOptions options) t
// merge tombstone files at background if clean up set to true
if (options.isCleanUpTombstonesDuringOpen()) {
dbInternal.isTombstoneFilesMerging = true;
Thread t = new Thread(() -> { dbInternal.mergeTombstoneFiles(); });
t.start();
dbInternal.tombstoneMergeThread = new Thread(() -> { dbInternal.mergeTombstoneFiles(); });
dbInternal.tombstoneMergeThread.start();
}

logger.info("Opened HaloDB {}", directory.getName());
Expand Down Expand Up @@ -172,6 +174,15 @@ synchronized void close() throws IOException {
setIOErrorFlag();
}

if (isTombstoneFilesMerging) {
try {
tombstoneMergeThread.join();
} catch (InterruptedException e) {
logger.error("Interrupted when waiting the tombstone files merging");
setIOErrorFlag();
}
}

if (options.isCleanUpInMemoryIndexOnClose())
inMemoryIndex.close();

Expand Down Expand Up @@ -229,7 +240,7 @@ boolean put(byte[] key, byte[] value) throws IOException, HaloDBException {
byte[] get(byte[] key, int attemptNumber) throws IOException, HaloDBException {
if (attemptNumber > maxReadAttempts) {
logger.error("Tried {} attempts but read failed", attemptNumber-1);
throw new HaloDBException("Tried " + attemptNumber + " attempts but failed.");
throw new HaloDBException("Tried " + (attemptNumber-1) + " attempts but failed.");
}
InMemoryIndexMetaData metaData = inMemoryIndex.get(key);
if (metaData == null) {
Expand Down Expand Up @@ -291,9 +302,14 @@ int get(byte[] key, ByteBuffer buffer) throws IOException {
synchronized boolean takeSnapshot() {
logger.info("Start generating the snapshot");

if (this.isTombstoneFilesMerging) {
logger.error("DB is not ready for snapshot now");
return false;
if (isTombstoneFilesMerging) {
logger.info("DB is merging the tombstone files now. Wait it finished");
try {
tombstoneMergeThread.join();
} catch (InterruptedException e) {
logger.error("Interrupted when waiting the tombstone files merging");
return false;
}
}

try {
Expand Down Expand Up @@ -334,7 +350,6 @@ synchronized boolean takeSnapshot() {
compactionManager.forceRolloverCurrentWriteFile();

logger.info("Storage files number need to be linked: {}", filesToLink.length);

for (File file : filesToLink) {
Path dest = Paths.get(snapshotDir.getAbsolutePath(), file.getName());
logger.debug("Create file link from file {} to {}", file.getName(),
Expand All @@ -343,7 +358,6 @@ synchronized boolean takeSnapshot() {
}
} catch(IOException e) {
logger.warn("IOException when creating snapshot", e);

return false;
} finally {
compactionManager.resumeCompaction();
Expand Down Expand Up @@ -788,6 +802,13 @@ private void mergeTombstoneFiles() {
}
}

if (mergedTombstoneFile != null) {
try {
mergedTombstoneFile.close();
} catch (IOException e) {
logger.error("IO exception when closing tombstone file: {}", mergedTombstoneFile.getName(), e);
}
}
logger.info("Tombstone files count, before merge:{}, after merge:{}",
tombStoneFiles.length, dbDirectory.listTombstoneFiles().length);
isTombstoneFilesMerging = false;
Expand Down

0 comments on commit 4e617b4

Please sign in to comment.