Spark 3.5: Never lower advisory partition size (apache#8667)
aokolnychyi authored Sep 27, 2023
1 parent a1966fe commit 8547faf
Showing 2 changed files with 26 additions and 1 deletion.
File 1 of 2:

@@ -697,7 +697,12 @@ private long advisoryPartitionSize(long defaultValue) {
   private long advisoryPartitionSize(
       long expectedFileSize, FileFormat outputFileFormat, String outputCodec) {
     double shuffleCompressionRatio = shuffleCompressionRatio(outputFileFormat, outputCodec);
-    return (long) (expectedFileSize * shuffleCompressionRatio);
+    long suggestedAdvisoryPartitionSize = (long) (expectedFileSize * shuffleCompressionRatio);
+    return Math.max(suggestedAdvisoryPartitionSize, sparkAdvisoryPartitionSize());
   }
+
+  private long sparkAdvisoryPartitionSize() {
+    return (long) spark.sessionState().conf().getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES());
+  }
 
   private double shuffleCompressionRatio(FileFormat outputFileFormat, String outputCodec) {
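
The hunk above makes Spark's own advisory partition size (spark.sql.adaptive.advisoryPartitionSizeInBytes, 64 MB by default) a floor for the value Iceberg reports, so adaptive query execution never coalesces shuffle partitions below what Spark itself would advise. A standalone Java sketch of the effect with worked numbers — the class, constant, and parameter names here are illustrative only, not Iceberg code:

// Illustrative sketch: the advisory size handed to Spark is the max of
// Iceberg's estimate and Spark's own advisory partition size.
public class AdvisorySizeSketch {
  // Assumed Spark default for spark.sql.adaptive.advisoryPartitionSizeInBytes.
  static final long SPARK_ADVISORY_SIZE = 64L * 1024 * 1024; // 64 MB

  static long advisoryPartitionSize(long expectedFileSize, double shuffleCompressionRatio) {
    long suggested = (long) (expectedFileSize * shuffleCompressionRatio);
    return Math.max(suggested, SPARK_ADVISORY_SIZE); // never lower than Spark's value
  }

  public static void main(String[] args) {
    // 128 MB target file size with an assumed 2x shuffle compression ratio
    // yields a 256 MB suggestion, which wins over Spark's 64 MB default.
    System.out.println(advisoryPartitionSize(128L * 1024 * 1024, 2.0)); // 268435456

    // A 16 MB suggestion is clamped up to Spark's 64 MB advisory size.
    System.out.println(advisoryPartitionSize(16L * 1024 * 1024, 1.0)); // 67108864
  }
}
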
File 2 of 2:

@@ -44,6 +44,7 @@
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,7 @@
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -74,6 +76,24 @@ public void after() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testAdvisoryPartitionSize() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+
+    long value1 = writeConf.writeRequirements().advisoryPartitionSize();
+    assertThat(value1).isGreaterThan(64L * 1024 * 1024).isLessThan(2L * 1024 * 1024 * 1024);
+
+    spark.conf().set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "2GB");
+    long value2 = writeConf.writeRequirements().advisoryPartitionSize();
+    assertThat(value2).isEqualTo(2L * 1024 * 1024 * 1024);
+
+    spark.conf().set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "10MB");
+    long value3 = writeConf.writeRequirements().advisoryPartitionSize();
+    assertThat(value3).isGreaterThan(10L * 1024 * 1024);
+  }
+
   @Test
   public void testSparkWriteConfDistributionDefault() {
     Table table = validationCatalog.loadTable(tableIdent);
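
A note on the three assertions in the new test: value1 exercises the default path, where Iceberg's estimate already exceeds Spark's 64 MB default; value2 shows a larger Spark setting (2GB) winning through Math.max; and value3 only asserts isGreaterThan(10 MB) because lowering Spark's advisory size below Iceberg's own suggestion has no effect — the larger suggestion still wins.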