Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down
36 changes: 34 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.io
import java.io._
import java.util.Locale

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
Expand Down Expand Up @@ -50,13 +51,14 @@ private[spark] object CompressionCodec {

private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec])
|| codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
}

private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
"snappy" -> classOf[SnappyCompressionCodec].getName,
"zstd" -> classOf[ZStdCompressionCodec].getName)

def getCodecName(conf: SparkConf): String = {
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
Expand Down Expand Up @@ -216,3 +218,33 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
}
}
}

/**
* :: DeveloperApi ::
* ZStandard implementation of [[org.apache.spark.io.CompressionCodec]]. For more
* details see - http://facebook.github.io/zstd/
*
* @note The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
@DeveloperApi
class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
// Default compression level for zstd compression to 1 because it is
// fastest of all with reasonably high compression ratio.
val level = conf.getSizeAsBytes("spark.io.compression.zstd.level", "1").toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this getInt instead of getSizeAsBytes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good eye, fixed.

val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to have this variable as a private variable to get this property only once?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, it's simpler and cleaner, as it avoids duplicating this property in this file

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sitalkedia how about comments like this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry somehow missed these comments. Will address.

// Wrap the zstd output stream in a buffered output stream, so that we can
// avoid overhead excessive of JNI call while trying to compress small amount of data.
new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
}

override def compressedInputStream(s: InputStream): InputStream = {
val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
// Wrap the zstd input stream in a buffered input stream so that we can
// avoid overhead excessive of JNI call while trying to uncompress small amount of data.
new BufferedInputStream(new ZstdInputStream(s), bufferSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
testConcatenationOfSerializedStreams(codec)
}

test("zstd compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
assert(codec.getClass === classOf[ZStdCompressionCodec])
testCodec(codec)
}

test("zstd compression codec short form") {
val codec = CompressionCodec.createCodec(conf, "zstd")
assert(codec.getClass === classOf[ZStdCompressionCodec])
testCodec(codec)
}

test("zstd supports concatenation of serialized zstd") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
assert(codec.getClass === classOf[ZStdCompressionCodec])
testConcatenationOfSerializedStreams(codec)
}

test("bad compression codec") {
intercept[IllegalArgumentException] {
CompressionCodec.createCodec(conf, "foobar")
Expand Down
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.6
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,4 @@ xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
zookeeper-3.4.6.jar
zstd-jni-1.3.0-1.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be updated, it seems

1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2.7
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,4 @@ xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
zookeeper-3.4.6.jar
zstd-jni-1.3.0-1.jar
20 changes: 19 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@ Apart from these, the following properties are also available, and may be useful
e.g.
<code>org.apache.spark.io.LZ4CompressionCodec</code>,
<code>org.apache.spark.io.LZFCompressionCodec</code>,
and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
<code>org.apache.spark.io.SnappyCompressionCodec</code>.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: '.' -> ','

and <code>org.apache.spark.io.ZstdCompressionCodec</code>.
</td>
</tr>
<tr>
Expand All @@ -885,6 +886,23 @@ Apart from these, the following properties are also available, and may be useful
is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
</td>
</tr>
<tr>
<td><code>spark.io.compression.zstd.level</code></td>
<td>1</td>
<td>
Compression leve for Zstd compression codec. Increasing the compression level will result in better
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: leve -> level

compression at the expense of more CPU and memory.
</td>
</tr>
<tr>
<td><code>spark.io.compression.zstd.bufferSize</code></td>
<td>32k</td>
<td>
Buffer size used in Zstd compression, in the case when Zstd compression codec
is used. Lowering this size will lower the shuffle memory usage when Zstd is used, but it
might increase the compression cost because of excessive JNI call overhead.
</td>
</tr>
<tr>
<td><code>spark.kryo.classesToRegister</code></td>
<td>(none)</td>
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.3.0-1</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
Expand Down