@@ -68,6 +68,17 @@ case class FlatMapGroupsWithStateExec(
     val encSchemaAttribs = stateEncoder.schema.toAttributes
     if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
   }
+  // Converter for translating state Java objects to rows
+  private val stateSerializer = {
+    val encoderSerializer = stateEncoder.namedExpressions
+    if (isTimeoutEnabled) {
+      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+    } else {
+      encoderSerializer
+    }
+  }
+  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
+
Contributor:
Can you comment that this has to happen on the driver, so we don't forget why it's here and move it?
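For readers following the thread: fields of a physical operator like this one are evaluated once on the driver when the plan is constructed, and are then shipped to executors inside the serialized task; only the partition-processing closures run on executors. A minimal, hypothetical sketch of that split (ExampleOperator and resolvedOnDriver are illustrative names, not Spark's API):

case class ExampleOperator(schema: Seq[String]) {

  // Evaluated once, on the driver, when the operator is constructed.
  // If this resolution happened lazily inside the closure below, it
  // could run on an executor instead, which is the hazard the comment
  // above asks to document.
  private val resolvedOnDriver: Seq[String] = schema.map(_.toLowerCase)

  def executePartition(rows: Iterator[Int]): Iterator[String] =
    rows.map { r =>
      // Runs on executors; reuses the driver-side value that was
      // serialized along with the operator.
      s"${resolvedOnDriver.mkString(",")}#$r"
    }
}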



   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] =
@@ -139,19 +150,9 @@ case class FlatMapGroupsWithStateExec(
     ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
   private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)
 
-  // Converter for translating state rows to Java objects
+  // Converters for translating state between rows and Java objects
   private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-    stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-  // Converter for translating state Java objects to rows
-  private val stateSerializer = {
-    val encoderSerializer = stateEncoder.namedExpressions
-    if (isTimeoutEnabled) {
-      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-    } else {
-      encoderSerializer
-    }
-  }
+    stateDeserializer, stateAttributes)
   private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)
 
   // Index of the additional metadata fields in the state row
@@ -909,7 +909,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger - incorrect values") {
+  testQuietly("max files per trigger - incorrect values") {
     val testTable = "maxFilesPerTrigger_test"
     withTable(testTable) {
       withTempDir { case src =>
@@ -1326,7 +1326,7 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
 
   import testImplicits._
 
-  test("file source stress test") {
+  testQuietly("file source stress test") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
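Several tests in this PR switch from test to testQuietly. A hedged sketch of what such a helper typically does in Spark's test utilities, assuming log4j 1.x and ScalaTest's FunSuite (the real helper lives in Spark's shared test traits; QuietSuite is an illustrative name):

import org.apache.log4j.{Level, LogManager}
import org.scalatest.FunSuite

abstract class QuietSuite extends FunSuite {
  // Runs the test body with root logging silenced, so tests that
  // deliberately provoke errors don't flood the build output.
  protected def testQuietly(name: String)(body: => Unit): Unit = test(name) {
    val rootLogger = LogManager.getRootLogger
    val previousLevel = rootLogger.getLevel
    rootLogger.setLevel(Level.OFF)
    try body finally rootLogger.setLevel(previousLevel)
  }
}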
@@ -21,6 +21,8 @@ import java.sql.Date
 import java.util.concurrent.ConcurrentHashMap
 
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
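The two new ScalaTest imports provide polling with a deadline: eventually retries a block until it passes or a Timeout elapses. A small self-contained usage sketch (EventuallyDemo and the 10-second timeout are illustrative; the wildcard import is used here so the default implicit PatienceConfig is also in scope):

import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

object EventuallyDemo {
  @volatile private var ready = false

  def main(args: Array[String]): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(500); ready = true }
    }).start()

    // Retries the assertion, sleeping between attempts, until it passes
    // or the 10-second timeout elapses; on timeout the last failure is
    // rethrown.
    eventually(Timeout(10.seconds)) {
      assert(ready)
    }
  }
}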
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
       assertNumStateRows(total = 1, updated = 2),
 
       StopStream,
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
-      AdvanceManualClock(10 * 1000),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
 
       AddData(inputData, "c"),
-      AdvanceManualClock(1 * 1000),
+      AdvanceManualClock(11 * 1000),
       CheckLastBatch(("b", "-1"), ("c", "1")),
       assertNumStateRows(total = 1, updated = 2),
 
@@ -426,7 +426,7 @@ class StreamSuite extends StreamTest {
     CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("recover from a Spark v2.1 checkpoint") {
+  testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null
 
@@ -490,6 +490,18 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         try {
           // Add data and get the source where it was added, and the expected offset of the
           // added data.
+          if (currentStream != null &&
Contributor:
Can you comment here too? I had no idea what this was doing until I read the PR description.

+              currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            eventually("Error while synchronizing with manual clock before adding data") {
+              if (currentStream.isActive) {
+                assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+              }
+            }
+            if (!currentStream.isActive) {
+              failTest("Query terminated while synchronizing with manual clock")
+            }
+          }
           val queryToUse = Option(currentStream).orElse(Option(lastStream))
           val (source, offset) = a.addData(queryToUse)
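The block above makes AddData wait until the stream thread is actually parked on the manual clock before injecting data, which removes a race between the test thread and the trigger loop. A hedged, self-contained sketch of the kind of clock this relies on; the method names mirror StreamManualClock, but the implementation here is an approximation, not Spark's:

class SketchManualClock(private var timeMs: Long = 0L) {
  private var waiting = false

  def getTimeMillis(): Long = synchronized { timeMs }

  // Test thread (e.g. AdvanceManualClock): move time forward and wake
  // the stream thread blocked in waitTillTime.
  def advance(deltaMs: Long): Unit = synchronized {
    timeMs += deltaMs
    notifyAll()
  }

  // Stream thread: park until the clock reaches targetTime.
  def waitTillTime(targetTime: Long): Long = synchronized {
    waiting = true
    try {
      while (timeMs < targetTime) wait()
      timeMs
    } finally {
      waiting = false
    }
  }

  // Test thread: true only while the stream thread is blocked at the
  // given time, i.e. it is safe to add data without racing the trigger.
  def isStreamWaitingAt(time: Long): Boolean = synchronized {
    waiting && timeMs == time
  }
}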

@@ -487,7 +487,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("StreamingQuery should be Serializable but cannot be used in executors") {
+  testQuietly("StreamingQuery should be Serializable but cannot be used in executors") {
     def startQuery(ds: Dataset[Int], queryName: String): StreamingQuery = {
       ds.writeStream
         .queryName(queryName)