Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit 772591b

Browse files
committed
Improve supervisor eventlog locking and API compliance.
Add finer-grained locking to the supervisor eventlog based on the upstream docker containerd code. Adds full compliance for the Events() API by supporting queries for specific container IDs and stored events only, as required by newer Docker daemons in order to recover from hard shutdowns without deadlocking the docker daemon.
1 parent 03142f7 commit 772591b

File tree

4 files changed

+50
-24
lines changed

4 files changed

+50
-24
lines changed

containerd/api/grpc/server/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer
154154
}
155155
t = from
156156
}
157-
events := s.sv.Events.Events(t)
157+
events := s.sv.Events.Events(t, r.StoredOnly, r.Id)
158158
defer s.sv.Events.Unsubscribe(events)
159159
for e := range events {
160160
tsp, err := ptypes.TimestampProto(e.Timestamp)

containerd/containerd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func daemon(sv *supervisor.Supervisor, address string) error {
163163
}
164164

165165
func namespaceShare(sv *supervisor.Supervisor, namespace, state string) {
166-
events := sv.Events.Events(time.Time{})
166+
events := sv.Events.Events(time.Time{}, false, "")
167167
containerCount := 0
168168
for e := range events {
169169
if e.Type == supervisor.EventContainerStart {

supervisor/events.go

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,18 @@ type Event struct {
3434
// Events() might be deadlocked
3535

3636
type SvEvents struct {
37-
sync.RWMutex
38-
subscribers map[chan Event]struct{}
39-
eventLog []Event
37+
subscriberLock sync.RWMutex
38+
subscribers map[chan Event]struct{}
39+
40+
eventLog []Event
41+
eventLock sync.Mutex
4042
}
4143

4244
func (se *SvEvents) setupEventLog(logDir string) error {
4345
if err := se.readEventLog(logDir); err != nil {
4446
return err
4547
}
46-
events := se.Events(time.Time{})
48+
events := se.Events(time.Time{}, false, "")
4749
f, err := os.OpenFile(filepath.Join(logDir, "events.log"), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0755)
4850
if err != nil {
4951
return err
@@ -52,7 +54,9 @@ func (se *SvEvents) setupEventLog(logDir string) error {
5254
go func() {
5355
for e := range events {
5456
glog.Infof("write event log: %v", e)
57+
se.eventLock.Lock()
5558
se.eventLog = append(se.eventLog, e)
59+
se.eventLock.Unlock()
5660
if err := enc.Encode(e); err != nil {
5761
glog.Infof("containerd: fail to write event to journal")
5862
}
@@ -61,6 +65,7 @@ func (se *SvEvents) setupEventLog(logDir string) error {
6165
return nil
6266
}
6367

68+
// Note: no locking - don't call after initialization
6469
func (se *SvEvents) readEventLog(logDir string) error {
6570
f, err := os.Open(filepath.Join(logDir, "events.log"))
6671
if err != nil {
@@ -86,47 +91,68 @@ func (se *SvEvents) readEventLog(logDir string) error {
8691

8792
// Events returns an event channel that external consumers can use to receive updates
8893
// on container events
89-
func (se *SvEvents) Events(from time.Time) chan Event {
90-
se.Lock()
91-
defer se.Unlock()
94+
func (se *SvEvents) Events(from time.Time, storedOnly bool, id string) chan Event {
9295
c := make(chan Event, defaultEventsBufferSize)
93-
se.subscribers[c] = struct{}{}
96+
97+
if storedOnly {
98+
defer se.Unsubscribe(c)
99+
}
100+
101+
// Do not allow the subscriber to unsubscribe
102+
se.subscriberLock.Lock()
103+
defer se.subscriberLock.Unlock()
104+
94105
if !from.IsZero() {
95106
// replay old event
96-
for _, e := range se.eventLog {
107+
// note: we lock and take a snapshot of the history slice (it shares the backing array; the elements are not copied) to avoid blocking
108+
se.eventLock.Lock()
109+
past := se.eventLog[:]
110+
se.eventLock.Unlock()
111+
112+
for _, e := range past {
97113
if e.Timestamp.After(from) {
98-
c <- e
114+
if id == "" || e.ID == id {
115+
c <- e
116+
}
99117
}
100118
}
101-
// Notify the client that from now on it's live events
102-
c <- Event{
103-
Type: "live",
104-
Timestamp: time.Now(),
119+
120+
if storedOnly {
121+
close(c)
122+
} else {
123+
// Notify the client that from now on it's live events
124+
c <- Event{
125+
Type: "live",
126+
Timestamp: time.Now(),
127+
}
128+
se.subscribers[c] = struct{}{}
105129
}
106130
}
107131
return c
108132
}
109133

110134
// Unsubscribe removes the provided channel from receiving any more events
111135
func (se *SvEvents) Unsubscribe(sub chan Event) {
112-
se.Lock()
113-
defer se.Unlock()
114-
delete(se.subscribers, sub)
115-
close(sub)
136+
se.subscriberLock.Lock()
137+
defer se.subscriberLock.Unlock()
138+
if _, ok := se.subscribers[sub]; ok {
139+
delete(se.subscribers, sub)
140+
close(sub)
141+
}
116142
}
117143

118144
// notifySubscribers will send the provided event to the external subscribers
119145
// of the events channel
120146
func (se *SvEvents) notifySubscribers(e Event) {
121147
glog.Infof("notifySubscribers: %v", e)
122-
se.RLock()
123-
defer se.RUnlock()
148+
se.subscriberLock.RLock()
149+
defer se.subscriberLock.RUnlock()
124150
for sub := range se.subscribers {
125151
// do a non-blocking send for the channel
126152
select {
127153
case sub <- e:
128154
default:
129-
glog.Infof("containerd: event not sent to subscriber")
155+
glog.Warningf("containerd: event not sent to subscriber")
130156
}
131157
}
132158
}

supervisor/supervisor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (sv *Supervisor) getProcess(container, processId string) *Process {
116116
}
117117

118118
func (sv *Supervisor) reaper() {
119-
events := sv.Events.Events(time.Time{})
119+
events := sv.Events.Events(time.Time{}, false, "")
120120
for e := range events {
121121
if e.Type == EventExit {
122122
go sv.reap(e.ID, e.PID)

0 commit comments

Comments
 (0)