@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming.continuous

+import java.util.concurrent.atomic.AtomicLong

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

@@ -36,6 +38,9 @@ class RateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int) extend

val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

+  private[sql] val highestCommittedValue = new AtomicLong(Long.MinValue)
+  private[sql] val firstCommittedTime = new AtomicLong(Long.MinValue)
Member:

I still have doubts about the value of these. We usually don't add variables like this just for testing.

Contributor Author:

As I commented earlier, creationTime in this class above is one of the fields that exists just for testing.

Unlike micro-batch mode, it's not easy to track and capture the progress; that might be the reason we were relying on wait time. I totally agree the change is not pretty, but given that the rate source exists for testing/benchmarking purposes and end users don't deal with RateStreamContinuousStream directly, we might be OK with it.

If it's still a concern, please let me know, and we can roll back the change and just add more seconds to the wait time so the test passes at a higher rate.
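
For context, a minimal sketch of how the rate source is typically consumed (assuming a local SparkSession; option values are illustrative). End users only ever see the "rate" format name and never touch RateStreamContinuousStream directly:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    // Illustrative only: a rate source feeding a continuous-mode query.
    val spark = SparkSession.builder().master("local[2]").appName("rate-sketch").getOrCreate()

    val rateDf = spark.readStream
      .format("rate")                  // resolved to the rate source provider by name
      .option("rowsPerSecond", "10")
      .option("numPartitions", "2")
      .load()

    val query = rateDf.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))  // continuous processing, commit an epoch every second
      .start()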

@gaborgsomogyi (Contributor), Jul 8, 2019:

The first approach is definitely uglier than this. Just a question (without having taken a deep look): is there maybe a metric that could be used? Or should we just add something like this?

Contributor Author:

The streaming query listener is not ready for continuous processing. #24537 is trying to address it, but it doesn't seem to be ready yet. The query listener mainly reports per batch to show changing numbers, and if we applied it to continuous processing the report could lag far behind when there's skew among partitions and the partitions' epochs differ.

Member:

Can you subclass this for testing and only add the new fields in the subclass?

Contributor Author:

It would also require a new data source provider, since we refer to the source only by its data source name.

The change would bring a test version of RateStreamProvider, a test version of RateStreamTable (maybe subclassing to deduplicate), and a test version of RateStreamContinuousStream. I'd like to confirm whether it's OK to apply that change, since the changeset is going to be bigger.
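
To make the trade-off concrete, here is a rough, hypothetical sketch of the subclassing route (names are illustrative, imports of the offset classes are elided, and it assumes RateStreamContinuousStream stays non-final). The extra wiring it would need, a test-only provider and table so the source name resolves to this class, is exactly the additional changeset mentioned above:

    import java.util.concurrent.atomic.AtomicLong

    // Hypothetical test-only subclass that keeps the tracking fields out of the production class.
    class TestRateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int)
      extends RateStreamContinuousStream(rowsPerSecond, numPartitions) {

      val highestCommittedValue = new AtomicLong(Long.MinValue)
      val firstCommittedTime = new AtomicLong(Long.MinValue)

      override def commit(end: Offset): Unit = {
        super.commit(end)
        end.asInstanceOf[RateStreamOffset].partitionToValueAndRunTimeMs.foreach {
          case (_, ValueRunTimeMsPair(value, _)) =>
            if (highestCommittedValue.get() < value) highestCommittedValue.set(value)
        }
        firstCommittedTime.compareAndSet(Long.MinValue, System.currentTimeMillis())
      }
    }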

Member:

I see, hmm, it's not instantiated directly.
I'm also uneasy about adding this just for a test; is there any way to avoid it while fixing the test, even if it means the test can't check as much?
I'm not super against this, though. I'd mark it as visible for testing and add a comment, of course.

Contributor Author:

OK, maybe I can try to find an alternative approach first and stick with this only if I can't find one. I didn't do that initially because the alternatives would be quite a bit less simple, but I sense we'd like to avoid a change like this.

Contributor Author:

And I should admit I just realized that creationTime is not only used for testing. My bad, sorry for missing that.


  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
    assert(offsets.length == numPartitions)
    val tuples = offsets.map {
@@ -82,7 +87,16 @@ class RateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int) extend
    RateStreamContinuousReaderFactory
  }

-  override def commit(end: Offset): Unit = {}
+  override def commit(end: Offset): Unit = {
Contributor Author:

This change shouldn't bring a noticeable perf hit, as commit is only called once per epoch, and the epoch interval is at least hundreds of milliseconds.

+    end.asInstanceOf[RateStreamOffset].partitionToValueAndRunTimeMs.foreach {
+      case (_, ValueRunTimeMsPair(value, _)) =>
+        if (highestCommittedValue.get() < value) {
Member:

I'm not sure whether the 'atomic' part is essential here, but if it is, I think you have a race condition: you'd want to use updateAndGet or something similar so that the check and the update happen atomically.

@HeartSaVioR (Contributor Author), Jul 8, 2019:

Thanks for pointing that out! Only one writer and one reader run concurrently. Revisiting the change, it looks like plain over-engineering; a volatile field would just work. If the reader sees an old value, it just needs to wait a bit longer, so we don't strictly need atomicity on the update. I'll make the change.
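
For reference, a minimal standalone sketch of the two variants discussed in this thread (illustrative names, not the committed code):

    import java.util.concurrent.atomic.AtomicLong

    object CommitTrackerSketch {
      // (a) Atomic check-and-update, per the updateAndGet suggestion: the maximum is computed
      // and stored in one atomic step, so no concurrent update can be lost.
      private val highestCommittedValue = new AtomicLong(Long.MinValue)
      def recordAtomically(value: Long): Unit = {
        highestCommittedValue.updateAndGet(current => math.max(current, value))
      }

      // (b) Single writer, single reader: a volatile field is enough. A stale read only means
      // the polling test sleeps one more round before it observes the newest value.
      @volatile private var highestCommittedValueVolatile: Long = Long.MinValue
      def recordWithVolatile(value: Long): Unit = {
        if (value > highestCommittedValueVolatile) {
          highestCommittedValueVolatile = value
        }
      }
    }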

+          highestCommittedValue.set(value)
+        }
+    }
+    firstCommittedTime.compareAndSet(Long.MinValue, System.currentTimeMillis())
Contributor Author:

Similar here: there is one writer and one reader, and we have fallback logic (in waitForRateSourceTriggers) for when the reader sees an old value, so atomicity is not strictly needed.

+  }

  override def stop(): Unit = {}

  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
@@ -38,18 +38,52 @@ class ContinuousSuiteBase extends StreamTest {
sparkConf.set("spark.sql.testkey", "true")))

  protected def waitForRateSourceTriggers(query: StreamExecution, numTriggers: Int): Unit = {
-    query match {
+    findRateStreamContinuousStream(query).foreach { reader =>
+      // Make sure epoch 0 is completed.
+      query.asInstanceOf[ContinuousExecution].awaitEpoch(0)
+
+      // This is called after waiting for the first epoch to be committed, but there might be
+      // a gap between committing the epoch to the commit log and committing it to the source.
+      // If epoch 0 is not reported to the rate source yet, use the current time instead.
+      var firstCommittedTime = reader.firstCommittedTime.longValue()
+      if (firstCommittedTime < 0) {
+        firstCommittedTime = System.currentTimeMillis()
+      }
+
+      val deltaMs = numTriggers * 1000 + 300
+      while (System.currentTimeMillis < firstCommittedTime + deltaMs) {
+        Thread.sleep(firstCommittedTime + deltaMs - System.currentTimeMillis)
+      }
+    }
+  }
+
+  protected def waitForRateSourceCommittedValue(
Contributor Author:

This is the safest way to wait for some rows to be produced as output. We still need a maximum wait time, since it could block forever in case of bugs.

+      query: StreamExecution,
+      desiredValue: Long,
+      maxWaitTimeMs: Long): Unit = {
+    findRateStreamContinuousStream(query).foreach { reader =>
+      val startTime = System.currentTimeMillis()
+      val maxWait = startTime + maxWaitTimeMs
+      while (System.currentTimeMillis() < maxWait &&
+          reader.highestCommittedValue.get() < desiredValue) {
+        Thread.sleep(100)
+      }
+      if (System.currentTimeMillis() > maxWait) {
+        logWarning(s"Couldn't reach desired value in $maxWaitTimeMs milliseconds! " +
+          s"Current highest committed value is ${reader.highestCommittedValue}")
+      }
+    }
+  }
+
+  private def findRateStreamContinuousStream(
+      query: StreamExecution): Option[RateStreamContinuousStream] = query match {
    case s: ContinuousExecution =>
-      assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
-      val reader = s.lastExecution.executedPlan.collectFirst {
+      s.lastExecution.executedPlan.collectFirst {
        case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
-      }.get
-
-      val deltaMs = numTriggers * 1000 + 300
-      while (System.currentTimeMillis < reader.creationTime + deltaMs) {
-        Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
-      }
-    }
+      }
+
+    case _ => None
  }

// A continuous trigger that will only fire the initial time for the duration of a test.
@@ -218,8 +252,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
      .start()
    val continuousExecution =
      query.asInstanceOf[StreamingQueryWrapper].streamingQuery.asInstanceOf[ContinuousExecution]
-    continuousExecution.awaitEpoch(0)
-    waitForRateSourceTriggers(continuousExecution, 2)
+    waitForRateSourceCommittedValue(continuousExecution, 3, 20 * 1000)
    query.stop()

    val results = spark.read.table("noharness").collect()
@@ -241,7 +274,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
    testStream(df)(
      StartStream(longContinuousTrigger),
      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute(waitForRateSourceTriggers(_, 5)),
Contributor Author:

This change saves a couple of seconds on my machine.

      IncrementEpoch(),
      StopStream,
      CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_)))
@@ -259,7 +292,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase {
    testStream(df)(
      StartStream(Trigger.Continuous(2012)),
      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 10)),
+      Execute(waitForRateSourceTriggers(_, 5)),
Contributor Author:

Ditto.

      IncrementEpoch(),
      StopStream,
      CheckAnswerRowsContains(scala.Range(0, 2500).map(Row(_))))