Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ case class ClusteredDistribution(
* Since this distribution relies on [[HashPartitioning]] on the physical partitioning of the
* stateful operator, only [[HashPartitioning]] (and HashPartitioning in
* [[PartitioningCollection]]) can satisfy this distribution.
*
* NOTE: This is applied only stream-stream join as of now. For other stateful operators, we have
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"applied only to stream-stream join"?

* been using ClusteredDistribution, which could construct the physical partitioning of the state
* in different way. (ClusteredDistribution requires relaxed condition and multiple
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "in different way (ClusteredDistribution requires) ...": no dot after "way".

* partitionings can satisfy the requirement.) We need to construct the way to fix this with
* minimizing possibility to break the existing checkpoints.
*
* TODO: SPARK-38204 to address above note.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit nit: I saw we usually use the pattern TODO(SPARK-38204)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to use both, but I see more usages on () so I'll follow it. Thanks!

*/
case class StatefulOpClusteredDistribution(
expressions: Seq[Expression],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-lacated on the same task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) ::
ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) ::
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ case class StateStoreRestoreExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
Expand Down Expand Up @@ -493,6 +496,9 @@ case class StateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
if (keyExpressions.isEmpty) {
AllTuples :: Nil
} else {
Expand Down Expand Up @@ -573,6 +579,9 @@ case class SessionWindowStateStoreRestoreExec(
}

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
ClusteredDistribution(keyWithoutSessionExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

Expand Down Expand Up @@ -684,6 +693,9 @@ case class SessionWindowStateStoreSaveExec(
override def outputPartitioning: Partitioning = child.outputPartitioning

override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

Expand Down Expand Up @@ -741,8 +753,12 @@ case class StreamingDeduplicateExec(
extends UnaryExecNode with StateStoreWriter with WatermarkSupport {

/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
override def requiredChildDistribution: Seq[Distribution] = {
// NOTE: Please read through the NOTE on the classdoc of StatefulOpClusteredDistribution
// before making any changes.
// TODO: SPARK-38204
ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
}

override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
Expand Down