diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 74fa5bdda..696cfa2a3 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -885,9 +885,10 @@ func (c *MPIJobController) getOrCreateService(job *kubeflow.MPIJob, newSvc *core } // If the Service selector is changed, update it. - if !equality.Semantic.DeepEqual(svc.Spec.Selector, newSvc.Spec.Selector) { + if !equality.Semantic.DeepEqual(svc.Spec.Selector, newSvc.Spec.Selector) || svc.Spec.PublishNotReadyAddresses != newSvc.Spec.PublishNotReadyAddresses { svc = svc.DeepCopy() svc.Spec.Selector = newSvc.Spec.Selector + svc.Spec.PublishNotReadyAddresses = newSvc.Spec.PublishNotReadyAddresses return c.kubeClient.CoreV1().Services(svc.Namespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) } @@ -1343,10 +1344,10 @@ func newJobService(job *kubeflow.MPIJob) *corev1.Service { kubeflow.OperatorNameLabel: kubeflow.OperatorName, kubeflow.JobNameLabel: job.Name, } - return newService(job, job.Name, labels) + return newService(job, job.Name, labels, ptr.Deref(job.Spec.RunLauncherAsWorker, false)) } -func newService(job *kubeflow.MPIJob, name string, selector map[string]string) *corev1.Service { +func newService(job *kubeflow.MPIJob, name string, selector map[string]string, isRunLauncherAsWorker bool) *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -1361,6 +1362,9 @@ func newService(job *kubeflow.MPIJob, name string, selector map[string]string) * Spec: corev1.ServiceSpec{ ClusterIP: corev1.ClusterIPNone, Selector: selector, + // The publishNotReadyAddresses must be true only for the MPIJob with runLauncherAsWorker + // to avoid deadlock to wait for Launcher is ready. + PublishNotReadyAddresses: isRunLauncherAsWorker, }, } }