Skip to content

Commit f466ff0

Browse files
Added ParquetAvro tests and revised Array conversion
1 parent adc1258 commit f466ff0

4 files changed

Lines changed: 169 additions & 26 deletions

File tree

project/SparkBuild.scala

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -503,6 +503,9 @@ object SparkBuild extends Build {
503503
libraryDependencies ++= Seq(
504504
"com.twitter" % "parquet-column" % parquetVersion,
505505
"com.twitter" % "parquet-hadoop" % parquetVersion,
506+
"com.twitter" % "parquet-avro" % parquetVersion % "test",
507+
// here we need version >= 1.7.5 because of AVRO-1274
508+
"org.apache.avro" % "avro" % "1.7.6" % "test",
506509
"com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
507510
),
508511
initialCommands in console :=

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,8 +32,10 @@ private[parquet] object CatalystConverter {
3232
// The type internally used for fields
3333
type FieldType = StructField
3434

35-
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`)
36-
val ARRAY_ELEMENTS_SCHEMA_NAME = "values"
35+
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
36+
// Note that "array" for the array elements is chosen by ParquetAvro.
37+
// Using a different value will result in Parquet silently dropping columns.
38+
val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
3739
val MAP_KEY_SCHEMA_NAME = "key"
3840
val MAP_VALUE_SCHEMA_NAME = "value"
3941
val MAP_SCHEMA_NAME = "map"

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -121,10 +121,10 @@ private[sql] object ParquetTestData {
121121
|message AddressBook {
122122
|required binary owner;
123123
|optional group ownerPhoneNumbers {
124-
|repeated binary values;
124+
|repeated binary array;
125125
|}
126126
|optional group contacts {
127-
|repeated group values {
127+
|repeated group array {
128128
|required binary name;
129129
|optional binary phoneNumber;
130130
|}
@@ -139,18 +139,18 @@ private[sql] object ParquetTestData {
139139
|required int32 firstInt;
140140
|optional int32 secondInt;
141141
|optional group longs {
142-
|repeated int64 values;
142+
|repeated int64 array;
143143
|}
144144
|required group entries {
145-
|repeated group values {
145+
|repeated group array {
146146
|required double value;
147147
|optional boolean truth;
148148
|}
149149
|}
150150
|optional group outerouter {
151-
|repeated group values {
152-
|repeated group values {
153-
|repeated int32 values;
151+
|repeated group array {
152+
|repeated group array {
153+
|repeated int32 array;
154154
|}
155155
|}
156156
|}
@@ -162,10 +162,10 @@ private[sql] object ParquetTestData {
162162
|message TestNested3 {
163163
|required int32 x;
164164
|optional group booleanNumberPairs {
165-
|repeated group values {
165+
|repeated group array {
166166
|required int32 key;
167167
|optional group value {
168-
|repeated group values {
168+
|repeated group array {
169169
|required double nestedValue;
170170
|optional boolean truth;
171171
|}
@@ -273,9 +273,9 @@ private[sql] object ParquetTestData {
273273
val r1 = new SimpleGroup(schema)
274274
r1.add(0, "Julien Le Dem")
275275
r1.addGroup(1)
276-
.append("values", "555 123 4567")
277-
.append("values", "555 666 1337")
278-
.append("values", "XXX XXX XXXX")
276+
.append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 123 4567")
277+
.append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 666 1337")
278+
.append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "XXX XXX XXXX")
279279
val contacts = r1.addGroup(2)
280280
contacts.addGroup(0)
281281
.append("name", "Dmitriy Ryaboy")
@@ -398,10 +398,10 @@ private[sql] object ParquetTestData {
398398
val fs: FileSystem = path.getFileSystem(configuration)
399399
val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
400400
assert(schema != null)
401-
val outputStatus: FileStatus = fs.getFileStatus(path)
401+
val outputStatus: FileStatus = fs.getFileStatus(new Path(path.toString))
402402
val footers = ParquetFileReader.readFooter(configuration, outputStatus)
403403
assert(footers != null)
404-
val reader = new ParquetReader(path, new GroupReadSupport())
404+
val reader = new ParquetReader(new Path(path.toString), new GroupReadSupport())
405405
val first = reader.read()
406406
assert(first != null)
407407
}

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 148 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,13 @@ package org.apache.spark.sql.parquet
1919

2020
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
2121

22+
import org.apache.avro.{SchemaBuilder, Schema}
23+
import org.apache.avro.generic.{GenericData, GenericRecord}
24+
2225
import org.apache.hadoop.fs.{Path, FileSystem}
2326
import org.apache.hadoop.mapreduce.Job
2427

28+
import parquet.avro.AvroParquetWriter
2529
import parquet.hadoop.ParquetFileWriter
2630
import parquet.hadoop.util.ContextUtil
2731
import parquet.schema.MessageTypeParser
@@ -34,11 +38,12 @@ import org.apache.spark.sql.SchemaRDD
3438
import org.apache.spark.sql.catalyst.expressions._
3539
import org.apache.spark.sql.catalyst.types.IntegerType
3640
import org.apache.spark.util.Utils
41+
import org.apache.spark.sql.SchemaRDD
42+
import org.apache.spark.sql.catalyst.util.getTempFilePath
43+
import org.apache.spark.sql.catalyst.expressions.Row
3744
import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
38-
import org.apache.spark.sql.{parquet, SchemaRDD}
3945
import org.apache.spark.sql.catalyst.expressions.AttributeReference
40-
import scala.Tuple2
41-
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
46+
import org.apache.spark.util.Utils
4247

4348
// Implicits
4449
import org.apache.spark.sql.test.TestSQLContext._
@@ -398,9 +403,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
398403

399404
test("Importing nested Parquet file (Addressbook)") {
400405
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
401-
ParquetTestData.readNestedFile(
402-
ParquetTestData.testNestedDir1,
403-
ParquetTestData.testNestedSchema1)
404406
val result = TestSQLContext
405407
.parquetFile(ParquetTestData.testNestedDir1.toString)
406408
.toSchemaRDD
@@ -426,9 +428,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
426428

427429
test("Importing nested Parquet file (nested numbers)") {
428430
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
429-
ParquetTestData.readNestedFile(
430-
ParquetTestData.testNestedDir2,
431-
ParquetTestData.testNestedSchema2)
432431
val result = TestSQLContext
433432
.parquetFile(ParquetTestData.testNestedDir2.toString)
434433
.toSchemaRDD
@@ -602,6 +601,145 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
602601
Utils.deleteRecursively(tmpdir)
603602
}
604603

604+
test("Importing data generated with Avro") {
605+
val tmpdir = Utils.createTempDir()
606+
val file: File = new File(tmpdir, "test.avro")
607+
608+
val primitiveArrayType: Schema = SchemaBuilder.array.items.intType
609+
val complexArrayType: Schema = SchemaBuilder.array.items.map.values.stringType
610+
val primitiveMapType: Schema = SchemaBuilder.map.values.booleanType
611+
val complexMapType: Schema = SchemaBuilder.map.values.array.items.floatType
612+
val schema: Schema = SchemaBuilder
613+
.record("TestRecord")
614+
.namespace("")
615+
.fields
616+
.name("testInt")
617+
.`type`.
618+
intType
619+
.noDefault
620+
.name("testDouble")
621+
.`type`
622+
.doubleType
623+
.noDefault
624+
.name("testString")
625+
.`type`
626+
.nullable
627+
.stringType
628+
.stringDefault("")
629+
.name("testPrimitiveArray")
630+
.`type`(primitiveArrayType)
631+
.noDefault
632+
.name("testComplexArray")
633+
.`type`(complexArrayType)
634+
.noDefault
635+
.name("testPrimitiveMap")
636+
.`type`(primitiveMapType)
637+
.noDefault
638+
.name("testComplexMap")
639+
.`type`(complexMapType)
640+
.noDefault
641+
.endRecord
642+
643+
val record1: GenericRecord = new GenericData.Record(schema)
644+
645+
// primitive fields
646+
record1.put("testInt", 256)
647+
record1.put("testDouble", 0.5)
648+
record1.put("testString", "foo")
649+
650+
val primitiveArrayData = new GenericData.Array[Integer](10, primitiveArrayType)
651+
val complexArrayData: GenericData.Array[java.util.Map[String, String]] =
652+
new GenericData.Array[java.util.Map[String, String]](10, SchemaBuilder.array.items.map.values.stringType)
653+
654+
// two arrays: one primitive (array of ints), one complex (array of string->string maps)
655+
primitiveArrayData.add(1)
656+
primitiveArrayData.add(2)
657+
primitiveArrayData.add(3)
658+
val map1 = new java.util.HashMap[String, String]
659+
map1.put("key11", "data11")
660+
map1.put("key12", "data12")
661+
val map2 = new java.util.HashMap[String, String]
662+
map2.put("key21", "data21")
663+
map2.put("key22", "data22")
664+
complexArrayData.add(0, map1)
665+
complexArrayData.add(1, map2)
666+
667+
record1.put("testPrimitiveArray", primitiveArrayData)
668+
record1.put("testComplexArray", complexArrayData)
669+
670+
// two maps: one primitive (string->boolean), one complex (string->array of floats)
671+
val primitiveMap = new java.util.HashMap[String, Boolean](10)
672+
primitiveMap.put("key1", true)
673+
primitiveMap.put("key2", false)
674+
val complexMap = new java.util.HashMap[String, GenericData.Array[Float]](10)
675+
val value1: GenericData.Array[Float] = new GenericData.Array[Float](10, SchemaBuilder.array.items.floatType)
676+
value1.add(0.1f)
677+
value1.add(0.2f)
678+
value1.add(0.3f)
679+
complexMap.put("compKey1", value1)
680+
val value2: GenericData.Array[Float] = new GenericData.Array[Float](10, SchemaBuilder.array.items.floatType)
681+
value2.add(1.1f)
682+
value2.add(1.2f)
683+
value2.add(1.3f)
684+
complexMap.put("compKey2", value2)
685+
686+
record1.put("testPrimitiveMap", primitiveMap)
687+
record1.put("testComplexMap", complexMap)
688+
689+
// TODO: test array or map with value type Avro record
690+
691+
val writer = new AvroParquetWriter[GenericRecord](new Path(file.toString), schema)
692+
writer.write(record1)
693+
writer.close()
694+
695+
val data = TestSQLContext
696+
.parquetFile(tmpdir.toString)
697+
.toSchemaRDD
698+
data.registerAsTable("avroTable")
699+
val resultPrimitives = sql("SELECT testInt, testDouble, testString FROM avroTable").collect()
700+
assert(resultPrimitives(0)(0) === 256)
701+
assert(resultPrimitives(0)(1) === 0.5)
702+
assert(resultPrimitives(0)(2) === "foo")
703+
val resultPrimitiveArray = sql("SELECT testPrimitiveArray FROM avroTable").collect()
704+
assert(resultPrimitiveArray(0)(0).asInstanceOf[Row](0) === 1)
705+
assert(resultPrimitiveArray(0)(0).asInstanceOf[Row](1) === 2)
706+
assert(resultPrimitiveArray(0)(0).asInstanceOf[Row](2) === 3)
707+
val resultComplexArray = sql("SELECT testComplexArray FROM avroTable").collect()
708+
assert(resultComplexArray(0)(0).asInstanceOf[Row].size === 2)
709+
assert(
710+
resultComplexArray(0)(0)
711+
.asInstanceOf[Row]
712+
.apply(0)
713+
.asInstanceOf[Map[String, String]].get("key11").get.equals("data11"))
714+
assert(
715+
resultComplexArray(0)(0)
716+
.asInstanceOf[Row]
717+
.apply(1)
718+
.asInstanceOf[Map[String, String]].get("key22").get.equals("data22"))
719+
val resultPrimitiveMap = sql("SELECT testPrimitiveMap FROM avroTable").collect()
720+
assert(
721+
resultPrimitiveMap(0)(0)
722+
.asInstanceOf[Map[String, Boolean]].get("key1").get === true)
723+
assert(
724+
resultPrimitiveMap(0)(0)
725+
.asInstanceOf[Map[String, Boolean]].get("key2").get === false)
726+
val resultComplexMap = sql("SELECT testComplexMap FROM avroTable").collect()
727+
val mapResult1 =
728+
resultComplexMap(0)(0)
729+
.asInstanceOf[Map[String, Row]]
730+
.get("compKey1")
731+
.get
732+
val mapResult2 =
733+
resultComplexMap(0)(0)
734+
.asInstanceOf[Map[String, Row]]
735+
.get("compKey2")
736+
.get
737+
assert(mapResult1(0) === 0.1f)
738+
assert(mapResult1(2) === 0.3f)
739+
assert(mapResult2(0) === 1.1f)
740+
assert(mapResult2(2) === 1.3f)
741+
}
742+
605743
/**
606744
* Creates an empty SchemaRDD backed by a ParquetRelation.
607745
*
@@ -613,6 +751,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
613751
val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
614752
new SchemaRDD(
615753
TestSQLContext,
616-
parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
754+
ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
617755
}
618756
}

0 commit comments

Comments (0)