16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -799,9 +799,19 @@ private[deploy] class Master(
}

private def relaunchDriver(driver: DriverInfo) {
driver.worker = None
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver
// We must set up a new driver with a new driver id here, because the original driver may
// still be running. Consider this scenario: a worker is network-partitioned from the
// master, the master then relaunches driver driverID1 under a new driver id driverID2, and
// the worker reconnects to the master. From this point on, if driverID2 were equal to
// driverID1, the master could not distinguish the statusUpdate of the original driver from
// that of the newly relaunched one. For example, when DriverStateChanged(driverID1, KILLED)
// arrives at the master, the master removes driverID1, so the newly relaunched driver
// disappears too. See SPARK-19900 for details.
removeDriver(driver.id, DriverState.RELAUNCHING, None)
val newDriver = createDriver(driver.desc)
Contributor

Do you have a good reason to remove and re-create the driver in this case? It looks like overkill compared to the old logic.

Contributor Author
@lycplus lycplus May 25, 2017

First, we must distinguish the original driver from the newly relaunched one, because status updates from both versions will arrive at the master. For example, when the network-partitioned worker reconnects to the master, it will send DriverStateChanged with the original driver id, and the master must recognize that this is the state of the original driver, not the state of the newly launched one.

The patch simply chooses a new driver id to achieve this, which has some shortcomings, however. For example, in the UI the two versions of the driver are not related, and the final state of the original driver is RELAUNCHING (RELAUNCHED would read better).

Another way is to add something like an attemptId to the driver state, and have DriverStateChanged carry the attemptId to identify which attempt it refers to. This seems more complex.

What's your opinion?
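
For illustration, a minimal sketch of what that attemptId alternative might look like; the extra field and the stale-attempt check below are hypothetical, not part of Spark's actual DeployMessages:

// Hypothetical variant of the message, carrying the attempt it belongs to.
case class DriverStateChangedWithAttempt(
    driverId: String,
    attemptId: Int, // hypothetical field, not in Spark's DriverStateChanged
    state: DriverState.DriverState,
    exception: Option[Exception])

// The master could then drop updates from a stale attempt (sketch, assuming
// DriverInfo also tracked an attemptId):
// case DriverStateChangedWithAttempt(driverId, attemptId, state, exception) =>
//   drivers.find(_.id == driverId) match {
//     case Some(d) if d.attemptId == attemptId => // current attempt: apply the update
//     case _ => // stale attempt from a reconnected worker: ignore it
//   }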

persistenceEngine.addDriver(newDriver)
drivers.add(newDriver)
waitingDrivers += newDriver

schedule()
}

109 changes: 109 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -19,8 +19,10 @@ package org.apache.spark.deploy.master

import java.util.Date
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.io.Source
@@ -40,6 +42,49 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer

object MockWorker {
val counter = new AtomicInteger(10000)
}

class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
val seq = MockWorker.counter.incrementAndGet()
val id = seq.toString
override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
conf, new SecurityManager(conf))
var apps = new mutable.HashMap[String, String]()
val driverIdToAppId = new mutable.HashMap[String, String]()
def newDriver(driverId: String): RpcEndpointRef = {
val name = s"driver_${drivers.size}"
rpcEnv.setupEndpoint(name, new RpcEndpoint {
override val rpcEnv: RpcEnv = MockWorker.this.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case RegisteredApplication(appId, _) =>
apps(appId) = appId
driverIdToAppId(driverId) = appId
}
})
}

val appDesc = DeployTestUtils.createAppDesc()
val drivers = new mutable.HashMap[String, String]
override def receive: PartialFunction[Any, Unit] = {
case RegisteredWorker(masterRef, _, _) =>
masterRef.send(WorkerLatestState(id, Nil, drivers.keys.toSeq))
case LaunchDriver(driverId, desc) =>
drivers(driverId) = driverId
Contributor

seems drivers can be a set instead of a map?
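
A sketch of that suggestion (hypothetical; this version of the patch keeps the map):

// The map only ever stores driverId -> driverId, so a set carries the same information:
// val drivers = new mutable.HashSet[String]
// case LaunchDriver(driverId, desc) => drivers += driverId
// case KillDriver(driverId) => drivers -= driverId
// WorkerLatestState would then send drivers.toSeq instead of drivers.keys.toSeq.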

master.send(RegisterApplication(appDesc, newDriver(driverId)))
case KillDriver(driverId) =>
master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
drivers.remove(driverId)
driverIdToAppId.get(driverId) match {
case Some(appId) =>
apps.remove(appId)
master.send(UnregisterApplication(appId))
case None => // no app registered for this driver yet; nothing to clean up
}
driverIdToAppId.remove(driverId)
}
}

class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {

@@ -588,6 +633,70 @@ class MasterSuite extends SparkFunSuite
}
}

test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
val conf = new SparkConf().set("spark.worker.timeout", "1")
val master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
Contributor

shall we move this out of the eventually {...} block?

Contributor Author

Hi, this cannot be moved, because the MasterStateResponse changes over time. If we moved the RPC out of the block, masterState would never change and the assert would fail.

See the test SPARK-20529:... above, which uses the same eventually assert.
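
A minimal sketch of the difference, using the same endpoint and messages as this test:

// Hoisted out of the block, the snapshot is taken once; every retry
// re-asserts the same stale value and the test times out:
val stale = master.self.askSync[MasterStateResponse](RequestMasterState)
eventually(timeout(10.seconds)) {
  assert(stale.status === RecoveryState.ALIVE) // never sees new state
}

// Inside the block, the RPC is re-issued on every retry, so the assert
// eventually observes the master becoming ALIVE:
eventually(timeout(10.seconds)) {
  val fresh = master.self.askSync[MasterStateResponse](RequestMasterState)
  assert(fresh.status === RecoveryState.ALIVE)
}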

assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
}
val worker1 = new MockWorker(master.self)
worker1.rpcEnv.setupEndpoint("worker", worker1)
val worker1Reg = RegisterWorker(
worker1.id,
"localhost",
9998,
worker1.self,
10,
1024,
"http://localhost:8080",
RpcAddress("localhost2", 10000))
master.self.send(worker1Reg)
val driver = DeployTestUtils.createDriverDesc().copy(supervise = true)
master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))

eventually(timeout(10.seconds)) {
assert(worker1.apps.nonEmpty)
}

eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
Contributor

ditto

assert(masterState.workers(0).state == WorkerState.DEAD)
}

val worker2 = new MockWorker(master.self)
worker2.rpcEnv.setupEndpoint("worker", worker2)
master.self.send(RegisterWorker(
worker2.id,
"localhost",
9999,
worker2.self,
10,
1024,
"http://localhost:8081",
RpcAddress("localhost", 10001)))
eventually(timeout(10.seconds)) {
assert(worker2.apps.nonEmpty)
}

master.self.send(worker1Reg)
eventually(timeout(10.seconds)) {
val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)

val worker = masterState.workers.filter(w => w.id == worker1.id)
assert(worker.length == 1)
// make sure the `DriverStateChanged` arrives at Master.
assert(worker(0).drivers.isEmpty)
assert(worker1.apps.isEmpty)
assert(worker1.drivers.isEmpty)
assert(worker2.apps.size == 1)
assert(worker2.drivers.size == 1)
assert(masterState.activeDrivers.length == 1)
assert(masterState.activeApps.length == 1)
}
}

private def getDrivers(master: Master): HashSet[DriverInfo] = {
master.invokePrivate(_drivers())
}