Skip to content
120 changes: 117 additions & 3 deletions images/agent/internal/controller/lvg/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"

Expand All @@ -16,6 +17,7 @@ import (
"github.com/deckhouse/sds-node-configurator/images/agent/internal/utils"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/uuid"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -102,11 +104,16 @@ func (d *Discoverer) LVMVolumeGroupDiscoverReconcile(ctx context.Context) bool {
d.log.Info("[RunLVMVolumeGroupDiscoverController] no BlockDevices were found")
return false
}
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] BlockDevices: %+v", blockDevices))

filteredLVGs := filterLVGsByNode(currentLVMVGs, d.cfg.NodeName)
filteredBlockDevices := filterBlockDevicesByNodeName(blockDevices, d.cfg.NodeName)
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] Filtered LVMVolumeGroups: %+v", filteredLVGs))
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] Filtered BlockDevices: %+v", filteredBlockDevices))

d.log.Debug("[RunLVMVolumeGroupDiscoverController] tries to get LVMVolumeGroup candidates")
candidates, err := d.GetLVMVolumeGroupCandidates(blockDevices)

candidates, err := d.GetLVMVolumeGroupCandidates(filteredBlockDevices)
if err != nil {
d.log.Error(err, "[RunLVMVolumeGroupDiscoverController] unable to run GetLVMVolumeGroupCandidates")
for _, lvg := range filteredLVGs {
Expand All @@ -132,9 +139,9 @@ func (d *Discoverer) LVMVolumeGroupDiscoverReconcile(ctx context.Context) bool {

shouldRequeue := false
for _, candidate := range candidates {
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] candidate: %+v", candidate))
if lvg, exist := filteredLVGs[candidate.ActualVGNameOnTheNode]; exist {
d.log.Debug(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] the LVMVolumeGroup %s is already exist. Tries to update it", lvg.Name))
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] candidate: %+v", candidate))
d.log.Trace(fmt.Sprintf("[RunLVMVolumeGroupDiscoverController] lvg: %+v", lvg))

if !hasLVMVolumeGroupDiff(d.log, lvg, candidate) {
Expand Down Expand Up @@ -470,6 +477,13 @@ func (d *Discoverer) UpdateLVMVolumeGroupByCandidate(
lvg.Status.VGFree = candidate.VGFree
lvg.Status.VGUuid = candidate.VGUUID

_, err = updateBlockDeviceSelectorIfNeeded(lvg.Spec.BlockDeviceSelector, candidate.BlockDevicesNames)
if err != nil {
return fmt.Errorf("updating block device selectors: %w", err)
}

d.log.Trace(fmt.Sprintf("[UpdateLVMVolumeGroupByCandidate] updated LVMVolumeGroup: %+v", lvg))

start := time.Now()
err = d.cl.Status().Update(ctx, lvg)
d.metrics.APIMethodsDuration(DiscovererName, "update").Observe(d.metrics.GetEstimatedTimeInSeconds(start))
Expand Down Expand Up @@ -799,12 +813,19 @@ func hasLVMVolumeGroupDiff(log logger.Logger, lvg v1alpha1.LVMVolumeGroup, candi
log.Trace(fmt.Sprintf(`VGUUID, candidate: %s, lvg: %s`, candidate.VGUUID, lvg.Status.VGUuid))
log.Trace(fmt.Sprintf(`Nodes, candidate: %+v, lvg: %+v`, convertLVMVGNodes(candidate.Nodes), lvg.Status.Nodes))

notMatchedBlockDeviceNames, err := notMatchedBlockDeviceNames(lvg.Spec.BlockDeviceSelector, candidate.BlockDevicesNames)
if err != nil {
log.Error(err, "[hasLVMVolumeGroupDiff] unable to parse blockDeviceSelector")
notMatchedBlockDeviceNames = []string{}
}

return candidate.AllocatedSize.Value() != lvg.Status.AllocatedSize.Value() ||
hasStatusPoolDiff(convertedStatusPools, lvg.Status.ThinPools) ||
candidate.VGSize.Value() != lvg.Status.VGSize.Value() ||
candidate.VGFree.Value() != lvg.Status.VGFree.Value() ||
candidate.VGUUID != lvg.Status.VGUuid ||
hasStatusNodesDiff(log, convertLVMVGNodes(candidate.Nodes), lvg.Status.Nodes)
hasStatusNodesDiff(log, convertLVMVGNodes(candidate.Nodes), lvg.Status.Nodes) ||
len(notMatchedBlockDeviceNames) > 0
}

func hasStatusNodesDiff(log logger.Logger, first, second []v1alpha1.LVMVolumeGroupNode) bool {
Expand Down Expand Up @@ -869,6 +890,99 @@ func configureBlockDeviceSelector(candidate internal.LVMVolumeGroupCandidate) *m
}
}

func notMatchedBlockDeviceNames(labelSelector *metav1.LabelSelector, blockDeviceNames []string) ([]string, error) {
fullSelector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, fmt.Errorf("parsing label selector: %w", err)
}

requirements, _ := fullSelector.Requirements()
blockDeviceRequirements := requirements[0:0]
for _, requirement := range requirements {
if requirement.Key() == internal.MetadataNameLabelKey {
blockDeviceRequirements = append(blockDeviceRequirements, requirement)
}
}

blockDeviceNameSelector := labels.NewSelector().Add(blockDeviceRequirements...)
var notMatchedBlockDeviceNames []string
for _, blockDeviceName := range blockDeviceNames {
if blockDeviceNameSelector.Matches(labels.Set{
internal.MetadataNameLabelKey: blockDeviceName,
}) {
continue
}
notMatchedBlockDeviceNames = append(notMatchedBlockDeviceNames, blockDeviceName)
}
return notMatchedBlockDeviceNames, nil
}

func appendDeviceNamesToLabelSelector(labelSelector *metav1.LabelSelector, blockDeviceNames []string) {
if len(blockDeviceNames) == 0 {
return
}

var expressionToAddTo *[]string

// find existing expression to add
for i, expression := range labelSelector.MatchExpressions {
if expression.Key == internal.MetadataNameLabelKey && expression.Operator == metav1.LabelSelectorOpIn {
expressionToAddTo = &labelSelector.MatchExpressions[i].Values
break
}
}

if expressionToAddTo == nil {
// Create new expression in list
labelSelector.MatchExpressions = append(labelSelector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{},
})
expressionToAddTo = &labelSelector.MatchExpressions[len(labelSelector.MatchExpressions)-1].Values
}

*expressionToAddTo = append(*expressionToAddTo, blockDeviceNames...)
value, exists := labelSelector.MatchLabels[internal.MetadataNameLabelKey]
if exists {
*expressionToAddTo = append(*expressionToAddTo, value)
delete(labelSelector.MatchLabels, internal.MetadataNameLabelKey)
}
}

// Add missing block device to label selector.
//
// If labelSelector is provided it will be changed by this call.
//
// If labelSelector is created or updated it will be returned in updatedLabelSelector argument.
//
// If labelSelector was not changed the updatedLabelSelector will be nil.
func updateBlockDeviceSelectorIfNeeded(labelSelector *metav1.LabelSelector, blockDeviceNames []string) (updatedLabelSelector *metav1.LabelSelector, err error) {
if labelSelector == nil {
return &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: slices.Clone(blockDeviceNames),
},
},
}, nil
}

notMatchedBlockDeviceNames, err := notMatchedBlockDeviceNames(labelSelector, blockDeviceNames)
if err != nil {
return nil, err
}

if len(notMatchedBlockDeviceNames) == 0 {
return nil, nil
}

appendDeviceNamesToLabelSelector(labelSelector, notMatchedBlockDeviceNames)
return labelSelector, nil
}

func convertLVMVGNodes(nodes map[string][]internal.LVMVGDevice) []v1alpha1.LVMVolumeGroupNode {
lvmvgNodes := make([]v1alpha1.LVMVolumeGroupNode, 0, len(nodes))

Expand Down
193 changes: 193 additions & 0 deletions images/agent/internal/controller/lvg/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.

import (
"context"
"fmt"
"slices"
"testing"

"github.com/deckhouse/sds-node-configurator/api/v1alpha1"
Expand Down Expand Up @@ -755,6 +757,15 @@ func TestLVMVolumeGroupDiscover(t *testing.T) {
Spec: v1alpha1.LVMVolumeGroupSpec{
ThinPools: convertSpecThinPools(specThinPools),
Type: specType,
BlockDeviceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: blockDevicesNames,
},
},
},
},
Status: v1alpha1.LVMVolumeGroupStatus{
AllocatedSize: resource.MustParse("9765625Ki"),
Expand Down Expand Up @@ -871,6 +882,188 @@ func TestLVMVolumeGroupDiscover(t *testing.T) {
assert.Equal(t, message, lvg.Status.Conditions[0].Message)
}
})

t.Run("labelSelectorUpdates", func(t *testing.T) {
allDeviceNames := []string{"dev1", "dev_2", "dev-3"}
t.Run("doNotUpdate", func(t *testing.T) {
t.Run("inMatchExpressions", func(t *testing.T) {
selector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: allDeviceNames,
},
},
}
selectorCopy := selector.DeepCopy()
newSelector, err := updateBlockDeviceSelectorIfNeeded(selectorCopy, allDeviceNames)
assert.NoError(t, err)
assert.Nil(t, newSelector)
assert.EqualValues(t, selector, *selectorCopy)
})

t.Run("withOtherRequirements", func(t *testing.T) {
selector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: allDeviceNames,
}, {
Key: "otherKey",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"foo", "bar"},
},
},
}
selectorCopy := selector.DeepCopy()
newSelector, err := updateBlockDeviceSelectorIfNeeded(selectorCopy, allDeviceNames)
assert.NoError(t, err)
assert.Nil(t, newSelector)
assert.EqualValues(t, selector, *selectorCopy)
})

t.Run("withOtherDevices", func(t *testing.T) {
otherDevices := []string{"otherDevice1", "otherDevice2"}
selector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: append(allDeviceNames, otherDevices...),
},
},
}
selectorCopy := selector.DeepCopy()
newSelector, err := updateBlockDeviceSelectorIfNeeded(selectorCopy, allDeviceNames)
assert.NoError(t, err)
assert.Nil(t, newSelector)
assert.EqualValues(t, selector, *selectorCopy)
})
})
t.Run("createIfNil", func(t *testing.T) {
newSelector, err := updateBlockDeviceSelectorIfNeeded(nil, allDeviceNames)
assert.NoError(t, err)
assert.NotNil(t, newSelector)
assert.Len(t, newSelector.MatchExpressions, 1)
assert.Equal(t, internal.MetadataNameLabelKey, newSelector.MatchExpressions[0].Key)
assert.Equal(t, metav1.LabelSelectorOpIn, newSelector.MatchExpressions[0].Operator)
assert.EqualValues(t, allDeviceNames, newSelector.MatchExpressions[0].Values)

t.Run("doNotUpdateSecondTime", func(t *testing.T) {
newSelector2, err := updateBlockDeviceSelectorIfNeeded(newSelector, allDeviceNames)
assert.NoError(t, err)
assert.Nil(t, newSelector2)
})
})

for i := range allDeviceNames {
notExistingDevices := allDeviceNames[0:i]
existingDevices := allDeviceNames[i:]
selectors := map[string]metav1.LabelSelector{
"onlyOurKeys": {
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: slices.Clone(existingDevices),
},
},
},
// TODO: We don't cover this case yet
// "onlyOurKeysTwice": {
// MatchExpressions: []metav1.LabelSelectorRequirement{
// {
// Key: internal.MetadataNameLabelKey,
// Operator: metav1.LabelSelectorOpIn,
// Values: slices.Clone(existingDevices),
// }, {
// Key: internal.MetadataNameLabelKey,
// Operator: metav1.LabelSelectorOpIn,
// Values: slices.Clone(existingDevices),
// },
// },
// },
"withOtherKeys": {
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: internal.MetadataNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: slices.Clone(existingDevices),
},
{
Key: "other/key",
Operator: metav1.LabelSelectorOpIn,
Values: []string{"foo", "bar"},
},
},
},
}

for selectorName, selectorToTest := range selectors {
selector := selectorToTest.DeepCopy()
t.Run(fmt.Sprintf("missingDevices %v with selector %s", notExistingDevices, selectorName), func(t *testing.T) {
if len(existingDevices) == 1 {
t.Run("inMatchLabels", func(t *testing.T) {
newSelector, err := updateBlockDeviceSelectorIfNeeded(&metav1.LabelSelector{
MatchLabels: map[string]string{
internal.MetadataNameLabelKey: existingDevices[0],
},
}, allDeviceNames)
assert.NoError(t, err)
assert.NotNil(t, newSelector)

t.Run("doNotUpdateSecondTime", func(t *testing.T) {
newSelector2, err := updateBlockDeviceSelectorIfNeeded(newSelector.DeepCopy(), allDeviceNames)
assert.NoError(t, err)
assert.Nil(t, newSelector2)
})
})
}

for i := range notExistingDevices {
notExistingDevicesToAdd := notExistingDevices[i:]
t.Run(fmt.Sprintf("notMatchedBlockDeviceNames %v", notExistingDevicesToAdd), func(t *testing.T) {
notMatched, err := notMatchedBlockDeviceNames(selector.DeepCopy(), notExistingDevicesToAdd)
assert.NoError(t, err)
assert.EqualValues(t, notExistingDevicesToAdd, notMatched)
})

for _, devicesToAdd := range [][]string{
notExistingDevicesToAdd,
append(notExistingDevicesToAdd, existingDevices...),
append(existingDevices, notExistingDevicesToAdd...),
} {
t.Run(fmt.Sprintf("append %v to %v", devicesToAdd, existingDevices), func(t *testing.T) {
newSelector, err := updateBlockDeviceSelectorIfNeeded(selector.DeepCopy(), devicesToAdd)
assert.NoError(t, err)
if len(devicesToAdd) == 0 {
assert.Nil(t, newSelector)
} else {
assert.NotNil(t, newSelector)
}

if newSelector != nil {
t.Run("doNotUpdateSecondTime", func(t *testing.T) {
newSelector2, err := updateBlockDeviceSelectorIfNeeded(newSelector.DeepCopy(), devicesToAdd)
assert.NoError(t, err)
assert.Nil(t, newSelector2)
})

t.Run("notMatchedIsEmpty", func(t *testing.T) {
notMatched, err := notMatchedBlockDeviceNames(newSelector.DeepCopy(), devicesToAdd)
assert.NoError(t, err)
assert.Empty(t, notMatched)
})
}
})
}
}
})
}
}
})
}

func setupDiscoverer(opts *DiscovererConfig) *Discoverer {
Expand Down
Loading