Skip to content

Commit

Permalink
elastic#7104 Fix Queue locking up on pagesize == queuesize
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Oct 21, 2017
1 parent 8ebc081 commit 751d5b4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 18 deletions.
25 changes: 20 additions & 5 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


// TODO: Notes
//
// - time-based fsync
Expand Down Expand Up @@ -436,11 +435,27 @@ public long write(Queueable element) throws IOException {
}
}

// @return true if the queue is deemed at full capacity
/**
* <p>Checks if the Queue is full, with "full" defined as either of:</p>
* <p>Assuming a maximum size of the queue larger than 0 is defined:</p>
* <ul>
* <li>The sum of the size of all allocated pages is more than the allowed maximum Queue
* size</li>
* <li>The sum of the size of all allocated pages equal to the allowed maximum Queue size
* and the current head page has no remaining capacity.</li>
* </ul>
* <p>or assuming a max unread count larger than 0, is defined "full" is also defined as:</p>
* <ul>
* <li>The current number of unread events exceeds or is equal to the configured maximum
* number of allowed unread events.</li>
* </ul>
* @return True iff the queue is full
*/
public boolean isFull() {
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside synchronized
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
if ((this.maxBytes > 0) && this.currentByteSize >= this.maxBytes) {
if (this.maxBytes > 0L && (
this.currentByteSize > this.maxBytes
|| this.currentByteSize == this.maxBytes && !headPage.hasSpace(1)
)) {
return true;
} else {
return ((this.maxUnread > 0) && this.unreadCount >= this.maxUnread);
Expand Down
49 changes: 36 additions & 13 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ public void writeToFullyAckedHeadpage() throws IOException {
}
}

/**
* This test ensures that the {@link Queue} functions properly when pagesize is equal to overall
* queue size (i.e. there is only a single page).
* @throws IOException On Failure
*/
@Test(timeout = 5000)
public void writeWhenPageEqualsQueueSize() throws IOException {
final Queueable element = new StringElement("foobarbaz");
// Queue that can only hold one element per page.
try (Queue q = new TestQueue(
TestSettings.volatileQueueSettings(1024, 1024L))) {
q.open();
for (int i = 0; i < 3; ++i) {
q.write(element);
try (Batch b = q.readBatch(1, 500L)) {
assertThat(b.getElements().size(), is(1));
assertThat(b.getElements().get(0).toString(), is(element.toString()));
}
}
assertThat(q.nonBlockReadBatch(1), nullValue());
}
}

@Test
public void singleWriteMultiRead() throws IOException {
try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
Expand Down Expand Up @@ -459,7 +482,7 @@ public void reachMaxUnreadWithAcking() throws IOException, InterruptedException,
}

@Test(timeout = 5000)
public void reachMaxSizeTest() throws IOException, InterruptedException, ExecutionException {
public void reachMaxSizeTest() throws IOException, InterruptedException {
Queueable element = new StringElement("0123456789"); // 10 bytes

int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
Expand All @@ -469,8 +492,8 @@ public void reachMaxSizeTest() throws IOException, InterruptedException, Executi
try (TestQueue q = new TestQueue(settings)) {
q.open();

int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
for (int i = 0; i < ELEMENT_COUNT; i++) {
int elementCount = 99; // should be able to write 99 events before getting full
for (int i = 0; i < elementCount; i++) {
q.write(element);
}

Expand All @@ -496,9 +519,9 @@ public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedEx
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
try (TestQueue q = new TestQueue(settings)) {
q.open();
// should be able to write 90 events (9 pages) before getting full
final long ELEMENT_COUNT = 90;
for (int i = 0; i < ELEMENT_COUNT; i++) {
// should be able to write 90 + 9 events (9 pages + 1 head-page) before getting full
final long elementCount = 99;
for (int i = 0; i < elementCount; i++) {
q.write(element);
}
assertThat(q.isFull(), is(false));
Expand All @@ -518,7 +541,7 @@ public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedEx
while (q.isFull()) { Thread.sleep(10); }
assertThat(q.isFull(), is(false));

assertThat(future.get(), is(ELEMENT_COUNT + 1));
assertThat(future.get(), is(elementCount + 1));
}
}

Expand All @@ -532,9 +555,9 @@ public void resumeWriteOnNoLongerFullQueueTest() throws IOException, Interrupted
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
try (TestQueue q = new TestQueue(settings)) {
q.open();
int ELEMENT_COUNT =
90; // should be able to write 90 events (9 pages) before getting full
for (int i = 0; i < ELEMENT_COUNT; i++) {
// should be able to write 90 + 9 events (9 pages + 1 head-page) before getting full
int elementCount = 99;
for (int i = 0; i < elementCount; i++) {
q.write(element);
}

Expand All @@ -555,12 +578,12 @@ public void resumeWriteOnNoLongerFullQueueTest() throws IOException, Interrupted

b.close(); // purge 1 page

assertThat(future.get(), is(ELEMENT_COUNT + 1L));
assertThat(future.get(), is(elementCount + 1L));
}
}

@Test(timeout = 5000)
public void queueStillFullAfterPartialPageAckTest() throws IOException, InterruptedException, ExecutionException {
public void queueStillFullAfterPartialPageAckTest() throws IOException, InterruptedException {

Queueable element = new StringElement("0123456789"); // 10 bytes

Expand All @@ -571,7 +594,7 @@ public void queueStillFullAfterPartialPageAckTest() throws IOException, Interrup
try (TestQueue q = new TestQueue(settings)) {
q.open();

int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
int ELEMENT_COUNT = 99; // should be able to write 99 events before getting full
for (int i = 0; i < ELEMENT_COUNT; i++) {
q.write(element);
}
Expand Down

0 comments on commit 751d5b4

Please sign in to comment.