Convert BlockMetadata into a record
raunaqmorarka committed Jul 1, 2024
1 parent b983eee commit 6d0c9ac
Showing 15 changed files with 49 additions and 139 deletions.
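
In short: BlockMetadata changes from a mutable JavaBean-style class, populated through setters and addColumn(), into an immutable Java record constructed in one shot. The totalByteSize, path, ordinal, and rowIndexOffset state is dropped entirely along with its accessors, which accounts for most of the 139 deleted lines. A condensed before/after sketch (abridged from the diff below; method bodies compacted for brevity):

```java
// Before: mutable bean, filled in field by field (abridged)
public class BlockMetadata
{
    private final List<ColumnChunkMetadata> columns = new ArrayList<>();
    private long rowCount;

    public void setRowCount(long rowCount) { this.rowCount = rowCount; }
    public long getRowCount() { return rowCount; }
    public void addColumn(ColumnChunkMetadata column) { columns.add(column); }
    public List<ColumnChunkMetadata> getColumns() { return Collections.unmodifiableList(columns); }
}

// After: immutable record, all state supplied at construction
public record BlockMetadata(long rowCount, List<ColumnChunkMetadata> columns)
{
    public long getStartingPos()
    {
        return columns().getFirst().getStartingPos();
    }
}
```
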
@@ -56,7 +56,7 @@ public BloomFilterStore(ParquetDataSource dataSource, BlockMetadata block, Set<C
         requireNonNull(columnsFiltered, "columnsFiltered is null");

         ImmutableMap.Builder<ColumnPath, Long> bloomFilterOffsetBuilder = ImmutableMap.builder();
-        for (ColumnChunkMetadata column : block.getColumns()) {
+        for (ColumnChunkMetadata column : block.columns()) {
             ColumnPath path = column.getPath();
             if (hasBloomFilter(column) && columnsFiltered.contains(path)) {
                 bloomFilterOffsetBuilder.put(path, column.getBloomFilterOffset());
@@ -106,7 +106,7 @@ public static Optional<BloomFilterStore> getBloomFilterStore(
             return Optional.empty();
         }

-        boolean hasBloomFilter = blockMetadata.getColumns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
+        boolean hasBloomFilter = blockMetadata.columns().stream().anyMatch(BloomFilterStore::hasBloomFilter);
         if (!hasBloomFilter) {
             return Optional.empty();
         }

@@ -138,14 +138,14 @@ public void validateBlocksMetadata(ParquetDataSourceId dataSourceId, List<RowGro
             BlockMetadata block = rowGroupInfos.get(rowGroupIndex).blockMetaData();
             RowGroup rowGroup = rowGroups.get(rowGroupIndex);
             validateParquet(
-                    block.getRowCount() == rowGroup.getNum_rows(),
+                    block.rowCount() == rowGroup.getNum_rows(),
                     dataSourceId,
                     "Number of rows %d in row group %d did not match %d",
-                    block.getRowCount(),
+                    block.rowCount(),
                     rowGroupIndex,
                     rowGroup.getNum_rows());

-            List<ColumnChunkMetadata> columnChunkMetaData = block.getColumns();
+            List<ColumnChunkMetadata> columnChunkMetaData = block.columns();
             validateParquet(
                     columnChunkMetaData.size() == rowGroup.getColumnsSize(),
                     dataSourceId,
@@ -358,7 +358,7 @@ public WriteChecksum build()
         public void validateRowGroupStatistics(ParquetDataSourceId dataSourceId, BlockMetadata blockMetaData, List<ColumnStatistics> actualColumnStatistics)
                 throws ParquetCorruptionException
         {
-            List<ColumnChunkMetadata> columnChunks = blockMetaData.getColumns();
+            List<ColumnChunkMetadata> columnChunks = blockMetaData.columns();
             checkArgument(
                     columnChunks.size() == actualColumnStatistics.size(),
                     "Column chunk metadata count %s did not match column fields count %s",

@@ -13,100 +13,12 @@
  */
 package io.trino.parquet.metadata;

-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;

-public class BlockMetadata
+public record BlockMetadata(long rowCount, List<ColumnChunkMetadata> columns)
 {
-    private final List<ColumnChunkMetadata> columns = new ArrayList<>();
-    private long rowCount;
-    private long totalByteSize;
-    private String path;
-    private int ordinal;
-    private long rowIndexOffset = -1;
-
-    public void setPath(String path)
-    {
-        this.path = path;
-    }
-
-    public String getPath()
-    {
-        return path;
-    }
-
-    public long getRowCount()
-    {
-        return rowCount;
-    }
-
-    public void setRowCount(long rowCount)
-    {
-        this.rowCount = rowCount;
-    }
-
-    public long getRowIndexOffset()
-    {
-        return rowIndexOffset;
-    }
-
-    public void setRowIndexOffset(long rowIndexOffset)
-    {
-        this.rowIndexOffset = rowIndexOffset;
-    }
-
-    public long getTotalByteSize()
-    {
-        return totalByteSize;
-    }
-
-    public void setTotalByteSize(long totalByteSize)
-    {
-        this.totalByteSize = totalByteSize;
-    }
-
-    public void addColumn(ColumnChunkMetadata column)
-    {
-        columns.add(column);
-    }
-
-    public List<ColumnChunkMetadata> getColumns()
-    {
-        return Collections.unmodifiableList(columns);
-    }
-
     public long getStartingPos()
     {
-        return getColumns().getFirst().getStartingPos();
-    }
-
-    @Override
-    public String toString()
-    {
-        String rowIndexOffsetString = "";
-        if (rowIndexOffset != -1) {
-            rowIndexOffsetString = ", rowIndexOffset = " + rowIndexOffset;
-        }
-        return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetString + " " + columns + "}";
-    }
-
-    public long getCompressedSize()
-    {
-        long totalSize = 0;
-        for (ColumnChunkMetadata col : getColumns()) {
-            totalSize += col.getTotalSize();
-        }
-        return totalSize;
-    }
-
-    public int getOrdinal()
-    {
-        return ordinal;
-    }
-
-    public void setOrdinal(int ordinal)
-    {
-        this.ordinal = ordinal;
+        return columns().getFirst().getStartingPos();
     }
 }

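The call-site changes in the remaining files follow mechanically from this: record component accessors replace the bean-style getters. A quick reference sketch (variable names invented for illustration; the constructor call matches the reader metadata diff below):

```java
BlockMetadata block = new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build());
long rows = block.rowCount();                        // was block.getRowCount()
List<ColumnChunkMetadata> columns = block.columns(); // was block.getColumns()
long firstDataOffset = block.getStartingPos();       // explicit method, unchanged for callers
```
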
@@ -141,7 +141,7 @@ public static boolean predicateMatches(
             int domainCompactionThreshold)
             throws IOException
     {
-        if (block.getRowCount() == 0) {
+        if (block.rowCount() == 0) {
             return false;
         }
         Map<ColumnDescriptor, Statistics<?>> columnStatistics = getStatistics(block, descriptorsByPath);
@@ -192,7 +192,7 @@ public static List<RowGroupInfo> getFilteredRowGroups(
         long fileRowCount = 0;
         ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
         for (BlockMetadata block : blocksMetaData) {
-            long blockStart = block.getColumns().getFirst().getStartingPos();
+            long blockStart = block.getStartingPos();
             boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength;
             if (splitContainsBlock) {
                 for (int i = 0; i < parquetTupleDomains.size(); i++) {
@@ -215,15 +215,15 @@
                     }
                 }
             }
-            fileRowCount += block.getRowCount();
+            fileRowCount += block.rowCount();
         }
         return rowGroupInfoBuilder.build();
     }

     private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetadata blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
     {
         ImmutableMap.Builder<ColumnDescriptor, Statistics<?>> statistics = ImmutableMap.builder();
-        for (ColumnChunkMetadata columnMetaData : blockMetadata.getColumns()) {
+        for (ColumnChunkMetadata columnMetaData : blockMetadata.columns()) {
             Statistics<?> columnStatistics = columnMetaData.getStatistics();
             if (columnStatistics != null) {
                 ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
@@ -238,7 +238,7 @@ private static Map<ColumnDescriptor, Statistics<?>> getStatistics(BlockMetadata
     private static Map<ColumnDescriptor, Long> getColumnValueCounts(BlockMetadata blockMetadata, Map<List<String>, ColumnDescriptor> descriptorsByPath)
     {
         ImmutableMap.Builder<ColumnDescriptor, Long> columnValueCounts = ImmutableMap.builder();
-        for (ColumnChunkMetadata columnMetaData : blockMetadata.getColumns()) {
+        for (ColumnChunkMetadata columnMetaData : blockMetadata.columns()) {
             ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
             if (descriptor != null) {
                 columnValueCounts.put(descriptor, columnMetaData.getValueCount());
@@ -256,7 +256,7 @@ private static boolean dictionaryPredicatesMatch(
             Optional<ColumnIndexStore> columnIndexStore)
             throws IOException
     {
-        for (ColumnChunkMetadata columnMetaData : blockMetadata.getColumns()) {
+        for (ColumnChunkMetadata columnMetaData : blockMetadata.columns()) {
             ColumnDescriptor descriptor = descriptorsByPath.get(Arrays.asList(columnMetaData.getPath().toArray()));
             if (descriptor == null || !candidateColumns.contains(descriptor)) {
                 continue;

@@ -13,6 +13,7 @@
  */
 package io.trino.parquet.reader;

+import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
@@ -134,12 +135,10 @@ public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, P
         List<RowGroup> rowGroups = fileMetaData.getRow_groups();
         if (rowGroups != null) {
             for (RowGroup rowGroup : rowGroups) {
-                BlockMetadata blockMetaData = new BlockMetadata();
-                blockMetaData.setRowCount(rowGroup.getNum_rows());
-                blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
                 List<ColumnChunk> columns = rowGroup.getColumns();
                 validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup);
                 String filePath = columns.get(0).getFile_path();
+                ImmutableList.Builder<ColumnChunkMetadata> columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size());
                 for (ColumnChunk columnChunk : columns) {
                     validateParquet(
                             (filePath == null && columnChunk.getFile_path() == null)
@@ -167,10 +166,9 @@ public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, P
                     column.setColumnIndexReference(toColumnIndexReference(columnChunk));
                     column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
                     column.setBloomFilterOffset(metaData.bloom_filter_offset);
-                    blockMetaData.addColumn(column);
+                    columnMetadataBuilder.add(column);
                 }
-                blockMetaData.setPath(filePath);
-                blocks.add(blockMetaData);
+                blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build()));
             }
         }

@@ -183,7 +183,7 @@ public ParquetReader(
                 int columnId = field.getId();
                 ColumnChunkMetadata chunkMetadata = getColumnChunkMetaData(metadata, field.getDescriptor());
                 ColumnPath columnPath = chunkMetadata.getPath();
-                long rowGroupRowCount = metadata.getRowCount();
+                long rowGroupRowCount = metadata.rowCount();
                 long startingPosition = chunkMetadata.getStartingPos();
                 long totalLength = chunkMetadata.getTotalSize();
                 long totalDataSize = 0;
@@ -299,7 +299,7 @@ private boolean advanceToNextRowGroup()
         RowGroupInfo rowGroupInfo = rowGroups.get(currentRowGroup);
         currentBlockMetadata = rowGroupInfo.blockMetaData();
         firstRowIndexInGroup = rowGroupInfo.fileRowOffset();
-        currentGroupRowCount = currentBlockMetadata.getRowCount();
+        currentGroupRowCount = currentBlockMetadata.rowCount();
         FilteredRowRanges currentGroupRowRanges = blockRowRanges[currentRowGroup];
         log.debug("advanceToNextRowGroup dataSource %s, currentRowGroup %d, rowRanges %s, currentBlockMetadata %s", dataSource.getId(), currentRowGroup, currentGroupRowRanges, currentBlockMetadata);
         if (currentGroupRowRanges != null) {
@@ -448,12 +448,12 @@ private ColumnChunk readPrimitive(PrimitiveField field)
         int fieldId = field.getId();
         ColumnReader columnReader = columnReaders.get(fieldId);
         if (!columnReader.hasPageReader()) {
-            validateParquet(currentBlockMetadata.getRowCount() > 0, dataSource.getId(), "Row group has 0 rows");
+            validateParquet(currentBlockMetadata.rowCount() > 0, dataSource.getId(), "Row group has 0 rows");
             ColumnChunkMetadata metadata = getColumnChunkMetaData(currentBlockMetadata, columnDescriptor);
             FilteredRowRanges rowRanges = blockRowRanges[currentRowGroup];
             OffsetIndex offsetIndex = null;
             if (rowRanges != null) {
-                offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.getRowCount(), metadata.getPath());
+                offsetIndex = getFilteredOffsetIndex(rowRanges, currentRowGroup, currentBlockMetadata.rowCount(), metadata.getPath());
             }
             ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
             columnReader.setPageReader(
@@ -493,7 +493,7 @@ public Metrics getMetrics()
     private ColumnChunkMetadata getColumnChunkMetaData(BlockMetadata blockMetaData, ColumnDescriptor columnDescriptor)
             throws IOException
     {
-        for (ColumnChunkMetadata metadata : blockMetaData.getColumns()) {
+        for (ColumnChunkMetadata metadata : blockMetaData.columns()) {
             // Column paths for nested structures have common root, so we compare in reverse to find mismatch sooner
             if (arrayEqualsReversed(metadata.getPath().toArray(), columnDescriptor.getPath())) {
                 return metadata;
@@ -585,7 +585,7 @@ private static FilteredRowRanges[] calculateFilteredRowRanges(
                 continue;
             }
             BlockMetadata metadata = rowGroupInfo.blockMetaData();
-            long rowGroupRowCount = metadata.getRowCount();
+            long rowGroupRowCount = metadata.rowCount();
             FilteredRowRanges rowRanges = new FilteredRowRanges(ColumnIndexFilter.calculateRowRanges(
                     FilterCompat.get(filter.get()),
                     rowGroupColumnIndexStore.get(),

@@ -84,7 +84,7 @@ public TrinoColumnIndexStore(

         ImmutableList.Builder<ColumnIndexMetadata> columnIndexBuilder = ImmutableList.builderWithExpectedSize(columnsFiltered.size());
         ImmutableList.Builder<ColumnIndexMetadata> offsetIndexBuilder = ImmutableList.builderWithExpectedSize(columnsRead.size());
-        for (ColumnChunkMetadata column : block.getColumns()) {
+        for (ColumnChunkMetadata column : block.columns()) {
             ColumnPath path = column.getPath();
             if (column.getColumnIndexReference() != null && columnsFiltered.contains(path)) {
                 columnIndexBuilder.add(new ColumnIndexMetadata(
@@ -149,7 +149,7 @@ public static Optional<ColumnIndexStore> getColumnIndexStore(
         }

         boolean hasColumnIndex = false;
-        for (ColumnChunkMetadata column : blockMetadata.getColumns()) {
+        for (ColumnChunkMetadata column : blockMetadata.columns()) {
             if (column.getColumnIndexReference() != null && column.getOffsetIndexReference() != null) {
                 hasColumnIndex = true;
                 break;

@@ -274,7 +274,7 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada
         ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
         for (BlockMetadata block : parquetMetadata.getBlocks()) {
             rowGroupInfoBuilder.add(new RowGroupInfo(block, nextStart, Optional.empty()));
-            nextStart += block.getRowCount();
+            nextStart += block.rowCount();
         }
         return new ParquetReader(
                 Optional.ofNullable(fileMetaData.getCreatedBy()),

@@ -83,9 +83,9 @@ public void testColumnReaderMemoryUsage()
         assertThat(parquetMetadata.getBlocks().size()).isGreaterThan(1);
         // Verify file has only non-dictionary encodings as dictionary memory usage is already tested in TestFlatColumnReader#testMemoryUsage
         parquetMetadata.getBlocks().forEach(block -> {
-            block.getColumns()
+            block.columns()
                     .forEach(columnChunkMetaData -> assertThat(columnChunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isFalse());
-            assertThat(block.getRowCount()).isEqualTo(100);
+            assertThat(block.rowCount()).isEqualTo(100);
         });

         AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
@@ -105,7 +105,7 @@ public void testColumnReaderMemoryUsage()
         assertThat(currentMemoryUsage).isGreaterThan(initialMemoryUsage);

         // Memory usage does not change until next row group (1 page per row-group)
-        long rowGroupRowCount = parquetMetadata.getBlocks().get(0).getRowCount();
+        long rowGroupRowCount = parquetMetadata.getBlocks().get(0).rowCount();
         int rowsRead = page.getPositionCount();
         while (rowsRead < rowGroupRowCount) {
             rowsRead += reader.nextPage().getPositionCount();
@@ -153,7 +153,7 @@ public void testEmptyRowRangesWithColumnIndex()
         assertThat(metrics).containsKey(COLUMN_INDEX_ROWS_FILTERED);
         // Column index should filter at least the first row group
         assertThat(((Count<?>) metrics.get(COLUMN_INDEX_ROWS_FILTERED)).getTotal())
-                .isGreaterThanOrEqualTo(parquetMetadata.getBlocks().get(0).getRowCount());
+                .isGreaterThanOrEqualTo(parquetMetadata.getBlocks().get(0).rowCount());
     }
 }