Spark 3.5: Add distributed planning benchmarks (apache#8594)
aokolnychyi authored Sep 20, 2023
1 parent da2ad38 commit 6dee8d6
Showing 1 changed file with 88 additions and 12 deletions.
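The change pairs each existing local-planning benchmark with a distributed variant backed by the new SparkDistributedDataScan, covering every combination of PlanningMode (DISTRIBUTED/DISTRIBUTED, DISTRIBUTED/LOCAL, LOCAL/DISTRIBUTED, LOCAL/LOCAL) for data and delete files. Assuming Iceberg's usual JMH Gradle setup (the module path, benchmark class name, and flags below are an assumption, not shown in this diff), a run would look roughly like:

./gradlew :iceberg-spark:iceberg-spark-3.5_2.12:jmh -PjmhIncludeRegex=PlanningBenchmark -PjmhOutputPath=benchmark/planning-benchmark-result.txt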
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark;

import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.spark.sql.functions.lit;

import com.google.errorprone.annotations.FormatMethod;
@@ -35,16 +37,19 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SparkDistributedDataScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
@@ -128,28 +133,88 @@ public void tearDownBenchmark() {
@Benchmark
@Threads(1)
public void localPlanningWithPartitionAndMinMaxFilter(Blackhole blackhole) {
List<ScanTask> fileTasks = planFilesWithoutColumnStats(PARTITION_AND_SORT_KEY_PREDICATE);
BatchScan scan = table.newBatchScan();
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, PARTITION_AND_SORT_KEY_PREDICATE);
blackhole.consume(fileTasks);
}

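// The distributed variants mirror each local benchmark above but plan through a
// SparkDistributedDataScan built by newDistributedScan(dataMode, deleteMode) below;
// the two arguments pick the PlanningMode for data files and delete files respectively.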
@Benchmark
@Threads(1)
public void distributedPlanningWithPartitionAndMinMaxFilter(Blackhole blackhole) {
BatchScan scan = newDistributedScan(DISTRIBUTED, DISTRIBUTED);
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, PARTITION_AND_SORT_KEY_PREDICATE);
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void localPlanningWithMinMaxFilter(Blackhole blackhole) {
List<ScanTask> fileTasks = planFilesWithoutColumnStats(SORT_KEY_PREDICATE);
BatchScan scan = table.newBatchScan();
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, SORT_KEY_PREDICATE);
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void distributedPlanningWithMinMaxFilter(Blackhole blackhole) {
BatchScan scan = newDistributedScan(DISTRIBUTED, DISTRIBUTED);
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, SORT_KEY_PREDICATE);
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void localPlanningWithoutFilter(Blackhole blackhole) {
List<ScanTask> fileTasks = planFilesWithoutColumnStats(Expressions.alwaysTrue());
BatchScan scan = table.newBatchScan();
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void distributedPlanningWithoutFilter(Blackhole blackhole) {
BatchScan scan = newDistributedScan(DISTRIBUTED, DISTRIBUTED);
List<ScanTask> fileTasks = planFilesWithoutColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

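// The *WithStats variants call planFilesWithColumnStats, so each returned ScanTask
// carries per-column stats from includeColumnStats(); this grows the planning result
// that distributed planning has to send back to the driver.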
@Benchmark
@Threads(1)
public void localPlanningWithoutFilterWithStats(Blackhole blackhole) {
List<ScanTask> fileTasks = planFilesWithColumnStats(Expressions.alwaysTrue());
BatchScan scan = table.newBatchScan();
List<ScanTask> fileTasks = planFilesWithColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void distributedPlanningWithoutFilterWithStats(Blackhole blackhole) {
BatchScan scan = newDistributedScan(DISTRIBUTED, DISTRIBUTED);
List<ScanTask> fileTasks = planFilesWithColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void distributedDataLocalDeletesPlanningWithoutFilterWithStats(Blackhole blackhole) {
BatchScan scan = newDistributedScan(DISTRIBUTED, LOCAL);
List<ScanTask> fileTasks = planFilesWithColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

@Benchmark
@Threads(1)
public void localDataDistributedDeletesPlanningWithoutFilterWithStats(Blackhole blackhole) {
BatchScan scan = newDistributedScan(LOCAL, DISTRIBUTED);
List<ScanTask> fileTasks = planFilesWithColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

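// LOCAL/LOCAL still routes through SparkDistributedDataScan, isolating the overhead
// of the distributed scan wrapper when planning itself stays local.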
@Benchmark
@Threads(1)
public void localPlanningViaDistributedScanWithoutFilterWithStats(Blackhole blackhole) {
BatchScan scan = newDistributedScan(LOCAL, LOCAL);
List<ScanTask> fileTasks = planFilesWithColumnStats(scan, Expressions.alwaysTrue());
blackhole.consume(fileTasks);
}

@@ -158,6 +223,7 @@ private void setupSpark() {
SparkSession.builder()
.config("spark.ui.enabled", false)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.driver.maxResultSize", "8G")
.config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
.config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
@@ -312,30 +378,40 @@ private Dataset<Row> randomDataDF(Schema schema, int numRows) {
return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
}

private List<ScanTask> planFilesWithoutColumnStats(Expression predicate) {
return planFiles(predicate, false);
private List<ScanTask> planFilesWithoutColumnStats(BatchScan scan, Expression predicate) {
return planFiles(scan, predicate, false);
}

private List<ScanTask> planFilesWithColumnStats(Expression predicate) {
return planFiles(predicate, true);
private List<ScanTask> planFilesWithColumnStats(BatchScan scan, Expression predicate) {
return planFiles(scan, predicate, true);
}

private List<ScanTask> planFiles(Expression predicate, boolean withColumnStats) {
private List<ScanTask> planFiles(BatchScan scan, Expression predicate, boolean withColumnStats) {
table.refresh();

BatchScan scan = table.newBatchScan().filter(predicate);
BatchScan configuredScan = scan.filter(predicate);

if (withColumnStats) {
scan.includeColumnStats();
configuredScan = configuredScan.includeColumnStats();
}

try (CloseableIterable<ScanTask> fileTasks = scan.planFiles()) {
try (CloseableIterable<ScanTask> fileTasks = configuredScan.planFiles()) {
return Lists.newArrayList(fileTasks);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

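// Commits the requested planning modes as table properties, then builds a scan that
// can distribute planning as Spark jobs; modeName() supplies the string form the
// properties expect.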
private BatchScan newDistributedScan(PlanningMode dataMode, PlanningMode deleteMode) {
table
.updateProperties()
.set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
.set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
.commit();
SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
return new SparkDistributedDataScan(spark, table, readConf);
}

@FormatMethod
private void sql(@FormatString String query, Object... args) {
spark.sql(String.format(query, args));