diff --git a/docs/design/network-topology-aware.md b/docs/design/network-topology-aware.md
new file mode 100644
index 0000000000..ae47ab718c
--- /dev/null
+++ b/docs/design/network-topology-aware.md
@@ -0,0 +1,90 @@
+# Network Topology Aware Plugin
+
+- [Network Topology Aware Plugin](#network-topology-aware-plugin)
+  - [Background](#background)
+  - [Motivation](#motivation)
+  - [Proposal one](#proposal-one)
+    - [Goals](#goals)
+    - [Non-Goals](#non-goals)
+    - [Design Action](#design-action)
+      - [Pod scheduling process](#pod-scheduling-process)
+      - [Usage](#usage)
+    - [Drawbacks](#drawbacks)
+
+## Background
+
+A Kubernetes cluster typically comprises numerous nodes distributed across different IDCs, chassis, and switches.
+
+Data transmission performance varies across these different components.
+
+For latency-sensitive workloads, it is crucial to run tasks within the same IDC, and ideally under the same chassis and switch.
+
+## Motivation
+
+The goal is to make the Kubernetes scheduler network-topology aware to achieve the following:
+
+- Ensure that tasks from the same job are scheduled onto nodes within the same topology, such as the same IDC, chassis, or switch.
+
+There will be two types of network-topology awareness:
+
+- **static**: `network-topology.type: static` derives the network topology from node labels.
+- **dynamic**: `network-topology.type: dynamic` detects the network topology dynamically with external tools. For example, `ibnetdiscover` can be used to discover the InfiniBand network topology.
+
+## Proposal one
+
+This proposal requires cluster administrators to manage network topology labels on Kubernetes (K8s) nodes.
+
+Nodes that carry the same value for a topology label are considered to be in the same topology domain.
+
+### Goals
+
+- **Single-Key Topology Configuration**: Support scheduling all tasks of a job onto nodes that share the same value for a specified key.
+- **Multiple-Key Topology Policies**: Support multiple topology keys; keys listed earlier take higher scheduling priority.
+
+### Non-Goals
+
+- **Global Solutions**: This proposal does not aim to find a globally optimal placement across nodes with all possible values of a topology key simultaneously.
+
+### Design Action
+
+#### Pod scheduling process
+
+1. **Recording Topology Information**: When the first task of a job is assigned to a node, record the node's topology information in the scheduling plugin.
+2. **Scoring Nodes for Subsequent Tasks**: During scheduling of subsequent tasks, nodes with the same topology as the initially allocated task receive a higher score; others receive a score of zero.
+3. **Handling Multiple Keys**: If a node matches multiple keys from the configured list, the first key in the list is prioritized for scoring.
+
+```go
+nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
+	...
+	score := 0
+	weight := np.weight
+	tlabels := tNode.Node.Labels
+	labels := node.Node.Labels
+	length := len(np.topologyKeys)
+	for i, key := range np.topologyKeys {
+		if tlabels[key] == labels[key] {
+			score += (length - i) // keys earlier in the list score higher than later ones
+			break
+		}
+	}
+	return float64(score * weight), nil
+}
+```
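+
+To make the multi-key scoring concrete, here is a minimal, self-contained sketch (illustrative only, not part of the plugin; the key list and labels are assumed, matching the example configuration in the Usage section below). It shows the score a candidate node would receive, relative to the node that took the job's first task, with keys `rack,switch,idc` and weight `10`:
+
+```go
+package main
+
+import "fmt"
+
+// score mirrors the nodeOrderFn above: the first (highest-priority) key whose
+// value matches the first task's node determines the score; keys the first
+// node does not carry are skipped.
+func score(keys []string, weight int, firstNodeLabels, candidateLabels map[string]string) int {
+	for i, key := range keys {
+		v := firstNodeLabels[key]
+		if v != "" && v == candidateLabels[key] {
+			return (len(keys) - i) * weight
+		}
+	}
+	return 0
+}
+
+func main() {
+	keys := []string{"rack", "switch", "idc"}
+	first := map[string]string{"rack": "rack1", "switch": "NvLink-A100", "idc": "bj"}
+
+	fmt.Println(score(keys, 10, first, map[string]string{"rack": "rack1"}))         // 30: shares the highest-priority key
+	fmt.Println(score(keys, 10, first, map[string]string{"switch": "NvLink-A100"})) // 20: shares only the second key
+	fmt.Println(score(keys, 10, first, map[string]string{"idc": "sh"}))             // 0: no shared topology
+}
+```
+
+With this ordering, subsequent tasks gravitate first toward nodes in the same rack, then the same switch, and finally the same IDC.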
+
+#### Usage
+
+1. Label nodes with key-value pairs (e.g., `switch=NvLink-A100`, `rack=rack1` or `rack=rack2`, `idc=bj` or `idc=sh`) to partition nodes into different topology zones.
+2. Add the `network-topology` plugin in the scheduler configuration to implement these policies.
+
+```yaml
+- plugins:
+  - name: network-topology
+    arguments:
+      network-topology.type: static # static means the plugin derives the network topology from node labels
+      network-topology.keys: rack,switch,idc # required when type is static
+      network-topology.weight: 10
+```
+
+### Drawbacks
+
+One drawback is that this is not a global solution that ensures all tasks of a job are placed on nodes within the same topology. For example, if the nodes labeled `key=value1` lack sufficient resources while the nodes labeled `key=value2` have them, and the first task is assigned to a `key=value1` node, subsequent tasks will still prefer `key=value1` nodes despite the resource constraints.
diff --git a/pkg/scheduler/plugins/factory.go b/pkg/scheduler/plugins/factory.go
index 7e654bbf6f..cf360ee659 100644
--- a/pkg/scheduler/plugins/factory.go
+++ b/pkg/scheduler/plugins/factory.go
@@ -26,6 +26,7 @@ import (
 	"volcano.sh/volcano/pkg/scheduler/plugins/drf"
 	"volcano.sh/volcano/pkg/scheduler/plugins/extender"
 	"volcano.sh/volcano/pkg/scheduler/plugins/gang"
+	nettopology "volcano.sh/volcano/pkg/scheduler/plugins/network-topology"
 	"volcano.sh/volcano/pkg/scheduler/plugins/nodegroup"
 	"volcano.sh/volcano/pkg/scheduler/plugins/nodeorder"
 	"volcano.sh/volcano/pkg/scheduler/plugins/numaaware"
@@ -56,6 +57,7 @@ func init() {
 	framework.RegisterPluginBuilder(overcommit.PluginName, overcommit.New)
 	framework.RegisterPluginBuilder(sla.PluginName, sla.New)
 	framework.RegisterPluginBuilder(tasktopology.PluginName, tasktopology.New)
+	framework.RegisterPluginBuilder(nettopology.PluginName, nettopology.New)
 	framework.RegisterPluginBuilder(numaaware.PluginName, numaaware.New)
 	framework.RegisterPluginBuilder(cdp.PluginName, cdp.New)
 	framework.RegisterPluginBuilder(rescheduling.PluginName, rescheduling.New)
diff --git a/pkg/scheduler/plugins/network-topology/netaware.go b/pkg/scheduler/plugins/network-topology/netaware.go
new file mode 100644
index 0000000000..4b278d098b
--- /dev/null
+++ b/pkg/scheduler/plugins/network-topology/netaware.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2024 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package networktopology
+
+import (
+	"strings"
+
+	"k8s.io/klog/v2"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+)
+
+// PluginName indicates the name of the volcano scheduler plugin.
+const (
+	PluginName            = "network-topology"
+	networkTopologyWeight = "network-topology.weight"
+	networkTopologyType   = "network-topology.type" // strategy type for network topology
+	// strategy values for network-topology.type
+	staticAware  = "static"
+	dynamicAware = "dynamic"
+)
+
+type netTopPlugin struct {
+	pluginArguments framework.Arguments
+	weight          int            // network-topology plugin score weight
+	topologyType    string         // supported topology types: static, dynamic
+	staticTopAware  staticTopAware // uses node labels to derive the topology
+}
+
+// New returns the network-topology plugin
+func New(arguments framework.Arguments) framework.Plugin {
+	return &netTopPlugin{pluginArguments: arguments, weight: 1, staticTopAware: staticTopAware{records: map[api.JobID]string{}}}
+}
+
+func (np *netTopPlugin) Name() string {
+	return PluginName
+}
+
+func (np *netTopPlugin) parseArguments() {
+	np.pluginArguments.GetInt(&np.weight, networkTopologyWeight)
+	value, ok := np.pluginArguments[networkTopologyType]
+	if !ok {
+		klog.Warningf("%s is not set, use default strategy %s", networkTopologyType, staticAware)
+		np.topologyType = staticAware
+		return
+	}
+
+	v, ok := value.(string)
+	if !ok {
+		klog.Warningf("invalid value for %s, use default strategy %s", networkTopologyType, staticAware)
+		np.topologyType = staticAware
+		return
+	}
+	np.topologyType = strings.TrimSpace(v)
+}
+
+// parseStaticAwareArguments returns a boolean value indicating whether staticAware is valid to be used
+func (np *netTopPlugin) parseStaticAwareArguments() bool {
+	keys, exist := np.pluginArguments[networkTopologyKeys]
+	if !exist {
+		klog.Warningf("plugin %s (with type %s) arguments do not configure %s, skip", PluginName, np.topologyType, networkTopologyKeys)
+		return false
+	}
+	topKeys, ok := keys.(string)
+	if !ok {
+		klog.Warningf("plugin %s (with type %s) argument %s should have a string value", PluginName, np.topologyType, networkTopologyKeys)
+		return false
+	}
+	for _, key := range strings.Split(topKeys, ",") {
+		np.staticTopAware.topologyKeys = append(np.staticTopAware.topologyKeys, strings.TrimSpace(key))
+	}
+	np.staticTopAware.weight = np.weight
+	return true
+}
+
+func (np *netTopPlugin) OnSessionOpen(ssn *framework.Session) {
+	np.parseArguments()
+	if np.topologyType == staticAware {
+		valid := np.parseStaticAwareArguments()
+		if valid {
+			np.staticTopAware.OnSessionOpen(ssn)
+		}
+	} else {
+		klog.Warningf("strategy %s is not supported in plugin %s", np.topologyType, PluginName)
+	}
+}
+
+func (np *netTopPlugin) OnSessionClose(ssn *framework.Session) {
+	if np.topologyType == staticAware {
+		np.staticTopAware.OnSessionClose(ssn)
+	}
+}
diff --git a/pkg/scheduler/plugins/network-topology/netaware_test.go b/pkg/scheduler/plugins/network-topology/netaware_test.go
new file mode 100644
index 0000000000..d8ba8d8514
--- /dev/null
+++ b/pkg/scheduler/plugins/network-topology/netaware_test.go
@@ -0,0 +1,190 @@
+/*
+Copyright 2024 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package networktopology
+
+import (
+	"os"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	schedulingv1 "k8s.io/api/scheduling/v1"
+
+	schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
+	"volcano.sh/volcano/cmd/scheduler/app/options"
+	"volcano.sh/volcano/pkg/scheduler/actions/allocate"
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/conf"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+	"volcano.sh/volcano/pkg/scheduler/plugins/gang"
+	"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
+	"volcano.sh/volcano/pkg/scheduler/plugins/priority"
+	"volcano.sh/volcano/pkg/scheduler/uthelper"
+	"volcano.sh/volcano/pkg/scheduler/util"
+)
+
+func TestMain(m *testing.M) {
+	options.Default() // init default options params which are used in some packages
+	os.Exit(m.Run())
+}
+
+func TestParse(t *testing.T) {
+	plug := New(framework.Arguments{networkTopologyWeight: 10, networkTopologyType: dynamicAware})
+	netplug := plug.(*netTopPlugin)
+	netplug.parseArguments()
+	assert.Equal(t, netplug.weight, 10)
+	assert.Equal(t, netplug.topologyType, dynamicAware)
+
+	plug = New(framework.Arguments{networkTopologyWeight: 10, networkTopologyKeys: " switch, idc"})
+	netplug = plug.(*netTopPlugin)
+	netplug.parseArguments()
+	assert.Equal(t, netplug.topologyType, staticAware)
+	netplug.parseStaticAwareArguments()
+	assert.Equal(t, netplug.staticTopAware.topologyKeys, []string{"switch", "idc"})
+	assert.Equal(t, netplug.staticTopAware.weight, 10)
+}
+
+func TestStaticNetTopAware(t *testing.T) {
+	trueValue := true
+	plugins := map[string]framework.PluginBuilder{
+		PluginName: New,
+		// Note: when the gang plugin is enabled, pods need different role specs, because the predicate plugin caches results for tasks with the same spec by default
+		gang.PluginName:       gang.New,
+		priority.PluginName:   priority.New,
+		predicates.PluginName: predicates.New,
+	}
+	high, low := int32(100000), int32(10)
+	highPrioCls := util.BuildPriorityClass("high-priority", high)
+	lowPrioCls := util.BuildPriorityClass("low-priority", low)
+	tiers := []conf.Tier{
+		{
+			Plugins: []conf.PluginOption{
+				{
+					Name:                gang.PluginName,
+					EnabledJobReady:     &trueValue,
+					EnabledJobPipelined: &trueValue,
+				},
+				{
+					Name:             priority.PluginName,
+					EnabledJobOrder:  &trueValue,
+					EnabledTaskOrder: &trueValue,
+				},
+				{
+					Name:             predicates.PluginName,
+					EnabledPredicate: &trueValue,
+					Arguments: map[string]interface{}{
+						predicates.NodeAffinityEnable: true,
+						predicates.PodAffinityEnable:  true,
+					},
+				},
+				{
+					Name:             PluginName,
+					EnabledNodeOrder: &trueValue,
+					Arguments: map[string]interface{}{
+						networkTopologyKeys: "role, switch, idc", // nodes with the same role get the highest score, then the same switch, finally the same idc
+					},
+				},
+			},
+		},
+	}
+	tests := []uthelper.TestCommonStruct{
+		{
+			Name: "with only one key, best effort to allocate to nodes labeled with this key",
+			PodGroups: []*schedulingv1beta1.PodGroup{
+				util.BuildPodGroup("pg1", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
+			},
+			Pods: []*v1.Pod{ // let pod1 choose the node with label role=worker, then check pod2 is also scheduled to that node
+				util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg1", make(map[string]string), map[string]string{"role": "worker"}, &high),
+				util.BuildPod("ns1", "pod2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg1", make(map[string]string), make(map[string]string)),
+			},
+			Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
+			Nodes: []*v1.Node{
+				util.BuildNode("node1", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker"}),
+				util.BuildNode("node2", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "ps"}),
+				util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "ps"}),
+			},
+			ExpectBindsNum: 2,
+			ExpectBindMap:  map[string]string{"ns1/pod1": "node1", "ns1/pod2": "node1"},
+		},
+		{
+			Name: "with multiple keys, keys listed earlier get a higher score",
+			PodGroups: []*schedulingv1beta1.PodGroup{
+				util.BuildPodGroup("pg2", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
+			},
+			Pods: []*v1.Pod{ // let pod1 choose the node with label role=worker, then check pod2 is also scheduled to another node with the same switch label
+				util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg2", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"role": "worker"}, &high),
+				util.BuildPod("ns1", "pod2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg2", map[string]string{"volcano.sh/task-spec": "master"}, make(map[string]string)),
+			},
+			Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
+			Nodes: []*v1.Node{ // let pod1 first choose node1, which also carries the switch label; then pod2 prefers node2 with the same label switch=swt1
+				util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1", "idc": "idc1"}),
+				util.BuildNode("node2", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"switch": "swt1"}),
+				util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"idc": "idc1"}),
+			},
+			ExpectBindsNum: 2,
+			ExpectBindMap:  map[string]string{"ns1/pod1": "node1", "ns1/pod2": "node2"},
+		},
+		{
+			Name: "with not enough resource, choose other nodes without the configured key",
+			PodGroups: []*schedulingv1beta1.PodGroup{
+				util.BuildPodGroup("pg3", "ns1", "q1", 3, nil, schedulingv1beta1.PodGroupInqueue),
+			},
+			Pods: []*v1.Pod{ // all pods must set different role specs, because the predicate plugin caches results for tasks with the same spec by default
+				util.BuildPodWithPriority("ns1", "pod1", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"role": "worker"}, &high),
+				util.BuildPod("ns1", "pod-work-2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", map[string]string{"volcano.sh/task-spec": "master"}, make(map[string]string)),
+				util.BuildPod("ns1", "pod-work-3", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg3", make(map[string]string), make(map[string]string)),
+			},
+			Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
+			Nodes: []*v1.Node{
+				util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1"}),
+				util.BuildNode("node2", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"switch": "swt1"}),
+				util.BuildNode("node3", api.BuildResourceList("20", "20G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"idc": "idc1"}),
+			},
+			ExpectBindsNum: 3,
+			ExpectBindMap:  map[string]string{"ns1/pod1": "node1", "ns1/pod-work-2": "node2", "ns1/pod-work-3": "node3"},
+		},
+		{
+			Name: "with not enough resource and cannot allocate the whole job, deallocate",
+			PodGroups: []*schedulingv1beta1.PodGroup{
+				util.BuildPodGroup("pg4", "ns1", "q1", 2, nil, schedulingv1beta1.PodGroupInqueue),
+			},
+			Pods: []*v1.Pod{
+				util.BuildPod("ns1", "pod-work-2", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg4", make(map[string]string), make(map[string]string)),
+				util.BuildPod("ns1", "pod-work-3", "", v1.PodPending, api.BuildResourceList("10", "10G"), "pg4", make(map[string]string), make(map[string]string)),
+			},
+			Queues: []*schedulingv1beta1.Queue{util.BuildQueue("q1", 1, nil)},
+			Nodes: []*v1.Node{
+				util.BuildNode("node1", api.BuildResourceList("10", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"role": "worker", "switch": "swt1"}),
+			},
+			ExpectBindsNum: 0,
+			ExpectBindMap:  map[string]string{},
+		},
+	}
+	for i, test := range tests {
+		test.Plugins = plugins
+		test.PriClass = []*schedulingv1.PriorityClass{highPrioCls, lowPrioCls}
+		t.Run(test.Name, func(t *testing.T) {
+			test.RegisterSession(tiers, nil)
+			defer test.Close()
+			test.Run([]framework.Action{allocate.New()})
+			err := test.CheckAll(i)
+			if err != nil {
+				t.Fatal(err)
+			}
+		})
+	}
+}
diff --git a/pkg/scheduler/plugins/network-topology/static_aware.go b/pkg/scheduler/plugins/network-topology/static_aware.go
new file mode 100644
index 0000000000..8f6438163e
--- /dev/null
+++ b/pkg/scheduler/plugins/network-topology/static_aware.go
@@ -0,0 +1,104 @@
+/*
+Copyright 2024 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package networktopology
+
+import (
+	"errors"
+	"fmt"
+
+	"k8s.io/klog/v2"
+
+	"volcano.sh/volcano/pkg/scheduler/api"
+	"volcano.sh/volcano/pkg/scheduler/framework"
+)
+
+const (
+	networkTopologyKeys = "network-topology.keys" // comma-separated node label keys used to identify nodes in the same topology
+)
+
+type staticTopAware struct {
+	weight       int
+	records      map[api.JobID]string // maps a job ID to a node name, used to build affinity for the job's other tasks
+	topologyKeys []string             // topology key list, e.g. "idc, rack, switch"; keys earlier in the list have higher priority
+}
+
+func (st *staticTopAware) OnSessionOpen(ssn *framework.Session) {
+	if len(st.topologyKeys) == 0 {
+		return
+	}
+	// TODO: for a job that already has running tasks, parse them and record the node info
+
+	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
+		if task == nil || node == nil || node.Node == nil {
+			return 0, errors.New("invalid task or node info in network-topology plugin")
+		}
+		target, ok := st.records[task.Job]
+		if !ok { // first task of a job, no initial node info yet
+			return 0, nil
+		}
+		tNode := ssn.Nodes[target]
+		if tNode == nil || tNode.Node == nil {
+			return 0, fmt.Errorf("target node %s not found", target)
+		}
+		score := 0
+		weight := st.weight
+
+		tlabels := tNode.Node.Labels
+		labels := node.Node.Labels
+		length := len(st.topologyKeys)
+		for i, key := range st.topologyKeys {
+			tval := tlabels[key]
+			if len(tval) == 0 { // the target node's label value is empty, skip this key
+				continue
+			}
+			if tval == labels[key] {
+				score += (length - i) // keys earlier in the list score higher than later ones
+				break
+			}
+		}
+
+		klog.V(5).Infof("node %s gets score %d with weight %d when scheduling task %s", node.Name, score, weight, task.Name)
+		return float64(score * weight), nil
+	}
+	ssn.AddNodeOrderFn(PluginName, nodeOrderFn)
+
+	// Register event handlers: when a job's first task binds to a node, record the node name
+	ssn.AddEventHandler(&framework.EventHandler{
+		AllocateFunc: func(event *framework.Event) {
+			job := ssn.Jobs[event.Task.Job]
+			if job == nil {
+				return
+			}
+			if _, ok := st.records[job.UID]; !ok {
+				st.records[job.UID] = event.Task.NodeName
+				klog.V(5).Infof("add job %s affinity record to node %s in session %v", job.UID, event.Task.NodeName, ssn.UID)
+			}
+		},
+		DeallocateFunc: func(event *framework.Event) {
+			job := ssn.Jobs[event.Task.Job]
+			if job == nil {
+				return
+			}
+			delete(st.records, job.UID)
+			klog.V(5).Infof("clean job %s affinity record to node %s in session %v", job.UID, event.Task.NodeName, ssn.UID)
+		},
+	})
+}
+
+func (st *staticTopAware) OnSessionClose(ssn *framework.Session) {
+	st.records = nil
+}