@@ -19,19 +19,24 @@ package org.apache.spark.sql.avro

 import java.io.ByteArrayOutputStream

+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericDatumWriter
 import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

 import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{BinaryType, DataType}

-case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
+case class CatalystDataToAvro(
+    child: Expression,
+    jsonFormatSchema: Option[String]) extends UnaryExpression {
Member:
Can we have a default value of None?

-   jsonFormatSchema: Option[String]) extends UnaryExpression {
+   jsonFormatSchema: Option[String] = None) extends UnaryExpression {

Member Author:

Here I am trying to avoid a parameter with a default value. The result is quite different with and without a specified schema.
Also, this is consistent with AvroDataToCatalyst, whose schema parameter has no default.
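
(For illustration, a minimal standalone sketch of the two code paths, not code from this PR; it assumes SchemaConverters is accessible from the caller, as it is within this expression's package, and Avro on the classpath:)

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StringType

object AvroTypeChoiceSketch {
  def main(args: Array[String]): Unit = {
    // jsonFormatSchema = None: derive the Avro schema from the Catalyst type.
    val derived: Schema = SchemaConverters.toAvroType(StringType, nullable = false)
    println(derived)          // "string"

    // jsonFormatSchema = Some(json): parse and use the caller's schema as-is.
    val json =
      """{"type": "enum", "name": "Suit",
        | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}""".stripMargin
    val provided: Schema = new Schema.Parser().parse(json)
    println(provided.getType) // ENUM
  }
}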

Contributor:

Unless the default value is used a lot in tests, I don't think we should add a default value to internal classes. We should force the caller side to specify the parameter when they instantiate the internal class.

Member:

Got it, @gengliangwang and @cloud-fan.


   override def dataType: DataType = BinaryType

   @transient private lazy val avroType =
-    SchemaConverters.toAvroType(child.dataType, child.nullable)
+    jsonFormatSchema
+      .map(new Schema.Parser().parse)
+      .getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable))

   @transient private lazy val serializer =
     new AvroSerializer(child.dataType, avroType, child.nullable)
@@ -72,6 +72,19 @@ object functions {
    */
   @Experimental
   def to_avro(data: Column): Column = {
-    new Column(CatalystDataToAvro(data.expr))
+    new Column(CatalystDataToAvro(data.expr, None))
   }
Member:

If we have the default value None, we don't need to touch this function.

Member Author:

See my comment in #25419 (comment)


+  /**
+   * Converts a column into binary of avro format.
+   *
+   * @param data the data column.
+   * @param jsonFormatSchema user-specified output avro schema in JSON string format.
+   *
+   * @since 3.0.0
+   */
+  @Experimental
+  def to_avro(data: Column, jsonFormatSchema: String): Column = {
+    new Column(CatalystDataToAvro(data.expr, Some(jsonFormatSchema)))
+  }
 }
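
(A usage sketch of the two overloads side by side; this is illustrative only, not from the PR. It assumes a local SparkSession and the external spark-avro module deployed per the Avro data source guide; the nullable column is wrapped in a ["null", ...] union, matching the Python doctest below:)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.to_avro

object ToAvroUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("to_avro demo").getOrCreate()
    import spark.implicits._

    val df = Seq("SPADES").toDF("suit")

    // Existing overload: the Avro schema is derived from the column type.
    df.select(to_avro($"suit").alias("bytes")).show()

    // New overload: encode the same column with a user-specified enum schema.
    // The column is nullable, hence the union with "null".
    val enumSchema =
      """["null", {"type": "enum", "name": "Suit",
        |  "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]""".stripMargin
    df.select(to_avro($"suit", enumSchema).alias("bytes")).show()

    spark.stop()
  }
}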
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro

 import org.apache.avro.Schema

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
@@ -38,12 +38,12 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite

   private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
     checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema, Map.empty),
+      AvroDataToCatalyst(CatalystDataToAvro(data, None), schema, Map.empty),
       prepareExpectedResult(expected))
   }

   protected def checkUnsupportedRead(data: Literal, schema: String): Unit = {
-    val binary = CatalystDataToAvro(data)
+    val binary = CatalystDataToAvro(data, None)
dongjoon-hyun (Member), Aug 12, 2019:

Also, if we have the default value, we don't need to change lines 41 and 46.

Member Author:

See my comment in #25419 (comment)

     intercept[Exception] {
       AvroDataToCatalyst(binary, schema, Map("mode" -> "FAILFAST")).eval()
     }
@@ -209,4 +209,32 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       checkUnsupportedRead(input, avroSchema)
     }
   }
+
+  test("user-specified schema") {
+    val data = Literal("SPADES")
+    val jsonFormatSchema =
+      """
+        |{ "type": "enum",
+        |  "name": "Suit",
+        |  "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+        |}
+      """.stripMargin
+    checkEvaluation(
+      AvroDataToCatalyst(
+        CatalystDataToAvro(
+          data,
+          Some(jsonFormatSchema)),
+        jsonFormatSchema,
+        options = Map.empty),
+      data.eval())
+    val message = intercept[SparkException] {
+      AvroDataToCatalyst(
+        CatalystDataToAvro(
+          data,
+          None),
+        jsonFormatSchema,
+        options = Map.empty).eval()
+    }.getMessage
+    assert(message.contains("Malformed records are detected in record parsing."))
Member:
In this PR, CatalystDataToAvro ignores the given schema in the case of None, doesn't it? To me, this error seems to come from AvroDataToCatalyst instead of CatalystDataToAvro.

Member:

If this error comes from AvroDataToCatalyst, this test coverage is misleading. For example, we had better have test coverage for:

  • a test whether CatalystDataToAvro(data, None) successfully ignores None without any exception (a rough sketch follows this list).
  • a test whether CatalystDataToAvro(data, "") fails with that error message (?)

What do you think about that, @gengliangwang?
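
(A rough sketch of the first suggested test, written against this suite's existing helpers; this is an assumption of how it could look, not code from the PR, and it assumes the literal round-trips through the schema that SchemaConverters derives for it:)

  test("CatalystDataToAvro with None falls back to the derived schema") {
    val data = Literal("SPADES")
    // With None, no user schema is parsed; the Avro type is derived from the
    // Catalyst type, so a round trip through that derived schema succeeds.
    val derivedSchema = SchemaConverters.toAvroType(data.dataType, data.nullable).toString
    checkEvaluation(
      AvroDataToCatalyst(CatalystDataToAvro(data, None), derivedSchema, Map.empty),
      data.eval())
  }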

gengliangwang (Member Author), Aug 13, 2019:

Here AvroDataToCatalyst is used just to check the Avro schema produced by CatalystDataToAvro.

  1. When jsonFormatSchema is provided to CatalystDataToAvro, the output Avro schema is an enum type, and we validate it with AvroDataToCatalyst. This proves that the provided schema takes effect.
  2. When jsonFormatSchema is None, the output Avro schema is a string type, and the result can't be parsed as an enum type (see the encoding sketch below).

I will change the order of the two checks in the test case and add a new test case for an invalid user-specified schema.
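
(To see why the two encodings are not wire-compatible, a small plain-Avro sketch; this is illustrative, using only Avro APIs that this expression already imports, and the byte counts are assumptions following Avro's binary spec:)

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.EncoderFactory

object EnumVsStringEncodingSketch {
  def main(args: Array[String]): Unit = {
    // Serialize one datum with the given writer schema and return the raw bytes.
    def encode(schema: Schema, datum: AnyRef): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val encoder = EncoderFactory.get().binaryEncoder(out, null)
      new GenericDatumWriter[AnyRef](schema).write(datum, encoder)
      encoder.flush()
      out.toByteArray
    }

    val enumSchema = new Schema.Parser().parse(
      """{"type": "enum", "name": "Suit",
        | "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}""".stripMargin)
    // An enum is written as the zig-zag-encoded index of its symbol: one byte for SPADES.
    println(encode(enumSchema, new GenericData.EnumSymbol(enumSchema, "SPADES")).length) // 1

    val stringSchema = Schema.create(Schema.Type.STRING)
    // A string is written as a length prefix plus UTF-8 bytes: seven bytes for "SPADES".
    println(encode(stringSchema, "SPADES").length) // 7
  }
}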

Member:

+1. Thanks, @gengliangwang.

   }
 }
python/pyspark/sql/avro/functions.py (15 additions, 6 deletions)
@@ -69,26 +69,35 @@ def from_avro(data, jsonFormatSchema, options={}):

 @ignore_unicode_prefix
 @since(3.0)
-def to_avro(data):
+def to_avro(data, jsonFormatSchema=""):
     """
     Converts a column into binary of avro format.

     Note: Avro is built-in but external data source module since Spark 2.4. Please deploy the
     application as per the deployment section of "Apache Avro Data Source Guide".

     :param data: the data column.
+    :param jsonFormatSchema: user-specified output avro schema in JSON string format.

     >>> from pyspark.sql import Row
     >>> from pyspark.sql.avro.functions import to_avro
-    >>> data = [(1, Row(name='Alice', age=2))]
-    >>> df = spark.createDataFrame(data, ("key", "value"))
-    >>> df.select(to_avro(df.value).alias("avro")).collect()
-    [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))]
+    >>> data = ['SPADES']
+    >>> df = spark.createDataFrame(data, "string")
+    >>> df.select(to_avro(df.value).alias("suite")).collect()
+    [Row(suite=bytearray(b'\\x00\\x0cSPADES'))]
+    >>> jsonFormatSchema = '''["null", {"type": "enum", "name": "value",
+    ...     "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}]'''
+    >>> df.select(to_avro(df.value, jsonFormatSchema).alias("suite")).collect()
+    [Row(suite=bytearray(b'\\x02\\x00'))]
     """

     sc = SparkContext._active_spark_context
     try:
-        jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+        if jsonFormatSchema == "":
+            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(_to_java_column(data))
+        else:
+            jc = sc._jvm.org.apache.spark.sql.avro.functions.to_avro(
+                _to_java_column(data), jsonFormatSchema)
     except TypeError as e:
         if str(e) == "'JavaPackage' object is not callable":
             _print_missing_jar("Avro", "avro", "avro", sc.version)