Changes to FlatMapGroupsWithStateExec:
@@ -68,6 +68,17 @@ case class FlatMapGroupsWithStateExec(
val encSchemaAttribs = stateEncoder.schema.toAttributes
if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute else encSchemaAttribs
}
+// Converter for translating state Java objects to rows
+private val stateSerializer = {
+  val encoderSerializer = stateEncoder.namedExpressions
+  if (isTimeoutEnabled) {
+    encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
+  } else {
+    encoderSerializer
+  }
+}
+private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
Contributor: Can you add a comment that this has to happen on the driver, so we don't forget why it's here and move it?

/** Distribute by grouping attributes */
override def requiredChildDistribution: Seq[Distribution] =
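
A sketch of what the requested comment might look like; the driver-side rationale here is an assumption inferred from the review discussion, not something stated in the diff:

  // NOTE (assumed wording): These encoder expressions must be instantiated
  // eagerly here, when the plan is constructed on the driver, rather than
  // lazily on the executors. Keeping this comment next to the declarations
  // records why they live up here and should not be moved back down.
  private val stateSerializer = { /* as in the diff above */ }
  private val stateDeserializer = stateEncoder.resolveAndBind().deserializer
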
@@ -139,19 +150,9 @@ case class FlatMapGroupsWithStateExec(
ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes)
private val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType)

-// Converter for translating state rows to Java objects
+// Converters for translating state between rows and Java objects
 private val getStateObjFromRow = ObjectOperator.deserializeRowToObject(
-  stateEncoder.resolveAndBind().deserializer, stateAttributes)
-
-// Converter for translating state Java objects to rows
-private val stateSerializer = {
-  val encoderSerializer = stateEncoder.namedExpressions
-  if (isTimeoutEnabled) {
-    encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
-  } else {
-    encoderSerializer
-  }
-}
+  stateDeserializer, stateAttributes)
private val getStateRowFromObj = ObjectOperator.serializeObjectToRow(stateSerializer)

// Index of the additional metadata fields in the state row
Changes to FlatMapGroupsWithStateSuite:
@@ -21,6 +21,8 @@ import java.sql.Date
import java.util.concurrent.ConcurrentHashMap

import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout

import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
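
For context, these ScalaTest imports enable polling-style assertions that retry until a condition holds or a patience window expires. A minimal, self-contained usage sketch (the `ready` flag is hypothetical, purely for illustration):

  import org.scalatest.concurrent.Eventually.eventually
  import org.scalatest.concurrent.PatienceConfiguration.Timeout
  import org.scalatest.time.SpanSugar._

  object EventuallyExample extends App {
    @volatile var ready = false  // hypothetical flag flipped by another thread
    new Thread(() => { Thread.sleep(500); ready = true }).start()

    // Retries the block until the assertion passes or the 10-second
    // patience window expires, failing with a timeout exception otherwise.
    eventually(Timeout(10.seconds)) {
      assert(ready)
    }
  }
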
@@ -574,11 +576,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAfterAll
assertNumStateRows(total = 1, updated = 2),

StopStream,
-StartStream(ProcessingTime("1 second"), triggerClock = clock),
-AdvanceManualClock(10 * 1000),
+StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),

AddData(inputData, "c"),
-AdvanceManualClock(1 * 1000),
+AdvanceManualClock(11 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),
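
Reading this hunk together with the StreamTest change below: the old test advanced the clock by 10 s immediately after restarting the stream and by another 1 s after adding "c"; the new test folds both into a single 11 s advance after the data is added. That is presumably safe only because AddData now waits, via the new StreamTest synchronization, until the restarted query is blocked on the manual clock, so the advance cannot be consumed before the batch that should observe it.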

Changes to StreamTest:
@@ -490,6 +490,18 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
try {
// Add data and get the source where it was added, and the expected offset of the
// added data.
+if (currentStream != null &&
+    currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+  val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+  eventually("Error while synchronizing with manual clock before adding data") {
+    if (currentStream.isActive) {
+      assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+    }
+  }
+  if (!currentStream.isActive) {
+    failTest("Query terminated while synchronizing with manual clock")
+  }
+}

Contributor: Can you add a comment here too? I had no idea what this was doing until I read the PR description.
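
A sketch of the explanatory comment the reviewer is asking for; the wording is an assumption inferred from what the code does, not taken from the PR:

  // If the stream is driven by a StreamManualClock, wait until the stream
  // thread is actually blocked waiting for the clock to advance before we
  // add data. Otherwise AddData can race with an in-flight trigger, and a
  // later AdvanceManualClock may be consumed by the wrong batch.
  if (currentStream != null &&
      currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
    val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
    eventually("Error while synchronizing with manual clock before adding data") {
      if (currentStream.isActive) {
        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
      }
    }
    // The query may have died while we were waiting; fail loudly instead of
    // letting a later step hang.
    if (!currentStream.isActive) {
      failTest("Query terminated while synchronizing with manual clock")
    }
  }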
val queryToUse = Option(currentStream).orElse(Option(lastStream))
val (source, offset) = a.addData(queryToUse)
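
The synchronization above relies on StreamManualClock reporting whether the stream is blocked waiting at a given time. A minimal sketch of how such a clock could track this, assuming a ManualClock base class whose waitTillTime blocks until the clock is advanced (details may differ from Spark's actual implementation):

  class StreamManualClock(start: Long = 0L) extends ManualClock(start) {
    // Clock value observed by the stream thread when it started waiting,
    // or None if no thread is currently blocked in waitTillTime.
    private var waitStartTime: Option[Long] = None

    override def waitTillTime(targetTime: Long): Long = synchronized {
      try {
        waitStartTime = Some(getTimeMillis())
        super.waitTillTime(targetTime)  // wait() releases the monitor, so the
                                        // query below can run while we block
      } finally {
        waitStartTime = None
      }
    }

    // True iff a thread is blocked in waitTillTime and read `time` when it
    // began waiting, i.e. the stream is idle at `time` and ready for an advance.
    def isStreamWaitingAt(time: Long): Boolean = synchronized {
      waitStartTime.contains(time)
    }
  }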
