diff --git a/pkg/estimator/client/accurate.go b/pkg/estimator/client/accurate.go index 7720f3ab426d..249817565746 100644 --- a/pkg/estimator/client/accurate.go +++ b/pkg/estimator/client/accurate.go @@ -68,10 +68,13 @@ func (se *SchedulerEstimator) MaxAvailableReplicas( } // MaxAvailableComponentSets returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host. -func (se *SchedulerEstimator) MaxAvailableComponentSets(_ context.Context, _ ComponentSetEstimationRequest) ([]ComponentSetEstimationResponse, error) { - // Dummy implementation: return nothing for now - // TODO: Implement as part of #6734 - return nil, nil +func (se *SchedulerEstimator) MaxAvailableComponentSets( + parentCtx context.Context, + req ComponentSetEstimationRequest, +) ([]ComponentSetEstimationResponse, error) { + return getClusterComponentSetsConcurrently(parentCtx, req.Clusters, se.timeout, func(ctx context.Context, cluster string) (int32, error) { + return se.maxAvailableComponentSets(ctx, cluster, req.Namespace, req.Components) + }) } // GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator. @@ -86,6 +89,62 @@ func (se *SchedulerEstimator) GetUnschedulableReplicas( }) } +func (se *SchedulerEstimator) maxAvailableComponentSets( + ctx context.Context, + cluster string, + namespace string, + components []workv1alpha2.Component, +) (int32, error) { + client, err := se.cache.GetClient(cluster) + if err != nil { + return 0, err + } + + pbReq := &pb.MaxAvailableComponentSetsRequest{ + Cluster: cluster, + Components: make([]pb.Component, 0, len(components)), + } + + for _, comp := range components { + // Deep-copy so that pointer is not shared between goroutines + var cr *workv1alpha2.ComponentReplicaRequirements + if comp.ReplicaRequirements != nil { + cr = comp.ReplicaRequirements.DeepCopy() + } + + pbReq.Components = append(pbReq.Components, pb.Component{ + Name: comp.Name, + Replicas: comp.Replicas, + ReplicaRequirements: toPBReplicaRequirements(cr, namespace), + }) + } + + res, err := client.MaxAvailableComponentSets(ctx, pbReq) + if err != nil { + return 0, fmt.Errorf("gRPC request cluster(%s) estimator error when calling MaxAvailableComponentSets: %v", cluster, err) + } + return res.MaxSets, nil +} + +// toPBReplicaRequirements converts the API ComponentReplicaRequirements to the pb.ReplicaRequirements value. +func toPBReplicaRequirements(cr *workv1alpha2.ComponentReplicaRequirements, namespace string) pb.ReplicaRequirements { + var out pb.ReplicaRequirements + out.Namespace = namespace + if cr == nil { + return out + } + out.ResourceRequest = cr.ResourceRequest + out.PriorityClassName = cr.PriorityClassName + if cr.NodeClaim != nil { + out.NodeClaim = &pb.NodeClaim{ + NodeAffinity: cr.NodeClaim.HardNodeAffinity, + NodeSelector: cr.NodeClaim.NodeSelector, + Tolerations: cr.NodeClaim.Tolerations, + } + } + return out +} + func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster string, replicaRequirements *workv1alpha2.ReplicaRequirements) (int32, error) { client, err := se.cache.GetClient(cluster) if err != nil { @@ -167,3 +226,32 @@ func getClusterReplicasConcurrently(parentCtx context.Context, clusters []string } return clusterReplicas, utilerrors.AggregateGoroutines(funcs...) } + +func getClusterComponentSetsConcurrently( + parentCtx context.Context, + clusters []*clusterv1alpha1.Cluster, + timeout time.Duration, + getClusterSetsEstimation func(ctx context.Context, cluster string) (int32, error), +) ([]ComponentSetEstimationResponse, error) { + // add object info to gRPC metadata + if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok { + parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), u) + } + ctx, cancel := context.WithTimeout(parentCtx, timeout) + defer cancel() + + results := make([]ComponentSetEstimationResponse, len(clusters)) + funcs := make([]func() error, len(clusters)) + for i, cluster := range clusters { + localIndex, localCluster := i, cluster + funcs[i] = func() error { + sets, err := getClusterSetsEstimation(ctx, localCluster.Name) + if err != nil { + return err + } + results[localIndex] = ComponentSetEstimationResponse{Name: localCluster.Name, Sets: sets} + return nil + } + } + return results, utilerrors.AggregateGoroutines(funcs...) +} diff --git a/pkg/estimator/server/estimate.go b/pkg/estimator/server/estimate.go index 7687079b4df8..e052e21842cf 100644 --- a/pkg/estimator/server/estimate.go +++ b/pkg/estimator/server/estimate.go @@ -101,6 +101,55 @@ func (es *AccurateSchedulerEstimatorServer) estimateReplicas( return res, nil } +// EstimateComponents returns max available component sets in terms of request and cluster status. +func (es *AccurateSchedulerEstimatorServer) EstimateComponents(ctx context.Context, object string, request *pb.MaxAvailableComponentSetsRequest) (int32, error) { + trace := utiltrace.New("Estimating", utiltrace.Field{Key: "namespacedName", Value: object}) + defer trace.LogIfLong(100 * time.Millisecond) + + snapShot := schedcache.NewEmptySnapshot() + if err := es.Cache.UpdateSnapshot(snapShot); err != nil { + return 0, err + } + trace.Step("Snapshotting estimator cache and node infos done") + + if snapShot.NumNodes() == 0 { + return 0, nil + } + + maxAvailableComponentSets, err := es.estimateComponents(ctx, snapShot, request.Components) + if err != nil { + return 0, err + } + trace.Step("Computing estimation done") + + return maxAvailableComponentSets, nil +} + +func (es *AccurateSchedulerEstimatorServer) estimateComponents( + ctx context.Context, + snapshot *schedcache.Snapshot, + components []pb.Component, +) (int32, error) { + maxSets, ret := es.estimateFramework.RunEstimateComponentsPlugins(ctx, snapshot, components) + + // No replicas can be scheduled on the cluster, skip further checks and return 0 + if ret.IsUnschedulable() { + return 0, nil + } + + if !ret.IsSuccess() && !ret.IsNoOperation() { + return maxSets, fmt.Errorf("estimate components plugins fails with %s", ret.Reasons()) + } + + // TODO - Node info has not be included in this implementation. Node Resource Estimation will be moved to a separate plugin. + + // If no plugins were run (NoOperation), return maxSets value to prevent scheduling failure + if ret.IsSuccess() || ret.IsNoOperation() { + return maxSets, nil + } + return 0, nil +} + func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *framework.NodeInfo, rl corev1.ResourceList) int32 { rest := node.Allocatable.Clone().SubResource(node.Requested) // The number of pods in a node is a kind of resource in node allocatable resources. diff --git a/pkg/estimator/server/metrics/metrics.go b/pkg/estimator/server/metrics/metrics.go index abbc74324f06..7803ba7877ed 100644 --- a/pkg/estimator/server/metrics/metrics.go +++ b/pkg/estimator/server/metrics/metrics.go @@ -33,6 +33,8 @@ const ( EstimatingTypeMaxAvailableReplicas = "MaxAvailableReplicas" // EstimatingTypeGetUnschedulableReplicas - label of estimating type EstimatingTypeGetUnschedulableReplicas = "GetUnschedulableReplicas" + // EstimatingTypeMaxAvailableComponentSets - label of estimating type + EstimatingTypeMaxAvailableComponentSets = "MaxAvailableComponentSets" ) const ( diff --git a/pkg/estimator/server/server.go b/pkg/estimator/server/server.go index 22a4ee5ed6bc..56e83908d344 100644 --- a/pkg/estimator/server/server.go +++ b/pkg/estimator/server/server.go @@ -225,8 +225,37 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con // MaxAvailableComponentSets is the implementation of gRPC interface. // It returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host. -func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(context.Context, *pb.MaxAvailableComponentSetsRequest) (*pb.MaxAvailableComponentSetsResponse, error) { - return &pb.MaxAvailableComponentSetsResponse{}, fmt.Errorf("not implemented yet") +func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(ctx context.Context, request *pb.MaxAvailableComponentSetsRequest) (response *pb.MaxAvailableComponentSetsResponse, rerr error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + klog.Warningf("No metadata from context.") + } + var object string + if m := md.Get(string(util.ContextKeyObject)); len(m) != 0 { + object = m[0] + } + + startTime := time.Now() + + klog.V(4).Infof("Begin calculating available component sets of resource(%s), request: %s", object, pretty.Sprint(*request)) + defer func(start time.Time) { + metrics.CountRequests(rerr, metrics.EstimatingTypeMaxAvailableComponentSets) + metrics.UpdateEstimatingAlgorithmLatency(rerr, metrics.EstimatingTypeMaxAvailableComponentSets, metrics.EstimatingStepTotal, start) + if rerr != nil { + klog.Errorf("Failed to calculate cluster available component sets: %v", rerr) + return + } + klog.V(2).Infof("Finish calculating cluster available component sets of resource(%s), max component sets: %d, time elapsed: %s", object, response.MaxSets, time.Since(start)) + }(startTime) + + if request.Cluster != es.clusterName { + return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName) + } + maxSets, err := es.EstimateComponents(ctx, object, request) + if err != nil { + return nil, fmt.Errorf("failed to estimate component sets: %v", err) + } + return &pb.MaxAvailableComponentSetsResponse{MaxSets: maxSets}, nil } // GetUnschedulableReplicas is the implementation of gRPC interface. It will return the