Skip to content

Commit

Permalink
[SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' a…
Browse files Browse the repository at this point in the history
…s milliseconds

## What changes were proposed in this pull request?

This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that "spark.network.timeout" is read using the wrong time unit when the user doesn't specify a time unit (for example, a bare value of `120` was interpreted as 120 milliseconds instead of 120 seconds).

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#21382 from zsxwing/fix-network-timeout-conf.
  • Loading branch information
zsxwing committed May 24, 2018
1 parent 0d89943 commit 53c06dd
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[kafka010] class KafkaMicroBatchReader(

private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ private[kafka010] class KafkaRelation(

private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", "120s").toString
(sqlContext.sparkContext.conf.getTimeAsSeconds(
"spark.network.timeout",
"120s") * 1000L).toString
).toLong

override def schema: StructType = KafkaOffsetReader.kafkaSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[kafka010] class KafkaSource(

private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
(sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
).toLong

private val maxOffsetsPerTrigger =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class KafkaRDD[K, V](

// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
conf.getTimeAsMs("spark.network.timeout", "120s"))
conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
slave.hostname,
externalShufflePort,
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
slave.shuffleRegistered = true
}
Expand Down

0 comments on commit 53c06dd

Please sign in to comment.