Skip to content

Commit b812857

Browse files
committed
supervisor: cancel SendTask when channel is full
This patch handles case c) in moby/moby#31487 (c) A request cannot be added to 's.tasks' because the queue is full. case b) isn't fixable at this point case there's no way to cancel stuff from a channel since channels aren't context aware (yet?). Parallelizing tasks handling isn't easily doable either. Signed-off-by: Antonio Murdaca <runcom@redhat.com>
1 parent 719f814 commit b812857

9 files changed

Lines changed: 45 additions & 10 deletions

File tree

api/grpc/server/server.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
4646
return nil, errors.New("empty bundle path")
4747
}
4848
e := &supervisor.StartTask{}
49+
e.WithContext(ctx)
4950
e.ID = c.Id
5051
e.BundlePath = c.BundlePath
5152
e.Stdin = c.Stdin
@@ -56,7 +57,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
5657
e.Runtime = c.Runtime
5758
e.RuntimeArgs = c.RuntimeArgs
5859
e.StartResponse = make(chan supervisor.StartResponse, 1)
59-
e.Ctx = ctx
6060
if c.Checkpoint != "" {
6161
e.CheckpointDir = c.CheckpointDir
6262
e.Checkpoint = &runtime.Checkpoint{
@@ -79,6 +79,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
7979

8080
func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
8181
e := &supervisor.CreateCheckpointTask{}
82+
e.WithContext(ctx)
8283
e.ID = r.Id
8384
e.CheckpointDir = r.CheckpointDir
8485
e.Checkpoint = &runtime.Checkpoint{
@@ -102,6 +103,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
102103
return nil, errors.New("checkpoint name cannot be empty")
103104
}
104105
e := &supervisor.DeleteCheckpointTask{}
106+
e.WithContext(ctx)
105107
e.ID = r.Id
106108
e.CheckpointDir = r.CheckpointDir
107109
e.Checkpoint = &runtime.Checkpoint{
@@ -116,6 +118,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
116118

117119
func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
118120
e := &supervisor.GetContainersTask{}
121+
e.WithContext(ctx)
119122
s.sv.SendTask(e)
120123
if err := <-e.ErrorCh(); err != nil {
121124
return nil, err
@@ -150,6 +153,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR
150153

151154
func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
152155
e := &supervisor.SignalTask{}
156+
e.WithContext(ctx)
153157
e.ID = r.Id
154158
e.PID = r.Pid
155159
e.Signal = syscall.Signal(int(r.Signal))
@@ -167,6 +171,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St
167171
}
168172

169173
e := &supervisor.GetContainersTask{}
174+
e.WithContext(ctx)
170175
e.ID = r.Id
171176
e.GetState = getState
172177
s.sv.SendTask(e)
@@ -253,6 +258,7 @@ func toUint32(its []int) []uint32 {
253258

254259
func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
255260
e := &supervisor.UpdateTask{}
261+
e.WithContext(ctx)
256262
e.ID = r.Id
257263
e.State = runtime.State(r.Status)
258264
if r.Resources != nil {
@@ -304,6 +310,7 @@ func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContaine
304310

305311
func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
306312
e := &supervisor.UpdateProcessTask{}
313+
e.WithContext(ctx)
307314
e.ID = r.Id
308315
e.PID = r.Pid
309316
e.Height = int(r.Height)
@@ -482,6 +489,7 @@ func getSystemCPUUsage() (uint64, error) {
482489

483490
func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
484491
e := &supervisor.StatsTask{}
492+
e.WithContext(ctx)
485493
e.ID = r.Id
486494
e.Stat = make(chan *runtime.Stat, 1)
487495
s.sv.SendTask(e)

api/grpc/server/server_linux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
4949
return nil, fmt.Errorf("process id cannot be empty")
5050
}
5151
e := &supervisor.AddProcessTask{}
52+
e.WithContext(ctx)
5253
e.ID = r.Id
5354
e.PID = r.Pid
5455
e.ProcessSpec = process
5556
e.Stdin = r.Stdin
5657
e.Stdout = r.Stdout
5758
e.Stderr = r.Stderr
5859
e.StartResponse = make(chan supervisor.StartResponse, 1)
59-
e.Ctx = ctx
6060
s.sv.SendTask(e)
6161
if err := <-e.ErrorCh(); err != nil {
6262
return nil, err

api/grpc/server/server_solaris.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
2525
return nil, fmt.Errorf("process id cannot be empty")
2626
}
2727
e := &supervisor.AddProcessTask{}
28+
e.WithContext(ctx)
2829
e.ID = r.Id
2930
e.PID = r.Pid
3031
e.ProcessSpec = process

supervisor/add_process.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66

77
"github.com/containerd/containerd/runtime"
88
"github.com/containerd/containerd/specs"
9-
"golang.org/x/net/context"
109
)
1110

1211
// AddProcessTask holds everything necessary to add a process to a
@@ -20,7 +19,6 @@ type AddProcessTask struct {
2019
Stdin string
2120
ProcessSpec *specs.ProcessSpec
2221
StartResponse chan StartResponse
23-
Ctx context.Context
2422
}
2523

2624
func (s *Supervisor) addProcess(t *AddProcessTask) error {
@@ -29,7 +27,7 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error {
2927
if !ok {
3028
return ErrContainerNotFound
3129
}
32-
process, err := ci.container.Exec(t.Ctx, t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
30+
process, err := ci.container.Exec(t.Ctx(), t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
3331
if err != nil {
3432
return err
3533
}

supervisor/create.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/containerd/containerd/runtime"
8-
"golang.org/x/net/context"
98
)
109

1110
// StartTask holds needed parameters to create a new container
@@ -23,7 +22,6 @@ type StartTask struct {
2322
CheckpointDir string
2423
Runtime string
2524
RuntimeArgs []string
26-
Ctx context.Context
2725
}
2826

2927
func (s *Supervisor) start(t *StartTask) error {
@@ -59,7 +57,7 @@ func (s *Supervisor) start(t *StartTask) error {
5957
Stdin: t.Stdin,
6058
Stdout: t.Stdout,
6159
Stderr: t.Stderr,
62-
Ctx: t.Ctx,
60+
Ctx: t.Ctx(),
6361
}
6462
if t.Checkpoint != nil {
6563
task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)

supervisor/exit.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
4141
Status: status,
4242
Process: proc,
4343
}
44+
ne.WithContext(t.Ctx())
4445
s.execExit(ne)
4546
return nil
4647
}
@@ -51,6 +52,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
5152
PID: proc.ID(),
5253
Process: proc,
5354
}
55+
ne.WithContext(t.Ctx())
5456
s.delete(ne)
5557

5658
ExitProcessTimer.UpdateSince(start)

supervisor/supervisor.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package supervisor
22

33
import (
4+
"context"
45
"encoding/json"
56
"io"
67
"io/ioutil"
@@ -290,15 +291,21 @@ func (s *Supervisor) Machine() Machine {
290291

291292
// SendTask sends the provided event the the supervisors main event loop
292293
func (s *Supervisor) SendTask(evt Task) {
293-
TasksCounter.Inc(1)
294-
s.tasks <- evt
294+
select {
295+
case <-evt.Ctx().Done():
296+
evt.ErrorCh() <- evt.Ctx().Err()
297+
close(evt.ErrorCh())
298+
case s.tasks <- evt:
299+
TasksCounter.Inc(1)
300+
}
295301
}
296302

297303
func (s *Supervisor) exitHandler() {
298304
for p := range s.monitor.Exits() {
299305
e := &ExitTask{
300306
Process: p,
301307
}
308+
e.WithContext(context.Background())
302309
s.SendTask(e)
303310
}
304311
}
@@ -308,6 +315,7 @@ func (s *Supervisor) oomHandler() {
308315
e := &OOMTask{
309316
ID: id,
310317
}
318+
e.WithContext(context.Background())
311319
s.SendTask(e)
312320
}
313321
}
@@ -371,6 +379,7 @@ func (s *Supervisor) restore() error {
371379
e := &ExitTask{
372380
Process: p,
373381
}
382+
e.WithContext(context.Background())
374383
s.SendTask(e)
375384
}
376385
}

supervisor/task.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package supervisor
22

33
import (
4+
"context"
45
"sync"
56

67
"github.com/containerd/containerd/runtime"
@@ -17,13 +18,28 @@ type StartResponse struct {
1718
type Task interface {
1819
// ErrorCh returns a channel used to report and error from an async task
1920
ErrorCh() chan error
21+
// Ctx carries the context of a task
22+
Ctx() context.Context
2023
}
2124

2225
type baseTask struct {
2326
errCh chan error
27+
ctx context.Context
2428
mu sync.Mutex
2529
}
2630

31+
func (t *baseTask) WithContext(ctx context.Context) {
32+
t.mu.Lock()
33+
defer t.mu.Unlock()
34+
t.ctx = ctx
35+
}
36+
37+
func (t *baseTask) Ctx() context.Context {
38+
t.mu.Lock()
39+
defer t.mu.Unlock()
40+
return t.ctx
41+
}
42+
2743
func (t *baseTask) ErrorCh() chan error {
2844
t.mu.Lock()
2945
defer t.mu.Unlock()

supervisor/worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (w *worker) Start() {
5555
NoEvent: true,
5656
Process: process,
5757
}
58+
evt.WithContext(t.Ctx)
5859
w.s.SendTask(evt)
5960
continue
6061
}
@@ -71,6 +72,7 @@ func (w *worker) Start() {
7172
NoEvent: true,
7273
Process: process,
7374
}
75+
evt.WithContext(t.Ctx)
7476
w.s.SendTask(evt)
7577
continue
7678
}
@@ -85,6 +87,7 @@ func (w *worker) Start() {
8587
NoEvent: true,
8688
Process: process,
8789
}
90+
evt.WithContext(t.Ctx)
8891
w.s.SendTask(evt)
8992
continue
9093
}

0 commit comments

Comments
 (0)