Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,27 @@ import scala.collection.mutable.ArrayBuffer
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private lazy val decimalConversions = new DecimalConversion()

private val converter: Any => Any = rootCatalystType match {
def deserialize(data: Any): Any = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
(data: Any) => InternalRow.empty
InternalRow.empty

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val writer = getRecordWriter(rootAvroType, st, Nil)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
writer(fieldUpdater, record)
resultRow
}
val record = data.asInstanceOf[GenericRecord]
writer(fieldUpdater, record)
resultRow

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
(data: Any) => {
writer(fieldUpdater, 0, data)
tmpRow.get(0, rootCatalystType)
}
writer(fieldUpdater, 0, data)
tmpRow.get(0, rootCatalystType)
}

def deserialize(data: Any): Any = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ import org.apache.spark.sql.types._
class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val converter: Any => Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
Expand All @@ -63,14 +59,13 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
converter.apply(tmpRow, 0)
}
if (nullable) {
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
} else {
baseConverter
baseConverter.apply(catalystData)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,28 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
private val timestampRebaseFunc = createTimestampRebaseFuncInRead(
datetimeRebaseMode, "Avro")

private val converter: Any => Option[Any] = rootCatalystType match {
def deserialize(data: Any): Option[Any] = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
(data: Any) => Some(InternalRow.empty)
Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, applyFilters)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil)
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val converter: Any => Any = {
def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = rootCatalystType match {
case st: StructType =>
Expand All @@ -80,14 +76,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
converter.apply(tmpRow, 0)
}
if (nullable) {
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
} else {
baseConverter
baseConverter.apply(catalystData)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,39 +72,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,

private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")

private val converter: Any => Option[Any] = try {
def deserialize(data: Any): Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
(_: Any) => Some(InternalRow.empty)
Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val converter: Any => Any = {
def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
Expand All @@ -94,14 +90,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
} else {
baseConverter
baseConverter.apply(catalystData)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,39 +71,33 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,

private val timestampRebaseFunc = createTimestampRebaseFuncInRead(datetimeRebaseSpec, "Avro")

private val converter: Any => Option[Any] = try {
def deserialize(data: Any): Option[Any] = try {
rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
(_: Any) => Some(InternalRow.empty)
Some(InternalRow.empty)

case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
val fieldUpdater = new RowUpdater(resultRow)
val applyFilters = filters.skipRow(resultRow, _)
val writer = getRecordWriter(rootAvroType, st, Nil, Nil, applyFilters)
(data: Any) => {
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)
}
val record = data.asInstanceOf[GenericRecord]
val skipRow = writer(fieldUpdater, record)
if (skipRow) None else Some(resultRow)

case _ =>
val tmpRow = new SpecificInternalRow(Seq(rootCatalystType))
val fieldUpdater = new RowUpdater(tmpRow)
val writer = newWriter(rootAvroType, rootCatalystType, Nil, Nil)
(data: Any) => {
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
writer(fieldUpdater, 0, data)
Some(tmpRow.get(0, rootCatalystType))
}
} catch {
case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
s"Cannot convert Avro type $rootAvroType to SQL type ${rootCatalystType.sql}.", ise)
}

def deserialize(data: Any): Option[Any] = converter(data)

/**
* Creates a writer to write avro values to Catalyst values at the given ordinal with the given
* updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
}

def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}

private val dateRebaseFunc = createDateRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val timestampRebaseFunc = createTimestampRebaseFuncInWrite(
datetimeRebaseMode, "Avro")

private val converter: Any => Any = {
def serialize(catalystData: Any): Any = {
val actualAvroType = resolveNullableType(rootAvroType, nullable)
val baseConverter = try {
rootCatalystType match {
Expand All @@ -93,14 +89,13 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
s"Cannot convert SQL type ${rootCatalystType.sql} to Avro type $rootAvroType.", ise)
}
if (nullable) {
(data: Any) =>
if (data == null) {
null
} else {
baseConverter.apply(data)
}
if (catalystData == null) {
null
} else {
baseConverter.apply(catalystData)
}
} else {
baseConverter
baseConverter.apply(catalystData)
}
}

Expand Down