Docs: Improve the Spark Structured Streaming jobs (apache#8203)
* Docs: Improve the Spark Structured Streaming jobs

Promote using `toTable(...)` instead of `.option("path", ...)`, because
I've found that `.start()` might not inject the catalog functions when
doing the query planning. If the catalog is missing, this causes issues
when using partitioned tables, since the partition transforms are not
available.

---------

Co-authored-by: Brian "bits" Olsen <[email protected]>
Fokko and bitsondatadev authored Sep 3, 2023
1 parent c3b2564 commit 1b9f412
Showing 1 changed file with 26 additions and 27 deletions: docs/spark-structured-streaming.md

# Spark Structured Streaming

Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions.

## Streaming Reads

Iceberg only supports reading data from append snapshots. Overwrite snapshots cannot be processed.
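
For reference, a minimal streaming-read sketch; the table name and the timestamp value are placeholders, and the optional `stream-from-timestamp` read option starts the stream from a given commit time in epoch milliseconds:

```scala
// Minimal sketch: read an Iceberg table as a streaming source.
// `spark` is an existing SparkSession; "database.table_name" is a placeholder.
val stream = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", "1693526400000") // optional: start from this commit time (epoch millis)
    .load("database.table_name")
```
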
## Streaming Writes

To write values from a streaming query to an Iceberg table, use `DataStreamWriter`:

```scala
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")
```

If you're using Spark 3.0 or earlier, you need to use `.option("path", "database.table_name").start()` instead of `.toTable("database.table_name")`.

In the case of the directory-based Hadoop catalog:
```scala
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", "hdfs://nn:8020/path/to/table")
    .option("checkpointLocation", checkpointPath)
    .start()
```

Iceberg supports `append` and `complete` output modes:

* `append`: appends the rows of every micro-batch to the table
* `complete`: replaces the table contents every micro-batch (see the sketch below)
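
For example, a minimal sketch of `complete` mode, assuming a streaming aggregation; `events`, `checkpointPath`, and the table name are placeholders:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Complete mode rewrites the whole table on every micro-batch, so it is only
// meaningful for aggregated results such as running counts.
val counts = events.groupBy(col("category")).count()

counts.writeStream
    .format("iceberg")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("checkpointLocation", checkpointPath)
    .toTable("database.category_counts")
```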

Prior to starting the streaming query, ensure you created the table. Refer to the [SQL create table](../spark-ddl/#create-table) documentation to learn how to create the Iceberg table.
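
For example, a minimal DDL sketch; the catalog name `prod`, the table name, and the schema are placeholders:

```scala
// Create a partitioned Iceberg table up front so the streaming query can write into it.
spark.sql("""
  CREATE TABLE IF NOT EXISTS prod.db.events (
    id   BIGINT,
    data STRING,
    ts   TIMESTAMP)
  USING iceberg
  PARTITIONED BY (days(ts))
""")
```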

Iceberg doesn't support experimental [continuous processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing), as it doesn't provide the interface to "commit" the output.

### Partitioned table

Iceberg requires sorting data by partition spec per task (Spark partition) prior to writing to a partitioned table. For batch queries you're encouraged to do an explicit sort to fulfill the requirement
(see [here](../spark-writes/#writing-to-partitioned-tables)), but that approach adds latency to streaming workloads, since repartitioning and sorting are heavy operations. To avoid the additional latency, you can
enable the fanout writer, which removes the sort requirement.

```scala
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpointPath)
    .toTable("database.table_name")
```

The fanout writer opens a file per partition value and doesn't close these files until the write task finishes. Avoid using the fanout writer for batch writing, since an explicit sort of the output rows is cheap for batch workloads.
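
For comparison, a hedged sketch of the batch approach mentioned above; `data`, the `ts` column, and the table name are placeholders:

```scala
// Batch writes: an explicit local sort by the partition source column keeps rows of the
// same partition together within each task, so the fanout writer is not needed.
data.sortWithinPartitions("ts")
    .writeTo("database.table_name") // DataFrameWriterV2 (Spark 3+)
    .append()
```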

## Maintenance for streaming tables

Streaming writes can create new table versions quickly, creating lots of table metadata to track those versions.
Maintaining metadata by tuning the rate of commits, expiring old snapshots, and automatically cleaning up metadata files
is highly recommended.

### Tune the rate of commits

Having a high rate of commits produces lots of data files, manifests, and snapshots, which leads to additional maintenance. It is recommended to use a trigger interval of at least 1 minute and to increase the interval if needed.

The triggers section in the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) documents how to configure the interval.

### Expire old snapshots

Each batch written to a table produces a new snapshot. Iceberg tracks snapshots in table metadata until they are expired. Snapshots accumulate quickly with frequent commits, so it is highly recommended that tables written by streaming queries are [regularly maintained](../maintenance#expire-snapshots). [Snapshot expiration](../spark-procedures/#expire_snapshots) is the procedure of removing the metadata and any data files that are no longer needed. By default, the procedure will expire snapshots older than five days.

### Compacting data files

The amount of data written from a streaming process is typically small, which can cause the table metadata to track lots of small files. [Compacting small files into larger files](../maintenance#compact-data-files) reduces the metadata needed by the table and increases query efficiency. Iceberg and Spark [come with the `rewrite_data_files` procedure](../spark-procedures/#rewrite_data_files).
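
For example, a minimal sketch of the procedure call; `prod` and `db.events` are placeholder names:

```scala
// Compact small data files into larger ones.
spark.sql("CALL prod.system.rewrite_data_files(table => 'db.events')")
```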

### Rewrite manifests

To optimize write latency on a streaming workload, Iceberg can write the new snapshot with a "fast" append that does not automatically compact manifests.
This can lead to lots of small manifest files. Iceberg can [rewrite the manifest files to improve query performance](../maintenance#rewrite-manifests). Iceberg and Spark [come with the `rewrite_manifests` procedure](../spark-procedures/#rewrite_manifests).
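
For example, a minimal sketch of the procedure call; `prod` and `db.events` are placeholder names:

```scala
// Rewrite manifests to speed up scan planning.
spark.sql("CALL prod.system.rewrite_manifests('db.events')")
```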
