
Commit 5bb4e93

Sahil Takiar authored and Marcelo Vanzin committed
[SPARK-20466][CORE] HadoopRDD#addLocalConfiguration throws NPE
## What changes were proposed in this pull request?

Fix for SPARK-20466; the full description of the issue is in the JIRA. To summarize, `HadoopRDD` uses a metadata cache to cache `JobConf` objects. The cache uses soft references, which means the JVM can delete entries from the cache whenever there is GC pressure. `HadoopRDD#getJobConf` had a bug: it would check whether the cache contained the `JobConf` and, if it did, get the `JobConf` from the cache and return it. This doesn't work when soft references are used, because the JVM can delete the entry between the existence check and the get call, leaving the get to return null.

## How was this patch tested?

Haven't thought of a good way to test this yet, given that the issue only occurs sometimes and happens during high GC pressure. Was thinking of using mocks to verify that `#getJobConf` is doing the right thing.

I deleted the method `HadoopRDD#containsCachedMetadata` so that we don't hit this issue again.

Author: Sahil Takiar <[email protected]>

Closes #19413 from sahilTakiar/master.

(cherry picked from commit e36ec38)
Signed-off-by: Marcelo Vanzin <[email protected]>
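The race described in the commit message can be reproduced deterministically in a small sketch. `EvictingCache` below is a hypothetical stand-in, not Spark's cache: it drops an entry right after `containsKey` reports it, imitating a soft reference being cleared between the check and the get. `checkThenGet` mirrors the removed pattern and `getOnce` mirrors the `Option`-based lookup this commit introduces; both names are illustrative only.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a soft-valued cache: an entry is dropped right
// after containsKey reports it, imitating a GC clearing the soft reference.
class EvictingCache {
  private val underlying = new ConcurrentHashMap[String, AnyRef]()

  def put(key: String, value: AnyRef): Unit = underlying.put(key, value)

  def containsKey(key: String): Boolean = {
    val present = underlying.containsKey(key)
    underlying.remove(key) // simulated eviction between the check and the get
    present
  }

  // Like java.util.Map#get, returns null when the key is absent.
  def get(key: String): AnyRef = underlying.get(key)
}

object CacheLookupSketch {
  // Removed pattern: check, then get. Returns null if the entry vanishes in between.
  def checkThenGet(cache: EvictingCache, key: String, create: () => AnyRef): AnyRef =
    if (cache.containsKey(key)) cache.get(key) else create()

  // Pattern after the fix: a single get, wrapped in Option so that a missing
  // (null) entry falls through to create().
  def getOnce(cache: EvictingCache, key: String, create: () => AnyRef): AnyRef =
    Option(cache.get(key)).getOrElse(create())
}
```

With an entry present, `checkThenGet` returns `null` once the simulated eviction hits, which is the kind of value that would later surface as an NPE. `getOnce` never observes that intermediate state: it either returns the cached value or falls through to `create()`.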
1 parent 49e8ccc commit 5bb4e93

1 file changed

Lines changed: 18 additions & 15 deletions

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

@@ -165,20 +165,25 @@ class HadoopRDD[K, V](
       if (conf.isInstanceOf[JobConf]) {
         logDebug("Re-using user-broadcasted JobConf")
         conf.asInstanceOf[JobConf]
-      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-        logDebug("Re-using cached JobConf")
-        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
       } else {
-        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
-        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-          logDebug("Creating new JobConf and caching it for later re-use")
-          val newJobConf = new JobConf(conf)
-          initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
-          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-          newJobConf
+        Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
+          .map { conf =>
+            logDebug("Re-using cached JobConf")
+            conf.asInstanceOf[JobConf]
+          }
+          .getOrElse {
+            // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
+            // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+            // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
+            // objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
+            // HADOOP-10456).
+            HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+              logDebug("Creating new JobConf and caching it for later re-use")
+              val newJobConf = new JobConf(conf)
+              initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
+              HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+              newJobConf
+            }
           }
       }
     }
@@ -372,8 +377,6 @@ private[spark] object HadoopRDD extends Logging {
    */
   def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)
 
-  def containsCachedMetadata(key: String): Boolean = SparkEnv.get.hadoopJobMetadata.containsKey(key)
-
   private def putCachedMetadata(key: String, value: Any): Unit =
     SparkEnv.get.hadoopJobMetadata.put(key, value)
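The commit message leaves testing open. One possible approach, which is not part of this commit, is to exercise the get-or-create path against a hand-written fake cache that can be emptied on demand, so no real GC pressure is needed. Everything in this sketch is hypothetical: the `MetadataCache` trait, the `ForgetfulCache` fake, and `getOrCreate`, which only imitates the shape of the patched `getJobConf` with a `String` in place of a `JobConf`.

```scala
import scala.collection.mutable

// Hypothetical cache interface, loosely shaped like the get/put helpers in the diff.
trait MetadataCache {
  def get(key: String): Any // returns null when the key is absent
  def put(key: String, value: Any): Unit
}

// Fake cache that can "forget" everything on demand, standing in for
// soft references being cleared under GC pressure.
class ForgetfulCache extends MetadataCache {
  private val entries = mutable.Map.empty[String, Any]
  def get(key: String): Any = entries.getOrElse(key, null)
  def put(key: String, value: Any): Unit = entries.update(key, value)
  def clear(): Unit = entries.clear()
}

object GetOrCreateSketch {
  private val lock = new Object
  private var created = 0

  // Same shape as the patched lookup: one get wrapped in Option, then
  // create-and-cache under a lock when nothing usable came back.
  def getOrCreate(cache: MetadataCache, key: String): String =
    Option(cache.get(key))
      .map(_.asInstanceOf[String])
      .getOrElse {
        lock.synchronized {
          created += 1
          val value = s"conf-$created"
          cache.put(key, value)
          value
        }
      }
}
```

A test built on this would call `getOrCreate` twice and expect the same value, then call `clear()` on the fake and expect a freshly created, non-null value on the next call.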
