populate encodings in column metadata
julienledem committed Mar 5, 2013
1 parent ecf19dc commit e8f8429
Showing 7 changed files with 74 additions and 22 deletions.
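In short: the footer previously hard-coded PLAIN as every column chunk's encoding (see the removed Arrays.asList(Encoding.PLAIN) line in the first hunk); after this commit each ColumnChunkMetaData carries the encodings its pages actually used, and they round-trip through the Thrift footer. A minimal sketch of the widened constructor, mirroring its use in the test diffs below:

    // Sketch only: the fourth constructor argument is what this commit adds.
    ColumnChunkMetaData column = new ColumnChunkMetaData(
        new String[] {"foo"},            // path of the column in the schema
        PrimitiveTypeName.BINARY,        // physical type of the column
        CompressionCodecName.GZIP,       // compression codec
        Arrays.asList(Encoding.PLAIN));  // new: encodings used by this chunk's pages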
@@ -137,7 +137,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups
columnChunk.file_path = null; // same file
columnChunk.meta_data = new parquet.format.ColumnMetaData(
getType(columnMetaData.getType()),
- Arrays.asList(Encoding.PLAIN), // TODO: deal with encodings
+ toFormatEncodings(columnMetaData.getEncodings()),
Arrays.asList(columnMetaData.getPath()),
columnMetaData.getCodec().getParquetCompressionCodec(),
columnMetaData.getValueCount(),
@@ -154,6 +154,22 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups
rowGroups.add(rowGroup);
}

+ private List<Encoding> toFormatEncodings(List<parquet.column.Encoding> encodings) {
+ List<Encoding> converted = new ArrayList<Encoding>();
+ for (parquet.column.Encoding encoding : encodings) {
+ converted.add(getEncoding(encoding));
+ }
+ return converted;
+ }
+
+ private List<parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
+ List<parquet.column.Encoding> converted = new ArrayList<parquet.column.Encoding>();
+ for (Encoding encoding : encodings) {
+ converted.add(getEncoding(encoding));
+ }
+ return converted;
+ }
+
public parquet.column.Encoding getEncoding(Encoding encoding) {
switch (encoding) {
case PLAIN:
@@ -244,13 +260,16 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
for (ColumnChunk columnChunk : columns) {
parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
- ColumnChunkMetaData column = new ColumnChunkMetaData(path, messageType.getType(path).asPrimitiveType().getPrimitiveTypeName(), CompressionCodecName.fromParquet(metaData.codec));
+ ColumnChunkMetaData column = new ColumnChunkMetaData(
+ path,
+ messageType.getType(path).asPrimitiveType().getPrimitiveTypeName(),
+ CompressionCodecName.fromParquet(metaData.codec),
+ fromFormatEncodings(metaData.encodings));
column.setFirstDataPage(metaData.data_page_offset);
column.setValueCount(metaData.num_values);
column.setTotalUncompressedSize(metaData.total_uncompressed_size);
column.setTotalSize(metaData.total_compressed_size);
// TODO
- // encodings
// index_page_offset
// key_value_metadata
blockMetaData.addColumn(column);
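Two enums named Encoding meet in this converter: parquet.column.Encoding (the in-memory side) and parquet.format.Encoding (the Thrift-generated side that is serialized into the footer). The new private helpers toFormatEncodings/fromFormatEncodings map a list element-wise through the getEncoding overloads. Assuming the two enums share constant names, the per-element mapping is morally the sketch below; the real converter uses explicit switch statements, which fail fast on an unmapped constant:

    // Hedged equivalent of the per-element conversion, assuming matching constant names.
    static parquet.format.Encoding toFormat(parquet.column.Encoding e) {
      return parquet.format.Encoding.valueOf(e.name());
    }

    static parquet.column.Encoding fromFormat(parquet.format.Encoding e) {
      return parquet.column.Encoding.valueOf(e.name());
    }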
@@ -16,17 +16,20 @@
package parquet.hadoop;

import java.io.IOException;
+ import java.util.ArrayList;
import java.util.HashMap;
+ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Set;

import parquet.bytes.BytesInput;
import parquet.bytes.CapacityByteArrayOutputStream;
import parquet.column.ColumnDescriptor;
+ import parquet.column.Encoding;
import parquet.column.mem.PageWriteStore;
import parquet.column.mem.PageWriter;
import parquet.format.DataPageHeader;
- import parquet.format.Encoding;
import parquet.format.PageHeader;
import parquet.format.PageType;
import parquet.format.converter.ParquetMetadataConverter;
@@ -48,14 +51,16 @@ private static final class ColumnChunkPageWriter implements PageWriter {
private long compressedLength;
private long totalValueCount;

+ private Set<Encoding> encodings = new HashSet<Encoding>();
+
private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
this.path = path;
this.compressor = compressor;
this.buf = new CapacityByteArrayOutputStream(initialSize);
}

@Override
- public void writePage(BytesInput bytes, int valueCount, parquet.column.Encoding encoding) throws IOException {
+ public void writePage(BytesInput bytes, int valueCount, Encoding encoding) throws IOException {
long uncompressedSize = bytes.size();
BytesInput compressedBytes = compressor.compress(bytes);
long compressedSize = compressedBytes.size();
@@ -67,6 +72,7 @@ public void writePage(BytesInput bytes, int valueCount, parquet.column.Encoding encoding)
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
compressedBytes.writeAllTo(buf);
+ encodings.add(encoding);
}

@Override
@@ -76,8 +82,9 @@ public long getMemSize() {

public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
writer.startColumn(path, totalValueCount, compressor.getCodecName());
- writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength);
+ writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, new ArrayList<Encoding>(encodings));
writer.endColumn();
+ encodings.clear();
}

@Override
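A column chunk may contain pages written with different encodings, so ColumnChunkPageWriter accumulates them in a HashSet: one entry per distinct encoding, however many pages are written, snapshotted and cleared on flush. The bookkeeping in isolation (the class and method names below are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import parquet.column.Encoding;

    // Illustrative accumulator mirroring ColumnChunkPageWriter's new bookkeeping.
    class EncodingAccumulator {
      private final Set<Encoding> encodings = new HashSet<Encoding>();

      // Called once per page, like writePage(); the set deduplicates repeats.
      void onPage(Encoding encoding) {
        encodings.add(encoding);
      }

      // Like writeToFileWriter(): hand the writer a stable List, then reset.
      List<Encoding> flush() {
        List<Encoding> snapshot = new ArrayList<Encoding>(encodings);
        encodings.clear();
        return snapshot;
      }
    }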
17 changes: 12 additions & 5 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -22,8 +22,10 @@
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
+ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Set;


import org.apache.hadoop.conf.Configuration;
@@ -56,7 +58,6 @@ public class ParquetFileWriter {
private static final Log LOG = Log.getLog(ParquetFileWriter.class);

public static final String PARQUET_SUMMARY = "_ParquetSummary";
- // TODO: get the right MAGIC
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
public static final int CURRENT_VERSION = 1;

@@ -71,6 +72,7 @@ public class ParquetFileWriter {
private long uncompressedLength;
private long compressedLength;
private final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+ private Set<parquet.column.Encoding> currentEncodings;

/**
* Captures the order in which methods should be called
@@ -168,7 +170,8 @@ public void startBlock(long recordCount) throws IOException {
public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) throws IOException {
state = state.startColumn();
if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
- currentColumn = new ColumnChunkMetaData(descriptor.getPath(), descriptor.getType(), compressionCodecName);
+ currentEncodings = new HashSet<parquet.column.Encoding>();
+ currentColumn = new ColumnChunkMetaData(descriptor.getPath(), descriptor.getType(), compressionCodecName, new ArrayList<parquet.column.Encoding>());
currentColumn.setValueCount(valueCount);
currentColumn.setFirstDataPage(out.getPos());
compressedLength = 0;
@@ -183,18 +186,19 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) throws IOException {
*/
public void writeDataPage(
int valueCount, int uncompressedPageSize,
- BytesInput bytes) throws IOException {
+ BytesInput bytes, parquet.column.Encoding encoding) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data page: " + valueCount + " values");
int compressedPageSize = (int)bytes.size();
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedPageSize, compressedPageSize);
// pageHeader.crc = ...;
- pageHeader.data_page_header = new DataPageHeader(valueCount, Encoding.PLAIN); // TODO: encoding
+ pageHeader.data_page_header = new DataPageHeader(valueCount, metadataConverter.getEncoding(encoding));
metadataConverter.writePageHeader(pageHeader, out);
this.uncompressedLength += uncompressedPageSize;
this.compressedLength += compressedPageSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
bytes.writeAllTo(out);
+ currentEncodings.add(encoding);
}

/**
@@ -204,7 +208,7 @@ public void writeDataPage(
* @param compressedTotalPageSize total compressed size (without page headers)
* @throws IOException
*/
- void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compressedTotalPageSize) throws IOException {
+ void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compressedTotalPageSize, List<parquet.column.Encoding> encodings) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
// int compressedPageSize = (int)bytes.size();
@@ -216,6 +220,7 @@ void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compressedTotalPageSize, List<parquet.column.Encoding> encodings) throws IOException {
this.compressedLength += compressedTotalPageSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
bytes.writeAllTo(out);
+ currentEncodings.addAll(encodings);
}

/**
@@ -227,11 +232,13 @@ public void endColumn() throws IOException {
if (DEBUG) LOG.debug(out.getPos() + ": end column");
currentColumn.setTotalUncompressedSize(uncompressedLength);
currentColumn.setTotalSize(compressedLength);
+ currentColumn.getEncodings().addAll(currentEncodings);
currentBlock.addColumn(currentColumn);
if (INFO) LOG.info("ended Column chumk: " + currentColumn);
currentColumn = null;
this.uncompressedLength = 0;
this.compressedLength = 0;
+ this.currentEncodings.clear();
}

/**
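The writer now threads encodings through its per-column state machine: startColumn() allocates a fresh currentEncodings set, each writeDataPage()/writeDataPages() call records what it used, and endColumn() folds the set into the chunk metadata. Condensed from the TestParquetFileWriter diff further down (w, c1, codec and bytes1 as in that test):

    w.startColumn(c1, 5, codec);                            // fresh currentEncodings set
    w.writeDataPage(2, 4, BytesInput.from(bytes1), PLAIN);  // encoding is now explicit per page
    w.writeDataPage(3, 4, BytesInput.from(bytes1), PLAIN);  // duplicate PLAIN absorbed by the set
    w.endColumn();                                          // set folded into ColumnChunkMetaData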
@@ -17,7 +17,9 @@

import java.io.Serializable;
import java.util.Arrays;
+ import java.util.List;

+ import parquet.column.Encoding;
import parquet.schema.PrimitiveType.PrimitiveTypeName;


@@ -32,6 +34,7 @@ public class ColumnChunkMetaData implements Serializable {
private final CompressionCodecName codec;
private final String[] path;
private final PrimitiveTypeName type;
+ private final List<Encoding> encodings;

private long firstDataPage;

@@ -41,15 +44,18 @@ public class ColumnChunkMetaData implements Serializable {

private long totalUncompressedSize;

+
/**
*
* @param path column identifier
* @param type type of the column
+ * @param encodings
*/
- public ColumnChunkMetaData(String[] path, PrimitiveTypeName type, CompressionCodecName codec) {
+ public ColumnChunkMetaData(String[] path, PrimitiveTypeName type, CompressionCodecName codec, List<Encoding> encodings) {
this.path = path;
this.type = type;
this.codec = codec;
+ this.encodings = encodings;
}

public CompressionCodecName getCodec() {
@@ -131,6 +137,14 @@ public void setTotalSize(long totalSize) {
this.totalSize = totalSize;
}

+ /**
+ *
+ * @return all the encodings used in this column
+ */
+ public List<Encoding> getEncodings() {
+ return encodings;
+ }
+
@Override
public String toString() {
return "ColumnMetaData{" + codec + ", " + firstDataPage + ", " + Arrays.toString(path) + "}";
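One subtlety: getEncodings() exposes the internal list, and ParquetFileWriter.endColumn() mutates it through getEncodings().addAll(currentEncodings). That works only because startColumn() passes a growable ArrayList. A hedged illustration of the constraint (path, type and codec stand in for the usual arguments):

    // Fine: a growable list that endColumn() can extend later.
    ColumnChunkMetaData writable = new ColumnChunkMetaData(
        path, type, codec, new ArrayList<Encoding>());

    // Risky on the writer path: Arrays.asList() returns a fixed-size list, so a
    // later getEncodings().addAll(...) would throw UnsupportedOperationException.
    // The tests use it only where nothing is appended afterwards.
    ColumnChunkMetaData fromTest = new ColumnChunkMetaData(
        path, type, codec, Arrays.asList(Encoding.PLAIN));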
@@ -15,7 +15,7 @@
*/
package parquet.hadoop;

- import static junit.framework.Assert.assertEquals;
+ import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
@@ -29,6 +29,7 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.junit.Test;

+ import parquet.column.Encoding;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.CompressionCodecName;
@@ -63,7 +64,7 @@ public void testBlocksToSplits() throws IOException, InterruptedException {

private BlockMetaData newBlock(long start) {
BlockMetaData blockMetaData = new BlockMetaData();
- ColumnChunkMetaData column = new ColumnChunkMetaData(new String[] {"foo"}, PrimitiveTypeName.BINARY, CompressionCodecName.GZIP);
+ ColumnChunkMetaData column = new ColumnChunkMetaData(new String[] {"foo"}, PrimitiveTypeName.BINARY, CompressionCodecName.GZIP, Arrays.asList(Encoding.PLAIN));
column.setFirstDataPage(start);
blockMetaData.addColumn(column);
return blockMetaData;
@@ -18,6 +18,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+ import static parquet.column.Encoding.PLAIN;

import java.io.File;
import java.io.IOException;
@@ -66,21 +67,21 @@ public void test() throws Exception {
w.start();
w.startBlock(3);
w.startColumn(c1, 5, codec);
- w.writeDataPage(2, 4, BytesInput.from(bytes1));
- w.writeDataPage(3, 4, BytesInput.from(bytes1));
+ w.writeDataPage(2, 4, BytesInput.from(bytes1), PLAIN);
+ w.writeDataPage(3, 4, BytesInput.from(bytes1), PLAIN);
w.endColumn();
w.startColumn(c2, 6, codec);
- w.writeDataPage(2, 4, BytesInput.from(bytes2));
- w.writeDataPage(3, 4, BytesInput.from(bytes2));
- w.writeDataPage(1, 4, BytesInput.from(bytes2));
+ w.writeDataPage(2, 4, BytesInput.from(bytes2), PLAIN);
+ w.writeDataPage(3, 4, BytesInput.from(bytes2), PLAIN);
+ w.writeDataPage(1, 4, BytesInput.from(bytes2), PLAIN);
w.endColumn();
w.endBlock();
w.startBlock(4);
w.startColumn(c1, 7, codec);
- w.writeDataPage(7, 4, BytesInput.from(bytes3));
+ w.writeDataPage(7, 4, BytesInput.from(bytes3), PLAIN);
w.endColumn();
w.startColumn(c2, 8, codec);
- w.writeDataPage(8, 4, BytesInput.from(bytes4));
+ w.writeDataPage(8, 4, BytesInput.from(bytes4), PLAIN);
w.endColumn();
w.endBlock();
w.end(new HashMap<String, String>());
@@ -15,6 +15,8 @@
*/
package parquet.pig;

+ import static parquet.column.Encoding.PLAIN;
+
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
@@ -27,6 +29,7 @@
import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
+ import parquet.column.Encoding;
import parquet.column.mem.MemColumnWriteStore;
import parquet.column.mem.MemPageStore;
import parquet.column.mem.Page;
@@ -121,7 +124,7 @@ public static void writeBlock(MessageType schema, MemPageStore pageStore,
Page page = pageReader.readPage();
n += page.getValueCount();
// TODO: change INTFC
- w.writeDataPage(page.getValueCount(), (int)page.getBytes().size(), BytesInput.from(page.getBytes().toByteArray()));
+ w.writeDataPage(page.getValueCount(), (int)page.getBytes().size(), BytesInput.from(page.getBytes().toByteArray()), PLAIN);
} while (n < totalValueCount);
w.endColumn();
}
