Closed · Changes from 1 commit

46 commits:
c281373  [SPARK-29248][SQL] Pass in number of partitions to WriteBuilder (edrevo, Sep 27, 2019)
903d863  move numpartitions to physicalbuildinfo (edrevo, Nov 15, 2019)
c6b95f5  Merge branch 'master' of https://github.com/apache/spark into add-par… (edrevo, Nov 15, 2019)
30c8800  fixes (edrevo, Nov 15, 2019)
15096b2  fixes (edrevo, Nov 15, 2019)
389afee  lint fixes (edrevo, Nov 15, 2019)
4f10e54  [SPARK-29655][SQL] Read bucketed tables obeys spark.sql.shuffle.parti… (wangyum, Nov 15, 2019)
ca4e894  PR feedback (edrevo, Nov 15, 2019)
16e3c4a  more pr feedback (edrevo, Nov 15, 2019)
ee4784b  [SPARK-26499][SQL][FOLLOW-UP] Replace `update` with `setByte` for Byt… (maropu, Nov 15, 2019)
1521889  [SPARK-29902][DOC][MINOR] Add listener event queue capacity configura… (shahidki31, Nov 15, 2019)
848bdfa  [SPARK-29829][SQL] SHOW TABLE EXTENDED should do multi-catalog resolu… (planga82, Nov 15, 2019)
c0507e0  [SPARK-29833][YARN] Add FileNotFoundException check for spark.yarn.jars (ulysses-you, Nov 16, 2019)
7720781  [SPARK-29127][SQL][PYTHON] Add a clue for Python related version info… (HyukjinKwon, Nov 16, 2019)
16e7195  [SPARK-29834][SQL] DESC DATABASE should look up catalog like v2 commands (fuwhu, Nov 16, 2019)
6d6b233  [SPARK-29343][SQL][FOLLOW-UP] Remove floating-point Sum/Average/Centr… (maropu, Nov 16, 2019)
1112fc6  [SPARK-29867][ML][PYTHON] Add __repr__ in Python ML Models (huaxingao, Nov 16, 2019)
f77c10d  [SPARK-29923][SQL][TESTS] Set io.netty.tryReflectionSetAccessible for… (dongjoon-hyun, Nov 16, 2019)
40ea4a1  [SPARK-29807][SQL] Rename "spark.sql.ansi.enabled" to "spark.sql.dial… (xuanyuanking, Nov 16, 2019)
d0470d6  [MINOR][TESTS] Ignore GitHub Action and AppVeyor file changes in testing (dongjoon-hyun, Nov 16, 2019)
5336473  [SPARK-29476][WEBUI] add tooltip for Thread (PavithraRamachandran, Nov 16, 2019)
e88267c  [SPARK-29928][SQL][TESTS] Check parsing timestamps up to microsecond … (MaxGekk, Nov 17, 2019)
cc12cf6  [SPARK-29378][R] Upgrade SparkR to use Arrow 0.15 API (dongjoon-hyun, Nov 17, 2019)
388a737  [SPARK-29858][SQL] ALTER DATABASE (SET DBPROPERTIES) should look up c… (fuwhu, Nov 17, 2019)
a9959be  [SPARK-29456][WEBUI] Improve tooltip for Session Statistics Table col… (PavithraRamachandran, Nov 17, 2019)
e1fc38b  [SPARK-29932][R][TESTS] lint-r should do non-zero exit in case of errors (dongjoon-hyun, Nov 17, 2019)
5eb8973  [SPARK-29930][SQL] Remove SQL configs declared to be removed in Spark… (MaxGekk, Nov 17, 2019)
c5f644c  [SPARK-16872][ML][PYSPARK] Impl Gaussian Naive Bayes Classifier (zhengruifeng, Nov 18, 2019)
d83cacf  [SPARK-29907][SQL] Move DELETE/UPDATE/MERGE relative rules to dmlStat… (Nov 18, 2019)
f280c6a  [SPARK-29378][R][FOLLOW-UP] Remove manual installation of Arrow depen… (HyukjinKwon, Nov 18, 2019)
42f8f79  [SPARK-29936][R] Fix SparkR lint errors and add lint-r GitHub Action (dongjoon-hyun, Nov 18, 2019)
9ff8ac7  javadoc fixes (edrevo, Nov 18, 2019)
ee3bd6d  [SPARK-25694][SQL] Add a config for `URL.setURLStreamHandlerFactory` (jiangzho, Nov 18, 2019)
7391237  [SPARK-29020][SQL] Improving array_sort behaviour (Nov 18, 2019)
5cebe58  [SPARK-29783][SQL] Support SQL Standard/ISO_8601 output style for int… (yaooqinn, Nov 18, 2019)
50f6d93  [SPARK-29870][SQL] Unify the logic of multi-units interval string to … (yaooqinn, Nov 18, 2019)
c32e228  [SPARK-29859][SQL] ALTER DATABASE (SET LOCATION) should look up catal… (fuwhu, Nov 18, 2019)
ae6b711  [SPARK-29941][SQL] Add ansi type aliases for char and decimal (yaooqinn, Nov 18, 2019)
ea010a2  [SPARK-29873][SQL][TEST][FOLLOWUP] set operations should not escape w… (yaooqinn, Nov 18, 2019)
9514b82  [SPARK-29777][SPARKR] SparkR::cleanClosure aggressively removes a fun… (falaki, Nov 19, 2019)
8469614  [SPARK-25694][SQL][FOLLOW-UP] Move 'spark.sql.defaultUrlStreamHandler… (HyukjinKwon, Nov 19, 2019)
882f54b  [SPARK-29870][SQL][FOLLOW-UP] Keep CalendarInterval's toString (HyukjinKwon, Nov 19, 2019)
28a502c  [SPARK-28527][FOLLOW-UP][SQL][TEST] Add guides for ThriftServerQueryT… (wangyum, Nov 19, 2019)
a834dba  Revert "[SPARK-29644][SQL] Corrected ShortType and ByteType mapping t… (shivsood, Nov 19, 2019)
3d45779  [SPARK-29728][SQL] Datasource V2: Support ALTER TABLE RENAME TO (imback82, Nov 19, 2019)
118d81f  [SPARK-29248][SQL] Add PhysicalWriteInfo with number of partitions (edrevo, Nov 19, 2019)

Files changed:
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.connector.write.{WriteBuilder, WriteInfo}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
@@ -42,8 +42,10 @@ case class AvroTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
new AvroWriteBuilder(options, paths, formatName, supportsDataType)
override def newWriteBuilder(options: CaseInsensitiveStringMap,
writeInfo: WriteInfo): WriteBuilder =
new AvroWriteBuilder(
options, paths, formatName, supportsDataType, writeInfo)

override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

@@ -19,6 +19,7 @@ package org.apache.spark.sql.v2.avro
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.WriteInfo
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.internal.SQLConf
@@ -29,8 +30,9 @@ class AvroWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean)
extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
supportsDataType: DataType => Boolean,
writeInfo: WriteInfo)
extends FileWriteBuilder(options, paths, formatName, supportsDataType, writeInfo) {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder, WriteInfo}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -392,18 +392,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
override def newWriteBuilder(options: CaseInsensitiveStringMap,
writeInfo: WriteInfo): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _
private val inputSchema: StructType = writeInfo.schema()
private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
private val producerParams =
kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap))

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def buildForBatch(): BatchWrite = {
assert(inputSchema != null)
new KafkaBatchWrite(topic, producerParams, inputSchema)
@@ -21,6 +21,7 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.WriteInfo;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

@@ -32,10 +33,11 @@
* {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
* {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
* table for being written to. This table should usually implement {@link SupportsWrite}. A new
* writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
* and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
* at which point implementations are expected to commit the table's metadata into the metastore
* along with the data that was written by the writes from the write builder this table created.
* writer will be constructed via
* {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap, WriteInfo)}, and the write will
* be committed. The job concludes with a call to {@link #commitStagedChanges()}, at which point
* implementations are expected to commit the table's metadata into the metastore along with the
* data that was written by the writes from the write builder this table created.
*/
@Experimental
public interface StagedTable extends Table {
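The javadoc above lays out the staged-commit protocol only in prose. Below is a rough sketch of the CREATE TABLE AS SELECT flow it describes, assuming the two-argument newWriteBuilder from this PR; the stagedCtas helper and its error handling are illustrative, not code from the PR.

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.WriteInfo
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object StagedCtasSketch {
  def stagedCtas(
      catalog: StagingTableCatalog,
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: java.util.Map[String, String],
      options: CaseInsensitiveStringMap,
      info: WriteInfo): Unit = {
    // Stage the table: nothing is visible in the metastore yet.
    val staged: StagedTable = catalog.stageCreate(ident, schema, partitions, properties)
    try {
      // Build the write through the new two-argument entry point.
      val batchWrite = staged.asInstanceOf[SupportsWrite].newWriteBuilder(options, info).buildForBatch()
      // ... Spark runs the write job against batchWrite and commits it here ...
      // Publish the table metadata together with the written data.
      staged.commitStagedChanges()
    } catch {
      case e: Throwable =>
        staged.abortStagedChanges()
        throw e
    }
  }
}
```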
@@ -21,6 +21,7 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.WriteInfo;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
@@ -39,9 +40,9 @@
* TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
* drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
* {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
* the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
* write operation fails, the catalog will have already dropped the table, and the planner cannot
* roll back the dropping of the table.
* the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap, WriteInfo)}.
* However, if the write operation fails, the catalog will have already dropped the table, and the
* planner cannot roll back the dropping of the table.
* <p>
* If the catalog implements this plugin, the catalog can implement the methods to "stage" the
* creation and the replacement of a table. After the table's
@@ -20,11 +20,12 @@
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.WriteInfo;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A mix-in interface of {@link Table}, to indicate that it's writable. This adds
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* {@link #newWriteBuilder(CaseInsensitiveStringMap, WriteInfo)} that is used to create a write
* for batch or streaming.
*/
@Experimental
@@ -34,5 +35,5 @@ public interface SupportsWrite extends Table {
* Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
* this method to configure each data source write.
*/
WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options, WriteInfo info);
}
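For connector authors the change above is one extra parameter: the input schema, query id, and partition count arrive when the builder is created instead of through builder callbacks. Here is a minimal sketch of a writable table against the proposed signature; SinglePartitionTable and its one-partition restriction are hypothetical, used only to show why knowing numPartitions this early can matter.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder, WriteInfo}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class SinglePartitionTable extends SupportsWrite {
  override def name(): String = "single-partition-table"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  override def newWriteBuilder(
      options: CaseInsensitiveStringMap,
      info: WriteInfo): WriteBuilder = {
    // Schema, query id and partition count are all known up front;
    // no withInputDataSchema/withQueryId round trips are needed.
    require(info.numPartitions() == 1,
      s"expected a single input partition, got ${info.numPartitions()}")
    new WriteBuilder {
      override def buildForBatch(): BatchWrite =
        throw new UnsupportedOperationException("write path omitted in this sketch")
    }
  }
}
```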
@@ -33,28 +33,6 @@
@Evolving
public interface WriteBuilder {

/**
* Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's
* possible that there are many queries running at the same time, or a query is restarted and
* resumed. {@link BatchWrite} can use this id to identify the query.
*
* @return a new builder with the `queryId`. By default it returns `this`, which means the given
* `queryId` is ignored. Please override this method to take the `queryId`.
*/
default WriteBuilder withQueryId(String queryId) {
return this;
}

/**
* Passes the schema of the input data from Spark to data source.
*
* @return a new builder with the `schema`. By default it returns `this`, which means the given
* `schema` is ignored. Please override this method to take the `schema`.
*/
default WriteBuilder withInputDataSchema(StructType schema) {
return this;
}

/**
* Returns a {@link BatchWrite} to write data to batch source. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.StructType;

@Experimental
public interface WriteInfo {
String queryId();

StructType schema();

int numPartitions();
}
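This interface replaces the two default methods deleted from WriteBuilder above: state that used to be pushed into the builder through callbacks becomes plain input when the builder is created. A rough before/after for a third-party builder (MyWriteBuilder is illustrative, not part of the PR):

```scala
import org.apache.spark.sql.connector.write.{WriteBuilder, WriteInfo}
import org.apache.spark.sql.types.StructType

// Before: schema and query id arrived through mutable builder callbacks.
//
//   class MyWriteBuilder extends WriteBuilder {
//     private var schema: StructType = _
//     private var queryId: String = _
//     override def withInputDataSchema(s: StructType): WriteBuilder = { schema = s; this }
//     override def withQueryId(id: String): WriteBuilder = { queryId = id; this }
//   }
//
// After: they are immutable values read from WriteInfo at construction time.
class MyWriteBuilder(info: WriteInfo) extends WriteBuilder {
  private val schema: StructType = info.schema()
  private val queryId: String = info.queryId()
  private val numPartitions: Int = info.numPartitions()
  // buildForBatch()/buildForStreaming() would consume these fields; omitted here.
}
```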
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write

import org.apache.spark.sql.types.StructType

private[sql] case class WriteInfoImpl(queryId: String,
schema: StructType,
numPartitions: Int) extends WriteInfo
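WriteInfoImpl is the Spark-internal carrier for these values. The sketch below shows how the write path is expected to populate it, mirroring the V1FallbackWriters change further down; the buildWriter helper is illustrative, and since the case class is private[sql] it is only reachable from Spark's own sql packages.

```scala
package org.apache.spark.sql.connector.write

import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object WriteInfoSketch {
  def buildWriter(
      table: SupportsWrite,
      options: CaseInsensitiveStringMap,
      schema: StructType,
      rdd: RDD[InternalRow]): WriteBuilder = {
    // Query id, input schema and the physical partition count of the RDD
    // being written are bundled once and handed to the table.
    val info: WriteInfo = WriteInfoImpl(
      queryId = UUID.randomUUID().toString,
      schema = schema,
      numPartitions = rdd.getNumPartitions)
    table.newWriteBuilder(options, info)
  }
}
```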
@@ -93,7 +93,7 @@ class InMemoryTable(
override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
}

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
override def newWriteBuilder(options: CaseInsensitiveStringMap, info: WriteInfo): WriteBuilder = {
InMemoryTable.maybeSimulateFailedTableWrite(options)

new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.connector.write.{WriteBuilder, WriteInfo}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -88,8 +88,9 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable

override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
delegateTable.newWriteBuilder(options)
override def newWriteBuilder(options: CaseInsensitiveStringMap,
writeInfo: WriteInfo): WriteBuilder = {
delegateTable.newWriteBuilder(options, writeInfo)
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriteInfo, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
@@ -39,7 +39,8 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
}

private[noop] object NoopTable extends Table with SupportsWrite {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def newWriteBuilder(options: CaseInsensitiveStringMap,
writeInfo: WriteInfo): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = {
@@ -229,7 +229,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
AppendDataExecV1(v1, writeOptions.asOptions, query) :: Nil
AppendDataExecV1(v1, writeOptions.asOptions, planLater(query)) :: Nil
case v2 =>
AppendDataExec(v2, writeOptions.asOptions, planLater(query)) :: Nil
}
@@ -242,7 +242,7 @@
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query) :: Nil
OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, planLater(query)) :: Nil
case v2 =>
OverwriteByExpressionExec(v2, filters, writeOptions.asOptions, planLater(query)) :: Nil
}
@@ -30,7 +30,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder, WriteInfo}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
@@ -43,21 +43,12 @@ abstract class FileWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean) extends WriteBuilder {
private var schema: StructType = _
private var queryId: String = _
supportsDataType: DataType => Boolean,
writeInfo: WriteInfo) extends WriteBuilder {
private val schema = writeInfo.schema()
private val queryId = writeInfo.queryId()
private var mode: SaveMode = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.schema = schema
this
}

override def withQueryId(queryId: String): WriteBuilder = {
this.queryId = queryId
this
}

def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
this
@@ -21,12 +21,10 @@ import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder, WriteInfo, WriteInfoImpl}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -39,7 +37,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class AppendDataExecV1(
table: SupportsWrite,
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {
query: SparkPlan) extends V1FallbackWriters {

override protected def doExecute(): RDD[InternalRow] = {
writeWithV1(newWriteBuilder().buildForV1Write())
@@ -61,7 +59,7 @@ case class OverwriteByExpressionExecV1(
table: SupportsWrite,
deleteWhere: Array[Filter],
writeOptions: CaseInsensitiveStringMap,
plan: LogicalPlan) extends V1FallbackWriters {
query: SparkPlan) extends V1FallbackWriters {

private def isTruncate(filters: Array[Filter]): Boolean = {
filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
@@ -82,7 +80,7 @@
}

/** Some helper interfaces that use V2 write semantics through the V1 writer interface. */
sealed trait V1FallbackWriters extends SupportsV1Write {
sealed trait V1FallbackWriters extends SupportsV1Write { this: SupportsV1Write =>
override def output: Seq[Attribute] = Nil
override final def children: Seq[SparkPlan] = Nil

@@ -98,22 +96,22 @@ sealed trait V1FallbackWriters extends SupportsV1Write {
}

protected def newWriteBuilder(): V1WriteBuilder = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(plan.schema)
.withQueryId(UUID.randomUUID().toString)
val writeInfo: WriteInfo = WriteInfoImpl(
queryId = UUID.randomUUID().toString,
schema = query.schema,
numPartitions = rdd.getNumPartitions)
val writeBuilder = table.newWriteBuilder(writeOptions, writeInfo)

writeBuilder.asV1Builder
}
}

/**
* A trait that allows Tables that use V1 Writer interfaces to append data.
*/
trait SupportsV1Write extends SparkPlan {
// TODO: We should be able to work on SparkPlans at this point.
def plan: LogicalPlan

trait SupportsV1Write extends SparkPlan with WriteBase {
protected def writeWithV1(relation: InsertableRelation): RDD[InternalRow] = {
relation.insert(Dataset.ofRows(sqlContext.sparkSession, plan), overwrite = false)
relation.insert(sqlContext.internalCreateDataFrame(rdd, query.schema), overwrite = false)
sparkContext.emptyRDD
}
}