@@ -35,6 +35,20 @@ case class ParquetData(intField: Int, stringField: String)
3535// The data that also includes the partitioning key
3636case class ParquetDataWithKey (p : Int , intField : Int , stringField : String )
3737
// A struct payload nested inside the complex-type test records below;
// maps to the Hive type STRUCT<intStructField: INT, stringStructField: STRING>.
case class StructContainer(intStructField: Int, stringStructField: String)

// Row layout for parquet test tables carrying complex (struct/array) columns,
// without the partitioning key materialized in the data itself.
case class ParquetDataWithComplexTypes(
    intField: Int,
    stringField: String,
    structField: StructContainer,
    arrayField: Seq[Int])

// Same as ParquetDataWithComplexTypes, but the partitioning key `p` is also
// present as a regular data column (mirrors ParquetDataWithKey above).
case class ParquetDataWithKeyAndComplexTypes(
    p: Int,
    intField: Int,
    stringField: String,
    structField: StructContainer,
    arrayField: Seq[Int])
3852
3953/**
4054 * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -85,6 +99,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
8599 location ' ${new File (normalTableDir, " normal" ).getCanonicalPath}'
86100 """ )
87101
102+ sql(s """
103+ CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
104+ (
105+ intField INT,
106+ stringField STRING,
107+ structField STRUCT<intStructField: INT, stringStructField: STRING>,
108+ arrayField ARRAY<INT>
109+ )
110+ PARTITIONED BY (p int)
111+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
112+ STORED AS
113+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
114+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
115+ LOCATION ' ${partitionedTableDirWithComplexTypes.getCanonicalPath}'
116+ """ )
117+
118+ sql(s """
119+ CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
120+ (
121+ intField INT,
122+ stringField STRING,
123+ structField STRUCT<intStructField: INT, stringStructField: STRING>,
124+ arrayField ARRAY<INT>
125+ )
126+ PARTITIONED BY (p int)
127+ ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
128+ STORED AS
129+ INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
130+ OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
131+ LOCATION ' ${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
132+ """ )
133+
88134 (1 to 10 ).foreach { p =>
89135 sql(s " ALTER TABLE partitioned_parquet ADD PARTITION (p= $p) " )
90136 }
@@ -93,7 +139,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
93139 sql(s " ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p= $p) " )
94140 }
95141
96- val rdd1 = sparkContext.parallelize((1 to 10 ).map(i => s """ {"a": $i, "b":"str ${i}"} """ ))
142+ (1 to 10 ).foreach { p =>
143+ sql(s " ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p= $p) " )
144+ }
145+
146+ (1 to 10 ).foreach { p =>
147+ sql(s " ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p= $p) " )
148+ }
149+
150+ val rdd1 = sparkContext.parallelize((1 to 10 ).map(i => s """ {"a": $i, "b":"str $i"} """ ))
97151 jsonRDD(rdd1).registerTempTable(" jt" )
98152 val rdd2 = sparkContext.parallelize((1 to 10 ).map(i => s """ {"a":[ $i, null]} """ ))
99153 jsonRDD(rdd2).registerTempTable(" jt_array" )
@@ -104,6 +158,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
104158 override def afterAll (): Unit = {
105159 sql(" DROP TABLE partitioned_parquet" )
106160 sql(" DROP TABLE partitioned_parquet_with_key" )
161+ sql(" DROP TABLE partitioned_parquet_with_complextypes" )
162+ sql(" DROP TABLE partitioned_parquet_with_key_and_complextypes" )
107163 sql(" DROP TABLE normal_parquet" )
108164 sql(" DROP TABLE IF EXISTS jt" )
109165 sql(" DROP TABLE IF EXISTS jt_array" )
@@ -408,6 +464,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
408464 path ' ${new File (partitionedTableDir, " p=1" ).getCanonicalPath}'
409465 )
410466 """ )
467+
468+ sql( s """
469+ CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
470+ USING org.apache.spark.sql.parquet
471+ OPTIONS (
472+ path ' ${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
473+ )
474+ """ )
475+
476+ sql( s """
477+ CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
478+ USING org.apache.spark.sql.parquet
479+ OPTIONS (
480+ path ' ${partitionedTableDirWithComplexTypes.getCanonicalPath}'
481+ )
482+ """ )
411483 }
412484}
413485
@@ -446,7 +518,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
446518 var partitionedTableDir : File = null
447519 var normalTableDir : File = null
448520 var partitionedTableDirWithKey : File = null
449-
521+ var partitionedTableDirWithComplexTypes : File = null
522+ var partitionedTableDirWithKeyAndComplexTypes : File = null
450523
451524 override def beforeAll (): Unit = {
452525 partitionedTableDir = File .createTempFile(" parquettests" , " sparksql" )
@@ -482,9 +555,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
482555 .toDF()
483556 .saveAsParquetFile(partDir.getCanonicalPath)
484557 }
558+
559+ partitionedTableDirWithKeyAndComplexTypes = File .createTempFile(" parquettests" , " sparksql" )
560+ partitionedTableDirWithKeyAndComplexTypes.delete()
561+ partitionedTableDirWithKeyAndComplexTypes.mkdir()
562+
563+ (1 to 10 ).foreach { p =>
564+ val partDir = new File (partitionedTableDirWithKeyAndComplexTypes, s " p= $p" )
565+ sparkContext.makeRDD(1 to 10 ).map { i =>
566+ ParquetDataWithKeyAndComplexTypes (
567+ p, i, s " part- $p" , StructContainer (i, f " ${i}_string " ), 1 to i)
568+ }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
569+ }
570+
571+ partitionedTableDirWithComplexTypes = File .createTempFile(" parquettests" , " sparksql" )
572+ partitionedTableDirWithComplexTypes.delete()
573+ partitionedTableDirWithComplexTypes.mkdir()
574+
575+ (1 to 10 ).foreach { p =>
576+ val partDir = new File (partitionedTableDirWithComplexTypes, s " p= $p" )
577+ sparkContext.makeRDD(1 to 10 ).map { i =>
578+ ParquetDataWithComplexTypes (i, s " part- $p" , StructContainer (i, f " ${i}_string " ), 1 to i)
579+ }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
580+ }
581+ }
582+
override protected def afterAll(): Unit = {
  // java.io.File.delete() only removes empty files/directories. Each of
  // these temp dirs was populated in beforeAll with partition
  // subdirectories containing parquet files, so a plain delete() returns
  // false and silently leaks the temp data. Delete recursively instead.
  Seq(
    partitionedTableDir,
    normalTableDir,
    partitionedTableDirWithKey,
    partitionedTableDirWithComplexTypes,
    partitionedTableDirWithKeyAndComplexTypes).foreach(deleteRecursively)
}

// Deletes `f` and everything beneath it; a null or already-missing file is a no-op.
private def deleteRecursively(f: File): Unit = {
  if (f != null) {
    // listFiles() returns null for non-directories (and on I/O error), so
    // wrap in Option before recursing into children.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
486590
487- Seq (" partitioned_parquet" , " partitioned_parquet_with_key" ).foreach { table =>
591+ Seq (
592+ " partitioned_parquet" ,
593+ " partitioned_parquet_with_key" ,
594+ " partitioned_parquet_with_complextypes" ,
595+ " partitioned_parquet_with_key_and_complextypes" ).foreach { table =>
596+
488597 test(s " ordering of the partitioning columns $table" ) {
489598 checkAnswer(
490599 sql(s " SELECT p, stringField FROM $table WHERE p = 1 " ),
@@ -574,6 +683,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
574683 }
575684 }
576685
// Regression tests for SPARK-5775: complex-typed columns (structs/arrays)
// must be readable from partitioned parquet tables through both the
// with-key and without-key table layouts registered above.
for (tableName <- Seq(
    "partitioned_parquet_with_key_and_complextypes",
    "partitioned_parquet_with_complextypes")) {

  test(s"SPARK-5775 read struct from $tableName") {
    checkAnswer(
      sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $tableName WHERE p = 1"),
      (1 to 10).map(i => Row(1, i, f"${i}_string")))
  }

  // Re-enable this after SPARK-5508 is fixed
  ignore(s"SPARK-5775 read array from $tableName") {
    checkAnswer(
      sql(s"SELECT arrayField, p FROM $tableName WHERE p = 1"),
      (1 to 10).map(i => Row(1 to i, 1)))
  }
}
703+
704+
577705 test(" non-part select(*)" ) {
578706 checkAnswer(
579707 sql(" SELECT COUNT(*) FROM normal_parquet" ),
0 commit comments