Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/Microsoft/go-winio v0.6.2
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/buger/goterm v1.0.4
github.com/compose-spec/compose-go/v2 v2.4.5
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345
github.com/containerd/containerd v1.7.24
github.com/containerd/platforms v0.2.1
github.com/davecgh/go-spew v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8E
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4=
github.com/compose-spec/compose-go/v2 v2.4.5 h1:p4ih4Jb6VgGPLPxh3fSFVKAjFHtZd+7HVLCSFzcFx9Y=
github.com/compose-spec/compose-go/v2 v2.4.5/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345 h1:oLm7hga9jjaDedg+dqsWiI1GeRrcGLBPxu8W0VfpiKA=
github.com/compose-spec/compose-go/v2 v2.4.6-0.20241203131247-9a9cc5d9c345/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
Expand Down
142 changes: 85 additions & 57 deletions pkg/compose/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/compose-spec/compose-go/v2/types"
ccli "github.com/docker/cli/cli/command/container"
pathutil "github.com/docker/compose/v2/internal/paths"
"github.com/docker/compose/v2/internal/sync"
"github.com/docker/compose/v2/pkg/api"
Expand All @@ -48,7 +48,7 @@ const quietPeriod = 500 * time.Millisecond
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
sync.PathMapping
Action types.WatchAction
Trigger types.Trigger
}

// getSyncImplementation returns an appropriate sync implementation for the
Expand Down Expand Up @@ -298,7 +298,7 @@ func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMat
}

return &fileEvent{
Action: trigger.Action,
Trigger: trigger,
PathMapping: sync.PathMapping{
HostPath: hostPath,
ContainerPath: containerPath,
Expand Down Expand Up @@ -336,7 +336,10 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
}

if trigger.Action == types.WatchActionRebuild && service.Build == nil {
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply %s on watch", types.WatchActionRebuild, service.Name)
}
if trigger.Action == types.WatchActionSyncExec && len(trigger.Exec.Command) == 0 {
return nil, fmt.Errorf("can't watch with action %q on service %s wihtout a command", types.WatchActionSyncExec, service.Name)
}

config.Watch[i] = trigger
Expand All @@ -352,24 +355,17 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
out := make(chan []fileEvent)
go func() {
defer close(out)
seen := make(map[fileEvent]time.Time)
seen := make(map[string]fileEvent)
flushEvents := func() {
if len(seen) == 0 {
return
}
events := make([]fileEvent, 0, len(seen))
for e := range seen {
for _, e := range seen {
events = append(events, e)
}
// sort batch by oldest -> newest
// (if an event is seen > 1 per batch, it gets the latest timestamp)
sort.SliceStable(events, func(i, j int) bool {
x := events[i]
y := events[j]
return seen[x].Before(seen[y])
})
out <- events
seen = make(map[fileEvent]time.Time)
seen = make(map[string]fileEvent)
}

t := clock.NewTicker(delay)
Expand All @@ -386,7 +382,10 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
flushEvents()
return
}
seen[e] = time.Now()
if _, ok := seen[e.HostPath]; !ok {
// already know updated path, first rule in watch configuration wins
seen[e.HostPath] = e
}
t.Reset(delay)
}
}
Expand Down Expand Up @@ -485,49 +484,10 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
pathMappings := make([]sync.PathMapping, len(batch))
restartService := false
for i := range batch {
if batch[i].Action == types.WatchActionRebuild {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
if batch[i].Trigger.Action == types.WatchActionRebuild {
return s.rebuild(ctx, project, serviceName, options)
}
if batch[i].Action == types.WatchActionSyncRestart {
if batch[i].Trigger.Action == types.WatchActionSyncRestart {
restartService = true
}
pathMappings[i] = batch[i].PathMapping
Expand All @@ -554,7 +514,75 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
options.LogTo.Log(
api.WatchLogger,
fmt.Sprintf("service %q restarted", serviceName))
}
eg, ctx := errgroup.WithContext(ctx)
for _, b := range batch {
if b.Trigger.Action == types.WatchActionSyncExec {
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, serviceName)
if err != nil {
return err
}
x := b.Trigger.Exec
for _, c := range containers {
eg.Go(func() error {
exec := ccli.NewExecOptions()
exec.User = x.User
exec.Privileged = x.Privileged
exec.Command = x.Command
exec.Workdir = x.WorkingDir
for _, v := range x.Environment.ToMapping().Values() {
err := exec.Env.Set(v)
if err != nil {
return err
}
}
return ccli.RunExec(ctx, s.dockerCli, c.ID, exec)
})
}
}
}
return eg.Wait()
}

func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/compose/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"
"slices"
"strings"
"testing"
"time"

Expand All @@ -42,23 +44,32 @@ func TestDebounceBatching(t *testing.T) {
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)

trigger := types.Trigger{
Path: "/",
}
matcher := watch.EmptyMatcher{}
eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch)
for i := 0; i < 100; i++ {
var action types.WatchAction = "a"
var path = "/a"
if i%2 == 0 {
action = "b"
path = "/b"
}
ch <- fileEvent{Action: action}

event := maybeFileEvent(trigger, path, matcher)
require.NotNil(t, event)
ch <- *event
}
// we sent 100 events + the debouncer
clock.BlockUntil(101)
clock.Advance(quietPeriod)
select {
case batch := <-eventBatchCh:
require.ElementsMatch(t, batch, []fileEvent{
{Action: "a"},
{Action: "b"},
slices.SortFunc(batch, func(a, b fileEvent) int {
return strings.Compare(a.HostPath, b.HostPath)
})
assert.Equal(t, len(batch), 2)
assert.Equal(t, batch[0].HostPath, "/a")
assert.Equal(t, batch[1].HostPath, "/b")
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for events")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<html lang="en">
<head>
<meta charset="utf-8">
<title>Docker Nginx</title>
<title>Static file 2</title>
</head>
<body>
<h2>Hello from Nginx container</h2>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<html lang="en">
<head>
<meta charset="utf-8">
<title>Docker Nginx</title>
<title>Static file 2</title>
</head>
<body>
<h2>Hello from Nginx container</h2>
Expand Down
14 changes: 14 additions & 0 deletions pkg/e2e/fixtures/watch/exec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
test:
build:
dockerfile_inline: FROM alpine
command: ping localhost
volumes:
- /data
develop:
watch:
- path: .
target: /data
action: sync+exec
exec:
command: echo "SUCCESS"
40 changes: 40 additions & 0 deletions pkg/e2e/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package e2e

import (
"bytes"
"crypto/rand"
"fmt"
"os"
Expand Down Expand Up @@ -289,3 +290,42 @@ func doTest(t *testing.T, svcName string) {

testComplete.Store(true)
}

func TestWatchExec(t *testing.T) {
cli := NewCLI(t)
const projectName = "test_watch_exec"

t.Cleanup(func() {
cli.RunDockerComposeCmd(t, "-p", projectName, "down")
})

tmpdir := t.TempDir()
composeFilePath := filepath.Join(tmpdir, "compose.yaml")
CopyFile(t, filepath.Join("fixtures", "watch", "exec.yaml"), composeFilePath)
cmd := cli.NewDockerComposeCmd(t, "-p", projectName, "-f", composeFilePath, "up", "--watch")
buffer := bytes.NewBuffer(nil)
cmd.Stdout = buffer
watch := icmd.StartCmd(cmd)

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "64 bytes from") {
return poll.Success()
}
return poll.Continue("%v", watch.Stdout())
})

t.Logf("Create new file")

testFile := filepath.Join(tmpdir, "test")
require.NoError(t, os.WriteFile(testFile, []byte("test\n"), 0o600))

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "SUCCESS") {
return poll.Success()
}
return poll.Continue("%v", out)
})
cli.RunDockerComposeCmdNoCheck(t, "-p", projectName, "kill", "-s", "9")
}
Loading