Skip to content

Commit

Permalink
[SPARK-24479][SS] Added config for registering streamingQueryListeners
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Currently a "StreamingQueryListener" can only be registered programmatically. We could have a new config "spark.sql.streaming.streamingQueryListeners", similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners", for users to register custom streaming listeners.

## How was this patch tested?

New unit test and running example programs.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan <[email protected]>

Closes apache#21504 from arunmahadevan/SPARK-24480.
  • Loading branch information
arunmahadevan authored and HyukjinKwon committed Jun 13, 2018
1 parent 4c388bc commit 7703b46
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ object StaticSQLConf {
.toSequence
.createOptional

// Static conf naming the StreamingQueryListener implementations to register automatically.
// The value is a string conf converted to a sequence of class names, and it is optional
// (createOptional), so readers get no value at all when the key is unset.
val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streaming.streamingQueryListeners")
.doc("List of class names implementing StreamingQueryListener that will be automatically " +
"added to newly created sessions. The classes should have either a no-arg constructor, " +
"or a constructor that expects a SparkConf argument.")
.stringConf
.toSequence
.createOptional

val UI_RETAINED_EXECUTIONS =
buildStaticConf("spark.sql.ui.retainedExecutions")
.doc("Number of executions to retain in the Spark UI.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
Expand All @@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
import org.apache.spark.sql.sources.v2.StreamWriteSupport
import org.apache.spark.util.{Clock, SystemClock, Utils}

Expand All @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
@GuardedBy("awaitTerminationLock")
private var lastTerminatedQuery: StreamingQuery = null

// Register every listener class named in the STREAMING_QUERY_LISTENERS static conf.
// When the conf is unset there is nothing to iterate over and no listener is added.
// Any Exception raised while loading or adding a listener is rethrown as a SparkException
// that keeps the original exception as its cause.
try {
  val conf = sparkSession.sparkContext.conf
  conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
    val configuredListeners =
      Utils.loadExtensions(classOf[StreamingQueryListener], classNames, conf)
    configuredListeners.foreach { listener =>
      addListener(listener)
      logInfo(s"Registered listener ${listener.getClass.getName}")
    }
  }
} catch {
  case e: Exception =>
    throw new SparkException("Exception when registering StreamingQueryListener", e)
}

/**
* Returns a list of active queries associated with this SQLContext
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.streaming

import scala.language.reflectiveCalls

import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._


/**
 * Checks that a listener class named in the "spark.sql.streaming.streamingQueryListeners"
 * conf is picked up from configuration alone, with no programmatic registration in the test.
 *
 * The configured listener, `TestListener`, stores the events it receives in its companion
 * object; the test inspects those fields after running a stream.
 */
class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {

  import testImplicits._

  // Name TestListener through the conf so that it is the only way the listener gets attached.
  override protected def sparkConf: SparkConf =
    super.sparkConf.set("spark.sql.streaming.streamingQueryListeners",
      "org.apache.spark.sql.streaming.TestListener")

  test("test if the configured query listener is loaded") {
    testStream(MemoryStream[Int].toDS)(
      StartStream(),
      StopStream
    )

    // Listener callbacks may be dispatched asynchronously by the listener bus, so drain it
    // before asserting; otherwise the checks below can race with event delivery.
    spark.sparkContext.listenerBus.waitUntilEmpty(5000)

    assert(TestListener.queryStartedEvent != null, "QueryStartedEvent was not delivered")
    assert(TestListener.queryTerminatedEvent != null, "QueryTerminatedEvent was not delivered")
  }

}

/**
 * Shared holder for the most recent events seen by `TestListener` instances.
 * Both fields start out as null and are volatile.
 */
object TestListener {
  @volatile var queryStartedEvent: QueryStartedEvent = _
  @volatile var queryTerminatedEvent: QueryTerminatedEvent = _
}

/**
 * Listener that stores the start and termination events it receives in the
 * `TestListener` companion object; progress events are ignored.
 *
 * @param sparkConf accepted by the constructor but not used by this class
 */
class TestListener(sparkConf: SparkConf) extends StreamingQueryListener {

  /** Remembers the start event in the companion object. */
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    TestListener.queryStartedEvent = event

  /** Progress events are deliberately ignored. */
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()

  /** Remembers the termination event in the companion object. */
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    TestListener.queryTerminatedEvent = event
}

0 comments on commit 7703b46

Please sign in to comment.