Merged
206 changes: 182 additions & 24 deletions ctrd/client.go
@@ -3,37 +3,43 @@ package ctrd
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/alibaba/pouch/pkg/scheduler"
"github.com/alibaba/pouch/pkg/utils"

"github.com/containerd/containerd"
"github.com/sirupsen/logrus"
)

const (
unixSocketPath = "/run/containerd/containerd.sock"
containerdPidFileName = "containerd.pid"
defaultGrpcClientPoolCapacity = 5
defaultMaxStreamsClient = 100
containerdShutdownTimeout = 15 * time.Second
)

// Config represents the config used to communicated with containerd.
type Config struct {
Address string
// GrpcClientPoolCapacity is the capacity of grpc client pool.
GrpcClientPoolCapacity int
// MaxStreamsClient records the max number of concurrent streams
MaxStreamsClient int
}

// Client is the client side the daemon holds to communicate with containerd.
type Client struct {
mu sync.RWMutex
Config
mu sync.RWMutex
watch *watch
lock *containerLock

daemonPid int
homeDir string
rpcAddr string
oomScoreAdjust int
debugLog bool

// containerd grpc pool
pool []scheduler.Factory
scheduler scheduler.Scheduler
@@ -42,38 +48,50 @@ type Client struct {
}

// NewClient connect to containerd.
func NewClient(cfg Config) (APIClient, error) {
if cfg.Address == "" {
cfg.Address = unixSocketPath
}

if cfg.GrpcClientPoolCapacity <= 0 {
cfg.GrpcClientPoolCapacity = defaultGrpcClientPoolCapacity
func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
// set default value for parameters
copts := clientOpts{
rpcAddr: unixSocketPath,
grpcClientPoolCapacity: defaultGrpcClientPoolCapacity,
maxStreamsClient: defaultMaxStreamsClient,
}

if cfg.MaxStreamsClient <= 0 {
cfg.MaxStreamsClient = defaultMaxStreamsClient
for _, opt := range opts {
if err := opt(&copts); err != nil {
return nil, err
}
}

client := &Client{
Config: cfg,
lock: &containerLock{
ids: make(map[string]struct{}),
},
watch: &watch{
containers: make(map[string]*containerPack),
},
daemonPid: -1,
homeDir: homeDir,
oomScoreAdjust: copts.oomScoreAdjust,
debugLog: copts.debugLog,
rpcAddr: copts.rpcAddr,
}

// start new containerd instance.
if copts.startDaemon {
if err := client.runContainerdDaemon(homeDir, copts); err != nil {
return nil, err
}
}

for i := 0; i < cfg.GrpcClientPoolCapacity; i++ {
cli, err := newWrapperClient(cfg)
for i := 0; i < copts.grpcClientPoolCapacity; i++ {
cli, err := newWrapperClient(copts.rpcAddr, copts.maxStreamsClient)
if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %v", err)
}
client.pool = append(client.pool, cli)
}

logrus.Infof("success to create %d containerd clients, connect to: %s", cfg.GrpcClientPoolCapacity, cfg.Address)
logrus.Infof("success to create %d containerd clients, connect to: %s", copts.grpcClientPoolCapacity, copts.rpcAddr)

scheduler, err := scheduler.NewLRUScheduler(client.pool)
if err != nil {
@@ -166,3 +184,143 @@ func (c *Client) Version(ctx context.Context) (containerd.Version, error) {

return cli.client.Version(ctx)
}

func (c *Client) runContainerdDaemon(homeDir string, copts clientOpts) error {
if homeDir == "" {
Contributor

Could we use os.Stat to check homeDir here? If homeDir doesn't exist, we should return an error right away.
WDYT?

Contributor Author

The home-dir existence is already checked in main.go, so we don't need to check it again here.
But we should check the stateDir here; thanks for the reminder. :)

return fmt.Errorf("ctrd: containerd home dir should not be empty")
}

containerdPath, err := exec.LookPath(copts.containerdBinary)
if err != nil {
return fmt.Errorf("failed to find containerd binary %s: %v", copts.containerdBinary, err)
}

stateDir := path.Join(homeDir, "containerd/state")
if _, err := os.Stat(stateDir); err != nil && os.IsNotExist(err) {
if err := os.MkdirAll(stateDir, 0666); err != nil {
return fmt.Errorf("failed to mkdir %s: %v", stateDir, err)
}
}

pidFileName := path.Join(stateDir, containerdPidFileName)
f, err := os.OpenFile(pidFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}
defer f.Close()

buf := make([]byte, 8)
num, err := f.Read(buf)
if err != nil && err != io.EOF {
return err
}

if num > 0 {
pid, err := strconv.ParseUint(string(buf[:num]), 10, 64)
if err != nil {
return err
}
if utils.IsProcessAlive(int(pid)) {
logrus.Infof("ctrd: previous instance of containerd still alive (%d)", pid)
c.daemonPid = int(pid)
return nil
}
}

// empty container pid file
_, err = f.Seek(0, os.SEEK_SET)
if err != nil {
return err
}

if err := f.Truncate(0); err != nil {
return err
}

// if socket file exists, delete it.
if _, err := os.Stat(c.rpcAddr); err == nil {
os.RemoveAll(c.rpcAddr)
}

cmd, err := c.newContainerdCmd(containerdPath)
if err != nil {
return err
}

if err := utils.SetOOMScore(cmd.Process.Pid, c.oomScoreAdjust); err != nil {
utils.KillProcess(cmd.Process.Pid)
return err
}

if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
utils.KillProcess(cmd.Process.Pid)
return err
}

go cmd.Wait()

c.daemonPid = cmd.Process.Pid
return nil
}

func (c *Client) newContainerdCmd(containerdPath string) (*exec.Cmd, error) {
// Start a new containerd instance
args := []string{
"-a", c.rpcAddr,
"--root", path.Join(c.homeDir, "containerd/root"),
Contributor

Sorry, I didn't make my suggestion clear in the last comment.

containerd has its own configuration file. The link is here. Maybe we can marshal these settings into that file, so the file can also be used to check the information about pouchd.

Contributor Author

Actually, the core containerd opts are already in the pouchd configuration file, and I don't think we need to keep many containerd configuration opts in the pouchd code. If you want to add more configuration, use containerd's own configuration file outside. Do you agree?

Contributor

LGTM
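For context, the reviewer's suggestion amounts to writing these settings into containerd's own config file rather than passing them as flags. A rough sketch of what that could look like, assuming the BurntSushi/toml encoder and a hypothetical writeContainerdConfig helper; the field names follow containerd's config.toml layout (root, state, [debug] level) and the paths mirror the flags below, so treat the whole thing as an illustration rather than the PR's code:

package sketch

import (
	"os"
	"path"

	"github.com/BurntSushi/toml"
)

// containerdConfig mirrors a small subset of containerd's config.toml.
type containerdConfig struct {
	Root  string `toml:"root"`
	State string `toml:"state"`
	Debug struct {
		Level string `toml:"level"`
	} `toml:"debug"`
}

// writeContainerdConfig renders the root/state/log-level settings to a
// config file so containerd could be started with `--config <file>`
// instead of individual flags.
func writeContainerdConfig(homeDir, level string) error {
	cfg := containerdConfig{
		Root:  path.Join(homeDir, "containerd/root"),
		State: path.Join(homeDir, "containerd/state"),
	}
	cfg.Debug.Level = level

	f, err := os.Create(path.Join(homeDir, "containerd/config.toml"))
	if err != nil {
		return err
	}
	defer f.Close()

	return toml.NewEncoder(f).Encode(cfg)
}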

"--state", path.Join(c.homeDir, "containerd/state"),
"-l", utils.If(c.debugLog, "debug", "info").(string),
}

cmd := exec.Command(containerdPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true, Pdeathsig: syscall.SIGKILL}
cmd.Env = nil
// clear the NOTIFY_SOCKET from the env when starting containerd
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
cmd.Env = append(cmd.Env, e)
}
}

if err := cmd.Start(); err != nil {
return nil, err
}

logrus.Infof("ctrd: new containerd process, pid: %d", cmd.Process.Pid)
return cmd, nil
}

// Cleanup handle containerd instance exits.
func (c *Client) Cleanup() error {
if c.daemonPid == -1 {
return nil
}

if err := c.Close(); err != nil {
return err
}

// Ask the daemon to quit
syscall.Kill(c.daemonPid, syscall.SIGTERM)

// Wait up to 15secs for it to stop
for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second {
if !utils.IsProcessAlive(c.daemonPid) {
break
}
time.Sleep(time.Second)
}

if utils.IsProcessAlive(c.daemonPid) {
logrus.Warnf("ctrd: containerd (%d) didn't stop within 15secs, killing it\n", c.daemonPid)
syscall.Kill(c.daemonPid, syscall.SIGKILL)
}

// cleanup some files
os.Remove(path.Join(c.homeDir, "containerd/state", containerdPidFileName))
os.Remove(c.rpcAddr)

return nil
}
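Cleanup leans on utils.IsProcessAlive to decide whether the containerd daemon has exited before escalating from SIGTERM to SIGKILL. That helper is not part of this diff; below is a minimal sketch of the usual Unix liveness check it could be built on, which is an assumption and not the project's actual implementation:

package sketch

import "syscall"

// isProcessAlive reports whether the process with the given pid appears to
// be running. Signal 0 performs error checking only: nothing is delivered,
// but existence and permission checks still happen.
func isProcessAlive(pid int) bool {
	err := syscall.Kill(pid, syscall.Signal(0))
	// nil: the process exists and we may signal it.
	// EPERM: the process exists but belongs to another user.
	return err == nil || err == syscall.EPERM
}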
109 changes: 109 additions & 0 deletions ctrd/client_opts.go
@@ -0,0 +1,109 @@
package ctrd

import "fmt"

type clientOpts struct {
startDaemon bool
debugLog bool
rpcAddr string
Contributor

Should we have default values for the options? I'm wondering whether calling WithXXX is really necessary every time. WDYT?

Contributor Author

Not calling WithXXX leaves the opts at their default values.

Contributor

Hi @HusterWan

// set default value for parameters
	if copts.rpcAddr == "" {
		copts.rpcAddr = unixSocketPath
	}
	client.rpcAddr = copts.rpcAddr

	if copts.grpcClientPoolCapacity <= 0 {
		copts.grpcClientPoolCapacity = defaultGrpcClientPoolCapacity
	}
	if copts.maxStreamsClient <= 0 {
		copts.maxStreamsClient = defaultMaxStreamsClient
	}

For this part, I mean we could use the following code to handle it:

var defaultClientOpts = clientOpts{
	rpcAddr:                unixSocketPath,
	grpcClientPoolCapacity: defaultGrpcClientPoolCapacity,
	maxStreamsClient:       defaultMaxStreamsClient,
}

With the default values in place, we don't need the if-else code. :)

Hope it can help.

Contributor Author

I'm afraid I can't agree with you: the default values take effect when a parameter is not specified or is invalid.

homeDir string
containerdBinary string
grpcClientPoolCapacity int
maxStreamsClient int
oomScoreAdjust int
}

// ClientOpt allows caller to set options for containerd client.
type ClientOpt func(c *clientOpts) error

// WithStartDaemon set startDaemon flag for containerd client.
// startDaemon is a flag to decide whether start a new containerd instance
// when create a containerd client.
func WithStartDaemon(startDaemon bool) ClientOpt {
return func(c *clientOpts) error {
c.startDaemon = startDaemon
return nil
}
}

// WithRPCAddr set containerd listen address.
func WithRPCAddr(rpcAddr string) ClientOpt {
return func(c *clientOpts) error {
if rpcAddr == "" {
return fmt.Errorf("rpc socket path is empty")
}

c.rpcAddr = rpcAddr
return nil
}
}

// WithDebugLog set debugLog flag for containerd client.
// debugLog decides containerd log level.
func WithDebugLog(debugLog bool) ClientOpt {
return func(c *clientOpts) error {
c.debugLog = debugLog
return nil
}
}

// WithHomeDir set home dir for containerd.
func WithHomeDir(homeDir string) ClientOpt {
return func(c *clientOpts) error {
if homeDir == "" {
return fmt.Errorf("containerd home Dir is empty")
}

c.homeDir = homeDir
return nil
}
}

// WithContainerdBinary specifies the containerd binary path.
func WithContainerdBinary(containerdBinary string) ClientOpt {
return func(c *clientOpts) error {
if containerdBinary == "" {
return fmt.Errorf("containerd binary path is empty")
}

c.containerdBinary = containerdBinary
return nil
}
}

// WithGrpcClientPoolCapacity sets containerd clients pool capacity.
func WithGrpcClientPoolCapacity(grpcClientPoolCapacity int) ClientOpt {
return func(c *clientOpts) error {
if grpcClientPoolCapacity <= 0 {
return fmt.Errorf("containerd clients pool capacity should positive number")
}

c.grpcClientPoolCapacity = grpcClientPoolCapacity
return nil
}
}

// WithMaxStreamsClient sets one containerd grpc client can hold max streams client.
func WithMaxStreamsClient(maxStreamsClient int) ClientOpt {
return func(c *clientOpts) error {

if maxStreamsClient <= 0 {
return fmt.Errorf("containerd max streams client should be positive number")
}

c.maxStreamsClient = maxStreamsClient
return nil
}
}

// WithOOMScoreAdjust sets oom-score for containerd instance.
func WithOOMScoreAdjust(oomScore int) ClientOpt {
return func(c *clientOpts) error {
if oomScore > 1000 || oomScore < -1000 {
return fmt.Errorf("oom-score range should be [-1000, 1000]")
}

c.oomScoreAdjust = oomScore
return nil
}
}
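Put together, the functional options let the daemon construct a client with only the settings it cares about, while everything else falls back to the defaults NewClient fills in. A rough usage sketch against the API in this diff; the home dir, binary name, and oom score are illustrative values, not pouchd's real configuration:

package main

import (
	"context"
	"log"

	"github.com/alibaba/pouch/ctrd"
)

func main() {
	// Rely on the defaults for the rpc address, pool capacity and max
	// streams; only ask the client to spawn and supervise containerd.
	cli, err := ctrd.NewClient("/var/lib/pouch",
		ctrd.WithStartDaemon(true),
		ctrd.WithContainerdBinary("containerd"),
		ctrd.WithDebugLog(false),
		ctrd.WithOOMScoreAdjust(-500),
	)
	if err != nil {
		log.Fatalf("failed to create containerd client: %v", err)
	}
	defer func() {
		if err := cli.Cleanup(); err != nil {
			log.Printf("cleanup: %v", err)
		}
	}()

	v, err := cli.Version(context.Background())
	if err != nil {
		log.Fatalf("failed to query containerd version: %v", err)
	}
	log.Printf("connected to containerd %s", v.Version)
}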
5 changes: 3 additions & 2 deletions ctrd/client_test.go
@@ -7,7 +7,8 @@ import (

func TestNewClient(t *testing.T) {
type args struct {
cfg Config
homeDir string
opts []ClientOpt
}
tests := []struct {
name string
@@ -19,7 +20,7 @@ }
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewClient(tt.args.cfg)
got, err := NewClient(tt.args.homeDir, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
return
1 change: 1 addition & 0 deletions ctrd/interface.go
@@ -20,6 +20,7 @@ type APIClient interface {
SnapshotAPIClient

Version(ctx context.Context) (containerd.Version, error)
Cleanup() error
}

// ContainerAPIClient provides access to containerd container features.