Closed

Commits
37 commits
6db0e3d
initial implementation
eason-yuchen-liu Jun 4, 2024
7dad0c1
Merge branch 'skipSnapshotAtBatch' of https://github.com/eason-yuchen…
eason-yuchen-liu Jun 4, 2024
2475173
add test cases for two options in HDFS state store
eason-yuchen-liu Jun 6, 2024
07267b5
allow rocksdb to reconstruct state from a specific checkpoint
eason-yuchen-liu Jun 7, 2024
9d902d7
test directly on the method instead of end to end
eason-yuchen-liu Jun 10, 2024
eddb3c7
Merge branch 'apache:master' into skipSnapshotAtBatch
eason-yuchen-liu Jun 10, 2024
1a3d20a
make sure test is stable
eason-yuchen-liu Jun 10, 2024
292ec5d
delete useless test files
eason-yuchen-liu Jun 10, 2024
aa337c1
add new test on partition not found error
eason-yuchen-liu Jun 11, 2024
dfa712e
clean up and format
eason-yuchen-liu Jun 11, 2024
4ebd078
move partition error
eason-yuchen-liu Jun 11, 2024
1656580
improve doc
eason-yuchen-liu Jun 11, 2024
61dea35
minor
eason-yuchen-liu Jun 11, 2024
5229152
support reading join states
eason-yuchen-liu Jun 12, 2024
4825215
address reviews by Wei partially
eason-yuchen-liu Jun 13, 2024
20e1b9c
address comments from Anish & Wei
eason-yuchen-liu Jun 13, 2024
9eb6c76
Merge branch 'master' into skipSnapshotAtBatch
eason-yuchen-liu Jun 13, 2024
4d4cd70
log StateSourceOptions optionally
eason-yuchen-liu Jun 13, 2024
1870b35
Merge branch 'skipSnapshotAtBatch' of https://github.com/eason-yuchen…
eason-yuchen-liu Jun 13, 2024
fe9cea1
address more comments from Anish
eason-yuchen-liu Jun 14, 2024
3f266c1
style
eason-yuchen-liu Jun 17, 2024
2eb6646
also update the name of StateTable
eason-yuchen-liu Jun 21, 2024
be30817
Reflect more comments from Anish
eason-yuchen-liu Jun 22, 2024
3ece6f2
resort error-conditions
eason-yuchen-liu Jun 22, 2024
ef9b095
create integration test against golden files
eason-yuchen-liu Jun 25, 2024
876256e
reflect comments from Jungtaek
eason-yuchen-liu Jun 25, 2024
1a23abb
refactor the code to isolate from current state stores used by stream…
eason-yuchen-liu Jun 25, 2024
97ee3ef
some naming and formatting comments from Anish and Jungtaek
eason-yuchen-liu Jun 26, 2024
23639f4
create new error for SupportsFineGrainedReplayFromSnapshot
eason-yuchen-liu Jun 26, 2024
40b6dc6
move error to StateStoreErrors
eason-yuchen-liu Jun 26, 2024
e15213e
rename startVersion to snapshotVersion to make its function clear
eason-yuchen-liu Jun 27, 2024
42d952f
rename SupportsFineGrainedReplayFromSnapshot to SupportsFineGrainedRe…
eason-yuchen-liu Jun 27, 2024
6f1425d
reflect more comments from Jungtaek
eason-yuchen-liu Jun 27, 2024
4deb63e
throw the exception
eason-yuchen-liu Jun 27, 2024
d140708
provide the script to regenerate golden files
eason-yuchen-liu Jun 27, 2024
337785d
address comments from Anish
eason-yuchen-liu Jun 29, 2024
9dbe295
minor
eason-yuchen-liu Jul 2, 2024
@@ -900,7 +900,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
Contributor:
nit: space after version, as the next string does not start with space.

log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
new HDFSBackedStateStore(endVersion, newMap)
@@ -917,9 +918,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
Contributor:
nit: same here

log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
new HDFSBackedReadStateStore(endVersion, newMap)
}

@@ -935,14 +937,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < snapshotVersion || endVersion < 0) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}

val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
if (endVersion != 0) {
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))
}
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))

newMap
}
catch {
@@ -972,7 +973,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
resultMap
}

logDebug(s"Loading state from $snapshotVersion to $endVersion takes $elapsedMs ms.")
logDebug(s"Loading snapshot at version $snapshotVersion and apply delta files to version " +
s"$endVersion takes $elapsedMs ms.")

result
}
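For intuition, the replay performed by replayLoadedMapForStoreFromSnapshot amounts to a left fold: start from the map persisted in the snapshot at snapshotVersion, then apply each delta file up to endVersion in order. The simplified validation above is also sound: snapshotVersion >= 1 and endVersion >= snapshotVersion together imply endVersion >= 1, so the old `endVersion != 0` guard was redundant. A minimal, self-contained Scala sketch of the concept (illustrative names only, not Spark code):

// Conceptual sketch: state replay as a left fold over delta files.
// `snapshot` is the key-value map persisted at snapshotVersion; each element
// of `deltas` holds the changes for one version in (snapshotVersion, endVersion].
// Some(v) models a put; None models a delete (tombstone).
def replayFromSnapshot(
    snapshot: Map[String, Int],
    deltas: Seq[Map[String, Option[Int]]]): Map[String, Int] =
  deltas.foldLeft(snapshot) { (state, delta) =>
    delta.foldLeft(state) {
      case (s, (key, Some(value))) => s.updated(key, value) // put
      case (s, (key, None))        => s - key               // delete
    }
  }
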
@@ -242,14 +242,14 @@ class RocksDB(
acquire(LoadStore)
Contributor:
The lock release path is still the same, right? I assume we release on an abort?

Contributor Author:
I think I am copying the existing implementation. Any changes needed here?

Contributor:
Yeah, I'm guessing the unlock happens at the end as part of an abort within the state data source reader.

recordedMetrics = None
logInfo(
log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
try {
replayFromCheckpoint(snapshotVersion, endVersion)

logInfo(
Contributor:
ditto

log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, snapshotVersion)}")
log"Loaded snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
} catch {
case t: Throwable =>
loadedVersion = -1 // invalidate loaded data
@@ -267,29 +267,27 @@ class RocksDB(
* @param endVersion end version
*/
private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
if (loadedVersion != snapshotVersion) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
if (loadedVersion != endVersion) replayChangelog(endVersion)
Contributor:
I'd like to see a user-friendly error message when the changelog file does not exist. Users may not actually be using changelog checkpointing and may be misled into thinking this is supported. Handing them a FileNotFoundException gives no hint about what is possibly incorrect; a smart user may notice what is wrong, but it's better to be user-friendly, and also to be part of the error class framework.

Contributor Author:
The error users currently get when the changelog does not exist is:

Cause: org.apache.spark.SparkException: [CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE] An error occurred during loading state. Error reading streaming state file of <checkpointLocation> does not exist.
If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location. SQLSTATE: 58030

It does not have its own error class, so I think we should defer this to a follow-up task: give this error its own error class and catch it here to remind the user of the possible cause.

// After changelog replay the numKeysOnWritingVersion will be updated to
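On the missing-changelog discussion above, one possible shape for the follow-up is sketched below. The exception class name, message, and wrapper are assumptions for illustration, not existing Spark API; the real fix would go through Spark's error class framework.

import java.io.FileNotFoundException

// Hypothetical error type (assumed name), standing in for a proper
// error-class-based exception in Spark's framework.
class StateStoreChangelogFileNotFound(version: Long, cause: Throwable)
  extends RuntimeException(
    s"Changelog file for state version $version does not exist. " +
      "Fine-grained replay requires changelog checkpointing to be enabled.",
    cause)

// Wraps a changelog replay call so that a missing file surfaces a clear hint
// instead of a bare FileNotFoundException.
def replayChangelogOrExplain(version: Long)(replay: Long => Unit): Unit =
  try replay(version)
  catch {
    case e: FileNotFoundException =>
      throw new StateStoreChangelogFileNotFound(version, e)
  }
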
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils

private[sql] class RocksDBStateStoreProvider
extends StateStoreProvider with Logging with Closeable
with SupportsFineGrainedReplay {
with SupportsFineGrainedReplay {
import RocksDBStateStoreProvider._

class RocksDBStateStore(lastVersion: Long) extends StateStore {
@@ -384,22 +384,6 @@ private[sql] class RocksDBStateStoreProvider
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
new RocksDBStateStore(endVersion)
}
catch {
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}
}

object RocksDBStateStoreProvider {
@@ -975,7 +975,7 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
}

protected def testSnapshotNotFound(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
val store = provider.getStore(i - 1)
@@ -989,11 +989,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
.replayReadStateFromSnapshot(1, 2)
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
Contributor:
It would be nice if we could provide users a better error message, e.g. that the snapshot file does not exist, but I'm OK with addressing this later.

Contributor Author:
Let's handle it later along with the changelog-file-not-found exception.

})
}
}

protected def testGetReadStoreWithStartVersion(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
for (i <- 1 to 4) {
val store = provider.getStore(i - 1)
@@ -1012,11 +1012,11 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
assert(get(result, "a", 4).isEmpty)

provider.close()
})
}
}

protected def testSnapshotPartitionId(): Unit = {
withTempDir(tempDir => {
withTempDir { tempDir =>
val inputData = MemoryStream[Int]
val df = inputData.toDF().limit(10)

@@ -1039,16 +1039,15 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
val stateDfError = spark.read.format("statestore")
.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
StateSourceOptions.SNAPSHOT_PARTITION_ID,
spark.sessionState.conf.numShufflePartitions)
StateSourceOptions.SNAPSHOT_PARTITION_ID, 1)
.option(StateSourceOptions.BATCH_ID, 0)
.load(tempDir.getAbsolutePath)

val exc = intercept[StateStoreSnapshotPartitionNotFound] {
stateDfError.show()
}
assert(exc.getErrorClass === "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND")
})
}
}

// Todo: Should also test against state generated by 3.5
Contributor:
Is this a remaining TODO, or does it not need to be done at all? If we don't need it, let's remove the golden files for 3.5. I guess testing cross-version compatibility is not the intent.

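For reference, the snapshot options exercised in the tests above combine as follows when reading state. This is a usage sketch, assuming an active `spark: SparkSession`, a real checkpoint path in place of the placeholder, and the StateSourceOptions package path as of this branch; external callers would pass the corresponding string option keys instead of the constants.

// Usage sketch: reconstruct the state of batch 0 for one partition by
// starting from the snapshot at snapshotStartBatchId and replaying forward.
// Assumes `spark: SparkSession` is in scope and the path is a real checkpoint.
import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions

val stateDf = spark.read.format("statestore")
  .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
  .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, 0)
  .option(StateSourceOptions.BATCH_ID, 0)
  .load("/path/to/checkpoint")

stateDf.show()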