@@ -35,12 +35,18 @@ type Action struct {
3535 // configured flag for error cache
3636 enablePredicateErrorCache bool
3737 hyperNodesTiers []int
38+
39+ // hyperNodeScoresByJob stores each job's total score for every available hyperNode. It is used to accumulate
40+ // the scores of all nodes in each available hyperNode, and only when the job has hard network topology constraints.
41+ // jobUID -> hyperNodeName -> score
42+ hyperNodeScoresByJob map [string ]map [string ]float64
3843}
3944
4045func New () * Action {
4146 return & Action {
4247 enablePredicateErrorCache : true , // default to enable it
4348 hyperNodesTiers : []int {},
49+ hyperNodeScoresByJob : make (map [string ]map [string ]float64 ),
4450 }
4551}
4652
@@ -306,7 +312,7 @@ func (alloc *Action) selectBestHyperNode(jobStmts map[string]*framework.Statemen
306312 candidateHyperNodeGroups [hyperNodeName ] = ssn .HyperNodes [hyperNodeName ]
307313 }
308314
309- hyperNodeScores , err := util .PrioritizeHyperNodes (candidateHyperNodeGroups , job , ssn .HyperNodeOrderMapFn )
315+ hyperNodeScores , err := util .PrioritizeHyperNodes (candidateHyperNodeGroups , alloc . hyperNodeScoresByJob [ string ( job . UID )], job , ssn .HyperNodeOrderMapFn )
310316 if err != nil {
311317 klog .V (3 ).ErrorS (err , "Failed to allocate resource for job" , "jobName" , job .UID )
312318 return nil , bestHyperNodeName
@@ -379,54 +385,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
379385 }
380386 }
381387
382- // Candidate nodes are divided into two gradients:
383- // - the first gradient node: a list of free nodes that satisfy the task resource request;
384- // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
385- // Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
386- // otherwise, score the second gradient node and select the appropriate node.
387- var candidateNodes [][]* api.NodeInfo
388- var idleCandidateNodes []* api.NodeInfo
389- var futureIdleCandidateNodes []* api.NodeInfo
390- for _ , n := range predicateNodes {
391- if task .InitResreq .LessEqual (n .Idle , api .Zero ) {
392- idleCandidateNodes = append (idleCandidateNodes , n )
393- } else if task .InitResreq .LessEqual (n .FutureIdle (), api .Zero ) {
394- futureIdleCandidateNodes = append (futureIdleCandidateNodes , n )
395- } else {
396- klog .V (5 ).Infof ("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v" ,
397- n .Name , n .Idle , n .FutureIdle (), task .Name )
398- }
399- }
400- candidateNodes = append (candidateNodes , idleCandidateNodes )
401- candidateNodes = append (candidateNodes , futureIdleCandidateNodes )
402-
403- var bestNode * api.NodeInfo
404- for index , nodes := range candidateNodes {
405- if klog .V (5 ).Enabled () {
406- for _ , node := range nodes {
407- klog .V (5 ).Infof ("node %v, idle: %v, future idle: %v" , node .Name , node .Idle , node .FutureIdle ())
408- }
409- }
410- switch {
411- case len (nodes ) == 0 :
412- klog .V (5 ).Infof ("Task: %v, no matching node is found in the candidateNodes(index: %d) list." , task .Name , index )
413- case len (nodes ) == 1 : // If only one node after predicate, just use it.
414- bestNode = nodes [0 ]
415- case len (nodes ) > 1 : // If more than one node after predicate, using "the best" one
416- nodeScores := util .PrioritizeNodes (task , nodes , ssn .BatchNodeOrderFn , ssn .NodeOrderMapFn , ssn .NodeOrderReduceFn )
417-
418- bestNode = ssn .BestNodeFn (task , nodeScores )
419- if bestNode == nil {
420- bestNode = util .SelectBestNode (nodeScores )
421- }
422- }
423-
424- // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
425- if bestNode != nil {
426- break
427- }
388+ bestNode , highestScore := alloc .prioritizeNodes (ssn , task , predicateNodes )
389+ if bestNode == nil {
390+ continue
428391 }
429392
393+ alloc .sumNodeScoresInHyperNode (string (job .UID ), hyperNode , highestScore )
430394 alloc .allocateResourcesForTask (stmt , task , bestNode , job )
431395
432396 if ssn .JobReady (job ) && ! tasks .Empty () {
@@ -445,6 +409,73 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
445409 }
446410}
447411
412+ func (alloc * Action ) sumNodeScoresInHyperNode (jobUID , hyperNode string , score float64 ) {
413+ // normal vc job without networkTopology has no hyperNode, skip node scores accumulation.
414+ if hyperNode == "" {
415+ return
416+ }
417+
418+ if alloc .hyperNodeScoresByJob [jobUID ] == nil {
419+ alloc .hyperNodeScoresByJob [jobUID ] = make (map [string ]float64 )
420+ }
421+
422+ alloc.hyperNodeScoresByJob [jobUID ][hyperNode ] += score
423+ }
424+
425+ // prioritizeNodes selects the highest score node.
426+ func (alloc * Action ) prioritizeNodes (ssn * framework.Session , task * api.TaskInfo , predicateNodes []* api.NodeInfo ) (* api.NodeInfo , float64 ) {
427+ // Candidate nodes are divided into two gradients:
428+ // - the first gradient node: a list of free nodes that satisfy the task resource request;
429+ // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
430+ // Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
431+ // otherwise, score the second gradient node and select the appropriate node.
432+ var candidateNodes [][]* api.NodeInfo
433+ var idleCandidateNodes []* api.NodeInfo
434+ var futureIdleCandidateNodes []* api.NodeInfo
435+ for _ , n := range predicateNodes {
436+ if task .InitResreq .LessEqual (n .Idle , api .Zero ) {
437+ idleCandidateNodes = append (idleCandidateNodes , n )
438+ } else if task .InitResreq .LessEqual (n .FutureIdle (), api .Zero ) {
439+ futureIdleCandidateNodes = append (futureIdleCandidateNodes , n )
440+ } else {
441+ klog .V (5 ).Infof ("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v" ,
442+ n .Name , n .Idle , n .FutureIdle (), task .Name )
443+ }
444+ }
445+ candidateNodes = append (candidateNodes , idleCandidateNodes )
446+ candidateNodes = append (candidateNodes , futureIdleCandidateNodes )
447+
448+ var bestNode * api.NodeInfo
449+ var higestScore float64
450+ for index , nodes := range candidateNodes {
451+ if klog .V (5 ).Enabled () {
452+ for _ , node := range nodes {
453+ klog .V (5 ).Infof ("node %v, idle: %v, future idle: %v" , node .Name , node .Idle , node .FutureIdle ())
454+ }
455+ }
456+ switch {
457+ case len (nodes ) == 0 :
458+ klog .V (5 ).Infof ("Task: %v, no matching node is found in the candidateNodes(index: %d) list." , task .Name , index )
459+ case len (nodes ) == 1 : // If only one node after predicate, just use it.
460+ bestNode = nodes [0 ]
461+ case len (nodes ) > 1 : // If more than one node after predicate, using "the best" one
462+ nodeScores := util .PrioritizeNodes (task , nodes , ssn .BatchNodeOrderFn , ssn .NodeOrderMapFn , ssn .NodeOrderReduceFn )
463+
464+ bestNode = ssn .BestNodeFn (task , nodeScores )
465+ if bestNode == nil {
466+ bestNode , higestScore = util .SelectBestNodeAndScore (nodeScores )
467+ }
468+
469+ }
470+
471+ // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
472+ if bestNode != nil {
473+ break
474+ }
475+ }
476+ return bestNode , higestScore
477+ }
478+
448479func (alloc * Action ) allocateResourcesForTask (stmt * framework.Statement , task * api.TaskInfo , node * api.NodeInfo , job * api.JobInfo ) {
449480 // Allocate idle resource to the task.
450481 if task .InitResreq .LessEqual (node .Idle , api .Zero ) {
0 commit comments