PQ: Improve stability of ack logic
original-brownbear committed May 24, 2018
1 parent f8afd2f commit 91a23d5
Showing 4 changed files with 65 additions and 80 deletions.
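
The core of the change: a Batch always holds one contiguous run of sequence numbers read from a single page, so the per-element LongVector of seqNums is redundant and acking can be described by two numbers. A rough sketch of the API shift (simplified signatures, not the verbatim classes):

    // Before this commit: every same-page seqNum was carried and acked individually.
    //     void ack(LongVector seqNums) throws IOException;
    // After: the lowest seqNum plus an element count identifies the whole batch.
    //     void ack(long firstSeqNum, int count) throws IOException;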
19 changes: 11 additions & 8 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
@@ -5,23 +5,26 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.logstash.ackedqueue.io.LongVector;
 
 public class Batch implements Closeable {
 
     private final List<Queueable> elements;
 
-    private final LongVector seqNums;
+    private final long firstSeqNum;
 
     private final Queue queue;
     private final AtomicBoolean closed;
 
     public Batch(SequencedList<byte[]> serialized, Queue q) {
-        this(serialized.getElements(), serialized.getSeqNums(), q);
+        this(
+            serialized.getElements(),
+            serialized.getSeqNums().size() == 0 ? -1L : serialized.getSeqNums().get(0), q
+        );
     }
 
-    public Batch(List<byte[]> elements, LongVector seqNums, Queue q) {
+    public Batch(List<byte[]> elements, long firstSeqNum, Queue q) {
         this.elements = deserializeElements(elements, q);
-        this.seqNums = seqNums;
+        this.firstSeqNum = elements.isEmpty() ? -1L : firstSeqNum;
         this.queue = q;
         this.closed = new AtomicBoolean(false);
     }
@@ -30,7 +33,9 @@ public Batch(List<byte[]> elements, LongVector seqNums, Queue q) {
     @Override
     public void close() throws IOException {
         if (closed.getAndSet(true) == false) {
-            this.queue.ack(this.seqNums);
+            if (firstSeqNum >= 0L) {
+                this.queue.ack(firstSeqNum, elements.size());
+            }
         } else {
             // TODO: how should we handle double-closing?
             throw new IOException("double closing batch");
@@ -45,8 +50,6 @@ public List<? extends Queueable> getElements() {
         return elements;
     }
 
-    public LongVector getSeqNums() { return this.seqNums; }
-
     public Queue getQueue() {
         return queue;
     }
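
With getSeqNums() gone, acking is driven entirely by close(): the first close acks the contiguous range starting at firstSeqNum for elements.size() elements, an empty batch (firstSeqNum == -1L) acks nothing, and a second close still throws. A usage sketch under those assumptions, where queue is an open org.logstash.ackedqueue.Queue and handle() is a hypothetical per-element consumer:

    try (Batch batch = queue.readBatch(100, 500L)) {
        if (batch != null) { // the readBatch javadoc still allows null on timeout
            for (Queueable element : batch.getElements()) {
                handle(element); // hypothetical processing
            }
        }
    } // close() -> queue.ack(firstSeqNum, elements.size()), skipped when empty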
44 changes: 19 additions & 25 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Page.java
@@ -1,12 +1,11 @@
 package org.logstash.ackedqueue;
 
+import com.google.common.primitives.Ints;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.BitSet;
-
 import org.codehaus.commons.nullanalysis.NotNull;
 import org.logstash.ackedqueue.io.CheckpointIO;
-import org.logstash.ackedqueue.io.LongVector;
 import org.logstash.ackedqueue.io.PageIO;
 
 public final class Page implements Closeable {
@@ -99,10 +98,9 @@ public boolean isFullyRead() {
     }
 
     public boolean isFullyAcked() {
-        // TODO: it should be something similar to this when we use a proper bitset class like ES
-        // this.ackedSeqNum.firstUnackedBit >= this.elementCount;
-        // TODO: for now use a naive & inefficient mechanism with a simple Bitset
-        return this.elementCount > 0 && this.ackedSeqNums.cardinality() >= this.elementCount;
+        final int cardinality = ackedSeqNums.cardinality();
+        return elementCount > 0 && cardinality == ackedSeqNums.length()
+            && cardinality == elementCount;
     }
 
     public long unreadCount() {
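
The rewritten check is also stricter than the old cardinality() >= elementCount: java.util.BitSet.length() is one past the highest set bit, so requiring cardinality == length() == elementCount rules out both a missing ack below elementCount and a stray bit set above it. A minimal standalone illustration of the predicate (plain BitSet semantics, not the Page class):

    import java.util.BitSet;

    class FullyAckedDemo {
        static boolean isFullyAcked(BitSet ackedSeqNums, int elementCount) {
            final int cardinality = ackedSeqNums.cardinality();
            // holds only when bits 0..elementCount-1 are all set and nothing above is
            return elementCount > 0 && cardinality == ackedSeqNums.length()
                && cardinality == elementCount;
        }

        public static void main(String[] args) {
            final BitSet acked = new BitSet();
            acked.set(0, 3);                            // elements 0..2 acked
            System.out.println(isFullyAcked(acked, 4)); // false: element 3 pending
            acked.set(3);                               // ack the last element
            System.out.println(isFullyAcked(acked, 4)); // true
        }
    }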
@@ -116,37 +114,33 @@ public long unreadCount() {
      * the head page to update firstUnackedPageNum because it will be updated in the next upcoming head page checkpoint
      * and in a crash condition, the Queue open recovery will detect and purge fully acked pages
      *
-     * @param seqNums the list of same-page seqNums to ack
+     * @param firstSeqNum Lowest sequence number to ack
+     * @param count Number of elements to ack
      * @param checkpointMaxAcks number of acks before forcing a checkpoint
      * @throws IOException
      */
-    public void ack(LongVector seqNums, int checkpointMaxAcks) throws IOException {
-        final int count = seqNums.size();
-        for (int i = 0; i < count; ++i) {
-            final long seqNum = seqNums.get(i);
-            // TODO: eventually refactor to use new bit handling class
-
-            assert seqNum >= this.minSeqNum :
-                String.format("seqNum=%d is smaller than minSeqnum=%d", seqNum, this.minSeqNum);
-
-            assert seqNum < this.minSeqNum + this.elementCount :
-                String.format("seqNum=%d is greater than minSeqnum=%d + elementCount=%d = %d", seqNum, this.minSeqNum, this.elementCount, this.minSeqNum + this.elementCount);
-            int index = (int)(seqNum - this.minSeqNum);
-
-            this.ackedSeqNums.set(index);
-        }
-
+    public void ack(long firstSeqNum, int count, int checkpointMaxAcks) throws IOException {
+        assert firstSeqNum >= this.minSeqNum :
+            String.format("seqNum=%d is smaller than minSeqnum=%d", firstSeqNum, this.minSeqNum);
+        final long maxSeqNum = firstSeqNum + count;
+        assert maxSeqNum <= this.minSeqNum + this.elementCount :
+            String.format(
+                "seqNum=%d is greater than minSeqnum=%d + elementCount=%d = %d", maxSeqNum,
+                this.minSeqNum, this.elementCount, this.minSeqNum + this.elementCount
+            );
+        final int offset = Ints.checkedCast(firstSeqNum - this.minSeqNum);
+        ackedSeqNums.flip(offset, offset + count);
         // checkpoint if totally acked or we acked more than checkpointMaxAcks elements in this page since last checkpoint
         // note that fully acked pages cleanup is done at queue level in Queue.ack()
-        long firstUnackedSeqNum = firstUnackedSeqNum();
+        final long firstUnackedSeqNum = firstUnackedSeqNum();
 
         if (isFullyAcked()) {
            checkpoint();
-
-            assert firstUnackedSeqNum >= this.minSeqNum + this.elementCount - 1:
-                String.format("invalid firstUnackedSeqNum=%d for minSeqNum=%d and elementCount=%d and cardinality=%d", firstUnackedSeqNum, this.minSeqNum, this.elementCount, this.ackedSeqNums.cardinality());
-
-        } else if (checkpointMaxAcks > 0 && (firstUnackedSeqNum >= this.lastCheckpoint.getFirstUnackedSeqNum() + checkpointMaxAcks)) {
+        } else if (checkpointMaxAcks > 0 && firstUnackedSeqNum >= this.lastCheckpoint.getFirstUnackedSeqNum() + checkpointMaxAcks) {
            // did we acked more than checkpointMaxAcks elements? if so checkpoint now
            checkpoint();
        }
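
Because a batch's sequence numbers are contiguous, the old per-seqNum set() loop collapses into one BitSet.flip(from, to) over the whole range. Using flip instead of set(from, to) assumes no bit in the range was set before; flipping an already-acked bit would clear it, which is exactly the double-ack case Batch.close() guards against. A small demonstration with arbitrary numbers (plain java.util.BitSet; Math.toIntExact stands in for Guava's Ints.checkedCast):

    import java.util.BitSet;

    class RangeAckDemo {
        public static void main(String[] args) {
            final long minSeqNum = 100L;   // first seqNum held by the page
            final BitSet ackedSeqNums = new BitSet();

            final long firstSeqNum = 103L; // ack seqNums 103..106 in one call
            final int count = 4;
            final int offset = Math.toIntExact(firstSeqNum - minSeqNum);
            ackedSeqNums.flip(offset, offset + count);

            System.out.println(ackedSeqNums); // {3, 4, 5, 6}
        }
    }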
41 changes: 16 additions & 25 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -24,7 +24,6 @@
 import org.logstash.LockException;
 import org.logstash.ackedqueue.io.CheckpointIO;
 import org.logstash.ackedqueue.io.FileCheckpointIO;
-import org.logstash.ackedqueue.io.LongVector;
 import org.logstash.ackedqueue.io.MmapPageIOV2;
 import org.logstash.ackedqueue.io.PageIO;
 import org.logstash.common.FsUtil;
@@ -508,11 +507,11 @@ public void ensurePersistedUpto(long seqNum) throws IOException{
      * @return {@link Batch} the batch containing 1 or more element up to the required limit or null of no elements were available
      * @throws IOException
      */
-    public Batch nonBlockReadBatch(int limit) throws IOException {
+    public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
         lock.lock();
         try {
             Page p = nextReadPage();
-            return (isHeadPage(p) && p.isFullyRead()) ? null : _readPageBatch(p, limit, 0L);
+            return (isHeadPage(p) && p.isFullyRead()) ? null : readPageBatch(p, limit, 0L);
         } finally {
             lock.unlock();
         }
@@ -525,10 +524,10 @@ public Batch nonBlockReadBatch(int limit) throws IOException {
      * @return the read {@link Batch} or null if no element upon timeout
      * @throws IOException
      */
-    public Batch readBatch(int limit, long timeout) throws IOException {
+    public synchronized Batch readBatch(int limit, long timeout) throws IOException {
         lock.lock();
         try {
-            return _readPageBatch(nextReadPage(), limit, timeout);
+            return readPageBatch(nextReadPage(), limit, timeout);
         } finally {
             lock.unlock();
         }
@@ -543,14 +542,13 @@ public Batch readBatch(int limit, long timeout) throws IOException {
      * @return {@link Batch} with read elements or null if nothing was read
      * @throws IOException
      */
-    private Batch _readPageBatch(Page p, int limit, long timeout) throws IOException {
+    private Batch readPageBatch(Page p, int limit, long timeout) throws IOException {
         int left = limit;
         final List<byte[]> elements = new ArrayList<>(limit);
-        final LongVector seqNums = new LongVector(limit);
 
         // NOTE: the tricky thing here is that upon entering this method, if p is initially a head page
         // it could become a tail page upon returning from the notEmpty.await call.
-
+        long firstSeqNum = -1L;
         while (left > 0) {
             if (isHeadPage(p) && p.isFullyRead()) {
                 boolean elapsed;
@@ -575,7 +573,9 @@ private Batch _readPageBatch(Page p, int limit, long timeout) throws IOException
             int n = serialized.getElements().size();
             assert n > 0 : "page read returned 0 elements";
             elements.addAll(serialized.getElements());
-            seqNums.add(serialized.getSeqNums());
+            if (firstSeqNum == -1L) {
+                firstSeqNum = serialized.getSeqNums().get(0);
+            }
 
             this.unreadCount -= n;
             left -= n;
@@ -594,7 +594,7 @@ private Batch _readPageBatch(Page p, int limit, long timeout) throws IOException
             removeUnreadPage(p);
         }
 
-        return new Batch(elements, seqNums, this);
+        return new Batch(elements, firstSeqNum, this);
     }
 
     private static class TailPageResult {
@@ -652,19 +652,14 @@ private TailPageResult linearFindPageForSeqnum(long seqNum) {
      * same-page elements. A fully acked page will trigger a checkpoint for that page. Also if a page has more than checkpointMaxAcks
      * acks since last checkpoint it will also trigger a checkpoint.
      *
-     * @param seqNums the list of same-page sequence numbers to ack
+     * @param firstAckSeqNum First Sequence Number to Ack
+     * @param ackCount Number of Elements to Ack
      * @throws IOException
      */
-    public void ack(LongVector seqNums) throws IOException {
-        if (seqNums.size() == 0) {
-            return;
-        }
+    public void ack(final long firstAckSeqNum, final int ackCount) throws IOException {
         // as a first implementation we assume that all batches are created from the same page
         // so we will avoid multi pages acking here for now
-
         // find the page to ack by traversing from oldest tail page
-        long firstAckSeqNum = seqNums.get(0);
-
         lock.lock();
         try {
             TailPageResult result = null;
@@ -686,17 +681,14 @@ public void ack(LongVector seqNums) throws IOException {
                 String.format("seqNum=%d is not in head page with minSeqNum=%d", firstAckSeqNum, this.headPage.getMinSeqNum());
 
             // page acking checkpoints fully acked pages
-            this.headPage.ack(seqNums, this.checkpointMaxAcks);
+            this.headPage.ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks);
         } else {
             // page acking also checkpoints fully acked pages or upon reaching the checkpointMaxAcks threshold
-            result.page.ack(seqNums, this.checkpointMaxAcks);
+            result.page.ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks);
 
             // cleanup fully acked tail page
             if (result.page.isFullyAcked()) {
-                boolean wasFull = isFull();
-
                 this.tailPages.remove(result.index);
-
                 // remove page data file regardless if it is the first or a middle tail page to free resources
                 result.page.purge();

@@ -715,8 +707,7 @@ public void ack(LongVector seqNums) throws IOException {
                     nextPageNum++;
                 }
             }
-
-            if (wasFull) { notFull.signalAll(); }
+            notFull.signalAll();
         }
 
         this.headPage.checkpoint();
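
End to end, the new ack path is: find the one page holding the batch's range, ack that range on it, then clean up if the page became fully acked. A paraphrase of the control flow (field and helper names mirror the diff, but the TailPageResult plumbing, assertions, and linear-search fallback are compressed away):

    public void ack(final long firstAckSeqNum, final int ackCount) throws IOException {
        lock.lock();
        try {
            final TailPageResult result = binaryFindPageForSeqnum(firstAckSeqNum);
            if (result == null) {
                // not in any tail page, so the batch must have come from the head page
                this.headPage.ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks);
            } else {
                result.page.ack(firstAckSeqNum, ackCount, this.checkpointMaxAcks);
                if (result.page.isFullyAcked()) {
                    this.tailPages.remove(result.index);
                    result.page.purge(); // free the page's data file
                    notFull.signalAll(); // a purged page may make room for writers
                }
            }
            this.headPage.checkpoint();
        } finally {
            lock.unlock();
        }
    }

Note that the old wasFull guard around notFull.signalAll() is dropped: signalling unconditionally whenever a tail page is purged trades a spurious wake-up for the guarantee that no blocked writer is missed.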
41 changes: 19 additions & 22 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
@@ -23,7 +23,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.logstash.ackedqueue.io.LongVector;
 import org.logstash.ackedqueue.io.MmapPageIOV2;
 
 import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -653,12 +652,11 @@ public void testAckedCount() throws IOException {
         }
 
         long secondSeqNum;
-        long thirdSeqNum;
         try(Queue q = new Queue(settings)){
             q.open();
 
             secondSeqNum = q.write(element2);
-            thirdSeqNum = q.write(element3);
+            q.write(element3);
 
             b = q.nonBlockReadBatch(1);
             assertThat(b.getElements().size(), is(1));
@@ -669,9 +667,7 @@ public void testAckedCount() throws IOException {
             assertThat(b.getElements().get(0), is(element2));
             assertThat(b.getElements().get(1), is(element3));
 
-            final LongVector seqs = new LongVector(1);
-            seqs.add(firstSeqNum);
-            q.ack(seqs);
+            q.ack(firstSeqNum, 1);
         }
 
         try(Queue q = new Queue(settings)) {
@@ -680,10 +676,7 @@ public void testAckedCount() throws IOException {
             b = q.nonBlockReadBatch(2);
             assertThat(b.getElements().size(), is(2));
 
-            final LongVector seqs = new LongVector(2);
-            seqs.add(secondSeqNum);
-            seqs.add(thirdSeqNum);
-            q.ack(seqs);
+            q.ack(secondSeqNum, 2);
 
             assertThat(q.getAckedCount(), equalTo(0L));
             assertThat(q.getUnackedCount(), equalTo(0L));
@@ -834,21 +827,24 @@ public void getsPersistedByteSizeCorrectlyForFullyAckedDeletedTailPages() throws
 
     private void stableUnderStress(final int capacity) throws IOException {
         Settings settings = TestSettings.persistedQueueSettings(capacity, dataPath);
-        final ExecutorService exec = Executors.newScheduledThreadPool(2);
+        final int concurrent = 2;
+        final ExecutorService exec = Executors.newScheduledThreadPool(concurrent);
+        final int count = 20_000;
         try (Queue queue = new Queue(settings)) {
-            final int count = 20_000;
-            final int concurrent = 2;
             queue.open();
             final List<Future<Integer>> futures = new ArrayList<>(concurrent);
+            final AtomicInteger counter = new AtomicInteger(0);
             for (int c = 0; c < concurrent; ++c) {
                 futures.add(exec.submit(() -> {
                     int i = 0;
                     try {
-                        while (i < count / concurrent) {
-                            final Batch batch = queue.readBatch(1, TimeUnit.SECONDS.toMillis(1));
-                            if (batch != null) {
+                        while (counter.get() < count) {
+                            try (final Batch batch = queue.readBatch(
+                                50, TimeUnit.SECONDS.toMillis(1L))
+                            ) {
                                 for (final Queueable elem : batch.getElements()) {
                                     if (elem != null) {
+                                        counter.incrementAndGet();
                                         ++i;
                                     }
                                 }
@@ -860,9 +856,9 @@ private void stableUnderStress(final int capacity) throws IOException {
                     }
                 }));
             }
+            final Queueable evnt = new StringElement("foo");
             for (int i = 0; i < count; ++i) {
                 try {
-                    final Queueable evnt = new StringElement("foo");
                     queue.write(evnt);
                 } catch (final IOException ex) {
                     throw new IllegalStateException(ex);
@@ -871,13 +867,14 @@ private void stableUnderStress(final int capacity) throws IOException {
             assertThat(
                 futures.stream().map(i -> {
                     try {
-                        return i.get(2L, TimeUnit.MINUTES);
+                        return i.get(5L, TimeUnit.MINUTES);
                     } catch (final InterruptedException | ExecutionException | TimeoutException ex) {
                         throw new IllegalStateException(ex);
                     }
                 }).reduce((x, y) -> x + y).orElse(0),
-                is(20_000)
+                is(count)
             );
+            assertThat(queue.isFullyAcked(), is(true));
         }
     }
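
The reworked stress test also documents the intended consumption pattern: a shared AtomicInteger lets both workers stop once all count writes have been observed, and try-with-resources acks each batch as it is drained, which is what makes the final queue.isFullyAcked() assertion possible. One worker reduced to its essentials (queue, count, and counter as in the test above):

    final Callable<Integer> worker = () -> {
        int seen = 0;
        while (counter.get() < count) {
            try (Batch batch = queue.readBatch(50, TimeUnit.SECONDS.toMillis(1L))) {
                for (Queueable elem : batch.getElements()) {
                    if (elem != null) {
                        counter.incrementAndGet(); // global progress across workers
                        ++seen;                    // this worker's share
                    }
                }
            } // closing the batch acks every element it delivered
        }
        return seen;
    };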

@@ -914,7 +911,7 @@ public void testZeroByteFullyAckedPageOnOpen() throws IOException {
         Queueable element2 = new StringElement("9876543210");
 
         // write 2 elements to force a new page.
-        q.write(element1);
+        final long firstSeq = q.write(element1);
         q.write(element2);
         assertThat(q.tailPages.size(), is(1));

@@ -923,7 +920,7 @@ public void testZeroByteFullyAckedPageOnOpen() throws IOException {
         Page tp = q.tailPages.get(0);
         Batch b = new Batch(tp.read(1), q);
         assertThat(b.getElements().get(0), is(element1));
-        tp.ack(b.getSeqNums(), 1);
+        tp.ack(firstSeq, 1, 1);
         assertThat(tp.isFullyAcked(), is(true));
 
     }
@@ -988,7 +985,7 @@ public void pageCapacityChangeOnExistingQueue() throws IOException {
 
 
     @Test(timeout = 5000)
-    public void maximizeBatch() throws IOException, InterruptedException, ExecutionException {
+    public void maximizeBatch() throws IOException {
 
         // very small pages to maximize page creation
         Settings settings = TestSettings.persistedQueueSettings(1000, dataPath);
