diff --git a/daemon/containerio/container_io.go b/daemon/containerio/container_io.go index 98e358b13..b67899dcd 100644 --- a/daemon/containerio/container_io.go +++ b/daemon/containerio/container_io.go @@ -4,7 +4,7 @@ import ( "fmt" "io" - "github.com/alibaba/pouch/pkg/ringbuff" + "github.com/alibaba/pouch/pkg/ringbuffer" "github.com/sirupsen/logrus" ) @@ -96,7 +96,7 @@ type ContainerIO struct { typ stdioType closed bool // The stdin of all backends should put into ring first. - ring *ringbuff.RingBuff + ring *ringbuffer.RingBuffer } func (cio *ContainerIO) add(opt *Option, typ stdioType, backends map[string]containerBackend) { @@ -127,7 +127,7 @@ func create(opt *Option, typ stdioType, backends map[string]containerBackend) *C } if typ == stdin { - io.ring = ringbuff.New(10) + io.ring = ringbuffer.New(-1) for _, b := range backends { if b.backend.Name() == opt.stdinBackend { io.backends = append(io.backends, b) @@ -165,8 +165,8 @@ func createBackend(opt *Option) map[string]containerBackend { backends[backend.Name()] = containerBackend{ backend: backend, - outRing: ringbuff.New(10), - errRing: ringbuff.New(10), + outRing: ringbuffer.New(-1), + errRing: ringbuffer.New(-1), } } @@ -243,14 +243,26 @@ func (cio *ContainerIO) Write(data []byte) (int, error) { switch cio.typ { case stdout: for _, b := range cio.backends { - if cover := b.outRing.Push(copyData); cover { - logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id) + cover, err := b.outRing.Push(copyData) + // skip if it is closed ringbuffer + if err != nil { + continue + } + + if cover { + logrus.Warnf("cover stdout data, backend: %s, id: %s", b.backend.Name(), cio.id) } } case stderr: for _, b := range cio.backends { - if cover := b.errRing.Push(copyData); cover { - logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id) + cover, err := b.errRing.Push(copyData) + // skip if it is closed ringbuffer + if err != nil { + continue + } + + if cover { + logrus.Warnf("cover stderr data, backend: %s, id: %s", b.backend.Name(), cio.id) } } } @@ -260,14 +272,24 @@ func (cio *ContainerIO) Write(data []byte) (int, error) { // Close implements the standard Close interface. func (cio *ContainerIO) Close() error { + // FIXME(fuwei): stdin should be treated like stdout, stderr. + if cio.typ == stdin && cio.ring != nil { + // NOTE: let converge goroutine quit + cio.ring.Close() + } + for _, b := range cio.backends { // we need to close ringbuf before close backend, because close ring will flush // the remain data into backend. name := b.backend.Name() + b.outRing.Close() b.errRing.Close() - b.backend.Close() + if err := b.drainRingBuffer(); err != nil { + logrus.Warnf("failed to drain ringbuffer for backend: %s, id: %s", name, cio.id) + } + b.backend.Close() logrus.Infof("close containerio backend: %s, id: %s", name, cio.id) } @@ -275,30 +297,49 @@ func (cio *ContainerIO) Close() error { return nil } +// FIXME(fuwei): just one ringbuffer for one backend type containerBackend struct { backend Backend - outRing *ringbuff.RingBuff - errRing *ringbuff.RingBuff + outRing *ringbuffer.RingBuffer + errRing *ringbuffer.RingBuffer +} + +func (cb *containerBackend) drainRingBuffer() error { + for _, item := range []struct { + data []interface{} + w io.Writer + }{ + {data: cb.outRing.Drain(), w: cb.backend.Out()}, + {data: cb.errRing.Drain(), w: cb.backend.Err()}, + } { + for _, value := range item.data { + if b, ok := value.([]byte); ok { + if _, err := item.w.Write(b); err != nil { + return err + } + } + } + } + return nil } // subscribe be called in a groutine. -func subscribe(name, id string, ring *ringbuff.RingBuff, out io.Writer) { +func subscribe(name, id string, ring *ringbuffer.RingBuffer, out io.Writer) { logrus.Infof("start to subscribe io, backend: %s, id: %s", name, id) for { - value, closed := ring.Pop() // will block, if no element. + value, err := ring.Pop() + // break loop if the ringbuffer has been closed + if err != nil { + break + } if b, ok := value.([]byte); ok { if _, err := out.Write(b); err != nil { logrus.Errorf("failed to write containerio backend: %s, id: %s, %v", name, id, err) } } - - if value == nil && closed { - break - } } - logrus.Infof("finished to subscribe io, backend: %s, id: %s", name, id) } @@ -314,11 +355,15 @@ func (cio *ContainerIO) converge(name, id string, in io.Reader) { logrus.Errorf("failed to read from backend: %s, id: %s, %v", name, id, err) break } - cover := cio.ring.Push(data[:n]) + + cover, err := cio.ring.Push(data[:n]) + if err != nil { + break + } + if cover { logrus.Warnf("cover data, backend: %s, id: %s", name, id) } } - logrus.Infof("finished to converge io, backend: %s, id: %s", name, id) } diff --git a/pkg/ringbuff/ringbuff.go b/pkg/ringbuff/ringbuff.go deleted file mode 100644 index 73a5a9a3f..000000000 --- a/pkg/ringbuff/ringbuff.go +++ /dev/null @@ -1,159 +0,0 @@ -package ringbuff - -import ( - "sync" - "time" -) - -type ringNode struct { - next *ringNode - value interface{} -} - -// RingBuff implements a circular list. -type RingBuff struct { - sync.Mutex - cond *sync.Cond - pushPtr *ringNode - popPtr *ringNode - closed bool -} - -// New creates a RingBuff. -func New(n int) *RingBuff { - var ( - first *ringNode - last *ringNode - ) - - for i := 0; i < n; i++ { - p := &ringNode{} - - if last != nil { - last.next = p - } else { - first = p - } - - last = p - } - - last.next = first - - return &RingBuff{ - pushPtr: first, - popPtr: first, - cond: sync.NewCond(&sync.Mutex{}), - } -} - -// Push puts a elemnet into RingBuff and returns the status of covering or not. -func (r *RingBuff) Push(value interface{}) bool { - r.Lock() - defer r.Unlock() - - cover := false - - if r.closed { - return cover - } - - if r.pushPtr.value != nil { - cover = true - } - - // store value - r.pushPtr.value = value - - // wakes the "Pop" goroutine waiting on "cond". - if r.pushPtr == r.popPtr { - r.cond.Broadcast() - } - - // move pointer to next node. - r.pushPtr = r.pushPtr.next - - return cover -} - -// Pop returns a element, if RingBuff is empty, Pop() will block. -func (r *RingBuff) Pop() (interface{}, bool) { - r.Lock() - - if v := r.popPtr.value; v != nil { - isClosed := r.closed - - r.popPtr.value = nil // if we readed the node, must set nil to it. - // move to next node. - r.popPtr = r.popPtr.next - - // NOTICE: unlock - r.Unlock() - - return v, isClosed - } - - if r.closed { - isClosed := r.closed - - // NOTICE: unlock - r.Unlock() - - return nil, isClosed - } - - // block util there is one element at least. - r.cond.L.Lock() - for r.popPtr.value == nil && !r.closed { - // NOTICE: unlock, then to wait. if not call "Unlock", will block other's operation, eg: Push(). - r.Unlock() - - r.cond.Wait() - - // NOTICE: Wait() return, need to hold lock again. - r.Lock() - } - r.cond.L.Unlock() - - v := r.popPtr.value - isClosed := r.closed - r.popPtr.value = nil // if we readed the node, must set nil to it. - // move to next node. - r.popPtr = r.popPtr.next - - r.Unlock() - - return v, isClosed -} - -// Close closes the RingBuff. -func (r *RingBuff) Close() error { - // first try to wakeup - r.Lock() - if r.closed { - r.Unlock() - return nil - } - r.cond.Broadcast() - r.Unlock() - - for { - r.Lock() - if r.pushPtr == r.popPtr { - // unlock - r.Unlock() - break - } - - // unlock - r.Unlock() - time.Sleep(time.Millisecond * 10) - } - - r.Lock() - r.closed = true - r.cond.Broadcast() - r.Unlock() - - return nil -} diff --git a/pkg/ringbuff/ringbuff_test.go b/pkg/ringbuff/ringbuff_test.go deleted file mode 100644 index 759909e23..000000000 --- a/pkg/ringbuff/ringbuff_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package ringbuff - -import ( - "testing" - "time" -) - -func TestPushRewrite(t *testing.T) { - ring := New(10) - - for i := 0; i < 10; i++ { - if rewrite := ring.Push(i); rewrite { - t.Fatalf("the ring buffer's size is error") - } - } - - for i := 0; i < 10; i++ { - if rewrite := ring.Push(i); !rewrite { - t.Fatalf("don't rewrite the node") - } - } -} - -func TestPopBlock(t *testing.T) { - ring := New(10) - - wait := make(chan struct{}) - go func() { - ring.Pop() - close(wait) - }() - - select { - case <-time.After(time.Second * 5): - case <-wait: - t.Errorf("not to block") - } -} - -func TestPushPop(t *testing.T) { - ring := New(10) - - for i := 0; i < 10; i++ { - ring.Push(i) - } - - for i := 0; i < 10; i++ { - v, _ := ring.Pop() - if v.(int) != i { - t.Errorf("failed to pop, <%d, %d>", v.(int), i) - } - } - - go func() { - time.Sleep(time.Second * 5) - ring.Push(111) - }() - - v, _ := ring.Pop() - if v.(int) != 111 { - t.Errorf("failed to pop, <%d, %d>", v.(int), 111) - } -} diff --git a/pkg/ringbuffer/list.go b/pkg/ringbuffer/list.go new file mode 100644 index 000000000..64157cdb7 --- /dev/null +++ b/pkg/ringbuffer/list.go @@ -0,0 +1,64 @@ +package ringbuffer + +import ( + "sync" +) + +var elemPool = &sync.Pool{New: func() interface{} { return new(element) }} + +type element struct { + next, prev *element + val interface{} +} + +func (e *element) reset() { + e.next, e.prev = nil, nil + e.val = nil +} + +type queue struct { + root element + count int +} + +func newQueue() *queue { + q := new(queue) + + q.root.next = &q.root + q.root.prev = &q.root + q.count = 0 + return q +} + +func (q *queue) size() int { + return q.count +} + +func (q *queue) enqueue(val interface{}) { + elem := elemPool.Get().(*element) + elem.val = val + + at := q.root.prev + + at.next = elem + elem.prev = at + elem.next = &q.root + q.root.prev = elem + q.count++ +} + +func (q *queue) dequeue() interface{} { + if q.size() == 0 { + return nil + } + + at := q.root.next + at.prev.next = at.next + at.next.prev = at.prev + val := at.val + + at.reset() + elemPool.Put(at) + q.count-- + return val +} diff --git a/pkg/ringbuffer/ringbuff.go b/pkg/ringbuffer/ringbuff.go new file mode 100644 index 000000000..f0c3a91e4 --- /dev/null +++ b/pkg/ringbuffer/ringbuff.go @@ -0,0 +1,110 @@ +package ringbuffer + +import ( + "fmt" + "sync" +) + +// ErrClosed is used to indicate the ringbuffer has been closed. +var ErrClosed = fmt.Errorf("closed") + +const defaultSize = 1024 + +// RingBuffer implements a fixed-size buffer which will drop oldest data if full. +type RingBuffer struct { + mu sync.Mutex + wait *sync.Cond + + cap int + closed bool + q *queue +} + +// New creates new RingBuffer. +func New(cap int) *RingBuffer { + if cap <= 0 { + cap = defaultSize + } + + rb := &RingBuffer{ + cap: cap, + closed: false, + q: newQueue(), + } + rb.wait = sync.NewCond(&rb.mu) + return rb +} + +// Push pushes value into buffer and return whether it covers the oldest data +// or not. +func (rb *RingBuffer) Push(val interface{}) (bool, error) { + rb.mu.Lock() + defer rb.mu.Unlock() + + if rb.closed { + return false, ErrClosed + } + + if val == nil { + return false, nil + } + + // drop the oldest element if covered + covered := (rb.q.size() == rb.cap) + if covered { + rb.q.dequeue() + } + + rb.q.enqueue(val) + rb.wait.Broadcast() + return covered, nil +} + +// Pop pops the value in the buffer. +// +// NOTE: it returns ErrClosed if the buffer has been closed. +func (rb *RingBuffer) Pop() (interface{}, error) { + rb.mu.Lock() + for rb.q.size() == 0 && !rb.closed { + rb.wait.Wait() + } + + if rb.closed { + rb.mu.Unlock() + return nil, ErrClosed + } + + val := rb.q.dequeue() + rb.mu.Unlock() + return val, nil +} + +// Drain returns all the data in the buffer. +// +// NOTE: it can be used after closed to make sure the data have been consumed. +func (rb *RingBuffer) Drain() []interface{} { + rb.mu.Lock() + defer rb.mu.Unlock() + + size := rb.q.size() + vals := make([]interface{}, 0, size) + + for i := 0; i < size; i++ { + vals = append(vals, rb.q.dequeue()) + } + return vals +} + +// Close closes the ringbuffer. +func (rb *RingBuffer) Close() error { + rb.mu.Lock() + if rb.closed { + rb.mu.Unlock() + return nil + } + + rb.closed = true + rb.wait.Broadcast() + rb.mu.Unlock() + return nil +} diff --git a/pkg/ringbuffer/ringbuff_test.go b/pkg/ringbuffer/ringbuff_test.go new file mode 100644 index 000000000..ebf403596 --- /dev/null +++ b/pkg/ringbuffer/ringbuff_test.go @@ -0,0 +1,146 @@ +package ringbuffer + +import ( + "reflect" + "sync" + "testing" + "time" +) + +func TestPushNormal(t *testing.T) { + count := 5 + rb := New(count) + + // make the buffer full + for i := 0; i < count; i++ { + covered, err := rb.Push(i) + assertHelper(t, false, covered, "unexpected to drop data") + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) + } + + // continue to push new data + for i := 0; i < count; i++ { + covered, err := rb.Push(i + count) + assertHelper(t, true, covered, "expected to drop data, but not") + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) + } + + // check the buffer data + expectedDump := make([]interface{}, 0, count) + for i := 0; i < count; i++ { + expectedDump = append(expectedDump, count+i) + } + + got := rb.Drain() + assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) +} + +func TestPopNormal(t *testing.T) { + count := 5 + rb := New(count) + + // make the buffer full + for i := 0; i < count; i++ { + covered, err := rb.Push(i) + assertHelper(t, false, covered, "unexpected to drop data") + assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err) + } + + for i := 0; i < count; i++ { + val, err := rb.Pop() + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + assertHelper(t, i, val, "expected to have %v, but got %v", i, val) + } + + assertHelper(t, 0, rb.q.size(), "expected to have empty queue, but got %d size of queue", rb.q.size()) + assertHelper(t, &rb.q.root, rb.q.root.next, "when empty, expected queue.root.next equal to &queue.root") + assertHelper(t, &rb.q.root, rb.q.root.prev, "when empty, expected queue.root.prev equal to &queue.root") +} + +func TestPushAndPop(t *testing.T) { + count := 5 + rb := New(count) + + for _, v := range []int{1, 3, 5} { + rb.Push(v) + } + + { + // get 1 without error + val, err := rb.Pop() + assertHelper(t, val, 1, "expected to get 1, but got %v", val) + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + } + + // push 4, [3, 5, 4] + rb.Push(4) + + { + // get 3 without error + val, err := rb.Pop() + assertHelper(t, val, 3, "expected to get 3, but got %v", val) + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + } + + // push 2, [5, 4, 2] + rb.Push(2) + + { + // get 5 without error + val, err := rb.Pop() + assertHelper(t, val, 5, "expected to get 5, but got %v", val) + assertHelper(t, nil, err, "unexpected error during pop: %v", err) + } + + rb.Close() + + { + // get error if push data into closed buffer + _, err := rb.Push(0) + assertHelper(t, ErrClosed, err, + "expected to get error(%v) when push data into closed buffer, but got error(%v)", ErrClosed, err) + } + + // check the buffer data + expectedDump, got := []interface{}{4, 2}, rb.Drain() + assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got) +} + +func TestPopWaitWhenNotData(t *testing.T) { + count := 5 + rb := New(count) + + var ( + wg sync.WaitGroup + waitCh = make(chan struct{}, 1) + ) + + wg.Add(1) + go func() { + waitCh <- struct{}{} + + defer wg.Done() + _, rbErr := rb.Pop() + close(waitCh) + assertHelper(t, ErrClosed, rbErr, + "expected to get error(%v) when push data into closed buffer, but got error(%v)", ErrClosed, rbErr) + }() + + // make sure the goroutine has been scheduled + <-waitCh + select { + case <-time.After(1 * time.Second): + rb.Close() + wg.Wait() + case <-waitCh: + t.Errorf("expect to block if there is no data in buffer") + t.FailNow() + } +} + +func assertHelper(t *testing.T, expected, got interface{}, format string, args ...interface{}) { + if !reflect.DeepEqual(expected, got) { + t.Errorf(format, args...) + t.FailNow() + } +}