diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 0763aa4ed99da..aaa9f15ae9fbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1685,6 +1685,13 @@ class DataFrame private[sql]( @Experimental def write: DataFrameWriter = new DataFrameWriter(this) + /** + * :: Experimental :: + * Interface for starting a streaming query that will continually save data to the specified + * external sink as new data arrives. + */ + def streamTo: DataStreamWriter = new DataStreamWriter(this) + /** * Returns the content of the [[DataFrame]] as a RDD of JSON strings. * @group rdd diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala new file mode 100644 index 0000000000000..bc29105e92332 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala @@ -0,0 +1,124 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.apache.spark.sql.execution.streaming.StreamingRelation + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.util.StringUtils + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} +import org.apache.spark.sql.types.StructType + +/** + * :: Experimental :: + * An interface to reading streaming data. Use `sqlContext.streamFrom` to access these methods. + */ +@Experimental +class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging { + + /** + * Specifies the input data source format. + * + * @since 2.0.0 + */ + def format(source: String): DataStreamReader = { + this.source = source + this + } + + /** + * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema + * automatically from data. By specifying the schema here, the underlying data stream can + * skip the schema inference step, and thus speed up data reading. + * + * @since 2.0.0 + */ + def schema(schema: StructType): DataStreamReader = { + this.userSpecifiedSchema = Option(schema) + this + } + + /** + * Adds an input option for the underlying data stream. + * + * @since 2.0.0 + */ + def option(key: String, value: String): DataStreamReader = { + this.extraOptions += (key -> value) + this + } + + /** + * (Scala-specific) Adds input options for the underlying data stream. + * + * @since 2.0.0 + */ + def options(options: scala.collection.Map[String, String]): DataStreamReader = { + this.extraOptions ++= options + this + } + + /** + * Adds input options for the underlying data stream. 
+ * + * @since 2.0.0 + */ + def options(options: java.util.Map[String, String]): DataStreamReader = { + this.options(options.asScala) + this + } + + /** + * Loads streaming input in as a [[DataFrame]], for data streams that don't require a path (e.g. + * external key-value stores). + * + * @since 2.0.0 + */ + def open(): DataFrame = { + val resolved = ResolvedDataSource.createSource( + sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + providerName = source, + options = extraOptions.toMap) + DataFrame(sqlContext, StreamingRelation(resolved)) + } + + /** + * Loads input in as a [[DataFrame]], for data streams that read from some path. + * + * @since 2.0.0 + */ + def open(path: String): DataFrame = { + option("path", path).open() + } + + /////////////////////////////////////////////////////////////////////////////////////// + // Builder pattern config options + /////////////////////////////////////////////////////////////////////////////////////// + + private var source: String = sqlContext.conf.defaultDataSourceName + + private var userSpecifiedSchema: Option[StructType] = None + + private var extraOptions = new scala.collection.mutable.HashMap[String, String] + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala new file mode 100644 index 0000000000000..db716f4c5d1f0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.util.Properties + +import org.apache.spark.sql.execution.streaming.{Offset, Sink, Batch, StreamExecution} + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} +import org.apache.spark.sql.catalyst.plans.logical.{Project, InsertIntoTable} +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource} +import org.apache.spark.sql.sources.HadoopFsRelation + + +/** + * :: Experimental :: + * Interface used to start a streaming query execution. + * + * @since 2.0.0 + */ +@Experimental +final class DataStreamWriter private[sql](df: DataFrame) { + + /** + * Specifies the underlying output data source. Built-in options include "parquet", "json", etc. + * + * @since 2.0.0 + */ + def format(source: String): DataStreamWriter = { + this.source = source + this + } + + /** + * Adds an output option for the underlying data source.
+ * + * @since 2.0.0 + */ + def option(key: String, value: String): DataStreamWriter = { + this.extraOptions += (key -> value) + this + } + + /** + * (Scala-specific) Adds output options for the underlying data source. + * + * @since 2.0.0 + */ + def options(options: scala.collection.Map[String, String]): DataStreamWriter = { + this.extraOptions ++= options + this + } + + /** + * Adds output options for the underlying data source. + * + * @since 2.0.0 + */ + def options(options: java.util.Map[String, String]): DataStreamWriter = { + this.options(options.asScala) + this + } + + /** + * Partitions the output by the given columns on the file system. If specified, the output is + * laid out on the file system similar to Hive's partitioning scheme. + * @since 2.0.0 + */ + @scala.annotation.varargs + def partitionBy(colNames: String*): DataStreamWriter = { + this.partitioningColumns = Option(colNames) + this + } + + /** + * Starts the execution of the streaming query, which will continually output results to the given + * path as new data arrives. The returned [[StandingQuery]] object can be used to interact with + * the stream. + * @since 2.0.0 + */ + def start(path: String): StandingQuery = { + this.extraOptions += ("path" -> path) + start() + } + + /** + * Starts the execution of the streaming query, which will continually output results to the + * configured sink as new data arrives. The returned [[StandingQuery]] object can be used to + * interact with the stream. + * + * @since 2.0.0 + */ + def start(): StandingQuery = { + val sink = ResolvedDataSource.createSink( + df.sqlContext, + source, + extraOptions.toMap) + + new StreamExecution(df.sqlContext, df.logicalPlan, sink) + } + + private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols => + parCols.map { col => + df.logicalPlan.output + .map(_.name) + .find(df.sqlContext.analyzer.resolver(_, col)) + .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " + + s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})")) + } + } + + /////////////////////////////////////////////////////////////////////////////////////// + // Builder pattern config options + /////////////////////////////////////////////////////////////////////////////////////// + + private var source: String = df.sqlContext.conf.defaultDataSourceName + + private var extraOptions = new scala.collection.mutable.HashMap[String, String] + + private var partitioningColumns: Option[Seq[String]] = None + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 022303239f2af..2a91a102a39d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -622,6 +622,15 @@ class SQLContext private[sql]( @Experimental def read: DataFrameReader = new DataFrameReader(this) + + /** + * :: Experimental :: + * Returns a [[DataStreamReader]] that can be used to create queries that execute continuously + * as new data arrives. + */ + @Experimental + def streamFrom: DataStreamReader = new DataStreamReader(this) + /** * :: Experimental :: * Creates an external table from the given path and returns the corresponding DataFrame.
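Taken together, the changes above give a streaming query a fluent entry point on both ends: `SQLContext.streamFrom` builds a [[DataStreamReader]] and `DataFrame.streamTo` builds a [[DataStreamWriter]]. A minimal sketch (not part of the patch) of how the two compose, assuming a `sqlContext` is in scope and using the text source/sink wired up later in this patch; the paths are illustrative only:

    import org.apache.spark.sql.{DataFrame, StandingQuery}

    // Open a streaming DataFrame over a directory of text files.
    val streamingDf: DataFrame =
      sqlContext
        .streamFrom                     // DataStreamReader
        .format("text")
        .open("/tmp/stream-in")         // DataFrame backed by a StreamingRelation

    // Continuously write filtered results out; start() returns a StandingQuery.
    val query: StandingQuery =
      streamingDf
        .filter(streamingDf("value").contains("keep"))
        .streamTo                       // DataStreamWriter
        .format("text")
        .start("/tmp/stream-out")       // resolves a Sink and starts a StreamExecution

    query.stop()                        // blocks until the micro-batch thread has stopped

Under the hood, `open()` resolves a streaming [[Source]] via `ResolvedDataSource.createSource`, `start()` resolves a [[Sink]] via `ResolvedDataSource.createSink`, and a `StreamExecution` drives micro-batches between the two.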
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/StandingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/StandingQuery.scala new file mode 100644 index 0000000000000..972c80c9b5519 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/StandingQuery.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.annotation.DeveloperApi + +trait StandingQuery { + + /** + * Stops the execution of the streaming query if it is running. This method blocks until the + * thread performing execution has stopped. + */ + def stop(): Unit + + /** Clears the indicator that a batch has completed. Used for testing. */ + @DeveloperApi + def clearBatchMarker(): Unit + + /** + * Awaits the completion of at least one streaming batch. Must be called after `clearBatchMarker` + * to guarantee that a new batch has been processed. + */ + @DeveloperApi + def awaitBatchCompletion(): Unit +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index e02ee6cd6b907..2331c6ea175f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -19,6 +19,9 @@ package org.apache.spark.sql.execution.datasources import java.util.ServiceLoader +import com.sun.jersey.core.impl.provider.entity.DataSourceProvider +import org.apache.spark.sql.execution.streaming.{Sink, Source} + import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Success, Failure, Try} @@ -92,6 +95,36 @@ object ResolvedDataSource extends Logging { } } + def createSource( + sqlContext: SQLContext, + userSpecifiedSchema: Option[StructType], + providerName: String, + options: Map[String, String]): Source = { + val provider = lookupDataSource(providerName).newInstance() match { + case s: StreamSourceProvider => s + case _ => + throw new UnsupportedOperationException( + s"Data source $providerName does not support streamed reading") + } + + provider.createSource(sqlContext, options, userSpecifiedSchema) + } + + def createSink( + sqlContext: SQLContext, + providerName: String, + options: Map[String, String]): Sink = { + val provider = lookupDataSource(providerName).newInstance() match { + case s: StreamSinkProvider => s + case _ => + throw new UnsupportedOperationException( + s"Data source $providerName does not support streamed writing") + } + + provider.createSink(sqlContext, options) + } + + /** Create a [[ResolvedDataSource]] for reading data in.
*/ def apply( sqlContext: SQLContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index fe69c72d28cb0..d9e18e52bb030 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder} +import org.apache.spark.sql.execution.streaming.{FileStreamSink, Sink, FileStreamSource, Source} import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ @@ -39,7 +40,11 @@ import org.apache.spark.util.SerializableConfiguration /** * A data source for reading text files. */ -class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { +class DefaultSource + extends HadoopFsRelationProvider + with DataSourceRegister + with StreamSourceProvider + with StreamSinkProvider { override def createRelation( sqlContext: SQLContext, @@ -64,6 +69,23 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { s"Text data source supports only a string column, but you have ${tpe.simpleString}.") } } + + override def createSource( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): Source = { + val path = parameters("path") + val metadataPath = parameters.getOrElse("metadataPath", s"$path/_metadata") + + new FileStreamSource(sqlContext, metadataPath, path) + } + + override def createSink(sqlContext: SQLContext, parameters: Map[String, String]): Sink = { + val path = parameters("path") + val metadataPath = parameters.getOrElse("metadataPath", s"$path/_metadata") + + new FileStreamSink(sqlContext, metadataPath, path) + } } private[sql] class TextRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala new file mode 100644 index 0000000000000..da9b2eaf7ee65 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.nio.ByteBuffer + +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.Logging +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.sql.SQLContext + +/** + * A very simple sink that stores received data on the filesystem as a text file. + * This is not atomic. + */ +class FileStreamSink( + val sqlContext: SQLContext, + val metadataPath: String, + val path: String) extends Sink with Logging { + + private def sparkContext = sqlContext.sparkContext + private val fs = FileSystem.get(sparkContext.hadoopConfiguration) + private val serializer = new JavaSerializer(sqlContext.sparkContext.conf).newInstance() + + override def currentOffset: Option[Offset] = { + try { + val buffer = new Array[Byte](10240) + val stream = fs.open(new Path(metadataPath)) + val size = stream.read(buffer) + val shrunk = ByteBuffer.wrap(buffer.take(size)) + Some(serializer.deserialize[Offset](shrunk)) + } catch { + case _: java.io.FileNotFoundException => + None + } + } + + // TODO: this is not atomic. + override def addBatch(batch: Batch): Unit = { + batch.data.write.mode("append").text(path) + val offset = serializer.serialize(batch.end) + val stream = fs.create(new Path(metadataPath), true) + stream.write(offset.array()) + stream.close() + logInfo(s"Committed batch ${batch.end}") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala new file mode 100644 index 0000000000000..25f0a9f08f17e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{OutputStreamWriter, BufferedWriter} + +import org.apache.hadoop.fs.{FileStatus, Path, FileSystem} +import org.apache.spark.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.util.collection.OpenHashSet + +import scala.collection.mutable.ArrayBuffer + +/** + * A very simple source that reads text files from the given directory as they appear. + */ +class FileStreamSource( + val sqlContext: SQLContext, + val metadataPath: String, + val path: String) extends Source with Logging { + + import sqlContext.implicits._ + + /** Returns the schema of the data from this source */ + override def schema: StructType = + StructType(Nil).add("value", StringType) + + /** Returns the maximum offset that can be retrieved from the source. 
*/ + def fetchMaxOffset(): LongOffset = synchronized { + val filesPresent = fetchAllFiles() + val newFiles = new ArrayBuffer[String]() + filesPresent.foreach { file => + if (!seenFiles.contains(file)) { + logDebug(s"new file: $file") + newFiles.append(file) + seenFiles.add(file) + } else { + logDebug(s"old file: $file") + } + } + + if (newFiles.nonEmpty) { + maxBatchFile += 1 + writeBatch(maxBatchFile, newFiles) + } + + new LongOffset(maxBatchFile) + } + + /** + * Returns the next batch of data that is available after `start`, if any is available. + */ + override def getNextBatch(start: Option[Offset]): Option[Batch] = { + val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(0L) + val end = fetchMaxOffset() + val endId = end.offset + + val batchFiles = (startId to endId).filter(_ >= 0).map(i => s"$metadataPath/$i") + if (!(batchFiles.isEmpty || start == Some(end))) { + logDebug(s"Producing files from batches $start:$endId") + logDebug(s"Batch files: $batchFiles") + + // Probably does not need to be a spark job... + val files = sqlContext + .read + .text(batchFiles: _*) + .as[String] + .collect() + logDebug(s"Streaming ${files.mkString(", ")}") + Some(new Batch(end, sqlContext.read.text(files: _*))) + } else { + None + } + } + + def restart(): FileStreamSource = { + new FileStreamSource(sqlContext, metadataPath, path) + } + + private def sparkContext = sqlContext.sparkContext + + private val fs = FileSystem.get(sparkContext.hadoopConfiguration) + private val existingBatchFiles = fetchAllBatchFiles() + private val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt) + private var maxBatchFile = if (existingBatchIds.isEmpty) -1 else existingBatchIds.max + private val seenFiles = new OpenHashSet[String] + + if (existingBatchFiles.nonEmpty) { + sqlContext.read + .text(existingBatchFiles.map(_.getPath.toString): _*) + .as[String] + .collect() + .foreach { file => + seenFiles.add(file) + } + } + + private def fetchAllBatchFiles(): Seq[FileStatus] = { + try fs.listStatus(new Path(metadataPath)) catch { + case _: java.io.FileNotFoundException => + fs.mkdirs(new Path(metadataPath)) + Seq.empty + } + } + + private def fetchAllFiles(): Seq[String] = { + fs.listStatus(new Path(path)) + .filterNot(_.getPath.getName.startsWith("_")) + .map(_.getPath.toUri.toString) + } + + private def writeBatch(id: Int, files: Seq[String]): Unit = { + val path = new Path(metadataPath + "/" + id) + val fs = FileSystem.get(sparkContext.hadoopConfiguration) + val writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, true))) + files.foreach { file => + writer.write(file) + writer.write("\n") + } + writer.close() + logDebug(s"Wrote batch file $path") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala index 0dbc595556ded..68cf5f41e5e73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala @@ -36,7 +36,7 @@ trait Sink { * function will be called by Spark when restarting a stream in order to determine at which point * in streamed input data computation should be resumed from. 
*/ - def currentProgress: Option[Offset] + def currentOffset: Option[Offset] /** * Accepts a new batch of data as well as a [[StreamProgress]] that denotes how far in the input diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index acad1c1bf1b1e..b88c798917a18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.Logging import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{StandingQuery, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.{SparkPlan, QueryExecution, LogicalRDD} @@ -35,7 +35,13 @@ import scala.collection.mutable.ArrayBuffer class StreamExecution( sqlContext: SQLContext, private[sql] val logicalPlan: LogicalPlan, - val sink: Sink) extends Logging { + val sink: Sink) extends StandingQuery with Logging { + + /** A monitor used to wait/notify when batches complete. */ + val awaitBatchLock = new Object + + @volatile + var batchRun = false /** Minimum amount of time in between the start of each batch. */ val minBatchTime = 10 @@ -50,7 +56,7 @@ class StreamExecution( // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data // that we have already processed). { - sink.currentProgress match { + sink.currentOffset match { case Some(c: CompositeOffset) => val storedProgress = c.offsets val sources = logicalPlan collect { @@ -76,7 +82,10 @@ class StreamExecution( private[sql] val microBatchThread = new Thread("stream execution thread") { override def run(): Unit = { SQLContext.setActive(sqlContext) - while (shouldRun) { attemptBatch() } + while (shouldRun) { + attemptBatch() + Thread.sleep(minBatchTime) // TODO: Could be tighter + } } } microBatchThread.setDaemon(true) @@ -129,20 +138,23 @@ class StreamExecution( val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 logDebug(s"Optimized batch in ${optimizerTime}ms") - // Update the offsets and calculate a new composite offset - newOffsets.foreach(currentOffsets.update) - val newStreamProgress = logicalPlan.collect { - case StreamingRelation(source, _) => currentOffsets.get(source) - } - val batchOffset = CompositeOffset(newStreamProgress) + StreamExecution.this.synchronized { + // Update the offsets and calculate a new composite offset + newOffsets.foreach(currentOffsets.update) + val newStreamProgress = logicalPlan.collect { + case StreamingRelation(source, _) => currentOffsets.get(source) + } + val batchOffset = CompositeOffset(newStreamProgress) - // Construct the batch and send it to the sink. - val nextBatch = new Batch(batchOffset, new DataFrame(sqlContext, newPlan)) - sink.addBatch(nextBatch) + // Construct the batch and send it to the sink. + val nextBatch = new Batch(batchOffset, new DataFrame(sqlContext, newPlan)) + sink.addBatch(nextBatch) + } - // Wake up any threads that are waiting for the stream to progress.
- StreamExecution.this.synchronized { - StreamExecution.this.notifyAll() + batchRun = true + awaitBatchLock.synchronized { + // Wake up any threads that are waiting for the stream to progress. + awaitBatchLock.notifyAll() } val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 @@ -150,9 +162,6 @@ class StreamExecution( } logDebug(s"Waiting for data, current: $currentOffsets") - - // TODO: this could be tighter... - Thread.sleep(minBatchTime) } /** @@ -169,11 +178,30 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. */ def awaitOffset(source: Source, newOffset: Offset): Unit = { - while (!currentOffsets.contains(source) || currentOffsets(source) < newOffset) { + def notDone = synchronized { + !currentOffsets.contains(source) || currentOffsets(source) < newOffset + } + + while (notDone) { logInfo(s"Waiting until $newOffset at $source") - synchronized { wait() } + awaitBatchLock.synchronized { awaitBatchLock.wait(100) } } logDebug(s"Unblocked at $newOffset for $source") } + + /** Clears the indicator that a batch has completed. Used for testing. */ + override def clearBatchMarker(): Unit = { + batchRun = false + } + + /** + * Awaits the completion of at least one streaming batch. Must be called after `clearBatchMarker` + * to guarantee that a new batch has been processed. + */ + override def awaitBatchCompletion(): Unit = { + while (!batchRun) { + awaitBatchLock.synchronized { awaitBatchLock.wait(100) } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 08fdb53a5ed1a..0df6e544e9ec7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -27,7 +27,8 @@ class StreamProgress extends Serializable { with mutable.SynchronizedMap[Source, Offset] private[streaming] def update(source: Source, newOffset: Offset): Unit = { - currentOffsets.get(source).foreach(old => assert(newOffset > old)) + currentOffsets.get(source).foreach(old => + assert(newOffset > old, s"Stream going backwards $newOffset -> $old")) currentOffsets.put(source, newOffset) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 4ed6774b7177b..c2c8f6df72ce7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -113,30 +113,34 @@ class MemorySink(schema: StructType) extends Sink with Logging { /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */ private val externalRowConverter = RowEncoder(schema) - override def currentProgress: Option[Offset] = batches.lastOption.map(_.end) + override def currentOffset: Option[Offset] = synchronized { + batches.lastOption.map(_.end) + } - override def addBatch(nextBatch: Batch): Unit = { + override def addBatch(nextBatch: Batch): Unit = synchronized { batches.append(nextBatch) } /** Returns all rows that are stored in this [[Sink]].
*/ - def allData: Seq[Row] = + def allData: Seq[Row] = synchronized { batches - .map(_.data) - .reduceOption(_ unionAll _) - .map(_.collect().toSeq) - .getOrElse(Seq.empty) + .map(_.data) + .reduceOption(_ unionAll _) + .map(_.collect().toSeq) + .getOrElse(Seq.empty) + } /** * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the * corresponding point in the input. This function can be used when testing to simulate data * that has been lost due to buffering. */ - def dropBatches(num: Int): Unit = { + def dropBatches(num: Int): Unit = synchronized { batches.remove(batches.size - num, num) } - override def toString: String = + override def toString: String = synchronized { batches.map(b => s"${b.end}: ${b.data.collect().mkString(" ")}").mkString("\n") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index d6c5d1435702d..f1148d55f41c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.execution.{FileRelation, RDDConversions} import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition} +import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql._ import org.apache.spark.util.SerializableConfiguration @@ -123,6 +124,25 @@ trait SchemaRelationProvider { schema: StructType): BaseRelation } +/** + * Implemented by objects that can produce a streaming [[Source]] for a specific format or system. + */ +trait StreamSourceProvider { + def createSource( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): Source +} + +/** + * Implemented by objects that can produce a streaming [[Sink]] for a specific format or system. + */ +trait StreamSinkProvider { + def createSink( + sqlContext: SQLContext, + parameters: Map[String, String]): Sink +} + /** * ::Experimental:: * Implemented by objects that produce relations for a specific kind of data source diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala new file mode 100644 index 0000000000000..c5e70db33b6ca --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.io.File + +import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.StreamTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +class DataStreamReaderSuite extends StreamTest with SharedSQLContext { + import testImplicits._ + + test("read from text files") { + val src = Utils.createDirectory("streaming.src") + val dest = Utils.createDirectory("streaming.dest") + + val df = + sqlContext + .streamFrom + .format("text") + .open(src.getCanonicalPath) + + val filtered = df.filter($"value" contains "keep") + + val runningQuery = + filtered + .streamTo + .format("text") + .start(dest.getCanonicalPath) + + runningQuery.clearBatchMarker() + + // Add some data + stringToFile(new File(src, "1"), "drop1\nkeep2\nkeep3") + + runningQuery.awaitBatchCompletion() + + val output = sqlContext.read.text(dest.getCanonicalPath).as[String] + checkAnswer(output, "keep2", "keep3") + + runningQuery.stop() + } +}
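The `StreamSourceProvider` and `StreamSinkProvider` traits added in interfaces.scala are the extension points for other formats; the text `DefaultSource` above is the only in-tree implementation in this patch. A condensed sketch of what an external provider might look like, reusing the file-based source and sink added here; the package and class names are hypothetical:

    package org.example.streaming // hypothetical package, not part of this patch

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSource, Sink, Source}
    import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
    import org.apache.spark.sql.types.StructType

    /** Hypothetical provider that reuses the file-based Source and Sink added in this patch. */
    class ExampleStreamProvider extends StreamSourceProvider with StreamSinkProvider {

      override def createSource(
          sqlContext: SQLContext,
          parameters: Map[String, String],
          schema: Option[StructType]): Source = {
        val path = parameters("path")
        // Mirror the text DefaultSource: keep batch metadata alongside the data by default.
        new FileStreamSource(sqlContext, parameters.getOrElse("metadataPath", s"$path/_metadata"), path)
      }

      override def createSink(sqlContext: SQLContext, parameters: Map[String, String]): Sink = {
        val path = parameters("path")
        new FileStreamSink(sqlContext, parameters.getOrElse("metadataPath", s"$path/_metadata"), path)
      }
    }

Such a provider would be referenced by name through `format(...)`, resolved by `ResolvedDataSource.lookupDataSource`, and then pattern-matched in `createSource`/`createSink`; a provider that only supports batch access fails there with an `UnsupportedOperationException`.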