dev/.rat-excludes (1 change: 1 addition & 0 deletions)
@@ -102,3 +102,4 @@ org.apache.spark.scheduler.ExternalClusterManager
 .Rbuildignore
 org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 spark-warehouse
+structured-streaming/*
@@ -81,7 +81,14 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    partitionOffsets.foreach { case (tp, off) =>
+    implicit val ordering = new Ordering[TopicPartition] {
+      override def compare(x: TopicPartition, y: TopicPartition): Int = {
+        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
+      }
+    }
+    val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
+    partitions.foreach { tp =>

Member: nit: You can use partitionOffsets.toSeq.sortBy(_._1).foreach { case (tp, off) => to simplify the code.

Contributor Author: I want to sort by topic and partition together, so that the partitions are ordered when the JSON is generated (currently they are not), which makes the output hard to read.

(A sketch of this sorting alternative appears after this hunk.)

+      val off = partitionOffsets(tp)
       val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
       parts += tp.partition -> off
       result += tp.topic -> parts
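
For illustration only, here is a minimal, self-contained sketch (not part of this PR) of the alternative raised in the thread above: sorting by the (topic, partition) tuple with sortBy, so that both topics and partitions come out in a deterministic order. The object and method names are made up for the example, and the json4s serialization that the real partitionOffsets method performs is omitted.

import org.apache.kafka.common.TopicPartition

// Hypothetical sketch only: group offsets by topic, with topics and partitions in
// ascending order, using a tuple sort instead of a custom Ordering[TopicPartition].
object SortedOffsetsSketch {
  def groupSorted(partitionOffsets: Map[TopicPartition, Long]): Seq[(String, Seq[(Int, Long)])] = {
    partitionOffsets.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) } // tuple Ordering covers both fields
      .groupBy { case (tp, _) => tp.topic }                // element order within each group is preserved
      .toSeq
      .map { case (topic, entries) =>
        topic -> entries.map { case (tp, off) => tp.partition -> off }
      }
      .sortBy { case (topic, _) => topic }                 // groupBy does not order topics, so re-sort them
  }
}

// Example: Map(new TopicPartition("topic1", 1) -> 789L, new TopicPartition("topic1", 0) -> 456L)
// yields Seq("topic1" -> Seq(0 -> 456L, 1 -> 789L)).
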
@@ -89,4 +89,16 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val offset = readFromResource("kafka-source-offset-version-2.1.0.txt")

Member: nit: Maybe we don't need to read the JSON from a file, since we never write it into a single file.

Contributor Author: Yeah, but it's good to have it in a separate file in the same place as the other formats. It will be easier to track all the things that need compatibility guarantees.

+    assert(KafkaSourceOffset(offset) ===
+      KafkaSourceOffset(("topic1", 0, 456L), ("topic1", 1, 789L), ("topic2", 0, 0L)))
+  }
+
+  private def readFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str)
+  }
 }
@@ -0,0 +1,9 @@
v1
{"path":"/a/b/0","size":1,"isDir":false,"modificationTime":1,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/1","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/2","size":200,"isDir":false,"modificationTime":200,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/3","size":300,"isDir":false,"modificationTime":300,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/4","size":400,"isDir":false,"modificationTime":400,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/5","size":500,"isDir":false,"modificationTime":500,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/6","size":600,"isDir":false,"modificationTime":600,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/7","size":700,"isDir":false,"modificationTime":700,"blockReplication":1,"blockSize":100,"action":"add"}
@@ -0,0 +1,3 @@
v1
{"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"}
{"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/9","size":900,"isDir":false,"modificationTime":900,"blockReplication":3,"blockSize":200,"action":"add"}
@@ -0,0 +1,4 @@
v1
{"path":"/a/b/0","timestamp":1480730949000,"batchId":0}
{"path":"/a/b/1","timestamp":1480730950000,"batchId":1}
{"path":"/a/b/2","timestamp":1480730950000,"batchId":2}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/3","timestamp":1480730950000,"batchId":3}
@@ -0,0 +1,2 @@
v1
{"path":"/a/b/4","timestamp":1480730951000,"batchId":4}
@@ -0,0 +1 @@
345
@@ -0,0 +1 @@
{"topic1":{"0":456,"1":789},"topic2":{"0":0}}
@@ -0,0 +1,4 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}
0
{"topic-0":{"0":1}}
@@ -185,6 +185,21 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("read Spark 2.1.0 log format") {
+    assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
+      // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+    ))
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
@@ -206,4 +221,10 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       f(sinkLog)
     }
   }
+
+  private def readFromResource(dir: String): Seq[SinkFileStatus] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
 }
@@ -60,4 +60,21 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
         Array(0 -> batch0Serialized, 1 -> batch1Serialized))
     }
   }
+
+  test("read Spark 2.1.0 log format") {
+    val (batchId, offsetSeq) = readFromResource("offset-log-version-2.1.0")
+    assert(batchId === 0)
+    assert(offsetSeq.offsets === Seq(
+      Some(SerializedOffset("0")),
+      Some(SerializedOffset("""{"topic-0":{"0":1}}"""))
+    ))
+    assert(offsetSeq.metadata ===
+      Some("""{"batchWatermarkMs":0,"batchTimestampMs":1480981499528}"""))
+  }
+
+  private def readFromResource(dir: String): (Long, OffsetSeq) = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new OffsetSeqLog(spark, input.toString)
+    log.getLatest().get
+  }
 }
@@ -19,14 +19,13 @@ package org.apache.spark.sql.streaming

 import java.io.File
 
-import scala.collection.mutable
-
 import org.scalatest.PrivateMethodTester
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -1022,6 +1021,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
     assert(options.maxFilesPerTrigger == Some(1))
   }
+
+  test("FileStreamSource offset - read Spark 2.1.0 log format") {
+    val offset = readOffsetFromResource("file-source-offset-version-2.1.0.txt")

Member: nit: Maybe we don't need to read the JSON from a file, since we never write it into a single file.

Contributor Author: Same comment as above.

+    assert(LongOffset.convert(offset) === Some(LongOffset(345)))
+  }
+
+  test("FileStreamSourceLog - read Spark 2.1.0 log format") {
+    assert(readLogFromResource("file-source-log-version-2.1.0") === Seq(
+      FileEntry("/a/b/0", 1480730949000L, 0L),
+      FileEntry("/a/b/1", 1480730950000L, 1L),
+      FileEntry("/a/b/2", 1480730950000L, 2L),
+      FileEntry("/a/b/3", 1480730950000L, 3L),
+      FileEntry("/a/b/4", 1480730951000L, 4L)
+    ))
+  }
+
+  private def readLogFromResource(dir: String): Seq[FileEntry] = {
+    val input = getClass.getResource(s"/structured-streaming/$dir")
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    log.allFiles()
+  }
+
+  private def readOffsetFromResource(file: String): SerializedOffset = {
+    import scala.io.Source
+    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
+    SerializedOffset(str.trim)
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {