
Conversation

@sitalkedia commented Aug 2, 2017

What changes were proposed in this pull request?

Using zstd compression for Spark jobs spilling 100s of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to a significant latency gain because of reduced disk I/O. There is a 2-5% degradation in CPU time because of the zstd compression overhead, but for jobs that are bottlenecked by disk I/O, this hit is acceptable.

Benchmark

Please note that this benchmark uses a real-world, compute-heavy production workload spilling TBs of data to disk.

zstd performance relative to LZ4:

  spill/shuffle bytes: -48%
  CPU time: +3%
  CPU reservation time: -40%
  latency: -40%

How was this patch tested?

Tested by running a few jobs that spill large amounts of data on the cluster; the amount of intermediate data written to disk was reduced by as much as 50%.
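
For context, a minimal sketch of how a job opts into the new codec once this patch is in. The key spark.io.compression.codec and the short name "zstd" come from this PR; the app name below is just a placeholder.

    import org.apache.spark.{SparkConf, SparkContext}

    // Switch Spark's internal compression (shuffle output, spills, broadcasts) to zstd.
    val conf = new SparkConf()
      .setAppName("zstd-spill-example")   // placeholder app name
      .set("spark.io.compression.codec", "zstd")
    val sc = new SparkContext(conf)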

@sitalkedia (Author)

cc - @srowen, @tgravescs, @rxin, @sameeragarwal

@sitalkedia (Author)

Old PR - #17303

@rxin (Contributor) commented Aug 2, 2017

Any benchmark data?

@SparkQA commented Aug 2, 2017

Test build #80140 has finished for PR 18805 at commit cff558b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec

@SparkQA commented Aug 2, 2017

Test build #80141 has finished for PR 18805 at commit 4ee4d2b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia force-pushed the skedia/upstream_zstd branch from 4ee4d2b to 287a9da on August 2, 2017 01:14
@SparkQA commented Aug 2, 2017

Test build #80142 has finished for PR 18805 at commit 287a9da.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec

@HyukjinKwon (Member)

cc @dongjinleekr too.

@sitalkedia (Author)

@rxin - Updated with benchmark data on our production workload.

@sitalkedia (Author) commented Aug 2, 2017

Please note a few minor improvements I have made compared to the old PR #17303:

  1. Use zstd compression level 1 instead of 3, which is significantly faster.
  2. Wrap the zstd input/output streams in buffered input/output streams to avoid the overhead of excessive JNI calls (see the sketch below).
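
A minimal sketch of the buffering described in item 2, assuming zstd-jni's com.github.luben.zstd.ZstdOutputStream/ZstdInputStream classes; the helper names here are illustrative, and the real wiring lives in the codec class added by this PR.

    import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
    import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}

    // Buffer outside the zstd streams so each JNI call handles a full block
    // instead of being invoked for every small read or write.
    def wrapOutput(out: OutputStream, level: Int, bufferSize: Int): OutputStream =
      new BufferedOutputStream(new ZstdOutputStream(out, level), bufferSize)

    def wrapInput(in: InputStream, bufferSize: Int): InputStream =
      new BufferedInputStream(new ZstdInputStream(in), bufferSize)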

@sitalkedia (Author)

jenkins retest this please.

@SparkQA commented Aug 2, 2017

Test build #80144 has finished for PR 18805 at commit 287a9da.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec

@sitalkedia (Author)

Any idea what the build failure is about?

"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
"snappy" -> classOf[SnappyCompressionCodec].getName,
"zstd" -> classOf[SnappyCompressionCodec].getName)
Contributor:

you mean ZStandardCompressionCodec ?

Author:

Ah, my bad. Fixed it.


/**
* :: DeveloperApi ::
* ZStandard implementation of [[org.apache.spark.io.CompressionCodec]].
@tejasapatil (Contributor) commented Aug 2, 2017

It would be good to add a link pointing to more details: http://facebook.github.io/zstd/

Author:

done.

class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val level = conf.getSizeAsBytes("spark.io.compression.zstandard.level", "1").toInt
Contributor:

Please add a comment explaining why we chose level 1 over the other levels.

Author:

done.


override def compressedOutputStream(s: OutputStream): OutputStream = {
val level = conf.getSizeAsBytes("spark.io.compression.zstandard.level", "1").toInt
val compressionBuffer = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
Contributor:

  • Wondering if we should share this config value or have a new one.
  • Do you want to set the default to something higher, like 1 MB or 4 MB?

Author:

You are right, we should not share the config with LZ4; I created a new one.
Let's keep the default at 32k, which is aligned with the block size used by the other compression codecs.
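
For reference, a short sketch of the dedicated setting replacing the shared LZ4 key; the key name spark.io.compression.zstd.bufferSize matches the later revisions in this PR.

    // Read a zstd-specific buffer size instead of reusing spark.io.compression.lz4.blockSize.
    val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt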

@tejasapatil (Contributor)

In the Benchmark section, the values for LZ4 are all zeros, which feels confusing while reading. At first I thought they were absolute values, but they are supposed to be relative.

@tejasapatil (Contributor)

Re: the build failure: you can reproduce it locally by running "./dev/test-dependencies.sh". It's failing because a new dependency was introduced; you need to add it to dev/deps/spark-deps-hadoop-XXX.

@rxin (Contributor) commented Aug 2, 2017

How big is the dependency that's getting pulled in? If we are adding more compression codecs maybe we should retire some old ones, or move them into a separate package so downstream apps can optionally depend on them.

@SparkQA commented Aug 2, 2017

Test build #80148 has finished for PR 18805 at commit 295f38a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec

@srowen (Member) commented Aug 2, 2017

Why does this need to be in Spark? And what are the licensing terms of the native code underneath (just suspicious because it's often GPL)? Can a user not just add this with their app?

I tend to think we support what Hadoop supports for us here. Doesn't a later Hadoop pull this in?

@tgravescs (Contributor) commented Aug 2, 2017

Why does this need to be in Spark?

@srowen you already asked that question, and it has been answered on the JIRA as well as the old PR. A user cannot add zstd compression to the internal Spark parts (spark.io.compression.codec). In this particular case he is saying it's the shuffle output where it makes a big difference.
zstd is already included in other open source projects like Hadoop, but again, that doesn't cover Spark's internal compression code. zstd itself is BSD licensed. It looks like this PR is using https://github.com/luben/zstd-jni, which also appears to be BSD licensed. We need to decide whether it is OK for us to use that directly. Hadoop wrote its own version, but I would say if the zstd-jni version works, we use it. Worst case, if something happens where that user won't fix something, we could fork it, and we aren't any worse off than having our own copy to start with.

@srowen (Member) commented Aug 2, 2017

Got it, thanks for the reminder. I think the question is mostly about license and dependency weight then. I think we'd want to use whatever Hadoop provides.

<code>org.apache.spark.io.LZ4CompressionCodec</code>,
<code>org.apache.spark.io.LZFCompressionCodec</code>,
and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
<code>org.apache.spark.io.SnappyCompressionCodec</code>.
Member:

nit: '.' -> ','

<tr>
<td><code>spark.io.compression.zstd.level</code></td>
<td>1</td>
<td>
Member:

nit: leve -> level

// Default compression level for zstd compression to 1 because it is
// fastest of all with reasonably high compression ratio.
val level = conf.getSizeAsBytes("spark.io.compression.zstd.level", "1").toInt
val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
Member:

Would it be better to have this variable as a private variable to get this property only once?

Member:

Agreed, it's simpler and cleaner, as it avoids duplicating this property in this file.

Member:

@sitalkedia how about comments like this?

Author:

Sorry, I somehow missed these comments. Will address.
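
For concreteness, a sketch of the shape the reviewers are asking for: both properties read once into private vals on the codec instead of on every stream creation. The key names match the snippet above, the buffered zstd-jni streams are as in the earlier sketch, and the level accessor is revisited later in the review.

    import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
    import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {

      // Read both settings once at construction time rather than on every call.
      private val level = conf.getSizeAsBytes("spark.io.compression.zstd.level", "1").toInt
      private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt

      override def compressedOutputStream(s: OutputStream): OutputStream =
        new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)

      override def compressedInputStream(s: InputStream): InputStream =
        new BufferedInputStream(new ZstdInputStream(s), bufferSize)
    }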

@rxin (Contributor) commented Aug 2, 2017

Our compression codec is actually completely decoupled from Hadoop's, but dependency management (and licensing) can be annoying to deal with.

@vanzin (Contributor) commented Oct 11, 2017

retest this please

@SparkQA commented Oct 11, 2017

Test build #82644 has finished for PR 18805 at commit 029a753.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Oct 11, 2017

The same test failed, so it looks like there's a real, non-infra-related issue...

@hvanhovell (Contributor)

retest this please

@SparkQA commented Oct 13, 2017

Test build #82729 has finished for PR 18805 at commit 029a753.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Oct 13, 2017

I haven't been able to reproduce the issue locally, but looking at the jenkins logs I see a bunch of exceptions like these:

17/10/13 06:53:26.609 dispatcher-event-loop-15 ERROR Worker: Failed to launch executor app-20171013030138-0000/3 for Test replay.
java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown.
        at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:195)

And:

17/10/13 06:53:26.687 pool-1-thread-1-ScalaTest-running-ExternalAppendOnlyMapSuite WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor).  This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:127)
org.apache.spark.util.collection.ExternalAppendOnlyMapSuite$$anonfun$12.apply$mcV$sp(ExternalAppendOnlyMapSuite.scala:30

Note that the first error mentions the app name used by ReplayListenerSuite but it actually happens in a completely separate test suite. At the very least, ReplayListenerSuite is doing a poor job of cleaning up after itself and we should fix that.

@vanzin (Contributor) commented Oct 13, 2017

(I'll file a bug and send a PR for it separately, btw.)

@vanzin (Contributor) commented Oct 13, 2017

Turns out that's caused by SparkContext failing to clean up after itself when the UnsatisfiedLinkError happens, so those errors are red herrings...

@hvanhovell (Contributor)

This seems to be caused by an issue in the zstd-jni library. It probably uses the wrong ClassLoader to load the native library, and as a result it cannot find and load it.

@vanzin (Contributor) commented Oct 13, 2017

Yeah, but if that were the cause it would also fail locally, and it passes for me. I can't really figure out from the rest of the logs whether something obvious is wrong, so I guess the best bet now is to ask for changes in zstd-jni so that all errors are properly reported (see #18805 (comment)).

@vanzin (Contributor) commented Oct 13, 2017

The good news is that I can reproduce it on the amplab machine, so I'll try to play around with the zstd-jni code a bit.

@vanzin (Contributor) commented Oct 13, 2017

$ ldd linux/amd64/libzstd-jni.so 
ldd: warning: you do not have execution permission for `linux/amd64/libzstd-jni.so'
linux/amd64/libzstd-jni.so: /lib64/libc.so.6: version `GLIBC_2.14' not found (required by linux/amd64/libzstd-jni.so)
        linux-vdso.so.1 =>  (0x00007ffe0dfda000)
        libc.so.6 => /lib64/libc.so.6 (0x00007f89eb3a4000)
        /lib64/ld-linux-x86-64.so.2 (0x0000003612e00000)

Mystery solved: the library is compiled against a newer glibc than the amplab machines have. Can we ask them to tweak their compilation to support older Linux distros?

$ cat /etc/issue
CentOS release 6.9 (Final)

@sitalkedia (Author)

Created luben/zstd-jni#47.

@SparkQA commented Oct 19, 2017

Test build #82911 has finished for PR 18805 at commit 2580633.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia (Author)

ping.

@srowen (Member) left a comment

Aside from one minor question from an old comment, this is looking good. The licenses seem in order.

// Default compression level for zstd compression to 1 because it is
// fastest of all with reasonably high compression ratio.
val level = conf.getSizeAsBytes("spark.io.compression.zstd.level", "1").toInt
val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
Member:

@sitalkedia how about comments like this?

override def compressedOutputStream(s: OutputStream): OutputStream = {
// Default compression level for zstd compression to 1 because it is
// fastest of all with reasonably high compression ratio.
val level = conf.getSizeAsBytes("spark.io.compression.zstd.level", "1").toInt
Member:

Should this be getInt instead of getSizeAsBytes?

Author:

Good eye, fixed.
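
A sketch of the correction being discussed: the compression level is a plain integer, not a byte size, so getInt is the natural accessor (the exact form in the final diff may differ).

    // The compression level is an integer setting, not a size in bytes.
    private val level = conf.getInt("spark.io.compression.zstd.level", 1)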

@SparkQA commented Oct 30, 2017

Test build #83204 has finished for PR 18805 at commit eba3024.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@DeveloperApi
class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {

val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
Member:

This should be private. The intent was to lift both config values out of the method, so level can go here too.

Author:

done.

@SparkQA commented Nov 1, 2017

Test build #83282 has finished for PR 18805 at commit 95e6b8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell (Contributor) left a comment

LGTM

@hvanhovell (Contributor)

Merging to master. Thanks for seeing this through!

@asfgit closed this in 444bce1 on Nov 1, 2017.