Skip to content

Commit f7413a4

Browse files
feat(syncwaves): add DAG ordering for syncwaves
Signed-off-by: SebastienFelix <[email protected]>
1 parent 95b191d commit f7413a4

File tree

12 files changed

+452
-57
lines changed

12 files changed

+452
-57
lines changed

controller/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ func hasSharedResourceCondition(app *v1alpha1.Application) (bool, string) {
571571
// Note, this is not foolproof, since a proper fix would require the CRD record
572572
// status.observedGeneration coupled with a health.lua that verifies
573573
// status.observedGeneration == metadata.generation
574-
func delayBetweenSyncWaves(_ common.SyncPhase, _ int, finalWave bool) error {
574+
func delayBetweenSyncWaves(_ []common.SyncIdentity, finalWave bool) error {
575575
if !finalWave {
576576
delaySec := 2
577577
if delaySecStr := os.Getenv(EnvVarSyncWaveDelay); delaySecStr != "" {

gitops-engine/pkg/sync/common/types.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ const (
1212
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
1313
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
1414
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
15+
// AnnotationSyncWaveGroup indicates which wave group of the sync the resource or hook belongs to
16+
AnnotationSyncWaveGroup = "argocd.argoproj.io/sync-wave-group"
17+
// AnnotationSyncWaveGroupDependencies lists, as comma-separated group numbers (e.g. "0,1"), the wave groups that the resource's or hook's wave group depends on
18+
AnnotationSyncWaveGroupDependencies = "argocd.argoproj.io/sync-wave-group-dependencies"
1519
// AnnotationKeyHook contains the hook type of a resource
1620
AnnotationKeyHook = "argocd.argoproj.io/hook"
1721
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
@@ -59,10 +63,16 @@ type PermissionValidator func(un *unstructured.Unstructured, res *metav1.APIReso
5963

6064
type SyncPhase string
6165

66+
type SyncIdentity struct {
67+
Phase SyncPhase
68+
Wave int
69+
WaveGroup int
70+
}
71+
6272
// SyncWaveHook is a callback function which will be invoked after each sync wave is successfully
6373
// applied during a sync operation. The callback indicates which sync identities (phase, wave and wave group) it had just
6474
// executed, and whether or not that wave was the final one.
65-
type SyncWaveHook func(phase SyncPhase, wave int, final bool) error
75+
type SyncWaveHook func(t []SyncIdentity, final bool) error
6676

6777
const (
6878
SyncPhasePreSync = "PreSync"

gitops-engine/pkg/sync/doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat
7575
annotations:
7676
argocd.argoproj.io/sync-wave: "5"
7777
78+
# Sync Groups
79+
80+
Wave groups allow defining sync processes that are independent of, or dependent on, one another.
81+
7882
# Sync Options
7983
8084
The sync options allows customizing the synchronization of selected resources. The options are specified using the
@@ -89,6 +93,7 @@ How Does It Work Together?
8993
Syncing process orders the resources in the following precedence:
9094
9195
- The phase
96+
- The group with respect to group dependencies
9297
- The wave they are in (lower values first)
9398
- By kind (e.g. namespaces first)
9499
- By name

gitops-engine/pkg/sync/sync_context.go

Lines changed: 54 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"slices"
78
"sort"
89
"strings"
910
"sync"
@@ -560,26 +561,29 @@ func (sc *syncContext) Sync() {
560561
return
561562
}
562563

563-
// remove any tasks not in this wave
564+
// remove any tasks which have unsynced dependencies
564565
phase := tasks.phase()
565-
wave := tasks.wave()
566-
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
566+
independantSyncIdentities := tasks.independantSyncIdentities()
567+
allSyncIdentities := tasks.syncIdentities()
567568

568569
// if it is the last phase/wave and the only remaining tasks are non-hooks, then we are successful
569570
// EVEN if those objects subsequently degraded
570571
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
571-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
572+
remainingTasks := tasks.Filter(func(t *syncTask) bool {
573+
return !slices.Contains(independantSyncIdentities, t.identity()) || t.isHook()
574+
})
572575

573-
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
574-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
576+
sc.log.WithValues("phase", phase, "independantSyncIdentities", independantSyncIdentities, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
577+
tasks = tasks.Filter(func(t *syncTask) bool { return slices.Contains(independantSyncIdentities, t.identity()) })
575578

576579
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
577580

578581
sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
579582
runState := sc.runTasks(tasks, false)
580583

581584
if sc.syncWaveHook != nil && runState != failed {
582-
err := sc.syncWaveHook(phase, wave, finalWave)
585+
finalWave := phase == tasks.lastPhase() && len(independantSyncIdentities) == len(allSyncIdentities)
586+
err := sc.syncWaveHook(independantSyncIdentities, finalWave)
583587
if err != nil {
584588
sc.deleteHooks(hooksPendingDeletionFailed)
585589
sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err))
@@ -909,52 +913,61 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
909913
}
910914

911915
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
912-
pruneTasks := make(map[int][]*syncTask)
916+
917+
tasksByWaveGroup := make(map[int][]*syncTask)
913918
for _, task := range tasks {
914-
if task.isPrune() {
915-
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
916-
}
919+
tasksByWaveGroup[task.waveGroup()] = append(tasksByWaveGroup[task.waveGroup()], task)
917920
}
921+
for waveGroup := range tasksByWaveGroup {
922+
pruneTasks := make(map[int][]*syncTask)
923+
for _, task := range tasksByWaveGroup[waveGroup] {
924+
if task.isPrune() {
925+
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
926+
}
927+
}
918928

919-
var uniquePruneWaves []int
920-
for k := range pruneTasks {
921-
uniquePruneWaves = append(uniquePruneWaves, k)
922-
}
923-
sort.Ints(uniquePruneWaves)
929+
var uniquePruneWaves []int
930+
for k := range pruneTasks {
931+
uniquePruneWaves = append(uniquePruneWaves, k)
932+
}
933+
sort.Ints(uniquePruneWaves)
924934

925-
// reorder waves for pruning tasks using symmetric swap on prune waves
926-
n := len(uniquePruneWaves)
927-
for i := 0; i < n/2; i++ {
928-
// waves to swap
929-
startWave := uniquePruneWaves[i]
930-
endWave := uniquePruneWaves[n-1-i]
935+
// reorder waves for pruning tasks using symmetric swap on prune waves
936+
n := len(uniquePruneWaves)
937+
for j := 0; j < n/2; j++ {
938+
// waves to swap
939+
startWave := uniquePruneWaves[j]
940+
endWave := uniquePruneWaves[n-1-j]
931941

932-
for _, task := range pruneTasks[startWave] {
933-
task.waveOverride = &endWave
934-
}
942+
for _, task := range pruneTasks[startWave] {
943+
task.waveOverride = &endWave
944+
}
935945

936-
for _, task := range pruneTasks[endWave] {
937-
task.waveOverride = &startWave
946+
for _, task := range pruneTasks[endWave] {
947+
task.waveOverride = &startWave
948+
}
938949
}
939-
}
940950

941-
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
942-
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
943-
syncPhaseLastWave := 0
944-
for _, task := range tasks {
945-
if task.phase == common.SyncPhaseSync {
946-
if task.wave() > syncPhaseLastWave {
947-
syncPhaseLastWave = task.wave()
951+
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
952+
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
953+
954+
syncPhaseLastWave := 0
955+
for _, task := range tasksByWaveGroup[waveGroup] {
956+
if task.phase == common.SyncPhaseSync {
957+
if task.wave() > syncPhaseLastWave {
958+
syncPhaseLastWave = task.wave()
959+
}
948960
}
949961
}
950-
}
951-
syncPhaseLastWave = syncPhaseLastWave + 1
962+
syncPhaseLastWave = syncPhaseLastWave + 1
952963

953-
for _, task := range tasks {
954-
if task.isPrune() &&
955-
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
956-
task.waveOverride = &syncPhaseLastWave
964+
for _, task := range tasksByWaveGroup[waveGroup] {
965+
if task.isPrune() &&
966+
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
967+
task.waveOverride = &syncPhaseLastWave
968+
}
957969
}
970+
958971
}
959972

960973
tasks.Sort()

gitops-engine/pkg/sync/sync_context_test.go

Lines changed: 131 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,10 +1793,11 @@ func TestSyncWaveHook(t *testing.T) {
17931793
syncCtx.hooks = []*unstructured.Unstructured{pod3}
17941794

17951795
called := false
1796-
syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error {
1796+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
17971797
called = true
1798-
assert.Equal(t, synccommon.SyncPhaseSync, string(phase))
1799-
assert.Equal(t, -1, wave)
1798+
assert.Equal(t, 1, len(syncIdentities))
1799+
assert.Equal(t, synccommon.SyncPhaseSync, string(syncIdentities[0].Phase))
1800+
assert.Equal(t, -1, syncIdentities[0].Wave)
18001801
assert.False(t, final)
18011802
return nil
18021803
}
@@ -1806,7 +1807,7 @@ func TestSyncWaveHook(t *testing.T) {
18061807
// call sync again, it should not invoke the SyncWaveHook callback since we only should be
18071808
// doing this after an apply, and not every reconciliation
18081809
called = false
1809-
syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error {
1810+
syncCtx.syncWaveHook = func(_ []synccommon.SyncIdentity, _ bool) error {
18101811
called = true
18111812
return nil
18121813
}
@@ -1819,10 +1820,11 @@ func TestSyncWaveHook(t *testing.T) {
18191820
pod1Res.HookPhase = synccommon.OperationSucceeded
18201821
syncCtx.syncRes[resourceResultKey(pod1Res.ResourceKey, synccommon.SyncPhaseSync)] = pod1Res
18211822
called = false
1822-
syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error {
1823+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
18231824
called = true
1824-
assert.Equal(t, synccommon.SyncPhaseSync, string(phase))
1825-
assert.Equal(t, 0, wave)
1825+
assert.Equal(t, 1, len(syncIdentities))
1826+
assert.Equal(t, synccommon.SyncPhaseSync, string(syncIdentities[0].Phase))
1827+
assert.Equal(t, 0, syncIdentities[0].Wave)
18261828
assert.False(t, final)
18271829
return nil
18281830
}
@@ -1835,17 +1837,135 @@ func TestSyncWaveHook(t *testing.T) {
18351837
pod2Res.HookPhase = synccommon.OperationSucceeded
18361838
syncCtx.syncRes[resourceResultKey(pod2Res.ResourceKey, synccommon.SyncPhaseSync)] = pod2Res
18371839
called = false
1838-
syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error {
1840+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
18391841
called = true
1840-
assert.Equal(t, synccommon.SyncPhasePostSync, string(phase))
1841-
assert.Equal(t, 0, wave)
1842+
assert.Equal(t, 1, len(syncIdentities))
1843+
assert.Equal(t, synccommon.SyncPhasePostSync, string(syncIdentities[0].Phase))
1844+
assert.Equal(t, 0, syncIdentities[0].Wave)
18421845
assert.True(t, final)
18431846
return nil
18441847
}
18451848
syncCtx.Sync()
18461849
assert.True(t, called)
18471850
}
18481851

1852+
func TestSyncWaveGroup(t *testing.T) {
1853+
syncCtx := newTestSyncCtx(nil, WithOperationSettings(false, false, false, false))
1854+
pod1 := testingutils.NewPod()
1855+
pod1.SetName("pod-1")
1856+
pod1.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "-1", synccommon.AnnotationSyncWaveGroup: "0", synccommon.AnnotationSyncWaveGroupDependencies: ""})
1857+
pod2 := testingutils.NewPod()
1858+
pod2.SetName("pod-2")
1859+
pod2.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "0", synccommon.AnnotationSyncWaveGroup: "0", synccommon.AnnotationSyncWaveGroupDependencies: ""})
1860+
pod3 := testingutils.NewPod()
1861+
pod3.SetName("pod-3")
1862+
pod3.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "3", synccommon.AnnotationSyncWaveGroup: "1", synccommon.AnnotationSyncWaveGroupDependencies: ""})
1863+
pod4 := testingutils.NewPod()
1864+
pod4.SetName("pod-4")
1865+
pod4.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1", synccommon.AnnotationSyncWaveGroup: "2", synccommon.AnnotationSyncWaveGroupDependencies: "1"})
1866+
pod5 := testingutils.NewPod()
1867+
pod5.SetName("pod-5")
1868+
pod5.SetAnnotations(map[string]string{synccommon.AnnotationSyncWave: "1", synccommon.AnnotationSyncWaveGroup: "3", synccommon.AnnotationSyncWaveGroupDependencies: "0,1"})
1869+
1870+
syncCtx.resources = groupResources(ReconciliationResult{
1871+
Live: []*unstructured.Unstructured{nil, nil, nil, nil, nil},
1872+
Target: []*unstructured.Unstructured{pod1, pod2, pod3, pod4, pod5},
1873+
})
1874+
1875+
called := false
1876+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
1877+
called = true
1878+
assert.Equal(t, 2, len(syncIdentities))
1879+
assert.Equal(t, synccommon.SyncPhaseSync, string(syncIdentities[0].Phase))
1880+
assert.Equal(t, -1, syncIdentities[0].Wave)
1881+
assert.Equal(t, 0, syncIdentities[0].WaveGroup)
1882+
assert.Equal(t, 3, syncIdentities[1].Wave)
1883+
assert.Equal(t, 1, syncIdentities[1].WaveGroup)
1884+
assert.False(t, final)
1885+
return nil
1886+
}
1887+
1888+
originalSyncTasks, _ := syncCtx.getSyncTasks()
1889+
assert.Equal(t, 5, len(originalSyncTasks))
1890+
_, _, originalStates := syncCtx.GetState()
1891+
assert.Equal(t, 0, len(originalStates))
1892+
1893+
// call sync. pod-1 and pod-3 should be processed first. Verify we invoke SyncWaveHook call after applying first waves
1894+
1895+
syncCtx.Sync()
1896+
assert.True(t, called)
1897+
1898+
_, _, results := syncCtx.GetState()
1899+
assert.Equal(t, 2, len(results))
1900+
1901+
pod1Res := results[0]
1902+
pod1Res.HookPhase = synccommon.OperationSucceeded
1903+
syncCtx.syncRes[resourceResultKey(pod1Res.ResourceKey, synccommon.SyncPhaseSync)] = pod1Res
1904+
1905+
pod2Res := results[1]
1906+
pod2Res.HookPhase = synccommon.OperationSucceeded
1907+
syncCtx.syncRes[resourceResultKey(pod2Res.ResourceKey, synccommon.SyncPhaseSync)] = pod2Res
1908+
1909+
// Here I would like to see :
1910+
// syncCtx.resources[0].Target and syncCtx.resources[2].Target not nil because pod-1 and pod-3 have been synced
1911+
// syncTasks should contain 3 elements : pod-2, pod-4 and pod-5
1912+
// but syncCtx.getSyncTasks() returns 5 elements
1913+
1914+
/*####### /!\ THIS TEST SHOULD PASS /!\ #######
1915+
1916+
syncTasks2, _ := syncCtx.getSyncTasks()
1917+
assert.Equal(t, 3, len(syncTasks2))
1918+
1919+
######## /!\ BUT IT DOES NOT /!\ #####*/
1920+
1921+
// call sync again, pod-2 and pod-4 should be synced during this sync. Verify we invoke SyncWaveHook call after applying second waves
1922+
1923+
called = false
1924+
syncCtx.syncWaveHook = func(_ []synccommon.SyncIdentity, _ bool) error {
1925+
called = true
1926+
return nil
1927+
}
1928+
syncCtx.Sync()
1929+
assert.True(t, called)
1930+
1931+
_, _, results2 := syncCtx.GetState()
1932+
assert.Equal(t, 4, len(results2))
1933+
1934+
pod3Res := results2[2]
1935+
pod3Res.HookPhase = synccommon.OperationSucceeded
1936+
syncCtx.syncRes[resourceResultKey(pod3Res.ResourceKey, synccommon.SyncPhaseSync)] = pod3Res
1937+
1938+
pod4Res := results2[3]
1939+
pod4Res.HookPhase = synccommon.OperationSucceeded
1940+
syncCtx.syncRes[resourceResultKey(pod4Res.ResourceKey, synccommon.SyncPhaseSync)] = pod4Res
1941+
1942+
// complete last wave, then call Sync again. Verify we invoke another SyncWaveHook call after applying last wave
1943+
called = false
1944+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
1945+
called = true
1946+
return nil
1947+
}
1948+
syncCtx.Sync()
1949+
assert.True(t, called)
1950+
1951+
_, _, results3 := syncCtx.GetState()
1952+
assert.Equal(t, 5, len(results3))
1953+
1954+
pod5Res := results3[0]
1955+
pod5Res.HookPhase = synccommon.OperationSucceeded
1956+
syncCtx.syncRes[resourceResultKey(pod5Res.ResourceKey, synccommon.SyncPhaseSync)] = pod5Res
1957+
1958+
// no remaining wave. Verify we don't invoke another SyncWaveHook call
1959+
1960+
called = false
1961+
syncCtx.syncWaveHook = func(syncIdentities []synccommon.SyncIdentity, final bool) error {
1962+
called = true
1963+
return nil
1964+
}
1965+
syncCtx.Sync()
1966+
assert.False(t, called)
1967+
}
1968+
18491969
func TestSyncWaveHookFail(t *testing.T) {
18501970
syncCtx := newTestSyncCtx(nil, WithOperationSettings(false, false, false, false))
18511971
pod1 := testingutils.NewPod()
@@ -1857,7 +1977,7 @@ func TestSyncWaveHookFail(t *testing.T) {
18571977
})
18581978

18591979
called := false
1860-
syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error {
1980+
syncCtx.syncWaveHook = func(_ []synccommon.SyncIdentity, _ bool) error {
18611981
called = true
18621982
return errors.New("intentional error")
18631983
}

0 commit comments

Comments
 (0)