
Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Feb 28, 2022

What changes were proposed in this pull request?

This PR proposes to use StatefulOpClusteredDistribution for stateful operators, which requires the exact order of clustering keys and does not allow partitioning on a subset of them, so that stateful operators have consistent partitioning across the lifetime of the query.
(It doesn't cover the case where grouping keys are changed. We have a state schema checker verifying the changes, but renaming is allowed, so swapping keys with the same data type is still possible. There are still grey areas.)

Without mitigation, the change would break existing queries having checkpoints from prior to Spark 3.3 and bring silent correctness issues. To remedy the problem, we introduce a new internal config spark.sql.streaming.statefulOperator.useStrictDistribution, which defaults to true for new queries but to false for queries restarting from a checkpoint created prior to Spark 3.3. If the config is set to false, stateful operators use ClusteredDistribution, which retains the old requirement on child distribution.
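
To illustrate the mechanism, here is a minimal sketch of the selection logic, modeled on the StatefulOperatorPartitioning helper this PR adds (see the review discussion below); the body is an approximation, not the verbatim change:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, StatefulOpClusteredDistribution}
import org.apache.spark.sql.internal.SQLConf

// New queries (flag = true) get the strict, order-sensitive distribution; queries
// restored from pre-3.3 checkpoints (flag = false) keep the legacy requirement.
def getCompatibleDistribution(
    expressions: Seq[Expression],
    numPartitions: Int,
    conf: SQLConf): Distribution = {
  if (conf.getConf(SQLConf.STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION)) {
    StatefulOpClusteredDistribution(expressions, numPartitions)
  } else {
    // Legacy behavior: also satisfied by child partitioning on a subset of the keys.
    ClusteredDistribution(expressions, requiredNumPartitions = Some(numPartitions))
  }
}

Keeping the fallback behind a flag lets old checkpoints keep their existing physical partitioning while new queries get the stricter guarantee.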

Note that this change doesn't fix the root problem for old checkpoints. A long-term fix should be crafted carefully, after collecting evidence on the impact of SPARK-38204 (e.g. how many end-user queries would encounter SPARK-38204).

This PR adds E2E tests for the cases which trigger SPARK-38204, and verifies the behavior with new queries (3.3) and old queries (prior to 3.3).

Why are the changes needed?

Please refer to the description of JIRA issue SPARK-38204 for details, since the description is too long to include here.

Does this PR introduce any user-facing change?

Yes. Stateful operators no longer accept child output partitioning on a subset of the grouping keys and trigger an additional shuffle instead. This ensures consistent partitioning for stateful operators across the lifetime of the query.

How was this patch tested?

New UTs, including backward compatibility tests, are added.

Contributor Author

I just renamed keyExpressions to keyWithoutSessionExpressions, which is functionally the same but makes the behavior clearer.

Contributor Author

Will investigate and provide a fix.


@HeartSaVioR
Contributor Author

cc. @cloud-fan @viirya @sunchao @c21 who have context on this fix (this is a continuation of #35419)
cc. @tdas @zsxwing @brkyvz @xuanyuanking - seeking reviews from additional streaming experts

@LuciferYang
Contributor

4 UTs failed, maybe related to the current PR:

  • SPARK-19690: do not convert batch aggregation in streaming query to streaming - state format version 1
  • SPARK-19690: do not convert batch aggregation in streaming query to streaming - state format version 2
  • SPARK-21977: coalesce(1) with aggregation should still be repartitioned when it has non-empty grouping keys - state format version 1
  • SPARK-21977: coalesce(1) with aggregation should still be repartitioned when it has non-empty grouping keys - state format version 2

@HeartSaVioR
Contributor Author

Thanks for letting me know. The changed behavior probably breaks some UTs, although the behavior change is intentional. I'll audit the failed tests and see whether we should fix them or remove them.

Contributor Author

The change to this test is intended - the main change makes streaming aggregation always introduce a shuffle right after partial aggregation, so that the remaining parts execute in the same stage with the same output partitioning. The test relied on the old behavior, which triggered the shuffle just before the state store load node in some cases.
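
For intuition, the plan shape we now aim for looks roughly like this (illustrative sketch, not actual explain output):

HashAggregate (final)
+- StateStoreSave
   +- HashAggregate (merge)
      +- StateStoreRestore
         +- HashAggregate (merge)
            +- Exchange hashpartitioning(groupingKeys, numStatePartitions)   <- right after partial aggregation
               +- HashAggregate (partial)

Everything above the Exchange runs in one stage with the same output partitioning, so both state store operators see identically partitioned data.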

@HeartSaVioR
Contributor Author

Failed test - org.apache.spark.sql.streaming.StreamingQueryListenerSuite.single listener, check trigger events are generated correctly - doesn't seem to be relevant.

The Scala 2.13 failure, however, does seem relevant:

[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StatefulOpClusteredDistributionTestHelper.scala:30:70: match may not be exhaustive.
[error] It would fail on the following inputs: (AllTuples, _), (BroadcastDistribution(_), _), (ClusteredDistribution(_, _, _), _), (OrderedDistribution(_), _), (StatefulOpClusteredDistribution(_, _), _), (UnspecifiedDistribution, _)
[error]     plan.requiredChildDistribution.zip(desiredClusterColumns).forall {
[error]                                                                      ^
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/StatefulOpClusteredDistributionTestHelper.scala:42:70: match may not be exhaustive.
[error] It would fail on the following inputs: (AllTuples, _), (BroadcastDistribution(_), _), (ClusteredDistribution(_, _, _), _), (OrderedDistribution(_), _), (StatefulOpClusteredDistribution(_, _), _), (UnspecifiedDistribution, _)
[error]     plan.requiredChildDistribution.zip(desiredClusterColumns).forall {
[error]                                                                      ^

It caught the missing cases - I'll need to add handling for them.
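
For example, the helper's match could be made exhaustive by treating every other distribution as a failed check. This is only a sketch - the method name and signature are inferred from the error output above, and it assumes the clustering expressions are plain attribute references:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.StatefulOpClusteredDistribution
import org.apache.spark.sql.execution.SparkPlan

def requireStatefulOpClusteredDistribution(
    plan: SparkPlan,
    desiredClusterColumns: Seq[Seq[String]]): Boolean = {
  plan.requiredChildDistribution.zip(desiredClusterColumns).forall {
    case (StatefulOpClusteredDistribution(clustering, _), columns) =>
      // compare the clustering key names, in order, against the expected columns
      clustering.map(_.asInstanceOf[Attribute].name) == columns
    case _ =>
      false // any other distribution fails the check, and keeps the match exhaustive
  }
}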

@HeartSaVioR
Contributor Author

HeartSaVioR commented Mar 3, 2022

Friendly reminder: cc. @cloud-fan @viirya @sunchao @c21 @xuanyuanking

Worth noting that I marked this as a blocker for 3.3 - while this is not a regression, we should not leave it to silently break more queries once we have identified the problem. A long-term fix can be done on demand (e.g. let stateful operators have a desired fixed output partitioning even when it is not the full set of cluster keys).

@HeartSaVioR
Contributor Author

Friendly reminder, @cloud-fan @viirya @sunchao @c21 @xuanyuanking

@HeartSaVioR HeartSaVioR force-pushed the SPARK-38204-short-term-fix branch from 86055e7 to cc30d92 on March 11, 2022 07:03
Member

@xuanyuanking xuanyuanking left a comment

Generally LGTM.
Since this is related to compatibility, should we leave some comments in the migration guide?

  // merge sessions with calculating aggregation values.
  val interExec: SparkPlan = mayAppendMergingSessionExec(groupingExpressions,
-   aggregateExpressions, partialAggregate)
+   aggregateExpressions, partialAggregate, isStreaming = false)
Member

nit: how about using a default value in mayAppendMergingSessionExec and leaving this call site unchanged?
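
The suggestion, sketched - the parameter types are assumptions inferred from the call site above, and the body is stubbed since only the default value is the point here:

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.SparkPlan

def mayAppendMergingSessionExec(
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    partialAggregate: SparkPlan,
    isStreaming: Boolean = false): SparkPlan = {
  ??? // body unchanged from the PR; the default lets batch call sites stay as they are
}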

  // calculate sessions for input rows and update rows' session column, so that further
  // aggregations can aggregate input rows for the same session.
- val maySessionChild = mayAppendUpdatingSessionExec(groupingExpressions, child)
+ val maySessionChild = mayAppendUpdatingSessionExec(groupingExpressions, child,
Member

ditto

requiredChildDistributionOption = Some(restored.requiredChildDistribution),
requiredChildDistributionExpressions = Some(groupingWithoutSessionAttributes),
isStreaming = true,
// This will be replaced with actual value in state rule.
Member

let's link the state rule class name here?

Contributor Author

We don't create a dedicated class for the state rule. See:

/** Locates save/restore pairs surrounding aggregation. */
val state = new Rule[SparkPlan] {

  /**
   * Ensures that this plan DOES NOT have any stateful operation in it whose pipelined
   * execution depends on this plan. In other words, this function returns true if this
   * plan does NOT have a narrow dependency on a stateful subplan.
   */
  private def hasNoStatefulOp(plan: SparkPlan): Boolean = {
    var statefulOpFound = false
    def findStatefulOp(planToCheck: SparkPlan): Unit = {
      planToCheck match {
        case s: StatefulOperator =>
          statefulOpFound = true
        case e: ShuffleExchangeLike =>
          // Don't search recursively any further, as we are only looking for stateful
          // subplans that this plan has narrow dependencies on.
        case p: SparkPlan =>
          p.children.foreach(findStatefulOp)
      }
    }
    findStatefulOp(plan)
    !statefulOpFound
  }

  override def apply(plan: SparkPlan): SparkPlan = plan transform {
    case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
      UnaryExecNode(agg,
        StateStoreRestoreExec(_, None, _, child))) =>
      val aggStateInfo = nextStatefulOperationStateInfo
      StateStoreSaveExec(
        keys,
        Some(aggStateInfo),
        Some(outputMode),
        Some(offsetSeqMetadata.batchWatermarkMs),
        stateFormatVersion,
        agg.withNewChildren(
          StateStoreRestoreExec(
            keys,
            Some(aggStateInfo),
            stateFormatVersion,
            child) :: Nil))

    case SessionWindowStateStoreSaveExec(keys, session, None, None, None, stateFormatVersion,
      UnaryExecNode(agg,
        SessionWindowStateStoreRestoreExec(_, _, None, None, _, child))) =>
      val aggStateInfo = nextStatefulOperationStateInfo
      SessionWindowStateStoreSaveExec(
        keys,
        session,
        Some(aggStateInfo),
        Some(outputMode),
        Some(offsetSeqMetadata.batchWatermarkMs),
        stateFormatVersion,
        agg.withNewChildren(
          SessionWindowStateStoreRestoreExec(
            keys,
            session,
            Some(aggStateInfo),
            Some(offsetSeqMetadata.batchWatermarkMs),
            stateFormatVersion,
            child) :: Nil))

    case StreamingDeduplicateExec(keys, child, None, None) =>
      StreamingDeduplicateExec(
        keys,
        child,
        Some(nextStatefulOperationStateInfo),
        Some(offsetSeqMetadata.batchWatermarkMs))

    case m: FlatMapGroupsWithStateExec =>
      // We set this to true only for the first batch of the streaming query.
      val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
      m.copy(
        stateInfo = Some(nextStatefulOperationStateInfo),
        batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
        eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
        hasInitialState = hasInitialState
      )

    case j: StreamingSymmetricHashJoinExec =>
      j.copy(
        stateInfo = Some(nextStatefulOperationStateInfo),
        eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
        stateWatermarkPredicates =
          StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
            j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,
            Some(offsetSeqMetadata.batchWatermarkMs)))

    case l: StreamingGlobalLimitExec =>
      l.copy(
        stateInfo = Some(nextStatefulOperationStateInfo),
        outputMode = Some(outputMode))

    case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
      // Optimize limit execution by replacing StreamingLocalLimitExec (consumes the
      // iterator completely) with LocalLimitExec (does not consume the iterator) when
      // the child plan has no stateful operator (i.e. consuming the iterator is not needed).
      LocalLimitExec(limit, child)
  }
}

UpdatingSessionsExec(
  isStreaming = isStreaming,
  // numShufflePartitions will be set to None, and replaced with the actual value in the
  // state rule if the query is streaming.
Member

ditto

Contributor Author

same here

* for details.
*
* Do not use methods in this object for stateful operators which already use
* StatefulOpClusteredDistribution as their required child distribution.
Member

nit: [[StatefulOpClusteredDistribution]]

"This config will be set to true for new streaming queries to guarantee stable state " +
"partitioning, and set to false for existing streaming queries to not break queries " +
"which are restored from existing checkpoints. Please refer SPARK-38204 for details. " +
"The purpose of this config is only compatibility; DO NOT MANUALLY CHANGE THIS!!!")
Member

let's move the sentence This config is for compatibility only; DO NOT MANUALLY CHANGE THIS!!! to be the first sentence in the doc.
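
For illustration, the declaration after this reorder could look like the following sketch; the builder chain is assumed from the usual SQLConf conventions, and the exact version string in the PR may differ:

// Internal flag controlling the child distribution requirement of stateful operators.
val STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION =
  buildConf("spark.sql.streaming.statefulOperator.useStrictDistribution")
    .internal()
    .doc("The purpose of this config is only compatibility; DO NOT MANUALLY CHANGE THIS!!! " +
      "This config will be set to true for new streaming queries to guarantee stable state " +
      "partitioning, and set to false for existing streaming queries to not break queries " +
      "which are restored from existing checkpoints. Please refer SPARK-38204 for details.")
    .version("3.3.0")
    .booleanConf
    .createWithDefault(true)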

@github-actions github-actions bot added the DOCS label Mar 15, 2022
@HeartSaVioR
Contributor Author

@xuanyuanking Thanks for the review! I addressed your review comments. Please take a look again.

Contributor

@c21 c21 left a comment

Thanks @HeartSaVioR for the work! LGTM with some minor comments.

getClusteredDistributionWithBackwardCompatibility(expressions, stateInfo.numPartitions, conf)
}

def getClusteredDistributionWithBackwardCompatibility(
Contributor

To me it feels a little bit too long - how about something like getCompatibleDistribution, or just getDistribution, given the class name StatefulOperatorPartitioning already provides some context? Non-blocking comment. cc @cloud-fan as well.

Contributor Author

getCompatibleDistribution sounds good to me. Not that long, and it keeps all the context. Thanks for the suggestion!

  SymmetricHashJoinStateManager.legacyVersion.toString,
- STATE_STORE_COMPRESSION_CODEC.key -> "lz4"
+ STATE_STORE_COMPRESSION_CODEC.key -> "lz4",
+ STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false"
Contributor

neat workaround!
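
Usage in a compatibility test might look like this sketch (withSQLConf is the standard test helper; the config keys are the ones from the diff above, but the test body and helper name are hypothetical):

withSQLConf(
  SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> "lz4",
  SQLConf.STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION.key -> "false") {
  // Replay a checkpoint written by an older Spark; with the flag pinned to false the
  // operator keeps the legacy ClusteredDistribution requirement, so state partitioning
  // (and therefore correctness) is unchanged.
  runStreamingQueryAgainstOldCheckpoint() // hypothetical helper
}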

    exprs, parts, conf) :: Nil

case _ =>
  throw new IllegalStateException("Expected to set the number of partitions before " +
Contributor

Maybe we can also add a require assertion at class level, e.g. at Line 38:

require(isStreaming == numShufflePartitions.isDefined, "Expected to set the number of partitions for streaming aggregate")

Or we could define only one variable, numStatefulShufflePartitions: Option[Int], instead of the two (isStreaming and numShufflePartitions)?

Contributor Author

We create the node with numShufflePartitions = None and replace the value in the state rule. That said, we can't check this condition before the state rule has run.

Contributor

Maybe we should use the new error framework to throw exceptions in newly added code.

Contributor Author

The error framework is for user-facing errors. This is more of an internal "this should not be called" error. I just made the error message general to make our developers' lives easier.

Contributor

ok

Member

@xuanyuanking xuanyuanking left a comment

LGTM

@xuanyuanking
Member

Thank you @HeartSaVioR for the fix. Merged to master!
We can use a separate PR to do the 3.2 backport if we want.

@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging! Let's port this back to the next bugfix release(s), as this is about correctness. I'll craft a PR for the 3.2 branch as well. Not sure we will have another 3.1.x release after 3.3.0, so let's do this for 3.2 only for now.

HeartSaVioR added a commit to HeartSaVioR/spark that referenced this pull request Mar 18, 2022
…s with respecting backward compatibility

This PR proposes to use HashClusteredDistribution for stateful operators, which requires the exact order of clustering keys and does not allow partitioning on a subset of them, so that stateful operators have consistent partitioning across the lifetime of the query.
(It doesn't cover the case where grouping keys are changed. We have a state schema checker verifying the changes, but renaming is allowed, so swapping keys with the same data type is still possible. There are still grey areas.)

Without mitigation, the change would break existing queries having checkpoints from prior to Spark 3.2.2 and bring silent correctness issues. To remedy the problem, we introduce a new internal config `spark.sql.streaming.statefulOperator.useStrictDistribution`, which defaults to true for new queries but to false for queries restarting from a checkpoint created prior to Spark 3.2.2. If the config is set to false, stateful operators use ClusteredDistribution, which retains the old requirement on child distribution.

Note that this change doesn't fix the root problem for old checkpoints. A long-term fix should be crafted carefully, after collecting evidence on the impact of SPARK-38204 (e.g. how many end-user queries would encounter SPARK-38204).

This PR adds E2E tests for the cases which trigger SPARK-38204, and verifies the behavior with new queries (3.2.2) and old queries (prior to 3.2.2).

Please refer to the description of JIRA issue [SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for details, since the description is too long to include here.

Yes. Stateful operators no longer accept child output partitioning on a subset of the grouping keys and trigger an additional shuffle instead. This ensures consistent partitioning for stateful operators across the lifetime of the query.

New UTs, including backward compatibility tests, are added.

Closes apache#35673 from HeartSaVioR/SPARK-38204-short-term-fix.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Yuanjian Li <[email protected]>
@HeartSaVioR
Contributor Author

FYI, I submitted a PR for 3.2: #35908.
