@@ -508,12 +508,20 @@ class MicroBatchExecution(
cd.dataType, cd.timeZoneId)
}
Member:
The root cause is that CurrentBatchTimestamp is a TimeZoneAwareExpression, which is unresolved without a timeZoneId.
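To illustrate: a minimal, self-contained sketch (toy traits, not Catalyst's actual source) of how a time-zone-aware expression only reports itself resolved once a time zone is attached:

    // Toy model of the resolution contract.
    trait Expression {
      def childrenResolved: Boolean = true
      def resolved: Boolean = childrenResolved
    }

    // A time-zone-aware expression additionally needs a concrete time zone.
    trait TimeZoneAware extends Expression {
      def timeZoneId: Option[String]
      override def resolved: Boolean = childrenResolved && timeZoneId.isDefined
    }

    // Stand-in for CurrentBatchTimestamp: unresolved until a zone is supplied.
    case class DemoBatchTimestamp(timeZoneId: Option[String] = None) extends TimeZoneAware

    object ResolutionDemo extends App {
      println(DemoBatchTimestamp().resolved)            // false -> schema access fails
      println(DemoBatchTimestamp(Some("UTC")).resolved) // true once a zone is attached
    }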


// Pre-resolve new attributes to ensure all attributes are resolved before
// accessing the schema of the logical plan. Note that this only leverages the
// information of attributes, so we don't need to be concerned about the values
// of literals.

val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
Contributor Author:
We could even leverage newBatchesPlan and remove this: the point of this effort is to ensure that schema extraction goes through the same transformation path as the actual plan.

Contributor:
I'm fine with this.

Member:
@HeartSaVioR, I prefer to leverage the existing lines (501~509), as you said, like the following. I assume the following is what you mean. Eventually, it's a two-line change (excluding comments). WDYT?

    // Rewire the plan to use the new attributes that were returned by the source.
    val newAttributePlan = newBatchesPlan transformAllExpressions {
      case ct: CurrentTimestamp =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
+          ct.dataType).toLiteral
      case cd: CurrentDate =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType, cd.timeZoneId)
+          cd.dataType, cd.timeZoneId).toLiteral
    }

Contributor Author:
@dongjoon-hyun
Actually, I meant that we pass newBatchesPlan instead of newAttributePlan only for extracting the schema, since we expect the names and data types will not be changed by IncrementalExecution. But IMHO I feel safer pre-transforming it.

Transforming CurrentBatchTimestamp to a Literal outside of IncrementalExecution breaks the intention that the CurrentBatchTimestamp javadoc describes. Please note that I used the transformed plan only for extracting the schema; the actual plan is not changed.
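As a toy illustration of that split (simplified types, not Spark's actual classes): derive the schema from a pre-resolved copy, while the original plan, still containing CurrentBatchTimestamp, is the one that actually executes.

    // Toy expression tree and plan; only the copy used for schema extraction is
    // transformed, mirroring newAttrPlanPreResolvedForSchema vs. newAttributePlan.
    sealed trait Expr { def dataType: String }
    case class Literal(value: Any, dataType: String) extends Expr
    case class BatchTimestamp(ms: Long) extends Expr {
      val dataType = "timestamp"
      def toLiteral: Literal = Literal(ms, dataType) // mirrors cbt.toLiteral
    }

    case class Plan(exprs: Seq[Expr]) {
      def transformAllExpressions(f: PartialFunction[Expr, Expr]): Plan =
        Plan(exprs.map(e => f.applyOrElse(e, identity[Expr])))
      def schema: Seq[String] = exprs.map(_.dataType)
    }

    object SchemaOnlyDemo extends App {
      val actualPlan = Plan(Seq(BatchTimestamp(1545000000000L)))
      // Pre-resolve a copy only to read the schema; actualPlan stays untouched.
      val forSchemaOnly = actualPlan.transformAllExpressions {
        case bt: BatchTimestamp => bt.toLiteral
      }
      println(forSchemaOnly.schema) // List(timestamp)
    }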

Member:
+1

case cbt: CurrentBatchTimestamp => cbt.toLiteral
}

val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: StreamingWriteSupportProvider =>
val writer = s.createStreamingWriteSupport(
s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
outputMode,
new DataSourceOptions(extraOptions.asJava))
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
@@ -1079,6 +1079,42 @@ class StreamSuite extends StreamTest {
assert(query.exception.isEmpty)
}
}

Seq(true, false).foreach { useV2Sink =>
Contributor:
This structure makes it impossible to execute a test independently (at least, I don't know how). The whole suite has to be executed, which makes even small modifications time-consuming.

Contributor Author:
Right. That was to minimize the code, but I guess only a few lines will be added and it will become easier to test, so it's better to apply your suggestion. Thanks!
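One possible restructuring along those lines (a sketch only, assuming the FunSuite style StreamSuite already uses; the helper name is hypothetical): hoist the body into a helper and declare each variant as its own named test, so a single variant can be run by name (e.g. sbt testOnly with -z):

    private def runAddColumnExceptionTest(useV2Sink: Boolean): Unit = {
      // ... the original test body, parameterized by useV2Sink ...
    }

    test("SPARK-26379 Structured Streaming - Exception on adding column to Dataset" +
      " - use v2 sink - true") {
      runAddColumnExceptionTest(useV2Sink = true)
    }

    test("SPARK-26379 Structured Streaming - Exception on adding column to Dataset" +
      " - use v2 sink - false") {
      runAddColumnExceptionTest(useV2Sink = false)
    }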

import org.apache.spark.sql.functions._
Contributor:
Already imported.


val newTestName = "SPARK-26379 Structured Streaming - Exception on adding column to Dataset" +
s" - use v2 sink - $useV2Sink"

test(newTestName) {
val input = MemoryStream[Int]
val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
Contributor:
As I see it, this covers CurrentTimestamp; it might be worth adding a test for CurrentDate as well.
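A hedged sketch of that CurrentDate variant (column name hypothetical), using Spark's current_date function the same way the test uses current_timestamp:

    val dfDate = input.toDS().withColumn("cur_date", lit(current_date()))
    // The check helper would then read a java.sql.Date instead of a Timestamp,
    // e.g. asserting the batch's date is not before the day the test started.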


def assertBatchOutputAndUpdateLastTimestamp(
rows: Seq[Row],
curTimestamp: Long,
expectedValue: Int): Long = {
assert(rows.size === 1)
val row = rows.head
assert(row.getInt(0) === expectedValue)
assert(row.getTimestamp(1).getTime > curTimestamp)
row.getTimestamp(1).getTime
}

var lastTimestamp = -1L
Contributor:
Not much functional difference, but maybe this could be initialized to now instead of -1L.
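i.e., presumably something like the following (a sketch of that suggestion):

    var lastTimestamp = System.currentTimeMillis() // start from "now" rather than -1L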

testStream(df, useV2Sink = useV2Sink) (
AddData(input, 1),
CheckLastBatch { rows: Seq[Row] =>
lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 1)
},
Execute { _ => Thread.sleep(3 * 1000) },
AddData(input, 2),
CheckLastBatch { rows: Seq[Row] =>
lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 2)
}
)
}
}
}

abstract class FakeSource extends StreamSourceProvider {