[SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source #46944
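In short, the change lets the state data source rebuild the state of a single partition by starting from the snapshot written for a given batch (snapshotStartBatchId) and replaying delta/changelog files forward, which is mainly useful for debugging how a state value was constructed. A minimal usage sketch, assuming a SparkSession `spark` and a placeholder checkpoint path; the format name and option spellings follow the state data source reader and this PR's title:

```scala
// Reconstruct the state of partition 0 starting from the snapshot for batch 5,
// replaying deltas/changelogs on top of it. Path and ids are placeholders.
val stateDf = spark.read
  .format("statestore")
  .option("path", "/checkpoints/my-streaming-query") // checkpoint location (placeholder)
  .option("snapshotStartBatchId", 5)                 // new option in this PR
  .option("snapshotPartitionId", 0)                  // new option in this PR
  .load()

stateDf.show()
```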
HDFSBackedStateStoreProvider.scala
@@ -71,7 +71,8 @@ import org.apache.spark.util.ArrayImplicits._
  * to ensure re-executed RDD operations re-apply updates on the correct past version of the
  * store.
  */
-private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging {
+private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
+  with FineGrainedStateSource {

   private val providerName = "HDFSBackedStateStoreProvider"
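The hunk above only shows the provider mixing in FineGrainedStateSource; the trait itself is defined elsewhere in the PR. Based on the two overrides added in the hunks below, it presumably looks roughly like the sketch here, which is inferred from those method signatures and is not the exact definition:

```scala
// Spark-internal types; inside Spark these live in
// org.apache.spark.sql.execution.streaming.state.
import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStore}

// Inferred shape of the mixin (an assumption, not copied from the PR).
trait FineGrainedStateSource {
  /** Replay a writable store at endVersion from the snapshot at startVersion. */
  def replayStoreFromSnapshot(startVersion: Long, endVersion: Long): StateStore

  /** Replay a read-only store at endVersion from the snapshot at startVersion. */
  def replayReadStoreFromSnapshot(startVersion: Long, endVersion: Long): ReadStateStore
}
```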
@@ -261,6 +262,22 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     new HDFSBackedStateStore(version, newMap)
   }

+  /**
+   * Get the state store of endVersion for reading by applying delta files on the snapshot of
+   * startVersion. If the snapshot for startVersion does not exist, an error will be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   * @return [[HDFSBackedStateStore]]
+   */
+  override def replayStoreFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
+    val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
+    logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
+      log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
+    new HDFSBackedStateStore(endVersion, newMap)
+  }
+
   /** Get the state store for reading to specific `version` of the store. */
   override def getReadStore(version: Long): ReadStateStore = {
     val newMap = getLoadedMapForStore(version)
@@ -269,6 +286,22 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     new HDFSBackedReadStateStore(version, newMap)
   }

+  /**
+   * Get the state store of endVersion for reading by applying delta files on the snapshot of
+   * startVersion. If the snapshot for startVersion does not exist, an error will be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   * @return [[HDFSBackedReadStateStore]]
+   */
+  override def replayReadStoreFromSnapshot(startVersion: Long, endVersion: Long): ReadStateStore = {
+    val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
+    logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
+      log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
+    new HDFSBackedReadStateStore(endVersion, newMap)
+  }
+
   private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
     try {
       if (version < 0) {
@@ -285,6 +318,33 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     }
   }

+  /**
+   * Construct the state at endVersion from the snapshot at startVersion.
+   * Returns a new [[HDFSBackedStateStoreMap]].
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   */
+  private def replayLoadedMapForStoreFromSnapshot(startVersion: Long, endVersion: Long):
+    HDFSBackedStateStoreMap = synchronized {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion || endVersion < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
+      }
+
+      val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+      if (endVersion != 0) {
+        newMap.putAll(constructMapFromSnapshot(startVersion, endVersion))
+      }
+      newMap
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+    }
+  }
+
   // Run bunch of validations specific to HDFSBackedStateStoreProvider
   private def runValidation(
       useColumnFamilies: Boolean,
@@ -544,6 +604,33 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     result
   }

+  private def constructMapFromSnapshot(startVersion: Long, endVersion: Long):
+    HDFSBackedStateStoreMap = {
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val startVersionMap = synchronized { Option(loadedMaps.get(startVersion)) } match {
+        case Some(value) => Option(value)
+        case None => readSnapshotFile(startVersion)
+      }
+      if (startVersionMap.isEmpty) {
+        throw StateStoreErrors.stateStoreSnapshotFileNotFound(
+          snapshotFile(startVersion).toString, toString())
+      }
+
+      // Load all the deltas from the version after the start version up to the end version.
+      val resultMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+      resultMap.putAll(startVersionMap.get)
+      for (deltaVersion <- startVersion + 1 to endVersion) {
+        updateFromDeltaFile(deltaVersion, resultMap)
+      }
+
+      resultMap
+    }
+
+    logDebug(s"Loading state from $startVersion to $endVersion takes $elapsedMs ms.")
+
+    result
+  }
+
   private def writeUpdateToDeltaFile(
       output: DataOutputStream,
       key: UnsafeRow,
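constructMapFromSnapshot is the core of the feature for the HDFS-backed provider: take the in-memory map (or snapshot file) at startVersion and fold every delta file up to endVersion on top of it. Stripped of Spark internals, the idea reduces to the stand-alone sketch below; the map and delta types are invented for illustration and are not Spark code:

```scala
import scala.collection.mutable

// Toy model of snapshot-plus-delta reconstruction. A "delta" records the puts and
// removes made by one batch; replaying deltas (start+1)..end over the snapshot at
// version `start` yields the state at version `end`.
final case class Delta(puts: Map[String, String], removes: Set[String])

def replayFromSnapshot(
    snapshot: Map[String, String],  // state at version `start`
    deltas: IndexedSeq[Delta],      // deltas(i) moves version start+i to start+i+1
    start: Int,
    end: Int): Map[String, String] = {
  require(end >= start, "endVersion must not be earlier than startVersion")
  val state = mutable.Map.empty[String, String] ++= snapshot
  for (v <- start + 1 to end) {
    val d = deltas(v - start - 1)
    state ++= d.puts   // apply updates and inserts
    state --= d.removes // apply deletes
  }
  state.toMap
}
```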
@@ -683,6 +770,11 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     }
   }

+  /**
+   * Try to read the snapshot file. If the snapshot file is not available, return [[None]].
+   *
+   * @param version the version of the snapshot file
+   */
   private def readSnapshotFile(version: Long): Option[HDFSBackedStateStoreMap] = {
     val fileToRead = snapshotFile(version)
     val map = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
RocksDB.scala
@@ -226,6 +226,82 @@ class RocksDB(
     this
   }

+  /**
+   * Load from the start snapshot version and apply all the changelog records to reach the
+   * end version. Note that this will copy all the necessary files from DFS to the local disk
+   * as needed, and possibly restart the native RocksDB instance.
+   *
+   * @param startVersion version of the snapshot to start with
+   * @param endVersion end version
+   * @return A RocksDB instance loaded with the state at endVersion replayed from startVersion.
+   *         Note that the instance will be read-only since this method is only used in the
+   *         State Data Source.
+   */
+  def loadFromSnapshot(startVersion: Long, endVersion: Long): RocksDB = {
+    assert(startVersion >= 0 && endVersion >= startVersion)
+    acquire(LoadStore)

Review thread on the acquire call:
Contributor: Lock release path is still the same, right? I assume we release on an abort?
Author: I think I am copying the existing implementation. Any changes needed here?
Contributor: Yeah, I'm guessing the unlock happens at the end as part of an abort within the state data source reader.

+    recordedMetrics = None
+    logInfo(
+      log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
+        log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
+    try {
+      replayFromCheckpoint(startVersion, endVersion)
+
+      logInfo(

Review comment on the logInfo above: Contributor: ditto

+        log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
+          log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
+    } catch {
+      case t: Throwable =>
+        loadedVersion = -1 // invalidate loaded data
+        throw t
+    }
+    this
+  }
+
+  /**
+   * Load from the start checkpoint version and apply all the changelog records to reach the
+   * end version. If the start version does not exist, it will throw an exception.
+   *
+   * @param startVersion start checkpoint version
+   * @param endVersion end version
+   */
+  private def replayFromCheckpoint(startVersion: Long, endVersion: Long): Any = {
+    if (loadedVersion != startVersion) {
+      closeDB()
+      val metadata = fileManager.loadCheckpointFromDfs(startVersion, workingDir)
+      loadedVersion = startVersion
+
+      // reset last snapshot version
+      if (lastSnapshotVersion > startVersion) {
+        // discard any newer snapshots
+        lastSnapshotVersion = 0L
+        latestSnapshot = None
+      }
+      openDB()
+
+      numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
+        // we don't track the total number of rows - discard the number being tracked
+        -1L
+      } else if (metadata.numKeys < 0) {
+        // we track the total number of rows, but the snapshot doesn't have the tracked number,
+        // so we need to count keys now
+        countKeys()
+      } else {
+        metadata.numKeys
+      }
+    }
+    if (loadedVersion != endVersion) replayChangelog(endVersion)

Review thread on the replayChangelog call:
Contributor: I'd like to see a user-friendly error message when the changelog file does not exist. Say users are actually not using changelog checkpointing and were somehow misled into thinking it's supported. Handing them a FileNotFoundException gives no hint about what is possibly incorrect; a smart user may notice what is wrong, but it's better to be user-friendly and to be part of the error class framework.
Author: The error users will get now when the changelog does not exist does not have its own error class, so I think we should leave this to follow-up tasks: give this error its own error class and catch it here to remind the user of the possible cause.

+    // After changelog replay the numKeysOnWritingVersion will be updated to
+    // the correct number of keys in the loaded version.
+    numKeysOnLoadedVersion = numKeysOnWritingVersion
+    fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
+
+    if (conf.resetStatsOnLoad) {
+      nativeStats.reset
+    }
+  }
+
   /**
    * Replay change log from the loaded version to the target version.
    */
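The follow-up suggested in the review thread above could take roughly the shape sketched here: convert the raw FileNotFoundException thrown during changelog replay into a message that names the likely cause. The helper name and the config key quoted in the message are assumptions, and the real fix would go through Spark's error-class framework rather than a plain exception:

```scala
import java.io.FileNotFoundException

// Hypothetical helper (not part of this PR) illustrating the reviewer's suggestion.
object ChangelogReplayErrors {
  def withFriendlyError[T](endVersion: Long)(replay: => T): T = {
    try {
      replay
    } catch {
      case e: FileNotFoundException =>
        throw new IllegalStateException(
          s"Changelog file needed to reconstruct state version $endVersion was not found. " +
            "This can happen when changelog checkpointing is disabled for this checkpoint " +
            "(spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled), " +
            "in which case only snapshot versions can be replayed.", e)
    }
  }
}
```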