Skip to content
This repository was archived by the owner on Nov 8, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ linters:
- unparam
- usestdlibvars
- whitespace
- wrapcheck
settings:
gocritic:
disabled-checks:
Expand Down
6 changes: 3 additions & 3 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
cmd.Dir = s.repoPath
revision, err := cmd.CombinedOutput()
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("failed to determine git revision: %w", err)
}
var res []*unstructured.Unstructured
for i := range s.paths {
Expand All @@ -80,7 +80,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
}
data, err := os.ReadFile(path)
if err != nil {
return err
return fmt.Errorf("failed to read file %s: %w", path, err)
}
items, err := kube.SplitYAML(data)
if err != nil {
Expand All @@ -89,7 +89,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
res = append(res, items...)
return nil
}); err != nil {
return nil, "", err
return nil, "", fmt.Errorf("failed to parse %s: %w", s.paths[i], err)
}
}
for i := range res {
Expand Down
39 changes: 22 additions & 17 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,15 +526,15 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
func (c *clusterCache) startMissingWatches() error {
apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
if err != nil {
return err
return fmt.Errorf("failed to get APIResources: %w", err)
}
client, err := c.kubectl.NewDynamicClient(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create client: %w", err)
}
clientset, err := kubernetes.NewForConfig(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create clientset: %w", err)
}
namespacedResources := make(map[schema.GroupKind]bool)
for i := range apis {
Expand Down Expand Up @@ -584,7 +584,7 @@ func runSynced(lock sync.Locker, action func() error) error {
// The callback should not wait on any locks that may be held by other callers.
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
return "", err
return "", fmt.Errorf("failed to acquire list semaphore: %w", err)
}
defer c.listSemaphore.Release(1)

Expand All @@ -610,12 +610,16 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
retryCount++
c.log.Info(fmt.Sprintf("Error while listing resources: %v (try %d/%d)", ierr, retryCount, c.listRetryLimit))
}
//nolint:wrapcheck // wrap outside the retry
return ierr
}
resourceVersion = res.GetResourceVersion()
return nil
})
return res, err
if err != nil {
return res, fmt.Errorf("failed to list resources: %w", err)
}
return res, nil
})
listPager.PageBufferSize = c.listPageBufferSize
listPager.PageSize = c.listPageSize
Expand Down Expand Up @@ -672,11 +676,12 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
if apierrors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
}
//nolint:wrapcheck // wrap outside the retry
return res, err
},
})
if err != nil {
return err
return fmt.Errorf("failed to create resource watcher: %w", err)
}

defer func() {
Expand Down Expand Up @@ -826,7 +831,7 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
return false, fmt.Errorf("failed to create self subject access review: %w", err)
}
if resp != nil && resp.Status.Allowed {
return true, nil
Expand All @@ -839,7 +844,7 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
sar.Spec.ResourceAttributes.Namespace = ns
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
return false, fmt.Errorf("failed to create self subject access review: %w", err)
}
if resp != nil && resp.Status.Allowed {
return true, nil
Expand Down Expand Up @@ -883,12 +888,12 @@ func (c *clusterCache) sync() error {
config := c.config
version, err := c.kubectl.GetServerVersion(config)
if err != nil {
return err
return fmt.Errorf("failed to get server version: %w", err)
}
c.serverVersion = version
apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
if err != nil {
return err
return fmt.Errorf("failed to get api resources: %w", err)
}
c.apiResources = apiResources

Expand All @@ -905,15 +910,15 @@ func (c *clusterCache) sync() error {

apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
if err != nil {
return err
return fmt.Errorf("failed to get api resources: %w", err)
}
client, err := c.kubectl.NewDynamicClient(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create client: %w", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
return fmt.Errorf("failed to create clientset: %w", err)
}

if c.batchEventsProcessing {
Expand Down Expand Up @@ -1251,7 +1256,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
}
} else if _, watched := c.apisMeta[key.GroupKind()]; !watched {
Expand All @@ -1261,7 +1266,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
}
}
Expand All @@ -1276,7 +1281,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
} else {
managedObj = converted
Expand All @@ -1288,7 +1293,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
return nil
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get managed objects: %w", err)
}

return managedObjs, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/references.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func isStatefulSetChild(un *unstructured.Unstructured) (func(kube.ResourceKey) b
sts := appsv1.StatefulSet{}
data, err := json.Marshal(un)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal unstructured object: %w", err)
}
err = json.Unmarshal(data, &sts)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal statefulset: %w", err)
}

templates := sts.Spec.VolumeClaimTemplates
Expand Down
45 changes: 23 additions & 22 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ func apply(tvConfig, tvLive *typed.TypedValue, p *SMDParams) (*typed.TypedValue,
if err != nil {
return nil, fmt.Errorf("error while running updater.Apply: %w", err)
}
return mergedLive, err
return mergedLive, nil
}

func buildManagerInfoForApply(manager string) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: manager,
Operation: metav1.ManagedFieldsOperationApply,
}
//nolint:wrapcheck // trivial function, wrapped nicely by the caller
return fieldmanager.BuildManagerIdentifier(&managerInfo)
}

Expand Down Expand Up @@ -489,13 +490,13 @@ func handleResourceCreateOrDeleteDiff(config, live *unstructured.Unstructured) (
if live != nil {
liveData, err := json.Marshal(live)
if err != nil {
return nil, err
return nil, fmt.Errorf("error marshaling live resource: %w", err)
}
return &DiffResult{Modified: false, NormalizedLive: liveData, PredictedLive: []byte("null")}, nil
} else if config != nil {
predictedLiveData, err := json.Marshal(config.Object)
if err != nil {
return nil, err
return nil, fmt.Errorf("error marshaling config resource: %w", err)
}
return &DiffResult{Modified: true, NormalizedLive: []byte("null"), PredictedLive: predictedLiveData}, nil
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
// Apply the patchBytes patch against liveBytes, using predictedLive to indicate the k8s data type
predictedLiveBytes, err := strategicpatch.StrategicMergePatch(liveBytes, patchBytes, predictedLive)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch: %w", err)
}

// Unmarshal predictedLiveBytes into predictedLive; note that this will discard JSON fields in predictedLiveBytes
Expand All @@ -562,7 +563,7 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
// to its k8s resource type (eg the JSON may contain those invalid fields that we do not wish to discard).
predictedLiveBytes, err = strategicpatch.StrategicMergePatch(predictedLiveBytes, patch, predictedLive.DeepCopyObject())
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch for predicted state: %w", err)
}

// 3) Unmarshall into a map[string]any, then back into byte[], to ensure the fields
Expand All @@ -571,11 +572,11 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
var result map[string]any
err = json.Unmarshal([]byte(predictedLiveBytes), &result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to unmarshal strategic merge patch for predicted state: %w", err)
}
predictedLiveBytes, err = json.Marshal(result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal strategic merge patch for predicted state: %w", err)
}
}

Expand All @@ -595,18 +596,18 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
}
liveBytes, err = strategicpatch.StrategicMergePatch(liveBytes, patch, live.DeepCopyObject())
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch for live state: %w", err)
}

// Ensure the fields are sorted in a consistent order (as above)
var result map[string]any
err = json.Unmarshal([]byte(liveBytes), &result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to unmarshal strategic merge patch for live state: %w", err)
}
liveBytes, err = json.Marshal(result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal strategic merge patch for live state: %w", err)
}
}

Expand Down Expand Up @@ -662,7 +663,7 @@ func ThreeWayDiff(orig, config, live *unstructured.Unstructured) (*DiffResult, e
// 2. get expected live object by applying the patch against the live object
liveBytes, err := json.Marshal(live)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal live state: %w", err)
}

var predictedLiveBytes []byte
Expand All @@ -677,7 +678,7 @@ func ThreeWayDiff(orig, config, live *unstructured.Unstructured) (*DiffResult, e
// Otherwise, merge patch directly as JSON
predictedLiveBytes, err = jsonpatch.MergePatch(liveBytes, patchBytes)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to construct merge patch for predicted state: %w", err)
}
}

Expand Down Expand Up @@ -733,11 +734,11 @@ func statefulSetWorkaround(orig, live *unstructured.Unstructured) *unstructured.
func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte, func() (runtime.Object, error), error) {
origBytes, err := json.Marshal(orig.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal original object: %w", err)
}
configBytes, err := json.Marshal(config.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal config object: %w", err)
}

if versionedObject, err := scheme.Scheme.New(orig.GroupVersionKind()); err == nil {
Expand All @@ -748,16 +749,16 @@ func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte,

liveBytes, err := json.Marshal(live.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal live object: %w", err)
}

lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct lookup patch: %w", err)
}
patch, err := strategicpatch.CreateThreeWayMergePatch(origBytes, configBytes, liveBytes, lookupPatchMeta, true)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct thre way merge patch: %w", err)
}
newVersionedObject := func() (runtime.Object, error) {
return scheme.Scheme.New(orig.GroupVersionKind())
Expand All @@ -770,12 +771,12 @@ func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte,

liveBytes, err := json.Marshal(live.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal live object: %w", err)
}

patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(origBytes, configBytes, liveBytes)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct thre way merge patch: %w", err)
}
return patch, nil, nil
}
Expand Down Expand Up @@ -989,15 +990,15 @@ func normalizeRole(un *unstructured.Unstructured, o options) {
func CreateTwoWayMergePatch(orig, new, dataStruct any) ([]byte, bool, error) {
origBytes, err := json.Marshal(orig)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to marshal orig object: %w", err)
}
newBytes, err := json.Marshal(new)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to marshal new object: %w", err)
}
patch, err := strategicpatch.CreateTwoWayMergePatch(origBytes, newBytes, dataStruct)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to create two way merge patch: %w", err)
}
return patch, string(patch) != "{}", nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/diff/diff_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewK8sServerSideDryRunner(kubeApplier KubeApplier) *K8sServerSideDryRunner
// obj and the given manager in dryrun mode. Will return the predicted live state
// json as string.
func (kdr *K8sServerSideDryRunner) Run(ctx context.Context, obj *unstructured.Unstructured, manager string) (string, error) {
//nolint:wrapcheck // trivial function, don't bother wrapping
return kdr.dryrunApplier.ApplyResource(ctx, obj, cmdutil.DryRunServer, false, false, true, manager)
}

Expand Down
Loading