From f94fda607ed89823d18a792b4a5dcd9362260667 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 12 Sep 2023 12:40:48 +0530 Subject: [PATCH 01/27] retriggering ci-workflow with deleted workflow git triggers --- .../pipelineConfig/CiWorkflowRepository.go | 59 +++++++++----- pkg/pipeline/CiConfig.go | 1 + pkg/pipeline/CiHandler.go | 80 +++++++++++++++++++ pkg/pipeline/CiService.go | 25 +++--- 4 files changed, 134 insertions(+), 31 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index 870afb3a67..8987b7f294 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -18,6 +18,7 @@ package pipelineConfig import ( + "fmt" "github.com/go-pg/pg" "go.uber.org/zap" "time" @@ -33,6 +34,8 @@ type CiWorkflowRepository interface { FindByStatusesIn(activeStatuses []string) ([]*CiWorkflow, error) FindByPipelineId(pipelineId int, offset int, size int) ([]WorkflowWithArtifact, error) FindById(id int) (*CiWorkflow, error) + FindReferenceWorkflowById(id int) (*CiWorkflow, error) + FindRetriedWorkflowCountByReferenceId(id int) (int, error) FindCiWorkflowGitTriggersById(id int) (workflow *CiWorkflow, err error) FindByName(name string) (*CiWorkflow, error) @@ -51,25 +54,26 @@ type CiWorkflowRepositoryImpl struct { } type CiWorkflow struct { - tableName struct{} `sql:"ci_workflow" pg:",discard_unknown_columns"` - Id int `sql:"id,pk"` - Name string `sql:"name"` - Status string `sql:"status"` - PodStatus string `sql:"pod_status"` - Message string `sql:"message"` - StartedOn time.Time `sql:"started_on"` - FinishedOn time.Time `sql:"finished_on"` - CiPipelineId int `sql:"ci_pipeline_id"` - Namespace string `sql:"namespace"` - BlobStorageEnabled bool `sql:"blob_storage_enabled,notnull"` - LogLocation string `sql:"log_file_path"` - GitTriggers map[int]GitCommit `sql:"git_triggers"` - TriggeredBy int32 `sql:"triggered_by"` - CiArtifactLocation string `sql:"ci_artifact_location"` - PodName string `sql:"pod_name"` - CiBuildType string `sql:"ci_build_type"` - EnvironmentId int `sql:"environment_id"` - CiPipeline *CiPipeline + tableName struct{} `sql:"ci_workflow" pg:",discard_unknown_columns"` + Id int `sql:"id,pk"` + Name string `sql:"name"` + Status string `sql:"status"` + PodStatus string `sql:"pod_status"` + Message string `sql:"message"` + StartedOn time.Time `sql:"started_on"` + FinishedOn time.Time `sql:"finished_on"` + CiPipelineId int `sql:"ci_pipeline_id"` + Namespace string `sql:"namespace"` + BlobStorageEnabled bool `sql:"blob_storage_enabled,notnull"` + LogLocation string `sql:"log_file_path"` + GitTriggers map[int]GitCommit `sql:"git_triggers"` + TriggeredBy int32 `sql:"triggered_by"` + CiArtifactLocation string `sql:"ci_artifact_location"` + PodName string `sql:"pod_name"` + CiBuildType string `sql:"ci_build_type"` + EnvironmentId int `sql:"environment_id"` + ReferenceCiWorkflowId int `sql:"ref_ci_workflow_id"` + CiPipeline *CiPipeline } type WorkflowWithArtifact struct { @@ -192,6 +196,23 @@ func (impl *CiWorkflowRepositoryImpl) FindById(id int) (*CiWorkflow, error) { return workflow, err } +func (impl *CiWorkflowRepositoryImpl) FindReferenceWorkflowById(id int) (*CiWorkflow, error) { + workflow, err := impl.FindById(id) + if workflow.ReferenceCiWorkflowId != 0 { + workflow, err = impl.FindById(workflow.ReferenceCiWorkflowId) + } + return workflow, err +} + +func (impl *CiWorkflowRepositoryImpl) FindRetriedWorkflowCountByReferenceId(id int) (int, error) { + retryCount := 0 + query := fmt.Sprintf("select count(*) "+ + "from ci_workflow where ref_ci_workflow_id = %v", id) + + _, err := impl.dbConnection.Query(&retryCount, query) + return retryCount, err +} + func (impl *CiWorkflowRepositoryImpl) FindCiWorkflowGitTriggersById(id int) (ciWorkflow *CiWorkflow, err error) { workflow := &CiWorkflow{} err = impl.dbConnection.Model(workflow). diff --git a/pkg/pipeline/CiConfig.go b/pkg/pipeline/CiConfig.go index 0949f55272..86ca2449e5 100644 --- a/pkg/pipeline/CiConfig.go +++ b/pkg/pipeline/CiConfig.go @@ -85,6 +85,7 @@ type CiConfig struct { OrchestratorHost string `env:"ORCH_HOST" envDefault:"http://devtroncd-orchestrator-service-prod.devtroncd/webhook/msg/nats"` OrchestratorToken string `env:"ORCH_TOKEN" envDefault:""` BuildxK8sDriverOptions string `env:"BUILDX_K8S_DRIVER_OPTIONS" envDefault:""` + MaxCiWorkflowRetries int `env:"MAX_CI_WORKFLOW_RETRIES" envDefault:"0"` } type CiVolumeMount struct { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index dca4644afd..a87954471a 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -58,6 +58,7 @@ import ( type CiHandler interface { HandleCIWebhook(gitCiTriggerRequest bean.GitCiTriggerRequest) (int, error) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) + HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) FetchMaterialsByPipelineId(pipelineId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) FetchMaterialsByPipelineIdAndGitMaterialId(pipelineId int, gitMaterialId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) @@ -180,12 +181,70 @@ type Trigger struct { InvalidateCache bool ExtraEnvironmentVariables map[string]string // extra env variables which will be used for CI EnvironmentId int + ReferenceCiWorkflowId int } const WorkflowCancel = "CANCELLED" const DefaultCiWorkflowNamespace = "devtron-ci" const Running = "Running" const Starting = "Starting" +const POD_DELETED_MESSAGE = "pod deleted" + +func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) { + if impl.ciConfig.MaxCiWorkflowRetries == 0 { + return + } + status, message, retryCount, ciWorkFlow, err := impl.extractPodStatusAndWorkflow(workflowStatus) + if !(status == string(v1alpha1.NodeFailed) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { + return + } + impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) + //commitHashes, extraEnvironmentVariables, err := impl.buildManualTriggerCommitHashes(ciTriggerRequest) + + //build commithashes from parent workflow + commitHashes := make(map[int]bean.GitCommit) + for k, v := range ciWorkFlow.GitTriggers { + gitCommit := bean.GitCommit{ + Commit: v.Commit, + Author: v.Author, + Date: v.Date, + Message: v.Message, + Changes: v.Changes, + CiConfigureSourceValue: v.CiConfigureSourceValue, + CiConfigureSourceType: v.CiConfigureSourceType, + GitRepoUrl: v.GitRepoUrl, + GitRepoName: v.GitRepoName, + } + webhookData := v.WebhookData + if webhookData.Id != 0 { + gitCommit.WebhookData = &bean.WebhookData{ + Id: webhookData.Id, + EventActionType: webhookData.EventActionType, + Data: webhookData.Data, + } + } + + commitHashes[k] = gitCommit + } + + trigger := Trigger{ + PipelineId: ciWorkFlow.CiPipelineId, + CommitHashes: commitHashes, + CiMaterials: nil, + TriggeredBy: 1, + InvalidateCache: true, //TODO: ciTriggerRequest.InvalidateCache, + //TODO: ExtraEnvironmentVariables: extraEnvironmentVariables, + EnvironmentId: ciWorkFlow.EnvironmentId, + ReferenceCiWorkflowId: ciWorkFlow.Id, + } + _, err = impl.ciService.TriggerCiPipeline(trigger) + + if err != nil { + impl.Logger.Errorw("error occured in retriggering ciWorkflow", "triggerDetails", trigger, "err", err) + return + } + +} func (impl *CiHandlerImpl) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) { impl.Logger.Debugw("HandleCIManual for pipeline ", "PipelineId", ciTriggerRequest.PipelineId) @@ -904,6 +963,27 @@ func (impl *CiHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.Workflow const CiStageFailErrorCode = 2 +func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.WorkflowStatus) (string, string, int, *pipelineConfig.CiWorkflow, error) { + workflowName, status, _, message, _, _ := impl.extractWorkfowStatus(workflowStatus) + if workflowName == "" { + impl.Logger.Errorw("extract workflow status, invalid wf name", "workflowName", workflowName, "status", status, "message", message) + return status, message, 0, nil, errors.New("invalid wf name") + } + workflowId, err := strconv.Atoi(workflowName[:strings.Index(workflowName, "-")]) + if err != nil { + impl.Logger.Errorw("invalid wf status update req", "err", err) + return status, message, 0, nil, err + } + + savedWorkflow, err := impl.ciWorkflowRepository.FindReferenceWorkflowById(workflowId) + if err != nil { + impl.Logger.Errorw("cannot get saved wf", "err", err) + return status, message, 0, savedWorkflow, err + } + retryCount, err := impl.ciWorkflowRepository.FindRetriedWorkflowCountByReferenceId(savedWorkflow.Id) + return status, message, retryCount, savedWorkflow, nil +} + func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, error) { workflowName, status, podStatus, message, _, podName := impl.extractWorkfowStatus(workflowStatus) if workflowName == "" { diff --git a/pkg/pipeline/CiService.go b/pkg/pipeline/CiService.go index d900349597..8fe1b2d62b 100644 --- a/pkg/pipeline/CiService.go +++ b/pkg/pipeline/CiService.go @@ -160,7 +160,7 @@ func (impl *CiServiceImpl) TriggerCiPipeline(trigger Trigger) (int, error) { UserMessage: "No tasks are configured in this job pipeline", } } - savedCiWf, err := impl.saveNewWorkflow(pipeline, ciWorkflowConfig, trigger.CommitHashes, trigger.TriggeredBy, trigger.EnvironmentId, isJob) + savedCiWf, err := impl.saveNewWorkflow(pipeline, ciWorkflowConfig, trigger.CommitHashes, trigger.TriggeredBy, trigger.EnvironmentId, isJob, trigger.ReferenceCiWorkflowId) if err != nil { impl.Logger.Errorw("could not save new workflow", "err", err) return 0, err @@ -291,7 +291,7 @@ func (impl *CiServiceImpl) BuildPayload(trigger Trigger, pipeline *pipelineConfi } func (impl *CiServiceImpl) saveNewWorkflow(pipeline *pipelineConfig.CiPipeline, wfConfig *pipelineConfig.CiWorkflowConfig, - commitHashes map[int]bean.GitCommit, userId int32, EnvironmentId int, isJob bool) (wf *pipelineConfig.CiWorkflow, error error) { + commitHashes map[int]bean.GitCommit, userId int32, EnvironmentId int, isJob bool, refCiWorkflowId int) (wf *pipelineConfig.CiWorkflow, error error) { gitTriggers := make(map[int]pipelineConfig.GitCommit) for k, v := range commitHashes { gitCommit := pipelineConfig.GitCommit{ @@ -318,16 +318,17 @@ func (impl *CiServiceImpl) saveNewWorkflow(pipeline *pipelineConfig.CiPipeline, } ciWorkflow := &pipelineConfig.CiWorkflow{ - Name: pipeline.Name + "-" + strconv.Itoa(pipeline.Id), - Status: pipelineConfig.WorkflowStarting, - Message: "", - StartedOn: time.Now(), - CiPipelineId: pipeline.Id, - Namespace: impl.ciConfig.DefaultNamespace, - BlobStorageEnabled: impl.ciConfig.BlobStorageEnabled, - GitTriggers: gitTriggers, - LogLocation: "", - TriggeredBy: userId, + Name: pipeline.Name + "-" + strconv.Itoa(pipeline.Id), + Status: pipelineConfig.WorkflowStarting, + Message: "", + StartedOn: time.Now(), + CiPipelineId: pipeline.Id, + Namespace: impl.ciConfig.DefaultNamespace, + BlobStorageEnabled: impl.ciConfig.BlobStorageEnabled, + GitTriggers: gitTriggers, + LogLocation: "", + TriggeredBy: userId, + ReferenceCiWorkflowId: refCiWorkflowId, } if isJob { ciWorkflow.Namespace = wfConfig.Namespace From 555b2e41ce9e8ea91150c1ea1eb0cdf4accc99e0 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 12 Sep 2023 12:44:22 +0530 Subject: [PATCH 02/27] re-triggering cd stages with current configurations --- .../pubsub/WorkflowStatusUpdateHandler.go | 6 ++- .../pipelineConfig/CdWorfkflowRepository.go | 46 ++++++++++++------- pkg/pipeline/CdConfig.go | 1 + pkg/pipeline/CdHandler.go | 38 +++++++++++++++ pkg/pipeline/WorkflowDagExecutor.go | 40 ++++++++-------- 5 files changed, 94 insertions(+), 37 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 49e836d3ca..255104bfe6 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -77,7 +77,7 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { impl.logger.Errorw("error while unmarshalling wf status update", "err", err, "msg", string(msg.Data)) return } - + go impl.ciHandler.HandleReTriggerCI(wfStatus) _, err = impl.ciHandler.UpdateWorkflow(wfStatus) if err != nil { impl.logger.Errorw("error on update workflow status", "err", err, "msg", string(msg.Data)) @@ -125,6 +125,10 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { } else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { eventType = util.Fail } + + if wfrStatus == string(v1alpha1.NodeFailed) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { + impl.cdHandler.HandleCdStageReTrigger(wfr) + } if wfr.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { event := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, util.CD) impl.logger.Debugw("event pre stage", "event", event) diff --git a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go index 1e83fba28d..ffac320e2d 100644 --- a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go @@ -19,6 +19,7 @@ package pipelineConfig import ( "context" + "fmt" "github.com/argoproj/gitops-engine/pkg/health" "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/client/argocdServer/application" @@ -49,6 +50,7 @@ type CdWorkflowRepository interface { FindPreviousCdWfRunnerByStatus(pipelineId int, currentWFRunnerId int, status []string) ([]*CdWorkflowRunner, error) FindConfigByPipelineId(pipelineId int) (*CdWorkflowConfig, error) FindWorkflowRunnerById(wfrId int) (*CdWorkflowRunner, error) + FindRetriedWorkflowCountByReferenceId(wfrId int) (int, error) FindLatestWfrByAppIdAndEnvironmentId(appId int, environmentId int) (*CdWorkflowRunner, error) FindLatestCdWorkflowRunnerByEnvironmentIdAndRunnerType(appId int, environmentId int, runnerType bean.WorkflowType) (CdWorkflowRunner, error) @@ -143,23 +145,24 @@ const WORKFLOW_EXECUTOR_TYPE_AWF = "AWF" const WORKFLOW_EXECUTOR_TYPE_SYSTEM = "SYSTEM" type CdWorkflowRunner struct { - tableName struct{} `sql:"cd_workflow_runner" pg:",discard_unknown_columns"` - Id int `sql:"id,pk"` - Name string `sql:"name"` - WorkflowType bean.WorkflowType `sql:"workflow_type"` //pre,post,deploy - ExecutorType WorkflowExecutorType `sql:"executor_type"` //awf, system - Status string `sql:"status"` - PodStatus string `sql:"pod_status"` - Message string `sql:"message"` - StartedOn time.Time `sql:"started_on"` - FinishedOn time.Time `sql:"finished_on"` - Namespace string `sql:"namespace"` - LogLocation string `sql:"log_file_path"` - TriggeredBy int32 `sql:"triggered_by"` - CdWorkflowId int `sql:"cd_workflow_id"` - PodName string `sql:"pod_name"` - BlobStorageEnabled bool `sql:"blob_storage_enabled,notnull"` - CdWorkflow *CdWorkflow + tableName struct{} `sql:"cd_workflow_runner" pg:",discard_unknown_columns"` + Id int `sql:"id,pk"` + Name string `sql:"name"` + WorkflowType bean.WorkflowType `sql:"workflow_type"` //pre,post,deploy + ExecutorType WorkflowExecutorType `sql:"executor_type"` //awf, system + Status string `sql:"status"` + PodStatus string `sql:"pod_status"` + Message string `sql:"message"` + StartedOn time.Time `sql:"started_on"` + FinishedOn time.Time `sql:"finished_on"` + Namespace string `sql:"namespace"` + LogLocation string `sql:"log_file_path"` + TriggeredBy int32 `sql:"triggered_by"` + CdWorkflowId int `sql:"cd_workflow_id"` + PodName string `sql:"pod_name"` + BlobStorageEnabled bool `sql:"blob_storage_enabled,notnull"` + RefCdWorkflowRunnerId int `sql:"ref_cd_workflow_runner_id"` + CdWorkflow *CdWorkflow sql.AuditLog } @@ -486,6 +489,15 @@ func (impl *CdWorkflowRepositoryImpl) FindWorkflowRunnerById(wfrId int) (*CdWork return wfr, err } +func (impl *CdWorkflowRepositoryImpl) FindRetriedWorkflowCountByReferenceId(wfrId int) (int, error) { + retryCount := 0 + query := fmt.Sprintf("select count(*) "+ + "from cd_workflow_runner where ref_cd_workflow_runner_id = %v", wfrId) + + _, err := impl.dbConnection.Query(&retryCount, query) + return retryCount, err +} + func (impl *CdWorkflowRepositoryImpl) FindByWorkflowIdAndRunnerType(ctx context.Context, wfId int, runnerType bean.WorkflowType) (CdWorkflowRunner, error) { var wfr CdWorkflowRunner _, span := otel.Tracer("orchestrator").Start(ctx, "cdWorkflowRepository.FindByWorkflowIdAndRunnerType") diff --git a/pkg/pipeline/CdConfig.go b/pkg/pipeline/CdConfig.go index 0fb7f47443..6dc6c42078 100644 --- a/pkg/pipeline/CdConfig.go +++ b/pkg/pipeline/CdConfig.go @@ -73,6 +73,7 @@ type CdConfig struct { BaseLogLocationPath string `env:"BASE_LOG_LOCATION_PATH" envDefault:"/home/devtron/"` CdWorkflowExecutorType pipelineConfig.WorkflowExecutorType `env:"CD_WORKFLOW_EXECUTOR_TYPE" envDefault:"AWF"` InAppLoggingEnabled bool `env:"IN_APP_LOGGING_ENABLED" envDefault:"false"` + MaxCdWorkflowRunnerRetries int `env:"MAX_CD_WORKFLOW_RUNNER_RETRIES" envDefault:"0"` } func GetCdConfig() (*CdConfig, error) { diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 5fc6d35254..9041fde1db 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -65,6 +65,7 @@ const ( ) type CdHandler interface { + HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineConfig.CdWorkflowWithArtifact, error) GetRunningWorkflowLogs(environmentId int, pipelineId int, workflowId int) (*bufio.Reader, func() error, error) @@ -170,6 +171,43 @@ const WorklowTypeDeploy = "DEPLOY" const WorklowTypePre = "PRE" const WorklowTypePost = "POST" +func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) { + var err error + if runner == nil || impl.cdConfig.MaxCdWorkflowRunnerRetries == 0 { + return + } + if runner.RefCdWorkflowRunnerId != 0 { + runner, err = impl.cdWorkflowRepository.FindWorkflowRunnerById(runner.RefCdWorkflowRunnerId) + if err != nil { + impl.Logger.Errorw("error in FindWorkflowRunnerById by id ", "err", err, "wfrId", runner.RefCdWorkflowRunnerId) + return + } + } + retryCnt, err := impl.cdWorkflowRepository.FindRetriedWorkflowCountByReferenceId(runner.Id) + if err != nil { + impl.Logger.Errorw("error in FindRetriedWorkflowCountByReferenceId ", "err", err, "cdWorkflowRunnerId", runner.Id) + return + } + + if retryCnt >= impl.cdConfig.MaxCdWorkflowRunnerRetries { + impl.Logger.Debugw("maximum retries for this workflow are exhausted, not re-triggering again", "retries", retryCnt, "wfrId", runner.Id) + return + } + + if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { + err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), nil, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, false, runner.Id) + if err != nil { + return + } + } else if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_POST { + err = impl.workflowDagExecutor.TriggerPostStage(runner.CdWorkflow, runner.CdWorkflow.Pipeline, 1, runner.Id) + if err != nil { + return + } + } + return +} + func (impl *CdHandlerImpl) CheckArgoAppStatusPeriodicallyAndUpdateInDb(getPipelineDeployedBeforeMinutes int, getPipelineDeployedWithinHours int) error { pipelines, err := impl.pipelineRepository.GetArgoPipelinesHavingLatestTriggerStuckInNonTerminalStatuses(getPipelineDeployedBeforeMinutes, getPipelineDeployedWithinHours) if err != nil { diff --git a/pkg/pipeline/WorkflowDagExecutor.go b/pkg/pipeline/WorkflowDagExecutor.go index b81b0bd0fb..1021ba9b39 100644 --- a/pkg/pipeline/WorkflowDagExecutor.go +++ b/pkg/pipeline/WorkflowDagExecutor.go @@ -69,7 +69,8 @@ type WorkflowDagExecutor interface { HandleDeploymentSuccessEvent(gitHash string, pipelineOverrideId int) error HandlePostStageSuccessEvent(cdWorkflowId int, cdPipelineId int, triggeredBy int32) error Subscribe() error - TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, cdPipeline *pipelineConfig.Pipeline, triggeredBy int32) error + TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, cdPipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error + TriggerPreStage(ctx context.Context, cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32, applyAuth bool, refCdWorkflowRunnerId int) error TriggerDeployment(cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, applyAuth bool, triggeredBy int32) error ManualCdTrigger(overrideRequest *bean.ValuesOverrideRequest, ctx context.Context) (int, error) TriggerBulkDeploymentAsync(requests []*BulkTriggerRequest, UserId int32) (interface{}, error) @@ -397,7 +398,7 @@ func (impl *WorkflowDagExecutorImpl) triggerStage(cdWf *pipelineConfig.CdWorkflo // pre stage exists if pipeline.PreTriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, applyAuth) //TODO handle error here + err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, applyAuth, 0) //TODO handle error here return err } } else if pipeline.TriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { @@ -427,7 +428,7 @@ func (impl *WorkflowDagExecutorImpl) triggerStageForBulk(cdWf *pipelineConfig.Cd if len(pipeline.PreStageConfig) > 0 || (preStage != nil && !deleted) { //pre stage exists impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, applyAuth) //TODO handle error here + err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, applyAuth, 0) //TODO handle error here return err } else { // trigger deployment @@ -469,7 +470,7 @@ func (impl *WorkflowDagExecutorImpl) HandlePreStageSuccessEvent(cdStageCompleteE return nil } -func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32, applyAuth bool) error { +func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32, applyAuth bool, refCdWorkflowRunnerId int) error { //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() @@ -587,22 +588,23 @@ func convert(ts string) (*time.Time, error) { return &t, nil } -func (impl *WorkflowDagExecutorImpl) TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline, triggeredBy int32) error { +func (impl *WorkflowDagExecutorImpl) TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error { //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() runner := &pipelineConfig.CdWorkflowRunner{ - Name: pipeline.Name, - WorkflowType: bean.CD_WORKFLOW_TYPE_POST, - ExecutorType: impl.cdConfig.CdWorkflowExecutorType, - Status: pipelineConfig.WorkflowStarting, - TriggeredBy: triggeredBy, - StartedOn: triggeredAt, - Namespace: impl.cdConfig.DefaultNamespace, - BlobStorageEnabled: impl.cdConfig.BlobStorageEnabled, - CdWorkflowId: cdWf.Id, - LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.cdConfig.DefaultBuildLogsKeyPrefix, strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_POST), pipeline.Name), - AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy}, + Name: pipeline.Name, + WorkflowType: bean.CD_WORKFLOW_TYPE_POST, + ExecutorType: impl.cdConfig.CdWorkflowExecutorType, + Status: pipelineConfig.WorkflowStarting, + TriggeredBy: triggeredBy, + StartedOn: triggeredAt, + Namespace: impl.cdConfig.DefaultNamespace, + BlobStorageEnabled: impl.cdConfig.BlobStorageEnabled, + CdWorkflowId: cdWf.Id, + LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.cdConfig.DefaultBuildLogsKeyPrefix, strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_POST), pipeline.Name), + AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy}, + RefCdWorkflowRunnerId: refCdWorkflowRunnerId, } var env *repository2.Environment var err error @@ -1166,7 +1168,7 @@ func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(gitHash string pipelineOverride.DeploymentType != models.DEPLOYMENTTYPE_STOP && pipelineOverride.DeploymentType != models.DEPLOYMENTTYPE_START { - err = impl.TriggerPostStage(cdWorkflow, pipelineOverride.Pipeline, triggeredByUser) + err = impl.TriggerPostStage(cdWorkflow, pipelineOverride.Pipeline, triggeredByUser, 0) if err != nil { impl.logger.Errorw("error in triggering post stage after successful deployment event", "err", err, "cdWorkflow", cdWorkflow) return err @@ -1629,7 +1631,7 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value return 0, err } _, span = otel.Tracer("orchestrator").Start(ctx, "TriggerPreStage") - err = impl.TriggerPreStage(ctx, nil, artifact, cdPipeline, overrideRequest.UserId, false) + err = impl.TriggerPreStage(ctx, nil, artifact, cdPipeline, overrideRequest.UserId, false, 0) span.End() if err != nil { impl.logger.Errorw("err", "err", err) @@ -1787,7 +1789,7 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value } } _, span = otel.Tracer("orchestrator").Start(ctx, "TriggerPostStage") - err = impl.TriggerPostStage(cdWf, cdPipeline, overrideRequest.UserId) + err = impl.TriggerPostStage(cdWf, cdPipeline, overrideRequest.UserId, 0) span.End() } return releaseId, err From 21924e0e8ba37aeccf1085c17b065ad5239081d3 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 12 Sep 2023 13:21:18 +0530 Subject: [PATCH 03/27] sql script added --- scripts/sql/169_workflow_retry.down.sql | 2 ++ scripts/sql/169_workflow_retry.up.sql | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 scripts/sql/169_workflow_retry.down.sql create mode 100644 scripts/sql/169_workflow_retry.up.sql diff --git a/scripts/sql/169_workflow_retry.down.sql b/scripts/sql/169_workflow_retry.down.sql new file mode 100644 index 0000000000..06a7604cb1 --- /dev/null +++ b/scripts/sql/169_workflow_retry.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE ci_workflow DROP COLUMN ref_ci_workflow_id; +ALTER TABLE cd_workflow_runner DROP COLUMN ref_cd_workflow_runner_id; \ No newline at end of file diff --git a/scripts/sql/169_workflow_retry.up.sql b/scripts/sql/169_workflow_retry.up.sql new file mode 100644 index 0000000000..b724489e02 --- /dev/null +++ b/scripts/sql/169_workflow_retry.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE ci_workflow ADD COLUMN ref_ci_workflow_id integer; +ALTER TABLE cd_workflow_runner ADD COLUMN ref_cd_workflow_runner_id integer; \ No newline at end of file From 02d7a8b516a7826d4f9ca6c0496afd1fa5896b9a Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 12 Sep 2023 15:20:28 +0530 Subject: [PATCH 04/27] added retrigger cancel checks --- .../pipelineConfig/CiWorkflowRepository.go | 9 --------- pkg/pipeline/CdHandler.go | 3 ++- pkg/pipeline/CiHandler.go | 16 +++++++++++++++- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index 8987b7f294..c3e38ee012 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -34,7 +34,6 @@ type CiWorkflowRepository interface { FindByStatusesIn(activeStatuses []string) ([]*CiWorkflow, error) FindByPipelineId(pipelineId int, offset int, size int) ([]WorkflowWithArtifact, error) FindById(id int) (*CiWorkflow, error) - FindReferenceWorkflowById(id int) (*CiWorkflow, error) FindRetriedWorkflowCountByReferenceId(id int) (int, error) FindCiWorkflowGitTriggersById(id int) (workflow *CiWorkflow, err error) FindByName(name string) (*CiWorkflow, error) @@ -196,14 +195,6 @@ func (impl *CiWorkflowRepositoryImpl) FindById(id int) (*CiWorkflow, error) { return workflow, err } -func (impl *CiWorkflowRepositoryImpl) FindReferenceWorkflowById(id int) (*CiWorkflow, error) { - workflow, err := impl.FindById(id) - if workflow.ReferenceCiWorkflowId != 0 { - workflow, err = impl.FindById(workflow.ReferenceCiWorkflowId) - } - return workflow, err -} - func (impl *CiWorkflowRepositoryImpl) FindRetriedWorkflowCountByReferenceId(id int) (int, error) { retryCount := 0 query := fmt.Sprintf("select count(*) "+ diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 9041fde1db..a12d8c520b 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -173,7 +173,8 @@ const WorklowTypePost = "POST" func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) { var err error - if runner == nil || impl.cdConfig.MaxCdWorkflowRunnerRetries == 0 { + // do not re-trigger if retries = 0 or last workflow is aborted + if runner == nil || impl.cdConfig.MaxCdWorkflowRunnerRetries == 0 || runner.Status == WorkflowCancel { return } if runner.RefCdWorkflowRunnerId != 0 { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index a87954471a..2a04a1e0e9 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -195,6 +195,10 @@ func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowSta return } status, message, retryCount, ciWorkFlow, err := impl.extractPodStatusAndWorkflow(workflowStatus) + if err != nil { + impl.Logger.Errorw("error in extractPodStatusAndWorkflow", "err", err) + return + } if !(status == string(v1alpha1.NodeFailed) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { return } @@ -975,7 +979,17 @@ func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.W return status, message, 0, nil, err } - savedWorkflow, err := impl.ciWorkflowRepository.FindReferenceWorkflowById(workflowId) + savedWorkflow, err := impl.ciWorkflowRepository.FindById(workflowId) + if err != nil { + impl.Logger.Errorw("cannot get saved wf", "err", err) + return status, message, 0, savedWorkflow, err + } + if savedWorkflow.Status == WorkflowCancel { + return status, message, 0, nil, errors.New("not re-triggering as previous trigger is aborted/cancelled") + } + if savedWorkflow.ReferenceCiWorkflowId != 0 { + savedWorkflow, err = impl.ciWorkflowRepository.FindById(savedWorkflow.ReferenceCiWorkflowId) + } if err != nil { impl.Logger.Errorw("cannot get saved wf", "err", err) return status, message, 0, savedWorkflow, err From cd07ac000c03ba18b917799d4a71eb18149fa6dd Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Fri, 15 Sep 2023 14:48:15 +0530 Subject: [PATCH 05/27] retry workflow method added in workflowstatus check cron --- pkg/pipeline/CiHandler.go | 48 ++++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 2a04a1e0e9..40223f094d 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -191,6 +191,7 @@ const Starting = "Starting" const POD_DELETED_MESSAGE = "pod deleted" func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) { + if impl.ciConfig.MaxCiWorkflowRetries == 0 { return } @@ -199,6 +200,12 @@ func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowSta impl.Logger.Errorw("error in extractPodStatusAndWorkflow", "err", err) return } + + impl.reTriggerCi(status, message, retryCount, ciWorkFlow) + +} + +func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) { if !(status == string(v1alpha1.NodeFailed) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { return } @@ -241,13 +248,12 @@ func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowSta EnvironmentId: ciWorkFlow.EnvironmentId, ReferenceCiWorkflowId: ciWorkFlow.Id, } - _, err = impl.ciService.TriggerCiPipeline(trigger) + _, err := impl.ciService.TriggerCiPipeline(trigger) if err != nil { impl.Logger.Errorw("error occured in retriggering ciWorkflow", "triggerDetails", trigger, "err", err) return } - } func (impl *CiHandlerImpl) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) { @@ -987,15 +993,23 @@ func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.W if savedWorkflow.Status == WorkflowCancel { return status, message, 0, nil, errors.New("not re-triggering as previous trigger is aborted/cancelled") } + + retryCount, savedWorkflow, err := impl.getRefWorkflowAndCiRetryCount(savedWorkflow) + return status, message, retryCount, savedWorkflow, err +} + +func (impl *CiHandlerImpl) getRefWorkflowAndCiRetryCount(savedWorkflow *pipelineConfig.CiWorkflow) (int, *pipelineConfig.CiWorkflow, error) { + var err error + if savedWorkflow.ReferenceCiWorkflowId != 0 { savedWorkflow, err = impl.ciWorkflowRepository.FindById(savedWorkflow.ReferenceCiWorkflowId) } if err != nil { impl.Logger.Errorw("cannot get saved wf", "err", err) - return status, message, 0, savedWorkflow, err + return 0, savedWorkflow, err } retryCount, err := impl.ciWorkflowRepository.FindRetriedWorkflowCountByReferenceId(savedWorkflow.Id) - return status, message, retryCount, savedWorkflow, nil + return retryCount, savedWorkflow, err } func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, error) { @@ -1538,6 +1552,7 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil } isEligibleToMarkFailed := false + isPodDeleted := false if time.Since(ciWorkflow.StartedOn) > (time.Minute * time.Duration(timeoutForFailureCiBuild)) { //check weather pod is exists or not, if exits check its status _, err := impl.workflowService.GetWorkflow(ciWorkflow.Name, ciWorkflow.Namespace, isExt, env) @@ -1555,7 +1570,7 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil //if ci workflow is exists, check its pod if !isEligibleToMarkFailed { - _, err = impl.K8sUtil.GetPodByName(DefaultCiWorkflowNamespace, ciWorkflow.PodName, client) + pod, err := impl.K8sUtil.GetPodByName(DefaultCiWorkflowNamespace, ciWorkflow.PodName, client) if err != nil { impl.Logger.Warnw("unable to fetch ci workflow - pod", "err", err) statusError, ok := err.(*errors2.StatusError) @@ -1567,12 +1582,29 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil // skip this and process for next ci workflow } } + if pod.Status.Message == POD_DELETED_MESSAGE { + isPodDeleted = true + ciWorkflow.PodStatus = string(pod.Status.Phase) + } } } if isEligibleToMarkFailed { - ciWorkflow.Status = "Failed" - ciWorkflow.PodStatus = "Failed" - ciWorkflow.Message = "marked failed by job" + if isPodDeleted { + ciWorkflow.Status = "Failed" + ciWorkflow.Message = POD_DELETED_MESSAGE + if ciWorkflow.Status != WorkflowCancel { + retryCount, refCiWorkflow, err := impl.getRefWorkflowAndCiRetryCount(ciWorkflow) + if err != nil { + impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id) + continue + } + impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) + } + } else { + ciWorkflow.Status = "Failed" + ciWorkflow.PodStatus = "Failed" + ciWorkflow.Message = "marked failed by job" + } err := impl.ciWorkflowRepository.UpdateWorkFlow(ciWorkflow) if err != nil { impl.Logger.Errorw("unable to update ci workflow, its eligible to mark failed", "err", err) From ec14e3103ed593fc220bc5e2126ed8fb914cfd50 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Fri, 15 Sep 2023 15:13:32 +0530 Subject: [PATCH 06/27] fixed bug around ci running in external cluster --- pkg/pipeline/CiHandler.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 40223f094d..6c3bd9aa41 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -1570,7 +1570,22 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil //if ci workflow is exists, check its pod if !isEligibleToMarkFailed { - pod, err := impl.K8sUtil.GetPodByName(DefaultCiWorkflowNamespace, ciWorkflow.PodName, client) + ns := DefaultCiWorkflowNamespace + if isExt { + clusterConfig, err := cluster.GetClusterBean(*env.Cluster).GetClusterConfig() + if err != nil { + impl.Logger.Warnw("error in getting cluster Config", "err", err, "clusterId", env.Cluster.Id) + continue + } + + client, err = impl.K8sUtil.GetCoreV1Client(clusterConfig) + if err != nil { + impl.Logger.Warnw("error in getting core v1 client using clusterConfig", "err", err, "clusterId", env.Cluster.Id) + continue + } + ns = env.Namespace + } + pod, err := impl.K8sUtil.GetPodByName(ns, ciWorkflow.PodName, client) if err != nil { impl.Logger.Warnw("unable to fetch ci workflow - pod", "err", err) statusError, ok := err.(*errors2.StatusError) From c1e9d764451cd9b5388c37d8427e142289f1fd9f Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 11:59:14 +0530 Subject: [PATCH 07/27] get the status from wf itself --- pkg/pipeline/CiHandler.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 6c3bd9aa41..5bf1439469 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -1555,7 +1555,7 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil isPodDeleted := false if time.Since(ciWorkflow.StartedOn) > (time.Minute * time.Duration(timeoutForFailureCiBuild)) { //check weather pod is exists or not, if exits check its status - _, err := impl.workflowService.GetWorkflow(ciWorkflow.Name, ciWorkflow.Namespace, isExt, env) + wf, err := impl.workflowService.GetWorkflow(ciWorkflow.Name, ciWorkflow.Namespace, isExt, env) if err != nil { impl.Logger.Warnw("unable to fetch ci workflow", "err", err) statusError, ok := err.(*errors2.StatusError) @@ -1585,7 +1585,7 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil } ns = env.Namespace } - pod, err := impl.K8sUtil.GetPodByName(ns, ciWorkflow.PodName, client) + _, err := impl.K8sUtil.GetPodByName(ns, ciWorkflow.PodName, client) if err != nil { impl.Logger.Warnw("unable to fetch ci workflow - pod", "err", err) statusError, ok := err.(*errors2.StatusError) @@ -1597,9 +1597,10 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil // skip this and process for next ci workflow } } - if pod.Status.Message == POD_DELETED_MESSAGE { + + //check workflow status,get the status + if wf.Status.Phase == v1alpha1.WorkflowFailed && wf.Status.Message == POD_DELETED_MESSAGE { isPodDeleted = true - ciWorkflow.PodStatus = string(pod.Status.Phase) } } } From cb0d3c5ed79695ef0f59b4b315663470d1f9b054 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 16:55:55 +0530 Subject: [PATCH 08/27] sending referenceWorkflowId in build history --- .../pipelineConfig/CiWorkflowRepository.go | 45 ++++++++++--------- pkg/pipeline/CiHandler.go | 42 ++++++++--------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index c3e38ee012..7c30b4eb68 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -76,28 +76,29 @@ type CiWorkflow struct { } type WorkflowWithArtifact struct { - Id int `json:"id"` - Name string `json:"name"` - PodName string `json:"podName"` - Status string `json:"status"` - PodStatus string `json:"pod_status"` - Message string `json:"message"` - StartedOn time.Time `json:"started_on"` - FinishedOn time.Time `json:"finished_on"` - CiPipelineId int `json:"ci_pipeline_id"` - Namespace string `json:"namespace"` - LogFilePath string `json:"log_file_path"` - GitTriggers map[int]GitCommit `json:"git_triggers"` - TriggeredBy int32 `json:"triggered_by"` - EmailId string `json:"email_id"` - Image string `json:"image"` - CiArtifactLocation string `json:"ci_artifact_location"` - CiArtifactId int `json:"ci_artifact_d"` - BlobStorageEnabled bool `json:"blobStorageEnabled"` - CiBuildType string `json:"ci_build_type"` - IsArtifactUploaded bool `json:"is_artifact_uploaded"` - EnvironmentId int `json:"environmentId"` - EnvironmentName string `json:"environmentName"` + Id int `json:"id"` + Name string `json:"name"` + PodName string `json:"podName"` + Status string `json:"status"` + PodStatus string `json:"pod_status"` + Message string `json:"message"` + StartedOn time.Time `json:"started_on"` + FinishedOn time.Time `json:"finished_on"` + CiPipelineId int `json:"ci_pipeline_id"` + Namespace string `json:"namespace"` + LogFilePath string `json:"log_file_path"` + GitTriggers map[int]GitCommit `json:"git_triggers"` + TriggeredBy int32 `json:"triggered_by"` + EmailId string `json:"email_id"` + Image string `json:"image"` + CiArtifactLocation string `json:"ci_artifact_location"` + CiArtifactId int `json:"ci_artifact_d"` + BlobStorageEnabled bool `json:"blobStorageEnabled"` + CiBuildType string `json:"ci_build_type"` + IsArtifactUploaded bool `json:"is_artifact_uploaded"` + EnvironmentId int `json:"environmentId"` + EnvironmentName string `json:"environmentName"` + ReferenceCiWorkflowId int `json:"reference_ci_workflow_id"` } type GitCommit struct { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 5bf1439469..59c2f3893f 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -158,6 +158,7 @@ type WorkflowResponse struct { EnvironmentName string `json:"environmentName"` ImageReleaseTags []*repository2.ImageTag `json:"imageReleaseTags"` ImageComment *repository2.ImageComment `json:"imageComment"` + ReferenceWorkflowId int `json:"referenceWorkflowId"` } type GitTriggerInfoResponse struct { @@ -549,26 +550,27 @@ func (impl *CiHandlerImpl) GetBuildHistory(pipelineId int, appId int, offset int var ciWorkLowResponses []WorkflowResponse for _, w := range workFlows { wfResponse := WorkflowResponse{ - Id: w.Id, - Name: w.Name, - Status: w.Status, - PodStatus: w.PodStatus, - Message: w.Message, - StartedOn: w.StartedOn, - FinishedOn: w.FinishedOn, - CiPipelineId: w.CiPipelineId, - Namespace: w.Namespace, - LogLocation: w.LogFilePath, - GitTriggers: w.GitTriggers, - CiMaterials: ciPipelineMaterialResponses, - Artifact: w.Image, - TriggeredBy: w.TriggeredBy, - TriggeredByEmail: w.EmailId, - ArtifactId: w.CiArtifactId, - BlobStorageEnabled: w.BlobStorageEnabled, - IsArtifactUploaded: w.IsArtifactUploaded, - EnvironmentId: w.EnvironmentId, - EnvironmentName: w.EnvironmentName, + Id: w.Id, + Name: w.Name, + Status: w.Status, + PodStatus: w.PodStatus, + Message: w.Message, + StartedOn: w.StartedOn, + FinishedOn: w.FinishedOn, + CiPipelineId: w.CiPipelineId, + Namespace: w.Namespace, + LogLocation: w.LogFilePath, + GitTriggers: w.GitTriggers, + CiMaterials: ciPipelineMaterialResponses, + Artifact: w.Image, + TriggeredBy: w.TriggeredBy, + TriggeredByEmail: w.EmailId, + ArtifactId: w.CiArtifactId, + BlobStorageEnabled: w.BlobStorageEnabled, + IsArtifactUploaded: w.IsArtifactUploaded, + EnvironmentId: w.EnvironmentId, + EnvironmentName: w.EnvironmentName, + ReferenceWorkflowId: w.ReferenceCiWorkflowId, } if imageTagsDataMap[w.CiArtifactId] != nil { wfResponse.ImageReleaseTags = imageTagsDataMap[w.CiArtifactId] //if artifact is not yet created,empty list will be sent From c4844d9cdf71423f4ab97260e94943d5e0191705 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 17:41:14 +0530 Subject: [PATCH 09/27] code refactoring regarding duplicate GitCommit struct definition --- .../pipelineConfig/CiWorkflowRepository.go | 6 +- pkg/bean/app.go | 33 ++------ pkg/bulkAction/BulkUpdateService.go | 2 +- pkg/git/GitWebhookService.go | 4 +- pkg/pipeline/CiHandler.go | 77 ++++++------------- pkg/pipeline/CiService.go | 58 ++------------ 6 files changed, 43 insertions(+), 137 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index 7c30b4eb68..be13d712c8 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -115,9 +115,9 @@ type GitCommit struct { } type WebhookData struct { - Id int - EventActionType string - Data map[string]string + Id int `json:"id"` + EventActionType string `json:"eventActionType"` + Data map[string]string `json:"data"` } type CiWorkflowConfig struct { diff --git a/pkg/bean/app.go b/pkg/bean/app.go index ffdc2e1458..53f19602ac 100644 --- a/pkg/bean/app.go +++ b/pkg/bean/app.go @@ -242,35 +242,16 @@ type GitCiTriggerRequest struct { ExtraEnvironmentVariables map[string]string `json:"extraEnvironmentVariables"` // extra env variables which will be used for CI } -type GitCommit struct { - Commit string //git hash - Author string - Date time.Time - Message string - Changes []string - WebhookData *WebhookData - GitRepoUrl string - GitRepoName string - CiConfigureSourceType pipelineConfig.SourceType - CiConfigureSourceValue string -} - -type WebhookData struct { - Id int `json:"id"` - EventActionType string `json:"eventActionType"` - Data map[string]string `json:"data"` -} - type SourceType string type CiPipelineMaterial struct { - Id int `json:"Id"` - GitMaterialId int `json:"GitMaterialId"` - Type string `json:"Type"` - Value string `json:"Value"` - Active bool `json:"Active"` - GitCommit GitCommit `json:"GitCommit"` - GitTag string `json:"GitTag"` + Id int `json:"Id"` + GitMaterialId int `json:"GitMaterialId"` + Type string `json:"Type"` + Value string `json:"Value"` + Active bool `json:"Active"` + GitCommit pipelineConfig.GitCommit `json:"GitCommit"` + GitTag string `json:"GitTag"` } type CiTriggerRequest struct { diff --git a/pkg/bulkAction/BulkUpdateService.go b/pkg/bulkAction/BulkUpdateService.go index cc3c4e1abb..d118c23982 100644 --- a/pkg/bulkAction/BulkUpdateService.go +++ b/pkg/bulkAction/BulkUpdateService.go @@ -1448,7 +1448,7 @@ func (impl BulkUpdateServiceImpl) BulkBuildTrigger(request *BulkApplicationForEn var ciMaterials []bean2.CiPipelineMaterial ciMaterials = append(ciMaterials, bean2.CiPipelineMaterial{ Id: materialId, - GitCommit: bean2.GitCommit{Commit: commitHash}, + GitCommit: pipelineConfig.GitCommit{Commit: commitHash}, }) ciTriggerRequest := bean2.CiTriggerRequest{ PipelineId: ciPipelineId, diff --git a/pkg/git/GitWebhookService.go b/pkg/git/GitWebhookService.go index a9bde07504..6d93460773 100644 --- a/pkg/git/GitWebhookService.go +++ b/pkg/git/GitWebhookService.go @@ -51,7 +51,7 @@ func (impl *GitWebhookServiceImpl) HandleGitWebhook(gitWebhookRequest gitSensor. Type: string(gitWebhookRequest.Type), Value: gitWebhookRequest.Value, Active: gitWebhookRequest.Active, - GitCommit: bean.GitCommit{ + GitCommit: pipelineConfig.GitCommit{ Commit: gitWebhookRequest.GitCommit.Commit, Author: gitWebhookRequest.GitCommit.Author, Date: gitWebhookRequest.GitCommit.Date, @@ -62,7 +62,7 @@ func (impl *GitWebhookServiceImpl) HandleGitWebhook(gitWebhookRequest gitSensor. if string(gitWebhookRequest.Type) == string(pipelineConfig.SOURCE_TYPE_WEBHOOK) { webhookData := gitWebhookRequest.GitCommit.WebhookData - ciPipelineMaterial.GitCommit.WebhookData = &bean.WebhookData{ + ciPipelineMaterial.GitCommit.WebhookData = pipelineConfig.WebhookData{ Id: webhookData.Id, EventActionType: webhookData.EventActionType, Data: webhookData.Data, diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 59c2f3893f..2a78c20327 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -176,7 +176,7 @@ type GitTriggerInfoResponse struct { type Trigger struct { PipelineId int - CommitHashes map[int]bean.GitCommit + CommitHashes map[int]pipelineConfig.GitCommit CiMaterials []*pipelineConfig.CiPipelineMaterial TriggeredBy int32 InvalidateCache bool @@ -211,37 +211,10 @@ func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, c return } impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) - //commitHashes, extraEnvironmentVariables, err := impl.buildManualTriggerCommitHashes(ciTriggerRequest) - - //build commithashes from parent workflow - commitHashes := make(map[int]bean.GitCommit) - for k, v := range ciWorkFlow.GitTriggers { - gitCommit := bean.GitCommit{ - Commit: v.Commit, - Author: v.Author, - Date: v.Date, - Message: v.Message, - Changes: v.Changes, - CiConfigureSourceValue: v.CiConfigureSourceValue, - CiConfigureSourceType: v.CiConfigureSourceType, - GitRepoUrl: v.GitRepoUrl, - GitRepoName: v.GitRepoName, - } - webhookData := v.WebhookData - if webhookData.Id != 0 { - gitCommit.WebhookData = &bean.WebhookData{ - Id: webhookData.Id, - EventActionType: webhookData.EventActionType, - Data: webhookData.Data, - } - } - - commitHashes[k] = gitCommit - } trigger := Trigger{ PipelineId: ciWorkFlow.CiPipelineId, - CommitHashes: commitHashes, + CommitHashes: ciWorkFlow.GitTriggers, CiMaterials: nil, TriggeredBy: 1, InvalidateCache: true, //TODO: ciTriggerRequest.InvalidateCache, @@ -1127,8 +1100,8 @@ func (impl *CiHandlerImpl) GetCiPipeline(ciMaterialId int) (*pipelineConfig.CiPi return ciPipeline, nil } -func (impl *CiHandlerImpl) buildAutomaticTriggerCommitHashes(ciMaterials []*pipelineConfig.CiPipelineMaterial, request bean.GitCiTriggerRequest) (map[int]bean.GitCommit, error) { - commitHashes := map[int]bean.GitCommit{} +func (impl *CiHandlerImpl) buildAutomaticTriggerCommitHashes(ciMaterials []*pipelineConfig.CiPipelineMaterial, request bean.GitCiTriggerRequest) (map[int]pipelineConfig.GitCommit, error) { + commitHashes := map[int]pipelineConfig.GitCommit{} for _, ciMaterial := range ciMaterials { if ciMaterial.Id == request.CiPipelineMaterial.Id || len(ciMaterials) == 1 { request.CiPipelineMaterial.GitCommit = SetGitCommitValuesForBuildingCommitHash(ciMaterial, request.CiPipelineMaterial.GitCommit) @@ -1137,7 +1110,7 @@ func (impl *CiHandlerImpl) buildAutomaticTriggerCommitHashes(ciMaterials []*pipe // this is possible in case of non Webhook, as there would be only one pipeline material per git material in case of PR lastCommit, err := impl.getLastSeenCommit(ciMaterial.Id) if err != nil { - return map[int]bean.GitCommit{}, err + return map[int]pipelineConfig.GitCommit{}, err } lastCommit = SetGitCommitValuesForBuildingCommitHash(ciMaterial, lastCommit) commitHashes[ciMaterial.Id] = lastCommit @@ -1146,7 +1119,7 @@ func (impl *CiHandlerImpl) buildAutomaticTriggerCommitHashes(ciMaterials []*pipe return commitHashes, nil } -func SetGitCommitValuesForBuildingCommitHash(ciMaterial *pipelineConfig.CiPipelineMaterial, oldGitCommit bean.GitCommit) bean.GitCommit { +func SetGitCommitValuesForBuildingCommitHash(ciMaterial *pipelineConfig.CiPipelineMaterial, oldGitCommit pipelineConfig.GitCommit) pipelineConfig.GitCommit { newGitCommit := oldGitCommit newGitCommit.CiConfigureSourceType = ciMaterial.Type newGitCommit.CiConfigureSourceValue = ciMaterial.Value @@ -1155,15 +1128,15 @@ func SetGitCommitValuesForBuildingCommitHash(ciMaterial *pipelineConfig.CiPipeli return newGitCommit } -func (impl *CiHandlerImpl) buildManualTriggerCommitHashes(ciTriggerRequest bean.CiTriggerRequest) (map[int]bean.GitCommit, map[string]string, error) { - commitHashes := map[int]bean.GitCommit{} +func (impl *CiHandlerImpl) buildManualTriggerCommitHashes(ciTriggerRequest bean.CiTriggerRequest) (map[int]pipelineConfig.GitCommit, map[string]string, error) { + commitHashes := map[int]pipelineConfig.GitCommit{} extraEnvironmentVariables := make(map[string]string) for _, ciPipelineMaterial := range ciTriggerRequest.CiPipelineMaterial { pipeLineMaterialFromDb, err := impl.ciPipelineMaterialRepository.GetById(ciPipelineMaterial.Id) if err != nil { impl.Logger.Errorw("err in fetching pipeline material by id", "err", err) - return map[int]bean.GitCommit{}, nil, err + return map[int]pipelineConfig.GitCommit{}, nil, err } pipelineType := pipeLineMaterialFromDb.Type @@ -1171,7 +1144,7 @@ func (impl *CiHandlerImpl) buildManualTriggerCommitHashes(ciTriggerRequest bean. gitCommit, err := impl.BuildManualTriggerCommitHashesForSourceTypeBranchFix(ciPipelineMaterial, pipeLineMaterialFromDb) if err != nil { impl.Logger.Errorw("err", "err", err) - return map[int]bean.GitCommit{}, nil, err + return map[int]pipelineConfig.GitCommit{}, nil, err } commitHashes[ciPipelineMaterial.Id] = gitCommit @@ -1179,7 +1152,7 @@ func (impl *CiHandlerImpl) buildManualTriggerCommitHashes(ciTriggerRequest bean. gitCommit, extraEnvVariables, err := impl.BuildManualTriggerCommitHashesForSourceTypeWebhook(ciPipelineMaterial, pipeLineMaterialFromDb) if err != nil { impl.Logger.Errorw("err", "err", err) - return map[int]bean.GitCommit{}, nil, err + return map[int]pipelineConfig.GitCommit{}, nil, err } commitHashes[ciPipelineMaterial.Id] = gitCommit extraEnvironmentVariables = extraEnvVariables @@ -1189,7 +1162,7 @@ func (impl *CiHandlerImpl) buildManualTriggerCommitHashes(ciTriggerRequest bean. return commitHashes, extraEnvironmentVariables, nil } -func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeBranchFix(ciPipelineMaterial bean.CiPipelineMaterial, pipeLineMaterialFromDb *pipelineConfig.CiPipelineMaterial) (bean.GitCommit, error) { +func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeBranchFix(ciPipelineMaterial bean.CiPipelineMaterial, pipeLineMaterialFromDb *pipelineConfig.CiPipelineMaterial) (pipelineConfig.GitCommit, error) { commitMetadataRequest := &gitSensor.CommitMetadataRequest{ PipelineMaterialId: ciPipelineMaterial.Id, GitHash: ciPipelineMaterial.GitCommit.Commit, @@ -1198,13 +1171,13 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeBranchFix( gitCommitResponse, err := impl.gitSensorClient.GetCommitMetadataForPipelineMaterial(context.Background(), commitMetadataRequest) if err != nil { impl.Logger.Errorw("err in fetching commit metadata", "commitMetadataRequest", commitMetadataRequest, "err", err) - return bean.GitCommit{}, err + return pipelineConfig.GitCommit{}, err } if gitCommitResponse == nil { - return bean.GitCommit{}, errors.New("commit not found") + return pipelineConfig.GitCommit{}, errors.New("commit not found") } - gitCommit := bean.GitCommit{ + gitCommit := pipelineConfig.GitCommit{ Commit: gitCommitResponse.Commit, Author: gitCommitResponse.Author, Date: gitCommitResponse.Date, @@ -1219,7 +1192,7 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeBranchFix( return gitCommit, nil } -func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ciPipelineMaterial bean.CiPipelineMaterial, pipeLineMaterialFromDb *pipelineConfig.CiPipelineMaterial) (bean.GitCommit, map[string]string, error) { +func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ciPipelineMaterial bean.CiPipelineMaterial, pipeLineMaterialFromDb *pipelineConfig.CiPipelineMaterial) (pipelineConfig.GitCommit, map[string]string, error) { webhookDataInput := ciPipelineMaterial.GitCommit.WebhookData // fetch webhook data on the basis of Id @@ -1231,7 +1204,7 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ci webhookAndCiData, err := impl.gitSensorClient.GetWebhookData(context.Background(), webhookDataRequest) if err != nil { impl.Logger.Errorw("err", "err", err) - return bean.GitCommit{}, nil, err + return pipelineConfig.GitCommit{}, nil, err } webhookData := webhookAndCiData.WebhookData @@ -1242,7 +1215,7 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ci targetBranchName := webhookData.Data[bean.WEBHOOK_SELECTOR_TARGET_BRANCH_NAME_NAME] if targetBranchName == "" { impl.Logger.Error("target branch not found from webhook data") - return bean.GitCommit{}, nil, err + return pipelineConfig.GitCommit{}, nil, err } // get latest commit hash for target branch @@ -1255,7 +1228,7 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ci if err != nil { impl.Logger.Errorw("err", "err", err) - return bean.GitCommit{}, nil, err + return pipelineConfig.GitCommit{}, nil, err } // update webhookData (local) with target latest hash @@ -1264,12 +1237,12 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ci } // build git commit - gitCommit := bean.GitCommit{ + gitCommit := pipelineConfig.GitCommit{ GitRepoName: pipeLineMaterialFromDb.GitMaterial.Name[strings.Index(pipeLineMaterialFromDb.GitMaterial.Name, "-")+1:], GitRepoUrl: pipeLineMaterialFromDb.GitMaterial.Url, CiConfigureSourceValue: pipeLineMaterialFromDb.Value, CiConfigureSourceType: pipeLineMaterialFromDb.Type, - WebhookData: &bean.WebhookData{ + WebhookData: pipelineConfig.WebhookData{ Id: int(webhookData.Id), EventActionType: webhookData.EventActionType, Data: webhookData.Data, @@ -1279,7 +1252,7 @@ func (impl *CiHandlerImpl) BuildManualTriggerCommitHashesForSourceTypeWebhook(ci return gitCommit, webhookAndCiData.ExtraEnvironmentVariables, nil } -func (impl *CiHandlerImpl) getLastSeenCommit(ciMaterialId int) (bean.GitCommit, error) { +func (impl *CiHandlerImpl) getLastSeenCommit(ciMaterialId int) (pipelineConfig.GitCommit, error) { var materialIds []int materialIds = append(materialIds, ciMaterialId) headReq := &gitSensor.HeadRequest{ @@ -1287,12 +1260,12 @@ func (impl *CiHandlerImpl) getLastSeenCommit(ciMaterialId int) (bean.GitCommit, } res, err := impl.gitSensorClient.GetHeadForPipelineMaterials(context.Background(), headReq) if err != nil { - return bean.GitCommit{}, err + return pipelineConfig.GitCommit{}, err } if len(res) == 0 { - return bean.GitCommit{}, errors.New("received empty response") + return pipelineConfig.GitCommit{}, errors.New("received empty response") } - gitCommit := bean.GitCommit{ + gitCommit := pipelineConfig.GitCommit{ Commit: res[0].GitCommit.Commit, Author: res[0].GitCommit.Author, Date: res[0].GitCommit.Date, diff --git a/pkg/pipeline/CiService.go b/pkg/pipeline/CiService.go index 50a75b44b2..3eec03327d 100644 --- a/pkg/pipeline/CiService.go +++ b/pkg/pipeline/CiService.go @@ -247,31 +247,7 @@ func (impl *CiServiceImpl) WriteCITriggerEvent(trigger Trigger, pipeline *pipeli event := impl.eventFactory.Build(util2.Trigger, &pipeline.Id, pipeline.AppId, nil, util2.CI) material := &client.MaterialTriggerInfo{} - gitTriggers := make(map[int]pipelineConfig.GitCommit) - - for k, v := range trigger.CommitHashes { - gitCommit := pipelineConfig.GitCommit{ - Commit: v.Commit, - Author: v.Author, - Changes: v.Changes, - Message: v.Message, - Date: v.Date, - } - - // set webhook data in gitTriggers - _webhookData := v.WebhookData - if _webhookData != nil { - gitCommit.WebhookData = pipelineConfig.WebhookData{ - Id: _webhookData.Id, - EventActionType: _webhookData.EventActionType, - Data: _webhookData.Data, - } - } - - gitTriggers[k] = gitCommit - } - - material.GitTriggers = gitTriggers + material.GitTriggers = trigger.CommitHashes event.UserId = int(trigger.TriggeredBy) event.CiWorkflowRunnerId = workflowRequest.WorkflowId @@ -291,31 +267,7 @@ func (impl *CiServiceImpl) BuildPayload(trigger Trigger, pipeline *pipelineConfi } func (impl *CiServiceImpl) saveNewWorkflow(pipeline *pipelineConfig.CiPipeline, wfConfig *pipelineConfig.CiWorkflowConfig, - commitHashes map[int]bean.GitCommit, userId int32, EnvironmentId int, isJob bool, refCiWorkflowId int) (wf *pipelineConfig.CiWorkflow, error error) { - gitTriggers := make(map[int]pipelineConfig.GitCommit) - for k, v := range commitHashes { - gitCommit := pipelineConfig.GitCommit{ - Commit: v.Commit, - Author: v.Author, - Date: v.Date, - Message: v.Message, - Changes: v.Changes, - CiConfigureSourceValue: v.CiConfigureSourceValue, - CiConfigureSourceType: v.CiConfigureSourceType, - GitRepoUrl: v.GitRepoUrl, - GitRepoName: v.GitRepoName, - } - webhookData := v.WebhookData - if webhookData != nil { - gitCommit.WebhookData = pipelineConfig.WebhookData{ - Id: webhookData.Id, - EventActionType: webhookData.EventActionType, - Data: webhookData.Data, - } - } - - gitTriggers[k] = gitCommit - } + commitHashes map[int]pipelineConfig.GitCommit, userId int32, EnvironmentId int, isJob bool, refCiWorkflowId int) (wf *pipelineConfig.CiWorkflow, error error) { ciWorkflow := &pipelineConfig.CiWorkflow{ Name: pipeline.Name + "-" + strconv.Itoa(pipeline.Id), @@ -325,7 +277,7 @@ func (impl *CiServiceImpl) saveNewWorkflow(pipeline *pipelineConfig.CiPipeline, CiPipelineId: pipeline.Id, Namespace: impl.ciConfig.DefaultNamespace, BlobStorageEnabled: impl.ciConfig.BlobStorageEnabled, - GitTriggers: gitTriggers, + GitTriggers: commitHashes, LogLocation: "", TriggeredBy: userId, ReferenceCiWorkflowId: refCiWorkflowId, @@ -713,11 +665,11 @@ func buildCiStepsDataFromDockerBuildScripts(dockerBuildScripts []*bean.CiScript) return ciSteps } -func (impl *CiServiceImpl) buildImageTag(commitHashes map[int]bean.GitCommit, id int, wfId int) string { +func (impl *CiServiceImpl) buildImageTag(commitHashes map[int]pipelineConfig.GitCommit, id int, wfId int) string { dockerImageTag := "" for _, v := range commitHashes { _truncatedCommit := "" - if v.WebhookData == nil { + if v.WebhookData.Id == 0 { if v.Commit == "" { continue } From 449308b3e391fe7551c353df4af76264aa618b06 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 18:02:28 +0530 Subject: [PATCH 10/27] code refactoring around error logging --- .../pubsub/WorkflowStatusUpdateHandler.go | 14 ++++++++-- pkg/pipeline/CdHandler.go | 22 +++++++++------ pkg/pipeline/CiHandler.go | 28 ++++++++++++------- 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 255104bfe6..7539efd102 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -77,12 +77,19 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { impl.logger.Errorw("error while unmarshalling wf status update", "err", err, "msg", string(msg.Data)) return } - go impl.ciHandler.HandleReTriggerCI(wfStatus) + + err = impl.ciHandler.HandleReTriggerCI(wfStatus) + if err != nil { + impl.logger.Errorw("error in HandleReTriggerCI", "err", err) + //don't return as we have to update the workflow status + } + _, err = impl.ciHandler.UpdateWorkflow(wfStatus) if err != nil { impl.logger.Errorw("error on update workflow status", "err", err, "msg", string(msg.Data)) return } + } err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback) @@ -127,7 +134,10 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { } if wfrStatus == string(v1alpha1.NodeFailed) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { - impl.cdHandler.HandleCdStageReTrigger(wfr) + err = impl.cdHandler.HandleCdStageReTrigger(wfr) + if err != nil { + impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) + } } if wfr.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { event := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, util.CD) diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index a12d8c520b..cc2835260e 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -65,7 +65,7 @@ const ( ) type CdHandler interface { - HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) + HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) error UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineConfig.CdWorkflowWithArtifact, error) GetRunningWorkflowLogs(environmentId int, pipelineId int, workflowId int) (*bufio.Reader, func() error, error) @@ -171,42 +171,46 @@ const WorklowTypeDeploy = "DEPLOY" const WorklowTypePre = "PRE" const WorklowTypePost = "POST" -func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) { +func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) error { var err error // do not re-trigger if retries = 0 or last workflow is aborted if runner == nil || impl.cdConfig.MaxCdWorkflowRunnerRetries == 0 || runner.Status == WorkflowCancel { - return + return errors.New("cdStage workflow retry condition not met,not re-triggering") } if runner.RefCdWorkflowRunnerId != 0 { runner, err = impl.cdWorkflowRepository.FindWorkflowRunnerById(runner.RefCdWorkflowRunnerId) if err != nil { impl.Logger.Errorw("error in FindWorkflowRunnerById by id ", "err", err, "wfrId", runner.RefCdWorkflowRunnerId) - return + return err } } retryCnt, err := impl.cdWorkflowRepository.FindRetriedWorkflowCountByReferenceId(runner.Id) if err != nil { impl.Logger.Errorw("error in FindRetriedWorkflowCountByReferenceId ", "err", err, "cdWorkflowRunnerId", runner.Id) - return + return err } if retryCnt >= impl.cdConfig.MaxCdWorkflowRunnerRetries { impl.Logger.Debugw("maximum retries for this workflow are exhausted, not re-triggering again", "retries", retryCnt, "wfrId", runner.Id) - return + return errors.New("maximum retries for this workflow are exhausted") } if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), nil, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, false, runner.Id) if err != nil { - return + impl.Logger.Errorw("error in TriggerPreStage ", "err", err, "cdWorkflowRunnerId", runner.Id) + + return err } } else if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_POST { err = impl.workflowDagExecutor.TriggerPostStage(runner.CdWorkflow, runner.CdWorkflow.Pipeline, 1, runner.Id) if err != nil { - return + impl.Logger.Errorw("error in TriggerPostStage ", "err", err, "cdWorkflowRunnerId", runner.Id) + + return err } } - return + return nil } func (impl *CdHandlerImpl) CheckArgoAppStatusPeriodicallyAndUpdateInDb(getPipelineDeployedBeforeMinutes int, getPipelineDeployedWithinHours int) error { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 2a78c20327..987ada40c9 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -58,7 +58,7 @@ import ( type CiHandler interface { HandleCIWebhook(gitCiTriggerRequest bean.GitCiTriggerRequest) (int, error) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) - HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) + HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error FetchMaterialsByPipelineId(pipelineId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) FetchMaterialsByPipelineIdAndGitMaterialId(pipelineId int, gitMaterialId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) @@ -191,24 +191,27 @@ const Running = "Running" const Starting = "Starting" const POD_DELETED_MESSAGE = "pod deleted" -func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) { +func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error { if impl.ciConfig.MaxCiWorkflowRetries == 0 { - return + return errors.New("ci-workflow retires not allowed ") } status, message, retryCount, ciWorkFlow, err := impl.extractPodStatusAndWorkflow(workflowStatus) if err != nil { impl.Logger.Errorw("error in extractPodStatusAndWorkflow", "err", err) - return + return err } - impl.reTriggerCi(status, message, retryCount, ciWorkFlow) - + err = impl.reTriggerCi(status, message, retryCount, ciWorkFlow) + if err != nil { + impl.Logger.Errorw("error in reTriggerCi", "err", err, "status", status, "message", message, "retryCount", retryCount, "ciWorkFlowId", ciWorkFlow.Id) + } + return err } -func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) { +func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) error { if !(status == string(v1alpha1.NodeFailed) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { - return + return errors.New("ci-workflow retrigger condition not met, not re-triggering") } impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) @@ -226,8 +229,9 @@ func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, c if err != nil { impl.Logger.Errorw("error occured in retriggering ciWorkflow", "triggerDetails", trigger, "err", err) - return + return err } + return nil } func (impl *CiHandlerImpl) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) { @@ -1589,7 +1593,11 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id) continue } - impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) + err = impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) + if err != nil { + impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount) + continue + } } } else { ciWorkflow.Status = "Failed" From 6750c9155ad62f0ce9981ca880147885f7dd3eb8 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 18:15:18 +0530 Subject: [PATCH 11/27] review comments incorporation --- pkg/pipeline/CdHandler.go | 4 +--- pkg/pipeline/CiHandler.go | 19 ++++++++----------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index cc2835260e..4a4f1ed03c 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -191,7 +191,7 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf } if retryCnt >= impl.cdConfig.MaxCdWorkflowRunnerRetries { - impl.Logger.Debugw("maximum retries for this workflow are exhausted, not re-triggering again", "retries", retryCnt, "wfrId", runner.Id) + impl.Logger.Infow("maximum retries for this workflow are exhausted, not re-triggering again", "retries", retryCnt, "wfrId", runner.Id) return errors.New("maximum retries for this workflow are exhausted") } @@ -199,14 +199,12 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), nil, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, false, runner.Id) if err != nil { impl.Logger.Errorw("error in TriggerPreStage ", "err", err, "cdWorkflowRunnerId", runner.Id) - return err } } else if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_POST { err = impl.workflowDagExecutor.TriggerPostStage(runner.CdWorkflow, runner.CdWorkflow.Pipeline, 1, runner.Id) if err != nil { impl.Logger.Errorw("error in TriggerPostStage ", "err", err, "cdWorkflowRunnerId", runner.Id) - return err } } diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 987ada40c9..9d8d084167 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -31,6 +31,7 @@ import ( appGroup2 "github.com/devtron-labs/devtron/pkg/appGroup" "github.com/devtron-labs/devtron/pkg/cluster" repository3 "github.com/devtron-labs/devtron/pkg/cluster/repository" + k8s2 "github.com/devtron-labs/devtron/pkg/k8s" "github.com/devtron-labs/devtron/util/k8s" "github.com/devtron-labs/devtron/util/rbac" "io/ioutil" @@ -105,9 +106,10 @@ type CiHandlerImpl struct { appGroupService appGroup2.AppGroupService envRepository repository3.EnvironmentRepository imageTaggingService ImageTaggingService + k8sCommonService k8s2.K8sCommonService } -func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository, workflowService WorkflowService, ciLogService CiLogService, ciConfig *CiConfig, ciArtifactRepository repository.CiArtifactRepository, userService user.UserService, eventClient client.EventClient, eventFactory client.EventFactory, ciPipelineRepository pipelineConfig.CiPipelineRepository, appListingRepository repository.AppListingRepository, K8sUtil *k8s.K8sUtil, cdPipelineRepository pipelineConfig.PipelineRepository, enforcerUtil rbac.EnforcerUtil, appGroupService appGroup2.AppGroupService, envRepository repository3.EnvironmentRepository, imageTaggingService ImageTaggingService) *CiHandlerImpl { +func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository, workflowService WorkflowService, ciLogService CiLogService, ciConfig *CiConfig, ciArtifactRepository repository.CiArtifactRepository, userService user.UserService, eventClient client.EventClient, eventFactory client.EventFactory, ciPipelineRepository pipelineConfig.CiPipelineRepository, appListingRepository repository.AppListingRepository, K8sUtil *k8s.K8sUtil, cdPipelineRepository pipelineConfig.PipelineRepository, enforcerUtil rbac.EnforcerUtil, appGroupService appGroup2.AppGroupService, envRepository repository3.EnvironmentRepository, imageTaggingService ImageTaggingService, k8sCommonService k8s2.K8sCommonService) *CiHandlerImpl { return &CiHandlerImpl{ Logger: Logger, ciService: ciService, @@ -129,6 +131,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline appGroupService: appGroupService, envRepository: envRepository, imageTaggingService: imageTaggingService, + k8sCommonService: k8sCommonService, } } @@ -1551,15 +1554,9 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil if !isEligibleToMarkFailed { ns := DefaultCiWorkflowNamespace if isExt { - clusterConfig, err := cluster.GetClusterBean(*env.Cluster).GetClusterConfig() + _, client, err = impl.k8sCommonService.GetCoreClientByClusterId(env.ClusterId) if err != nil { - impl.Logger.Warnw("error in getting cluster Config", "err", err, "clusterId", env.Cluster.Id) - continue - } - - client, err = impl.K8sUtil.GetCoreV1Client(clusterConfig) - if err != nil { - impl.Logger.Warnw("error in getting core v1 client using clusterConfig", "err", err, "clusterId", env.Cluster.Id) + impl.Logger.Warnw("error in getting core v1 client using GetCoreClientByClusterId", "err", err, "clusterId", env.Cluster.Id) continue } ns = env.Namespace @@ -1590,12 +1587,12 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil if ciWorkflow.Status != WorkflowCancel { retryCount, refCiWorkflow, err := impl.getRefWorkflowAndCiRetryCount(ciWorkflow) if err != nil { - impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id) + impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id, "err", err) continue } err = impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) if err != nil { - impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount) + impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount, "err", err) continue } } From e63adeb8bef73875c80bf594f182c18af205b029 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 18:16:48 +0530 Subject: [PATCH 12/27] wire --- wire_gen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wire_gen.go b/wire_gen.go index b155447916..6757fe8a25 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -476,7 +476,7 @@ func InitializeApp() (*App, error) { if err != nil { return nil, err } - ciHandlerImpl := pipeline.NewCiHandlerImpl(sugaredLogger, ciServiceImpl, ciPipelineMaterialRepositoryImpl, clientImpl, ciWorkflowRepositoryImpl, workflowServiceImpl, ciLogServiceImpl, ciConfig, ciArtifactRepositoryImpl, userServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciPipelineRepositoryImpl, appListingRepositoryImpl, k8sUtil, pipelineRepositoryImpl, enforcerUtilImpl, appGroupServiceImpl, environmentRepositoryImpl, imageTaggingServiceImpl) + ciHandlerImpl := pipeline.NewCiHandlerImpl(sugaredLogger, ciServiceImpl, ciPipelineMaterialRepositoryImpl, clientImpl, ciWorkflowRepositoryImpl, workflowServiceImpl, ciLogServiceImpl, ciConfig, ciArtifactRepositoryImpl, userServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciPipelineRepositoryImpl, appListingRepositoryImpl, k8sUtil, pipelineRepositoryImpl, enforcerUtilImpl, appGroupServiceImpl, environmentRepositoryImpl, imageTaggingServiceImpl, k8sCommonServiceImpl) gitRegistryConfigImpl := pipeline.NewGitRegistryConfigImpl(sugaredLogger, gitProviderRepositoryImpl, clientImpl) dockerRegistryConfigImpl := pipeline.NewDockerRegistryConfigImpl(sugaredLogger, helmAppServiceImpl, dockerArtifactStoreRepositoryImpl, dockerRegistryIpsConfigRepositoryImpl, ociRegistryConfigRepositoryImpl) appListingViewBuilderImpl := app2.NewAppListingViewBuilderImpl(sugaredLogger) From 71d6b0d6dd2254928e216d2819e5dc748d6a015a Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 19:03:00 +0530 Subject: [PATCH 13/27] script number update --- .../{169_workflow_retry.down.sql => 171_workflow_retry.down.sql} | 0 .../sql/{169_workflow_retry.up.sql => 171_workflow_retry.up.sql} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename scripts/sql/{169_workflow_retry.down.sql => 171_workflow_retry.down.sql} (100%) rename scripts/sql/{169_workflow_retry.up.sql => 171_workflow_retry.up.sql} (100%) diff --git a/scripts/sql/169_workflow_retry.down.sql b/scripts/sql/171_workflow_retry.down.sql similarity index 100% rename from scripts/sql/169_workflow_retry.down.sql rename to scripts/sql/171_workflow_retry.down.sql diff --git a/scripts/sql/169_workflow_retry.up.sql b/scripts/sql/171_workflow_retry.up.sql similarity index 100% rename from scripts/sql/169_workflow_retry.up.sql rename to scripts/sql/171_workflow_retry.up.sql From eb8137b8970ba2904a077f2530d2bde40eb6ae18 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 19:08:40 +0530 Subject: [PATCH 14/27] fix --- internal/sql/repository/pipelineConfig/CiWorkflowRepository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index be13d712c8..77c37f8960 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -98,7 +98,7 @@ type WorkflowWithArtifact struct { IsArtifactUploaded bool `json:"is_artifact_uploaded"` EnvironmentId int `json:"environmentId"` EnvironmentName string `json:"environmentName"` - ReferenceCiWorkflowId int `json:"reference_ci_workflow_id"` + ReferenceCiWorkflowId int `json:"ref_ci_workflow_id"` } type GitCommit struct { From 19c02392c6a3f40212f8fd5e6b34b11fd6952b28 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 18 Sep 2023 19:39:51 +0530 Subject: [PATCH 15/27] json key change --- .../pipelineConfig/CiWorkflowRepository.go | 46 +++++++++---------- pkg/pipeline/CiHandler.go | 2 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index 77c37f8960..feeb9ef94b 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -76,29 +76,29 @@ type CiWorkflow struct { } type WorkflowWithArtifact struct { - Id int `json:"id"` - Name string `json:"name"` - PodName string `json:"podName"` - Status string `json:"status"` - PodStatus string `json:"pod_status"` - Message string `json:"message"` - StartedOn time.Time `json:"started_on"` - FinishedOn time.Time `json:"finished_on"` - CiPipelineId int `json:"ci_pipeline_id"` - Namespace string `json:"namespace"` - LogFilePath string `json:"log_file_path"` - GitTriggers map[int]GitCommit `json:"git_triggers"` - TriggeredBy int32 `json:"triggered_by"` - EmailId string `json:"email_id"` - Image string `json:"image"` - CiArtifactLocation string `json:"ci_artifact_location"` - CiArtifactId int `json:"ci_artifact_d"` - BlobStorageEnabled bool `json:"blobStorageEnabled"` - CiBuildType string `json:"ci_build_type"` - IsArtifactUploaded bool `json:"is_artifact_uploaded"` - EnvironmentId int `json:"environmentId"` - EnvironmentName string `json:"environmentName"` - ReferenceCiWorkflowId int `json:"ref_ci_workflow_id"` + Id int `json:"id"` + Name string `json:"name"` + PodName string `json:"podName"` + Status string `json:"status"` + PodStatus string `json:"pod_status"` + Message string `json:"message"` + StartedOn time.Time `json:"started_on"` + FinishedOn time.Time `json:"finished_on"` + CiPipelineId int `json:"ci_pipeline_id"` + Namespace string `json:"namespace"` + LogFilePath string `json:"log_file_path"` + GitTriggers map[int]GitCommit `json:"git_triggers"` + TriggeredBy int32 `json:"triggered_by"` + EmailId string `json:"email_id"` + Image string `json:"image"` + CiArtifactLocation string `json:"ci_artifact_location"` + CiArtifactId int `json:"ci_artifact_d"` + BlobStorageEnabled bool `json:"blobStorageEnabled"` + CiBuildType string `json:"ci_build_type"` + IsArtifactUploaded bool `json:"is_artifact_uploaded"` + EnvironmentId int `json:"environmentId"` + EnvironmentName string `json:"environmentName"` + RefCiWorkflowId int `json:"referenceCiWorkflowId"` } type GitCommit struct { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 9d8d084167..692c5b761a 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -550,7 +550,7 @@ func (impl *CiHandlerImpl) GetBuildHistory(pipelineId int, appId int, offset int IsArtifactUploaded: w.IsArtifactUploaded, EnvironmentId: w.EnvironmentId, EnvironmentName: w.EnvironmentName, - ReferenceWorkflowId: w.ReferenceCiWorkflowId, + ReferenceWorkflowId: w.RefCiWorkflowId, } if imageTagsDataMap[w.CiArtifactId] != nil { wfResponse.ImageReleaseTags = imageTagsDataMap[w.CiArtifactId] //if artifact is not yet created,empty list will be sent From a062619a8b1a3da83be310e92ebd99909a2ab37f Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 19 Sep 2023 13:32:12 +0530 Subject: [PATCH 16/27] sending reference cd workflow runner id in deployment history api --- .../pipelineConfig/CdWorfkflowRepository.go | 49 ++++++++++--------- pkg/pipeline/CdHandler.go | 2 +- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go index ffac320e2d..1ced35f370 100644 --- a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go @@ -185,30 +185,31 @@ type CiPipelineMaterialResponse struct { } type CdWorkflowWithArtifact struct { - Id int `json:"id"` - CdWorkflowId int `json:"cd_workflow_id"` - Name string `json:"name"` - Status string `json:"status"` - PodStatus string `json:"pod_status"` - Message string `json:"message"` - StartedOn time.Time `json:"started_on"` - FinishedOn time.Time `json:"finished_on"` - PipelineId int `json:"pipeline_id"` - Namespace string `json:"namespace"` - LogFilePath string `json:"log_file_path"` - TriggeredBy int32 `json:"triggered_by"` - EmailId string `json:"email_id"` - Image string `json:"image"` - MaterialInfo string `json:"material_info,omitempty"` - DataSource string `json:"data_source,omitempty"` - CiArtifactId int `json:"ci_artifact_id,omitempty"` - WorkflowType string `json:"workflow_type,omitempty"` - ExecutorType string `json:"executor_type,omitempty"` - BlobStorageEnabled bool `json:"blobStorageEnabled"` - GitTriggers map[int]GitCommit `json:"gitTriggers"` - CiMaterials []CiPipelineMaterialResponse `json:"ciMaterials"` - ImageReleaseTags []*repository2.ImageTag `json:"imageReleaseTags"` - ImageComment *repository2.ImageComment `json:"imageComment"` + Id int `json:"id"` + CdWorkflowId int `json:"cd_workflow_id"` + Name string `json:"name"` + Status string `json:"status"` + PodStatus string `json:"pod_status"` + Message string `json:"message"` + StartedOn time.Time `json:"started_on"` + FinishedOn time.Time `json:"finished_on"` + PipelineId int `json:"pipeline_id"` + Namespace string `json:"namespace"` + LogFilePath string `json:"log_file_path"` + TriggeredBy int32 `json:"triggered_by"` + EmailId string `json:"email_id"` + Image string `json:"image"` + MaterialInfo string `json:"material_info,omitempty"` + DataSource string `json:"data_source,omitempty"` + CiArtifactId int `json:"ci_artifact_id,omitempty"` + WorkflowType string `json:"workflow_type,omitempty"` + ExecutorType string `json:"executor_type,omitempty"` + BlobStorageEnabled bool `json:"blobStorageEnabled"` + GitTriggers map[int]GitCommit `json:"gitTriggers"` + CiMaterials []CiPipelineMaterialResponse `json:"ciMaterials"` + ImageReleaseTags []*repository2.ImageTag `json:"imageReleaseTags"` + ImageComment *repository2.ImageComment `json:"imageComment"` + RefCdWorkflowRunnerId int `json:"referenceCdWorkflowRunnerId"` } type TriggerWorkflowStatus struct { diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 4a4f1ed03c..5920e5b632 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -1143,7 +1143,7 @@ func (impl *CdHandlerImpl) converterWFR(wfr pipelineConfig.CdWorkflowRunner) pip workflow.PipelineId = wfr.CdWorkflow.PipelineId workflow.CiArtifactId = wfr.CdWorkflow.CiArtifactId workflow.BlobStorageEnabled = wfr.BlobStorageEnabled - + workflow.RefCdWorkflowRunnerId = wfr.RefCdWorkflowRunnerId } return workflow } From d23278ec6b7603af64f572e0ab518b4c343c7e28 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 26 Sep 2023 15:50:44 +0530 Subject: [PATCH 17/27] phase fix --- api/router/pubsub/WorkflowStatusUpdateHandler.go | 2 +- pkg/pipeline/CiHandler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 7539efd102..38459bf03a 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -133,7 +133,7 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { eventType = util.Fail } - if wfrStatus == string(v1alpha1.NodeFailed) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { + if wfrStatus == string(v1alpha1.NodeError) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { err = impl.cdHandler.HandleCdStageReTrigger(wfr) if err != nil { impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 692c5b761a..7428fbf666 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -213,7 +213,7 @@ func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowSta } func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) error { - if !(status == string(v1alpha1.NodeFailed) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { + if !(status == string(v1alpha1.NodeError) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.ciConfig.MaxCiWorkflowRetries) { return errors.New("ci-workflow retrigger condition not met, not re-triggering") } impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) From 6956b0e42166dd4030f5bdf559c8f3ca455c04b6 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 3 Oct 2023 11:48:42 +0530 Subject: [PATCH 18/27] script number update --- pkg/pipeline/CdConfig.go | 1 - pkg/pipeline/CiConfig.go | 0 .../{171_workflow_retry.down.sql => 177_workflow_retry.down.sql} | 0 .../sql/{171_workflow_retry.up.sql => 177_workflow_retry.up.sql} | 0 4 files changed, 1 deletion(-) delete mode 100644 pkg/pipeline/CdConfig.go delete mode 100644 pkg/pipeline/CiConfig.go rename scripts/sql/{171_workflow_retry.down.sql => 177_workflow_retry.down.sql} (100%) rename scripts/sql/{171_workflow_retry.up.sql => 177_workflow_retry.up.sql} (100%) diff --git a/pkg/pipeline/CdConfig.go b/pkg/pipeline/CdConfig.go deleted file mode 100644 index 8b13789179..0000000000 --- a/pkg/pipeline/CdConfig.go +++ /dev/null @@ -1 +0,0 @@ - diff --git a/pkg/pipeline/CiConfig.go b/pkg/pipeline/CiConfig.go deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/scripts/sql/171_workflow_retry.down.sql b/scripts/sql/177_workflow_retry.down.sql similarity index 100% rename from scripts/sql/171_workflow_retry.down.sql rename to scripts/sql/177_workflow_retry.down.sql diff --git a/scripts/sql/171_workflow_retry.up.sql b/scripts/sql/177_workflow_retry.up.sql similarity index 100% rename from scripts/sql/171_workflow_retry.up.sql rename to scripts/sql/177_workflow_retry.up.sql From 948a416784ed134f5f53e378eefbbf6114c10d26 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Wed, 4 Oct 2023 12:17:03 +0530 Subject: [PATCH 19/27] fix --- .../repository/pipelineConfig/CiPipelineMaterial.go | 13 +++++++++++++ pkg/pipeline/CiHandler.go | 10 +++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go b/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go index 65ade20d6a..9576d9fd4a 100644 --- a/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go +++ b/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go @@ -50,6 +50,7 @@ type CiPipelineMaterialRepository interface { UpdateNotNull(tx *pg.Tx, material ...*CiPipelineMaterial) error FindByCiPipelineIdsIn(ids []int) ([]*CiPipelineMaterial, error) GetById(id int) (*CiPipelineMaterial, error) + GetByIdsIncludeDeleted(ids []int) ([]*CiPipelineMaterial, error) GetByPipelineId(id int) ([]*CiPipelineMaterial, error) GetRegexByPipelineId(id int) ([]*CiPipelineMaterial, error) CheckRegexExistsForMaterial(id int) bool @@ -80,6 +81,18 @@ func (impl CiPipelineMaterialRepositoryImpl) GetById(id int) (*CiPipelineMateria return ciPipelineMaterial, err } +func (impl CiPipelineMaterialRepositoryImpl) GetByIdsIncludeDeleted(ids []int) ([]*CiPipelineMaterial, error) { + var ciPipelineMaterials []*CiPipelineMaterial + if len(ids) == 0 { + return ciPipelineMaterials, nil + } + err := impl.dbConnection.Model(&ciPipelineMaterials). + Column("ci_pipeline_material.*", "CiPipeline", "GitMaterial"). + Where("ci_pipeline_material.id in (?)", pg.In(ids)). + Select() + return ciPipelineMaterials, err +} + func (impl CiPipelineMaterialRepositoryImpl) GetByPipelineId(id int) ([]*CiPipelineMaterial, error) { var ciPipelineMaterials []*CiPipelineMaterial err := impl.dbConnection.Model(&ciPipelineMaterials). diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 412ab053b4..9437f3a683 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -227,18 +227,22 @@ func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, c return errors.New("ci-workflow retrigger condition not met, not re-triggering") } impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) - + ciPipelineMaterialIds := make([]int, 0, len(ciWorkFlow.GitTriggers)) + for id, _ := range ciWorkFlow.GitTriggers { + ciPipelineMaterialIds = append(ciPipelineMaterialIds, id) + } + ciMaterials, err := impl.ciPipelineMaterialRepository.GetByIdsIncludeDeleted(ciPipelineMaterialIds) trigger := Trigger{ PipelineId: ciWorkFlow.CiPipelineId, CommitHashes: ciWorkFlow.GitTriggers, - CiMaterials: nil, + CiMaterials: ciMaterials, TriggeredBy: 1, InvalidateCache: true, //TODO: ciTriggerRequest.InvalidateCache, //TODO: ExtraEnvironmentVariables: extraEnvironmentVariables, EnvironmentId: ciWorkFlow.EnvironmentId, ReferenceCiWorkflowId: ciWorkFlow.Id, } - _, err := impl.ciService.TriggerCiPipeline(trigger) + _, err = impl.ciService.TriggerCiPipeline(trigger) if err != nil { impl.Logger.Errorw("error occured in retriggering ciWorkflow", "triggerDetails", trigger, "err", err) From efa74dc9bcf67a10309a1e8e5c8ce8d52b33bab1 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Wed, 4 Oct 2023 19:00:50 +0530 Subject: [PATCH 20/27] added missing reference workflow_runner_id --- pkg/pipeline/CdHandler.go | 1 + pkg/pipeline/WorkflowDagExecutor.go | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 68be575d9f..ee328c045d 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -176,6 +176,7 @@ const WorklowTypePre = "PRE" const WorklowTypePost = "POST" func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) error { + impl.Logger.Infow("HandleCdStageReTrigger", "runnerId", runner.Id) var err error // do not re-trigger if retries = 0 or last workflow is aborted if runner == nil || impl.config.MaxCdWorkflowRunnerRetries == 0 || runner.Status == WorkflowCancel { diff --git a/pkg/pipeline/WorkflowDagExecutor.go b/pkg/pipeline/WorkflowDagExecutor.go index 3a9192e1a5..316d4b2337 100644 --- a/pkg/pipeline/WorkflowDagExecutor.go +++ b/pkg/pipeline/WorkflowDagExecutor.go @@ -519,17 +519,18 @@ func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf * } cdWorkflowExecutorType := impl.config.GetWorkflowExecutorType() runner := &pipelineConfig.CdWorkflowRunner{ - Name: pipeline.Name, - WorkflowType: bean.CD_WORKFLOW_TYPE_PRE, - ExecutorType: cdWorkflowExecutorType, - Status: pipelineConfig.WorkflowStarting, //starting - TriggeredBy: triggeredBy, - StartedOn: triggeredAt, - Namespace: impl.config.GetDefaultNamespace(), - BlobStorageEnabled: impl.config.BlobStorageEnabled, - CdWorkflowId: cdWf.Id, - LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.config.GetDefaultBuildLogsKeyPrefix(), strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_PRE), pipeline.Name), - AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: 1, UpdatedOn: triggeredAt, UpdatedBy: 1}, + Name: pipeline.Name, + WorkflowType: bean.CD_WORKFLOW_TYPE_PRE, + ExecutorType: cdWorkflowExecutorType, + Status: pipelineConfig.WorkflowStarting, //starting + TriggeredBy: triggeredBy, + StartedOn: triggeredAt, + Namespace: impl.config.GetDefaultNamespace(), + BlobStorageEnabled: impl.config.BlobStorageEnabled, + CdWorkflowId: cdWf.Id, + LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.config.GetDefaultBuildLogsKeyPrefix(), strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_PRE), pipeline.Name), + AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: 1, UpdatedOn: triggeredAt, UpdatedBy: 1}, + RefCdWorkflowRunnerId: refCdWorkflowRunnerId, } var env *repository2.Environment if pipeline.RunPreStageInEnv { From 2a09fa969a115c041a2a667613b949a929a8ef3e Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Thu, 5 Oct 2023 11:32:14 +0530 Subject: [PATCH 21/27] query fix --- internal/sql/repository/pipelineConfig/CiPipelineMaterial.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go b/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go index 9576d9fd4a..2770bbe21e 100644 --- a/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go +++ b/internal/sql/repository/pipelineConfig/CiPipelineMaterial.go @@ -87,7 +87,7 @@ func (impl CiPipelineMaterialRepositoryImpl) GetByIdsIncludeDeleted(ids []int) ( return ciPipelineMaterials, nil } err := impl.dbConnection.Model(&ciPipelineMaterials). - Column("ci_pipeline_material.*", "CiPipeline", "GitMaterial"). + Column("ci_pipeline_material.*", "CiPipeline", "CiPipeline.CiTemplate", "CiPipeline.CiTemplate.GitMaterial", "CiPipeline.App", "CiPipeline.CiTemplate.DockerRegistry", "CiPipeline.CiTemplate.CiBuildConfig", "GitMaterial", "GitMaterial.GitProvider"). Where("ci_pipeline_material.id in (?)", pg.In(ids)). Select() return ciPipelineMaterials, err From 10e67f292cbb117bb49a517b6d4ddd23a5997801 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Thu, 5 Oct 2023 13:32:39 +0530 Subject: [PATCH 22/27] debug logs added --- pkg/pipeline/CiHandler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 9437f3a683..909a0d4672 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -226,7 +226,7 @@ func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, c if !(status == string(v1alpha1.NodeError) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.config.MaxCiWorkflowRetries) { return errors.New("ci-workflow retrigger condition not met, not re-triggering") } - impl.Logger.Debugw("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) + impl.Logger.Infow("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) ciPipelineMaterialIds := make([]int, 0, len(ciWorkFlow.GitTriggers)) for id, _ := range ciWorkFlow.GitTriggers { ciPipelineMaterialIds = append(ciPipelineMaterialIds, id) @@ -1619,6 +1619,7 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id, "err", err) continue } + impl.Logger.Infow("re-triggering ci by UpdateCiWorkflowStatusFailedCron", "refCiWorkflowId", refCiWorkflow.Id, "ciWorkflow.Status", ciWorkflow.Status, "ciWorkflow.Message", ciWorkflow.Message, "retryCount", retryCount) err = impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) if err != nil { impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount, "err", err) From 2545a4fb0f0f74a1d6853b0480948f7ca6ca14d1 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Fri, 6 Oct 2023 13:29:55 +0530 Subject: [PATCH 23/27] fix --- api/router/pubsub/WorkflowStatusUpdateHandler.go | 2 +- pkg/pipeline/CiHandler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 38459bf03a..5f6b2c58a7 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -133,7 +133,7 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { eventType = util.Fail } - if wfrStatus == string(v1alpha1.NodeError) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { + if (wfrStatus == string(v1alpha1.NodeError) || wfrStatus == string(v1alpha1.NodeFailed)) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { err = impl.cdHandler.HandleCdStageReTrigger(wfr) if err != nil { impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 909a0d4672..7bafdbf20d 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -223,7 +223,7 @@ func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowSta } func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) error { - if !(status == string(v1alpha1.NodeError) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.config.MaxCiWorkflowRetries) { + if !((status == string(v1alpha1.NodeError) || status == string(v1alpha1.NodeFailed)) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.config.MaxCiWorkflowRetries) { return errors.New("ci-workflow retrigger condition not met, not re-triggering") } impl.Logger.Infow("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) From 0be42462c2a4bda7b1cccc7b59f145f55cf823b6 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Fri, 6 Oct 2023 16:27:11 +0530 Subject: [PATCH 24/27] log added --- api/router/pubsub/WorkflowStatusUpdateHandler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 5f6b2c58a7..f37b28f59a 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -113,7 +113,7 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { impl.logger.Debugw("received cd wf update request body", "body", wfStatus) wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus) - impl.logger.Debug(wfrId) + impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus) if err != nil { impl.logger.Error("err", err) return @@ -124,8 +124,7 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { impl.logger.Errorw("could not get wf runner", "err", err) return } - if wfrStatus == string(v1alpha1.NodeSucceeded) || - wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { + if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { eventType := util.EventType(0) if wfrStatus == string(v1alpha1.NodeSucceeded) { eventType = util.Success From 8f3ac0929ea78c4c7f188d5a029229636ec499b0 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Fri, 6 Oct 2023 19:13:23 +0530 Subject: [PATCH 25/27] refactor --- api/router/pubsub/WorkflowStatusUpdateHandler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index f37b28f59a..3fc01386b8 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -132,12 +132,13 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { eventType = util.Fail } - if (wfrStatus == string(v1alpha1.NodeError) || wfrStatus == string(v1alpha1.NodeFailed)) && wfStatus.Message == pipeline.POD_DELETED_MESSAGE { + if (wfrStatus == string(v1alpha1.NodeError) || wfrStatus == string(v1alpha1.NodeFailed)) && (wfStatus.Message == pipeline.POD_DELETED_MESSAGE) { err = impl.cdHandler.HandleCdStageReTrigger(wfr) if err != nil { impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) } } + if wfr.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { event := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, util.CD) impl.logger.Debugw("event pre stage", "event", event) From 27758f288d15ae217b5e452a3728c01f993ed7eb Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Mon, 9 Oct 2023 14:05:44 +0530 Subject: [PATCH 26/27] select iid instead of * --- internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go index 1ced35f370..3447ed2b63 100644 --- a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go @@ -492,7 +492,7 @@ func (impl *CdWorkflowRepositoryImpl) FindWorkflowRunnerById(wfrId int) (*CdWork func (impl *CdWorkflowRepositoryImpl) FindRetriedWorkflowCountByReferenceId(wfrId int) (int, error) { retryCount := 0 - query := fmt.Sprintf("select count(*) "+ + query := fmt.Sprintf("select count(id) "+ "from cd_workflow_runner where ref_cd_workflow_runner_id = %v", wfrId) _, err := impl.dbConnection.Query(&retryCount, query) From 13a0c69b08e403f672bf1a4f08ed9de228249685 Mon Sep 17 00:00:00 2001 From: Gireesh Naidu Date: Tue, 10 Oct 2023 15:28:07 +0530 Subject: [PATCH 27/27] refactoring code review changes --- .../pubsub/WorkflowStatusUpdateHandler.go | 7 +- pkg/pipeline/CdHandler.go | 19 ++- pkg/pipeline/CiCdConfig.go | 11 ++ pkg/pipeline/CiHandler.go | 129 +++++++++++------- pkg/pipeline/WorkflowUtils.go | 6 + 5 files changed, 112 insertions(+), 60 deletions(-) diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 3fc01386b8..0d1bbf9ec0 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -78,9 +78,9 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { return } - err = impl.ciHandler.HandleReTriggerCI(wfStatus) + err = impl.ciHandler.CheckAndReTriggerCI(wfStatus) if err != nil { - impl.logger.Errorw("error in HandleReTriggerCI", "err", err) + impl.logger.Errorw("error in checking and re triggering ci", "err", err) //don't return as we have to update the workflow status } @@ -132,9 +132,10 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { eventType = util.Fail } - if (wfrStatus == string(v1alpha1.NodeError) || wfrStatus == string(v1alpha1.NodeFailed)) && (wfStatus.Message == pipeline.POD_DELETED_MESSAGE) { + if wfr != nil && pipeline.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) { err = impl.cdHandler.HandleCdStageReTrigger(wfr) if err != nil { + //check if this log required or not impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) } } diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 4c5f03c155..583b5fdf21 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -176,12 +176,15 @@ const WorklowTypePre = "PRE" const WorklowTypePost = "POST" func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkflowRunner) error { - impl.Logger.Infow("HandleCdStageReTrigger", "runnerId", runner.Id) - var err error - // do not re-trigger if retries = 0 or last workflow is aborted - if runner == nil || impl.config.MaxCdWorkflowRunnerRetries == 0 || runner.Status == WorkflowCancel { - return errors.New("cdStage workflow retry condition not met,not re-triggering") + // do not re-trigger if retries = 0 + if !impl.config.WorkflowRetriesEnabled() { + impl.Logger.Debugw("cd stage workflow re-triggering is not enabled") + return nil } + + impl.Logger.Infow("re triggering cd stage ", "runnerId", runner.Id) + var err error + //add comment for this logic if runner.RefCdWorkflowRunnerId != 0 { runner, err = impl.cdWorkflowRepository.FindWorkflowRunnerById(runner.RefCdWorkflowRunnerId) if err != nil { @@ -197,11 +200,11 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf if retryCnt >= impl.config.MaxCdWorkflowRunnerRetries { impl.Logger.Infow("maximum retries for this workflow are exhausted, not re-triggering again", "retries", retryCnt, "wfrId", runner.Id) - return errors.New("maximum retries for this workflow are exhausted") + return nil } if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { - err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), nil, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, false, runner.Id) + err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), runner.CdWorkflow, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, false, runner.Id) if err != nil { impl.Logger.Errorw("error in TriggerPreStage ", "err", err, "cdWorkflowRunnerId", runner.Id) return err @@ -213,6 +216,8 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf return err } } + + impl.Logger.Infow("cd stage re triggered for runner", "runnerId", runner.Id) return nil } diff --git a/pkg/pipeline/CiCdConfig.go b/pkg/pipeline/CiCdConfig.go index 3b0e0032eb..af9d54d35c 100644 --- a/pkg/pipeline/CiCdConfig.go +++ b/pkg/pipeline/CiCdConfig.go @@ -409,3 +409,14 @@ func (impl *CiCdConfig) GetWorkflowExecutorType() pipelineConfig.WorkflowExecuto return "" } } + +func (impl *CiCdConfig) WorkflowRetriesEnabled() bool { + switch impl.Type { + case CiConfigType: + return impl.MaxCiWorkflowRetries > 0 + case CdConfigType: + return impl.MaxCdWorkflowRunnerRetries > 0 + default: + return false + } +} diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 7bafdbf20d..27ea59b015 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -60,8 +60,7 @@ import ( type CiHandler interface { HandleCIWebhook(gitCiTriggerRequest bean.GitCiTriggerRequest) (int, error) HandleCIManual(ciTriggerRequest bean.CiTriggerRequest) (int, error) - HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error - + CheckAndReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error FetchMaterialsByPipelineId(pipelineId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) FetchMaterialsByPipelineIdAndGitMaterialId(pipelineId int, gitMaterialId int, showAll bool) ([]pipelineConfig.CiPipelineMaterialResponse, error) FetchWorkflowDetails(appId int, pipelineId int, buildId int) (WorkflowResponse, error) @@ -198,54 +197,84 @@ type Trigger struct { ReferenceCiWorkflowId int } +func (obj Trigger) BuildTriggerObject(refCiWorkflow *pipelineConfig.CiWorkflow, + ciMaterials []*pipelineConfig.CiPipelineMaterial, triggeredBy int32, + invalidateCache bool, extraEnvironmentVariables map[string]string, + pipelineType string) { + + obj.PipelineId = refCiWorkflow.CiPipelineId + obj.CommitHashes = refCiWorkflow.GitTriggers + obj.CiMaterials = ciMaterials + obj.TriggeredBy = triggeredBy + obj.InvalidateCache = invalidateCache + obj.EnvironmentId = refCiWorkflow.EnvironmentId + obj.ReferenceCiWorkflowId = refCiWorkflow.Id + obj.InvalidateCache = invalidateCache + obj.ExtraEnvironmentVariables = extraEnvironmentVariables + obj.PipelineType = pipelineType + +} + const WorkflowCancel = "CANCELLED" const DefaultCiWorkflowNamespace = "devtron-ci" const Running = "Running" const Starting = "Starting" const POD_DELETED_MESSAGE = "pod deleted" -func (impl *CiHandlerImpl) HandleReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error { +func (impl *CiHandlerImpl) CheckAndReTriggerCI(workflowStatus v1alpha1.WorkflowStatus) error { - if impl.config.MaxCiWorkflowRetries == 0 { - return errors.New("ci-workflow retires not allowed ") + //return if re-trigger feature is disabled + if !impl.config.WorkflowRetriesEnabled() { + impl.Logger.Debug("CI re-trigger is disabled") + return nil } - status, message, retryCount, ciWorkFlow, err := impl.extractPodStatusAndWorkflow(workflowStatus) + + status, message, ciWorkFlow, err := impl.extractPodStatusAndWorkflow(workflowStatus) if err != nil { impl.Logger.Errorw("error in extractPodStatusAndWorkflow", "err", err) return err } - err = impl.reTriggerCi(status, message, retryCount, ciWorkFlow) + if !CheckIfReTriggerRequired(status, message, ciWorkFlow.Status) { + impl.Logger.Debugw("not re-triggering ci", "status", status, "message", message, "ciWorkflowStatus", ciWorkFlow.Status) + return nil + } + + retryCount, refCiWorkflow, err := impl.getRefWorkflowAndCiRetryCount(ciWorkFlow) + if err != nil { + impl.Logger.Errorw("error while getting retry count value for a ciWorkflow", "ciWorkFlowId", ciWorkFlow.Id) + return err + } + + err = impl.reTriggerCi(retryCount, refCiWorkflow) if err != nil { impl.Logger.Errorw("error in reTriggerCi", "err", err, "status", status, "message", message, "retryCount", retryCount, "ciWorkFlowId", ciWorkFlow.Id) } return err } -func (impl *CiHandlerImpl) reTriggerCi(status, message string, retryCount int, ciWorkFlow *pipelineConfig.CiWorkflow) error { - if !((status == string(v1alpha1.NodeError) || status == string(v1alpha1.NodeFailed)) && message == POD_DELETED_MESSAGE) || (retryCount >= impl.config.MaxCiWorkflowRetries) { - return errors.New("ci-workflow retrigger condition not met, not re-triggering") +func (impl *CiHandlerImpl) reTriggerCi(retryCount int, refCiWorkflow *pipelineConfig.CiWorkflow) error { + if retryCount >= impl.config.MaxCiWorkflowRetries { + impl.Logger.Infow("maximum retries exhausted for this ciWorkflow", "ciWorkflowId", refCiWorkflow.Id, "retries", retryCount, "configuredRetries", impl.config.MaxCiWorkflowRetries) + return nil } - impl.Logger.Infow("HandleReTriggerCI for ciWorkflow ", "ReferenceCiWorkflowId", ciWorkFlow.Id) - ciPipelineMaterialIds := make([]int, 0, len(ciWorkFlow.GitTriggers)) - for id, _ := range ciWorkFlow.GitTriggers { + impl.Logger.Infow("re-triggering ci for a ci workflow", "ReferenceCiWorkflowId", refCiWorkflow.Id) + ciPipelineMaterialIds := make([]int, 0, len(refCiWorkflow.GitTriggers)) + for id, _ := range refCiWorkflow.GitTriggers { ciPipelineMaterialIds = append(ciPipelineMaterialIds, id) } ciMaterials, err := impl.ciPipelineMaterialRepository.GetByIdsIncludeDeleted(ciPipelineMaterialIds) - trigger := Trigger{ - PipelineId: ciWorkFlow.CiPipelineId, - CommitHashes: ciWorkFlow.GitTriggers, - CiMaterials: ciMaterials, - TriggeredBy: 1, - InvalidateCache: true, //TODO: ciTriggerRequest.InvalidateCache, - //TODO: ExtraEnvironmentVariables: extraEnvironmentVariables, - EnvironmentId: ciWorkFlow.EnvironmentId, - ReferenceCiWorkflowId: ciWorkFlow.Id, + if err != nil { + impl.Logger.Errorw("error in getting ci Pipeline Materials using ciPipeline Material Ids", "ciPipelineMaterialIds", ciPipelineMaterialIds, "err", err) + return err } + + trigger := Trigger{} + trigger.BuildTriggerObject(refCiWorkflow, ciMaterials, 1, true, nil, "") _, err = impl.ciService.TriggerCiPipeline(trigger) if err != nil { - impl.Logger.Errorw("error occured in retriggering ciWorkflow", "triggerDetails", trigger, "err", err) + impl.Logger.Errorw("error occurred in re-triggering ciWorkflow", "triggerDetails", trigger, "err", err) return err } return nil @@ -984,29 +1013,26 @@ func (impl *CiHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.Workflow const CiStageFailErrorCode = 2 -func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.WorkflowStatus) (string, string, int, *pipelineConfig.CiWorkflow, error) { +func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.WorkflowStatus) (string, string, *pipelineConfig.CiWorkflow, error) { workflowName, status, _, message, _, _ := impl.extractWorkfowStatus(workflowStatus) if workflowName == "" { impl.Logger.Errorw("extract workflow status, invalid wf name", "workflowName", workflowName, "status", status, "message", message) - return status, message, 0, nil, errors.New("invalid wf name") + return status, message, nil, errors.New("invalid wf name") } workflowId, err := strconv.Atoi(workflowName[:strings.Index(workflowName, "-")]) if err != nil { - impl.Logger.Errorw("invalid wf status update req", "err", err) - return status, message, 0, nil, err + impl.Logger.Errorw("extract workflowId, invalid wf name", "workflowName", workflowName, "err", err) + return status, message, nil, err } savedWorkflow, err := impl.ciWorkflowRepository.FindById(workflowId) if err != nil { - impl.Logger.Errorw("cannot get saved wf", "err", err) - return status, message, 0, savedWorkflow, err - } - if savedWorkflow.Status == WorkflowCancel { - return status, message, 0, nil, errors.New("not re-triggering as previous trigger is aborted/cancelled") + impl.Logger.Errorw("cannot get saved wf", "workflowId", workflowId, "err", err) + return status, message, nil, err } - retryCount, savedWorkflow, err := impl.getRefWorkflowAndCiRetryCount(savedWorkflow) - return status, message, retryCount, savedWorkflow, err + return status, message, savedWorkflow, err + } func (impl *CiHandlerImpl) getRefWorkflowAndCiRetryCount(savedWorkflow *pipelineConfig.CiWorkflow) (int, *pipelineConfig.CiWorkflow, error) { @@ -1610,25 +1636,13 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil } } if isEligibleToMarkFailed { + ciWorkflow.Status = "Failed" + ciWorkflow.PodStatus = "Failed" if isPodDeleted { - ciWorkflow.Status = "Failed" ciWorkflow.Message = POD_DELETED_MESSAGE - if ciWorkflow.Status != WorkflowCancel { - retryCount, refCiWorkflow, err := impl.getRefWorkflowAndCiRetryCount(ciWorkflow) - if err != nil { - impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id, "err", err) - continue - } - impl.Logger.Infow("re-triggering ci by UpdateCiWorkflowStatusFailedCron", "refCiWorkflowId", refCiWorkflow.Id, "ciWorkflow.Status", ciWorkflow.Status, "ciWorkflow.Message", ciWorkflow.Message, "retryCount", retryCount) - err = impl.reTriggerCi(ciWorkflow.Status, ciWorkflow.Message, retryCount, refCiWorkflow) - if err != nil { - impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount, "err", err) - continue - } - } + //error logging handled inside handlePodDeleted + impl.handlePodDeleted(ciWorkflow) } else { - ciWorkflow.Status = "Failed" - ciWorkflow.PodStatus = "Failed" ciWorkflow.Message = "marked failed by job" } err := impl.ciWorkflowRepository.UpdateWorkFlow(ciWorkflow) @@ -1642,6 +1656,21 @@ func (impl *CiHandlerImpl) UpdateCiWorkflowStatusFailure(timeoutForFailureCiBuil return nil } +func (impl *CiHandlerImpl) handlePodDeleted(ciWorkflow *pipelineConfig.CiWorkflow) { + if !impl.config.WorkflowRetriesEnabled() { + impl.Logger.Debug("ci workflow retry feature disabled") + return + } + retryCount, refCiWorkflow, err := impl.getRefWorkflowAndCiRetryCount(ciWorkflow) + if err != nil { + impl.Logger.Errorw("error in getRefWorkflowAndCiRetryCount", "ciWorkflowId", ciWorkflow.Id, "err", err) + } + impl.Logger.Infow("re-triggering ci by UpdateCiWorkflowStatusFailedCron", "refCiWorkflowId", refCiWorkflow.Id, "ciWorkflow.Status", ciWorkflow.Status, "ciWorkflow.Message", ciWorkflow.Message, "retryCount", retryCount) + err = impl.reTriggerCi(retryCount, refCiWorkflow) + if err != nil { + impl.Logger.Errorw("error in reTriggerCi", "ciWorkflowId", refCiWorkflow.Id, "workflowStatus", ciWorkflow.Status, "ciWorkflowMessage", "ciWorkflow.Message", "retryCount", retryCount, "err", err) + } +} func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request resourceGroup.ResourceGroupingRequest) ([]*pipelineConfig.CiWorkflowStatus, error) { ciWorkflowStatuses := make([]*pipelineConfig.CiWorkflowStatus, 0) var cdPipelines []*pipelineConfig.Pipeline diff --git a/pkg/pipeline/WorkflowUtils.go b/pkg/pipeline/WorkflowUtils.go index bc58d7f9e7..4263e40889 100644 --- a/pkg/pipeline/WorkflowUtils.go +++ b/pkg/pipeline/WorkflowUtils.go @@ -742,3 +742,9 @@ func (workflowRequest *WorkflowRequest) GetWorkflowMainContainer(config *CiCdCon UpdateContainerEnvsFromCmCs(&workflowMainContainer, workflowConfigMaps, workflowSecrets) return workflowMainContainer } + +func CheckIfReTriggerRequired(status, message, workflowRunnerStatus string) bool { + return ((status == string(v1alpha1.NodeError) || status == string(v1alpha1.NodeFailed)) && + message == POD_DELETED_MESSAGE) && workflowRunnerStatus != WorkflowCancel + +}