Skip to content

Commit 87bdc2a

Browse files
committed
Fix Databricks serialization issues and code cleanup
1 parent b001b6b commit 87bdc2a

8 files changed

Lines changed: 48 additions & 18 deletions

File tree

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ val sparkVersion = "3.1.3"
77
val catsVersion = "2.6.1"
88
val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x
99
val scalaTestVersion = "3.2.11"
10+
val framelessVersion = "0.11.1"
1011
val geomesaVersion = "3.3.0"
11-
val geotrellisVersion = "3.6.1+0-6b5868af+20220321-1909-SNAPSHOT" //"3.6.1"
12+
val geotrellisVersion = "3.6.1+1-e69dfae5+20220322-1723-SNAPSHOT" //"3.6.1"
1213

1314
lazy val commonSettings = Seq(
1415
scalaVersion := scalaVersions.head,

core/src/main/scala/com/azavea/hiveless/serializers/HSerializer.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,22 @@ object HSerializer extends Serializable {
5252
def serialize: T => Any = s
5353
}
5454

55-
/** Derive HSerializer from ExpressionEncoder. */
56-
implicit def expressionEncoderSerializer[T: TypeTag](implicit enc: ExpressionEncoder[T]): HSerializer[T] = new HSerializer[T] {
55+
// format: off
56+
/**
57+
* Derive HSerializer from ExpressionEncoder.
58+
* Intentionally not used for instances implementation, causes the following failure on DataBricks:
59+
* org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
60+
* Serialization trace:
61+
* classes (sun.misc.Launcher$AppClassLoader)
62+
* classloader (java.security.ProtectionDomain)
63+
* context (java.security.AccessControlContext)
64+
* acc (com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader)
65+
* classLoader (scala.reflect.runtime.JavaMirrors$JavaMirror)
66+
* mirror (scala.reflect.api.TypeTags$TypeTagImpl)
67+
* tg$1 (com.azavea.hiveless.serializers.HSerializer$$anon$2)
68+
*/
69+
// format: on
70+
def expressionEncoderSerializer[T: TypeTag](implicit enc: ExpressionEncoder[T]): HSerializer[T] = new HSerializer[T] {
5771
def dataType: DataType = enc.schema
5872
def serialize: T => Any = _.toInternalRow
5973
}

core/src/main/scala/com/azavea/hiveless/spark/encoders/syntax/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.reflect.runtime.universe.TypeTag
2525
/**
2626
* Source: https://github.com/locationtech/rasterframes/blob/0.10.1/core/src/main/scala/org/locationtech/rasterframes/encoders/syntax/package.scala
2727
*/
28-
package object syntax {
28+
package object syntax extends Serializable {
2929
implicit class CachedExpressionOps[T](val self: T) extends AnyVal {
3030
def toInternalRow(implicit tag: TypeTag[T], encoder: ExpressionEncoder[T]): InternalRow = {
3131
val toRow = SerializersCache.serializer[T]

project/plugins.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.1")
22
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.5")
33
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
44
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
5-
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0")
5+
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

spatial-index/sql/createUDFs.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
CREATE OR REPLACE FUNCTION st_crsFromText as 'com.azavea.hiveless.spatial.index.st_crsFromText';
2-
CREATE OR REPLACE FUNCTION st_extentFromGeom as 'com.azavea.hiveless.spatial.index.st_extentFromGeom';
3-
CREATE OR REPLACE FUNCTION st_geomReproject as 'com.azavea.hiveless.spatial.index.st_geomReproject';
4-
CREATE OR REPLACE FUNCTION st_partitionCentroid as 'com.azavea.hiveless.spatial.index.st_partitionCentroid';
5-
CREATE OR REPLACE FUNCTION st_z2LatLon as 'com.azavea.hiveless.spatial.index.st_z2LatLon';
1+
CREATE OR REPLACE FUNCTION st_crsFromText as 'com.azavea.hiveless.spatial.index.ST_CrsFromText';
2+
CREATE OR REPLACE FUNCTION st_extentFromGeom as 'com.azavea.hiveless.spatial.index.ST_ExtentFromGeom';
3+
CREATE OR REPLACE FUNCTION st_geomReproject as 'com.azavea.hiveless.spatial.index.ST_GeomReproject';
4+
CREATE OR REPLACE FUNCTION st_partitionCentroid as 'com.azavea.hiveless.spatial.index.ST_PartitionCentroid';
5+
CREATE OR REPLACE FUNCTION st_z2LatLon as 'com.azavea.hiveless.spatial.index.ST_Z2LatLon';

spatial-index/src/main/scala/com/azavea/hiveless/spark/geotrellis/encoders/StandardEncoders.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import scala.reflect.runtime.universe.TypeTag
2525
trait StandardEncoders extends Serializable {
2626
def expressionEncoder[T: TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
2727

28-
implicit lazy val extentEncoder: ExpressionEncoder[Extent] = expressionEncoder
29-
implicit lazy val z2IndexEncoder: ExpressionEncoder[Z2Index] = expressionEncoder
28+
implicit val extentEncoder: ExpressionEncoder[Extent] = expressionEncoder
29+
implicit val z2IndexEncoder: ExpressionEncoder[Z2Index] = expressionEncoder
3030
}
3131

3232
object StandardEncoders extends StandardEncoders

spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/ST_Z2LatLon.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@ import geotrellis.store.index.zcurve.Z2
2323
import org.locationtech.jts.geom.Geometry
2424

2525
class ST_Z2LatLon extends HUDF[Geometry, Z2Index] {
26-
import ST_Z2LatLon._
27-
2826
val name: String = "st_z2LatLon"
29-
def function = { geom =>
30-
val env = geom.getEnvelopeInternal
31-
Z2Index(z2index(env.getMinX, env.getMinY), z2index(env.getMaxX, env.getMaxY))
32-
}
27+
def function = ST_Z2LatLon.function
3328
}
3429

3530
object ST_Z2LatLon {
31+
def function(geom: Geometry): Z2Index = {
32+
val env = geom.getEnvelopeInternal
33+
Z2Index(z2index(env.getMinX, env.getMinY), z2index(env.getMaxX, env.getMaxY))
34+
}
3635
def scaleLat(lat: Double): Int = ((lat + 90) / 180 * (1 << 30)).toInt
3736
def scaleLong(lng: Double): Int = ((lng + 180) / 360 * (1 << 30)).toInt
3837
def z2index(x: Double, y: Double): Long = Z2(scaleLong(x), scaleLat(y)).z

spatial-index/src/main/scala/com/azavea/hiveless/spatial/index/package.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ package com.azavea.hiveless.spatial
1818

1919
import com.azavea.hiveless.serializers.{HConverter, HSerializer, UnaryDeserializer}
2020
import com.azavea.hiveless.serializers.syntax._
21+
import com.azavea.hiveless.spark.encoders.syntax._
2122
import com.azavea.hiveless.spark.geotrellis.encoders.StandardEncoders
2223
import cats.Id
24+
import com.azavea.hiveless.spark.geotrellis.Z2Index
2325
import geotrellis.proj4.CRS
26+
import geotrellis.vector.Extent
2427
import org.apache.spark.sql.types.{DataType, StringType}
2528

2629
package object index extends StandardEncoders {
@@ -35,4 +38,17 @@ package object index extends StandardEncoders {
3538
def dataType: DataType = StringType
3639
def serialize: CRS => Any = crs => crs.toProj4String.serialize
3740
}
41+
42+
/**
43+
* HSerializer.expressionEncoderSerializer causes serialization issues on DataBricks. TODO: investigate this issue.
44+
*/
45+
implicit def extentSerializer: HSerializer[Extent] = new HSerializer[Extent] {
46+
def dataType: DataType = extentEncoder.schema
47+
def serialize: Extent => Any = _.toInternalRow
48+
}
49+
50+
implicit def z2IndexSerializer: HSerializer[Z2Index] = new HSerializer[Z2Index] {
51+
def dataType: DataType = z2IndexEncoder.schema
52+
def serialize: Z2Index => Any = _.toInternalRow
53+
}
3854
}

0 commit comments

Comments
 (0)