diff --git a/ctrd/client.go b/ctrd/client.go
index 48d0c4133..ca0fe5323 100644
--- a/ctrd/client.go
+++ b/ctrd/client.go
@@ -13,6 +13,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/alibaba/pouch/pkg/kmutex"
 	"github.com/alibaba/pouch/pkg/scheduler"
 	"github.com/alibaba/pouch/pkg/utils"
 
@@ -29,6 +30,10 @@ const (
 	defaultGrpcClientPoolCapacity = 5
 	defaultMaxStreamsClient       = 100
 	containerdShutdownTimeout     = 15 * time.Second
+
+	// trylock with timeout is the policy used when accessing a containerd object in ctrd.
+	// To avoid blocking callers indefinitely, object locks are acquired with a timeout.
+	containerdObjectLockTimeout = 5 * time.Second
 )
 
 // ErrGetCtrdClient is an error returned when failed to get a containerd grpc client from clients pool.
@@ -38,7 +43,9 @@ var ErrGetCtrdClient = errors.New("failed to get a containerd grpc client")
 type Client struct {
 	mu    sync.RWMutex
 	watch *watch
-	lock  *containerLock
+
+	// lock ensures that only one caller at a time can access a given object in containerd.
+	lock *kmutex.KMutex
 
 	daemonPid int
 	homeDir   string
@@ -72,9 +79,7 @@ func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
 	}
 
 	client := &Client{
-		lock: &containerLock{
-			ids: make(map[string]struct{}),
-		},
+		lock: kmutex.New(),
 		watch: &watch{
 			containers: make(map[string]*containerPack),
 		},
@@ -416,3 +421,13 @@ func (c *Client) collectContainerdEvents() {
 		}
 	}
 }
+
+// Trylock tries to lock the object in containerd, giving up after containerdObjectLockTimeout.
+func (c *Client) Trylock(id string) bool {
+	return c.lock.LockWithTimeout(id, containerdObjectLockTimeout)
+}
+
+// Unlock releases the lock on the object in containerd.
+func (c *Client) Unlock(id string) {
+	c.lock.Unlock(id)
+}
diff --git a/ctrd/container.go b/ctrd/container.go
index fd5ac4347..17c0eca83 100644
--- a/ctrd/container.go
+++ b/ctrd/container.go
@@ -55,10 +55,10 @@ func (c *Client) ContainerStats(ctx context.Context, id string) (*containerdtype
 
 // containerStats returns stats of the container.
 func (c *Client) containerStats(ctx context.Context, id string) (*containerdtypes.Metric, error) {
-	if !c.lock.Trylock(id) {
-		return nil, errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return nil, LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -196,10 +196,10 @@ func (c *Client) ContainerPIDs(ctx context.Context, id string) ([]int, error) {
 
 // containerPIDs returns the all processes's ids inside the container.
 func (c *Client) containerPIDs(ctx context.Context, id string) ([]int, error) {
-	if !c.lock.Trylock(id) {
-		return nil, errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return nil, LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -255,10 +255,10 @@ func (c *Client) recoverContainer(ctx context.Context, id string, io *containeri
 		return fmt.Errorf("failed to get a containerd grpc client: %v", err)
 	}
 
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	lc, err := wrapperCli.client.LoadContainer(ctx, id)
 	if err != nil {
@@ -317,10 +317,10 @@ func (c *Client) destroyContainer(ctx context.Context, id string, timeout int64)
 
 	ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
 
-	if !c.lock.Trylock(id) {
-		return nil, errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return nil, LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -385,10 +385,10 @@ func (c *Client) PauseContainer(ctx context.Context, id string) error {
 
 // pauseContainer pause container.
 func (c *Client) pauseContainer(ctx context.Context, id string) error {
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -416,10 +416,10 @@ func (c *Client) UnpauseContainer(ctx context.Context, id string) error {
 
 // unpauseContainer unpauses a container.
 func (c *Client) unpauseContainer(ctx context.Context, id string) error {
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -444,10 +444,10 @@ func (c *Client) CreateContainer(ctx context.Context, container *Container, chec
 		id  = container.ID
 	)
 
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	if err := c.createContainer(ctx, ref, id, checkpointDir, container); err != nil {
 		return convertCtrdErr(err)
@@ -596,10 +596,10 @@ func (c *Client) UpdateResources(ctx context.Context, id string, resources types
 
 // updateResources updates the configurations of a container.
 func (c *Client) updateResources(ctx context.Context, id string, resources types.Resources) error {
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
@@ -626,10 +626,10 @@ func (c *Client) ResizeContainer(ctx context.Context, id string, opts types.Resi
 
 // resizeContainer changes the size of the TTY of the init process running
 // in the container to the given height and width.
 func (c *Client) resizeContainer(ctx context.Context, id string, opts types.ResizeOptions) error {
-	if !c.lock.Trylock(id) {
-		return errtypes.ErrLockfailed
+	if !c.Trylock(id) {
+		return LockFailedError(id)
 	}
-	defer c.lock.Unlock(id)
+	defer c.Unlock(id)
 
 	pack, err := c.watch.get(id)
 	if err != nil {
diff --git a/ctrd/container_lock.go b/ctrd/container_lock.go
deleted file mode 100644
index 83687f197..000000000
--- a/ctrd/container_lock.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package ctrd
-
-import (
-	"sync"
-)
-
-// containerLock use to make sure that only one operates the container at the same time.
-type containerLock struct {
-	mutex sync.Mutex
-	ids   map[string]struct{}
-}
-
-func (l *containerLock) Trylock(id string) bool {
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
-
-	_, ok := l.ids[id]
-	if !ok {
-		l.ids[id] = struct{}{}
-		return true
-	}
-	return false
-}
-
-func (l *containerLock) Unlock(id string) {
-	l.mutex.Lock()
-	defer l.mutex.Unlock()
-	delete(l.ids, id)
-}
diff --git a/ctrd/container_lock_test.go b/ctrd/container_lock_test.go
deleted file mode 100644
index 9809bb2df..000000000
--- a/ctrd/container_lock_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package ctrd
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func Test_containerLock_Trylock(t *testing.T) {
-	l := &containerLock{
-		ids: make(map[string]struct{}),
-	}
-
-	assert.Equal(t, len(l.ids), 0)
-
-	// lock a new element
-	ok := l.Trylock("element1")
-	assert.Equal(t, ok, true)
-	assert.Equal(t, len(l.ids), 1)
-	assert.Equal(t, l.ids["element1"], struct{}{})
-
-	// lock an existent element
-	ok = l.Trylock("element1")
-	assert.Equal(t, ok, false)
-	assert.Equal(t, len(l.ids), 1)
-	assert.Equal(t, l.ids["element1"], struct{}{})
-
-	// lock another new element
-	ok = l.Trylock("element2")
-	assert.Equal(t, ok, true)
-	assert.Equal(t, len(l.ids), 2)
-	assert.Equal(t, l.ids["element1"], struct{}{})
-}
-
-func Test_containerLock_Unlock(t *testing.T) {
-	l := &containerLock{
-		ids: make(map[string]struct{}),
-	}
-
-	// unlock a non-existent element
-	l.Unlock("non-existent")
-	assert.Equal(t, len(l.ids), 0)
-
-	// lock a new element
-	ok := l.Trylock("element1")
-	assert.Equal(t, ok, true)
-	assert.Equal(t, len(l.ids), 1)
-	assert.Equal(t, l.ids["element1"], struct{}{})
-
-	// unlock an existent element
-	l.Unlock("element1")
-	assert.Equal(t, len(l.ids), 0)
-}
diff --git a/ctrd/utils.go b/ctrd/utils.go
index 1c14f2d75..c22bac145 100644
--- a/ctrd/utils.go
+++ b/ctrd/utils.go
@@ -197,3 +197,8 @@ func convertCtrdErr(err error) error {
 
 	return err
 }
+
+// LockFailedError constructs a more readable error for a failed lock attempt on a container.
+func LockFailedError(containerID string) error {
+	return errors.Wrapf(errtypes.ErrLockfailed, "container %s is being accessed by another request, please try again later", containerID)
+}
diff --git a/ctrd/utils_test.go b/ctrd/utils_test.go
index c67b92b9e..650203ce7 100644
--- a/ctrd/utils_test.go
+++ b/ctrd/utils_test.go
@@ -81,3 +81,29 @@ func Test_convertCtrdErr(t *testing.T) {
 		})
 	}
 }
+
+func TestLockFailedError(t *testing.T) {
+	type args struct {
+		containerID string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal test case",
+			args: args{
+				containerID: "asdfghjkl",
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := LockFailedError(tt.args.containerID); (err != nil) != tt.wantErr {
+				t.Errorf("LockFailedError() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
diff --git a/pkg/kmutex/kmutex.go b/pkg/kmutex/kmutex.go
index cd67f3913..9aea2bce7 100644
--- a/pkg/kmutex/kmutex.go
+++ b/pkg/kmutex/kmutex.go
@@ -6,22 +6,34 @@ import (
 	"time"
 )
 
+// value holds the lock state for a single key.
 type value struct {
-	c     chan struct{}
+	// c is a channel used to signal that this lock has been released.
+	// A value buffered in c means the lock is currently released.
+	c chan struct{}
+
+	// waits counts how many callers are waiting to acquire the lock.
+	// It is incremented when a caller starts waiting for the lock,
+	// and decremented when that caller finishes, whether it acquires the lock or not.
 	waits int32
 }
 
-// KMutex is a lock implement. One key can acquire only one lock. There is no
-// lock between different keys.
+// KMutex is a keyed lock implementation. Each key can be locked by only one caller.
+// Locks on different keys are independent of each other.
 type KMutex struct {
 	sync.Mutex
-	keys map[string]*value
+	// lockedKeys tracks which keys are currently locked.
+	// If a key is present in lockedKeys, the lock it represents is held
+	// by some caller, and no other caller can acquire it until released.
+	// If a key is absent, a caller may acquire the corresponding lock
+	// by inserting the key into lockedKeys.
+	lockedKeys map[string]*value
 }
 
 // New creates a KMutex instance.
 func New() *KMutex {
 	m := &KMutex{
-		keys: make(map[string]*value),
+		lockedKeys: make(map[string]*value),
 	}
 
 	ticker := time.NewTicker(time.Minute)
@@ -30,14 +42,12 @@
 			<-ticker.C
 
 			m.Mutex.Lock()
-
-			for k, v := range m.keys {
+			for k, v := range m.lockedKeys {
 				if v.waits == 0 {
-					delete(m.keys, k)
+					delete(m.lockedKeys, k)
 					close(v.c)
 				}
 			}
-
 			m.Mutex.Unlock()
 		}
 	}()
@@ -45,20 +55,23 @@
 	return m
 }
 
-func (m *KMutex) lock(k string) (*value, bool) {
-	v, ok := m.keys[k]
-	if !ok {
-		m.keys[k] = &value{
-			c:     make(chan struct{}, 1),
-			waits: 0,
-		}
-		return nil, true
+// lock records the key in lockedKeys, marking the lock for that key as held by the caller.
+// If the key is already locked, it returns the existing value and false.
+func (m *KMutex) lock(key string) (*value, bool) {
+	v, ok := m.lockedKeys[key]
+	if ok {
+		// the key is already locked: return the value and false.
+		return v, false
 	}
-
-	return v, false
+	m.lockedKeys[key] = &value{
+		c:     make(chan struct{}, 1),
+		waits: 0,
+	}
+	return nil, true
 }
 
-// Trylock trys to lock, will not block.
+// Trylock tries to acquire the lock without blocking the caller.
+// It returns immediately whether or not the lock was acquired.
 func (m *KMutex) Trylock(k string) bool {
 	m.Mutex.Lock()
 	defer m.Mutex.Unlock()
@@ -68,15 +81,19 @@
 		return true
 	}
 
+	// the key is already locked by another caller.
 	select {
 	case <-v.c:
+		// the holder has released the lock.
 		return true
 	default:
+		// the lock is still held: fail immediately.
 		return false
 	}
 }
 
-// LockWithTimeout trys to lock, if can't acquire the lock, will block util timeout.
+// LockWithTimeout tries to acquire the lock.
+// If the lock is unavailable, it blocks until it is released or the timeout expires.
 func (m *KMutex) LockWithTimeout(k string, to time.Duration) bool {
 	m.Mutex.Lock()
 
@@ -86,6 +103,7 @@
 		return true
 	}
 
+	// the key is already locked by another caller.
 	atomic.AddInt32(&v.waits, 1)
 	defer atomic.AddInt32(&v.waits, -1)
 
@@ -93,8 +111,10 @@
 
 	select {
 	case <-v.c:
+		// the holder has released the lock.
 		return true
	case <-time.After(to):
+		// timed out before the lock was released.
 		return false
 	}
 }
@@ -119,12 +139,15 @@
 }
 
 // Unlock release the lock.
+// If the key does not exist, it returns immediately.
+// Otherwise, it fills the channel to signal that the lock has been released.
 func (m *KMutex) Unlock(k string) {
 	m.Mutex.Lock()
 	defer m.Mutex.Unlock()
 
-	v, ok := m.keys[k]
+	v, ok := m.lockedKeys[k]
 	if ok && len(v.c) == 0 {
+		// fill the channel to signal that the caller has released the lock.
 		v.c <- struct{}{}
 	}
 }
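
Below the diff: a minimal sketch (not part of the change) of how the KMutex introduced in pkg/kmutex behaves. The container-style keys are made-up placeholders; only kmutex.New, Trylock, LockWithTimeout, and Unlock from the diff above are used.

```go
package main

import (
	"fmt"
	"time"

	"github.com/alibaba/pouch/pkg/kmutex"
)

func main() {
	m := kmutex.New()

	// The first caller acquires the lock for a key.
	fmt.Println(m.Trylock("container-a")) // true

	// A second caller contending on the same key fails immediately.
	fmt.Println(m.Trylock("container-a")) // false

	// Locks on different keys are independent.
	fmt.Println(m.Trylock("container-b")) // true

	// A waiter may block until the holder releases, up to a timeout.
	// This mirrors Client.Trylock above, which wraps LockWithTimeout with
	// containerdObjectLockTimeout so ctrd operations block for at most 5 seconds.
	go func() {
		time.Sleep(100 * time.Millisecond)
		m.Unlock("container-a") // fills the channel, waking one waiter
	}()
	fmt.Println(m.LockWithTimeout("container-a", time.Second)) // true

	m.Unlock("container-a")
	m.Unlock("container-b")
}
```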
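And a sketch of the caller-facing effect in ctrd: since LockFailedError wraps errtypes.ErrLockfailed with pkg/errors, callers should still be able to match the sentinel through errors.Cause. statsWithRetryHint and the container ID are hypothetical, for illustration only; this assumes convertCtrdErr passes lock errors through unchanged, as the utils.go shown above suggests.

```go
package main

import (
	"context"
	"fmt"

	"github.com/alibaba/pouch/ctrd"
	"github.com/alibaba/pouch/pkg/errtypes"
	"github.com/pkg/errors"
)

// statsWithRetryHint is a hypothetical helper showing how a caller can tell
// a lock conflict apart from other failures after this change.
func statsWithRetryHint(ctx context.Context, cli ctrd.APIClient, id string) error {
	if _, err := cli.ContainerStats(ctx, id); err != nil {
		// LockFailedError keeps errtypes.ErrLockfailed as the cause, so the
		// sentinel survives the extra human-readable context.
		if errors.Cause(err) == errtypes.ErrLockfailed {
			return fmt.Errorf("container %s is locked by another request, retry later: %v", id, err)
		}
		return err
	}
	return nil
}
```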