Skip to content

Commit adb8ce1

Browse files
authored
Merge pull request #1725 from Starnop/cri-stream-manager
refactor: Extract stream manager as the public part
2 parents f016ecd + 206c597 commit adb8ce1

10 files changed

Lines changed: 265 additions & 433 deletions

File tree

cri/stream/config.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package stream
2+
3+
import (
4+
"net/url"
5+
"time"
6+
7+
"github.com/alibaba/pouch/cri/stream/constant"
8+
)
9+
10+
// Keep these constants consistent with the peers in official package:
11+
// k8s.io/kubernetes/pkg/kubelet/server.
12+
const (
13+
// DefaultStreamIdleTimeout is the timeout for idle stream.
14+
DefaultStreamIdleTimeout = 4 * time.Hour
15+
16+
// DefaultStreamCreationTimeout is the timeout for stream creation.
17+
DefaultStreamCreationTimeout = 30 * time.Second
18+
)
19+
20+
// TODO: StreamProtocolV2Name, StreamProtocolV3Name, StreamProtocolV4Name support.
21+
22+
// SupportedStreamingProtocols is the streaming protocols which server supports.
23+
var SupportedStreamingProtocols = []string{constant.StreamProtocolV1Name, constant.StreamProtocolV2Name}
24+
25+
// SupportedPortForwardProtocols is the portforward protocols which server supports.
26+
var SupportedPortForwardProtocols = []string{constant.PortForwardProtocolV1Name}
27+
28+
// Config defines the options used for running the stream server.
29+
type Config struct {
30+
// Address is the addr:port address the server will listen on.
31+
Address string
32+
33+
// BaseURL is the optional base URL for constructing streaming URLs.
34+
// If empty, the baseURL will be constructed from the serve address.
35+
BaseURL *url.URL
36+
37+
// StreamIdleTimeout is how long to leave idle connections open for.
38+
StreamIdleTimeout time.Duration
39+
// StreamCreationTimeout is how long to wait for clients to create streams. Only used for SPDY streaming.
40+
StreamCreationTimeout time.Duration
41+
42+
// SupportedStreamingProtocols is the streaming protocols which server supports.
43+
SupportedRemoteCommandProtocols []string
44+
// SupportedPortForwardProtocol is the portforward protocols which server supports.
45+
SupportedPortForwardProtocols []string
46+
}
47+
48+
// DefaultConfig provides default values for server Config.
49+
var DefaultConfig = Config{
50+
StreamIdleTimeout: DefaultStreamIdleTimeout,
51+
StreamCreationTimeout: DefaultStreamCreationTimeout,
52+
SupportedRemoteCommandProtocols: SupportedStreamingProtocols,
53+
SupportedPortForwardProtocols: SupportedPortForwardProtocols,
54+
}

cri/stream/portforward/httpstream.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package portforward
22

33
import (
4+
gocontext "context"
45
"fmt"
56
"net/http"
67
"strconv"
@@ -48,7 +49,7 @@ func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream,
4849
}
4950
}
5051

51-
func handleHTTPStreams(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout, streamCreationTimeout time.Duration, supportedPortForwardProtocols []string) error {
52+
func handleHTTPStreams(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout, streamCreationTimeout time.Duration, supportedPortForwardProtocols []string) error {
5253
_, err := httpstream.Handshake(w, req, supportedPortForwardProtocols)
5354
// Negotiated protocol isn't currently used server side, but could be in the future.
5455
if err != nil {
@@ -76,7 +77,7 @@ func handleHTTPStreams(w http.ResponseWriter, req *http.Request, portForwarder P
7677
pod: podName,
7778
forwarder: portForwarder,
7879
}
79-
h.run()
80+
h.run(goctx)
8081

8182
return nil
8283
}
@@ -150,7 +151,7 @@ func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
150151
// run is the main loop for the httpStreamHandler. It process new streams,
151152
// invoking portForward for each complete stream pair. The loop exits
152153
// when the httpstream.Connection is closed.
153-
func (h *httpStreamHandler) run() {
154+
func (h *httpStreamHandler) run(goctx gocontext.Context) {
154155
logrus.Infof("(conn=%p) waiting for port forward streams", h.conn)
155156

156157
for {
@@ -171,23 +172,23 @@ func (h *httpStreamHandler) run() {
171172
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
172173
p.printError(msg)
173174
} else if complete {
174-
go h.portForward(p)
175+
go h.portForward(goctx, p)
175176
}
176177
}
177178
}
178179
}
179180

180181
// portForward invokes the httpStreamHandler's forwarder.PortForward
181182
// function for the given stream pair.
182-
func (h *httpStreamHandler) portForward(p *httpStreamPair) {
183+
func (h *httpStreamHandler) portForward(goctx gocontext.Context, p *httpStreamPair) {
183184
defer p.dataStream.Close()
184185
defer p.errorStream.Close()
185186

186187
portString := p.dataStream.Headers().Get(constant.PortHeader)
187188
port, _ := strconv.ParseInt(portString, 10, 32)
188189

189190
logrus.Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
190-
err := h.forwarder.PortForward(h.pod, int32(port), p.dataStream)
191+
err := h.forwarder.PortForward(goctx, h.pod, int32(port), p.dataStream)
191192
logrus.Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
192193

193194
if err != nil {

cri/stream/portforward/portforward.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package portforward
22

33
import (
4+
gocontext "context"
45
"io"
56
"net/http"
67
"time"
@@ -12,17 +13,17 @@ import (
1213
// in a pod.
1314
type PortForwarder interface {
1415
// PortForwarder copies data between a data stream and a port in a pod.
15-
PortForward(name string, port int32, stream io.ReadWriteCloser) error
16+
PortForward(goctx gocontext.Context, name string, port int32, stream io.ReadWriteCloser) error
1617
}
1718

1819
// ServePortForward handles a port forwarding request. A single request is
1920
// kept alive as long as the client is still alive and the connection has not
2021
// been timed out due to idleness. This function handles multiple forwarded
2122
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
2223
// handled by a single invocation of ServePortForward.
23-
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
24+
func ServePortForward(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
2425
// TODO: support web socket stream.
25-
err := handleHTTPStreams(w, req, portForwarder, podName, idleTimeout, streamCreationTimeout, supportedProtocols)
26+
err := handleHTTPStreams(goctx, w, req, portForwarder, podName, idleTimeout, streamCreationTimeout, supportedProtocols)
2627
if err != nil {
2728
logrus.Errorf("failed to serve port forward: %v", err)
2829
return

cri/stream/remotecommand/attach.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package remotecommand
22

33
import (
4+
gocontext "context"
45
"fmt"
56
"net/http"
67
"time"
@@ -9,20 +10,20 @@ import (
910
// Attacher knows how to attach a running container in a pod.
1011
type Attacher interface {
1112
// Attach attaches to the running container in the pod.
12-
Attach(containerID string, streamOpts *Options, streams *Streams) error
13+
Attach(goctx gocontext.Context, containerID string, streamOpts *Options, streams *Streams) error
1314
}
1415

1516
// ServeAttach handles requests to attach to a container. After creating/receiving the required
1617
// streams, it delegates the actual attaching to attacher.
17-
func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
18+
func ServeAttach(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, attacher Attacher, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
1819
ctx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
1920
if !ok {
2021
// Error is handled by createStreams.
2122
return
2223
}
2324
defer ctx.conn.Close()
2425

25-
err := attacher.Attach(container, streamOpts, &Streams{
26+
err := attacher.Attach(goctx, container, streamOpts, &Streams{
2627
StreamCh: make(chan struct{}, 1),
2728
StdinStream: ctx.stdinStream,
2829
StdoutStream: ctx.stdoutStream,

cri/stream/remotecommand/exec.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package remotecommand
22

33
import (
4+
gocontext "context"
45
"fmt"
56
"net/http"
67
"time"
@@ -9,21 +10,21 @@ import (
910
// Executor knows how to execute a command in a container of the pod.
1011
type Executor interface {
1112
// Exec executes a command in a container of the pod.
12-
Exec(containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
13+
Exec(goctx gocontext.Context, containerID string, cmd []string, streamOpts *Options, streams *Streams) (uint32, error)
1314
}
1415

1516
// ServeExec handles requests to execute a command in a container. After
1617
// creating/receiving the required streams, it delegates the actual execution
1718
// to the executor.
18-
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, container string, cmd []string, streamOpts *Options, supportedProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
19+
func ServeExec(goctx gocontext.Context, w http.ResponseWriter, req *http.Request, executor Executor, container string, cmd []string, streamOpts *Options, supportedProtocols []string, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
1920
ctx, ok := createStreams(w, req, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
2021
if !ok {
2122
// Error is handled by createStreams.
2223
return
2324
}
2425
defer ctx.conn.Close()
2526

26-
exitCode, err := executor.Exec(container, cmd, streamOpts, &Streams{
27+
exitCode, err := executor.Exec(goctx, container, cmd, streamOpts, &Streams{
2728
StdinStream: ctx.stdinStream,
2829
StdoutStream: ctx.stdoutStream,
2930
StderrStream: ctx.stderrStream,

cri/stream/runtime.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package stream
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"os/exec"
9+
"strings"
10+
"time"
11+
12+
apitypes "github.com/alibaba/pouch/apis/types"
13+
"github.com/alibaba/pouch/cri/stream/remotecommand"
14+
"github.com/alibaba/pouch/daemon/mgr"
15+
16+
"github.com/sirupsen/logrus"
17+
)
18+
19+
// Runtime is the interface to execute the commands and provide the streams.
20+
type Runtime interface {
21+
// Exec executes the command in pod.
22+
Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error)
23+
24+
// Attach attaches to pod.
25+
Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error
26+
27+
// PortForward forward port to pod.
28+
PortForward(ctx context.Context, name string, port int32, stream io.ReadWriteCloser) error
29+
}
30+
31+
type streamRuntime struct {
32+
containerMgr mgr.ContainerMgr
33+
}
34+
35+
// NewStreamRuntime creates a brand new stream runtime.
36+
func NewStreamRuntime(ctrMgr mgr.ContainerMgr) Runtime {
37+
return &streamRuntime{containerMgr: ctrMgr}
38+
}
39+
40+
// Exec executes a command inside the container.
41+
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) (uint32, error) {
42+
createConfig := &apitypes.ExecCreateConfig{
43+
Cmd: cmd,
44+
AttachStdin: streamOpts.Stdin,
45+
AttachStdout: streamOpts.Stdout,
46+
AttachStderr: streamOpts.Stderr,
47+
Tty: streamOpts.TTY,
48+
}
49+
50+
execid, err := s.containerMgr.CreateExec(ctx, containerID, createConfig)
51+
if err != nil {
52+
return 0, fmt.Errorf("failed to create exec for container %q: %v", containerID, err)
53+
}
54+
55+
attachConfig := &mgr.AttachConfig{
56+
Streams: streams,
57+
MuxDisabled: true,
58+
}
59+
60+
err = s.containerMgr.StartExec(ctx, execid, attachConfig)
61+
if err != nil {
62+
return 0, fmt.Errorf("failed to start exec for container %q: %v", containerID, err)
63+
}
64+
65+
// TODO Find a better way instead of the dead loop
66+
var ei *apitypes.ContainerExecInspect
67+
for {
68+
ei, err = s.containerMgr.InspectExec(ctx, execid)
69+
if err != nil {
70+
return 0, fmt.Errorf("failed to inspect exec for container %q: %v", containerID, err)
71+
}
72+
// Loop until exec finished.
73+
if !ei.Running {
74+
break
75+
}
76+
time.Sleep(100 * time.Millisecond)
77+
}
78+
79+
return uint32(ei.ExitCode), nil
80+
}
81+
82+
// Attach attaches to a running container.
83+
func (s *streamRuntime) Attach(ctx context.Context, containerID string, streamOpts *remotecommand.Options, streams *remotecommand.Streams) error {
84+
attachConfig := &mgr.AttachConfig{
85+
Stdin: streamOpts.Stdin,
86+
Stdout: streamOpts.Stdout,
87+
Stderr: streamOpts.Stderr,
88+
Streams: streams,
89+
}
90+
91+
err := s.containerMgr.Attach(ctx, containerID, attachConfig)
92+
if err != nil {
93+
return fmt.Errorf("failed to attach to container %q: %v", containerID, err)
94+
}
95+
96+
<-streams.StreamCh
97+
98+
return nil
99+
}
100+
101+
// PortForward forwards ports from a PodSandbox.
102+
func (s *streamRuntime) PortForward(ctx context.Context, id string, port int32, stream io.ReadWriteCloser) error {
103+
sandbox, err := s.containerMgr.Get(ctx, id)
104+
if err != nil {
105+
return fmt.Errorf("failed to get metadata of sandbox %q: %v", id, err)
106+
}
107+
pid := sandbox.State.Pid
108+
109+
socat, err := exec.LookPath("socat")
110+
if err != nil {
111+
return fmt.Errorf("failed to find socat: %v", err)
112+
}
113+
114+
// Check following links for meaning of the options:
115+
// * socat: https://linux.die.net/man/1/socat
116+
// * nsenter: http://man7.org/linux/man-pages/man1/nsenter.1.html
117+
args := []string{"-t", fmt.Sprintf("%d", pid), "-n", socat,
118+
"-", fmt.Sprintf("TCP4:localhost:%d", port)}
119+
120+
nsenter, err := exec.LookPath("nsenter")
121+
if err != nil {
122+
return fmt.Errorf("failed to find nsenter: %v", err)
123+
}
124+
125+
logrus.Infof("Executing port forwarding command: %s %s", nsenter, strings.Join(args, " "))
126+
127+
cmd := exec.Command(nsenter, args...)
128+
cmd.Stdout = stream
129+
130+
stderr := new(bytes.Buffer)
131+
cmd.Stderr = stderr
132+
133+
// If we use Stdin, command.Run() won't return until the goroutine that's copying
134+
// from stream finishes. Unfortunately, if you have a client like telnet connected
135+
// via port forwarding, as long as the user's telnet client is connected to the user's
136+
// local listener that port forwarding sets up, the telnet session never exits. This
137+
// means that even if socat has finished running, command.Run() won't ever return
138+
// (because the client still has the connection and stream open).
139+
//
140+
// The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
141+
// when the command (socat) exits.
142+
in, err := cmd.StdinPipe()
143+
if err != nil {
144+
return fmt.Errorf("failed to create stdin pipe: %v", err)
145+
}
146+
go func() {
147+
if _, err := io.Copy(in, stream); err != nil {
148+
logrus.Errorf("failed to copy port forward input for %q port %d: %v", id, port, err)
149+
}
150+
in.Close()
151+
logrus.Infof("finish copy port forward input for %q port %d: %v", id, port, err)
152+
}()
153+
154+
err = cmd.Run()
155+
if err != nil {
156+
return fmt.Errorf("nsenter command returns error: %v, stderr: %q", err, stderr.String())
157+
}
158+
159+
logrus.Infof("finish port forwarding for %q port %d", id, port)
160+
161+
return nil
162+
}

0 commit comments

Comments
 (0)