diff --git a/test/e2e/functional/parameter-aggregation-steps-with-retry.yaml b/test/e2e/functional/parameter-aggregation-steps-with-retry.yaml new file mode 100644 index 000000000000..c8f8a4439641 --- /dev/null +++ b/test/e2e/functional/parameter-aggregation-steps-with-retry.yaml @@ -0,0 +1,33 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: parameter-aggregation-steps-with-retry +spec: + retryStrategy: + limit: 1 + entrypoint: fanout-steps-with-output + templates: + - name: echo-value + inputs: + parameters: + - name: message + container: + image: argoproj/argosay:v2 + outputs: + parameters: + - name: dummy-output + value: '{{inputs.parameters.message}}' + - name: fanout-steps-with-output + steps: + - - name: echo-list + template: echo-value + arguments: + parameters: + - name: message + value: '{{item}}' + withItems: [1, 2, 3] + outputs: + parameters: + - name: dummy-steps-output + valueFrom: + parameter: '{{steps.echo-list.outputs.parameters.dummy-output}}' \ No newline at end of file diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 71658712eadb..155859092dfa 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -610,6 +610,24 @@ func (s *FunctionalSuite) TestParameterAggregationDAGWithRetry() { }) } +func (s *FunctionalSuite) TestParameterAggregationStepsWithRetry() { + s.Given(). + Workflow("@functional/parameter-aggregation-steps-with-retry.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(time.Second * 90). + Then(). + ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase) + nodeStatus := status.Nodes.FindByDisplayName("parameter-aggregation-steps-with-retry(0)") + require.NotNil(t, nodeStatus) + assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase) + require.NotNil(t, nodeStatus.Outputs) + assert.Len(t, nodeStatus.Outputs.Parameters, 1) + assert.Equal(t, `["1","2","3"]`, nodeStatus.Outputs.Parameters[0].Value.String()) + }) +} + func (s *FunctionalSuite) TestDAGDepends() { s.Given(). Workflow("@functional/dag-depends.yaml"). diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 14ae9d41d270..1a6968020188 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -673,10 +673,6 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(ctx context.Context, dagCtx * var ancestorNodes []wfv1.NodeStatus for _, node := range woc.wf.Status.Nodes { if node.BoundaryID == dagCtx.boundaryID && strings.HasPrefix(node.Name, ancestorNode.Name+"(") { - // Filter retried nodes and only aggregate outputs of their parent nodes. - if node.NodeFlag != nil && node.NodeFlag.Retried { - continue - } ancestorNodes = append(ancestorNodes, node) } } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index d41397ff2374..9142e3bba068 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -3368,10 +3368,10 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st if len(childNodes) == 0 { return nil } - // Some of the children may be hooks, only keep those that aren't + // Some of the children may be hooks and some of the children may be retried nodes, only keep those that aren't nodeIdx := 0 for i := range childNodes { - if childNodes[i].NodeFlag == nil || !childNodes[i].NodeFlag.Hooked { + if childNodes[i].NodeFlag == nil || (!childNodes[i].NodeFlag.Hooked && !childNodes[i].NodeFlag.Retried) { childNodes[nodeIdx] = childNodes[i] nodeIdx++ }