Skip to content

Commit cf1a187

Browse files
committed
add network topology plugin
Signed-off-by: lowang-bh <[email protected]> fix testcase when enable gang Signed-off-by: lowang-bh <[email protected]>
1 parent e16ef63 commit cf1a187

File tree

4 files changed

+403
-0
lines changed

4 files changed

+403
-0
lines changed

pkg/scheduler/plugins/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"volcano.sh/volcano/pkg/scheduler/plugins/drf"
2727
"volcano.sh/volcano/pkg/scheduler/plugins/extender"
2828
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
29+
nettopology "volcano.sh/volcano/pkg/scheduler/plugins/network-topology"
2930
"volcano.sh/volcano/pkg/scheduler/plugins/nodegroup"
3031
"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
3132
"volcano.sh/volcano/pkg/scheduler/plugins/numaaware"
@@ -56,6 +57,7 @@ func init() {
5657
framework.RegisterPluginBuilder(overcommit.PluginName, overcommit.New)
5758
framework.RegisterPluginBuilder(sla.PluginName, sla.New)
5859
framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New)
60+
framework.RegisterPluginBuilder(nettopology.PluginName, nettopology.New)
5961
framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New)
6062
framework.RegisterPluginBuilder(cdp.PluginName, cdp.New)
6163
framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New)
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
Copyright 2024 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package networktopology
18+
19+
import (
20+
"strings"
21+
22+
"k8s.io/klog/v2"
23+
24+
"volcano.sh/volcano/pkg/scheduler/api"
25+
"volcano.sh/volcano/pkg/scheduler/framework"
26+
)
27+
28+
// PluginName indicates name of volcano scheduler plugin.
const PluginName = "network-topology"

// Keys looked up in the plugin arguments.
const (
	// networkTopologyWeight is the argument key for the plugin score weight.
	networkTopologyWeight = "network-topology.weight"
	// networkTopologyType is the argument key selecting the topology strategy.
	networkTopologyType = "network-topology.type"
)

// Strategy values accepted for the networkTopologyType argument.
const (
	staticAware  = "static"
	dynamicAware = "dynamic"
)
37+
38+
type netTopPlugin struct {
39+
pluginArguments framework.Arguments
40+
weight int // network-topology plugin score weight
41+
topologyType string // supported topology type: static, dynamic
42+
staticTopAware staticTopAware // use node labels to generate topology
43+
}
44+
45+
// New return gang plugin
46+
func New(arguments framework.Arguments) framework.Plugin {
47+
return &netTopPlugin{pluginArguments: arguments, weight: 1, staticTopAware: staticTopAware{records: map[api.JobID]string{}}}
48+
}
49+
50+
func (np *netTopPlugin) Name() string {
51+
return PluginName
52+
}
53+
54+
func (np *netTopPlugin) parseArguments() {
55+
np.pluginArguments.GetInt(&np.weight, networkTopologyWeight)
56+
value, ok := np.pluginArguments[networkTopologyType]
57+
if !ok {
58+
klog.Warningf("%s is not set, use default strategy %s", networkTopologyType, staticAware)
59+
np.topologyType = staticAware
60+
return
61+
}
62+
63+
v, ok := value.(string)
64+
if !ok {
65+
klog.Warningf("invalid value for %s, use default strategy %s", networkTopologyType, staticAware)
66+
np.topologyType = staticAware
67+
return
68+
}
69+
np.topologyType = strings.TrimSpace(v)
70+
}
71+
72+
// parseStaticAwareArguments return a boolean value indicating whether staticAware is valid to be used
73+
func (np *netTopPlugin) parseStaticAwareArguments() bool {
74+
keys, exist := np.pluginArguments[networkTopologyKeys]
75+
if !exist {
76+
klog.Warningf("plugin %s (with type %s) arguments does not configure %s, skip", PluginName, np.topologyType, networkTopologyKeys)
77+
return false
78+
}
79+
topKeys, ok := keys.(string)
80+
if !ok {
81+
klog.Warningf("plugin %s (with type %s) arguments %s should has a string value", PluginName, networkTopologyKeys, networkTopologyKeys)
82+
return false
83+
}
84+
for _, key := range strings.Split(topKeys, ",") {
85+
np.staticTopAware.topologyKeys = append(np.staticTopAware.topologyKeys, strings.TrimSpace(key))
86+
}
87+
np.staticTopAware.weight = np.weight
88+
return true
89+
}
90+
91+
func (np *netTopPlugin) OnSessionOpen(ssn *framework.Session) {
92+
np.parseArguments()
93+
if np.topologyType == staticAware {
94+
valid := np.parseStaticAwareArguments()
95+
if valid {
96+
np.staticTopAware.OnSessionOpen(ssn)
97+
}
98+
} else {
99+
klog.Warningf("strategy %s is not supported in plugin %s", np.topologyType, PluginName)
100+
}
101+
}
102+
103+
func (np *netTopPlugin) OnSessionClose(ssn *framework.Session) {
104+
if np.topologyType == staticAware {
105+
np.staticTopAware.OnSessionClose(ssn)
106+
}
107+
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
Copyright 2024 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package networktopology
18+
19+
import (
20+
"os"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
v1 "k8s.io/api/core/v1"
25+
schedulingv1 "k8s.io/api/scheduling/v1"
26+
27+
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
28+
"volcano.sh/volcano/cmd/scheduler/app/options"
29+
"volcano.sh/volcano/pkg/scheduler/actions/allocate"
30+
"volcano.sh/volcano/pkg/scheduler/api"
31+
"volcano.sh/volcano/pkg/scheduler/conf"
32+
"volcano.sh/volcano/pkg/scheduler/framework"
33+
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
34+
"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
35+
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
36+
"volcano.sh/volcano/pkg/scheduler/uthelper"
37+
"volcano.sh/volcano/pkg/scheduler/util"
38+
)
39+
40+
func TestMain(m *testing.M) {
41+
options.Default() // init default options params which used in some packages
42+
os.Exit(m.Run())
43+
}
44+
45+
func TestParse(t *testing.T) {
46+
plug := New(framework.Arguments{networkTopologyWeight: 10, networkTopologyType: dynamicAware})
47+
netplug := plug.(*netTopPlugin)
48+
netplug.parseArguments()
49+
assert.Equal(t, netplug.weight, 10)
50+
assert.Equal(t, netplug.topologyType, dynamicAware)
51+
52+
plug = New(framework.Arguments{networkTopologyWeight: 10, networkTopologyKeys: " switch, idc"})
53+
netplug = plug.(*netTopPlugin)
54+
netplug.parseArguments()
55+
assert.Equal(t, netplug.topologyType, staticAware)
56+
netplug.parseStaticAwareArguments()
57+
assert.Equal(t, netplug.staticTopAware.topologyKeys, []string{"switch", "idc"})
58+
assert.Equal(t, netplug.staticTopAware.weight, 10)
59+
}
60+
61+
func TestStaticNetTopAware(t *testing.T) {
62+
trueValue := true
63+
plugins := map[string]framework.PluginBuilder{
64+
PluginName: New,
65+
//Note: when enable gang plugin: pods need to set different role spec, because predicate plugin default enabled cache for same spec task
66+
gang.PluginName: gang.New,
67+
priority.PluginName: priority.New,
68+
predicates.PluginName: predicates.New,
69+
}
70+
high, low := int32(100000), int32(10)
71+
highPrioCls := util.BuildPriorityClass("high-priority", high)
72+
lowPrioCls := util.BuildPriorityClass("low-priority", low)
73+
tiers := []conf.Tier{
74+
{
75+
Plugins: []conf.PluginOption{
76+
{
77+
Name: gang.PluginName,
78+
EnabledJobReady: &trueValue,
79+
EnabledJobPipelined: &trueValue,
80+
},
81+
{
82+
Name: priority.PluginName,
83+
EnabledJobOrder: &trueValue,
84+
EnabledTaskOrder: &trueValue,
85+
},
86+
{
87+
Name: predicates.PluginName,
88+
EnabledPredicate: &trueValue,
89+
Arguments: map[string]interface{}{
90+
predicates.NodeAffinityEnable: true,
91+
predicates.PodAffinityEnable: true,
92+
},
93+
},
94+
{
95+
Name: PluginName,
96+
EnabledNodeOrder: &trueValue,
97+
Arguments: map[string]interface{}{
98+
networkTopologyKeys: "role, switch, idc", // same role nodes get higher score, then same switch, final same idc
99+
},
100+
},
101+
},
102+
},
103+
}
104+
tests := []uthelper.TestCommonStruct{
105+
{
106+
Name: "with only one key, best effort to allocate to nodes labeled with this key",
107+
PodGroups: []*schedulingv1beta1.PodGroup{
108+
util.BuildPodGroup("pg1", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
109+
},
110+
Pods: []*v1.Pod{ // let pod1 choose node with label role=worker, then check pod2 also scheduled to that node
111+
util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg1", make(map[string]string), map[string]string{"role": "worker"}, &high),
112+
util.BuildPod("ns1", "pod2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg1", make(map[string]string), make(map[string]string)),
113+
},
114+
Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
115+
Nodes: []*v1.Node{
116+
util.BuildNode("node1", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker"}),
117+
util.BuildNode("node2", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "ps"}),
118+
util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "ps"}),
119+
},
120+
ExpectBindsNum: 2,
121+
ExpectBindMap: map[string]string{"ns1/pod1": "node1", "ns1/pod2": "node1"},
122+
},
123+
{
124+
Name: "with multiple keys, the front keys get higher score",
125+
PodGroups: []*schedulingv1beta1.PodGroup{
126+
util.BuildPodGroup("pg2", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
127+
},
128+
Pods: []*v1.Pod{ // let pod1 choose node with label role=worker, then check pod2 also scheduled to other node with same label
129+
util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg2", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"role": "worker"}, &high),
130+
util.BuildPod("ns1", "pod2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg2", map[string]string{"volcano.sh/task-spec": "master"}, make(map[string]string)),
131+
},
132+
Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
133+
Nodes: []*v1.Node{ // let pod1 first choose node1, which also with label switch; then pod2 preferred to choose node2 with same label switch=swt1
134+
util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1", "idc": "idc1"}),
135+
util.BuildNode("node2", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"switch": "swt1"}),
136+
util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"idc": "idc1"}),
137+
},
138+
ExpectBindsNum: 2,
139+
ExpectBindMap: map[string]string{"ns1/pod1": "node1", "ns1/pod2": "node2"},
140+
},
141+
{
142+
Name: "with not enougth resource, then choose other nodes without configed key",
143+
PodGroups: []*schedulingv1beta1.PodGroup{
144+
util.BuildPodGroup("pg3", "ns1", "q1", 3, nil, schedulingv1beta1.PodGroupInqueue),
145+
},
146+
Pods: []*v1.Pod{ // all pods must set different role spec, because predicate plugin default enabled cache for same spec task
147+
util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"role": "worker"}, &high),
148+
util.BuildPod("ns1", "pod-work-2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", map[string]string{"volcano.sh/task-spec": "master"}, make(map[string]string)),
149+
util.BuildPod("ns1", "pod-work-3", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", make(map[string]string), make(map[string]string)),
150+
},
151+
Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
152+
Nodes: []*v1.Node{
153+
util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1"}),
154+
util.BuildNode("node2", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"switch": "swt1"}),
155+
util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"idc": "idc1"}),
156+
},
157+
ExpectBindsNum: 3,
158+
ExpectBindMap: map[string]string{"ns1/pod1": "node1", "ns1/pod-work-2": "node2", "ns1/pod-work-3": "node3"},
159+
},
160+
{
161+
Name: "with not enougth resource and cannot allocate whole job, deallocating",
162+
PodGroups: []*schedulingv1beta1.PodGroup{
163+
util.BuildPodGroup("pg4", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
164+
},
165+
Pods: []*v1.Pod{
166+
util.BuildPod("ns1", "pod-work-2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg4", make(map[string]string), make(map[string]string)),
167+
util.BuildPod("ns1", "pod-work-3", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg4", make(map[string]string), make(map[string]string)),
168+
},
169+
Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
170+
Nodes: []*v1.Node{
171+
util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1"}),
172+
},
173+
ExpectBindsNum: 0,
174+
ExpectBindMap: map[string]string{},
175+
},
176+
}
177+
for i, test := range tests {
178+
test.Plugins = plugins
179+
test.PriClass = []*schedulingv1.PriorityClass{highPrioCls, lowPrioCls}
180+
t.Run(test.Name, func(t *testing.T) {
181+
test.RegisterSession(tiers, nil)
182+
defer test.Close()
183+
test.Run([]framework.Action{allocate.New()})
184+
err := test.CheckAll(i)
185+
if err != nil {
186+
t.Fatal(err)
187+
}
188+
})
189+
}
190+
}

0 commit comments

Comments
 (0)