
Conversation

@HeartSaVioR (Contributor) commented May 22, 2020

What changes were proposed in this pull request?

Please refer to https://issues.apache.org/jira/browse/SPARK-24634 for the rationale behind this issue.

This patch adds a new metric counting the number of inputs that arrive later than watermark plus allowed delay. To keep the change simple, this patch doesn't count the exact number of input rows that are later than watermark plus allowed delay; instead, it counts the inputs that are dropped by the operator's logic. The difference between the two shows up in streaming aggregation: to optimize the calculation, streaming aggregation "pre-aggregates" the input rows and then checks lateness against the "pre-aggregated" inputs, so the number may be reduced. For example, if three late rows share the same group key, pre-aggregation may collapse them into a single row, so the metric would count one dropped input rather than three.

The new metric will be provided in two places:

  1. On the Spark UI: check the metrics in the stateful operator nodes on the query execution details page in the SQL tab.
  2. On the Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProgressEvent.
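
For reference, here is a minimal sketch of consuming the new field from a listener (assuming an active SparkSession named `spark`; the `numLateInputs` accessor reflects this patch and is renamed in the follow-up commit at the end of this thread):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // Each stateful operator reports how many late inputs it dropped in this trigger.
    event.progress.stateOperators.foreach { op =>
      println(s"numLateInputs = ${op.numLateInputs}")
    }
  }
})
```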

Why are the changes needed?

Dropping late inputs means that end users might not get the outputs they expect. Even if end users anticipate this and tolerate the result (as that's what allowed lateness is for), they should be able to observe whether the current value of allowed lateness actually drops inputs, so that they can adjust the value.

Also, whenever a query has multiple stateful operators, if Spark drops late inputs "between" these operators, it becomes a "correctness" issue. Spark should disallow such a possibility, but given that we already provide the flexibility, we should at least provide a way to observe the correctness issue and let users decide whether to correct their query.

Does this PR introduce any user-facing change?

Yes. End users will be able to retrieve the information about late inputs in two ways:

  1. SQL tab in Spark UI
  2. Streaming Query Listener

How was this patch tested?

New UTs were added, and existing UTs were modified to reflect the change.

I also ran a manual test reproducing SPARK-28094.

I picked the specific case "B outer C outer D", which is enough to demonstrate the "intermediate late row" issue caused by the global watermark.

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17
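
For context, below is a condensed sketch of that query shape (topic names, schemas, and join keys follow the plan printed below; the actual gist code may differ):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, from_json}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().appName("spark-28094-repro").getOrCreate()

// Read a Kafka topic and parse the JSON value into columns.
def stream(topic: String, ddl: String) =
  spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("startingOffsets", "earliest")
    .option("subscribe", topic)
    .load()
    .selectExpr("CAST(value AS STRING) AS value")
    .select(from_json(col("value"), StructType.fromDDL(ddl)).as("v"))
    .select("v.*")

val b = stream("B", "B_ID STRING, B_LAST_MOD TIMESTAMP, C_FK STRING, D_FK STRING")
  .withWatermark("B_LAST_MOD", "30 seconds")
val c = stream("C", "C_ID STRING, C_LAST_MOD TIMESTAMP")
  .withWatermark("C_LAST_MOD", "30 seconds")
val d = stream("D", "D_ID STRING, D_LAST_MOD TIMESTAMP")
  .withWatermark("D_LAST_MOD", "30 seconds")

// "B outer C outer D": two chained stream-stream left outer joins.
val joined = b
  .join(c, expr("C_FK = C_ID AND B_LAST_MOD = C_LAST_MOD"), "leftOuter")
  .join(d, expr("D_FK = D_ID AND B_LAST_MOD = D_LAST_MOD"), "leftOuter")
```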

Spark logs a warning message on the query, which means SPARK-28074 is working correctly:

20/05/30 17:52:47 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;
Join LeftOuter, ((D_FK#28 = D_ID#87) AND (B_LAST_MOD#26-T30000ms = D_LAST_MOD#88-T30000ms))
:- Join LeftOuter, ((C_FK#27 = C_ID#58) AND (B_LAST_MOD#26-T30000ms = C_LAST_MOD#59-T30000ms))
:  :- EventTimeWatermark B_LAST_MOD#26: timestamp, 30 seconds
:  :  +- Project [v#23.B_ID AS B_ID#25, v#23.B_LAST_MOD AS B_LAST_MOD#26, v#23.C_FK AS C_FK#27, v#23.D_FK AS D_FK#28]
:  :     +- Project [from_json(StructField(B_ID,StringType,false), StructField(B_LAST_MOD,TimestampType,false), StructField(C_FK,StringType,true), StructField(D_FK,StringType,true), value#21, Some(UTC)) AS v#23]
:  :        +- Project [cast(value#8 as string) AS value#21]
:  :           +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3a7fd18c, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@396d2958, org.apache.spark.sql.util.CaseInsensitiveStringMap@a51ee61a, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@d221af8,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> earliest, subscribe -> B, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
:  +- EventTimeWatermark C_LAST_MOD#59: timestamp, 30 seconds
:     +- Project [v#56.C_ID AS C_ID#58, v#56.C_LAST_MOD AS C_LAST_MOD#59]
:        +- Project [from_json(StructField(C_ID,StringType,false), StructField(C_LAST_MOD,TimestampType,false), value#54, Some(UTC)) AS v#56]
:           +- Project [cast(value#41 as string) AS value#54]
:              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3f507373, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7b6736a4, org.apache.spark.sql.util.CaseInsensitiveStringMap@a51ee61b, [key#40, value#41, topic#42, partition#43, offset#44L, timestamp#45, timestampType#46], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@d221af8,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> earliest, subscribe -> C, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#33, value#34, topic#35, partition#36, offset#37L, timestamp#38, timestampType#39]
+- EventTimeWatermark D_LAST_MOD#88: timestamp, 30 seconds
   +- Project [v#85.D_ID AS D_ID#87, v#85.D_LAST_MOD AS D_LAST_MOD#88]
      +- Project [from_json(StructField(D_ID,StringType,false), StructField(D_LAST_MOD,TimestampType,false), value#83, Some(UTC)) AS v#85]
         +- Project [cast(value#70 as string) AS value#83]
            +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2b90e779, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@36f8cd29, org.apache.spark.sql.util.CaseInsensitiveStringMap@a51ee620, [key#69, value#70, topic#71, partition#72, offset#73L, timestamp#74, timestampType#75], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@d221af8,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> earliest, subscribe -> D, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#62, value#63, topic#64, partition#65, offset#66L, timestamp#67, timestampType#68]

and we can find the late inputs in batch 4 as follows:

(Screenshot: SQL tab in the Spark UI showing the late inputs metric for batch 4)

This shows that intermediate inputs are being lost, ending up as a correctness issue.

@HeartSaVioR (Contributor, Author) commented:

I'm reviving #21617 (instead of pushing forward with #24936) because this approach is straightforward and much simpler than #24936, though #24936 provides the exact number.

#21617 got good traction from reviewers, except for the fact that the representation of "input rows" is not exactly correct. I'm trying to address that in this patch.

@SparkQA commented May 22, 2020

Test build #122967 has finished for PR 28607 at commit 9cc0443.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 22, 2020

Test build #122971 has finished for PR 28607 at commit 10d8d56.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author) commented:

retest this, please

@SparkQA commented May 22, 2020

Test build #122973 has finished for PR 28607 at commit 10d8d56.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 22, 2020

Test build #122981 has finished for PR 28607 at commit f09a623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author) commented:

retest this, please

@SparkQA commented May 27, 2020

Test build #123140 has finished for PR 28607 at commit f09a623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      iter: Iterator[InternalRow],
      predicate: BasePredicate): Iterator[InternalRow] = {
    iter.filter { row =>
      val filteredIn = !predicate.eval(row)
Member (commenting on the diff above):
nit: the variable naming here is a little confusing; maybe use a name related to the watermark, like inWatermark?

Contributor Author (reply):
Maybe it's clearer if we apply filterNot; then the variable can be renamed to lateInput, which matches the metric name. The variable predicate is probably also unclear about which rows evaluate to true. Let me apply that.
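
A minimal sketch of what this reply describes (the helper and metric names here are illustrative, not the exact patch code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.BasePredicate
import org.apache.spark.sql.execution.metric.SQLMetric

// filterNot makes the polarity explicit: the predicate returns true for rows
// that are late against the watermark, which are counted and then dropped.
def dropLateRows(
    iter: Iterator[InternalRow],
    predicateDropRowByWatermark: BasePredicate,
    numLateInputs: SQLMetric): Iterator[InternalRow] = {
  iter.filterNot { row =>
    val lateInput = predicateDropRowByWatermark.eval(row)
    if (lateInput) numLateInputs += 1
    lateInput
  }
}
```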

@xuanyuanking (Member) commented:

Thanks for reviving this; I think it's an important metric. How about also attaching a screenshot of the SQL UI containing this metric?

@SparkQA commented May 30, 2020

Test build #123306 has finished for PR 28607 at commit d31e86e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author) commented:

Thanks for reviewing the PR. I attached the SQL UI screenshot showing the late inputs.

@xuanyuanking (Member) left a comment:

Thanks for the update!
LGTM, cc @zsxwing @tdas for taking a look.

@SparkQA commented May 30, 2020

Test build #123315 has finished for PR 28607 at commit 0f32541.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 31, 2020

Test build #123345 has finished for PR 28607 at commit 03dc461.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 1, 2020

Test build #123357 has finished for PR 28607 at commit 4216405.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author) commented:

retest this, please

@SparkQA commented Jun 14, 2020

Test build #123984 has finished for PR 28607 at commit 4216405.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented:

Merged to master.

@HeartSaVioR (Contributor, Author) commented:

Thanks all for reviewing and merging!

@HeartSaVioR deleted the SPARK-24634-v3 branch on June 14, 2020 at 06:36.

override lazy val metrics = Map(
  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
  "numLateInputs" -> SQLMetrics.createMetric(sparkContext,
Member (commenting on the diff above):
nit: maybe name it numDroppedRowsByWatermark? Technically, we don't guarantee to drop all late inputs, so this metric doesn't report the accurate number of late inputs.

Contributor Author (reply):

Thanks for the suggestion. I tried to explain the behavior, but the name still seems confusing to others. I agree the suggested name is clearer.

Btw, would it be better to have the accurate number of late inputs? (Just asking because #24936 covers this, and it could be applied orthogonally, i.e., number of late inputs vs. number of dropped inputs.)

HyukjinKwon pushed a commit that referenced this pull request on Jun 16, 2020:
Rename the variable from "numLateInputs" to "numRowsDroppedByWatermark"

### What changes were proposed in this pull request?

This PR renames the variable from "numLateInputs" to "numRowsDroppedByWatermark" so that it becomes self-explanatory.

### Why are the changes needed?

This originated from a post-review comment; see #28607 (comment).

### Does this PR introduce _any_ user-facing change?

No, as SPARK-24634 has not been included in any release yet.

### How was this patch tested?

Existing UTs.

Closes #28828 from HeartSaVioR/SPARK-24634-v3-followup.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
