Skip to content

Commit ed013e5

Browse files
committed
Implements maxAvailableComponentSets for accurate estimator
Signed-off-by: mszacillo <[email protected]>
1 parent f60f341 commit ed013e5

File tree

4 files changed

+173
-6
lines changed

4 files changed

+173
-6
lines changed

pkg/estimator/client/accurate.go

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,13 @@ func (se *SchedulerEstimator) MaxAvailableReplicas(
6868
}
6969

7070
// MaxAvailableComponentSets returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host.
71-
func (se *SchedulerEstimator) MaxAvailableComponentSets(_ context.Context, _ ComponentSetEstimationRequest) ([]ComponentSetEstimationResponse, error) {
72-
// Dummy implementation: return nothing for now
73-
// TODO: Implement as part of #6734
74-
return nil, nil
71+
func (se *SchedulerEstimator) MaxAvailableComponentSets(
72+
parentCtx context.Context,
73+
req ComponentSetEstimationRequest,
74+
) ([]ComponentSetEstimationResponse, error) {
75+
return getClusterComponentSetsConcurrently(parentCtx, req.Clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) {
76+
return se.maxAvailableComponentSets(ctx, cluster, req.Namespace, req.Components)
77+
})
7578
}
7679

7780
// GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator.
@@ -86,6 +89,62 @@ func (se *SchedulerEstimator) GetUnschedulableReplicas(
8689
})
8790
}
8891

92+
func (se *SchedulerEstimator) maxAvailableComponentSets(
93+
ctx context.Context,
94+
cluster string,
95+
namespace string,
96+
components []workv1alpha2.Component,
97+
) (int32, error) {
98+
client, err := se.cache.GetClient(cluster)
99+
if err != nil {
100+
return 0, err
101+
}
102+
103+
pbReq := &pb.MaxAvailableComponentSetsRequest{
104+
Cluster: cluster,
105+
Components: make([]pb.Component, 0, len(components)),
106+
}
107+
108+
for _, comp := range components {
109+
// Deep-copy so that pointer is not shared between goroutines
110+
var cr *workv1alpha2.ComponentReplicaRequirements
111+
if comp.ReplicaRequirements != nil {
112+
cr = comp.ReplicaRequirements.DeepCopy()
113+
}
114+
115+
pbReq.Components = append(pbReq.Components, pb.Component{
116+
Name: comp.Name,
117+
Replicas: comp.Replicas,
118+
ReplicaRequirements: toPBReplicaRequirements(cr, namespace),
119+
})
120+
}
121+
122+
res, err := client.MaxAvailableComponentSets(ctx, pbReq)
123+
if err != nil {
124+
return 0, fmt.Errorf("gRPC request cluster(%s) estimator error when calling MaxAvailableComponentSets: %v", cluster, err)
125+
}
126+
return res.MaxSets, nil
127+
}
128+
129+
// toPBReplicaRequirements converts the API ComponentReplicaRequirements to the pb.ReplicaRequirements value.
130+
func toPBReplicaRequirements(cr *workv1alpha2.ComponentReplicaRequirements, namespace string) pb.ReplicaRequirements {
131+
var out pb.ReplicaRequirements
132+
out.Namespace = namespace
133+
if cr == nil {
134+
return out
135+
}
136+
out.ResourceRequest = cr.ResourceRequest
137+
out.PriorityClassName = cr.PriorityClassName
138+
if cr.NodeClaim != nil {
139+
out.NodeClaim = &pb.NodeClaim{
140+
NodeAffinity: cr.NodeClaim.HardNodeAffinity,
141+
NodeSelector: cr.NodeClaim.NodeSelector,
142+
Tolerations: cr.NodeClaim.Tolerations,
143+
}
144+
}
145+
return out
146+
}
147+
89148
func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster string, replicaRequirements *workv1alpha2.ReplicaRequirements) (int32, error) {
90149
client, err := se.cache.GetClient(cluster)
91150
if err != nil {
@@ -167,3 +226,32 @@ func getClusterReplicasConcurrently(parentCtx context.Context, clusters []string
167226
}
168227
return clusterReplicas, utilerrors.AggregateGoroutines(funcs...)
169228
}
229+
230+
func getClusterComponentSetsConcurrently(
231+
parentCtx context.Context,
232+
clusters []*clusterv1alpha1.Cluster,
233+
timeout time.Duration,
234+
getClusterSetsEstimation func(ctx context.Context, cluster string) (int32, error),
235+
) ([]ComponentSetEstimationResponse, error) {
236+
// add object info to gRPC metadata
237+
if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
238+
parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), u)
239+
}
240+
ctx, cancel := context.WithTimeout(parentCtx, timeout)
241+
defer cancel()
242+
243+
results := make([]ComponentSetEstimationResponse, len(clusters))
244+
funcs := make([]func() error, len(clusters))
245+
for i, cluster := range clusters {
246+
localIndex, localCluster := i, cluster
247+
funcs[i] = func() error {
248+
sets, err := getClusterSetsEstimation(ctx, localCluster.Name)
249+
if err != nil {
250+
return err
251+
}
252+
results[localIndex] = ComponentSetEstimationResponse{Name: localCluster.Name, Sets: sets}
253+
return nil
254+
}
255+
}
256+
return results, utilerrors.AggregateGoroutines(funcs...)
257+
}

pkg/estimator/server/estimate.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,54 @@ func (es *AccurateSchedulerEstimatorServer) estimateReplicas(
101101
return res, nil
102102
}
103103

104+
// EstimateComponents returns max available component sets in terms of request and cluster status.
105+
func (es *AccurateSchedulerEstimatorServer) EstimateComponents(ctx context.Context, object string, request *pb.MaxAvailableComponentSetsRequest) (int32, error) {
106+
trace := utiltrace.New("Estimating", utiltrace.Field{Key: "namespacedName", Value: object})
107+
defer trace.LogIfLong(100 * time.Millisecond)
108+
109+
snapShot := schedcache.NewEmptySnapshot()
110+
if err := es.Cache.UpdateSnapshot(snapShot); err != nil {
111+
return 0, err
112+
}
113+
trace.Step("Snapshotting estimator cache and node infos done")
114+
115+
if snapShot.NumNodes() == 0 {
116+
return 0, nil
117+
}
118+
119+
maxAvailableComponentSets, err := es.estimateComponents(ctx, snapShot, request.Components)
120+
if err != nil {
121+
return 0, err
122+
}
123+
trace.Step("Computing estimation done")
124+
125+
return maxAvailableComponentSets, nil
126+
}
127+
128+
func (es *AccurateSchedulerEstimatorServer) estimateComponents(
129+
ctx context.Context,
130+
snapshot *schedcache.Snapshot,
131+
components []pb.Component,
132+
) (int32, error) {
133+
maxSets, ret := es.estimateFramework.RunEstimateComponentsPlugins(ctx, snapshot, components)
134+
135+
// No replicas can be scheduled on the cluster, skip further checks and return 0
136+
if ret.IsUnschedulable() {
137+
return 0, nil
138+
}
139+
140+
if !ret.IsSuccess() && !ret.IsNoOperation() {
141+
return maxSets, fmt.Errorf("estimate components plugins fails with %s", ret.Reasons())
142+
}
143+
144+
// TODO - Node info has not be included in this implementation. Node Resource Estimation will be moved to a separate plugin.
145+
146+
if ret.IsSuccess() {
147+
return maxSets, nil
148+
}
149+
return 0, nil
150+
}
151+
104152
func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *framework.NodeInfo, rl corev1.ResourceList) int32 {
105153
rest := node.Allocatable.Clone().SubResource(node.Requested)
106154
// The number of pods in a node is a kind of resource in node allocatable resources.

pkg/estimator/server/metrics/metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ const (
3333
EstimatingTypeMaxAvailableReplicas = "MaxAvailableReplicas"
3434
// EstimatingTypeGetUnschedulableReplicas - label of estimating type
3535
EstimatingTypeGetUnschedulableReplicas = "GetUnschedulableReplicas"
36+
// EstimatingTypeMaxAvailableComponentSets - label of estimating type
37+
EstimatingTypeMaxAvailableComponentSets = "MaxAvailableComponentSets"
3638
)
3739

3840
const (

pkg/estimator/server/server.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,37 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con
225225

226226
// MaxAvailableComponentSets is the implementation of gRPC interface.
227227
// It returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host.
228-
func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(context.Context, *pb.MaxAvailableComponentSetsRequest) (*pb.MaxAvailableComponentSetsResponse, error) {
229-
return &pb.MaxAvailableComponentSetsResponse{}, fmt.Errorf("not implemented yet")
228+
func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(ctx context.Context, request *pb.MaxAvailableComponentSetsRequest) (response *pb.MaxAvailableComponentSetsResponse, rerr error) {
229+
md, ok := metadata.FromIncomingContext(ctx)
230+
if !ok {
231+
klog.Warningf("No metadata from context.")
232+
}
233+
var object string
234+
if m := md.Get(string(util.ContextKeyObject)); len(m) != 0 {
235+
object = m[0]
236+
}
237+
238+
startTime := time.Now()
239+
240+
klog.V(4).Infof("Begin calculating available component sets of resource(%s), request: %s", object, pretty.Sprint(*request))
241+
defer func(start time.Time) {
242+
metrics.CountRequests(rerr, metrics.EstimatingTypeMaxAvailableComponentSets)
243+
metrics.UpdateEstimatingAlgorithmLatency(rerr, metrics.EstimatingTypeMaxAvailableComponentSets, metrics.EstimatingStepTotal, start)
244+
if rerr != nil {
245+
klog.Errorf("Failed to calculate cluster available component sets: %v", rerr)
246+
return
247+
}
248+
klog.V(2).Infof("Finish calculating cluster available component sets of resource(%s), max component sets: %d, time elapsed: %s", object, response.MaxSets, time.Since(start))
249+
}(startTime)
250+
251+
if request.Cluster != es.clusterName {
252+
return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
253+
}
254+
maxSets, err := es.EstimateComponents(ctx, object, request)
255+
if err != nil {
256+
return nil, fmt.Errorf("failed to estimate component sets: %v", err)
257+
}
258+
return &pb.MaxAvailableComponentSetsResponse{MaxSets: maxSets}, nil
230259
}
231260

232261
// GetUnschedulableReplicas is the implementation of gRPC interface. It will return the

0 commit comments

Comments
 (0)