diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a5c850274e..ef34eb9fba 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -79,6 +79,11 @@ type Controller interface { // GetLogger returns this controller logger prefilled with basic information. GetLogger() logr.Logger + + // Len returns the current queue length, for informational purposes only. You + // shouldn't e.g. gate a call to Add() or Get() on Len() being a particular + // value, that can't be synchronized properly. + Len() int } // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 1f4712d8bf..5abec67ebe 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -267,6 +267,15 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { return true } +// Len returns the current queue length, for informational purposes only. You +// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular +// value, that can't be synchronized properly. +func (c *Controller) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.Queue.Len() +} + const ( labelError = "error" labelRequeueAfter = "requeue_after" diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 9f23fa2abc..b7699db54e 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -467,6 +467,39 @@ var _ = Describe("controller", func() { Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) }) + It("should requeue a Request if there is an error and continue processing items using Len()", func() { + ctx, cancel := context.WithCancel(context.Background()) + req := reconcile.Request{ + NamespacedName: types.NamespacedName{Namespace: "bar", Name: "foo"}, + } + queue = &controllertest.Queue{ + Interface: workqueue.New(), + } + ctrl.Queue = queue + defer cancel() + go func() { + defer GinkgoRecover() + Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) + }() + + queue.Add(request) + queue.Add(req) + Expect(ctrl.Len()).To(Equal(2)) + By("Invoking Reconciler which will give an error") + + fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile")) + Expect(<-reconciled).To(Equal(req)) + + By("Invoking Reconciler a second time without error") + fakeReconcile.AddResult(reconcile.Result{}, nil) + Expect(ctrl.Len()).To(Equal(1)) + Expect(<-reconciled).To(Equal(req)) + + By("Removing the item from the queue") + Expect(ctrl.Len()).To(Equal(0)) + Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0)) + }, 1.0) + PIt("should forget an item if it is not a Request and continue processing items", func() { // TODO(community): write this test })