Skip to content

Commit 13536d0

Browse files
committed
feature: add timeout handler for execSync in cri part
Signed-off-by: Zou Rui <[email protected]>
1 parent 48575a9 commit 13536d0

File tree

6 files changed

+94
-76
lines changed

6 files changed

+94
-76
lines changed

cri/src/cri.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"io"
78
"os"
89
"path"
910
"path/filepath"
@@ -696,55 +697,74 @@ func (c *CriManager) UpdateContainerResources(ctx context.Context, r *runtime.Up
696697
// ExecSync executes a command in the container, and returns the stdout output.
697698
// If command exits with a non-zero exit code, an error is returned.
698699
func (c *CriManager) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
699-
// TODO: handle timeout.
700700
id := r.GetContainerId()
701701

702+
timeout := time.Duration(r.GetTimeout()) * time.Second
703+
var cancel context.CancelFunc
704+
if timeout == 0 {
705+
ctx, cancel = context.WithCancel(ctx)
706+
} else {
707+
ctx, cancel = context.WithTimeout(ctx, timeout)
708+
}
709+
defer cancel()
710+
702711
createConfig := &apitypes.ExecCreateConfig{
703712
Cmd: r.GetCmd(),
704713
}
705-
706714
execid, err := c.ContainerMgr.CreateExec(ctx, id, createConfig)
707715
if err != nil {
708716
return nil, fmt.Errorf("failed to create exec for container %q: %v", id, err)
709717
}
710718

711-
var output bytes.Buffer
712-
startConfig := &apitypes.ExecStartConfig{}
719+
reader, writer := io.Pipe()
720+
defer writer.Close()
721+
713722
attachConfig := &mgr.AttachConfig{
714723
Stdout: true,
715724
Stderr: true,
716-
MemBuffer: &output,
725+
Pipe: writer,
717726
MuxDisabled: true,
718727
}
719728

729+
startConfig := &apitypes.ExecStartConfig{}
730+
720731
err = c.ContainerMgr.StartExec(ctx, execid, startConfig, attachConfig)
721732
if err != nil {
722733
return nil, fmt.Errorf("failed to start exec for container %q: %v", id, err)
723734
}
724735

725-
var execConfig *mgr.ContainerExecConfig
726-
for {
727-
execConfig, err = c.ContainerMgr.GetExecConfig(ctx, execid)
736+
readWaitCh := make(chan error, 1)
737+
var recv bytes.Buffer
738+
go func() {
739+
defer reader.Close()
740+
_, err = io.Copy(&recv, reader)
741+
readWaitCh <- err
742+
}()
743+
744+
select {
745+
case <-ctx.Done():
746+
//TODO maybe stop the execution?
747+
return nil, fmt.Errorf("timeout %v exceeded", timeout)
748+
case readWaitErr := <-readWaitCh:
749+
if readWaitErr != nil {
750+
return nil, fmt.Errorf("failed to read data from the pipe: %v", err)
751+
}
752+
execConfig, err := c.ContainerMgr.GetExecConfig(ctx, execid)
728753
if err != nil {
729754
return nil, fmt.Errorf("failed to inspect exec for container %q: %v", id, err)
730755
}
731-
// Loop until exec finished.
732-
if !execConfig.Running {
733-
break
756+
757+
var stderr []byte
758+
if execConfig.Error != nil {
759+
stderr = []byte(execConfig.Error.Error())
734760
}
735-
time.Sleep(100 * time.Millisecond)
736-
}
737761

738-
var stderr []byte
739-
if execConfig.Error != nil {
740-
stderr = []byte(execConfig.Error.Error())
762+
return &runtime.ExecSyncResponse{
763+
Stdout: recv.Bytes(),
764+
Stderr: stderr,
765+
ExitCode: int32(execConfig.ExitCode),
766+
}, nil
741767
}
742-
743-
return &runtime.ExecSyncResponse{
744-
Stdout: output.Bytes(),
745-
Stderr: stderr,
746-
ExitCode: int32(execConfig.ExitCode),
747-
}, nil
748768
}
749769

750770
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.

daemon/containerio/mem_buffer.go

Lines changed: 0 additions & 42 deletions
This file was deleted.

daemon/containerio/options.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package containerio
22

33
import (
4-
"bytes"
4+
"io"
55
"net/http"
66
"os"
77

@@ -18,7 +18,7 @@ type Option struct {
1818
hijack http.Hijacker
1919
hijackUpgrade bool
2020
stdinBackend string
21-
memBuffer *bytes.Buffer
21+
pipe *io.PipeWriter
2222
streams *remotecommand.Streams
2323
criLogFile *os.File
2424
}
@@ -101,14 +101,14 @@ func WithStdinHijack() func(*Option) {
101101
}
102102
}
103103

104-
// WithMemBuffer specified the memory buffer backend.
105-
func WithMemBuffer(memBuffer *bytes.Buffer) func(*Option) {
104+
// WithPipe specified the pipe backend.
105+
func WithPipe(pipe *io.PipeWriter) func(*Option) {
106106
return func(opt *Option) {
107107
if opt.backends == nil {
108108
opt.backends = make(map[string]struct{})
109109
}
110-
opt.backends["memBuffer"] = struct{}{}
111-
opt.memBuffer = memBuffer
110+
opt.backends["pipe"] = struct{}{}
111+
opt.pipe = pipe
112112
}
113113
}
114114

daemon/containerio/pipe.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package containerio
2+
3+
import (
4+
"io"
5+
)
6+
7+
func init() {
8+
Register(func() Backend {
9+
return &pipe{}
10+
})
11+
}
12+
13+
type pipe struct {
14+
pipeWriter *io.PipeWriter
15+
}
16+
17+
func (p *pipe) Name() string {
18+
return "pipe"
19+
}
20+
21+
func (p *pipe) Init(opt *Option) error {
22+
p.pipeWriter = opt.pipe
23+
return nil
24+
}
25+
26+
func (p *pipe) Out() io.Writer {
27+
return p.pipeWriter
28+
}
29+
30+
func (p *pipe) In() io.Reader {
31+
return nil
32+
}
33+
34+
func (p *pipe) Err() io.Writer {
35+
return p.pipeWriter
36+
}
37+
38+
func (p *pipe) Close() error {
39+
return p.pipeWriter.Close()
40+
}

daemon/mgr/container.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,9 +1544,9 @@ func attachConfigToOptions(attach *AttachConfig) []func(*containerio.Option) {
15441544
if attach.Stdin {
15451545
options = append(options, containerio.WithStdinHijack())
15461546
}
1547-
} else if attach.MemBuffer != nil {
1548-
// Attaching using memory buffer.
1549-
options = append(options, containerio.WithMemBuffer(attach.MemBuffer))
1547+
} else if attach.Pipe != nil {
1548+
// Attaching using pipe.
1549+
options = append(options, containerio.WithPipe(attach.Pipe))
15501550
} else if attach.Streams != nil {
15511551
// Attaching using streams.
15521552
options = append(options, containerio.WithStreams(attach.Streams))

daemon/mgr/container_types.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package mgr
22

33
import (
4-
"bytes"
54
"fmt"
5+
"io"
66
"net/http"
77
"os"
88
"sync"
@@ -71,8 +71,8 @@ type AttachConfig struct {
7171
Hijack http.Hijacker
7272
Upgrade bool
7373

74-
// Attach using memory buffer.
75-
MemBuffer *bytes.Buffer
74+
// Attach using pipe.
75+
Pipe *io.PipeWriter
7676

7777
// Attach using streams.
7878
Streams *remotecommand.Streams

0 commit comments

Comments
 (0)