Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions api/restHandler/PipelineTriggerRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type PipelineTriggerRestHandler interface {
StartStopApp(w http.ResponseWriter, r *http.Request)
StartStopDeploymentGroup(w http.ResponseWriter, r *http.Request)
GetAllLatestDeploymentConfiguration(w http.ResponseWriter, r *http.Request)
RotatePods(w http.ResponseWriter, r *http.Request)
}

type PipelineTriggerRestHandlerImpl struct {
Expand Down Expand Up @@ -138,6 +139,48 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit
common.WriteJsonResp(w, err, res, http.StatusOK)
}

func (handler PipelineTriggerRestHandlerImpl) RotatePods(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
userId, err := handler.userAuthService.GetLoggedInUser(r)
if userId == 0 || err != nil {
common.WriteJsonResp(w, err, "Unauthorized User", http.StatusUnauthorized)
return
}
var podRotateRequest pipeline.PodRotateRequest
err = decoder.Decode(&podRotateRequest)
if err != nil {
handler.logger.Errorw("request err, RotatePods", "err", err, "payload", podRotateRequest)
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
podRotateRequest.UserId = userId
handler.logger.Infow("request payload, RotatePods", "err", err, "payload", podRotateRequest)
err = handler.validator.Struct(podRotateRequest)
if err != nil {
handler.logger.Errorw("validation err, RotatePods", "err", err, "payload", podRotateRequest)
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
token := r.Header.Get("token")
object := handler.enforcerUtil.GetAppRBACNameByAppId(podRotateRequest.AppId)
if ok := handler.enforcer.Enforce(token, casbin.ResourceApplications, casbin.ActionTrigger, object); !ok {
common.WriteJsonResp(w, fmt.Errorf("unauthorized user"), "Unauthorized User", http.StatusForbidden)
return
}
object = handler.enforcerUtil.GetEnvRBACNameByAppId(podRotateRequest.AppId, podRotateRequest.EnvironmentId)
if ok := handler.enforcer.Enforce(token, casbin.ResourceEnvironment, casbin.ActionTrigger, object); !ok {
common.WriteJsonResp(w, fmt.Errorf("unauthorized user"), "Unauthorized User", http.StatusForbidden)
return
}
rotatePodResponse, err := handler.workflowDagExecutor.RotatePods(r.Context(), &podRotateRequest)
if err != nil {
handler.logger.Errorw("service err, RotatePods", "err", err, "payload", podRotateRequest)
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
common.WriteJsonResp(w, nil, rotatePodResponse, http.StatusOK)
}

func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
userId, err := handler.userAuthService.GetLoggedInUser(r)
Expand Down
1 change: 1 addition & 0 deletions api/router/PipelineTriggerRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type PipelineTriggerRouterImpl struct {
func (router PipelineTriggerRouterImpl) initPipelineTriggerRouter(pipelineTriggerRouter *mux.Router) {
pipelineTriggerRouter.Path("/cd-pipeline/trigger").HandlerFunc(router.restHandler.OverrideConfig).Methods("POST")
pipelineTriggerRouter.Path("/update-release-status").HandlerFunc(router.restHandler.ReleaseStatusUpdate).Methods("POST")
pipelineTriggerRouter.Path("/rotate-pods").HandlerFunc(router.restHandler.RotatePods).Methods("POST")
pipelineTriggerRouter.Path("/stop-start-app").HandlerFunc(router.restHandler.StartStopApp).Methods("POST")
pipelineTriggerRouter.Path("/stop-start-dg").HandlerFunc(router.restHandler.StartStopDeploymentGroup).Methods("POST")
pipelineTriggerRouter.Path("/release/").
Expand Down
11 changes: 8 additions & 3 deletions client/k8s/application/Application.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type K8sClientService interface {
GetApiResources(restConfig *rest.Config, includeOnlyVerb string) ([]*K8sApiResource, error)
GetResourceList(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean) (*ResourceListResponse, bool, error)
ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error)
PatchResource(ctx context.Context, restConfig *rest.Config, pt types.PatchType, request *K8sRequestBean, manifest string) (*ManifestResponse, error)
}

type K8sClientServiceImpl struct {
Expand Down Expand Up @@ -416,7 +417,7 @@ func (impl K8sClientServiceImpl) GetResourceList(ctx context.Context, restConfig
return &ResourceListResponse{*resp}, namespaced, nil
}

func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
func (impl K8sClientServiceImpl) PatchResource(ctx context.Context, restConfig *rest.Config, pt types.PatchType, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
resourceIf, namespaced, err := impl.GetResourceIf(restConfig, request)
if err != nil {
impl.logger.Errorw("error in getting dynamic interface for resource", "err", err)
Expand All @@ -425,13 +426,17 @@ func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *
resourceIdentifier := request.ResourceIdentifier
var resp *unstructured.Unstructured
if len(resourceIdentifier.Namespace) > 0 && namespaced {
resp, err = resourceIf.Namespace(resourceIdentifier.Namespace).Patch(ctx, resourceIdentifier.Name, types.StrategicMergePatchType, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
resp, err = resourceIf.Namespace(resourceIdentifier.Namespace).Patch(ctx, resourceIdentifier.Name, pt, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
} else {
resp, err = resourceIf.Patch(ctx, resourceIdentifier.Name, types.StrategicMergePatchType, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
resp, err = resourceIf.Patch(ctx, resourceIdentifier.Name, pt, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
}
if err != nil {
impl.logger.Errorw("error in applying resource", "err", err)
return nil, err
}
return &ManifestResponse{*resp}, nil
}

func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
return impl.PatchResource(ctx, restConfig, types.StrategicMergePatchType, request, manifest)
}
41 changes: 40 additions & 1 deletion pkg/pipeline/WorkflowDagExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"github.com/argoproj/gitops-engine/pkg/health"
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
gitSensorClient "github.com/devtron-labs/devtron/client/gitSensor"
"github.com/devtron-labs/devtron/client/k8s/application"
"github.com/devtron-labs/devtron/pkg/app/status"
util4 "github.com/devtron-labs/devtron/util"
"github.com/devtron-labs/devtron/util/argo"
"github.com/devtron-labs/devtron/util/k8s"
"go.opentelemetry.io/otel"
"strconv"
"strings"
Expand Down Expand Up @@ -71,6 +73,7 @@ type WorkflowDagExecutor interface {
TriggerBulkDeploymentAsync(requests []*BulkTriggerRequest, UserId int32) (interface{}, error)
StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error)
TriggerBulkHibernateAsync(request StopDeploymentGroupRequest, ctx context.Context) (interface{}, error)
RotatePods(ctx context.Context, podRotateRequest *PodRotateRequest) (*k8s.RotatePodResponse, error)
}

type WorkflowDagExecutorImpl struct {
Expand Down Expand Up @@ -105,6 +108,7 @@ type WorkflowDagExecutorImpl struct {
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
appLabelRepository pipelineConfig.AppLabelRepository
gitSensorGrpcClient gitSensorClient.Client
k8sApplicationService k8s.K8sApplicationService
}

const (
Expand Down Expand Up @@ -169,7 +173,8 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
pipelineStatusTimelineService status.PipelineStatusTimelineService,
CiTemplateRepository pipelineConfig.CiTemplateRepository,
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
appLabelRepository pipelineConfig.AppLabelRepository, gitSensorGrpcClient gitSensorClient.Client) *WorkflowDagExecutorImpl {
appLabelRepository pipelineConfig.AppLabelRepository, gitSensorGrpcClient gitSensorClient.Client,
k8sApplicationService k8s.K8sApplicationService) *WorkflowDagExecutorImpl {
wde := &WorkflowDagExecutorImpl{logger: Logger,
pipelineRepository: pipelineRepository,
cdWorkflowRepository: cdWorkflowRepository,
Expand Down Expand Up @@ -201,6 +206,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
ciWorkflowRepository: ciWorkflowRepository,
appLabelRepository: appLabelRepository,
gitSensorGrpcClient: gitSensorGrpcClient,
k8sApplicationService: k8sApplicationService,
}
err := wde.Subscribe()
if err != nil {
Expand Down Expand Up @@ -1268,6 +1274,39 @@ type StopDeploymentGroupRequest struct {
RequestType RequestType `json:"requestType" validate:"oneof=START STOP"`
}

type PodRotateRequest struct {
AppId int `json:"appId" validate:"required"`
EnvironmentId int `json:"environmentId" validate:"required"`
UserId int32 `json:"-"`
ResourceIdentifiers []application.ResourceIdentifier `json:"resources" validate:"required"`
}

func (impl *WorkflowDagExecutorImpl) RotatePods(ctx context.Context, podRotateRequest *PodRotateRequest) (*k8s.RotatePodResponse, error) {
impl.logger.Infow("rotate pod request", "payload", podRotateRequest)
//extract cluster id and namespace from env id
environmentId := podRotateRequest.EnvironmentId
environment, err := impl.envRepository.FindById(environmentId)
if err != nil {
impl.logger.Errorw("error occurred while fetching env details", "envId", environmentId, "err", err)
return nil, err
}
var resourceIdentifiers []application.ResourceIdentifier
for _, resourceIdentifier := range podRotateRequest.ResourceIdentifiers {
resourceIdentifier.Namespace = environment.Namespace
resourceIdentifiers = append(resourceIdentifiers, resourceIdentifier)
}
rotatePodRequest := &k8s.RotatePodRequest{
ClusterId: environment.ClusterId,
Resources: resourceIdentifiers,
}
response, err := impl.k8sApplicationService.RotatePods(ctx, rotatePodRequest)
if err != nil {
return nil, err
}
//TODO KB: make entry in cd workflow runner
return response, nil
}

func (impl *WorkflowDagExecutorImpl) StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error) {
pipelines, err := impl.pipelineRepository.FindActiveByAppIdAndEnvironmentId(stopRequest.AppId, stopRequest.EnvironmentId)
if err != nil {
Expand Down
79 changes: 79 additions & 0 deletions specs/k8s_apis-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,30 @@ paths:
description: app list
items:
$ref: '#/components/schemas/ClusterResourceListResponse'
/orchestrator/k8s/resources/rotate:
post:
description: this api will be used to rotate pods for provided resources
parameters:
- in: query
name: appId
description: app id
required: true
schema:
type: string
requestBody:
description: json as request body
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/RotatePodRequest'
responses:
'200':
description: response in array of each resource
content:
application/json:
schema:
$ref: "#/components/schemas/RotatePodResponse"
/orchestrator/k8s/resources/apply:
post:
description: this api will be used to apply the resources in cluster
Expand Down Expand Up @@ -544,6 +568,61 @@ components:
header-name:
type: string
description: each object from data key contains the objects keys length is equal to headers length
RotatePodRequest:
type: object
properties:
clusterId:
type: number
description: cluster Id
example: 1
nullable: false
resources:
type: array
items:
type: object
properties:
groupVersionKind:
type: object
properties:
Group:
type: string
Version:
type: string
Kind:
type: string
namespace:
type: string
name:
type: string
required:
- name
RotatePodResponse:
type: object
properties:
containsError:
type: boolean
description: contains error
example: true
responses:
type: array
items:
type: object
properties:
groupVersionKind:
type: object
properties:
Group:
type: string
Version:
type: string
Kind:
type: string
namespace:
type: string
name:
type: string
errorResponse:
type: string
ApplyResourcesRequest:
type: object
properties:
Expand Down
43 changes: 43 additions & 0 deletions util/k8s/k8sApplicationRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type K8sApplicationRestHandler interface {
GetAllApiResources(w http.ResponseWriter, r *http.Request)
GetResourceList(w http.ResponseWriter, r *http.Request)
ApplyResources(w http.ResponseWriter, r *http.Request)
RotatePod(w http.ResponseWriter, r *http.Request)
}

type K8sApplicationRestHandlerImpl struct {
Expand Down Expand Up @@ -73,6 +74,48 @@ func NewK8sApplicationRestHandlerImpl(logger *zap.SugaredLogger,
}
}

func (handler *K8sApplicationRestHandlerImpl) RotatePod(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
appIdString := vars["appId"]
if appIdString == "" {
common.WriteJsonResp(w, fmt.Errorf("empty appid in request"), nil, http.StatusBadRequest)
return
}
decoder := json.NewDecoder(r.Body)
podRotateRequest := &RotatePodRequest{}
err := decoder.Decode(podRotateRequest)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}
appIdentifier, err := handler.helmAppService.DecodeAppId(appIdString)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
return
}

// RBAC enforcer applying
rbacObject, rbacObject2 := handler.enforcerUtilHelm.GetHelmObjectByClusterIdNamespaceAndAppName(appIdentifier.ClusterId, appIdentifier.Namespace, appIdentifier.ReleaseName)
token := r.Header.Get("token")
ok := handler.enforcer.Enforce(token, casbin.ResourceHelmApp, casbin.ActionUpdate, rbacObject) || handler.enforcer.Enforce(token, casbin.ResourceHelmApp, casbin.ActionUpdate, rbacObject2)
if !ok {
common.WriteJsonResp(w, errors.New("unauthorized"), nil, http.StatusForbidden)
return
}
//RBAC enforcer Ends
handler.logger.Infow("rotate pod request", "payload", podRotateRequest)
rotatePodRequest := &RotatePodRequest{
ClusterId: appIdentifier.ClusterId,
Resources: podRotateRequest.Resources,
}
response, err := handler.k8sApplicationService.RotatePods(r.Context(), rotatePodRequest)
if err != nil {
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
return
}
common.WriteJsonResp(w, nil, response, http.StatusOK)
}

func (handler *K8sApplicationRestHandlerImpl) GetResource(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var request ResourceRequestBean
Expand Down
3 changes: 3 additions & 0 deletions util/k8s/k8sApplicationRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func NewK8sApplicationRouterImpl(k8sApplicationRestHandler K8sApplicationRestHan

func (impl *K8sApplicationRouterImpl) InitK8sApplicationRouter(k8sAppRouter *mux.Router) {

k8sAppRouter.Path("/resource/rotate").Queries("appId", "{appId}").
HandlerFunc(impl.k8sApplicationRestHandler.RotatePod).Methods("POST")

k8sAppRouter.Path("/resource/urls").Queries("appId", "{appId}").
HandlerFunc(impl.k8sApplicationRestHandler.GetHostUrlsByBatch).Methods("GET")

Expand Down
Loading