Skip to content

Commit 0159701

Browse files
committed
address review comments
1 parent 762f02a commit 0159701

2 files changed

Lines changed: 9 additions & 57 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
363363
val valueRowBuffer = new Array[Byte](valueSize)
364364
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
365365
val valueRow = new UnsafeRow(valueSchema.fields.length)
366-
// If valueSize in existing file is not multiple of 8, round it down to multiple of 8
366+
// If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
367+
// This is a workaround for the following issue:
368+
// Versions before Spark 2.3 mistakenly appended 4 bytes to the value row in
369+
// `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
370+
assert(valueSize % 8 == 0)
367371
valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
368372
map.put(keyRow, valueRow)
369373
}
@@ -428,7 +432,10 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
428432
val valueRowBuffer = new Array[Byte](valueSize)
429433
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
430434
val valueRow = new UnsafeRow(valueSchema.fields.length)
431-
// If valueSize in existing file is not multiple of 8, round it down to multiple of 8
435+
// If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
436+
// This is a workaround for the following issue:
437+
// Versions before Spark 2.3 mistakenly appended 4 bytes to the value row in
438+
// `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
432439
valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
433440
map.put(keyRow, valueRow)
434441
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -479,61 +479,6 @@ class StreamSuite extends StreamTest {
479479
CheckAnswer((1, 2), (2, 2), (3, 2)))
480480
}
481481

482-
testQuietly("store to and recover from a checkpoint") {
483-
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
484-
485-
def query(data: MemoryStream[Int], checkpointDir: String, queryName: String):
486-
DataStreamWriter[Row] = {
487-
data.toDF
488-
.groupBy($"value")
489-
.agg(count("*"))
490-
.writeStream
491-
.outputMode("complete")
492-
.option("checkpointLocation", checkpointDir)
493-
.format("memory")
494-
.queryName(queryName)
495-
}
496-
497-
withSQLConf(
498-
SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
499-
var writeQuery: StreamingQuery = null
500-
try {
501-
val data = MemoryStream[Int]
502-
writeQuery = query(data, checkpointDir, "write").start()
503-
504-
data.addData(1, 2, 3, 4)
505-
writeQuery.processAllAvailable()
506-
data.addData(3, 4, 5, 6)
507-
writeQuery.processAllAvailable()
508-
data.addData(5, 6, 7, 8)
509-
writeQuery.processAllAvailable()
510-
} finally {
511-
assert(writeQuery != null)
512-
writeQuery.stop()
513-
}
514-
515-
var restartQuery: StreamingQuery = null
516-
try {
517-
val data = MemoryStream[Int]
518-
data.addData(1, 2, 3, 4)
519-
data.addData(3, 4, 5, 6)
520-
data.addData(5, 6, 7, 8)
521-
522-
restartQuery = query(data, checkpointDir, "counts").start()
523-
restartQuery.processAllAvailable()
524-
data.addData(9)
525-
restartQuery.processAllAvailable()
526-
527-
QueryTest.checkAnswer(spark.table("counts").toDF,
528-
Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
529-
Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
530-
} finally {
531-
assert(restartQuery != null)
532-
restartQuery.stop()
533-
}
534-
}
535-
}
536-
537482
testQuietly("recover from a Spark v2.1 checkpoint") {
538483
var inputData: MemoryStream[Int] = null
539484
var query: DataStreamWriter[Row] = null

0 commit comments

Comments
 (0)