Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .features/pending/cache-updated-workflow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Component: General
Issues: 13114
Description: Fast cache workflows to avoid reconciling outdated objects.
Author: [Shuangkun Tian](https://github.com/shuangkun)

Use a thread-safe cache.Store to cache the latest workflow. On read, compare fast-cache and informer resourceVersion and use the newer.
1 change: 0 additions & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ This document outlines environment variables that can be used to customize behav
| `EXPRESSION_TEMPLATES` | `bool` | `true` | Escape hatch to disable expression templates. |
| `EVENT_AGGREGATION_WITH_ANNOTATIONS` | `bool` | `false` | Whether event annotations will be used when aggregating events. |
| `GZIP_IMPLEMENTATION` | `string` | `PGZip` | The implementation of compression/decompression. Currently only "`PGZip`" and "`GZip`" are supported. |
| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. |
| `HEALTHZ_AGE` | `time.Duration` | `5m` | How old a un-reconciled workflow is to report unhealthy. |
| `INDEX_WORKFLOW_SEMAPHORE_KEYS` | `bool` | `true` | Whether or not to index semaphores. |
| `LEADER_ELECTION_IDENTITY` | `string` | Controller's `metadata.name` | The ID used for workflow controllers to elect a leader. |
Expand Down
63 changes: 62 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type WorkflowController struct {
recentCompletions recentCompletions
// lastUnreconciledWorkflows is a map of workflows that have been recently unreconciled
lastUnreconciledWorkflows map[string]*wfv1.Workflow
// workflowFastStore provides fast access to latest workflow objects
workflowFastStore cache.Store
}

const (
Expand Down Expand Up @@ -228,6 +230,9 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
return nil, err
}

// Initialize fast cache
wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc)

deprecation.Initialize(wfc.metrics.DeprecatedFeature)
wfc.entrypoint = entrypoint.New(kubeclientset, wfc.Config.Images)

Expand Down Expand Up @@ -711,7 +716,7 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

obj, ok := wfc.getWorkflowByKey(ctx, key)
obj, ok := wfc.getWorkflowByKeyWithCache(ctx, key)
if !ok {
return true
}
Expand Down Expand Up @@ -909,6 +914,60 @@ func (wfc *WorkflowController) recordCompletedWorkflow(key string) {
}
}

// updateWorkflowFastCache updates the fast cache with the latest workflow object
func (wfc *WorkflowController) updateWorkflowFastCache(wf *unstructured.Unstructured) {
// Add or update the workflow in the store (cache.Store is thread-safe)
if wfc.workflowFastStore == nil {
// initialize lazily for controllers constructed in tests
wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
if err := wfc.workflowFastStore.Add(wf); err != nil {
// best-effort cache; ignore errors to avoid impacting controller flow
_ = err // explicitly ignore error to satisfy linter
}
}

// deleteWorkflowFromFastCache removes workflow from fast cache
func (wfc *WorkflowController) deleteWorkflowFromFastCache(key string) {
if wfc.workflowFastStore == nil {
return
}
if obj, exists, _ := wfc.workflowFastStore.GetByKey(key); exists {
_ = wfc.workflowFastStore.Delete(obj)
}
}

// getWorkflowByKeyWithCache tries to get workflow from fast cache first, then falls back to informer
func (wfc *WorkflowController) getWorkflowByKeyWithCache(ctx context.Context, key string) (interface{}, bool) {
logger := logging.RequireLoggerFromContext(ctx)

if wfc.wfInformer == nil || wfc.wfInformer.GetIndexer() == nil {
return nil, false
}

// Get from informer cache first; if not present, we consider it not processable
objInf, infExists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if !infExists || err != nil {
logger.WithField("key", key).WithError(err).Error(ctx, "Failed to get workflow from informer")
return nil, false
}

// Try fast cache; if newer than informer, prefer fast
if wfc.workflowFastStore != nil {
objFast, fastExists, _ := wfc.workflowFastStore.GetByKey(key)
if fastExists {
fastUn, okFast := objFast.(*unstructured.Unstructured)
infUn, okInf := objInf.(*unstructured.Unstructured)
if okFast && okInf {
if util.OutDateResourceVersion(infUn.GetResourceVersion(), fastUn.GetResourceVersion()) {
return fastUn.DeepCopy(), true
}
}
}
}
return objInf, true
}

// Returns true if the workflow given by key is in the recently completed
// list. Will perform expiry cleanup before checking.
func (wfc *WorkflowController) checkRecentlyCompleted(key string) bool {
Expand Down Expand Up @@ -999,6 +1058,8 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers(ctx context.Context)
wfc.recordCompletedWorkflow(key)
// no need to add to the queue - this workflow is done
wfc.throttler.Remove(key)
// delete from fast cache
wfc.deleteWorkflowFromFastCache(key)
}
},
},
Expand Down
66 changes: 66 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,72 @@ func TestIsArchivable(t *testing.T) {
})
}

// makeUnstructuredCache creates a minimal unstructured Workflow with given rv.
func makeUnstructuredCache(rv string) *unstructured.Unstructured {
un := &unstructured.Unstructured{}
un.SetNamespace("default")
un.SetName("wf")
un.SetResourceVersion(rv)
return un
}

// setupControllerForFastCacheTests initializes only the parts needed for cache tests.
func setupControllerForFastCacheTests(t *testing.T) *WorkflowController {
t.Helper()
wfc := &WorkflowController{}
wfc.workflowFastStore = cache.NewStore(cache.MetaNamespaceKeyFunc)
inf := cache.NewSharedIndexInformer(&cache.ListWatch{}, &unstructured.Unstructured{}, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
wfc.wfInformer = inf
return wfc
}

func TestGetWorkflowByKeyWithCache_PreferFastWhenNewer(t *testing.T) {
wfc := setupControllerForFastCacheTests(t)
key := "default/wf"
// informer rv=10
_ = wfc.wfInformer.GetStore().Add(makeUnstructuredCache("10"))
// fast rv=20 -> choose fast
wfc.updateWorkflowFastCache(makeUnstructuredCache("20"))

obj, ok := wfc.getWorkflowByKeyWithCache(logging.TestContext(context.Background()), key)
if !ok {
t.Fatalf("expected object, got none")
}
if rv := obj.(*unstructured.Unstructured).GetResourceVersion(); rv != "20" {
t.Fatalf("expected rv=20 (fast), got %s", rv)
}
}

func TestGetWorkflowByKeyWithCache_PreferInformerWhenNewerOrEqual(t *testing.T) {
wfc := setupControllerForFastCacheTests(t)
key := "default/wf"
// informer rv=10
_ = wfc.wfInformer.GetStore().Add(makeUnstructuredCache("10"))
// fast rv=9 -> choose informer
wfc.updateWorkflowFastCache(makeUnstructuredCache("9"))

obj, ok := wfc.getWorkflowByKeyWithCache(logging.TestContext(context.Background()), key)
if !ok {
t.Fatalf("expected object, got none")
}
if rv := obj.(*unstructured.Unstructured).GetResourceVersion(); rv != "10" {
t.Fatalf("expected rv=10 (informer), got %s", rv)
}
}

func TestFastCacheUpdateAndDelete(t *testing.T) {
wfc := setupControllerForFastCacheTests(t)
key := "default/wf"
wfc.updateWorkflowFastCache(makeUnstructuredCache("1"))
if _, exists, _ := wfc.workflowFastStore.GetByKey(key); !exists {
t.Fatalf("expected fast cache to contain key after update")
}
wfc.deleteWorkflowFromFastCache(key)
if _, exists, _ := wfc.workflowFastStore.GetByKey(key); exists {
t.Fatalf("expected fast cache to delete key")
}
}

func TestReleaseAllWorkflowLocks(t *testing.T) {
ctx := logging.TestContext(t.Context())

Expand Down
26 changes: 4 additions & 22 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,16 +796,10 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {

woc.log.WithFields(logging.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info(ctx, "Workflow update successful")

switch os.Getenv("INFORMER_WRITE_BACK") {
// By default we write back (as per v2.11), this does not reduce errors, but does reduce
// conflicts and therefore we log fewer warning messages.
case "", "true":
if err := woc.writeBackToInformer(); err != nil {
woc.markWorkflowError(ctx, err)
return
}
case "false":
time.Sleep(1 * time.Second)
// Update fast cache with the latest workflow object immediately after successful update
// This ensures we have the most recent version available for subsequent processing
if wfUnstructured, err := wfutil.ToUnstructured(woc.wf); err == nil {
woc.controller.updateWorkflowFastCache(wfUnstructured)
}

// Make sure the workflow completed.
Expand Down Expand Up @@ -841,18 +835,6 @@ func (woc *wfOperationCtx) deleteTaskResults(ctx context.Context) error {
)
}

func (woc *wfOperationCtx) writeBackToInformer() error {
un, err := wfutil.ToUnstructured(woc.wf)
if err != nil {
return fmt.Errorf("failed to convert workflow to unstructured: %w", err)
}
err = woc.controller.wfInformer.GetStore().Update(un)
if err != nil {
return fmt.Errorf("failed to update informer store: %w", err)
}
return nil
}

// persistWorkflowSizeLimitErr will fail a the workflow with an error when we hit the resource size limit
// See https://github.com/argoproj/argo-workflows/issues/913
func (woc *wfOperationCtx) persistWorkflowSizeLimitErr(ctx context.Context, wfClient v1alpha1.WorkflowInterface, err error) {
Expand Down
19 changes: 19 additions & 0 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,3 +1614,22 @@ func FindWaitCtrIndex(pod *apiv1.Pod) (int, error) {
}
return waitCtrIndex, nil
}

// OutDateResourceVersion checks whether the resourceVersion is outdated
func OutDateResourceVersion(currentRV, cachedRV string) bool {
// Parse both resourceVersions as integers
current, err2 := strconv.ParseInt(currentRV, 10, 64)
cached, err1 := strconv.ParseInt(cachedRV, 10, 64)

// If either is invalid, assume currentRV is outdated
if err1 != nil || err2 != nil {
return false
}

// If cached RV is much larger than current, then current is old
if current < cached {
return true
}

return false
}
Loading