From fece98d76e806b9f88c9e67ccfc70114a074396a Mon Sep 17 00:00:00 2001 From: Uros Bojanic Date: Wed, 28 Jan 2026 18:51:56 +0100 Subject: [PATCH 1/4] Initial commit --- .../parquet/ParquetSchemaConverter.scala | 24 ++ .../parquet/ParquetSchemaSuite.scala | 209 ++++++++++++++++++ 2 files changed, 233 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 9e6f4447ca792..183a45f1cf23e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Locale import org.apache.hadoop.conf.Configuration +import org.apache.parquet.column.schema.EdgeInterpolationAlgorithm import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO} import org.apache.parquet.schema._ import org.apache.parquet.schema.LogicalTypeAnnotation._ @@ -31,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.VariantMetadata import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm => SparkEdgeInterpolationAlgorithm} /** * This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]] @@ -336,6 +338,17 @@ class ParquetToSparkSchemaConverter( case null => BinaryType case _: BsonLogicalTypeAnnotation => BinaryType case _: DecimalLogicalTypeAnnotation => makeDecimalType() + case geom: GeometryLogicalTypeAnnotation => + GeometryType(Option(geom.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS)) + case geog: GeographyLogicalTypeAnnotation => + val crs = Option(geog.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS) + val sparkAlgorithm = if (geog.getAlgorithm != null) { + SparkEdgeInterpolationAlgorithm.fromString(geog.getAlgorithm.toString) + .getOrElse(SparkEdgeInterpolationAlgorithm.SPHERICAL) + } else { + SparkEdgeInterpolationAlgorithm.SPHERICAL + } + GeographyType(crs, sparkAlgorithm) case _ => illegalType() } @@ -653,6 +666,17 @@ class SparkToParquetSchemaConverter( Types.primitive(BINARY, repetition) .as(LogicalTypeAnnotation.stringType()).named(field.name) + case geom: GeometryType => + Types.primitive(BINARY, repetition) + .as(LogicalTypeAnnotation.geometryType(geom.crs)).named(field.name) + + case geog: GeographyType => + val logicalType = LogicalTypeAnnotation.geographyType( + geog.crs, + EdgeInterpolationAlgorithm.valueOf(geog.algorithm.toString)) + Types.primitive(BINARY, repetition) + .as(logicalType).named(field.name) + case DateType => Types.primitive(INT32, repetition) .as(LogicalTypeAnnotation.dateType()).named(field.name) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 56076175d60e5..53d5316fe0e82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -2204,6 +2204,215 @@ class ParquetSchemaSuite extends ParquetSchemaTest { Seq("f1", "key_value", "value", "g2", "list", "element", "h2")))))))) )))))) + // ============================================================== + // Tests for BINARY geospatial logical types (Geometry/Geography) + // ============================================================== + + /** Parquet to Catalyst conversion for geospatial types. */ + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation with binaryAsString", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation with binaryAsString", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true) + + /** Catalyst to Parquet conversion for geospatial types. */ + + testCatalystToParquet( + "Catalyst to Parquet - GeometryType", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeographyType", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeometryType with non-nullable field", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84"), nullable = false))), + """message root { + | required binary f1 (GEOMETRY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Catalyst to Parquet - GeographyType with non-nullable field", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), nullable = false))), + """message root { + | required binary f1 (GEOGRAPHY); + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + /** Round trip conversion for geospatial types. */ + + testSchema( + "Round-trip schema conversion - GeometryType", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) + + testSchema( + "Round-trip schema conversion - GeographyType", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) + + /** Complex types with geospatial types. */ + + testCatalystToParquet( + "Complex types with GeometryType - array element", + StructType(Seq( + StructField("f1", ArrayType(GeometryType("OGC:CRS84"), containsNull = true)) + )), + """message root { + | optional group f1 (LIST) { + | repeated group list { + | optional binary element (GEOMETRY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - array element", + StructType(Seq( + StructField("f1", ArrayType( + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), containsNull = true)) + )), + """message root { + | optional group f1 (LIST) { + | repeated group list { + | optional binary element (GEOGRAPHY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeometryType - nested struct", + StructType(Seq( + StructField("outer", StructType(Seq( + StructField("geom", GeometryType("OGC:CRS84")) + ))) + )), + """message root { + | optional group outer { + | optional binary geom (GEOMETRY); + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - nested struct", + StructType(Seq( + StructField("outer", StructType(Seq( + StructField("geog", GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)) + ))) + )), + """message root { + | optional group outer { + | optional binary geog (GEOGRAPHY); + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeometryType - map value", + StructType(Seq( + StructField("f1", MapType(StringType, GeometryType("OGC:CRS84"), valueContainsNull = true)) + )), + """message root { + | optional group f1 (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional binary value (GEOMETRY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + + testCatalystToParquet( + "Complex types with GeographyType - map value", + StructType(Seq( + StructField("f1", MapType(StringType, + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL), valueContainsNull = true)) + )), + """message root { + | optional group f1 (MAP) { + | repeated group key_value { + | required binary key (UTF8); + | optional binary value (GEOGRAPHY); + | } + | } + |} + """.stripMargin, + writeLegacyParquetFormat = false) + // ================================= // Tests for conversion for decimals // ================================= From 2e6a8ad8632210e10964021704677f2bb41ae1a9 Mon Sep 17 00:00:00 2001 From: Uros Bojanic Date: Thu, 29 Jan 2026 08:45:45 +0100 Subject: [PATCH 2/4] Add test --- .../parquet/ParquetSchemaSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 53d5316fe0e82..982f074b86650 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -21,9 +21,12 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.column.ColumnDescriptor +import org.apache.parquet.column.schema.{EdgeInterpolationAlgorithm => ParquetEdgeInterpolationAlgorithm} import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.schema._ +import org.apache.parquet.schema.LogicalTypeAnnotation.geographyType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException @@ -2252,6 +2255,29 @@ class ParquetSchemaSuite extends ParquetSchemaTest { binaryAsString = true, int96AsTimestamp = true) + test("Parquet to Catalyst - GEOGRAPHY with explicit algorithm") { + val parquetSchema = Types.buildMessage() + .addField( + Types.primitive(BINARY, Repetition.OPTIONAL) + .as(geographyType("OGC:CRS84", ParquetEdgeInterpolationAlgorithm.SPHERICAL)) + .named("f1")) + .named("root") + + val converter = new ParquetToSparkSchemaConverter( + assumeBinaryIsString = false, + assumeInt96IsTimestamp = true) + val actualParquetColumn = converter.convertParquetColumn(parquetSchema) + val actual = actualParquetColumn.sparkType + + val expected = StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))) + assert(actual === expected, + s"""Schema mismatch. + |Expected schema: ${expected.json} + |Actual schema: ${actual.json} + """.stripMargin) + } + /** Catalyst to Parquet conversion for geospatial types. */ testCatalystToParquet( From 4952ed826a145f8937d0a558b4afe471df756744 Mon Sep 17 00:00:00 2001 From: Uros Bojanic Date: Thu, 29 Jan 2026 08:51:08 +0100 Subject: [PATCH 3/4] Cleanup --- .../datasources/parquet/ParquetSchemaSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 982f074b86650..72e74b82a751f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -24,9 +24,8 @@ import org.apache.parquet.column.ColumnDescriptor import org.apache.parquet.column.schema.{EdgeInterpolationAlgorithm => ParquetEdgeInterpolationAlgorithm} import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.schema._ -import org.apache.parquet.schema.LogicalTypeAnnotation.geographyType +import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException @@ -2258,8 +2257,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { test("Parquet to Catalyst - GEOGRAPHY with explicit algorithm") { val parquetSchema = Types.buildMessage() .addField( - Types.primitive(BINARY, Repetition.OPTIONAL) - .as(geographyType("OGC:CRS84", ParquetEdgeInterpolationAlgorithm.SPHERICAL)) + Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.geographyType("OGC:CRS84", + ParquetEdgeInterpolationAlgorithm.SPHERICAL)) .named("f1")) .named("root") From 481c752f4abf14cb68019e7303481d5343a82a65 Mon Sep 17 00:00:00 2001 From: Uros Bojanic Date: Thu, 29 Jan 2026 09:04:57 +0100 Subject: [PATCH 4/4] Update tests --- .../parquet/ParquetSchemaSuite.scala | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 72e74b82a751f..c8d866d8b2d0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -21,10 +21,8 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.column.ColumnDescriptor -import org.apache.parquet.column.schema.{EdgeInterpolationAlgorithm => ParquetEdgeInterpolationAlgorithm} import org.apache.parquet.io.ParquetDecodingException import org.apache.parquet.schema._ -import org.apache.parquet.schema.LogicalTypeAnnotation import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ @@ -2222,6 +2220,16 @@ class ParquetSchemaSuite extends ParquetSchemaTest { binaryAsString = false, int96AsTimestamp = true) + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation (with CRS)", + StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), + """message root { + | optional binary f1 (GEOMETRY(OGC:CRS84)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + testParquetToCatalyst( "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation", StructType(Seq(StructField("f1", @@ -2233,6 +2241,28 @@ class ParquetSchemaSuite extends ParquetSchemaTest { binaryAsString = false, int96AsTimestamp = true) + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS)", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY(OGC:CRS84)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + + testParquetToCatalyst( + "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with CRS and algorithm)", + StructType(Seq(StructField("f1", + GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))), + """message root { + | optional binary f1 (GEOGRAPHY(OGC:CRS84, SPHERICAL)); + |} + """.stripMargin, + binaryAsString = false, + int96AsTimestamp = true) + testParquetToCatalyst( "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation with binaryAsString", StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))), @@ -2254,30 +2284,6 @@ class ParquetSchemaSuite extends ParquetSchemaTest { binaryAsString = true, int96AsTimestamp = true) - test("Parquet to Catalyst - GEOGRAPHY with explicit algorithm") { - val parquetSchema = Types.buildMessage() - .addField( - Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) - .as(LogicalTypeAnnotation.geographyType("OGC:CRS84", - ParquetEdgeInterpolationAlgorithm.SPHERICAL)) - .named("f1")) - .named("root") - - val converter = new ParquetToSparkSchemaConverter( - assumeBinaryIsString = false, - assumeInt96IsTimestamp = true) - val actualParquetColumn = converter.convertParquetColumn(parquetSchema) - val actual = actualParquetColumn.sparkType - - val expected = StructType(Seq(StructField("f1", - GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))) - assert(actual === expected, - s"""Schema mismatch. - |Expected schema: ${expected.json} - |Actual schema: ${actual.json} - """.stripMargin) - } - /** Catalyst to Parquet conversion for geospatial types. */ testCatalystToParquet(