Skip to content
This repository was archived by the owner on Nov 8, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ linters:
- unparam
- usestdlibvars
- whitespace
- wrapcheck
settings:
gocritic:
disabled-checks:
Expand Down
6 changes: 3 additions & 3 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
cmd.Dir = s.repoPath
revision, err := cmd.CombinedOutput()
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf("failed to determine git revision: %w", err)
}
var res []*unstructured.Unstructured
for i := range s.paths {
Expand All @@ -80,7 +80,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
}
data, err := os.ReadFile(path)
if err != nil {
return err
return fmt.Errorf("failed to read file %s: %w", path, err)
}
items, err := kube.SplitYAML(data)
if err != nil {
Expand All @@ -89,7 +89,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
res = append(res, items...)
return nil
}); err != nil {
return nil, "", err
return nil, "", fmt.Errorf("failed to parse %s: %w", s.paths[i], err)
}
}
for i := range res {
Expand Down
39 changes: 22 additions & 17 deletions pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,15 +526,15 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
func (c *clusterCache) startMissingWatches() error {
apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
if err != nil {
return err
return fmt.Errorf("failed to get APIResources: %w", err)
}
client, err := c.kubectl.NewDynamicClient(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create client: %w", err)
}
clientset, err := kubernetes.NewForConfig(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create clientset: %w", err)
}
namespacedResources := make(map[schema.GroupKind]bool)
for i := range apis {
Expand Down Expand Up @@ -584,7 +584,7 @@ func runSynced(lock sync.Locker, action func() error) error {
// The callback should not wait on any locks that may be held by other callers.
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
return "", err
return "", fmt.Errorf("failed to acquire list semaphore: %w", err)
}
defer c.listSemaphore.Release(1)

Expand All @@ -610,12 +610,16 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
retryCount++
c.log.Info(fmt.Sprintf("Error while listing resources: %v (try %d/%d)", ierr, retryCount, c.listRetryLimit))
}
//nolint:wrapcheck // wrap outside the retry
return ierr
}
resourceVersion = res.GetResourceVersion()
return nil
})
return res, err
if err != nil {
return res, fmt.Errorf("failed to list resources: %w", err)
}
return res, nil
})
listPager.PageBufferSize = c.listPageBufferSize
listPager.PageSize = c.listPageSize
Expand Down Expand Up @@ -672,11 +676,12 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
if apierrors.IsNotFound(err) {
c.stopWatching(api.GroupKind, ns)
}
//nolint:wrapcheck // wrap outside the retry
return res, err
},
})
if err != nil {
return err
return fmt.Errorf("failed to create resource watcher: %w", err)
}

defer func() {
Expand Down Expand Up @@ -826,7 +831,7 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
return false, fmt.Errorf("failed to create self subject access review: %w", err)
}
if resp != nil && resp.Status.Allowed {
return true, nil
Expand All @@ -839,7 +844,7 @@ func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface auth
sar.Spec.ResourceAttributes.Namespace = ns
resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
return false, fmt.Errorf("failed to create self subject access review: %w", err)
}
if resp != nil && resp.Status.Allowed {
return true, nil
Expand Down Expand Up @@ -883,12 +888,12 @@ func (c *clusterCache) sync() error {
config := c.config
version, err := c.kubectl.GetServerVersion(config)
if err != nil {
return err
return fmt.Errorf("failed to get server version: %w", err)
}
c.serverVersion = version
apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
if err != nil {
return err
return fmt.Errorf("failed to get api resources: %w", err)
}
c.apiResources = apiResources

Expand All @@ -905,15 +910,15 @@ func (c *clusterCache) sync() error {

apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
if err != nil {
return err
return fmt.Errorf("failed to get api resources: %w", err)
}
client, err := c.kubectl.NewDynamicClient(c.config)
if err != nil {
return err
return fmt.Errorf("failed to create client: %w", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
return fmt.Errorf("failed to create clientset: %w", err)
}

if c.batchEventsProcessing {
Expand Down Expand Up @@ -1251,7 +1256,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
}
} else if _, watched := c.apisMeta[key.GroupKind()]; !watched {
Expand All @@ -1261,7 +1266,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
}
}
Expand All @@ -1276,7 +1281,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
if apierrors.IsNotFound(err) {
return nil
}
return err
return fmt.Errorf("unexpected error getting managed object: %w", err)
}
} else {
managedObj = converted
Expand All @@ -1288,7 +1293,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
return nil
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get managed objects: %w", err)
}

return managedObjs, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/references.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ func isStatefulSetChild(un *unstructured.Unstructured) (func(kube.ResourceKey) b
sts := appsv1.StatefulSet{}
data, err := json.Marshal(un)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal unstructured object: %w", err)
}
err = json.Unmarshal(data, &sts)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to unmarshal statefulset: %w", err)
}

templates := sts.Spec.VolumeClaimTemplates
Expand Down
45 changes: 23 additions & 22 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ func apply(tvConfig, tvLive *typed.TypedValue, p *SMDParams) (*typed.TypedValue,
if err != nil {
return nil, fmt.Errorf("error while running updater.Apply: %w", err)
}
return mergedLive, err
return mergedLive, nil
}

func buildManagerInfoForApply(manager string) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: manager,
Operation: metav1.ManagedFieldsOperationApply,
}
//nolint:wrapcheck // trivial function, wrapped nicely by the caller
return fieldmanager.BuildManagerIdentifier(&managerInfo)
}

Expand Down Expand Up @@ -489,13 +490,13 @@ func handleResourceCreateOrDeleteDiff(config, live *unstructured.Unstructured) (
if live != nil {
liveData, err := json.Marshal(live)
if err != nil {
return nil, err
return nil, fmt.Errorf("error marshaling live resource: %w", err)
}
return &DiffResult{Modified: false, NormalizedLive: liveData, PredictedLive: []byte("null")}, nil
} else if config != nil {
predictedLiveData, err := json.Marshal(config.Object)
if err != nil {
return nil, err
return nil, fmt.Errorf("error marshaling config resource: %w", err)
}
return &DiffResult{Modified: true, NormalizedLive: []byte("null"), PredictedLive: predictedLiveData}, nil
}
Expand Down Expand Up @@ -539,7 +540,7 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
// Apply the patchBytes patch against liveBytes, using predictedLive to indicate the k8s data type
predictedLiveBytes, err := strategicpatch.StrategicMergePatch(liveBytes, patchBytes, predictedLive)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch: %w", err)
}

// Unmarshal predictedLiveBytes into predictedLive; note that this will discard JSON fields in predictedLiveBytes
Expand All @@ -562,7 +563,7 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
// to its k8s resource type (eg the JSON may contain those invalid fields that we do not wish to discard).
predictedLiveBytes, err = strategicpatch.StrategicMergePatch(predictedLiveBytes, patch, predictedLive.DeepCopyObject())
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch for predicted state: %w", err)
}

// 3) Unmarshall into a map[string]any, then back into byte[], to ensure the fields
Expand All @@ -571,11 +572,11 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
var result map[string]any
err = json.Unmarshal([]byte(predictedLiveBytes), &result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to unmarshal strategic merge patch for predicted state: %w", err)
}
predictedLiveBytes, err = json.Marshal(result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal strategic merge patch for predicted state: %w", err)
}
}

Expand All @@ -595,18 +596,18 @@ func applyPatch(liveBytes []byte, patchBytes []byte, newVersionedObject func() (
}
liveBytes, err = strategicpatch.StrategicMergePatch(liveBytes, patch, live.DeepCopyObject())
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct strategic merge patch for live state: %w", err)
}

// Ensure the fields are sorted in a consistent order (as above)
var result map[string]any
err = json.Unmarshal([]byte(liveBytes), &result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to unmarshal strategic merge patch for live state: %w", err)
}
liveBytes, err = json.Marshal(result)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal strategic merge patch for live state: %w", err)
}
}

Expand Down Expand Up @@ -662,7 +663,7 @@ func ThreeWayDiff(orig, config, live *unstructured.Unstructured) (*DiffResult, e
// 2. get expected live object by applying the patch against the live object
liveBytes, err := json.Marshal(live)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to marshal live state: %w", err)
}

var predictedLiveBytes []byte
Expand All @@ -677,7 +678,7 @@ func ThreeWayDiff(orig, config, live *unstructured.Unstructured) (*DiffResult, e
// Otherwise, merge patch directly as JSON
predictedLiveBytes, err = jsonpatch.MergePatch(liveBytes, patchBytes)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to construct merge patch for predicted state: %w", err)
}
}

Expand Down Expand Up @@ -733,11 +734,11 @@ func statefulSetWorkaround(orig, live *unstructured.Unstructured) *unstructured.
func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte, func() (runtime.Object, error), error) {
origBytes, err := json.Marshal(orig.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal original object: %w", err)
}
configBytes, err := json.Marshal(config.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal config object: %w", err)
}

if versionedObject, err := scheme.Scheme.New(orig.GroupVersionKind()); err == nil {
Expand All @@ -748,16 +749,16 @@ func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte,

liveBytes, err := json.Marshal(live.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal live object: %w", err)
}

lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct lookup patch: %w", err)
}
patch, err := strategicpatch.CreateThreeWayMergePatch(origBytes, configBytes, liveBytes, lookupPatchMeta, true)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct thre way merge patch: %w", err)
}
newVersionedObject := func() (runtime.Object, error) {
return scheme.Scheme.New(orig.GroupVersionKind())
Expand All @@ -770,12 +771,12 @@ func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte,

liveBytes, err := json.Marshal(live.Object)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to marshal live object: %w", err)
}

patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(origBytes, configBytes, liveBytes)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("failed to construct thre way merge patch: %w", err)
}
return patch, nil, nil
}
Expand Down Expand Up @@ -989,15 +990,15 @@ func normalizeRole(un *unstructured.Unstructured, o options) {
func CreateTwoWayMergePatch(orig, new, dataStruct any) ([]byte, bool, error) {
origBytes, err := json.Marshal(orig)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to marshal orig object: %w", err)
}
newBytes, err := json.Marshal(new)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to marshal new object: %w", err)
}
patch, err := strategicpatch.CreateTwoWayMergePatch(origBytes, newBytes, dataStruct)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("failed to create two way merge patch: %w", err)
}
return patch, string(patch) != "{}", nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/diff/diff_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewK8sServerSideDryRunner(kubeApplier KubeApplier) *K8sServerSideDryRunner
// obj and the given manager in dryrun mode. Will return the predicted live state
// json as string.
func (kdr *K8sServerSideDryRunner) Run(ctx context.Context, obj *unstructured.Unstructured, manager string) (string, error) {
//nolint:wrapcheck // trivial function, don't bother wrapping
return kdr.dryrunApplier.ApplyResource(ctx, obj, cmdutil.DryRunServer, false, false, true, manager)
}

Expand Down
Loading