Commit 558b395

wangzixuan.wzxuan authored and srowen committed
[SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer

### What changes were proposed in this pull request?
Add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer.

### Why are the changes needed?
- HeapBuffer.get(bytes) copies the data from POS to the end into bytes and sets POS to the end. The next call will return empty bytes.
- The second call of AvroDeserializer will return an InternalRow with an empty binary column when the Avro record has a binary column.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add a unit test in AvroCatalystDataConversionSuite.

Closes #36973 from wzx140/avro-fix.

Authored-by: wangzixuan.wzxuan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
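The position behaviour described in the commit message can be reproduced outside Spark. The following is a minimal sketch, not the AvroDeserializer code itself: `ByteBufferPositionDemo` and `readRemaining` are hypothetical names introduced for illustration, and only standard `java.nio.ByteBuffer` calls are used.

```scala
import java.nio.ByteBuffer

object ByteBufferPositionDemo {
  // Hypothetical helper: copies the remaining bytes out of the buffer,
  // optionally rewinding afterwards (mirroring the shape of the patch).
  def readRemaining(b: ByteBuffer, rewindAfter: Boolean): Array[Byte] = {
    val bytes = new Array[Byte](b.remaining)
    b.get(bytes)                // bulk get advances the position to the limit
    if (rewindAfter) b.rewind() // rewind sets the position back to 0
    bytes
  }

  def main(args: Array[String]): Unit = {
    // Without rewind, the second read sees no remaining bytes.
    val noRewind = ByteBuffer.wrap(Array[Byte](97, 48, 53))
    val first = readRemaining(noRewind, rewindAfter = false)
    val second = readRemaining(noRewind, rewindAfter = false)
    println(s"without rewind: ${first.length} bytes, then ${second.length} bytes")

    // With rewind, both reads return the full content.
    val withRewind = ByteBuffer.wrap(Array[Byte](97, 48, 53))
    val third = readRemaining(withRewind, rewindAfter = true)
    val fourth = readRemaining(withRewind, rewindAfter = true)
    println(s"with rewind: ${third.length} bytes, then ${fourth.length} bytes")
  }
}
```

Running `main` should report 3 bytes followed by 0 bytes in the first case, and 3 bytes both times in the second, which is the difference the patch relies on.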
1 parent 5ad8fbc commit 558b395

2 files changed

Lines changed: 23 additions & 0 deletions

connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 2 additions & 0 deletions
@@ -195,6 +195,8 @@ private[sql] class AvroDeserializer(
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other =>
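Note that `rewind()` resets the position to 0 rather than to wherever it was before the read. For a buffer created with `ByteBuffer.wrap(array)`, as in the unit test of this commit, the two coincide. As a hedged illustration of a different technique (not what this patch does, and not Spark code), reading through `duplicate()` leaves the caller's position untouched whatever it was; `NonMutatingReadSketch` and `copyRemaining` are hypothetical names.

```scala
import java.nio.ByteBuffer

object NonMutatingReadSketch {
  // Hypothetical helper, not part of Spark: reads through a duplicate view,
  // which shares the content but has its own independent position and limit.
  def copyRemaining(b: ByteBuffer): Array[Byte] = {
    val view = b.duplicate()
    val bytes = new Array[Byte](view.remaining)
    view.get(bytes) // advances only the duplicate's position
    bytes
  }

  def main(args: Array[String]): Unit = {
    val bb = ByteBuffer.wrap(Array[Byte](0, 97, 48, 53))
    bb.position(1) // assume the producer handed over a buffer positioned past a header byte

    val first = copyRemaining(bb)
    val second = copyRemaining(bb)

    println(first.sameElements(second)) // true: repeated reads see the same bytes
    println(bb.position())              // 1: the original position is preserved
  }
}
```

Whether this matters in practice depends on how the Avro reader positions the buffers it hands over, which this commit does not discuss.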

connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -360,4 +360,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       None,
       new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
   }
+
+  test("AvroDeserializer with binary type") {
+    val jsonFormatSchema =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "record",
+        |  "fields" : [
+        |    {"name": "a", "type": "bytes"}
+        |  ]
+        |}
+      """.stripMargin
+    val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+    val avroRecord = new GenericData.Record(avroSchema)
+    val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("a", bb)
+
+    val expected = InternalRow(Array[Byte](97, 48, 53))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+  }
 }
