Skip to content

Commit

Permalink
optimize checksum computation to make sure we only work on buffers an…
Browse files Browse the repository at this point in the history
…d not on single bytes
  • Loading branch information
kimchy committed Dec 17, 2011
1 parent a3ca1af commit dd87bec
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 22 deletions.
151 changes: 151 additions & 0 deletions src/main/java/org/apache/lucene/store/OpenBufferedIndexOutput.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.apache.lucene.store;

import java.io.IOException;

/**
* Exactly the same as Lucene {@link BufferedIndexOutput} but with the ability to set the buffer size
*/
// LUCENE MONITOR
public abstract class OpenBufferedIndexOutput extends IndexOutput {

    /** Default buffer size, taken from {@link BufferedIndexOutput}. */
    public static final int DEFAULT_BUFFER_SIZE = BufferedIndexOutput.BUFFER_SIZE;

    /** Capacity of the internal buffer in bytes; fixed at construction time. */
    final int BUFFER_SIZE;

    private final byte[] buffer;
    private long bufferStart = 0; // position in file of buffer
    private int bufferPosition = 0; // position in buffer

    /**
     * Creates an output that buffers up to {@code bufferSize} bytes before
     * handing them to {@link #flushBuffer(byte[], int, int)}.
     *
     * @param bufferSize the size of the internal buffer in bytes, must be positive
     * @throws IllegalArgumentException if {@code bufferSize} is zero or negative
     */
    protected OpenBufferedIndexOutput(int bufferSize) {
        // A zero-length buffer would make writeByte fail on buffer[0] right after
        // flushing, so reject unusable sizes here with a clear message.
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be greater than 0, got [" + bufferSize + "]");
        }
        this.BUFFER_SIZE = bufferSize;
        this.buffer = new byte[bufferSize];
    }

    /**
     * Writes a single byte, flushing the buffer first if it is already full.
     *
     * @param b the byte to write
     * @see IndexInput#readByte()
     */
    @Override
    public void writeByte(byte b) throws IOException {
        if (bufferPosition >= BUFFER_SIZE) {
            flush();
        }
        buffer[bufferPosition++] = b;
    }

    /**
     * Writes an array of bytes.
     *
     * @param b      the bytes to write
     * @param offset the offset in {@code b} of the first byte to write
     * @param length the number of bytes to write
     * @see IndexInput#readBytes(byte[], int, int)
     */
    @Override
    public void writeBytes(byte[] b, int offset, int length) throws IOException {
        int bytesLeft = BUFFER_SIZE - bufferPosition;

        // Case 1: the data fits into the remaining buffer space.
        if (bytesLeft >= length) {
            System.arraycopy(b, offset, buffer, bufferPosition, length);
            bufferPosition += length;
            // if the buffer is full, flush it
            if (BUFFER_SIZE - bufferPosition == 0) {
                flush();
            }
            return;
        }

        // Case 2: the data is larger than the whole buffer, so bypass the buffer.
        if (length > BUFFER_SIZE) {
            // flush whatever is pending first so bytes reach the output in order
            if (bufferPosition > 0) {
                flush();
            }
            // and write the data at once
            flushBuffer(b, offset, length);
            bufferStart += length;
            return;
        }

        // Case 3: the data fits in a buffer, but not in the space that is left:
        // fill/flush the buffer until the whole input is written.
        int pos = 0; // position in the input data
        while (pos < length) {
            int pieceLength = Math.min(length - pos, bytesLeft);
            System.arraycopy(b, pos + offset, buffer, bufferPosition, pieceLength);
            pos += pieceLength;
            bufferPosition += pieceLength;
            // if the buffer is full, flush it
            bytesLeft = BUFFER_SIZE - bufferPosition;
            if (bytesLeft == 0) {
                flush();
                bytesLeft = BUFFER_SIZE;
            }
        }
    }

    /**
     * Forces any buffered output to be written, then advances the buffer start
     * by the number of bytes handed over and resets the buffer position.
     */
    @Override
    public void flush() throws IOException {
        flushBuffer(buffer, 0, bufferPosition);
        bufferStart += bufferPosition;
        bufferPosition = 0;
    }

    /**
     * Expert: implements buffer write. Writes bytes at the current position in
     * the output.
     *
     * @param b      the bytes to write
     * @param offset the offset in the byte array
     * @param len    the number of bytes to write
     */
    protected abstract void flushBuffer(byte[] b, int offset, int len) throws IOException;

    /**
     * Closes this stream to further operations. This implementation only
     * flushes the buffered bytes.
     */
    @Override
    public void close() throws IOException {
        flush();
    }

    /**
     * Returns the current position in this file, where the next write will
     * occur.
     *
     * @see #seek(long)
     */
    @Override
    public long getFilePointer() {
        return bufferStart + bufferPosition;
    }

    /**
     * Sets current position in this file, where the next write will occur.
     * Pending bytes are flushed before the position is changed.
     *
     * @param pos the new position in the file
     * @see #getFilePointer()
     */
    @Override
    public void seek(long pos) throws IOException {
        flush();
        bufferStart = pos;
    }

    /**
     * The number of bytes in the file.
     */
    @Override
    public abstract long length() throws IOException;

}
32 changes: 10 additions & 22 deletions src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public void forceSync(String name) throws IOException {
}
}

class StoreIndexOutput extends IndexOutput {
class StoreIndexOutput extends OpenBufferedIndexOutput {

private final StoreFileMetaData metaData;

Expand All @@ -535,6 +535,9 @@ class StoreIndexOutput extends IndexOutput {
private final Checksum digest;

StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name, boolean computeChecksum) {
// we add 64 to be bigger than the default BufferedIndexOutput buffer size so any flush will go directly
// to the output without being copied over to the delegate buffer
super(OpenBufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64);
this.metaData = metaData;
this.delegate = delegate;
this.name = name;
Expand All @@ -559,6 +562,7 @@ class StoreIndexOutput extends IndexOutput {

@Override
public void close() throws IOException {
super.close();
delegate.close();
String checksum = null;
if (digest != null) {
Expand All @@ -572,18 +576,10 @@ public void close() throws IOException {
}

@Override
public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
delegate.writeBytes(b, offset, len);
if (digest != null) {
digest.update(b);
}
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
if (digest != null) {
digest.update(b, offset, length);
digest.update(b, offset, len);
}
}

Expand All @@ -594,19 +590,16 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {

@Override
public void flush() throws IOException {
super.flush();
delegate.flush();
}

@Override
public long getFilePointer() {
return delegate.getFilePointer();
}

@Override
public void seek(long pos) throws IOException {
// seek might be called on files, which means that the checksum is not file checksum
// but a checksum of the bytes written to this stream, which is the same for each
// type of file in lucene
super.seek(pos);
delegate.seek(pos);
}

Expand All @@ -619,10 +612,5 @@ public long length() throws IOException {
public void setLength(long length) throws IOException {
delegate.setLength(length);
}

@Override
public void writeStringStringMap(Map<String, String> map) throws IOException {
delegate.writeStringStringMap(map);
}
}
}

0 comments on commit dd87bec

Please sign in to comment.