Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 118 additions & 21 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/parser"
"github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/synce"
"github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/utils"
Expand Down Expand Up @@ -317,8 +319,10 @@
pluginManager plugin.PluginManager

// Track sa_file (authentication) files for monitoring changes
saFileTracker map[string]*saFileInfo
saFileMutex sync.Mutex
saFileTracker map[string]*saFileInfo
saFileMutex sync.Mutex
saFileWatcher *fsnotify.Watcher
restartInProgress atomic.Bool
}

// New LinuxPTP is called by daemon to generate new linuxptp instance
Expand Down Expand Up @@ -348,6 +352,17 @@
ptpEventHandler: event.Init(nodeName, stdoutToSocket, eventSocket, eventChannel, closeManager, Offset, ClockState, ClockClassMetrics),
}
tracker.processManager = pm

// Initialize fsnotify watcher for sa_file change detection
watcher, err := fsnotify.NewWatcher()
if err != nil {
glog.Errorf("Failed to create fsnotify watcher for sa_file monitoring: %v", err)
glog.Warning("sa_file change detection will be disabled")
watcher = nil
} else {
glog.Info("fsnotify watcher initialized for sa_file change detection")
}

return &Daemon{
nodeName: nodeName,
namespace: namespace,
Expand All @@ -362,42 +377,89 @@
readyTracker: tracker,
stopCh: stopCh,
saFileTracker: make(map[string]*saFileInfo),
saFileWatcher: watcher,
}
}

// Run in a for loop to listen for any LinuxPTPConfUpdate changes
// This function handles two types of configuration changes:
// 1. PtpConfig changes (via ConfigMap) - triggers UpdateCh
// 2. Authentication file changes (via Secret) - triggers sa_file check
// 2. Authentication file changes (via Secret) - triggers fsnotify events (instant detection)
// Both trigger applyNodePTPProfiles() which restarts PTP processes WITHOUT restarting the pod
func (dn *Daemon) Run() {
go dn.processManager.ptpEventHandler.ProcessEvents()

// Start sa_file monitoring ticker (check every 5 seconds for faster detection)
// This detects when Kubernetes updates the mounted secret file
// Total delay = Kubernetes propagation (1-10s) + our check interval (0-5s)
saFileCheckTicker := time.NewTicker(5 * time.Second)
defer saFileCheckTicker.Stop()
// Setup fsnotify channels (may be nil if watcher initialization failed)
var watcherEvents chan fsnotify.Event
var watcherErrors chan error
if dn.saFileWatcher != nil {
watcherEvents = dn.saFileWatcher.Events
watcherErrors = dn.saFileWatcher.Errors
defer dn.saFileWatcher.Close()
glog.Info("Using fsnotify for instant sa_file change detection")
} else {
glog.Warning("fsnotify unavailable, sa_file change detection disabled")
}

for {
select {
case <-dn.ptpUpdate.UpdateCh:
// PtpConfig change detected via ConfigMap
if dn.restartInProgress.Load() {
glog.Warning("Restart already in progress, skipping ConfigMap change event")
continue
}
glog.Info("PtpConfig change detected, restarting PTP processes")
dn.restartInProgress.Store(true)
err := dn.applyNodePTPProfiles()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not run as goroutines so they are blocking meaning there is no need for this flag.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, there is no concurrent access.. I will fix that

dn.restartInProgress.Store(false)
if err != nil {
glog.Errorf("linuxPTP apply node profile failed: %v", err)
}
case <-saFileCheckTicker.C:
// Check for sa_file (authentication) changes
// When Secret is updated, Kubernetes automatically updates the mounted file

case event, ok := <-watcherEvents:
// File system event on sa_file directory
if !ok {
glog.Error("fsnotify watcher channel closed, disabling sa_file monitoring")
watcherEvents = nil
continue
}
// Filter for relevant events (Write, Create - Kubernetes atomic updates)
// Ignore events on temporary/hidden files
if event.Op&(fsnotify.Write|fsnotify.Create) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should delete also be handled as a change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you mean "deleting the sa_file" then the controller actually removes the volumeMount in case the user delete it from the ptp4lconf or manually from the file system of the container. which forces a restart on the daemonset.

continue
}
if strings.HasPrefix(filepath.Base(event.Name), ".") {
continue // Ignore hidden files like ..data
}

glog.Infof("File system event: %s (op: %s)", event.Name, event.Op.String())

// Prevent restart flooding from multiple rapid events
if dn.restartInProgress.Load() {
glog.Info("Restart already in progress, skipping file change event")
continue
}

// Check if any tracked sa_file actually changed
if dn.checkSaFileChanges() {
glog.Info("sa_file authentication file changed, restarting PTP processes")
dn.restartInProgress.Store(true)
err := dn.applyNodePTPProfiles()
dn.restartInProgress.Store(false)
if err != nil {
glog.Errorf("linuxPTP apply node profile failed after sa_file change: %v", err)
}
}

case err, ok := <-watcherErrors:
// fsnotify watcher error
if !ok {
watcherErrors = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you recreate the watcher here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should keep recreate until watcher is not having error

continue
}
glog.Errorf("fsnotify watcher error: %v", err)

case <-dn.stopCh:
dn.stopAllProcesses()
glog.Infof("linuxPTP stop signal received, existing..")
Expand All @@ -417,7 +479,7 @@
defer file.Close()

hash := sha256.New()
if _, err := io.Copy(hash, file); err != nil {

Check failure on line 482 in pkg/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Run GolangCI-Lint

shadow: declaration of "err" shadows declaration at line 475 (govet)
return "", err
}

Expand Down Expand Up @@ -489,13 +551,28 @@

// updateSaFileTracking updates the tracking information for sa_files from current profiles
// Called after profiles are applied to initialize or update sa_file monitoring
// Extracts sa_file paths from ptp4lConf and sets up hash-based change detection
// Extracts sa_file paths from ptp4lConf and sets up fsnotify watches for instant change detection
// Reuses existing hashes when file path hasn't changed to avoid redundant computation
func (dn *Daemon) updateSaFileTracking() {
dn.saFileMutex.Lock()
defer dn.saFileMutex.Unlock()

// Clear old tracking
// Save old tracking to reuse hashes for unchanged files
oldTracker := dn.saFileTracker

// Remove all existing watches first
if dn.saFileWatcher != nil {
for _, info := range oldTracker {
if info.filePath != "" {
dirPath := filepath.Dir(info.filePath)
dn.saFileWatcher.Remove(dirPath)
}
}
}

// Create new tracking
dn.saFileTracker = make(map[string]*saFileInfo)
watchedDirs := make(map[string]bool)

// Add tracking for each profile that has sa_file configured
for _, profile := range dn.ptpUpdate.NodeProfiles {
Expand All @@ -510,24 +587,44 @@

glog.Infof("Tracking sa_file %s for profile %s", saFilePath, *profile.Name)

// Compute initial hash
// Optimize: Reuse hash if same file path in old tracker (avoid recomputation)
hash := ""
if _, err := os.Stat(saFilePath); err == nil {
if h, err := computeFileHash(saFilePath); err == nil {
hash = h
glog.Infof("Initialized tracking for sa_file %s (hash: %.16s...)", saFilePath, h)
if oldInfo, exists := oldTracker[*profile.Name]; exists && oldInfo.filePath == saFilePath {
// File path unchanged, reuse existing hash
hash = oldInfo.fileHash
glog.Infof("Reusing existing hash for sa_file %s (hash: %.16s...)", saFilePath, hash)
} else {
// New file or path changed, compute hash
if _, err := os.Stat(saFilePath); err == nil {
if h, err := computeFileHash(saFilePath); err == nil {

Check failure on line 599 in pkg/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Run GolangCI-Lint

shadow: declaration of "err" shadows declaration at line 598 (govet)
hash = h
glog.Infof("Computed new hash for sa_file %s (hash: %.16s...)", saFilePath, h)
} else {
glog.Warningf("Failed to compute hash for sa_file %s: %v", saFilePath, err)
}
} else {
glog.Warningf("Failed to compute initial hash for sa_file %s: %v", saFilePath, err)
glog.Warningf("sa_file %s does not exist yet for profile %s", saFilePath, *profile.Name)
}
} else {
glog.Warningf("sa_file %s does not exist yet for profile %s", saFilePath, *profile.Name)
}

dn.saFileTracker[*profile.Name] = &saFileInfo{
profileName: *profile.Name,
filePath: saFilePath,
fileHash: hash,
}

// Setup fsnotify watch on directory (not file, due to Kubernetes symlinks)
if dn.saFileWatcher != nil {
dirPath := filepath.Dir(saFilePath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be location where all sa_files are mounted. This path should controlled by the ptp-operator so is known ahead of time.

Instead of relying on the ptpconfig it should just recursively watch all dirs within that path.

That way theirs no need to have this late binding approach it can just be there from the start.

if !watchedDirs[dirPath] {
if err := dn.saFileWatcher.Add(dirPath); err != nil {
glog.Errorf("Failed to watch directory %s for sa_file changes: %v", dirPath, err)
} else {
glog.Infof("Watching directory %s for instant sa_file change detection", dirPath)
watchedDirs[dirPath] = true
}
}
}
}
}

Expand Down
Loading