This repository has been archived by the owner on Jun 24, 2019. It is now read-only.

OSS-4 add very basic javadocs
Jeff Chien committed Apr 22, 2014
1 parent 5f0d640 commit 3fa8501
Showing 8 changed files with 121 additions and 11 deletions.
@@ -21,32 +21,82 @@ public final class RecordLogAppender<K,V> extends GenericRecordLogAppender<Opera

private final Comparator<K> keyComparator;

/**
* @param file record log directory
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param keyComparator key comparator
* @param codec compression codec
* @throws IOException if an I/O error occurs
*/
public RecordLogAppender(File file, Serializer<K> keySerializer, Serializer<V> valueSerializer, Comparator<K> keyComparator, CompressionCodec codec)
throws IOException {
this(file, keySerializer, valueSerializer, keyComparator, codec, null);
}

/**
* @param file record log directory
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param keyComparator key comparator
* @param codec compression codec
* @param metadataRef atomic reference to a metadata map; if non-null, after construction it will hold the previously flushed metadata, if any
* @throws IOException if an I/O error occurs
*/
public RecordLogAppender(File file, Serializer<K> keySerializer, Serializer<V> valueSerializer, Comparator<K> keyComparator, CompressionCodec codec, AtomicReference<Map<String, String>> metadataRef)
throws IOException {
this(file, keySerializer, valueSerializer, new CollectionSerializer<K>(keySerializer), keyComparator, codec, metadataRef);
}

/**
* @param file record log directory
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param keyCollectionSerializer key collection serializer
* @param keyComparator key comparator
* @param codec compression codec
* @throws IOException if an I/O error occurs
*/
public RecordLogAppender(File file, Serializer<K> keySerializer, Serializer<V> valueSerializer, Serializer<Collection<K>> keyCollectionSerializer, Comparator<K> keyComparator, CompressionCodec codec) throws IOException {
this(file, keySerializer, valueSerializer, keyCollectionSerializer, keyComparator, codec, null);
}

/**
* @param file record log directory
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param keyCollectionSerializer key collection serializer
* @param keyComparator key comparator
* @param codec compression codec
* @param metadataRef atomic reference to a metadata map; if non-null, after construction it will hold the previously flushed metadata, if any
* @throws IOException if an I/O error occurs
*/
public RecordLogAppender(File file, Serializer<K> keySerializer, Serializer<V> valueSerializer, Serializer<Collection<K>> keyCollectionSerializer, Comparator<K> keyComparator, CompressionCodec codec, AtomicReference<Map<String, String>> metadataRef) throws IOException {
super(file, new OperationSerializer<K, V>(keySerializer, valueSerializer, keyCollectionSerializer), codec, metadataRef);
this.keyComparator = keyComparator;
}

/**
* Marks keys as deleted.
*
* @param ids keys to delete
* @throws IOException if an I/O error occurs
*/
@Override
public void deleteDocs(final Collection<K> ids) throws IOException {
ArrayList<K> sorted = new ArrayList<K>(ids);
Collections.sort(sorted, keyComparator);
writeOperation(new Delete(sorted));
}

/**
* Writes a new key/value entry. Can be used to overwrite existing values.
*
* @param key key to write
* @param value value to write
* @return position at which the entry was written
* @throws IOException if an I/O error occurs
*/
@Override
public long write(final K key, final V value) throws IOException {
return writeOperation(new Put<K, V>(key, P.p(value)));
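For orientation, a minimal usage sketch of the RecordLogAppender API documented above. The Serializer and CompressionCodec instances are assumed to be supplied by the caller, imports for the Indeed types are omitted because their packages are not shown in this commit, and the example class and method names are hypothetical.

// Hypothetical usage sketch of RecordLogAppender (constructor, write, deleteDocs, flushWriter).
// Imports for RecordLogAppender, Serializer and CompressionCodec are omitted because their
// packages are not part of this commit.
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public final class RecordLogAppenderExample {
    private RecordLogAppenderExample() {}

    public static <K, V> long writeThenDelete(File recordLogDir,
                                              Serializer<K> keySerializer,
                                              Serializer<V> valueSerializer,
                                              Comparator<K> keyComparator,
                                              CompressionCodec codec,
                                              K key,
                                              V value) throws IOException {
        final RecordLogAppender<K, V> appender =
                new RecordLogAppender<K, V>(recordLogDir, keySerializer, valueSerializer, keyComparator, codec);
        final long position = appender.write(key, value);      // appends a Put operation, returns its position
        appender.deleteDocs(Collections.singletonList(key));   // appends a Delete operation for the key
        final Map<String, String> metadata = new HashMap<String, String>();
        appender.flushWriter(metadata);                        // rolls the writer and records the last position in the map
        return position;
    }
}

Note that deleteDocs sorts its keys with the comparator passed to the constructor before writing the Delete operation, so that comparator should match the key ordering used elsewhere.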
@@ -37,11 +37,28 @@ public RecordLogDirectoryPoller(final RecordLogDirectory<Operation> recordLogDir
super(recordLogDirectory, checkpointer, loop);
}

/**
* @param recordLogDirectory record log directory
* @param checkpointer checkpointer used to track the poller's position
* @param loop if true, the poller continually polls for new record logs; if false, it polls only once
* @param gc if true, the poller deletes record logs up to, but excluding, the most recently read one
* @throws IOException if an I/O error occurs
*/
public RecordLogDirectoryPoller(final RecordLogDirectory<Operation> recordLogDirectory, final Checkpointer<Long> checkpointer, final boolean loop, final boolean gc) throws IOException {
super(recordLogDirectory, checkpointer, loop, gc);
}

/**
* Callback interface for processing record logs.
*/
public interface Functions extends GenericRecordLogDirectoryPoller.Functions<Operation> {
/**
* Called once for each operation in a record log.
*
* @param position position of the operation within the record log
* @param op operation to process
* @throws IOException if an I/O error occurs
*/
void process(long position, Operation op) throws IOException;
}

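A sketch of the kind of handler the Functions.process callback above describes. It is written as a standalone class rather than an implementation of RecordLogDirectoryPoller.Functions, because any other methods the parent GenericRecordLogDirectoryPoller.Functions interface may declare are not shown in this commit; the Operation import is omitted for the same reason, and the class name is hypothetical.

// Mirrors the process(long, Operation) contract documented above; not an
// implementation of the Functions interface itself (see note above).
import java.io.IOException;

public class LastPositionTrackingHandler {
    private volatile long lastProcessedPosition = -1;

    /** Invoked once per operation, in the order operations appear in the record logs. */
    public void process(long position, Operation op) throws IOException {
        // ... apply the operation to downstream state here ...
        lastProcessedPosition = position;   // this position can be handed to a Checkpointer<Long> so a restart resumes here
    }

    public long getLastProcessedPosition() {
        return lastProcessedPosition;
    }
}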
@@ -7,7 +7,6 @@
import com.indeed.lsmtree.recordlog.RecordLogDirectory;

/**
* todo need better name because it's not a store itself
* @author jchien
*/
public class RecordLogStore<K, V> implements Closeable {
@@ -152,9 +152,9 @@ public Writer(File file, Serializer<E> serializer) throws FileNotFoundException
}

@Override
public long append(final E e) throws IOException {
public long append(final E entry) throws IOException {
UnsafeByteArrayOutputStream bytes = new UnsafeByteArrayOutputStream();
serializer.write(e, new DataOutputStream(bytes));
serializer.write(entry, new DataOutputStream(bytes));
final long start = out.position();
out.writeInt(bytes.size());
final CRC32 checksum = new CRC32();
@@ -184,12 +184,12 @@ public Writer(SyncableDataOutput out, Serializer<E> serializer, CompressionCodec
this.blockSize = blockSize;
}

public long append(final E e) throws IOException {
public long append(final E entry) throws IOException {
if ((currentBlockBytes.size() >= blockSize && numRecords > 0) || numRecords == lengthBuffer.length) {
flushBuffer();
}
final int start = currentBlockBytes.size();
serializer.write(e, currentBlockOut);
serializer.write(entry, currentBlockOut);
final int length = (currentBlockBytes.size()-start);
lengthBuffer[numRecords] = length;
final long ret = blockAddress+numRecords;
@@ -13,6 +13,7 @@
import org.codehaus.jackson.map.SerializationConfig;
import org.joda.time.DateTime;

import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -46,11 +47,17 @@ public class GenericRecordLogAppender<T> {

private final ObjectMapper mapper;

public GenericRecordLogAppender(File file, Serializer<T> serializer, CompressionCodec codec, AtomicReference<Map<String, String>> metadataRef) throws IOException {
public GenericRecordLogAppender(File file,
Serializer<T> serializer,
CompressionCodec codec,
AtomicReference<Map<String, String>> metadataRef) throws IOException {
this(file, serializer, codec, metadataRef, Long.MAX_VALUE);
}

public GenericRecordLogAppender(File file, Serializer<T> serializer, CompressionCodec codec, AtomicReference<Map<String, String>> metadataRef,
public GenericRecordLogAppender(File file,
Serializer<T> serializer,
CompressionCodec codec,
AtomicReference<Map<String, String>> metadataRef,
long rollFrequency) throws IOException {
mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
@@ -117,7 +124,7 @@ protected long writeOperation(final T op) throws IOException {
return lastPosition;
}

public synchronized void flushWriter(Map<String, String> metadata) throws IOException {
public synchronized void flushWriter(@Nonnull Map<String, String> metadata) throws IOException {
writer.roll();
maxSegment = (int)(lastPosition >>> (64-RecordLogDirectory.DEFAULT_FILE_INDEX_BITS));
metadata.put(LAST_POSITION_KEY, String.valueOf(lastPosition));
@@ -9,17 +9,54 @@
* @author jplaisance
*/
public interface RecordFile<E> extends Closeable {
/**
* @param address address of the entry to read
* @return value at the given address
* @throws IOException if an I/O error occurs
*/
public E get(long address) throws IOException;

/**
* @return reader starting at address 0
* @throws IOException if an I/O error occurs
*/
public Reader<E> reader() throws IOException;

/**
* @param address address to position the reader at
* @return reader positioned at the provided address
* @throws IOException if an I/O error occurs
*/
public Reader<E> reader(long address) throws IOException;

public interface Writer<E> extends Closeable, Syncable {
public long append(E e) throws IOException;
/**
* Appends an entry to the file.
*
* @param entry entry to write
* @return address the entry was written to
* @throws IOException if an I/O error occurs
*/
public long append(E entry) throws IOException;
}

public interface Reader<E> extends Closeable {
/**
* Advances to the next entry.
*
* @return true if a next entry exists
* @throws IOException if an I/O error occurs
*/
public boolean next() throws IOException;

/**
* @return position of current entry
*/
public long getPosition();

/**
* @return value of current entry
*/
public E get();
}
}
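Because the full RecordFile interface is visible above, here is a small sketch that uses only its declared methods: iterate one record file with a Reader and append every entry to a Writer. The helper class name is hypothetical, and it is assumed to live where RecordFile is visible (no import shown for it).

// Sketch built only from the RecordFile methods shown above.
import java.io.IOException;

public final class RecordFileCopy {
    private RecordFileCopy() {}

    /** Copies every entry from source to sink and returns how many were copied. */
    public static <E> long copy(RecordFile<E> source, RecordFile.Writer<E> sink) throws IOException {
        long copied = 0;
        final RecordFile.Reader<E> reader = source.reader();   // reader starting at address 0
        try {
            while (reader.next()) {                             // advance to the next entry
                sink.append(reader.get());                      // append the current entry's value
                copied++;
            }
        } finally {
            reader.close();                                     // Reader extends Closeable
        }
        return copied;
    }
}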
@@ -121,11 +121,11 @@ private RecordFile.Writer createWriter(int segmentNum) throws IOException {
}

@Override
public long append(final E e) throws IOException {
public long append(final E entry) throws IOException {
if (System.currentTimeMillis()-lastRollTime > rollFrequency) {
roll();
}
final long writerAddress = currentWriter.append(e);
final long writerAddress = currentWriter.append(entry);
if (writerAddress >= 1L<< segmentShift) throw new IOException("current writer has exceeded maximum size");
return (((long)currentSegmentNum)<< segmentShift)+writerAddress;
}
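The append method above packs the current segment number into the high bits of the returned address and the per-writer address into the low bits, which is the same split the maxSegment computation in flushWriter undoes. A sketch of that arithmetic follows; the 28-bit file index width is an assumed value for illustration only (the real constant is RecordLogDirectory.DEFAULT_FILE_INDEX_BITS).

// Sketch of the address packing used by append() above. The segment number sits in the
// high bits, the per-writer address in the low bits. FILE_INDEX_BITS = 28 is an assumption.
public final class AddressPackingExample {
    private static final int FILE_INDEX_BITS = 28;               // assumed value for illustration
    private static final int SEGMENT_SHIFT = 64 - FILE_INDEX_BITS;

    static long pack(int segmentNum, long writerAddress) {
        return (((long) segmentNum) << SEGMENT_SHIFT) + writerAddress;   // mirrors append() above
    }

    static int segmentOf(long address) {
        return (int) (address >>> SEGMENT_SHIFT);                 // mirrors the maxSegment computation
    }

    static long writerAddressOf(long address) {
        return address & ((1L << SEGMENT_SHIFT) - 1);             // low bits hold the writer address
    }
}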
