Skip to content

Commit 4c730f8

Browse files
feat: Rotate pods feature (#3420)
* init commit for devtron apps with rbac * pod rotation func for all supported types * rotate pod APIs exposed * api spec commit * logging added * deployemnt-strategy-api * making constant --------- Co-authored-by: Ashish-devtron <[email protected]>
1 parent 5283e1c commit 4c730f8

File tree

13 files changed

+340
-5
lines changed

13 files changed

+340
-5
lines changed

api/restHandler/PipelineTriggerRestHandler.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type PipelineTriggerRestHandler interface {
4747
StartStopApp(w http.ResponseWriter, r *http.Request)
4848
StartStopDeploymentGroup(w http.ResponseWriter, r *http.Request)
4949
GetAllLatestDeploymentConfiguration(w http.ResponseWriter, r *http.Request)
50+
RotatePods(w http.ResponseWriter, r *http.Request)
5051
}
5152

5253
type PipelineTriggerRestHandlerImpl struct {
@@ -138,6 +139,48 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit
138139
common.WriteJsonResp(w, err, res, http.StatusOK)
139140
}
140141

142+
func (handler PipelineTriggerRestHandlerImpl) RotatePods(w http.ResponseWriter, r *http.Request) {
143+
decoder := json.NewDecoder(r.Body)
144+
userId, err := handler.userAuthService.GetLoggedInUser(r)
145+
if userId == 0 || err != nil {
146+
common.WriteJsonResp(w, err, "Unauthorized User", http.StatusUnauthorized)
147+
return
148+
}
149+
var podRotateRequest pipeline.PodRotateRequest
150+
err = decoder.Decode(&podRotateRequest)
151+
if err != nil {
152+
handler.logger.Errorw("request err, RotatePods", "err", err, "payload", podRotateRequest)
153+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
154+
return
155+
}
156+
podRotateRequest.UserId = userId
157+
handler.logger.Infow("request payload, RotatePods", "err", err, "payload", podRotateRequest)
158+
err = handler.validator.Struct(podRotateRequest)
159+
if err != nil {
160+
handler.logger.Errorw("validation err, RotatePods", "err", err, "payload", podRotateRequest)
161+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
162+
return
163+
}
164+
token := r.Header.Get("token")
165+
object := handler.enforcerUtil.GetAppRBACNameByAppId(podRotateRequest.AppId)
166+
if ok := handler.enforcer.Enforce(token, casbin.ResourceApplications, casbin.ActionTrigger, object); !ok {
167+
common.WriteJsonResp(w, fmt.Errorf("unauthorized user"), "Unauthorized User", http.StatusForbidden)
168+
return
169+
}
170+
object = handler.enforcerUtil.GetEnvRBACNameByAppId(podRotateRequest.AppId, podRotateRequest.EnvironmentId)
171+
if ok := handler.enforcer.Enforce(token, casbin.ResourceEnvironment, casbin.ActionTrigger, object); !ok {
172+
common.WriteJsonResp(w, fmt.Errorf("unauthorized user"), "Unauthorized User", http.StatusForbidden)
173+
return
174+
}
175+
rotatePodResponse, err := handler.workflowDagExecutor.RotatePods(r.Context(), &podRotateRequest)
176+
if err != nil {
177+
handler.logger.Errorw("service err, RotatePods", "err", err, "payload", podRotateRequest)
178+
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
179+
return
180+
}
181+
common.WriteJsonResp(w, nil, rotatePodResponse, http.StatusOK)
182+
}
183+
141184
func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter, r *http.Request) {
142185
decoder := json.NewDecoder(r.Body)
143186
userId, err := handler.userAuthService.GetLoggedInUser(r)

api/restHandler/app/DeploymentPipelineRestHandler.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type DevtronAppDeploymentConfigRestHandler interface {
5959
UpdateAppOverride(w http.ResponseWriter, r *http.Request)
6060
GetConfigmapSecretsForDeploymentStages(w http.ResponseWriter, r *http.Request)
6161
GetDeploymentPipelineStrategy(w http.ResponseWriter, r *http.Request)
62+
GetDefaultDeploymentPipelineStrategy(w http.ResponseWriter, r *http.Request)
6263

6364
AppMetricsEnableDisable(w http.ResponseWriter, r *http.Request)
6465
EnvMetricsEnableDisable(w http.ResponseWriter, r *http.Request)
@@ -1857,6 +1858,37 @@ func (handler PipelineConfigRestHandlerImpl) GetDeploymentPipelineStrategy(w htt
18571858

18581859
common.WriteJsonResp(w, err, result, http.StatusOK)
18591860
}
1861+
func (handler PipelineConfigRestHandlerImpl) GetDefaultDeploymentPipelineStrategy(w http.ResponseWriter, r *http.Request) {
1862+
token := r.Header.Get("token")
1863+
vars := mux.Vars(r)
1864+
appId, err := strconv.Atoi(vars["appId"])
1865+
if err != nil {
1866+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
1867+
return
1868+
}
1869+
envId, err := strconv.Atoi(vars["envId"])
1870+
if err != nil {
1871+
common.WriteJsonResp(w, err, nil, http.StatusBadRequest)
1872+
return
1873+
}
1874+
handler.Logger.Infow("request payload, GetDefaultDeploymentPipelineStrategy", "appId", appId, "envId", envId)
1875+
//RBAC
1876+
object := handler.enforcerUtil.GetAppRBACNameByAppId(appId)
1877+
if ok := handler.enforcer.Enforce(token, casbin.ResourceApplications, casbin.ActionGet, object); !ok {
1878+
common.WriteJsonResp(w, err, "Unauthorized User", http.StatusForbidden)
1879+
return
1880+
}
1881+
//RBAC
1882+
1883+
result, err := handler.pipelineBuilder.FetchDefaultCDPipelineStrategy(appId, envId)
1884+
if err != nil {
1885+
handler.Logger.Errorw("service err, GetDefaultDeploymentPipelineStrategy", "err", err, "appId", appId)
1886+
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
1887+
return
1888+
}
1889+
1890+
common.WriteJsonResp(w, err, result, http.StatusOK)
1891+
}
18601892

18611893
func (handler PipelineConfigRestHandlerImpl) EnvConfigOverrideCreateNamespace(w http.ResponseWriter, r *http.Request) {
18621894
userId, err := handler.userAuthService.GetLoggedInUser(r)

api/router/PipelineConfigRouter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func (router PipelineConfigRouterImpl) initPipelineConfigRouter(configRouter *mu
119119
configRouter.Path("/{appId}/autocomplete/docker").HandlerFunc(router.restHandler.DockerListAutocomplete).Methods("GET")
120120
configRouter.Path("/{appId}/autocomplete/team").HandlerFunc(router.restHandler.TeamListAutocomplete).Methods("GET")
121121

122+
configRouter.Path("/cd-pipeline/defaultStrategy/{appId}/{envId}").HandlerFunc(router.restHandler.GetDefaultDeploymentPipelineStrategy).Methods("GET")
122123
configRouter.Path("/cd-pipeline/{appId}/{envId}/{pipelineId}").HandlerFunc(router.restHandler.IsReadyToTrigger).Methods("GET")
123124
configRouter.Path("/cd-pipeline/strategies/{appId}").HandlerFunc(router.restHandler.GetDeploymentPipelineStrategy).Methods("GET")
124125

api/router/PipelineTriggerRouter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type PipelineTriggerRouterImpl struct {
5959
func (router PipelineTriggerRouterImpl) initPipelineTriggerRouter(pipelineTriggerRouter *mux.Router) {
6060
pipelineTriggerRouter.Path("/cd-pipeline/trigger").HandlerFunc(router.restHandler.OverrideConfig).Methods("POST")
6161
pipelineTriggerRouter.Path("/update-release-status").HandlerFunc(router.restHandler.ReleaseStatusUpdate).Methods("POST")
62+
pipelineTriggerRouter.Path("/rotate-pods").HandlerFunc(router.restHandler.RotatePods).Methods("POST")
6263
pipelineTriggerRouter.Path("/stop-start-app").HandlerFunc(router.restHandler.StartStopApp).Methods("POST")
6364
pipelineTriggerRouter.Path("/stop-start-dg").HandlerFunc(router.restHandler.StartStopDeploymentGroup).Methods("POST")
6465
pipelineTriggerRouter.Path("/release/").

client/k8s/application/Application.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type K8sClientService interface {
3232
GetApiResources(restConfig *rest.Config, includeOnlyVerb string) ([]*K8sApiResource, error)
3333
GetResourceList(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean) (*ResourceListResponse, bool, error)
3434
ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error)
35+
PatchResource(ctx context.Context, restConfig *rest.Config, pt types.PatchType, request *K8sRequestBean, manifest string) (*ManifestResponse, error)
3536
}
3637

3738
type K8sClientServiceImpl struct {
@@ -416,7 +417,7 @@ func (impl K8sClientServiceImpl) GetResourceList(ctx context.Context, restConfig
416417
return &ResourceListResponse{*resp}, namespaced, nil
417418
}
418419

419-
func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
420+
func (impl K8sClientServiceImpl) PatchResource(ctx context.Context, restConfig *rest.Config, pt types.PatchType, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
420421
resourceIf, namespaced, err := impl.GetResourceIf(restConfig, request)
421422
if err != nil {
422423
impl.logger.Errorw("error in getting dynamic interface for resource", "err", err)
@@ -425,13 +426,17 @@ func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *
425426
resourceIdentifier := request.ResourceIdentifier
426427
var resp *unstructured.Unstructured
427428
if len(resourceIdentifier.Namespace) > 0 && namespaced {
428-
resp, err = resourceIf.Namespace(resourceIdentifier.Namespace).Patch(ctx, resourceIdentifier.Name, types.StrategicMergePatchType, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
429+
resp, err = resourceIf.Namespace(resourceIdentifier.Namespace).Patch(ctx, resourceIdentifier.Name, pt, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
429430
} else {
430-
resp, err = resourceIf.Patch(ctx, resourceIdentifier.Name, types.StrategicMergePatchType, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
431+
resp, err = resourceIf.Patch(ctx, resourceIdentifier.Name, pt, []byte(manifest), metav1.PatchOptions{FieldManager: "patch"})
431432
}
432433
if err != nil {
433434
impl.logger.Errorw("error in applying resource", "err", err)
434435
return nil, err
435436
}
436437
return &ManifestResponse{*resp}, nil
437438
}
439+
440+
func (impl K8sClientServiceImpl) ApplyResource(ctx context.Context, restConfig *rest.Config, request *K8sRequestBean, manifest string) (*ManifestResponse, error) {
441+
return impl.PatchResource(ctx, restConfig, types.StrategicMergePatchType, request, manifest)
442+
}

internal/util/K8sUtilBean.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const K8sClusterResourceCronJobKind = "CronJob"
3333
const V1VERSION = "v1"
3434
const BatchGroup = "batch"
3535
const AppsGroup = "apps"
36+
const RestartingNotSupported = "restarting not supported"
3637

3738
var KindVsChildrenGvk = map[string][]schema.GroupVersionKind{
3839
kube.DeploymentKind: append(make([]schema.GroupVersionKind, 0), schema.GroupVersionKind{Group: AppsGroup, Version: V1VERSION, Kind: kube.ReplicaSetKind}, schema.GroupVersionKind{Version: V1VERSION, Kind: kube.PodKind}),

pkg/pipeline/PipelineBuilder.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ type PipelineBuilder interface {
127127
GetCiPipelineMin(appId int) ([]*bean.CiPipelineMin, error)
128128

129129
FetchCDPipelineStrategy(appId int) (PipelineStrategiesResponse, error)
130+
FetchDefaultCDPipelineStrategy(appId int, envId int) (PipelineStrategy, error)
130131
GetCdPipelineById(pipelineId int) (cdPipeline *bean.CDPipelineConfigObject, err error)
131132

132133
FetchConfigmapSecretsForCdStages(appId, envId, cdPipelineId int) (ConfigMapSecretsResponse, error)
@@ -3611,6 +3612,23 @@ func (impl PipelineBuilderImpl) FetchCDPipelineStrategy(appId int) (PipelineStra
36113612
return pipelineStrategiesResponse, nil
36123613
}
36133614

3615+
func (impl PipelineBuilderImpl) FetchDefaultCDPipelineStrategy(appId int, envId int) (PipelineStrategy, error) {
3616+
pipelineStrategy := PipelineStrategy{}
3617+
cdPipelines, err := impl.ciCdPipelineOrchestrator.GetCdPipelinesForAppAndEnv(appId, envId)
3618+
if err != nil || (cdPipelines.Pipelines) == nil || len(cdPipelines.Pipelines) == 0 {
3619+
return pipelineStrategy, err
3620+
}
3621+
cdPipelineId := cdPipelines.Pipelines[0].Id
3622+
3623+
cdPipeline, err := impl.GetCdPipelineById(cdPipelineId)
3624+
if err != nil {
3625+
return pipelineStrategy, nil
3626+
}
3627+
pipelineStrategy.DeploymentTemplate = cdPipeline.DeploymentTemplate
3628+
pipelineStrategy.Default = true
3629+
return pipelineStrategy, nil
3630+
}
3631+
36143632
type PipelineStrategiesResponse struct {
36153633
PipelineStrategy []PipelineStrategy `json:"pipelineStrategy"`
36163634
}

pkg/pipeline/WorkflowDagExecutor.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ import (
2424
"github.com/argoproj/gitops-engine/pkg/health"
2525
blob_storage "github.com/devtron-labs/common-lib/blob-storage"
2626
gitSensorClient "github.com/devtron-labs/devtron/client/gitSensor"
27+
"github.com/devtron-labs/devtron/client/k8s/application"
2728
"github.com/devtron-labs/devtron/pkg/app/status"
2829
util4 "github.com/devtron-labs/devtron/util"
2930
"github.com/devtron-labs/devtron/util/argo"
31+
"github.com/devtron-labs/devtron/util/k8s"
3032
"go.opentelemetry.io/otel"
3133
"strconv"
3234
"strings"
@@ -71,6 +73,7 @@ type WorkflowDagExecutor interface {
7173
TriggerBulkDeploymentAsync(requests []*BulkTriggerRequest, UserId int32) (interface{}, error)
7274
StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error)
7375
TriggerBulkHibernateAsync(request StopDeploymentGroupRequest, ctx context.Context) (interface{}, error)
76+
RotatePods(ctx context.Context, podRotateRequest *PodRotateRequest) (*k8s.RotatePodResponse, error)
7477
}
7578

7679
type WorkflowDagExecutorImpl struct {
@@ -105,6 +108,7 @@ type WorkflowDagExecutorImpl struct {
105108
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
106109
appLabelRepository pipelineConfig.AppLabelRepository
107110
gitSensorGrpcClient gitSensorClient.Client
111+
k8sApplicationService k8s.K8sApplicationService
108112
}
109113

110114
const (
@@ -169,7 +173,8 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
169173
pipelineStatusTimelineService status.PipelineStatusTimelineService,
170174
CiTemplateRepository pipelineConfig.CiTemplateRepository,
171175
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
172-
appLabelRepository pipelineConfig.AppLabelRepository, gitSensorGrpcClient gitSensorClient.Client) *WorkflowDagExecutorImpl {
176+
appLabelRepository pipelineConfig.AppLabelRepository, gitSensorGrpcClient gitSensorClient.Client,
177+
k8sApplicationService k8s.K8sApplicationService) *WorkflowDagExecutorImpl {
173178
wde := &WorkflowDagExecutorImpl{logger: Logger,
174179
pipelineRepository: pipelineRepository,
175180
cdWorkflowRepository: cdWorkflowRepository,
@@ -201,6 +206,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
201206
ciWorkflowRepository: ciWorkflowRepository,
202207
appLabelRepository: appLabelRepository,
203208
gitSensorGrpcClient: gitSensorGrpcClient,
209+
k8sApplicationService: k8sApplicationService,
204210
}
205211
err := wde.Subscribe()
206212
if err != nil {
@@ -1268,6 +1274,39 @@ type StopDeploymentGroupRequest struct {
12681274
RequestType RequestType `json:"requestType" validate:"oneof=START STOP"`
12691275
}
12701276

1277+
type PodRotateRequest struct {
1278+
AppId int `json:"appId" validate:"required"`
1279+
EnvironmentId int `json:"environmentId" validate:"required"`
1280+
UserId int32 `json:"-"`
1281+
ResourceIdentifiers []application.ResourceIdentifier `json:"resources" validate:"required"`
1282+
}
1283+
1284+
func (impl *WorkflowDagExecutorImpl) RotatePods(ctx context.Context, podRotateRequest *PodRotateRequest) (*k8s.RotatePodResponse, error) {
1285+
impl.logger.Infow("rotate pod request", "payload", podRotateRequest)
1286+
//extract cluster id and namespace from env id
1287+
environmentId := podRotateRequest.EnvironmentId
1288+
environment, err := impl.envRepository.FindById(environmentId)
1289+
if err != nil {
1290+
impl.logger.Errorw("error occurred while fetching env details", "envId", environmentId, "err", err)
1291+
return nil, err
1292+
}
1293+
var resourceIdentifiers []application.ResourceIdentifier
1294+
for _, resourceIdentifier := range podRotateRequest.ResourceIdentifiers {
1295+
resourceIdentifier.Namespace = environment.Namespace
1296+
resourceIdentifiers = append(resourceIdentifiers, resourceIdentifier)
1297+
}
1298+
rotatePodRequest := &k8s.RotatePodRequest{
1299+
ClusterId: environment.ClusterId,
1300+
Resources: resourceIdentifiers,
1301+
}
1302+
response, err := impl.k8sApplicationService.RotatePods(ctx, rotatePodRequest)
1303+
if err != nil {
1304+
return nil, err
1305+
}
1306+
//TODO KB: make entry in cd workflow runner
1307+
return response, nil
1308+
}
1309+
12711310
func (impl *WorkflowDagExecutorImpl) StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error) {
12721311
pipelines, err := impl.pipelineRepository.FindActiveByAppIdAndEnvironmentId(stopRequest.AppId, stopRequest.EnvironmentId)
12731312
if err != nil {

specs/k8s_apis-spec.yaml

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,30 @@ paths:
264264
description: app list
265265
items:
266266
$ref: '#/components/schemas/ClusterResourceListResponse'
267+
/orchestrator/k8s/resources/rotate:
268+
post:
269+
description: this api will be used to rotate pods for provided resources
270+
parameters:
271+
- in: query
272+
name: appId
273+
description: app id
274+
required: true
275+
schema:
276+
type: string
277+
requestBody:
278+
description: json as request body
279+
required: true
280+
content:
281+
application/json:
282+
schema:
283+
$ref: '#/components/schemas/RotatePodRequest'
284+
responses:
285+
'200':
286+
description: response in array of each resource
287+
content:
288+
application/json:
289+
schema:
290+
$ref: "#/components/schemas/RotatePodResponse"
267291
/orchestrator/k8s/resources/apply:
268292
post:
269293
description: this api will be used to apply the resources in cluster
@@ -544,6 +568,61 @@ components:
544568
header-name:
545569
type: string
546570
description: each object from data key contains the objects keys length is equal to headers length
571+
RotatePodRequest:
572+
type: object
573+
properties:
574+
clusterId:
575+
type: number
576+
description: cluster Id
577+
example: 1
578+
nullable: false
579+
resources:
580+
type: array
581+
items:
582+
type: object
583+
properties:
584+
groupVersionKind:
585+
type: object
586+
properties:
587+
Group:
588+
type: string
589+
Version:
590+
type: string
591+
Kind:
592+
type: string
593+
namespace:
594+
type: string
595+
name:
596+
type: string
597+
required:
598+
- name
599+
RotatePodResponse:
600+
type: object
601+
properties:
602+
containsError:
603+
type: boolean
604+
description: contains error
605+
example: true
606+
responses:
607+
type: array
608+
items:
609+
type: object
610+
properties:
611+
groupVersionKind:
612+
type: object
613+
properties:
614+
Group:
615+
type: string
616+
Version:
617+
type: string
618+
Kind:
619+
type: string
620+
namespace:
621+
type: string
622+
name:
623+
type: string
624+
errorResponse:
625+
type: string
547626
ApplyResourcesRequest:
548627
type: object
549628
properties:

0 commit comments

Comments
 (0)