Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type AppWorkflowRepository interface {
FindByTypeAndComponentId(wfId int, componentId int, componentType string) (*AppWorkflowMapping, error)
FindAllWfsHavingCdPipelinesFromSpecificEnvsOnly(envIds []int, appIds []int) ([]*AppWorkflowMapping, error)
FindCiPipelineIdsFromAppWfIds(appWfIds []int) ([]int, error)
FindChildCDIdsByParentCDPipelineId(cdPipelineId int) ([]int, error)
}

type AppWorkflowRepositoryImpl struct {
Expand Down Expand Up @@ -383,3 +384,10 @@ func (impl AppWorkflowRepositoryImpl) FindWFCDMappingByExternalCiId(externalCiId
Select()
return models, err
}

func (impl AppWorkflowRepositoryImpl) FindChildCDIdsByParentCDPipelineId(cdPipelineId int) ([]int, error) {
var ids []int
query := `select component_id from app_workflow_mapping where parent_id=? and parent_type=? and type=? and active=?;`
_, err := impl.dbConnection.Query(&ids, query, cdPipelineId, CDPIPELINE, CDPIPELINE, true)
return ids, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ func (impl PipelineRepositoryImpl) GetConnection() *pg.DB {
func (impl PipelineRepositoryImpl) FindByIdsIn(ids []int) ([]*Pipeline, error) {
var pipelines []*Pipeline
err := impl.dbConnection.Model(&pipelines).
Column("pipeline.*", "App.app_name", "Environment.environment_name").
Column("pipeline.*", "App.app_name", "Environment.environment_name", "Environment.Cluster").
Join("inner join app a on pipeline.app_id = a.id").
Join("inner join environment e on pipeline.environment_id = e.id").
Join("inner join cluster c on c.id = e.cluster_id").
Where("pipeline.id in (?)", pg.In(ids)).
Where("pipeline.deleted = false").
Select()
Expand Down
86 changes: 80 additions & 6 deletions pkg/pipeline/WorkflowDagExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,25 @@ type WorkflowDagExecutorImpl struct {
argoUserService argo.ArgoUserService
cdPipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository
CiTemplateRepository pipelineConfig.CiTemplateRepository
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
appLabelRepository pipelineConfig.AppLabelRepository
}

const (
CD_PIPELINE_ENV_NAME_KEY = "CD_PIPELINE_ENV_NAME"
CD_PIPELINE_CLUSTER_NAME_KEY = "CD_PIPELINE_CLUSTER_NAME"
GIT_COMMIT_HASH_PREFIX = "GIT_COMMIT_HASH"
GIT_SOURCE_TYPE_PREFIX = "GIT_SOURCE_TYPE"
GIT_SOURCE_VALUE_PREFIX = "GIT_SOURCE_VALUE"
GIT_SOURCE_COUNT = "GIT_SOURCE_COUNT"
APP_LABEL_KEY_PREFIX = "APP_LABEL_KEY"
APP_LABEL_VALUE_PREFIX = "APP_LABEL_VALUE"
APP_LABEL_COUNT = "APP_LABEL_COUNT"
CHILD_CD_ENV_NAME_PREFIX = "CHILD_CD_ENV_NAME"
CHILD_CD_CLUSTER_NAME_PREFIX = "CHILD_CD_CLUSTER_NAME"
CHILD_CD_COUNT = "CHILD_CD_COUNT"
)

type CiArtifactDTO struct {
Id int `json:"id"`
PipelineId int `json:"pipelineId"` //id of the ci pipeline from which this webhook was triggered
Expand Down Expand Up @@ -145,7 +162,9 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
prePostCdScriptHistoryService history2.PrePostCdScriptHistoryService,
argoUserService argo.ArgoUserService,
cdPipelineStatusTimelineRepo pipelineConfig.PipelineStatusTimelineRepository,
CiTemplateRepository pipelineConfig.CiTemplateRepository) *WorkflowDagExecutorImpl {
CiTemplateRepository pipelineConfig.CiTemplateRepository,
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
appLabelRepository pipelineConfig.AppLabelRepository) *WorkflowDagExecutorImpl {
wde := &WorkflowDagExecutorImpl{logger: Logger,
pipelineRepository: pipelineRepository,
cdWorkflowRepository: cdWorkflowRepository,
Expand Down Expand Up @@ -173,6 +192,8 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
argoUserService: argoUserService,
cdPipelineStatusTimelineRepo: cdPipelineStatusTimelineRepo,
CiTemplateRepository: CiTemplateRepository,
ciWorkflowRepository: ciWorkflowRepository,
appLabelRepository: appLabelRepository,
}
err := util4.AddStream(wde.pubsubClient.JetStrCtxt, util4.ORCHESTRATOR_STREAM, util4.CI_RUNNER_STREAM)
if err != nil {
Expand Down Expand Up @@ -670,11 +691,54 @@ func (impl *WorkflowDagExecutorImpl) buildWFRequest(runner *pipelineConfig.CdWor
OrchestratorToken: impl.cdConfig.OrchestratorToken,
CloudProvider: impl.cdConfig.CloudProvider,
}
extraEnvVariables := make(map[string]string)
env, err := impl.envRepository.FindById(cdPipeline.EnvironmentId)
if err != nil {
impl.logger.Errorw("error in getting environment by id", "err", err)
return nil, err
}
if env != nil {
extraEnvVariables[CD_PIPELINE_ENV_NAME_KEY] = env.Name
if env.Cluster != nil {
extraEnvVariables[CD_PIPELINE_CLUSTER_NAME_KEY] = env.Cluster.ClusterName
}
}
ciWf, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByArtifactId(artifact.Id)
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in getting ciWf by artifactId", "err", err, "artifactId", artifact.Id)
return nil, err
}

if ciWf != nil && ciWf.GitTriggers != nil {
i := 1
for _, gitTrigger := range ciWf.GitTriggers {
extraEnvVariables[fmt.Sprintf("%s_%d", GIT_COMMIT_HASH_PREFIX, i)] = gitTrigger.Commit
extraEnvVariables[fmt.Sprintf("%s_%d", GIT_SOURCE_TYPE_PREFIX, i)] = string(gitTrigger.CiConfigureSourceType)
extraEnvVariables[fmt.Sprintf("%s_%d", GIT_SOURCE_VALUE_PREFIX, i)] = gitTrigger.CiConfigureSourceValue
i++
}
extraEnvVariables[GIT_SOURCE_COUNT] = strconv.Itoa(len(ciWf.GitTriggers))
}

childCdIds, err := impl.appWorkflowRepository.FindChildCDIdsByParentCDPipelineId(cdPipeline.Id)
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in getting child cdPipelineIds by parent cdPipelineId", "err", err, "parent cdPipelineId", cdPipeline.Id)
return nil, err
}
if len(childCdIds) > 0 {
childPipelines, err := impl.pipelineRepository.FindByIdsIn(childCdIds)
if err != nil {
impl.logger.Errorw("error in getting pipelines by ids", "err", err, "ids", childCdIds)
return nil, err
}
for i, childPipeline := range childPipelines {
extraEnvVariables[fmt.Sprintf("%s_%d", CHILD_CD_ENV_NAME_PREFIX, i+1)] = childPipeline.Environment.Name
extraEnvVariables[fmt.Sprintf("%s_%d", CHILD_CD_CLUSTER_NAME_PREFIX, i+1)] = childPipeline.Environment.Cluster.ClusterName
}
extraEnvVariables[CHILD_CD_COUNT] = strconv.Itoa(len(childPipelines))
}
if ciPipeline != nil && ciPipeline.Id > 0 {
extraEnvVariables := make(map[string]string)
extraEnvVariables["APP_NAME"] = ciPipeline.App.AppName
cdStageWorkflowRequest.ExtraEnvironmentVariables = extraEnvVariables
cdStageWorkflowRequest.DockerUsername = ciPipeline.CiTemplate.DockerRegistry.Username
cdStageWorkflowRequest.DockerPassword = ciPipeline.CiTemplate.DockerRegistry.Password
cdStageWorkflowRequest.AwsRegion = ciPipeline.CiTemplate.DockerRegistry.AWSRegion
Expand All @@ -689,9 +753,7 @@ func (impl *WorkflowDagExecutorImpl) buildWFRequest(runner *pipelineConfig.CdWor
if err != nil {
return nil, err
}
extraEnvVariables := make(map[string]string)
extraEnvVariables["APP_NAME"] = ciTemplate.App.AppName
cdStageWorkflowRequest.ExtraEnvironmentVariables = extraEnvVariables
cdStageWorkflowRequest.DockerUsername = ciTemplate.DockerRegistry.Username
cdStageWorkflowRequest.DockerPassword = ciTemplate.DockerRegistry.Password
cdStageWorkflowRequest.AwsRegion = ciTemplate.DockerRegistry.AWSRegion
Expand All @@ -701,8 +763,20 @@ func (impl *WorkflowDagExecutorImpl) buildWFRequest(runner *pipelineConfig.CdWor
cdStageWorkflowRequest.SecretKey = ciTemplate.DockerRegistry.AWSSecretAccessKey
cdStageWorkflowRequest.DockerRegistryType = string(ciTemplate.DockerRegistry.RegistryType)
cdStageWorkflowRequest.DockerRegistryURL = ciTemplate.DockerRegistry.RegistryURL
appLabels, err := impl.appLabelRepository.FindAllByAppId(cdPipeline.AppId)
if err != nil && err != pg.ErrNoRows {
impl.logger.Errorw("error in getting labels by appId", "err", err, "appId", cdPipeline.AppId)
return nil, err
}
for i, appLabel := range appLabels {
extraEnvVariables[fmt.Sprintf("%s_%d", APP_LABEL_KEY_PREFIX, i+1)] = appLabel.Key
extraEnvVariables[fmt.Sprintf("%s_%d", APP_LABEL_VALUE_PREFIX, i+1)] = appLabel.Value
}
if len(appLabels) > 0 {
extraEnvVariables[APP_LABEL_COUNT] = strconv.Itoa(len(appLabels))
}
}

cdStageWorkflowRequest.ExtraEnvironmentVariables = extraEnvVariables
if deployStageTriggeredByUser != nil {
cdStageWorkflowRequest.DeploymentTriggerTime = deployStageWfr.StartedOn
cdStageWorkflowRequest.DeploymentTriggeredBy = deployStageTriggeredByUser.EmailId
Expand Down
5 changes: 3 additions & 2 deletions wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.