diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 6972f391f2852..76c7b5deb35e1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -64,7 +65,7 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
+    SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)
 
   private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 9effa29591102..48cc089e77d51 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -47,9 +48,7 @@ private[kafka010] class KafkaRelation(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    (sqlContext.sparkContext.conf.getTimeAsSeconds(
-      "spark.network.timeout",
-      "120s") * 1000L).toString
+    (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   override def schema: StructType = KafkaOffsetReader.kafkaSchema
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 82d746ef0fbde..fa93e8f8c4613 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -85,7 +86,7 @@ private[kafka010] class KafkaSource(
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
+    (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   private val maxOffsetsPerTrigger =
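
For context on why the behavior is unchanged: `NETWORK_TIMEOUT` is the typed `ConfigEntry` for `spark.network.timeout` declared in `org.apache.spark.internal.config.Network`. A minimal sketch of that declaration follows (the exact `.doc()` wording is an assumption, not copied from Network.scala):

```scala
package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

private[spark] object Network {
  // timeConf(TimeUnit.SECONDS) makes conf.get(NETWORK_TIMEOUT) return a
  // Long in seconds, so each call site multiplies by 1000L to get ms.
  // The "120s" default matches the old getTimeAsSeconds fallback exactly.
  private[spark] val NETWORK_TIMEOUT =
    ConfigBuilder("spark.network.timeout")
      .doc("Default timeout for all network interactions.")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("120s")
}
```

Because the entry already carries the key, the unit, and the `"120s"` default, `conf.get(NETWORK_TIMEOUT) * 1000L` yields the same millisecond value as the old `getTimeAsSeconds("spark.network.timeout", "120s") * 1000L`, while removing the hardcoded key/default strings from each of the three Kafka call sites.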