Skip to content

Commit b88deb1

Browse files
committed
(refactoring) introduce monitor to manage containers events and application termination
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent 3431172 commit b88deb1

File tree

14 files changed

+355
-423
lines changed

14 files changed

+355
-423
lines changed

cmd/formatter/logs.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix
5656
}
5757
}
5858

59-
func (l *logConsumer) Register(name string) {
60-
l.register(name)
61-
}
62-
6359
func (l *logConsumer) register(name string) *presenter {
6460
var p *presenter
6561
root, _, found := strings.Cut(name, " ")

pkg/api/api.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -640,24 +640,25 @@ type LogConsumer interface {
640640
Log(containerName, message string)
641641
Err(containerName, message string)
642642
Status(container, msg string)
643-
Register(container string)
644643
}
645644

646645
// ContainerEventListener is a callback to process ContainerEvent from services
647646
type ContainerEventListener func(event ContainerEvent)
648647

649648
// ContainerEvent notify an event has been collected on source container implementing Service
650649
type ContainerEvent struct {
651-
Type int
652-
// Container is the name of the container _without the project prefix_.
650+
Type int
651+
Time int64
652+
Container *ContainerSummary
653+
// Source is the name of the container _without the project prefix_.
653654
//
654655
// This is only suitable for display purposes within Compose, as it's
655656
// not guaranteed to be unique across services.
656-
Container string
657-
ID string
658-
Service string
659-
Line string
660-
// ContainerEventExit only
657+
Source string
658+
ID string
659+
Service string
660+
Line string
661+
// ContainerEventExited only
661662
ExitCode int
662663
Restarting bool
663664
}
@@ -667,17 +668,19 @@ const (
667668
ContainerEventLog = iota
668669
// ContainerEventErr is a ContainerEvent of type log on stderr. Line is set
669670
ContainerEventErr
670-
// ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container
671-
ContainerEventAttach
671+
// ContainerEventStarted let consumer know a container has been started
672+
ContainerEventStarted
673+
// ContainerEventRestarted let consumer know a container has been restarted
674+
ContainerEventRestarted
672675
// ContainerEventStopped is a ContainerEvent of type stopped.
673676
ContainerEventStopped
677+
// ContainerEventCreated let consumer know a new container has been created
678+
ContainerEventCreated
674679
// ContainerEventRecreated let consumer know container stopped but his being replaced
675680
ContainerEventRecreated
676-
// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
677-
ContainerEventExit
681+
// ContainerEventExited is a ContainerEvent of type exit. ExitCode is set
682+
ContainerEventExited
678683
// UserCancel user cancelled compose up, we are stopping containers
679-
UserCancel
680-
// HookEventLog is a ContainerEvent of type log on stdout by service hook
681684
HookEventLog
682685
)
683686

pkg/compose/attach.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
6161
}
6262

6363
func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error {
64-
serviceName := container.Labels[api.ServiceLabel]
65-
containerName := getContainerNameWithoutProject(container)
66-
67-
listener(api.ContainerEvent{
68-
Type: api.ContainerEventAttach,
69-
Container: containerName,
70-
ID: container.ID,
71-
Service: serviceName,
72-
})
64+
service := container.Labels[api.ServiceLabel]
65+
name := getContainerNameWithoutProject(container)
66+
return s.doAttachContainer(ctx, service, container.ID, name, listener)
67+
}
68+
69+
func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error {
70+
inspect, err := s.apiClient().ContainerInspect(ctx, id)
71+
if err != nil {
72+
return err
73+
}
7374

7475
wOut := utils.GetWriter(func(line string) {
7576
listener(api.ContainerEvent{
76-
Type: api.ContainerEventLog,
77-
Container: containerName,
78-
ID: container.ID,
79-
Service: serviceName,
80-
Line: line,
77+
Type: api.ContainerEventLog,
78+
Source: name,
79+
ID: id,
80+
Service: service,
81+
Line: line,
8182
})
8283
})
8384
wErr := utils.GetWriter(func(line string) {
8485
listener(api.ContainerEvent{
85-
Type: api.ContainerEventErr,
86-
Container: containerName,
87-
ID: container.ID,
88-
Service: serviceName,
89-
Line: line,
86+
Type: api.ContainerEventErr,
87+
Source: name,
88+
ID: id,
89+
Service: service,
90+
Line: line,
9091
})
9192
})
9293

93-
inspect, err := s.apiClient().ContainerInspect(ctx, container.ID)
94-
if err != nil {
95-
return err
96-
}
97-
98-
_, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr)
94+
_, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr)
9995
return err
10096
}
10197

pkg/compose/containers.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate {
128128
}
129129
}
130130

131-
func isRunning() containerPredicate {
132-
return func(c container.Summary) bool {
133-
return c.State == "running"
134-
}
135-
}
136-
137131
// isOrphaned is a predicate to select containers without a matching service definition in compose project
138132
func isOrphaned(project *types.Project) containerPredicate {
139133
services := append(project.ServiceNames(), project.DisabledServiceNames()...)

pkg/compose/hook.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ import (
3232
func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error {
3333
wOut := utils.GetWriter(func(line string) {
3434
listener(api.ContainerEvent{
35-
Type: api.HookEventLog,
36-
Container: getContainerNameWithoutProject(ctr) + " ->",
37-
ID: ctr.ID,
38-
Service: service.Name,
39-
Line: line,
35+
Type: api.HookEventLog,
36+
Source: getContainerNameWithoutProject(ctr) + " ->",
37+
ID: ctr.ID,
38+
Service: service.Name,
39+
Line: line,
4040
})
4141
})
4242
defer wOut.Close() //nolint:errcheck

pkg/compose/logs.go

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (s *composeService) Logs(
6363
eg, ctx := errgroup.WithContext(ctx)
6464
for _, ctr := range containers {
6565
eg.Go(func() error {
66-
err := s.logContainers(ctx, consumer, ctr, options)
66+
err := s.logContainer(ctx, consumer, ctr, options)
6767
var notImplErr errdefs.ErrNotImplemented
6868
if errors.As(err, &notImplErr) {
6969
logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error())
@@ -74,34 +74,21 @@ func (s *composeService) Logs(
7474
}
7575

7676
if options.Follow {
77-
containers = containers.filter(isRunning())
7877
printer := newLogPrinter(consumer)
79-
eg.Go(func() error {
80-
_, err := printer.Run(api.CascadeIgnore, "", nil)
81-
return err
82-
})
83-
84-
for _, c := range containers {
85-
printer.HandleEvent(api.ContainerEvent{
86-
Type: api.ContainerEventAttach,
87-
Container: getContainerNameWithoutProject(c),
88-
ID: c.ID,
89-
Service: c.Labels[api.ServiceLabel],
90-
})
91-
}
78+
eg.Go(printer.Run)
9279

93-
eg.Go(func() error {
94-
err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error {
95-
printer.HandleEvent(api.ContainerEvent{
96-
Type: api.ContainerEventAttach,
97-
Container: getContainerNameWithoutProject(c),
98-
ID: c.ID,
99-
Service: c.Labels[api.ServiceLabel],
100-
})
80+
monitor := newMonitor(s.apiClient(), options.Project)
81+
monitor.withListener(func(event api.ContainerEvent) {
82+
if event.Type == api.ContainerEventStarted {
10183
eg.Go(func() error {
102-
err := s.logContainers(ctx, consumer, c, api.LogOptions{
84+
ctr, err := s.apiClient().ContainerInspect(ctx, event.ID)
85+
if err != nil {
86+
return err
87+
}
88+
89+
err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
10390
Follow: options.Follow,
104-
Since: t.Format(time.RFC3339Nano),
91+
Since: time.Unix(0, event.Time).Format(time.RFC3339Nano),
10592
Until: options.Until,
10693
Tail: options.Tail,
10794
Timestamps: options.Timestamps,
@@ -113,31 +100,28 @@ func (s *composeService) Logs(
113100
}
114101
return err
115102
})
116-
return nil
117-
}, func(c container.Summary, t time.Time) error {
118-
printer.HandleEvent(api.ContainerEvent{
119-
Type: api.ContainerEventAttach,
120-
Container: "", // actual name will be set by start event
121-
ID: c.ID,
122-
Service: c.Labels[api.ServiceLabel],
123-
})
124-
return nil
125-
})
126-
printer.Stop()
127-
return err
103+
}
104+
})
105+
eg.Go(func() error {
106+
defer printer.Stop()
107+
return monitor.Start(ctx)
128108
})
129109
}
130110

131111
return eg.Wait()
132112
}
133113

134-
func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
135-
cnt, err := s.apiClient().ContainerInspect(ctx, c.ID)
114+
func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
115+
ctr, err := s.apiClient().ContainerInspect(ctx, c.ID)
136116
if err != nil {
137117
return err
138118
}
119+
name := getContainerNameWithoutProject(c)
120+
return s.doLogContainer(ctx, consumer, name, ctr, options)
121+
}
139122

140-
r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{
123+
func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error {
124+
r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{
141125
ShowStdout: true,
142126
ShowStderr: true,
143127
Follow: options.Follow,
@@ -151,11 +135,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons
151135
}
152136
defer r.Close() //nolint:errcheck
153137

154-
name := getContainerNameWithoutProject(c)
155138
w := utils.GetWriter(func(line string) {
156139
consumer.Log(name, line)
157140
})
158-
if cnt.Config.Tty {
141+
if ctr.Config.Tty {
159142
_, err = io.Copy(w, r)
160143
} else {
161144
_, err = stdcopy.StdCopy(w, w, r)

pkg/compose/logs_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) {
189189

190190
func (l *testLogConsumer) Status(containerName, msg string) {}
191191

192-
func (l *testLogConsumer) Register(containerName string) {}
193-
194192
func (l *testLogConsumer) LogsForContainer(containerName string) []string {
195193
l.mu.Lock()
196194
defer l.mu.Unlock()

0 commit comments

Comments
 (0)