Commit 22feb41

Merge branch 'ESPARK-116' into 'spark_2.1'
[ESPARK-116] Fast small-file merging, supporting `ORC`, `RC`, `Parquet`, `TextFile`, and other file formats

1. For ORC and RC files, Hive's existing merge methods are called directly.
2. Parquet `1.8.2` already ships the fast-merge code, so the Parquet merge is implemented following the ORC file-merge approach.
3. For TextFile, the data blocks are read out and written to new files as-is, without any transformation.
4. For files in formats other than the four above, a `distribute by rand()` stage is appended during the Optimize phase to perform the merge (see the sketch after the commit summary below).
5. Both dynamic-partition and non-dynamic-partition writes are supported.
6. Files are merged sequentially, so their original order is preserved.
7. If the merge fails, the error is ignored and the pre-merge files are used as the result.
8. Merge throughput is close to raw disk read/write speed.

resolve apache#116 See merge request !77
2 parents 22e568f + ffab86a commit 22feb41

9 files changed

Lines changed: 728 additions & 15 deletions
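
To make point 4 of the commit message concrete, here is a minimal DataFrame-level sketch of the fallback. The session and table names are illustrative assumptions; in the patch itself the rewrite is applied to the logical plan by the `MergeSmallFiles` rule shown in the `HiveOptimizerRules.scala` diff below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

// Hypothetical session and table names, for illustration only.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Redistributing rows by rand() before the insert is the DataFrame equivalent
// of appending DISTRIBUTE BY rand(), letting the post-shuffle coordinator
// decide how many output files get written.
spark.table("src_db.src_table")
  .repartition(rand())
  .write
  .mode("overwrite")
  .insertInto("dst_db.dst_table")
```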


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
@@ -145,6 +145,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val MERGE_FILE_PER_TASK = SQLConfigBuilder("spark.sql.hive.merge.size.per.task")
+    .doc("The size of one file")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefault(240 * 1024 * 1024)
+
+  val MERGE_SMALLFILE_SIZE = SQLConfigBuilder("spark.sql.hive.merge.smallfile.size")
+    .doc("The average size of smallfile")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefault(80 * 1024 * 1024)
+
   val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
     SQLConfigBuilder("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
       .doc("The target post-shuffle input size in bytes of a task.")
@@ -790,6 +800,8 @@ class SQLConf extends Serializable with Logging {

   def mergeHiveFiles: Boolean = getConf(MERGE_HIVEFILES)

+  def mergeFileSize: Long = getConf(MERGE_FILE_PER_TASK)
+
   def targetPostShuffleInputSize: Long =
     getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
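
These two settings drive the merge thresholds read later in `InsertIntoHiveTable`. A minimal sketch of setting them on a session follows; the values are illustrative, not recommendations.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Target size, in bytes, of each merged output file (default 240 MB).
spark.conf.set("spark.sql.hive.merge.size.per.task", 256L * 1024 * 1024)
// Files whose average size falls below this threshold are treated as small
// files eligible for merging (default 80 MB).
spark.conf.set("spark.sql.hive.merge.smallfile.size", 64L * 1024 * 1024)
```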

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 7 additions & 5 deletions
@@ -52,7 +52,7 @@ import org.apache.spark.sql.types._
  * When set to false, use standard format defined in parquet-format spec. This argument only
  * affects Parquet write path.
  */
-private[parquet] class ParquetSchemaConverter(
+private[spark] class ParquetSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {
@@ -63,10 +63,12 @@ private[parquet] class ParquetSchemaConverter(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat)

   def this(conf: Configuration) = this(
-    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
-    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get.toString).toBoolean)
+    assumeBinaryIsString = conf.getBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key,
+      SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
+    assumeInt96IsTimestamp = conf.getBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
+    writeLegacyParquetFormat = conf.getBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get))

   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 14 additions & 2 deletions
@@ -35,6 +35,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {

   private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize

+  private def mergeFileSize: Long = conf.mergeFileSize * 4
+
   private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled

   private def minNumPostShufflePartitions: Option[Int] = {
@@ -95,8 +97,18 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
           minNumPostShufflePartitions)
       children.zip(requiredChildDistributions).map {
         case (e: ShuffleExchange, _) =>
-          // This child is an Exchange, we need to add the coordinator.
-          e.copy(coordinator = Some(coordinator))
+          if (e.newPartitioning.asInstanceOf[HashPartitioning]
+            .toString.contains("SparkMergeTask")) {
+            val mergeCoordinator =
+              new ExchangeCoordinator(
+                children.length,
+                mergeFileSize,
+                minNumPostShufflePartitions)
+            e.copy(coordinator = Some(mergeCoordinator))
+          } else {
+            // This child is an Exchange, we need to add the coordinator.
+            e.copy(coordinator = Some(coordinator))
+          }
         case (child, distribution) =>
           // If this child is not an Exchange, we need to add an Exchange for now.
           // Ideally, we can try to avoid this Exchange. However, when we reach here,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizerRules.scala

Lines changed: 11 additions & 5 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.hive.merge.MergeUtils

 case class DeterminePartitionedTableStats(sparkSession: SparkSession)
   extends Rule[LogicalPlan] with PredicateHelper {
@@ -107,18 +108,23 @@ case class MergeSmallFiles(sparkSession: SparkSession) extends Rule[LogicalPlan]
     plan transformDown {
       case InsertIntoTable(table: MetastoreRelation, partition,
         child, overwrite, ifNotExists) if !child.isInstanceOf[Sort] &&
-        !child.children.exists(a => a.isInstanceOf[RepartitionByExpression] &&
-        !a.isInstanceOf[Repartition]) && !table.databaseName.contains("temp") =>
-        val rand = Alias(new Rand(), "_nondeterministic")()
+        !child.children.exists(a => a.isInstanceOf[RepartitionByExpression] ||
+        a.isInstanceOf[Repartition] ||
+        a.isInstanceOf[Sort]) && !table.databaseName.contains("temp") &&
+        !MergeUtils.SUPPORTED_FORMAT.contains(table.tableDesc.getOutputFileFormatClassName) =>
+        val rand = Alias(new Rand(), "SparkMergeTask")()
         val newProjected = Project(child.output :+ rand, child)
         val mergeFileStage = RepartitionByExpression(Seq(rand.toAttribute), newProjected, None)
         val finalOutput = Project(child.output, mergeFileStage)
         InsertIntoTable(table: MetastoreRelation, partition, finalOutput, overwrite, ifNotExists)
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" &&
         !query.isInstanceOf[Sort] &&
         !query.children.exists(a => a.isInstanceOf[RepartitionByExpression]
-        && !a.isInstanceOf[Repartition]) && !tableDesc.database.contains("temp") =>
-        CreateTable(tableDesc, mode, Some(RepartitionByExpression(Seq(new Rand()), query, None)))
+        || a.isInstanceOf[Repartition] ||
+        a.isInstanceOf[Sort]) && !tableDesc.database.contains("temp") &&
+        !MergeUtils.SUPPORTED_FORMAT.contains(tableDesc.storage.outputFormat.get) =>
+        CreateTable(tableDesc, mode, Some(RepartitionByExpression(
+          Seq(Alias(new Rand(), "SparkMergeTask")()), query, None)))
     }
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 112 additions & 3 deletions
@@ -23,17 +23,21 @@ import java.security.PrivilegedExceptionAction
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}

+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
+import org.apache.hadoop.hive.ql.exec.{TaskRunner, Utilities}
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 import org.apache.hadoop.security.UserGroupInformation

+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -42,8 +46,9 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.SparkException
-import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.sql.hive.merge.MergeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{RpcUtils, SerializableJobConf}


 /**
@@ -94,6 +99,14 @@ case class InsertIntoHiveTable(
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
   val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

+  private val avgConditionSize = sqlContext.sparkSession.conf
+    .get(SQLConf.MERGE_SMALLFILE_SIZE)
+  private val outputAverageSize = sqlContext.sparkSession
+    .conf.get(SQLConf.MERGE_FILE_PER_TASK)
+  private val mergeHiveFiles = sqlContext.sparkSession.sessionState.conf.mergeHiveFiles
+  private val targetFileSize = Math.max(avgConditionSize, outputAverageSize)
+  private val retryWaitMs = RpcUtils.retryWaitMs(sqlContext.sparkContext.conf)
+
   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
@@ -222,6 +235,69 @@ case class InsertIntoHiveTable(
     new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }

+  private def mergeFile(
+      path: Path,
+      fs: FileSystem,
+      fileSinkConf: FileSinkDesc,
+      conf: SerializableJobConf,
+      directRenamePathList: java.util.List[String],
+      speculationEnabled: Boolean): Unit = {
+    val hiveOutputFormat = conf.value.getOutputFormat
+      .asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+    val extension = Utilities.getFileExtension(conf.value,
+      fileSinkConf.getCompressed, hiveOutputFormat)
+    val outputClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+    val outputDir = path.toString
+    val tmpMergeLocation = MergeUtils.getExternalMergeTmpPath(path, conf.value)
+    val tmpMergeLocationDir = tmpMergeLocation.toString
+    fileSinkConf.dir = tmpMergeLocation.toString
+    val waitTime = retryWaitMs
+    val numDynamicPartitions = partition.values.count(_.isEmpty)
+    if (numDynamicPartitions > 0) {
+      val mergeRules = MergeUtils.generateDynamicMergeRule(fs, path,
+        conf.value, avgConditionSize, targetFileSize, directRenamePathList)
+      sparkContext.union(mergeRules.map { r =>
+        val groupSize = Math.ceil(r.files.size * 1d / r.numFiles).toInt
+        val groupedFiles = r.files.toArray.grouped(groupSize).map(x => (r.path, x)).toArray
+        MergeUtils.mergePathRDD(sparkContext, groupedFiles, groupedFiles.size)
+      }).foreach { case (partOutputDir, files) =>
+        val tmpPartMergeLocationDir = partOutputDir.replace("-ext-10000", MergeUtils.TEMP_DIR)
+        MergeUtils.mergeAction(conf, outputClassName, files, partOutputDir, tmpPartMergeLocationDir,
+          extension, waitTime)
+      }
+      if (speculationEnabled) {
+        mergeRules.foreach { r =>
+          val specFiles = fs.listStatus(
+            new Path(r.path.toString.replace("-ext-10000", MergeUtils.TEMP_DIR)))
+            .filter(!_.getPath.getName.startsWith("part"))
+          specFiles.foreach(f => fs.delete(f.getPath))
+        }
+      }
+    } else {
+      val numFiles = MergeUtils.getTargetFileNum(path, conf.value,
+        avgConditionSize, targetFileSize)
+      if (numFiles > 0) {
+        val files = fs.listStatus(path).filter(_.getLen > 0).map(_.getPath.toString)
+        val groupSize = Math.ceil(files.size * 1d / numFiles).toInt
+        val groupedFiles = files.grouped(groupSize).toArray
+        fileSinkConf.dir = tmpMergeLocation.toString
+        sparkContext.parallelize(groupedFiles, groupedFiles.size).foreach { files =>
+          MergeUtils.mergeAction(conf, outputClassName, files, outputDir, tmpMergeLocationDir,
+            extension, waitTime)
+        }
+        if (speculationEnabled) {
+          val specFiles = fs.listStatus(tmpMergeLocation)
+            .filter(!_.getPath.getName.startsWith("part"))
+          specFiles.foreach(f => fs.delete(f.getPath))
+        }
+        if (conf.value.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+          fs.createNewFile(new Path(tmpMergeLocationDir + "/_SUCCESS"))
+        }
+      }
+    }
+    FileOutputFormat.setOutputPath(conf.value, tmpMergeLocation)
+  }
+
   private def saveAsHiveFile(
       rdd: RDD[InternalRow],
       valueClass: Class[_],
@@ -310,6 +386,7 @@ case class InsertIntoHiveTable(
     }

     val jobConf = new JobConf(hadoopConf)
+    jobConf.set(MergeUtils.SCHEMA, table.attributes.toStructType.json)
     val jobConfSer = new SerializableJobConf(jobConf)

     // When speculation is on and output committer class name contains "Direct", we should warn
@@ -342,6 +419,38 @@ case class InsertIntoHiveTable(
     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

+    val outputFormatClass = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+    if (mergeHiveFiles && targetFileSize > 0 && !table.databaseName.contains("temp") &&
+      MergeUtils.SUPPORTED_FORMAT.contains(outputFormatClass)) {
+      val directRenamePathList = new java.util.ArrayList[String]()
+      val rollbackPathList = new java.util.ArrayList[String]()
+      val fs = tmpLocation.getFileSystem(jobConf)
+      try {
+        mergeFile(tmpLocation, fs, fileSinkConf, jobConfSer,
+          directRenamePathList, speculationEnabled)
+        if (!directRenamePathList.isEmpty) {
+          directRenamePathList.asScala.foreach { path =>
+            val destPath = path.replace("-ext-10000", MergeUtils.TEMP_DIR)
+            rollbackPathList.add(path)
+            logInfo("rename [" + path + " to " + destPath + "]")
+            fs.rename(new Path(path), new Path(destPath))
+          }
+        }
+      } catch {
+        case ex: Exception =>
+          logInfo("Merge file of " + tmpLocation + " failed!", ex)
+          fileSinkConf.dir = tmpLocation.toString
+          if (!rollbackPathList.isEmpty) {
+            rollbackPathList.asScala.foreach { path =>
+              val srcPath = path.replace("-ext-10000", MergeUtils.TEMP_DIR)
+              logInfo("rename [" + srcPath + " to "
+                + path + "]")
+              fs.rename(new Path(srcPath), new Path(path))
+            }
+          }
+      }
+    }
+
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.merge
+
+import org.apache.hadoop.mapred.RecordReader
+
+case class StringWrapper(var file: String)
+
+class FileRecordReader(file: String) extends RecordReader[StringWrapper, StringWrapper] {
+  var isClose = false
+
+  override def next(k: StringWrapper, v: StringWrapper): Boolean = {
+    if (!isClose) {
+      k.file = file
+      v.file = file
+      isClose = true
+      true
+    } else {
+      false
+    }
+  }
+
+  override def getProgress: Float = if (isClose) 1 else 0
+
+  override def getPos: Long = if (isClose) 1 else 0
+
+  override def createKey(): StringWrapper = StringWrapper(file)
+
+  override def close(): Unit = {}
+
+  override def createValue(): StringWrapper = StringWrapper(file)
+}
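
A small usage sketch of the reader's one-record-per-file contract, assuming the class above is on the classpath; the path string is purely illustrative.

```scala
import org.apache.spark.sql.hive.merge.FileRecordReader

// Each FileRecordReader yields exactly one key/value pair, both wrapping the
// same file path, and then reports itself finished.
val reader = new FileRecordReader("/warehouse/db.db/tbl/part-00000")
val key = reader.createKey()
val value = reader.createValue()
assert(reader.next(key, value))     // first call returns true; key.file == value.file
assert(!reader.next(key, value))    // subsequent calls return false
assert(reader.getProgress == 1.0f)  // progress is reported as complete
reader.close()
```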
