[SPARK-24479][SS] Added config for registering streamingQueryListeners
author Arun Mahadevan <arunm@apache.org>
Wed, 13 Jun 2018 12:43:16 +0000 (20:43 +0800)
committer hyukjinkwon <gurwls223@apache.org>
Wed, 13 Jun 2018 12:43:16 +0000 (20:43 +0800)
## What changes were proposed in this pull request?

Currently a "StreamingQueryListener" can only be registered programatically. We could have a new config "spark.sql.streamingQueryListeners" similar to  "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners.

## How was this patch tested?

Added a new unit test and verified by running example programs.


Author: Arun Mahadevan <arunm@apache.org>

Closes #21504 from arunmahadevan/SPARK-24480.

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala [new file with mode: 0644]

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index fe0ad39..382ef28 100644
@@ -96,6 +96,14 @@ object StaticSQLConf {
     .toSequence
     .createOptional
 
+  val STREAMING_QUERY_LISTENERS = buildStaticConf("spark.sql.streaming.streamingQueryListeners")
+    .doc("List of class names implementing StreamingQueryListener that will be automatically " +
+      "added to newly created sessions. The classes should have either a no-arg constructor, " +
+      "or a constructor that expects a SparkConf argument.")
+    .stringConf
+    .toSequence
+    .createOptional
+
   val UI_RETAINED_EXECUTIONS =
     buildStaticConf("spark.sql.ui.retainedExecutions")
       .doc("Number of executions to retain in the Spark UI.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 97da2b1..25bb052 100644
@@ -24,6 +24,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
 import org.apache.spark.sql.sources.v2.StreamWriteSupport
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
+      })
+    }
+  } catch {
+    case e: Exception =>
+      throw new SparkException("Exception when registering StreamingQueryListener", e)
+  }
+
   /**
    * Returns a list of active queries associated with this SQLContext
    *
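
For context, the pre-existing programmatic path that this constructor block supplements is the public `addListener` API on the query manager; a minimal sketch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().getOrCreate()

// Explicit registration, as required before this change.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```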
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
new file mode 100644
index 0000000..1aaf8a9
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.language.reflectiveCalls
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+
+
+class StreamingQueryListenersConfSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+
+  override protected def sparkConf: SparkConf =
+    super.sparkConf.set("spark.sql.streaming.streamingQueryListeners",
+      "org.apache.spark.sql.streaming.TestListener")
+
+  test("test if the configured query lister is loaded") {
+    testStream(MemoryStream[Int].toDS)(
+      StartStream(),
+      StopStream
+    )
+
+    assert(TestListener.queryStartedEvent != null)
+    assert(TestListener.queryTerminatedEvent != null)
+  }
+
+}
+
+object TestListener {
+  @volatile var queryStartedEvent: QueryStartedEvent = null
+  @volatile var queryTerminatedEvent: QueryTerminatedEvent = null
+}
+
+class TestListener(sparkConf: SparkConf) extends StreamingQueryListener {
+
+  override def onQueryStarted(event: QueryStartedEvent): Unit = {
+    TestListener.queryStartedEvent = event
+  }
+
+  override def onQueryProgress(event: QueryProgressEvent): Unit = {}
+
+  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
+    TestListener.queryTerminatedEvent = event
+  }
+}
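
Outside of `StreamTest`, a rough manual check of the same behaviour could look like the following, again assuming a hypothetical `com.example.AuditListener` on the classpath:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("listener-smoke-test")
  .config("spark.sql.streaming.streamingQueryListeners", "com.example.AuditListener")
  .getOrCreate()

// Start a trivial rate-source query; the configured listener should receive the
// start, progress and termination events without being registered in code.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .writeStream
  .format("console")
  .start()

query.awaitTermination(5000) // let a couple of micro-batches run
query.stop()                 // fires onQueryTerminated on the listener
```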