From 8329e7debdaf6db9f3a52094bbc5dc4c1e2771ea Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 8 May 2019 11:43:03 +0800 Subject: [PATCH] [SPARK-27649][SS] Unify the way to use 'spark.network.timeout' ## What changes were proposed in this pull request? For historical reasons, structured streaming still uses `spark.network.timeout` in the old way, even though `org.apache.spark.internal.config.Network.NETWORK_TIMEOUT` is now available. ## How was this patch tested? Existing unit tests. Closes #24545 from beliefer/unify-spark-network-timeout. Authored-by: gengjiaan Signed-off-by: Wenchen Fan --- .../apache/spark/sql/kafka010/KafkaMicroBatchStream.scala | 3 ++- .../scala/org/apache/spark/sql/kafka010/KafkaRelation.scala | 5 ++--- .../scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 6972f391f2852..76c7b5deb35e1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow @@ -64,7 +65,7 @@ private[kafka010] class KafkaMicroBatchStream( private val pollTimeoutMs = options.getLong( "kafkaConsumer.pollTimeoutMs", - SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) + SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L) private val maxOffsetsPerTrigger = 
Option(options.get("maxOffsetsPerTrigger")).map(_.toLong) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala index 9effa29591102..48cc089e77d51 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow @@ -47,9 +48,7 @@ private[kafka010] class KafkaRelation( private val pollTimeoutMs = sourceOptions.getOrElse( "kafkaConsumer.pollTimeoutMs", - (sqlContext.sparkContext.conf.getTimeAsSeconds( - "spark.network.timeout", - "120s") * 1000L).toString + (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString ).toLong override def schema: StructType = KafkaOffsetReader.kafkaSchema diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 82d746ef0fbde..fa93e8f8c4613 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow @@ -85,7 +86,7 @@ private[kafka010] class 
KafkaSource( private val pollTimeoutMs = sourceOptions.getOrElse( "kafkaConsumer.pollTimeoutMs", - (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString + (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString ).toLong private val maxOffsetsPerTrigger =