Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.schema.EdgeInterpolationAlgorithm
import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO}
import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation._
Expand All @@ -31,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm => SparkEdgeInterpolationAlgorithm}

/**
* This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]]
Expand Down Expand Up @@ -336,6 +338,17 @@ class ParquetToSparkSchemaConverter(
case null => BinaryType
case _: BsonLogicalTypeAnnotation => BinaryType
case _: DecimalLogicalTypeAnnotation => makeDecimalType()
case geom: GeometryLogicalTypeAnnotation =>
GeometryType(Option(geom.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS))
case geog: GeographyLogicalTypeAnnotation =>
val crs = Option(geog.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS)
val sparkAlgorithm = if (geog.getAlgorithm != null) {
SparkEdgeInterpolationAlgorithm.fromString(geog.getAlgorithm.toString)
.getOrElse(SparkEdgeInterpolationAlgorithm.SPHERICAL)
} else {
SparkEdgeInterpolationAlgorithm.SPHERICAL
}
GeographyType(crs, sparkAlgorithm)
case _ => illegalType()
}

Expand Down Expand Up @@ -653,6 +666,17 @@ class SparkToParquetSchemaConverter(
Types.primitive(BINARY, repetition)
.as(LogicalTypeAnnotation.stringType()).named(field.name)

case geom: GeometryType =>
Types.primitive(BINARY, repetition)
.as(LogicalTypeAnnotation.geometryType(geom.crs)).named(field.name)

case geog: GeographyType =>
val logicalType = LogicalTypeAnnotation.geographyType(
geog.crs,
EdgeInterpolationAlgorithm.valueOf(geog.algorithm.toString))
Types.primitive(BINARY, repetition)
.as(logicalType).named(field.name)

case DateType =>
Types.primitive(INT32, repetition)
.as(LogicalTypeAnnotation.dateType()).named(field.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2204,6 +2204,247 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
Seq("f1", "key_value", "value", "g2", "list", "element", "h2"))))))))
))))))

// ==============================================================
// Tests for BINARY geospatial logical types (Geometry/Geography)
// ==============================================================

/** Parquet to Catalyst conversion for geospatial types. */

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOMETRY logical type annotation",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
"""message root {
| optional binary f1 (GEOMETRY);
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOMETRY logical type annotation (with CRS)",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
"""message root {
| optional binary f1 (GEOMETRY(OGC:CRS84));
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY);
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS)",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY(OGC:CRS84));
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS and algorithm)",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY(OGC:CRS84, SPHERICAL));
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOMETRY logical type annotation with binaryAsString",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
"""message root {
| optional binary f1 (GEOMETRY);
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true)

testParquetToCatalyst(
"Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation with binaryAsString",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY);
|}
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true)

/** Catalyst to Parquet conversion for geospatial types. */

testCatalystToParquet(
"Catalyst to Parquet - GeometryType",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
"""message root {
| optional binary f1 (GEOMETRY);
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Catalyst to Parquet - GeographyType",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY);
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Catalyst to Parquet - GeometryType with non-nullable field",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84"), nullable = false))),
"""message root {
| required binary f1 (GEOMETRY);
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Catalyst to Parquet - GeographyType with non-nullable field",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), nullable = false))),
"""message root {
| required binary f1 (GEOGRAPHY);
|}
""".stripMargin,
writeLegacyParquetFormat = false)

/** Round trip conversion for geospatial types. */

testSchema(
"Round-trip schema conversion - GeometryType",
StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
"""message root {
| optional binary f1 (GEOMETRY);
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)

testSchema(
"Round-trip schema conversion - GeographyType",
StructType(Seq(StructField("f1",
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
"""message root {
| optional binary f1 (GEOGRAPHY);
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true,
writeLegacyParquetFormat = false)

/** Complex types with geospatial types. */

testCatalystToParquet(
"Complex types with GeometryType - array element",
StructType(Seq(
StructField("f1", ArrayType(GeometryType("OGC:CRS84"), containsNull = true))
)),
"""message root {
| optional group f1 (LIST) {
| repeated group list {
| optional binary element (GEOMETRY);
| }
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Complex types with GeographyType - array element",
StructType(Seq(
StructField("f1", ArrayType(
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), containsNull = true))
)),
"""message root {
| optional group f1 (LIST) {
| repeated group list {
| optional binary element (GEOGRAPHY);
| }
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Complex types with GeometryType - nested struct",
StructType(Seq(
StructField("outer", StructType(Seq(
StructField("geom", GeometryType("OGC:CRS84"))
)))
)),
"""message root {
| optional group outer {
| optional binary geom (GEOMETRY);
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Complex types with GeographyType - nested struct",
StructType(Seq(
StructField("outer", StructType(Seq(
StructField("geog", GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL))
)))
)),
"""message root {
| optional group outer {
| optional binary geog (GEOGRAPHY);
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Complex types with GeometryType - map value",
StructType(Seq(
StructField("f1", MapType(StringType, GeometryType("OGC:CRS84"), valueContainsNull = true))
)),
"""message root {
| optional group f1 (MAP) {
| repeated group key_value {
| required binary key (UTF8);
| optional binary value (GEOMETRY);
| }
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

testCatalystToParquet(
"Complex types with GeographyType - map value",
StructType(Seq(
StructField("f1", MapType(StringType,
GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), valueContainsNull = true))
)),
"""message root {
| optional group f1 (MAP) {
| repeated group key_value {
| required binary key (UTF8);
| optional binary value (GEOGRAPHY);
| }
| }
|}
""".stripMargin,
writeLegacyParquetFormat = false)

// =================================
// Tests for conversion for decimals
// =================================
Expand Down