Spark: Fix vectorization flags (apache#2248)
aokolnychyi authored Mar 6, 2021
1 parent 030144e commit 5099e1e
Showing 3 changed files with 88 additions and 21 deletions.
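For context, a minimal sketch of the precedence this commit establishes when resolving whether vectorized (batch) reads are used: an explicit read option wins, then the session configuration, then the table property, falling back to the format default. The option and property strings below ("vectorization-enabled", "read.parquet.vectorization.enabled") are assumed to match SparkReadOptions.VECTORIZATION_ENABLED and TableProperties.PARQUET_VECTORIZATION_ENABLED; the table name and session setup are hypothetical.

// Sketch only: illustrates the read option > session conf > table property precedence.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class VectorizationFlagExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-vectorization-flags")
        .master("local[*]")  // hypothetical local session
        .getOrCreate();

    // 3. Lowest precedence: the Parquet table property (ORC has no property yet, per the TODO).
    spark.sql("ALTER TABLE db.table SET TBLPROPERTIES "
        + "('read.parquet.vectorization.enabled'='true')");

    // 2. Next: the session configuration checked by the new helpers.
    spark.conf().set("spark.sql.iceberg.vectorization.enabled", "false");

    // 1. Highest precedence: the per-read option now wins over the session conf.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("vectorization-enabled", "true")  // SparkReadOptions.VECTORIZATION_ENABLED
        .load("db.table");  // hypothetical table

    df.show();
  }
}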
52 changes: 41 additions & 11 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -54,6 +54,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
@@ -81,6 +82,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private final Table table;
private final DataSourceOptions options;
private final Long snapshotId;
private final Long startSnapshotId;
private final Long endSnapshotId;
@@ -95,7 +97,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
private final boolean localityPreferred;
private final boolean batchReadsEnabled;
private final int batchSize;

// lazy variables
@@ -107,6 +108,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
Reader(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
boolean caseSensitive, DataSourceOptions options) {
this.table = table;
this.options = options;
this.snapshotId = options.get(SparkReadOptions.SNAPSHOT_ID).map(Long::parseLong).orElse(null);
this.asOfTimestamp = options.get(SparkReadOptions.AS_OF_TIMESTAMP).map(Long::parseLong).orElse(null);
if (snapshotId != null && asOfTimestamp != null) {
@@ -157,16 +159,6 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.io = io;
this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
String batchReadsSessionConf = SparkSession.active().conf()
.get("spark.sql.iceberg.vectorization.enabled", null);
if (batchReadsSessionConf != null) {
this.batchReadsEnabled = Boolean.valueOf(batchReadsSessionConf);
} else {
this.batchReadsEnabled = options.get(SparkReadOptions.VECTORIZATION_ENABLED)
.map(Boolean::parseBoolean).orElseGet(() ->
PropertyUtil.propertyAsBoolean(table.properties(), TableProperties.PARQUET_VECTORIZATION_ENABLED,
TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT));
}
this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
PropertyUtil.propertyAsInt(table.properties(),
TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
@@ -338,12 +330,50 @@ public boolean enableBatchRead() {

boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);

boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
}

private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {
if (isParquetOnly) {
return isVectorizationEnabled(FileFormat.PARQUET);
} else if (isOrcOnly) {
return isVectorizationEnabled(FileFormat.ORC);
} else {
return false;
}
}

public boolean isVectorizationEnabled(FileFormat fileFormat) {
String readOptionValue = options.get(SparkReadOptions.VECTORIZATION_ENABLED).orElse(null);
if (readOptionValue != null) {
return Boolean.parseBoolean(readOptionValue);
}

RuntimeConfig sessionConf = SparkSession.active().conf();
String sessionConfValue = sessionConf.get("spark.sql.iceberg.vectorization.enabled", null);
if (sessionConfValue != null) {
return Boolean.parseBoolean(sessionConfValue);
}

switch (fileFormat) {
case PARQUET:
return PropertyUtil.propertyAsBoolean(
table.properties(),
TableProperties.PARQUET_VECTORIZATION_ENABLED,
TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
case ORC:
// TODO: add a table property to enable/disable ORC vectorized reads
return false;
default:
return false;
}
}

private static void mergeIcebergHadoopConfs(
Configuration baseConf, Map<String, String> options) {
options.keySet().stream()
36 changes: 28 additions & 8 deletions spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -57,6 +58,7 @@
import org.apache.iceberg.util.SortOrderUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
@@ -474,15 +476,33 @@ public static boolean isLocalityEnabled(FileIO io, String location, CaseInsensit
return false;
}

public static boolean isVectorizationEnabled(Map<String, String> properties, CaseInsensitiveStringMap readOptions) {
String batchReadsSessionConf = SparkSession.active().conf()
.get("spark.sql.iceberg.vectorization.enabled", null);
if (batchReadsSessionConf != null) {
return Boolean.valueOf(batchReadsSessionConf);
public static boolean isVectorizationEnabled(FileFormat fileFormat,
Map<String, String> properties,
RuntimeConfig sessionConf,
CaseInsensitiveStringMap readOptions) {

String readOptionValue = readOptions.get(SparkReadOptions.VECTORIZATION_ENABLED);
if (readOptionValue != null) {
return Boolean.parseBoolean(readOptionValue);
}

String sessionConfValue = sessionConf.get("spark.sql.iceberg.vectorization.enabled", null);
if (sessionConfValue != null) {
return Boolean.parseBoolean(sessionConfValue);
}

switch (fileFormat) {
case PARQUET:
return PropertyUtil.propertyAsBoolean(
properties,
TableProperties.PARQUET_VECTORIZATION_ENABLED,
TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT);
case ORC:
// TODO: add a table property to enable/disable ORC vectorized reads
return false;
default:
return false;
}
return readOptions.getBoolean(SparkReadOptions.VECTORIZATION_ENABLED,
PropertyUtil.propertyAsBoolean(properties,
TableProperties.PARQUET_VECTORIZATION_ENABLED, TableProperties.PARQUET_VECTORIZATION_ENABLED_DEFAULT));
}

public static int batchSize(Map<String, String> properties, CaseInsensitiveStringMap readOptions) {
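The Spark 3 helper now takes the file format and session conf explicitly. A minimal sketch of calling it directly, assuming an already-loaded Iceberg Table and the read options passed to the scan (the wrapper class and method names here are hypothetical):

import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class ResolveVectorization {
  // Resolves the flag for a Parquet-only scan using the new signature:
  // read option > session conf > table property > format default.
  static boolean resolveForParquet(Table table, CaseInsensitiveStringMap readOptions) {
    Map<String, String> properties = table.properties();
    RuntimeConfig sessionConf = SparkSession.active().conf();
    return Spark3Util.isVectorizationEnabled(FileFormat.PARQUET, properties, sessionConf, readOptions);
  }
}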
21 changes: 19 additions & 2 deletions spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
@@ -42,6 +43,8 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
@@ -66,8 +69,8 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private final List<Expression> filterExpressions;
private final Broadcast<FileIO> io;
private final Broadcast<EncryptionManager> encryptionManager;
private final boolean batchReadsEnabled;
private final int batchSize;
private final CaseInsensitiveStringMap options;

// lazy variables
private StructType readSchema = null;
@@ -82,8 +85,8 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters : Collections.emptyList();
this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options);
this.batchReadsEnabled = Spark3Util.isVectorizationEnabled(table.properties(), options);
this.batchSize = Spark3Util.batchSize(table.properties(), options);
this.options = options;
}

protected Table table() {
@@ -156,12 +159,26 @@ public PartitionReaderFactory createReaderFactory() {

boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);

boolean batchReadsEnabled = batchReadsEnabled(allParquetFileScanTasks, allOrcFileScanTasks);

boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));

return new ReaderFactory(readUsingBatch ? batchSize : 0);
}

private boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly) {
Map<String, String> properties = table.properties();
RuntimeConfig sessionConf = SparkSession.active().conf();
if (isParquetOnly) {
return Spark3Util.isVectorizationEnabled(FileFormat.PARQUET, properties, sessionConf, options);
} else if (isOrcOnly) {
return Spark3Util.isVectorizationEnabled(FileFormat.ORC, properties, sessionConf, options);
} else {
return false;
}
}

@Override
public Statistics estimateStatistics() {
// its a fresh table, no data
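One consequence of the batchReadsEnabled helpers added in both readers: the resolved flag is only consulted when every task in the scan reads a single format, so a mixed Parquet/ORC scan never takes the vectorized path regardless of the settings. A standalone sketch of that decision (hypothetical free-standing method for illustration; the real logic lives in Reader and SparkBatchScan):

// Sketch: vectorized reads are considered only for single-format scans.
static boolean batchReadsEnabled(boolean isParquetOnly, boolean isOrcOnly,
                                 boolean parquetFlagResolved, boolean orcFlagResolved) {
  if (isParquetOnly) {
    return parquetFlagResolved;   // resolved via option > session conf > table property
  } else if (isOrcOnly) {
    return orcFlagResolved;       // currently always false: no ORC table property yet
  } else {
    return false;                 // mixed-format scans fall back to row-based reads
  }
}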
