Skip to content

Commit 370dabe

Browse files
committed
Fix ContinuousMemoryStream
1 parent 77fb2be commit 370dabe

3 files changed

Lines changed: 5 additions & 3 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousRecordEndpoint.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.sql.execution.streaming
1818

19+
import scala.collection.mutable
20+
1921
import org.apache.spark.SparkEnv
2022
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
2123
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -33,7 +35,7 @@ case class GetRecord(offset: ContinuousRecordPartitionOffset)
3335
* to the number of partitions.
3436
* @param lock a lock object for locking the buckets for read
3537
*/
36-
class ContinuousRecordEndpoint(buckets: Seq[Seq[UnsafeRow]], lock: Object)
38+
class ContinuousRecordEndpoint(buckets: Seq[mutable.Seq[UnsafeRow]], lock: Object)
3739
extends ThreadSafeRpcEndpoint {
3840

3941
private var startOffsets: Seq[Int] = List.fill(buckets.size)(0)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class TextSocketContinuousStream(
7474
// Exposed for tests.
7575
private[spark] var startOffset: TextSocketOffset = _
7676

77-
private val recordEndpoint = new ContinuousRecordEndpoint(buckets.map(_.toSeq), this)
77+
private val recordEndpoint = new ContinuousRecordEndpoint(buckets, this)
7878
@volatile private var endpointRef: RpcEndpointRef = _
7979

8080
initialize()

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
5353
@GuardedBy("this")
5454
private val records = Seq.fill(numPartitions)(new ListBuffer[UnsafeRow])
5555

56-
private val recordEndpoint = new ContinuousRecordEndpoint(records.map(_.toSeq), this)
56+
private val recordEndpoint = new ContinuousRecordEndpoint(records, this)
5757
@volatile private var endpointRef: RpcEndpointRef = _
5858

5959
def addData(data: TraversableOnce[A]): Offset = synchronized {

0 commit comments

Comments
 (0)