Skip to content

Commit

Permalink
[SPARK-27649][SS] Unify the way use 'spark.network.timeout'
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

For historical reasons, structured streaming still has some old way of use
`spark.network.timeout`
, even though
`org.apache.spark.internal.config.Network.NETWORK_TIMEOUT`
is now available.

## How was this patch tested?
Exists UT.

Closes apache#24545 from beliefer/unify-spark-network-timeout.

Authored-by: gengjiaan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
beliefer authored and cloud-fan committed May 8, 2019
1 parent 83f628b commit 8329e7d
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -64,7 +65,7 @@ private[kafka010] class KafkaMicroBatchStream(

private val pollTimeoutMs = options.getLong(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)

private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.UUID
import org.apache.kafka.common.TopicPartition

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -47,9 +48,7 @@ private[kafka010] class KafkaRelation(

private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
(sqlContext.sparkContext.conf.getTimeAsSeconds(
"spark.network.timeout",
"120s") * 1000L).toString
(sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString
).toLong

override def schema: StructType = KafkaOffsetReader.kafkaSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -85,7 +86,7 @@ private[kafka010] class KafkaSource(

private val pollTimeoutMs = sourceOptions.getOrElse(
"kafkaConsumer.pollTimeoutMs",
(sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
(sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString
).toLong

private val maxOffsetsPerTrigger =
Expand Down

0 comments on commit 8329e7d

Please sign in to comment.