Commit e112af9

create Write at physical plan
1 parent f2ead4d commit e112af9

11 files changed: +222 -173 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 8 additions & 5 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownF
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.streaming.{WriteMicroBatch, WriteMicroBatchExec}
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -176,9 +177,6 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {

       withProjection :: Nil

-    case WriteToDataSourceV2(writer, query) =>
-      WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
-
     case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
       CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil

@@ -265,8 +263,13 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
       }).toArray
       DeleteFromTableExec(r.table.asDeletable, filters) :: Nil

-    case WriteToContinuousDataSource(writer, query) =>
-      WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
+    case WriteMicroBatch(table, query, queryId, querySchema, outputMode, options, epochId) =>
+      WriteMicroBatchExec(
+        table, planLater(query), queryId, querySchema, outputMode, options, epochId) :: Nil
+
+    case WriteToContinuousDataSource(table, query, queryId, querySchema, outputMode, options) =>
+      WriteToContinuousDataSourceExec(
+        table, planLater(query), queryId, querySchema, outputMode, options) :: Nil

     case Repartition(1, false, child) =>
       val isContinuous = child.find {

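The new cases above follow the standard Catalyst Strategy contract: pattern-match a logical node, defer planning of its children with planLater, and return the physical node. A minimal sketch of that contract, using hypothetical ExampleWrite / ExampleWriteExec nodes that are not part of this commit (assuming Spark 3.0-era planner APIs):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical logical node and its physical counterpart, for illustration only.
case class ExampleWrite(query: LogicalPlan) extends LogicalPlan {
  override def children: Seq[LogicalPlan] = Seq(query)
  override def output: Seq[Attribute] = Nil
}

case class ExampleWriteExec(query: SparkPlan) extends UnaryExecNode {
  override def child: SparkPlan = query
  override def output: Seq[Attribute] = Nil
  // Pass-through execution; a real write node would push rows to a writer here.
  override protected def doExecute(): RDD[InternalRow] = query.execute()
}

object ExampleWriteStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // planLater leaves a placeholder so the planner fills in the child subtree later,
    // exactly as the WriteMicroBatch and WriteToContinuousDataSource cases do above.
    case ExampleWrite(query) => ExampleWriteExec(planLater(query)) :: Nil
    case _ => Nil
  }
}

A strategy of this shape can be tried out via spark.experimental.extraStrategies; DataSourceV2Strategy itself is wired into the planner directly.
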
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 0 additions & 22 deletions
@@ -38,17 +38,6 @@ import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.{LongAccumulator, Utils}

-/**
- * Deprecated logical plan for writing data into data source v2. This is being replaced by more
- * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
- */
-@deprecated("Use specific logical plans like AppendData instead", "2.4.0")
-case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
-  extends LogicalPlan {
-  override def children: Seq[LogicalPlan] = Seq(query)
-  override def output: Seq[Attribute] = Nil
-}
-
 /**
  * Physical plan node for v2 create table as select when the catalog does not support staging
  * the table creation.
@@ -315,17 +304,6 @@ case class OverwritePartitionsDynamicExec(
   }
 }

-case class WriteToDataSourceV2Exec(
-    batchWrite: BatchWrite,
-    query: SparkPlan) extends V2TableWriteExec {
-
-  def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    writeWithV2(batchWrite)
-  }
-}
-
 /**
  * Helper for physical plans that build batch writes.
  */
Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
+import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.connector.write.SupportsTruncate
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
+
+trait BaseStreamingWriteExec extends UnaryExecNode {
+  def table: SupportsWrite
+  def query: SparkPlan
+  def queryId: String
+  def querySchema: StructType
+  def outputMode: OutputMode
+  def options: Map[String, String]
+
+  override def child: SparkPlan = query
+  override def output: Seq[Attribute] = Nil
+
+  protected lazy val inputRDD = query.execute()
+  lazy val streamWrite = {
+    val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
+      .withQueryId(queryId)
+      .withInputDataSchema(querySchema)
+    outputMode match {
+      case Append =>
+        writeBuilder.buildForStreaming()
+
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.
+        require(writeBuilder.isInstanceOf[SupportsTruncate],
+          table.name + " does not support Complete mode.")
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+      case Update =>
+        // Although no v2 sinks really support Update mode now, but during tests we do want them
+        // to pretend to support Update mode, and treat Update mode same as Append mode.
+        if (Utils.isTesting) {
+          writeBuilder.buildForStreaming()
+        } else {
+          throw new IllegalArgumentException(
+            "Data source v2 streaming sinks does not support Update mode.")
+        }
+    }
+  }
+}

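This new trait pulls the work of StreamExecution.createStreamingWrite (removed below) into the physical node: the StreamingWrite is now built lazily from the table, query id, schema, output mode, and options the node carries, so nothing sink-specific has to be resolved before planning. The Complete branch requires the sink's WriteBuilder to mix in SupportsTruncate; a minimal sketch of such a builder, assuming only the connector interfaces imported above (the constructor argument is hypothetical):

import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite

// Sketch of a writer builder the Complete-mode branch would accept: truncate() marks that
// existing data should be overwritten before buildForStreaming() produces the write.
class TruncatingWriteBuilder(newWrite: Boolean => StreamingWrite) extends SupportsTruncate {

  private var truncateFirst = false

  override def truncate(): WriteBuilder = {
    truncateFirst = true
    this
  }

  override def buildForStreaming(): StreamingWrite = newWrite(truncateFirst)
}
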
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 5 additions & 5 deletions
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress}
+import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
@@ -127,8 +127,8 @@ class MicroBatchExecution(
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     sink match {
       case s: SupportsWrite =>
-        val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
-        WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
+        WriteToMicroBatchDataSource(
+          s, _logicalPlan, id.toString, _logicalPlan.schema, outputMode, extraOptions)

       case _ => _logicalPlan
     }
@@ -557,7 +557,7 @@ class MicroBatchExecution(
       nextBatch.collect()
     }
     lastExecution.executedPlan match {
-      case w: WriteToDataSourceV2Exec => w.commitProgress
+      case w: WriteMicroBatchExec => w.commitProgress
       case _ => None
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 0 additions & 29 deletions
@@ -584,35 +584,6 @@ abstract class StreamExecution(
        |batch = $batchDescription""".stripMargin
   }

-  protected def createStreamingWrite(
-      table: SupportsWrite,
-      options: Map[String, String],
-      inputPlan: LogicalPlan): StreamingWrite = {
-    val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava))
-      .withQueryId(id.toString)
-      .withInputDataSchema(inputPlan.schema)
-    outputMode match {
-      case Append =>
-        writeBuilder.buildForStreaming()
-
-      case Complete =>
-        // TODO: we should do this check earlier when we have capability API.
-        require(writeBuilder.isInstanceOf[SupportsTruncate],
-          table.name + " does not support Complete mode.")
-        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
-
-      case Update =>
-        // Although no v2 sinks really support Update mode now, but during tests we do want them
-        // to pretend to support Update mode, and treat Update mode same as Append mode.
-        if (Utils.isTesting) {
-          writeBuilder.buildForStreaming()
-        } else {
-          throw new IllegalArgumentException(
-            "Data source v2 streaming sinks does not support Update mode.")
-        }
-    }
-  }
-
   protected def purge(threshold: Long): Unit = {
     logDebug(s"Purging metadata at threshold=$threshold")
     offsetLog.purge(threshold)
Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * The logical plan for writing data to a micro-batch stream.
+ *
+ * Note that this logical plan does not have a corresponding physical plan, as it will be converted
+ * to [[WriteMicroBatch]] with epoch id for each micro-batch.
+ */
+case class WriteToMicroBatchDataSource(
+    table: SupportsWrite,
+    query: LogicalPlan,
+    queryId: String,
+    querySchema: StructType,
+    outputMode: OutputMode,
+    options: Map[String, String])
+  extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  def createPlan(epochId: Long): WriteMicroBatch = {
+    WriteMicroBatch(table, query, queryId, querySchema, outputMode, options, epochId)
+  }
+}
+
+case class WriteMicroBatch(
+    table: SupportsWrite,
+    query: LogicalPlan,
+    queryId: String,
+    querySchema: StructType,
+    outputMode: OutputMode,
+    options: Map[String, String],
+    epochId: Long) extends UnaryNode {
+  override def child: LogicalPlan = query
+  override def output: Seq[Attribute] = Nil
+}
+
+case class WriteMicroBatchExec(
+    table: SupportsWrite,
+    query: SparkPlan,
+    queryId: String,
+    querySchema: StructType,
+    outputMode: OutputMode,
+    options: Map[String, String],
+    epochId: Long) extends BaseStreamingWriteExec with V2TableWriteExec {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val batchWrite = new MicroBatchWrite(epochId, streamWrite)
+    writeWithV2(batchWrite)
+  }
+}
+
+/**
+ * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements
+ * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
+ * streaming write support.
+ */
+class MicroBatchWrite(epochId: Long, streamWrite: StreamingWrite) extends BatchWrite {
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    streamWrite.commit(epochId, messages)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    streamWrite.abort(epochId, messages)
+  }
+
+  override def createBatchWriterFactory(): DataWriterFactory = {
+    new MicroBatchWriterFactory(epochId, streamWrite.createStreamingWriterFactory())
+  }
+}
+
+class MicroBatchWriterFactory(epochId: Long, streamingWriterFactory: StreamingDataWriterFactory)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+    streamingWriterFactory.createWriter(partitionId, taskId, epochId)
+  }
+}

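Per batch, the flow is: the analyzed plan carries WriteToMicroBatchDataSource; each micro-batch stamps it into a WriteMicroBatch via createPlan(epochId); DataSourceV2Strategy plans that into WriteMicroBatchExec; and MicroBatchWrite adapts the lazily built StreamingWrite to the batch-oriented BatchWrite interface by pinning the epoch id. A REPL-style sketch of that last adapter (RecordingStreamingWrite is a hypothetical stub, used only to show the forwarding):

import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.execution.streaming.MicroBatchWrite

// Hypothetical stub that records which epoch was committed through it.
class RecordingStreamingWrite extends StreamingWrite {
  @volatile var committedEpoch: Option[Long] = None
  override def createStreamingWriterFactory(): StreamingDataWriterFactory =
    throw new UnsupportedOperationException("not needed for this sketch")
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
    committedEpoch = Some(epochId)
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
}

val streamingWrite = new RecordingStreamingWrite
// Committing through the batch-oriented interface forwards to the streaming write with epoch 42.
new MicroBatchWrite(42L, streamingWrite).commit(Array.empty[WriterCommitMessage])
assert(streamingWrite.committedEpoch.contains(42L))
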
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 5 additions & 4 deletions
@@ -88,7 +88,7 @@ class ContinuousExecution(

     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
-      createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
+      sink, _logicalPlan, id.toString, _logicalPlan.schema, outputMode, extraOptions)
   }

   private val triggerExecutor = trigger match {
@@ -178,7 +178,7 @@ class ContinuousExecution(
         "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }

-    reportTimeTaken("queryPlanning") {
+    val write = reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionForQuery,
         withNewSources,
@@ -188,7 +188,8 @@ class ContinuousExecution(
         runId,
         currentBatchId,
         offsetSeqMetadata)
-      lastExecution.executedPlan // Force the lazy generation of execution plan
+      // Force the lazy generation of execution plan and get the `StreamWrite`.
+      lastExecution.executedPlan.asInstanceOf[WriteToContinuousDataSourceExec].streamWrite
     }

     val stream = withNewSources.collect {
@@ -212,7 +213,7 @@ class ContinuousExecution(

     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
     val epochEndpoint = EpochCoordinatorRef.create(
-      logicalPlan.write,
+      write,
       stream,
       this,
       epochCoordinatorId,

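The queryPlanning change relies on reportTimeTaken being generic in its body's result type: forcing lastExecution.executedPlan now also returns the StreamingWrite built by WriteToContinuousDataSourceExec, which is exactly what the epoch coordinator needs. A minimal sketch of that shape (a hypothetical timer, not the actual ProgressReporter implementation):

// Hypothetical stand-in: times a block and returns whatever the block returns, which is why
// `val write = reportTimeTaken("queryPlanning") { ... }` above can capture the write.
def reportTimeTaken[T](name: String)(body: => T): T = {
  val start = System.nanoTime()
  try body
  finally println(s"$name took ${(System.nanoTime() - start) / 1000000} ms")
}

val result: Int = reportTimeTaken("queryPlanning") { 1 + 1 } // result == 2
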
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala

Lines changed: 10 additions & 3 deletions
@@ -19,13 +19,20 @@ package org.apache.spark.sql.execution.streaming.continuous

 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType

 /**
  * The logical plan for writing data in a continuous stream.
  */
-case class WriteToContinuousDataSource(write: StreamingWrite, query: LogicalPlan)
-  extends LogicalPlan {
+case class WriteToContinuousDataSource(
+    table: SupportsWrite,
+    query: LogicalPlan,
+    queryId: String,
+    querySchema: StructType,
+    outputMode: OutputMode,
+    options: Map[String, String]) extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
