Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ object UnsupportedOperationChecker {
" or the output mode is not append on a streaming DataFrames/Datasets")(plan)
}

// Disallow multiple streaming aggregations
val aggregates = collectStreamingAggregates(plan)

if (aggregates.size > 1) {
// multiple aggregates are supported only in append mode
if (outputMode != InternalOutputModes.Append && aggregates.size > 1) {
throwError(
"Multiple streaming aggregations are not supported with " +
"streaming DataFrames/Datasets")(plan)
Expand All @@ -96,20 +96,20 @@ object UnsupportedOperationChecker {
// Disallow some output mode
outputMode match {
case InternalOutputModes.Append if aggregates.nonEmpty =>
val aggregate = aggregates.head

// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = aggregate.groupingExpressions.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
aggregates.foreach(aggregate => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = aggregate.groupingExpressions.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}

// We can append rows to the sink once the group is under the watermark. Without this
// watermark a group is never "finished" so we would never output anything.
if (watermarkAttributes.isEmpty) {
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
// We can append rows to the sink once the group is under the watermark. Without this
// watermark a group is never "finished" so we would never output anything.
if (watermarkAttributes.isEmpty) {
throwError(
s"$outputMode output mode not supported when there are streaming aggregations on " +
s"streaming DataFrames/DataSets without watermark")(plan)
}
}
})

case InternalOutputModes.Complete if aggregates.isEmpty =>
throwError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
Update)

assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations",
"aggregate - multiple streaming aggregations in update mode",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)),
outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations"))

assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations in complete mode",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)),
outputMode = Complete,
expectedMsgs = Seq("multiple streaming aggregations"))

assertSupportedInStreamingPlan(
"aggregate - streaming aggregations in update mode",
Aggregate(Nil, aggExprs("d"), streamRelation),
Expand All @@ -127,6 +133,32 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append,
expectedMsgs = Seq("streaming aggregations", "without watermark"))

assertSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations in append mode with watermark",
Aggregate(Seq(attributeWithWatermark), aggExprs("c"),
Aggregate(Seq(attributeWithWatermark), aggExprs("d"), streamRelation)),
outputMode = Append)

assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations without watermark in append mode",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)),
outputMode = Append,
expectedMsgs = Seq("streaming aggregations", "without watermark"))

assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations, watermark for second aggregate in append mode",
Aggregate(Nil, aggExprs("c"),
Aggregate(Seq(attributeWithWatermark), aggExprs("d"), streamRelation)),
outputMode = Append,
expectedMsgs = Seq("streaming aggregations", "without watermark"))

assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations, watermark for first aggregate in append mode",
Aggregate(Seq(attributeWithWatermark), aggExprs("c"),
Aggregate(Nil, aggExprs("d"), streamRelation)),
outputMode = Append,
expectedMsgs = Seq("streaming aggregations", "without watermark"))

// Aggregation: Distinct aggregates not supported on streaming relation
val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
assertSupportedInStreamingPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ object CommitLog {
}


case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
case class CommitMetadata(nextBatchWatermarkMs: Long = 0,
operatorWatermarks: Map[Long, Long] = Map.empty) {
def json: String = Serialization.write(this)(CommitMetadata.format)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ case class FlatMapGroupsWithStateExec(
true // Always run batches to process timeouts
case EventTimeTimeout =>
// Process another non-data batch only if the watermark has changed in this executed plan
eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
eventTimeWatermark.isDefined && getWatermark(newMetadata) > eventTimeWatermark.get
case _ =>
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,25 @@ class IncrementalExecution(
numStateStores)
}

// the watermark for the stateful operation if available or the batch watermark
private def getWatermark(operatorId: Long): Long = {
offsetSeqMetadata.operatorWatermarks
.getOrElse(operatorId, offsetSeqMetadata.batchWatermarkMs)
}

/** Locates save/restore pairs surrounding aggregation. */
val state = new Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = plan transform {
case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
UnaryExecNode(agg,
StateStoreRestoreExec(_, None, _, child))) =>
val aggStateInfo = nextStatefulOperationStateInfo
val aggStateInfo = nextStatefulOperationStateInfo()
StateStoreSaveExec(
keys,
Some(aggStateInfo),
Some(outputMode),
Some(offsetSeqMetadata.batchWatermarkMs),
Some(getWatermark(aggStateInfo.operatorId)),
stateFormatVersion,
agg.withNewChildren(
StateStoreRestoreExec(
Expand All @@ -123,30 +129,33 @@ class IncrementalExecution(
child) :: Nil))

case StreamingDeduplicateExec(keys, child, None, None) =>
val stateInfo = nextStatefulOperationStateInfo()
StreamingDeduplicateExec(
keys,
child,
Some(nextStatefulOperationStateInfo),
Some(offsetSeqMetadata.batchWatermarkMs))
Some(stateInfo),
Some(getWatermark(stateInfo.operatorId)))

case m: FlatMapGroupsWithStateExec =>
val stateInfo = nextStatefulOperationStateInfo()
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
stateInfo = Some(stateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
eventTimeWatermark = Some(getWatermark(stateInfo.operatorId)))

case j: StreamingSymmetricHashJoinExec =>
val stateInfo = nextStatefulOperationStateInfo()
j.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
stateInfo = Some(stateInfo),
eventTimeWatermark = Some(getWatermark(stateInfo.operatorId)),
stateWatermarkPredicates =
StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition.full,
Some(offsetSeqMetadata.batchWatermarkMs)))
Some(getWatermark(stateInfo.operatorId))))

case l: StreamingGlobalLimitExec =>
l.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
stateInfo = Some(nextStatefulOperationStateInfo()),
outputMode = Some(outputMode))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,11 @@ class MicroBatchExecution(
nextOffsets.metadata.foreach { metadata =>
OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
offsetSeqMetadata = OffsetSeqMetadata(
metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf,
metadata.operatorWatermarks)
watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
watermarkTracker.setWatermark(metadata.batchWatermarkMs)
watermarkTracker.setOperatorWatermarks(metadata.operatorWatermarks)
}

/* identify the current batch id: if commit log indicates we successfully processed the
Expand All @@ -297,6 +299,7 @@ class MicroBatchExecution(
committedOffsets ++= availableOffsets
watermarkTracker.setWatermark(
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
watermarkTracker.setOperatorWatermarks(commitMetadata.operatorWatermarks)
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
Expand Down Expand Up @@ -369,7 +372,8 @@ class MicroBatchExecution(
// Update the query metadata
offsetSeqMetadata = offsetSeqMetadata.copy(
batchWatermarkMs = watermarkTracker.currentWatermark,
batchTimestampMs = triggerClock.getTimeMillis())
batchTimestampMs = triggerClock.getTimeMillis(),
operatorWatermarks = watermarkTracker.currentOperatorWatermarks)

// Check whether next batch should be constructed
val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
Expand Down Expand Up @@ -559,7 +563,8 @@ class MicroBatchExecution(
withProgressLocked {
sinkCommitProgress = batchSinkProgress
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark,
watermarkTracker.currentOperatorWatermarks))
committedOffsets ++= availableOffsets
}
logDebug(s"Completed batch ${currentBatchId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ object OffsetSeq {
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
* @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
* @param operatorWatermarks: the watermarks at the individual stateful operators.
*/
case class OffsetSeqMetadata(
batchWatermarkMs: Long = 0,
batchTimestampMs: Long = 0,
conf: Map[String, String] = Map.empty) {
conf: Map[String, String] = Map.empty,
operatorWatermarks: Map[Long, Long] = Map.empty) {
def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
}

Expand Down Expand Up @@ -114,9 +116,10 @@ object OffsetSeqMetadata extends Logging {
def apply(
batchWatermarkMs: Long,
batchTimestampMs: Long,
sessionConf: RuntimeConfig): OffsetSeqMetadata = {
sessionConf: RuntimeConfig,
operatorWatermarks: Map[Long, Long]): OffsetSeqMetadata = {
val confs = relevantSQLConfs.map { conf => conf.key -> sessionConf.get(conf.key) }.toMap
OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs)
OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, confs, operatorWatermarks)
}

/** Set the SparkSession configuration with the values in the metadata */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ abstract class StreamExecution(

/** Metadata associated with the offset seq of a batch in the query. */
protected var offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSession.conf)
batchWatermarkMs = 0, batchTimestampMs = 0, sessionConf = sparkSession.conf,
operatorWatermarks = Map.empty)

/**
* A map of current watermarks, keyed by the position of the watermark operator in the
Expand Down Expand Up @@ -277,7 +278,8 @@ abstract class StreamExecution(
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
batchWatermarkMs = 0, batchTimestampMs = 0, sessionConf = sparkSessionForStream.conf,
operatorWatermarks = Map.empty)

if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// Unblock `awaitInitialization`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ case class StreamingSymmetricHashJoinExec(

// Latest watermark value is more than that used in this previous executed plan
val watermarkHasChanged =
eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
eventTimeWatermark.isDefined && getWatermark(newMetadata) > eventTimeWatermark.get

watermarkUsedForStateCleanup && watermarkHasChanged
}
Expand Down
Loading