diff --git a/pkg/eventProcessor/bean/workflowEventBean.go b/pkg/eventProcessor/bean/workflowEventBean.go index 8bcd4f200c..4b33b8cba4 100644 --- a/pkg/eventProcessor/bean/workflowEventBean.go +++ b/pkg/eventProcessor/bean/workflowEventBean.go @@ -40,6 +40,7 @@ type CdStageCompleteEvent struct { PluginRegistryArtifactDetails map[string][]string `json:"PluginRegistryArtifactDetails"` PluginArtifacts *PluginArtifacts `json:"pluginArtifacts"` IsArtifactUploaded bool `json:"isArtifactUploaded"` + IsFailed bool `json:"isFailed"` } type UserDeploymentRequest struct { diff --git a/pkg/eventProcessor/in/WorkflowEventProcessorService.go b/pkg/eventProcessor/in/WorkflowEventProcessorService.go index 631c68c597..7328c13825 100644 --- a/pkg/eventProcessor/in/WorkflowEventProcessorService.go +++ b/pkg/eventProcessor/in/WorkflowEventProcessorService.go @@ -170,14 +170,23 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDStageCompleteEvent() error { return } wfr.IsArtifactUploaded = cdStageCompleteEvent.IsArtifactUploaded - if wfr.Status != string(v1alpha1.NodeSucceeded) { + if !slices.Contains(cdWorkflowModelBean.WfrTerminalStatusList, wfr.Status) { impl.logger.Debugw("event received from ci runner, updating workflow runner status as succeeded", "savedWorkflowRunnerId", wfr.Id, "oldStatus", wfr.Status, "podStatus", wfr.PodStatus) - wfr.Status = string(v1alpha1.NodeSucceeded) + if cdStageCompleteEvent.IsFailed { + wfr.Status = string(v1alpha1.NodeFailed) + } else { + wfr.Status = string(v1alpha1.NodeSucceeded) + } err = impl.cdWorkflowRunnerService.UpdateWfr(wfr, 1) if err != nil { impl.logger.Errorw("update cd-wf-runner failed for id ", "cdWfrId", wfr.Id, "err", err) return } + + triggerContext := triggerBean.TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } + impl.handleCDStageCompleteEvent(triggerContext, cdStageCompleteEvent, wfr) } else { err = impl.cdWorkflowRunnerService.UpdateIsArtifactUploaded(wfr.Id, cdStageCompleteEvent.IsArtifactUploaded) if err != nil { @@ -185,32 +194,6 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDStageCompleteEvent() error { return } } - - triggerContext := triggerBean.TriggerContext{ - ReferenceId: pointer.String(msg.MsgId), - } - if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE { - impl.logger.Debugw("received pre stage success event for workflow runner ", "wfId", strconv.Itoa(wfr.Id)) - err = impl.workflowDagExecutor.HandlePreStageSuccessEvent(triggerContext, cdStageCompleteEvent) - if err != nil { - impl.logger.Errorw("deployment success event error", "err", err) - return - } - } else if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST { - - pluginArtifacts := make(map[string][]string) - if cdStageCompleteEvent.PluginArtifacts != nil { - pluginArtifacts = cdStageCompleteEvent.PluginArtifacts.GetRegistryToUniqueContainerArtifactDataMapping() - } - globalUtil.MergeMaps(pluginArtifacts, cdStageCompleteEvent.PluginRegistryArtifactDetails) - - impl.logger.Debugw("received post stage success event for workflow runner ", "wfId", strconv.Itoa(wfr.Id)) - err = impl.workflowDagExecutor.HandlePostStageSuccessEvent(triggerContext, wfr, wfr.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, pluginArtifacts) - if err != nil { - impl.logger.Errorw("deployment success event error", "err", err) - return - } - } } // add required logging here @@ -233,6 +216,37 @@ func (impl *WorkflowEventProcessorImpl) SubscribeCDStageCompleteEvent() error { return nil } +func (impl *WorkflowEventProcessorImpl) handleCDStageCompleteEvent(triggerContext triggerBean.TriggerContext, cdStageCompleteEvent bean.CdStageCompleteEvent, wfr *cdWorkflowBean.CdWorkflowRunnerDto) { + if cdStageCompleteEvent.IsFailed { + impl.logger.Debugw("event received from ci runner, updating workflow runner status as failed, not taking any action", "savedWorkflowRunnerId", wfr.Id, "oldStatus", wfr.Status, "podStatus", wfr.PodStatus) + return + } + + var err error + if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_PRE { + impl.logger.Debugw("received pre stage success event for workflow runner ", "wfId", strconv.Itoa(wfr.Id)) + err = impl.workflowDagExecutor.HandlePreStageSuccessEvent(triggerContext, cdStageCompleteEvent) + if err != nil { + impl.logger.Errorw("deployment success event error", "err", err) + return + } + } else if wfr.WorkflowType == apiBean.CD_WORKFLOW_TYPE_POST { + impl.logger.Debugw("received post stage success event for workflow runner ", "wfId", strconv.Itoa(wfr.Id)) + + pluginArtifacts := make(map[string][]string) + if cdStageCompleteEvent.PluginArtifacts != nil { + pluginArtifacts = cdStageCompleteEvent.PluginArtifacts.GetRegistryToUniqueContainerArtifactDataMapping() + } + globalUtil.MergeMaps(pluginArtifacts, cdStageCompleteEvent.PluginRegistryArtifactDetails) + + err = impl.workflowDagExecutor.HandlePostStageSuccessEvent(triggerContext, wfr, wfr.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, pluginArtifacts) + if err != nil { + impl.logger.Errorw("deployment success event error", "err", err) + return + } + } +} + func (impl *WorkflowEventProcessorImpl) SubscribeTriggerBulkAction() error { callback := func(msg *model.PubSubMsg) { cdWorkflow := new(pipelineConfig.CdWorkflow)