diff --git a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogAppender.java b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogAppender.java index 8982aec..65cffb2 100644 --- a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogAppender.java +++ b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogAppender.java @@ -21,25 +21,67 @@ public final class RecordLogAppender extends GenericRecordLogAppender keyComparator; + /** + * @param file record log directory + * @param keySerializer key serializer + * @param valueSerializer value serializer + * @param keyComparator key comparator + * @param codec compression codec + * @throws IOException + */ public RecordLogAppender(File file, Serializer keySerializer, Serializer valueSerializer, Comparator keyComparator, CompressionCodec codec) throws IOException { this(file, keySerializer, valueSerializer, keyComparator, codec, null); } + /** + * @param file record log directory + * @param keySerializer key serializer + * @param valueSerializer value serializer + * @param keyComparator key comparator + * @param codec compression codec + * @param metadataRef atomic reference to a metadata object, if nonnull after construction it will contain a reference to previous flushed metadata if it exists + * @throws IOException + */ public RecordLogAppender(File file, Serializer keySerializer, Serializer valueSerializer, Comparator keyComparator, CompressionCodec codec, AtomicReference> metadataRef) throws IOException { this(file, keySerializer, valueSerializer, new CollectionSerializer(keySerializer), keyComparator, codec, metadataRef); } + /** + * @param file record log directory + * @param keySerializer key serializer + * @param valueSerializer value serializer + * @param keyCollectionSerializer key collection serializer + * @param keyComparator key comparator + * @param codec compression codec + * @throws IOException + */ public RecordLogAppender(File file, Serializer keySerializer, Serializer valueSerializer, Serializer> keyCollectionSerializer, Comparator keyComparator, CompressionCodec codec) throws IOException { this(file, keySerializer, valueSerializer, keyCollectionSerializer, keyComparator, codec, null); } + /** + * @param file record log directory + * @param keySerializer key serializer + * @param valueSerializer value serializer + * @param keyCollectionSerializer key collection serializer + * @param keyComparator key comparator + * @param codec compression codec + * @param metadataRef atomic reference to a metadata object, if nonnull after construction it will contain a reference to previous flushed metadata if it exists + * @throws IOException + */ public RecordLogAppender(File file, Serializer keySerializer, Serializer valueSerializer, Serializer> keyCollectionSerializer, Comparator keyComparator, CompressionCodec codec, AtomicReference> metadataRef) throws IOException { super(file, new OperationSerializer(keySerializer, valueSerializer, keyCollectionSerializer), codec, metadataRef); this.keyComparator = keyComparator; } + /** + * Marks keys as deleted. + * + * @param ids keys to delete + * @throws IOException + */ @Override public void deleteDocs(final Collection ids) throws IOException { ArrayList sorted = new ArrayList(ids); @@ -47,6 +89,14 @@ public void deleteDocs(final Collection ids) throws IOException { writeOperation(new Delete(sorted)); } + /** + * Writes a new key/value entry. Can be used to overwrite previous existing values. + * + * @param key key to write + * @param value value to write + * @return position written + * @throws IOException + */ @Override public long write(final K key, final V value) throws IOException { return writeOperation(new Put(key, P.p(value))); diff --git a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogDirectoryPoller.java b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogDirectoryPoller.java index 2fd63f1..ea08d2c 100644 --- a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogDirectoryPoller.java +++ b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogDirectoryPoller.java @@ -37,11 +37,28 @@ public RecordLogDirectoryPoller(final RecordLogDirectory recordLogDir super(recordLogDirectory, checkpointer, loop); } + /** + * @param recordLogDirectory record log directory + * @param checkpointer checkpointer + * @param loop If true, poller will continually poll for new record logs. If false only polls once. + * @param gc If true, poller will delete record logs up to but excluding the most recently read one. + * @throws IOException + */ public RecordLogDirectoryPoller(final RecordLogDirectory recordLogDirectory, final Checkpointer checkpointer, final boolean loop, final boolean gc) throws IOException { super(recordLogDirectory, checkpointer, loop, gc); } + /** + * Callback interface for processing record logs. + */ public interface Functions extends GenericRecordLogDirectoryPoller.Functions { + /** + * Called once for each operation in a record log. + * + * @param position + * @param op + * @throws IOException + */ void process(long position, Operation op) throws IOException; } diff --git a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogStore.java b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogStore.java index d95b3a9..8136f16 100644 --- a/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogStore.java +++ b/recordcache/src/main/java/com/indeed/lsmtree/recordcache/RecordLogStore.java @@ -7,7 +7,6 @@ import com.indeed.lsmtree.recordlog.RecordLogDirectory; /** - * todo need better name because it's not a store itself * @author jchien */ public class RecordLogStore implements Closeable { diff --git a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BasicRecordFile.java b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BasicRecordFile.java index 626e460..252a5d4 100644 --- a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BasicRecordFile.java +++ b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BasicRecordFile.java @@ -152,9 +152,9 @@ public Writer(File file, Serializer serializer) throws FileNotFoundException } @Override - public long append(final E e) throws IOException { + public long append(final E entry) throws IOException { UnsafeByteArrayOutputStream bytes = new UnsafeByteArrayOutputStream(); - serializer.write(e, new DataOutputStream(bytes)); + serializer.write(entry, new DataOutputStream(bytes)); final long start = out.position(); out.writeInt(bytes.size()); final CRC32 checksum = new CRC32(); diff --git a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BlockCompressedRecordFile.java b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BlockCompressedRecordFile.java index 677caa5..fbbe923 100644 --- a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BlockCompressedRecordFile.java +++ b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/BlockCompressedRecordFile.java @@ -184,12 +184,12 @@ public Writer(SyncableDataOutput out, Serializer serializer, CompressionCodec this.blockSize = blockSize; } - public long append(final E e) throws IOException { + public long append(final E entry) throws IOException { if ((currentBlockBytes.size() >= blockSize && numRecords > 0) || numRecords == lengthBuffer.length) { flushBuffer(); } final int start = currentBlockBytes.size(); - serializer.write(e, currentBlockOut); + serializer.write(entry, currentBlockOut); final int length = (currentBlockBytes.size()-start); lengthBuffer[numRecords] = length; final long ret = blockAddress+numRecords; diff --git a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/GenericRecordLogAppender.java b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/GenericRecordLogAppender.java index 13e4ecd..fc21abf 100644 --- a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/GenericRecordLogAppender.java +++ b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/GenericRecordLogAppender.java @@ -13,6 +13,7 @@ import org.codehaus.jackson.map.SerializationConfig; import org.joda.time.DateTime; +import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -46,11 +47,17 @@ public class GenericRecordLogAppender { private final ObjectMapper mapper; - public GenericRecordLogAppender(File file, Serializer serializer, CompressionCodec codec, AtomicReference> metadataRef) throws IOException { + public GenericRecordLogAppender(File file, + Serializer serializer, + CompressionCodec codec, + AtomicReference> metadataRef) throws IOException { this(file, serializer, codec, metadataRef, Long.MAX_VALUE); } - public GenericRecordLogAppender(File file, Serializer serializer, CompressionCodec codec, AtomicReference> metadataRef, + public GenericRecordLogAppender(File file, + Serializer serializer, + CompressionCodec codec, + AtomicReference> metadataRef, long rollFrequency) throws IOException { mapper = new ObjectMapper(); mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); @@ -117,7 +124,7 @@ protected long writeOperation(final T op) throws IOException { return lastPosition; } - public synchronized void flushWriter(Map metadata) throws IOException { + public synchronized void flushWriter(@Nonnull Map metadata) throws IOException { writer.roll(); maxSegment = (int)(lastPosition >>> (64-RecordLogDirectory.DEFAULT_FILE_INDEX_BITS)); metadata.put(LAST_POSITION_KEY, String.valueOf(lastPosition)); diff --git a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordFile.java b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordFile.java index 66d58ab..a47e955 100644 --- a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordFile.java +++ b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordFile.java @@ -9,17 +9,54 @@ * @author jplaisance */ public interface RecordFile extends Closeable { + /** + * @param address position + * @return value at address + * @throws IOException + */ public E get(long address) throws IOException; + + /** + * @return reader starting at address 0 + * @throws IOException + */ public Reader reader() throws IOException; + + /** + * @param address position + * @return reader seeked to provided address + * @throws IOException + */ public Reader reader(long address) throws IOException; public interface Writer extends Closeable, Syncable { - public long append(E e) throws IOException; + /** + * Appends entry to the file + * + * @param entry entry to write + * @return address written to + * @throws IOException + */ + public long append(E entry) throws IOException; } public interface Reader extends Closeable { + /** + * Seeks to next entry + * + * @return true if an entry exists + * @throws IOException + */ public boolean next() throws IOException; + + /** + * @return position of current entry + */ public long getPosition(); + + /** + * @return value of current entry + */ public E get(); } } diff --git a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordLogDirectory.java b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordLogDirectory.java index af5e770..b1f8ae0 100644 --- a/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordLogDirectory.java +++ b/recordlog/src/main/java/com/indeed/lsmtree/recordlog/RecordLogDirectory.java @@ -121,11 +121,11 @@ private RecordFile.Writer createWriter(int segmentNum) throws IOException { } @Override - public long append(final E e) throws IOException { + public long append(final E entry) throws IOException { if (System.currentTimeMillis()-lastRollTime > rollFrequency) { roll(); } - final long writerAddress = currentWriter.append(e); + final long writerAddress = currentWriter.append(entry); if (writerAddress >= 1L<< segmentShift) throw new IOException("current writer has exceeded maximum size"); return (((long)currentSegmentNum)<< segmentShift)+writerAddress; }