[SPARK-16963] [STREAMING] [SQL] Changes to Source trait and related implementation classes #14553
Changes from 28 commits
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal

 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output

+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]

   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)

+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema

   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
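To make the relationship between the fields above concrete, here is a minimal, self-contained sketch (not part of the patch) of the invariant the buffer is expected to satisfy after this change: the `ListBuffer` holds exactly the batches in the range (`lastOffsetCommitted`, `currentOffset`]. All names and values below are illustrative stand-ins, not the real `MemoryStream` members.

```scala
import scala.collection.mutable.ListBuffer

object BufferInvariantSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the MemoryStream fields; values are hypothetical.
    val batches = ListBuffer("batch0", "batch1", "batch2", "batch3")
    val currentOffset = 3L        // offset assigned to the newest batch
    val lastOffsetCommitted = -1L // -1 means nothing has been committed yet

    // The buffer holds exactly the batches in (lastOffsetCommitted, currentOffset].
    assert(batches.size == (currentOffset - lastOffsetCommitted).toInt)
  }
}
```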
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"

   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }

   /**
    * Returns the data that is between the offsets (`start`, `end`].
    */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastCommittedOffset.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }

     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
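Because the buffer no longer starts at batch 0, `getBatch` now has to translate the global batch ordinals into buffer-relative indices. A small worked example of that arithmetic, under the assumption that batches 0 and 1 were already committed and trimmed (so the buffer holds batches 2 through 4) and the caller requests the range (2, 4]:

```scala
import scala.collection.mutable.ListBuffer

object SliceArithmeticSketch {
  def main(args: Array[String]): Unit = {
    // Buffer after committing offsets 0 and 1: only batches 2..4 remain.
    val batches = ListBuffer("batch2", "batch3", "batch4")
    val lastOffsetCommitted = 1L

    // Requested range (start = 2, end = 4], i.e. batches 3 and 4.
    val startOrdinal = 2 + 1 // start.offset + 1
    val endOrdinal = 4 + 1   // end.offset + 1

    // Same arithmetic as the patched getBatch().
    val sliceStart = startOrdinal - lastOffsetCommitted.toInt - 1 // 1
    val sliceEnd = endOrdinal - lastOffsetCommitted.toInt - 1     // 3

    assert(batches.slice(sliceStart, sliceEnd) == Seq("batch3", "batch4"))
  }
}
```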
@@ -111,6 +126,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }

+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
+    } else {
+      sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+        s"an instance of this class")
+    }
+  }
+
   override def stop() {}

   def reset(): Unit = synchronized {
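The new `commit()` is what actually shrinks the buffer: it drops as many batches from the head as the distance between the previously committed offset and the newly committed one. A sketch of that bookkeeping with plain collections (the `Offset` wrapper types are omitted and the values are illustrative, not taken from the patch):

```scala
import scala.collection.mutable.ListBuffer

object CommitTrimSketch {
  def main(args: Array[String]): Unit = {
    val batches = ListBuffer("batch0", "batch1", "batch2", "batch3")
    var lastOffsetCommitted = -1L

    // Commit everything up to and including offset 1: batches 0 and 1 can be discarded.
    val newOffset = 1L
    val offsetDiff = (newOffset - lastOffsetCommitted).toInt
    require(offsetDiff >= 0,
      s"Offsets committed out of order: $lastOffsetCommitted followed by $newOffset")

    batches.trimStart(offsetDiff) // removes offsetDiff batches from the head
    lastOffsetCommitted = newOffset

    assert(batches == Seq("batch2", "batch3"))
    assert(lastOffsetCommitted == 1L)
  }
}
```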
Review discussion:

Reviewer: nit: this can be `offsetLog.purge(currentBatchId)`, it's exclusive, then you can revert the changes to StreamingQuerySuite.

Author: I can move this change to another JIRA if you'd like, but we really should change `currentBatchId` to `currentBatchId - 1` at some point. The call to `offsetLog.purge(currentBatchId)`, which I introduced in my PR for SPARK-17513, contains a subtle bug. The recovery logic in `populateStartOffsets()` reads the last and second-to-last entries in `offsetLog` and uses those entries to populate `availableOffsets` and `committedOffsets`, respectively. Calling `offsetLog.purge(currentBatchId)` at line 350/366 truncates the `offsetLog` to a single entry, which in turn leaves `committedOffsets` empty on recovery, which in turn causes the first call to `getBatch()` for any source to receive `None` as its first argument. Sources that do not prune buffered data in their `commit()` methods will return previously committed data in response to such a `getBatch()` call.

Reviewer: I see. Thanks for the clarification.
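The author's argument above hinges on how many entries survive a purge of the offset log. The toy sketch below is not Spark's real metadata-log API; a sorted map stands in for the log. Under the stated assumption that the purge boundary is exclusive, purging up to `currentBatchId` leaves a single entry, so recovery has no second-to-last entry from which to rebuild `committedOffsets`, whereas purging up to `currentBatchId - 1` keeps the two entries recovery needs.

```scala
import scala.collection.immutable.SortedMap

object PurgeBoundarySketch {
  def main(args: Array[String]): Unit = {
    // Toy offset log: batch id -> serialized offsets (contents are placeholders).
    val offsetLog = SortedMap(0 -> "offsets-0", 1 -> "offsets-1", 2 -> "offsets-2")
    val currentBatchId = 2

    // Exclusive purge: drop every entry whose batch id is strictly below `id`.
    def purge(log: SortedMap[Int, String], id: Int): SortedMap[Int, String] =
      log.filter { case (batchId, _) => batchId >= id }

    val purgedAtCurrent = purge(offsetLog, currentBatchId)      // keeps only batch 2
    val purgedAtPrevious = purge(offsetLog, currentBatchId - 1) // keeps batches 1 and 2

    assert(purgedAtCurrent.size == 1)  // no second-to-last entry left for recovery
    assert(purgedAtPrevious.size == 2) // recovery can repopulate committedOffsets
  }
}
```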