
Commit d9bcb5a

fenzhu authored and GitHub Enterprise committed
[CARMEL-7358][REFACTOR] Improve limit only query (apache#142)
1 parent de30549 commit d9bcb5a
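The commit title indicates the refactor targets queries that apply only a LIMIT; the two diffs shown in this excerpt add a single-partition Hadoop RDD and a bounded file-listing path that such queries can presumably use (the planner-side wiring is in the changed files not shown here). For orientation, the shape of query this is aimed at is sketched below; the SparkSession setup and table name are illustrative assumptions, not taken from the commit.

import org.apache.spark.sql.SparkSession

// Illustrative only: a limit-only query, the kind of workload this commit targets.
// The table name is hypothetical.
val spark = SparkSession.builder().appName("limit-only-example").getOrCreate()
val firstRows = spark.sql("SELECT * FROM web_logs LIMIT 100").collect()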

File tree: 14 files changed (+1042 lines, -56 lines)
New file: org.apache.spark.rdd.SinglePartitionHadoopRDD

Lines changed: 253 additions & 0 deletions
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.io.IOException
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, TaskCompletionListener}

/**
 * :: DeveloperApi ::
 * A Hadoop RDD that reads all the files in a single partition.
 *
 * @param sc The SparkContext to associate the RDD with.
 * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
 *   variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
 *   Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
 * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
 *   creates.
 * @param inputFormatClass Storage format of the data to be read.
 * @param keyClass Class of the key associated with the inputFormatClass.
 * @param valueClass Class of the value associated with the inputFormatClass.
 */
@DeveloperApi
class SinglePartitionHadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V]) extends HadoopRDD[K, V](
    sc,
    broadcastedConf,
    initLocalJobConfFuncOpt,
    inputFormatClass,
    keyClass,
    valueClass,
    1) {

  private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

  private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)

  override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    jobConf.setLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE,
      java.lang.Long.MAX_VALUE)
    try {
      val inputSplits = getInputFormat(jobConf).getSplits(jobConf, 1)

      val allNonEmptySplits = inputSplits.filter(_.getLength > 0).map(_.asInstanceOf[FileSplit])
      val (files, lengths) = allNonEmptySplits.map(f => {
        (f.getPath, f.getLength)
      }).unzip

      // Use CombineFileSplit to represent a single partition split.
      // Need to convert back to normal FileSplit in execute side.
      val array = new Array[Partition](1)
      array(0) = new HadoopPartition(id, 0, new CombineFileSplit(jobConf, files, lengths))
      array
    } catch {
      case e: InvalidInputException if ignoreMissingFiles =>
        val inputDir = jobConf.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR)
        logWarning(s"$inputDir doesn't exist and no partitions returned from this path.", e)
        Array.empty[Partition]
      case e: IOException if e.getMessage.startsWith("Not a file:") =>
        val path = e.getMessage.split(":").map(_.trim).apply(2)
        throw SparkCoreErrors.pathNotSupportedError(path)
    }
  }

  override def compute(theSplit: Partition,
      context: TaskContext): InterruptibleIterator[(K, V)] = {
    HadoopRDD.addLocalConfiguration(
      new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date()),
      context.stageId, theSplit.index, context.attemptNumber, getJobConf())

    val combineSplit = theSplit.asInstanceOf[HadoopPartition]
      .inputSplit.value.asInstanceOf[CombineFileSplit]
    new InterruptibleIterator[(K, V)](context,
      new CombineFileSplitScanIterator(context, combineSplit, ignoreCorruptFiles))
  }

  class CombineFileSplitScanIterator(
      context: TaskContext,
      split: CombineFileSplit,
      ignoreCorruptFiles: Boolean)
    extends Iterator[(K, V)] with AutoCloseable with Logging {
    val jobConf = getJobConf()
    val inputMetrics = context.taskMetrics().inputMetrics
    val existingBytesRead = inputMetrics.bytesRead
    val paths = split.getPaths
    val lengths = split.getLengths
    val fileSplits = paths.zip(lengths).map(f =>
      new FileSplit(f._1, 0, f._2, split.getJob)).toIterator

    var currentFile: FileSplit = null
    var currentIterator: Iterator[(K, V)] = null

    override def hasNext: Boolean = {
      hasNextRecord()
    }

    override def next(): (K, V) = {
      currentIterator.next()
    }

    override def close(): Unit = {}

    protected def hasNextRecord(): Boolean = {
      (currentIterator != null && currentIterator.hasNext) || nextIterator()
    }

    protected def nextIterator(): Boolean = {
      if (fileSplits.hasNext) {
        currentFile = fileSplits.next()
        logInfo(s"Reading File ${currentFile.getPath}")
        currentIterator = readCurrentFile()
        hasNextRecord()
      } else {
        currentFile = null
        InputFileBlockHolder.unset()
        false
      }
    }

    protected def readCurrentFile() = {
      currentIterator = new NextIterator[(K, V)] {

        // Sets InputFileBlockHolder for the file block's information
        InputFileBlockHolder.set(currentFile.getPath.toString,
          currentFile.getStart, currentFile.getLength)

        // Find a function that will return the FileSystem bytes read by this thread.
        // Do this before creating RecordReader, because RecordReader's constructor
        // might read some bytes
        private val getBytesReadCallback: Option[() => Long] = {
          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
        }

        // We get our input bytes from thread-local Hadoop FileSystem statistics.
        // If we do a coalesce, however, we are likely to compute multiple partitions in the same
        // task and in the same thread, in which case we need to avoid override values written by
        // previous partitions (SPARK-13071).
        private def updateBytesRead(): Unit = {
          getBytesReadCallback.foreach { getBytesRead =>
            inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
          }
        }

        private val inputFormat = getInputFormat(jobConf)
        private var reader =
          try {
            inputFormat.getRecordReader(currentFile, jobConf, Reporter.NULL)
          } catch {
            case e: IOException if ignoreCorruptFiles =>
              logWarning(s"Skipped the rest content in " +
                s"the corrupted file: ${currentFile.getPath}", e)
              finished = true
              null
          }
        // Register an on-task-completion callback to close the input stream.
        context.addTaskCompletionListener(new TaskCompletionListener {
          override def onTaskCompletion(context: TaskContext): Unit = {
            // Update the bytes read before closing is to make sure lingering bytesRead statistics
            // in this thread get correctly added.
            updateBytesRead()
            closeIfNeeded()
          }
        })

        private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
        private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

        override def getNext(): (K, V) = {
          try {
            finished = !reader.next(key, value)
          } catch {
            case e: IOException if ignoreCorruptFiles =>
              logWarning(s"Skipped the rest content in " +
                s"the corrupted file: ${currentFile.getPath}", e)
              finished = true
          }
          if (!finished) {
            inputMetrics.incRecordsRead(1)
          }
          if (inputMetrics.recordsRead % SparkHadoopUtil
            .UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
            updateBytesRead()
          }
          (key, value)
        }

        override def close(): Unit = {
          if (reader != null) {
            InputFileBlockHolder.unset()
            try {
              reader.close()
            } catch {
              case e: Exception =>
                if (!ShutdownHookManager.inShutdown()) {
                  logWarning("Exception in RecordReader.close()", e)
                }
            } finally {
              reader = null
            }
            if (getBytesReadCallback.isDefined) {
              updateBytesRead()
            } else {
              // If we can't get the bytes read from the FS stats, fall back to the split size,
              // which may be inaccurate.
              try {
                inputMetrics.incBytesRead(currentFile.getLength)
              } catch {
                case e: IOException =>
                  logWarning("Unable to get input size to set InputMetrics for task", e)
              }
            }
          }
        }
      }
      currentIterator
    }
  }
}
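For context on how this class is meant to be driven: it reuses HadoopRDD's construction pattern but forces a single partition and packs every non-empty split into one CombineFileSplit, which compute then unwraps back into per-file FileSplits. A minimal caller sketch follows; the helper name and the choice of TextInputFormat are assumptions for illustration, not part of this commit, and because SerializableConfiguration is private[spark], code like this can only live inside the org.apache.spark namespace.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.SinglePartitionHadoopRDD
import org.apache.spark.util.SerializableConfiguration

// Hypothetical helper (not part of the commit): read every text file under
// `inputDir` through a single task.
def singlePartitionTextRDD(
    sc: SparkContext,
    inputDir: String): SinglePartitionHadoopRDD[LongWritable, Text] = {
  // Broadcast the Hadoop configuration once, as HadoopRDD expects.
  val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
  // Point each executor-side JobConf at the input directory.
  val initJobConf = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, inputDir)
  new SinglePartitionHadoopRDD[LongWritable, Text](
    sc,
    confBroadcast,
    Some(initJobConf),
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text])
}

Note the design choice visible in getPartitions: setting FileInputFormat.SPLIT_MINSIZE to Long.MaxValue keeps each file as one split, so the single task simply streams the files one after another.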

core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala

Lines changed: 61 additions & 22 deletions
@@ -64,9 +64,10 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+      parallelismMax: Int,
+      maxLeafFiles: Int = -1): Seq[(Path, Seq[FileStatus])] = {
     parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
-      ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
+      ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax, maxLeafFiles)
   }
 
   private def parallelListLeafFilesInternal(
@@ -78,27 +79,57 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+      parallelismMax: Int,
+      maxLeafFiles: Int): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
-    if (paths.size <= parallelismThreshold) {
+    if ((maxLeafFiles > 0 && maxLeafFiles < paths.size) || paths.size <= parallelismThreshold) {
+      var totalLimit = maxLeafFiles
       return paths.map { path =>
-        val leafFiles = listLeafFiles(
-          path,
-          hadoopConf,
-          filter,
-          Some(sc),
-          ignoreMissingFiles = ignoreMissingFiles,
-          ignoreLocality = ignoreLocality,
-          isRootPath = isRootLevel,
-          parallelismThreshold = parallelismThreshold,
-          parallelismMax = parallelismMax)
-        (path, leafFiles)
+        if (maxLeafFiles > 0) {
+          if (totalLimit > 0) {
+            val filesUnderPath = listLeafFiles(
+              path,
+              hadoopConf,
+              filter,
+              Some(sc),
+              ignoreMissingFiles = ignoreMissingFiles,
+              ignoreLocality = ignoreLocality,
+              isRootPath = isRootLevel,
+              parallelismThreshold = parallelismThreshold,
+              parallelismMax = parallelismMax,
+              maxLeafFiles = totalLimit)
+            totalLimit -= filesUnderPath.length
+            (path, filesUnderPath)
+          } else {
+            (path, Seq.empty)
+          }
+
+        } else {
+          val leafFiles = listLeafFiles(
+            path,
+            hadoopConf,
+            filter,
+            Some(sc),
+            ignoreMissingFiles = ignoreMissingFiles,
+            ignoreLocality = ignoreLocality,
+            isRootPath = isRootLevel,
+            parallelismThreshold = parallelismThreshold,
+            parallelismMax = parallelismMax,
+            maxLeafFiles = maxLeafFiles)
+          (path, leafFiles)
+        }
       }
     }
 
-    logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." +
-      s" The first several paths are: ${paths.take(10).mkString(", ")}.")
+    val logMsg = if (paths.size <= 5) {
+      s"Listing leaf files and directories in parallel under: " +
+        s"${paths.mkString(", ")}"
+    } else {
+      s"Listing leaf files and directories in parallel under: " +
+        s"${paths.take(5).mkString(", ")}, ..."
+    }
+    logInfo(logMsg)
     HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -131,7 +162,8 @@ private[spark] object HadoopFSUtils extends Logging {
            ignoreLocality = ignoreLocality,
            isRootPath = isRootLevel,
            parallelismThreshold = Int.MaxValue,
-           parallelismMax = 0)
+           parallelismMax = 0,
+           maxLeafFiles = maxLeafFiles)
          (path, leafFiles)
        }
      }.collect()
@@ -158,7 +190,8 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreLocality: Boolean,
       isRootPath: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int): Seq[FileStatus] = {
+      parallelismMax: Int,
+      maxLeafFiles: Int): Seq[FileStatus] = {
 
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
@@ -204,8 +237,12 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
-    val filteredStatuses =
+    val filteredStatuses = if (maxLeafFiles > 0) {
+      statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+        .take(maxLeafFiles)
+    } else {
       statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+    }
 
     val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
@@ -220,7 +257,8 @@ private[spark] object HadoopFSUtils extends Logging {
            ignoreMissingFiles = ignoreMissingFiles,
            ignoreLocality = ignoreLocality,
            parallelismThreshold = parallelismThreshold,
-           parallelismMax = parallelismMax
+           parallelismMax = parallelismMax,
+           maxLeafFiles = maxLeafFiles
          ).flatMap(_._2)
        case _ =>
          dirs.flatMap { dir =>
@@ -233,7 +271,8 @@ private[spark] object HadoopFSUtils extends Logging {
              ignoreLocality = ignoreLocality,
              isRootPath = false,
              parallelismThreshold = parallelismThreshold,
-             parallelismMax = parallelismMax)
+             parallelismMax = parallelismMax,
+             maxLeafFiles = maxLeafFiles)
          }
      }
      val filteredTopLevelFiles = if (filter != null) {