Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Feb 13, 2020

### What changes were proposed in this pull request?

This patch adds log messages recording the elapsed time of the "compact" operation in FileStreamSourceLog and FileStreamSinkLog (implemented in CompactibleFileStreamLog) to help investigate the mysterious latency spike during the batch run.

### Why are the changes needed?

Tracking latency is a critical aspect of a streaming query. The "compact" operation may add nontrivial latency (it is even synchronous, so all of it lands on the batch run), but it is not measured today and end users have to guess.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A for UT. Manual test with streaming query using file source & file sink.

grep "for compact batch"

```
...
20/02/20 19:27:36 WARN FileStreamSinkLog: Compacting took 24473 ms (load: 14185 ms, write: 10288 ms) for compact batch 21359
20/02/20 19:27:39 WARN FileStreamSinkLog: Loaded 1068000 entries (397985432 bytes in memory), and wrote 1068000 entries for compact batch 21359
20/02/20 19:29:52 WARN FileStreamSourceLog: Compacting took 3777 ms (load: 1524 ms, write: 2253 ms) for compact batch 21369
20/02/20 19:29:52 WARN FileStreamSourceLog: Loaded 229477 entries (68970112 bytes in memory), and wrote 229477 entries for compact batch 21369
20/02/20 19:30:17 WARN FileStreamSinkLog: Compacting took 24183 ms (load: 12992 ms, write: 11191 ms) for compact batch 21369
20/02/20 19:30:20 WARN FileStreamSinkLog: Loaded 1068500 entries (398171880 bytes in memory), and wrote 1068500 entries for compact batch 21369
...
```

![Screen Shot 2020-02-21 at 12 34 22 PM](https://user-images.githubusercontent.com/1317309/75002142-c6830100-54a6-11ea-8da6-17afb056653b.png)

These messages explain why the operation duration peaks every 10 batches, which is the compact interval. Latency from addBatch increases heavily at each peak, which DOES NOT mean writing the outputs takes more time, but without such messages we have no way to tell.

NOTE: The output may differ a bit from the final code, as it may change during the review phase.
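
For readers skimming the thread, here is a minimal sketch of the timing-plus-threshold pattern the patch converged on. The `timed` helper, the `compact` signature, and the constant name are illustrative stand-ins, not the exact PR code; only the message formats follow the snippets quoted in the review below.

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.util.SizeEstimator

object CompactTimingSketch extends Logging {
  // Hypothetical threshold mirroring COMPACT_LATENCY_WARN_THRESHOLD_MS in the PR.
  private val WarnThresholdMs = 2000L

  // Illustrative helper: evaluate a block and return (result, elapsed millis).
  private def timed[T](block: => T): (T, Long) = {
    val start = System.currentTimeMillis()
    val result = block
    (result, System.currentTimeMillis() - start)
  }

  // Hypothetical shape of a compact step: load all entries, then write the compacted batch.
  def compact(batchId: Long)(load: => Seq[String])(write: Seq[String] => Unit): Unit = {
    val (allLogs, loadElapsedMs) = timed(load)
    val (_, writeElapsedMs) = timed(write(allLogs))
    val elapsedMs = loadElapsedMs + writeElapsedMs
    if (elapsedMs >= WarnThresholdMs) {
      // Slow compaction: surface it to end users, including the driver-side footprint.
      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
        s" write: $writeElapsedMs ms) for compact batch $batchId")
      logWarning(s"Loaded ${allLogs.size} entries " +
        s"(${SizeEstimator.estimate(allLogs)} bytes in memory) for compact batch $batchId")
    } else {
      // Fast compaction: keep the detail at DEBUG so normal runs stay quiet.
      logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
        s" write: $writeElapsedMs ms) for compact batch $batchId")
    }
  }
}
```

Gating on a threshold keeps the common case quiet while making slow compactions impossible to miss, which is the severity compromise discussed in the review.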

@HeartSaVioR HeartSaVioR changed the title [SPARK-30804] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog [SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog Feb 13, 2020
@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 13, 2020

cc. @tdas @zsxwing @gaborgsomogyi

The output I added simply demonstrates the necessity of the log message. Those numbers came from reading and writing against the "local" filesystem.

```scala
          }
        } ++ logs
      }
      logInfo(s"It took $loadElapsedMs ms to load ${allLogs.size} entries " +
        s"(${SizeEstimator.estimate(allLogs)} bytes in memory) for compact batch $batchId.")
```
Contributor Author

Personally, I think setting this to INFO won't bother anyone much, as it will only be printed once per compact interval. InMemoryFileIndex logs the latency of listing files at INFO, which makes sense and can be mirrored here as well.

```scala
        } ++ logs
      }
      logInfo(s"It took $loadElapsedMs ms to load ${allLogs.size} entries " +
        s"(${SizeEstimator.estimate(allLogs)} bytes in memory) for compact batch $batchId.")
```
Contributor Author

@HeartSaVioR HeartSaVioR Feb 13, 2020

I've added estimated memory usage as well, because this cache adds memory pressure on the driver, which is easy to underestimate.
(https://issues.apache.org/jira/browse/SPARK-30462)
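
For reference, SizeEstimator is Spark's developer API for estimating an object graph's in-memory footprint; a small illustration (the entry contents and count here are made up):

```scala
import org.apache.spark.util.SizeEstimator

// Distinct strings so shared references don't skew the estimate.
val entries: Seq[String] = (1 to 100000).map(i => s"/output/part-$i.parquet")
println(s"~${SizeEstimator.estimate(entries)} bytes in driver memory for ${entries.size} entries")
```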

@SparkQA

SparkQA commented Feb 13, 2020

Test build #118336 has finished for PR 27557 at commit 619eee3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Retest this, please

@SparkQA

SparkQA commented Feb 13, 2020

Test build #118345 has finished for PR 27557 at commit 619eee3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

I think the information printed here is not necessary for users, but it is useful debug information. I've double-checked that the printout doesn't come often, but I would still lower the severity to DEBUG.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 20, 2020

> I think the information printed here is not necessary for users

I'm not sure I can agree with that. The information is quite similar to what InMemoryFileIndex logs for listing leaf files, and that is at INFO level if I remember correctly.

For streaming workloads, latency is a first-class consideration. End users would have no idea why the overall latency suddenly increases every N batches unless they know the details of the metadata handling in FileStreamSource / FileStreamSink. This is a completely different user experience from the Kafka streaming source and sink, and they may struggle to find the root cause in other spots, like their query.

But I'd agree the information may not be necessary for users if the latency added here is not considerable. We could set a threshold (1s or 2s?) and only print when the latency exceeds it (still printing at DEBUG level when it doesn't), but then the message would deserve a higher severity, WARN.

What do you think?

@gaborgsomogyi
Contributor

> For streaming workloads, latency is a first-class consideration.

Only when the query is not running properly. Emitting it all the time, which increases the log file size, is a different matter. I think this is debug information that helps developers figure out the issue, not users (INFO is more for users in my understanding).

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 20, 2020

> For streaming workloads, latency is a first-class consideration.

> Only when the query is not running properly.

OK, I admit most of my experience has been with "low latency" systems, but even though Spark runs in "micro-batch" mode, that doesn't mean latency is unimportant. Latency is the very thing that "defines" whether a streaming query is running properly. Spark even had to claim that a micro-batch could run in sub-second time, because latency has been one of the major downsides of Spark Streaming, and continuous processing had to be introduced.

Higher latency doesn't only mean output will be delayed. When you turn on "latestFirst" (with maxFilesPerTrigger; in this case we assume we can't process all the inputs) to start reading from the latest files, the latency of a batch defines the boundary of the inputs.
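
For context, those are standard file source options; a quick sketch of a query configured that way (assumes an existing SparkSession `spark`; the path is hypothetical):

```scala
val recentFirst = spark.readStream
  .format("text")
  .option("latestFirst", "true")        // read the newest files first
  .option("maxFilesPerTrigger", "100")  // cap how many files each micro-batch consumes
  .load("/data/incoming")
```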

It's a critical aspect that operators should always observe via their monitoring approaches (alerts, a time-series DB and dashboard, etc.) to find out what happened when the latency fluctuates a lot.

> I think this is debug information that helps developers figure out the issue, not users (INFO is more for users in my understanding).

I'm not sure who you mean by "users". AFAIK, in many cases (though not all), users = developers = operators.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 20, 2020

While I still think the latency information is important, I agree that the information about the size of the logs (entry count, memory usage) is not interesting unless there's a latency issue.

I've adjusted the severities of the log messages to WARN and DEBUG, and differentiated the information provided according to the severity.

@SparkQA

SparkQA commented Feb 21, 2020

Test build #118736 has finished for PR 27557 at commit b3b2e76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment

LGTM.

Member

@xuanyuanking xuanyuanking left a comment

+1 for the message.
In the PR description, the UI screenshot explains the problem super clearly. So, is it possible to add the load and write time to the UI chart?


```scala
      val elapsedMs = loadElapsedMs + writeElapsedMs
      if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
        logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
          s" write: $writeElapsedMs ms) for compact batch $batchId")
```
Member

nit: seems these two logs could be combined into one?

Contributor Author

It was actually one line, and I split it because I felt it was a bit long as a one-liner, and also because the second line's message is only for the WARN level.
But if it helps correlation, I can combine them. Let's have more voices on this.

Member

Two logs are fine, I guess.

@HeartSaVioR
Contributor Author

> is it possible to add the load and write time to the UI chart?

This comes from a specific data source, so we'd need a way to generalize it. I roughly remember that we deferred supporting custom metrics on DSv2 (except observe, which is for users). Once we find time to design custom metrics, maybe we can surface this in the UI.
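
For context, `observe` is the user-facing Dataset API mentioned above; a hypothetical use (`inputDf` is assumed to be an existing streaming Dataset):

```scala
import org.apache.spark.sql.functions.{count, lit}

// Named metrics attached this way are reported via StreamingQueryProgress events.
val observed = inputDf.observe("batch_metrics", count(lit(1)).as("rows"))
```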

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 14, 2020

Test build #121232 has finished for PR 27557 at commit b3b2e76.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 15, 2020

Test build #121304 has finished for PR 27557 at commit b3b2e76.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 15, 2020

Test build #121315 has finished for PR 27557 at commit b3b2e76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

```scala
      if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
        logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
          s" write: $writeElapsedMs ms) for compact batch $batchId")
        logWarning(s"Loaded ${allLogs.size} entries (${SizeEstimator.estimate(allLogs)} bytes in " +
```
Member

Should we clarify in the log that this size is estimated?

Contributor Author

Yes, that sounds better. I'll add "(estimated)" after "bytes". Thanks!


```scala
object CompactibleFileStreamLog {
  val COMPACT_FILE_SUFFIX = ".compact"
  val COMPACT_LATENCY_WARN_THRESHOLD_MS = 2000
```
Member

Was this value taken from practice? I guess it's fine.

Contributor Author

Yeah, it's a heuristic. I think end users should be made aware when a batch spends more than 2 seconds just compacting metadata; the latency here is opaque to them if we don't log it, and they'll be left wondering.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM; a nit and a question.

@SparkQA

SparkQA commented Apr 22, 2020

Test build #121626 has finished for PR 27557 at commit b3b2e76.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2020

Test build #121647 has finished for PR 27557 at commit 648f0dc.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

retest this, please

@SparkQA

SparkQA commented Apr 23, 2020

Test build #121660 has finished for PR 27557 at commit 648f0dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master and branch-3.0.

HyukjinKwon pushed a commit that referenced this pull request Apr 24, 2020
[SPARK-30804][SS] Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog

Closes #27557 from HeartSaVioR/SPARK-30804.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 39bc50d)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HeartSaVioR
Contributor Author

Thanks for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-30804 branch April 24, 2020 08:40