[SPARK-10216][SQL] Avoid creating empty files during overwriting with group by query #12855
Changes from 13 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -239,48 +239,50 @@ private[sql] class DefaultWriterContainer( | |
| extends BaseWriterContainer(relation, job, isAppend) { | ||
|
|
||
| def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { | ||
| executorSideSetup(taskContext) | ||
| val configuration = taskAttemptContext.getConfiguration | ||
| configuration.set("spark.sql.sources.output.path", outputPath) | ||
| var writer = newOutputWriter(getWorkPath) | ||
| writer.initConverter(dataSchema) | ||
|
|
||
| // If anything below fails, we should abort the task. | ||
| try { | ||
| Utils.tryWithSafeFinallyAndFailureCallbacks { | ||
| while (iterator.hasNext) { | ||
| val internalRow = iterator.next() | ||
| writer.writeInternal(internalRow) | ||
| } | ||
| commitTask() | ||
| }(catchBlock = abortTask()) | ||
| } catch { | ||
| case t: Throwable => | ||
| throw new SparkException("Task failed while writing rows", t) | ||
| } | ||
| if (iterator.hasNext) { | ||
| executorSideSetup(taskContext) | ||
| val configuration = taskAttemptContext.getConfiguration | ||
| configuration.set("spark.sql.sources.output.path", outputPath) | ||
| var writer = newOutputWriter(getWorkPath) | ||
| writer.initConverter(dataSchema) | ||
|
|
||
| def commitTask(): Unit = { | ||
| // If anything below fails, we should abort the task. | ||
| try { | ||
| if (writer != null) { | ||
| writer.close() | ||
| writer = null | ||
| } | ||
| super.commitTask() | ||
| Utils.tryWithSafeFinallyAndFailureCallbacks { | ||
| while (iterator.hasNext) { | ||
| val internalRow = iterator.next() | ||
| writer.writeInternal(internalRow) | ||
| } | ||
| commitTask() | ||
| }(catchBlock = abortTask()) | ||
| } catch { | ||
| case cause: Throwable => | ||
| // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and | ||
| // will cause `abortTask()` to be invoked. | ||
| throw new RuntimeException("Failed to commit task", cause) | ||
| case t: Throwable => | ||
| throw new SparkException("Task failed while writing rows", t) | ||
| } | ||
| } | ||
|
|
||
| def abortTask(): Unit = { | ||
| try { | ||
| if (writer != null) { | ||
| writer.close() | ||
| def commitTask(): Unit = { | ||
| try { | ||
| if (writer != null) { | ||
| writer.close() | ||
| writer = null | ||
| } | ||
| super.commitTask() | ||
| } catch { | ||
| case cause: Throwable => | ||
| // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and | ||
| // will cause `abortTask()` to be invoked. | ||
| throw new RuntimeException("Failed to commit task", cause) | ||
| } | ||
| } | ||
|
|
||
| def abortTask(): Unit = { | ||
| try { | ||
| if (writer != null) { | ||
| writer.close() | ||
| } | ||
| } finally { | ||
| super.abortTask() | ||
| } | ||
| } finally { | ||
| super.abortTask() | ||
| } | ||
| } | ||
| } | ||
|
|
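The change above wraps the whole body of `DefaultWriterContainer.writeRows` in an `iterator.hasNext` check, so the output writer (and therefore the part file) is only created when the task actually receives rows. Below is a minimal sketch of that pattern, using illustrative `RowWriter`/`writePartition` names rather than Spark's internal types.

```scala
// Minimal sketch of the pattern above, with illustrative types rather than
// Spark's internals: only open a writer (and thus create a file on disk)
// when the partition actually contains rows.
trait RowWriter {
  def write(row: String): Unit
  def close(): Unit
}

def writePartition(rows: Iterator[String], newWriter: () => RowWriter): Unit = {
  // Before the fix the writer was created unconditionally, so a task whose
  // iterator was empty still left a zero-length part file behind.
  if (rows.hasNext) {
    val writer = newWriter()
    try {
      rows.foreach(writer.write)   // drain the iterator into the single writer
    } finally {
      writer.close()               // always release the underlying file handle
    }
  }
  // An empty iterator never touches the file system at all.
}
```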
@@ -363,84 +365,87 @@ private[sql] class DynamicPartitionWriterContainer( | |
| } | ||
|
|
||
| def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { | ||
| executorSideSetup(taskContext) | ||
|
|
||
| // We should first sort by partition columns, then bucket id, and finally sorting columns. | ||
| val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns | ||
| val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) | ||
|
|
||
| val sortingKeySchema = StructType(sortingExpressions.map { | ||
| case a: Attribute => StructField(a.name, a.dataType, a.nullable) | ||
| // The sorting expressions are all `Attribute` except bucket id. | ||
| case _ => StructField("bucketId", IntegerType, nullable = false) | ||
| }) | ||
|
|
||
| // Returns the data columns to be written given an input row | ||
| val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) | ||
|
|
||
| // Returns the partition path given a partition key. | ||
| val getPartitionString = | ||
| UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) | ||
|
|
||
| // Sorts the data before write, so that we only need one writer at the same time. | ||
| // TODO: inject a local sort operator in planning. | ||
| val sorter = new UnsafeKVExternalSorter( | ||
| sortingKeySchema, | ||
| StructType.fromAttributes(dataColumns), | ||
| SparkEnv.get.blockManager, | ||
| SparkEnv.get.serializerManager, | ||
| TaskContext.get().taskMemoryManager().pageSizeBytes) | ||
|
|
||
| while (iterator.hasNext) { | ||
| val currentRow = iterator.next() | ||
| sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) | ||
| } | ||
| logInfo(s"Sorting complete. Writing out partition files one at a time.") | ||
|
|
||
| val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { | ||
| identity | ||
| } else { | ||
| UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { | ||
| case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) | ||
| if (iterator.hasNext) { | ||
|
Member (Author): Here as well. Simply added an `iterator.hasNext` check.
||
| executorSideSetup(taskContext) | ||
|
|
||
| // We should first sort by partition columns, then bucket id, and finally sorting columns. | ||
| val sortingExpressions: Seq[Expression] = | ||
| partitionColumns ++ bucketIdExpression ++ sortColumns | ||
| val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) | ||
|
|
||
| val sortingKeySchema = StructType(sortingExpressions.map { | ||
| case a: Attribute => StructField(a.name, a.dataType, a.nullable) | ||
| // The sorting expressions are all `Attribute` except bucket id. | ||
| case _ => StructField("bucketId", IntegerType, nullable = false) | ||
| }) | ||
| } | ||
|
|
||
| val sortedIterator = sorter.sortedIterator() | ||
| // Returns the data columns to be written given an input row | ||
| val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema) | ||
|
|
||
| // Returns the partition path given a partition key. | ||
| val getPartitionString = | ||
| UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) | ||
|
|
||
| // Sorts the data before write, so that we only need one writer at the same time. | ||
| // TODO: inject a local sort operator in planning. | ||
| val sorter = new UnsafeKVExternalSorter( | ||
| sortingKeySchema, | ||
| StructType.fromAttributes(dataColumns), | ||
| SparkEnv.get.blockManager, | ||
| SparkEnv.get.serializerManager, | ||
| TaskContext.get().taskMemoryManager().pageSizeBytes) | ||
|
|
||
| while (iterator.hasNext) { | ||
| val currentRow = iterator.next() | ||
| sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) | ||
| } | ||
| logInfo(s"Sorting complete. Writing out partition files one at a time.") | ||
|
|
||
| val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) { | ||
| identity | ||
| } else { | ||
| UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map { | ||
| case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable) | ||
| }) | ||
| } | ||
|
|
||
| // If anything below fails, we should abort the task. | ||
| var currentWriter: OutputWriter = null | ||
| try { | ||
| Utils.tryWithSafeFinallyAndFailureCallbacks { | ||
| var currentKey: UnsafeRow = null | ||
| while (sortedIterator.next()) { | ||
| val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] | ||
| if (currentKey != nextKey) { | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| currentWriter = null | ||
| } | ||
| currentKey = nextKey.copy() | ||
| logDebug(s"Writing partition: $currentKey") | ||
| val sortedIterator = sorter.sortedIterator() | ||
|
|
||
| currentWriter = newOutputWriter(currentKey, getPartitionString) | ||
| // If anything below fails, we should abort the task. | ||
| var currentWriter: OutputWriter = null | ||
| try { | ||
| Utils.tryWithSafeFinallyAndFailureCallbacks { | ||
| var currentKey: UnsafeRow = null | ||
| while (sortedIterator.next()) { | ||
| val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow] | ||
| if (currentKey != nextKey) { | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| currentWriter = null | ||
| } | ||
| currentKey = nextKey.copy() | ||
| logDebug(s"Writing partition: $currentKey") | ||
|
|
||
| currentWriter = newOutputWriter(currentKey, getPartitionString) | ||
| } | ||
| currentWriter.writeInternal(sortedIterator.getValue) | ||
| } | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| currentWriter = null | ||
| } | ||
| currentWriter.writeInternal(sortedIterator.getValue) | ||
| } | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| currentWriter = null | ||
| } | ||
|
|
||
| commitTask() | ||
| }(catchBlock = { | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| } | ||
| abortTask() | ||
| }) | ||
| } catch { | ||
| case t: Throwable => | ||
| throw new SparkException("Task failed while writing rows", t) | ||
| commitTask() | ||
| }(catchBlock = { | ||
| if (currentWriter != null) { | ||
| currentWriter.close() | ||
| } | ||
| abortTask() | ||
| }) | ||
| } catch { | ||
| case t: Throwable => | ||
| throw new SparkException("Task failed while writing rows", t) | ||
| } | ||
| } | ||
| } | ||
| } | ||
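In `DynamicPartitionWriterContainer.writeRows` the same guard is added before any setup: rows are first sorted by (partition columns, bucket id, sort columns) through `UnsafeKVExternalSorter` so that only one `OutputWriter` needs to be open at a time, and when the iterator is empty none of that machinery, and no file, is created. A rough sketch of the sort-then-roll-writers idea, using a simple in-memory sort and illustrative types in place of Spark's internals:

```scala
// Rough sketch of the "sort by key, keep at most one writer open" idea, with a
// plain in-memory sort standing in for Spark's spillable UnsafeKVExternalSorter.
// KeyedWriter and the key/value types are illustrative, not Spark's API.
trait KeyedWriter[V] {
  def write(value: V): Unit
  def close(): Unit
}

def writeSortedByKey[K: Ordering, V](
    rows: Iterator[(K, V)],
    newWriter: K => KeyedWriter[V]): Unit = {
  if (rows.hasNext) {                      // the guard this patch adds: no rows, no files
    val sorted = rows.toSeq.sortBy(_._1)   // the real code sorts (and spills) externally
    var currentKey: Option[K] = None
    var writer: KeyedWriter[V] = null
    for ((key, value) <- sorted) {
      if (!currentKey.contains(key)) {     // key changed: close the old file, open a new one
        if (writer != null) writer.close()
        writer = newWriter(key)
        currentKey = Some(key)
      }
      writer.write(value)
    }
    if (writer != null) writer.close()     // close the writer for the last key
  }
}
```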
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive | |
|
|
||
| import java.io.File | ||
|
|
||
| import org.apache.hadoop.hive.conf.HiveConf | ||
| import org.scalatest.BeforeAndAfter | ||
|
|
||
| import org.apache.spark.SparkException | ||
|
|
@@ -118,10 +117,10 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef | |
|
|
||
| sql( | ||
| s""" | ||
| |CREATE TABLE table_with_partition(c1 string) | ||
| |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) | ||
| |location '${tmpDir.toURI.toString}' | ||
| """.stripMargin) | ||
| |CREATE TABLE table_with_partition(c1 string) | ||
| |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) | ||
| |location '${tmpDir.toURI.toString}' | ||
| """.stripMargin) | ||
| sql( | ||
| """ | ||
| |INSERT OVERWRITE TABLE table_with_partition | ||
|
|
@@ -216,6 +215,33 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef | |
| sql("DROP TABLE hiveTableWithStructValue") | ||
| } | ||
|
|
||
| test("SPARK-10216: Avoid empty files during overwrite into Hive table with group by query") { | ||
| val testDataset = hiveContext.sparkContext.parallelize( | ||
| (1 to 2).map(i => TestData(i, i.toString))).toDF() | ||
| testDataset.registerTempTable("testDataset") | ||
|
|
||
| val tmpDir = Utils.createTempDir() | ||
| sql( | ||
| s""" | ||
| |CREATE TABLE table1(key int,value string) | ||
| |location '${tmpDir.toURI.toString}' | ||
| """.stripMargin) | ||
| sql( | ||
| """ | ||
| |INSERT OVERWRITE TABLE table1 | ||
| |SELECT count(key), value FROM testDataset GROUP BY value | ||
|
||
| """.stripMargin) | ||
|
|
||
| val overwrittenFiles = tmpDir.listFiles() | ||
| .filter(f => f.isFile && !f.getName.endsWith(".crc")) | ||
| .sortBy(_.getName) | ||
| val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) | ||
|
|
||
| assert(overwrittenFiles === overwrittenFilesWithoutEmpty) | ||
|
|
||
| sql("DROP TABLE table1") | ||
| } | ||
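The Hive test drives an `INSERT OVERWRITE ... GROUP BY` into a table backed by a temporary directory and then asserts that no zero-length data file was left behind. The same check can be written as a small standalone helper; this is only a sketch that mirrors the test's filtering (the `.crc` exclusion skips Hadoop checksum files), not an API introduced by the patch.

```scala
// Stand-alone version of the check the test performs: list the data files in
// an output directory and assert none of them is empty. The helper name is
// ours; the ".crc" filter mirrors the test and skips Hadoop checksum files.
import java.io.File

def assertNoEmptyOutputFiles(dir: File): Unit = {
  val dataFiles = Option(dir.listFiles()).getOrElse(Array.empty[File])
    .filter(f => f.isFile && !f.getName.endsWith(".crc"))
    .sortBy(_.getName)
  val empty = dataFiles.filter(_.length == 0)
  assert(empty.isEmpty,
    s"Found empty output files: ${empty.map(_.getName).mkString(", ")}")
}
```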
|
|
||
| test("Reject partitioning that does not match table") { | ||
| withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { | ||
| sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -879,6 +879,24 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("SPARK-10216: Avoid empty files during overwriting with group by query") { | ||
| withTempPath { path => | ||
| val df = sqlContext.range(0, 5) | ||
| val groupedDF = df.groupBy("id").count() | ||
| groupedDF.write | ||
| .format(dataSourceName) | ||
| .mode(SaveMode.Overwrite) | ||
| .save(path.getCanonicalPath) | ||
|
||
|
|
||
| val overwrittenFiles = path.listFiles() | ||
| .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_")) | ||
| .sortBy(_.getName) | ||
| val overwrittenFilesWithoutEmpty = overwrittenFiles.filter(_.length > 0) | ||
|
|
||
| assert(overwrittenFiles === overwrittenFilesWithoutEmpty) | ||
| } | ||
| } | ||
| } | ||
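The data-source test performs the equivalent check after a `DataFrame` write with `SaveMode.Overwrite`, ignoring hidden and metadata files (names starting with `.` or `_`). A hedged end-to-end usage sketch, assuming a `SparkSession` named `spark` is in scope and using a placeholder output path:

```scala
// Hedged usage sketch (not part of the patch): write an aggregated DataFrame
// with SaveMode.Overwrite and verify that every data file has a non-zero
// length. Assumes a SparkSession named `spark`; the path is a placeholder.
import java.io.File
import org.apache.spark.sql.SaveMode

val outDir = new File("/tmp/spark-10216-check")
spark.range(0, 5).groupBy("id").count()
  .write.mode(SaveMode.Overwrite).parquet(outDir.getCanonicalPath)

val dataFiles = Option(outDir.listFiles()).getOrElse(Array.empty[File])
  .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(dataFiles.nonEmpty && dataFiles.forall(_.length > 0),
  "every data file written by the grouped query should be non-empty")
```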
|
|
||
| // This class is used to test SPARK-8578. We should not use any custom output committer when | ||
|
|
||
Member (Author): Simply added an `iterator.hasNext` check.