@@ -1069,6 +1069,16 @@ object SQLConf {
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(2)

val STREAMING_JOIN_STATE_FORMAT_VERSION =
buildConf("spark.sql.streaming.join.stateFormatVersion")
Member: Hi, @tgravescs. The incompatibility of the state format was the reason this was not included in branch-2.4.

.internal()
.doc("State format version used by streaming join operations in a streaming query. " +
"State between versions are tend to be incompatible, so state format version shouldn't " +
"be modified after running.")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
Contributor: checkValues(Set(1, 2))

Contributor Author (@HeartSaVioR, Oct 25, 2019): Nice suggestion. Btw, the line is actually consistent with the existing confs STREAMING_AGGREGATION_STATE_FORMAT_VERSION and FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION. Would we like to change them as well, or just leave this one consistent with them? Maybe in a follow-up PR then, to exclude unrelated changes from this one.

Contributor: If you want to change the others, feel free to open another PR. But I'd like the better API to be used here.

Contributor: (Ignoring the discussion about removing this, in the other comment below.)

Contributor Author: I'd like to leave this as it is and handle the change entirely in the follow-up. That way we keep consistency regardless of whether the follow-up PR is accepted or not.

.createWithDefault(2)

val UNSUPPORTED_OPERATION_CHECK_ENABLED =
buildConf("spark.sql.streaming.unsupportedOperationCheck")
.internal()
@@ -474,8 +474,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if left.isStreaming && right.isStreaming =>

new StreamingSymmetricHashJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
val stateVersion = conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION)
new StreamingSymmetricHashJoinExec(leftKeys, rightKeys, joinType, condition,
stateVersion, planLater(left), planLater(right)) :: Nil

case Join(left, right, _, _, _) if left.isStreaming && right.isStreaming =>
throw new AnalysisException(
@@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager, SymmetricHashJoinStateManager}
import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}


@@ -91,7 +91,8 @@ object OffsetSeqMetadata extends Logging {
private implicit val format = Serialization.formats(NoTypeHints)
private val relevantSQLConfs = Seq(
SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION)
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
STREAMING_JOIN_STATE_FORMAT_VERSION)

/**
* Default values of relevant configurations that are used for backward compatibility.
@@ -108,7 +109,9 @@ object OffsetSeqMetadata extends Logging {
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key ->
FlatMapGroupsWithStateExecHelper.legacyVersion.toString,
STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key ->
StreamingAggregationStateManager.legacyVersion.toString
StreamingAggregationStateManager.legacyVersion.toString,
STREAMING_JOIN_STATE_FORMAT_VERSION.key ->
SymmetricHashJoinStateManager.legacyVersion.toString
)

def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
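To illustrate the backward-compatibility path these defaults enable, here is a minimal sketch, assuming SymmetricHashJoinStateManager.legacyVersion is 1 (the pre-upgrade join state format):

// Hypothetical: offset metadata from a pre-upgrade checkpoint has no entry for
// the new conf, so the legacy version is assumed instead of the new default (2).
val confsFromOldCheckpoint: Map[String, String] = Map.empty
val version = confsFromOldCheckpoint.getOrElse(
  STREAMING_JOIN_STATE_FORMAT_VERSION.key,
  SymmetricHashJoinStateManager.legacyVersion.toString).toInt
assert(version == 1) // the old state keeps being read with format version 1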
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValueAndMatched
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}

@@ -131,6 +132,7 @@ case class StreamingSymmetricHashJoinExec(
stateInfo: Option[StatefulOperatorStateInfo],
eventTimeWatermark: Option[Long],
stateWatermarkPredicates: JoinStateWatermarkPredicates,
stateFormatVersion: Int,
left: SparkPlan,
right: SparkPlan) extends SparkPlan with BinaryExecNode with StateStoreWriter {

@@ -139,13 +141,20 @@
rightKeys: Seq[Expression],
joinType: JoinType,
condition: Option[Expression],
stateFormatVersion: Int,
left: SparkPlan,
right: SparkPlan) = {

this(
leftKeys, rightKeys, joinType, JoinConditionSplitPredicates(condition, left, right),
stateInfo = None, eventTimeWatermark = None,
stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right)
stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
}

if (stateFormatVersion < 2 && joinType != Inner) {
logError(s"The query is using stream-stream outer join with state format version" +
Contributor: Is logging enough? Shouldn't you just fail the query?

Contributor Author: Actually this correctness issue occurs in somewhat of a corner case, so I wasn't 100% sure we want to force end users to discard their checkpoint. #24890 is a similar case of a "possible" correctness issue, and there we just log a warning message.

If we want to avoid any possible correctness issue, I'm OK with letting the query fail. Forcing end users to drop their checkpoint might make them unhappy, though.

Contributor: I see. I would suggest a config option to choose between erroring out and continuing, but then I dislike adding more and more config options.

I guess the question is: in which situations will you have the new version of Spark using the old snapshot format?

The way I understand it, it should only happen if you restart an app that was running 2.4, now running with 3.0. The new query will pick up the state store from the previous run and go from there.

In that situation it doesn't seem horrible to fail the query. But anyway, I'll leave it to your judgement.

Contributor Author: So the impact matters. They'd need to discard the checkpoint and replay the query with their previous inputs from scratch. The impact depends on how much data they need to replay: a join may store rows for a long window depending on the join condition, maybe a couple of hours, even days (though that doesn't seem realistic in production).

Replaying a couple of hours of inputs probably wouldn't matter too much, and they would still be able to run the query with the old Spark version, so yes, I agree it wouldn't be so horrible if we document how to discard the checkpoint and rerun the query.

Btw, as I described below, we may try to provide an offline migration tool - though this patch shouldn't be blocked by that.

s" ${stateFormatVersion} - correctness issue is discovered. Please discard the checkpoint" +
" and rerun the query. See SPARK-26154 for more details.")
}
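For comparison, the fail-fast alternative discussed above might look like the following. This is a sketch only, not what the PR does; the PR deliberately keeps logError so that existing checkpoints keep running:

// Hypothetical fail-fast variant: refuse to resume an outer join on v1 state.
if (stateFormatVersion < 2 && joinType != Inner) {
  throw new IllegalStateException(
    s"Stream-stream outer join with state format version $stateFormatVersion " +
      "may produce incorrect results (SPARK-26154). Discard the checkpoint and " +
      "rerun the query, or keep running it with the old Spark version.")
}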

private def throwBadJoinTypeException(): Nothing = {
@@ -270,20 +279,30 @@ case class StreamingSymmetricHashJoinExec(
// * Getting an iterator over the rows that have aged out on the left side. These rows are
// candidates for being null joined. Note that to avoid doing two passes, this iterator
// removes the rows from the state manager as they're processed.
// * Checking whether the current row matches a key in the right side state, and that key
// has any value which satisfies the filter function when joined. If it doesn't,
// we know we can join with null, since there was never (including this batch) a match
// within the watermark period. If it does, there must have been a match at some point, so
// we know we can't join with null.
// * (state format version 1) Checking whether the current row matches a key in the
Contributor Author: I've left the original comment as it is since it still applies to state format version 1, and added a comment explaining state format version 2 and the reason for the change. Please let me know if we want to remove the explanation of state format version 1.

// right side state, and that key has any value which satisfies the filter function when
// joined. If it doesn't, we know we can join with null, since there was never
// (including this batch) a match within the watermark period. If it does, there must have
// been a match at some point, so we know we can't join with null.
// * (state format version 2) We found an edge case in the above approach which brings a
// correctness issue, and had to take another approach (see SPARK-26154); now Spark stores
// a 'matched' flag along with the row, which is set to true when there's any matching row
// on the right.

def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
rightSideJoiner.get(leftKeyValue.key).exists { rightValue =>
postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
}
}
val removedRowIter = leftSideJoiner.removeOldState()
val outerOutputIter = removedRowIter
.filterNot(pair => matchesWithRightSideState(pair))
.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
val outerOutputIter = removedRowIter.filterNot { kvAndMatched =>
stateFormatVersion match {
case 1 => matchesWithRightSideState(
new UnsafeRowPair(kvAndMatched.key, kvAndMatched.value))
case 2 => kvAndMatched.matched
case _ => throw new IllegalStateException("Incorrect state format version! " +
Contributor: s/Incorrect/Invalid (or Unexpected).

s"version $stateFormatVersion")
}
}.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))

innerOutputIter ++ outerOutputIter
case RightOuter =>
@@ -294,9 +313,15 @@
}
}
val removedRowIter = rightSideJoiner.removeOldState()
val outerOutputIter = removedRowIter
.filterNot(pair => matchesWithLeftSideState(pair))
.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
val outerOutputIter = removedRowIter.filterNot { kvAndMatched =>
stateFormatVersion match {
case 1 => matchesWithLeftSideState(
Contributor: I find that if your statement doesn't fit on the same line as the case, it's easier to read when it starts on the next line. Otherwise the indentation of the following lines looks weird.
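For illustration, the suggested layout might look like this (style only, no behavior change; the names are taken from the surrounding diff):

case 1 =>
  matchesWithLeftSideState(
    new UnsafeRowPair(kvAndMatched.key, kvAndMatched.value))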

new UnsafeRowPair(kvAndMatched.key, kvAndMatched.value))
case 2 => kvAndMatched.matched
case _ => throw new IllegalStateException("Incorrect state format version! " +
Contributor: Same.

s"version $stateFormatVersion")
}
}.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))

innerOutputIter ++ outerOutputIter
case _ => throwBadJoinTypeException()
@@ -395,7 +420,8 @@ case class StreamingSymmetricHashJoinExec(
newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _

private val joinStateManager = new SymmetricHashJoinStateManager(
joinSide, inputAttributes, joinKeys, stateInfo, storeConf, hadoopConfBcast.value.value)
joinSide, inputAttributes, joinKeys, stateInfo, storeConf, hadoopConfBcast.value.value,
stateFormatVersion)
private[this] val keyGenerator = UnsafeProjection.create(joinKeys, inputAttributes)

private[this] val stateKeyWatermarkPredicateFunc = stateWatermarkPredicate match {
@@ -445,16 +471,9 @@ case class StreamingSymmetricHashJoinExec(
// the case of inner join).
if (preJoinFilter(thisRow)) {
val key = keyGenerator(thisRow)
val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
generateJoinedRow(thisRow, thatRow)
}.filter(postJoinFilter)
val shouldAddToState = // add only if both removal predicates do not match
!stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
if (shouldAddToState) {
joinStateManager.append(key, thisRow)
updatedStateRowsCount += 1
}
outputIter
val outputIter: Iterator[JoinedRow] = otherSideJoiner.joinStateManager
.getJoinedRows(key, thatRow => generateJoinedRow(thisRow, thatRow), postJoinFilter)
new AddingProcessedRowToStateCompletionIterator(key, thisRow, outputIter)
} else {
joinSide match {
case LeftSide if joinType == LeftOuter =>
Expand All @@ -467,6 +486,31 @@ case class StreamingSymmetricHashJoinExec(
}
}
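The getJoinedRows call above implies roughly the following method on the state manager. This is a sketch inferred from the call site; the parameter names and exact row types are assumptions:

// In SymmetricHashJoinStateManager (sketch):
def getJoinedRows(
    key: UnsafeRow,
    generateJoinedRow: UnsafeRow => JoinedRow,
    predicate: JoinedRow => Boolean): Iterator[JoinedRow]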

private class AddingProcessedRowToStateCompletionIterator(
key: UnsafeRow,
thisRow: UnsafeRow,
subIter: Iterator[JoinedRow])
extends CompletionIterator[JoinedRow, Iterator[JoinedRow]](subIter) {
private var iteratorNotEmpty: Boolean = false
Contributor: Can't this just be a val? hasNext is supposed to be cheap, or at the very least idempotent, so should be safe to call here.

Contributor Author: I guess yes, if the implementation of the iterator respects the Iterator contract. Let me change it and see whether it breaks anything.


override def hasNext: Boolean = {
val ret = super.hasNext
if (ret && !iteratorNotEmpty) {
iteratorNotEmpty = true
}
ret
}

override def completion(): Unit = {
val shouldAddToState = // add only if both removal predicates do not match
!stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
if (shouldAddToState) {
joinStateManager.append(key, thisRow, matched = iteratorNotEmpty)
updatedStateRowsCount += 1
}
}
}
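The reviewer's val-based variant might look like the following sketch. It assumes the wrapped iterator's hasNext is idempotent and side-effect free, per the Scala Iterator contract:

// Capture emptiness once at construction time instead of tracking it in a var:
private val iteratorNotEmpty: Boolean = subIter.hasNext
// The hasNext override that flips the flag then becomes unnecessary, and
// completion() can read the val directly.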

/**
* Get an iterator over the values stored in this joiner's state manager for the given key.
*
Expand All @@ -486,7 +530,7 @@ case class StreamingSymmetricHashJoinExec(
* We do this to avoid requiring either two passes or full materialization when
* processing the rows for outer join.
*/
def removeOldState(): Iterator[UnsafeRowPair] = {
def removeOldState(): Iterator[KeyToValueAndMatched] = {
stateWatermarkPredicate match {
case Some(JoinStateKeyWatermarkPredicate(expr)) =>
joinStateManager.removeByKeyCondition(stateKeyWatermarkPredicateFunc)
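For readers following the signature change: the record type implied by the usages above (kvAndMatched.key, .value, .matched) would look roughly like this sketch, where the field types are assumptions:

// In SymmetricHashJoinStateManager (sketch):
case class KeyToValueAndMatched(key: UnsafeRow, value: UnsafeRow, matched: Boolean)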