Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"syscall"
"time"

"github.com/alibaba/pouch/pkg/kmutex"
"github.com/alibaba/pouch/pkg/scheduler"
"github.com/alibaba/pouch/pkg/utils"

Expand All @@ -29,6 +30,10 @@ const (
defaultGrpcClientPoolCapacity = 5
defaultMaxStreamsClient = 100
containerdShutdownTimeout = 15 * time.Second

// Trylock combined with a lock timeout is the policy used when accessing a containerd object in ctrd.
// Rather than blocking forever or returning immediately, callers wait up to this timeout for the object lock.
containerdObjectLockTimeout = time.Duration(5 * time.Second)
)

// ErrGetCtrdClient is an error returned when failed to get a containerd grpc client from clients pool.
Expand All @@ -38,7 +43,9 @@ var ErrGetCtrdClient = errors.New("failed to get a containerd grpc client")
type Client struct {
mu sync.RWMutex
watch *watch
lock *containerLock

// lock ensures that only one caller of the client can access the same object in containerd at a time.
lock *kmutex.KMutex

daemonPid int
homeDir string
Expand Down Expand Up @@ -72,9 +79,7 @@ func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
}

client := &Client{
lock: &containerLock{
ids: make(map[string]struct{}),
},
lock: kmutex.New(),
watch: &watch{
containers: make(map[string]*containerPack),
},
Expand Down Expand Up @@ -416,3 +421,13 @@ func (c *Client) collectContainerdEvents() {
}
}
}

// Trylock attempts to take the lock for the containerd object identified by id.
// It passes containerdObjectLockTimeout as the timeout to c.lock.LockWithTimeout
// and reports whether the lock was obtained.
func (c *Client) Trylock(id string) bool {
	acquired := c.lock.LockWithTimeout(id, containerdObjectLockTimeout)
	return acquired
}

// Unlock releases the lock on the containerd object identified by id.
// It delegates to c.lock.Unlock; callers in this package defer it right after
// a successful Trylock on the same id.
func (c *Client) Unlock(id string) {
c.lock.Unlock(id)
}
54 changes: 27 additions & 27 deletions ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (c *Client) ContainerStats(ctx context.Context, id string) (*containerdtype

// containerStats returns stats of the container.
func (c *Client) containerStats(ctx context.Context, id string) (*containerdtypes.Metric, error) {
if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -196,10 +196,10 @@ func (c *Client) ContainerPIDs(ctx context.Context, id string) ([]int, error) {

// containerPIDs returns the all processes's ids inside the container.
func (c *Client) containerPIDs(ctx context.Context, id string) ([]int, error) {
if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -255,10 +255,10 @@ func (c *Client) recoverContainer(ctx context.Context, id string, io *containeri
return fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

lc, err := wrapperCli.client.LoadContainer(ctx, id)
if err != nil {
Expand Down Expand Up @@ -317,10 +317,10 @@ func (c *Client) destroyContainer(ctx context.Context, id string, timeout int64)

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())

if !c.lock.Trylock(id) {
return nil, errtypes.ErrLockfailed
if !c.Trylock(id) {
return nil, LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -385,10 +385,10 @@ func (c *Client) PauseContainer(ctx context.Context, id string) error {

// pauseContainer pause container.
func (c *Client) pauseContainer(ctx context.Context, id string) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down Expand Up @@ -416,10 +416,10 @@ func (c *Client) UnpauseContainer(ctx context.Context, id string) error {

// unpauseContainer unpauses a container.
func (c *Client) unpauseContainer(ctx context.Context, id string) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand All @@ -444,10 +444,10 @@ func (c *Client) CreateContainer(ctx context.Context, container *Container, chec
id = container.ID
)

if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

if err := c.createContainer(ctx, ref, id, checkpointDir, container); err != nil {
return convertCtrdErr(err)
Expand Down Expand Up @@ -596,10 +596,10 @@ func (c *Client) UpdateResources(ctx context.Context, id string, resources types

// updateResources updates the configurations of a container.
func (c *Client) updateResources(ctx context.Context, id string, resources types.Resources) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand All @@ -626,10 +626,10 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi
// resizeContainer changes the size of the TTY of the init process running
// in the container to the given height and width.
func (c *Client) resizeContainer(ctx context.Context, id string, opts types.ResizeOptions) error {
if !c.lock.Trylock(id) {
return errtypes.ErrLockfailed
if !c.Trylock(id) {
return LockFailedError(id)
}
defer c.lock.Unlock(id)
defer c.Unlock(id)

pack, err := c.watch.get(id)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions ctrd/container_lock.go

This file was deleted.

53 changes: 0 additions & 53 deletions ctrd/container_lock_test.go

This file was deleted.

5 changes: 5 additions & 0 deletions ctrd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func convertCtrdErr(err error) error {

return err
}

// LockFailedError builds a more readable lock-failure error for a container.
// It wraps errtypes.ErrLockfailed with a message that names containerID.
func LockFailedError(containerID string) error {
	const msgFormat = "container %s is accessed by other request and please try again"
	return errors.Wrapf(errtypes.ErrLockfailed, msgFormat, containerID)
}
26 changes: 26 additions & 0 deletions ctrd/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,29 @@ func Test_convertCtrdErr(t *testing.T) {
})
}
}

// TestLockFailedError checks, case by case, whether LockFailedError returns a
// non-nil error as expected.
func TestLockFailedError(t *testing.T) {
	type args struct {
		containerID string
	}
	type testCase struct {
		name    string
		args    args
		wantErr bool
	}

	cases := []testCase{
		{
			name:    "normal test case",
			args:    args{containerID: "asdfghjkl"},
			wantErr: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := LockFailedError(tc.args.containerID)
			gotErr := err != nil
			if gotErr != tc.wantErr {
				t.Errorf("LockFailedError() error = %v, wantErr %v", err, tc.wantErr)
			}
		})
	}
}
Loading