From e8f8429e11372c04afffdcd133199ff87872eebf Mon Sep 17 00:00:00 2001 From: julien Date: Mon, 4 Mar 2013 16:15:54 -0800 Subject: [PATCH 1/9] populate encodings in column metadata --- .../converter/ParquetMetadataConverter.java | 25 ++++++++++++++++--- .../hadoop/ColumnChunkPageWriteStore.java | 13 +++++++--- .../parquet/hadoop/ParquetFileWriter.java | 17 +++++++++---- .../hadoop/metadata/ColumnChunkMetaData.java | 16 +++++++++++- .../java/parquet/hadoop/TestInputFormat.java | 5 ++-- .../parquet/hadoop/TestParquetFileWriter.java | 15 +++++------ .../java/parquet/pig/GenerateIntTestFile.java | 5 +++- 7 files changed, 74 insertions(+), 22 deletions(-) diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java index c2ac34b..ee32065 100644 --- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java @@ -137,7 +137,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou columnChunk.file_path = null; // same file columnChunk.meta_data = new parquet.format.ColumnMetaData( getType(columnMetaData.getType()), - Arrays.asList(Encoding.PLAIN), // TODO: deal with encodings + toFormatEncodings(columnMetaData.getEncodings()), Arrays.asList(columnMetaData.getPath()), columnMetaData.getCodec().getParquetCompressionCodec(), columnMetaData.getValueCount(), @@ -154,6 +154,22 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou rowGroups.add(rowGroup); } + private List<Encoding> toFormatEncodings(List<parquet.column.Encoding> encodings) { + List<Encoding> converted = new ArrayList<Encoding>(); + for (parquet.column.Encoding encoding : encodings) { + converted.add(getEncoding(encoding)); + } + return converted; + } + + private List<parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) { + List<parquet.column.Encoding> converted = new ArrayList<parquet.column.Encoding>(); + for (Encoding encoding : encodings) { + converted.add(getEncoding(encoding)); + } + return converted; + } + public parquet.column.Encoding getEncoding(Encoding encoding) { switch (encoding) { case PLAIN: @@ -244,13 +260,16 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws for (ColumnChunk columnChunk : columns) { parquet.format.ColumnMetaData metaData = columnChunk.meta_data; String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]); - ColumnChunkMetaData column = new ColumnChunkMetaData(path, messageType.getType(path).asPrimitiveType().getPrimitiveTypeName(), CompressionCodecName.fromParquet(metaData.codec)); + ColumnChunkMetaData column = new ColumnChunkMetaData( + path, + messageType.getType(path).asPrimitiveType().getPrimitiveTypeName(), + CompressionCodecName.fromParquet(metaData.codec), + fromFormatEncodings(metaData.encodings)); column.setFirstDataPage(metaData.data_page_offset); column.setValueCount(metaData.num_values); column.setTotalUncompressedSize(metaData.total_uncompressed_size); column.setTotalSize(metaData.total_compressed_size); // TODO - // encodings // index_page_offset // key_value_metadata blockMetaData.addColumn(column);
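The two helpers above are the two directions of one mapping between the in-memory parquet.column.Encoding enum and the Thrift-generated parquet.format.Encoding serialized in the footer. A minimal sketch of the round trip through the public getEncoding overloads they delegate to (the format-bound overload is not shown in this hunk, so its exact signature is an assumption):

    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    // footer -> memory: Thrift enum to column enum (overload shown above)
    parquet.column.Encoding columnSide = converter.getEncoding(parquet.format.Encoding.PLAIN);
    // memory -> footer: the symmetric overload called by toFormatEncodings (assumed)
    parquet.format.Encoding formatSide = converter.getEncoding(columnSide);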
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index 195ec4b..6c5cf35 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -16,17 +16,20 @@ package parquet.hadoop; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import parquet.bytes.BytesInput; import parquet.bytes.CapacityByteArrayOutputStream; import parquet.column.ColumnDescriptor; +import parquet.column.Encoding; import parquet.column.mem.PageWriteStore; import parquet.column.mem.PageWriter; import parquet.format.DataPageHeader; -import parquet.format.Encoding; import parquet.format.PageHeader; import parquet.format.PageType; import parquet.format.converter.ParquetMetadataConverter; @@ -48,6 +51,8 @@ private static final class ColumnChunkPageWriter implements PageWriter { private long compressedLength; private long totalValueCount; + private Set<Encoding> encodings = new HashSet<Encoding>(); + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { this.path = path; this.compressor = compressor; @@ -55,7 +60,7 @@ private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, } @Override - public void writePage(BytesInput bytes, int valueCount, parquet.column.Encoding encoding) throws IOException { + public void writePage(BytesInput bytes, int valueCount, Encoding encoding) throws IOException { long uncompressedSize = bytes.size(); BytesInput compressedBytes = compressor.compress(bytes); long compressedSize = compressedBytes.size(); @@ -67,6 +72,7 @@ public void writePage(BytesInput bytes, int valueCount, parquet.column.Encoding this.compressedLength += compressedSize; this.totalValueCount += valueCount; compressedBytes.writeAllTo(buf); + encodings.add(encoding); } @Override @@ -76,8 +82,9 @@ public long getMemSize() { } public void writeToFileWriter(ParquetFileWriter writer) throws IOException { writer.startColumn(path, totalValueCount, compressor.getCodecName()); - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength); + writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, new ArrayList<Encoding>(encodings)); writer.endColumn(); + encodings.clear(); } @Override diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java index a808145..880e173 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java @@ -22,8 +22,10 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -56,7 +58,6 @@ public class ParquetFileWriter { private static final Log LOG = Log.getLog(ParquetFileWriter.class); public static final String PARQUET_SUMMARY = "_ParquetSummary"; - // TODO: get the right MAGIC public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); public static final int CURRENT_VERSION = 1; @@ -71,6 +72,7 @@ public class ParquetFileWriter { private long uncompressedLength; private long compressedLength; private final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); + private Set<parquet.column.Encoding> currentEncodings; /** * Captures the order in which methods should be called @@ -168,7 +170,8 @@ public void startBlock(long recordCount) throws IOException { public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) throws IOException {
state = state.startColumn(); if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); - currentColumn = new ColumnChunkMetaData(descriptor.getPath(), descriptor.getType(), compressionCodecName); + currentEncodings = new HashSet<parquet.column.Encoding>(); + currentColumn = new ColumnChunkMetaData(descriptor.getPath(), descriptor.getType(), compressionCodecName, new ArrayList<parquet.column.Encoding>()); currentColumn.setValueCount(valueCount); currentColumn.setFirstDataPage(out.getPos()); compressedLength = 0; @@ -183,18 +186,19 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio */ public void writeDataPage( int valueCount, int uncompressedPageSize, - BytesInput bytes) throws IOException { + BytesInput bytes, parquet.column.Encoding encoding) throws IOException { state = state.write(); if (DEBUG) LOG.debug(out.getPos() + ": write data page: " + valueCount + " values"); int compressedPageSize = (int)bytes.size(); PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedPageSize, compressedPageSize); // pageHeader.crc = ...; - pageHeader.data_page_header = new DataPageHeader(valueCount, Encoding.PLAIN); // TODO: encoding + pageHeader.data_page_header = new DataPageHeader(valueCount, metadataConverter.getEncoding(encoding)); metadataConverter.writePageHeader(pageHeader, out); this.uncompressedLength += uncompressedPageSize; this.compressedLength += compressedPageSize; if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); bytes.writeAllTo(out); + currentEncodings.add(encoding); } /** @@ -204,7 +208,7 @@ public void writeDataPage( * @param compressedTotalPageSize total compressed size (without page headers) * @throws IOException */ - void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compressedTotalPageSize) throws IOException { + void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compressedTotalPageSize, List<parquet.column.Encoding> encodings) throws IOException { state = state.write(); if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); // int compressedPageSize = (int)bytes.size(); @@ -216,6 +220,7 @@ void writeDataPages(BytesInput bytes, long uncompressedTotalPageSize, long compr this.compressedLength += compressedTotalPageSize; if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); bytes.writeAllTo(out); + currentEncodings.addAll(encodings); } /** @@ -227,11 +232,13 @@ public void endColumn() throws IOException { if (DEBUG) LOG.debug(out.getPos() + ": end column"); currentColumn.setTotalUncompressedSize(uncompressedLength); currentColumn.setTotalSize(compressedLength); + currentColumn.getEncodings().addAll(currentEncodings); currentBlock.addColumn(currentColumn); if (INFO) LOG.info("ended Column chunk: " + currentColumn); currentColumn = null; this.uncompressedLength = 0; this.compressedLength = 0; + this.currentEncodings.clear(); } /** diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java index 9468896..27182c1 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -17,7 +17,9 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.List; +import parquet.column.Encoding; import parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -32,6 +34,7 @@ public class ColumnChunkMetaData implements Serializable { private final
CompressionCodecName codec; private final String[] path; private final PrimitiveTypeName type; + private final List<Encoding> encodings; private long firstDataPage; @@ -41,15 +44,18 @@ public class ColumnChunkMetaData implements Serializable { private long totalUncompressedSize; + /** * * @param path column identifier * @param type type of the column + * @param encodings */ - public ColumnChunkMetaData(String[] path, PrimitiveTypeName type, CompressionCodecName codec) { + public ColumnChunkMetaData(String[] path, PrimitiveTypeName type, CompressionCodecName codec, List<Encoding> encodings) { this.path = path; this.type = type; this.codec = codec; + this.encodings = encodings; } public CompressionCodecName getCodec() { @@ -131,6 +137,14 @@ public void setTotalSize(long totalSize) { this.totalSize = totalSize; } + /** + * + * @return all the encodings used in this column + */ + public List<Encoding> getEncodings() { + return encodings; + } + @Override public String toString() { return "ColumnMetaData{" + codec + ", " + firstDataPage + ", " + Arrays.toString(path) + "}"; diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java index 89ea4ef..d54ba0a 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java @@ -15,7 +15,7 @@ */ package parquet.hadoop; -import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.ArrayList; @@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.junit.Test; +import parquet.column.Encoding; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.CompressionCodecName; @@ -63,7 +64,7 @@ public void testBlocksToSplits() throws IOException, InterruptedException { private BlockMetaData newBlock(long start) { BlockMetaData blockMetaData = new BlockMetaData(); - ColumnChunkMetaData column = new ColumnChunkMetaData(new String[] {"foo"}, PrimitiveTypeName.BINARY, CompressionCodecName.GZIP); + ColumnChunkMetaData column = new ColumnChunkMetaData(new String[] {"foo"}, PrimitiveTypeName.BINARY, CompressionCodecName.GZIP, Arrays.asList(Encoding.PLAIN)); column.setFirstDataPage(start); blockMetaData.addColumn(column); return blockMetaData; diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java index 0d3ff8f..f8d7f6d 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static parquet.column.Encoding.PLAIN; import java.io.File; import java.io.IOException; @@ -66,21 +67,21 @@ public void test() throws Exception { w.start(); w.startBlock(3); w.startColumn(c1, 5, codec); - w.writeDataPage(2, 4, BytesInput.from(bytes1)); - w.writeDataPage(3, 4, BytesInput.from(bytes1)); + w.writeDataPage(2, 4, BytesInput.from(bytes1), PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes1), PLAIN); w.endColumn(); w.startColumn(c2, 6, codec); - w.writeDataPage(2, 4, BytesInput.from(bytes2)); - w.writeDataPage(3, 4, BytesInput.from(bytes2)); - w.writeDataPage(1, 4, BytesInput.from(bytes2)); + w.writeDataPage(2, 4, BytesInput.from(bytes2), PLAIN); + w.writeDataPage(3, 4, BytesInput.from(bytes2), PLAIN); + w.writeDataPage(1, 4, BytesInput.from(bytes2), PLAIN); w.endColumn(); w.endBlock(); w.startBlock(4); w.startColumn(c1, 7, codec); - w.writeDataPage(7, 4, BytesInput.from(bytes3)); + w.writeDataPage(7, 4, BytesInput.from(bytes3), PLAIN); w.endColumn(); w.startColumn(c2, 8, codec); - w.writeDataPage(8, 4, BytesInput.from(bytes4)); + w.writeDataPage(8, 4, BytesInput.from(bytes4), PLAIN); w.endColumn(); w.endBlock(); w.end(new HashMap<String, String>()); diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java b/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java index 03deb5c..3acd5be 100644 --- a/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java +++ b/parquet-pig/src/test/java/parquet/pig/GenerateIntTestFile.java @@ -15,6 +15,8 @@ */ package parquet.pig; +import static parquet.column.Encoding.PLAIN; + import java.io.File; import java.io.IOException; import java.util.HashMap; @@ -27,6 +29,7 @@ import parquet.Log; import parquet.bytes.BytesInput; import parquet.column.ColumnDescriptor; +import parquet.column.Encoding; import parquet.column.mem.MemColumnWriteStore; import parquet.column.mem.MemPageStore; import parquet.column.mem.Page; @@ -121,7 +124,7 @@ public static void writeBlock(MessageType schema, MemPageStore pageStore, Page page = pageReader.readPage(); n += page.getValueCount(); // TODO: change INTFC - w.writeDataPage(page.getValueCount(), (int)page.getBytes().size(), BytesInput.from(page.getBytes().toByteArray())); + w.writeDataPage(page.getValueCount(), (int)page.getBytes().size(), BytesInput.from(page.getBytes().toByteArray()), PLAIN); } while (n < totalValueCount); w.endColumn(); }
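Net effect of patch 1: each writeDataPage call now records the page's encoding, endColumn() folds the accumulated set into the chunk's ColumnChunkMetaData, and the footer carries the real encoding list instead of the hard-coded Encoding.PLAIN. A minimal sketch against the post-patch writer API, mirroring the test above (the constructor call and the setup names configuration, schema, path, c1 and bytes1 are assumptions):

    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); // assumed constructor
    w.start();
    w.startBlock(2);
    w.startColumn(c1, 2, CompressionCodecName.UNCOMPRESSED);
    w.writeDataPage(2, 4, BytesInput.from(bytes1), PLAIN); // encoding recorded per page
    w.endColumn(); // accumulated encodings folded into the chunk metadata
    w.endBlock();
    w.end(new HashMap<String, String>());
    // re-reading the footer should now yield [PLAIN] from
    // ColumnChunkMetaData#getEncodings() for this chunk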
From d35c264e8f922667d9d04263df2e3b18ec9205cc Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 10:15:13 -0800 Subject: [PATCH 2/9] turn byte[] into Binary object in the api --- .../java/parquet/column/ColumnReader.java | 4 +- .../java/parquet/column/ColumnWriter.java | 4 +- .../column/mem/MemColumnReadStore.java | 5 +- .../parquet/column/mem/MemColumnReader.java | 3 +- .../parquet/column/mem/MemColumnWriter.java | 3 +- .../column/primitive/DevNullColumnReader.java | 8 +- .../column/primitive/DevNullColumnWriter.java | 3 +- .../column/primitive/PlainColumnReader.java | 6 +- .../column/primitive/PlainColumnWriter.java | 7 +- .../primitive/PrimitiveColumnReader.java | 4 +- .../primitive/PrimitiveColumnWriter.java | 3 +- .../main/java/parquet/example/data/Group.java | 9 +- .../example/data/GroupRecordConsumer.java | 3 +- .../example/data/GroupValueSource.java | 5 +- .../example/data/simple/BinaryValue.java | 11 ++- .../example/data/simple/Primitive.java | 3 +- .../example/data/simple/SimpleGroup.java | 7 +- .../convert/SimplePrimitiveConverter.java | 3 +- .../java/parquet/io/BaseRecordReader.java | 2 +- .../src/main/java/parquet/io/Binary.java | 99 +++++++++++++++++++ .../java/parquet/io/ConverterConsumer.java | 2 +- .../main/java/parquet/io/MessageColumnIO.java | 4 +- .../main/java/parquet/io/RecordConsumer.java | 2 +- .../io/RecordConsumerLoggingWrapper.java | 4 +- .../parquet/io/ValidatingRecordConsumer.java | 2 +- .../io/convert/PrimitiveConverter.java | 4 +- .../parquet/column/mem/TestMemColumn.java | 5 +- .../io/ExpectationValidatingConverter.java | 4 +- .../ExpectationValidatingRecordConsumer.java | 4 +- .../test/java/parquet/io/TestColumnIO.java | 4 +- .../java/parquet/pig/TupleRecordConsumer.java | 5 +- .../java/parquet/pig/TupleWriteSupport.java | 3 +-
.../parquet/pig/convert/MapConverter.java | 5 +- .../parquet/pig/convert/TupleConverter.java | 9 +- .../test/java/parquet/pig/GenerateTPCH.java | 3 +- .../parquet/thrift/ParquetWriteProtocol.java | 7 +- .../parquet/thrift/ThriftRecordConverter.java | 24 +---- 37 files changed, 200 insertions(+), 83 deletions(-) create mode 100644 parquet-column/src/main/java/parquet/io/Binary.java diff --git a/parquet-column/src/main/java/parquet/column/ColumnReader.java b/parquet-column/src/main/java/parquet/column/ColumnReader.java index 63963e9..90e54a4 100644 --- a/parquet-column/src/main/java/parquet/column/ColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/ColumnReader.java @@ -15,6 +15,8 @@ */ package parquet.column; +import parquet.io.Binary; + /** * reader for (repetition level, definition level, values) triplets * each iteration looks at the current definition level and value as well as the next repetition level @@ -75,7 +77,7 @@ public interface ColumnReader { * * @return the current value */ - byte[] getBinary(); + Binary getBinary(); /** * diff --git a/parquet-column/src/main/java/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/parquet/column/ColumnWriter.java index 780ba6f..8a6b0ea 100644 --- a/parquet-column/src/main/java/parquet/column/ColumnWriter.java +++ b/parquet-column/src/main/java/parquet/column/ColumnWriter.java @@ -15,6 +15,8 @@ */ package parquet.column; +import parquet.io.Binary; + /** * writer for (repetition level, definition level, values) triplets * @@ -53,7 +55,7 @@ public interface ColumnWriter { * @param repetitionLevel * @param definitionLevel */ - void write(byte[] value, int repetitionLevel, int definitionLevel); + void write(Binary value, int repetitionLevel, int definitionLevel); /** * writes the current value diff --git a/parquet-column/src/main/java/parquet/column/mem/MemColumnReadStore.java b/parquet-column/src/main/java/parquet/column/mem/MemColumnReadStore.java index 3efe1c6..e1fac09 100644 --- a/parquet-column/src/main/java/parquet/column/mem/MemColumnReadStore.java +++ b/parquet-column/src/main/java/parquet/column/mem/MemColumnReadStore.java @@ -20,6 +20,7 @@ import parquet.column.ColumnDescriptor; import parquet.column.ColumnReadStore; import parquet.column.ColumnReader; +import parquet.io.Binary; import parquet.io.ParquetDecodingException; @@ -105,14 +106,14 @@ public String getCurrentValueToString() throws IOException { } private static final class BINARYMemColumnReader extends MemColumnReader { - private byte[] current; + private Binary current; public BINARYMemColumnReader(ColumnDescriptor path, PageReader pageReader) { super(path, pageReader); } @Override - public byte[] getBinary() { + public Binary getBinary() { checkValueRead(); return current; } diff --git a/parquet-column/src/main/java/parquet/column/mem/MemColumnReader.java b/parquet-column/src/main/java/parquet/column/mem/MemColumnReader.java index ab1edb9..39d3c38 100644 --- a/parquet-column/src/main/java/parquet/column/mem/MemColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/mem/MemColumnReader.java @@ -28,6 +28,7 @@ import parquet.column.primitive.BoundedColumnFactory; import parquet.column.primitive.PlainColumnReader; import parquet.column.primitive.PrimitiveColumnReader; +import parquet.io.Binary; import parquet.io.ParquetDecodingException; /** @@ -131,7 +132,7 @@ public long getLong() { * @see parquet.column.ColumnReader#getBinary() */ @Override - public byte[] getBinary() { + public Binary getBinary() { throw new 
UnsupportedOperationException(); } diff --git a/parquet-column/src/main/java/parquet/column/mem/MemColumnWriter.java b/parquet-column/src/main/java/parquet/column/mem/MemColumnWriter.java index 1979ebc..3883874 100644 --- a/parquet-column/src/main/java/parquet/column/mem/MemColumnWriter.java +++ b/parquet-column/src/main/java/parquet/column/mem/MemColumnWriter.java @@ -27,6 +27,7 @@ import parquet.column.primitive.DataColumnWriter; import parquet.column.primitive.PlainColumnWriter; import parquet.column.primitive.PrimitiveColumnWriter; +import parquet.io.Binary; import parquet.io.ParquetEncodingException; @@ -110,7 +111,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) { } @Override - public void write(byte[] value, int repetitionLevel, int definitionLevel) { + public void write(Binary value, int repetitionLevel, int definitionLevel) { if (DEBUG) log(value, repetitionLevel, definitionLevel); repetitionLevelColumn.writeInteger(repetitionLevel); definitionLevelColumn.writeInteger(definitionLevel); diff --git a/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnReader.java b/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnReader.java index 617123b..c03d2cf 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnReader.java @@ -17,6 +17,8 @@ import java.io.IOException; +import parquet.io.Binary; + public class DevNullColumnReader extends PrimitiveColumnReader { private boolean defaultBoolean = false; private int defaultInt = 0; @@ -24,7 +26,7 @@ public class DevNullColumnReader extends PrimitiveColumnReader { private byte defaultByte = 0; private float defaultFloat = 0f; private double defaultDouble = 0.0; - private byte[] defaultBytes = new byte[0]; + private Binary defaultBytes = Binary.EMPTY; public void setDefaultBoolean(boolean defaultBoolean) { this.defaultBoolean = defaultBoolean; @@ -50,7 +52,7 @@ public void setDefaultByte(byte defaultByte) { this.defaultByte = defaultByte; } - public void setDefaultBytes(byte[] defaultBytes) { + public void setDefaultBytes(Binary defaultBytes) { this.defaultBytes = defaultBytes; } @@ -66,7 +68,7 @@ public float readFloat() { return defaultFloat; } - public byte[] readBytes() { + public Binary readBytes() { return defaultBytes; } diff --git a/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnWriter.java b/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnWriter.java index 9b5984d..7aca710 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnWriter.java +++ b/parquet-column/src/main/java/parquet/column/primitive/DevNullColumnWriter.java @@ -16,6 +16,7 @@ package parquet.column.primitive; import parquet.bytes.BytesInput; +import parquet.io.Binary; /** * This is a special writer that doesn't write anything. 
The idea being that @@ -45,7 +46,7 @@ public void writeBoolean(boolean v) { } @Override - public void writeBytes(byte[] v) { + public void writeBytes(Binary v) { } @Override diff --git a/parquet-column/src/main/java/parquet/column/primitive/PlainColumnReader.java b/parquet-column/src/main/java/parquet/column/primitive/PlainColumnReader.java index 8ff6c7b..e08c2c2 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/PlainColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/primitive/PlainColumnReader.java @@ -22,6 +22,7 @@ import parquet.Log; import parquet.bytes.LittleEndianDataInputStream; +import parquet.io.Binary; import parquet.io.ParquetDecodingException; @@ -46,11 +47,12 @@ public float readFloat() { } @Override - public byte[] readBytes() { + public Binary readBytes() { try { byte[] value = new byte[in.readInt()]; in.readFully(value); - return value; + // TODO: we don't need to read to an array. + return Binary.fromByteArray(value); } catch (IOException e) { throw new ParquetDecodingException("could not read bytes", e); } diff --git a/parquet-column/src/main/java/parquet/column/primitive/PlainColumnWriter.java b/parquet-column/src/main/java/parquet/column/primitive/PlainColumnWriter.java index d3a03d9..ab3650b 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/PlainColumnWriter.java +++ b/parquet-column/src/main/java/parquet/column/primitive/PlainColumnWriter.java @@ -23,6 +23,7 @@ import parquet.bytes.CapacityByteArrayOutputStream; import parquet.bytes.LittleEndianDataOutputStream; import parquet.column.Encoding; +import parquet.io.Binary; import parquet.io.ParquetEncodingException; @@ -46,10 +47,10 @@ public PlainColumnWriter(int initialSize) { } @Override - public final void writeBytes(byte[] v) { + public final void writeBytes(Binary v) { try { - out.writeInt(v.length); - out.write(v); + out.writeInt(v.length()); + v.writeTo(out); } catch (IOException e) { throw new ParquetEncodingException("could not write bytes", e); } diff --git a/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnReader.java b/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnReader.java index 700fdc4..a37922f 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnReader.java +++ b/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnReader.java @@ -17,6 +17,8 @@ import java.io.IOException; +import parquet.io.Binary; + /** * base class to implement an encoding for a given column * @@ -71,7 +73,7 @@ public float readFloat() { /** * @return the next Binary from the page */ - public byte[] readBytes() { + public Binary readBytes() { throw new UnsupportedOperationException(); } diff --git a/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnWriter.java b/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnWriter.java index 5af86a2..73f9726 100644 --- a/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnWriter.java +++ b/parquet-column/src/main/java/parquet/column/primitive/PrimitiveColumnWriter.java @@ -17,6 +17,7 @@ import parquet.bytes.BytesInput; import parquet.column.Encoding; +import parquet.io.Binary; /** * base class to implement an encoding for a given column @@ -66,7 +67,7 @@ public void writeBoolean(boolean v) { /** * @param v the value to encode */ - public void writeBytes(byte[] v) { + public void writeBytes(Binary v) { throw new UnsupportedOperationException(); } diff --git
a/parquet-column/src/main/java/parquet/example/data/Group.java b/parquet-column/src/main/java/parquet/example/data/Group.java index b63a1d9..811a1a4 100644 --- a/parquet-column/src/main/java/parquet/example/data/Group.java +++ b/parquet-column/src/main/java/parquet/example/data/Group.java @@ -16,6 +16,7 @@ package parquet.example.data; import parquet.Log; +import parquet.io.Binary; import parquet.io.RecordConsumer; abstract public class Group extends GroupValueSource { @@ -38,7 +39,7 @@ public void add(String field, boolean value) { add(getType().getFieldIndex(field), value); } - public void add(String field, byte[] value) { + public void add(String field, Binary value) { add(getType().getFieldIndex(field), value); } @@ -59,7 +60,7 @@ public Group getGroup(String field, int index) { abstract public void add(int fieldIndex, boolean value); - abstract public void add(int fieldIndex, byte[] value); + abstract public void add(int fieldIndex, Binary value); abstract public void add(int fieldIndex, float value); @@ -84,7 +85,7 @@ public Group append(String fieldName, long value) { } public Group append(String fieldName, String value) { - add(fieldName, value.getBytes()); + add(fieldName, Binary.fromString(value)); return this; } @@ -93,7 +94,7 @@ public Group append(String fieldName, boolean value) { return this; } - public Group append(String fieldName, byte[] value) { + public Group append(String fieldName, Binary value) { add(fieldName, value); return this; } diff --git a/parquet-column/src/main/java/parquet/example/data/GroupRecordConsumer.java b/parquet-column/src/main/java/parquet/example/data/GroupRecordConsumer.java index 6f8faf0..4fdc7ad 100644 --- a/parquet-column/src/main/java/parquet/example/data/GroupRecordConsumer.java +++ b/parquet-column/src/main/java/parquet/example/data/GroupRecordConsumer.java @@ -20,6 +20,7 @@ import java.util.ArrayDeque; import java.util.Deque; +import parquet.io.Binary; import parquet.io.RecordMaterializer; @@ -82,7 +83,7 @@ public void addBoolean(boolean value) { } @Override - public void addBinary(byte[] value) { + public void addBinary(Binary value) { groups.peek().add(fields.peek(), value); } diff --git a/parquet-column/src/main/java/parquet/example/data/GroupValueSource.java b/parquet-column/src/main/java/parquet/example/data/GroupValueSource.java index 586f845..eee0cb0 100644 --- a/parquet-column/src/main/java/parquet/example/data/GroupValueSource.java +++ b/parquet-column/src/main/java/parquet/example/data/GroupValueSource.java @@ -15,6 +15,7 @@ */ package parquet.example.data; +import parquet.io.Binary; import parquet.schema.GroupType; abstract public class GroupValueSource { @@ -39,7 +40,7 @@ public boolean getBoolean(String field, int index) { return getBoolean(getType().getFieldIndex(field), index); } - public byte[] getBinary(String field, int index) { + public Binary getBinary(String field, int index) { return getBinary(getType().getFieldIndex(field), index); } @@ -53,7 +54,7 @@ public byte[] getBinary(String field, int index) { abstract public boolean getBoolean(int fieldIndex, int index); - abstract public byte[] getBinary(int fieldIndex, int index); + abstract public Binary getBinary(int fieldIndex, int index); abstract public String getValueToString(int fieldIndex, int index); diff --git a/parquet-column/src/main/java/parquet/example/data/simple/BinaryValue.java b/parquet-column/src/main/java/parquet/example/data/simple/BinaryValue.java index 9a39271..7dc0efe 100644 --- 
a/parquet-column/src/main/java/parquet/example/data/simple/BinaryValue.java +++ b/parquet-column/src/main/java/parquet/example/data/simple/BinaryValue.java @@ -16,25 +16,26 @@ package parquet.example.data.simple; import parquet.bytes.BytesUtils; +import parquet.io.Binary; import parquet.io.RecordConsumer; public class BinaryValue extends Primitive { - private final byte[] binary; + private final Binary binary; - public BinaryValue(byte[] binary) { + public BinaryValue(Binary binary) { this.binary = binary; } @Override - public byte[] getBinary() { + public Binary getBinary() { return binary; } @Override public String getString() { - return new String(binary); + return binary.toStringUsingUTF8(); } @Override @@ -44,6 +45,6 @@ public void writeValue(RecordConsumer recordConsumer) { @Override public String toString() { - return new String(binary, BytesUtils.UTF8); + return getString(); } } diff --git a/parquet-column/src/main/java/parquet/example/data/simple/Primitive.java b/parquet-column/src/main/java/parquet/example/data/simple/Primitive.java index 1d1a2ea..79fb4aa 100644 --- a/parquet-column/src/main/java/parquet/example/data/simple/Primitive.java +++ b/parquet-column/src/main/java/parquet/example/data/simple/Primitive.java @@ -15,6 +15,7 @@ */ package parquet.example.data.simple; +import parquet.io.Binary; import parquet.io.RecordConsumer; public abstract class Primitive { @@ -35,7 +36,7 @@ public boolean getBoolean() { throw new UnsupportedOperationException(); } - public byte[] getBinary() { + public Binary getBinary() { throw new UnsupportedOperationException(); } diff --git a/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java b/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java index 1c3a07b..c968bae 100644 --- a/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java +++ b/parquet-column/src/main/java/parquet/example/data/simple/SimpleGroup.java @@ -19,6 +19,7 @@ import java.util.List; import parquet.example.data.Group; +import parquet.io.Binary; import parquet.io.RecordConsumer; import parquet.schema.GroupType; import parquet.schema.Type; @@ -131,7 +132,7 @@ public boolean getBoolean(int fieldIndex, int index) { } @Override - public byte[] getBinary(int fieldIndex, int index) { + public Binary getBinary(int fieldIndex, int index) { return ((BinaryValue)getValue(fieldIndex, index)).getBinary(); } @@ -147,7 +148,7 @@ public void add(int fieldIndex, long value) { @Override public void add(int fieldIndex, String value) { - add(fieldIndex, new BinaryValue(value.getBytes())); + add(fieldIndex, new BinaryValue(Binary.fromString(value))); } @Override @@ -156,7 +157,7 @@ public void add(int fieldIndex, boolean value) { } @Override - public void add(int fieldIndex, byte[] value) { + public void add(int fieldIndex, Binary value) { add(fieldIndex, new BinaryValue(value)); } diff --git a/parquet-column/src/main/java/parquet/example/data/simple/convert/SimplePrimitiveConverter.java b/parquet-column/src/main/java/parquet/example/data/simple/convert/SimplePrimitiveConverter.java index a7f2336..bd2f3b7 100644 --- a/parquet-column/src/main/java/parquet/example/data/simple/convert/SimplePrimitiveConverter.java +++ b/parquet-column/src/main/java/parquet/example/data/simple/convert/SimplePrimitiveConverter.java @@ -15,6 +15,7 @@ */ package parquet.example.data.simple.convert; +import parquet.io.Binary; import parquet.io.convert.PrimitiveConverter; class SimplePrimitiveConverter extends PrimitiveConverter { @@ -33,7 +34,7 @@ class 
SimplePrimitiveConverter extends PrimitiveConverter { * @see parquet.io.convert.PrimitiveConverter#addBinary(byte[]) */ @Override - public void addBinary(byte[] value) { + public void addBinary(Binary value) { parent.getCurrentRecord().add(index, value); } diff --git a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java index f9711cb..dabe209 100644 --- a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java +++ b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java @@ -97,7 +97,7 @@ private void endField(String field, int index) { endIndex = index; } - final protected void addPrimitiveBINARY(String field, int index, byte[] value) { + final protected void addPrimitiveBINARY(String field, int index, Binary value) { startField(field, index); if (DEBUG) LOG.debug("addBinary("+value+")"); recordConsumer.addBinary(value); diff --git a/parquet-column/src/main/java/parquet/io/Binary.java b/parquet-column/src/main/java/parquet/io/Binary.java new file mode 100644 index 0000000..7946924 --- /dev/null +++ b/parquet-column/src/main/java/parquet/io/Binary.java @@ -0,0 +1,99 @@ +package parquet.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +import parquet.bytes.BytesUtils; + +abstract public class Binary { + + public static final Binary EMPTY = new Binary() { + @Override + public String toStringUsingUTF8() { + return ""; + } + + @Override + public int length() { + return 0; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + } + + @Override + public byte[] getBytes() { + return new byte[0]; + } + + }; + + public static Binary fromByteArray( + final byte[] value, + final int offset, + final int length) { + + return new Binary() { + @Override + public String toStringUsingUTF8() { + return new String(value, offset, length, BytesUtils.UTF8); + } + + @Override + public int length() { + return length; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(value, offset, length); + } + + @Override + public byte[] getBytes() { + return Arrays.copyOfRange(value, offset, offset + length); + } + + }; + } + + public static Binary fromByteArray(final byte[] value) { + return new Binary() { + @Override + public String toStringUsingUTF8() { + return new String(value, BytesUtils.UTF8); + } + + @Override + public int length() { + return value.length; + } + + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(value); + } + + @Override + public byte[] getBytes() { + return value; + } + + }; + } + + public static Binary fromString(final String value) { + return fromByteArray(value.getBytes(BytesUtils.UTF8)); + } + + abstract public String toStringUsingUTF8(); + + abstract public int length(); + + abstract public void writeTo(OutputStream out) throws IOException; + + abstract public byte[] getBytes(); + +} diff --git a/parquet-column/src/main/java/parquet/io/ConverterConsumer.java b/parquet-column/src/main/java/parquet/io/ConverterConsumer.java index e58f7f1..10c60bf 100644 --- a/parquet-column/src/main/java/parquet/io/ConverterConsumer.java +++ b/parquet-column/src/main/java/parquet/io/ConverterConsumer.java @@ -95,7 +95,7 @@ public void addBoolean(boolean value) { } @Override - public void addBinary(byte[] value) { + public void addBinary(Binary value) { currentPrimitive.addBinary(value); } diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java 
b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java index d70c6e1..f22d91f 100644 --- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java @@ -218,8 +218,8 @@ public void addBoolean(boolean value) { } @Override - public void addBinary(byte[] value) { - if (DEBUG) log("addBinary("+value.length+" bytes)"); + public void addBinary(Binary value) { + if (DEBUG) log("addBinary("+value.length()+" bytes)"); getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); setRepetitionLevel(); diff --git a/parquet-column/src/main/java/parquet/io/RecordConsumer.java b/parquet-column/src/main/java/parquet/io/RecordConsumer.java index ee424fb..0ba5fd2 100644 --- a/parquet-column/src/main/java/parquet/io/RecordConsumer.java +++ b/parquet-column/src/main/java/parquet/io/RecordConsumer.java @@ -107,7 +107,7 @@ abstract public class RecordConsumer { * add a binary value in the current field * @param value */ - abstract public void addBinary(byte[] value); + abstract public void addBinary(Binary value); /** * add a float value in the current field diff --git a/parquet-column/src/main/java/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/parquet/io/RecordConsumerLoggingWrapper.java index 0c8d930..669da0c 100644 --- a/parquet-column/src/main/java/parquet/io/RecordConsumerLoggingWrapper.java +++ b/parquet-column/src/main/java/parquet/io/RecordConsumerLoggingWrapper.java @@ -105,8 +105,8 @@ public void addBoolean(boolean value) { * {@inheritDoc} */ @Override - public void addBinary(byte[] value) { - if (DEBUG) log(new String(value)); + public void addBinary(Binary value) { + if (DEBUG) log(value.toStringUsingUTF8()); delegate.addBinary(value); } diff --git a/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java index d7df4b8..d80ab69 100644 --- a/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java +++ b/parquet-column/src/main/java/parquet/io/ValidatingRecordConsumer.java @@ -169,7 +169,7 @@ public void addBoolean(boolean value) { /** * {@inheritDoc} */ - public void addBinary(byte[] value) { + public void addBinary(Binary value) { validate(PrimitiveTypeName.BINARY); delegate.addBinary(value); } diff --git a/parquet-column/src/main/java/parquet/io/convert/PrimitiveConverter.java b/parquet-column/src/main/java/parquet/io/convert/PrimitiveConverter.java index 779e5d5..cdda518 100644 --- a/parquet-column/src/main/java/parquet/io/convert/PrimitiveConverter.java +++ b/parquet-column/src/main/java/parquet/io/convert/PrimitiveConverter.java @@ -15,6 +15,8 @@ */ package parquet.io.convert; +import parquet.io.Binary; + /** * converter for leaves of the schema * @@ -27,7 +29,7 @@ abstract public class PrimitiveConverter extends Converter { * @param fieldIndex index of the field * @param value value to set */ - public void addBinary(byte[] value) { + public void addBinary(Binary value) { throw new UnsupportedOperationException(getClass().getName()); } diff --git a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java index 51e030f..25c52a7 100644 --- a/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/parquet/column/mem/TestMemColumn.java @@ -26,6 +26,7 @@ import parquet.column.mem.MemColumnReadStore; import parquet.column.mem.MemColumnWriteStore; 
import parquet.column.mem.MemPageStore; +import parquet.io.Binary; import parquet.parser.MessageTypeParser; import parquet.schema.MessageType; @@ -68,14 +69,14 @@ public void testMemColumnBinary() throws Exception { ColumnDescriptor path = getCol(schema, col); ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path); - columnWriter.write("42".getBytes(), 0, 0); + columnWriter.write(Binary.fromString("42"), 0, 0); columnWriter.flush(); ColumnReader columnReader = new MemColumnReadStore(memPageStore).getColumnReader(path); while (!columnReader.isFullyConsumed()) { assertEquals(columnReader.getCurrentRepetitionLevel(), 0); assertEquals(columnReader.getCurrentDefinitionLevel(), 0); - assertEquals(new String(columnReader.getBinary()), "42"); + assertEquals(columnReader.getBinary().toStringUsingUTF8(), "42"); columnReader.consume(); } } diff --git a/parquet-column/src/test/java/parquet/io/ExpectationValidatingConverter.java b/parquet-column/src/test/java/parquet/io/ExpectationValidatingConverter.java index 010baa0..ae4bf95 100644 --- a/parquet-column/src/test/java/parquet/io/ExpectationValidatingConverter.java +++ b/parquet-column/src/test/java/parquet/io/ExpectationValidatingConverter.java @@ -55,8 +55,8 @@ private void validate(String message) { } @Override - public void addBinary(byte[] value) { - validate("addBinary("+new String(value)+")"); + public void addBinary(Binary value) { + validate("addBinary("+value.toStringUsingUTF8()+")"); } @Override diff --git a/parquet-column/src/test/java/parquet/io/ExpectationValidatingRecordConsumer.java b/parquet-column/src/test/java/parquet/io/ExpectationValidatingRecordConsumer.java index 9c25044..9317e68 100644 --- a/parquet-column/src/test/java/parquet/io/ExpectationValidatingRecordConsumer.java +++ b/parquet-column/src/test/java/parquet/io/ExpectationValidatingRecordConsumer.java @@ -80,8 +80,8 @@ public void addBoolean(boolean value) { } @Override - public void addBinary(byte[] value) { - validate("addBinary("+new String(value)+")"); + public void addBinary(Binary value) { + validate("addBinary("+value.toStringUsingUTF8()+")"); } @Override diff --git a/parquet-column/src/test/java/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/parquet/io/TestColumnIO.java index 589e6b3..fff511b 100644 --- a/parquet-column/src/test/java/parquet/io/TestColumnIO.java +++ b/parquet-column/src/test/java/parquet/io/TestColumnIO.java @@ -286,8 +286,8 @@ public void writeNull(int repetitionLevel, int definitionLevel) { } @Override - public void write(byte[] value, int repetitionLevel, int definitionLevel) { - validate(new String(value), repetitionLevel, definitionLevel); + public void write(Binary value, int repetitionLevel, int definitionLevel) { + validate(value.toStringUsingUTF8(), repetitionLevel, definitionLevel); } @Override diff --git a/parquet-pig/src/main/java/parquet/pig/TupleRecordConsumer.java b/parquet-pig/src/main/java/parquet/pig/TupleRecordConsumer.java index c15a397..71564d2 100644 --- a/parquet-pig/src/main/java/parquet/pig/TupleRecordConsumer.java +++ b/parquet-pig/src/main/java/parquet/pig/TupleRecordConsumer.java @@ -35,6 +35,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import parquet.Log; +import parquet.io.Binary; import parquet.io.RecordMaterializer; import parquet.schema.GroupType; import parquet.schema.MessageType; @@ -211,8 +212,8 @@ public void addBoolean(boolean value) { } @Override - public void addBinary(byte[] value) { - setCurrentField(new DataByteArray(value)); + public void addBinary(Binary 
value) { + setCurrentField(new DataByteArray(value.getBytes())); } @Override diff --git a/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java index 23d582a..4286d4a 100644 --- a/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java +++ b/parquet-pig/src/main/java/parquet/pig/TupleWriteSupport.java @@ -37,6 +37,7 @@ import org.apache.pig.parser.ParserException; import parquet.hadoop.WriteSupport; +import parquet.io.Binary; import parquet.io.RecordConsumer; import parquet.schema.GroupType; import parquet.schema.MessageType; @@ -153,7 +154,7 @@ private void writeValue(Type type, FieldSchema pigType, Tuple t, int i) { } else { throw new UnsupportedOperationException("can not convert from " + DataType.findTypeName(pigType.type) + " to BINARY "); } - recordConsumer.addBinary(bytes); + recordConsumer.addBinary(Binary.fromByteArray(bytes)); break; case BOOLEAN: recordConsumer.addBoolean((Boolean)t.get(i)); diff --git a/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java b/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java index d9f17f0..9f93e26 100644 --- a/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java +++ b/parquet-pig/src/main/java/parquet/pig/convert/MapConverter.java @@ -32,6 +32,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import parquet.io.Binary; import parquet.io.convert.Converter; import parquet.io.convert.GroupConverter; import parquet.io.convert.PrimitiveConverter; @@ -149,8 +150,8 @@ final public void end() { final class StringKeyConverter extends PrimitiveConverter { @Override - final public void addBinary(byte[] value) { - currentKey = new String(value, UTF8); + final public void addBinary(Binary value) { + currentKey = value.toStringUsingUTF8(); } } diff --git a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java index 73de162..4b50469 100644 --- a/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java +++ b/parquet-pig/src/main/java/parquet/pig/convert/TupleConverter.java @@ -30,6 +30,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import parquet.io.Binary; import parquet.io.ParquetDecodingException; import parquet.io.convert.Converter; import parquet.io.convert.GroupConverter; @@ -129,8 +130,8 @@ public FieldStringConverter(int index) { } @Override - final public void addBinary(byte[] value) { - set(index, new String(value, UTF8)); + final public void addBinary(Binary value) { + set(index, value.toStringUsingUTF8()); } } @@ -144,8 +145,8 @@ public FieldByteArrayConverter(int index) { } @Override - final public void addBinary(byte[] value) { - set(index, new DataByteArray(value)); + final public void addBinary(Binary value) { + set(index, new DataByteArray(value.getBytes())); } } diff --git a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java b/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java index cb4b7e1..0067cd0 100644 --- a/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java +++ b/parquet-pig/src/test/java/parquet/pig/GenerateTPCH.java @@ -28,6 +28,7 @@ import parquet.Log; import parquet.column.mem.MemColumnWriteStore; import parquet.column.mem.MemPageStore; +import parquet.io.Binary; import parquet.io.ColumnIOFactory; import parquet.io.MessageColumnIO; import parquet.io.RecordConsumer; @@ 
-98,7 +99,7 @@ private static void writeField(RecordConsumer recordWriter, int index, String na if (value instanceof Integer) { recordWriter.addInteger((Integer)value); } else if (value instanceof String) { - recordWriter.addBinary(((String)value).getBytes()); + recordWriter.addBinary(Binary.fromString((String)value)); } else if (value instanceof Double) { recordWriter.addDouble((Double)value); } else { diff --git a/parquet-thrift/src/main/java/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/parquet/thrift/ParquetWriteProtocol.java index c9a59ce..0fed4b7 100644 --- a/parquet-thrift/src/main/java/parquet/thrift/ParquetWriteProtocol.java +++ b/parquet-thrift/src/main/java/parquet/thrift/ParquetWriteProtocol.java @@ -30,6 +30,7 @@ import parquet.Log; import parquet.bytes.BytesUtils; +import parquet.io.Binary; import parquet.io.ColumnIO; import parquet.io.GroupColumnIO; import parquet.io.MessageColumnIO; @@ -606,12 +607,12 @@ public void writeBinary(ByteBuffer buf) throws TException { } private void writeBinaryToRecordConsumer(ByteBuffer buf) { - // TODO: check this - recordConsumer.addBinary(buf.array()); + // TODO: check this => pretty sure this is no good + recordConsumer.addBinary(Binary.fromByteArray(buf.array())); } private void writeStringToRecordConsumer(String str) { - recordConsumer.addBinary(str.getBytes(BytesUtils.UTF8)); + recordConsumer.addBinary(Binary.fromString(str)); } private TProtocol getProtocol(ThriftField field, ColumnIO columnIO, Events returnClause) { diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java index 3bc3647..014dad4 100644 --- a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java +++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java @@ -1,18 +1,3 @@ -/** - * Copyright 2012 Twitter, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ package parquet.thrift; import java.util.ArrayList; @@ -28,6 +13,7 @@ import org.apache.thrift.protocol.TType; import parquet.bytes.BytesUtils; +import parquet.io.Binary; import parquet.io.convert.Converter; import parquet.io.convert.GroupConverter; import parquet.io.convert.PrimitiveConverter; @@ -79,7 +65,7 @@ public TField readFieldBegin() throws TException { } @Override - public void addBinary(byte[] value) { + public void addBinary(Binary value) { startField(); delegate.addBinary(value); endField(); @@ -212,7 +198,7 @@ public PrimitiveCounter(PrimitiveConverter delegate) { } @Override - public void addBinary(byte[] value) { + public void addBinary(Binary value) { delegate.addBinary(value); ++ count; } @@ -330,11 +316,11 @@ public FieldStringConverter(List<TProtocol> events, ThriftField field) { } @Override - public void addBinary(final byte[] value) { + public void addBinary(final Binary value) { events.add(new ParquetProtocol() { @Override public String readString() throws TException { - return new String(value, BytesUtils.UTF8); + return value.toStringUsingUTF8(); } }); } From 932695cbb394dcc005ccbff07ef9ab8c0f6cf5c4 Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 10:15:38 -0800 Subject: [PATCH 3/9] update dependency on elephant-bird --- parquet-thrift/pom.xml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml index 558a9ff..282e434 100644 --- a/parquet-thrift/pom.xml +++ b/parquet-thrift/pom.xml @@ -16,6 +16,7 @@ <url>https://github.com/julienledem/Parquet/parquet-mr</url> + <elephant-bird.version>3.0.7</elephant-bird.version> @@ -37,12 +38,12 @@ <groupId>com.twitter.elephantbird</groupId> <artifactId>elephant-bird-core</artifactId> - <version>3.0.5</version> + <version>${elephant-bird.version}</version> <groupId>com.twitter.elephantbird</groupId> <artifactId>elephant-bird-pig</artifactId> - <version>3.0.5</version> + <version>${elephant-bird.version}</version> <groupId>org.codehaus.jackson</groupId> From 07ea133db57bdfa32fdcc03b370f0ac89d35fd0d Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 10:16:54 -0800 Subject: [PATCH 4/9] license headers --- .../src/main/java/parquet/io/Binary.java | 15 +++++++++++++++ .../parquet/thrift/ThriftRecordConverter.java | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/parquet-column/src/main/java/parquet/io/Binary.java b/parquet-column/src/main/java/parquet/io/Binary.java index 7946924..f11025e 100644 --- a/parquet-column/src/main/java/parquet/io/Binary.java +++ b/parquet-column/src/main/java/parquet/io/Binary.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.io; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import parquet.bytes.BytesUtils; diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java index 014dad4..ef0c317 100644 --- a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java +++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java @@ -1,3 +1,18 @@ +/** + * Copyright 2012 Twitter, Inc.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package parquet.thrift; import java.util.ArrayList; From ee8ec11a25ede3279df609ee1e402476d8b6d028 Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 11:41:50 -0800 Subject: [PATCH 5/9] javadoc; turn off the compatibility test for now --- .../parquet/hadoop/TestReadIntTestFile.java | 57 +++++++- .../parquet/thrift/ThriftRecordConverter.java | 123 ++++++++++++++---- 2 files changed, 153 insertions(+), 27 deletions(-) diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java index e91549c..e68e034 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java @@ -15,9 +15,16 @@ */ package parquet.hadoop; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.FileWriter; +import java.io.FilenameFilter; import java.io.IOException; import java.io.Writer; import java.util.List; @@ -43,19 +50,40 @@ public class TestReadIntTestFile { private static final Log LOG = Log.getLog(TestReadIntTestFile.class); - @Test +// @Test // TODO move this test to a specific compatibility test repo public void readTest() throws IOException { - Path testFile = new Path(new File("/Users/julien/github/Parquet/parquet-format/testdata/tpch/customer.parquet").toURI()); + + File baseDir = new File("/Users/julien/github/Parquet/parquet-format/testdata/tpch"); + final File[] parquetFiles = baseDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith(".parquet"); + } + }); + + for (File parquetFile : parquetFiles) { + convertToCSV(parquetFile); + } + } + + private void convertToCSV(File parquetFile) throws IOException { + LOG.info("converting " + parquetFile.getName()); + Path testInputFile = new Path(parquetFile.toURI()); + File expectedOutputFile = new File( + parquetFile.getParentFile(), + parquetFile.getName().substring(0, parquetFile.getName().length() - ".parquet".length()) + ".csv"); + File csvOutputFile = new File("target/test/fromExampleFiles", parquetFile.getName()+".readFromJava.csv"); + csvOutputFile.getParentFile().mkdirs(); Configuration configuration = new Configuration(true); - ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testFile); + ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testInputFile); MessageType schema = readFooter.getFileMetaData().getSchema(); - ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testFile, readFooter.getBlocks(), schema.getColumns()); + ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testInputFile, readFooter.getBlocks(), schema.getColumns()); PageReadStore 
pages = parquetFileReader.readColumns();
     final long rows = pages.getRowCount();
     LOG.info("rows: "+rows);
     final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
     final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
-    BufferedWriter w = new BufferedWriter(new FileWriter("/Users/julien/github/Parquet/parquet-format/testdata/tpch/customer.parquet.csv"));
+    BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile));
     try {
       for (int i = 0; i < rows; i++) {
         final Group g = recordReader.read();
@@ -80,5 +108,24 @@ public void readTest() throws IOException {
       }
     } finally {
       w.close();
     }
+    verify(expectedOutputFile, csvOutputFile);
+    LOG.info("verified " + parquetFile.getName());
+  }
+
+  private void verify(File expectedOutputFile, File csvOutputFile) throws IOException {
+    final BufferedReader expected = new BufferedReader(new FileReader(expectedOutputFile));
+    final BufferedReader out = new BufferedReader(new FileReader(csvOutputFile));
+    String lineIn;
+    String lineOut = null;
+    int lineNumber = 0;
+    while ((lineIn = expected.readLine()) != null && (lineOut = out.readLine()) != null) {
+      ++ lineNumber;
+      lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
+      assertEquals("line " + lineNumber, lineIn, lineOut);
+    }
+    assertNull("line " + lineNumber, lineIn);
+    assertNull("line " + lineNumber, out.readLine());
+    expected.close();
+    out.close();
+  }
 }
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
index ef0c317..a3e05b4 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftRecordConverter.java
@@ -27,25 +27,30 @@ import org.apache.thrift.protocol.TStruct;
 import org.apache.thrift.protocol.TType;
 
-import parquet.bytes.BytesUtils;
 import parquet.io.Binary;
+import parquet.io.ParquetDecodingException;
 import parquet.io.convert.Converter;
 import parquet.io.convert.GroupConverter;
 import parquet.io.convert.PrimitiveConverter;
 import parquet.io.convert.RecordConverter;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
-import parquet.schema.PrimitiveType;
 import parquet.schema.Type;
 import parquet.thrift.struct.ThriftField;
 import parquet.thrift.struct.ThriftField.Requirement;
 import parquet.thrift.struct.ThriftType;
 import parquet.thrift.struct.ThriftType.ListType;
 import parquet.thrift.struct.ThriftType.MapType;
-import parquet.thrift.struct.ThriftType.SetType;
 import parquet.thrift.struct.ThriftType.StructType;
 import parquet.thrift.struct.ThriftTypeID;
 
+/**
+ * converts the columnar events into a Thrift protocol.
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
 public class ThriftRecordConverter<T> extends RecordConverter<T> {
 
   final ParquetProtocol readFieldEnd = new ParquetProtocol("readFieldEnd()") {
@@ -54,7 +59,13 @@ public void readFieldEnd() throws TException {
     }
   };
 
-  public class PrimitiveFieldHandler extends PrimitiveConverter {
+  /**
+   * Handles field events creation by wrapping the converter for the actual type
+   *
+   * @author Julien Le Dem
+   *
+   */
+  class PrimitiveFieldHandler extends PrimitiveConverter {
 
     private final PrimitiveConverter delegate;
     private final List<TProtocol> events;
@@ -123,7 +134,13 @@ public void addLong(long value) {
 
   }
 
-  public class GroupFieldhandler extends GroupConverter {
+  /**
+   * Handles field events creation by wrapping the converter for the actual type
+   *
+   * @author Julien Le Dem
+   *
+   */
+  class GroupFieldhandler extends GroupConverter {
 
     private final GroupConverter delegate;
     private final List<TProtocol> events;
@@ -166,7 +183,13 @@ interface Counter {
 
   }
 
-  public class GroupCounter extends GroupConverter implements Counter {
+  /**
+   * counts the instances created to use in List/Set/Map that need to inform of the element count in the protocol
+   *
+   * @author Julien Le Dem
+   *
+   */
+  class GroupCounter extends GroupConverter implements Counter {
 
     private final GroupConverter delegate;
     private int count;
@@ -203,7 +226,13 @@ public int getCount() {
 
   }
 
-  public class PrimitiveCounter extends PrimitiveConverter implements Counter {
+  /**
+   * counts the instances created to use in List/Set/Map that need to inform of the element count in the protocol
+   *
+   * @author Julien Le Dem
+   *
+   */
+  class PrimitiveCounter extends PrimitiveConverter implements Counter {
 
     private final PrimitiveConverter delegate;
     private int count;
@@ -260,7 +289,13 @@ public int getCount() {
 
   }
 
-  public class FieldPrimitiveConverter extends PrimitiveConverter {
+  /**
+   * convert primitive values
+   *
+   * @author Julien Le Dem
+   *
+   */
+  class FieldPrimitiveConverter extends PrimitiveConverter {
 
     private final List<TProtocol> events;
@@ -322,7 +357,12 @@ public long readI64() throws TException {
 
   }
 
-  public class FieldStringConverter extends PrimitiveConverter {
+  /**
+   * converts Binary into String
+   * @author Julien Le Dem
+   *
+   */
+  class FieldStringConverter extends PrimitiveConverter {
 
     private final List<TProtocol> events;
@@ -342,10 +382,14 @@ public String readString() throws TException {
 
   }
 
-  public class MapConverter extends GroupConverter {
+  /**
+   * convert to Maps
+   * @author Julien Le Dem
+   *
+   */
+  class MapConverter extends GroupConverter {
 
     private final GroupCounter child;
-    private final ThriftField field;
     private final List<TProtocol> mapEvents = new ArrayList<TProtocol>();
     private final List<TProtocol> parentEvents;
     private final byte keyType;
@@ -353,7 +397,6 @@ public class MapConverter extends GroupConverter {
 
     MapConverter(List<TProtocol> parentEvents, GroupType parquetSchema, ThriftField field) {
       this.parentEvents = parentEvents;
-      this.field = field;
       if (parquetSchema.getFieldCount() != 1) {
         throw new IllegalArgumentException("maps have only one field. " + parquetSchema + " size = " + parquetSchema.getFieldCount());
       }
@@ -401,7 +444,12 @@ public TMap readMapBegin() throws TException {
 
   }
 
-  public class MapKeyValueConverter extends GroupConverter {
+  /**
+   * converts to a key value pair (in maps)
+   * @author Julien Le Dem
+   *
+   */
+  class MapKeyValueConverter extends GroupConverter {
 
     private Converter keyConverter;
     private Converter valueConverter;
@@ -434,7 +482,12 @@ public void end() {
 
   }
 
-  public class SetConverter extends CollectionConverter {
+  /**
+   * converts to a Set
+   * @author Julien Le Dem
+   *
+   */
+  class SetConverter extends CollectionConverter {
 
     final ParquetProtocol readSetEnd = new ParquetProtocol("readSetEnd()") {
 
       @Override
@@ -466,6 +519,11 @@ void collectionEnd() {
 
   }
 
+  /**
+   * converts to a List
+   * @author Julien Le Dem
+   *
+   */
   class ListConverter extends CollectionConverter {
 
     final ParquetProtocol readListEnd = new ParquetProtocol("readListEnd()") {
@@ -498,6 +556,11 @@ void collectionEnd() {
 
   }
 
+  /**
+   * Base class to convert List and Set which basically work the same
+   * @author Julien Le Dem
+   *
+   */
   abstract class CollectionConverter extends GroupConverter {
 
     private final Converter child;
@@ -554,9 +617,13 @@ public void end() {
 
   }
 
-  public class StructConverter extends GroupConverter {
+  /**
+   * converts to Struct
+   * @author Julien Le Dem
+   *
+   */
+  class StructConverter extends GroupConverter {
 
-    private final GroupType parquetSchema;
     private final int schemaSize;
 
     private final Converter[] converters;
@@ -569,7 +636,6 @@ private StructConverter(List<TProtocol> events, GroupType parquetSchema, ThriftF
       this.events = events;
       this.name = field.getName();
       this.tStruct = new TStruct(name);
-      this.parquetSchema = parquetSchema;
       this.thriftType = (StructType)field.getType();
       this.schemaSize = parquetSchema.getFieldCount();
       if (schemaSize != thriftType.getChildren().size()) {
@@ -629,6 +695,13 @@ public void end() {
 
   private final GroupConverter structConverter;
   private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
 
+  /**
+   *
+   * @param thriftReader the class responsible for instantiating the final object and read from the protocol
+   * @param name the name of that type ( the thrift class simple name)
+   * @param parquetSchema the schema for the incoming columnar events
+   * @param thriftType the thrift type descriptor
+   */
   public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageType parquetSchema, ThriftType.StructType thriftType) {
     super();
     this.thriftReader = thriftReader;
@@ -636,6 +709,11 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
     this.structConverter = new StructConverter(rootEvents, parquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   * @see parquet.io.convert.RecordConverter#getCurrentRecord()
+   */
   @Override
   public T getCurrentRecord() {
     try {
@@ -643,11 +721,15 @@ public T getCurrentRecord() {
       rootEvents.clear();
       return thriftReader.readOneRecord(protocol);
     } catch (TException e) {
-      //TODO: cleanup
-      throw new RuntimeException(e);
+      throw new ParquetDecodingException("Could not read thrift object from protocol", e);
     }
   }
 
+  /**
+   *
+   * {@inheritDoc}
+   * @see parquet.io.convert.RecordConverter#getRootConverter()
+   */
   @Override
   public GroupConverter getRootConverter() {
     return structConverter;
@@ -665,9 +747,6 @@ private Converter newConverter(List<TProtocol> events, Type type, ThriftField fi
       return new StructConverter(events, type.asGroupType(), field);
     case STRING:
       return new FieldStringConverter(events, field);
-//      case BINARY:
-//
primitiveConverters[i] = new FieldByteArrayConverter(i); -// break; default: return new FieldPrimitiveConverter(events, field); } From 54dd652ef68572c3796efbbe636699bf00d3fc6c Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 13:47:25 -0800 Subject: [PATCH 6/9] integrate the thrift changes --- .../converter/ParquetMetadataConverter.java | 42 ++++++++++++------- .../hadoop/ColumnChunkPageWriteStore.java | 13 +++--- .../parquet/hadoop/ParquetFileWriter.java | 14 +++---- .../hadoop/metadata/ParquetMetadata.java | 2 +- .../parquet/hadoop/TestReadIntTestFile.java | 1 - 5 files changed, 39 insertions(+), 33 deletions(-) diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java index ee32065..6fe2bd4 100644 --- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java @@ -35,11 +35,14 @@ import parquet.format.ColumnChunk; import parquet.format.CompressionCodec; +import parquet.format.DataPageHeader; import parquet.format.Encoding; +import parquet.format.FieldLevelEncoding; import parquet.format.FieldRepetitionType; import parquet.format.FileMetaData; import parquet.format.KeyValue; import parquet.format.PageHeader; +import parquet.format.PageType; import parquet.format.RowGroup; import parquet.format.SchemaElement; import parquet.format.Type; @@ -188,20 +191,6 @@ public Encoding getEncoding(parquet.column.Encoding encoding) { } } - private CompressionCodec getCodec(String codecClassName) { - if (codecClassName.equals("org.apache.hadoop.io.compress.GzipCodec")) { - return CompressionCodec.GZIP; - } else if (codecClassName.equals("com.hadoop.compression.lzo.LzopCodec")) { - return CompressionCodec.LZO; - } else if (codecClassName.equals("org.apache.hadoop.io.compress.SnappyCodec")) { - return CompressionCodec.SNAPPY; - } else if (codecClassName.equals("")) { - return CompressionCodec.UNCOMPRESSED; - } else { - throw new RuntimeException("Unknown Codec "+ codecClassName); - } - } - private PrimitiveTypeName getPrimitive(Type type) { switch (type) { case BYTE_ARRAY: @@ -327,7 +316,16 @@ private Repetition fromParquetRepetition(FieldRepetitionType repetition) { throw new RuntimeException("unknown repetition: " + repetition); } - public void writePageHeader(PageHeader pageHeader, OutputStream to) throws IOException { + public void writeDataPageHeader( + int uncompressedSize, + int compressedSize, + int valueCount, + parquet.column.Encoding encoding, + OutputStream to) throws IOException { + writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, encoding), to); + } + + protected void writePageHeader(PageHeader pageHeader, OutputStream to) throws IOException { write(pageHeader, to); } @@ -382,5 +380,19 @@ private void write(TBase tbase, OutputStream to) } } + private PageHeader newDataPageHeader( + int uncompressedSize, int compressedSize, + int valueCount, + parquet.column.Encoding encoding) { + PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, (int)uncompressedSize, (int)compressedSize); + // TODO: pageHeader.crc = ...; + pageHeader.data_page_header = new DataPageHeader( + valueCount, + getEncoding(encoding), + FieldLevelEncoding.RLE, // TODO: manage several encodings + FieldLevelEncoding.BIT_PACKED); + return pageHeader; + } + } diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java 
b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java index 6c5cf35..e2b7ef6 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -29,9 +29,6 @@ import parquet.column.Encoding; import parquet.column.mem.PageWriteStore; import parquet.column.mem.PageWriter; -import parquet.format.DataPageHeader; -import parquet.format.PageHeader; -import parquet.format.PageType; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactory.BytesCompressor; import parquet.schema.MessageType; @@ -64,10 +61,12 @@ public void writePage(BytesInput bytes, int valueCount, Encoding encoding) throw long uncompressedSize = bytes.size(); BytesInput compressedBytes = compressor.compress(bytes); long compressedSize = compressedBytes.size(); - PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, (int)uncompressedSize, (int)compressedSize); - // pageHeader.crc = ...; - pageHeader.data_page_header = new DataPageHeader(valueCount, parquetMetadataConverter.getEncoding(encoding)); - parquetMetadataConverter.writePageHeader(pageHeader, buf); + parquetMetadataConverter.writeDataPageHeader( + (int)uncompressedSize, + (int)compressedSize, + valueCount, + encoding, + buf); this.uncompressedLength += uncompressedSize; this.compressedLength += compressedSize; this.totalValueCount += valueCount; diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java index 880e173..4b2938b 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -37,10 +36,6 @@ import parquet.bytes.BytesInput; import parquet.bytes.BytesUtils; import parquet.column.ColumnDescriptor; -import parquet.format.DataPageHeader; -import parquet.format.Encoding; -import parquet.format.PageHeader; -import parquet.format.PageType; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; @@ -190,10 +185,11 @@ public void writeDataPage( state = state.write(); if (DEBUG) LOG.debug(out.getPos() + ": write data page: " + valueCount + " values"); int compressedPageSize = (int)bytes.size(); - PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedPageSize, compressedPageSize); - // pageHeader.crc = ...; - pageHeader.data_page_header = new DataPageHeader(valueCount, metadataConverter.getEncoding(encoding)); - metadataConverter.writePageHeader(pageHeader, out); + metadataConverter.writeDataPageHeader( + uncompressedPageSize, compressedPageSize, + valueCount, + encoding, + out); this.uncompressedLength += uncompressedPageSize; this.compressedLength += compressedPageSize; if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ParquetMetadata.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ParquetMetadata.java index a699189..f83d2c8 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ParquetMetadata.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ParquetMetadata.java @@ -39,7 +39,7 
@@ public class ParquetMetadata { private static ObjectMapper objectMapper = new ObjectMapper(); private static ObjectMapper prettyObjectMapper = new ObjectMapper(); static { - prettyObjectMapper.getSerializationConfig().set(Feature.INDENT_OUTPUT, true); + prettyObjectMapper.configure(Feature.INDENT_OUTPUT, true); } /** diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java index e68e034..f73f5b3 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java @@ -91,7 +91,6 @@ private void convertToCSV(File parquetFile) throws IOException { final Type type = schema.getFields().get(j); if (j > 0) { w.write('|'); -// System.out.print("|"); } String valueToString = g.getValueToString(j, 0); if (type.isPrimitive() From 0973e7dd05799d75a3907c0e27415b68310c90ea Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 14:56:57 -0800 Subject: [PATCH 7/9] removed outdated comment --- .../src/main/java/parquet/thrift/ThriftSchemaConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java index 701d35f..cfc6e06 100644 --- a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java +++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java @@ -95,7 +95,7 @@ private Type toSchema(String name, Field field, Type.Repetition rep) { } else if (field.isBuffer()) { return new PrimitiveType(rep, BINARY, name); } else if (field.isEnum()) { - return new PrimitiveType(rep, BINARY, name, ENUM); // TODO: should probably store as an int. + return new PrimitiveType(rep, BINARY, name, ENUM); } else if (field.isMap()) { final Field mapKeyField = field.getMapKeyField(); if (mapKeyField.getType() != TType.STRING && From 591c2b9d2bb8c428597091698f762155d6fdee4f Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 15:27:37 -0800 Subject: [PATCH 8/9] move compatibility test to the appropriate repo --- .../parquet/hadoop/TestReadIntTestFile.java | 130 ------------------ 1 file changed, 130 deletions(-) delete mode 100644 parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java deleted file mode 100644 index f73f5b3..0000000 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestReadIntTestFile.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Copyright 2012 Twitter, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package parquet.hadoop; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.Writer; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.junit.Test; - -import parquet.Log; -import parquet.column.mem.PageReadStore; -import parquet.example.data.Group; -import parquet.example.data.GroupRecordConsumer; -import parquet.example.data.simple.SimpleGroupFactory; -import parquet.example.data.simple.convert.GroupRecordConverter; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.io.RecordReader; -import parquet.schema.MessageType; -import parquet.schema.Type; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -public class TestReadIntTestFile { - private static final Log LOG = Log.getLog(TestReadIntTestFile.class); - -// @Test // TODO move this test to a specific compatibility test repo - public void readTest() throws IOException { - - File baseDir = new File("/Users/julien/github/Parquet/parquet-format/testdata/tpch"); - final File[] parquetFiles = baseDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(".parquet"); - } - }); - - for (File parquetFile : parquetFiles) { - convertToCSV(parquetFile); - } - } - - private void convertToCSV(File parquetFile) throws IOException { - LOG.info("converting " + parquetFile.getName()); - Path testInputFile = new Path(parquetFile.toURI()); - File expectedOutputFile = new File( - parquetFile.getParentFile(), - parquetFile.getName().substring(0, parquetFile.getName().length() - ".parquet".length()) + ".csv"); - File csvOutputFile = new File("target/test/fromExampleFiles", parquetFile.getName()+".readFromJava.csv"); - csvOutputFile.getParentFile().mkdirs(); - Configuration configuration = new Configuration(true); - ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testInputFile); - MessageType schema = readFooter.getFileMetaData().getSchema(); - ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testInputFile, readFooter.getBlocks(), schema.getColumns()); - PageReadStore pages = parquetFileReader.readColumns(); - final long rows = pages.getRowCount(); - LOG.info("rows: "+rows); - final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); - final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema)); - BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile)); - try { - for (int i = 0; i < rows; i++) { - final Group g = recordReader.read(); - for (int j = 0; j < schema.getFieldCount(); j++) { - final Type type = schema.getFields().get(j); - if (j > 0) { - w.write('|'); - } - String valueToString = g.getValueToString(j, 0); - if (type.isPrimitive() - && (type.asPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.FLOAT - || type.asPrimitiveType().getPrimitiveTypeName() == PrimitiveTypeName.DOUBLE) - && valueToString.endsWith(".0")) { - valueToString = valueToString.substring(0, valueToString.length() - 2); - } - w.write(valueToString);// no repetition here - } - w.write('\n'); - // LOG.info(i + ": " + g); - } - } finally { - 
w.close(); - } - verify(expectedOutputFile, csvOutputFile); - LOG.info("verified " + parquetFile.getName()); - } - - private void verify(File expectedOutputFile, File csvOutputFile) throws IOException { - final BufferedReader expected = new BufferedReader(new FileReader(expectedOutputFile)); - final BufferedReader out = new BufferedReader(new FileReader(csvOutputFile)); - String lineIn; - String lineOut = null; - int lineNumber = 0; - while ((lineIn = expected.readLine()) != null && (lineOut = out.readLine()) != null) { - ++ lineNumber; - lineOut = lineOut.substring(lineOut.indexOf("\t") + 1); - assertEquals("line " + lineNumber, lineIn, lineOut); - } - assertNull("line " + lineNumber, lineIn); - assertNull("line " + lineNumber, out.readLine()); - expected.close(); - out.close(); - } -} From 76fd1d8011bf4f125118e28f11f5aa5539f06fbd Mon Sep 17 00:00:00 2001 From: julien Date: Tue, 5 Mar 2013 16:09:59 -0800 Subject: [PATCH 9/9] cleanup --- parquet-hadoop/pom.xml | 2 +- .../main/java/parquet/hadoop/BlockData.java | 52 ----------- .../main/java/parquet/hadoop/ColumnData.java | 88 ------------------- .../java/parquet/hadoop/PageConsumer.java | 24 ----- .../hadoop/metadata/ColumnMetaData.java | 37 -------- parquet-pig/pom.xml | 2 +- parquet-thrift/pom.xml | 2 +- pom.xml | 2 +- 8 files changed, 4 insertions(+), 205 deletions(-) delete mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/BlockData.java delete mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/ColumnData.java delete mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/PageConsumer.java delete mode 100644 parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnMetaData.java diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 37b19e3..89e1d6c 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -13,7 +13,7 @@ jar Parquet Hadoop - https://github.com/julienledem/Parquet/parquet-mr + https://github.com/Parquet/parquet-mr diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/BlockData.java b/parquet-hadoop/src/main/java/parquet/hadoop/BlockData.java deleted file mode 100644 index e6b8f83..0000000 --- a/parquet-hadoop/src/main/java/parquet/hadoop/BlockData.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2012 Twitter, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package parquet.hadoop; - -import java.util.List; - -/** - * Contains the data of a block in raw form - * - * @author Julien Le Dem - * - */ -class BlockData { - - private final long recordCount; - private final List columns; - - public BlockData(long recordCount, List columns) { - this.recordCount = recordCount; - this.columns = columns; - } - - /** - * - * @return count of records in this block - */ - public long getRecordCount() { - return recordCount; - } - - /** - * column data for this block - * @return - */ - public List getColumns() { - return columns; - } - -} diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnData.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnData.java deleted file mode 100644 index 424f68c..0000000 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnData.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright 2012 Twitter, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package parquet.hadoop; - -import java.util.Arrays; - -/** - * Column data for a given block in raw format - * - * @author Julien Le Dem - * - */ -public class ColumnData { - - private final String[] path; - private final byte[] repetitionLevels; - private final byte[] definitionLevels; - private final byte[] data; - - /** - * - * @param path identifier of the column - * @param repetitionLevels repetition levels data - * @param definitionLevels definition levels data - * @param data actual column data - */ - public ColumnData(String[] path, - byte[] repetitionLevels, byte[] definitionLevels, byte[] data) { - super(); - this.path = path; - this.repetitionLevels = repetitionLevels; - this.definitionLevels = definitionLevels; - this.data = data; - } - - /** - * - * @return identifier of the column - */ - public String[] getPath() { - return path; - } - - /** - * - * @return repetition level data - */ - public byte[] getRepetitionLevels() { - return repetitionLevels; - } - - /** - * - * @return definition levels data - */ - public byte[] getDefinitionLevels() { - return definitionLevels; - } - - /** - * - * @return raw column data - */ - public byte[] getData() { - return data; - } - - @Override - public String toString() { - return "ColumnData{"+Arrays.toString(path) + " " - + repetitionLevels.length + "B " - + data.length + "B " - + data.length + "B}"; - } -} diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/PageConsumer.java b/parquet-hadoop/src/main/java/parquet/hadoop/PageConsumer.java deleted file mode 100644 index 7b53bac..0000000 --- a/parquet-hadoop/src/main/java/parquet/hadoop/PageConsumer.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright 2012 Twitter, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import parquet.bytes.BytesInput;
-
-abstract public class PageConsumer {
-
-  abstract public void consumePage(String[] path, int valueCount, BytesInput bytes);
-
-}
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnMetaData.java
deleted file mode 100644
index 7d4d8b6..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnMetaData.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop.metadata;
-
-public final class ColumnMetaData {
-
-  private final String[] columnPath;
-  private final CompressionCodecName codec;
-
-  public ColumnMetaData(String[] columnPath, CompressionCodecName codec) {
-    super();
-    this.columnPath = columnPath;
-    this.codec = codec;
-  }
-
-  public String[] getColumnPath() {
-    return columnPath;
-  }
-
-  public CompressionCodecName getCodec() {
-    return codec;
-  }
-
-}
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index ba0575e..9d0c9bc 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -13,7 +13,7 @@
   <packaging>jar</packaging>
 
   <name>Parquet Pig</name>
-  <url>https://github.com/julienledem/Parquet/parquet-mr</url>
+  <url>https://github.com/Parquet/parquet-mr</url>
 
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 282e434..517f24c 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -13,7 +13,7 @@
   <packaging>jar</packaging>
 
   <name>Parquet Thrift</name>
-  <url>https://github.com/julienledem/Parquet/parquet-mr</url>
+  <url>https://github.com/Parquet/parquet-mr</url>
 
   <properties>
     <elephant-bird.version>3.0.7</elephant-bird.version>
diff --git a/pom.xml b/pom.xml
index 6ded578..0b900a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,13 +77,13 @@
         <artifactId>cobertura-maven-plugin</artifactId>
         <version>2.5.2</version>
         <configuration>
+          <aggregate>true</aggregate>
          <instrumentation>
            <excludes>
              <exclude>*Exception</exclude>
              <exclude>**/*Exception.class</exclude>
-              <exclude>parquet/example/**/*.class</exclude>
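
Note 1 (illustration, not part of any patch): patch 2/9 above changes the
converter callback from addBinary(byte[]) to addBinary(parquet.io.Binary),
which lets FieldStringConverter defer UTF-8 decoding until readString() is
actually called. Below is a minimal sketch of a converter written against
that API; CapturingStringConverter and lastValueAsString() are hypothetical
names, only Binary.toStringUsingUTF8() comes from the patches.

    import parquet.io.Binary;
    import parquet.io.convert.PrimitiveConverter;

    // Hypothetical converter: keeps the undecoded bytes and only pays the
    // cost of UTF-8 decoding when the value is actually consumed.
    class CapturingStringConverter extends PrimitiveConverter {

      private Binary lastValue;

      @Override
      public void addBinary(Binary value) {
        this.lastValue = value; // no copy and no charset decoding here
      }

      String lastValueAsString() {
        return lastValue == null ? null : lastValue.toStringUsingUTF8();
      }
    }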
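
Note 2 (illustration, not part of any patch): the Counter wrappers added in
patch 5/9 exist because a TProtocol announces the element count up front
(readListBegin and friends), while the columnar events only reveal the count
once the collection has ended. The converters therefore buffer the element
events, count them, and prepend the "begin" event when the collection
closes. A self-contained, simplified model of that buffering, using String
stand-ins for the protocol events:

    import java.util.ArrayList;
    import java.util.List;

    public class CountThenReplay {

      // Buffer the element events, then emit begin/end around them once
      // the element count is finally known.
      static List<String> replayList(List<String> elementEvents) {
        List<String> protocolEvents = new ArrayList<String>();
        protocolEvents.add("readListBegin(size=" + elementEvents.size() + ")");
        protocolEvents.addAll(elementEvents);
        protocolEvents.add("readListEnd()");
        return protocolEvents;
      }

      public static void main(String[] args) {
        List<String> elements = new ArrayList<String>();
        elements.add("readString() -> \"a\"");
        elements.add("readString() -> \"b\"");
        // Prints the events in the order a TProtocol consumer expects them.
        for (String event : replayList(elements)) {
          System.out.println(event);
        }
      }
    }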
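
Note 3 (illustration, not part of any patch): patch 6/9 replaces the
hand-assembled PageHeader/DataPageHeader pair in both writers with a single
ParquetMetadataConverter.writeDataPageHeader(...) call that also records the
value encoding (the repetition/definition level encodings are still
hardcoded to RLE and BIT_PACKED, per the TODO in newDataPageHeader). A
sketch of a call against the patched class; the sizes and counts are
illustrative only, and the patched parquet classes are assumed to be on the
classpath.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import parquet.column.Encoding;
    import parquet.format.converter.ParquetMetadataConverter;

    public class DataPageHeaderSketch {
      public static void main(String[] args) throws IOException {
        ParquetMetadataConverter converter = new ParquetMetadataConverter();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // One helper call now writes the Thrift-encoded data page header.
        converter.writeDataPageHeader(
            1024,           // uncompressed page size in bytes (illustrative)
            512,            // compressed page size in bytes (illustrative)
            100,            // number of values in the page (illustrative)
            Encoding.PLAIN, // encoding of the page's values
            out);
        System.out.println("header bytes written: " + out.size());
      }
    }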