Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 66 additions & 21 deletions daemon/containerio/container_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"io"

"github.com/alibaba/pouch/pkg/ringbuff"
"github.com/alibaba/pouch/pkg/ringbuffer"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -96,7 +96,7 @@ type ContainerIO struct {
typ stdioType
closed bool
// The stdin of all backends should put into ring first.
ring *ringbuff.RingBuff
ring *ringbuffer.RingBuffer
}

func (cio *ContainerIO) add(opt *Option, typ stdioType, backends map[string]containerBackend) {
Expand Down Expand Up @@ -127,7 +127,7 @@ func create(opt *Option, typ stdioType, backends map[string]containerBackend) *C
}

if typ == stdin {
io.ring = ringbuff.New(10)
io.ring = ringbuffer.New(-1)
for _, b := range backends {
if b.backend.Name() == opt.stdinBackend {
io.backends = append(io.backends, b)
Expand Down Expand Up @@ -165,8 +165,8 @@ func createBackend(opt *Option) map[string]containerBackend {

backends[backend.Name()] = containerBackend{
backend: backend,
outRing: ringbuff.New(10),
errRing: ringbuff.New(10),
outRing: ringbuffer.New(-1),
errRing: ringbuffer.New(-1),
}
}

Expand Down Expand Up @@ -243,14 +243,26 @@ func (cio *ContainerIO) Write(data []byte) (int, error) {
switch cio.typ {
case stdout:
for _, b := range cio.backends {
if cover := b.outRing.Push(copyData); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
cover, err := b.outRing.Push(copyData)
// skip if it is closed ringbuffer
if err != nil {
continue
}

if cover {
logrus.Warnf("cover stdout data, backend: %s, id: %s", b.backend.Name(), cio.id)
}
}
case stderr:
for _, b := range cio.backends {
if cover := b.errRing.Push(copyData); cover {
logrus.Warnf("cover data, backend: %s, id: %s", b.backend.Name(), cio.id)
cover, err := b.errRing.Push(copyData)
// skip if it is closed ringbuffer
if err != nil {
continue
}

if cover {
logrus.Warnf("cover stderr data, backend: %s, id: %s", b.backend.Name(), cio.id)
}
}
}
Expand All @@ -260,45 +272,74 @@ func (cio *ContainerIO) Write(data []byte) (int, error) {

// Close implements the standard Close interface.
func (cio *ContainerIO) Close() error {
// FIXME(fuwei): stdin should be treated like stdout, stderr.
if cio.typ == stdin && cio.ring != nil {
// NOTE: let converge goroutine quit
cio.ring.Close()
}

for _, b := range cio.backends {
// we need to close ringbuf before close backend, because close ring will flush
// the remain data into backend.
name := b.backend.Name()

b.outRing.Close()
b.errRing.Close()
b.backend.Close()
if err := b.drainRingBuffer(); err != nil {
logrus.Warnf("failed to drain ringbuffer for backend: %s, id: %s", name, cio.id)
}

b.backend.Close()
logrus.Infof("close containerio backend: %s, id: %s", name, cio.id)
}

cio.closed = true
return nil
}

// FIXME(fuwei): just one ringbuffer for one backend
type containerBackend struct {
backend Backend
outRing *ringbuff.RingBuff
errRing *ringbuff.RingBuff
outRing *ringbuffer.RingBuffer
errRing *ringbuffer.RingBuffer
}

func (cb *containerBackend) drainRingBuffer() error {
for _, item := range []struct {
data []interface{}
w io.Writer
}{
{data: cb.outRing.Drain(), w: cb.backend.Out()},
{data: cb.errRing.Drain(), w: cb.backend.Err()},
} {
for _, value := range item.data {
if b, ok := value.([]byte); ok {
if _, err := item.w.Write(b); err != nil {
return err
}
}
}
}
return nil
}

// subscribe be called in a groutine.
func subscribe(name, id string, ring *ringbuff.RingBuff, out io.Writer) {
func subscribe(name, id string, ring *ringbuffer.RingBuffer, out io.Writer) {
logrus.Infof("start to subscribe io, backend: %s, id: %s", name, id)

for {
value, closed := ring.Pop() // will block, if no element.
value, err := ring.Pop()
// break loop if the ringbuffer has been closed
if err != nil {
break
}

if b, ok := value.([]byte); ok {
if _, err := out.Write(b); err != nil {
logrus.Errorf("failed to write containerio backend: %s, id: %s, %v", name, id, err)
}
}

if value == nil && closed {
break
}
}

logrus.Infof("finished to subscribe io, backend: %s, id: %s", name, id)
}

Expand All @@ -314,11 +355,15 @@ func (cio *ContainerIO) converge(name, id string, in io.Reader) {
logrus.Errorf("failed to read from backend: %s, id: %s, %v", name, id, err)
break
}
cover := cio.ring.Push(data[:n])

cover, err := cio.ring.Push(data[:n])
if err != nil {
break
}

if cover {
logrus.Warnf("cover data, backend: %s, id: %s", name, id)
}
}

logrus.Infof("finished to converge io, backend: %s, id: %s", name, id)
}
159 changes: 0 additions & 159 deletions pkg/ringbuff/ringbuff.go

This file was deleted.

63 changes: 0 additions & 63 deletions pkg/ringbuff/ringbuff_test.go

This file was deleted.

Loading