Skip to content

Commit c01ead5

Browse files
author
Monokaix
committed
Add plugin for networkTopology and score logic
Signed-off-by: Monokaix <[email protected]>
1 parent 90d1c52 commit c01ead5

File tree

5 files changed

+186
-58
lines changed

5 files changed

+186
-58
lines changed

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 77 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,18 @@ type Action struct {
3535
// configured flag for error cache
3636
enablePredicateErrorCache bool
3737
hyperNodesTiers []int
38+
39+
// hyperNodeScoresByJob stores each job's total score for all available hyperNodes; this is used to accumulate
40+
// all nodes' scores in each available hyperNode, only when the job has hard network topology constraints
41+
// jobUID -> hyperNodeName -> score
42+
hyperNodeScoresByJob map[string]map[string]float64
3843
}
3944

4045
func New() *Action {
4146
return &Action{
4247
enablePredicateErrorCache: true, // default to enable it
4348
hyperNodesTiers: []int{},
49+
hyperNodeScoresByJob: make(map[string]map[string]float64),
4450
}
4551
}
4652

@@ -306,7 +312,7 @@ func (alloc *Action) selectBestHyperNode(jobStmts map[string]*framework.Statemen
306312
candidateHyperNodeGroups[hyperNodeName] = ssn.HyperNodes[hyperNodeName]
307313
}
308314

309-
hyperNodeScores, err := util.PrioritizeHyperNodes(candidateHyperNodeGroups, job, ssn.HyperNodeOrderMapFn)
315+
hyperNodeScores, err := util.PrioritizeHyperNodes(candidateHyperNodeGroups, alloc.hyperNodeScoresByJob[string(job.UID)], job, ssn.HyperNodeOrderMapFn)
310316
if err != nil {
311317
klog.V(3).ErrorS(err, "Failed to allocate resource for job", "jobName", job.UID)
312318
return nil, bestHyperNodeName
@@ -379,54 +385,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
379385
}
380386
}
381387

382-
// Candidate nodes are divided into two gradients:
383-
// - the first gradient node: a list of free nodes that satisfy the task resource request;
384-
// - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
385-
// Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
386-
// otherwise, score the second gradient node and select the appropriate node.
387-
var candidateNodes [][]*api.NodeInfo
388-
var idleCandidateNodes []*api.NodeInfo
389-
var futureIdleCandidateNodes []*api.NodeInfo
390-
for _, n := range predicateNodes {
391-
if task.InitResreq.LessEqual(n.Idle, api.Zero) {
392-
idleCandidateNodes = append(idleCandidateNodes, n)
393-
} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
394-
futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
395-
} else {
396-
klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
397-
n.Name, n.Idle, n.FutureIdle(), task.Name)
398-
}
399-
}
400-
candidateNodes = append(candidateNodes, idleCandidateNodes)
401-
candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
402-
403-
var bestNode *api.NodeInfo
404-
for index, nodes := range candidateNodes {
405-
if klog.V(5).Enabled() {
406-
for _, node := range nodes {
407-
klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
408-
}
409-
}
410-
switch {
411-
case len(nodes) == 0:
412-
klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
413-
case len(nodes) == 1: // If only one node after predicate, just use it.
414-
bestNode = nodes[0]
415-
case len(nodes) > 1: // If more than one node after predicate, using "the best" one
416-
nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
417-
418-
bestNode = ssn.BestNodeFn(task, nodeScores)
419-
if bestNode == nil {
420-
bestNode = util.SelectBestNode(nodeScores)
421-
}
422-
}
423-
424-
// If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
425-
if bestNode != nil {
426-
break
427-
}
388+
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
389+
if bestNode == nil {
390+
continue
428391
}
429392

393+
alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
430394
alloc.allocateResourcesForTask(stmt, task, bestNode, job)
431395

432396
if ssn.JobReady(job) && !tasks.Empty() {
@@ -445,6 +409,72 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
445409
}
446410
}
447411

412+
func (alloc *Action) sumNodeScoresInHyperNode(jobUID, hyperNode string, score float64) {
413+
// A normal vc job without networkTopology has no hyperNode, so skip node score accumulation.
414+
if hyperNode == "" {
415+
return
416+
}
417+
418+
if alloc.hyperNodeScoresByJob[jobUID] == nil {
419+
alloc.hyperNodeScoresByJob[jobUID] = make(map[string]float64)
420+
}
421+
422+
alloc.hyperNodeScoresByJob[jobUID][hyperNode] += score
423+
}
424+
425+
// prioritizeNodes selects the node with the highest score.
426+
func (alloc *Action) prioritizeNodes(ssn *framework.Session, task *api.TaskInfo, predicateNodes []*api.NodeInfo) (*api.NodeInfo, float64) {
427+
// Candidate nodes are divided into two gradients:
428+
// - the first gradient node: a list of free nodes that satisfy the task resource request;
429+
// - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
430+
// Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
431+
// otherwise, score the second gradient node and select the appropriate node.
432+
var candidateNodes [][]*api.NodeInfo
433+
var idleCandidateNodes []*api.NodeInfo
434+
var futureIdleCandidateNodes []*api.NodeInfo
435+
for _, n := range predicateNodes {
436+
if task.InitResreq.LessEqual(n.Idle, api.Zero) {
437+
idleCandidateNodes = append(idleCandidateNodes, n)
438+
} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
439+
futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
440+
} else {
441+
klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
442+
n.Name, n.Idle, n.FutureIdle(), task.Name)
443+
}
444+
}
445+
candidateNodes = append(candidateNodes, idleCandidateNodes)
446+
candidateNodes = append(candidateNodes, futureIdleCandidateNodes)
447+
448+
var bestNode *api.NodeInfo
449+
var higestScore float64
450+
for index, nodes := range candidateNodes {
451+
if klog.V(5).Enabled() {
452+
for _, node := range nodes {
453+
klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
454+
}
455+
}
456+
switch {
457+
case len(nodes) == 0:
458+
klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
459+
case len(nodes) == 1: // If only one node after predicate, just use it.
460+
bestNode = nodes[0]
461+
case len(nodes) > 1: // If more than one node after predicate, using "the best" one
462+
nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
463+
464+
bestNode = ssn.BestNodeFn(task, nodeScores)
465+
if bestNode == nil {
466+
bestNode, higestScore = util.SelectBestNodeAndScore(nodeScores)
467+
}
468+
}
469+
470+
// If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
471+
if bestNode != nil {
472+
break
473+
}
474+
}
475+
return bestNode, higestScore
476+
}
477+
448478
func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *api.TaskInfo, node *api.NodeInfo, job *api.JobInfo) {
449479
// Allocate idle resource to the task.
450480
if task.InitResreq.LessEqual(node.Idle, api.Zero) {

pkg/scheduler/actions/allocate/allocate_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"volcano.sh/volcano/pkg/scheduler/cache"
4141
"volcano.sh/volcano/pkg/scheduler/conf"
4242
"volcano.sh/volcano/pkg/scheduler/framework"
43+
"volcano.sh/volcano/pkg/scheduler/plugins/binpack"
4344
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
4445
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
4546
"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
@@ -477,6 +478,87 @@ func TestAllocateWithNetWorkTopologies(t *testing.T) {
477478
}
478479
}
479480

481+
func TestNodeLevelScoreWithNetWorkTopologies(t *testing.T) {
482+
plugins := map[string]framework.PluginBuilder{
483+
predicates.PluginName: predicates.New,
484+
gang.PluginName: gang.New,
485+
binpack.PluginName: binpack.New,
486+
}
487+
488+
tests := []uthelper.TestCommonStruct{
489+
{
490+
Name: "hard network topology constrain, allocate job to highest score hypeNode with node level binpack",
491+
PodGroups: []*schedulingv1.PodGroup{
492+
util.BuildPodGroupWithNetWorkTopologies("pg1", "c1", "q1", 2, nil, schedulingv1.PodGroupInqueue, "hard", 1),
493+
util.BuildPodGroupWithNetWorkTopologies("pg2", "c1", "q1", 2, nil, schedulingv1.PodGroupRunning, "", 1),
494+
},
495+
Pods: []*v1.Pod{
496+
// should use different role, because allocate actions default to enable the role caches when predicate
497+
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4Gi"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
498+
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("4", "8Gi"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
499+
500+
util.BuildPod("c1", "p3", "s0-n1", v1.PodRunning, api.BuildResourceList("2", "4Gi"), "pg2", map[string]string{"volcano.sh/task-spec": "master"}, nil),
501+
util.BuildPod("c1", "p4", "s0-n2", v1.PodRunning, api.BuildResourceList("4", "8Gi"), "pg2", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
502+
},
503+
Nodes: []*v1.Node{
504+
util.BuildNode("s0-n1", api.BuildResourceList("4", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
505+
util.BuildNode("s0-n2", api.BuildResourceList("8", "16Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
506+
util.BuildNode("s1-n3", api.BuildResourceList("4", "8Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
507+
util.BuildNode("s1-n4", api.BuildResourceList("8", "16Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil),
508+
},
509+
HyperNodesListByTier: map[int][]string{0: {"s0", "s1"}},
510+
HyperNodes: map[string]sets.Set[string]{
511+
"s0": sets.New[string]("s0-n1", "s0-n2"),
512+
"s1": sets.New[string]("s1-n3", "s1-n4"),
513+
"s2": sets.New[string]("s0-n1", "s0-n2", "s1-n3", "s1-n4"),
514+
},
515+
Queues: []*schedulingv1.Queue{
516+
util.BuildQueue("q1", 1, nil),
517+
},
518+
ExpectBindsNum: 2,
519+
// "s0-n1" and "s0-n2" nodes have running pods, so get higher score when enable binpack.
520+
ExpectBindMap: map[string]string{
521+
"c1/p1": "s0-n1",
522+
"c1/p2": "s0-n2",
523+
},
524+
},
525+
}
526+
527+
trueValue := true
528+
tiers := []conf.Tier{
529+
{
530+
Plugins: []conf.PluginOption{
531+
{
532+
Name: gang.PluginName,
533+
EnabledJobOrder: &trueValue,
534+
EnabledJobReady: &trueValue,
535+
EnabledJobPipelined: &trueValue,
536+
EnabledJobStarving: &trueValue,
537+
},
538+
{
539+
Name: predicates.PluginName,
540+
EnabledPredicate: &trueValue,
541+
},
542+
{
543+
Name: binpack.PluginName,
544+
EnabledNodeOrder: &trueValue,
545+
},
546+
},
547+
},
548+
}
549+
for i, test := range tests {
550+
t.Run(test.Name, func(t *testing.T) {
551+
test.Plugins = plugins
552+
test.RegisterSession(tiers, nil)
553+
defer test.Close()
554+
test.Run([]framework.Action{New()})
555+
if err := test.CheckAll(i); err != nil {
556+
t.Fatal(err)
557+
}
558+
})
559+
}
560+
}
561+
480562
func TestFareShareAllocate(t *testing.T) {
481563
plugins := map[string]framework.PluginBuilder{
482564
drf.PluginName: drf.New,

pkg/scheduler/actions/backfill/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (backfill *Action) Execute(ssn *framework.Session) {
8585
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
8686
node = ssn.BestNodeFn(task, nodeScores)
8787
if node == nil {
88-
node = util.SelectBestNode(nodeScores)
88+
node, _ = util.SelectBestNodeAndScore(nodeScores)
8989
}
9090
}
9191

pkg/scheduler/util/scheduler_helper.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,27 +127,37 @@ func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.Batc
127127
return nodeScores
128128
}
129129

130-
// PrioritizeHyperNodes prioritize hyperNodes score of all plugins for job and return hyperNode name with the highest score.
131-
func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, job *api.JobInfo, fn api.HyperNodeOrderMapFn) (map[float64][]string, error) {
132-
pluginHyperNodesScoreMap := make(map[string]float64)
130+
// PrioritizeHyperNodes returns a map whose keys are hyperNode scores and whose values are the corresponding hyperNodes.
131+
// It accumulates two parts of the score:
132+
// 1.node level scores of each hyperNode in NodeOrder extension.
133+
// 2. hyperNode-level scores computed in the HyperNodeOrder extension.
134+
func PrioritizeHyperNodes(candidateHyperNodes map[string][]*api.NodeInfo, nodeScoresInHyperNode map[string]float64, job *api.JobInfo, fn api.HyperNodeOrderMapFn) (map[float64][]string, error) {
135+
hyperNodesScoreMap := make(map[string]float64)
133136
mapScores, err := fn(job, candidateHyperNodes)
134137
if err != nil {
135138
return nil, err
136139
}
137140

141+
// plugin scores of hyperNode.
138142
for pluginName, scores := range mapScores {
139143
for hyperNode, score := range scores {
140144
klog.V(5).InfoS("Add plugin score at hypeNode", "jobName", job.UID, "pluginName", pluginName, "hyperNodeName", hyperNode, "score", score)
141-
pluginHyperNodesScoreMap[hyperNode] += score
145+
hyperNodesScoreMap[hyperNode] += score
142146
}
143147
}
144148

149+
// accumulate node scores in NodeOrder and hyperNode score itself as the final score of each hyperNode.
150+
for hyperNodeName, score := range nodeScoresInHyperNode {
151+
klog.V(5).InfoS("Add node level scores to final hyperNode score", "jobName", job.UID, "hyperNodeName", hyperNodeName, "score", score)
152+
hyperNodesScoreMap[hyperNodeName] += score
153+
}
154+
145155
hyperNodeScores := make(map[float64][]string)
146156
hyperNodeScoreMap := make(map[string]float64)
147157
for hyperNodeName := range candidateHyperNodes {
148158
// If no plugin is applied to this node, the default is 0.0
149159
score := 0.0
150-
if value, ok := pluginHyperNodesScoreMap[hyperNodeName]; ok {
160+
if value, ok := hyperNodesScoreMap[hyperNodeName]; ok {
151161
score += value
152162
}
153163
hyperNodeScores[score] = append(hyperNodeScores[score], hyperNodeName)
@@ -176,8 +186,8 @@ func SortNodes(nodeScores map[float64][]*api.NodeInfo) []*api.NodeInfo {
176186
return nodesInorder
177187
}
178188

179-
// SelectBestNode returns best node whose score is highest, pick one randomly if there are many nodes with same score.
180-
func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
189+
// SelectBestNodeAndScore returns the best node whose score is highest and the highest score, pick one randomly if there are many nodes with same score.
190+
func SelectBestNodeAndScore(nodeScores map[float64][]*api.NodeInfo) (*api.NodeInfo, float64) {
181191
var bestNodes []*api.NodeInfo
182192
maxScore := -1.0
183193
for score, nodes := range nodeScores {
@@ -188,10 +198,10 @@ func SelectBestNode(nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
188198
}
189199

190200
if len(bestNodes) == 0 {
191-
return nil
201+
return nil, 0
192202
}
193203

194-
return bestNodes[rand.Intn(len(bestNodes))]
204+
return bestNodes[rand.Intn(len(bestNodes))], maxScore
195205
}
196206

197207
// SelectBestHyperNode return the best hyperNode name whose score is highest, pick one randomly if there are many hyperNodes with same score.

pkg/scheduler/util/scheduler_helper_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ func TestSelectBestNode(t *testing.T) {
3232
NodeScores map[float64][]*api.NodeInfo
3333
// Expected node is one of ExpectedNodes
3434
ExpectedNodes []*api.NodeInfo
35+
ExpectedScore float64
3536
}{
3637
{
3738
NodeScores: map[float64][]*api.NodeInfo{
3839
1.0: {&api.NodeInfo{Name: "node1"}, &api.NodeInfo{Name: "node2"}},
3940
2.0: {&api.NodeInfo{Name: "node3"}, &api.NodeInfo{Name: "node4"}},
4041
},
4142
ExpectedNodes: []*api.NodeInfo{{Name: "node3"}, {Name: "node4"}},
43+
ExpectedScore: 2.0,
4244
},
4345
{
4446
NodeScores: map[float64][]*api.NodeInfo{
@@ -47,6 +49,7 @@ func TestSelectBestNode(t *testing.T) {
4749
2.0: {&api.NodeInfo{Name: "node4"}, &api.NodeInfo{Name: "node5"}},
4850
},
4951
ExpectedNodes: []*api.NodeInfo{{Name: "node3"}},
52+
ExpectedScore: 3.0,
5053
},
5154
{
5255
NodeScores: map[float64][]*api.NodeInfo{},
@@ -63,10 +66,13 @@ func TestSelectBestNode(t *testing.T) {
6366
return false
6467
}
6568
for i, test := range cases {
66-
result := SelectBestNode(test.NodeScores)
69+
result, score := SelectBestNodeAndScore(test.NodeScores)
6770
if !oneOf(result, test.ExpectedNodes) {
6871
t.Errorf("Failed test case #%d, expected: %#v, got %#v", i, test.ExpectedNodes, result)
6972
}
73+
if score != test.ExpectedScore {
74+
t.Errorf("Failed test case #%d, expected: %#v, got %#v", i, test.ExpectedScore, score)
75+
}
7076
}
7177
}
7278

0 commit comments

Comments
 (0)