Skip to content

Commit fc8fe2c

Browse files
committed
(refactoring) introduce monitor to manage containers events and application termination
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent 317ebcd commit fc8fe2c

File tree

14 files changed

+355
-423
lines changed

14 files changed

+355
-423
lines changed

cmd/formatter/logs.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ func NewLogConsumer(ctx context.Context, stdout, stderr io.Writer, color, prefix
5656
}
5757
}
5858

59-
func (l *logConsumer) Register(name string) {
60-
l.register(name)
61-
}
62-
6359
func (l *logConsumer) register(name string) *presenter {
6460
var p *presenter
6561
root, _, found := strings.Cut(name, " ")

pkg/api/api.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -647,24 +647,25 @@ type LogConsumer interface {
647647
Log(containerName, message string)
648648
Err(containerName, message string)
649649
Status(container, msg string)
650-
Register(container string)
651650
}
652651

653652
// ContainerEventListener is a callback to process ContainerEvent from services
654653
type ContainerEventListener func(event ContainerEvent)
655654

656655
// ContainerEvent notify an event has been collected on source container implementing Service
657656
type ContainerEvent struct {
658-
Type int
659-
// Container is the name of the container _without the project prefix_.
657+
Type int
658+
Time int64
659+
Container *ContainerSummary
660+
// Source is the name of the container _without the project prefix_.
660661
//
661662
// This is only suitable for display purposes within Compose, as it's
662663
// not guaranteed to be unique across services.
663-
Container string
664-
ID string
665-
Service string
666-
Line string
667-
// ContainerEventExit only
664+
Source string
665+
ID string
666+
Service string
667+
Line string
668+
// ContainerEventExited only
668669
ExitCode int
669670
Restarting bool
670671
}
@@ -674,17 +675,19 @@ const (
674675
ContainerEventLog = iota
675676
// ContainerEventErr is a ContainerEvent of type log on stderr. Line is set
676677
ContainerEventErr
677-
// ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container
678-
ContainerEventAttach
678+
// ContainerEventStarted let consumer know a container has been started
679+
ContainerEventStarted
680+
// ContainerEventRestarted let consumer know a container has been restarted
681+
ContainerEventRestarted
679682
// ContainerEventStopped is a ContainerEvent of type stopped.
680683
ContainerEventStopped
684+
// ContainerEventCreated let consumer know a new container has been created
685+
ContainerEventCreated
681686
// ContainerEventRecreated let consumer know container stopped but his being replaced
682687
ContainerEventRecreated
683-
// ContainerEventExit is a ContainerEvent of type exit. ExitCode is set
684-
ContainerEventExit
688+
// ContainerEventExited is a ContainerEvent of type exit. ExitCode is set
689+
ContainerEventExited
685690
// UserCancel user cancelled compose up, we are stopping containers
686-
UserCancel
687-
// HookEventLog is a ContainerEvent of type log on stdout by service hook
688691
HookEventLog
689692
)
690693

pkg/compose/attach.go

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -61,41 +61,37 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, lis
6161
}
6262

6363
func (s *composeService) attachContainer(ctx context.Context, container containerType.Summary, listener api.ContainerEventListener) error {
64-
serviceName := container.Labels[api.ServiceLabel]
65-
containerName := getContainerNameWithoutProject(container)
66-
67-
listener(api.ContainerEvent{
68-
Type: api.ContainerEventAttach,
69-
Container: containerName,
70-
ID: container.ID,
71-
Service: serviceName,
72-
})
64+
service := container.Labels[api.ServiceLabel]
65+
name := getContainerNameWithoutProject(container)
66+
return s.doAttachContainer(ctx, service, container.ID, name, listener)
67+
}
68+
69+
func (s *composeService) doAttachContainer(ctx context.Context, service, id, name string, listener api.ContainerEventListener) error {
70+
inspect, err := s.apiClient().ContainerInspect(ctx, id)
71+
if err != nil {
72+
return err
73+
}
7374

7475
wOut := utils.GetWriter(func(line string) {
7576
listener(api.ContainerEvent{
76-
Type: api.ContainerEventLog,
77-
Container: containerName,
78-
ID: container.ID,
79-
Service: serviceName,
80-
Line: line,
77+
Type: api.ContainerEventLog,
78+
Source: name,
79+
ID: id,
80+
Service: service,
81+
Line: line,
8182
})
8283
})
8384
wErr := utils.GetWriter(func(line string) {
8485
listener(api.ContainerEvent{
85-
Type: api.ContainerEventErr,
86-
Container: containerName,
87-
ID: container.ID,
88-
Service: serviceName,
89-
Line: line,
86+
Type: api.ContainerEventErr,
87+
Source: name,
88+
ID: id,
89+
Service: service,
90+
Line: line,
9091
})
9192
})
9293

93-
inspect, err := s.apiClient().ContainerInspect(ctx, container.ID)
94-
if err != nil {
95-
return err
96-
}
97-
98-
_, _, err = s.attachContainerStreams(ctx, container.ID, inspect.Config.Tty, nil, wOut, wErr)
94+
_, _, err = s.attachContainerStreams(ctx, id, inspect.Config.Tty, nil, wOut, wErr)
9995
return err
10096
}
10197

pkg/compose/containers.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,6 @@ func isService(services ...string) containerPredicate {
128128
}
129129
}
130130

131-
func isRunning() containerPredicate {
132-
return func(c container.Summary) bool {
133-
return c.State == "running"
134-
}
135-
}
136-
137131
// isOrphaned is a predicate to select containers without a matching service definition in compose project
138132
func isOrphaned(project *types.Project) containerPredicate {
139133
services := append(project.ServiceNames(), project.DisabledServiceNames()...)

pkg/compose/hook.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ import (
3232
func (s composeService) runHook(ctx context.Context, ctr container.Summary, service types.ServiceConfig, hook types.ServiceHook, listener api.ContainerEventListener) error {
3333
wOut := utils.GetWriter(func(line string) {
3434
listener(api.ContainerEvent{
35-
Type: api.HookEventLog,
36-
Container: getContainerNameWithoutProject(ctr) + " ->",
37-
ID: ctr.ID,
38-
Service: service.Name,
39-
Line: line,
35+
Type: api.HookEventLog,
36+
Source: getContainerNameWithoutProject(ctr) + " ->",
37+
ID: ctr.ID,
38+
Service: service.Name,
39+
Line: line,
4040
})
4141
})
4242
defer wOut.Close() //nolint:errcheck

pkg/compose/logs.go

Lines changed: 25 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (s *composeService) Logs(
6262
eg, ctx := errgroup.WithContext(ctx)
6363
for _, ctr := range containers {
6464
eg.Go(func() error {
65-
err := s.logContainers(ctx, consumer, ctr, options)
65+
err := s.logContainer(ctx, consumer, ctr, options)
6666
if errdefs.IsNotImplemented(err) {
6767
logrus.Warnf("Can't retrieve logs for %q: %s", getCanonicalContainerName(ctr), err.Error())
6868
return nil
@@ -72,34 +72,21 @@ func (s *composeService) Logs(
7272
}
7373

7474
if options.Follow {
75-
containers = containers.filter(isRunning())
7675
printer := newLogPrinter(consumer)
77-
eg.Go(func() error {
78-
_, err := printer.Run(api.CascadeIgnore, "", nil)
79-
return err
80-
})
81-
82-
for _, c := range containers {
83-
printer.HandleEvent(api.ContainerEvent{
84-
Type: api.ContainerEventAttach,
85-
Container: getContainerNameWithoutProject(c),
86-
ID: c.ID,
87-
Service: c.Labels[api.ServiceLabel],
88-
})
89-
}
76+
eg.Go(printer.Run)
9077

91-
eg.Go(func() error {
92-
err := s.watchContainers(ctx, projectName, options.Services, nil, printer.HandleEvent, containers, func(c container.Summary, t time.Time) error {
93-
printer.HandleEvent(api.ContainerEvent{
94-
Type: api.ContainerEventAttach,
95-
Container: getContainerNameWithoutProject(c),
96-
ID: c.ID,
97-
Service: c.Labels[api.ServiceLabel],
98-
})
78+
monitor := newMonitor(s.apiClient(), options.Project)
79+
monitor.withListener(func(event api.ContainerEvent) {
80+
if event.Type == api.ContainerEventStarted {
9981
eg.Go(func() error {
100-
err := s.logContainers(ctx, consumer, c, api.LogOptions{
82+
ctr, err := s.apiClient().ContainerInspect(ctx, event.ID)
83+
if err != nil {
84+
return err
85+
}
86+
87+
err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
10188
Follow: options.Follow,
102-
Since: t.Format(time.RFC3339Nano),
89+
Since: time.Unix(0, event.Time).Format(time.RFC3339Nano),
10390
Until: options.Until,
10491
Tail: options.Tail,
10592
Timestamps: options.Timestamps,
@@ -110,31 +97,28 @@ func (s *composeService) Logs(
11097
}
11198
return err
11299
})
113-
return nil
114-
}, func(c container.Summary, t time.Time) error {
115-
printer.HandleEvent(api.ContainerEvent{
116-
Type: api.ContainerEventAttach,
117-
Container: "", // actual name will be set by start event
118-
ID: c.ID,
119-
Service: c.Labels[api.ServiceLabel],
120-
})
121-
return nil
122-
})
123-
printer.Stop()
124-
return err
100+
}
101+
})
102+
eg.Go(func() error {
103+
defer printer.Stop()
104+
return monitor.Start(ctx)
125105
})
126106
}
127107

128108
return eg.Wait()
129109
}
130110

131-
func (s *composeService) logContainers(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
132-
cnt, err := s.apiClient().ContainerInspect(ctx, c.ID)
111+
func (s *composeService) logContainer(ctx context.Context, consumer api.LogConsumer, c container.Summary, options api.LogOptions) error {
112+
ctr, err := s.apiClient().ContainerInspect(ctx, c.ID)
133113
if err != nil {
134114
return err
135115
}
116+
name := getContainerNameWithoutProject(c)
117+
return s.doLogContainer(ctx, consumer, name, ctr, options)
118+
}
136119

137-
r, err := s.apiClient().ContainerLogs(ctx, cnt.ID, container.LogsOptions{
120+
func (s *composeService) doLogContainer(ctx context.Context, consumer api.LogConsumer, name string, ctr container.InspectResponse, options api.LogOptions) error {
121+
r, err := s.apiClient().ContainerLogs(ctx, ctr.ID, container.LogsOptions{
138122
ShowStdout: true,
139123
ShowStderr: true,
140124
Follow: options.Follow,
@@ -148,11 +132,10 @@ func (s *composeService) logContainers(ctx context.Context, consumer api.LogCons
148132
}
149133
defer r.Close() //nolint:errcheck
150134

151-
name := getContainerNameWithoutProject(c)
152135
w := utils.GetWriter(func(line string) {
153136
consumer.Log(name, line)
154137
})
155-
if cnt.Config.Tty {
138+
if ctr.Config.Tty {
156139
_, err = io.Copy(w, r)
157140
} else {
158141
_, err = stdcopy.StdCopy(w, w, r)

pkg/compose/logs_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@ func (l *testLogConsumer) Err(containerName, message string) {
189189

190190
func (l *testLogConsumer) Status(containerName, msg string) {}
191191

192-
func (l *testLogConsumer) Register(containerName string) {}
193-
194192
func (l *testLogConsumer) LogsForContainer(containerName string) []string {
195193
l.mu.Lock()
196194
defer l.mu.Unlock()

0 commit comments

Comments
 (0)