Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/KubeArmorOperator/cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var Cmd = &cobra.Command{
},
Run: func(cmd *cobra.Command, args []string) {
nodeWatcher := controllers.NewClusterWatcher(&o)
go nodeWatcher.WatchConfigCrd()
nodeWatcher.WatchConfigCrd()
nodeWatcher.WatchNodes()

},
Expand Down
69 changes: 66 additions & 3 deletions pkg/KubeArmorOperator/internal/controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ClusterWatcher struct {
Secv1Client *secv1client.Clientset
Daemonsets map[string]int
DaemonsetsLock *sync.Mutex
snitchInFlight sync.Map
}
type Node struct {
Name string
Expand Down Expand Up @@ -139,6 +140,7 @@ func extractPathFromMessage(message string) (string, bool) {

func (clusterWatcher *ClusterWatcher) checkJobStatus(job, runtime, nodename string) {
defer func() {
clusterWatcher.snitchInFlight.Delete(nodename)
clusterWatcher.Log.Infof("checkJobStatus completed for job: %s", job)
}()

Expand Down Expand Up @@ -251,6 +253,53 @@ func (clusterWatcher *ClusterWatcher) checkJobStatus(job, runtime, nodename stri
}
}

// nodeMatchesGlobalSelector reports whether the node carries every active
// entry of the global node selector with a matching value. Entries whose
// value is "-" are tombstones for removed selectors and are ignored. An
// empty selector map matches every node.
func nodeMatchesGlobalSelector(node *corev1.Node) bool {
	labels := node.Labels
	for key, want := range common.GlobalNodeSelectors {
		// "-" marks a deleted selector entry; it places no constraint.
		if want == "-" {
			continue
		}
		got, present := labels[key]
		if !present || got != want {
			return false
		}
	}
	return true
}

// deploySnichForMatchingNodes lists all cluster nodes and deploys a snitch
// job on every linux node that matches the global node selector.
//
// A per-node in-flight marker (snitchInFlight) prevents duplicate jobs when
// this runs concurrently with the node informer handlers; the marker is
// released by checkJobStatus when the job finishes, or here immediately if
// job creation fails, so a later pass can retry the node.
func (clusterWatcher *ClusterWatcher) deploySnichForMatchingNodes() {
	nodes, err := clusterWatcher.Client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		clusterWatcher.Log.Errorf("Cannot list nodes for snitch deployment: %s", err.Error())
		return
	}
	// Range by index: corev1.Node is a large struct and a value range would
	// copy every element on each iteration.
	for i := range nodes.Items {
		node := &nodes.Items[i]
		if val, ok := node.Labels[common.OsLabel]; !ok || val != "linux" {
			continue
		}
		if !nodeMatchesGlobalSelector(node) {
			continue
		}
		if _, loaded := clusterWatcher.snitchInFlight.LoadOrStore(node.Name, struct{}{}); loaded {
			clusterWatcher.Log.Infof("Snitch already in progress for node %s, skipping", node.Name)
			continue
		}
		// ContainerRuntimeVersion looks like "containerd://1.6.8"; keep only
		// the runtime name before the first colon.
		runtime := strings.Split(node.Status.NodeInfo.ContainerRuntimeVersion, ":")[0]
		clusterWatcher.Log.Infof("Installing snitch on node %s", node.Name)
		snitchJob, err := clusterWatcher.Client.BatchV1().Jobs(common.Namespace).Create(
			context.Background(), deploySnitch(node.Name, runtime), v1.CreateOptions{})
		if err != nil {
			// Release the marker so the node is retried on the next pass.
			clusterWatcher.snitchInFlight.Delete(node.Name)
			clusterWatcher.Log.Errorf("Cannot run snitch on node %s, error=%s", node.Name, err.Error())
			continue
		}
		clusterWatcher.Log.Infof("Snitch was installed on node %s", node.Name)
		go clusterWatcher.checkJobStatus(snitchJob.Name, runtime, node.Name)
	}
}

func (clusterWatcher *ClusterWatcher) WatchNodes() {
log := clusterWatcher.Log
nodeInformer := informer.Core().V1().Nodes().Informer()
Expand All @@ -259,10 +308,15 @@ func (clusterWatcher *ClusterWatcher) WatchNodes() {
if node, ok := obj.(*corev1.Node); ok {
runtime := node.Status.NodeInfo.ContainerRuntimeVersion
runtime = strings.Split(runtime, ":")[0]
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" {
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" && common.OperatorConfigCrd != nil && nodeMatchesGlobalSelector(node) {
if _, loaded := clusterWatcher.snitchInFlight.LoadOrStore(node.Name, struct{}{}); loaded {
log.Infof("Snitch already in progress for node %s, skipping", node.Name)
return
}
log.Infof("Installing snitch on node %s", node.Name)
snitchJob, err := clusterWatcher.Client.BatchV1().Jobs(common.Namespace).Create(context.Background(), deploySnitch(node.Name, runtime), v1.CreateOptions{})
if err != nil {
clusterWatcher.snitchInFlight.Delete(node.Name)
log.Errorf("Cannot run snitch on node %s, error=%s", node.Name, err.Error())
return
}
Expand All @@ -283,7 +337,7 @@ func (clusterWatcher *ClusterWatcher) WatchNodes() {
runtime := node.Status.NodeInfo.ContainerRuntimeVersion
runtime = strings.Split(runtime, ":")[0]
clusterWatcher.Log.Infof("Node might have been restarted, redeploying snitch ")
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" {
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" && nodeMatchesGlobalSelector(node) {
log.Infof("Installing snitch on node %s", node.Name)
snitchJob, err := clusterWatcher.Client.BatchV1().Jobs(common.Namespace).Create(context.Background(), deploySnitch(node.Name, runtime), v1.CreateOptions{})
if err != nil {
Expand All @@ -295,7 +349,7 @@ func (clusterWatcher *ClusterWatcher) WatchNodes() {
}
}
}
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" && oldRand != node.Labels[common.RandLabel] {
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" && nodeMatchesGlobalSelector(node) && oldRand != node.Labels[common.RandLabel] {
newNode := Node{}
newNode.Name = node.Name
if val, ok := node.Labels[common.EnforcerLabel]; ok {
Expand Down Expand Up @@ -454,7 +508,9 @@ func (clusterWatcher *ClusterWatcher) WatchConfigCrd() {
// mark it as current operating config crd
if cfg.Status.Phase == common.RUNNING {
common.OperatorConfigCrd = &cfg
UpdateImages(&common.OperatorConfigCrd.Spec)
if firstRun {
go clusterWatcher.deploySnichForMatchingNodes()
go clusterWatcher.WatchRequiredResources()
firstRun = false
}
Expand All @@ -473,6 +529,7 @@ func (clusterWatcher *ClusterWatcher) WatchConfigCrd() {
UpdatedSeccomp(&cfg.Spec)
UpdateRecommendedPolicyConfig(&cfg.Spec)
utils.UpdateControllerPort(&cfg.Spec)
go clusterWatcher.deploySnichForMatchingNodes()
// update status to (Installation) Created
go clusterWatcher.UpdateCrdStatus(cfg.Name, common.CREATED, common.CREATED_MSG)
go clusterWatcher.WatchRequiredResources()
Expand All @@ -491,6 +548,7 @@ func (clusterWatcher *ClusterWatcher) WatchConfigCrd() {
if cfg, ok := newObj.(*opv1.KubeArmorConfig); ok {
// update configmap only if it's operating crd
if common.OperatorConfigCrd != nil && cfg.Name == common.OperatorConfigCrd.Name {
oldCfg := oldObj.(*opv1.KubeArmorConfig)
configChanged := UpdateConfigMapData(&cfg.Spec)
imageUpdated := UpdateImages(&cfg.Spec)
controllerPortUpdated := utils.UpdateControllerPort(&cfg.Spec)
Expand All @@ -499,6 +557,11 @@ func (clusterWatcher *ClusterWatcher) WatchConfigCrd() {
tlsUpdated := UpdateTlsData(&cfg.Spec)
UpdateRecommendedPolicyConfig(&cfg.Spec)

// if globalNodeSelector expanded, deploy snitch for newly matching nodes
if !reflect.DeepEqual(oldCfg.Spec.GlobalNodeSelector, cfg.Spec.GlobalNodeSelector) {
go clusterWatcher.deploySnichForMatchingNodes()
}

// return if only status has been updated
if !tlsUpdated && !relayEnvUpdated && !configChanged && cfg.Status != oldObj.(*opv1.KubeArmorConfig).Status && len(imageUpdated) < 1 && !controllerPortUpdated {
return
Expand Down
37 changes: 37 additions & 0 deletions pkg/KubeArmorOperator/internal/controller/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package controller
import (
"testing"

"github.com/kubearmor/KubeArmor/pkg/KubeArmorOperator/common"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestUpdateEnvIfDefinedAndUpdated(t *testing.T) {
Expand Down Expand Up @@ -184,3 +186,38 @@ func TestAddorUpdateNodeSelector(t *testing.T) {
})
})
}

// TestNodeMatchesGlobalSelector exercises nodeMatchesGlobalSelector against a
// fixed node for a range of global-selector configurations, restoring the
// package-level selector map afterwards.
func TestNodeMatchesGlobalSelector(t *testing.T) {
	savedSelectors := common.GlobalNodeSelectors
	defer func() { common.GlobalNodeSelectors = savedSelectors }()

	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"env": "prod", "zone": "us-east"}}}

	testCases := []struct {
		name      string
		selectors map[string]string
		want      bool
	}{
		{"empty selector matches any node", map[string]string{}, true},
		{"single matching selector", map[string]string{"env": "prod"}, true},
		{"multiple selectors all match", map[string]string{"env": "prod", "zone": "us-east"}, true},
		{"missing label", map[string]string{"tier": "frontend"}, false},
		{"value mismatch", map[string]string{"env": "staging"}, false},
		{"partial mismatch with multiple selectors", map[string]string{"env": "prod", "zone": "eu-west"}, false},
		{"deleted entry is skipped", map[string]string{"env": "-", "zone": "us-east"}, true},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			common.GlobalNodeSelectors = tc.selectors
			assert.Equal(t, tc.want, nodeMatchesGlobalSelector(node))
		})
	}
}
9 changes: 8 additions & 1 deletion pkg/KubeArmorOperator/internal/controller/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,14 @@ func (clusterWatcher *ClusterWatcher) AreAllNodesProcessed() bool {
clusterWatcher.Log.Warnf("Cannot list nodes, error=%s", err.Error())
return false
}
if !(len(nodes.Items) == processedNodes) {
matchingNodes := 0
for i := range nodes.Items {
node := &nodes.Items[i]
if val, ok := node.Labels[common.OsLabel]; ok && val == "linux" && nodeMatchesGlobalSelector(node) {
matchingNodes++
}
}
if matchingNodes != processedNodes {
return false
}

Expand Down