[FLINK-27990][table-planner] Parquet format supports reporting statistics

This closes apache#20008
swuferhong authored and godfreyhe committed Jul 14, 2022
1 parent 74f90d7 commit 4bafbe2
Showing 9 changed files with 860 additions and 10 deletions.
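The key change is that the parquet format's decoding format now also implements FileBasedStatisticsReportableInputFormat, so the planner can obtain table statistics from parquet footers without scanning the data. A minimal usage sketch follows; the helper class FormatStatisticsSketch and its method tryReport are illustrative only and not part of this commit.

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;

import java.util.List;

// Illustrative helper: ask a decoding format for statistics if it supports reporting them.
final class FormatStatisticsSketch {
    static TableStats tryReport(Object format, List<Path> files, DataType producedDataType) {
        if (format instanceof FileBasedStatisticsReportableInputFormat) {
            // For parquet this ends up in ParquetBulkDecodingFormat#reportStatistics added below.
            return ((FileBasedStatisticsReportableInputFormat) format)
                    .reportStatistics(files, producedDataType);
        }
        return TableStats.UNKNOWN;
    }
}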
8 changes: 8 additions & 0 deletions flink-formats/flink-parquet/pom.xml
@@ -160,6 +160,14 @@ under the License.

<!-- Tests -->

<!-- Support ParquetFileSystemStatisticsReport using calcite verify plan methods, see FLINK-27990 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
@@ -18,6 +18,7 @@

package org.apache.flink.formats.parquet;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -26,25 +27,59 @@
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.connector.file.table.factories.BulkWriterFormatFactory;
import org.apache.flink.connector.file.table.format.BulkDecodingFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.FileBasedStatisticsReportableInputFormat;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.plan.stats.ColumnStats;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.DateTimeUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigOptions.key;

@@ -111,9 +146,14 @@ public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}

private static class ParquetBulkDecodingFormat
/**
* ParquetBulkDecodingFormat which implements {@link FileBasedStatisticsReportableInputFormat}.
*/
@VisibleForTesting
public static class ParquetBulkDecodingFormat
implements ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
BulkDecodingFormat<RowData> {
BulkDecodingFormat<RowData>,
FileBasedStatisticsReportableInputFormat {

private final ReadableConfig formatOptions;

@@ -142,5 +182,241 @@ public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}

@Override
public TableStats reportStatistics(List<Path> files, DataType producedDataType) {
try {
Configuration hadoopConfig = getParquetConfiguration(formatOptions);
Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();
long rowCount = 0;
for (Path file : files) {
rowCount += updateStatistics(hadoopConfig, file, columnStatisticsMap);
}
Map<String, ColumnStats> columnStatsMap =
convertToColumnStats(columnStatisticsMap, producedRowType);
return new TableStats(rowCount, columnStatsMap);
} catch (Exception e) {
return TableStats.UNKNOWN;
}
}

private Map<String, ColumnStats> convertToColumnStats(
Map<String, Statistics<?>> columnStatisticsMap, RowType producedRowType) {
Map<String, ColumnStats> columnStatMap = new HashMap<>();
for (String column : producedRowType.getFieldNames()) {
Statistics<?> statistics = columnStatisticsMap.get(column);
if (statistics == null) {
continue;
}
ColumnStats columnStats =
convertToColumnStats(
producedRowType.getTypeAt(producedRowType.getFieldIndex(column)),
statistics);
columnStatMap.put(column, columnStats);
}
return columnStatMap;
}

private ColumnStats convertToColumnStats(
LogicalType logicalType, Statistics<?> statistics) {
ColumnStats.Builder builder =
new ColumnStats.Builder().setNullCount(statistics.getNumNulls());

// For complex types (ROW, ARRAY, MAP) the returned statistics carry a wrong null
// count, so statistics for complex types are reported as null for now.
switch (logicalType.getTypeRoot()) {
case BOOLEAN:
case BINARY:
case VARBINARY:
break;
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
if (statistics instanceof IntStatistics) {
builder.setMin(((IntStatistics) statistics).getMin())
.setMax(((IntStatistics) statistics).getMax());
} else if (statistics instanceof LongStatistics) {
builder.setMin(((LongStatistics) statistics).getMin())
.setMax(((LongStatistics) statistics).getMax());
} else {
return null;
}
break;
case DOUBLE:
if (statistics instanceof DoubleStatistics) {
builder.setMin(((DoubleStatistics) statistics).getMin())
.setMax(((DoubleStatistics) statistics).getMax());
break;
} else {
return null;
}
case FLOAT:
if (statistics instanceof FloatStatistics) {
builder.setMin(((FloatStatistics) statistics).getMin())
.setMax(((FloatStatistics) statistics).getMax());
break;
} else {
return null;
}
case DATE:
if (statistics instanceof IntStatistics) {
Date min =
Date.valueOf(
DateTimeUtils.formatDate(
((IntStatistics) statistics).getMin()));
Date max =
Date.valueOf(
DateTimeUtils.formatDate(
((IntStatistics) statistics).getMax()));
builder.setMin(min).setMax(max);
break;
} else {
return null;
}
case TIME_WITHOUT_TIME_ZONE:
if (statistics instanceof IntStatistics) {
Time min =
Time.valueOf(
DateTimeUtils.toLocalTime(
((IntStatistics) statistics).getMin()));
Time max =
Time.valueOf(
DateTimeUtils.toLocalTime(
((IntStatistics) statistics).getMax()));
builder.setMin(min).setMax(max);
break;
} else {
return null;
}
case CHAR:
case VARCHAR:
if (statistics instanceof BinaryStatistics) {
Binary min = ((BinaryStatistics) statistics).genericGetMin();
Binary max = ((BinaryStatistics) statistics).genericGetMax();
if (min != null) {
builder.setMin(min.toStringUsingUTF8());
} else {
builder.setMin(null);
}
if (max != null) {
builder.setMax(max.toStringUsingUTF8());
} else {
builder.setMax(null);
}
break;
} else {
return null;
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
if (statistics instanceof LongStatistics) {
builder.setMin(new Timestamp(((LongStatistics) statistics).getMin()))
.setMax(new Timestamp(((LongStatistics) statistics).getMax()));
} else if (statistics instanceof BinaryStatistics) {
Binary min = ((BinaryStatistics) statistics).genericGetMin();
Binary max = ((BinaryStatistics) statistics).genericGetMax();
if (min != null) {
builder.setMin(binaryToTimestamp(min, formatOptions.get(UTC_TIMEZONE)));
} else {
builder.setMin(null);
}
if (max != null) {
builder.setMax(binaryToTimestamp(max, formatOptions.get(UTC_TIMEZONE)));
} else {
builder.setMax(null);
}
} else {
return null;
}
break;
case DECIMAL:
if (statistics instanceof IntStatistics) {
builder.setMin(BigDecimal.valueOf(((IntStatistics) statistics).getMin()))
.setMax(BigDecimal.valueOf(((IntStatistics) statistics).getMax()));
} else if (statistics instanceof LongStatistics) {
builder.setMin(BigDecimal.valueOf(((LongStatistics) statistics).getMin()))
.setMax(BigDecimal.valueOf(((LongStatistics) statistics).getMax()));
} else if (statistics instanceof BinaryStatistics) {
Binary min = ((BinaryStatistics) statistics).genericGetMin();
Binary max = ((BinaryStatistics) statistics).genericGetMax();
if (min != null) {
builder.setMin(
binaryToDecimal(min, ((DecimalType) logicalType).getScale()));
} else {
builder.setMin(null);
}
if (max != null) {
builder.setMax(
binaryToDecimal(max, ((DecimalType) logicalType).getScale()));
} else {
builder.setMax(null);
}
} else {
return null;
}
break;
default:
return null;
}
return builder.build();
}

private long updateStatistics(
Configuration hadoopConfig,
Path file,
Map<String, Statistics<?>> columnStatisticsMap)
throws IOException {
org.apache.hadoop.fs.Path hadoopPath = new org.apache.hadoop.fs.Path(file.toUri());
ParquetMetadata metadata = ParquetFileReader.readFooter(hadoopConfig, hadoopPath);
MessageType schema = metadata.getFileMetaData().getSchema();
List<String> columns =
schema.asGroupType().getFields().stream()
.map(Type::getName)
.collect(Collectors.toList());
List<BlockMetaData> blocks = metadata.getBlocks();
long rowCount = 0;
for (BlockMetaData block : blocks) {
rowCount += block.getRowCount();
for (int i = 0; i < columns.size(); ++i) {
updateStatistics(
block.getColumns().get(i).getStatistics(),
columns.get(i),
columnStatisticsMap);
}
}
return rowCount;
}

private void updateStatistics(
Statistics<?> statistics,
String column,
Map<String, Statistics<?>> columnStatisticsMap) {
Statistics<?> previousStatistics = columnStatisticsMap.get(column);
if (previousStatistics == null) {
columnStatisticsMap.put(column, statistics);
} else {
previousStatistics.mergeStatistics(statistics);
}
}

private static BigDecimal binaryToDecimal(Binary decimal, int scale) {
BigInteger bigInteger = new BigInteger(decimal.getBytesUnsafe());
return new BigDecimal(bigInteger, scale);
}

private static Timestamp binaryToTimestamp(Binary timestamp, boolean utcTimestamp) {
Preconditions.checkArgument(timestamp.length() == 12, "Must be 12 bytes");
ByteBuffer buf = timestamp.toByteBuffer();
buf.order(ByteOrder.LITTLE_ENDIAN);
long timeOfDayNanos = buf.getLong();
int julianDay = buf.getInt();

TimestampData timestampData =
TimestampColumnReader.int96ToTimestamp(utcTimestamp, timeOfDayNanos, julianDay);
return timestampData.toTimestamp();
}
}
}
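In binaryToDecimal above, parquet's Binary statistics hold the unscaled decimal value as big-endian two's-complement bytes, and the BigDecimal is rebuilt with the scale taken from the logical DECIMAL type. A self-contained sketch of that conversion, using a made-up byte array, is shown below; it is illustrative only and not part of this commit.

import java.math.BigDecimal;
import java.math.BigInteger;

// Illustrative only: decodes an unscaled big-endian two's-complement byte array
// (here 0x04D2 = 1234) with scale 2 into the decimal 12.34, mirroring binaryToDecimal.
final class DecimalDecodeSketch {
    public static void main(String[] args) {
        byte[] unscaledBytes = {0x04, (byte) 0xD2};
        BigDecimal value = new BigDecimal(new BigInteger(unscaledBytes), 2);
        System.out.println(value); // prints 12.34
    }
}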
@@ -95,7 +95,7 @@ public static TimestampData decodeInt96ToTimestamp(
return int96ToTimestamp(utcTimestamp, buffer.getLong(), buffer.getInt());
}

private static TimestampData int96ToTimestamp(
public static TimestampData int96ToTimestamp(
boolean utcTimestamp, long nanosOfDay, int julianDay) {
long millisecond = julianDayToMillis(julianDay) + (nanosOfDay / NANOS_PER_MILLISECOND);

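The hunk above makes int96ToTimestamp public so that ParquetBulkDecodingFormat#binaryToTimestamp can reuse it for INT96 statistics values. A rough standalone sketch of that decoding follows, assuming the standard parquet INT96 layout (an 8-byte little-endian nanos-of-day followed by a 4-byte little-endian Julian day) and the Julian day number 2440588 for 1970-01-01; the real reader additionally honors the utcTimestamp flag, which this sketch ignores.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Timestamp;

// Illustrative only: decode a 12-byte INT96 value into a Timestamp via epoch millis.
final class Int96DecodeSketch {
    static Timestamp decode(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong(); // first 8 bytes: nanoseconds within the day
        int julianDay = buf.getInt();    // last 4 bytes: Julian day number
        long millis = (julianDay - 2440588L) * 86_400_000L + nanosOfDay / 1_000_000L;
        return new Timestamp(millis);
    }
}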