Skip to content

Commit

Permalink
cleanup & DRY Queue open(), addIO() and addPage()
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant committed Dec 1, 2017
1 parent 9a31b7d commit 54de04c
Showing 1 changed file with 57 additions and 70 deletions.
127 changes: 57 additions & 70 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,24 @@ public void open() throws IOException {
for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) {

// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
final Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));

logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());

PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
addIO(cp, pageIO);
// important to NOT pageIO.open() just yet, we must first verify if it is fully acked in which case
// we can purge it and we don't care about its integrity for example if it is of zero-byte file size.
if (cp.isFullyAcked()) {
purgeTailPage(cp, pageIO);
} else {
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
addTailPage(cp, PageFactory.newTailPage(cp, this, pageIO));
}

// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (cp.maxSeqNum() > this.seqNum) {
this.seqNum = cp.maxSeqNum();
}
}

// transform the head page into a tail page only if the headpage is non-empty
Expand Down Expand Up @@ -222,17 +234,23 @@ public void open() throws IOException {
// but checkpoint it to update the firstUnackedPageNum if it changed
this.headPage.checkpoint();
} else {
// head page is non-empty, transform it into a tail page and create a new empty head page
// head page is non-empty, transform it into a tail page
this.headPage.behead();
addPage(headCheckpoint, this.headPage);

headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
if (headCheckpoint.isFullyAcked()) {
purgeTailPage(headCheckpoint, pageIO);
} else {
addTailPage(headCheckpoint, this.headPage);
}

// track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum
if (headCheckpoint.maxSeqNum() > this.seqNum) {
this.seqNum = headCheckpoint.maxSeqNum();
}

// create a new empty head page
headPageNum = headCheckpoint.getPageNum() + 1;
newCheckpointedHeadpage(headPageNum);
}

// only activate the first tail page
Expand All @@ -250,75 +268,44 @@ public void open() throws IOException {
}
}

// TODO: addIO and addPage are almost identical - we should refactor to DRY it up.

// addIO is basically the same as addPage except that it avoid calling PageIO.open
// before actually purging the page if it is fully acked. This avoid dealing with
// zero byte page files that are fully acked.
// see issue #7809
private void addIO(Checkpoint checkpoint, PageIO pageIO) throws IOException {
if (checkpoint.isFullyAcked()) {
// first make sure any fully acked page per the checkpoint is purged if not already
try { pageIO.purge(); } catch (NoSuchFileException e) { /* ignore */ }

// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
// we will remove any prepended fully acked tail pages but keep all other checkpoints between the first
// unacked tail page and the head page. we did however purge the data file to free disk resources.

if (this.tailPages.size() == 0) {
// this is the first tail page and it is fully acked so just purge it
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
}
} else {
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
Page page = PageFactory.newTailPage(checkpoint, this, pageIO);

this.tailPages.add(page);
this.unreadTailPages.add(page);
this.unreadCount += page.unreadCount();
this.currentByteSize += page.getPageIO().getCapacity();
/**
* delete files for the given page
*
* @param checkpoint the tail page {@link Checkpoint}
* @param pageIO the tail page {@link PageIO}
* @throws IOException
*/
private void purgeTailPage(Checkpoint checkpoint, PageIO pageIO) throws IOException {
try {
pageIO.purge();
} catch (NoSuchFileException e) { /* ignore */ }

// for now deactivate all tail pages, we will only reactivate the first one at the end
page.getPageIO().deactivate();
}
// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
// we will remove any prepended fully acked tail pages but keep all other checkpoints between the first
// unacked tail page and the head page. we did however purge the data file to free disk resources.

// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (checkpoint.maxSeqNum() > this.seqNum) {
this.seqNum = checkpoint.maxSeqNum();
if (this.tailPages.size() == 0) {
// this is the first tail page and it is fully acked so just purge it
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
}
}

// add a read tail page into this queue structures but also verify that this tail page
// is not fully acked in which case it will be purged
private void addPage(Checkpoint checkpoint, Page page) throws IOException {
if (checkpoint.isFullyAcked()) {
// first make sure any fully acked page per the checkpoint is purged if not already
try { page.getPageIO().purge(); } catch (NoSuchFileException e) { /* ignore */ }

// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
// we will remove any prepended fully acked tail pages but keep all other checkpoints between the first
// unacked tail page and the head page. we did however purge the data file to free disk resources.

if (this.tailPages.size() == 0) {
// this is the first tail page and it is fully acked so just purge it
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
}
} else {
this.tailPages.add(page);
this.unreadTailPages.add(page);
this.unreadCount += page.unreadCount();
this.currentByteSize += page.getPageIO().getCapacity();

// for now deactivate all tail pages, we will only reactivate the first one at the end
page.getPageIO().deactivate();
}

// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
if (checkpoint.maxSeqNum() > this.seqNum) {
this.seqNum = checkpoint.maxSeqNum();
}
/**
* add a not fully-acked tail page into this queue structures and un-mmap it.
*
* @param checkpoint the tail page {@link Checkpoint}
* @param page the tail {@link Page}
* @throws IOException
*/
private void addTailPage(Checkpoint checkpoint, Page page) throws IOException {
this.tailPages.add(page);
this.unreadTailPages.add(page);
this.unreadCount += page.unreadCount();
this.currentByteSize += page.getPageIO().getCapacity();

// for now deactivate all tail pages, we will only reactivate the first one at the end
page.getPageIO().deactivate();
}

// create a new empty headpage for the given pageNum and immediately checkpoint it
Expand Down

0 comments on commit 54de04c

Please sign in to comment.