Skip to content

Commit 1c37f1a

Browse files
ndeloofglours
authored andcommitted
use logs API with Since to collect the very first logs after restart
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent 485b620 commit 1c37f1a

File tree

7 files changed

+92
-108
lines changed

7 files changed

+92
-108
lines changed

cmd/formatter/logs.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,3 @@ func (l logDecorator) Status(container, msg string) {
183183
l.decorated.Status(container, msg)
184184
l.After()
185185
}
186-
187-
func (l logDecorator) Register(container string) {
188-
l.decorated.Register(container)
189-
}

pkg/api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ type ContainerEvent struct {
667667
ID string
668668
Service string
669669
Line string
670-
// ContainerEventExited only
670+
// ExitCode is only set on ContainerEventExited events
671671
ExitCode int
672672
Restarting bool
673673
}

pkg/compose/convergence.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,13 +635,18 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
635635
if inherit {
636636
inherited = &replaced
637637
}
638+
639+
replacedContainerName := service.ContainerName
640+
if replacedContainerName == "" {
641+
replacedContainerName = service.Name + api.Separator + strconv.Itoa(number)
642+
}
638643
name := getContainerName(project.Name, service, number)
639644
tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name)
640645
opts := createOptions{
641646
AutoRemove: false,
642647
AttachStdin: false,
643648
UseNetworkAliases: true,
644-
Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID),
649+
Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName),
645650
}
646651
created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w)
647652
if err != nil {
@@ -659,7 +664,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
659664
return created, err
660665
}
661666

662-
err = s.apiClient().ContainerRename(ctx, created.ID, name)
667+
err = s.apiClient().ContainerRename(ctx, tmpName, name)
663668
if err != nil {
664669
return created, err
665670
}

pkg/compose/logs.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package compose
1919
import (
2020
"context"
2121
"io"
22-
"time"
2322

2423
"github.com/containerd/errdefs"
2524
"github.com/docker/docker/api/types/container"
@@ -73,9 +72,14 @@ func (s *composeService) Logs(
7372

7473
if options.Follow {
7574
printer := newLogPrinter(consumer)
76-
eg.Go(printer.Run)
7775

78-
monitor := newMonitor(s.apiClient(), options.Project)
76+
monitor := newMonitor(s.apiClient(), projectName)
77+
if len(options.Services) > 0 {
78+
monitor.withServices(options.Services)
79+
} else if options.Project != nil {
80+
monitor.withServices(options.Project.ServiceNames())
81+
}
82+
monitor.withListener(printer.HandleEvent)
7983
monitor.withListener(func(event api.ContainerEvent) {
8084
if event.Type == api.ContainerEventStarted {
8185
eg.Go(func() error {
@@ -86,7 +90,7 @@ func (s *composeService) Logs(
8690

8791
err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
8892
Follow: options.Follow,
89-
Since: time.Unix(0, event.Time).Format(time.RFC3339Nano),
93+
Since: ctr.State.StartedAt,
9094
Until: options.Until,
9195
Tail: options.Tail,
9296
Timestamps: options.Timestamps,
@@ -100,7 +104,6 @@ func (s *composeService) Logs(
100104
}
101105
})
102106
eg.Go(func() error {
103-
defer printer.Stop()
104107
return monitor.Start(ctx)
105108
})
106109
}

pkg/compose/monitor.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"strconv"
2222

23-
"github.com/compose-spec/compose-go/v2/types"
2423
"github.com/containerd/errdefs"
2524
"github.com/docker/docker/api/types/container"
2625
"github.com/docker/docker/api/types/events"
@@ -34,23 +33,23 @@ import (
3433

3534
type monitor struct {
3635
api client.APIClient
37-
project *types.Project
36+
project string
3837
// services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command
3938
services map[string]bool
4039
listeners []api.ContainerEventListener
4140
}
4241

43-
func newMonitor(api client.APIClient, project *types.Project) *monitor {
44-
services := map[string]bool{}
45-
if project != nil {
46-
for name := range project.Services {
47-
services[name] = true
48-
}
49-
}
42+
func newMonitor(api client.APIClient, project string) *monitor {
5043
return &monitor{
5144
api: api,
5245
project: project,
53-
services: services,
46+
services: map[string]bool{},
47+
}
48+
}
49+
50+
func (c *monitor) withServices(services []string) {
51+
for _, name := range services {
52+
c.services[name] = true
5453
}
5554
}
5655

@@ -62,7 +61,7 @@ func (c *monitor) Start(ctx context.Context) error {
6261
initialState, err := c.api.ContainerList(ctx, container.ListOptions{
6362
All: true,
6463
Filters: filters.NewArgs(
65-
projectFilter(c.project.Name),
64+
projectFilter(c.project),
6665
oneOffFilter(false),
6766
hasConfigHashLabel(),
6867
),
@@ -78,22 +77,24 @@ func (c *monitor) Start(ctx context.Context) error {
7877
containers.Add(ctr.ID)
7978
}
8079
}
81-
8280
restarting := utils.Set[string]{}
8381

8482
evtCh, errCh := c.api.Events(ctx, events.ListOptions{
8583
Filters: filters.NewArgs(
8684
filters.Arg("type", "container"),
87-
projectFilter(c.project.Name)),
85+
projectFilter(c.project)),
8886
})
8987
for {
88+
if len(containers) == 0 {
89+
return nil
90+
}
9091
select {
9192
case <-ctx.Done():
9293
return nil
9394
case err := <-errCh:
9495
return err
9596
case event := <-evtCh:
96-
if !c.services[event.Actor.Attributes[api.ServiceLabel]] {
97+
if len(c.services) > 0 && !c.services[event.Actor.Attributes[api.ServiceLabel]] {
9798
continue
9899
}
99100
ctr, err := c.getContainerSummary(event)
@@ -103,24 +104,35 @@ func (c *monitor) Start(ctx context.Context) error {
103104

104105
switch event.Action {
105106
case events.ActionCreate:
106-
containers.Add(ctr.ID)
107+
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
108+
containers.Add(ctr.ID)
109+
}
110+
evtType := api.ContainerEventCreated
111+
if _, ok := ctr.Labels[api.ContainerReplaceLabel]; ok {
112+
evtType = api.ContainerEventRecreated
113+
}
107114
for _, listener := range c.listeners {
108-
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated))
115+
listener(newContainerEvent(event.TimeNano, ctr, evtType))
109116
}
110117
logrus.Debugf("container %s created", ctr.Name)
111118
case events.ActionStart:
112119
restarted := restarting.Has(ctr.ID)
113-
for _, listener := range c.listeners {
114-
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
115-
e.Restarting = restarted
116-
}))
117-
}
118120
if restarted {
119121
logrus.Debugf("container %s restarted", ctr.Name)
122+
for _, listener := range c.listeners {
123+
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
124+
e.Restarting = restarted
125+
}))
126+
}
120127
} else {
121128
logrus.Debugf("container %s started", ctr.Name)
129+
for _, listener := range c.listeners {
130+
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted))
131+
}
132+
}
133+
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
134+
containers.Add(ctr.ID)
122135
}
123-
containers.Add(ctr.ID)
124136
case events.ActionRestart:
125137
for _, listener := range c.listeners {
126138
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted))
@@ -159,9 +171,6 @@ func (c *monitor) Start(ctx context.Context) error {
159171
}
160172
}
161173
}
162-
if len(containers) == 0 {
163-
return nil
164-
}
165174
}
166175
}
167176

@@ -192,7 +201,7 @@ func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSumma
192201
ctr := &api.ContainerSummary{
193202
ID: event.Actor.ID,
194203
Name: event.Actor.Attributes["name"],
195-
Project: c.project.Name,
204+
Project: c.project,
196205
Service: event.Actor.Attributes[api.ServiceLabel],
197206
Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us
198207
}

pkg/compose/printer.go

Lines changed: 9 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,80 +18,36 @@ package compose
1818

1919
import (
2020
"fmt"
21-
"sync"
2221

2322
"github.com/docker/compose/v2/pkg/api"
2423
)
2524

2625
// logPrinter watch application containers and collect their logs
2726
type logPrinter interface {
2827
HandleEvent(event api.ContainerEvent)
29-
Run() error
30-
Stop()
3128
}
3229

3330
type printer struct {
34-
queue chan api.ContainerEvent
3531
consumer api.LogConsumer
36-
stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue
37-
stop sync.Once
3832
}
3933

4034
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
4135
func newLogPrinter(consumer api.LogConsumer) logPrinter {
4236
printer := printer{
4337
consumer: consumer,
44-
queue: make(chan api.ContainerEvent),
45-
stopCh: make(chan struct{}),
46-
stop: sync.Once{},
4738
}
4839
return &printer
4940
}
5041

51-
func (p *printer) Stop() {
52-
p.stop.Do(func() {
53-
close(p.stopCh)
54-
for {
55-
select {
56-
case <-p.queue:
57-
// purge the queue to free producers goroutines
58-
// p.queue will be garbage collected
59-
default:
60-
return
61-
}
62-
}
63-
})
64-
}
65-
6642
func (p *printer) HandleEvent(event api.ContainerEvent) {
67-
select {
68-
case <-p.stopCh:
69-
return
70-
default:
71-
p.queue <- event
72-
}
73-
}
74-
75-
func (p *printer) Run() error {
76-
defer p.Stop()
77-
78-
// containers we are tracking. Use true when container is running, false after we receive a stop|die signal
79-
for {
80-
select {
81-
case <-p.stopCh:
82-
return nil
83-
case event := <-p.queue:
84-
switch event.Type {
85-
case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted:
86-
p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
87-
if event.Type == api.ContainerEventRecreated {
88-
p.consumer.Status(event.Source, "has been recreated")
89-
}
90-
case api.ContainerEventLog, api.HookEventLog:
91-
p.consumer.Log(event.Source, event.Line)
92-
case api.ContainerEventErr:
93-
p.consumer.Err(event.Source, event.Line)
94-
}
95-
}
43+
switch event.Type {
44+
case api.ContainerEventExited:
45+
p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
46+
case api.ContainerEventRecreated:
47+
p.consumer.Status(event.Container.Labels[api.ContainerReplaceLabel], "has been recreated")
48+
case api.ContainerEventLog, api.HookEventLog:
49+
p.consumer.Log(event.Source, event.Line)
50+
case api.ContainerEventErr:
51+
p.consumer.Err(event.Source, event.Line)
9652
}
9753
}

0 commit comments

Comments
 (0)