diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 2ac83c8ee6b38..4dacc03c72719 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -97,7 +97,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
   }
 
   def getShards(): Seq[Shard] = {
-    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala
+    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
   }
 
   def splitShard(shardId: String): Unit = {
@@ -137,7 +137,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
    * Expose a Python friendly API.
    */
   def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala, aggregate = false)
+    pushData(testData.asScala.toSeq, aggregate = false)
   }
 
   def deleteStream(): Unit = {
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
       sentSeqNumbers += ((num, seqNumber))
     }
 
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
   }
 }
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index af84498d5e47e..c76eb7c29dd94 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
       Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService)
     }
     producer.flushSync()
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
   }
 }
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 2c7b9c58e6fa6..12d950096b4c2 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
     require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
 
     shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-    shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
-    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+    shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
+    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
     shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
       val seqNumRange = SequenceNumberRange(
         testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index fd8f6979c9e65..da7fe7cdda328 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -62,7 +62,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
         .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
         .list()
         .getItems
-        .asScala)
+        .asScala.toSeq)
     }
   }
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index d68dc3ebef5d8..5c192c690eba5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -131,7 +131,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
       try {
        val snapshots = new ArrayList[ExecutorPodsSnapshot]()
        snapshotsBuffer.drainTo(snapshots)
-        onNewSnapshots(snapshots.asScala)
+        onNewSnapshots(snapshots.asScala.toSeq)
      } catch {
        case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
      } finally {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 9ac7e0222054a..6dc052a5e6894 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -35,7 +35,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
   override def stop(): Unit = {}
 
   override def notifySubscribers(): Unit = {
-    subscribers.foreach(_(snapshotsBuffer))
+    subscribers.foreach(_(snapshotsBuffer.toSeq))
     snapshotsBuffer.clear()
   }
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
index 7d76a22e6d363..cce842ce62f01 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -47,6 +47,6 @@ object ProcessUtils extends Logging {
     assert(proc.exitValue == 0,
       s"Failed to execute ${fullCommand.mkString(" ")}" +
         s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
-    outputLines
+    outputLines.toSeq
   }
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 26939ef23eaab..e5c18539a01ae 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -383,13 +383,13 @@ private[spark] class MesosClusterScheduler(
     taskId.split(s"${RETRY_SEP}").head
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+  private def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B) = {
     m.updated(k, f(m.getOrElse(k, default)))
   }
 
   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
     // TODO(mgummelt): Don't do this here. This should be passed as a --conf
-    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+    val commandEnv = adjust(desc.command.environment.toMap, "SPARK_SUBMIT_OPTS", "")(
       v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
     )
 
@@ -686,14 +686,14 @@ private[spark] class MesosClusterScheduler(
     }
 
     scheduleTasks(
-      copyBuffer(driversToRetry),
+      copyBuffer(driversToRetry).toSeq,
       removeFromPendingRetryDrivers,
       currentOffers,
       tasks)
 
     // Then we walk through the queued drivers and try to schedule them.
     scheduleTasks(
-      copyBuffer(queuedDrivers),
+      copyBuffer(queuedDrivers).toSeq,
       removeFromQueuedDrivers,
       currentOffers,
       tasks)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5e7a29ac6d344..32cd50298bc6c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -491,8 +491,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
 
     // offerID -> resources
-    val remainingResources = mutable.Map(offers.map(offer =>
-      (offer.getId.getValue, offer.getResourcesList)): _*)
+    val remainingResources = mutable.Map[String, JList[Resource]]()
+    remainingResources ++= offers.map(offer => (offer.getId.getValue, offer.getResourcesList))
 
     var launchTasks = true
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 5784ee314aa17..2be8835f77e36 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -379,7 +379,7 @@ trait MesosSchedulerUtils extends Logging {
           } else {
             v.split(',').toSet
           }
-        )
+        ).toMap
       } catch {
         case NonFatal(e) =>
           throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index bb37bbd2d8046..2433c16c34af6 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -146,7 +146,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     val resources = taskInfo.getResourcesList
     assert(scheduler.getResource(resources, "cpus") == 1.5)
     assert(scheduler.getResource(resources, "mem") == 1200)
-    val resourcesSeq: Seq[Resource] = resources.asScala
+    val resourcesSeq: Seq[Resource] = resources.asScala.toSeq
     val cpus = resourcesSeq.filter(_.getName == "cpus").toList
     assert(cpus.size == 2)
     assert(cpus.exists(_.getRole() == "role2"))
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 36ed84858dbfb..67ecf3242f52d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -267,7 +267,8 @@ class MesosFineGrainedSchedulerBackendSuite
       properties = new Properties(),
       resources = immutable.Map.empty[String, ResourceInformation],
       ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
 
     val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
@@ -379,7 +380,8 @@ class MesosFineGrainedSchedulerBackendSuite
       properties = new Properties(),
       resources = immutable.Map.empty[String, ResourceInformation],
       ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
 
     val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b121194d1b31..1045fb089c017 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -553,7 +553,7 @@ private[spark] class Client(
         }
 
         // Propagate the local URIs to the containers using the configuration.
-        sparkConf.set(SPARK_JARS, localJars)
+        sparkConf.set(SPARK_JARS, localJars.toSeq)
 
       case None =>
         // No configuration, so fall back to uploading local jar files.
@@ -628,7 +628,7 @@ private[spark] class Client(
       }
     }
     if (cachedSecondaryJarLinks.nonEmpty) {
-      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
+      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq)
     }
 
     if (isClusterMode && args.primaryPyFile != null) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index e6e0ea38ade94..e02fbd0c91495 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -91,11 +91,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * Writes down information about cached files needed in executors to the given configuration.
    */
   def updateConfiguration(conf: SparkConf): Unit = {
-    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
-    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
-    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
-    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
-    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
+    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString).toSeq)
+    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size).toSeq)
+    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime).toSeq)
+    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()).toSeq)
+    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()).toSeq)
   }
 
   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index cd0e7d5c87bc8..dc093235288a9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -296,7 +296,7 @@ private[yarn] class YarnAllocator(
       val profResource = rpIdToYarnResource.get(id)
       val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
         .asScala.flatMap(_.asScala)
-      allContainerRequests(id) = result
+      allContainerRequests(id) = result.toSeq
     }
     allContainerRequests.toMap
   }
@@ -426,13 +426,13 @@ private[yarn] class YarnAllocator(
           getNumExecutorsStarting,
           allocateResponse.getAvailableResources))
 
-      handleAllocatedContainers(allocatedContainers.asScala)
+      handleAllocatedContainers(allocatedContainers.asScala.toSeq)
     }
 
     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
-      processCompletedContainers(completedContainers.asScala)
+      processCompletedContainers(completedContainers.asScala.toSeq)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
         .format(completedContainers.size, getNumExecutorsRunning))
     }
@@ -960,7 +960,7 @@ private[yarn] class YarnAllocator(
       }
     }
 
-    (localityMatched, localityUnMatched, localityFree)
+    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
   }
 
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 669e39fb7c1c7..ce46ffa06f0fe 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -73,8 +73,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt, resourceProfile)
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index 446669d08e76b..2dc06fcf6ce5a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -37,7 +37,7 @@ private[hive] trait SparkOperation extends Operation with Logging {
 
   protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString()
 
-  protected def cleanup(): Unit = Unit // noop by default
+  protected def cleanup(): Unit = () // noop by default
 
   abstract override def run(): Unit = {
     withLocalProperties {
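
Editorial note (not part of the patch): most of the hunks above follow the usual Scala 2.13 cross-build pattern. In 2.13, scala.Seq is an alias for scala.collection.immutable.Seq, so the mutable Buffer produced by .asScala (and ArrayBuffer-typed results in general) no longer satisfies a Seq parameter without an explicit .toSeq, and Map#mapValues returns a lazy MapView that has to be materialized with .toMap. A minimal, hypothetical sketch of the two conversions, using scala.collection.JavaConverters as the patched files do; the names here are illustrative only:

  import scala.collection.JavaConverters._
  import scala.collection.mutable

  // Hypothetical example, not code from the patch.
  object Scala213SeqSketch {
    val javaList: java.util.List[Int] = java.util.Arrays.asList(1, 2, 3)

    // .asScala yields a mutable Buffer. Under 2.12 the .toSeq is essentially free;
    // under 2.13 it copies into an immutable Seq so the value conforms to scala.Seq.
    val asSeq: Seq[Int] = javaList.asScala.toSeq

    val buffers = mutable.HashMap("a" -> mutable.ArrayBuffer(1, 2))

    // mapValues is a lazy view in 2.13; .toSeq/.toMap materialize it into a strict,
    // immutable Map[String, Seq[Int]] that compiles the same way on 2.12 and 2.13.
    val materialized: Map[String, Seq[Int]] = buffers.mapValues(_.toSeq).toMap
  }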