
Commit

More cleanup/renames
toddlipcon committed Mar 6, 2013
1 parent 003299e commit 962dd9e
Showing 17 changed files with 128 additions and 84 deletions.
@@ -20,6 +20,8 @@
  /**
   * contains all the readers for all the columns of the corresponding row group
   *
+ * TODO: rename to RowGroup?
+ *
   * @author Julien Le Dem
   *
   */
@@ -133,7 +133,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
  List<ColumnChunkMetaData> columns = block.getColumns();
  List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
  for (ColumnChunkMetaData columnMetaData : columns) {
- ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPage()); // verify this is the right offset
+ ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
  columnChunk.file_path = null; // same file
  columnChunk.meta_data = new parquet.format.ColumnMetaData(
  getType(columnMetaData.getType()),
@@ -143,7 +143,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
  columnMetaData.getValueCount(),
  columnMetaData.getTotalUncompressedSize(),
  columnMetaData.getTotalSize(),
- columnMetaData.getFirstDataPage()
+ columnMetaData.getFirstDataPageOffset()
  );
  // columnChunk.meta_data.index_page_offset = ;
  // columnChunk.meta_data.key_value_metadata = ; // nothing yet
@@ -245,7 +245,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
  parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
  String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
  ColumnChunkMetaData column = new ColumnChunkMetaData(path, messageType.getType(path).asPrimitiveType().getPrimitiveTypeName(), CompressionCodecName.fromParquet(metaData.codec));
- column.setFirstDataPage(metaData.data_page_offset);
+ column.setFirstDataPageOffset(metaData.data_page_offset);
  column.setValueCount(metaData.num_values);
  column.setTotalUncompressedSize(metaData.total_uncompressed_size);
  column.setTotalSize(metaData.total_compressed_size);
42 changes: 24 additions & 18 deletions parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
@@ -34,7 +34,7 @@
  import parquet.bytes.BytesInput;
  import parquet.hadoop.metadata.CompressionCodecName;

- public class CodecFactory {
+ class CodecFactory {

  public class BytesDecompressor {

@@ -143,34 +143,40 @@ private CompressionCodec getCodec(CompressionCodecName codecName) {
  String codecClassName = codecName.getHadoopCompressionCodecClass();
  if (codecClassName == null) {
  return null;
- } else if (codecByName.containsKey(codecClassName)) {
- return codecByName.get(codecClassName);
- } else {
- try {
- Class<?> codecClass = Class.forName(codecClassName);
- CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
- return codec;
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Class " + codecClassName + " was not found", e);
- }
  }
+ CompressionCodec codec = codecByName.get(codecClassName);
+ if (codec != null) {
+ return codec;
+ }
+
+ try {
+ Class<?> codecClass = Class.forName(codecClassName);
+ codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+ codecByName.put(codecClassName, codec);
+ return codec;
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Class " + codecClassName + " was not found", e);
+ }
  }

  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
- if (!compressors.containsKey(codecName)) {
+ BytesCompressor comp = compressors.get(codecName);
+ if (comp == null) {
  CompressionCodec codec = getCodec(codecName);
- compressors.put(codecName, new BytesCompressor(codecName, codec, pageSize));
+ comp = new BytesCompressor(codecName, codec, pageSize);
+ compressors.put(codecName, comp);
  }
- return compressors.get(codecName);
+ return comp;
  }

  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
- if (!decompressors.containsKey(codecName)) {
+ BytesDecompressor decomp = decompressors.get(codecName);
+ if (decomp == null) {
  CompressionCodec codec = getCodec(codecName);
- decompressors.put(codecName, new BytesDecompressor(codec));
+ decomp = new BytesDecompressor(codec);
+ decompressors.put(codecName, decomp);
  }
- return decompressors.get(codecName);
+ return decomp;
  }

  public void release() {
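The recurring change in CodecFactory (and in ParquetFileReader's footer cache further down) replaces a containsKey/get pair with a single get followed by a null check, so each lookup probes the map once instead of twice. A minimal self-contained sketch of the idiom, using made-up CodecCache/Factory names rather than the actual parquet classes:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for the caching idiom adopted above; not parquet code.
    class CodecCache<K, V> {
      interface Factory<A, B> { B create(A key); }

      private final Map<K, V> cache = new HashMap<K, V>();

      // Before: if (cache.containsKey(key)) return cache.get(key); else create -- two probes.
      // After: probe once, and only create/insert on a miss.
      V getOrCreate(K key, Factory<K, V> factory) {
        V value = cache.get(key);
        if (value != null) {
          return value;
        }
        value = factory.create(key);
        cache.put(key, value);
        return value;
      }
    }

The single-probe form relies on the maps never storing null values, which appears to hold here since a missing codec class throws rather than caching null.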
@@ -18,6 +18,7 @@
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.HashMap;
+ import java.util.LinkedList;
  import java.util.List;
  import java.util.Map;

@@ -28,24 +29,36 @@
  import parquet.column.mem.PageReader;
  import parquet.hadoop.CodecFactory.BytesDecompressor;

-
- public class ColumnChunkPageReadStore implements PageReadStore {
+ /**
+ * TODO: should this actually be called RowGroupImpl or something?
+ * The name is kind of confusing since it references three different "entities"
+ * in our format: columns, chunks, and pages
+ *
+ */
+ class ColumnChunkPageReadStore implements PageReadStore {
  private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class);

+ /**
+ * PageReader for a single column chunk. A column chunk contains
+ * several pages, which are yielded one by one in order.
+ *
+ * This implementation is provided with a list of pages, each of which
+ * is decompressed and passed through.
+ */
  static final class ColumnChunkPageReader implements PageReader {

  private final BytesDecompressor decompressor;
  private final long valueCount;
- private final List<Page> pages = new ArrayList<Page>();
- private int currentPage = 0;
+ private final List<Page> compressedPages;

- ColumnChunkPageReader(BytesDecompressor decompressor, long valueCount) {
+ ColumnChunkPageReader(BytesDecompressor decompressor, List<Page> compressedPages) {
  this.decompressor = decompressor;
- this.valueCount = valueCount;
- }
-
- public void addPage(Page page) {
- pages.add(page);
+ this.compressedPages = new LinkedList<Page>(compressedPages);
+ int count = 0;
+ for (Page p : compressedPages) {
+ count += p.getValueCount();
+ }
+ this.valueCount = count;
  }

  @Override
@@ -55,20 +68,18 @@ public long getTotalValueCount() {

  @Override
  public Page readPage() {
- if (currentPage == pages.size()) {
+ if (compressedPages.isEmpty()) {
  return null;
- } else {
- try {
- Page compressedPage= pages.get(currentPage);
- ++ currentPage;
- return new Page(
- decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
- compressedPage.getValueCount(),
- compressedPage.getUncompressedSize(),
- compressedPage.getEncoding());
- } catch (IOException e) {
- throw new RuntimeException(e); // TODO: cleanup
- }
  }
+ Page compressedPage = compressedPages.remove(0);
+ try {
+ return new Page(
+ decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
+ compressedPage.getValueCount(),
+ compressedPage.getUncompressedSize(),
+ compressedPage.getEncoding());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // TODO: cleanup
+ }
  }
  }
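With this change a ColumnChunkPageReader is constructed with every compressed page of its chunk up front, derives getTotalValueCount() from those pages, and hands pages back in order from readPage() until the list is drained, after which it returns null. A caller can therefore consume a whole chunk by looping until null; the sketch below uses a stand-in interface with just those two methods (hypothetical names, not the actual parquet PageReader API):

    // Stand-in for the two PageReader methods of interest; illustrative only.
    interface ChunkPageSource<P> {
      long getTotalValueCount();
      P readPage(); // null once the column chunk is exhausted
    }

    class DrainChunkExample {
      // Reads every page of one column chunk, relying on the null sentinel to stop.
      static <P> int drain(ChunkPageSource<P> source) {
        int pages = 0;
        P page;
        while ((page = source.readPage()) != null) {
          pages++; // real code would decode the page's values here
        }
        return pages;
      }
    }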
@@ -33,7 +33,7 @@
  import parquet.hadoop.CodecFactory.BytesCompressor;
  import parquet.schema.MessageType;

- public class ColumnChunkPageWriteStore implements PageWriteStore {
+ class ColumnChunkPageWriteStore implements PageWriteStore {

  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

@@ -23,7 +23,7 @@
  * @author Julien Le Dem
  *
  */
- public class ColumnData {
+ class ColumnData {

  private final String[] path;
  private final byte[] repetitionLevels;
2 changes: 1 addition & 1 deletion parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
@@ -27,7 +27,7 @@
  * @author Julien Le Dem
  *
  */
- public class Footer {
+ class Footer {

  private final Path file;

@@ -17,7 +17,7 @@

  import parquet.bytes.BytesInput;

- abstract public class PageConsumer {
+ abstract class PageConsumer {

  abstract public void consumePage(String[] path, int valueCount, BytesInput bytes);

71 changes: 48 additions & 23 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -112,8 +112,9 @@ public static List<Footer> readAllFootersInParallelUsingSummaryFiles(final Confi
  List<Footer> result = new ArrayList<Footer>(partFiles.size());
  List<FileStatus> toRead = new ArrayList<FileStatus>();
  for (FileStatus part : partFiles) {
- if (cache.containsKey(part.getPath())) {
- result.add(cache.get(part.getPath()));
+ Footer f = cache.get(part.getPath());
+ if (f != null) {
+ result.add(f);
  } else {
  toRead.add(part);
  }
@@ -262,6 +263,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File

  private final List<BlockMetaData> blocks;
  private final FSDataInputStream f;
+ private final Path filePath;
  private int currentBlock = 0;
  private Map<String, ColumnDescriptor> paths = new HashMap<String, ColumnDescriptor>();

@@ -275,6 +277,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
  */
  public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
  FileSystem fs = FileSystem.get(configuration);
+ this.filePath = filePath;
  this.f = fs.open(filePath);
  this.blocks = blocks;
  for (ColumnDescriptor col : columns) {
@@ -284,12 +287,11 @@ public ParquetFileReader(Configuration configuration, Path filePath, List<BlockM
  }

  /**
- * reads all the columns requested in the next block
- * @return the block data for the next block
+ * Reads all the columns requested from the row group at the current file position.
  * @throws IOException if an error occurs while reading
- * @return how many records where read or 0 if end reached.
+ * @return the PageReadStore which can provide PageReaders for each column.
  */
- public PageReadStore readColumns() throws IOException {
+ public PageReadStore readNextRowGroup() throws IOException {
  if (currentBlock == blocks.size()) {
  return null;
  }
@@ -300,31 +302,54 @@ public PageReadStore readColumns() throws IOException {
  ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
  for (ColumnChunkMetaData mc : block.getColumns()) {
  String pathKey = Arrays.toString(mc.getPath());
- if (paths.containsKey(pathKey)) {
- ColumnDescriptor columnDescriptor = paths.get(pathKey);
- f.seek(mc.getFirstDataPage());
- if (DEBUG) LOG.debug(f.getPos() + ": start column chunk " + Arrays.toString(mc.getPath()) + " " + mc.getType() + " count=" + mc.getValueCount());
+ ColumnDescriptor columnDescriptor = paths.get(pathKey);
+ if (columnDescriptor != null) {
+ List<Page> pagesInChunk = readColumnChunkPages(columnDescriptor, mc);
  BytesDecompressor decompressor = codecFactory.getDecompressor(mc.getCodec());
- ColumnChunkPageReader columnChunkPageReader = new ColumnChunkPageReader(decompressor, mc.getValueCount());
- long valuesCountReadSoFar = 0;
- while (valuesCountReadSoFar < mc.getValueCount()) {
- PageHeader pageHeader = readNextDataPageHeader();
- columnChunkPageReader.addPage(
- new Page(
- BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)),
- pageHeader.data_page_header.num_values,
- pageHeader.uncompressed_page_size,
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
- ));
- valuesCountReadSoFar += pageHeader.data_page_header.num_values;
- }
+ ColumnChunkPageReader columnChunkPageReader = new ColumnChunkPageReader(decompressor, pagesInChunk);
  columnChunkPageReadStore.addColumn(columnDescriptor, columnChunkPageReader);
  }
  }
  ++currentBlock;
  return columnChunkPageReadStore;
  }

+ /**
+ * Read all of the pages in a given column chunk.
+ * @return the list of pages
+ */
+ private List<Page> readColumnChunkPages(ColumnDescriptor columnDescriptor, ColumnChunkMetaData metadata)
+ throws IOException {
+ f.seek(metadata.getFirstDataPageOffset());
+ if (DEBUG) {
+ LOG.debug(f.getPos() + ": start column chunk " + Arrays.toString(metadata.getPath()) +
+ " " + metadata.getType() + " count=" + metadata.getValueCount());
+ }
+
+ List<Page> pagesInChunk = new ArrayList<Page>();
+ long valuesCountReadSoFar = 0;
+ while (valuesCountReadSoFar < metadata.getValueCount()) {
+ PageHeader pageHeader = readNextDataPageHeader();
+ pagesInChunk.add(
+ new Page(
+ BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)),
+ pageHeader.data_page_header.num_values,
+ pageHeader.uncompressed_page_size,
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ ));
+ valuesCountReadSoFar += pageHeader.data_page_header.num_values;
+ }
+ if (valuesCountReadSoFar != metadata.getValueCount()) {
+ // Would be nice to have a CorruptParquetFileException or something as a subclass?
+ throw new IOException(
+ "Expected " + metadata.getValueCount() + " values in column chunk at " +
+ filePath + " offset " + metadata.getFirstDataPageOffset() +
+ " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ + " pages ending at file offset " + f.getPos());
+ }
+ return pagesInChunk;
+ }
+
  private PageHeader readNextDataPageHeader() throws IOException {
  PageHeader pageHeader;
  do {
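readColumns() is renamed to readNextRowGroup(), which better matches its behaviour: each call decodes the column chunks of one row group into a PageReadStore, advances to the next row group, and returns null once every row group has been read (the ParquetRecordReader change below is the caller-side half of this rename). A hypothetical consumption loop, with RowGroupSource standing in for the reader since only readNextRowGroup() itself appears in this commit:

    import java.io.IOException;

    // Stand-in for the part of ParquetFileReader exercised here; illustrative only.
    interface RowGroupSource<S> {
      S readNextRowGroup() throws IOException; // null when no row groups remain
    }

    class ReadAllRowGroupsExample {
      // Visits every row group in file order until the reader signals the end with null.
      static <S> int countRowGroups(RowGroupSource<S> reader) throws IOException {
        int rowGroups = 0;
        S pages;
        while ((pages = reader.readNextRowGroup()) != null) {
          rowGroups++; // real code would obtain a PageReader per column from the store here
        }
        return rowGroups;
      }
    }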
@@ -170,7 +170,7 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio
  if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
  currentColumn = new ColumnChunkMetaData(descriptor.getPath(), descriptor.getType(), compressionCodecName);
  currentColumn.setValueCount(valueCount);
- currentColumn.setFirstDataPage(out.getPos());
+ currentColumn.setFirstDataPageOffset(out.getPos());
  compressedLength = 0;
  uncompressedLength = 0;
  }
@@ -151,7 +151,7 @@ public int compare(BlockLocation b1, BlockLocation b2) {
  splitGroups.add(new ArrayList<BlockMetaData>());
  }
  for (BlockMetaData block : blocks) {
- final long firstDataPage = block.getColumns().get(0).getFirstDataPage();
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
  int index = Arrays.binarySearch(hdfsBlocks, new BlockLocation() {@Override
  public long getOffset() {
  return firstDataPage;
@@ -97,7 +97,7 @@ private void checkRead() throws IOException {

  LOG.info("at row " + current + ". reading next block");
  long t0 = System.currentTimeMillis();
- PageReadStore pages = reader.readColumns();
+ PageReadStore pages = reader.readNextRowGroup();
  if (pages == null) {
  throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
  }
@@ -76,15 +76,15 @@ public PrimitiveTypeName getType() {
  *
  * @param dataStart offset in the file where data starts
  */
- public void setFirstDataPage(long firstDataPage) {
+ public void setFirstDataPageOffset(long firstDataPage) {
  this.firstDataPage = firstDataPage;
  }

  /**
  *
  * @return start of the column data offset
  */
- public long getFirstDataPage() {
+ public long getFirstDataPageOffset() {
  return firstDataPage;
  }

@@ -64,7 +64,7 @@ public void testBlocksToSplits() throws IOException, InterruptedException {
  private BlockMetaData newBlock(long start) {
  BlockMetaData blockMetaData = new BlockMetaData();
  ColumnChunkMetaData column = new ColumnChunkMetaData(new String[] {"foo"}, PrimitiveTypeName.BINARY, CompressionCodecName.GZIP);
- column.setFirstDataPage(start);
+ column.setFirstDataPageOffset(start);
  blockMetaData.addColumn(column);
  return blockMetaData;
  }
(Diffs for the remaining changed files are not shown here.)
