From 6c9acdefe1c791bad3f00d845a72d07e2113b214 Mon Sep 17 00:00:00 2001 From: frreiss Date: Mon, 8 Aug 2016 19:28:32 -0700 Subject: [PATCH 01/17] Initial version of changes to Source trait --- .../sql/execution/streaming/Source.scala | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 971147840d2fd..36c3d00a7bb5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -25,21 +25,38 @@ import org.apache.spark.sql.types.StructType * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark * will regularly query each [[Source]] to see if any more data is available. */ -trait Source { +trait Source { /** Returns the schema of the data from this source */ def schema: StructType + /** + * Returns the highest offset that this source has removed from its internal buffer + * in response to a call to `doneWithBatch`. + * Returns `None` if this source has not removed any data. */ + def getMinOffset: Option[Offset] + /** Returns the maximum available offset for this source. */ - def getOffset: Option[Offset] + def getMaxOffset: Option[Offset] /** - * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then - * the batch should begin with the first available record. This method must always return the - * same data for a particular `start` and `end` pair. + * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`, + * then the batch should begin with the first record. This method must always return the + * same data for a particular `start` and `end` pair; even after the Source has been restarted + * on a different node. + *

+ * Higher layers will always call this method with a value of `start` greater than or equal + * to the last value passed to `commit` and a value of `end` less than or equal to the + * last value returned by `getMaxOffset` */ def getBatch(start: Option[Offset], end: Offset): DataFrame + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + def commit(end: Offset) + /** Stop this source and free any resources it has allocated. */ def stop(): Unit } From dae72ff923edbbeb0f0ac2fd251a25d8372ad5fa Mon Sep 17 00:00:00 2001 From: frreiss Date: Thu, 11 Aug 2016 08:42:04 -0700 Subject: [PATCH 02/17] Changes to files that depend on the Source trait --- .../streaming/FileStreamSource.scala | 205 +++++++++++++++--- .../sql/execution/streaming/Source.scala | 10 +- .../execution/streaming/StreamExecution.scala | 33 ++- .../sql/execution/streaming/memory.scala | 51 ++++- .../sql/execution/streaming/socket.scala | 25 +-- .../spark/sql/streaming/StreamSuite.scala | 6 +- .../test/DataStreamReaderWriterSuite.scala | 6 +- 7 files changed, 276 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 0cfad659dc92c..c9087ea09a5f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.Try import org.apache.hadoop.fs.Path @@ -31,7 +33,17 @@ import org.apache.spark.util.collection.OpenHashSet /** * A very simple source that reads text files from the given directory as they appear. * - * TODO Clean up the metadata files periodically + * Special options that this source can take (via `options`): + *

*/ class FileStreamSource( sparkSession: SparkSession, @@ -44,42 +56,44 @@ class FileStreamSource( private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf()) private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath) - private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + + /** + * ID of the last batch committed and cleaned up, or -1 if no files have been removed. + */ + private var minBatchId = -1L + + /** + * ID of the most recent batch that has been entered into the metadata log, or -1 if + * no files have been processed at all. + */ + private var maxBatchId = -1L /** Maximum number of new files to be considered in each batch */ private val maxFilesPerBatch = getMaxFilesPerBatch() + /** + * Should files whose data has been committed be removed from the directory, along with + * their metadata entries? + */ + private val deleteCommittedFiles = getDeleteCommittedFiles() + private val seenFiles = new OpenHashSet[String] - metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => - files.foreach(seenFiles.add) - } /** - * Returns the maximum offset that can be retrieved from the source. - * - * `synchronized` on this method is for solving race conditions in tests. In the normal usage, - * there is no race here, so the cost of `synchronized` should be rare. + * Initialize the state of this source from the on-disk checkpoint, if present. */ - private def fetchMaxOffset(): LongOffset = synchronized { - val newFiles = fetchAllFiles().filter(!seenFiles.contains(_)) - val batchFiles = - if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles - batchFiles.foreach { file => - seenFiles.add(file) - logDebug(s"New file: $file") - } - logTrace(s"Number of new files = ${newFiles.size})") - logTrace(s"Number of files selected for batch = ${batchFiles.size}") - logTrace(s"Number of seen files = ${seenFiles.size}") - if (batchFiles.nonEmpty) { - maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles) - logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") - } + private def initialize() = { + maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) - new LongOffset(maxBatchId) + metadataLog.get(None, Some(maxBatchId)).foreach { + case (batchId, files) => + files.foreach(seenFiles.add) + minBatchId = math.max(minBatchId, batchId) + } } + initialize() + /** * For test only. Run `func` with the internal lock to make sure when `func` is running, * the current offset won't be changed and no new batch will be emitted. @@ -93,9 +107,26 @@ class FileStreamSource( new LongOffset(maxBatchId) } + override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + + ////////////////////////////////// + // BEGIN methods from Source trait + /** - * Returns the data that is between the offsets (`start`, `end`]. + * Returns the highest offset that this source has removed from its internal buffer + * in response to a call to `commit`. + * Returns `None` if this source has not removed any data. 
*/ + override def getMinOffset: Option[Offset] = { + if (-1L == minBatchId) { + None + } else { + Some(new LongOffset(minBatchId)) + } + } + + override def getMaxOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) val endId = end.asInstanceOf[LongOffset].offset @@ -115,6 +146,113 @@ class FileStreamSource( Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation())) } + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + override def commit(end: Offset): Unit = { + if (end.isInstanceOf[LongOffset]) { + val lastCommittedBatch = end.asInstanceOf[LongOffset].offset + + // Build up a list of batches, then process them one by one + val firstCommittedBatch = math.max(minBatchId, 0) + + if (deleteCommittedFiles) { + var batchId = 0L; + for (batchId <- firstCommittedBatch to lastCommittedBatch) { + val files = metadataLog.get(batchId) + if (files.isDefined) { + + // Files may actually be directories, so use the recursive version + // of the delete method. + // TODO: This delete() should be wrapped in more error handling. + // Examples of problems to catch: Spark does not have permission to delete + // the file; or the filesystem metadata is corrupt. + files.get.foreach(f => fs.delete(new Path(f), true)) + + // TODO: Add a "remove" method to HDFSMetadataLog, then add code here to + // remove the metadata for each completed batch. It's important that we + // remove the metadata only after the files are deleted; otherwise we + // may not be able to tell what files to delete after a crash. + } + + } + } + + minBatchId = lastCommittedBatch + + val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt + + if (offsetDiff < 0) { + sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end") + } + + batches.trimStart(offsetDiff) + lastCommittedOffset = newOffset + } else { + sys.error(s"FileStreamSource.commit() received an offset ($end) that did not " + + s"originate with an instance of this class") + } + + + } + + override def stop() {} + + // END methods from Source trait + //////////////////////////////// + + // All methods that follow are internal to this class. + + /** + * Scans the directory and creates batches from any new files found. + * + * Returns the maximum offset that can be retrieved from the source. + * + * `synchronized` on this method is for solving race conditions in tests. In the normal usage, + * there is no race here, so the cost of `synchronized` should be rare. + */ + private def fetchMaxOffset(): LongOffset = synchronized { + val newFiles = fetchAllFiles().filter(!seenFiles.contains(_)) + logTrace(s"Number of new files = ${newFiles.size})") + + // To make the source's behavior less nondeterministic, we process files in + // alphabetical order and ensure that every new file goes into a batch. 
+ val remainingFiles = mutable.Queue[String]() + remainingFiles ++= newFiles.sorted + + val batches = ListBuffer[Seq[String]]() + if (maxFilesPerBatch.nonEmpty) { + while (remainingFiles.size > 0) { + batches += remainingFiles.take(maxFilesPerBatch.get) + } + } else { + batches += remainingFiles + } + + newFiles.foreach { file => + seenFiles.add(file) + logDebug(s"New file: $file") + } + + batches.foreach { + case batchFiles => + maxBatchId += 1 + metadataLog.add(maxBatchId, batchFiles) + logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") + } + + val batchFiles = + if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles + batchFiles.foreach { file => + seenFiles.add(file) + logDebug(s"New file: $file") + } + + new LongOffset(maxBatchId) + } + + private def fetchAllFiles(): Seq[String] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) @@ -132,6 +270,16 @@ class FileStreamSource( files } + private def getDeleteCommittedFiles(): Boolean = { + val str = options.getOrElse("deleteCommittedFiles", "false") + try { + str.toBoolean + } catch { + case _ => throw new IllegalArgumentException( + s"Invalid value '$str' for option 'deleteCommittedFiles', must be true or false") + } + } + private def getMaxFilesPerBatch(): Option[Int] = { new CaseInsensitiveMap(options) .get("maxFilesPerTrigger") @@ -143,9 +291,6 @@ class FileStreamSource( } } - override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) - override def toString: String = s"FileStreamSource[$qualifiedBasePath]" - override def stop() {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 36c3d00a7bb5c..e2cc16c8334d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -32,11 +32,15 @@ trait Source { /** * Returns the highest offset that this source has removed from its internal buffer - * in response to a call to `doneWithBatch`. - * Returns `None` if this source has not removed any data. */ + * in response to a call to `commit`. + * Returns `None` if this source has not removed any data. + */ def getMinOffset: Option[Offset] - /** Returns the maximum available offset for this source. */ + /** + * Returns the maximum available offset for this source. + * Returns `None` if this source has never received any data. 
+ */ def getMaxOffset: Option[Offset] /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index af2229a46bebb..fcb894ed16605 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -21,7 +21,9 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Map import scala.util.control.NonFatal import org.apache.hadoop.fs.Path @@ -72,6 +74,8 @@ class StreamExecution( /** * Tracks how much data we have processed and committed to the sink or state store from each * input source. + * Only the scheduler thread should modify this field, and only in atomic steps. Other threads + * must create a local copy before iterating over this data structure. */ @volatile private[sql] var committedOffsets = new StreamProgress @@ -79,6 +83,8 @@ class StreamExecution( /** * Tracks the offsets that are available to be processed, but have not yet be committed to the * sink. + * Only the scheduler thread should modify this field, and only in atomic steps. Other threads + * must create a local copy before iterating over this data structure. */ @volatile private var availableOffsets = new StreamProgress @@ -248,6 +254,21 @@ class StreamExecution( logDebug(s"Resuming with committed offsets: $committedOffsets") } + // Compare the offsets we just read from the checkpoint against the + // sources' own checkpoint data. + val offsetChanges = mutable.Map[Source, Offset]() + committedOffsets.map { + case (src, checkptOffset) => + val srcOffset = src.getMinOffset + if (srcOffset.isDefined && srcOffset.get > checkptOffset) { + logWarning(s"Source $src lost offsets between $checkptOffset " + + s"and $srcOffset when resuming. Skipping ahead to $srcOffset.") + offsetChanges += (src -> srcOffset.get) + } + } + committedOffsets ++= offsetChanges + + case None => // We are starting this stream for the first time. logInfo(s"Starting new streaming query.") currentBatchId = 0 @@ -277,7 +298,7 @@ class StreamExecution( val hasNewData = { awaitBatchLock.lock() try { - val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + val newData = uniqueSources.flatMap(s => s.getMaxOffset.map(o => s -> o)) availableOffsets ++= newData if (dataAvailable) { @@ -294,6 +315,12 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") + + // Now that we've updated the scheduler's persistent checkpoint, it is safe for the + // sources to discard batches from the *previous* batch. + committedOffsets.foreach { + case (src, off) => src.commit(off) + } } else { awaitBatchLock.lock() try { @@ -374,6 +401,8 @@ class StreamExecution( logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") // Update committed offsets. committedOffsets ++= availableOffsets + + postEvent(new QueryProgress(this.toInfo)) } @@ -399,7 +428,7 @@ class StreamExecution( /** * Blocks the current thread until processing for data from the given `source` has reached at - * least the given `Offset`. 
This method is indented for use primarily when writing tests. + * least the given `Offset`. This method is intended for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { def notDone = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index e37f0c77795c3..856cbb53e1929 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.control.NonFatal import org.apache.spark.internal.Logging @@ -51,12 +51,19 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) protected val logicalPlan = StreamingExecutionRelation(this) protected val output = logicalPlan.output + /** + * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. + * Stored in a ListBuffer to facilitate removing committed batches. + */ @GuardedBy("this") - protected val batches = new ArrayBuffer[Dataset[A]] + protected val batches = new ListBuffer[Dataset[A]] @GuardedBy("this") protected var currentOffset: LongOffset = new LongOffset(-1) + @GuardedBy("this") + protected var lastCommittedOffset : LongOffset = new LongOffset(-1) + def schema: StructType = encoder.schema def toDS()(implicit sqlContext: SQLContext): Dataset[A] = { @@ -84,22 +91,33 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" - override def getOffset: Option[Offset] = synchronized { - if (batches.isEmpty) { + override def getMinOffset: Option[Offset] = synchronized { + if (lastCommittedOffset.offset == -1) { + None + } else { + Some(lastCommittedOffset) + } + } + + override def getMaxOffset: Option[Offset] = synchronized { + if (currentOffset.offset == -1) { None } else { Some(currentOffset) } } - /** - * Returns the data that is between the offsets (`start`, `end`]. 
- */ override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startOrdinal = start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 - val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) } + + // Internal buffer only holds the batches after lastCommittedOffset + val newBlocks = synchronized { + val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt + val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt + batches.slice(sliceStart, sliceEnd) + } logDebug( s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}") @@ -111,6 +129,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } + override def commit(end: Offset): Unit = synchronized { + if (end.isInstanceOf[LongOffset]) { + val newOffset = end.asInstanceOf[LongOffset] + val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt + + if (offsetDiff < 0) { + sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end") + } + + batches.trimStart(offsetDiff) + lastCommittedOffset = newOffset + } else { + sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " + + s"an instance of this class") + } + } + override def stop() {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index fb15239f9af98..ef1f0d676db0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -23,6 +23,7 @@ import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Calendar import javax.annotation.concurrent.GuardedBy +import javax.xml.transform.stream.StreamSource import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} @@ -45,7 +46,7 @@ object TextSocketSource { * support for fault recovery and keeping all of the text read in memory forever. */ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends Source with Logging + extends MemoryStream[(String, Timestamp)](-1, sqlContext) with Logging { @GuardedBy("this") private var socket: Socket = null @@ -53,9 +54,6 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo @GuardedBy("this") private var readThread: Thread = null - @GuardedBy("this") - private var lines = new ArrayBuffer[(String, Timestamp)] - initialize() private def initialize(): Unit = synchronized { @@ -74,7 +72,7 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo return } TextSocketSource.this.synchronized { - lines += ((line, + addData((line, Timestamp.valueOf( TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) )) @@ -92,21 +90,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR - /** Returns the maximum available offset for this source. */ - override def getOffset: Option[Offset] = synchronized { - if (lines.isEmpty) None else Some(LongOffset(lines.size - 1)) - } /** Returns the data that is between the offsets (`start`, `end`]. 
*/ override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { - val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0) - val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1 - val data = synchronized { lines.slice(startIdx, endIdx) } - import sqlContext.implicits._ + val rawBatch = super.getBatch(start, end) + + // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp + // if requested. if (includeTimestamp) { - data.toDF("value", "timestamp") + rawBatch.toDF("value", "timestamp") } else { - data.map(_._1).toDF("value") + // Strip out timestamp + rawBatch.select("_1").toDF("value") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 1caafb9d74440..5ee3cbc341471 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -298,7 +298,9 @@ class FakeDefaultSource extends StreamSourceProvider { override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil) - override def getOffset: Option[Offset] = { + override def getMinOffset: Option[Offset] = None + + override def getMaxOffset: Option[Offset] = { if (offset >= 10) { None } else { @@ -312,6 +314,8 @@ class FakeDefaultSource extends StreamSourceProvider { spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a") } + override def commit(end: Offset): Unit = {} + override def stop() {} } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index f0994395813e4..77c2749c8b713 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -77,7 +77,9 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { new Source { override def schema: StructType = fakeSchema - override def getOffset: Option[Offset] = Some(new LongOffset(0)) + override def getMinOffset: Option[Offset] = None + + override def getMaxOffset: Option[Offset] = Some(new LongOffset(0)) override def getBatch(start: Option[Offset], end: Offset): DataFrame = { import spark.implicits._ @@ -85,6 +87,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { Seq[Int]().toDS().toDF() } + override def commit(end: Offset): Unit = {} + override def stop() {} } } From cf426fab65c1652fea05ed3518c0663a5da6f8e4 Mon Sep 17 00:00:00 2001 From: frreiss Date: Mon, 15 Aug 2016 12:34:59 -0700 Subject: [PATCH 03/17] Added method to garbage-collect the metadata log. 
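
Example of how the new trim() method is intended to be used (illustrative only; the
log instance, file names, and batch ids below are hypothetical and not part of the
diff):

    val log = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)
    log.add(0, Seq("file-0.txt"))
    log.add(1, Seq("file-1.txt"))
    log.trim(0)   // batch 0's entry is discarded; batch 1 is retained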
--- .../sql/execution/streaming/HDFSMetadataLog.scala | 11 +++++++++++ .../spark/sql/execution/streaming/MetadataLog.scala | 10 ++++++++++ .../sql/execution/streaming/StreamExecution.scala | 10 ++++++++-- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 698f07b0a187f..135e4924986f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -227,6 +227,17 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) None } + override def trim(batchId: Long) : Unit = { + val idsToDelete = + fileManager + .list(metadataPath, batchFilesFilter) + .map(f => pathToBatchId(f.getPath)) + .filter( _ <= batchId ) + + // TODO: Log failed deletions (fileManager.delete currently fails silently) + idsToDelete.foreach(id => fileManager.delete(batchIdToPath(id))) + } + private def createFileManager(): FileManager = { val hadoopConf = sparkSession.sessionState.newHadoopConf() try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index cc70e1d314d1d..e439c5018ec72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. + * - Inform the log that it is safe to garbage-collect metadata from a batch */ trait MetadataLog[T] { @@ -48,4 +49,13 @@ trait MetadataLog[T] { * Return the latest batch Id and its metadata if exist. */ def getLatest(): Option[(Long, T)] + + + /** + * Inform the log that it may discard metadata about all batches up to and including the + * batch with the indicate ID. + * + * @param batchId ID of the highest batch to discard + */ + def trim(batchId: Long) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 2314b1b7753fa..b1796bb434a8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -256,7 +256,7 @@ class StreamExecution( // Compare the offsets we just read from the checkpoint against the // sources' own checkpoint data. val offsetChanges = mutable.Map[Source, Offset]() - committedOffsets.map { + committedOffsets.foreach { case (src, checkptOffset) => val srcOffset = src.getMinOffset if (srcOffset.isDefined && srcOffset.get > checkptOffset) { @@ -316,10 +316,16 @@ class StreamExecution( logInfo(s"Committed offsets for batch $currentBatchId.") // Now that we've updated the scheduler's persistent checkpoint, it is safe for the - // sources to discard batches from the *previous* batch. + // sources to discard data from the *previous* batch. committedOffsets.foreach { case (src, off) => src.commit(off) } + + // The log can also discard old metadata. 
Trim one batch less than we could, just + // in case. + if (currentBatchId > 2) { + offsetLog.trim(currentBatchId - 2) + } } else { awaitBatchLock.lock() try { From f92a9a7ec2e6ee47e47698bec896504b771c27d5 Mon Sep 17 00:00:00 2001 From: frreiss Date: Tue, 16 Aug 2016 09:24:24 -0700 Subject: [PATCH 04/17] Fixing problems with building from Maven. --- .../streaming/FileStreamSource.scala | 16 +--- .../sql/execution/streaming/socket.scala | 78 +++++++++++++++++-- .../streaming/TextSocketStreamSuite.scala | 16 ++-- 3 files changed, 79 insertions(+), 31 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index c9087ea09a5f4..c855f87132345 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -181,20 +181,10 @@ class FileStreamSource( minBatchId = lastCommittedBatch - val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt - - if (offsetDiff < 0) { - sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end") - } - - batches.trimStart(offsetDiff) - lastCommittedOffset = newOffset } else { sys.error(s"FileStreamSource.commit() received an offset ($end) that did not " + s"originate with an instance of this class") } - - } override def stop() {} @@ -252,7 +242,6 @@ class FileStreamSource( new LongOffset(maxBatchId) } - private def fetchAllFiles(): Seq[String] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) @@ -275,7 +264,7 @@ class FileStreamSource( try { str.toBoolean } catch { - case _ => throw new IllegalArgumentException( + case _ : Throwable => throw new IllegalArgumentException( s"Invalid value '$str' for option 'deleteCommittedFiles', must be true or false") } } @@ -290,7 +279,4 @@ class FileStreamSource( } } } - - - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index ef1f0d676db0d..846818e0b0347 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -23,16 +23,16 @@ import java.sql.Timestamp import java.text.SimpleDateFormat import java.util.Calendar import javax.annotation.concurrent.GuardedBy -import javax.xml.transform.stream.StreamSource -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} +import org.apache.spark.sql._ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + object TextSocketSource { val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil) val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) :: @@ -40,13 +40,15 @@ object TextSocketSource { val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") } + + /** * A source that reads text lines through a TCP socket, designed only for tutorials and debugging. 
* This source will *not* work in production applications due to multiple reasons, including no * support for fault recovery and keeping all of the text read in memory forever. */ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext) - extends MemoryStream[(String, Timestamp)](-1, sqlContext) with Logging + extends Source with Logging { @GuardedBy("this") private var socket: Socket = null @@ -54,6 +56,19 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo @GuardedBy("this") private var readThread: Thread = null + /** + * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive. + * Stored in a ListBuffer to facilitate removing committed batches. + */ + @GuardedBy("this") + protected val batches = new ListBuffer[(String, Timestamp)] + + @GuardedBy("this") + protected var currentOffset: LongOffset = new LongOffset(-1) + + @GuardedBy("this") + protected var lastCommittedOffset : LongOffset = new LongOffset(-1) + initialize() private def initialize(): Unit = synchronized { @@ -72,10 +87,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo return } TextSocketSource.this.synchronized { - addData((line, + val newData = (line, Timestamp.valueOf( TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime())) - )) + ) + currentOffset = currentOffset + 1 + batches.append(newData) } } } catch { @@ -90,10 +107,37 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP else TextSocketSource.SCHEMA_REGULAR + override def getMinOffset: Option[Offset] = synchronized { + if (lastCommittedOffset.offset == -1) { + None + } else { + Some(lastCommittedOffset) + } + } + + override def getMaxOffset: Option[Offset] = synchronized { + if (currentOffset.offset == -1) { + None + } else { + Some(currentOffset) + } + } /** Returns the data that is between the offsets (`start`, `end`]. */ override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized { - val rawBatch = super.getBatch(start, end) + val startOrdinal = + start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 + val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 + + // Internal buffer only holds the batches after lastCommittedOffset + val rawList = synchronized { + val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt + val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt + batches.slice(sliceStart, sliceEnd) + } + + import sqlContext.implicits._ + val rawBatch = sqlContext.createDataset(rawList) // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp // if requested. @@ -105,6 +149,23 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo } } + override def commit(end: Offset): Unit = synchronized { + if (end.isInstanceOf[LongOffset]) { + val newOffset = end.asInstanceOf[LongOffset] + val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt + + if (offsetDiff < 0) { + sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end") + } + + batches.trimStart(offsetDiff) + lastCommittedOffset = newOffset + } else { + sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " + + s"originate with an instance of this class") + } + } + /** Stop this source. 
*/ override def stop(): Unit = synchronized { if (socket != null) { @@ -136,7 +197,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis providerName: String, parameters: Map[String, String]): (String, StructType) = { logWarning("The socket source should not be used for production applications! " + - "It does not support recovery and stores state indefinitely.") + "It does not support recovery.") if (!parameters.contains("host")) { throw new AnalysisException("Set a host to read from with option(\"host\", ...).") } @@ -160,6 +221,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis parameters: Map[String, String]): Source = { val host = parameters("host") val port = parameters("port").toInt + new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala index 6b0ba7acb4804..639c63fbff8ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala @@ -62,18 +62,18 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before failAfter(streamingTimeout) { serverThread.enqueue("hello") - while (source.getOffset.isEmpty) { + while (source.getMaxOffset.isEmpty) { Thread.sleep(10) } - val offset1 = source.getOffset.get + val offset1 = source.getMaxOffset.get val batch1 = source.getBatch(None, offset1) assert(batch1.as[String].collect().toSeq === Seq("hello")) serverThread.enqueue("world") - while (source.getOffset.get === offset1) { + while (source.getMaxOffset.get === offset1) { Thread.sleep(10) } - val offset2 = source.getOffset.get + val offset2 = source.getMaxOffset.get val batch2 = source.getBatch(Some(offset1), offset2) assert(batch2.as[String].collect().toSeq === Seq("world")) @@ -101,20 +101,20 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before failAfter(streamingTimeout) { serverThread.enqueue("hello") - while (source.getOffset.isEmpty) { + while (source.getMaxOffset.isEmpty) { Thread.sleep(10) } - val offset1 = source.getOffset.get + val offset1 = source.getMaxOffset.get val batch1 = source.getBatch(None, offset1) val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq assert(batch1Seq.map(_._1) === Seq("hello")) val batch1Stamp = batch1Seq(0)._2 serverThread.enqueue("world") - while (source.getOffset.get === offset1) { + while (source.getMaxOffset.get === offset1) { Thread.sleep(10) } - val offset2 = source.getOffset.get + val offset2 = source.getMaxOffset.get val batch2 = source.getBatch(Some(offset1), offset2) val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq assert(batch2Seq.map(_._1) === Seq("world")) From 4cd181dbb4f16fbb2a08b7405d4f9d10274529fc Mon Sep 17 00:00:00 2001 From: frreiss Date: Fri, 19 Aug 2016 13:51:02 -0700 Subject: [PATCH 05/17] Various bug fixes. 
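
Among other fixes, fetchMaxOffset() now selects files for a batch in alphabetical
order when maxFilesPerTrigger is set. For example (illustrative file names): with
maxFilesPerTrigger = 2 and new files "c.txt", "a.txt", "b.txt", the next batch is
Seq("a.txt", "b.txt"), and "c.txt" is left for a later trigger.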
--- .../streaming/FileStreamSource.scala | 72 +++++++++---------- .../execution/streaming/StreamExecution.scala | 2 +- .../sql/execution/streaming/memory.scala | 11 ++- .../sql/streaming/FileStreamSourceSuite.scala | 49 ++++++++++++- 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index c855f87132345..9a22ce00258a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.collection.OpenHashSet *
  • deleteCommittedFiles: If true, the source will delete old files and * clean up associated internal metadata when Spark has completed processing * the data in those files. - * Default: False. + * Default: true. * */ class FileStreamSource( @@ -85,10 +85,11 @@ class FileStreamSource( private def initialize() = { maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) + minBatchId = maxBatchId metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) => files.foreach(seenFiles.add) - minBatchId = math.max(minBatchId, batchId) + minBatchId = math.min(minBatchId, batchId) } } @@ -127,6 +128,7 @@ class FileStreamSource( override def getMaxOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) val endId = end.asInstanceOf[LongOffset].offset @@ -135,6 +137,7 @@ class FileStreamSource( val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2) logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId") logTrace(s"Files are:\n\t" + files.mkString("\n\t")) + val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path") val newDataSource = DataSource( @@ -154,10 +157,10 @@ class FileStreamSource( if (end.isInstanceOf[LongOffset]) { val lastCommittedBatch = end.asInstanceOf[LongOffset].offset - // Build up a list of batches, then process them one by one - val firstCommittedBatch = math.max(minBatchId, 0) - if (deleteCommittedFiles) { + // Build up a list of batches, then process them one by one + val firstCommittedBatch = math.max(minBatchId, 0) + var batchId = 0L; for (batchId <- firstCommittedBatch to lastCommittedBatch) { val files = metadataLog.get(batchId) @@ -169,14 +172,13 @@ class FileStreamSource( // Examples of problems to catch: Spark does not have permission to delete // the file; or the filesystem metadata is corrupt. files.get.foreach(f => fs.delete(new Path(f), true)) - - // TODO: Add a "remove" method to HDFSMetadataLog, then add code here to - // remove the metadata for each completed batch. It's important that we - // remove the metadata only after the files are deleted; otherwise we - // may not be able to tell what files to delete after a crash. } } + + // Clean up metadata for the files we've removed. It's important to do this + // AFTER deleting the files. + metadataLog.trim(minBatchId) } minBatchId = lastCommittedBatch @@ -187,7 +189,9 @@ class FileStreamSource( } } - override def stop() {} + override def stop(): Unit = { + logTrace(s"Stopping $this") + } // END methods from Source trait //////////////////////////////// @@ -197,7 +201,13 @@ class FileStreamSource( /** * Scans the directory and creates batches from any new files found. * - * Returns the maximum offset that can be retrieved from the source. + * Updates and returns the maximum offset that can be retrieved from the source. + * If `maxFilesPerBatch` is set, will consume the first (`maxFilesPerBatch`) files + * in alphabetical order. + * + * Note that this implementation relies on the fact that StreamExecution calls + * getMaxOffset() exactly once per clock tick. Otherwise the logic for + * `maxFilesPerBatch` will be incorrect. * * `synchronized` on this method is for solving race conditions in tests. In the normal usage, * there is no race here, so the cost of `synchronized` should be rare. 
@@ -207,36 +217,24 @@ class FileStreamSource( logTrace(s"Number of new files = ${newFiles.size})") // To make the source's behavior less nondeterministic, we process files in - // alphabetical order and ensure that every new file goes into a batch. - val remainingFiles = mutable.Queue[String]() - remainingFiles ++= newFiles.sorted - - val batches = ListBuffer[Seq[String]]() - if (maxFilesPerBatch.nonEmpty) { - while (remainingFiles.size > 0) { - batches += remainingFiles.take(maxFilesPerBatch.get) + // alphabetical order. + val batchFiles = + if (maxFilesPerBatch.nonEmpty) { + newFiles.sorted.take(maxFilesPerBatch.get) + } else { + newFiles } - } else { - batches += remainingFiles - } - newFiles.foreach { file => + // Pretend we didn't see files that aren't in the batch. + batchFiles.foreach { file => seenFiles.add(file) logDebug(s"New file: $file") } - batches.foreach { - case batchFiles => - maxBatchId += 1 - metadataLog.add(maxBatchId, batchFiles) - logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") - } - - val batchFiles = - if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles - batchFiles.foreach { file => - seenFiles.add(file) - logDebug(s"New file: $file") + if (batchFiles.size > 0) { + maxBatchId += 1 + metadataLog.add(maxBatchId, batchFiles) + logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files") } new LongOffset(maxBatchId) @@ -260,7 +258,7 @@ class FileStreamSource( } private def getDeleteCommittedFiles(): Boolean = { - val str = options.getOrElse("deleteCommittedFiles", "false") + val str = options.getOrElse("deleteCommittedFiles", "true") try { str.toBoolean } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b1796bb434a8f..77783857b2e05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -261,7 +261,7 @@ class StreamExecution( val srcOffset = src.getMinOffset if (srcOffset.isDefined && srcOffset.get > checkptOffset) { logWarning(s"Source $src lost offsets between $checkptOffset " + - s"and $srcOffset when resuming. Skipping ahead to $srcOffset.") + s"and ${srcOffset.get} when resuming. Skipping ahead to ${srcOffset.get}.") offsetChanges += (src -> srcOffset.get) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 856cbb53e1929..168dc2fb95f82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -61,6 +61,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) @GuardedBy("this") protected var currentOffset: LongOffset = new LongOffset(-1) + /** + * Last offset that was discarded, or -1 if no commits have occurred. Note that the value + * -1 is used in calculations below and isn't just an arbitrary constant. 
+ */ @GuardedBy("this") protected var lastCommittedOffset : LongOffset = new LongOffset(-1) @@ -108,14 +112,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) val startOrdinal = start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 - // Internal buffer only holds the batches after lastCommittedOffset + // Internal buffer only holds the batches after lastCommittedOffset. val newBlocks = synchronized { - val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt - val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt + val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt - 1 + val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt - 1 batches.slice(sliceStart, sliceEnd) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 47260a23c7ee3..f44760598212d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -591,7 +591,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest { /** Create a text file with a single data item */ def createFile(data: Int): File = { - val file = stringToFile(new File(src, s"$data.txt"), data.toString) + // Use 2 character file names padded with zeros so that alphabetical and + // numeric order are the same for the generated file names. + val file = stringToFile(new File(src, f"$data%02d.txt"), data.toString) + + // File modification times aren't currently used to decide what goes into + // the next batch, but they may be used in the future. if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000) lastFileModTime = Some(file.lastModified) file @@ -727,6 +732,48 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } + + + test("deleteCommittedFiles option") { + withTempDirs { case (src, tmp) => + val textStream = spark.readStream.format("text") + .option("deleteCommittedFiles", "false") + .load(src.getCanonicalPath) + + testStream(textStream)( + AddTextFileData("kneel", src, tmp), + CheckAnswer("kneel"), + StopStream, + AddTextFileData("before", src, tmp), + StartStream(), + CheckAnswer("kneel", "before"), + AddTextFileData("zod", src, tmp), + CheckAnswer("kneel", "before", "zod") + ) + + assert(src.listFiles().size === 3) + } + + withTempDirs { case (src, tmp) => + val textStream = spark.readStream.format("text") + .option("deleteCommittedFiles", "true") + .load(src.getCanonicalPath) + + testStream(textStream)( + AddTextFileData("it's only", src, tmp), + CheckAnswer("it's only"), + StopStream, + AddTextFileData("a", src, tmp), + StartStream(), + CheckAnswer("it's only", "a"), + AddTextFileData("model", src, tmp), + CheckAnswer("it's only", "a", "model") + ) + + assert(src.listFiles().size === 1) + } + } + } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From 43ffbf3286054b01ea323552a678e2d0effb812d Mon Sep 17 00:00:00 2001 From: frreiss Date: Mon, 29 Aug 2016 09:38:58 -0700 Subject: [PATCH 06/17] Removed a few blank lines. 
--- .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 1 - .../apache/spark/sql/execution/streaming/StreamExecution.scala | 2 -- 2 files changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index be904aef8394b..127ece9ab0e56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -227,7 +227,6 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) None } - /** * Removes all the log entry earlier than thresholdBatchId (exclusive). */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5d4c6828513e9..b879038fd95c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -402,8 +402,6 @@ class StreamExecution( logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") // Update committed offsets. committedOffsets ++= availableOffsets - - postEvent(new QueryProgress(this.toInfo)) } From f5c15f82f9ba53b48f4e22d4a80908f7e2906e36 Mon Sep 17 00:00:00 2001 From: frreiss Date: Mon, 29 Aug 2016 09:43:20 -0700 Subject: [PATCH 07/17] Additional whitespace cleanup. --- .../sql/execution/streaming/FileStreamSource.scala | 13 ++++++------- .../spark/sql/execution/streaming/socket.scala | 1 - .../spark/sql/streaming/FileStreamSourceSuite.scala | 1 - 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 5d2de8a61b678..a67bc7e062816 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -33,12 +33,12 @@ import org.apache.spark.sql.types.StructType * TODO: Clean up the metadata log files periodically. */ class FileStreamSource( - sparkSession: SparkSession, - path: String, - fileFormatClassName: String, - override val schema: StructType, - metadataPath: String, - options: Map[String, String]) extends Source with Logging { + sparkSession: SparkSession, + path: String, + fileFormatClassName: String, + override val schema: StructType, + metadataPath: String, + options: Map[String, String]) extends Source with Logging { import FileStreamSource._ @@ -165,7 +165,6 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" - /** * Informs the source that Spark has completed processing all data for offsets less than or * equal to `end` and will only request offsets greater than `end` in the future. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 69b97aa9e07c5..9a74496720b17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -219,7 +219,6 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis parameters: Map[String, String]): Source = { val host = parameters("host") val port = parameters("port").toInt - new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 89858181fa550..91caa9e55cb85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -766,7 +766,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } - } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { From 7c6a30d2da8c31dcd5db8a4337913d5805264306 Mon Sep 17 00:00:00 2001 From: frreiss Date: Wed, 31 Aug 2016 11:21:29 -0700 Subject: [PATCH 08/17] Narrowing the size of the diff by moving some changes out to future work. --- .../apache/spark/sql/execution/streaming/MetadataLog.scala | 1 - .../org/apache/spark/sql/execution/streaming/Source.scala | 4 ++-- .../spark/sql/execution/streaming/StreamExecution.scala | 5 ----- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 7 +------ .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 4 ---- .../sql/streaming/test/DataStreamReaderWriterSuite.scala | 4 ---- 6 files changed, 3 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 692d03cf18c15..78d6be17df05a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,7 +24,6 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. - * - Inform the log that it is safe to garbage-collect metadata from a batch */ trait MetadataLog[T] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index cb6479160688a..2c4ff1ebb4a72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark * will regularly query each [[Source]] to see if any more data is available. 
*/ -trait Source { +trait Source { /** Returns the schema of the data from this source */ def schema: StructType @@ -59,7 +59,7 @@ trait Source { * Informs the source that Spark has completed processing all data for offsets less than or * equal to `end` and will only request offsets greater than `end` in the future. */ - def commit(end: Offset) + def commit(end: Offset) : Unit = {} /** Stop this source and free any resources it has allocated. */ def stop(): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index b879038fd95c9..ee49a2ba9b6c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -317,11 +317,6 @@ class StreamExecution( case (src, off) => src.commit(off) } - // The log can also discard old metadata. Trim one batch less than we could, just - // in case. - if (currentBatchId > 2) { - offsetLog.purge(currentBatchId - 2) - } } else { awaitBatchLock.lock() try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 91caa9e55cb85..03222b4a49c6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -625,12 +625,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { /** Create a text file with a single data item */ def createFile(data: Int): File = { - // Use 2 character file names padded with zeros so that alphabetical and - // numeric order are the same for the generated file names. - val file = stringToFile(new File(src, f"$data%02d.txt"), data.toString) - - // File modification times aren't currently used to decide what goes into - // the next batch, but they may be used in the future. 
+    val file = stringToFile(new File(src, s"$data.txt"), data.toString)
     if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
     lastFileModTime = Some(file.lastModified)
     file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 7566508c4b0f8..1caafb9d74440 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -298,8 +298,6 @@ class FakeDefaultSource extends StreamSourceProvider {
 
       override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
 
-      override def lastCommittedOffset: Option[Offset] = None
-
       override def getOffset: Option[Offset] = {
         if (offset >= 10) {
           None
@@ -314,8 +312,6 @@ class FakeDefaultSource extends StreamSourceProvider {
         spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
       }
 
-      override def commit(end: Offset): Unit = {}
-
       override def stop() {}
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 6ad7c49aa703d..f0994395813e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -77,8 +77,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
     new Source {
       override def schema: StructType = fakeSchema
 
-      override def lastCommittedOffset: Option[Offset] = None
-
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
 
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -87,8 +85,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
         Seq[Int]().toDS().toDF()
       }
 
-      override def commit(end: Offset): Unit = {}
-
       override def stop() {}
     }
   }

From 5e340c2922b367629d131c5645b33cbfe30be2a1 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Thu, 8 Sep 2016 08:11:47 -0700
Subject: [PATCH 09/17] Fixed a regression introduced in an earlier merge.
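
The off-by-one was in how getBatch() maps record ordinals onto the source's
internal buffer once commit() has trimmed already-delivered entries. A minimal
standalone sketch of the intended arithmetic (illustrative values and names,
not code from this patch; the real logic is in TextSocketSource.getBatch below):

    import scala.collection.mutable.ListBuffer

    val batches = ListBuffer("r0", "r1", "r2", "r3", "r4")
    val lastCommitted = 1                 // offsets 0 and 1 already committed
    batches.trimStart(lastCommitted + 1)  // buffer now holds r2, r3, r4

    // Ask for the range (start = 1, end = 3], i.e. records r2 and r3.
    val startOrdinal = 1 + 1
    val endOrdinal = 3 + 1
    val slice = batches.slice(startOrdinal - lastCommitted - 1,
                              endOrdinal - lastCommitted - 1)
    assert(slice == ListBuffer("r2", "r3"))

Without the extra "- 1" terms the slice starts one element too deep into the
trimmed buffer and returns r3 and r4 instead.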
---
 .../org/apache/spark/sql/execution/streaming/socket.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 9a74496720b17..49bac42bae342 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -127,16 +127,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
 
-    // Internal buffer only holds the batches after lastCommittedOffset
+    // Internal buffer only holds the batches after lastOffsetCommitted
     val rawList = synchronized {
-      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt
-      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
       batches.slice(sliceStart, sliceEnd)
     }
 
     import sqlContext.implicits._
     val rawBatch = sqlContext.createDataset(rawList)
+
+
     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {

From 6334a4be720f02932aca0a5a1e7cbc4bb2169850 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Wed, 28 Sep 2016 15:20:21 -0700
Subject: [PATCH 10/17] Fixed compilation problem from merging someone else's PR.

---
 .../spark/sql/execution/streaming/StreamExecution.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e48a5d0f8e327..7374cfc3e1c67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -256,7 +256,7 @@ class StreamExecution(
         committedOffsets.foreach {
           case (src, checkptOffset) =>
             val srcOffset = src.lastCommittedOffset
-            if (srcOffset.isDefined && srcOffset.get > checkptOffset) {
+            if (srcOffset.isDefined && srcOffset.get != checkptOffset) {
               logWarning(s"Source $src lost offsets between $checkptOffset " +
                 s"and ${srcOffset.get} when resuming. Skipping ahead to ${srcOffset.get}.")
               offsetChanges += (src -> srcOffset.get)
@@ -270,7 +270,7 @@ class StreamExecution(
         constructNextBatch()
     }
   }
-  
+
   /**
    * Returns true if there is any new data available to be processed.
    */

From aaf03070305d84c9f58158d64a156dd3dfbb5716 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Thu, 29 Sep 2016 16:32:29 -0700
Subject: [PATCH 11/17] Removed a safety check that was invalidated by SPARK-17643 and fixed an off-by-one problem that the check was masking.
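
To make the new bookkeeping concrete, here is a worked example of this patch's
arithmetic (illustrative values only, not code from the repository): once the
offsets for batch 5 have been written to the offset log, batch 4 might still be
replayed after a crash, so each source is only asked to commit the offsets that
were logged for batch 3, while offsetLog.purge(4) (purge is exclusive of its
argument) drops log entries 0 through 3 and keeps 4 and 5. A toy model of that
step, with a plain sorted map standing in for the offset log:

    val offsetLog = scala.collection.immutable.SortedMap(
      0L -> "off0", 1L -> "off1", 2L -> "off2", 3L -> "off3", 4L -> "off4", 5L -> "off5")
    val currentBatchId = 5L
    val toCommit = offsetLog.get(currentBatchId - 2)   // Some("off3"), handed to Source.commit()
    val afterPurge = offsetLog.filter { case (id, _) => id >= currentBatchId - 1 }
    assert(afterPurge.keySet == Set(4L, 5L))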
---
 .../execution/streaming/StreamExecution.scala | 29 ++++++-------------
 1 file changed, 9 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7374cfc3e1c67..9620b25f56607 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -250,27 +250,13 @@ class StreamExecution(
           logDebug(s"Resuming with committed offsets: $committedOffsets")
         }
 
-        // Compare the offsets we just read from the checkpoint against the
-        // sources' own checkpoint data.
-        val offsetChanges = mutable.Map[Source, Offset]()
-        committedOffsets.foreach {
-          case (src, checkptOffset) =>
-            val srcOffset = src.lastCommittedOffset
-            if (srcOffset.isDefined && srcOffset.get != checkptOffset) {
-              logWarning(s"Source $src lost offsets between $checkptOffset " +
-                s"and ${srcOffset.get} when resuming. Skipping ahead to ${srcOffset.get}.")
-              offsetChanges += (src -> srcOffset.get)
-            }
-        }
-        committedOffsets ++= offsetChanges
-
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
         constructNextBatch()
     }
   }
-  
+
   /**
    * Returns true if there is any new data available to be processed.
    */
@@ -312,17 +298,20 @@ class StreamExecution(
       logInfo(s"Committed offsets for batch $currentBatchId.")
 
       // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
-      // sources to discard data from the *previous* batch.
-      committedOffsets.foreach {
-        case (src, off) => src.commit(off)
+      // sources to discard data from before the *previous* batch.
+      val prevBatchOff = offsetLog.get(currentBatchId - 2)
+      if (prevBatchOff.isDefined) {
+        prevBatchOff.get.toStreamProgress(sources).foreach {
+          case (src, off) => src.commit(off)
+        }
       }
 
       // Now that we have logged the new batch, no further processing will happen for
-      // the previous batch, and it is safe to discard the old metadata.
+      // the batch before the previous batch, and it is safe to discard the old metadata.
       // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
       // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
      // flight at the same time), this cleanup logic will need to change.
-      offsetLog.purge(currentBatchId)
+      offsetLog.purge(currentBatchId - 1)
     } else {
       awaitBatchLock.lock()
       try {

From 947b510ab007c2d5acdefa40163c58b7f3462f49 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Thu, 29 Sep 2016 19:51:11 -0700
Subject: [PATCH 12/17] Updating regression tests after merge.

---
 .../spark/sql/execution/streaming/StreamExecution.scala  | 4 +++-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 8 ++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 9620b25f56607..717c0f48b17d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -299,6 +299,8 @@ class StreamExecution(
 
       // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
       // sources to discard data from before the *previous* batch.
+      // The scheduler might still request the previous batch from a source in some cases
+      // if a crash and recovery occurred.
       val prevBatchOff = offsetLog.get(currentBatchId - 2)
       if (prevBatchOff.isDefined) {
         prevBatchOff.get.toStreamProgress(sources).foreach {
@@ -308,7 +310,7 @@ class StreamExecution(
 
       // Now that we have logged the new batch, no further processing will happen for
       // the batch before the previous batch, and it is safe to discard the old metadata.
-      // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
+      // Note that purge is exclusive, i.e. it purges everything before the target ID.
       // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
       // flight at the same time), this cleanup logic will need to change.
       offsetLog.purge(currentBatchId - 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 88f1f188ab2af..9fd63dc0dca08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -129,8 +129,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
 
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
+    // Run 3 batches, and then assert that only 2 metadata files are left at the end
+    // since the first should have been purged.
     testStream(mapped)(
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -139,11 +139,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
       AddData(inputData, 4, 6),
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
-      AssertOnQuery("metadata log should contain only one file") { q =>
+      AssertOnQuery("metadata log should contain only two files") { q =>
         val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
+        assert(toTest.size == 2 && toTest.head == "1")
         true
       }
     )

From ec674294a83583f1099d9ecd166847981cb5b550 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Fri, 14 Oct 2016 20:13:33 -0700
Subject: [PATCH 13/17] Changes to address review comments.

---
 .../spark/sql/execution/streaming/Source.scala  |  9 +--------
 .../execution/streaming/StreamExecution.scala   | 16 ++++++++--------
 .../spark/sql/execution/streaming/socket.scala  |  2 --
 3 files changed, 9 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 2c4ff1ebb4a72..9c0ede65ec3b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,13 +30,6 @@ trait Source {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /**
-   * Returns the highest offset that this source has removed from its internal buffer
-   * in response to a call to `commit`.
-   * Returns `None` if this source has not removed any data.
-   */
-  def lastCommittedOffset: Option[Offset] = (None)
-
   /**
    * Returns the maximum available offset for this source.
    * Returns `None` if this source has never received any data.
@@ -48,7 +41,7 @@ trait Source {
    * then the batch should begin with the first record. This method must always return the
    * same data for a particular `start` and `end` pair; even after the Source has been restarted
    * on a different node.
-   *<p>
+   *
    * Higher layers will always call this method with a value of `start` greater than or equal
    * to the last value passed to `commit` and a value of `end` less than or equal to the
    * last value returned by `getMaxOffset`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 00a46894423aa..a61ba51f83883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -21,9 +21,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.ReentrantLock
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.Map
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
@@ -75,8 +73,9 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
-   * Only the scheduler thread should modify this field, and only in atomic steps. Other threads
-   * must create a local copy before iterating over this data structure.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   var committedOffsets = new StreamProgress
@@ -84,8 +83,9 @@ class StreamExecution(
   /**
    * Tracks the offsets that are available to be processed, but have not yet be committed to the
    * sink.
-   * Only the scheduler thread should modify this field, and only in atomic steps. Other threads
-   * must create a local copy before iterating over this data structure.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   private var availableOffsets = new StreamProgress
@@ -345,7 +345,7 @@ class StreamExecution(
       assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
-      
+
       // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
       // sources to discard data from *before* the previous batch.
       // The scheduler might still request the previous batch from a source in some cases
@@ -356,7 +356,7 @@ class StreamExecution(
           case (src, off) => src.commit(off)
         }
       }
-      
+
       // Now that we have logged the new batch, no further processing will happen for
       // the batch before the previous batch, and it is safe to discard the old metadata.
       // Note that purge is exclusive, i.e. it purges everything before the target ID.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 49bac42bae342..2e8f90b7ad3da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -137,8 +137,6 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
 
     import sqlContext.implicits._
     val rawBatch = sqlContext.createDataset(rawList)
-
-
     // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
     // if requested.
     if (includeTimestamp) {

From e7ef7ab658c9c891399d1246e302db82318c99fa Mon Sep 17 00:00:00 2001
From: frreiss
Date: Fri, 14 Oct 2016 20:21:15 -0700
Subject: [PATCH 14/17] Fix compilation problems.

---
 .../org/apache/spark/sql/execution/streaming/memory.scala | 8 --------
 .../org/apache/spark/sql/execution/streaming/socket.scala | 8 --------
 2 files changed, 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 03c8f4e198df0..ddcbc12825084 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -95,14 +95,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
-  override def lastCommittedOffset: Option[Offset] = synchronized {
-    if (lastOffsetCommitted.offset == -1) {
-      None
-    } else {
-      Some(lastOffsetCommitted)
-    }
-  }
-
   override def getOffset: Option[Offset] = synchronized {
     if (currentOffset.offset == -1) {
       None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 2e8f90b7ad3da..c662e7c6bc775 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -105,14 +105,6 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
     else TextSocketSource.SCHEMA_REGULAR
 
-  override def lastCommittedOffset: Option[Offset] = synchronized {
-    if (lastOffsetCommitted.offset == -1) {
-      None
-    } else {
-      Some(lastOffsetCommitted)
-    }
-  }
-
   override def getOffset: Option[Offset] = synchronized {
     if (currentOffset.offset == -1) {
       None

From c726549079a0e69a0deebd6b4027fe7596a00ad2 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Fri, 21 Oct 2016 13:44:41 -0700
Subject: [PATCH 15/17] Changes to address review comments.
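
In response to the review comments (a summary of the diff below): the Source
scaladoc now refers to getOffset, the name the sources actually implement, and
the post-batch cleanup commits the offsets that were logged for the immediately
preceding batch instead of reaching two batches back. This is safe because
runBatches() keeps at most one batch in flight, so by the time batch N's offsets
are logged, batch N - 1 has been fully processed. As a worked example (numbers
only, not code from the patch): after logging offsets for batch 5,
offsetLog.get(4) supplies the offsets handed to Source.commit(), and
offsetLog.purge(4) keeps log entries 4 and 5.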
---
 .../spark/sql/execution/streaming/Source.scala     |  2 +-
 .../sql/execution/streaming/StreamExecution.scala  | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 9c0ede65ec3b7..f3bd5bfe23fdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -44,7 +44,7 @@ trait Source {
    *
    * Higher layers will always call this method with a value of `start` greater than or equal
    * to the last value passed to `commit` and a value of `end` less than or equal to the
-   * last value returned by `getMaxOffset`
+   * last value returned by `getOffset`
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bdad1c5f9d466..37af1a550aaf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -347,11 +347,13 @@ class StreamExecution(
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
         // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
-        // sources to discard data from *before* the previous batch.
-        // The scheduler might still request the previous batch from a source in some cases
-        // if a crash and recovery occurred.
-        val prevBatchOff = offsetLog.get(currentBatchId - 2)
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
         if (prevBatchOff.isDefined) {
           prevBatchOff.get.toStreamProgress(sources).foreach {
             case (src, off) => src.commit(off)
@@ -361,8 +363,6 @@ class StreamExecution(
         // Now that we have logged the new batch, no further processing will happen for
         // the batch before the previous batch, and it is safe to discard the old metadata.
         // Note that purge is exclusive, i.e. it purges everything before the target ID.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
        offsetLog.purge(currentBatchId - 1)
       }
     } else {

From 46f6411390864931c7c2b12adc06ef67d1e797e6 Mon Sep 17 00:00:00 2001
From: frreiss
Date: Wed, 26 Oct 2016 14:44:02 -0700
Subject: [PATCH 16/17] Commit before merge.
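
The MemoryStream change below makes reset() rewind lastOffsetCommitted along
with the rest of the stream state. Without it, a stream that is reset after a
commit would reject the next commit as out of order, because commit() computes
the buffer trim length relative to lastOffsetCommitted. A test-style sketch of
the failure mode (hypothetical sequence, assuming the implicit SQLContext and
encoders that the streaming test suites provide):

    import org.apache.spark.sql.execution.streaming.{LongOffset, MemoryStream}

    val input = MemoryStream[Int]       // each addData() call becomes one offset
    input.addData(1)                    // offset 0
    input.addData(2)                    // offset 1
    input.addData(3)                    // offset 2
    input.commit(LongOffset(2))         // lastOffsetCommitted = 2
    input.reset()
    input.addData(4)                    // offsets start over at 0
    input.commit(LongOffset(0))         // offsetDiff = 0 - 2 < 0: rejected as
                                        // "committed out of order" unless reset()
                                        // also rewinds lastOffsetCommitted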
---
 .../scala/org/apache/spark/sql/execution/streaming/memory.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index ddcbc12825084..08786e7f26f26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -148,6 +148,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   def reset(): Unit = synchronized {
     batches.clear()
     currentOffset = new LongOffset(-1)
+    lastOffsetCommitted = new LongOffset(-1)
   }
 }
 

From 0a56e4a25b097dba6f44ad544f83171fbfc3b52a Mon Sep 17 00:00:00 2001
From: frreiss
Date: Wed, 26 Oct 2016 15:02:10 -0700
Subject: [PATCH 17/17] Addressing review comments.

---
 .../sql/execution/streaming/memory.scala | 26 +++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 08786e7f26f26..48d9791faf1e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -127,19 +127,19 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   }
 
   override def commit(end: Offset): Unit = synchronized {
-    if (end.isInstanceOf[LongOffset]) {
-      val newOffset = end.asInstanceOf[LongOffset]
-      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
-
-      if (offsetDiff < 0) {
-        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-      }
-
-      batches.trimStart(offsetDiff)
-      lastOffsetCommitted = newOffset
-    } else {
-      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
-        s"an instance of this class")
+    end match {
+      case newOffset: LongOffset =>
+        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+        if (offsetDiff < 0) {
+          sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+        }
+
+        batches.trimStart(offsetDiff)
+        lastOffsetCommitted = newOffset
+      case _ =>
+        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+          "an instance of this class")
     }
   }