diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 9e6f4447ca792..183a45f1cf23e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Locale import org.apache.hadoop.conf.Configuration +import org.apache.parquet.column.schema.EdgeInterpolationAlgorithm import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO} import org.apache.parquet.schema._ import org.apache.parquet.schema.LogicalTypeAnnotation._ @@ -31,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.VariantMetadata import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm => SparkEdgeInterpolationAlgorithm} /** * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] @@ -336,6 +338,17 @@ class ParquetToSparkSchemaConverter( case null => BinaryType case _: BsonLogicalTypeAnnotation => BinaryType case _: DecimalLogicalTypeAnnotation => makeDecimalType() + case geom: GeometryLogicalTypeAnnotation => + GeometryType(Option(geom.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS)) + case geog: GeographyLogicalTypeAnnotation => + val crs = Option(geog.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS) + val sparkAlgorithm = if (geog.getAlgorithm != null) { + SparkEdgeInterpolationAlgorithm.fromString(geog.getAlgorithm.toString) + .getOrElse(SparkEdgeInterpolationAlgorithm.SPHERICAL) + } else { + SparkEdgeInterpolationAlgorithm.SPHERICAL + } + GeographyType(crs, sparkAlgorithm) case _ => illegalType() } @@ -653,6 +666,17 @@ class SparkToParquetSchemaConverter( Types.primitive(BINARY, repetition) .as(LogicalTypeAnnotation.stringType()).named(field.name) + case geom: GeometryType => + Types.primitive(BINARY, repetition) + .as(LogicalTypeAnnotation.geometryType(geom.crs)).named(field.name) + + case geog: GeographyType => + val logicalType = LogicalTypeAnnotation.geographyType( + geog.crs, + EdgeInterpolationAlgorithm.valueOf(geog.algorithm.toString)) + Types.primitive(BINARY, repetition) + .as(logicalType).named(field.name) + case DateType => Types.primitive(INT32, repetition) .as(LogicalTypeAnnotation.dateType()).named(field.name) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 56076175d60e5..c8d866d8b2d0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -2204,6 +2204,247 @@ class ParquetSchemaSuite extends ParquetSchemaTest { Seq("f1", "key_value", "value", "g2", "list", "element", "h2")))))))) )))))) + // ============================================================== + // Tests for BINARY geospatial logical types (Geometry/Geography) + // ============================================================== + + /** Parquet to Catalyst conversion for geospatial types. */ + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation (with CRS)", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY(OGC:CRS84)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS)", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY(OGC:CRS84)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS and algorithm)", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY(OGC:CRS84, SPHERICAL)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation with binaryAsString", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation with binaryAsString", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true) + + /** Catalyst to Parquet conversion for geospatial types. */ + + testCatalystToParquet( + "Catalyst to Parquet - GeometryType", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeographyType", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeometryType with non-nullable field", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84"), nullable = false))), + """message root { + | required binary f1 (GEOMETRY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeographyType with non-nullable field", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), nullable = false))), + """message root { + | required binary f1 (GEOGRAPHY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + /** Round trip conversion for geospatial types. */ + + testSchema( + "Round-trip schema conversion - GeometryType", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) + + testSchema( + "Round-trip schema conversion - GeographyType", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) + + /** Complex types with geospatial types. */ + + testCatalystToParquet( + "Complex types with GeometryType - array element", + StructType(Seq( + StructField("f1", ArrayType(GeometryType("OGC:CRS84"), containsNull = true)) + )), + """message root { + | optional group f1 (LIST) { + | repeated group list { + | optional binary element (GEOMETRY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - array element", + StructType(Seq( + StructField("f1", ArrayType( + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), containsNull = true)) + )), + """message root { + | optional group f1 (LIST) { + | repeated group list { + | optional binary element (GEOGRAPHY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeometryType - nested struct", + StructType(Seq( + StructField("outer", StructType(Seq( + StructField("geom", GeometryType("OGC:CRS84")) + ))) + )), + """message root { + | optional group outer { + | optional binary geom (GEOMETRY); + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - nested struct", + StructType(Seq( + StructField("outer", StructType(Seq( + StructField("geog", GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)) + ))) + )), + """message root { + | optional group outer { + | optional binary geog (GEOGRAPHY); + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeometryType - map value", + StructType(Seq( + StructField("f1", MapType(StringType, GeometryType("OGC:CRS84"), valueContainsNull = true)) + )), + """message root { + | optional group f1 (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional binary value (GEOMETRY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - map value", + StructType(Seq( + StructField("f1", MapType(StringType, + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), valueContainsNull = true)) + )), + """message root { + | optional group f1 (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional binary value (GEOGRAPHY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + // ================================= // Tests for conversion for decimals // =================================