Spark 3.2: Fix a separate table cache being created for each rewriteFiles (apache#6284)
manuzhang authored Dec 13, 2022
1 parent 548028b commit 4e6af36
Showing 4 changed files with 15 additions and 22 deletions.
@@ -68,6 +68,7 @@
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -96,7 +97,9 @@ public class RewriteDataFilesSparkAction
   private RewriteStrategy strategy = null;

   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
+    super(spark.cloneSession());
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
     this.table = table;
   }

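The hunk above is the core of the fix: the action clones the Spark session once in its constructor and disables Adaptive Query Execution on that clone, instead of each strategy cloning a session per rewriteFiles call. A minimal sketch of that pattern, independent of Iceberg (the class and field names below are illustrative, not from this commit):

```java
// Minimal sketch, not Iceberg code: isolating rewrite-time SQL conf via cloneSession().
// Assumption: a live parent SparkSession is supplied by the caller.
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;

class RewriteScopedSession {
  private final SparkSession rewriteSession;

  RewriteScopedSession(SparkSession parent) {
    // Clone once per action: the clone shares the SparkContext but gets its own copy of the
    // SQL conf, so disabling AQE here does not change the parent session's behavior.
    this.rewriteSession = parent.cloneSession();
    this.rewriteSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
  }

  SparkSession session() {
    return rewriteSession;
  }
}
```

Because cloneSession() gives the clone its own SQL conf while sharing the underlying SparkContext, conf changes made for the rewrite (AQE, shuffle partitions) stay invisible to the caller's session. The remaining files below drop their per-call clones and reuse this one session.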
@@ -34,7 +34,6 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;

 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
@@ -60,12 +59,8 @@ public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
       tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);

-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark.cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
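With the per-call clone gone, SparkBinPackStrategy issues its scan on the session handed to it by the action. A hedged sketch of that read path (the helper class and method name below are illustrative; the option constant and the load-by-groupID shape mirror the diff above):

```java
// Minimal sketch: reading a staged file-scan-task group back through the session that staged it.
// Assumption: "groupID" identifies tasks previously staged by the rewrite action.
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class StagedTaskRead {
  static Dataset<Row> readStagedGroup(SparkSession spark, String groupID) {
    // Every file group now goes through the one session owned by the action, instead of a fresh
    // cloneSession() per call, which (per the commit title) created a separate table cache each time.
    return spark
        .read()
        .format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
        .load(groupID);
  }
}
```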
@@ -119,26 +119,22 @@ public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
       tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);

-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark.cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
       // Reset Shuffle Partitions for our sort
       long numOutputFiles =
           numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
-      cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+      spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));

       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
               .load(groupID);

       // write the packed data into new files where each split becomes a new file
-      SQLConf sqlConf = cloneSession.sessionState().conf();
+      SQLConf sqlConf = spark.sessionState().conf();
       LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
-      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+      Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, scanDF.encoder());

       sortedDf
           .write()
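The sort-based strategy shown above still resizes the shuffle to roughly one partition per expected output file, but now does so on the action's session. A small sketch of that sizing logic under stated assumptions; the real numOutputFiles(...) helper in the strategy base class may round differently, and the targetFileSizeBytes parameter below is illustrative:

```java
// Minimal sketch of the shuffle sizing shown above: roughly one shuffle partition per expected
// output file, never fewer than one. The ceiling division is an assumption, not Iceberg's exact math.
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;

class SortShuffleSizing {
  static void configureForRewrite(
      SparkSession spark, long inputBytes, double sizeEstimateMultiple, long targetFileSizeBytes) {
    long estimatedOutputBytes = (long) (inputBytes * sizeEstimateMultiple);
    long numOutputFiles = (estimatedOutputBytes + targetFileSizeBytes - 1) / targetFileSizeBytes;
    spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
  }
}
```

Since the action already runs on a cloned session (see the first file), overriding spark.sql.shuffle.partitions here remains scoped to the rewrite and does not leak into the user's session.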
@@ -202,17 +202,16 @@ public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
       tableCache().add(groupID, table());
       manager().stageTasks(table(), groupID, filesToRewrite);

-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark().cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+      // spark session from parent
+      SparkSession spark = spark();

       // Reset Shuffle Partitions for our sort
       long numOutputFiles =
           numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
-      cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+      spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));

       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
@@ -235,9 +234,9 @@ public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {

       Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));

-      SQLConf sqlConf = cloneSession.sessionState().conf();
+      SQLConf sqlConf = spark.sessionState().conf();
       LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
-      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder());
+      Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, zvalueDF.encoder());
       sortedDf
           .select(originalColumns)
           .write()
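For context, these rewriteFiles paths are reached through the data-files rewrite action. A typical invocation looks roughly like the sketch below (the result accessor may differ slightly across Iceberg versions, and "spark" and "table" are assumed to be obtained elsewhere):

```java
// Illustrative usage, not part of the commit: triggering the rewrite action that owns the cloned session.
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

class RewriteDataFilesExample {
  static void rewrite(SparkSession spark, Table table) {
    // binPack() exercises SparkBinPackStrategy; sort() or zOrder(...) would hit the other
    // strategies touched by this commit.
    RewriteDataFiles.Result result =
        SparkActions.get(spark).rewriteDataFiles(table).binPack().execute();
    System.out.println("Rewritten file groups: " + result.rewriteResults().size());
  }
}
```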
