Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -29,7 +29,7 @@ jobs:
run: sbt scalafmtCheckAll

- name: Build project
run: sbt test
run: sbt +test

publish:
name: Publish Artifacts
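For context: the `+` prefix tells sbt to run `test` once per entry in `crossScalaVersions` rather than only for the default `scalaVersion`. A minimal sketch of the build definition this relies on (the 2.13 entry is hypothetical and not part of this PR):

```scala
// `sbt +test` iterates over crossScalaVersions, so CI exercises every
// configured Scala binary version instead of just the default one.
val scalaVersions = Seq("2.12.15", "2.13.8") // 2.13.8 is a hypothetical example

lazy val example = (project in file("."))
  .settings(
    scalaVersion       := scalaVersions.head,
    crossScalaVersions := scalaVersions
  )
```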
5 changes: 3 additions & 2 deletions README.md
@@ -9,9 +9,10 @@ It adds typed HiveUDFs and implements Spatial Hive UDFs. It consists of the foll

* `hiveless-core` with the typed Hive UDFs API and the initial base set of codecs
* `hiveless-spatial` with Hive GIS UDFs (depends on [GeoMesa](https://github.com/locationtech/geomesa))
* There is also a forked release [CartoDB/analytics-toolbox-databricks](https://github.com/CartoDB/analytics-toolbox-databricks), which is a complete `hiveless-spatial` copy at this point. However, it may contain an extended GIS functionality in the future.
* `hiveless-spatial-index` with extra Hive GIS UDFs that may be used for the GIS indexing purposes (depends on [GeoMesa](https://github.com/locationtech/geomesa) and [GeoTrellis](https://github.com/locationtech/geotrellis))
* There is also a forked release [CartoDB/analytics-toolbox-databricks](https://github.com/CartoDB/analytics-toolbox-databricks), which is currently a complete copy of `hiveless-spatial` and `hiveless-spatial-index`. However, it may include extended GIS functionality in the future.

## Hiveless-spatial supported GIS functions
## Hiveless Spatial supported GIS functions

```sql
CREATE OR REPLACE FUNCTION st_geometryFromText as 'com.azavea.hiveless.spatial.ST_GeomFromWKT';
47 changes: 39 additions & 8 deletions build.sbt
@@ -3,6 +3,25 @@ import java.time.Year

val scalaVersions = Seq("2.12.15")

val catsVersion = "2.7.0"
val shapelessVersion = "2.3.3" // to be compatible with Spark 3.1.x
val scalaTestVersion = "3.2.11"
val framelessVersion = "0.11.1"
val geomesaVersion = "3.3.0"
val geotrellisVersion = "3.6.1+1-e4aeec2a-SNAPSHOT"

def ver(for212: String, for213: String) = Def.setting {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 12)) => for212
case Some((2, 13)) => for213
case v => sys.error(s"Unsupported Scala version: $v")
}
}

def spark(module: String) = Def.setting {
"org.apache.spark" %% s"spark-$module" % ver("3.1.3", "3.2.1").value
}

lazy val commonSettings = Seq(
scalaVersion := scalaVersions.head,
crossScalaVersions := scalaVersions,
@@ -40,7 +59,8 @@ lazy val commonSettings = Seq(
existingText.flatMap(_ => existingText.map(_.trim)).getOrElse(newText)
}
)
)
),
resolvers += "sonatype-snapshot" at "https://oss.sonatype.org/content/repositories/snapshots/"
)

lazy val root = (project in file("."))
@@ -52,18 +72,18 @@ lazy val root = (project in file("."))
publish := {},
publishLocal := {}
)
.aggregate(core, spatial)
.aggregate(core, spatial, `spatial-index`)

lazy val core = project
.settings(commonSettings)
.settings(name := "hiveless-core")
.settings(
addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.2" cross CrossVersion.full),
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-core" % "2.6.1",
"com.chuusai" %% "shapeless" % "2.3.3", // to be compatible with Spark 3.1.x
"org.apache.spark" %% "spark-hive" % "3.1.2" % Provided,
"org.scalatest" %% "scalatest" % "3.2.11" % Test
"org.typelevel" %% "cats-core" % catsVersion,
"com.chuusai" %% "shapeless" % shapelessVersion,
spark("hive").value % Provided,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
)
)

@@ -73,8 +93,19 @@ lazy val spatial = project
.settings(name := "hiveless-spatial")
.settings(
libraryDependencies ++= Seq(
"org.locationtech.geomesa" %% "geomesa-spark-jts" % "3.3.0",
"org.scalatest" %% "scalatest" % "3.2.10" % Test
"org.locationtech.geomesa" %% "geomesa-spark-jts" % geomesaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
)
)

lazy val `spatial-index` = project
.dependsOn(spatial % "compile->compile;provided->provided")
.settings(commonSettings)
.settings(name := "hiveless-spatial-index")
.settings(
libraryDependencies ++= Seq(
"org.locationtech.geotrellis" %% "geotrellis-store" % geotrellisVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
),
assembly / test := {},
assembly / assemblyShadeRules := {
33 changes: 33 additions & 0 deletions core/src/main/scala/com/azavea/hiveless/implicits/tupler.scala
@@ -0,0 +1,33 @@
/*
* Copyright 2021 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.implicits

import shapeless.{Generic, HList, IsTuple}
import shapeless.ops.function.FnToProduct

object tupler extends Serializable {
// format: off
/**
* Tuples a FunctionN.
* Converts FunctionN(arg1, ..., argn) => R into (arg1 :: ... :: argn :: HNil) => R via FnToProduct.
* T is the corresponding tuple, converted into an HList via Generic and applied to the HList function.
*/
// format: on
implicit def tuplerGeneric[F, I <: HList, O, T: IsTuple](f: F)(implicit ftp: FnToProduct.Aux[F, I => O], gen: Generic.Aux[T, I]): T => O = { t: T =>
ftp(f)(gen.to(t))
}
}
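A minimal, hypothetical usage sketch of `tuplerGeneric` (the `add` function below is illustrative and not part of the PR): a plain `Function2` is converted into a function over the corresponding tuple.

```scala
import com.azavea.hiveless.implicits.tupler

val add: (Int, Int) => Int = _ + _

// FnToProduct rewrites `add` as an (Int :: Int :: HNil) => Int function,
// and Generic maps the (Int, Int) tuple onto that HList.
val tupled: ((Int, Int)) => Int = tupler.tuplerGeneric(add)

tupled((1, 2)) // 3
```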
@@ -71,6 +71,8 @@ object GenericDeserializer extends Serializable {
dt: GenericDeserializer[F, T]
): GenericDeserializer[F, H :: T] = new GenericDeserializer[F, H :: T] {
def deserialize(arguments: Array[GenericUDF.DeferredObject], inspectors: Array[ObjectInspector]): F[H :: T] =
(dh.deserialize(arguments.head, inspectors.head), dt.deserialize(arguments.tail, inspectors.tail)).mapN(_ :: _)
// take and drop allow us to handle Option arguments safely
// take is kept for semantic reasons only
(dh.deserialize(arguments.take(1), inspectors.take(1)), dt.deserialize(arguments.drop(1), inspectors.drop(1))).mapN(_ :: _)
}
}
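The move from `head`/`tail` to `take(1)`/`drop(1)` matters because the sliced arrays may legitimately be empty when trailing parameters are `Option`s. A standalone illustration (plain string arrays standing in for the `DeferredObject`/`ObjectInspector` arrays):

```scala
val args = Array.empty[String] // stand-in for an exhausted argument array

// args.head                   // throws on an empty array
args.take(1)                   // Array(): safe; an Option deserializer can turn this into None
args.drop(1)                   // Array(): safe as well
```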
@@ -16,12 +16,15 @@

package com.azavea.hiveless.serializers

import com.azavea.hiveless.spark.encoders.syntax._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.sql.catalyst.util.ArrayData

import java.{lang => jl}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

trait HSerializer[T] extends Serializable {
def dataType: DataType
@@ -41,14 +44,35 @@ object HSerializer extends Serializable {
* Intentionally not used for the instances implementation, as it causes the following failure on DataBricks:
* Unable to find class: com.azavea.hiveless.serializers.HSerializer$$$Lambda$5659/1670981434
* Serialization trace:
* s$1 (com.azavea.hiveless.serializers.HSerializer$$anon$1)
* s$1 (com.azavea.hiveless.serializers.HSerializer$$anon$1)
*/
// format: on
def instance[T](dt: DataType, s: T => Any): HSerializer[T] = new HSerializer[T] {
val dataType: DataType = dt
def serialize: T => Any = s
}

// format: off
/**
* Derives an HSerializer from an ExpressionEncoder.
* Intentionally not used for the instances implementation, as it causes the following failure on DataBricks
* (TypeTags are not Kryo serializable by default):
* org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
* Serialization trace:
* classes (sun.misc.Launcher$AppClassLoader)
* classloader (java.security.ProtectionDomain)
* context (java.security.AccessControlContext)
* acc (com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader)
* classLoader (scala.reflect.runtime.JavaMirrors$JavaMirror)
* mirror (scala.reflect.api.TypeTags$TypeTagImpl)
* tg$1 (com.azavea.hiveless.serializers.HSerializer$$anon$2)
*/
// format: on
def expressionEncoderSerializer[T: TypeTag](implicit enc: ExpressionEncoder[T]): HSerializer[T] = new HSerializer[T] {
def dataType: DataType = enc.schema
def serialize: T => Any = _.toInternalRow
}

implicit val booleanSerializer: HSerializer[Boolean] = new IdHSerializer[Boolean] { def dataType: DataType = BooleanType }
implicit val doubleSerializer: HSerializer[Double] = new IdHSerializer[Double] { def dataType: DataType = DoubleType }
implicit val floatSerializer: HSerializer[Float] = new IdHSerializer[Float] { def dataType: DataType = FloatType }
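`expressionEncoderSerializer` is deliberately not implicit, so a caller has to summon it explicitly. A hypothetical sketch (the `Point` case class and its encoder are illustrative only):

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import com.azavea.hiveless.serializers.HSerializer

// Hypothetical payload type; any case class with an ExpressionEncoder would do.
case class Point(x: Double, y: Double)

implicit val pointEncoder: ExpressionEncoder[Point] = ExpressionEncoder[Point]()

// dataType becomes the encoder's schema; serialize goes through toInternalRow.
val pointSerializer: HSerializer[Point] = HSerializer.expressionEncoderSerializer[Point]
```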
@@ -16,23 +16,29 @@

package com.azavea.hiveless.serializers

import com.azavea.hiveless.implicits.syntax._
import com.azavea.hiveless.serializers.syntax._
import com.azavea.hiveless.spark.encoders.syntax._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.HivelessInternals.unwrap
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String
import cats.Id
import cats.syntax.apply._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.util.ArrayData
import shapeless.HNil

import scala.reflect.ClassTag
import scala.util.Try
import scala.reflect.runtime.universe.TypeTag

trait UnaryDeserializer[F[_], T] extends HDeserialier[F, T]

object UnaryDeserializer extends Serializable {
def apply[F[_], T](implicit ev: UnaryDeserializer[F, T]): UnaryDeserializer[F, T] = ev

def id[T](implicit ev: UnaryDeserializer[Id, T]): UnaryDeserializer[Id, T] = ev

// format: off
/**
* On DataBricks:
@@ -43,7 +49,32 @@ object UnaryDeserializer extends Serializable {
*/
// format: on
implicit def tryUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Try, T] =
(arguments, inspectors) => Try(UnaryDeserializer[Id, T].deserialize(arguments, inspectors))
(arguments, inspectors) => Try(id[T].deserialize(arguments, inspectors))

implicit def optionalUnaryDeserializer[T: UnaryDeserializer[Id, *]]: UnaryDeserializer[Id, Option[T]] =
(arguments, inspectors) => (arguments.headOption, inspectors.headOption).mapN(id[T].deserialize)

// format: off
/**
* Derives UnaryDeserializers from ExpressionEncoders.
* Intentionally not used for the instances implementation, as it causes the following failure on DataBricks
* (TypeTags are not Kryo serializable by default):
* org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
* Serialization trace:
* classes (sun.misc.Launcher$AppClassLoader)
* classloader (java.security.ProtectionDomain)
* context (java.security.AccessControlContext)
* acc (com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader)
* classLoader (scala.reflect.runtime.JavaMirrors$JavaMirror)
* mirror (scala.reflect.api.TypeTags$TypeTagImpl)
* evidence$3$1 (com.azavea.hiveless.serializers.UnaryDeserializer$$anonfun$expressionEncoderUnaryDeserializer$2)
* evidence$1$1 (com.azavea.hiveless.serializers.UnaryDeserializer$$anonfun$tryUnaryDeserializer$3)
* dh$1 (com.azavea.hiveless.serializers.GenericDeserializer$$anon$4)
* d$2 (com.azavea.hiveless.serializers.GenericDeserializer$$anon$2)
*/
// format: on
def expressionEncoderUnaryDeserializer[T: TypeTag: ExpressionEncoder]: UnaryDeserializer[Id, T] =
(arguments, inspectors) => arguments.deserialize[InternalRow](inspectors).as[T]

/** Derivation helper deserializer. */
implicit val hnilUnaryDeserializer: UnaryDeserializer[Id, HNil] = (_, _) => HNil
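The `Option` instance works by combining `headOption` of the argument and inspector arrays with `mapN`, so a missing trailing argument yields `None` instead of an error. A small standalone sketch of that combinator (plain strings standing in for the Hive types):

```scala
import cats.syntax.apply._

// Stand-ins for the DeferredObject and ObjectInspector arrays.
def decodeOptional(arguments: Array[String], inspectors: Array[String]): Option[Int] =
  // mirrors optionalUnaryDeserializer: a value is produced only when both heads exist
  (arguments.headOption, inspectors.headOption).mapN((value, _) => value.toInt)

decodeOptional(Array("42"), Array("int")) // Some(42)
decodeOptional(Array.empty, Array.empty)  // None: the argument was not supplied
```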
48 changes: 48 additions & 0 deletions core/src/main/scala/com/azavea/hiveless/serializers/syntax.scala
@@ -0,0 +1,48 @@
/*
* Copyright 2021 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.serializers

import cats.Id
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

object syntax extends Serializable {
implicit class DeferredObjectOps(val self: GenericUDF.DeferredObject) extends AnyVal {

/** Behaves like a regular get, but throws when the result is null. */
def getNonEmpty: AnyRef = Option(self.get) match {
case Some(r) => r
case _ => throw HDeserialier.Errors.NullArgument
}
}

implicit class ArrayDeferredObjectOps(val self: Array[GenericUDF.DeferredObject]) extends AnyVal {
def deserializeF[F[_], T: UnaryDeserializer[F, *]](inspectors: Array[ObjectInspector]): F[T] =
UnaryDeserializer[F, T].deserialize(self, inspectors)

def deserialize[T: UnaryDeserializer[Id, *]](inspectors: Array[ObjectInspector]): T =
deserializeF[Id, T](inspectors)
}

implicit class ConverterOps(val self: Any) extends AnyVal {
def convert[T: HConverter]: T = HConverter[T].convert(self)
}

implicit class SerializerOps[T](val self: T) extends AnyVal {
def serialize(implicit ev: HSerializer[T]): Any = HSerializer[T].serialize(self)
}
}
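A hypothetical usage sketch of the new syntax, assuming the implicit `HSerializer` instances from its companion object are in scope:

```scala
import com.azavea.hiveless.serializers.HSerializer
import com.azavea.hiveless.serializers.syntax._

// SerializerOps: any value with an HSerializer instance gains a .serialize method,
// resolved here via the implicit doubleSerializer in the HSerializer companion.
val raw: Any = 1.5.serialize

// The Spark DataType carried by the same instance.
val dt = implicitly[HSerializer[Double]].dataType // DoubleType
```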