Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.roaringbitmap.{ArrayContainer, RoaringArray, RoaringBitmap}


import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
Expand Down Expand Up @@ -202,6 +204,12 @@ private[serializer] object KryoSerializer {
classOf[GetBlock],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[Array[Short]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this right below array[byte]?

classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.serializer

import org.apache.spark.scheduler.HighlyCompressedMapStatus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort the import

import org.apache.spark.storage.BlockManagerId

import scala.collection.mutable
import scala.reflect.ClassTag

Expand Down Expand Up @@ -242,6 +245,16 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
}
}

test("registration of HighlyCompressedMapStatus") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
val hcmo = HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), Array(0l,2l,5l))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually here. again style nit, add a space after comma for the arrays. and use capital L rather than lowercase l because it is hard to tell the difference between lowercase l and capital L

val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance()
serInstance.serialize(hcmo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this actually fail without registration? My understanding is that the serialized data would contain class names instead of a more efficient identifier, but would still work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

He set "spark.kryo.registrationRequired" to true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Thanks for pointing that out.


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this blank line

}
}


Expand Down