Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions pkg/estimator/client/accurate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ func (se *SchedulerEstimator) MaxAvailableReplicas(
}

// MaxAvailableComponentSets returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host.
func (se *SchedulerEstimator) MaxAvailableComponentSets(_ context.Context, _ ComponentSetEstimationRequest) ([]ComponentSetEstimationResponse, error) {
// Dummy implementation: return nothing for now
// TODO: Implement as part of #6734
return nil, nil
func (se *SchedulerEstimator) MaxAvailableComponentSets(
	parentCtx context.Context,
	req ComponentSetEstimationRequest,
) ([]ComponentSetEstimationResponse, error) {
	// Per-cluster estimation: the namespace and component list come from the
	// request and are the same for every cluster that is queried.
	estimateCluster := func(ctx context.Context, cluster string) (int32, error) {
		return se.maxAvailableComponentSets(ctx, cluster, req.Namespace, req.Components)
	}

	// Fan the estimation out over all requested clusters, bounded by the
	// estimator's configured timeout.
	return getClusterComponentSetsConcurrently(parentCtx, req.Clusters, se.timeout, estimateCluster)
}

// GetUnschedulableReplicas gets the unschedulable replicas which belong to a specified workload by calling karmada-scheduler-estimator.
Expand All @@ -86,6 +89,62 @@ func (se *SchedulerEstimator) GetUnschedulableReplicas(
})
}

// maxAvailableComponentSets asks the estimator of a single cluster how many
// complete component sets that cluster can host.
//
// It fetches the cluster's estimator client from se.cache, converts each
// component to its protobuf form (passing namespace along to every replica
// requirement) and issues the MaxAvailableComponentSets gRPC call. On success
// the MaxSets field of the response is returned; a client lookup failure or an
// RPC failure yields 0 and an error.
func (se *SchedulerEstimator) maxAvailableComponentSets(
	ctx context.Context,
	cluster string,
	namespace string,
	components []workv1alpha2.Component,
) (int32, error) {
	estimatorClient, err := se.cache.GetClient(cluster)
	if err != nil {
		return 0, err
	}

	pbComponents := make([]pb.Component, 0, len(components))
	for i := range components {
		// Convert from a deep copy so that the requirement pointer is not
		// shared between goroutines.
		var requirements *workv1alpha2.ComponentReplicaRequirements
		if src := components[i].ReplicaRequirements; src != nil {
			requirements = src.DeepCopy()
		}

		pbComponents = append(pbComponents, pb.Component{
			Name:                components[i].Name,
			Replicas:            components[i].Replicas,
			ReplicaRequirements: toPBReplicaRequirements(requirements, namespace),
		})
	}

	request := &pb.MaxAvailableComponentSetsRequest{
		Cluster:    cluster,
		Components: pbComponents,
	}
	response, err := estimatorClient.MaxAvailableComponentSets(ctx, request)
	if err != nil {
		return 0, fmt.Errorf("gRPC request cluster(%s) estimator error when calling MaxAvailableComponentSets: %v", cluster, err)
	}
	return response.MaxSets, nil
}

// toPBReplicaRequirements builds the pb.ReplicaRequirements value for a
// component.
//
// The namespace is always set on the result. When cr is nil, nothing else is
// populated. Otherwise the resource request and priority class name are
// assigned as-is, and a NodeClaim is attached only if cr carries one: its hard
// node affinity, node selector and tolerations are assigned to the
// corresponding pb.NodeClaim fields.
func toPBReplicaRequirements(cr *workv1alpha2.ComponentReplicaRequirements, namespace string) pb.ReplicaRequirements {
	if cr == nil {
		return pb.ReplicaRequirements{Namespace: namespace}
	}

	converted := pb.ReplicaRequirements{
		Namespace:         namespace,
		ResourceRequest:   cr.ResourceRequest,
		PriorityClassName: cr.PriorityClassName,
	}
	if claim := cr.NodeClaim; claim != nil {
		converted.NodeClaim = &pb.NodeClaim{
			NodeAffinity: claim.HardNodeAffinity,
			NodeSelector: claim.NodeSelector,
			Tolerations:  claim.Tolerations,
		}
	}
	return converted
}
Comment on lines +129 to +146
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct and not a blocker, but we should take another look at the definition of the gRPC data structure: the conversion should be more straightforward.

@seanlaii Do you have any idea?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS:
I added a follow-up task to #6734 for revising the design here.


func (se *SchedulerEstimator) maxAvailableReplicas(ctx context.Context, cluster string, replicaRequirements *workv1alpha2.ReplicaRequirements) (int32, error) {
client, err := se.cache.GetClient(cluster)
if err != nil {
Expand Down Expand Up @@ -167,3 +226,32 @@ func getClusterReplicasConcurrently(parentCtx context.Context, clusters []string
}
return clusterReplicas, utilerrors.AggregateGoroutines(funcs...)
}

// getClusterComponentSetsConcurrently evaluates getClusterSetsEstimation once
// per cluster and collects the outcomes.
//
// If parentCtx carries a string under util.ContextKeyObject, that value is
// appended to the outgoing gRPC metadata under the same key. All estimations
// share one context derived from parentCtx with the given timeout.
//
// The returned slice has one slot per input cluster, in input order. A slot is
// filled only when that cluster's estimation succeeds; slots of failed
// clusters keep their zero value. The error is whatever
// utilerrors.AggregateGoroutines reports for the per-cluster functions
// (presumably it runs them concurrently and aggregates their errors — confirm
// against that helper).
func getClusterComponentSetsConcurrently(
	parentCtx context.Context,
	clusters []*clusterv1alpha1.Cluster,
	timeout time.Duration,
	getClusterSetsEstimation func(ctx context.Context, cluster string) (int32, error),
) ([]ComponentSetEstimationResponse, error) {
	// Forward the object info to the estimator through gRPC metadata.
	if object, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
		parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), object)
	}
	ctx, cancel := context.WithTimeout(parentCtx, timeout)
	defer cancel()

	responses := make([]ComponentSetEstimationResponse, len(clusters))

	// newTask binds the slot index and cluster as parameters, so every task
	// works on its own values regardless of loop-variable semantics.
	newTask := func(slot int, target *clusterv1alpha1.Cluster) func() error {
		return func() error {
			sets, err := getClusterSetsEstimation(ctx, target.Name)
			if err != nil {
				return err
			}
			// Each task writes only to its own slot.
			responses[slot] = ComponentSetEstimationResponse{Name: target.Name, Sets: sets}
			return nil
		}
	}

	tasks := make([]func() error, len(clusters))
	for idx := range clusters {
		tasks[idx] = newTask(idx, clusters[idx])
	}
	return responses, utilerrors.AggregateGoroutines(tasks...)
}
49 changes: 49 additions & 0 deletions pkg/estimator/server/estimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,55 @@ func (es *AccurateSchedulerEstimatorServer) estimateReplicas(
return res, nil
}

// EstimateComponents returns the max available component sets for the
// components in request, based on the current state of the estimator cache.
//
// A fresh snapshot is filled from es.Cache first. If the snapshot update fails
// the error is returned; if the snapshot holds no nodes the result is 0 without
// running any estimation. Otherwise the result of estimateComponents is
// returned. object is only used as the "namespacedName" field of the trace,
// which is handed to LogIfLong with a 100ms threshold.
func (es *AccurateSchedulerEstimatorServer) EstimateComponents(ctx context.Context, object string, request *pb.MaxAvailableComponentSetsRequest) (int32, error) {
	const slowEstimationThreshold = 100 * time.Millisecond

	trace := utiltrace.New("Estimating", utiltrace.Field{Key: "namespacedName", Value: object})
	defer trace.LogIfLong(slowEstimationThreshold)

	snapshot := schedcache.NewEmptySnapshot()
	err := es.Cache.UpdateSnapshot(snapshot)
	if err != nil {
		return 0, err
	}
	trace.Step("Snapshotting estimator cache and node infos done")

	// Without any node there is nothing to estimate against.
	if snapshot.NumNodes() == 0 {
		return 0, nil
	}

	sets, err := es.estimateComponents(ctx, snapshot, request.Components)
	if err != nil {
		return 0, err
	}
	trace.Step("Computing estimation done")

	return sets, nil
}

// estimateComponents runs the estimate-components plugins of the estimate
// framework against snapshot and maps the plugin result to a set count.
//
// Result handling:
//   - Unschedulable: nothing can be scheduled on the cluster, so 0 is returned
//     without an error.
//   - Any other result that is neither Success nor NoOperation: the value
//     reported by the plugins is returned together with an error that carries
//     the result's reasons.
//   - Success or NoOperation: the value reported by the plugins is returned.
//     For NoOperation (no plugins were run) this passes the framework's value
//     through to prevent a scheduling failure.
func (es *AccurateSchedulerEstimatorServer) estimateComponents(
	ctx context.Context,
	snapshot *schedcache.Snapshot,
	components []pb.Component,
) (int32, error) {
	maxSets, ret := es.estimateFramework.RunEstimateComponentsPlugins(ctx, snapshot, components)

	// No replicas can be scheduled on the cluster, skip further checks and return 0
	if ret.IsUnschedulable() {
		return 0, nil
	}

	if !ret.IsSuccess() && !ret.IsNoOperation() {
		return maxSets, fmt.Errorf("estimate components plugins fails with %s", ret.Reasons())
	}

	// TODO - Node info has not been included in this implementation. Node Resource Estimation will be moved to a separate plugin.

	// Only Success and NoOperation reach this point. If no plugins were run
	// (NoOperation), return the maxSets value to prevent scheduling failure.
	return maxSets, nil
}

func (es *AccurateSchedulerEstimatorServer) nodeMaxAvailableReplica(node *framework.NodeInfo, rl corev1.ResourceList) int32 {
rest := node.Allocatable.Clone().SubResource(node.Requested)
// The number of pods in a node is a kind of resource in node allocatable resources.
Expand Down
2 changes: 2 additions & 0 deletions pkg/estimator/server/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
EstimatingTypeMaxAvailableReplicas = "MaxAvailableReplicas"
// EstimatingTypeGetUnschedulableReplicas - label of estimating type
EstimatingTypeGetUnschedulableReplicas = "GetUnschedulableReplicas"
// EstimatingTypeMaxAvailableComponentSets - label of estimating type
EstimatingTypeMaxAvailableComponentSets = "MaxAvailableComponentSets"
)

const (
Expand Down
33 changes: 31 additions & 2 deletions pkg/estimator/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,37 @@ func (es *AccurateSchedulerEstimatorServer) MaxAvailableReplicas(ctx context.Con

// MaxAvailableComponentSets is the implementation of gRPC interface.
// It returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host.
func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(context.Context, *pb.MaxAvailableComponentSetsRequest) (*pb.MaxAvailableComponentSetsResponse, error) {
return &pb.MaxAvailableComponentSetsResponse{}, fmt.Errorf("not implemented yet")
func (es *AccurateSchedulerEstimatorServer) MaxAvailableComponentSets(ctx context.Context, request *pb.MaxAvailableComponentSetsRequest) (response *pb.MaxAvailableComponentSetsResponse, rerr error) {
	// The object identity is read from the incoming gRPC metadata; it is used
	// for logging and is passed on to EstimateComponents.
	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		klog.Warningf("No metadata from context.")
	}
	object := ""
	if values := md.Get(string(util.ContextKeyObject)); len(values) > 0 {
		object = values[0]
	}

	startTime := time.Now()

	klog.V(4).Infof("Begin calculating available component sets of resource(%s), request: %s", object, pretty.Sprint(*request))

	// The named results let this deferred hook see the final outcome of the
	// call, so metrics and logs are emitted on every return path.
	defer func() {
		metrics.CountRequests(rerr, metrics.EstimatingTypeMaxAvailableComponentSets)
		metrics.UpdateEstimatingAlgorithmLatency(rerr, metrics.EstimatingTypeMaxAvailableComponentSets, metrics.EstimatingStepTotal, startTime)
		if rerr == nil {
			klog.V(2).Infof("Finish calculating cluster available component sets of resource(%s), max component sets: %d, time elapsed: %s", object, response.MaxSets, time.Since(startTime))
			return
		}
		klog.Errorf("Failed to calculate cluster available component sets: %v", rerr)
	}()

	// Reject requests that were addressed to a different cluster's estimator.
	if es.clusterName != request.Cluster {
		return nil, fmt.Errorf("cluster name does not match, got: %s, desire: %s", request.Cluster, es.clusterName)
	}

	sets, err := es.EstimateComponents(ctx, object, request)
	if err != nil {
		return nil, fmt.Errorf("failed to estimate component sets: %v", err)
	}
	return &pb.MaxAvailableComponentSetsResponse{MaxSets: sets}, nil
}

// GetUnschedulableReplicas is the implementation of gRPC interface. It will return the
Expand Down