Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
// PodReasonSchedulable reason in PodScheduled PodCondition means that the scheduler
// can schedule the pod right now, but not bind yet
PodReasonSchedulable = "Schedulable"
// PodReasonSchedulerError reason in PodScheduled PodCondition means that the scheduler
// tried to schedule the pod, but went error when scheduling
// for example bind pod return error.
PodReasonSchedulerError = "SchedulerError"
)

// FitErrors is set of FitError on many nodes
Expand Down
16 changes: 11 additions & 5 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ type DefaultBinder struct {
}

// Bind will send bind request to api server
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, error) {
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, []error) {
var errTasks []*schedulingapi.TaskInfo
var errs []error
for _, task := range tasks {
p := task.Pod
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
Expand All @@ -188,14 +189,15 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli
metav1.CreateOptions{}); err != nil {
klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err)
errTasks = append(errTasks, task)
errs = append(errs, err)
} else {
db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as pod is bind
}
}

if len(errTasks) > 0 {
return errTasks, fmt.Errorf("failed to bind pods")
return errTasks, errs
}

return nil, nil
Expand Down Expand Up @@ -895,11 +897,15 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string)
// Bind binds task to the target host.
func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) {
tmp := time.Now()
errTasks, err := sc.Binder.Bind(sc.kubeClient, tasks)
if err == nil {
errTasks, errs := sc.Binder.Bind(sc.kubeClient, tasks)
if errs == nil {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
} else {
for _, task := range errTasks {
for i, task := range errTasks {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, errs[i])
if err := sc.taskUnschedulable(task, schedulingapi.PodReasonSchedulerError, unschedulableMsg, ""); err != nil {
klog.ErrorS(err, "Failed to update pod status when bind task error", "task", task.Name)
}
klog.V(2).Infof("resyncTask task %s", task.Name)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type VolumeBinder interface {

// Binder interface for binding task and hostname
type Binder interface {
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, error)
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error)
}

// Evictor interface for evict pods
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ type FakeBinder struct {
}

// Bind used by fake binder struct to bind pods
func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, error) {
func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) {
fb.Lock()
defer fb.Unlock()
for _, p := range tasks {
Expand Down