Skip to content

Commit

Permalink
[SPARK-33916][CORE] Fix fallback storage offset and improve compressi…
Browse files Browse the repository at this point in the history
…on codec test coverage

### What changes were proposed in this pull request?

This PR aims to fix offset bug and improve compression codec test coverage.

### Why are the changes needed?

When the user choose a non-default codec, it causes a failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the extended test suite.

Closes apache#30934 from dongjoon-hyun/SPARK-33916.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Dec 29, 2020
1 parent 0617dfc commit 6497ccb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ object FallbackStorage extends Logging {
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
val f = fallbackFileSystem.open(dataFile)
val size = nextOffset - 1 - offset
val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
test("fallback storage APIs - copy/exists") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
.set(SHUFFLE_COMPRESS, false)
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
Expand Down Expand Up @@ -227,43 +228,45 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("Newly added executors should access old data from remote storage") {
sc = new SparkContext(getSparkConf(2, 0))
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val rdd1 = sc.parallelize(1 to 10, 2)
val rdd2 = rdd1.map(x => (x % 2, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
assert(rdd3.collect() === Array((0, 5), (1, 5)))
Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
test(s"$codec - Newly added executors should access old data from remote storage") {
sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val rdd1 = sc.parallelize(1 to 10, 2)
val rdd2 = rdd1.map(x => (x % 2, 1))
val rdd3 = rdd2.reduceByKey(_ + _)
assert(rdd3.collect() === Array((0, 5), (1, 5)))

// Decommission all
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
sc.getExecutorIds().foreach {
sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
}

// Decommission all
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
sc.getExecutorIds().foreach {
sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
}
// Make it sure that fallback storage are ready
val fallbackStorage = new FallbackStorage(sc.getConf)
eventually(timeout(10.seconds), interval(1.seconds)) {
Seq(
"shuffle_0_0_0.index", "shuffle_0_0_0.data",
"shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
assert(fallbackStorage.exists(0, file))
}
}

// Make it sure that fallback storage are ready
val fallbackStorage = new FallbackStorage(sc.getConf)
eventually(timeout(10.seconds), interval(1.seconds)) {
Seq(
"shuffle_0_0_0.index", "shuffle_0_0_0.data",
"shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
assert(fallbackStorage.exists(0, file))
// Since the data is safe, force to shrink down to zero executor
sc.getExecutorIds().foreach { id =>
sched.killExecutor(id)
}
eventually(timeout(20.seconds), interval(1.seconds)) {
assert(sc.getExecutorIds().isEmpty)
}
}

// Since the data is safe, force to shrink down to zero executor
sc.getExecutorIds().foreach { id =>
sched.killExecutor(id)
}
eventually(timeout(20.seconds), interval(1.seconds)) {
assert(sc.getExecutorIds().isEmpty)
// Dynamic allocation will start new executors
assert(rdd3.collect() === Array((0, 5), (1, 5)))
assert(rdd3.sortByKey().count() == 2)
assert(sc.getExecutorIds().nonEmpty)
}

// Dynamic allocation will start new executors
assert(rdd3.collect() === Array((0, 5), (1, 5)))
assert(rdd3.sortByKey().count() == 2)
assert(sc.getExecutorIds().nonEmpty)
}
}
}

0 comments on commit 6497ccb

Please sign in to comment.