diff --git a/pkg/eventProcessor/in/WorkflowEventProcessorService.go b/pkg/eventProcessor/in/WorkflowEventProcessorService.go index 7328c13825..8b703f6a62 100644 --- a/pkg/eventProcessor/in/WorkflowEventProcessorService.go +++ b/pkg/eventProcessor/in/WorkflowEventProcessorService.go @@ -436,51 +436,47 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error return } - wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus) + wfrId, wfrStatus, stateChanged, err := impl.cdHandler.UpdateWorkflow(wfStatus) impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus) if err != nil { impl.logger.Error("err", err) return } - - wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId) - if err != nil { - impl.logger.Errorw("could not get wf runner", "err", err) - return - } - if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { - if len(wfr.ImagePathReservationIds) > 0 { - err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds) - if err != nil { - impl.logger.Errorw("error in removing image path reservation ") - } - } - } - if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { - eventType := eventUtil.EventType(0) - if wfrStatus == string(v1alpha1.NodeSucceeded) { - eventType = eventUtil.Success - } else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { - eventType = eventUtil.Fail + if stateChanged { + wfr, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(wfrId) + if err != nil { + impl.logger.Errorw("could not get wf runner", "wfrId", wfrId, "err", err) + return } - - if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) { - err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr) - if err != nil { - //check if this log required or not - impl.logger.Errorw("error in HandleCdStageReTrigger", "error", err) + if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { + if len(wfr.ImagePathReservationIds) > 0 { + err := impl.cdHandler.DeactivateImageReservationPathsOnFailure(wfr.ImagePathReservationIds) + if err != nil { + impl.logger.Errorw("error in removing image path reservation ", "imagePathReservationIds", wfr.ImagePathReservationIds, "err", err) + } } } + if wfrStatus == string(v1alpha1.NodeSucceeded) || wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { + eventType := eventUtil.EventType(0) + if wfrStatus == string(v1alpha1.NodeSucceeded) { + eventType = eventUtil.Success + } else if wfrStatus == string(v1alpha1.NodeFailed) || wfrStatus == string(v1alpha1.NodeError) { + eventType = eventUtil.Fail + } - if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST { - event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD) - impl.logger.Debugw("event pre stage", "event", event) - event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType) - _, evtErr := impl.eventClient.WriteNotificationEvent(event) - if evtErr != nil { - impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr) + if wfr != nil && executors.CheckIfReTriggerRequired(wfrStatus, wfStatus.Message, wfr.Status) { + err = impl.workflowDagExecutor.HandleCdStageReTrigger(wfr) + if err != nil { + //check if this log required or not + impl.logger.Errorw("error in HandleCdStageReTrigger", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message, "error", err) + } + impl.logger.Debugw("re-triggered cd stage", "workflowRunnerId", wfr.Id, "workflowStatus", wfrStatus, "workflowStatusMessage", wfStatus.Message) + } else { + impl.sendPrePostCdNotificationEvent(eventType, wfr) } } + } else { + impl.logger.Debugw("no state change detected for the cd workflow status update, ignoring this event", "workflowRunnerId", wfrId, "wfrStatus", wfrStatus) } } @@ -503,6 +499,18 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDWorkflowStatusUpdate() error return nil } +func (impl *WorkflowEventProcessorImpl) sendPrePostCdNotificationEvent(eventType eventUtil.EventType, wfr *pipelineConfig.CdWorkflowRunner) { + if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE || wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST { + event, _ := impl.eventFactory.Build(eventType, &wfr.CdWorkflow.PipelineId, wfr.CdWorkflow.Pipeline.AppId, &wfr.CdWorkflow.Pipeline.EnvironmentId, eventUtil.CD) + impl.logger.Debugw("event pre stage", "event", event) + event = impl.eventFactory.BuildExtraCDData(event, wfr, 0, wfr.WorkflowType) + _, evtErr := impl.eventClient.WriteNotificationEvent(event) + if evtErr != nil { + impl.logger.Errorw("CD stage post fail or success event unable to sent", "error", evtErr) + } + } +} + func (impl *WorkflowEventProcessorImpl) extractCiCompleteEventFrom(msg *model.PubSubMsg) (bean.CiCompleteEvent, error) { ciCompleteEvent := bean.CiCompleteEvent{} err := json.Unmarshal([]byte(msg.Data), &ciCompleteEvent) diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 22f1efc15a..6bd7017c0b 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -68,7 +68,7 @@ const ( ) type CdHandler interface { - UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) + UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error) GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineBean.CdWorkflowWithArtifact, error) GetRunningWorkflowLogs(environmentId int, pipelineId int, workflowId int) (*bufio.Reader, func() error, error) FetchCdWorkflowDetails(appId int, environmentId int, pipelineId int, buildId int) (types.WorkflowResponse, error) @@ -262,23 +262,23 @@ func (impl *CdHandlerImpl) handleForceAbortCaseForCdStage(workflowRunner *pipeli return nil } -func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, error) { +func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, string, bool, error) { wfStatusRs := impl.extractWorkfowStatus(workflowStatus) workflowName, status, podStatus, message, podName := wfStatusRs.WorkflowName, wfStatusRs.Status, wfStatusRs.PodStatus, wfStatusRs.Message, wfStatusRs.PodName impl.Logger.Debugw("cd update for ", "wf ", workflowName, "status", status) if workflowName == "" { - return 0, "", errors.New("invalid wf name") + return 0, "", false, errors.New("invalid wf name") } workflowId, err := strconv.Atoi(workflowName[:strings.Index(workflowName, "-")]) if err != nil { impl.Logger.Error("invalid wf status update req", "err", err) - return 0, "", err + return 0, "", false, err } savedWorkflow, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(workflowId) if err != nil { impl.Logger.Error("cannot get saved wf", "err", err) - return 0, "", err + return 0, "", false, err } cdArtifactLocationFormat := impl.config.GetArtifactLocationFormat() @@ -299,21 +299,21 @@ func (impl *CdHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus err = impl.cdWorkflowRunnerService.UpdateCdWorkflowRunnerWithStage(savedWorkflow) if err != nil { impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id)) - return 0, "", err + return 0, "", true, err } appId := savedWorkflow.CdWorkflow.Pipeline.AppId envId := savedWorkflow.CdWorkflow.Pipeline.EnvironmentId envDeploymentConfig, err := impl.deploymentConfigService.GetConfigForDevtronApps(appId, envId) if err != nil { impl.Logger.Errorw("error in fetching environment deployment config by appId and envId", "appId", appId, "envId", envId, "err", err) - return 0, "", err + return 0, "", true, err } util3.TriggerCDMetrics(cdWorkflow.GetTriggerMetricsFromRunnerObj(savedWorkflow, envDeploymentConfig), impl.config.ExposeCDMetrics) if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status { impl.Logger.Warnw("cd stage failed for workflow: ", "wfId", savedWorkflow.Id) } } - return savedWorkflow.Id, savedWorkflow.Status, nil + return savedWorkflow.Id, savedWorkflow.Status, false, nil } func (impl *CdHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.WorkflowStatus) *types.WorkflowStatus { diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 4e9df000e8..2b37436c8b 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -1223,17 +1223,25 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id)) return 0, err } - if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status { - impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id) + impl.sendCIFailEvent(savedWorkflow, status, message) + } + return savedWorkflow.Id, nil +} - if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode { - go impl.WriteCIFailEvent(savedWorkflow) - } else { - impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message) - } +func (impl *CiHandlerImpl) sendCIFailEvent(savedWorkflow *pipelineConfig.CiWorkflow, status, message string) { + if string(v1alpha1.NodeError) == savedWorkflow.Status || string(v1alpha1.NodeFailed) == savedWorkflow.Status { + if executors.CheckIfReTriggerRequired(status, message, savedWorkflow.Status) { + impl.Logger.Infow("not sending failure notification for re-trigger workflow", "workflowId", savedWorkflow.Id) + return + } + impl.Logger.Warnw("ci failed for workflow: ", "wfId", savedWorkflow.Id) + + if extractErrorCode(savedWorkflow.Message) != workFlow.CiStageFailErrorCode { + go impl.WriteCIFailEvent(savedWorkflow) + } else { + impl.Logger.Infof("Step failed notification received for wfID %d with message %s", savedWorkflow.Id, savedWorkflow.Message) } } - return savedWorkflow.Id, nil } func extractErrorCode(msg string) int {