Skip to content

Commit 7bf5abc

Browse files
committed
feature: support taking over an old containerd instance when pouchd restarts
Signed-off-by: Michael Wan <[email protected]>
1 parent 819c4d3 commit 7bf5abc

File tree

9 files changed

+375
-66
lines changed

9 files changed

+375
-66
lines changed

ctrd/client.go

Lines changed: 182 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,43 @@ package ctrd
33
import (
44
"context"
55
"fmt"
6+
"io"
7+
"os"
8+
"os/exec"
9+
"path"
10+
"strconv"
11+
"strings"
612
"sync"
13+
"syscall"
714
"time"
815

916
"github.com/alibaba/pouch/pkg/scheduler"
17+
"github.com/alibaba/pouch/pkg/utils"
1018

1119
"github.com/containerd/containerd"
1220
"github.com/sirupsen/logrus"
1321
)
1422

1523
const (
1624
unixSocketPath = "/run/containerd/containerd.sock"
25+
containerdPidFileName = "containerd.pid"
1726
defaultGrpcClientPoolCapacity = 5
1827
defaultMaxStreamsClient = 100
28+
containerdShutdownTimeout = 15 * time.Second
1929
)
2030

21-
// Config represents the config used to communicated with containerd.
22-
type Config struct {
23-
Address string
24-
// GrpcClientPoolCapacity is the capacity of grpc client pool.
25-
GrpcClientPoolCapacity int
26-
// MaxStreamsClient records the max number of concurrent streams
27-
MaxStreamsClient int
28-
}
29-
3031
// Client is the client side the daemon holds to communicate with containerd.
3132
type Client struct {
32-
mu sync.RWMutex
33-
Config
33+
mu sync.RWMutex
3434
watch *watch
3535
lock *containerLock
3636

37+
daemonPid int
38+
homeDir string
39+
rpcAddr string
40+
oomScoreAdjust int
41+
debugLog bool
42+
3743
// containerd grpc pool
3844
pool []scheduler.Factory
3945
scheduler scheduler.Scheduler
@@ -42,38 +48,50 @@ type Client struct {
4248
}
4349

4450
// NewClient connect to containerd.
45-
func NewClient(cfg Config) (APIClient, error) {
46-
if cfg.Address == "" {
47-
cfg.Address = unixSocketPath
48-
}
49-
50-
if cfg.GrpcClientPoolCapacity <= 0 {
51-
cfg.GrpcClientPoolCapacity = defaultGrpcClientPoolCapacity
51+
func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
52+
// set default value for parameters
53+
copts := clientOpts{
54+
rpcAddr: unixSocketPath,
55+
grpcClientPoolCapacity: defaultGrpcClientPoolCapacity,
56+
maxStreamsClient: defaultMaxStreamsClient,
5257
}
5358

54-
if cfg.MaxStreamsClient <= 0 {
55-
cfg.MaxStreamsClient = defaultMaxStreamsClient
59+
for _, opt := range opts {
60+
if err := opt(&copts); err != nil {
61+
return nil, err
62+
}
5663
}
5764

5865
client := &Client{
59-
Config: cfg,
6066
lock: &containerLock{
6167
ids: make(map[string]struct{}),
6268
},
6369
watch: &watch{
6470
containers: make(map[string]*containerPack),
6571
},
72+
daemonPid: -1,
73+
homeDir: homeDir,
74+
oomScoreAdjust: copts.oomScoreAdjust,
75+
debugLog: copts.debugLog,
76+
rpcAddr: copts.rpcAddr,
77+
}
78+
79+
// start new containerd instance.
80+
if copts.startDaemon {
81+
if err := client.runContainerdDaemon(homeDir, copts); err != nil {
82+
return nil, err
83+
}
6684
}
6785

68-
for i := 0; i < cfg.GrpcClientPoolCapacity; i++ {
69-
cli, err := newWrapperClient(cfg)
86+
for i := 0; i < copts.grpcClientPoolCapacity; i++ {
87+
cli, err := newWrapperClient(copts.rpcAddr, copts.maxStreamsClient)
7088
if err != nil {
7189
return nil, fmt.Errorf("failed to create containerd client: %v", err)
7290
}
7391
client.pool = append(client.pool, cli)
7492
}
7593

76-
logrus.Infof("success to create %d containerd clients, connect to: %s", cfg.GrpcClientPoolCapacity, cfg.Address)
94+
logrus.Infof("success to create %d containerd clients, connect to: %s", copts.grpcClientPoolCapacity, copts.rpcAddr)
7795

7896
scheduler, err := scheduler.NewLRUScheduler(client.pool)
7997
if err != nil {
@@ -166,3 +184,143 @@ func (c *Client) Version(ctx context.Context) (containerd.Version, error) {
166184

167185
return cli.client.Version(ctx)
168186
}
187+
188+
func (c *Client) runContainerdDaemon(homeDir string, copts clientOpts) error {
189+
if homeDir == "" {
190+
return fmt.Errorf("ctrd: containerd home dir should not be empty")
191+
}
192+
193+
containerdPath, err := exec.LookPath(copts.containerdBinary)
194+
if err != nil {
195+
return fmt.Errorf("failed to find containerd binary %s: %v", copts.containerdBinary, err)
196+
}
197+
198+
stateDir := path.Join(homeDir, "containerd/state")
199+
if _, err := os.Stat(stateDir); err != nil && os.IsNotExist(err) {
200+
if err := os.MkdirAll(stateDir, 0666); err != nil {
201+
return fmt.Errorf("failed to mkdir %s: %v", stateDir, err)
202+
}
203+
}
204+
205+
pidFileName := path.Join(stateDir, containerdPidFileName)
206+
f, err := os.OpenFile(pidFileName, os.O_RDWR|os.O_CREATE, 0600)
207+
if err != nil {
208+
return err
209+
}
210+
defer f.Close()
211+
212+
buf := make([]byte, 8)
213+
num, err := f.Read(buf)
214+
if err != nil && err != io.EOF {
215+
return err
216+
}
217+
218+
if num > 0 {
219+
pid, err := strconv.ParseUint(string(buf[:num]), 10, 64)
220+
if err != nil {
221+
return err
222+
}
223+
if utils.IsProcessAlive(int(pid)) {
224+
logrus.Infof("ctrd: previous instance of containerd still alive (%d)", pid)
225+
c.daemonPid = int(pid)
226+
return nil
227+
}
228+
}
229+
230+
// empty container pid file
231+
_, err = f.Seek(0, os.SEEK_SET)
232+
if err != nil {
233+
return err
234+
}
235+
236+
if err := f.Truncate(0); err != nil {
237+
return err
238+
}
239+
240+
// if socket file exists, delete it.
241+
if _, err := os.Stat(c.rpcAddr); err == nil {
242+
os.RemoveAll(c.rpcAddr)
243+
}
244+
245+
cmd, err := c.newContainerdCmd(containerdPath)
246+
if err != nil {
247+
return err
248+
}
249+
250+
if err := utils.SetOOMScore(cmd.Process.Pid, c.oomScoreAdjust); err != nil {
251+
utils.KillProcess(cmd.Process.Pid)
252+
return err
253+
}
254+
255+
if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
256+
utils.KillProcess(cmd.Process.Pid)
257+
return err
258+
}
259+
260+
go cmd.Wait()
261+
262+
c.daemonPid = cmd.Process.Pid
263+
return nil
264+
}
265+
266+
func (c *Client) newContainerdCmd(containerdPath string) (*exec.Cmd, error) {
267+
// Start a new containerd instance
268+
args := []string{
269+
"-a", c.rpcAddr,
270+
"--root", path.Join(c.homeDir, "containerd/root"),
271+
"--state", path.Join(c.homeDir, "containerd/state"),
272+
"-l", utils.If(c.debugLog, "debug", "info").(string),
273+
}
274+
275+
cmd := exec.Command(containerdPath, args...)
276+
cmd.Stdout = os.Stdout
277+
cmd.Stderr = os.Stderr
278+
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true, Pdeathsig: syscall.SIGKILL}
279+
cmd.Env = nil
280+
// clear the NOTIFY_SOCKET from the env when starting containerd
281+
for _, e := range os.Environ() {
282+
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
283+
cmd.Env = append(cmd.Env, e)
284+
}
285+
}
286+
287+
if err := cmd.Start(); err != nil {
288+
return nil, err
289+
}
290+
291+
logrus.Infof("ctrd: new containerd process, pid: %d", cmd.Process.Pid)
292+
return cmd, nil
293+
}
294+
295+
// Cleanup handle containerd instance exits.
296+
func (c *Client) Cleanup() error {
297+
if c.daemonPid == -1 {
298+
return nil
299+
}
300+
301+
if err := c.Close(); err != nil {
302+
return err
303+
}
304+
305+
// Ask the daemon to quit
306+
syscall.Kill(c.daemonPid, syscall.SIGTERM)
307+
308+
// Wait up to 15secs for it to stop
309+
for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second {
310+
if !utils.IsProcessAlive(c.daemonPid) {
311+
break
312+
}
313+
time.Sleep(time.Second)
314+
}
315+
316+
if utils.IsProcessAlive(c.daemonPid) {
317+
logrus.Warnf("ctrd: containerd (%d) didn't stop within 15secs, killing it\n", c.daemonPid)
318+
syscall.Kill(c.daemonPid, syscall.SIGKILL)
319+
}
320+
321+
// cleanup some files
322+
os.Remove(path.Join(c.homeDir, "containerd/state", containerdPidFileName))
323+
os.Remove(c.rpcAddr)
324+
325+
return nil
326+
}

ctrd/client_opts.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package ctrd
2+
3+
import "fmt"
4+
5+
// clientOpts collects the configurable parameters applied by ClientOpt
// functions before a containerd client is created.
type clientOpts struct {
	// startDaemon decides whether NewClient starts (or takes over) a
	// containerd instance instead of only connecting to an existing one.
	startDaemon bool
	// debugLog selects containerd's log level ("debug" vs "info").
	debugLog bool
	// rpcAddr is the unix socket path containerd listens on.
	rpcAddr string
	// homeDir is the base directory; containerd's root/state dirs live
	// underneath it.
	homeDir string
	// containerdBinary is the containerd binary name or path, resolved
	// via exec.LookPath before spawning.
	containerdBinary string
	// grpcClientPoolCapacity is the number of grpc clients kept in the pool.
	grpcClientPoolCapacity int
	// maxStreamsClient is the max number of concurrent streams per client.
	maxStreamsClient int
	// oomScoreAdjust is applied to the spawned containerd process; valid
	// range is [-1000, 1000].
	oomScoreAdjust int
}
15+
16+
// ClientOpt allows caller to set options for containerd client.
17+
type ClientOpt func(c *clientOpts) error
18+
19+
// WithStartDaemon set startDaemon flag for containerd client.
20+
// startDaemon is a flag to decide whether start a new containerd instance
21+
// when create a containerd client.
22+
func WithStartDaemon(startDaemon bool) ClientOpt {
23+
return func(c *clientOpts) error {
24+
c.startDaemon = startDaemon
25+
return nil
26+
}
27+
}
28+
29+
// WithRPCAddr set containerd listen address.
30+
func WithRPCAddr(rpcAddr string) ClientOpt {
31+
return func(c *clientOpts) error {
32+
if rpcAddr == "" {
33+
return fmt.Errorf("rpc socket path is empty")
34+
}
35+
36+
c.rpcAddr = rpcAddr
37+
return nil
38+
}
39+
}
40+
41+
// WithDebugLog set debugLog flag for containerd client.
42+
// debugLog decides containerd log level.
43+
func WithDebugLog(debugLog bool) ClientOpt {
44+
return func(c *clientOpts) error {
45+
c.debugLog = debugLog
46+
return nil
47+
}
48+
}
49+
50+
// WithHomeDir set home dir for containerd.
51+
func WithHomeDir(homeDir string) ClientOpt {
52+
return func(c *clientOpts) error {
53+
if homeDir == "" {
54+
return fmt.Errorf("containerd home Dir is empty")
55+
}
56+
57+
c.homeDir = homeDir
58+
return nil
59+
}
60+
}
61+
62+
// WithContainerdBinary specifies the containerd binary path.
63+
func WithContainerdBinary(containerdBinary string) ClientOpt {
64+
return func(c *clientOpts) error {
65+
if containerdBinary == "" {
66+
return fmt.Errorf("containerd binary path is empty")
67+
}
68+
69+
c.containerdBinary = containerdBinary
70+
return nil
71+
}
72+
}
73+
74+
// WithGrpcClientPoolCapacity sets containerd clients pool capacity.
75+
func WithGrpcClientPoolCapacity(grpcClientPoolCapacity int) ClientOpt {
76+
return func(c *clientOpts) error {
77+
if grpcClientPoolCapacity <= 0 {
78+
return fmt.Errorf("containerd clients pool capacity should positive number")
79+
}
80+
81+
c.grpcClientPoolCapacity = grpcClientPoolCapacity
82+
return nil
83+
}
84+
}
85+
86+
// WithMaxStreamsClient sets one containerd grpc client can hold max streams client.
87+
func WithMaxStreamsClient(maxStreamsClient int) ClientOpt {
88+
return func(c *clientOpts) error {
89+
90+
if maxStreamsClient <= 0 {
91+
return fmt.Errorf("containerd max streams client should be positive number")
92+
}
93+
94+
c.maxStreamsClient = maxStreamsClient
95+
return nil
96+
}
97+
}
98+
99+
// WithOOMScoreAdjust sets oom-score for containerd instance.
100+
func WithOOMScoreAdjust(oomScore int) ClientOpt {
101+
return func(c *clientOpts) error {
102+
if oomScore > 1000 || oomScore < -1000 {
103+
return fmt.Errorf("oom-score range should be [-1000, 1000]")
104+
}
105+
106+
c.oomScoreAdjust = oomScore
107+
return nil
108+
}
109+
}

ctrd/client_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import (
77

88
func TestNewClient(t *testing.T) {
99
type args struct {
10-
cfg Config
10+
homeDir string
11+
opts []ClientOpt
1112
}
1213
tests := []struct {
1314
name string
@@ -19,7 +20,7 @@ func TestNewClient(t *testing.T) {
1920
}
2021
for _, tt := range tests {
2122
t.Run(tt.name, func(t *testing.T) {
22-
got, err := NewClient(tt.args.cfg)
23+
got, err := NewClient(tt.args.homeDir, tt.args.opts...)
2324
if (err != nil) != tt.wantErr {
2425
t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
2526
return

ctrd/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type APIClient interface {
2020
SnapshotAPIClient
2121

2222
Version(ctx context.Context) (containerd.Version, error)
23+
Cleanup() error
2324
}
2425

2526
// ContainerAPIClient provides access to containerd container features.

0 commit comments

Comments
 (0)