@@ -27,15 +27,14 @@ import scala.language.postfixOps
2727import akka .actor .Address
2828import org .json4s ._
2929import org .json4s .jackson .JsonMethods ._
30- import org .scalatest .{ Matchers , PrivateMethodTester }
30+ import org .scalatest .Matchers
3131import org .scalatest .concurrent .Eventually
3232import other .supplier .{CustomPersistenceEngine , CustomRecoveryModeFactory }
3333
34- import org .apache .spark .{SecurityManager , SparkConf , SparkException , SparkFunSuite }
34+ import org .apache .spark .{SparkConf , SparkException , SparkFunSuite }
3535import org .apache .spark .deploy ._
36- import org .apache .spark .rpc .RpcEnv
3736
38- class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
37+ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
3938
4039 test(" toAkkaUrl" ) {
4140 val conf = new SparkConf (loadDefaults = false )
@@ -185,196 +184,4 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
185184 }
186185 }
187186
188- test(" basic scheduling - spread out" ) {
189- testBasicScheduling(spreadOut = true )
190- }
191-
192- test(" basic scheduling - no spread out" ) {
193- testBasicScheduling(spreadOut = false )
194- }
195-
196- test(" scheduling with max cores - spread out" ) {
197- testSchedulingWithMaxCores(spreadOut = true )
198- }
199-
200- test(" scheduling with max cores - no spread out" ) {
201- testSchedulingWithMaxCores(spreadOut = false )
202- }
203-
204- test(" scheduling with cores per executor - spread out" ) {
205- testSchedulingWithCoresPerExecutor(spreadOut = true )
206- }
207-
208- test(" scheduling with cores per executor - no spread out" ) {
209- testSchedulingWithCoresPerExecutor(spreadOut = false )
210- }
211-
212- test(" scheduling with cores per executor AND max cores - spread out" ) {
213- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true )
214- }
215-
216- test(" scheduling with cores per executor AND max cores - no spread out" ) {
217- testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false )
218- }
219-
220- private def testBasicScheduling (spreadOut : Boolean ): Unit = {
221- val master = makeMaster()
222- val appInfo = makeAppInfo(1024 )
223- val workerInfo = makeWorkerInfo(4096 , 10 )
224- val workerInfos = Array (workerInfo, workerInfo, workerInfo)
225- val scheduledCores = master.invokePrivate(
226- _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
227- assert(scheduledCores.length === 3 )
228- assert(scheduledCores(0 ) === 10 )
229- assert(scheduledCores(1 ) === 10 )
230- assert(scheduledCores(2 ) === 10 )
231- }
232-
233- private def testSchedulingWithMaxCores (spreadOut : Boolean ): Unit = {
234- val master = makeMaster()
235- val appInfo1 = makeAppInfo(1024 , maxCores = Some (8 ))
236- val appInfo2 = makeAppInfo(1024 , maxCores = Some (16 ))
237- val workerInfo = makeWorkerInfo(4096 , 10 )
238- val workerInfos = Array (workerInfo, workerInfo, workerInfo)
239- var scheduledCores = master.invokePrivate(
240- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
241- assert(scheduledCores.length === 3 )
242- // With spreading out, each worker should be assigned a few cores
243- if (spreadOut) {
244- assert(scheduledCores(0 ) === 3 )
245- assert(scheduledCores(1 ) === 3 )
246- assert(scheduledCores(2 ) === 2 )
247- } else {
248- // Without spreading out, the cores should be concentrated on the first worker
249- assert(scheduledCores(0 ) === 8 )
250- assert(scheduledCores(1 ) === 0 )
251- assert(scheduledCores(2 ) === 0 )
252- }
253- // Now test the same thing with max cores > cores per worker
254- scheduledCores = master.invokePrivate(
255- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
256- assert(scheduledCores.length === 3 )
257- if (spreadOut) {
258- assert(scheduledCores(0 ) === 6 )
259- assert(scheduledCores(1 ) === 5 )
260- assert(scheduledCores(2 ) === 5 )
261- } else {
262- // Without spreading out, the first worker should be fully booked,
263- // and the leftover cores should spill over to the second worker only.
264- assert(scheduledCores(0 ) === 10 )
265- assert(scheduledCores(1 ) === 6 )
266- assert(scheduledCores(2 ) === 0 )
267- }
268- }
269-
270- private def testSchedulingWithCoresPerExecutor (spreadOut : Boolean ): Unit = {
271- val master = makeMaster()
272- val appInfo1 = makeAppInfo(1024 , coresPerExecutor = Some (2 ))
273- val appInfo2 = makeAppInfo(256 , coresPerExecutor = Some (2 ))
274- val appInfo3 = makeAppInfo(256 , coresPerExecutor = Some (3 ))
275- val workerInfo = makeWorkerInfo(4096 , 10 )
276- val workerInfos = Array (workerInfo, workerInfo, workerInfo)
277- // Each worker should end up with 4 executors with 2 cores each
278- // This should be 4 because of the memory restriction on each worker
279- var scheduledCores = master.invokePrivate(
280- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
281- assert(scheduledCores.length === 3 )
282- assert(scheduledCores(0 ) === 8 )
283- assert(scheduledCores(1 ) === 8 )
284- assert(scheduledCores(2 ) === 8 )
285- // Now test the same thing without running into the worker memory limit
286- // Each worker should now end up with 5 executors with 2 cores each
287- scheduledCores = master.invokePrivate(
288- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
289- assert(scheduledCores.length === 3 )
290- assert(scheduledCores(0 ) === 10 )
291- assert(scheduledCores(1 ) === 10 )
292- assert(scheduledCores(2 ) === 10 )
293- // Now test the same thing with a cores per executor that 10 is not divisible by
294- scheduledCores = master.invokePrivate(
295- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
296- assert(scheduledCores.length === 3 )
297- assert(scheduledCores(0 ) === 9 )
298- assert(scheduledCores(1 ) === 9 )
299- assert(scheduledCores(2 ) === 9 )
300- }
301-
302- // Sorry for the long method name!
303- private def testSchedulingWithCoresPerExecutorAndMaxCores (spreadOut : Boolean ): Unit = {
304- val master = makeMaster()
305- val appInfo1 = makeAppInfo(256 , coresPerExecutor = Some (2 ), maxCores = Some (4 ))
306- val appInfo2 = makeAppInfo(256 , coresPerExecutor = Some (2 ), maxCores = Some (20 ))
307- val appInfo3 = makeAppInfo(256 , coresPerExecutor = Some (3 ), maxCores = Some (20 ))
308- val workerInfo = makeWorkerInfo(4096 , 10 )
309- val workerInfos = Array (workerInfo, workerInfo, workerInfo)
310- // We should only launch two executors, each with exactly 2 cores
311- var scheduledCores = master.invokePrivate(
312- _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
313- assert(scheduledCores.length === 3 )
314- if (spreadOut) {
315- assert(scheduledCores(0 ) === 2 )
316- assert(scheduledCores(1 ) === 2 )
317- assert(scheduledCores(2 ) === 0 )
318- } else {
319- assert(scheduledCores(0 ) === 4 )
320- assert(scheduledCores(1 ) === 0 )
321- assert(scheduledCores(2 ) === 0 )
322- }
323- // Test max cores > number of cores per worker
324- scheduledCores = master.invokePrivate(
325- _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
326- assert(scheduledCores.length === 3 )
327- if (spreadOut) {
328- assert(scheduledCores(0 ) === 8 )
329- assert(scheduledCores(1 ) === 6 )
330- assert(scheduledCores(2 ) === 6 )
331- } else {
332- assert(scheduledCores(0 ) === 10 )
333- assert(scheduledCores(1 ) === 10 )
334- assert(scheduledCores(2 ) === 0 )
335- }
336- // Test max cores > number of cores per worker AND
337- // a cores per executor that is 10 is not divisible by
338- scheduledCores = master.invokePrivate(
339- _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
340- assert(scheduledCores.length === 3 )
341- if (spreadOut) {
342- assert(scheduledCores(0 ) === 6 )
343- assert(scheduledCores(1 ) === 6 )
344- assert(scheduledCores(2 ) === 6 )
345- } else {
346- assert(scheduledCores(0 ) === 9 )
347- assert(scheduledCores(1 ) === 9 )
348- assert(scheduledCores(2 ) === 0 )
349- }
350- }
351-
352- // ===============================
353- // | Utility methods for testing |
354- // ===============================
355-
356- private val _scheduleExecutorsOnWorkers = PrivateMethod [Array [Int ]](' scheduleExecutorsOnWorkers )
357-
358- private def makeMaster (conf : SparkConf = new SparkConf ): Master = {
359- val securityMgr = new SecurityManager (conf)
360- val rpcEnv = RpcEnv .create(Master .SYSTEM_NAME , " localhost" , 7077 , conf, securityMgr)
361- val master = new Master (rpcEnv, rpcEnv.address, 8080 , securityMgr, conf)
362- master
363- }
364-
365- private def makeAppInfo (
366- memoryPerExecutorMb : Int ,
367- coresPerExecutor : Option [Int ] = None ,
368- maxCores : Option [Int ] = None ): ApplicationInfo = {
369- val desc = new ApplicationDescription (
370- " test" , maxCores, memoryPerExecutorMb, null , " " , None , None , coresPerExecutor)
371- val appId = System .currentTimeMillis.toString
372- new ApplicationInfo (0 , appId, desc, new Date , null , Int .MaxValue )
373- }
374-
375- private def makeWorkerInfo (memoryMb : Int , cores : Int ): WorkerInfo = {
376- val workerId = System .currentTimeMillis.toString
377- new WorkerInfo (workerId, " host" , 100 , cores, memoryMb, null , 101 , " address" )
378- }
379-
380187}
0 commit comments