Skip to content

Commit 9a0b75c

Browse files
squitorxin
authored andcommitted
[SPARK-5949] HighlyCompressedMapStatus needs more classes registered w/ kryo
https://issues.apache.org/jira/browse/SPARK-5949 Author: Imran Rashid <[email protected]> Closes #4877 from squito/SPARK-5949_register_roaring_bitmap and squashes the following commits: 7e13316 [Imran Rashid] style style style 5f6bb6d [Imran Rashid] more style 709bfe0 [Imran Rashid] style a5cb744 [Imran Rashid] update tests to cover both types of RoaringBitmapContainers 09610c6 [Imran Rashid] formatting f9a0b7c [Imran Rashid] put primitive array registrations together 97beaf8 [Imran Rashid] SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo (cherry picked from commit 1f1fccc) Signed-off-by: Reynold Xin <[email protected]>
1 parent 8446ad0 commit 9a0b75c

2 files changed

Lines changed: 33 additions & 5 deletions

File tree

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,23 @@ package org.apache.spark.serializer
2020
import java.io.{EOFException, InputStream, OutputStream}
2121
import java.nio.ByteBuffer
2222

23+
import scala.reflect.ClassTag
24+
2325
import com.esotericsoftware.kryo.{Kryo, KryoException}
2426
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
2527
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
2628
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
29+
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
2730

2831
import org.apache.spark._
2932
import org.apache.spark.api.python.PythonBroadcast
3033
import org.apache.spark.broadcast.HttpBroadcast
31-
import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock}
34+
import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
3235
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
3336
import org.apache.spark.storage._
3437
import org.apache.spark.util.BoundedPriorityQueue
3538
import org.apache.spark.util.collection.CompactBuffer
3639

37-
import scala.reflect.ClassTag
38-
3940
/**
4041
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
4142
*
@@ -202,9 +203,17 @@ private[serializer] object KryoSerializer {
202203
classOf[GetBlock],
203204
classOf[CompressedMapStatus],
204205
classOf[HighlyCompressedMapStatus],
206+
classOf[RoaringBitmap],
207+
classOf[RoaringArray],
208+
classOf[RoaringArray.Element],
209+
classOf[Array[RoaringArray.Element]],
210+
classOf[ArrayContainer],
211+
classOf[BitmapContainer],
205212
classOf[CompactBuffer[_]],
206213
classOf[BlockManagerId],
207214
classOf[Array[Byte]],
215+
classOf[Array[Short]],
216+
classOf[Array[Long]],
208217
classOf[BoundedPriorityQueue[_]],
209218
classOf[SparkConf]
210219
)

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import scala.reflect.ClassTag
2323
import com.esotericsoftware.kryo.Kryo
2424
import org.scalatest.FunSuite
2525

26-
import org.apache.spark.{SparkConf, SharedSparkContext}
26+
import org.apache.spark.{SharedSparkContext, SparkConf}
27+
import org.apache.spark.scheduler.HighlyCompressedMapStatus
2728
import org.apache.spark.serializer.KryoTest._
28-
29+
import org.apache.spark.storage.BlockManagerId
2930

3031
class KryoSerializerSuite extends FunSuite with SharedSparkContext {
3132
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
@@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
242243
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
243244
}
244245
}
246+
247+
test("registration of HighlyCompressedMapStatus") {
248+
val conf = new SparkConf(false)
249+
conf.set("spark.kryo.registrationRequired", "true")
250+
251+
// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
252+
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
253+
// array (sparse) if they use less. So we just create two cases, one sparse and one dense.
254+
// and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
255+
// empty blocks
256+
257+
val ser = new KryoSerializer(conf).newInstance()
258+
val denseBlockSizes = new Array[Long](5000)
259+
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
260+
Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes =>
261+
ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
262+
}
263+
}
245264
}
246265

247266

0 commit comments

Comments
 (0)