Skip to content

Commit ca6e77f

Browse files
feat: add or delete CD pipelines from workflow (#4398)
* first commit * fixes * PR comments * removed enum because of cyclic dependency * adding deploymentappcreated in cd-pipeline list api * fix * dfs fix in env filter * level wise sorting * sorting wfm * cleaning up dead code * pr comments * aritfacts list fix
1 parent 631d01d commit ca6e77f

File tree

9 files changed

+128
-32
lines changed

9 files changed

+128
-32
lines changed

internal/sql/repository/appWorkflow/AppWorkflowRepository.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ type AppWorkflowRepository interface {
6161
FindByCDPipelineIds(cdPipelineIds []int) ([]*AppWorkflowMapping, error)
6262
FindByWorkflowIds(workflowIds []int) ([]*AppWorkflowMapping, error)
6363
FindMappingByAppIds(appIds []int) ([]*AppWorkflowMapping, error)
64-
UpdateParentComponentDetails(tx *pg.Tx, oldComponentId int, oldComponentType string, newComponentId int, newComponentType string) error
64+
UpdateParentComponentDetails(tx *pg.Tx, oldComponentId int, oldComponentType string, newComponentId int, newComponentType string, componentIdsFilter []int) error
6565
FindWFMappingByComponent(componentType string, componentId int) (*AppWorkflowMapping, error)
6666
FindByComponentId(componentId int) ([]*AppWorkflowMapping, error)
6767
}
@@ -474,19 +474,23 @@ func (impl AppWorkflowRepositoryImpl) FindMappingByAppIds(appIds []int) ([]*AppW
474474
return appWorkflowsMapping, err
475475
}
476476

477-
func (impl AppWorkflowRepositoryImpl) UpdateParentComponentDetails(tx *pg.Tx, oldParentId int, oldParentType string, newParentId int, newParentType string) error {
477+
func (impl AppWorkflowRepositoryImpl) UpdateParentComponentDetails(tx *pg.Tx, oldParentId int, oldParentType string, newParentId int, newParentType string, componentIdFilter []int) error {
478478

479479
/*updateQuery := fmt.Sprintf(" UPDATE app_workflow_mapping "+
480480
" SET parent_type = (select type from new_app_workflow_mapping),parent_id = (select id from new_app_workflow_mapping) where parent_id = %v and parent_type='%v' and active = true", oldComponentId, oldComponentType)
481481
482482
finalQuery := withQuery + updateQuery*/
483-
_, err := tx.Model((*AppWorkflowMapping)(nil)).
483+
query := tx.Model((*AppWorkflowMapping)(nil)).
484484
Set("parent_type = ?", newParentType).
485485
Set("parent_id = ?", newParentId).
486486
Where("parent_type = ?", oldParentType).
487487
Where("parent_id = ?", oldParentId).
488-
Where("active = true").
489-
Update()
488+
Where("active = true")
489+
490+
if len(componentIdFilter) > 0 {
491+
query = query.Where("component_id in (?)", pg.In(componentIdFilter)).Where("type = ?", "CD_PIPELINE")
492+
}
493+
_, err := query.Update()
490494
return err
491495
}
492496

pkg/appClone/AppCloneService.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ func (impl *AppCloneServiceImpl) CreateWf(oldAppId, newAppId int, userId int32,
594594
if err != nil {
595595
return nil, err
596596
}
597+
597598
impl.logger.Debugw("workflow found", "wf", refAppWFs)
598599

599600
createWorkflowMappingDtoResp := CreateWorkflowMappingDto{
@@ -673,6 +674,9 @@ func (impl *AppCloneServiceImpl) createWfInstances(refWfMappings []appWorkflow.A
673674
var ciMapping []appWorkflow.AppWorkflowMappingDto
674675
var cdMappings []appWorkflow.AppWorkflowMappingDto
675676
var webhookMappings []appWorkflow.AppWorkflowMappingDto
677+
678+
refWfMappings = appWorkflow.LevelWiseSort(refWfMappings)
679+
676680
for _, appWf := range refWfMappings {
677681
if appWf.Type == appWorkflow2.CIPIPELINE {
678682
ciMapping = append(ciMapping, appWf)

pkg/appWorkflow/AppWorkflowService.go

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/devtron-labs/devtron/pkg/sql"
3333
"github.com/devtron-labs/devtron/pkg/user"
3434
bean3 "github.com/devtron-labs/devtron/pkg/user/bean"
35+
"github.com/devtron-labs/devtron/pkg/variables/utils"
3536
"github.com/devtron-labs/devtron/util/rbac"
3637
"github.com/go-pg/pg"
3738
"go.uber.org/zap"
@@ -741,7 +742,7 @@ func (impl AppWorkflowServiceImpl) FilterWorkflows(triggerViewConfig *TriggerVie
741742
continue
742743
}
743744

744-
identifierToFilteredWorkflowMapping, leafPipelines := fetchLeafPipelinesAndPopulateChildrenIdsInWorkflowMapping(workflow.AppWorkflowMappingDto)
745+
identifierToFilteredWorkflowMapping, leafPipelines, _ := processWorkflowMappingTree(workflow.AppWorkflowMappingDto)
745746

746747
identifierToFilteredWorkflowMapping = filterMappingOnFilteredCdPipelineIds(identifierToFilteredWorkflowMapping, leafPipelines, cdPipelineIdsFiltered)
747748

@@ -766,24 +767,34 @@ func extractOutFilteredWorkflowMappings(appWorkflowMappings []AppWorkflowMapping
766767
return newAppWorkflowMappingDto
767768
}
768769

769-
// fetchLeafPipelinesAndPopulateChildrenIdsInWorkflowMapping function fetches all the leaf cd pipelines and append
770-
// the children pipelineIds into ChildPipelinesIds object in AppWorkflowMappingDto and returns both object.
771-
func fetchLeafPipelinesAndPopulateChildrenIdsInWorkflowMapping(appWorkflowMappings []AppWorkflowMappingDto) (map[PipelineIdentifier]*AppWorkflowMappingDto, []*AppWorkflowMappingDto) {
770+
// processWorkflowMappingTree function processed the wf mapping array into a tree structure
771+
// returns a map of identifier to mapping, leaf nodes and the root node
772+
func processWorkflowMappingTree(appWorkflowMappings []AppWorkflowMappingDto) (map[PipelineIdentifier]*AppWorkflowMappingDto, []AppWorkflowMappingDto, *AppWorkflowMappingDto) {
772773
identifierToFilteredWorkflowMapping := make(map[PipelineIdentifier]*AppWorkflowMappingDto)
773-
leafPipelines := make([]*AppWorkflowMappingDto, 0)
774+
leafPipelines := make([]AppWorkflowMappingDto, 0)
775+
var rootPipeline *AppWorkflowMappingDto
776+
//initializing the nodes with empty children and collecting leaf
774777
for i, appWorkflowMapping := range appWorkflowMappings {
775-
if appWorkflowMapping.IsLast {
776-
leafPipelines = append(leafPipelines, &appWorkflowMappings[i])
777-
}
778+
appWorkflowMappings[i].ChildPipelinesIds = mapset.NewSet()
778779
identifierToFilteredWorkflowMapping[appWorkflowMapping.getPipelineIdentifier()] = &appWorkflowMappings[i]
779-
if appWorkflowMappings[i].ChildPipelinesIds == nil {
780-
appWorkflowMappings[i].ChildPipelinesIds = mapset.NewSet()
780+
781+
//collecting leaf pipelines
782+
if appWorkflowMapping.IsLast {
783+
leafPipelines = append(leafPipelines, appWorkflowMapping)
781784
}
782-
if appWorkflow, ok := identifierToFilteredWorkflowMapping[appWorkflowMapping.getParentPipelineIdentifier()]; ok {
783-
appWorkflow.ChildPipelinesIds.Add(appWorkflowMapping.ComponentId)
785+
}
786+
787+
for _, appWorkflowMapping := range identifierToFilteredWorkflowMapping {
788+
// populating children in parent nodes
789+
parentId := appWorkflowMapping.getParentPipelineIdentifier()
790+
componentId := appWorkflowMapping.ComponentId
791+
if parentMapping, hasParent := identifierToFilteredWorkflowMapping[parentId]; hasParent && !parentMapping.ChildPipelinesIds.Contains(componentId) {
792+
parentMapping.ChildPipelinesIds.Add(componentId)
793+
} else if !hasParent {
794+
rootPipeline = appWorkflowMapping
784795
}
785796
}
786-
return identifierToFilteredWorkflowMapping, leafPipelines
797+
return identifierToFilteredWorkflowMapping, leafPipelines, rootPipeline
787798
}
788799

789800
// filterMappingOnFilteredCdPipelineIds iterates over all leaf cd-pipelines, if that leaf cd-pipeline is present in the
@@ -792,7 +803,7 @@ func fetchLeafPipelinesAndPopulateChildrenIdsInWorkflowMapping(appWorkflowMappin
792803
// cd-pipeline from componentIdWorkflowMapping's list of AppWorkflowMappingDto and also truncate the child
793804
// cd-pipeline id present in the parent's ChildPipelinesIds object inside AppWorkflowMappingDto.
794805
func filterMappingOnFilteredCdPipelineIds(identifierToFilteredWorkflowMapping map[PipelineIdentifier]*AppWorkflowMappingDto,
795-
leafPipelines []*AppWorkflowMappingDto, cdPipelineIdsFiltered mapset.Set) map[PipelineIdentifier]*AppWorkflowMappingDto {
806+
leafPipelines []AppWorkflowMappingDto, cdPipelineIdsFiltered mapset.Set) map[PipelineIdentifier]*AppWorkflowMappingDto {
796807

797808
leafPipelineSize := len(leafPipelines)
798809
for i := 0; i < leafPipelineSize; i++ {
@@ -803,9 +814,11 @@ func filterMappingOnFilteredCdPipelineIds(identifierToFilteredWorkflowMapping ma
803814
parent := leafPipelines[i].getParentPipelineIdentifier()
804815
identifierToFilteredWorkflowMapping[parent].ChildPipelinesIds.Remove(leafPipelines[i].ComponentId)
805816
}
806-
if identifierToFilteredWorkflowMapping[leafPipelines[i].getParentPipelineIdentifier()].ChildPipelinesIds.Cardinality() == 0 {
817+
parentPipelineIdentifier := leafPipelines[i].getParentPipelineIdentifier()
818+
childPipelineIds := identifierToFilteredWorkflowMapping[parentPipelineIdentifier].ChildPipelinesIds
819+
if childPipelineIds.Cardinality() == 0 {
807820
//this means this pipeline has become leaf, so append this pipelineId in leafPipelines for further processing
808-
leafPipelines = append(leafPipelines, identifierToFilteredWorkflowMapping[leafPipelines[i].getParentPipelineIdentifier()])
821+
leafPipelines = append(leafPipelines, *identifierToFilteredWorkflowMapping[leafPipelines[i].getParentPipelineIdentifier()])
809822
leafPipelineSize += 1
810823
}
811824

@@ -849,3 +862,40 @@ func (impl AppWorkflowServiceImpl) FindAppWorkflowByCiPipelineId(ciPipelineId in
849862
return appWorkflowMapping, nil
850863

851864
}
865+
866+
// LevelWiseSort performs level wise sort for workflow mappings starting from leaves
867+
// This will break if ever the workflow mappings array break the assumption of being a DAG with one root node
868+
func LevelWiseSort(appWorkflowMappings []AppWorkflowMappingDto) []AppWorkflowMappingDto {
869+
870+
if len(appWorkflowMappings) < 2 {
871+
return appWorkflowMappings
872+
}
873+
874+
identifierToNodeMapping, _, root := processWorkflowMappingTree(appWorkflowMappings)
875+
876+
result := make([]AppWorkflowMappingDto, 0)
877+
nodesInCurrentLevel := append(make([]AppWorkflowMappingDto, 0), *root)
878+
for len(result) != len(appWorkflowMappings) {
879+
result = append(result, nodesInCurrentLevel...)
880+
childrenOfCurrentLevel := make([]AppWorkflowMappingDto, 0)
881+
for _, node := range nodesInCurrentLevel {
882+
childrenOfCurrentLevel = append(childrenOfCurrentLevel, getMappingsFromIds(identifierToNodeMapping, utils.ToIntArray(node.ChildPipelinesIds.ToSlice()))...)
883+
}
884+
// cloning slice elements
885+
nodesInCurrentLevel = append(childrenOfCurrentLevel, []AppWorkflowMappingDto{}...)
886+
}
887+
888+
return result
889+
}
890+
891+
func getMappingsFromIds(identifierToNodeMapping map[PipelineIdentifier]*AppWorkflowMappingDto, ids []int) []AppWorkflowMappingDto {
892+
result := make([]AppWorkflowMappingDto, 0)
893+
for _, id := range ids {
894+
identifier := PipelineIdentifier{
895+
PipelineType: CD_PIPELINE_TYPE,
896+
PipelineId: id,
897+
}
898+
result = append(result, *identifierToNodeMapping[identifier])
899+
}
900+
return result
901+
}

pkg/bean/app.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,8 +588,17 @@ type CDPipelineConfigObject struct {
588588
CustomTagStage *repository.PipelineStageType `json:"customTagStage"`
589589
EnableCustomTag bool `json:"enableCustomTag"`
590590
SwitchFromCiPipelineId int `json:"switchFromCiPipelineId"`
591+
CDPipelineAddType CDPipelineAddType `json:"addType"`
592+
ChildPipelineId int `json:"childPipelineId"`
591593
}
592594

595+
type CDPipelineAddType string
596+
597+
const (
598+
SEQUENTIAL CDPipelineAddType = "SEQUENTIAL"
599+
PARALLEL CDPipelineAddType = "PARALLEL"
600+
)
601+
593602
func (cdpipelineConfig *CDPipelineConfigObject) IsSwitchCiPipelineRequest() bool {
594603
return cdpipelineConfig.SwitchFromCiPipelineId > 0 && cdpipelineConfig.AppWorkflowId > 0
595604
}

pkg/pipeline/AppArtifactManager.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -864,11 +864,14 @@ func (impl *AppArtifactManagerImpl) BuildArtifactsForCdStageV2(listingFilterOpts
864864
if listingFilterOpts.ParentCdId > 0 {
865865
//TODO: check if we can fetch LastSuccessfulTriggerOnParent wfr along with last running wf
866866
parentCdWfrList, err := impl.cdWorkflowRepository.FindArtifactByPipelineIdAndRunnerType(listingFilterOpts.ParentCdId, bean.CD_WORKFLOW_TYPE_DEPLOY, 1, []string{application.Healthy, application.SUCCEEDED, application.Progressing})
867-
if err != nil || len(parentCdWfrList) == 0 {
867+
if err != nil {
868868
impl.logger.Errorw("error in getting artifact for parent cd", "parentCdPipelineId", listingFilterOpts.ParentCdId)
869869
return ciArtifacts, totalCount, err
870870
}
871-
artifactRunningOnParentCd = parentCdWfrList[0].CdWorkflow.CiArtifact.Id
871+
872+
if len(parentCdWfrList) != 0 {
873+
artifactRunningOnParentCd = parentCdWfrList[0].CdWorkflow.CiArtifact.Id
874+
}
872875
}
873876

874877
for _, artifact := range cdArtifacts {

pkg/pipeline/BuildPipelineSwitchService.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,5 +329,5 @@ func (impl *BuildPipelineSwitchServiceImpl) saveHistoryOfOverriddenTemplate(ciPi
329329
}
330330

331331
func (impl *BuildPipelineSwitchServiceImpl) updateLinkedAppWorkflowMappings(tx *pg.Tx, oldAppWorkflowMapping *appWorkflow.AppWorkflowMapping, newAppWorkflowMapping *appWorkflow.AppWorkflowMapping) error {
332-
return impl.appWorkflowRepository.UpdateParentComponentDetails(tx, oldAppWorkflowMapping.ComponentId, oldAppWorkflowMapping.Type, newAppWorkflowMapping.ComponentId, newAppWorkflowMapping.Type)
332+
return impl.appWorkflowRepository.UpdateParentComponentDetails(tx, oldAppWorkflowMapping.ComponentId, oldAppWorkflowMapping.Type, newAppWorkflowMapping.ComponentId, newAppWorkflowMapping.Type, nil)
333333
}

pkg/pipeline/CiCdPipelineOrchestrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,6 +1778,7 @@ func (impl CiCdPipelineOrchestratorImpl) GetCdPipelinesForApp(appId int) (cdPipe
17781778
PreStageConfigMapSecretNames: preStageConfigmapSecrets,
17791779
PostStageConfigMapSecretNames: postStageConfigmapSecrets,
17801780
DeploymentAppType: dbPipeline.DeploymentAppType,
1781+
DeploymentAppCreated: dbPipeline.DeploymentAppCreated,
17811782
DeploymentAppDeleteRequest: dbPipeline.DeploymentAppDeleteRequest,
17821783
IsVirtualEnvironment: dbPipeline.Environment.IsVirtualEnvironment,
17831784
}

pkg/pipeline/DeploymentPipelineConfigService.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -639,10 +639,8 @@ func (impl *CdPipelineConfigServiceImpl) DeleteCdPipeline(pipeline *pipelineConf
639639
if err != nil && err != pg.ErrNoRows {
640640
impl.logger.Errorw("error in getting children cd details", "err", err)
641641
return deleteResponse, err
642-
} else if len(childNodes) > 0 {
643-
impl.logger.Debugw("cannot delete cd pipeline, contains children cd")
644-
return deleteResponse, fmt.Errorf("Please delete children CD pipelines before deleting this pipeline.")
645642
}
643+
646644
//getting deployment group for this pipeline
647645
deploymentGroupNames, err := impl.deploymentGroupRepository.GetNamesByAppIdAndEnvId(pipeline.EnvironmentId, pipeline.AppId)
648646
if err != nil && err != pg.ErrNoRows {
@@ -679,7 +677,7 @@ func (impl *CdPipelineConfigServiceImpl) DeleteCdPipeline(pipeline *pipelineConf
679677
impl.logger.Errorw("error in deleting workflow mapping", "err", err)
680678
return deleteResponse, err
681679
}
682-
if appWorkflowMapping.ParentType == appWorkflow.WEBHOOK {
680+
if appWorkflowMapping.ParentType == appWorkflow.WEBHOOK && len(childNodes) == 0 {
683681
childNodes, err := impl.appWorkflowRepository.FindWFCDMappingByExternalCiId(appWorkflowMapping.ParentId)
684682
if err != nil && !util.IsErrNoRows(err) {
685683
impl.logger.Errorw("error in fetching external ci", "err", err)
@@ -726,6 +724,14 @@ func (impl *CdPipelineConfigServiceImpl) DeleteCdPipeline(pipeline *pipelineConf
726724
return deleteResponse, err
727725
}
728726

727+
if len(childNodes) > 0 {
728+
err = impl.appWorkflowRepository.UpdateParentComponentDetails(tx, appWorkflowMapping.ComponentId, appWorkflowMapping.Type, appWorkflowMapping.ParentId, appWorkflowMapping.ParentType, nil)
729+
if err != nil {
730+
impl.logger.Errorw("error updating wfm for children pipelines of pipeline", "err", err, "id", appWorkflowMapping.Id)
731+
return deleteResponse, err
732+
}
733+
}
734+
729735
if pipeline.PreStageConfig != "" {
730736
err = impl.prePostCdScriptHistoryService.CreatePrePostCdScriptHistory(pipeline, tx, repository4.PRE_CD_TYPE, false, 0, time.Time{})
731737
if err != nil {
@@ -1027,6 +1033,7 @@ func (impl *CdPipelineConfigServiceImpl) GetCdPipelinesForApp(appId int) (cdPipe
10271033
RunPreStageInEnv: dbPipeline.RunPreStageInEnv,
10281034
RunPostStageInEnv: dbPipeline.RunPostStageInEnv,
10291035
DeploymentAppType: dbPipeline.DeploymentAppType,
1036+
DeploymentAppCreated: dbPipeline.DeploymentAppCreated,
10301037
ParentPipelineType: appToWorkflowMapping.ParentType,
10311038
ParentPipelineId: appToWorkflowMapping.ParentId,
10321039
DeploymentAppDeleteRequest: dbPipeline.DeploymentAppDeleteRequest,
@@ -1714,6 +1721,18 @@ func (impl *CdPipelineConfigServiceImpl) createCdPipeline(ctx context.Context, a
17141721
parentPipelineId = pipeline.SourceToNewPipelineId[pipeline.ParentPipelineId]
17151722
}
17161723
}
1724+
1725+
if pipeline.CDPipelineAddType == bean.SEQUENTIAL {
1726+
childPipelineIds := make([]int, 0)
1727+
if pipeline.ChildPipelineId > 0 {
1728+
childPipelineIds = append(childPipelineIds, pipeline.ChildPipelineId)
1729+
}
1730+
err = impl.appWorkflowRepository.UpdateParentComponentDetails(tx, parentPipelineId, parentPipelineType, pipelineId, "CD_PIPELINE", childPipelineIds)
1731+
if err != nil {
1732+
return 0, err
1733+
}
1734+
}
1735+
17171736
appWorkflowMap := &appWorkflow.AppWorkflowMapping{
17181737
AppWorkflowId: pipeline.AppWorkflowId,
17191738
ParentId: parentPipelineId,
@@ -1956,14 +1975,11 @@ func (impl *CdPipelineConfigServiceImpl) DeleteCdPipelinePartial(pipeline *pipel
19561975
deleteResponse.ClusterReachable = false
19571976
}
19581977
//getting children CD pipeline details
1959-
childNodes, err := impl.appWorkflowRepository.FindWFCDMappingByParentCDPipelineId(pipeline.Id)
19601978
if err != nil && err != pg.ErrNoRows {
19611979
impl.logger.Errorw("error in getting children cd details", "err", err)
19621980
return deleteResponse, err
1963-
} else if len(childNodes) > 0 {
1964-
impl.logger.Debugw("cannot delete cd pipeline, contains children cd")
1965-
return deleteResponse, fmt.Errorf("Please delete children CD pipelines before deleting this pipeline.")
19661981
}
1982+
19671983
//getting deployment group for this pipeline
19681984
deploymentGroupNames, err := impl.deploymentGroupRepository.GetNamesByAppIdAndEnvId(pipeline.EnvironmentId, pipeline.AppId)
19691985
if err != nil && err != pg.ErrNoRows {

pkg/variables/utils/conversion-utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ func ToStringArray(interfaceArr []interface{}) []string {
2020
return stringArr
2121
}
2222

23+
// ToIntArray converts an array of interface{} back to an array of int
24+
func ToIntArray(interfaceArr []interface{}) []int {
25+
intArr := make([]int, len(interfaceArr))
26+
for i, v := range interfaceArr {
27+
intArr[i] = v.(int)
28+
}
29+
return intArr
30+
}
31+
2332
func FilterDuplicatesInStringArray(items []string) []string {
2433
itemsSet := mapset.NewSetFromSlice(ToInterfaceArray(items))
2534
uniqueItems := ToStringArray(itemsSet.ToSlice())

0 commit comments

Comments
 (0)