Skip to content

Commit

Permalink
[FLINK-28075][table-planner] get statistics for partitioned table eve…
Browse files Browse the repository at this point in the history
…n without partition pruning

This closes apache#20248
  • Loading branch information
swuferhong authored and godfreyhe committed Jul 14, 2022
1 parent 4bafbe2 commit ffb6b43
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

package org.apache.flink.table.planner.plan.optimize.program;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.connector.source.DynamicTableSource;
Expand All @@ -43,6 +47,10 @@
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -116,44 +124,66 @@ private TableStats recomputeStatistics(
return reportStatEnabled
? ((SupportsStatisticReport) tableSource).reportStatistics()
: null;
} else {
} else if (partitionPushDownSpec != null) {
// ignore filter push down if all pushdown predicates are also in outer Filter operator
// otherwise the result will be estimated twice.
if (partitionPushDownSpec != null) {
// partition push down
// try to get the statistics for the remaining partitions
TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
// call reportStatistics method if reportStatEnabled is true and the partition
// statistics is unknown
if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
return ((SupportsStatisticReport) tableSource).reportStatistics();
} else {
return newTableStat;
}
// partition push down
// try to get the statistics for the remaining partitions
TableStats newTableStat = getPartitionsTableStats(table, partitionPushDownSpec);
// call reportStatistics method if reportStatEnabled is true and the partition
// statistics is unknown
if (reportStatEnabled && isUnknownTableStats(newTableStat)) {
return ((SupportsStatisticReport) tableSource).reportStatistics();
} else {
// call reportStatistics method if reportStatEnabled is true and the original
// catalog statistics is unknown
if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
return ((SupportsStatisticReport) tableSource).reportStatistics();
} else {
return origTableStats;
}
return newTableStat;
}
} else {
if (isPartitionedTable(table) && isUnknownTableStats(origTableStats)) {
// if table is partition table, try to recompute stats by catalog.
origTableStats = getPartitionsTableStats(table, null);
}
// call reportStatistics method if reportStatEnabled is true and the newTableStats is
// unknown.
if (reportStatEnabled && isUnknownTableStats(origTableStats)) {
return ((SupportsStatisticReport) tableSource).reportStatistics();
} else {
return origTableStats;
}
}
}

private boolean isPartitionedTable(TableSourceTable table) {
return table.contextResolvedTable()
.<ResolvedCatalogTable>getResolvedTable()
.isPartitioned();
}

private boolean isUnknownTableStats(TableStats stats) {
return stats == null || stats.getRowCount() < 0 && stats.getColumnStats().isEmpty();
}

private TableStats getPartitionsTableStats(
TableSourceTable table, PartitionPushDownSpec partitionPushDownSpec) {
TableSourceTable table, @Nullable PartitionPushDownSpec partitionPushDownSpec) {
TableStats newTableStat = null;
if (table.contextResolvedTable().isPermanent()) {
ObjectIdentifier identifier = table.contextResolvedTable().getIdentifier();
ObjectPath tablePath = identifier.toObjectPath();
Catalog catalog = table.contextResolvedTable().getCatalog().get();
for (Map<String, String> partition : partitionPushDownSpec.getPartitions()) {
List<Map<String, String>> partitionList = new ArrayList<>();
if (partitionPushDownSpec == null) {
try {
List<CatalogPartitionSpec> catalogPartitionSpecs =
catalog.listPartitions(tablePath);
for (CatalogPartitionSpec partitionSpec : catalogPartitionSpecs) {
partitionList.add(partitionSpec.getPartitionSpec());
}
} catch (TableNotExistException | TableNotPartitionedException e) {
throw new TableException("Table not exists!", e);
}
} else {
partitionList = partitionPushDownSpec.getPartitions();
}
for (Map<String, String> partition : partitionList) {
Optional<TableStats> partitionStats =
getPartitionStats(catalog, tablePath, partition);
if (!partitionStats.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public void testNoPartitionPushDownAndCatalogStatisticsExist()
false);

FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable");
// TODO get partition statistics from catalog
assertThat(statistic.getTableStats()).isEqualTo(new TableStats(3));
assertThat(statistic.getTableStats()).isEqualTo(new TableStats(9));
}

@Test
Expand All @@ -219,7 +218,6 @@ public void testNoPartitionPushDownAndReportStatisticsDisabled() {
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED,
false);
FlinkStatistic statistic = getStatisticsFromOptimizedPlan("select * from PartTable");
// TODO get partition statistics from catalog
assertThat(statistic.getTableStats()).isEqualTo(TableStats.UNKNOWN);
}

Expand Down

0 comments on commit ffb6b43

Please sign in to comment.