diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1821bc87bf62..cec61d85ccf3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2826,6 +2826,42 @@ object WritableConverter {
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
+ // The following implicit declarations have been added on top of the very similar ones
+ // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates
+ // eta-expansion of zero-arg methods and thus will not match a no-arg method where
+ // an implicit function of no arguments is expected.
+
+ implicit val intWritableConverterFn: () => WritableConverter[Int] =
+ () => simpleWritableConverter[Int, IntWritable](_.get)
+
+ implicit val longWritableConverterFn: () => WritableConverter[Long] =
+ () => simpleWritableConverter[Long, LongWritable](_.get)
+
+ implicit val doubleWritableConverterFn: () => WritableConverter[Double] =
+ () => simpleWritableConverter[Double, DoubleWritable](_.get)
+
+ implicit val floatWritableConverterFn: () => WritableConverter[Float] =
+ () => simpleWritableConverter[Float, FloatWritable](_.get)
+
+ implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] =
+ () => simpleWritableConverter[Boolean, BooleanWritable](_.get)
+
+ implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = {
+ () => simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
+ // getBytes returns an array that is longer than the data to be returned
+ Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+ }
+ }
+
+ implicit val stringWritableConverterFn: () => WritableConverter[String] =
+ () => simpleWritableConverter[String, Text](_.toString)
+
+ implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] =
+ () => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
+
+ // These implicits remain included for backward compatibility. They fulfill the
+ // same role as those above.
+
implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)
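
Illustration (not part of the patch): a minimal, self-contained sketch of the Scala 2.12 behavior described in the comment above. The names Box, intBoxFn and describe are invented for this example; in the real code, SparkContext.sequenceFile is the method that takes the function-valued implicit converters.

object EtaExpansionSketch {
  case class Box[T](value: T)

  // Under Scala 2.11, a zero-arg implicit method such as
  //   implicit def intBox(): Box[Int] = Box(0)
  // would be eta-expanded to satisfy an implicit parameter of type () => Box[Int].
  // Scala 2.12 no longer does this, so the implicit is declared as a function value:
  implicit val intBoxFn: () => Box[Int] = () => Box(0)

  // In the style of SparkContext.sequenceFile: the implicit is a function so the
  // converter can be constructed lazily, once per call.
  def describe[T](implicit makeBox: () => Box[T]): String = makeBox().value.toString

  def main(args: Array[String]): Unit = {
    println(describe[Int]) // resolves via intBoxFn
  }
}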
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 20fe911f2d29..910121e9878b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
- pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+ pool.scheduleWithFixedDelay(
+ getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
- pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+ pool.scheduleWithFixedDelay(
+ getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
} else {
logDebug("Background update thread disabled for testing")
@@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.adminAclsGroups.getOrElse("")
ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
- Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+ Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)))
} else {
None
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 29a810fe7abe..ed5fa4b839cd 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -450,10 +450,9 @@ private[deploy] class Worker(
}
}(cleanupThreadExecutor)
- cleanupFuture.onFailure {
- case e: Throwable =>
- logError("App dir cleanup failed: " + e.getMessage, e)
- }(cleanupThreadExecutor)
+ cleanupFuture.failed.foreach(e =>
+ logError("App dir cleanup failed: " + e.getMessage, e)
+ )(cleanupThreadExecutor)
case MasterChanged(masterRef, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
@@ -622,10 +621,9 @@ private[deploy] class Worker(
dirList.foreach { dir =>
Utils.deleteRecursively(new File(dir))
}
- }(cleanupThreadExecutor).onFailure {
- case e: Throwable =>
- logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
- }(cleanupThreadExecutor)
+ }(cleanupThreadExecutor).failed.foreach(e =>
+ logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+ )(cleanupThreadExecutor)
}
shuffleService.applicationRemoved(id)
}
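
Illustration (not part of the patch): the Future callback migration applied in this file and several below. Future.onFailure is deprecated in Scala 2.12, so the failure is projected with .failed and an ordinary function is registered with foreach.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FailedForeachSketch {
  def main(args: Array[String]): Unit = {
    val cleanupFuture: Future[Unit] = Future(sys.error("disk unavailable"))

    // Pre-2.12 style (deprecated):
    //   cleanupFuture.onFailure { case e => println(e.getMessage) }
    // 2.12-friendly style: take the Future[Throwable] via .failed, then foreach.
    cleanupFuture.failed.foreach { e =>
      println("App dir cleanup failed: " + e.getMessage)
    }

    Thread.sleep(500) // crude wait so the callback can fire before the JVM exits
  }
}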
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed893cd1e948..d27362ae85be 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend(
if (notifyDriver && driver.nonEmpty) {
driver.get.ask[Boolean](
RemoveExecutor(executorId, new ExecutorLossReason(reason))
- ).onFailure { case e =>
+ ).failed.foreach(e =>
logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
- }(ThreadUtils.sameThread)
+ )(ThreadUtils.sameThread)
}
System.exit(code)
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index df193552bed3..78edd2c4d7fa 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
}
executionPool.acquireMemory(
- numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+ numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
override def acquireStorageMemory(
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 57782c0ebf33..943abae17a91 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
- Iterator(items.foldRight(Double.NegativeInfinity,
- Double.PositiveInfinity)((e: Double, x: (Double, Double)) =>
- (x._1.max(e), x._2.min(e))))
+ Iterator(
+ items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)
+ )((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e))))
}.reduce { (maxmin1, maxmin2) =>
(maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
}
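
Illustration (not part of the patch): the same tupling fix recurs in the treeAggregate, foldLeft and scanLeft changes later in this diff. Scala 2.12 deprecates adapting an argument list into a tuple, so the single tuple argument has to be written explicitly.

object ExplicitTupleSketch {
  def main(args: Array[String]): Unit = {
    val items = Seq(3.0, -1.0, 7.5)

    // Deprecated under Scala 2.12 (the two arguments were silently wrapped in a tuple):
    //   items.foldRight(Double.NegativeInfinity, Double.PositiveInfinity)(...)
    // 2.12-friendly: pass the initial (max, min) accumulator as an explicit tuple.
    val (max, min) =
      items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity)) { (e, acc) =>
        (acc._1.max(e), acc._2.min(e))
      }

    println(s"max=$max min=$min") // max=7.5 min=-1.0
  }
}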
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 1777e7a53975..f951591e02a5 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv(
onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
postToOutbox(message.receiver, rpcMessage)
- promise.future.onFailure {
+ promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 562dd1da4fe1..9153751d03c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.Map
-import scala.collection.mutable.{HashMap, HashSet, Stack}
+import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
@@ -396,12 +396,12 @@ class DAGScheduler(
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
- rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
- val ancestors = new Stack[ShuffleDependency[_, _, _]]
+ rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
+ val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
- val waitingForVisit = new Stack[RDD[_]]
+ val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
@@ -434,7 +434,7 @@ class DAGScheduler(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
- val waitingForVisit = new Stack[RDD[_]]
+ val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
@@ -456,7 +456,7 @@ class DAGScheduler(
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
- val waitingForVisit = new Stack[RDD[_]]
+ val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
@@ -1633,7 +1633,7 @@ class DAGScheduler(
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
- val waitingForVisit = new Stack[RDD[_]]
+ val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a0ef20977930..424e43b25c77 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
// Only log the failure since we don't care about the result.
- driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
- case t => logError(t.getMessage, t)
- }(ThreadUtils.sameThread)
+ driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
+ logError(t.getMessage, t))(ThreadUtils.sameThread)
}
protected def removeWorker(workerId: String, host: String, message: String): Unit = {
- driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
- case t => logError(t.getMessage, t)
- }(ThreadUtils.sameThread)
+ driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t =>
+ logError(t.getMessage, t))(ThreadUtils.sameThread)
}
def sufficientResourcesRegistered(): Boolean = true
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 4f03e54e304f..58483c9577d2 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer
private object JavaIterableWrapperSerializer extends Logging {
// The class returned by JavaConverters.asJava
// (scala.collection.convert.Wrappers$IterableWrapper).
- val wrapperClass =
- scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass
+ import scala.collection.JavaConverters._
+ val wrapperClass = Seq(1).asJava.getClass
// Get the underlying method so we can use it to get the Scala collection for serialization.
private val underlyingMethodOpt = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index ea5d8423a588..8b1dc0ba6356 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -118,10 +118,9 @@ class BlockManagerMaster(
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
- future.onFailure {
- case e: Exception =>
- logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
- }(ThreadUtils.sameThread)
+ future.failed.foreach(e =>
+ logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
+ )(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
@@ -130,10 +129,9 @@ class BlockManagerMaster(
/** Remove all blocks belonging to the given shuffle. */
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
- future.onFailure {
- case e: Exception =>
- logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
- }(ThreadUtils.sameThread)
+ future.failed.foreach(e =>
+ logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
+ )(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
@@ -143,11 +141,10 @@ class BlockManagerMaster(
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
- future.onFailure {
- case e: Exception =>
- logWarning(s"Failed to remove broadcast $broadcastId" +
- s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
- }(ThreadUtils.sameThread)
+ future.failed.foreach(e =>
+ logWarning(s"Failed to remove broadcast $broadcastId" +
+ s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
+ )(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 1aaa42459df6..742cf4fe393f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -85,13 +85,13 @@ class BlockManagerSlaveEndpoint(
logDebug(actionMessage)
body
}
- future.onSuccess { case response =>
- logDebug("Done " + actionMessage + ", response is " + response)
+ future.foreach { response =>
+ logDebug(s"Done $actionMessage, response is $response")
context.reply(response)
- logDebug("Sent response: " + response + " to " + context.senderAddress)
+ logDebug(s"Sent response: $response to ${context.senderAddress}")
}
- future.onFailure { case t: Throwable =>
- logError("Error in " + actionMessage, t)
+ future.failed.foreach { t =>
+ logError(s"Error in $actionMessage", t)
context.sendFailure(t)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 094953f2f5b5..6229e800957d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -66,11 +66,11 @@ private[spark] object UIWorkloadGenerator {
def nextFloat(): Float = new Random().nextFloat()
val jobs = Seq[(String, () => Long)](
- ("Count", baseData.count),
- ("Cache and Count", baseData.map(x => x).cache().count),
- ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
- ("Entirely failed phase", baseData.map(x => throw new Exception).count),
- ("Partially failed phase", {
+ ("Count", () => baseData.count),
+ ("Cache and Count", () => baseData.map(x => x).cache().count),
+ ("Single Shuffle", () => baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
+ ("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count),
+ ("Partially failed phase", () => {
baseData.map{x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
@@ -79,7 +79,7 @@ private[spark] object UIWorkloadGenerator {
1
}.count
}),
- ("Partially failed phase (longer tasks)", {
+ ("Partially failed phase (longer tasks)", () => {
baseData.map{x =>
val probFailure = (4.0 / NUM_PARTITIONS)
if (nextFloat() < probFailure) {
@@ -89,7 +89,7 @@ private[spark] object UIWorkloadGenerator {
1
}.count
}),
- ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
+ ("Job with delays", () => baseData.map(x => Thread.sleep(100)).count)
)
val barrier = new Semaphore(-nJobSet * jobs.size + 1)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 88b77e514342..eb8c203ae775 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -83,11 +83,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
assert(resolver.getResolvers.size() === 4)
val expected = repos.split(",").map(r => s"$r/")
- resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) =>
- if (1 < i && i < 3) {
- assert(resolver.getName === s"repo-$i")
- assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
- }
+ resolver.getResolvers.toArray.map(_.asInstanceOf[AbstractResolver]).zipWithIndex.foreach {
+ case (r, i) =>
+ if (1 < i && i < 3) {
+ assert(r.getName === s"repo-$i")
+ assert(r.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1))
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index c175ed3fb6e3..6e50e8454904 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -78,7 +78,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
logDebug(s"getAppUI($appId, $attemptId)")
getAppUICount += 1
instances.get(CacheKey(appId, attemptId)).map( e =>
- LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
+ LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime)))
}
override def attachSparkUI(
@@ -122,7 +122,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
completed: Boolean,
timestamp: Long): Unit = {
instances += (CacheKey(appId, attemptId) ->
- new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
+ new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp))
}
/**
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index f4be8eaef701..de0e71a332f2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -130,10 +130,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
info("Should not have reached this code path (onComplete matching Failure)")
throw new Exception("Task should succeed")
}
- f.onSuccess { case a: Any =>
+ f.foreach { a =>
sem.release()
}
- f.onFailure { case t =>
+ f.failed.foreach { t =>
info("Should not have reached this code path (onFailure)")
throw new Exception("Task should succeed")
}
@@ -164,11 +164,11 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
case scala.util.Failure(e) =>
sem.release()
}
- f.onSuccess { case a: Any =>
+ f.foreach { a =>
info("Should not have reached this code path (onSuccess)")
throw new Exception("Task should fail")
}
- f.onFailure { case t =>
+ f.failed.foreach { t =>
sem.release()
}
intercept[SparkException] {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index a8249e123fa0..75ea409e16b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -625,6 +625,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskFailed(taskDescription, fetchFailed)
case (1, _, partition) =>
backend.taskSuccess(taskDescription, 42 + partition)
+ case unmatched =>
+ fail(s"Unexpected shuffle output $unmatched")
}
}
withBackend(runBackend _) {
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
index 64be96627614..a1cf3570a7a6 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -78,10 +78,10 @@ class KryoBenchmark extends SparkFunSuite {
sum
}
}
- basicTypes("Int", Random.nextInt)
- basicTypes("Long", Random.nextLong)
- basicTypes("Float", Random.nextFloat)
- basicTypes("Double", Random.nextDouble)
+ basicTypes("Int", () => Random.nextInt())
+ basicTypes("Long", () => Random.nextLong())
+ basicTypes("Float", () => Random.nextFloat())
+ basicTypes("Double", () => Random.nextDouble())
// Benchmark Array of Primitives
val arrayCount = 10000
@@ -101,10 +101,10 @@ class KryoBenchmark extends SparkFunSuite {
sum
}
}
- basicTypeArray("Int", Random.nextInt)
- basicTypeArray("Long", Random.nextLong)
- basicTypeArray("Float", Random.nextFloat)
- basicTypeArray("Double", Random.nextDouble)
+ basicTypeArray("Int", () => Random.nextInt())
+ basicTypeArray("Long", () => Random.nextLong())
+ basicTypeArray("Float", () => Random.nextFloat())
+ basicTypeArray("Double", () => Random.nextDouble())
// Benchmark Maps
val mapsCount = 1000
diff --git a/examples/pom.xml b/examples/pom.xml
index 33eca4864572..52a6764ae26a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -114,7 +114,7 @@
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>
- <version>3.3.0</version>
+ <version>3.7.0</version>
<groupId>com.twitter</groupId>
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 1c93079497f6..4324cc6d0f80 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -61,11 +61,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with
}
test("flume polling test") {
- testMultipleTimes(testFlumePolling)
+ testMultipleTimes(() => testFlumePolling())
}
test("flume polling test multiple hosts") {
- testMultipleTimes(testFlumePollingMultipleHost)
+ testMultipleTimes(() => testFlumePollingMultipleHost())
}
/**
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 0f61a10e9b4a..0c9f0aa765a3 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -102,8 +102,19 @@
+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <profiles>
+ <profile>
+ <id>scala-2.12</id>
+ <properties>
+ <kafka.version>0.10.1.1</kafka.version>
+ </properties>
+ </profile>
+ </profiles>
+
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 066a68a5dd31..2df8352f4866 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -173,7 +173,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
AdminUtils.createTopic(zkUtils, topic, partitions, 1)
created = true
} catch {
- case e: kafka.common.TopicExistsException if overwrite => deleteTopic(topic)
+ // Work around the fact that TopicExistsException is in kafka.common in 0.10.0 and
+ // in org.apache.kafka.common.errors in 0.10.1.
+ case e: Exception if (e.getClass.getSimpleName == "TopicExistsException") && overwrite =>
+ deleteTopic(topic)
}
}
// wait until metadata is propagated
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 4d9861af1cc0..6eb7ba5f0092 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -87,8 +87,19 @@
+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <profiles>
+ <profile>
+ <id>scala-2.12</id>
+ <properties>
+ <kafka.version>0.10.1.1</kafka.version>
+ </properties>
+ </profile>
+ </profiles>
+
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index b76dc5f93193..103082b7b976 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -250,8 +250,9 @@ object Pipeline extends MLReadable[Pipeline] {
// Save stages
val stagesDir = new Path(path, "stages").toString
- stages.zipWithIndex.foreach { case (stage: MLWritable, idx: Int) =>
- stage.write.save(getStagePath(stage.uid, idx, stages.length, stagesDir))
+ stages.zipWithIndex.foreach { case (stage, idx) =>
+ stage.asInstanceOf[MLWritable].write.save(
+ getStagePath(stage.uid, idx, stages.length, stagesDir))
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 1c97d77d3894..ce400f4f1faf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -184,7 +184,7 @@ class LinearSVC @Since("2.2.0") (
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
- new MultivariateOnlineSummarizer, new MultiClassSummarizer
+ (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
)(seqOp, combOp, $(aggregationDepth))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index cbc8f4a2d8c2..fa191604218d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -514,7 +514,7 @@ class LogisticRegression @Since("1.2.0") (
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
- new MultivariateOnlineSummarizer, new MultiClassSummarizer
+ (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
)(seqOp, combOp, $(aggregationDepth))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 92a7742f6c86..3ab99b35ece2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -235,7 +235,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
val extraJson = ("labelMetadata" -> instance.labelMetadata.json) ~
("numClasses" -> instance.models.length)
OneVsRestParams.saveImpl(path, instance, sc, Some(extraJson))
- instance.models.zipWithIndex.foreach { case (model: MLWritable, idx) =>
+ instance.models.map(_.asInstanceOf[MLWritable]).zipWithIndex.foreach { case (model, idx) =>
val modelPath = new Path(path, s"model_$idx").toString
model.save(modelPath)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index b2a968118d1a..df1aa609c1b7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -265,7 +265,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.treeAggregate(
- new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+ (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)
)(seqOp, combOp, $(aggregationDepth))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index f7d969f4ca5d..acfc6399c553 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -169,7 +169,7 @@ private[spark] object RandomForest extends Logging {
training the same tree in the next iteration. This focus allows us to send fewer trees to
workers on each iteration; see topNodesForGroup below.
*/
- val nodeStack = new mutable.Stack[(Int, LearningNode)]
+ val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val rng = new Random()
rng.setSeed(seed)
@@ -367,7 +367,7 @@ private[spark] object RandomForest extends Logging {
nodesForGroup: Map[Int, Array[LearningNode]],
treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
splits: Array[Array[Split]],
- nodeStack: mutable.Stack[(Int, LearningNode)],
+ nodeStack: mutable.ArrayStack[(Int, LearningNode)],
timer: TimeTracker = new TimeTracker,
nodeIdCache: Option[NodeIdCache] = None): Unit = {
@@ -1076,7 +1076,7 @@ private[spark] object RandomForest extends Logging {
* The feature indices are None if not subsampling features.
*/
private[tree] def selectNodesToSplit(
- nodeStack: mutable.Stack[(Int, LearningNode)],
+ nodeStack: mutable.ArrayStack[(Int, LearningNode)],
maxMemoryUsage: Long,
metadata: DecisionTreeMetadata,
rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
index 4c7746869dde..f151a6a01b65 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala
@@ -162,7 +162,7 @@ private[spark] abstract class ImpurityCalculator(val stats: Array[Double]) exten
* Fails if the array is empty.
*/
protected def indexOfLargestArrayElement(array: Array[Double]): Int = {
- val result = array.foldLeft(-1, Double.MinValue, 0) {
+ val result = array.foldLeft((-1, Double.MinValue, 0)) {
case ((maxIndex, maxValue, currentIndex), currentValue) =>
if (currentValue > maxValue) {
(currentIndex, currentValue, currentIndex + 1)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
index d7cdeae30be2..9fddf09babb0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala
@@ -174,7 +174,7 @@ object DifferentiableLossAggregatorSuite {
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.aggregate(
- new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+ (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)
)(seqOp, combOp)
}
@@ -191,7 +191,7 @@ object DifferentiableLossAggregatorSuite {
(c1._1.merge(c2._1), c1._2.merge(c2._2))
instances.aggregate(
- new MultivariateOnlineSummarizer, new MultiClassSummarizer
+ (new MultivariateOnlineSummarizer, new MultiClassSummarizer)
)(seqOp, combOp)
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
index df155b464c64..dbe2ea931fb9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
@@ -324,7 +324,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map((0, Map(
(topNode.id, new RandomForest.NodeIndexInfo(0, None))
)))
- val nodeStack = new mutable.Stack[(Int, LearningNode)]
+ val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -366,7 +366,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map((0, Map(
(topNode.id, new RandomForest.NodeIndexInfo(0, None))
)))
- val nodeStack = new mutable.Stack[(Int, LearningNode)]
+ val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -478,7 +478,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val failString = s"Failed on test with:" +
s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
- val nodeStack = new mutable.Stack[(Int, LearningNode)]
+ val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
Range(0, numTrees).foreach { treeIndex =>
topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
diff --git a/repl/pom.xml b/repl/pom.xml
index 51eb9b60dd54..bd2cfc465aaf 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -171,7 +171,6 @@
-
diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
new file mode 100644
index 000000000000..413594021987
--- /dev/null
+++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io.BufferedReader
+
+// scalastyle:off println
+import scala.Predef.{println => _, _}
+// scalastyle:on println
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.util.stringFromStream
+import scala.util.Properties.{javaVersion, javaVmName, versionString}
+
+/**
+ * A Spark-specific interactive shell.
+ */
+class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
+ extends ILoop(in0, out) {
+ def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
+ def this() = this(None, new JPrintWriter(Console.out, true))
+
+ def initializeSpark() {
+ intp.beQuietDuring {
+ processLine("""
+ @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+ org.apache.spark.repl.Main.sparkSession
+ } else {
+ org.apache.spark.repl.Main.createSparkSession()
+ }
+ @transient val sc = {
+ val _sc = spark.sparkContext
+ if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+ val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+ if (proxyUrl != null) {
+ println(
+ s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+ } else {
+ println(s"Spark Context Web UI is available at Spark Master Public URL")
+ }
+ } else {
+ _sc.uiWebUrl.foreach {
+ webUrl => println(s"Spark context Web UI available at ${webUrl}")
+ }
+ }
+ println("Spark context available as 'sc' " +
+ s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+ println("Spark session available as 'spark'.")
+ _sc
+ }
+ """)
+ processLine("import org.apache.spark.SparkContext._")
+ processLine("import spark.implicits._")
+ processLine("import spark.sql")
+ processLine("import org.apache.spark.sql.functions._")
+ }
+ }
+
+ /** Print a welcome message */
+ override def printWelcome() {
+ import org.apache.spark.SPARK_VERSION
+ echo("""Welcome to
+ ____ __
+ / __/__ ___ _____/ /__
+ _\ \/ _ \/ _ `/ __/ '_/
+ /___/ .__/\_,_/_/ /_/\_\ version %s
+ /_/
+ """.format(SPARK_VERSION))
+ val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
+ versionString, javaVmName, javaVersion)
+ echo(welcomeMsg)
+ echo("Type in expressions to have them evaluated.")
+ echo("Type :help for more information.")
+ }
+
+ /** Available commands */
+ override def commands: List[LoopCommand] = standardCommands
+
+ /**
+ * We override `createInterpreter` because we need to initialize Spark *before* the REPL
+ * sees any files, so that the Spark context is visible in those files. This is a bit of a
+ * hack, but there isn't another hook available to us at this point.
+ */
+ override def createInterpreter(): Unit = {
+ super.createInterpreter()
+ initializeSpark()
+ }
+
+ override def resetCommand(line: String): Unit = {
+ super.resetCommand(line)
+ initializeSpark()
+ echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
+ }
+}
+
+object SparkILoop {
+
+ /**
+ * Creates an interpreter loop with default settings and feeds
+ * the given code to it as input.
+ */
+ def run(code: String, sets: Settings = new Settings): String = {
+ import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+ stringFromStream { ostream =>
+ Console.withOut(ostream) {
+ val input = new BufferedReader(new StringReader(code))
+ val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
+ val repl = new SparkILoop(input, output)
+
+ if (sets.classpath.isDefault) {
+ sets.classpath.value = sys.props("java.class.path")
+ }
+ repl process sets
+ }
+ }
+ }
+ def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
+}
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
similarity index 100%
rename from repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
rename to repl/src/main/scala/org/apache/spark/repl/Main.scala
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
similarity index 100%
rename from repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
rename to repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
similarity index 100%
rename from repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
rename to repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
index 8761ae4020e5..4894036e2746 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
@@ -179,7 +179,7 @@ case class Percentile(
val sortedCounts = buffer.toSeq.sortBy(_._1)(
child.dataType.asInstanceOf[NumericType].ordering.asInstanceOf[Ordering[AnyRef]])
- val accumlatedCounts = sortedCounts.scanLeft(sortedCounts.head._1, 0L) {
+ val accumlatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) {
case ((key1, count1), (key2, count2)) => (key2, count1 + count2)
}.tail
val maxPosition = accumlatedCounts.last._2 - 1
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3aa4bf619f27..352fb545f4b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -177,7 +177,7 @@ object Metadata {
private def toJsonValue(obj: Any): JValue = {
obj match {
case map: Map[_, _] =>
- val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+ val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) }
JObject(fields)
case arr: Array[_] =>
val values = arr.toList.map(toJsonValue)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index c35e5638e927..65ca37491b6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -96,7 +96,7 @@ case class GenerateExec(
} else {
outputRows.map(joinedRow.withRight)
}
- } ++ LazyIterator(boundGenerator.terminate).map { row =>
+ } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
// we leave the left side as the last element of its child output
// keep it the same as Hive does
joinedRow.withRight(row)
@@ -109,7 +109,7 @@ case class GenerateExec(
} else {
outputRows
}
- } ++ LazyIterator(boundGenerator.terminate)
+ } ++ LazyIterator(() => boundGenerator.terminate())
}
// Convert the rows to unsafe rows.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index d74aae35250f..203d44971751 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -119,6 +119,7 @@ class InMemoryFileIndex(
case None =>
pathsToFetch += path
}
+ Unit // for some reason scalac 2.12 needs this; the return type doesn't matter
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 0e41f3c7aa6b..7d6d7e7eef92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -205,7 +205,7 @@ class UnivocityParser(
}
throw BadRecordException(
() => getCurrentInput,
- getPartialResult,
+ () => getPartialResult(),
new RuntimeException("Malformed CSV record"))
} else {
try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 1b6a28cde293..f8058b2f7813 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -216,7 +216,7 @@ private[joins] class UnsafeHashedRelation(
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
- read(in.readInt, in.readLong, in.readFully)
+ read(() => in.readInt(), () => in.readLong(), in.readFully)
}
private def read(
@@ -277,7 +277,7 @@ private[joins] class UnsafeHashedRelation(
}
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
- read(in.readInt, in.readLong, in.readBytes)
+ read(() => in.readInt(), () => in.readLong(), in.readBytes)
}
override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
@@ -766,11 +766,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
override def readExternal(in: ObjectInput): Unit = {
- read(in.readBoolean, in.readLong, in.readFully)
+ read(() => in.readBoolean(), () => in.readLong(), in.readFully)
}
override def read(kryo: Kryo, in: Input): Unit = {
- read(in.readBoolean, in.readLong, in.readBytes)
+ read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}
/**
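
Illustration (not part of the patch): why the readExternal/read calls above now wrap each reader in an explicit () => ... lambda. readHeader below is a made-up stand-in for UnsafeHashedRelation.read, which takes its primitive readers as zero-arg functions; Scala 2.12 deprecates eta-expanding a bare zero-arg method like in.readInt at such a call site.

import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.ByteBuffer

object ZeroArgEtaSketch {
  // Takes the primitive readers as functions, in the style of UnsafeHashedRelation.read,
  // so the same logic can be shared by the Java- and Kryo-serialization code paths.
  def readHeader(readInt: () => Int, readLong: () => Long): (Int, Long) =
    (readInt(), readLong())

  def main(args: Array[String]): Unit = {
    val bytes = ByteBuffer.allocate(12).putInt(42).putLong(7L).array()
    val in = new DataInputStream(new ByteArrayInputStream(bytes))

    // Scala 2.11 accepted readHeader(in.readInt, in.readLong) by eta-expanding the
    // zero-arg methods; Scala 2.12 deprecates that, so the calls are wrapped explicitly.
    println(readHeader(() => in.readInt(), () => in.readLong())) // (42,7)
  }
}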
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 13b006fc48ac..c132cab1b38c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1334,6 +1334,10 @@ public String toString() {
return "BeanWithEnum(" + enumField + ", " + regularField + ")";
}
+ public int hashCode() {
+ return Objects.hashCode(enumField, regularField);
+ }
+
public boolean equals(Object other) {
if (other instanceof BeanWithEnum) {
BeanWithEnum beanWithEnum = (BeanWithEnum) other;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 09502d05f770..247c30e2ee65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -230,11 +230,9 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
val resNaN1 = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon)
assert(resNaN1.count(_.isNaN) === 0)
- assert(resNaN1.count(_ == null) === 0)
val resNaN2 = dfNaN.stat.approxQuantile("input2", Array(q1, q2), epsilon)
assert(resNaN2.count(_.isNaN) === 0)
- assert(resNaN2.count(_ == null) === 0)
val resNaN3 = dfNaN.stat.approxQuantile("input3", Array(q1, q2), epsilon)
assert(resNaN3.isEmpty)
@@ -242,7 +240,6 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
val resNaNAll = dfNaN.stat.approxQuantile(Array("input1", "input2", "input3"),
Array(q1, q2), epsilon)
assert(resNaNAll.flatten.count(_.isNaN) === 0)
- assert(resNaNAll.flatten.count(_ == null) === 0)
assert(resNaN1(0) === resNaNAll(0)(0))
assert(resNaN1(1) === resNaNAll(0)(1))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
index 66c0263e872b..044bb03480aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
import java.util.UUID
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -32,39 +33,47 @@ import org.apache.spark.sql.test.SharedSQLContext
class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits._
- super.beforeAll()
- private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+ private var baseDf: DataFrame = null
- testEnsureStatefulOpPartitioning(
- "ClusteredDistribution generates Exchange with HashPartitioning",
- baseDf.queryExecution.sparkPlan,
- requiredDistribution = keys => ClusteredDistribution(keys),
- expectedPartitioning =
- keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
- expectShuffle = true)
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+ }
+
+ test("ClusteredDistribution generates Exchange with HashPartitioning") {
+ testEnsureStatefulOpPartitioning(
+ baseDf.queryExecution.sparkPlan,
+ requiredDistribution = keys => ClusteredDistribution(keys),
+ expectedPartitioning =
+ keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+ expectShuffle = true)
+ }
- testEnsureStatefulOpPartitioning(
- "ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning",
- baseDf.coalesce(1).queryExecution.sparkPlan,
- requiredDistribution = keys => ClusteredDistribution(keys),
- expectedPartitioning =
- keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
- expectShuffle = true)
+ test("ClusteredDistribution with coalesce(1) generates Exchange with HashPartitioning") {
+ testEnsureStatefulOpPartitioning(
+ baseDf.coalesce(1).queryExecution.sparkPlan,
+ requiredDistribution = keys => ClusteredDistribution(keys),
+ expectedPartitioning =
+ keys => HashPartitioning(keys, spark.sessionState.conf.numShufflePartitions),
+ expectShuffle = true)
+ }
- testEnsureStatefulOpPartitioning(
- "AllTuples generates Exchange with SinglePartition",
- baseDf.queryExecution.sparkPlan,
- requiredDistribution = _ => AllTuples,
- expectedPartitioning = _ => SinglePartition,
- expectShuffle = true)
+ test("AllTuples generates Exchange with SinglePartition") {
+ testEnsureStatefulOpPartitioning(
+ baseDf.queryExecution.sparkPlan,
+ requiredDistribution = _ => AllTuples,
+ expectedPartitioning = _ => SinglePartition,
+ expectShuffle = true)
+ }
- testEnsureStatefulOpPartitioning(
- "AllTuples with coalesce(1) doesn't need Exchange",
- baseDf.coalesce(1).queryExecution.sparkPlan,
- requiredDistribution = _ => AllTuples,
- expectedPartitioning = _ => SinglePartition,
- expectShuffle = false)
+ test("AllTuples with coalesce(1) doesn't need Exchange") {
+ testEnsureStatefulOpPartitioning(
+ baseDf.coalesce(1).queryExecution.sparkPlan,
+ requiredDistribution = _ => AllTuples,
+ expectedPartitioning = _ => SinglePartition,
+ expectShuffle = false)
+ }
/**
* For `StatefulOperator` with the given `requiredChildDistribution`, and child SparkPlan
@@ -72,26 +81,23 @@ class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with SharedSQLCont
* ensure the expected partitioning.
*/
private def testEnsureStatefulOpPartitioning(
- testName: String,
inputPlan: SparkPlan,
requiredDistribution: Seq[Attribute] => Distribution,
expectedPartitioning: Seq[Attribute] => Partitioning,
expectShuffle: Boolean): Unit = {
- test(testName) {
- val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
- val executed = executePlan(operator, OutputMode.Complete())
- if (expectShuffle) {
- val exchange = executed.children.find(_.isInstanceOf[Exchange])
- if (exchange.isEmpty) {
- fail(s"Was expecting an exchange but didn't get one in:\n$executed")
- }
- assert(exchange.get ===
- ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
- s"Exchange didn't have expected properties:\n${exchange.get}")
- } else {
- assert(!executed.children.exists(_.isInstanceOf[Exchange]),
- s"Unexpected exchange found in:\n$executed")
+ val operator = TestStatefulOperator(inputPlan, requiredDistribution(inputPlan.output.take(1)))
+ val executed = executePlan(operator, OutputMode.Complete())
+ if (expectShuffle) {
+ val exchange = executed.children.find(_.isInstanceOf[Exchange])
+ if (exchange.isEmpty) {
+ fail(s"Was expecting an exchange but didn't get one in:\n$executed")
}
+ assert(exchange.get ===
+ ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), inputPlan),
+ s"Exchange didn't have expected properties:\n${exchange.get}")
+ } else {
+ assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+ s"Unexpected exchange found in:\n$executed")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 2986b7f1eecf..46eec736d402 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -289,7 +289,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
- AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, testBehaviorFor)
+ AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc(), testBehaviorFor)
}
/** Stop a random active query either with `stop()` or with an error */
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index a5a8e2640586..3135a8a275da 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -63,6 +63,16 @@
<groupId>${hive.group}</groupId>
<artifactId>hive-beeline</artifactId>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <scope>provided</scope>
+ </dependency>
<groupId>org.seleniumhq.selenium</groupId>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8c7418ec7ac1..027403816f53 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -596,7 +596,7 @@ class StreamingContext private[streaming] (
}
logDebug("Adding shutdown hook") // force eager creation of logger
shutdownHookRef = ShutdownHookManager.addShutdownHook(
- StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+ StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)