Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,16 @@ class MicroBatchExecution(
}
}

/** Execute a function while locking the stream from making an progress */
private[sql] def withProgressLocked(f: => Unit): Unit = {
awaitProgressLock.lock()
try {
f
} finally {
awaitProgressLock.unlock()
}
}

private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
Optional.ofNullable(scalaOption.orNull)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
AddDataMemory(source, data)
}

/**
* Adds data to multiple memory streams such that all the data will be made visible in the
* same batch. This is applicable only to MicroBatchExecution, as this coordination cannot be
* performed at the driver in ContinuousExecutions.
*/
object MultiAddData {
def apply[A]
(source1: MemoryStream[A], data1: A*)(source2: MemoryStream[A], data2: A*): StreamAction = {
val actions = Seq(AddDataMemory(source1, data1), AddDataMemory(source2, data2))
StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]"))
}
}

/** A trait that can be extended when testing a source. */
trait AddData extends StreamAction {
/**
Expand Down Expand Up @@ -217,6 +230,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
s"ExpectFailure[${causeClass.getName}, isFatalError: $isFatalError]"
}

/**
* Performs multiple actions while locking the stream from progressing.
* This is applicable only to MicroBatchExecution, as progress of ContinuousExecution
* cannot be controlled from the driver.
*/
case class StreamProgressLockedActions(actions: Seq[StreamAction], desc: String = null)
extends StreamAction {

override def toString(): String = {
if (desc != null) desc else super.toString
}
}

/** Assert that a body is true */
class Assert(condition: => Boolean, val message: String = "") extends StreamAction {
def run(): Unit = { Assertions.assert(condition) }
Expand Down Expand Up @@ -295,6 +321,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
val sink = if (useV2Sink) new MemorySinkV2 else new MemorySink(stream.schema, outputMode)
val resetConfValues = mutable.Map[String, Option[String]]()
val defaultCheckpointLocation =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
var manualClockExpectedTime = -1L

@volatile
var streamThreadDeathCause: Throwable = null
Expand Down Expand Up @@ -425,11 +454,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
}
}

var manualClockExpectedTime = -1L
val defaultCheckpointLocation =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
try {
startedTest.foreach { action =>
def executeAction(action: StreamAction): Unit = {
logInfo(s"Processing test stream action: $action")
action match {
case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
Expand Down Expand Up @@ -663,6 +688,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
}
pos += 1
}

try {
startedTest.foreach {
case StreamProgressLockedActions(actns, _) =>
// Perform actions while holding the stream from progressing
assert(currentStream != null,
s"Cannot perform stream-progress-locked actions $actns when query is not active")
assert(currentStream.isInstanceOf[MicroBatchExecution],
s"Cannot perform stream-progress-locked actions on non-microbatch queries")
currentStream.asInstanceOf[MicroBatchExecution].withProgressLocked {
actns.foreach(executeAction)
}

case action: StreamAction => executeAction(action)
}
if (streamThreadDeathCause != null) {
failTest("Stream Thread Died", streamThreadDeathCause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
AddData(leftInput, 1, 2, 3),
AddData(rightInput, 3, 4, 5),
MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
// The left rows with leftValue <= 4 should generate their outer join row now and
// not get added to the state.
CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
assertNumStateRows(total = 4, updated = 4),
// We shouldn't get more outer join rows when the watermark advances.
AddData(leftInput, 20),
AddData(rightInput, 21),
MultiAddData(leftInput, 20)(rightInput, 21),
CheckLastBatch(),
AddData(rightInput, 20),
CheckLastBatch((20, 30, 40, "60"))
Expand All @@ -493,15 +491,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
AddData(leftInput, 3, 4, 5),
AddData(rightInput, 1, 2, 3),
MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
// The right rows with value <= 7 should never be added to the state.
CheckLastBatch(Row(3, 10, 6, "9")),
assertNumStateRows(total = 4, updated = 4),
// When the watermark advances, we get the outer join rows just as we would if they
// were added but didn't match the full join condition.
AddData(leftInput, 20),
AddData(rightInput, 21),
MultiAddData(leftInput, 20)(rightInput, 21),
CheckLastBatch(),
AddData(rightInput, 20),
CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, 8, null), Row(5, 10, 10, null))
Expand All @@ -524,15 +520,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
AddData(leftInput, 1, 2, 3),
AddData(rightInput, 3, 4, 5),
MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
// The left rows with value <= 4 should never be added to the state.
CheckLastBatch(Row(3, 10, 6, "9")),
assertNumStateRows(total = 4, updated = 4),
// When the watermark advances, we get the outer join rows just as we would if they
// were added but didn't match the full join condition.
AddData(leftInput, 20),
AddData(rightInput, 21),
MultiAddData(leftInput, 20)(rightInput, 21),
CheckLastBatch(),
AddData(rightInput, 20),
CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, null, "12"), Row(5, 10, null, "15"))
Expand All @@ -555,15 +549,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
AddData(leftInput, 3, 4, 5),
AddData(rightInput, 1, 2, 3),
MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
// The right rows with rightValue <= 7 should generate their outer join row now and
// not get added to the state.
CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
assertNumStateRows(total = 4, updated = 4),
// We shouldn't get more outer join rows when the watermark advances.
AddData(leftInput, 20),
AddData(rightInput, 21),
MultiAddData(leftInput, 20)(rightInput, 21),
CheckLastBatch(),
AddData(rightInput, 20),
CheckLastBatch((20, 30, 40, "60"))
Expand All @@ -575,13 +567,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with

testStream(joined)(
// Test inner part of the join.
AddData(leftInput, 1, 2, 3, 4, 5),
AddData(rightInput, 3, 4, 5, 6, 7),
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
// Old state doesn't get dropped until the batch *after* it gets introduced, so the
// nulls won't show up until the next batch after the watermark advances.
AddData(leftInput, 21),
AddData(rightInput, 22),
MultiAddData(leftInput, 21)(rightInput, 22),
CheckLastBatch(),
assertNumStateRows(total = 12, updated = 2),
AddData(leftInput, 22),
Expand All @@ -595,13 +585,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with

testStream(joined)(
// Test inner part of the join.
AddData(leftInput, 1, 2, 3, 4, 5),
AddData(rightInput, 3, 4, 5, 6, 7),
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
// Old state doesn't get dropped until the batch *after* it gets introduced, so the
// nulls won't show up until the next batch after the watermark advances.
AddData(leftInput, 21),
AddData(rightInput, 22),
MultiAddData(leftInput, 21)(rightInput, 22),
CheckLastBatch(),
assertNumStateRows(total = 12, updated = 2),
AddData(leftInput, 22),
Expand Down Expand Up @@ -676,34 +664,28 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with

testStream(joined)(
// leftValue <= 10 should generate outer join rows even though it matches right keys
AddData(leftInput, 1, 2, 3),
AddData(rightInput, 1, 2, 3),
MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3),
CheckLastBatch(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
AddData(leftInput, 20),
AddData(rightInput, 21),
MultiAddData(leftInput, 20)(rightInput, 21),
CheckLastBatch(),
assertNumStateRows(total = 5, updated = 2),
AddData(rightInput, 20),
CheckLastBatch(
Row(20, 30, 40, 60)),
assertNumStateRows(total = 3, updated = 1),
// leftValue and rightValue both satisfying condition should not generate outer join rows
AddData(leftInput, 40, 41),
AddData(rightInput, 40, 41),
MultiAddData(leftInput, 40, 41)(rightInput, 40, 41),
CheckLastBatch((40, 50, 80, 120), (41, 50, 82, 123)),
AddData(leftInput, 70),
AddData(rightInput, 71),
MultiAddData(leftInput, 70)(rightInput, 71),
CheckLastBatch(),
assertNumStateRows(total = 6, updated = 2),
AddData(rightInput, 70),
CheckLastBatch((70, 80, 140, 210)),
assertNumStateRows(total = 3, updated = 1),
// rightValue between 300 and 1000 should generate outer join rows even though it matches left
AddData(leftInput, 101, 102, 103),
AddData(rightInput, 101, 102, 103),
MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103),
CheckLastBatch(),
AddData(leftInput, 1000),
AddData(rightInput, 1001),
MultiAddData(leftInput, 1000)(rightInput, 1001),
CheckLastBatch(),
assertNumStateRows(total = 8, updated = 2),
AddData(rightInput, 1000),
Expand Down