Skip to content

Commit eb0b789

Browse files
Improve restart of pmc montoring
The polling interval was cause delays in killing processed My moving into a go routine processes it no longer blocks actions such as stopping the process.
1 parent 4f3f58d commit eb0b789

File tree

5 files changed

+117
-51
lines changed

5 files changed

+117
-51
lines changed

pkg/daemon/daemon.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,12 @@ func (p *ProcessManager) EmitClockClassLogs(c net.Conn) {
209209
pmc := dp.(*PMCProcess)
210210
// If set then use current else get value
211211
if pmc.parentDS == nil {
212-
if err := pmc.Poll(); err != nil {
213-
glog.Errorf("Failed to fetch pmc PARENT_DATA_SET: %s", err)
214-
continue
212+
go pmc.Poll()
213+
for { // Wait for pmc.parentDS to be updated by Poll
214+
time.Sleep(10 * time.Millisecond)
215+
if pmc.parentDS != nil {
216+
break
217+
}
215218
}
216219
}
217220
pmc.EmitClockClassLogs(c)

pkg/daemon/pmc.go

Lines changed: 71 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func NewPMCProcess(runID int, eventHandler *event.EventHandler, clockType string
2828
configFileName: fmt.Sprintf("ptp4l.%d.config", runID),
2929
messageTag: fmt.Sprintf("[ptp4l.%d.config:{level}]", runID),
3030
monitorParentData: true,
31+
parentDSCh: make(chan protocol.ParentDataSet, 10),
3132
eventHandler: eventHandler,
3233
clockType: clockType,
3334
}
@@ -43,6 +44,7 @@ type PMCProcess struct {
4344
monitorParentData bool
4445
monitorCMLDS bool
4546
parentDS *protocol.ParentDataSet
47+
parentDSCh chan protocol.ParentDataSet
4648
exitCh chan struct{}
4749
clockType string
4850
c net.Conn
@@ -73,10 +75,8 @@ func (pmc *PMCProcess) getAndSetStopped(val bool) bool {
7375
// CmdStop signals the process to stop.
7476
func (pmc *PMCProcess) CmdStop() {
7577
pmc.getAndSetStopped(true)
76-
// Close the channel to broadcast stop signal to all receivers
7778
select {
7879
case <-pmc.exitCh:
79-
// Already closed
8080
default:
8181
close(pmc.exitCh)
8282
}
@@ -127,16 +127,6 @@ func (pmc *PMCProcess) EmitClockClassLogs(c net.Conn) {
127127
go pmc.eventHandler.EmitClockClass(pmc.configFileName, pmc.c)
128128
}
129129

130-
// Poll polls the parent data set from PMC.
131-
func (pmc *PMCProcess) Poll() error {
132-
parentDS, err := pmcPkg.RunPMCExpGetParentDS(pmc.configFileName)
133-
if err != nil {
134-
return err
135-
}
136-
pmc.handleParentDS(parentDS)
137-
return nil
138-
}
139-
140130
// CmdRun starts the PMC monitoring process.
141131
func (pmc *PMCProcess) CmdRun(stdToSocket bool) {
142132
isStopped := pmc.getAndSetStopped(false)
@@ -161,26 +151,41 @@ func (pmc *PMCProcess) CmdRun(stdToSocket bool) {
161151
}
162152
monitorErr := pmc.Monitor(c)
163153
if monitorErr == nil && pmc.Stopped() {
164-
// No error completed gracefully
165154
return
166155
}
167156
}
168157
}()
169158
}
170159

171-
func (pmc *PMCProcess) monitor(conn net.Conn) error {
172-
if conn != nil {
173-
pmc.c = conn
160+
// workerSignal represents a signal from the expectWorker to the main monitor loop
161+
type workerSignal struct {
162+
err error
163+
restartProcess bool
164+
}
165+
166+
// Poll runs a Poll operation in a goroutine and sends the result to the struct's ParentDataSet channel
167+
func (pmc *PMCProcess) Poll() {
168+
select {
169+
case <-pmc.exitCh:
170+
return
171+
default:
174172
}
175173

176-
err := pmc.Poll() // Set/Anounce current value to initialise or incase message was missed.
174+
parentDS, err := pmcPkg.RunPMCExpGetParentDS(pmc.configFileName, false)
177175
if err != nil {
178-
glog.Error("Failed to initialise clock class")
176+
return
177+
}
178+
179+
pmc.parentDSCh <- parentDS
180+
}
181+
182+
func (pmc *PMCProcess) monitor(conn net.Conn) error {
183+
if conn != nil {
184+
pmc.c = conn
179185
}
180186

181187
exp, r, err := pmcPkg.GetPMCMontior(pmc.configFileName)
182188
if err != nil {
183-
// Clean up the spawned process if initialization failed
184189
if exp != nil {
185190
utils.CloseExpect(exp, r)
186191
}
@@ -191,42 +196,68 @@ func (pmc *PMCProcess) monitor(conn net.Conn) error {
191196
subscribeCmd := pmc.getMonitorSubcribeCommand()
192197
glog.Infof("Sending '%s' to pmc", subscribeCmd)
193198
exp.Send(subscribeCmd + "\n")
199+
200+
workerCh := make(chan workerSignal, 5)
201+
202+
go pmc.expectWorker(exp, pmc.parentDSCh, workerCh)
203+
194204
for {
195-
_, matches, expectErr := exp.Expect(pmcPkg.GetMonitorRegex(pmc.monitorParentData), pollTimeout)
196205
select {
197206
case <-r:
198207
glog.Warningf("PMC monitoring process exited")
199208
return fmt.Errorf("PMC needs to restart")
200209
case <-pmc.exitCh:
201-
return nil // TODO close gracefully
202-
default:
203-
if expectErr != nil {
204-
if _, ok := expectErr.(expect.TimeoutError); ok {
205-
continue
206-
} else if strings.Contains(expectErr.Error(), "EOF") || strings.Contains(expectErr.Error(), "exit") {
207-
glog.Warningf("PMC process exited (%v)", expectErr)
208-
return fmt.Errorf("PMC needs to restart")
209-
}
210-
glog.Errorf("Error waiting for notification: %v", expectErr)
211-
continue
210+
return nil
211+
case parentDS := <-pmc.parentDSCh:
212+
go pmc.handleParentDS(parentDS)
213+
case signal := <-workerCh:
214+
if signal.restartProcess {
215+
glog.Warningf("PMC process exited (%v)", signal.err)
216+
return fmt.Errorf("PMC needs to restart")
212217
}
213-
if len(matches) == 0 {
218+
}
219+
}
220+
}
221+
222+
func (pmc *PMCProcess) expectWorker(exp *expect.GExpect, parentDSCh chan<- protocol.ParentDataSet, signalCh chan<- workerSignal) {
223+
for {
224+
select {
225+
case <-pmc.exitCh:
226+
return
227+
default:
228+
}
229+
230+
go pmc.Poll() // Check if anything changed while handling the last message
231+
_, matches, expectErr := exp.Expect(pmcPkg.GetMonitorRegex(pmc.monitorParentData), -1)
232+
233+
if expectErr != nil {
234+
if _, ok := expectErr.(expect.TimeoutError); ok {
214235
continue
236+
} else if strings.Contains(expectErr.Error(), "EOF") || strings.Contains(expectErr.Error(), "exit") {
237+
signalCh <- workerSignal{err: expectErr, restartProcess: true}
238+
return
215239
}
216-
if strings.Contains(matches[0], "PARENT_DATA_SET") {
217-
processedMessage, procErr := protocol.ProcessMessage[protocol.ParentDataSet](matches)
218-
if procErr != nil {
219-
glog.Warningf("failed to process message for PARENT_DATA_SET: %s", procErr)
220-
// maybe we should attempt a poll here?
221-
continue
222-
}
223-
go pmc.handleParentDS(*processedMessage)
240+
continue
241+
}
242+
243+
if len(matches) > 0 && strings.Contains(matches[0], "PARENT_DATA_SET") {
244+
processedMessage, procErr := protocol.ProcessMessage[protocol.ParentDataSet](matches)
245+
if procErr != nil {
246+
glog.Warningf("failed to process message for PARENT_DATA_SET: %s", procErr)
247+
} else {
248+
parentDSCh <- *processedMessage
224249
}
225250
}
251+
226252
}
227253
}
228254

229255
func (pmc *PMCProcess) handleParentDS(parentDS protocol.ParentDataSet) {
256+
if pmc.parentDS != nil && pmc.parentDS.Equal(&parentDS) {
257+
glog.Infof("ParentDataSet unchanged, skipping processing for %s", pmc.configFileName)
258+
return
259+
}
260+
230261
glog.Info(parentDS)
231262
oldParentDS := pmc.parentDS
232263
pmc.parentDS = &parentDS
@@ -252,13 +283,11 @@ func (pmc *PMCProcess) Monitor(c net.Conn) error {
252283
for {
253284
err := pmc.monitor(c)
254285
if err != nil {
255-
// Check if we should stop before restarting
256286
select {
257287
case <-pmc.exitCh:
258288
glog.Info("PMC Monitor stopping gracefully")
259289
return nil
260290
default:
261-
// If there is an error we need to restart
262291
glog.Info("pmc process hit an issue (%s). restarting...", err)
263292
continue
264293
}

pkg/daemon/ready.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ func (h metricHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
104104
eventHandler.EmitPortRoleLogs(socketConnection)
105105

106106
processManager := h.tracker.processManager
107-
processManager.EmitProcessStatusLogs()
108-
processManager.EmitClockClassLogs(socketConnection)
107+
go processManager.EmitProcessStatusLogs()
108+
go processManager.EmitClockClassLogs(socketConnection)
109109
}()
110110
}
111111

pkg/pmc/pmc.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ func RunPMCExpSetGMSettings(configFileName string, g protocol.GrandmasterSetting
110110
}
111111

112112
// RunPMCExpGetParentDS ... GET
113-
func RunPMCExpGetParentDS(configFileName string) (p protocol.ParentDataSet, err error) {
113+
func RunPMCExpGetParentDS(configFileName string, logOuput bool) (p protocol.ParentDataSet, err error) {
114114
cmdStr := cmdGetParentDataSet
115115
pmcCmd := pmcCmdConstPart + configFileName
116-
glog.Infof("%s \"%s\"", pmcCmd, cmdStr)
116+
if logOuput {
117+
glog.Infof("%s \"%s\"", pmcCmd, cmdStr)
118+
}
117119
e, r, err := expect.Spawn(pmcCmd, -1)
118120
if err != nil {
119121
return p, err
@@ -131,7 +133,9 @@ func RunPMCExpGetParentDS(configFileName string) (p protocol.ParentDataSet, err
131133
glog.Errorf("pmc result match error %v", err1)
132134
return p, err1
133135
}
134-
glog.Infof("pmc result: %s", result)
136+
if logOuput {
137+
glog.Infof("pmc result: %s", result)
138+
}
135139
for i, m := range matches[1:] {
136140
p.Update(p.Keys()[i], m)
137141
}
@@ -435,12 +439,21 @@ func getSubcribeEvents(exp *expect.GExpect) (*protocol.SubscribedEvents, error)
435439
// GetPMCMontior spawns and initializes a PMC monitoring process.
436440
func GetPMCMontior(configFileName string) (*expect.GExpect, <-chan error, error) {
437441
timeout := time.After(10 * montiorStartTimeout)
442+
var exp *expect.GExpect
443+
var r <-chan error
444+
var err error
445+
438446
for {
447+
if exp != nil {
448+
utils.CloseExpect(exp, r)
449+
time.Sleep(10 * time.Millisecond)
450+
}
439451
cmd := pmcCmdConstPart + configFileName
440452
glog.Errorf("Spawning process '%s' for monitoring pmc", cmd)
441-
exp, r, err := expect.Spawn(cmd, -1)
453+
exp, r, err = expect.Spawn(cmd, -1)
442454
if err != nil {
443455
glog.Errorf("Failed to spawn moniotring pmc process")
456+
continue
444457
}
445458
select {
446459
case <-timeout:

pkg/protocol/type.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,27 @@ func (p *ParentDataSet) String() string {
331331
return result
332332
}
333333

334+
// Equal compares two ParentDataSet instances for equality
335+
func (p *ParentDataSet) Equal(other *ParentDataSet) bool {
336+
if p == nil && other == nil {
337+
return true
338+
}
339+
if p == nil || other == nil {
340+
return false
341+
}
342+
343+
return p.ParentPortIdentity == other.ParentPortIdentity &&
344+
p.ParentStats == other.ParentStats &&
345+
p.ObservedParentOffsetScaledLogVariance == other.ObservedParentOffsetScaledLogVariance &&
346+
p.ObservedParentClockPhaseChangeRate == other.ObservedParentClockPhaseChangeRate &&
347+
p.GrandmasterPriority1 == other.GrandmasterPriority1 &&
348+
p.GrandmasterClockClass == other.GrandmasterClockClass &&
349+
p.GrandmasterClockAccuracy == other.GrandmasterClockAccuracy &&
350+
p.GrandmasterOffsetScaledLogVariance == other.GrandmasterOffsetScaledLogVariance &&
351+
p.GrandmasterPriority2 == other.GrandmasterPriority2 &&
352+
p.GrandmasterIdentity == other.GrandmasterIdentity
353+
}
354+
334355
// ValueRegEx provides the regex method for the ExternalGrandmasterProperties values matching
335356
func (e *ExternalGrandmasterProperties) ValueRegEx() map[string]string {
336357
return map[string]string{

0 commit comments

Comments
 (0)