Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head

// Configure RAY_AUTH_TOKEN and RAY_AUTH_MODE if auth is enabled.
if utils.IsAuthEnabled(&instance.Spec) {
setContainerTokenAuthEnvVars(instance.Name, &autoscalerContainer)
SetContainerTokenAuthEnvVars(instance.Name, &autoscalerContainer)
}

// Merge the user overrides from autoscalerOptions into the autoscaler container config.
Expand Down Expand Up @@ -248,20 +248,21 @@ func setAutoscalerV2EnvVars(podTemplate *corev1.PodTemplateSpec) {

// configureTokenAuth sets the environment variables required for Ray token
// authentication on the pod template's Ray container (the container at
// utils.RayContainerIndex) and on the "wait-gcs-ready" init container, if the
// template has one.
func configureTokenAuth(clusterName string, podTemplate *corev1.PodTemplateSpec) {
setContainerTokenAuthEnvVars(clusterName, &podTemplate.Spec.Containers[utils.RayContainerIndex])
SetContainerTokenAuthEnvVars(clusterName, &podTemplate.Spec.Containers[utils.RayContainerIndex])
// NOTE(review): the RayJob submitter container is not configured in this
// function. In this change its auth env vars appear to be set by
// configureSubmitterContainer in rayjob_controller.go, which calls the
// exported SetContainerTokenAuthEnvVars helper.

// Configure the auth token for the wait-gcs-ready init container, if it exists.
for i, initContainer := range podTemplate.Spec.InitContainers {
if initContainer.Name != "wait-gcs-ready" {
continue
}

// Pass a pointer to the slice element rather than to the loop copy so the
// env vars are appended to the container stored in the template.
setContainerTokenAuthEnvVars(clusterName, &podTemplate.Spec.InitContainers[i])
SetContainerTokenAuthEnvVars(clusterName, &podTemplate.Spec.InitContainers[i])
}
}

// setContainerTokenAuthEnvVars sets Ray authentication env vars for a container.
func setContainerTokenAuthEnvVars(clusterName string, container *corev1.Container) {
// SetContainerTokenAuthEnvVars sets Ray authentication env vars for a container.
func SetContainerTokenAuthEnvVars(clusterName string, container *corev1.Container) {
container.Env = append(container.Env, corev1.EnvVar{
Name: utils.RAY_AUTH_MODE_ENV_VAR,
Value: string(rayv1.AuthModeToken),
Expand Down
10 changes: 7 additions & 3 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func getSubmitterTemplate(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv
// Set the default value for the optional field SubmitterPodTemplate if not provided.
submitterTemplate := common.GetSubmitterTemplate(&rayJobInstance.Spec, &rayClusterInstance.Spec)

if err := configureSubmitterContainer(&submitterTemplate.Spec.Containers[utils.RayContainerIndex], rayJobInstance, rayv1.K8sJobMode); err != nil {
if err := configureSubmitterContainer(&submitterTemplate.Spec.Containers[utils.RayContainerIndex], rayJobInstance, rayClusterInstance, rayv1.K8sJobMode); err != nil {
return corev1.PodTemplateSpec{}, err
}

Expand All @@ -570,14 +570,15 @@ func getSubmitterTemplate(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv
// getSubmitterContainer builds the submitter container used for
// rayv1.SidecarMode. It starts from the default submitter container derived
// from the RayCluster spec and then applies configureSubmitterContainer to it.
// If configuration fails, it returns an empty container together with the error.
func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.Container, error) {
var submitterContainer corev1.Container = common.GetDefaultSubmitterContainer(&rayClusterInstance.Spec)

if err := configureSubmitterContainer(&submitterContainer, rayJobInstance, rayv1.SidecarMode); err != nil {
if err := configureSubmitterContainer(&submitterContainer, rayJobInstance, rayClusterInstance, rayv1.SidecarMode); err != nil {
return corev1.Container{}, err
}

return submitterContainer, nil
}

func configureSubmitterContainer(container *corev1.Container, rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) error {
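// The RayCluster instance is passed in explicitly so that cluster-level settings (such as token auth) are available here, including in the cluster selector case.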
func configureSubmitterContainer(container *corev1.Container, rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster, submissionMode rayv1.JobSubmissionMode) error {
// If the command in the submitter container manifest isn't set, use the default command.
jobCmd, err := common.BuildJobSubmitCommand(rayJobInstance, submissionMode)
if err != nil {
Expand All @@ -600,6 +601,9 @@ func configureSubmitterContainer(container *corev1.Container, rayJobInstance *ra
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})
if rayClusterInstance != nil && utils.IsAuthEnabled(&rayClusterInstance.Spec) {
common.SetContainerTokenAuthEnvVars(rayClusterInstance.Name, container)
}

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
InitClient(client *http.Client, dashboardURL string, authToken string)
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
Expand All @@ -44,11 +44,20 @@ type RayDashboardClientInterface interface {
// RayDashboardClient sends requests to a Ray dashboard over HTTP. Its fields
// are populated by InitClient.
type RayDashboardClient struct {
	// client is the HTTP client used to execute every dashboard request.
	client *http.Client
	// dashboardURL is the dashboard address handed to InitClient.
	// authToken, when non-empty, is used by setAuthHeader to build the bearer
	// authorization header.
	dashboardURL, authToken string
}

// InitClient stores the HTTP client, the dashboard URL and the auth token on
// the receiver. An empty authToken means setAuthHeader adds no authorization
// header to requests.
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) {
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, authToken string) {
r.client = client
r.dashboardURL = dashboardURL
r.authToken = authToken
}

// setAuthHeader attaches the client's auth token to req as a bearer token.
// When no token is configured the request is left untouched.
//
// TODO: change authorization to x-ray-authorization after this PR is merged: https://github.com/ray-project/ray/pull/58819
func (r *RayDashboardClient) setAuthHeader(req *http.Request) {
	token := r.authToken
	if token == "" {
		return
	}

	bearer := fmt.Sprintf("Bearer %s", token)
	req.Header.Set("authorization", bearer)
}

// UpdateDeployments update the deployments in the Ray cluster.
Expand All @@ -60,6 +69,7 @@ func (r *RayDashboardClient) UpdateDeployments(ctx context.Context, configJson [
}

req.Header.Set("Content-Type", "application/json")
r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
Expand Down Expand Up @@ -102,6 +112,8 @@ func (r *RayDashboardClient) GetServeDetails(ctx context.Context) (*utiltypes.Se
return nil, err
}

r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -147,6 +159,8 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti
return nil, err
}

r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -177,6 +191,8 @@ func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobI
return nil, err
}

r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -221,6 +237,8 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype
}

req.Header.Set("Content-Type", "application/json")
r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return
Expand Down Expand Up @@ -255,6 +273,9 @@ func (r *RayDashboardClient) GetJobLog(ctx context.Context, jobName string) (*st
if err != nil {
return nil, err
}

r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -288,6 +309,8 @@ func (r *RayDashboardClient) StopJob(ctx context.Context, jobName string) (err e
}

req.Header.Set("Content-Type", "application/json")
r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return err
Expand Down Expand Up @@ -324,6 +347,8 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro
}

req.Header.Set("Content-Type", "application/json")
r.setAuthHeader(req)

resp, err := r.client.Do(req)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type FakeRayDashboardClient struct {

var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

// InitClient is a no-op on the fake client: every argument is discarded and
// the body is empty.
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) {
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ string) {
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, configJson []byte) error {
Expand Down
35 changes: 32 additions & 3 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
meta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/discovery"
Expand Down Expand Up @@ -897,6 +898,28 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
var authToken string

if rayCluster != nil && rayCluster.Spec.AuthOptions != nil && rayCluster.Spec.AuthOptions.Mode == rayv1.AuthModeToken {
secretName := CheckName(rayCluster.Name)
secret := &corev1.Secret{}
secretKey := types.NamespacedName{
Name: secretName,
Namespace: rayCluster.Namespace,
}

if err := mgr.GetClient().Get(context.Background(), secretKey, secret); err != nil {
return nil, fmt.Errorf("failed to get auth secret %s/%s: %w", rayCluster.Namespace, secretName, err)
}

tokenBytes, exists := secret.Data[RAY_AUTH_TOKEN_SECRET_KEY]
if !exists {
return nil, fmt.Errorf("auth token key '%q' not found in secret %s/%s", RAY_AUTH_TOKEN_SECRET_KEY, rayCluster.Namespace, secretName)
}

authToken = string(tokenBytes)
}

if useKubernetesProxy {
var err error
headSvcName := rayCluster.Status.Head.ServiceName
Expand All @@ -913,13 +936,19 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// configured to communicate with the Kubernetes API server.
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
authToken,
)
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)
dashboardClient.InitClient(
&http.Client{
Timeout: 2 * time.Second,
},
"http://"+url,
authToken,
)

return dashboardClient, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/rayjob-submitter/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
}
rayDashboardClient := &dashboardclient.RayDashboardClient{}
address = rayjobsubmitter.JobSubmissionURL(address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, "")
submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req)
if err != nil {
if strings.Contains(err.Error(), "Please use a different submission_id") {
Expand Down
Loading