Skip to content

Commit

Permalink
remove map file strict capacity constraint, see elastic#7581
Browse files Browse the repository at this point in the history
tests for capacity chage and page and queue level

remove dead STRICT_CAPACITY and remove unused @param comment
  • Loading branch information
colinsurprenant committed Nov 13, 2017
1 parent 653cd34 commit 02bd423
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static class PageIOInvalidVersionException extends IOException {
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;

public static final boolean VERIFY_CHECKSUM = true;
public static final boolean STRICT_CAPACITY = true;

private static final Logger logger = LogManager.getLogger(AbstractByteBufferPageIO.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,28 @@ public MmapPageIO(int pageNum, int capacity, String dirPath) {

@Override
public void open(long minSeqNum, int elementCount) throws IOException {
mapFile(STRICT_CAPACITY);
mapFile();
super.open(minSeqNum, elementCount);
}

// recover will overwrite/update/set this object minSeqNum, capacity and elementCount attributes
// to reflect what it recovered from the page
@Override
public void recover() throws IOException {
mapFile(!STRICT_CAPACITY);
mapFile();
super.recover();
}

// memory map data file to this.buffer and read initial version byte
// @param strictCapacity if true verify that data file size is same as configured page capacity, if false update page capacity to actual file size
private void mapFile(boolean strictCapacity) throws IOException {
private void mapFile() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");

if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file);
}
int pageFileCapacity = (int)raf.length();

if (strictCapacity && this.capacity != pageFileCapacity) {
throw new IOException("Page file size " + pageFileCapacity + " different to configured page capacity " + this.capacity + " for " + this.file);
}

// update capacity to actual raf length
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;

if (this.capacity < MIN_CAPACITY) { throw new IOException(String.format("Page file size is too small to hold elements")); }
Expand Down
38 changes: 38 additions & 0 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -891,4 +891,42 @@ public void testZeroByteFullyAckedPageOnOpen() throws IOException {
}
}

@Test
public void pageCapacityChangeOnExistingQueue() throws IOException {
final Queueable element = new StringElement("foobarbaz1");
final int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
final int ORIGINAL_CAPACITY = 2 * singleElementCapacity;
final int NEW_CAPACITY = 10 * singleElementCapacity;

try (Queue q = new Queue(TestSettings.persistedQueueSettings(ORIGINAL_CAPACITY, dataPath))) {
q.open();
q.write(element);
}

try (Queue q = new Queue(TestSettings.persistedQueueSettings(NEW_CAPACITY, dataPath))) {
q.open();
assertThat(q.tailPages.get(0).getPageIO().getCapacity(), is(ORIGINAL_CAPACITY));
assertThat(q.headPage.getPageIO().getCapacity(), is(NEW_CAPACITY));
q.write(element);
}

try (Queue q = new Queue(TestSettings.persistedQueueSettings(NEW_CAPACITY, dataPath))) {
q.open();
assertThat(q.tailPages.get(0).getPageIO().getCapacity(), is(ORIGINAL_CAPACITY));
assertThat(q.tailPages.get(1).getPageIO().getCapacity(), is(NEW_CAPACITY));
assertThat(q.headPage.getPageIO().getCapacity(), is(NEW_CAPACITY));

// will read only within a page boundary
Batch b1 = q.readBatch( 10);
assertThat(b1.size(), is(1));
b1.close();

// will read only within a page boundary
Batch b2 = q.readBatch( 10);
assertThat(b2.size(), is(1));
b2.close();

assertThat(q.tailPages.size(), is(0));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.logstash.ackedqueue.io;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIO;

import java.io.IOException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;


public class MmapPageIOTest {
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

private String dir;

@Before
public void setUp() throws Exception {
dir = temporaryFolder.newFolder().getPath();
}

@Test
public void adjustToExistingCapacity() throws IOException {
final int ORIGINAL_CAPACITY = 1024;
final int NEW_CAPACITY = 2048;
final int PAGE_NUM = 0;

try (PageIO io1 = new MmapPageIO(PAGE_NUM, ORIGINAL_CAPACITY, dir)) {
io1.create();
}

try (PageIO io2 = new MmapPageIO(PAGE_NUM, NEW_CAPACITY, dir)) {
io2.open(0, PAGE_NUM);
assertThat(io2.getCapacity(), is(equalTo(ORIGINAL_CAPACITY)));
}
}
}

0 comments on commit 02bd423

Please sign in to comment.