Skip to content

Commit 69541df

Browse files
committed
Improvements on pipeline cancel
When a pipeline is cancelled, detect it early in the reconcile cycle and cancel all taskruns defined in the pipelinerun status, so that we don't spend time building the DAG and resolving all resources. The patch to cancel a TaskRun is the same for all TaskRuns, so build it once and apply it to all taskruns.
1 parent a4065de commit 69541df

File tree

5 files changed

+40
-44
lines changed

5 files changed

+40
-44
lines changed

pkg/reconciler/pipelinerun/cancel.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,14 @@ import (
2727

2828
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
2929
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
30-
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
3130
corev1 "k8s.io/api/core/v1"
3231
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3332
"k8s.io/apimachinery/pkg/types"
3433
"knative.dev/pkg/apis"
3534
)
3635

3736
// cancelPipelineRun marks the PipelineRun as cancelled and any resolved TaskRun(s) too.
38-
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
37+
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, clientSet clientset.Interface) error {
3938
pr.Status.SetCondition(&apis.Condition{
4039
Type: apis.ConditionSucceeded,
4140
Status: corev1.ConditionFalse,
@@ -45,23 +44,21 @@ func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipe
4544
// update pr completed time
4645
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}
4746
errs := []string{}
48-
for _, rprt := range pipelineState {
49-
if rprt.TaskRun == nil {
50-
// No taskrun yet, pass
51-
continue
52-
}
5347

54-
logger.Infof("cancelling TaskRun %s", rprt.TaskRunName)
48+
// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
49+
// TaskRuns at the same time and trying to update the entire object may cause a race
50+
b, err := getCancelPatch()
51+
if err != nil {
52+
return fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err)
53+
}
5554

56-
// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
57-
// TaskRuns at the same time and trying to update the entire object may cause a race
58-
b, err := getCancelPatch()
59-
if err != nil {
60-
errs = append(errs, fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err).Error())
61-
continue
62-
}
63-
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(rprt.TaskRunName, types.JSONPatchType, b, ""); err != nil {
64-
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", rprt.TaskRunName, err).Error())
55+
// Loop over the TaskRuns in the PipelineRun status.
56+
// If a TaskRun is not in the status yet we should not cancel it anyways.
57+
for taskRunName := range pr.Status.TaskRuns {
58+
logger.Infof("cancelling TaskRun %s", taskRunName)
59+
60+
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(taskRunName, types.JSONPatchType, b, ""); err != nil {
61+
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", taskRunName, err).Error())
6562
continue
6663
}
6764
}

pkg/reconciler/pipelinerun/cancel_test.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"testing"
2222

2323
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
24-
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
2524
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
2625
tb "github.com/tektoncd/pipeline/test/builder"
2726
test "github.com/tektoncd/pipeline/test/v1alpha1"
@@ -32,10 +31,9 @@ import (
3231

3332
func TestCancelPipelineRun(t *testing.T) {
3433
testCases := []struct {
35-
name string
36-
pipelineRun *v1alpha1.PipelineRun
37-
pipelineState []*resources.ResolvedPipelineRunTask
38-
taskRuns []*v1alpha1.TaskRun
34+
name string
35+
pipelineRun *v1alpha1.PipelineRun
36+
taskRuns []*v1alpha1.TaskRun
3937
}{{
4038
name: "no-resolved-taskrun",
4139
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
@@ -44,28 +42,29 @@ func TestCancelPipelineRun(t *testing.T) {
4442
),
4543
),
4644
}, {
47-
name: "1-of-resolved-taskrun",
45+
name: "1-taskrun",
4846
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
4947
tb.PipelineRunSpec("test-pipeline",
5048
tb.PipelineRunCancelled,
5149
),
50+
tb.PipelineRunStatus(
51+
tb.PipelineRunTaskRunsStatus("t1", &v1alpha1.PipelineRunTaskRunStatus{
52+
PipelineTaskName: "task-1",
53+
})),
5254
),
53-
pipelineState: []*resources.ResolvedPipelineRunTask{
54-
{TaskRunName: "t1", TaskRun: tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
55-
{TaskRunName: "t2"},
56-
},
5755
taskRuns: []*v1alpha1.TaskRun{tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
5856
}, {
59-
name: "resolved-taskruns",
57+
name: "multiple-taskruns",
6058
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
6159
tb.PipelineRunSpec("test-pipeline",
6260
tb.PipelineRunCancelled,
6361
),
62+
tb.PipelineRunStatus(
63+
tb.PipelineRunTaskRunsStatus(
64+
"t1", &v1alpha1.PipelineRunTaskRunStatus{PipelineTaskName: "task-1"}),
65+
tb.PipelineRunTaskRunsStatus(
66+
"t2", &v1alpha1.PipelineRunTaskRunStatus{PipelineTaskName: "task-2"})),
6467
),
65-
pipelineState: []*resources.ResolvedPipelineRunTask{
66-
{TaskRunName: "t1", TaskRun: tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
67-
{TaskRunName: "t2", TaskRun: tb.TaskRun("t2", tb.TaskRunNamespace("foo"))},
68-
},
6968
taskRuns: []*v1alpha1.TaskRun{tb.TaskRun("t1", tb.TaskRunNamespace("foo")), tb.TaskRun("t2", tb.TaskRunNamespace("foo"))},
7069
}}
7170
for _, tc := range testCases {
@@ -79,7 +78,7 @@ func TestCancelPipelineRun(t *testing.T) {
7978
ctx, cancel := context.WithCancel(ctx)
8079
defer cancel()
8180
c, _ := test.SeedTestData(t, ctx, d)
82-
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, tc.pipelineState, c.Pipeline)
81+
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, c.Pipeline)
8382
if err != nil {
8483
t.Fatal(err)
8584
}

pkg/reconciler/pipelinerun/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const (
4444
resyncPeriod = 10 * time.Hour
4545
)
4646

47+
// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
4748
func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
4849
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
4950
logger := logging.FromContext(ctx)

pkg/reconciler/pipelinerun/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
stats.UnitDimensionless)
4949
)
5050

51+
// Recorder holds keys for Tekton metrics
5152
type Recorder struct {
5253
initialized bool
5354

pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
160160
// to update, and return the original error combined with any update error
161161
var merr error
162162

163-
if pr.IsDone() {
163+
switch {
164+
case pr.IsDone():
164165
// We may be reading a version of the object that was stored at an older version
165166
// and may not have had all of the assumed default specified.
166167
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
@@ -181,7 +182,13 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
181182
c.Logger.Warnf("Failed to log the metrics : %v", err)
182183
}
183184
}(c.metrics)
184-
} else {
185+
case pr.IsCancelled():
186+
// If the pipelinerun is cancelled, cancel tasks and update status
187+
before := pr.Status.GetCondition(apis.ConditionSucceeded)
188+
merr = multierror.Append(merr, cancelPipelineRun(c.Logger, pr, c.PipelineClientSet))
189+
after := pr.Status.GetCondition(apis.ConditionSucceeded)
190+
reconciler.EmitEvent(c.Recorder, before, after, pr)
191+
default:
185192
if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
186193
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
187194
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "Failed to create tracker for TaskRuns for PipelineRun")
@@ -527,15 +534,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
527534
}
528535
}
529536

530-
// If the pipelinerun is cancelled, cancel tasks and update status
531-
if pr.IsCancelled() {
532-
before := pr.Status.GetCondition(apis.ConditionSucceeded)
533-
err := cancelPipelineRun(c.Logger, pr, pipelineState, c.PipelineClientSet)
534-
after := pr.Status.GetCondition(apis.ConditionSucceeded)
535-
reconciler.EmitEvent(c.Recorder, before, after, pr)
536-
return err
537-
}
538-
539537
if pipelineState.IsBeforeFirstTaskRun() && pr.HasVolumeClaimTemplate() {
540538
// create workspace PVC from template
541539
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {

0 commit comments

Comments
 (0)