Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package config

const (
// K8sNamespace is the namespace we use to connect containerd when CRI is enabled.
K8sNamespace = "k8s.io"
)

// Config defines the CRI configuration.
type Config struct {
// Listen is the listening address which servers CRI.
Expand Down
88 changes: 85 additions & 3 deletions cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ const (

// resolvConfPath is the abs path of resolv.conf on host or container.
resolvConfPath = "/etc/resolv.conf"

// statsCollectPeriod is the time duration (in time.Second) we sync stats from containerd.
statsCollectPeriod = 10

// defaultSnapshotterName is the default Snapshotter name.
defaultSnapshotterName = "overlayfs"

// snapshotPlugin implements a snapshotter.
snapshotPlugin = "io.containerd.snapshotter.v1"
)

var (
Expand Down Expand Up @@ -96,6 +105,12 @@ type CriManager struct {
SandboxImage string
// SandboxStore stores the configuration of sandboxes.
SandboxStore *meta.Store

// SnapshotStore stores information of all snapshots.
SnapshotStore *mgr.SnapshotStore

// ImageFSUUID is the device uuid of image filesystem.
ImageFSUUID string
}

// NewCriManager creates a brand new cri manager.
Expand All @@ -112,6 +127,7 @@ func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.Im
StreamServer: streamServer,
SandboxBaseDir: path.Join(config.HomeDir, "sandboxes"),
SandboxImage: config.CriConfig.SandboxImage,
SnapshotStore: mgr.NewSnapshotStore(),
}

c.SandboxStore, err = meta.NewStore(meta.Config{
Expand All @@ -128,6 +144,18 @@ func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.Im
return nil, fmt.Errorf("failed to create sandbox meta store: %v", err)
}

imageFSPath := imageFSPath(path.Join(config.HomeDir, "containerd/root"), defaultSnapshotterName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should add utils' function to retrieve the root folder in the project next step.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A separate PR will be submitted to deal with similar things.

c.ImageFSUUID, err = getDeviceUUID(imageFSPath)
if err != nil {
return nil, fmt.Errorf("failed to get imagefs uuid of %q: %v", imageFSPath, err)
}

snapshotsSyncer := ctrMgr.NewSnapshotsSyncer(
c.SnapshotStore,
time.Duration(statsCollectPeriod)*time.Second,
)
snapshotsSyncer.Start()

return NewCriWrapper(c), nil
}

Expand Down Expand Up @@ -709,12 +737,46 @@ func (c *CriManager) ContainerStatus(ctx context.Context, r *runtime.ContainerSt
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
func (c *CriManager) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (*runtime.ContainerStatsResponse, error) {
return nil, fmt.Errorf("ContainerStats Not Implemented Yet")
containerID := r.GetContainerId()

container, err := c.ContainerMgr.Get(ctx, containerID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use a function to get the cs? If we have the function, we will reuse the function in the ListContainerStats. Don't repeat yourself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is not necessary to do that, the function getContainerMetrics has been encapsulated, what has been repeated is just error handling.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The function like generateContainerStatInfo wraps the c.ContainerMgr.Stats and c.getContainerMetrics to avoid the repeat the same thing. In the future, you only need to update the wrapped function.

Or use the getContainerMetrics to contains the c.ContainerMgr.Stats, WDTY?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to admit it's more elegant. Thank you.

if err != nil {
return nil, fmt.Errorf("failed to get container %q with error: %v", containerID, err)
}

cs, err := c.getContainerMetrics(ctx, container)
if err != nil {
return nil, fmt.Errorf("failed to decode container metrics: %v", err)
}

return &runtime.ContainerStatsResponse{Stats: cs}, nil
}

// ListContainerStats returns stats of all running containers.
func (c *CriManager) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (*runtime.ListContainerStatsResponse, error) {
return nil, fmt.Errorf("ListContainerStats Not Implemented Yet")
opts := &mgr.ContainerListOption{All: true}
filter := func(c *mgr.Container) bool {
return true
}
opts.FilterFunc = filter

containers, err := c.ContainerMgr.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to list containers: %v", err)
}

result := &runtime.ListContainerStatsResponse{}
for _, container := range containers {
cs, err := c.getContainerMetrics(ctx, container)
if err != nil {
logrus.Errorf("failed to decode metrics of container %q: %v", container.ID, err)
continue
}

result.Stats = append(result.Stats, cs)
}

return result, nil
}

// UpdateContainerResources updates ContainerConfig of the container.
Expand Down Expand Up @@ -962,5 +1024,25 @@ func (c *CriManager) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequ

// ImageFsInfo returns information of the filesystem that is used to store images.
func (c *CriManager) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
return nil, fmt.Errorf("ImageFsInfo Not Implemented Yet")
snapshots := c.SnapshotStore.List()
timestamp := time.Now().UnixNano()
var usedBytes, inodesUsed uint64
for _, sn := range snapshots {
// Use the oldest timestamp as the timestamp of imagefs info.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add more information about this logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timestamp is latest time which the information were collected, we hava to be consistent. So I choose the oldest timestamp as the timestamp of imagefs info here.

if sn.Timestamp < timestamp {
timestamp = sn.Timestamp
}
usedBytes += sn.Size
inodesUsed += sn.Inodes
}
return &runtime.ImageFsInfoResponse{
ImageFilesystems: []*runtime.FilesystemUsage{
{
Timestamp: timestamp,
StorageId: &runtime.StorageIdentifier{Uuid: c.ImageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
},
},
}, nil
}
173 changes: 163 additions & 10 deletions cri/v1alpha1/cri_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

apitypes "github.com/alibaba/pouch/apis/types"
anno "github.com/alibaba/pouch/cri/annotations"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/utils"
"github.com/go-openapi/strfmt"

"github.com/containerd/cgroups"
containerdmount "github.com/containerd/containerd/mount"
"github.com/containerd/typeurl"
"github.com/go-openapi/strfmt"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)

const uuidDir = "/dev/disk/by-uuid"

func parseUint32(s string) (uint32, error) {
n, err := strconv.ParseUint(s, 10, 32)
if err != nil {
Expand Down Expand Up @@ -265,6 +273,10 @@ func makeSandboxPouchConfig(config *runtime.PodSandboxConfig, image string) (*ap
return nil, err
}

// Apply resource options.
if lc := config.GetLinux(); lc != nil {
hc.CgroupParent = lc.CgroupParent
}
return createConfig, nil
}

Expand All @@ -284,11 +296,17 @@ func toCriSandbox(c *mgr.Container) (*runtime.PodSandbox, error) {
return nil, err
}
labels, annotations := extractLabels(c.Config.Labels)

createdAt, err := toCriTimestamp(c.Created)
if err != nil {
return nil, fmt.Errorf("failed to parse create timestamp for container %q: %v", c.ID, err)
}

return &runtime.PodSandbox{
Id: c.ID,
Metadata: metadata,
State: state,
// TODO: fill "CreatedAt" when it is appropriate.
Id: c.ID,
Metadata: metadata,
State: state,
CreatedAt: createdAt,
Labels: labels,
Annotations: annotations,
}, nil
Expand Down Expand Up @@ -625,15 +643,26 @@ func (c *CriManager) updateCreateConfig(createConfig *apitypes.ContainerCreateCo
}

if lc := config.GetLinux(); lc != nil {
// TODO: resource restriction.
resources := lc.GetResources()
if resources != nil {
createConfig.HostConfig.Resources.CPUPeriod = resources.GetCpuPeriod()
createConfig.HostConfig.Resources.CPUQuota = resources.GetCpuQuota()
createConfig.HostConfig.Resources.CPUShares = resources.GetCpuShares()
createConfig.HostConfig.Resources.Memory = resources.GetMemoryLimitInBytes()
createConfig.HostConfig.Resources.CpusetCpus = resources.GetCpusetCpus()
createConfig.HostConfig.Resources.CpusetMems = resources.GetCpusetMems()
}

// Apply security context.
if err := applyContainerSecurityContext(lc, podSandboxID, &createConfig.ContainerConfig, createConfig.HostConfig); err != nil {
return fmt.Errorf("failed to apply container security context for container %q: %v", config.Metadata.Name, err)
}
}

// TODO: apply cgroupParent derived from the sandbox config.
// Apply cgroupsParent derived from the sandbox config.
if sandboxConfig.GetLinux().GetCgroupParent() != "" {
createConfig.HostConfig.CgroupParent = sandboxConfig.GetLinux().GetCgroupParent()
}

return nil
}
Expand All @@ -660,16 +689,20 @@ func toCriContainer(c *mgr.Container) (*runtime.Container, error) {
labels, annotations := extractLabels(c.Config.Labels)
sandboxID := c.Config.Labels[sandboxIDLabelKey]

createdAt, err := toCriTimestamp(c.Created)
if err != nil {
return nil, fmt.Errorf("failed to parse create timestamp for container %q: %v", c.ID, err)
}
return &runtime.Container{
Id: c.ID,
PodSandboxId: sandboxID,
Metadata: metadata,
Image: &runtime.ImageSpec{Image: c.Config.Image},
ImageRef: c.Image,
State: state,
// TODO: fill "CreatedAt" when it is appropriate.
Labels: labels,
Annotations: annotations,
CreatedAt: createdAt,
Labels: labels,
Annotations: annotations,
}, nil
}

Expand Down Expand Up @@ -787,3 +820,123 @@ func parseUserFromImageUser(id string) string {
// no group, just return the id
return id
}

func (c *CriManager) getContainerMetrics(ctx context.Context, meta *mgr.Container) (*runtime.ContainerStats, error) {
var usedBytes, inodesUsed uint64

stats, err := c.ContainerMgr.Stats(ctx, meta.ID)
if err != nil {
return nil, fmt.Errorf("failed to get stats of container %q: %v", meta.ID, err)
}

// snapshot key may not equals container ID later
sn, err := c.SnapshotStore.Get(meta.ID)
if err == nil {
usedBytes = sn.Size
inodesUsed = sn.Inodes
}

cs := &runtime.ContainerStats{}
cs.WritableLayer = &runtime.FilesystemUsage{
Timestamp: sn.Timestamp,
StorageId: &runtime.StorageIdentifier{
Uuid: c.ImageFSUUID,
},
UsedBytes: &runtime.UInt64Value{usedBytes},
InodesUsed: &runtime.UInt64Value{inodesUsed},
}

metadata, err := parseContainerName(meta.Name)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of container %q: %v", meta.ID, err)
}

labels, annotations := extractLabels(meta.Config.Labels)

cs.Attributes = &runtime.ContainerAttributes{
Id: meta.ID,
Metadata: metadata,
Labels: labels,
Annotations: annotations,
}

if stats != nil {
s, err := typeurl.UnmarshalAny(stats.Data)
if err != nil {
return nil, fmt.Errorf("failed to extract container metrics: %v", err)
}
metrics := s.(*cgroups.Metrics)
if metrics.CPU != nil && metrics.CPU.Usage != nil {
cs.Cpu = &runtime.CpuUsage{
Timestamp: stats.Timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{metrics.CPU.Usage.Total},
}
}
if metrics.Memory != nil && metrics.Memory.Usage != nil {
cs.Memory = &runtime.MemoryUsage{
Timestamp: stats.Timestamp.UnixNano(),
WorkingSetBytes: &runtime.UInt64Value{metrics.Memory.Usage.Usage},
}
}
}

return cs, nil
}

// imageFSPath returns containerd image filesystem path.
func imageFSPath(rootDir, snapshotter string) string {
return filepath.Join(rootDir, fmt.Sprintf("%s.%s", snapshotPlugin, snapshotter))
}

// getDeviceUUID gets device uuid for a given path.
func getDeviceUUID(path string) (string, error) {
mount, err := lookupMount(path)
if err != nil {
return "", err
}
rdev := unix.Mkdev(uint32(mount.Major), uint32(mount.Minor))
return deviceUUID(rdev)
}

// lookupMount gets mount info of a given path.
func lookupMount(path string) (containerdmount.Info, error) {
return containerdmount.Lookup(path)
}

// deviceUUID gets device uuid of a device. The passed in rdev should
// be linux device number.
func deviceUUID(rdev uint64) (string, error) {
files, err := ioutil.ReadDir(uuidDir)
if err != nil {
return "", err
}
for _, file := range files {
path := filepath.Join(uuidDir, file.Name())

trdev, err := blkrdev(path)
if err != nil {
continue
}

if rdev == trdev {
return file.Name(), nil
}
}

return "", fmt.Errorf("device %d not found", rdev)
}

// blkdev returns the rdev of a block device or an error if not a block device
func blkrdev(device string) (uint64, error) {
info, err := os.Stat(device)
if err != nil {
return 0, err
}
if stat, ok := info.Sys().(*syscall.Stat_t); ok {
if (stat.Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return 0, fmt.Errorf("%s is not a block device", device)
}
return stat.Rdev, nil
}
return 0, fmt.Errorf("cannot get stat of device %s", device)
}
1 change: 1 addition & 0 deletions cri/v1alpha1/cri_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
Expand Down
Loading