Spark: Add compute stats to scan builder also (apache#5136)
bryanck authored Jun 27, 2022
1 parent 35b8558 commit 313f497
Showing 3 changed files with 45 additions and 3 deletions.
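This commit makes SparkScanBuilder implement Spark's SupportsReportStatistics mix-in so that statistics can be obtained from the scan builder itself, before build() has been called. For reference, the Spark 3.x interface being mixed in is essentially the following (paraphrased from org.apache.spark.sql.connector.read.SupportsReportStatistics). Because it extends Scan, an implementor must also provide readSchema(), which is why that override appears in each diff below.

public interface SupportsReportStatistics extends Scan {
  // Estimated statistics (size in bytes, row count) for this scan.
  Statistics estimateStatistics();
}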
File 1 of 3: SparkScanBuilder.java
@@ -39,14 +39,18 @@
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+    SupportsReportStatistics {
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final SparkSession spark;
@@ -167,4 +171,14 @@ public Scan buildMergeScan() {
         spark, table, readConf, caseSensitive, ignoreResiduals,
         schemaWithMetadataColumns(), filterExpressions, options);
   }
+
+  @Override
+  public Statistics estimateStatistics() {
+    return ((SparkBatchScan) build()).estimateStatistics();
+  }
+
+  @Override
+  public StructType readSchema() {
+    return build().readSchema();
+  }
 }
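Both new methods simply delegate to the scan produced by build(), so the reported statistics reflect whatever filters and column pruning have already been pushed into the builder at the time of the call. The same change repeats in the two remaining copies of the class below; the third copy casts to SparkScan rather than SparkBatchScan.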
File 2 of 3: SparkScanBuilder.java
@@ -39,14 +39,18 @@
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+    SupportsReportStatistics {
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final SparkSession spark;
@@ -167,4 +171,14 @@ public Scan buildMergeScan() {
         spark, table, readConf, caseSensitive, ignoreResiduals,
         schemaWithMetadataColumns(), filterExpressions, options);
   }
+
+  @Override
+  public Statistics estimateStatistics() {
+    return ((SparkBatchScan) build()).estimateStatistics();
+  }
+
+  @Override
+  public StructType readSchema() {
+    return build().readSchema();
+  }
 }
File 3 of 3: SparkScanBuilder.java
@@ -44,14 +44,18 @@
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+    SupportsReportStatistics {
 
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final SparkSession spark;
@@ -278,4 +282,14 @@ private TableScan configureSplitPlanning(TableScan scan) {
 
     return configuredScan;
   }
+
+  @Override
+  public Statistics estimateStatistics() {
+    return ((SparkScan) build()).estimateStatistics();
+  }
+
+  @Override
+  public StructType readSchema() {
+    return build().readSchema();
+  }
 }
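With this change, a planner-side caller holding only a ScanBuilder can ask it for statistics directly, falling back to a default estimate when the mix-in is absent. The helper below is a hypothetical sketch, not part of this commit (the class name ScanBuilderStats is invented for illustration); it shows the instanceof dispatch such a caller can perform using only public Spark connector APIs.

import java.util.OptionalLong;

import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical helper (illustration only): probes a freshly created
// ScanBuilder for statistics before the scan itself is built.
final class ScanBuilderStats {
  private ScanBuilderStats() {
  }

  // Returns the builder's estimated scan size in bytes, if reported.
  static OptionalLong estimatedSizeInBytes(SupportsRead table, CaseInsensitiveStringMap options) {
    ScanBuilder builder = table.newScanBuilder(options);
    if (builder instanceof SupportsReportStatistics) {
      Statistics stats = ((SupportsReportStatistics) builder).estimateStatistics();
      return stats.sizeInBytes();
    }
    // Builder does not report statistics; the caller should fall back to a
    // configured default (e.g. spark.sql.defaultSizeInBytes).
    return OptionalLong.empty();
  }
}

Spark performs an equivalent check on the builder when it computes statistics for a DataSourceV2 relation, which is presumably why the mix-in is implemented here on the builder in addition to the built scan.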
