Skip to content

Commit

Permalink
[SPARK-6328][PYTHON] Python API for StreamingListener
Browse files Browse the repository at this point in the history
Author: Daniel Jalova <[email protected]>

Closes apache#9186 from djalova/SPARK-6328.
  • Loading branch information
djalova authored and tdas committed Nov 16, 2015
1 parent de5e531 commit ace0db4
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 2 deletions.
3 changes: 2 additions & 1 deletion python/pyspark/streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@

from pyspark.streaming.context import StreamingContext
from pyspark.streaming.dstream import DStream
from pyspark.streaming.listener import StreamingListener

__all__ = ['StreamingContext', 'DStream']
__all__ = ['StreamingContext', 'DStream', 'StreamingListener']
8 changes: 8 additions & 0 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,11 @@ def union(self, *dstreams):
first = dstreams[0]
jrest = [d._jdstream for d in dstreams[1:]]
return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)

def addStreamingListener(self, streamingListener):
"""
Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
receiving system events related to streaming.
"""
self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
self._jvm.PythonStreamingListenerWrapper(streamingListener)))
75 changes: 75 additions & 0 deletions python/pyspark/streaming/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

__all__ = ["StreamingListener"]


class StreamingListener(object):

def __init__(self):
pass

def onReceiverStarted(self, receiverStarted):
"""
Called when a receiver has been started
"""
pass

def onReceiverError(self, receiverError):
"""
Called when a receiver has reported an error
"""
pass

def onReceiverStopped(self, receiverStopped):
"""
Called when a receiver has been stopped
"""
pass

def onBatchSubmitted(self, batchSubmitted):
"""
Called when a batch of jobs has been submitted for processing.
"""
pass

def onBatchStarted(self, batchStarted):
"""
Called when processing of a batch of jobs has started.
"""
pass

def onBatchCompleted(self, batchCompleted):
"""
Called when processing of a batch of jobs has completed.
"""
pass

def onOutputOperationStarted(self, outputOperationStarted):
"""
Called when processing of a job of a batch has started.
"""
pass

def onOutputOperationCompleted(self, outputOperationCompleted):
"""
Called when processing of a job of a batch has completed
"""
pass

class Java:
implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"]
126 changes: 125 additions & 1 deletion python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from pyspark.streaming.flume import FlumeUtils
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.streaming.listener import StreamingListener


class PySparkStreamingTestCase(unittest.TestCase):
Expand Down Expand Up @@ -403,6 +404,128 @@ def func(dstream):
self._test_func(input, func, expected)


class StreamingListenerTests(PySparkStreamingTestCase):

duration = .5

class BatchInfoCollector(StreamingListener):

def __init__(self):
super(StreamingListener, self).__init__()
self.batchInfosCompleted = []
self.batchInfosStarted = []
self.batchInfosSubmitted = []

def onBatchSubmitted(self, batchSubmitted):
self.batchInfosSubmitted.append(batchSubmitted.batchInfo())

def onBatchStarted(self, batchStarted):
self.batchInfosStarted.append(batchStarted.batchInfo())

def onBatchCompleted(self, batchCompleted):
self.batchInfosCompleted.append(batchCompleted.batchInfo())

def test_batch_info_reports(self):
batch_collector = self.BatchInfoCollector()
self.ssc.addStreamingListener(batch_collector)
input = [[1], [2], [3], [4]]

def func(dstream):
return dstream.map(int)
expected = [[1], [2], [3], [4]]
self._test_func(input, func, expected)

batchInfosSubmitted = batch_collector.batchInfosSubmitted
batchInfosStarted = batch_collector.batchInfosStarted
batchInfosCompleted = batch_collector.batchInfosCompleted

self.wait_for(batchInfosCompleted, 4)

self.assertGreaterEqual(len(batchInfosSubmitted), 4)
for info in batchInfosSubmitted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)

for streamId in info.streamIdToInputInfo():
streamInputInfo = info.streamIdToInputInfo()[streamId]
self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
self.assertGreaterEqual(streamInputInfo.numRecords, 0)
for key in streamInputInfo.metadata():
self.assertIsNotNone(streamInputInfo.metadata()[key])
self.assertIsNotNone(streamInputInfo.metadataDescription())

for outputOpId in info.outputOperationInfos():
outputInfo = info.outputOperationInfos()[outputOpId]
self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
self.assertGreaterEqual(outputInfo.id(), 0)
self.assertIsNotNone(outputInfo.name())
self.assertIsNotNone(outputInfo.description())
self.assertGreaterEqual(outputInfo.startTime(), -1)
self.assertGreaterEqual(outputInfo.endTime(), -1)
self.assertIsNone(outputInfo.failureReason())

self.assertEqual(info.schedulingDelay(), -1)
self.assertEqual(info.processingDelay(), -1)
self.assertEqual(info.totalDelay(), -1)
self.assertEqual(info.numRecords(), 0)

self.assertGreaterEqual(len(batchInfosStarted), 4)
for info in batchInfosStarted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)

for streamId in info.streamIdToInputInfo():
streamInputInfo = info.streamIdToInputInfo()[streamId]
self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
self.assertGreaterEqual(streamInputInfo.numRecords, 0)
for key in streamInputInfo.metadata():
self.assertIsNotNone(streamInputInfo.metadata()[key])
self.assertIsNotNone(streamInputInfo.metadataDescription())

for outputOpId in info.outputOperationInfos():
outputInfo = info.outputOperationInfos()[outputOpId]
self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
self.assertGreaterEqual(outputInfo.id(), 0)
self.assertIsNotNone(outputInfo.name())
self.assertIsNotNone(outputInfo.description())
self.assertGreaterEqual(outputInfo.startTime(), -1)
self.assertGreaterEqual(outputInfo.endTime(), -1)
self.assertIsNone(outputInfo.failureReason())

self.assertGreaterEqual(info.schedulingDelay(), 0)
self.assertEqual(info.processingDelay(), -1)
self.assertEqual(info.totalDelay(), -1)
self.assertEqual(info.numRecords(), 0)

self.assertGreaterEqual(len(batchInfosCompleted), 4)
for info in batchInfosCompleted:
self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
self.assertGreaterEqual(info.submissionTime(), 0)

for streamId in info.streamIdToInputInfo():
streamInputInfo = info.streamIdToInputInfo()[streamId]
self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
self.assertGreaterEqual(streamInputInfo.numRecords, 0)
for key in streamInputInfo.metadata():
self.assertIsNotNone(streamInputInfo.metadata()[key])
self.assertIsNotNone(streamInputInfo.metadataDescription())

for outputOpId in info.outputOperationInfos():
outputInfo = info.outputOperationInfos()[outputOpId]
self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
self.assertGreaterEqual(outputInfo.id(), 0)
self.assertIsNotNone(outputInfo.name())
self.assertIsNotNone(outputInfo.description())
self.assertGreaterEqual(outputInfo.startTime(), 0)
self.assertGreaterEqual(outputInfo.endTime(), 0)
self.assertIsNone(outputInfo.failureReason())

self.assertGreaterEqual(info.schedulingDelay(), 0)
self.assertGreaterEqual(info.processingDelay(), 0)
self.assertGreaterEqual(info.totalDelay(), 0)
self.assertEqual(info.numRecords(), 0)


class WindowFunctionTests(PySparkStreamingTestCase):

timeout = 15
Expand Down Expand Up @@ -1308,7 +1431,8 @@ def search_kinesis_asl_assembly_jar():

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
StreamingListenerTests]

if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,82 @@
package org.apache.spark.streaming.api.java

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.StreamingListener

private[streaming] trait PythonStreamingListener{

/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }

/** Called when a receiver has reported an error */
def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }

/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }

/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }

/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }

/** Called when processing of a job of a batch has started. */
def onOutputOperationStarted(
outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }

/** Called when processing of a job of a batch has completed. */
def onOutputOperationCompleted(
outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
}

private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
extends JavaStreamingListener {

/** Called when a receiver has been started */
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
listener.onReceiverStarted(receiverStarted)
}

/** Called when a receiver has reported an error */
override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
listener.onReceiverError(receiverError)
}

/** Called when a receiver has been stopped */
override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
listener.onReceiverStopped(receiverStopped)
}

/** Called when a batch of jobs has been submitted for processing. */
override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
listener.onBatchSubmitted(batchSubmitted)
}

/** Called when processing of a batch of jobs has started. */
override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
listener.onBatchStarted(batchStarted)
}

/** Called when processing of a batch of jobs has completed. */
override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
listener.onBatchCompleted(batchCompleted)
}

/** Called when processing of a job of a batch has started. */
override def onOutputOperationStarted(
outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
listener.onOutputOperationStarted(outputOperationStarted)
}

/** Called when processing of a job of a batch has completed. */
override def onOutputOperationCompleted(
outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
listener.onOutputOperationCompleted(outputOperationCompleted)
}
}

/**
* A listener interface for receiving information about an ongoing streaming computation.
Expand Down

0 comments on commit ace0db4

Please sign in to comment.