add compression back; make page size configurable; rename children_count to num_children
julienledem committed Feb 11, 2013
1 parent 867e24b commit 9c7e5f5
Showing 9 changed files with 117 additions and 64 deletions.
2 changes: 1 addition & 1 deletion redelm-column/src/main/java/redelm/Log.java
@@ -43,7 +43,7 @@ public class Log {
/**
* this is the compile time log level
*/
public static final Level LEVEL = Level.FINEST;
public static final Level LEVEL = Level.INFO;

public static final boolean DEBUG = (LEVEL.intValue() <= Level.FINE.intValue());
public static final boolean INFO = (LEVEL.intValue() <= Level.INFO.intValue());
5 changes: 5 additions & 0 deletions redelm-pig/src/main/java/redelm/hadoop/CodecFactory.java
@@ -32,6 +32,11 @@ public CodecFactory(Configuration configuration) {
this.configuration = configuration;
}

/**
*
* @param codecName the requested codec
* @return the corresponding hadoop codec. null if UNCOMPRESSED
*/
public CompressionCodec getCodec(CompressionCodecName codecName) {
String codecClassName = codecName.getHadoopCompressionCodecClass();
if (codecClassName == null) {
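The javadoc added above pins down a contract the rest of this commit relies on: getCodec returns a Hadoop codec for real codecs and null for UNCOMPRESSED. A minimal caller-side sketch of that contract (the helper method and its name are illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import redelm.hadoop.CodecFactory;
import redelm.hadoop.metadata.CompressionCodecName;

// Illustrative helper: a null codec means UNCOMPRESSED, so no compressor is pooled.
static Compressor compressorFor(Configuration conf, CompressionCodecName name) {
  CompressionCodec codec = new CodecFactory(conf).getCodec(name);
  if (codec == null) {
    return null;                          // UNCOMPRESSED: page bytes are written as-is
  }
  return CodecPool.getCompressor(codec);  // caller must give it back via CodecPool.returnCompressor
}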
24 changes: 9 additions & 15 deletions redelm-pig/src/main/java/redelm/hadoop/RedelmFileWriter.java
@@ -21,11 +21,11 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import redelm.Log;
import redelm.bytes.BytesInput;
import redelm.bytes.BytesUtils;
import redelm.column.ColumnDescriptor;
import redelm.hadoop.metadata.BlockMetaData;
@@ -67,6 +67,7 @@ public class RedelmFileWriter {
private int currentRecordCount;
private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
private long uncompressedLength;
private long compressedLength;
private final RedFileMetadataConverter metadataConverter = new RedFileMetadataConverter();

/**
@@ -178,26 +179,18 @@ public void startColumn(ColumnDescriptor descriptor, int valueCount, Compression
*/
public void writeDataPage(
int valueCount, int uncompressedPageSize,
byte[] bytes, int offset, int length) throws IOException {
BytesInput bytes) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data page: " + valueCount + " values");
// codec = codecFactory.getCodec(currentColumn.getCodec());
// compressor = CodecPool.getCompressor(codec);
// cos = codec.createOutputStream(compressedOut, compressor);
// uncompressedLength = 0;
// cos.write(data, offset, length);
// cos.finish();
// cos = null;
// CodecPool.returnCompressor(compressor);
// compressor = null;
// codec = null;
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedPageSize, length);
int compressedPageSize = (int)bytes.size();
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedPageSize, compressedPageSize);
// pageHeader.crc = ...;
pageHeader.data_page = new DataPageHeader(valueCount, Encoding.PLAIN); // TODO: encoding
metadataConverter.writePageHeader(pageHeader, out);
this.uncompressedLength += uncompressedPageSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data page content "+length);
out.write(bytes, offset, length);
this.compressedLength += compressedPageSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
bytes.writeAllTo(out);
}

/**
@@ -208,6 +201,7 @@ public void endColumn() throws IOException {
state = state.endColumn();
if (DEBUG) LOG.debug(out.getPos() + ": end column");
currentColumn.setTotalUncompressedSize(uncompressedLength);
currentColumn.setTotalSize(compressedLength);
currentBlock.addColumn(currentColumn);
if (INFO) LOG.info(currentColumn);
currentColumn = null;
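With this change writeDataPage takes the page as a BytesInput (whose size() is the compressed length) instead of a byte[] plus offset and length, and endColumn records both the uncompressed and the compressed totals. A hedged sketch of the per-column call sequence, using only the calls visible in this diff (the descriptor, value count, and page buffer are placeholders):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import redelm.bytes.BytesInput;
import redelm.column.ColumnDescriptor;
import redelm.hadoop.RedelmFileWriter;
import redelm.hadoop.metadata.CompressionCodecName;

// Illustrative fragment: one column written without compression.
static void writeOneColumn(RedelmFileWriter w, ColumnDescriptor descriptor,
                           int valueCount, ByteArrayOutputStream pageBuffer) throws IOException {
  w.startColumn(descriptor, valueCount, CompressionCodecName.UNCOMPRESSED);
  BytesInput pageBytes = BytesInput.from(pageBuffer);              // same factory flushStore uses below
  w.writeDataPage(valueCount, (int) pageBytes.size(), pageBytes);  // uncompressed, so both sizes match
  w.endColumn();                                                   // records totalSize and totalUncompressedSize
}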
50 changes: 40 additions & 10 deletions redelm-pig/src/main/java/redelm/hadoop/RedelmOutputFormat.java
@@ -18,7 +18,6 @@
import static redelm.Log.INFO;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import redelm.Log;
@@ -42,16 +41,21 @@
* it requires the schema of the incoming records
* it allows storing extra metadata in the footer (for example: for schema compatibility purpose when converting from a different schema language)
*
* data is compressed according to the job conf (per block per column):
* format config controlled in job conf settings:
* <pre>
* redelm.block.size=52428800 # in bytes, default = 50 * 1024 * 1024
* redelm.page.size=8000 # in bytes, default = 8 * 1024
* redelm.compression=UNCOMPRESSED # one of: UNCOMPRESSED, SNAPPY, GZIP, LZO. Default: UNCOMPRESSED. Supersedes mapred.output.compress*
* </pre>
*
* If redelm.compression is not set, the following properties are checked (FileOutputFormat behavior)
* <pre>
* mapred.output.compress=true
* mapred.output.compression.codec=org.apache.hadoop.io.compress.SomeCodec
* </pre>
*
* block size is controlled in job conf settings:
* <pre>
* redelm.block.size=52428800 # in bytes, default = 50 * 1024 * 1024
*</pre>
* if none of those is set the data is uncompressed.
*
* @author Julien Le Dem
*
* @param <T> the type of the materialized records
@@ -60,15 +64,37 @@ public class RedelmOutputFormat<T> extends FileOutputFormat<Void, T> {
private static final Log LOG = Log.getLog(RedelmOutputFormat.class);

public static final String BLOCK_SIZE = "redelm.block.size";
public static final String PAGE_SIZE = "redelm.page.size";
public static final String COMPRESSION = "redelm.compression";

public static void setBlockSize(Job job, int blockSize) {
job.getConfiguration().setInt(BLOCK_SIZE, blockSize);
}

public static void setPageSize(Job job, int pageSize) {
job.getConfiguration().setInt(PAGE_SIZE, pageSize);
}

public static void setCompression(Job job, CompressionCodecName compression) {
job.getConfiguration().set(COMPRESSION, compression.name());
}

public static int getBlockSize(JobContext jobContext) {
return jobContext.getConfiguration().getInt(BLOCK_SIZE, 50*1024*1024);
}

public static int getPageSize(JobContext jobContext) {
return jobContext.getConfiguration().getInt(PAGE_SIZE, 8*1024);
}

public static CompressionCodecName getCompression(JobContext jobContext) {
return CompressionCodecName.fromConf(jobContext.getConfiguration().get(COMPRESSION, CompressionCodecName.UNCOMPRESSED.name()));
}

public static boolean isCompressionSet(JobContext jobContext) {
return jobContext.getConfiguration().get(COMPRESSION) != null;
}

private final MessageType schema;
private Class<?> writeSupportClass;

Expand Down Expand Up @@ -96,28 +122,32 @@ public <S extends WriteSupport<T>> RedelmOutputFormat(Class<S> writeSupportClass
public RecordWriter<Void, T> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
final Configuration conf = taskAttemptContext.getConfiguration();

CodecFactory codecFactory = new CodecFactory(conf);
int blockSize = getBlockSize(taskAttemptContext);
if (INFO) LOG.info("RedElm block size to " + blockSize);
int pageSize = getPageSize(taskAttemptContext);
if (INFO) LOG.info("RedElm page size to " + pageSize);

String extension = ".redelm";
CompressionCodecName codec;
if (getCompressOutput(taskAttemptContext)) {
if (isCompressionSet(taskAttemptContext)) { // explicit redelm config
codec = getCompression(taskAttemptContext);
} else if (getCompressOutput(taskAttemptContext)) { // from hadoop config
// find the right codec
Class<?> codecClass = getOutputCompressorClass(taskAttemptContext, DefaultCodec.class);
if (INFO) LOG.info("Compression codec: " + codecClass.getName());
codec = CompressionCodecName.fromCompressionCodec(codecClass);
extension += codec.getExtension();
} else {
if (INFO) LOG.info("Compression set to false");
codec = CompressionCodecName.UNCOMPRESSED;
}
extension += codec.getExtension();
final Path file = getDefaultWorkFile(taskAttemptContext, extension);

final RedelmFileWriter w = new RedelmFileWriter(conf, schema, file);
w.start();
try {
return new RedelmRecordWriter<T>(w, (WriteSupport<T>) writeSupportClass.newInstance(), schema, extraMetaData, blockSize, codec);
return new RedelmRecordWriter<T>(w, (WriteSupport<T>) writeSupportClass.newInstance(), schema, extraMetaData, blockSize, pageSize, codec, codecFactory.getCodec(codec));
} catch (InstantiationException e) {
throw new RuntimeException("could not instantiate " + writeSupportClass.getName(), e);
} catch (IllegalAccessException e) {
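The new PAGE_SIZE and COMPRESSION keys come with static setters mirroring setBlockSize, so a job can configure the format explicitly instead of relying on mapred.output.compress*. A hedged driver-side sketch (the surrounding job setup is assumed, not part of this commit):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import redelm.hadoop.RedelmOutputFormat;
import redelm.hadoop.metadata.CompressionCodecName;

// Illustrative driver snippet: explicit redelm.* settings supersede mapred.output.compress*.
static Job configureRedelmJob(Configuration conf) throws IOException {
  Job job = new Job(conf);                                  // Hadoop 1.x constructor; newer code uses Job.getInstance(conf)
  RedelmOutputFormat.setBlockSize(job, 50 * 1024 * 1024);   // redelm.block.size (the default)
  RedelmOutputFormat.setPageSize(job, 8 * 1024);            // redelm.page.size (the default)
  RedelmOutputFormat.setCompression(job, CompressionCodecName.GZIP); // redelm.compression
  return job;
}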
59 changes: 41 additions & 18 deletions redelm-pig/src/main/java/redelm/hadoop/RedelmRecordWriter.java
@@ -16,30 +16,28 @@
package redelm.hadoop;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import redelm.bytes.BytesInput;
import redelm.column.ColumnDescriptor;
import redelm.column.ColumnWriter;
import redelm.column.mem.MemColumn;
import redelm.column.mem.MemColumnsStore;
import redelm.column.mem.MemPageStore;
import redelm.column.mem.Page;
import redelm.column.mem.PageReader;
import redelm.column.mem.PageStore;
import redelm.column.mem.PageWriter;
import redelm.hadoop.metadata.CompressionCodecName;
import redelm.io.ColumnIOFactory;
import redelm.io.MessageColumnIO;
import redelm.schema.MessageType;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
* Writes records to a Redelm file
*
@@ -53,14 +51,19 @@ public class RedelmRecordWriter<T> extends
RecordWriter<Void, T> {

private final RedelmFileWriter w;
private final WriteSupport<T> writeSupport;
private final MessageType schema;
private final Map<String, String> extraMetaData;
private WriteSupport<T> writeSupport;
private int recordCount;
private MemColumnsStore store;
private final int blockSize;
private final CompressionCodecName codec;
private final int pageSize;
private final CompressionCodecName codecName;
private final CompressionCodec codec;
private final Compressor compressor;
private final ByteArrayOutputStream compressedOutBuffer;

private int recordCount;

private MemColumnsStore store;
private MemPageStore pageStore;

/**
@@ -72,7 +75,7 @@ public class RedelmRecordWriter<T> extends
* @param blockSize the size of a block in the file (this will be approximate)
* @param codec the codec used to compress
*/
RedelmRecordWriter(RedelmFileWriter w, WriteSupport<T> writeSupport, MessageType schema, Map<String, String> extraMetaData, int blockSize, CompressionCodecName codec) {
RedelmRecordWriter(RedelmFileWriter w, WriteSupport<T> writeSupport, MessageType schema, Map<String, String> extraMetaData, int blockSize, int pageSize, CompressionCodecName codecName, CompressionCodec codec) {
if (writeSupport == null) {
throw new NullPointerException("writeSupport");
}
@@ -81,13 +84,22 @@ public class RedelmRecordWriter<T> extends
this.schema = schema;
this.extraMetaData = extraMetaData;
this.blockSize = blockSize;
this.pageSize = pageSize;
this.codecName = codecName;
this.codec = codec;
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
} else {
this.compressor = null;
this.compressedOutBuffer = null;
}
initStore();
}

private void initStore() {
pageStore = new MemPageStore();
store = new MemColumnsStore(1024 * 1024 * 1, pageStore, 8*1024);
store = new MemColumnsStore(1024 * 1024 / 2, pageStore, pageSize);
//
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
writeSupport.initForWrite(columnIO.getRecordWriter(store), schema, extraMetaData);
@@ -101,6 +113,7 @@ public void close(TaskAttemptContext taskAttemptContext) throws IOException,
InterruptedException {
flushStore();
w.end(extraMetaData);
CodecPool.returnCompressor(compressor);
}

/**
@@ -128,13 +141,23 @@ private void flushStore()
for (ColumnDescriptor columnDescriptor : columns) {
PageReader pageReader = pageStore.getPageReader(columnDescriptor);
int totalValueCount = pageReader.getTotalValueCount();
w.startColumn(columnDescriptor, totalValueCount, codec);
w.startColumn(columnDescriptor, totalValueCount, codecName);
int n = 0;
BytesInput compressedBytes;
do {
Page page = pageReader.readPage();
n += page.getValueCount();
// TODO: change INTFC
w.writeDataPage(page.getValueCount(), (int)page.getBytes().size(), page.getBytes().toByteArray(), 0, (int)page.getBytes().size());
long uncompressedSize = page.getBytes().size();
if (codec == null) {
compressedBytes = page.getBytes();
} else {
compressedOutBuffer.reset();
CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
page.getBytes().writeAllTo(cos);
cos.finish();
compressedBytes = BytesInput.from(compressedOutBuffer);
}
w.writeDataPage(page.getValueCount(), (int)uncompressedSize, compressedBytes);
} while (n < totalValueCount);
w.endColumn();
}
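Read in isolation, the per-page compression step that flushStore now performs is: reset a reused in-memory buffer, stream the uncompressed page through the codec, finish the stream, and wrap the buffer as a BytesInput for the file writer. A sketch of just that step, assuming pageBytes holds one uncompressed page (the helper name is illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import redelm.bytes.BytesInput;

// Illustrative helper mirroring flushStore: a null codec means UNCOMPRESSED.
static BytesInput compressPage(BytesInput pageBytes, CompressionCodec codec,
                               Compressor compressor, ByteArrayOutputStream buffer) throws IOException {
  if (codec == null) {
    return pageBytes;                                     // pass through unchanged
  }
  buffer.reset();                                         // the same buffer is reused for every page
  CompressionOutputStream cos = codec.createOutputStream(buffer, compressor);
  pageBytes.writeAllTo(cos);                              // stream the page through the codec
  cos.finish();                                           // flush compressed output without closing the buffer
  return BytesInput.from(buffer);                         // hand the compressed bytes to the file writer
}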
@@ -23,6 +23,13 @@ public enum CompressionCodecName {
GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
LZO("com.hadoop.compression.lzo.LzopCodec", CompressionCodec.LZO, ".lzo");

public static CompressionCodecName fromConf(String name) {
if (name == null) {
return UNCOMPRESSED;
}
return valueOf(name.toUpperCase());
}

public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
if (clazz == null) {
return UNCOMPRESSED;
@@ -118,7 +118,7 @@ public void visit(GroupType groupType) {

private void visitChildren(final List<SchemaElement> result,
GroupType groupType, SchemaElement element) {
element.setChildren_count(groupType.getFieldCount());
element.setNum_children(groupType.getFieldCount());
result.add(element);
for (redelm.schema.Type field : groupType.getFields()) {
addToList(result, field);
@@ -132,9 +132,8 @@ private void addRowGroup(RedelmMetaData redelmMetadata, List<RowGroup> rowGroups
List<ColumnChunkMetaData> columns = block.getColumns();
List<ColumnChunk> redFileColumns = new ArrayList<ColumnChunk>();
for (ColumnChunkMetaData columnMetaData : columns) {
ColumnChunk columnChunk = new ColumnChunk();
ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPage()); // verify this is the right offset
columnChunk.file_path = null; // same file
columnChunk.file_offset = columnMetaData.getFirstDataPage(); // TODO ?
columnChunk.meta_data = new redfile.ColumnMetaData(
getType(columnMetaData.getType()),
Arrays.asList(Encoding.PLAIN), // TODO: deal with encodings
@@ -252,15 +251,15 @@ public RedelmMetaData fromRedFileMetadata(FileMetaData redFileMetadata) throws I
MessageType fromRedFileSchema(List<SchemaElement> schema) {
Iterator<SchemaElement> iterator = schema.iterator();
SchemaElement root = iterator.next();
return new MessageType(root.getName(), convertChildren(iterator, root.getChildren_count()));
return new MessageType(root.getName(), convertChildren(iterator, root.getNum_children()));
}

private redelm.schema.Type[] convertChildren(Iterator<SchemaElement> schema, int childrenCount) {
redelm.schema.Type[] result = new redelm.schema.Type[childrenCount];
for (int i = 0; i < result.length; i++) {
SchemaElement schemaElement = schema.next();
if ((!schemaElement.isSetType() && !schemaElement.isSetChildren_count())
|| (schemaElement.isSetType() && schemaElement.isSetChildren_count())) {
if ((!schemaElement.isSetType() && !schemaElement.isSetNum_children())
|| (schemaElement.isSetType() && schemaElement.isSetNum_children())) {
throw new RuntimeException("bad element " + schemaElement);
}
Repetition repetition = fromRedFileRepetition(schemaElement.getField_type());
@@ -274,7 +273,7 @@ private redelm.schema.Type[] convertChildren(Iterator<SchemaElement> schema, int
result[i] = new GroupType(
repetition,
name,
convertChildren(schema, schemaElement.getChildren_count()));
convertChildren(schema, schemaElement.getNum_children()));
}
}
return result;