
Commit 9809aad

tdas authored and Robert Kruszewski committed
[SPARK-18671][SS][TEST] Added tests to ensure stability of all Structured Streaming log formats

## What changes were proposed in this pull request?

To be able to restart StreamingQueries across Spark versions, we have already made the logs (offset log, file source log, file sink log) use JSON. We should add tests with actual JSON files checked into the Spark codebase so that any incompatible change in reading the logs is caught immediately. This PR adds tests for FileStreamSourceLog, FileStreamSinkLog, and OffsetSeqLog.

## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#16128 from tdas/SPARK-18671.
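To make the approach concrete, the shape of such a golden-file compatibility test is sketched below. This is a minimal illustration assuming ScalaTest on the classpath; the suite name, the `deserialize` helper, and the resource file name are hypothetical stand-ins, not Spark's actual test code (the real tests appear in the diff further down).

```scala
import scala.io.Source
import org.scalatest.FunSuite

// Sketch of the golden-file pattern: the resource file was serialized once by
// an older release and checked in; it is never regenerated, so any
// incompatible change to the deserializer fails this test immediately.
class LogFormatCompatibilitySketch extends FunSuite {

  // Hypothetical stand-in for a real log deserializer; Spark's streaming logs
  // store a version header followed by one JSON record per line.
  private def deserialize(serialized: String): Seq[String] =
    serialized.split("\n").toSeq

  test("read a log file written by an older version") {
    val str = Source
      .fromFile(getClass.getResource("/structured-streaming/example-log.txt").toURI)
      .mkString
    assert(deserialize(str).head === "v1") // version header, as in the files below
  }
}
```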
1 parent 2402cc6 commit 9809aad

15 changed files with 114 additions and 3 deletions


dev/.rat-excludes

Lines changed: 1 addition & 0 deletions

```diff
@@ -102,6 +102,7 @@ org.apache.spark.scheduler.ExternalClusterManager
 .Rbuildignore
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
+structured-streaming/*
 circle.yml
 .credentials
 publish.sh
```

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala

Lines changed: 8 additions & 1 deletion

```diff
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
+    partitions.foreach { tp =>
+      val off = partitionOffsets(tp)
       val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
```
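The point of this hunk is determinism: golden-file tests compare serialized output byte for byte, so `Map` iteration order must not leak into the JSON. A self-contained sketch of the same idea, with made-up topics and offsets (an illustration, not the Spark code):

```scala
import org.apache.kafka.common.TopicPartition

object DeterministicOffsetsSketch {
  // Order TopicPartitions by (topic, partition) so iteration order, and
  // therefore any serialized output, is reproducible across runs and JVMs.
  implicit val ordering: Ordering[TopicPartition] =
    Ordering.by((tp: TopicPartition) => (tp.topic, tp.partition))

  def main(args: Array[String]): Unit = {
    val offsets = Map(
      new TopicPartition("topic2", 0) -> 0L,
      new TopicPartition("topic1", 1) -> 789L,
      new TopicPartition("topic1", 0) -> 456L)
    // Without .sorted, iteration order is an implementation detail of the Map;
    // with it, the output is always topic1-0, topic1-1, topic2-0.
    offsets.keySet.toSeq.sorted.foreach(tp => println(s"$tp -> ${offsets(tp)}"))
  }
}
```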

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala

Lines changed: 12 additions & 0 deletions

```diff
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
       Array(0 -> batch0Serialized, 1 -> batch1Serialized))
    }
  }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")
+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }
```
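Given how `partitionOffsets` nests its output (topic → partition → offset, with partition numbers as JSON object keys), the checked-in `kafka-source-offset-version-2.1.0.txt` plausibly holds a single JSON line of the shape below. This is inferred from the serializer code and the expected offsets in the test above, not quoted from the file itself:

```json
{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
```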
Lines changed: 9 additions & 0 deletions

```diff
@@ -0,0 +1,9 @@
+v1
+{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
```
Lines changed: 3 additions & 0 deletions

```diff
@@ -0,0 +1,3 @@
+v1
+{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
+{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
```
Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
```
Lines changed: 4 additions & 0 deletions

```diff
@@ -0,0 +1,4 @@
+v1
+{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
+{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
+{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
```
Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
```
Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+v1
+{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
```
Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+345
```
