Dispatch tasks to the right executors that have the tasks' input HDFS data #216
Changes from 11 commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,10 +17,13 @@ | |
| package org.apache.spark.deploy.kubernetes | ||
|
|
||
| import java.io.File | ||
| import java.util.concurrent.{ThreadFactory, ThreadPoolExecutor} | ||
|
|
||
| import com.google.common.base.Charsets | ||
| import com.google.common.io.Files | ||
| import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} | ||
| import io.fabric8.kubernetes.client.utils.HttpClientUtils | ||
| import okhttp3.Dispatcher | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.deploy.kubernetes.config._ | ||
|
|
@@ -78,6 +81,25 @@ private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: St | |
| } | ||
| serviceAccountConfigBuilder | ||
| } | ||
| new DefaultKubernetesClient(configBuilder.build) | ||
| val threadPoolExecutor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor] | ||
| // Set threads to be daemons in order to allow the driver main thread | ||
| // to shut down upon errors. Otherwise the driver will hang indefinitely. | ||
| threadPoolExecutor.setThreadFactory(new ThreadFactory { | ||
| override def newThread(r: Runnable): Thread = { | ||
| val thread = new Thread(r, "spark-on-k8s") | ||
| thread.setDaemon(true) | ||
| thread | ||
| } | ||
| }) | ||
| // Disable the ping thread that is not daemon, in order to allow | ||
| // the driver main thread to shut down upon errors. Otherwise, the driver | ||
| // will hang indefinitely. | ||
| val config = configBuilder | ||
| .withWebsocketPingInterval(0) | ||
|
Definitely seems like kubernetes-client should run this as a daemon thread (meaning it doesn't prevent JVM exit, per http://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java). I don't think pinging the websocket is worth keeping the JVM running for. Check out https://github.com/fabric8io/kubernetes-client/blob/2f3e6de212f848774775bac6108b2f4d57c41f2b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java#L55 and how Spark has a …

@kimoonkim can you please provide the full stack trace of the thread that hangs the JVM shutdown?
Member
Author
@ash211 @foxish For the web socket ping interval, I believe this is the relevant source code, from RealWebSocket.java: notice what it is calling … For the other one, I think this is the source code, from Dispatcher.java: again, passing … Unfortunately, both are inside the …
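For readers without the okhttp source in front of them, a quick standalone probe (not part of this PR; it assumes only okhttp3 on the classpath, and the object name is made up) makes the point above observable: the thread factory behind okhttp's default Dispatcher hands out non-daemon threads, which is what keeps the driver JVM alive.

```scala
import java.util.concurrent.ThreadPoolExecutor

import okhttp3.Dispatcher

object OkHttpDaemonProbe {
  def main(args: Array[String]): Unit = {
    // okhttp builds the dispatcher's executor lazily; grab it and ask its
    // thread factory for a (never-started) thread so we can inspect it.
    val executor = new Dispatcher().executorService().asInstanceOf[ThreadPoolExecutor]
    val probe = executor.getThreadFactory.newThread(new Runnable {
      override def run(): Unit = ()
    })
    // With stock okhttp this prints daemon=false, i.e. such a thread blocks JVM exit.
    println(s"dispatcher thread '${probe.getName}' daemon=${probe.isDaemon}")
  }
}
```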
Great investigation! With that work I think we have enough information to know what to change in okhttp upstream so that it correctly adheres to its stated contract on daemon threads. Do you feel ready to open an issue with an accompanying PR on okhttp?

One can also set the Dispatcher instance on okhttp clients so that it uses a daemon thread pool. See this in the context of retrofit: https://github.com/apache-spark-on-k8s/spark/pull/227/files#diff-716e31eb38cfd793ba5b2eea49cf5487R43. But okhttp should certainly be doing this by default.
Member
Author
Sure. Let's try to open an issue/PR on okhttp. I'll work on that.
Member
Author
@mccheah Right. We can change the k8s client to specify a custom dispatcher, or a custom executor service as this code does. We should keep that as an option in case fixing that part of …

@kimoonkim did you end up opening an issue in okhttp upstream? I couldn't find it when I looked at https://github.com/square/okhttp. square/okhttp#1890 seems like the most relevant existing issue. I think the custom dispatcher fixes this anyway, though.
Member
Author
Not yet. I was carried away by the HDFS experiments. I'll open an issue today or tomorrow. Thanks for the pointer.
Member
Author
Hmm, the response in square/okhttp#1890 says that the dispatcher shouldn't be a daemon. I'm going to file an issue for only the ping thread first.
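To make the custom-dispatcher alternative discussed above concrete outside the fabric8 builder, here is a rough sketch against a plain OkHttpClient. The object name, thread name, and pool sizing are illustrative assumptions, not code from this PR or from #227.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import okhttp3.{Dispatcher, OkHttpClient}

object DaemonOkHttpClient {
  def build(): OkHttpClient = {
    val daemonFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val thread = new Thread(r, "okhttp-dispatcher")
        thread.setDaemon(true) // don't keep the JVM alive for dispatcher threads
        thread
      }
    }
    // A cached-style pool similar in shape to okhttp's default, but with daemon threads.
    val executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
      new SynchronousQueue[Runnable](), daemonFactory)
    new OkHttpClient.Builder()
      .dispatcher(new Dispatcher(executor))
      .build()
  }
}
```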
||
| .build() | ||
| val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() | ||
| .dispatcher(new Dispatcher(threadPoolExecutor)) | ||
| .build() | ||
| new DefaultKubernetesClient(httpClient, config) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,15 +16,55 @@ | |
| */ | ||
| package org.apache.spark.scheduler.cluster.kubernetes | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.SparkContext | ||
| import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} | ||
| import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, | ||
| TaskScheduler, TaskSchedulerImpl, TaskSet, TaskSetManager} | ||
|
|
||
| private[spark] class KubernetesClusterManager extends ExternalClusterManager { | ||
|
|
||
| override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") | ||
|
|
||
| override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { | ||
| val scheduler = new TaskSchedulerImpl(sc) | ||
| val scheduler = new TaskSchedulerImpl(sc) { | ||
|
||
|
|
||
| override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { | ||
| new TaskSetManager(sched = this, taskSet, maxTaskFailures) { | ||
|
|
||
| /** | ||
| * Overrides the lookup to use not only the executor pod IP, but also the cluster node | ||
| * name and host IP address that the pod is running on. The base class may have populated | ||
| * the lookup target map with HDFS datanode locations if this task set reads HDFS data. | ||
| * Those datanode locations are based on cluster node names or host IP addresses. Using | ||
| * only executor pod IPs may not match them. | ||
| */ | ||
| override def getPendingTasksForHost(executorIP: String): ArrayBuffer[Int] = { | ||
| var pendingTasks = super.getPendingTasksForHost(executorIP) | ||
|
||
| if (pendingTasks.nonEmpty) { | ||
|
||
| return pendingTasks | ||
| } | ||
| val backend = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].backend.asInstanceOf[ | ||
|
||
| KubernetesClusterSchedulerBackend] | ||
| val pod = backend.getExecutorPodByIP(executorIP) | ||
| if (pod.isEmpty) { | ||
| return pendingTasks // Empty | ||
|
||
| } | ||
| val clusterNodeName = pod.get.getSpec.getNodeName | ||
| val clusterNodeIP = pod.get.getStatus.getHostIP | ||
| pendingTasks = super.getPendingTasksForHost(clusterNodeName) | ||
| if (pendingTasks.isEmpty) { | ||
| pendingTasks = super.getPendingTasksForHost(clusterNodeIP) | ||
| } | ||
| if (pendingTasks.nonEmpty && log.isDebugEnabled) { | ||
|
||
| logDebug(s"Got preferred task list $pendingTasks for executor host $executorIP " + | ||
| s"using cluster node $clusterNodeName at $clusterNodeIP") | ||
| } | ||
| pendingTasks | ||
| } | ||
| } | ||
| } | ||
| } | ||
| sc.taskScheduler = scheduler | ||
| scheduler | ||
| } | ||
|
|
@@ -37,6 +77,5 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager { | |
| override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { | ||
| scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) | ||
| } | ||
|
|
||
| } | ||
|
|
||
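As background for the getPendingTasksForHost override above: the preferred locations Spark records for HDFS-backed partitions come from the datanode hosts HDFS reports per block, which are cluster node names or host IPs, never executor pod IPs. A minimal standalone sketch (assuming a reachable HDFS and a path given on the command line; not part of this PR) shows what those reported hosts look like.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ShowHdfsBlockHosts {
  def main(args: Array[String]): Unit = {
    val path = new Path(args(0)) // e.g. hdfs://<namenode>/some/input/file
    val fs = FileSystem.get(path.toUri, new Configuration())
    val status = fs.getFileStatus(path)
    // Each block reports the datanodes holding a replica, as cluster node names/IPs;
    // these are the strings that end up as preferred task locations in Spark.
    fs.getFileBlockLocations(status, 0, status.getLen).foreach { block =>
      println(s"offset=${block.getOffset} length=${block.getLength} " +
        s"hosts=${block.getHosts.mkString(", ")}")
    }
  }
}
```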
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,18 @@ | |
| */ | ||
| package org.apache.spark.scheduler.cluster.kubernetes | ||
|
|
||
| import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} | ||
| import java.io.Closeable | ||
| import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, | ||
| EnvVarSourceBuilder, Pod, QuantityBuilder} | ||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable | ||
| import scala.concurrent.{ExecutionContext, Future} | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, | ||
| EnvVarSourceBuilder, Pod, QuantityBuilder} | ||
| import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} | ||
| import io.fabric8.kubernetes.client.Watcher.Action | ||
|
|
||
| import org.apache.spark.{SparkContext, SparkException} | ||
| import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder | ||
| import org.apache.spark.deploy.kubernetes.config._ | ||
|
|
@@ -39,8 +44,11 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
|
|
||
| import KubernetesClusterSchedulerBackend._ | ||
|
|
||
| private val EXECUTOR_MODIFICATION_LOCK = new Object | ||
| private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] | ||
| private val RUNNING_EXECUTOR_PODS_LOCK = new Object | ||
| private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs. | ||
|
|
||
| private val EXECUTOR_PODS_BY_IPS_LOCK = new Object | ||
| private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs. | ||
|
|
||
| private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) | ||
| private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) | ||
|
|
@@ -93,6 +101,7 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| super.minRegisteredRatio | ||
| } | ||
|
|
||
| private val executorWatchResource = new AtomicReference[Closeable] | ||
| protected var totalExpectedExecutors = new AtomicInteger(0) | ||
|
|
||
| private val driverUrl = RpcEndpointAddress( | ||
|
|
@@ -125,6 +134,8 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
|
|
||
| override def start(): Unit = { | ||
| super.start() | ||
| executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) | ||
| .watch(new ExecutorPodsWatcher())) | ||
| if (!Utils.isDynamicAllocationEnabled(sc.conf)) { | ||
| doRequestTotalExecutors(initialExecutors) | ||
| } | ||
|
|
@@ -139,7 +150,16 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| // When using Utils.tryLogNonFatalError some of the code fails but without any logs or | ||
| // indication as to why. | ||
| try { | ||
| runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) | ||
| RUNNING_EXECUTOR_PODS_LOCK.synchronized { | ||
| runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_)) | ||
|
Do you need a …
Member
Author
Fixed in the latest diff.
||
| } | ||
| EXECUTOR_PODS_BY_IPS_LOCK.synchronized { | ||
| executorPodsByIPs.clear() | ||
| } | ||
| val resource = executorWatchResource.getAndSet(null) | ||
| if (resource != null) { | ||
| resource.close() | ||
| } | ||
| } catch { | ||
| case e: Throwable => logError("Uncaught exception while shutting down controllers.", e) | ||
| } | ||
|
|
@@ -149,6 +169,7 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| case e: Throwable => logError("Uncaught exception while shutting down driver service.", e) | ||
| } | ||
| try { | ||
| logInfo("Closing kubernetes client") | ||
| kubernetesClient.close() | ||
| } catch { | ||
| case e: Throwable => logError("Uncaught exception closing Kubernetes client.", e) | ||
|
|
@@ -242,7 +263,7 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| } | ||
|
|
||
| override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { | ||
| EXECUTOR_MODIFICATION_LOCK.synchronized { | ||
| RUNNING_EXECUTOR_PODS_LOCK.synchronized { | ||
| if (requestedTotal > totalExpectedExecutors.get) { | ||
| logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" | ||
| + s" additional executors, expecting total $requestedTotal and currently" + | ||
|
|
@@ -257,7 +278,7 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| } | ||
|
|
||
| override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] { | ||
| EXECUTOR_MODIFICATION_LOCK.synchronized { | ||
| RUNNING_EXECUTOR_PODS_LOCK.synchronized { | ||
| for (executor <- executorIds) { | ||
| runningExecutorPods.remove(executor) match { | ||
| case Some(pod) => kubernetesClient.pods().delete(pod) | ||
|
Push the …
Member
Author
Done.
||
|
|
@@ -267,6 +288,39 @@ private[spark] class KubernetesClusterSchedulerBackend( | |
| } | ||
| true | ||
| } | ||
|
|
||
| def getExecutorPodByIP(podIP: String): Option[Pod] = { | ||
| EXECUTOR_PODS_BY_IPS_LOCK.synchronized { | ||
| executorPodsByIPs.get(podIP) | ||
| } | ||
| } | ||
|
|
||
| private class ExecutorPodsWatcher extends Watcher[Pod] { | ||
|
|
||
| override def eventReceived(action: Action, pod: Pod): Unit = { | ||
| if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running" | ||
| && pod.getMetadata.getDeletionTimestamp == null) { | ||
| val podName = pod.getMetadata.getName | ||
| val podIP = pod.getStatus.getPodIP | ||
| val clusterNodeName = pod.getSpec.getNodeName | ||
| logDebug(s"Executor pod $pod ready, launched at $clusterNodeName as IP $podIP.") | ||
| EXECUTOR_PODS_BY_IPS_LOCK.synchronized { | ||
| executorPodsByIPs += ((podIP, pod)) | ||
| } | ||
| } else if (action == Action.MODIFIED && pod.getMetadata.getDeletionTimestamp != null) { | ||
|
||
| val podName = pod.getMetadata.getName | ||
| val podIP = pod.getStatus.getPodIP | ||
| logDebug(s"Executor pod $podName at IP $podIP was deleted.") | ||
| EXECUTOR_PODS_BY_IPS_LOCK.synchronized { | ||
| executorPodsByIPs -= podIP | ||
| } | ||
|
I think we also need to remove from … This should probably be the place to prune dead executors out of …
Member
Author
I feel this can be better handled by @varunkatta in the upcoming recovery-handling code. His PR would also use the same watcher, and he has a lot more context on how the different pieces work together for dead executors.
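To sketch what that pruning might look like (purely hypothetical; it is not in this PR and, per the reply above, is deferred to the recovery-handling work), a helper along these lines could drop a deleted pod from the other map, presumably runningExecutorPods, which is keyed by executor ID rather than pod IP, synchronizing on a caller-supplied lock.

```scala
import io.fabric8.kubernetes.api.model.Pod

import scala.collection.mutable

object ExecutorPodPruning {
  // Hypothetical helper: remove the entry for a deleted pod from a map indexed
  // by executor ID, matching on the pod name under the supplied lock.
  def pruneDeletedPod(
      deletedPod: Pod,
      lock: AnyRef,
      runningExecutorPods: mutable.HashMap[String, Pod]): Unit = {
    val podName = deletedPod.getMetadata.getName
    lock.synchronized {
      runningExecutorPods
        .collectFirst { case (executorId, pod) if pod.getMetadata.getName == podName => executorId }
        .foreach(runningExecutorPods.remove)
    }
  }
}
```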
||
| } | ||
| } | ||
|
|
||
| override def onClose(cause: KubernetesClientException): Unit = { | ||
| logDebug("Executor pod watch closed.", cause) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private object KubernetesClusterSchedulerBackend { | ||
|
|
||
@foxish Can you take a look at this part of the code? I don't like that we have to do this much workaround to avoid hanging. I wonder if we should change the upstream k8s client or okhttp code to make this easier. If yes, I'm willing to send a PR to them.
It does seem like an issue that this much is needed for exceptions in the main thread to terminate the JVM. Let's figure it out in the upstream library, as that seems to be where the problem is.