Skip to content
This repository has been archived by the owner on Nov 4, 2020. It is now read-only.

Commit

Permalink
optimistic recovery of queue head page on open
Browse files Browse the repository at this point in the history
recover method wip

recover method wip

recevered lenght 0 is invalid

fix firstUnackedSeqNum

DRYied open and recover

DRYed and refactored to extract AbstractByteBufferPageIO from ByteBufferPageIO and MmapPageIO

better exception messages

cleanup and remove subject idiom

rename _mapFile to mapFile

added invalid state recovery tests

use log4j

renamed TestSettings methods to improve readability

duplicate code

add version check

typo

use test exceptions annotation

use parametrized tests

added uncheck() method to clean test stream

add better message todo

proper javadoc comment

typo
  • Loading branch information
colinsurprenant committed Jan 31, 2017
1 parent ca9ff91 commit 356dd71
Show file tree
Hide file tree
Showing 14 changed files with 686 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ public class HeadPage extends Page {
// create a new HeadPage object and new page.{pageNum} empty valid data file
public HeadPage(int pageNum, Queue queue, PageIO pageIO) throws IOException {
super(pageNum, queue, 0, 0, 0, new BitSet(), pageIO);
pageIO.create();
}

// create a new HeadPage object from an existing checkpoint and open page.{pageNum} empty valid data file
// @param pageIO is expected to be open/recover/create
public HeadPage(Checkpoint checkpoint, Queue queue, PageIO pageIO) throws IOException {
super(checkpoint.getPageNum(), queue, checkpoint.getMinSeqNum(), checkpoint.getElementCount(), checkpoint.getFirstUnackedSeqNum(), new BitSet(), pageIO);

// open the data file and reconstruct the IO object internal state
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
assert checkpoint.getMinSeqNum() == pageIO.getMinSeqNum() && checkpoint.getElementCount() == pageIO.getElementCount() :
String.format("checkpoint minSeqNum=%d or elementCount=%d is different than pageIO minSeqNum=%d or elementCount=%d", checkpoint.getMinSeqNum(), checkpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());

// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
Expand Down
91 changes: 53 additions & 38 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.logstash.ackedqueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.common.io.CheckpointIO;
import org.logstash.common.io.PageIO;
import org.logstash.common.io.PageIOFactory;
Expand Down Expand Up @@ -60,22 +62,24 @@ public class Queue implements Closeable {
private final Method deserializeMethod;

// thread safety
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private static final Logger logger = LogManager.getLogger(Queue.class);

public Queue(Settings settings) {
this(
settings.getDirPath(),
settings.getCapacity(),
settings.getQueueMaxBytes(),
settings.getCheckpointIOFactory().build(settings.getDirPath()),
settings.getPageIOFactory(),
settings.getElementClass(),
settings.getMaxUnread(),
settings.getCheckpointMaxWrites(),
settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxInterval()
settings.getDirPath(),
settings.getCapacity(),
settings.getQueueMaxBytes(),
settings.getCheckpointIOFactory().build(settings.getDirPath()),
settings.getPageIOFactory(),
settings.getElementClass(),
settings.getMaxUnread(),
settings.getCheckpointMaxWrites(),
settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxInterval()
);
}

Expand Down Expand Up @@ -142,11 +146,8 @@ public void open() throws IOException {
try {
headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName());
} catch (NoSuchFileException e) {
headCheckpoint = null;
}
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method

// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
if (headCheckpoint == null) {
this.seqNum = 0;
headPageNum = 0;

Expand All @@ -162,33 +163,45 @@ public void open() throws IOException {
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {

// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint tailCheckpoint = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));

PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());

add(tailCheckpoint, pageIO);
add(cp, new TailPage(cp, this, pageIO));
}

// transform the head page into a tail page only if the headpage is non-empty
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages

if (headCheckpoint.getMinSeqNum() <= 0 && headCheckpoint.getElementCount() <= 0) {
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data

if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) {
// the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes

logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum());
logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount());

long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum();
if (firstUnackedSeqNum < pageIO.getMinSeqNum()) {
logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum());
firstUnackedSeqNum = pageIO.getMinSeqNum();
}
headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount());
}
this.headPage = new HeadPage(headCheckpoint, this, pageIO);

if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) {
// head page is empty, let's keep it as-is

PageIO headPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
this.headPage = new HeadPage(headCheckpoint, this, headPageIO);
this.currentByteSize += headPageIO.getCapacity();
this.currentByteSize += pageIO.getCapacity();

// but checkpoint it to update the firstUnackedPageNum if it changed
this.headPage.checkpoint();
} else {
// head page is non-empty, transform it into a tail page and create a new empty head page

PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);

TailPage p = new TailPage(headCheckpoint, this, pageIO);
p.checkpoint();
add(headCheckpoint, pageIO);
add(headCheckpoint, this.headPage.behead());

headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
Expand All @@ -207,10 +220,12 @@ public void open() throws IOException {
this.closed.set(false);
}

private void add(Checkpoint checkpoint, PageIO pageIO) throws IOException {
// add a read tail page into this queue structures but also verify that this tail page
// is not fully acked in which case it will be purged
private void add(Checkpoint checkpoint, TailPage page) throws IOException {
if (checkpoint.isFullyAcked()) {
// first make sure any fully acked page per the checkpoint is purged if not already
try { pageIO.purge(); } catch (NoSuchFileException e) { /* ignore */ }
try { page.getPageIO().purge(); } catch (NoSuchFileException e) { /* ignore */ }

// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
Expand All @@ -223,30 +238,30 @@ private void add(Checkpoint checkpoint, PageIO pageIO) throws IOException {
} else {
// create a tail page with a null PageIO and add it to tail pages but not unreadTailPages
// since it is fully read because also fully acked
// TODO: I don't like this null pageIO tail page...
this.tailPages.add(new TailPage(checkpoint, this, null));
}
} else {
TailPage p = new TailPage(checkpoint, this, pageIO);
this.tailPages.add(p);
this.unreadTailPages.add(p);
this.unreadCount += p.unreadCount();
this.currentByteSize += pageIO.getCapacity();
this.tailPages.add(page);
this.unreadTailPages.add(page);
this.unreadCount += page.unreadCount();
this.currentByteSize += page.getPageIO().getCapacity();

// for now deactivate all tail pages, we will only reactivate the first one at the end
pageIO.deactivate();
page.getPageIO().deactivate();
}

// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (checkpoint.maxSeqNum() > this.seqNum) {
this.seqNum = checkpoint.maxSeqNum();
}

}

// create a new empty headpage for the given pageNum and imidiately checkpoint it
// @param pageNum the page number of the new head page
private void newCheckpointedHeadpage(int pageNum) throws IOException {
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
headPageIO.create();
this.headPage = new HeadPage(pageNum, this, headPageIO);
this.headPage.forceCheckpoint();
this.currentByteSize += headPageIO.getCapacity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ public TailPage(HeadPage page) {
}

// create a new TailPage object for an exiting Checkpoint and data file
// @param pageIO the PageIO object is expected to be open/recover/create
public TailPage(Checkpoint checkpoint, Queue queue, PageIO pageIO) throws IOException {
super(checkpoint.getPageNum(), queue, checkpoint.getMinSeqNum(), checkpoint.getElementCount(), checkpoint.getFirstUnackedSeqNum(), new BitSet(), pageIO);

// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
this.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}

if (pageIO != null) {
// open the data file and reconstruct the IO object internal state
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
}

}

public void checkpoint() throws IOException {
Expand Down
Loading

0 comments on commit 356dd71

Please sign in to comment.