
Commit 8023504

ericm-db authored and HeartSaVioR committed
[SPARK-49594][SS] Adding check on whether columnFamilies were added or removed to write StateSchemaV3 file
### What changes were proposed in this pull request? Up until this [PR](#47880) that enabled deleteIfExists, we changed the condition on which we throw an error. However, in doing so, we are not writing schema files whenever we add or remove column families, which is functionally incorrect. Additionally, we were initially always writing the newSchemaFilePath to the OperatorStateMetadata upon every new query run, when we should only do this if the schema changes. ### Why are the changes needed? These changes are needed because we want to write a schema file out every time we add or remove column families. Also, we want to make sure that we point to the old schema file for the current metadata file if the schema has not changed between this run and the last one, as opposed to populating the metadata with a new schema file path every time, even if this file is not created. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Amended unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #48067 from ericm-db/add-remove-cf. Authored-by: Eric Marnadi <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 0f4d289 · commit 8023504

2 files changed

Lines changed: 250 additions & 9 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala

Lines changed: 35 additions & 5 deletions
```diff
@@ -27,6 +27,7 @@ import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, StatefulOperatorStateInfo}
 import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
+import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker.SCHEMA_FORMAT_V3
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}
```

```diff
@@ -95,7 +96,7 @@ class StateSchemaCompatibilityChecker(
       stateStoreColFamilySchema: List[StateStoreColFamilySchema],
       stateSchemaVersion: Int): Unit = {
     // Ensure that schema file path is passed explicitly for schema version 3
-    if (stateSchemaVersion == 3 && newSchemaFilePath.isEmpty) {
+    if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFilePath.isEmpty) {
       throw new IllegalStateException("Schema file path is required for schema version 3")
     }
```

```diff
@@ -186,8 +187,13 @@ class StateSchemaCompatibilityChecker(
         check(existingStateSchema, newSchema, ignoreValueSchema)
       }
     }
+    val colFamiliesAddedOrRemoved =
+      newStateSchemaList.map(_.colFamilyName) != existingStateSchemaList.map(_.colFamilyName)
+    if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
+      createSchemaFile(newStateSchemaList, stateSchemaVersion)
+    }
     // TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
-    false
+    colFamiliesAddedOrRemoved
   }
 }
```
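The new check compares the column-family name lists of the previous and current run directly, so any addition or removal (and, by the same token, a reordering) makes the checker report an evolved schema. A minimal standalone sketch of that comparison, using hypothetical column-family names:

```scala
// Hypothetical column-family name lists from two consecutive runs; the real
// values come from StateStoreColFamilySchema.colFamilyName as in the diff.
val existingColFamilies = List("countState", "mostRecent")
val newColFamilies = List("mostRecent") // "countState" was deleted

// Same order-sensitive List comparison as above: true whenever a column
// family is added or removed between runs.
val colFamiliesAddedOrRemoved = newColFamilies != existingColFamilies
assert(colFamiliesAddedOrRemoved)
```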

```diff
@@ -196,6 +202,9 @@ class StateSchemaCompatibilityChecker(
 }
 
 object StateSchemaCompatibilityChecker {
+
+  val SCHEMA_FORMAT_V3: Int = 3
+
   private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
     if (!UnsafeRowUtils.isBinaryStable(schema)) {
       throw new SparkUnsupportedOperationException(
```
```diff
@@ -275,10 +284,31 @@ object StateSchemaCompatibilityChecker {
     if (storeConf.stateSchemaCheckEnabled && result.isDefined) {
       throw result.get
     }
-    val schemaFileLocation = newSchemaFilePath match {
-      case Some(path) => path.toString
-      case None => checker.schemaFileLocation.toString
+    val schemaFileLocation = if (evolvedSchema) {
+      // if we are using the state schema v3, and we have
+      // evolved schema, this newSchemaFilePath should be defined
+      // and we want to populate the metadata with this file
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        newSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
+    } else {
+      // if we have not evolved schema (there has been a previous schema)
+      // and we are using state schema v3, this file path would be defined
+      // so we would just populate the next run's metadata file with this
+      // file path
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3) {
+        oldSchemaFilePath.get.toString
+      } else {
+        // if we are using any version less than v3, we have written
+        // the schema to this static location, which we will return
+        checker.schemaFileLocation.toString
+      }
     }
+
     StateSchemaValidationResult(evolvedSchema, schemaFileLocation)
   }
 }
```
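Taken together, the branching above reduces to a four-way decision on (evolvedSchema, stateSchemaVersion). A condensed sketch of that decision, reusing the diff's names; resolveSchemaFileLocation itself is a hypothetical helper for illustration, not code from this commit:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical helper restating the schemaFileLocation logic above. For v3,
// newSchemaFilePath / oldSchemaFilePath are expected to be defined in the
// corresponding branch; pre-v3 always uses the single static location.
def resolveSchemaFileLocation(
    evolvedSchema: Boolean,
    stateSchemaVersion: Int,
    newSchemaFilePath: Option[Path],
    oldSchemaFilePath: Option[Path],
    staticSchemaFileLocation: Path): String = {
  if (stateSchemaVersion == 3) {
    // v3: point the metadata at the freshly written schema file only when
    // the schema evolved; otherwise keep pointing at the previous run's file.
    if (evolvedSchema) newSchemaFilePath.get.toString
    else oldSchemaFilePath.get.toString
  } else {
    // pre-v3: the schema is always written to one static location.
    staticSchemaFileLocation.toString
  }
}
```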

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala

Lines changed: 215 additions & 4 deletions
```diff
@@ -1448,6 +1448,10 @@ class TransformWithStateSuite extends StateStoreMetricsTest
         TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
       SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // in this test case, we are changing the state spec back and forth
         // to trigger the writing of the schema and metadata files
         val inputData = MemoryStream[(String, String)]
```
```diff
@@ -1483,6 +1487,11 @@ class TransformWithStateSuite extends StateStoreMetricsTest
           },
           StopStream
         )
+        // assert that a metadata and schema file has been written for each run
+        // as state variables have been deleted
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+
         val result3 = inputData.toDS()
           .groupByKey(x => x._1)
           .transformWithState(new RunningCountMostRecentStatefulProcessor(),
```
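The purge assertions above count files with the suite's getFiles and getStateSchemaPath helpers, which are defined elsewhere in the test code. As an illustration only (not this suite's actual implementation), a helper that lists the entries a purge assertion can count might look like:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Illustrative sketch: list the entries under a checkpoint subdirectory via
// Hadoop's FileSystem API; an assertion can then count the returned array.
def listCheckpointFiles(path: Path): Array[FileStatus] = {
  val fs = path.getFileSystem(new Configuration())
  if (fs.exists(path)) fs.listStatus(path) else Array.empty[FileStatus]
}
```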
```diff
@@ -1512,10 +1521,6 @@ class TransformWithStateSuite extends StateStoreMetricsTest
           },
           StopStream
         )
-        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
-        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
-
-        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
         // by the end of the test, there have been 4 batches,
         // so the metadata and schema logs, and commitLog has been purged
         // for batches 0 and 1 so metadata and schema files exist for batches 0, 1, 2, 3
```
```diff
@@ -1527,6 +1532,116 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - verify that schema file is kept after metadata is purged") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
+      withTempDir { chkptDir =>
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+        // in this test case, we are changing the state spec back and forth
+        // to trigger the writing of the schema and metadata files
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str2")),
+          CheckNewAnswer(("a", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 2)
+
+        val result3 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result3, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str3")),
+          CheckNewAnswer(("a", "1", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        // metadata files should be kept for batches 1, 2, 3
+        // schema files should be kept for batches 0, 2, 3
+        assert(getFiles(metadataPath).length == 3)
+        assert(getFiles(stateSchemaPath).length == 3)
+        // we want to ensure that we can read batch 1 even though the
+        // metadata file for batch 0 was removed
+        val batch1Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 1)
+          .load()
+
+        val batch1AnsDf = batch1Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+
+        checkAnswer(batch1AnsDf, Seq(Row("a", 2L)))
+
+        val batch3Df = spark.read
+          .format("statestore")
+          .option(StateSourceOptions.PATH, chkptDir.getAbsolutePath)
+          .option(StateSourceOptions.STATE_VAR_NAME, "countState")
+          .option(StateSourceOptions.BATCH_ID, 3)
+          .load()
+
+        val batch3AnsDf = batch3Df.selectExpr(
+          "key.value AS groupingKey",
+          "single_value.value AS valueId")
+        checkAnswer(batch3AnsDf, Seq(Row("a", 1L)))
+      }
+    }
+  }
+
   test("state data source integration - value state supports time travel") {
     withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
       classOf[RocksDBStateStoreProvider].getName,
```
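The new test exercises time-travel reads through the statestore data source using the StateSourceOptions constants shown in the diff. A minimal usage sketch of the same kind of read, assuming an existing SparkSession named spark and a checkpoint directory string checkpointDir containing a countState variable; the literal option keys mirror StateSourceOptions.PATH, STATE_VAR_NAME, and BATCH_ID:

```scala
// Sketch: read the countState variable as of batch 1 from a checkpoint,
// mirroring the reads in the test above. `spark` and `checkpointDir` are
// assumed to already exist.
val batch1State = spark.read
  .format("statestore")
  .option("path", checkpointDir)          // StateSourceOptions.PATH
  .option("stateVarName", "countState")   // StateSourceOptions.STATE_VAR_NAME
  .option("batchId", 1)                   // StateSourceOptions.BATCH_ID
  .load()
  .selectExpr("key.value AS groupingKey", "single_value.value AS valueId")

batch1State.show()
```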
```diff
@@ -1708,6 +1823,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       }
     }
   }
+
+  test("transformWithState - verify that no metadata and schema logs are purged after" +
+    " removing column family") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString,
+      SQLConf.MIN_BATCHES_TO_RETAIN.key -> "3") {
+      withTempDir { chkptDir =>
+        val inputData = MemoryStream[(String, String)]
+        val result1 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new RunningCountMostRecentStatefulProcessor(),
+            TimeMode.None(),
+            OutputMode.Update())
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "1", "")),
+          AddData(inputData, ("a", "str1")),
+          CheckNewAnswer(("a", "2", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        testStream(result1, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "1", "")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "2", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "3", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "4", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "5", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "6", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "7", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "8", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "9", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "10", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "11", "str1")),
+          AddData(inputData, ("b", "str1")),
+          CheckNewAnswer(("b", "12", "str1")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+        val result2 = inputData.toDS()
+          .groupByKey(x => x._1)
+          .transformWithState(new MostRecentStatefulProcessorWithDeletion(),
+            TimeMode.None(),
+            OutputMode.Update())
+
+        testStream(result2, OutputMode.Update())(
+          StartStream(checkpointLocation = chkptDir.getCanonicalPath),
+          AddData(inputData, ("b", "str2")),
+          CheckNewAnswer(("b", "str1")),
+          AddData(inputData, ("b", "str3")),
+          CheckNewAnswer(("b", "str2")),
+          Execute { q =>
+            eventually(timeout(Span(5, Seconds))) {
+              q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false)
+            }
+          },
+          StopStream
+        )
+
+        val stateOpIdPath = new Path(new Path(chkptDir.getCanonicalPath, "state"), "0")
+        val stateSchemaPath = getStateSchemaPath(stateOpIdPath)
+
+        val metadataPath = OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+
+        // Metadata files are written for batches 0, 2, and 14.
+        // Schema files are written for batches 0 and 14.
+        // At the beginning of the last query run, the thresholdBatchId is 11.
+        // However, we need both schema files to be preserved if we want to
+        // be able to read from batch 11 onwards.
+        assert(getFiles(metadataPath).length == 2)
+        assert(getFiles(stateSchemaPath).length == 2)
+      }
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {
```
