Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode

/**
Expand All @@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging {
}
}

/**
* Checks for a possible correctness issue in chained stateful operators. The behavior is
* controlled by the SQL config `spark.sql.streaming.statefulOperator.correctnessCheck`. Once it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove "

* is enabled, an analysis exception will be thrown. Otherwise, Spark will just print a warning
* message, which is the behavior before Spark 3.1.0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the practice here so just a 2 cents - as we put the information to both config and migration doc, it feels a bit verbose to repeat which is the behavior before Spark 3.1.0.

*/
def checkStreamingQueryGlobalWatermarkLimit(
plan: LogicalPlan,
outputMode: OutputMode,
failWhenDetected: Boolean): Unit = {
outputMode: OutputMode): Unit = {
def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
case s: Aggregate
if s.isStreaming && outputMode == InternalOutputModes.Append => true
Expand All @@ -62,6 +68,8 @@ object UnsupportedOperationChecker extends Logging {
case _ => false
}

val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled

try {
plan.foreach { subPlan =>
if (isStatefulOperation(subPlan)) {
Expand All @@ -73,7 +81,10 @@ object UnsupportedOperationChecker extends Logging {
"The query contains stateful operation which can emit rows older than " +
"the current watermark plus allowed late record delay, which are \"late rows\"" +
" in downstream stateful operations and these rows can be discarded. " +
"Please refer the programming guide doc for more details."
"Please refer to the programming guide doc for more details. If you understand " +
"the possible risk of correctness issues and still need to run the query, " +
"you can disable this check by setting the config " +
"`spark.sql.streaming.statefulOperator.correctnessCheck` to false."
throwError(errorMsg)(plan)
}
}
Expand Down Expand Up @@ -388,7 +399,7 @@ object UnsupportedOperationChecker extends Logging {
checkUnsupportedExpressions(subPlan)
}

checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false)
checkStreamingQueryGlobalWatermarkLimit(plan, outputMode)
}

def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,21 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED =
buildConf("spark.sql.streaming.statefulOperator.correctnessCheck")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we need to add .enabled according to the naming policy, and also it sounds more natural to say checkCorrectness.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And once we change we also need to change UnsupportedOperationChecker as well.

.internal()
.doc("When true, the stateful operators for streaming query will be checked for possible " +
"correctness issue due to global watermark. The correctness issue comes from queries " +
"containing stateful operation which can emit rows older than the current watermark " +
"plus allowed late record delay, which are \"late rows\" in downstream stateful " +
"operations and these rows can be discarded. Please refer to the programming guide doc " +
"for more details. Once the issue is detected, Spark will throw an analysis exception. " +
"When this config is disabled, Spark will just print a warning message for users. " +
"Prior to Spark 3.1.0, the behavior is equivalent to disabling this config.")
.version("3.1.0")
.booleanConf
.createWithDefault(true)

val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
Expand Down Expand Up @@ -3028,6 +3043,9 @@ class SQLConf extends Serializable with Logging {

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def statefulOperatorCorrectnessCheckEnabled: Boolean =
getConf(STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder}
import org.apache.spark.unsafe.types.CalendarInterval

/** A dummy command for testing unsupported operations. */
case class DummyCommand() extends Command

class UnsupportedOperationsSuite extends SparkFunSuite {
class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {

val attribute = AttributeReference("a", IntegerType, nullable = true)()
val watermarkMetadata = new MetadataBuilder()
Expand Down Expand Up @@ -218,6 +219,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update"))

// FlatMapGroupsWithState(Append) in streaming with aggregation
// Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled.
for (outputMode <- Seq(Append, Update, Complete)) {
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Append) " +
Expand All @@ -228,7 +230,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
FlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null,
streamRelation)),
outputMode = outputMode)
outputMode = outputMode,
SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false")
}

for (outputMode <- Seq(Append, Update)) {
Expand Down Expand Up @@ -268,14 +271,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
}

// multiple FlatMapGroupsWithStates
// Only supported when `spark.sql.streaming.statefulOperator.correctnessCheck` is disabled.
assertSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " +
"in append mode",
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null,
FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append,
isMapGroupsWithState = false, null, streamRelation)),
outputMode = Append)
outputMode = Append,
SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false")

assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - multiple flatMapGroupsWithStates on a streaming relation but some" +
Expand Down Expand Up @@ -995,9 +1000,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
def assertSupportedInStreamingPlan(
name: String,
plan: LogicalPlan,
outputMode: OutputMode): Unit = {
outputMode: OutputMode,
configs: (String, String)*): Unit = {
test(s"streaming plan - $name: supported") {
UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
withSQLConf(configs: _*) {
UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode)
}
}
}

Expand Down Expand Up @@ -1070,14 +1078,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
expectFailure: Boolean): Unit = {
test(s"Global watermark limit - $testNamePostfix") {
if (expectFailure) {
val e = intercept[AnalysisException] {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode, failWhenDetected = true)
withSQLConf(SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "true") {
val e = intercept[AnalysisException] {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode)
}
assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
}
assert(e.message.contains("Detected pattern of possible 'correctness' issue"))
} else {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode, failWhenDetected = true)
withSQLConf(SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false") {
UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit(
wrapInStreaming(plan), outputMode)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
def testWithAllStateVersions(name: String)(func: => Unit): Unit = {
for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) {
test(s"$name - state format version $version") {
withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) {
withSQLConf(
SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString,
SQLConf.STATEFUL_OPERATOR_CORRECTNESS_CHECK_ENABLED.key -> "false") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to turn off the config to run the test, I think the test needs to be reconsidered - we may give misleading and discouraged patterns from tests.

That's not necessarily to be addressed in this PR altogether (follow-up or a new JIRA issue), as removing tests have been always a concern and that leads another kind of discussion.

func
}
}
Expand Down