Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/gateway-config-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ gateway:

enableSwaggerUI: true

operationTimeoutSeconds:
list: 60
get: 60
create: 60
delete: 60

sparkManager:
clusterAuthType: kubeconfig

Expand Down
39 changes: 30 additions & 9 deletions internal/gateway/application/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/kubeflow/spark-operator/v2/api/v1beta2"
"k8s.io/apimachinery/pkg/util/json"
Expand All @@ -34,10 +35,11 @@ import (
)

type SparkManagerRepository struct {
ClusterEndpoints map[string]string
ClusterEndpoints map[string]string
operationTimeoutSeconds config.OperationTimeoutSeconds
}

func NewSparkManagerRepository(clusters []model.KubeCluster, sparkManagerHostnameTemplate string, sparkManagerPort string, debugPorts map[string]config.DebugPort) (*SparkManagerRepository, error) {
func NewSparkManagerRepository(clusters []model.KubeCluster, sparkManagerHostnameTemplate string, sparkManagerPort string, debugPorts map[string]config.DebugPort, operationTimeoutSeconds config.OperationTimeoutSeconds) (*SparkManagerRepository, error) {

hostNameF := "http://%s:%s"
clusterEndpoints := map[string]string{}
Expand Down Expand Up @@ -66,7 +68,8 @@ func NewSparkManagerRepository(clusters []model.KubeCluster, sparkManagerHostnam
}

return &SparkManagerRepository{
ClusterEndpoints: clusterEndpoints,
ClusterEndpoints: clusterEndpoints,
operationTimeoutSeconds: operationTimeoutSeconds,
}, nil
}

Expand All @@ -81,7 +84,10 @@ func (r *SparkManagerRepository) Get(ctx context.Context, cluster model.KubeClus
return nil, gatewayerrors.NewFrom(fmt.Errorf("error creating %s request: %w", http.MethodGet, err))
}

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.Get)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}
Expand Down Expand Up @@ -110,7 +116,10 @@ func (r *SparkManagerRepository) List(ctx context.Context, cluster model.KubeClu
return nil, gatewayerrors.NewFrom(fmt.Errorf("error creating %s request: %w", http.MethodGet, err))
}

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.List)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}
Expand Down Expand Up @@ -139,7 +148,10 @@ func (r *SparkManagerRepository) Status(ctx context.Context, cluster model.KubeC
return nil, gatewayerrors.NewFrom(fmt.Errorf("error creating %s request: %w", http.MethodGet, err))
}

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.Get)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}
Expand Down Expand Up @@ -168,7 +180,10 @@ func (r *SparkManagerRepository) Logs(ctx context.Context, cluster model.KubeClu
return nil, gatewayerrors.NewFrom(fmt.Errorf("error creating %s request: %w", http.MethodGet, err))
}

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.Get)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}
Expand Down Expand Up @@ -203,7 +218,10 @@ func (r *SparkManagerRepository) Create(ctx context.Context, cluster model.KubeC
}
request.Header.Set("Content-Type", "application/json")

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.Create)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return nil, gatewayerrors.NewFrom(err)
}
Expand Down Expand Up @@ -232,7 +250,10 @@ func (r *SparkManagerRepository) Delete(ctx context.Context, cluster model.KubeC
return gatewayerrors.NewFrom(fmt.Errorf("error creating %s request: %w", http.MethodDelete, err))
}

resp, respBody, err := pkgHttp.HttpRequest(ctx, &http.Client{}, request)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*r.operationTimeoutSeconds.Delete)
defer cancel()

resp, respBody, err := pkgHttp.HttpRequest(timeoutCtx, &http.Client{}, request)
if err != nil {
return gatewayerrors.NewFrom(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/gateway/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewGateway(ctx context.Context, sgConfig *cfg.SparkGatewayConfig, sparkMana
ginRouter := gin.Default()

//Repos
sparkManagerRepo, err := repository.NewSparkManagerRepository(sgConfig.KubeClusters, sparkManagerHostnameTemplate, sgConfig.SparkManagerPort, sgConfig.DebugPorts)
sparkManagerRepo, err := repository.NewSparkManagerRepository(sgConfig.KubeClusters, sparkManagerHostnameTemplate, sgConfig.SparkManagerPort, sgConfig.DebugPorts, sgConfig.GatewayConfig.OperationTimeoutSeconds)
if err != nil {
return nil, fmt.Errorf("could not create SparkManagerRespository: %w", err)
}
Expand Down
19 changes: 14 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
Expand Down Expand Up @@ -108,12 +109,20 @@ type MiddlewareDefinition struct {
Conf map[string]any `koanf:"conf"`
}

type OperationTimeoutSeconds struct {
List time.Duration `koanf:"list"`
Get time.Duration `koanf:"get"`
Create time.Duration `koanf:"create"`
Delete time.Duration `koanf:"delete"`
}

type GatewayConfig struct {
GatewayApiVersion string `koanf:"gatewayApiVersion"`
GatewayPort string `koanf:"gatewayPort"`
Middleware []MiddlewareDefinition `koanf:"middleware"`
StatusUrlTemplates model.StatusUrlTemplates `koanf:"statusUrlTemplates"`
EnableSwaggerUI bool `koanf:"enableSwaggerUI"`
GatewayApiVersion string `koanf:"gatewayApiVersion"`
GatewayPort string `koanf:"gatewayPort"`
Middleware []MiddlewareDefinition `koanf:"middleware"`
StatusUrlTemplates model.StatusUrlTemplates `koanf:"statusUrlTemplates"`
EnableSwaggerUI bool `koanf:"enableSwaggerUI"`
OperationTimeoutSeconds OperationTimeoutSeconds `koanf:"operationTimeoutSeconds"`
}

func (g *GatewayConfig) Key() string {
Expand Down
Loading