[SPARK-24855][SQL][EXTERNAL]: Built-in AVRO support should support specified schema on write #21847
Conversation
add to whitelist
Test build #93453 has finished for PR 21847 at commit
30fc1ae to 71dbc39
Test build #93457 has finished for PR 21847 at commit
71dbc39 to 12b3859
Test build #93464 has finished for PR 21847 at commit
12b3859 to 033f4dd
Test build #93466 has finished for PR 21847 at commit
033f4dd to f05e67e
Test build #93565 has finished for PR 21847 at commit
Is the change here related to specifying schema on write?
Yes. I need the original rootCatalystType so I can determine how those types map back to a user-specified schema. Simply having nullable as context is not sufficient :(
e31100d to 6f686d6
Test build #93604 has finished for PR 21847 at commit
6f686d6 to 7e44ca0
Test build #93606 has finished for PR 21847 at commit
+cc @MaxGekk and @gengliangwang who worked on this part of the codebase.
Can we save the foldLeft here?
catalystType.asInstanceOf[StructType].zip(avroFields.asScala).forall {
  case (f1, f2) => typeMatchesSchema(f1.dataType, f2.schema)
}
Not caused by this PR, but we'd better explain what this method does. Can you add a comment for it?
Is it possible nullable == false but avroType.getType == Type.UNION?
I've converted this into a match statement that covers the following cases (see the sketch after this list):
- nullable == false and Type.UNION => should "resolve" the union to the appropriate type
- nullable == true and Type.UNION => should "resolve" the union to the appropriate type
- nullable == Any and any other Type => just return the Type
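A minimal sketch of what such a union resolution could look like, using the Avro `Schema` API. The method name and body here are illustrative assumptions, not the PR's actual code:

```scala
import org.apache.avro.Schema
import org.apache.avro.Schema.Type
import scala.collection.JavaConverters._

// Hypothetical helper: strip the NULL branch from a UNION schema so the
// remaining non-null branch can be matched against the Catalyst type.
def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
  avroType.getType match {
    case Type.UNION =>
      // For both nullable and non-nullable Catalyst fields, pick the
      // non-null branch of the union (assumes a simple ["null", X] union).
      val nonNull = avroType.getTypes.asScala.filter(_.getType != Type.NULL)
      if (nonNull.size == 1) nonNull.head else avroType
    case _ =>
      // Not a union: just return the type unchanged.
      avroType
  }
}
```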
spark is already available from SharedSQLContext. You don't need to pass it to the function
Fixed.
Do you actually check the schemas? checkAnswer does a collect of newDf and checks the content of the two dataframes. I am asking because the method name forceSchemaCheck is slightly confusing.
I changed forceSchemaCheck to checkSpecifySchemaOnWrite. I think that's a bit clearer.
Why don't you return false instead of crashing in the assert in the case of different sizes?
map{...}.foldLeft(true)(_ && _) -> forall{...}
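For illustration, the two forms the reviewer is contrasting are equivalent for a boolean predicate, and `forall` also short-circuits on the first false result, which the foldLeft version does not. This is a generic Scala sketch, not the PR's code:

```scala
// A toy predicate over pairs of values, standing in for typeMatchesSchema.
val pairs = Seq((1, 1), (2, 2), (3, 4))
def matches(a: Int, b: Int): Boolean = a == b

// Original style: build a Seq[Boolean], then AND the results together.
val viaFoldLeft = pairs.map { case (a, b) => matches(a, b) }.foldLeft(true)(_ && _)

// Suggested style: forall expresses the same thing directly and short-circuits.
val viaForall = pairs.forall { case (a, b) => matches(a, b) }

assert(viaFoldLeft == viaForall)
```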
Could you add a comment on why you pass Type.STRING for DecimalType?
Additional work to support some logical types would probably be in order for this to work correctly. Prior to my change, AvroSerializer always wrote out DecimalType as a string, so I'm just keeping the existing behavior. I added a comment to reflect this.
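For context, a converter that keeps the pre-existing write-decimal-as-string behavior might look roughly like the sketch below. It mirrors the style of the serializer snippets quoted elsewhere in this thread, but the exact code is an assumption, not the PR's implementation:

```scala
import org.apache.avro.util.Utf8
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.DecimalType

// Hypothetical fragment of a Catalyst-to-Avro value converter: decimals are
// rendered via their string representation, matching the behavior the
// serializer had before this PR (an Avro "string", not a logical decimal type).
def decimalConverter(dt: DecimalType): (SpecializedGetters, Int) => Any =
  (getter, ordinal) =>
    new Utf8(getter.getDecimal(ordinal, dt.precision, dt.scale).toString)
```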
I hope you didn't expose your internal info.
8d7c520 to d85242e
Test build #93619 has finished for PR 21847 at commit
Test build #93621 has finished for PR 21847 at commit
Test build #93603 has finished for PR 21847 at commit
621be3e to ed44c76
Test build #93625 has finished for PR 21847 at commit
ed44c76 to c76aadd
Test build #93626 has finished for PR 21847 at commit
FYI: I'm doing some additional testing around performance and have found a pretty gnarly regression with a particular type of schema. I'll try to track down what's causing it.
Test build #94460 has finished for PR 21847 at commit
new Schema.Parser().parse(expectedAvroSchema))
}

def getAvroSchemaStringFromFiles(filePath: String): String = {
@dbtsai Should we update the test L980 "Validate namespace in avro file that has nested records with the same name" to use this, too?
Good suggestion. I'll address it in the next push.
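As a rough sketch of what a helper like getAvroSchemaStringFromFiles could do, the body below reads the writer schema out of the first .avro part file in a directory. It is an assumption based on the Avro file API, not the actual test code:

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Hypothetical version of the test helper discussed above: open the first
// Avro part file produced by a write and return its embedded writer schema.
def getAvroSchemaStringFromFiles(filePath: String): String = {
  val avroFile = new File(filePath).listFiles()
    .filter(_.getName.endsWith(".avro"))
    .head
  val reader = DataFileReader.openReader(avroFile, new GenericDatumReader[GenericRecord]())
  try reader.getSchema.toString(false) finally reader.close()
}
```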
@cloud-fan @gatorsmile @gengliangwang We need this to write avro files from Spark for other applications to consume because we use the
Test build #94478 has finished for PR 21847 at commit
(getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes)
}
case BinaryType => avroType.getType match {
  case Type.FIXED =>
FIXED has a "size" attribute; shall we consider it when preparing the bytes? E.g., shall we throw an exception if the bytes from Spark exceed the size, and shall we pad the bytes when their length is smaller than the size?
I think we should throw an exception if the size of the bytes to be written is different from the "size" attribute, without doing any padding.
BTW, Avro doesn't handle this well: if the size to be written is larger than the "size" attribute, Avro will silently cause data corruption.
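As a sketch of the check being discussed (not the PR's exact code), a writer for an Avro FIXED field could validate the byte length against the schema's declared size before wrapping the bytes:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData

// Hypothetical converter fragment: wrap Spark binary data as an Avro FIXED
// value, rejecting any byte array whose length differs from the declared size
// (padding is deliberately not attempted, per the discussion above).
def toAvroFixed(avroType: Schema, data: Array[Byte]): GenericData.Fixed = {
  val size = avroType.getFixedSize
  if (data.length != size) {
    throw new IllegalArgumentException(
      s"Cannot write ${data.length} byte(s) of binary data into FIXED type with size of $size byte(s)")
  }
  new GenericData.Fixed(avroType, data)
}
```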
checkAvroSchemaEquals(avroSchema, getAvroSchemaStringFromFiles(tempSaveDir))

// Writing df containing data not in the enum will throw an exception
intercept[SparkException] {
can we also check the error message?
I was thinking of doing a follow-up PR on this. What's happening is that Avro builds a map of keys to indices and looks up that table to get the index. Avro doesn't do any check, so the exception is a null pointer exception when looking up a non-existent key.
Since I think checking a message against a null pointer exception is fragile, I decided to leave it to a follow-up PR.
LGTM except one concern about error checking for fixed type. Thanks for working on it!
Test build #94522 has finished for PR 21847 at commit
Test build #94523 has finished for PR 21847 at commit
Test build #94524 has finished for PR 21847 at commit
if (!enumSymbols.contains(data)) {
  throw new IncompatibleSchemaException(
    "Cannot write \"" + data + "\" since it's not defined in enum \"" +
      enumSymbols.mkString("\", \""))
nit: enumSymbols.mkString("\", \"") + "\"")
Good catch.
if (data.length != size) {
  throw new IncompatibleSchemaException(
    s"Cannot write ${data.length} ${if (data.length > 1) "bytes" else "byte"} of " +
      s"binary data into FIXED Type with size of " +
nit: "binary data into FIXED Type with size of ".
addressed
val df = spark.createDataFrame(dfWithNull.na.drop().rdd,
  StructType(Seq(StructField("Suit", StringType, false))))

val tempSaveDir = s"$tempDir/save1/"
nit: it seems we can still use save?
addressed
A few minor comments. LGTM.
Test build #94531 has finished for PR 21847 at commit
LGTM

Thanks all. Merged into master.
The mapping of Spark schema to Avro schema is many-to-many. (See https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-spark-sql---avro-conversion) The default schema mapping might not be exactly what users want. For example, by default, a "string" column is always written as "string" Avro type, but users might want to output the column as "enum" Avro type. With PR apache#21847, Spark supports user-specified schema in the batch writer. For the function `to_avro`, we should support user-specified output schema as well.

Unit test.

Closes apache#25419 from gengliangwang/to_avro.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 48adc91)
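For reference, a hedged sketch of the schema-aware `to_avro` usage described in that commit message. The overload taking a JSON-format Avro schema exists in newer Spark releases (with the spark-avro module on the classpath), but the schema string and column names here are purely illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.to_avro

val spark = SparkSession.builder().appName("to_avro-example").getOrCreate()
import spark.implicits._

// Illustrative output schema: encode the "suit" column as an Avro enum
// instead of the default "string" mapping.
val enumSchema =
  """{"type": "enum", "name": "Suit",
    | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}""".stripMargin

val df = Seq("SPADES", "HEARTS").toDF("suit")

// to_avro(data, jsonFormatSchema) encodes the column using the user-specified
// Avro schema rather than the default Catalyst-to-Avro mapping.
val encoded = df.select(to_avro($"suit", enumSchema).as("suit_avro"))
```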
When lindblombr at apple developed [SPARK-24855](apache#21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset. With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|             2.95621|
| stddev|0.030895815479469294|
|    min|               2.915|
|    max|               3.049|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.31072999999999995|
| stddev|0.054139709842390006|
|    min|               0.259|
|    max|               0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  2.5804300000000002|
| stddev|0.011175600225672079|
|    min|               2.558|
|    max|                2.62|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.29922000000000004|
| stddev|0.058261961532514166|
|    min|               0.251|
|    max|               0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  1.7730500000000005|
| stddev|0.025199156230863575|
|    min|               1.729|
|    max|               1.833|
+-------+--------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|            0.29715|
| stddev|0.05685643358850465|
|    min|              0.258|
|    max|              0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.

```scala
spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")

val sparkSession = spark
import sparkSession.implicits._

val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
  val features = Array.fill(16000)(scala.math.random)
  (uid, scala.math.random, java.util.UUID.randomUUID().toString,
    java.util.UUID.randomUUID().toString, features)
}.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
val size = df.count()

// Write into ramdisk to rule out the disk IO impact
val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"

val n = 150
val writeTimes = new Array[Double](n)
var i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  df.write
    .format("com.databricks.spark.avro")
    .mode("overwrite")
    .save(tempSaveDir)
  val t2 = System.currentTimeMillis()
  writeTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

df.unpersist()

// The first 50 runs are for warm-up
val readTimes = new Array[Double](n)
i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
  assert(readDF.count() == size)
  val t2 = System.currentTimeMillis()
  readTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

Existing tests.
Author: DB Tsai <[email protected]>
Author: Brian Lindblom <[email protected]>

Closes apache#21952 from dbtsai/avro-performance-fix.

(cherry picked from commit 273b284)

RB=1516361 R=fli,mshen,yezhou,edlu A=fli
…cified schema on write

Allows `avroSchema` option to be specified on write, allowing a user to specify a schema in cases where this is required. A trivial use case is reading in an avro dataset, making some small adjustment to a column or columns and writing out using the same schema. Implicit schema creation from SQL Struct results in a schema that while for the most part, is functionally similar, is not necessarily compatible.

Allows `fixed` Field type to be utilized for records of specified `avroSchema`

Unit tests in AvroSuite are extended to test this with enum and fixed types.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#21847 from lindblombr/specify_schema_on_write.

Lead-authored-by: Brian Lindblom <[email protected]>
Co-authored-by: DB Tsai <[email protected]>
Signed-off-by: DB Tsai <[email protected]>

(cherry picked from commit 0cea9e3)

This is a partial backport of only the test cases in SPARK-24855, and the core part of SPARK-24855 has been merged in the previous commits.

RB=2119392 BUG=LIHADOOP-53602 G=spark-reviewers R=mshen,ekrogen A=ekrogen
What changes were proposed in this pull request?
Allows the `avroSchema` option to be specified on write, allowing a user to specify a schema in cases where this is required. A trivial use case is reading in an avro dataset, making some small adjustment to a column or columns, and writing out using the same schema. Implicit schema creation from a SQL Struct results in a schema that, while for the most part functionally similar, is not necessarily compatible.

Allows the `fixed` field type to be utilized for records of a specified `avroSchema`.

How was this patch tested?
Unit tests in AvroSuite are extended to test this with enum and fixed types.
Please review http://spark.apache.org/contributing.html before opening a pull request.
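As a usage sketch of the feature this PR adds (the paths and the schema body below are illustrative placeholders, not taken from the PR), specifying a schema on write looks roughly like this with the built-in Avro source:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-schema-on-write").getOrCreate()

// Placeholder paths for an existing Avro dataset and its write-back location.
val inputPath = "/tmp/cards.avro"
val outputPath = "/tmp/cards-out.avro"

// Illustrative Avro schema using an enum field, which the default
// Catalyst-to-Avro mapping would otherwise emit as a plain "string".
val avroSchema =
  """{
    |  "type": "record",
    |  "name": "Card",
    |  "fields": [
    |    {"name": "Suit", "type": {"type": "enum", "name": "Suit",
    |      "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}}
    |  ]
    |}""".stripMargin

val df = spark.read.format("avro").load(inputPath)

// The avroSchema option forces the writer to use the user-specified schema
// instead of one derived implicitly from the DataFrame's StructType.
df.write
  .format("avro")
  .option("avroSchema", avroSchema)
  .save(outputPath)
```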