4 changes: 4 additions & 0 deletions docs/ss-migration-guide.md
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
Many items from the SQL migration guide can also be applied when migrating Structured Streaming to higher versions.
Please refer to the [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).

## Upgrading from Structured Streaming 3.2 to 3.3

- Since Spark 3.3, all stateful operators require hash partitioning with exact grouping keys. In previous versions, all stateful operators except stream-stream join required a looser partitioning criterion, which opened the possibility of a correctness issue. (See [SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for more details.) To ensure backward compatibility, the old behavior is retained for checkpoints built by older versions.
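  For reference, this compatibility behavior is driven by an internal flag added in the same change (`spark.sql.streaming.statefulOperator.useStrictDistribution`, shown further down in this diff). Spark manages the flag automatically based on whether the query starts from an existing checkpoint, so users should not set it; the snippet below is only a hedged sketch of inspecting it on a fresh session.

  ```scala
  import org.apache.spark.sql.SparkSession

  // Hedged sketch: inspect the internal flag on a fresh session. Spark keeps it at its
  // default (true) for new queries and uses the old behavior for queries restored from
  // checkpoints written by older versions; do not change it manually.
  object StrictDistributionCheck {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("strict-distribution-check")
        .getOrCreate()

      // Expected to print "true" for queries started without an existing checkpoint.
      println(spark.conf.get("spark.sql.streaming.statefulOperator.useStrictDistribution"))

      spark.stop()
    }
  }
  ```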

## Upgrading from Structured Streaming 3.0 to 3.1

- In Spark 3.0 and earlier, for queries that have a stateful operation which can emit rows older than the current watermark plus the allowed late record delay (these are "late rows" for downstream stateful operations and may be discarded), Spark only prints a warning message. Since Spark 3.1, Spark checks such queries for possible correctness issues and throws an AnalysisException by default. Users who understand the possible risk of a correctness issue and still decide to run the query can disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
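  A minimal, hedged sketch of disabling that check (assuming an existing `SparkSession` named `spark`; only do this if you accept the correctness risk described above):

  ```scala
  // Illustration only: disables the stateful-operator correctness check for this session.
  spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
  ```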
@@ -1773,6 +1773,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION =
buildConf("spark.sql.streaming.statefulOperator.useStrictDistribution")
.internal()
.doc("The purpose of this config is only compatibility; DO NOT MANUALLY CHANGE THIS!!! " +
"When true, the stateful operator for streaming query will use " +
"StatefulOpClusteredDistribution which guarantees stable state partitioning as long as " +
"the operator provides consistent grouping keys across the lifetime of query. " +
"When false, the stateful operator for streaming query will use ClusteredDistribution " +
"which is not sufficient to guarantee stable state partitioning despite the operator " +
"provides consistent grouping keys across the lifetime of query. " +
"This config will be set to true for new streaming queries to guarantee stable state " +
"partitioning, and set to false for existing streaming queries to not break queries " +
"which are restored from existing checkpoints. Please refer SPARK-38204 for details.")
.version("3.3.0")
.booleanConf
.createWithDefault(true)
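
To make the wording above concrete, the sketch below (not the actual Spark helper, whose implementation may differ in detail) shows how a stateful operator's required child distribution can be derived from this flag; the real logic lives in `StatefulOperatorPartitioning.getCompatibleDistribution`.

```scala
// Hedged sketch only: illustrates how the flag above selects between the strict and
// the legacy distribution requirement.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, StatefulOpClusteredDistribution}
import org.apache.spark.sql.internal.SQLConf

object CompatibleDistributionSketch {
  def getCompatibleDistribution(
      expressions: Seq[Expression],
      numPartitions: Int,
      conf: SQLConf): Distribution = {
    if (conf.getConf(SQLConf.STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION)) {
      // New checkpoints: hash partitioning on exactly these keys with a fixed
      // number of partitions.
      StatefulOpClusteredDistribution(expressions, numPartitions)
    } else {
      // Checkpoints from older versions: keep the historical, looser requirement.
      ClusteredDistribution(expressions, requiredNumPartitions = Some(numPartitions))
    }
  }
}
```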

val FILESTREAM_SINK_METADATA_IGNORED =
buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
.internal()
@@ -45,8 +45,28 @@ object AggUtils {
}
}

private def createStreamingAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan = {
createAggregate(
requiredChildDistributionExpressions,
isStreaming = true,
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
initialInputBufferOffset = initialInputBufferOffset,
resultExpressions = resultExpressions,
child = child)
}

private def createAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
isStreaming: Boolean = false,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
@@ -60,6 +80,8 @@ object AggUtils {
if (useHash && !forceSortAggregate) {
HashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -73,6 +95,8 @@ object AggUtils {
if (objectHashEnabled && useObjectHash && !forceSortAggregate) {
ObjectHashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -82,6 +106,8 @@ object AggUtils {
} else {
SortAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -290,7 +316,7 @@ object AggUtils {
val partialAggregate: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Partial))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
@@ -302,7 +328,7 @@ object AggUtils {
val partialMerged1: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions =
Some(groupingAttributes),
groupingExpressions = groupingAttributes,
@@ -320,7 +346,7 @@ object AggUtils {
val partialMerged2: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions =
Some(groupingAttributes),
groupingExpressions = groupingAttributes,
@@ -348,7 +374,7 @@ object AggUtils {
// projection:
val finalAggregateAttributes = finalAggregateExpressions.map(_.resultAttribute)

createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions = Some(groupingAttributes),
groupingExpressions = groupingAttributes,
aggregateExpressions = finalAggregateExpressions,
@@ -407,7 +433,7 @@ object AggUtils {
val partialAggregate: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Partial))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
@@ -424,7 +450,8 @@ object AggUtils {
// this is to reduce the number of rows to shuffle
MergingSessionsExec(
requiredChildDistributionExpressions = None,
requiredChildDistributionOption = None,
isStreaming = true,
numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggregateExpressions,
@@ -447,8 +474,10 @@ object AggUtils {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
MergingSessionsExec(
requiredChildDistributionExpressions = None,
requiredChildDistributionOption = Some(restored.requiredChildDistribution),
requiredChildDistributionExpressions = Some(groupingWithoutSessionAttributes),
isStreaming = true,
// This will be replaced with the actual value in the state rule.
Member: let's link the state rule class name here?

Contributor Author: We don't create a dedicated class for the state rule. See:

/** Locates save/restore pairs surrounding aggregation. */
val state = new Rule[SparkPlan] {
/**
* Ensures that this plan DOES NOT have any stateful operation in it whose pipelined execution
* depends on this plan. In other words, this function returns true if this plan does
* have a narrow dependency on a stateful subplan.
*/
private def hasNoStatefulOp(plan: SparkPlan): Boolean = {
var statefulOpFound = false
def findStatefulOp(planToCheck: SparkPlan): Unit = {
planToCheck match {
case s: StatefulOperator =>
statefulOpFound = true
case e: ShuffleExchangeLike =>
// Don't search recursively any further as any child stateful operator as we
// are only looking for stateful subplans that this plan has narrow dependencies on.
case p: SparkPlan =>
p.children.foreach(findStatefulOp)
}
}
findStatefulOp(plan)
!statefulOpFound
}
override def apply(plan: SparkPlan): SparkPlan = plan transform {
case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
UnaryExecNode(agg,
StateStoreRestoreExec(_, None, _, child))) =>
val aggStateInfo = nextStatefulOperationStateInfo
StateStoreSaveExec(
keys,
Some(aggStateInfo),
Some(outputMode),
Some(offsetSeqMetadata.batchWatermarkMs),
stateFormatVersion,
agg.withNewChildren(
StateStoreRestoreExec(
keys,
Some(aggStateInfo),
stateFormatVersion,
child) :: Nil))
case SessionWindowStateStoreSaveExec(keys, session, None, None, None, stateFormatVersion,
UnaryExecNode(agg,
SessionWindowStateStoreRestoreExec(_, _, None, None, _, child))) =>
val aggStateInfo = nextStatefulOperationStateInfo
SessionWindowStateStoreSaveExec(
keys,
session,
Some(aggStateInfo),
Some(outputMode),
Some(offsetSeqMetadata.batchWatermarkMs),
stateFormatVersion,
agg.withNewChildren(
SessionWindowStateStoreRestoreExec(
keys,
session,
Some(aggStateInfo),
Some(offsetSeqMetadata.batchWatermarkMs),
stateFormatVersion,
child) :: Nil))
case StreamingDeduplicateExec(keys, child, None, None) =>
StreamingDeduplicateExec(
keys,
child,
Some(nextStatefulOperationStateInfo),
Some(offsetSeqMetadata.batchWatermarkMs))
case m: FlatMapGroupsWithStateExec =>
// We set this to true only for the first batch of the streaming query.
val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
hasInitialState = hasInitialState
)
case j: StreamingSymmetricHashJoinExec =>
j.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
stateWatermarkPredicates =
StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,
Some(offsetSeqMetadata.batchWatermarkMs)))
case l: StreamingGlobalLimitExec =>
l.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
outputMode = Some(outputMode))
case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
// Optimize limit execution by replacing StreamingLocalLimitExec (consumes the iterator
// completely) to LocalLimitExec (does not consume the iterator) when the child plan has
// no stateful operator (i.e., consuming the iterator is not needed).
LocalLimitExec(limit, child)
}
}
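(The comments added in this PR note that, besides attaching the state info, this state rule is also where the `numShufflePartitions = None` placeholder on the planner nodes gets replaced with the actual partition count.)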

numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggregateExpressions,
@@ -476,8 +505,8 @@ object AggUtils {
// projection:
val finalAggregateAttributes = finalAggregateExpressions.map(_.resultAttribute)

createAggregate(
requiredChildDistributionExpressions = Some(groupingAttributes),
createStreamingAggregate(
requiredChildDistributionExpressions = Some(groupingWithoutSessionAttributes),
groupingExpressions = groupingAttributes,
aggregateExpressions = finalAggregateExpressions,
aggregateAttributes = finalAggregateAttributes,
@@ -491,10 +520,15 @@ object AggUtils {

private def mayAppendUpdatingSessionExec(
groupingExpressions: Seq[NamedExpression],
maybeChildPlan: SparkPlan): SparkPlan = {
maybeChildPlan: SparkPlan,
isStreaming: Boolean = false): SparkPlan = {
groupingExpressions.find(_.metadata.contains(SessionWindow.marker)) match {
case Some(sessionExpression) =>
UpdatingSessionsExec(
isStreaming = isStreaming,
// numShufflePartitions is set to None here and replaced with the actual value in the
// state rule if the query is streaming.
Member: ditto

Contributor Author: same here

numShufflePartitions = None,
groupingExpressions.map(_.toAttribute),
sessionExpression.toAttribute,
maybeChildPlan)
@@ -506,7 +540,8 @@ object AggUtils {
private def mayAppendMergingSessionExec(
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
partialAggregate: SparkPlan): SparkPlan = {
partialAggregate: SparkPlan,
isStreaming: Boolean = false): SparkPlan = {
groupingExpressions.find(_.metadata.contains(SessionWindow.marker)) match {
case Some(sessionExpression) =>
val aggExpressions = aggregateExpressions.map(_.copy(mode = PartialMerge))
@@ -519,7 +554,10 @@ object AggUtils {

MergingSessionsExec(
requiredChildDistributionExpressions = Some(groupingWithoutSessionsAttributes),
requiredChildDistributionOption = None,
isStreaming = isStreaming,
// numShufflePartitions is set to None here and replaced with the actual value in the
// state rule if the query is streaming.
numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggExpressions,
@@ -21,12 +21,15 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

/**
* Holds common logic for aggregate operators
*/
trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning {
def requiredChildDistributionExpressions: Option[Seq[Expression]]
def isStreaming: Boolean
def numShufflePartitions: Option[Int]
def groupingExpressions: Seq[NamedExpression]
def aggregateExpressions: Seq[AggregateExpression]
def aggregateAttributes: Seq[Attribute]
@@ -92,7 +95,20 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning
override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) => ClusteredDistribution(exprs) :: Nil
case Some(exprs) =>
if (isStreaming) {
numShufflePartitions match {
case Some(parts) =>
StatefulOperatorPartitioning.getCompatibleDistribution(
exprs, parts, conf) :: Nil

case _ =>
throw new IllegalStateException("Expected to set the number of partitions before " +
Contributor: Maybe we can also add a require assertion at the class level, e.g. at Line 38:

require(isStreaming == numShufflePartitions.isDefined, "Expected to set the number of partitions for streaming aggregate")

Or we could define only one variable, numStatefulShufflePartitions: Option[Int], instead of the two: isStreaming and numShufflePartitions?

Contributor Author: We create the node with numShufflePartitions = None and replace the value in the state rule. That said, we can't check the condition before the state rule has been applied.

Contributor: Maybe we should use the new error framework to throw the exception in newly added code.

Contributor Author: The error framework is for user-facing errors. This is more of a "this should not be called" internal error. I just made the error message general to make our developer lives easier.

Contributor: ok

"constructing required child distribution!")
}
} else {
ClusteredDistribution(exprs) :: Nil
}
case None => UnspecifiedDistribution :: Nil
}
}
@@ -102,7 +118,8 @@
*/
def toSortAggregate: SortAggregateExec = {
SortAggregateExec(
requiredChildDistributionExpressions, groupingExpressions, aggregateExpressions,
aggregateAttributes, initialInputBufferOffset, resultExpressions, child)
requiredChildDistributionExpressions, isStreaming, numShufflePartitions, groupingExpressions,
aggregateExpressions, aggregateAttributes, initialInputBufferOffset, resultExpressions,
child)
}
}
@@ -45,6 +45,8 @@ import org.apache.spark.util.Utils
*/
case class HashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -21,7 +21,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, MutableProjection, NamedExpression, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetrics

@@ -41,7 +40,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class MergingSessionsExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
requiredChildDistributionOption: Option[Seq[Distribution]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
sessionExpression: NamedExpression,
aggregateExpressions: Seq[AggregateExpression],
@@ -59,17 +59,6 @@ case class MergingSessionsExec(

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) => ClusteredDistribution(exprs) :: Nil
case None => requiredChildDistributionOption match {
case Some(distributions) => distributions.toList
case None => UnspecifiedDistribution :: Nil
}
}
}

override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
Seq((keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, Ascending)))
}
@@ -59,6 +59,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class ObjectHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -32,6 +32,8 @@ import org.apache.spark.sql.internal.SQLConf
*/
case class SortAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

/**
* This node updates the session window spec of each input rows via analyzing neighbor rows and
@@ -35,6 +36,8 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
* Refer [[UpdatingSessionsIterator]] for more details.
*/
case class UpdatingSessionsExec(
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpression: Seq[Attribute],
sessionExpression: Attribute,
child: SparkPlan) extends UnaryExecNode {
@@ -63,7 +66,20 @@ case class UpdatingSessionsExec(
if (groupingWithoutSessionExpression.isEmpty) {
AllTuples :: Nil
} else {
ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
if (isStreaming) {
numShufflePartitions match {
case Some(parts) =>
StatefulOperatorPartitioning.getCompatibleDistribution(
groupingWithoutSessionExpression, parts, conf) :: Nil

case _ =>
throw new IllegalStateException("Expected to set the number of partitions before " +
"constructing required child distribution!")
}

} else {
ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
}
}
}

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
import org.apache.spark.sql.execution.streaming.state._
@@ -93,13 +93,10 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-located on the same task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO(SPARK-38204)
ClusteredDistribution(
groupingAttributes, requiredNumPartitions = stateInfo.map(_.numPartitions)) ::
ClusteredDistribution(
initialStateGroupAttrs, requiredNumPartitions = stateInfo.map(_.numPartitions)) ::
StatefulOperatorPartitioning.getCompatibleDistribution(
groupingAttributes, getStateInfo, conf) ::
StatefulOperatorPartitioning.getCompatibleDistribution(
initialStateGroupAttrs, getStateInfo, conf) ::
Nil
}
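
For context, the `stateInfo`-based overload used here presumably just forwards the partition count recorded in the operator's state metadata to the `numPartitions`-based variant; a hedged sketch under that assumption:

```scala
// Hedged sketch of the overload assumed above (not the actual Spark implementation):
// it reuses the partition count recorded in the operator's state metadata and
// delegates to the Int-based variant of getCompatibleDistribution.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution.streaming.{StatefulOperatorPartitioning, StatefulOperatorStateInfo}
import org.apache.spark.sql.internal.SQLConf

object StateInfoDistributionSketch {
  def getCompatibleDistribution(
      expressions: Seq[Expression],
      stateInfo: StatefulOperatorStateInfo,
      conf: SQLConf): Distribution =
    StatefulOperatorPartitioning.getCompatibleDistribution(
      expressions, stateInfo.numPartitions, conf)
}
```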
