Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down
33 changes: 31 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.io
import java.io._
import java.util.Locale

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
Expand Down Expand Up @@ -50,13 +51,14 @@ private[spark] object CompressionCodec {

private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec])
|| codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStandardCompressionCodec])
}

private val shortCompressionCodecNames = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
"lzf" -> classOf[LZFCompressionCodec].getName,
"snappy" -> classOf[SnappyCompressionCodec].getName)
"snappy" -> classOf[SnappyCompressionCodec].getName,
"zstd" -> classOf[SnappyCompressionCodec].getName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean ZStandardCompressionCodec ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, my bad. Fixed it.


def getCodecName(conf: SparkConf): String = {
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
Expand Down Expand Up @@ -216,3 +218,30 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
}
}
}

/**
* :: DeveloperApi ::
* ZStandard implementation of [[org.apache.spark.io.CompressionCodec]].
Copy link
Contributor

@tejasapatil tejasapatil Aug 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to add this link pointing to more details : http://facebook.github.io/zstd/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

*
* @note The wire protocol for this codec is not guaranteed to be compatible across versions
* of Spark. This is intended for use as an internal compression utility within a single Spark
* application.
*/
@DeveloperApi
class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec {

override def compressedOutputStream(s: OutputStream): OutputStream = {
val level = conf.getSizeAsBytes("spark.io.compression.zstandard.level", "1").toInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment explaining the reason why we chose level 1 over other levels

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

val compressionBuffer = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • wondering if we should share this config value OR have a new one.
  • do you want to set the default to something higher like 1mb or 4mb ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we should not share the config with lz4, created a new one.
Lets keep the default to 32kb which is aligned with the block size used by other compressions.

// Wrap the zstd output stream in a buffered output stream, so that we can
// avoid overhead excessive of JNI call while trying to compress small amount of data.
new BufferedOutputStream(new ZstdOutputStream(s, level), compressionBuffer)
}

override def compressedInputStream(s: InputStream): InputStream = {
val compressionBuffer = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt
// Wrap the zstd input stream in a buffered input stream so that we can
// avoid overhead excessive of JNI call while trying to uncompress small amount of data.
new BufferedInputStream(new ZstdInputStream(s), compressionBuffer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
testConcatenationOfSerializedStreams(codec)
}

test("zstd compression codec") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStandardCompressionCodec].getName)
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testCodec(codec)
}

test("zstd compression codec short form") {
val codec = CompressionCodec.createCodec(conf, "zstd")
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testCodec(codec)
}

test("zstd supports concatenation of serialized zstd") {
val codec = CompressionCodec.createCodec(conf, classOf[ZStandardCompressionCodec].getName)
assert(codec.getClass === classOf[ZStandardCompressionCodec])
testConcatenationOfSerializedStreams(codec)
}

test("bad compression codec") {
intercept[IllegalArgumentException] {
CompressionCodec.createCodec(conf, "foobar")
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.3.0-1</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
Expand Down