Skip to content

Commit cc1ee77

Browse files
committed
feat: add watchman integration
1 parent af87c54 commit cc1ee77

File tree

4 files changed

+256
-19
lines changed

4 files changed

+256
-19
lines changed

src/app/process.go

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"context"
66
"errors"
77
"fmt"
8-
"github.com/cakturk/go-netstat/netstat"
9-
"github.com/f1bonacc1/process-compose/src/types"
108
"io"
119
"math/rand"
1210
"os"
@@ -17,6 +15,9 @@ import (
1715
"syscall"
1816
"time"
1917

18+
"github.com/cakturk/go-netstat/netstat"
19+
"github.com/f1bonacc1/process-compose/src/types"
20+
2021
"github.com/f1bonacc1/process-compose/src/command"
2122
"github.com/f1bonacc1/process-compose/src/health"
2223
"github.com/f1bonacc1/process-compose/src/pclog"
@@ -65,6 +66,13 @@ type Process struct {
6566
isMain bool
6667
extraArgs []string
6768
isStopped atomic.Bool
69+
isDevRestart atomic.Bool
70+
devWatchers []devWatch
71+
}
72+
73+
type devWatch struct {
74+
sub *WatchmanSub
75+
config *types.Watch
6876
}
6977

7078
func NewProcess(
@@ -77,6 +85,7 @@ func NewProcess(
7785
printLogs bool,
7886
isMain bool,
7987
extraArgs []string,
88+
watchman *Watchman,
8089
) *Process {
8190
colNumeric := rand.Intn(int(color.FgHiWhite)-int(color.FgHiBlack)) + int(color.FgHiBlack)
8291

@@ -104,6 +113,7 @@ func NewProcess(
104113
proc.setUpProbes()
105114
proc.procCond = *sync.NewCond(proc)
106115
proc.procStartedCond = *sync.NewCond(proc)
116+
proc.setUpWatchman(watchman)
107117
return proc
108118
}
109119

@@ -139,9 +149,9 @@ func (p *Process) run() int {
139149

140150
p.startProbes()
141151

142-
//Wait should wait for I/O consumption, but if the execution is too fast
143-
//e.g. echo 'hello world' the output will not reach the pipe
144-
//TODO Fix this
152+
// Wait should wait for I/O consumption, but if the execution is too fast
153+
// e.g. echo 'hello world' the output will not reach the pipe
154+
// TODO Fix this
145155
time.Sleep(50 * time.Millisecond)
146156
_ = p.command.Wait()
147157
p.Lock()
@@ -212,7 +222,6 @@ func (p *Process) getCommander() command.Commander {
212222
p.mergeExtraArgs(),
213223
)
214224
}
215-
216225
}
217226

218227
func (p *Process) mergeExtraArgs() []string {
@@ -254,6 +263,11 @@ func (p *Process) isRestartable() bool {
254263
p.Lock()
255264
exitCode := p.getExitCode()
256265
p.Unlock()
266+
267+
if p.isDevRestart.Swap(false) {
268+
return true
269+
}
270+
257271
if p.isStopped.Swap(false) {
258272
return false
259273
}
@@ -381,16 +395,35 @@ func (p *Process) isRunning() bool {
381395

382396
func (p *Process) prepareForShutDown() {
383397
// prevent restart during global shutdown or scale down
384-
//p.procConf.RestartPolicy.Restart = types.RestartPolicyNo
398+
// p.procConf.RestartPolicy.Restart = types.RestartPolicyNo
385399
p.isStopped.Store(true)
386-
387400
}
388401

389402
func (p *Process) onProcessStart() {
390403
if isStringDefined(p.procConf.LogLocation) {
391404
p.logger.Open(p.getLogPath(), p.procConf.LoggerConfig)
392405
}
393406

407+
for _, watch := range p.devWatchers {
408+
go func() {
409+
for {
410+
files, ok := watch.sub.Recv()
411+
if !ok {
412+
log.
413+
Debug().
414+
Msg("watchman sub closed, exiting")
415+
return
416+
}
417+
418+
if len(files) == 0 {
419+
continue
420+
}
421+
422+
p.isDevRestart.Store(true)
423+
}
424+
}()
425+
}
426+
394427
p.Lock()
395428
p.started = true
396429
p.Unlock()
@@ -457,8 +490,8 @@ func (p *Process) updateProcState() {
457490
p.procState.Name = p.getName()
458491
}
459492
p.procState.IsRunning = isRunning
460-
461493
}
494+
462495
func (p *Process) setStartTime(startTime time.Time) {
463496
p.timeMutex.Lock()
464497
defer p.timeMutex.Unlock()
@@ -663,6 +696,13 @@ func (p *Process) onReadinessCheckEnd(isOk, isFatal bool, err string) {
663696
}
664697
}
665698

699+
func (p *Process) setUpWatchman(watchman *Watchman) {
700+
for _, config := range p.procConf.Watch {
701+
recv := watchman.Subscribe(config)
702+
p.devWatchers = append(p.devWatchers, devWatch{recv, config})
703+
}
704+
}
705+
666706
func (p *Process) validateProcess() error {
667707
if isStringDefined(p.procConf.WorkingDir) {
668708
stat, err := os.Stat(p.procConf.WorkingDir)

src/app/project_runner.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package app
22

33
import (
44
"fmt"
5-
"github.com/f1bonacc1/process-compose/src/config"
6-
"github.com/f1bonacc1/process-compose/src/pclog"
7-
"github.com/f1bonacc1/process-compose/src/types"
85
"os"
96
"os/user"
107
"runtime"
118
"slices"
129
"sync"
1310
"time"
1411

12+
"github.com/f1bonacc1/process-compose/src/config"
13+
"github.com/f1bonacc1/process-compose/src/pclog"
14+
"github.com/f1bonacc1/process-compose/src/types"
15+
1516
"github.com/rs/zerolog/log"
1617
)
1718

@@ -32,6 +33,7 @@ type ProjectRunner struct {
3233
mainProcessArgs []string
3334
isTuiOn bool
3435
isOrderedShutDown bool
36+
watchman Watchman
3537
}
3638

3739
func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) {
@@ -67,7 +69,7 @@ func (p *ProjectRunner) Run() int {
6769
p.logger.Open(p.project.LogLocation, p.project.LoggerConfig)
6870
defer p.logger.Close()
6971
}
70-
//zerolog.SetGlobalLevel(zerolog.PanicLevel)
72+
// zerolog.SetGlobalLevel(zerolog.PanicLevel)
7173
log.Debug().Msgf("Spinning up %d processes. Order: %q", len(runOrder), nameOrder)
7274
for _, proc := range runOrder {
7375
newConf := proc
@@ -108,6 +110,7 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) {
108110
printLogs,
109111
isMain,
110112
extraArgs,
113+
&p.watchman,
111114
)
112115
p.addRunningProcess(process)
113116
p.waitGroup.Add(1)
@@ -128,7 +131,6 @@ func (p *ProjectRunner) runProcess(config *types.ProcessConfig) {
128131
func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
129132
for k := range process.DependsOn {
130133
if runningProc := p.getRunningProcess(k); runningProc != nil {
131-
132134
switch process.DependsOn[k].Condition {
133135
case types.ProcessConditionCompleted:
134136
runningProc.waitForCompletion()
@@ -158,7 +160,6 @@ func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
158160
} else {
159161
log.Error().Msgf("Error: process %s depends on %s, but it isn't running", process.ReplicaName, k)
160162
}
161-
162163
}
163164
return nil
164165
}
@@ -596,6 +597,7 @@ func (p *ProjectRunner) renameProcess(name string, newName string) {
596597
p.project.Processes[newName] = config
597598
}
598599
}
600+
599601
func (p *ProjectRunner) removeProcessLogs(name string) *pclog.ProcessLogBuffer {
600602
p.logsMutex.Lock()
601603
defer p.logsMutex.Unlock()
@@ -724,7 +726,6 @@ func bToMb(b uint64) uint64 {
724726
}
725727

726728
func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) {
727-
728729
hostname, err := os.Hostname()
729730
if err != nil {
730731
log.Err(err).Msg("Failed get hostname")

0 commit comments

Comments
 (0)