Skip to content

Commit

Permalink
[SPARK-47962][PYTHON][DOCS] PySpark Dataframe doc test improvement
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Improve the doc test, before it used a batch df and didn't really start the test.

### Why are the changes needed?

Test coverage improvement

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test only addition

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46189 from WweiL/doc-test-improvement.

Authored-by: Wei Liu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
WweiL authored and HyukjinKwon committed Apr 25, 2024
1 parent d233892 commit ea37c86
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4110,6 +4110,7 @@ def observe(
When ``observation`` is a string, streaming queries also work as below.
>>> from pyspark.sql.streaming import StreamingQueryListener
>>> import time
>>> class MyErrorListener(StreamingQueryListener):
... def onQueryStarted(self, event):
... pass
Expand All @@ -4130,13 +4131,24 @@ def observe(
... def onQueryTerminated(self, event):
... pass
...
>>> spark.streams.addListener(MyErrorListener())
>>> error_listener = MyErrorListener()
>>> spark.streams.addListener(error_listener)
>>> sdf = spark.readStream.format("rate").load().withColumn(
... "error", col("value")
... )
>>> # Observe row count (rc) and error row count (erc) in the streaming Dataset
... observed_ds = df.observe(
... observed_ds = sdf.observe(
... "my_event",
... count(lit(1)).alias("rc"),
... count(col("error")).alias("erc")) # doctest: +SKIP
>>> observed_ds.writeStream.format("console").start() # doctest: +SKIP
... count(col("error")).alias("erc"))
>>> try:
... q = observed_ds.writeStream.format("console").start()
... time.sleep(5)
...
... finally:
... q.stop()
... spark.streams.removeListener(error_listener)
...
"""
...

Expand Down

0 comments on commit ea37c86

Please sign in to comment.