Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ class ContinuousSuiteBase extends StreamTest {
case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
}.get

val deltaMs = numTriggers * 1000 + 300
// Adding 3s in case of slow initialization of partition reader - rows will be committed
// on epoch which they're written.
// Since previous epochs should be committed before to commit the epoch which output rows
// are written, slow initialization of partition reader and tiny trigger interval leads
// output rows to wait long time to be committed.
val deltaMs = numTriggers * 1000 + 3000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for investigating this, @HeartSaVioR .
The original logic looks not safe. Do you think if there is any other better way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put the test in a loop left it for couple of hours and haven't seen any problem.
On the other hand I agree with @dongjoon-hyun, the original waitForaWhile approach is not safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I can perfectly track when these rows are committed (the safest way), as we are checking this from driver side whereas things are delayed in executor side.

But given the main delay is occurred before first epoch being committed, instead of adding more gaps, we may be able to add 2s (which would be enough for 4 rows to be emitted in rate source) after waiting first epoch to be committed. I'll make a change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. I found we can track the highest committed value, so we can wait for specific value to be committed. Updated the patch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

while (System.currentTimeMillis < reader.creationTime + deltaMs) {
Thread.sleep(reader.creationTime + deltaMs - System.currentTimeMillis)
}
Expand Down