Skip to content

Commit f0dabce

Browse files
add DAG for syncwaves
Signed-off-by: SebastienFelix <[email protected]>
1 parent 95b191d commit f0dabce

File tree

7 files changed

+244
-49
lines changed

7 files changed

+244
-49
lines changed

gitops-engine/pkg/sync/common/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ const (
1212
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
1313
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
1414
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
15+
// AnnotationSyncWaveGroup indicates which wave of the sync the resource or hook should be in
16+
AnnotationSyncWaveGroup = "argocd.argoproj.io/sync-wave-group"
17+
// AnnotationSyncWaveGroupDependencies indicates which wave of the sync the resource or hook should be in
18+
AnnotationSyncWaveGroupDependencies = "argocd.argoproj.io/sync-wave-group-dependencies"
1519
// AnnotationKeyHook contains the hook type of a resource
1620
AnnotationKeyHook = "argocd.argoproj.io/hook"
1721
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook

gitops-engine/pkg/sync/sync_context.go

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"slices"
78
"sort"
89
"strings"
910
"sync"
@@ -560,18 +561,25 @@ func (sc *syncContext) Sync() {
560561
return
561562
}
562563

563-
// remove any tasks not in this wave
564+
// remove any tasks which have unsynced dependencies
564565
phase := tasks.phase()
565566
wave := tasks.wave()
566-
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()
567+
syncIdentities := tasks.syncIdentities()
568+
allSyncIdentities := make([]syncIdentity, 0)
569+
for _, t := range tasks {
570+
if !slices.Contains(allSyncIdentities, t.identity()) {
571+
allSyncIdentities = append(allSyncIdentities, t.identity())
572+
}
573+
}
574+
finalWave := phase == tasks.lastPhase() && len(syncIdentities) == len(allSyncIdentities)
567575

568576
// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
569577
// EVEN if those objects subsequently degraded
570578
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
571-
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
579+
remainingTasks := tasks.Filter(func(t *syncTask) bool { return !slices.Contains(syncIdentities, t.identity()) || t.isHook() })
572580

573581
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
574-
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
582+
tasks = tasks.Filter(func(t *syncTask) bool { return slices.Contains(syncIdentities, t.identity()) })
575583

576584
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
577585

@@ -909,52 +917,62 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
909917
}
910918

911919
// for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order)
912-
pruneTasks := make(map[int][]*syncTask)
920+
921+
tasksByWaveGroup := make(map[int][]*syncTask)
913922
for _, task := range tasks {
914-
if task.isPrune() {
915-
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
916-
}
923+
tasksByWaveGroup[task.waveGroup()] = append(tasksByWaveGroup[task.waveGroup()], task)
917924
}
925+
for waveGroup := range tasksByWaveGroup {
926+
pruneTasks := make(map[int][]*syncTask)
927+
for _, task := range tasksByWaveGroup[waveGroup] {
928+
if task.isPrune() {
929+
pruneTasks[task.wave()] = append(pruneTasks[task.wave()], task)
930+
}
931+
}
918932

919-
var uniquePruneWaves []int
920-
for k := range pruneTasks {
921-
uniquePruneWaves = append(uniquePruneWaves, k)
922-
}
923-
sort.Ints(uniquePruneWaves)
933+
var uniquePruneWaves []int
934+
for k := range pruneTasks {
935+
uniquePruneWaves = append(uniquePruneWaves, k)
936+
}
937+
sort.Ints(uniquePruneWaves)
924938

925-
// reorder waves for pruning tasks using symmetric swap on prune waves
926-
n := len(uniquePruneWaves)
927-
for i := 0; i < n/2; i++ {
928-
// waves to swap
929-
startWave := uniquePruneWaves[i]
930-
endWave := uniquePruneWaves[n-1-i]
939+
// reorder waves for pruning tasks using symmetric swap on prune waves
931940

932-
for _, task := range pruneTasks[startWave] {
933-
task.waveOverride = &endWave
934-
}
941+
n := len(uniquePruneWaves)
942+
for j := 0; j < n/2; j++ {
943+
// waves to swap
944+
startWave := uniquePruneWaves[j]
945+
endWave := uniquePruneWaves[n-1-j]
935946

936-
for _, task := range pruneTasks[endWave] {
937-
task.waveOverride = &startWave
947+
for _, task := range pruneTasks[startWave] {
948+
task.waveOverride = &endWave
949+
}
950+
951+
for _, task := range pruneTasks[endWave] {
952+
task.waveOverride = &startWave
953+
}
938954
}
939-
}
940955

941-
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
942-
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
943-
syncPhaseLastWave := 0
944-
for _, task := range tasks {
945-
if task.phase == common.SyncPhaseSync {
946-
if task.wave() > syncPhaseLastWave {
947-
syncPhaseLastWave = task.wave()
956+
// for pruneLast tasks, modify the wave to sync phase last wave of tasks + 1
957+
// to ensure proper cleanup, syncPhaseLastWave should also consider prune tasks to determine last wave
958+
959+
syncPhaseLastWave := 0
960+
for _, task := range tasksByWaveGroup[waveGroup] {
961+
if task.phase == common.SyncPhaseSync {
962+
if task.wave() > syncPhaseLastWave {
963+
syncPhaseLastWave = task.wave()
964+
}
948965
}
949966
}
950-
}
951-
syncPhaseLastWave = syncPhaseLastWave + 1
967+
syncPhaseLastWave = syncPhaseLastWave + 1
952968

953-
for _, task := range tasks {
954-
if task.isPrune() &&
955-
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
956-
task.waveOverride = &syncPhaseLastWave
969+
for _, task := range tasksByWaveGroup[waveGroup] {
970+
if task.isPrune() &&
971+
(sc.pruneLast || resourceutil.HasAnnotationOption(task.liveObj, common.AnnotationSyncOptions, common.SyncOptionPruneLast)) {
972+
task.waveOverride = &syncPhaseLastWave
973+
}
957974
}
975+
958976
}
959977

960978
tasks.Sort()
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package sync
2+
3+
import (
4+
"github.com/argoproj/gitops-engine/pkg/sync/common"
5+
)
6+
7+
// syncTask holds the live and target object. At least one should be non-nil. A targetObj of nil
8+
// indicates the live object needs to be pruned. A liveObj of nil indicates the object has yet to
9+
// be deployed
10+
type syncIdentity struct {
11+
phase common.SyncPhase
12+
wave int
13+
waveGroup int
14+
}

gitops-engine/pkg/sync/sync_task.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package sync
22

33
import (
44
"fmt"
5+
"strconv"
6+
"strings"
57

68
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
79
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -16,14 +18,16 @@ import (
1618
// indicates the live object needs to be pruned. A liveObj of nil indicates the object has yet to
1719
// be deployed
1820
type syncTask struct {
19-
phase common.SyncPhase
20-
liveObj *unstructured.Unstructured
21-
targetObj *unstructured.Unstructured
22-
skipDryRun bool
23-
syncStatus common.ResultCode
24-
operationState common.OperationPhase
25-
message string
26-
waveOverride *int
21+
phase common.SyncPhase
22+
liveObj *unstructured.Unstructured
23+
targetObj *unstructured.Unstructured
24+
skipDryRun bool
25+
syncStatus common.ResultCode
26+
operationState common.OperationPhase
27+
message string
28+
waveOverride *int
29+
waveGroupOverride *int
30+
waveGroupDependenciesOverride *string
2731
}
2832

2933
func ternary(val bool, a, b string) string {
@@ -63,6 +67,35 @@ func (t *syncTask) wave() int {
6367
return syncwaves.Wave(t.obj())
6468
}
6569

70+
func (t *syncTask) waveGroup() int {
71+
if t.waveGroupOverride != nil {
72+
return *t.waveGroupOverride
73+
}
74+
return syncwaves.WaveGroup(t.obj())
75+
}
76+
77+
func (t *syncTask) identity() syncIdentity {
78+
return syncIdentity{t.phase, t.wave(), t.waveGroup()}
79+
}
80+
81+
func (t *syncTask) waveGroupDependencies() []int {
82+
if t.waveGroupDependenciesOverride != nil {
83+
waveGroup := t.waveGroup()
84+
stringWaveGroupDependencies := strings.Split(*t.waveGroupDependenciesOverride, ",")
85+
waveGroupDependencies := make([]int, 0)
86+
for _, t := range stringWaveGroupDependencies {
87+
waveGroupDependency, err := strconv.Atoi(t)
88+
if err == nil {
89+
if waveGroupDependency < waveGroup {
90+
waveGroupDependencies = append(waveGroupDependencies, waveGroupDependency)
91+
}
92+
}
93+
}
94+
return waveGroupDependencies
95+
}
96+
return syncwaves.WaveGroupDependencies(t.obj())
97+
}
98+
6699
func (t *syncTask) isHook() bool {
67100
return hook.IsHook(t.obj())
68101
}

gitops-engine/pkg/sync/sync_tasks.go

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sync
22

33
import (
44
"fmt"
5+
"slices"
56
"sort"
67
"strings"
78

@@ -79,9 +80,10 @@ func (s syncTasks) Swap(i, j int) {
7980

8081
// order is
8182
// 1. phase
82-
// 2. wave
83-
// 3. kind
84-
// 4. name
83+
// 2. waveGroup
84+
// 3. wave
85+
// 4. kind
86+
// 5. name
8587
func (s syncTasks) Less(i, j int) bool {
8688
tA := s[i]
8789
tB := s[j]
@@ -91,6 +93,48 @@ func (s syncTasks) Less(i, j int) bool {
9193
return d < 0
9294
}
9395

96+
d = tA.waveGroup() - tB.waveGroup()
97+
if d != 0 {
98+
return d < 0
99+
}
100+
101+
d = tA.wave() - tB.wave()
102+
if d != 0 {
103+
return d < 0
104+
}
105+
106+
a := tA.obj()
107+
b := tB.obj()
108+
109+
// we take advantage of the fact that if the kind is not in the kindOrder map,
110+
// then it will return the default int value of zero, which is the highest value
111+
d = kindOrder[a.GetKind()] - kindOrder[b.GetKind()]
112+
if d != 0 {
113+
return d < 0
114+
}
115+
116+
return a.GetName() < b.GetName()
117+
}
118+
119+
func (s syncTasks) LessSyncTask(tA, tB syncTask) bool {
120+
121+
d := syncPhaseOrder[tA.phase] - syncPhaseOrder[tB.phase]
122+
if d != 0 {
123+
return d < 0
124+
}
125+
126+
if slices.Contains(tB.waveGroupDependencies(), tA.waveGroup()) {
127+
return true
128+
}
129+
130+
if slices.Contains(tA.waveGroupDependencies(), tB.waveGroup()) {
131+
return false
132+
}
133+
134+
if tA.waveGroup() != tB.waveGroup() {
135+
return false
136+
}
137+
94138
d = tA.wave() - tB.wave()
95139
if d != 0 {
96140
return d < 0
@@ -250,20 +294,66 @@ func (s syncTasks) phase() common.SyncPhase {
250294
return ""
251295
}
252296

297+
func (s syncTasks) waveGroups() []int {
298+
if len(s) > 0 {
299+
firstWaveGroups := make([]int, 0)
300+
for i, firstSyncTask := range s {
301+
belongsToFirstWaveGroups := true
302+
for _, secondSyncTask := range s[:i] {
303+
if s.LessSyncTask(*secondSyncTask, *firstSyncTask) {
304+
belongsToFirstWaveGroups = false
305+
}
306+
}
307+
if belongsToFirstWaveGroups && !slices.Contains(firstWaveGroups, s[i].waveGroup()) {
308+
firstWaveGroups = append(firstWaveGroups, s[i].waveGroup())
309+
}
310+
}
311+
return firstWaveGroups
312+
}
313+
return make([]int, 0)
314+
}
315+
253316
func (s syncTasks) wave() int {
254317
if len(s) > 0 {
255318
return s[0].wave()
256319
}
257320
return 0
258321
}
259322

323+
func (s syncTasks) syncIdentities() []syncIdentity {
324+
if len(s) > 0 {
325+
firstSyncIdentities := make([]syncIdentity, 0)
326+
for i, firstSyncTask := range s {
327+
belongsToFirstWaveGroups := true
328+
for _, secondSyncTask := range s[:i] {
329+
if s.LessSyncTask(*secondSyncTask, *firstSyncTask) {
330+
belongsToFirstWaveGroups = false
331+
}
332+
}
333+
wi := &syncIdentity{s[i].phase, s[i].wave(), s[i].waveGroup()}
334+
if belongsToFirstWaveGroups && !slices.Contains(firstSyncIdentities, *wi) {
335+
firstSyncIdentities = append(firstSyncIdentities, *wi)
336+
}
337+
}
338+
return firstSyncIdentities
339+
}
340+
return make([]syncIdentity, 0)
341+
}
342+
260343
func (s syncTasks) lastPhase() common.SyncPhase {
261344
if len(s) > 0 {
262345
return s[len(s)-1].phase
263346
}
264347
return ""
265348
}
266349

350+
func (s syncTasks) lastGroup() int {
351+
if len(s) > 0 {
352+
return s[len(s)-1].waveGroup()
353+
}
354+
return 0
355+
}
356+
267357
func (s syncTasks) lastWave() int {
268358
if len(s) > 0 {
269359
return s[len(s)-1].wave()

0 commit comments

Comments
 (0)