[Spark] Support building against both Spark 3.0 and Spark 3.1. (apache#2512)

Code changes that allow spark3 and spark3-extensions to be tested against
both Spark 3.0 and Spark 3.1 while still being built against a single Spark 3.0 version.

Although additional tests are created, we still produce only a single set of Spark3
binaries, which are compatible with both Spark 3.0 and 3.1.
wypoon authored Jun 24, 2021
1 parent 92a264b commit 111fe81
Showing 18 changed files with 314 additions and 47 deletions.
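The net effect on the build: the existing test and check tasks now also drive the Spark 3.1 runs, via new testSpark31 tasks in iceberg-spark3 and iceberg-spark3-extensions and a new spark31IntegrationTest task in iceberg-spark3-runtime. Presumably each can also be invoked on its own, e.g. `./gradlew :iceberg-spark3:testSpark31`.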
98 changes: 89 additions & 9 deletions build.gradle
@@ -64,6 +64,10 @@ allprojects {
mavenCentral()
mavenLocal()
}
project.ext {
Spark30Version = '3.0.1'
Spark31Version = '3.1.1'
}
}

subprojects {
@@ -977,6 +981,21 @@ if (jdkVersion == '8') {
}

project(':iceberg-spark3') {
sourceSets {
// Compile test source against Spark 3.1 and main classes compiled against Spark 3.0
spark31 {
java.srcDir "$projectDir/src/test/java"
resources.srcDir "$projectDir/src/test/resources"
compileClasspath += sourceSets.test.output + sourceSets.main.output
runtimeClasspath += sourceSets.test.output
}
}

configurations {
spark31Implementation.extendsFrom testImplementation
spark31RuntimeOnly.extendsFrom testRuntimeOnly
}

dependencies {
compile project(':iceberg-api')
compile project(':iceberg-common')
@@ -989,7 +1008,7 @@ project(':iceberg-spark3') {
compile project(':iceberg-spark')

compileOnly "org.apache.avro:avro"
compileOnly("org.apache.spark:spark-hive_2.12") {
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}
@@ -1003,9 +1022,14 @@
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile "org.xerial:sqlite-jdbc"

spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}
}

test {
tasks.withType(Test) {
// For vectorized reads
// Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
systemProperty("arrow.enable_unsafe_memory_access", "true")
@@ -1014,16 +1038,39 @@ project(':iceberg-spark3') {
systemProperty("arrow.enable_null_check_for_get", "false")

// Vectorized reads need more memory
maxHeapSize '2500m'
maxHeapSize '2560m'
}

task testSpark31(type: Test) {
dependsOn classes
description = "Test against Spark 3.1"
testClassesDirs = sourceSets.spark31.output.classesDirs
classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
}

test.dependsOn testSpark31
}

project(":iceberg-spark3-extensions") {
apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'antlr'

sourceSets {
// Compile test source against Spark 3.1 and main classes compiled against Spark 3.0
spark31 {
// Main source is in scala, but test source is only in java
java.srcDir "$projectDir/src/test/java"
resources.srcDir "$projectDir/src/test/resources"
compileClasspath += sourceSets.test.output + sourceSets.main.output
runtimeClasspath += sourceSets.test.output
}
}

configurations {
spark31Implementation.extendsFrom testImplementation
spark31RuntimeOnly.extendsFrom testRuntimeOnly

/*
The Gradle Antlr plugin erroneously adds both antlr-build and runtime dependencies to the runtime path. This
bug https://github.com/gradle/gradle/issues/820 exists because older versions of Antlr do not have separate
@@ -1037,10 +1084,9 @@ project(":iceberg-spark3-extensions") {
}

dependencies {
compileOnly project(':iceberg-spark3')

compileOnly "org.scala-lang:scala-library"
compileOnly("org.apache.spark:spark-hive_2.12") {
compileOnly project(':iceberg-spark3')
compileOnly("org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}
@@ -1050,6 +1096,11 @@
testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-spark3', configuration: 'testArtifacts')

spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
}

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
// We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
runtime "org.antlr:antlr4-runtime:4.7.1"
@@ -1060,6 +1111,15 @@
maxHeapSize = "64m"
arguments += ['-visitor', '-package', 'org.apache.spark.sql.catalyst.parser.extensions']
}

task testSpark31(type: Test) {
dependsOn classes
description = "Test against Spark 3.1"
testClassesDirs = sourceSets.spark31.output.classesDirs
classpath = sourceSets.spark31.runtimeClasspath + sourceSets.main.output
}

test.dependsOn testSpark31
}

project(':iceberg-spark3-runtime') {
@@ -1072,6 +1132,12 @@ project(':iceberg-spark3-runtime') {
java.srcDir "$projectDir/src/integration/java"
resources.srcDir "$projectDir/src/integration/resources"
}
spark31 {
java.srcDir "$projectDir/src/integration/java"
resources.srcDir "$projectDir/src/integration/resources"
compileClasspath += sourceSets.integration.output
runtimeClasspath += sourceSets.integration.output
}
}

configurations {
@@ -1086,6 +1152,8 @@ project(':iceberg-spark3-runtime') {
exclude group: 'javax.xml.bind'
exclude group: 'javax.annotation'
}
spark31Implementation.extendsFrom integrationImplementation
spark31CompileOnly.extendsFrom integrationCompileOnly
}

dependencies {
@@ -1096,7 +1164,7 @@ project(':iceberg-spark3-runtime') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

integrationImplementation 'org.apache.spark:spark-hive_2.12'
integrationImplementation "org.apache.spark:spark-hive_2.12:${project.ext.Spark30Version}"
integrationImplementation 'junit:junit'
integrationImplementation 'org.slf4j:slf4j-simple'
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
@@ -1107,6 +1175,8 @@ project(':iceberg-spark3-runtime') {
// Not allowed on our classpath, only the runtime jar is allowed
integrationCompileOnly project(':iceberg-spark3-extensions')
integrationCompileOnly project(':iceberg-spark3')

spark31Implementation "org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}"
}

shadowJar {
@@ -1144,14 +1214,24 @@ project(':iceberg-spark3-runtime') {
}

task integrationTest(type: Test) {
description = "Test Spark3 Runtime Jar"
description = "Test Spark3 Runtime Jar against Spark 3.0"
group = "verification"
testClassesDirs = sourceSets.integration.output.classesDirs
classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
inputs.file(shadowJar.archiveFile.get().asFile.path)
}
integrationTest.dependsOn shadowJar
check.dependsOn integrationTest

task spark31IntegrationTest(type: Test) {
dependsOn classes
description = "Test Spark3 Runtime Jar against Spark 3.1"
group = "verification"
testClassesDirs = sourceSets.spark31.output.classesDirs
classpath = sourceSets.spark31.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
}
spark31IntegrationTest.dependsOn shadowJar

check.dependsOn integrationTest, spark31IntegrationTest

jar {
enabled = false
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -33,6 +33,7 @@ private DateTimeUtil() {

public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static final long MICROS_PER_MILLIS = 1000L;

public static LocalDate dateFromDays(int daysFromEpoch) {
return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
@@ -66,6 +67,13 @@ public static long microsFromTimestamp(LocalDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}

public static long microsToMillis(long micros) {
// When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
// In millis precision the above needs to be represented as (-157700927877).
return Math.floorDiv(micros, MICROS_PER_MILLIS);
}

public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
}
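To make the floor-division comment above concrete, here is a small standalone Java check (not part of the commit) using the same example value. Plain integer division rounds toward zero and would land one millisecond too late for pre-1970 timestamps:

```java
public class MicrosToMillisCheck {
    public static void main(String[] args) {
        // 1965-01-01 10:11:12.123456 UTC, expressed in microseconds since the epoch
        long micros = -157700927876544L;

        long truncated = micros / 1000L;              // rounds toward zero
        long floored = Math.floorDiv(micros, 1000L);  // rounds toward negative infinity

        System.out.println(truncated);  // -157700927876 (1 ms too late)
        System.out.println(floored);    // -157700927877 (what microsToMillis returns)
    }
}
```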
@@ -41,7 +41,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
// analyzer extensions
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations(spark.sessionState.conf)}
extensions.injectPostHocResolutionRule { spark => AlignRowLevelOperations }
extensions.injectCheckRule { _ => RowLevelOperationsPredicateCheck }

// optimizer extensions
@@ -31,7 +31,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.internal.SQLConf

case class AlignRowLevelOperations(conf: SQLConf) extends Rule[LogicalPlan] with AssignmentAlignmentSupport {
case object AlignRowLevelOperations extends Rule[LogicalPlan]
with AssignmentAlignmentSupport with CastSupport {

override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UpdateTable if u.resolved && isIcebergRelation(u.table)=>
@@ -32,13 +32,17 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper.createAlias
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import scala.collection.mutable

trait AssignmentAlignmentSupport extends CastSupport {
trait AssignmentAlignmentSupport {

def conf: SQLConf

private case class ColumnUpdate(ref: Seq[String], expr: Expression)

@@ -96,7 +100,7 @@ trait AssignmentAlignmentSupport extends CastSupport {
case StructType(fields) =>
// build field expressions
val fieldExprs = fields.zipWithIndex.map { case (field, ordinal) =>
Alias(GetStructField(col, ordinal, Some(field.name)), field.name)()
createAlias(GetStructField(col, ordinal, Some(field.name)), field.name)
}

// recursively apply this method on nested fields
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Not
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils
import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.catalyst.utils.RewriteRowLevelOperationHelper
import org.apache.spark.sql.connector.catalog.Table
@@ -52,6 +52,9 @@

import ExtendedDataSourceV2Implicits._
import RewriteRowLevelOperationHelper._
import DistributionAndOrderingUtils._

override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// don't rewrite deletes that can be answered by passing filters to deleteWhere in SupportsDelete
@@ -66,7 +69,7 @@
val mergeBuilder = r.table.asMergeable.newMergeBuilder("delete", writeInfo)

val matchingRowsPlanBuilder = scanRelation => Filter(cond, scanRelation)
val scanPlan = buildDynamicFilterScanPlan(spark, r.table, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)
val scanPlan = buildDynamicFilterScanPlan(spark, r, r.output, mergeBuilder, cond, matchingRowsPlanBuilder)

val remainingRowFilter = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
val remainingRowsPlan = Filter(remainingRowFilter, scanPlan)
@@ -91,11 +94,11 @@ case class RewriteDelete(spark: SparkSession) extends Rule[LogicalPlan] with Rew
remainingRowsPlan
case _ =>
// apply hash partitioning by file if the distribution mode is hash or range
val numShufflePartitions = SQLConf.get.numShufflePartitions
val numShufflePartitions = conf.numShufflePartitions
RepartitionByExpression(Seq(fileNameCol), remainingRowsPlan, numShufflePartitions)
}

val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))
val order = Seq(createSortOrder(fileNameCol, Ascending), createSortOrder(rowPosCol, Ascending))
val sort = Sort(order, global = false, planWithDistribution)
Project(output, sort)
}
@@ -55,9 +55,12 @@ import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implici
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.BooleanType

case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with RewriteRowLevelOperationHelper {
import ExtendedDataSourceV2Implicits._
import RewriteMergeInto._
import RewriteRowLevelOperationHelper._

override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = {
plan transform {
@@ -79,7 +82,7 @@

val outputExprs = insertAction.assignments.map(_.value)
val outputColNames = target.output.map(_.name)
val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => Alias(expr, name)() }
val outputCols = outputExprs.zip(outputColNames).map { case (expr, name) => createAlias(expr, name) }
val mergePlan = Project(outputCols, joinPlan)

val writePlan = buildWritePlan(mergePlan, target.table)
@@ -121,7 +124,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with

// when there are no not-matched actions, use a right outer join to ignore source rows that do not match, but
// keep all unmatched target rows that must be preserved.
val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
val newSourceTableScan = Project(sourceTableProj, source)
val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, target, source, cond, matchedActions)
val joinPlan = Join(newSourceTableScan, targetTableScan, RightOuter, Some(cond), JoinHint.NONE)
Expand Down Expand Up @@ -151,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
val (matchedConditions, matchedOutputs) = rewriteMatchedActions(matchedActions, target.output)

// use a full outer join because there are both matched and not matched actions
val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
val sourceTableProj = source.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_SOURCE))
val newSourceTableScan = Project(sourceTableProj, source)
val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, target, source, cond, matchedActions)
val targetTableProj = targetTableScan.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
val targetTableProj = targetTableScan.output ++ Seq(createAlias(TRUE_LITERAL, ROW_FROM_TARGET))
val newTargetTableScan = Project(targetTableProj, targetTableScan)
val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)

Expand Down Expand Up @@ -202,7 +205,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
val output = target.output
val matchingRowsPlanBuilder = rel => Join(source, rel, Inner, Some(cond), JoinHint.NONE)
val runCardinalityCheck = isCardinalityCheckEnabled(table) && isCardinalityCheckNeeded(matchedActions)
buildDynamicFilterScanPlan(spark, table, output, mergeBuilder, cond, matchingRowsPlanBuilder, runCardinalityCheck)
buildDynamicFilterScanPlan(spark, target, output, mergeBuilder, cond, matchingRowsPlanBuilder, runCardinalityCheck)
}

private def rewriteMatchedActions(
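A note on the recurring pattern in the Catalyst changes above: the rules now obtain SQLConf through SQLConf.get at execution time instead of taking it as a constructor parameter, and Alias/SortOrder construction is routed through createAlias and createSortOrder helpers (createAlias is imported from RewriteRowLevelOperationHelper; the helper implementations are not part of this excerpt). Presumably, in line with the commit's goal of shipping a single set of Spark3 binaries, this keeps classes compiled against Spark 3.0 from binding directly to constructor and trait signatures that changed in Spark 3.1.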