[SPARK-49883][SS] State Store Checkpoint Structure V2 Integration with RocksDB and RocksDBFileManager

### What changes were proposed in this pull request?

This PR enables RocksDB to read <zip, changelog> files tagged with unique ids (e.g. `version_uniqueId.zip`, `version_uniqueId.changelog`). Below is an explanation of the changes and the rationale.

Now, for each changelog file, we put a lineage of `version: uniqueId` pairs on its first line, covering versions from its current version back to the previous snapshot version. For each snapshot (zip) file, there is no change other than its name (`version_uniqueId.zip`), because snapshot files are the single source of truth.
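As a rough illustration of the idea (not the actual on-disk encoding, which lives in the Scala changelog reader/writer), the lineage line could be modeled as a small serialized list of `(version, uniqueId)` pairs. The JSON encoding and all names below are assumptions for the sketch:

```python
import json

def encode_lineage(pairs):
    """Hypothetical encoding of the changelog's first line: a list of
    (version, uniqueId) pairs, from the current version back to the
    previous snapshot version."""
    return json.dumps([{"version": v, "uniqueId": u} for v, u in pairs])

def decode_lineage(line):
    """Inverse of encode_lineage: recover the (version, uniqueId) pairs."""
    return [(d["version"], d["uniqueId"]) for d in json.loads(line)]

# e.g. the changelog for version 9, with the previous snapshot at version 7
line = encode_lineage([(9, "c3"), (8, "b2"), (7, "a1")])
assert decode_lineage(line) == [(9, "c3"), (8, "b2"), (7, "a1")]
```

The important property is only that the lineage is self-contained on the first line, so a reader can recover the full version-to-uniqueId chain without consulting any other file.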

In addition to `lastCommitBasedCheckpointId`, `lastCommittedCheckpointId`, and `loadedCheckpointId` added in apache#47895 (review), this PR also adds an in-memory map `versionToUniqueIdLineage` that maps versions to unique ids. This is useful when the same RocksDB instance is reused in the executor, so the lineage does not need to be loaded from the changelog file again.

## RocksDB:
#### Load
- When `loadedVersion != version`, try to load a changelog file named `version_checkpointUniqueId.changelog`. There are several possible cases:
    1. Version corresponds to a zip file:
        1. `version_uniqueId.zip`: either changelog was not enabled, or it was enabled but this version happens to be a snapshot file, and the previous query run used checkpoint format v2.
        2. `version.zip`: same as above, but the previous query run used checkpoint format v1.
    2. Version corresponds to a changelog file:
        1. `version_uniqueId.changelog`: changelog was enabled, and the previous query run used checkpoint format v2.
        2. `version.changelog`: changelog was enabled, and the previous query run used checkpoint format v1.

- For case i.a, we construct a new empty lineage `(version, sessionCheckpointId)`.
- For case ii.a, we read the lineage stored in the first line of `version_uniqueId.changelog`.
- For cases i.b and ii.b, there is no need to load the lineage as it was not present before; we just load the corresponding file without `uniqueId`, but newer files will be constructed with `uniqueId`. This is the path a query takes when upgrading from checkpoint version v1 to checkpoint version v2.
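The case analysis above can be sketched as a small lookup against the DFS file listing. This is a simplified illustration, not the actual Scala load path; the function name, the return convention, and the probing order are assumptions:

```python
def resolve_checkpoint_file(files, version, unique_id):
    """For a requested version, prefer the v2 name (version_uniqueId.*)
    and fall back to the v1 name (version.*), for both snapshot (zip)
    and changelog files. Returns (filename, checkpoint_format_version)."""
    for suffix in ("zip", "changelog"):
        v2 = f"{version}_{unique_id}.{suffix}"   # cases i.a / ii.a
        if v2 in files:
            return v2, 2
        v1 = f"{version}.{suffix}"               # cases i.b / ii.b
        if v1 in files:
            return v1, 1
    raise FileNotFoundError(f"no checkpoint file for version {version}")

# v2 snapshot wins over nothing else present
assert resolve_checkpoint_file({"10_abc.zip"}, 10, "abc") == ("10_abc.zip", 2)
# v1 fallback when only the un-suffixed file exists
assert resolve_checkpoint_file({"10.changelog"}, 10, "abc") == ("10.changelog", 1)
```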

Next, the code finds the latest snapshot version through file listing. When there are multiple snapshot files with the same version but different unique ids (the main problem this project aims to solve), the correct one is loaded based on the checkpoint id.
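A minimal sketch of that disambiguation step, assuming the lineage has already been loaded as `(version, uniqueId)` pairs (the helper name and error message are illustrative, not the actual Spark code):

```python
def pick_snapshot(zip_files, lineage):
    """Among snapshot files named 'version_uniqueId.zip' (possibly several
    per version, e.g. after task retries), pick the latest one whose
    (version, uniqueId) pair appears in the loaded lineage."""
    valid = set(lineage)  # {(version, uniqueId), ...}
    candidates = []
    for name in zip_files:
        stem = name[: -len(".zip")]
        v, _, uid = stem.partition("_")
        if (int(v), uid) in valid:
            candidates.append((int(v), uid, name))
    if not candidates:
        raise RuntimeError("cannot find a base snapshot checkpoint with lineage")
    return max(candidates)[2]  # highest valid version wins

# two snapshots exist for version 10; the lineage says "b" is ours
assert pick_snapshot(["10_a.zip", "10_b.zip", "8_a.zip"],
                     [(10, "b"), (9, "c"), (8, "a")]) == "10_b.zip"
```

When no candidate matches, this corresponds to the new `CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT` error condition added in the diff below.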

Then the changelog is replayed with awareness of the lineage. The lineage is kept in memory for the next `load()`.

Lastly, we open the changelog writer for version + 1 and write the lineage `(version + 1, sessionCheckpointId)` to the first line of the file. While it may seem that the lineage is written early, this is safe because the changelog writer has not been committed yet.

- When `loadedVersion == version`, the same RocksDB instance is reused, and the lineage is already available in memory in `versionToUniqueIdLineage`.

#### Commit
- Also save `sessionCheckpointId` to `latestSnapshot`
- Add `(newVersion, sessionCheckpointId)` to `versionToUniqueIdLineage`

#### Abort
Also clear `versionToUniqueIdLineage`.

## RocksDBFileManager:
- A number of additions to make the code uniqueId-aware. All places that previously returned a version now return a `(version, Option[uniqueId])` pair; in the v1 format, the option is `None`.
### deleteOldVersions:
If there are multiple files for the same version with different unique ids, e.g. `version_uniqueId1.zip` and `version_uniqueId2.zip` (and likewise for changelog files), all of them are deleted.
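A sketch of that retention rule, assuming file names of the forms described above (the function name and signature are illustrative, not the actual `RocksDBFileManager` API):

```python
def files_to_delete(files, min_version_to_retain):
    """All snapshot/changelog files below the retention threshold are
    deleted regardless of the uniqueId suffix, so orphaned variants like
    '5_uid1.zip' and '5_uid2.zip' are both cleaned up."""
    doomed = []
    for name in files:
        stem, _, ext = name.rpartition(".")
        if ext not in ("zip", "changelog"):
            continue
        # works for both 'version.ext' (v1) and 'version_uniqueId.ext' (v2)
        version = int(stem.split("_")[0])
        if version < min_version_to_retain:
            doomed.append(name)
    return doomed

assert sorted(files_to_delete(
    ["5_uid1.zip", "5_uid2.zip", "6.changelog", "7_x.zip"], 7)) == \
    ["5_uid1.zip", "5_uid2.zip", "6.changelog"]
```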

## Changelog Reader / Writer
We propose to save the lineage in the first line of the changelog files.

For the changelog reader, an abstract function `readLineage` is created. In the `RocksDBCheckpointManager.getChangelogReader` function, `readLineage` is called right after the changelog reader is initialized to advance the file pointer past the lineage. Subsequent `getNext` calls are not affected by this.

For the changelog writer, there is an abstract function `writeLineage` that writes the lineage. It is called in `RocksDB.load()` before any actual changelog data is written.
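The reader/writer contract can be sketched as follows. This is a toy line-oriented model, not the actual binary changelog format; the class and method names mirror the abstract functions described above but are otherwise assumptions:

```python
import io

class ChangelogWriterV2:
    """Sketch: writeLineage puts the lineage on the first line before any
    changelog records. Writing it early is safe because nothing becomes
    visible until the writer is committed."""
    def __init__(self, out):
        self.out = out
    def write_lineage(self, lineage_line):
        self.out.write(lineage_line + "\n")
    def put(self, record):
        self.out.write(record + "\n")

class ChangelogReaderV2:
    """Sketch: readLineage is called right after construction, advancing
    the stream past the first line so getNext only ever sees records."""
    def __init__(self, inp):
        self.inp = inp
        self.lineage = None
    def read_lineage(self):
        self.lineage = self.inp.readline().rstrip("\n")
        return self.lineage
    def get_next(self):
        line = self.inp.readline()
        return line.rstrip("\n") if line else None

buf = io.StringIO()
w = ChangelogWriterV2(buf)
w.write_lineage("9:c3,8:b2")   # lineage first
w.put("put key value")         # then the data

r = ChangelogReaderV2(io.StringIO(buf.getvalue()))
assert r.read_lineage() == "9:c3,8:b2"
assert r.get_next() == "put key value"
assert r.get_next() is None
```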

### Why are the changes needed?

Improves fault tolerance of the RocksDB state store.

### Does this PR introduce _any_ user-facing change?

Not yet. Once the V2 format project is finished, customers can enable it with a new config for better RocksDB state store fault tolerance.

### How was this patch tested?

Modified existing unit tests. For unit tests and backward compatibility tests please refer to: apache#48356

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48355 from WweiL/integration-for-review.

Lead-authored-by: WweiL <[email protected]>
Co-authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
2 people authored and HeartSaVioR committed Dec 13, 2024
1 parent df08177 commit 429402b
Showing 16 changed files with 1,235 additions and 651 deletions.
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -233,6 +233,11 @@
"An error occurred during loading state."
],
"subClass" : {
"CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT" : {
"message" : [
"Cannot find a base snapshot checkpoint with lineage: <lineage>."
]
},
"CANNOT_READ_CHECKPOINT" : {
"message" : [
"Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."
@@ -353,6 +353,8 @@ private[spark] object LogKeys {
case object LABEL_COLUMN extends LogKey
case object LARGEST_CLUSTER_INDEX extends LogKey
case object LAST_ACCESS_TIME extends LogKey
case object LAST_COMMITTED_CHECKPOINT_ID extends LogKey
case object LAST_COMMIT_BASED_CHECKPOINT_ID extends LogKey
case object LAST_VALID_TIME extends LogKey
case object LATEST_BATCH_ID extends LogKey
case object LATEST_COMMITTED_BATCH_ID extends LogKey
@@ -361,8 +363,10 @@
case object LEFT_EXPR extends LogKey
case object LEFT_LOGICAL_PLAN_STATS_SIZE_IN_BYTES extends LogKey
case object LINE extends LogKey
case object LINEAGE extends LogKey
case object LINE_NUM extends LogKey
case object LISTENER extends LogKey
case object LOADED_CHECKPOINT_ID extends LogKey
case object LOADED_VERSION extends LogKey
case object LOAD_FACTOR extends LogKey
case object LOAD_TIME extends LogKey
@@ -2602,6 +2602,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}

def cannotFindBaseSnapshotCheckpoint(lineage: String): Throwable = {
new SparkException (
errorClass =
"CANNOT_LOAD_STATE_STORE.CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT",
messageParameters = Map("lineage" -> lineage),
cause = null)
}

def unexpectedFileSize(
dfsFile: Path,
localFile: File,
@@ -513,6 +513,7 @@ class MicroBatchExecution(
execCtx.startOffsets ++= execCtx.endOffsets
watermarkTracker.setWatermark(
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
currentStateStoreCkptId ++= commitMetadata.stateUniqueIds
} else if (latestCommittedBatchId == latestBatchId - 1) {
execCtx.endOffsets.foreach {
case (source: Source, end: Offset) =>
@@ -965,7 +966,8 @@
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
if (!commitLog.add(execCtx.batchId,
CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
