Skip to content

Commit 9688905

Browse files
committed
validation
1 parent 405eb55 commit 9688905

2 files changed

Lines changed: 58 additions & 50 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala

Lines changed: 51 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import org.json4s.jackson.JsonMethods
2828
import org.json4s.jackson.JsonMethods.{compact, render}
2929

3030
import org.apache.spark.broadcast.Broadcast
31-
import org.apache.spark.internal.Logging
3231
import org.apache.spark.rdd.RDD
3332
import org.apache.spark.sql.catalyst.InternalRow
3433
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -439,56 +438,62 @@ case class TransformWithStateExec(
439438
}
440439
}
441440

441+
/**
 * Checks that one operator property carries the same value in the old and the new
 * operator metadata.
 *
 * Both `operatorPropertiesJson` payloads are parsed into `Map[String, Any]` and the entry
 * stored under `fieldName` is read from each. The lookup uses `Map.apply`, so a payload
 * that lacks `fieldName` fails with the map's missing-key error before any comparison.
 *
 * NOTE(review): `T` is an abstract type parameter, so `asInstanceOf[T]` is an unchecked
 * cast; the comparison itself relies on the runtime `!=` of the extracted values.
 *
 * @param fieldName key of the property inside the operator properties JSON
 * @param oldMetadataV2 metadata providing the baseline value
 * @param newMetadataV2 metadata providing the value compared against the baseline
 * @tparam T type the extracted values are cast to before being compared
 * @throws Throwable the error built by `StateStoreErrors.invalidConfigChangedAfterRestart`
 *                   when the two values differ
 */
private def checkOperatorPropEquality[T](
    fieldName: String,
    oldMetadataV2: OperatorStateMetadataV2,
    newMetadataV2: OperatorStateMetadataV2): Unit = {
  implicit val formats: DefaultFormats.type = DefaultFormats

  // Turns the JSON property payload of one metadata instance into a plain map.
  def parseProps(metadata: OperatorStateMetadataV2): Map[String, Any] =
    JsonMethods.parse(metadata.operatorPropertiesJson).extract[Map[String, Any]]

  // Parse both payloads first, then look the field up in each.
  val previousProps = parseProps(oldMetadataV2)
  val currentProps = parseProps(newMetadataV2)
  val previousValue = previousProps(fieldName).asInstanceOf[T]
  val currentValue = currentProps(fieldName).asInstanceOf[T]

  if (previousValue != currentValue) {
    throw StateStoreErrors.invalidConfigChangedAfterRestart(
      fieldName,
      previousValue.toString,
      currentValue.toString
    )
  }
}
461+
462+
/**
 * Checks that no state variable recorded in the old metadata has changed its type.
 *
 * The `stateVariables` entry of the old `operatorPropertiesJson` is converted into
 * `TransformWithStateVariableInfo` values and each one is looked up by `stateName` in the
 * result of `getStateVariableInfos()`. Variables that have no counterpart there are
 * ignored; only a differing `stateVariableType` is treated as an error.
 *
 * @param oldMetadataV2 metadata whose recorded state variables act as the baseline
 * @throws Throwable the error built by `StateStoreErrors.invalidVariableTypeChange` for the
 *                   first recorded variable whose type differs
 */
private def checkStateVariableEquality(oldMetadataV2: OperatorStateMetadataV2): Unit = {
  implicit val formats: DefaultFormats.type = DefaultFormats
  val persistedProps =
    JsonMethods.parse(oldMetadataV2.operatorPropertiesJson).extract[Map[String, Any]]

  // State variable descriptions as they were recorded in the old metadata.
  val persistedInfos = persistedProps("stateVariables")
    .asInstanceOf[List[Map[String, Any]]]
    .map(TransformWithStateVariableInfo.fromMap)
  val currentInfos = getStateVariableInfos()

  persistedInfos.foreach { persisted =>
    currentInfos
      .get(persisted.stateName)
      // Keep only a counterpart whose type no longer matches the recorded one.
      .filter(current => persisted.stateVariableType != current.stateVariableType)
      .foreach { current =>
        throw StateStoreErrors.invalidVariableTypeChange(
          current.stateName,
          persisted.stateVariableType.toString,
          current.stateVariableType.toString
        )
      }
  }
}
486+
442487
/**
 * Validates the new operator metadata against the old one.
 *
 * Checks run only when both arguments are `OperatorStateMetadataV2`: the `timeMode` and
 * `outputMode` properties must be equal, and the state variables recorded in the old
 * metadata must keep their types. Any other combination of metadata types is accepted
 * without checks.
 *
 * @param oldMetadata previously recorded operator metadata
 * @param newMetadata operator metadata to validate against `oldMetadata`
 */
def validateMetadatas(
    oldMetadata: OperatorStateMetadata,
    newMetadata: OperatorStateMetadata): Unit = oldMetadata match {
  case previousV2: OperatorStateMetadataV2 =>
    newMetadata match {
      case currentV2: OperatorStateMetadataV2 =>
        // Property checks run in this order, followed by the state variable check.
        Seq("timeMode", "outputMode").foreach { propName =>
          checkOperatorPropEquality[String](propName, previousV2, currentV2)
        }
        checkStateVariableEquality(previousV2)
      case _ =>
    }
  case _ =>
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
973973
}
974974

975975
test("transformWithState - verify that OperatorStateMetadataV2" +
976-
" file is being written correctly") {
976+
" integrates with state-metadata source") {
977977
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
978978
classOf[RocksDBStateStoreProvider].getName,
979979
SQLConf.SHUFFLE_PARTITIONS.key ->
@@ -1002,17 +1002,20 @@ class TransformWithStateSuite extends StateStoreMetricsTest
10021002
Row(0, "transformWithStateExec", "default", 5, 0L, 0L),
10031003
Row(0, "transformWithStateExec", "default", 5, 1L, 1L)
10041004
))
1005+
// The expected JSON must stay on one unbroken line; otherwise the test will fail.
1006+
// scalastyle:off
1007+
val expectedAnswer = """{"timeMode":"NoTime","outputMode":"Update","stateVariables":[{"stateName":"countState","stateVariableType":"ValueState","ttlEnabled":false}]}"""
1008+
// scalastyle:on
10051009
checkAnswer(df.select(df.metadataColumn("_operatorProperties")),
10061010
Seq(
1007-
Row("""{"timeMode":"NoTime","outputMode":"Update"}"""),
1008-
Row("""{"timeMode":"NoTime","outputMode":"Update"}""")
1011+
Row(expectedAnswer),
1012+
Row(expectedAnswer)
10091013
)
10101014
)
10111015
}
10121016
}
10131017
}
10141018

1015-
10161019
test("transformWithState - verify that metadata logs are purged") {
10171020
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
10181021
classOf[RocksDBStateStoreProvider].getName,

0 commit comments

Comments
 (0)