36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2826,6 +2826,42 @@ object WritableConverter {
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.

// The following implicit declarations have been added on top of the very similar ones
// below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates
// eta-expansion of zero-arg methods, and so no longer matches a no-arg method where it
// expects an implicit value that is a zero-arg function.

implicit val intWritableConverterFn: () => WritableConverter[Int] =
() => simpleWritableConverter[Int, IntWritable](_.get)

implicit val longWritableConverterFn: () => WritableConverter[Long] =
() => simpleWritableConverter[Long, LongWritable](_.get)

implicit val doubleWritableConverterFn: () => WritableConverter[Double] =
() => simpleWritableConverter[Double, DoubleWritable](_.get)

implicit val floatWritableConverterFn: () => WritableConverter[Float] =
() => simpleWritableConverter[Float, FloatWritable](_.get)

implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] =
() => simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = {
() => simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
// getBytes returns an array that is longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
}
}

implicit val stringWritableConverterFn: () => WritableConverter[String] =
() => simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] =
() => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])

// These implicits remain included for backwards-compatibility. They fulfill the
// same role as those above.

implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)
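
Editorial illustration, not part of the PR: a minimal, self-contained sketch (with invented names) of the implicit-resolution issue described in the comment at the top of this hunk. An implicit value that already has the zero-arg function type is found where a zero-arg implicit method would rely on the eta-expansion that 2.12 deprecates.

object EtaExpansionSketch {
  // Stand-in type; not Spark's WritableConverter.
  class Converter[T]

  // A call site that expects an implicit *function* producing a converter.
  def needsConverter[T](implicit makeConverter: () => Converter[T]): Converter[T] =
    makeConverter()

  // 2.12-friendly: an implicit value whose type is already () => Converter[Int].
  implicit val intConverterFn: () => Converter[Int] = () => new Converter[Int]

  // The pre-2.12 style would have been a zero-arg implicit method, e.g.
  //   implicit def intConverter(): Converter[Int] = new Converter[Int]
  // which 2.11 could eta-expand to satisfy the parameter above; 2.12 deprecates that.

  val resolved: Converter[Int] = needsConverter[Int]
}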

@@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
pool.scheduleWithFixedDelay(
getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
Review comment from the author:
These changes avoid warnings about eta-expansion of zero-arg methods. The explicit () => ... form works fine in 2.11 as well; it just doesn't rely on the syntactic sugar.
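
Editorial illustration, not part of the PR or the comment above: a small sketch of the same idea in isolation. getRunner and checkForLogs here are stand-ins mirroring the shapes in the code above, not the real definitions.

object ZeroArgEtaSketch {
  // Stand-in for the provider's periodic task.
  def checkForLogs(): Unit = println("checking for event log updates")

  // Accepts an explicit thunk rather than a method reference.
  def getRunner(operateFun: () => Unit): Runnable =
    new Runnable { override def run(): Unit = operateFun() }

  // 2.11 also accepted getRunner(checkForLogs) by eta-expanding the zero-arg method;
  // 2.12 deprecates that form, so the lambda is written out explicitly.
  val runner: Runnable = getRunner(() => checkForLogs())
}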


if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
pool.scheduleWithFixedDelay(
getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
} else {
logDebug("Background update thread disabled for testing")
@@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.adminAclsGroups.getOrElse("")
ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)))
} else {
None
}
14 changes: 6 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,10 +450,9 @@ private[deploy] class Worker(
}
}(cleanupThreadExecutor)

cleanupFuture.onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
}(cleanupThreadExecutor)
cleanupFuture.failed.foreach(e =>
Review comment from the author:
onFailure and onSuccess are deprecated in 2.12. This should be equivalent in both 2.11 and 2.12.
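
Editorial illustration, not part of the PR or the comment above: a minimal sketch of the onFailure-to-failed.foreach rewrite. The execution context and the failing future here are stand-ins, not the Worker's.

import scala.concurrent.{ExecutionContext, Future}

object FutureFailureSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for a cleanup task that fails.
  val cleanupFuture: Future[Unit] = Future(throw new RuntimeException("boom"))

  // Deprecated in 2.12:
  //   cleanupFuture.onFailure { case e: Throwable => println("App dir cleanup failed: " + e.getMessage) }
  // .failed yields a Future[Throwable] that completes successfully only if the
  // original future failed, so foreach on it runs the error handler.
  cleanupFuture.failed.foreach(e => println("App dir cleanup failed: " + e.getMessage))
}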

logError("App dir cleanup failed: " + e.getMessage, e)
)(cleanupThreadExecutor)

case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -622,10 +621,9 @@ private[deploy] class Worker(
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
}(cleanupThreadExecutor).onFailure {
case e: Throwable =>
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
}(cleanupThreadExecutor)
}(cleanupThreadExecutor).failed.foreach(e =>
logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
)(cleanupThreadExecutor)
}
shuffleService.applicationRemoved(id)
}
@@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend(
if (notifyDriver && driver.nonEmpty) {
driver.get.ask[Boolean](
RemoveExecutor(executorId, new ExecutorLossReason(reason))
).onFailure { case e =>
).failed.foreach(e =>
logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
}(ThreadUtils.sameThread)
)(ThreadUtils.sameThread)
}

System.exit(code)
@@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
}

executionPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}

override def acquireStorageMemory(
@@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(Double.NegativeInfinity,
Double.PositiveInfinity)((e: Double, x: (Double, Double)) =>
(x._1.max(e), x._2.min(e))))
Iterator(
items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)
Review comment from the author:
More warnings about auto-tupling, where the compiler converts two arguments into a single tuple argument.
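
Editorial illustration, not part of the PR or the comment above: the auto-tupling fix in isolation, using a plain List rather than an RDD partition iterator.

object AutoTuplingSketch {
  val items = List(3.0, 1.0, 2.0)

  // The compiler warns when two arguments are adapted into the single tuple that
  // foldRight expects as its initial value:
  //   items.foldRight(Double.NegativeInfinity, Double.PositiveInfinity)(...)
  // Passing an explicit tuple is equivalent and warning-free.
  val (max, min) = items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)) {
    (e, acc) => (acc._1.max(e), acc._2.min(e))
  }
}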

)((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e))))
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
@@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv(
onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
postToOutbox(message.receiver, rpcMessage)
promise.future.onFailure {
promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{HashMap, HashSet, Stack}
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
@@ -396,12 +396,12 @@ class DAGScheduler(

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val ancestors = new Stack[ShuffleDependency[_, _, _]]
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
Review comment from the author:
Stack is deprecated in 2.12 because of its poor performance; ArrayStack should behave the same and be faster, in 2.11 as well.
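
Editorial illustration, not part of the PR or the comment above: a tiny sketch of ArrayStack as a drop-in replacement for the deprecated mutable.Stack.

import scala.collection.mutable.ArrayStack

object ArrayStackSketch {
  def main(args: Array[String]): Unit = {
    val waitingForVisit = new ArrayStack[Int]
    waitingForVisit.push(1)
    waitingForVisit.push(2)
    while (waitingForVisit.nonEmpty) {
      // pop() returns elements in LIFO order: 2, then 1
      println(waitingForVisit.pop())
    }
  }
}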

val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
@@ -434,7 +434,7 @@ class DAGScheduler(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
@@ -456,7 +456,7 @@
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
@@ -1633,7 +1633,7 @@ class DAGScheduler(
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
@@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
// Only log the failure since we don't care about the result.
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
case t => logError(t.getMessage, t)
}(ThreadUtils.sameThread)
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
logError(t.getMessage, t))(ThreadUtils.sameThread)
}

protected def removeWorker(workerId: String, host: String, message: String): Unit = {
driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
case t => logError(t.getMessage, t)
}(ThreadUtils.sameThread)
driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
logError(t.getMessage, t))(ThreadUtils.sameThread)
}

def sufficientResourcesRegistered(): Boolean = true
@@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer
private object JavaIterableWrapperSerializer extends Logging {
// The class returned by JavaConverters.asJava
// (scala.collection.convert.Wrappers$IterableWrapper).
val wrapperClass =
scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
import scala.collection.JavaConverters._
val wrapperClass = Seq(1).asJava.getClass
Review comment from the author:
WrapAsJava is deprecated, and this ought to be the equivalent way to obtain a Java Iterable wrapper.
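
Editorial illustration, not part of the PR or the comment above: one way to obtain a Java Iterable wrapper class through JavaConverters rather than the deprecated WrapAsJava. The explicit iterable converter is used here to make clear which wrapper is meant; the PR's own one-liner may pick a different converter.

import scala.collection.JavaConverters._

object IterableWrapperSketch {
  // asJavaIterableConverter wraps a Scala Iterable in the Java Iterable adapter;
  // the runtime class should be scala.collection.convert.Wrappers$IterableWrapper.
  val wrapperClass: Class[_] = asJavaIterableConverter(Seq(1)).asJava.getClass
  // Deprecated equivalent:
  //   scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
}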


// Get the underlying method so we can use it to get the Scala collection for serialization.
private val underlyingMethodOpt = {
@@ -118,10 +118,9 @@ class BlockManagerMaster(
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
future.failed.foreach(e =>
logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
@@ -130,10 +129,9 @@
/** Remove all blocks belonging to the given shuffle. */
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
future.failed.foreach(e =>
logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
@@ -143,11 +141,10 @@
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
}(ThreadUtils.sameThread)
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
@@ -85,13 +85,13 @@ class BlockManagerSlaveEndpoint(
logDebug(actionMessage)
body
}
future.onSuccess { case response =>
logDebug("Done " + actionMessage + ", response is " + response)
future.foreach { response =>
logDebug(s"Done $actionMessage, response is $response")
context.reply(response)
logDebug("Sent response: " + response + " to " + context.senderAddress)
logDebug(s"Sent response: $response to ${context.senderAddress}")
}
future.onFailure { case t: Throwable =>
logError("Error in " + actionMessage, t)
future.failed.foreach { t =>
logError(s"Error in $actionMessage", t)
context.sendFailure(t)
}
}
@@ -66,11 +66,11 @@ private[spark] object UIWorkloadGenerator {
def nextFloat(): Float = new Random().nextFloat()

val jobs = Seq[(String, () => Long)](
("Count", baseData.count),
("Cache and Count", baseData.map(x => x).cache().count),
("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
("Entirely failed phase", baseData.map(x => throw new Exception).count),
("Partially failed phase", {
("Count", () => baseData.count),
("Cache and Count", () => baseData.map(x => x).cache().count),
("Single Shuffle", () => baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count),
("Partially failed phase", () => {
baseData.map{x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
@@ -79,7 +79,7 @@
1
}.count
}),
("Partially failed phase (longer tasks)", {
("Partially failed phase (longer tasks)", () => {
baseData.map{x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
@@ -89,7 +89,7 @@
1
}.count
}),
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
("Job with delays", () => baseData.map(x => Thread.sleep(100)).count)
)

val barrier = new Semaphore(-nJobSet * jobs.size + 1)
@@ -83,11 +83,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
assert(resolver.getResolvers.size() === 4)
val expected = repos.split(",").map(r => s"$r/")
resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
if (1 < i && i < 3) {
assert(resolver.getName === s"repo-$i")
assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
}
resolver.getResolvers.toArray.map(_.asInstanceOf[AbstractResolver]).zipWithIndex.foreach {
Review comment from the author:
This ends up being interpreted as a partial function, which wasn't the intent, and generates a warning. The rewrite should be equivalent.
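
Editorial illustration, not part of the PR or the comment above: a sketch with invented data of why a { case ... } block with a typed pattern behaves as a pattern-matching anonymous function, and how casting first avoids it.

object PartialFunctionSketch {
  val values: Array[Any] = Array("a", "b")

  // A typed pattern in the case makes this a pattern match that can fail, which newer
  // compilers may warn about and which would throw a MatchError for a non-String element:
  //   values.zipWithIndex.foreach { case (s: String, i) => println(s"$i -> $s") }

  // Cast up front, then match on a tuple shape that always succeeds.
  values.map(_.asInstanceOf[String]).zipWithIndex.foreach {
    case (s, i) => println(s"$i -> $s")
  }
}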

case (r, i) =>
if (1 < i && i < 3) {
assert(r.getName === s"repo-$i")
assert(r.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
}
}
}

@@ -78,7 +78,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
logDebug(s"getAppUI($appId, $attemptId)")
getAppUICount += 1
instances.get(CacheKey(appId, attemptId)).map( e =>
LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime)))
}

override def attachSparkUI(
@@ -122,7 +122,7 @@
completed: Boolean,
timestamp: Long): Unit = {
instances += (CacheKey(appId, attemptId) ->
new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp))
}

/**
@@ -130,10 +130,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
info("Should not have reached this code path (onComplete matching Failure)")
throw new Exception("Task should succeed")
}
f.onSuccess { case a: Any =>
f.foreach { a =>
sem.release()
}
f.onFailure { case t =>
f.failed.foreach { t =>
info("Should not have reached this code path (onFailure)")
throw new Exception("Task should succeed")
}
@@ -164,11 +164,11 @@
case scala.util.Failure(e) =>
sem.release()
}
f.onSuccess { case a: Any =>
f.foreach { a =>
info("Should not have reached this code path (onSuccess)")
throw new Exception("Task should fail")
}
f.onFailure { case t =>
f.failed.foreach { t =>
sem.release()
}
intercept[SparkException] {
@@ -625,6 +625,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskFailed(taskDescription, fetchFailed)
case (1, _, partition) =>
backend.taskSuccess(taskDescription, 42 + partition)
case unmatched =>
Review comment from the author:
Many new warnings about matches not being exhaustive; this is one case where it was clearly avoidable. I left the other instances alone.
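
Editorial illustration, not part of the PR or the comment above: a sketch with invented types of how a catch-all case addresses an exhaustiveness warning while turning an unexpected value into an explicit failure instead of a MatchError.

object ExhaustiveMatchSketch {
  sealed trait TaskOutcome
  case object Succeeded extends TaskOutcome
  case object FetchFailed extends TaskOutcome
  case object Killed extends TaskOutcome

  def describe(outcome: TaskOutcome): String = outcome match {
    case Succeeded   => "task succeeded"
    case FetchFailed => "fetch failed"
    // Without this case, the compiler warns that the match is not exhaustive (Killed).
    case unmatched   => sys.error(s"Unexpected outcome: $unmatched")
  }
}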

fail(s"Unexpected shuffle output $unmatched")
}
}
withBackend(runBackend _) {