From d31cb76756f7aa2c9c3c803d263ae81f5f509ff2 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 18 Feb 2017 09:20:18 +0800 Subject: [PATCH 1/3] File Source reads from File Sink --- .../execution/datasources/DataSource.scala | 26 +---- .../execution/streaming/FileStreamSink.scala | 27 ++++- .../streaming/FileStreamSource.scala | 20 +++- .../sql/streaming/FileStreamSourceSuite.scala | 108 ++++++++++++++++++ .../spark/sql/streaming/StreamTest.scala | 13 ++- 5 files changed, 167 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d510581f90e6..c1353d41e07b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -279,28 +279,6 @@ case class DataSource( } } - /** - * Returns true if there is a single path that has a metadata log indicating which files should - * be read. - */ - def hasMetadata(path: Seq[String]): Boolean = { - path match { - case Seq(singlePath) => - try { - val hdfsPath = new Path(singlePath) - val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) - val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir) - val res = fs.exists(metadataPath) - res - } catch { - case NonFatal(e) => - logWarning(s"Error while looking for metadata directory.") - false - } - case _ => false - } - } - /** * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this * [[DataSource]] @@ -331,7 +309,9 @@ case class DataSource( // We are reading from the results of a streaming query. Load files from the metadata log // instead of listing them using HDFS APIs. case (format: FileFormat, _) - if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => + if FileStreamSink.hasMetadata( + caseInsensitiveOptions.get("path").toSeq ++ paths, + sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) val dataSchema = userSpecifiedSchema.orElse { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 0dbe2a71ed3b..07ec4e9429e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.streaming +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -25,9 +28,31 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter} -object FileStreamSink { +object FileStreamSink extends Logging { // The name of the subdirectory that is used to store metadata about which files are valid. val metadataDir = "_spark_metadata" + + /** + * Returns true if there is a single path that has a metadata log indicating which files should + * be read. 
+ */ + def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = { + path match { + case Seq(singlePath) => + try { + val hdfsPath = new Path(singlePath) + val fs = hdfsPath.getFileSystem(hadoopConf) + val metadataPath = new Path(hdfsPath, metadataDir) + val res = fs.exists(metadataPath) + res + } catch { + case NonFatal(e) => + logWarning(s"Error while looking for metadata directory.") + false + } + case _ => false + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 39c0b4979687..3312409be4be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -157,13 +157,29 @@ class FileStreamSource( checkFilesExist = false))) } + /** + * If the source has a metadata log indicating which files should be read, then we should use it. + * We figure out whether there exists some metadata log only when user gives a non-glob path. + */ + private val sourceHasMetadata: Boolean = + !SparkHadoopUtil.get.isGlobPath(new Path(path)) && + FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf()) + /** * Returns a list of files found, sorted by their timestamp. */ private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime - val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) - val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) + val catalog = + if (sourceHasMetadata) { + // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a + // non-glob path + new MetadataLogFileIndex(sparkSession, qualifiedBasePath) + } else { + // `qualifiedBasePath` can contains glob patterns + val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) + new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) + } val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, status.getModificationTime) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 5110d89c85b1..9d1e67c3013c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -662,6 +662,114 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("read data from outputs of another streaming query") { + withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { + withTempDirs { case (dir, tmp) => + // q1 is a streaming query that reads from memory and writes to text files + val q1_source = MemoryStream[String] + val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath + val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath + val q1 = + q1_source + .toDF() + .writeStream + .option("checkpointLocation", q1_checkpointDir) + .format("text") + .start(q1_outputDir) + + // q2 is a streaming query that reads q1's text outputs + val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep") + + def q1AddData(data: String*): StreamAction = + Execute { _ => + q1_source.addData(data) + 
q1.processAllAvailable() + } + def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() } + + testStream(q2)( + // batch 0 + q1AddData("drop1", "keep2"), + q2ProcessAllAvailable(), + CheckAnswer("keep2"), + + // batch 1 + Assert { + // create a text file that won't be on q1's sink log + // thus even if its contents contains "keep", it should NOT appear in q2's answer + val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt") + stringToFile(shouldNotKeep, "should_not_keep!!!") + shouldNotKeep.exists() + }, + q1AddData("keep3"), + q2ProcessAllAvailable(), + CheckAnswer("keep2", "keep3"), + + // batch 2: check that things work well when the sink log gets compacted + q1AddData("keep4"), + Assert { + // compact interval is 3, so file "2.compact" should exist + new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists() + }, + q2ProcessAllAvailable(), + CheckAnswer("keep2", "keep3", "keep4"), + + // stop q1 manually + Execute { _ => q1.stop() } + ) + } + } + } + + test("read partitioned data from outputs of another streaming query") { + withTempDirs { case (dir, tmp) => + // q1 is a streaming query that reads from memory and writes to partitioned json files + val q1_source = MemoryStream[(String, String)] + val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath + val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath + val q1 = + q1_source + .toDF() + .select($"_1" as "partition", $"_2" as "value") + .writeStream + .option("checkpointLocation", q1_checkpointDir) + .partitionBy("partition") + .format("json") + .start(q1_outputDir) + + // q2 is a streaming query that reads q1's partitioned json outputs + val schema = new StructType().add("value", StringType).add("partition", StringType) + val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep") + + def q1AddData(data: (String, String)*): StreamAction = + Execute { _ => + q1_source.addData(data) + q1.processAllAvailable() + } + def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() } + + testStream(q2)( + // batch 0: append to a new partition=foo, should read value and partition + q1AddData(("foo", "drop1"), ("foo", "keep2")), + q2ProcessAllAvailable(), + CheckAnswer(("keep2", "foo")), + + // batch 1: append to same partition=foo, should read value and partition + q1AddData(("foo", "keep3")), + q2ProcessAllAvailable(), + CheckAnswer(("keep2", "foo"), ("keep3", "foo")), + + // batch 2: append to a different partition=bar, should read value and partition + q1AddData(("bar", "keep4")), + q2ProcessAllAvailable(), + CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), + + // stop q1 manually + Execute { _ => q1.stop() } + ) + } + } + test("when schema inference is turned on, should read partition data") { def createFile(content: String, src: File, tmp: File): Unit = { val tempFile = Utils.tempFileWith(new File(tmp, "text")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index af2f31a34d8d..e32d3ea89ab0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } + /** Execute arbitrary code */ + case class Execute(val func: StreamExecution => Any) extends StreamAction { + override def 
toString: String = s"Execute()" + } + class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { private var waitStartTime: Option[Long] = None @@ -472,10 +477,16 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case a: AssertOnQuery => verify(currentStream != null || lastStream != null, - "cannot assert when not stream has been started") + "cannot assert when no stream has been started") val streamToAssert = Option(currentStream).getOrElse(lastStream) verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}") + case exe: Execute => + verify(currentStream != null || lastStream != null, + "cannot execute when no stream has been started") + val streamToExecute = Option(currentStream).getOrElse(lastStream) + exe.func(streamToExecute) + case a: Assert => val streamToAssert = Option(currentStream).getOrElse(lastStream) verify({ a.run(); true }, s"Assert failed: ${a.message}") From eed1c049c7cc955085c701f43f0d461e86aba328 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 28 Feb 2017 14:28:09 +0800 Subject: [PATCH 2/3] Deal with corner cases --- .../streaming/FileStreamSource.scala | 68 +++++++++++++++---- .../sql/streaming/FileStreamSourceSuite.scala | 62 ++++++++++++++--- 2 files changed, 104 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 3312409be4be..df45352bde3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming import scala.collection.JavaConverters._ -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -43,8 +43,10 @@ class FileStreamSource( private val sourceOptions = new FileStreamOptions(options) + private val hadoopConf = sparkSession.sessionState.newHadoopConf() + private val qualifiedBasePath: Path = { - val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) + val fs = new Path(path).getFileSystem(hadoopConf) fs.makeQualified(new Path(path)) // can contains glob patterns } @@ -159,28 +161,64 @@ class FileStreamSource( /** * If the source has a metadata log indicating which files should be read, then we should use it. - * We figure out whether there exists some metadata log only when user gives a non-glob path. 
+ * Only when user gives a non-glob path that will we figure out whether the source has some + * metadata log + * + * None means we don't know at the moment + * Some(true) means we know for sure the source DOES have metadata + * Some(false) means we know for sure the source DOSE NOT have metadata */ - private val sourceHasMetadata: Boolean = - !SparkHadoopUtil.get.isGlobPath(new Path(path)) && - FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf()) + @volatile private[sql] var sourceHasMetadata: Option[Boolean] = + if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None + + private def allFilesUsingInMemoryFileIndex() = { + val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) + val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) + fileIndex.allFiles() + } + + private def allFilesUsingMetadataLogFileIndex() = { + // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a + // non-glob path + new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles() + } /** * Returns a list of files found, sorted by their timestamp. */ private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime - val catalog = - if (sourceHasMetadata) { - // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a - // non-glob path - new MetadataLogFileIndex(sparkSession, qualifiedBasePath) + + var allFiles: Seq[FileStatus] = null + if (sourceHasMetadata.isEmpty) { + if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + sourceHasMetadata = Some(true) + allFiles = allFilesUsingMetadataLogFileIndex() } else { - // `qualifiedBasePath` can contains glob patterns - val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) - new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) + allFiles = allFilesUsingInMemoryFileIndex() + if (allFiles.isEmpty) { + // we still cannot decide + } else { + // decide what to use for future rounds + // double check whether source has metadata, preventing the extreme corner case that + // metadata log and data files are only generated after the previous + // `FileStreamSink.hasMetadata` check + if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + sourceHasMetadata = Some(true) + allFiles = allFilesUsingMetadataLogFileIndex() + } else { + sourceHasMetadata = Some(false) + // `allFiles` have already been fetched using InMemoryFileIndex in this round + } + } } - val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status => + } else if (sourceHasMetadata == Some(true)) { + allFiles = allFilesUsingMetadataLogFileIndex() + } else { + allFiles = allFilesUsingInMemoryFileIndex() + } + + val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 9d1e67c3013c..ba36f402b1e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -52,10 +52,7 @@ abstract class FileStreamSourceTest query.nonEmpty, "Cannot add data when there is no query for finding the active file stream source") - val sources = 
query.get.logicalPlan.collect { - case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] => - source.asInstanceOf[FileStreamSource] - } + val sources = getSourcesFromStreamingQuery(query.get) if (sources.isEmpty) { throw new Exception( "Could not find file source in the StreamExecution logical plan to add data to") @@ -134,6 +131,14 @@ abstract class FileStreamSourceTest }.head } + protected def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] = { + query.logicalPlan.collect { + case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] => + source.asInstanceOf[FileStreamSource] + } + } + + protected def withTempDirs(body: (File, File) => Unit) { val src = Utils.createTempDir(namePrefix = "streaming.src") val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") @@ -388,9 +393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { CheckAnswer("a", "b", "c", "d"), AssertOnQuery("seen files should contain only one entry") { streamExecution => - val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation => - e.source.asInstanceOf[FileStreamSource] - }.head + val source = getSourcesFromStreamingQuery(streamExecution).head assert(source.seenFiles.size == 1) true } @@ -770,6 +773,46 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("start before another streaming query, and read its output") { + withTempDirs { case (dir, tmp) => + // q1 is a streaming query that reads from memory and writes to text files + val q1_source = MemoryStream[String] + val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath + val q1_outputDir = new File(dir, "q1_outputDir") + assert(q1_outputDir.mkdir()) // prepare the output dir for q2 to read + val q1_write = q1_source + .toDF() + .writeStream + .option("checkpointLocation", q1_checkpointDir) + .format("text") // define q1, but don't start it for now + var q1: StreamingQuery = null + + val q2 = + createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep") + + testStream(q2)( + AssertOnQuery { q2 => + val fileSource = getSourcesFromStreamingQuery(q2).head + fileSource.sourceHasMetadata === None // q1 has not started yet, verify that q2 + // doesn't know whether q1 has metadata + }, + Execute { _ => + q1 = q1_write.start(q1_outputDir.getCanonicalPath) // start q1 !!! 
+ q1_source.addData("drop1", "keep2") + q1.processAllAvailable() + }, + AssertOnQuery { q2 => + q2.processAllAvailable() + val fileSource = getSourcesFromStreamingQuery(q2).head + fileSource.sourceHasMetadata === Some(true) // q1 has started, verify that q2 knows q1 has + // metadata by now + }, + CheckAnswer("keep2"), // answer should be correct + Execute { _ => q1.stop() } // stop q1 manually + ) + } + } + test("when schema inference is turned on, should read partition data") { def createFile(content: String, src: File, tmp: File): Unit = { val tempFile = Utils.tempFileWith(new File(tmp, "text")) @@ -863,10 +906,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .streamingQuery q.processAllAvailable() val memorySink = q.sink.asInstanceOf[MemorySink] - val fileSource = q.logicalPlan.collect { - case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] => - source.asInstanceOf[FileStreamSource] - }.head + val fileSource = getSourcesFromStreamingQuery(q).head /** Check the data read in the last batch */ def checkLastBatchData(data: Int*): Unit = { From 62fd5189d9ba27ed3ed20cc103252aa9fdac052a Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Wed, 1 Mar 2017 10:50:21 +0800 Subject: [PATCH 3/3] Fix styles --- .../streaming/FileStreamSource.scala | 43 ++++--- .../sql/streaming/FileStreamSourceSuite.scala | 111 +++++------------- .../spark/sql/streaming/StreamTest.scala | 11 +- 3 files changed, 53 insertions(+), 112 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index df45352bde3e..6a7263ca45d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -190,32 +190,31 @@ class FileStreamSource( val startTime = System.nanoTime var allFiles: Seq[FileStatus] = null - if (sourceHasMetadata.isEmpty) { - if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { - sourceHasMetadata = Some(true) - allFiles = allFilesUsingMetadataLogFileIndex() - } else { - allFiles = allFilesUsingInMemoryFileIndex() - if (allFiles.isEmpty) { - // we still cannot decide + sourceHasMetadata match { + case None => + if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + sourceHasMetadata = Some(true) + allFiles = allFilesUsingMetadataLogFileIndex() } else { - // decide what to use for future rounds - // double check whether source has metadata, preventing the extreme corner case that - // metadata log and data files are only generated after the previous - // `FileStreamSink.hasMetadata` check - if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { - sourceHasMetadata = Some(true) - allFiles = allFilesUsingMetadataLogFileIndex() + allFiles = allFilesUsingInMemoryFileIndex() + if (allFiles.isEmpty) { + // we still cannot decide } else { - sourceHasMetadata = Some(false) - // `allFiles` have already been fetched using InMemoryFileIndex in this round + // decide what to use for future rounds + // double check whether source has metadata, preventing the extreme corner case that + // metadata log and data files are only generated after the previous + // `FileStreamSink.hasMetadata` check + if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + sourceHasMetadata = Some(true) + allFiles = allFilesUsingMetadataLogFileIndex() + } else { + sourceHasMetadata = Some(false) + // `allFiles` have 
already been fetched using InMemoryFileIndex in this round + } } } - } - } else if (sourceHasMetadata == Some(true)) { - allFiles = allFilesUsingMetadataLogFileIndex() - } else { - allFiles = allFilesUsingInMemoryFileIndex() + case Some(true) => allFiles = allFilesUsingMetadataLogFileIndex() + case Some(false) => allFiles = allFilesUsingInMemoryFileIndex() } val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index ba36f402b1e4..1586850c77fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -667,25 +667,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest { test("read data from outputs of another streaming query") { withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { - withTempDirs { case (dir, tmp) => + withTempDirs { case (outputDir, checkpointDir) => // q1 is a streaming query that reads from memory and writes to text files - val q1_source = MemoryStream[String] - val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath - val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath + val q1Source = MemoryStream[String] val q1 = - q1_source + q1Source .toDF() .writeStream - .option("checkpointLocation", q1_checkpointDir) + .option("checkpointLocation", checkpointDir.getCanonicalPath) .format("text") - .start(q1_outputDir) + .start(outputDir.getCanonicalPath) // q2 is a streaming query that reads q1's text outputs - val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep") + val q2 = + createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep") def q1AddData(data: String*): StreamAction = Execute { _ => - q1_source.addData(data) + q1Source.addData(data) q1.processAllAvailable() } def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() } @@ -699,8 +698,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // batch 1 Assert { // create a text file that won't be on q1's sink log - // thus even if its contents contains "keep", it should NOT appear in q2's answer - val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt") + // thus even if its content contains "keep", it should NOT appear in q2's answer + val shouldNotKeep = new File(outputDir, "should_not_keep.txt") stringToFile(shouldNotKeep, "should_not_keep!!!") shouldNotKeep.exists() }, @@ -712,103 +711,51 @@ class FileStreamSourceSuite extends FileStreamSourceTest { q1AddData("keep4"), Assert { // compact interval is 3, so file "2.compact" should exist - new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists() + new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists() }, q2ProcessAllAvailable(), CheckAnswer("keep2", "keep3", "keep4"), - // stop q1 manually Execute { _ => q1.stop() } ) } } } - test("read partitioned data from outputs of another streaming query") { - withTempDirs { case (dir, tmp) => - // q1 is a streaming query that reads from memory and writes to partitioned json files - val q1_source = MemoryStream[(String, String)] - val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath - val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath - val q1 = - q1_source - .toDF() - .select($"_1" 
as "partition", $"_2" as "value") - .writeStream - .option("checkpointLocation", q1_checkpointDir) - .partitionBy("partition") - .format("json") - .start(q1_outputDir) - - // q2 is a streaming query that reads q1's partitioned json outputs - val schema = new StructType().add("value", StringType).add("partition", StringType) - val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep") - - def q1AddData(data: (String, String)*): StreamAction = - Execute { _ => - q1_source.addData(data) - q1.processAllAvailable() - } - def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() } - - testStream(q2)( - // batch 0: append to a new partition=foo, should read value and partition - q1AddData(("foo", "drop1"), ("foo", "keep2")), - q2ProcessAllAvailable(), - CheckAnswer(("keep2", "foo")), - - // batch 1: append to same partition=foo, should read value and partition - q1AddData(("foo", "keep3")), - q2ProcessAllAvailable(), - CheckAnswer(("keep2", "foo"), ("keep3", "foo")), - - // batch 2: append to a different partition=bar, should read value and partition - q1AddData(("bar", "keep4")), - q2ProcessAllAvailable(), - CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")), - - // stop q1 manually - Execute { _ => q1.stop() } - ) - } - } - test("start before another streaming query, and read its output") { - withTempDirs { case (dir, tmp) => + withTempDirs { case (outputDir, checkpointDir) => // q1 is a streaming query that reads from memory and writes to text files - val q1_source = MemoryStream[String] - val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath - val q1_outputDir = new File(dir, "q1_outputDir") - assert(q1_outputDir.mkdir()) // prepare the output dir for q2 to read - val q1_write = q1_source - .toDF() - .writeStream - .option("checkpointLocation", q1_checkpointDir) - .format("text") // define q1, but don't start it for now + val q1Source = MemoryStream[String] + // define q1, but don't start it for now + val q1Write = + q1Source + .toDF() + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("text") var q1: StreamingQuery = null - val q2 = - createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep") + val q2 = createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep") testStream(q2)( AssertOnQuery { q2 => val fileSource = getSourcesFromStreamingQuery(q2).head - fileSource.sourceHasMetadata === None // q1 has not started yet, verify that q2 - // doesn't know whether q1 has metadata + // q1 has not started yet, verify that q2 doesn't know whether q1 has metadata + fileSource.sourceHasMetadata === None }, Execute { _ => - q1 = q1_write.start(q1_outputDir.getCanonicalPath) // start q1 !!! 
- q1_source.addData("drop1", "keep2") + q1 = q1Write.start(outputDir.getCanonicalPath) + q1Source.addData("drop1", "keep2") q1.processAllAvailable() }, AssertOnQuery { q2 => q2.processAllAvailable() val fileSource = getSourcesFromStreamingQuery(q2).head - fileSource.sourceHasMetadata === Some(true) // q1 has started, verify that q2 knows q1 has - // metadata by now + // q1 has started, verify that q2 knows q1 has metadata by now + fileSource.sourceHasMetadata === Some(true) }, - CheckAnswer("keep2"), // answer should be correct - Execute { _ => q1.stop() } // stop q1 manually + CheckAnswer("keep2"), + Execute { _ => q1.stop() } ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index e32d3ea89ab0..60e2375a9817 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -209,8 +209,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } /** Execute arbitrary code */ - case class Execute(val func: StreamExecution => Any) extends StreamAction { - override def toString: String = s"Execute()" + object Execute { + def apply(func: StreamExecution => Any): AssertOnQuery = + AssertOnQuery(query => { func(query); true }) } class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { @@ -481,12 +482,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val streamToAssert = Option(currentStream).getOrElse(lastStream) verify(a.condition(streamToAssert), s"Assert on query failed: ${a.message}") - case exe: Execute => - verify(currentStream != null || lastStream != null, - "cannot execute when no stream has been started") - val streamToExecute = Option(currentStream).getOrElse(lastStream) - exe.func(streamToExecute) - case a: Assert => val streamToAssert = Option(currentStream).getOrElse(lastStream) verify({ a.run(); true }, s"Assert failed: ${a.message}")