diff --git a/bill-of-materials.json b/bill-of-materials.json index db45756dc769..c15e0596ddf8 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -368,15 +368,6 @@ } ] }, - { - "project": "github.com/soheilhy/cmux", - "licenses": [ - { - "type": "Apache License 2.0", - "confidence": 1 - } - ] - }, { "project": "github.com/spf13/cobra", "licenses": [ diff --git a/client/pkg/transport/listener.go b/client/pkg/transport/listener.go index 5e0e13e25a73..c562b107a278 100644 --- a/client/pkg/transport/listener.go +++ b/client/pkg/transport/listener.go @@ -586,11 +586,19 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { return cfg, nil } -// IsClosedConnError returns true if the error is from closing listener, cmux. +// IsClosedConnError returns true if the error is from closing listener. // copied from golang.org/x/net/http2/http2.go func IsClosedConnError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, net.ErrClosed) { + return true + } // 'use of closed network connection' (Go <=1.8) // 'use of closed file or network connection' (Go >1.8, internal/poll.ErrClosing) - // 'mux: listener closed' (cmux.ErrListenerClosed) - return err != nil && strings.Contains(err.Error(), "closed") + if strings.Contains(err.Error(), "closed") { + return true + } + return false } diff --git a/go.mod b/go.mod index 33784a434c31..857bb921b31a 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,6 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect - github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect diff --git a/go.sum b/go.sum index f4132506e504..3454e9af5f02 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -386,7 +384,6 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= diff --git a/pkg/connmux/connmux.go b/pkg/connmux/connmux.go new file mode 100644 index 000000000000..e6d2a3e9769c --- /dev/null +++ b/pkg/connmux/connmux.go @@ -0,0 +1,346 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connmux + +import ( + "bytes" + "crypto/tls" + "io" + "log" + "net" + "strings" + "sync" + "time" + + "go.uber.org/zap" + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" +) + +const ( + // gracefullShutdownDuration is the time to wait before closing the listeners + gracefullShutdownDuration = 200 * time.Millisecond + // readDeadlineTimeout is the maximum time to wait for a read operation + readDeadlineTimeout = 1 * time.Second + // maxConnRead read ahead up to 512 bytes to identify if the connection is GRPC + maxConnRead = 512 +) + +// ConnMux can multiple multiple HTTP and GRPC connections in the same address and port. +// It is able to multiplex connections with and without TLS. +// ConnMux can only forward connections to one HTTP or one GRPC server. +// If only one of those is available it will forward all connections directly. +type ConnMux struct { + lg *zap.Logger + root net.Listener + donec chan struct{} + + secure bool // serve TLS + insecure bool // serve insecure + tlsConfig *tls.Config + + mu sync.Mutex + http *muxListener + grpc *muxListener +} + +type Config struct { + Logger *zap.Logger + Listener net.Listener + Secure bool + Insecure bool + TLSConfig *tls.Config +} + +// New creates a new ConnMux. +func New(cfg Config) *ConnMux { + // defaulting + if cfg.Logger == nil { + cfg.Logger = zap.NewNop() + } + + return &ConnMux{ + lg: cfg.Logger, + root: cfg.Listener, + insecure: cfg.Insecure, + secure: cfg.Secure, + tlsConfig: cfg.TLSConfig, + donec: make(chan struct{}), + } +} + +// Serve starts serving connections +func (c *ConnMux) Serve() error { + for { + conn, err := c.root.Accept() + if err != nil { + c.lg.Error("connection accept error", zap.Error(err)) + return c.Close() + } + go c.serve(conn) + } +} + +func (c *ConnMux) serve(conn net.Conn) { + // avoid to get blocked in any read operation + conn.SetDeadline(time.Now().Add(readDeadlineTimeout)) + defer conn.SetReadDeadline(time.Time{}) + + buf := make([]byte, maxConnRead) + buffConn := newBufferedConn(conn) + // Check if is TLS or plain TCP + n, err := buffConn.sniffReader().Read(buf) + if err != nil && err != io.EOF { + // exit since it will panic trying to detect if is TLS + c.lg.Error("error reading from the connection", zap.Error(err)) + conn.Close() + return + } + r := bytes.NewReader(buf[:n]) + c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("read size", n), zap.String("buffer content", string(buf[:n]))) + // Check if is TLS or plain TCP + isTLS := buf[0] == 0x16 + if isTLS { + if !c.secure { + c.lg.Error("secure connections not enabled") + conn.Close() + return + } + // Establish the TLS connection + tlsConn := tls.Server(buffConn, c.tlsConfig) + err = tlsConn.Handshake() + if err != nil { + c.lg.Error("error establishing the TLS connection", zap.Error(err)) + conn.Close() + return + } + // It is a "new" connection obtained after the handshake so we have to do another read + proxiedConn := newBufferedConn(tlsConn) + isGRPC := isGRPCConnection(c.lg, proxiedConn, proxiedConn.sniffReader()) + c.forward(isGRPC, proxiedConn) + } else { + if !c.insecure { + c.lg.Error("insecure connections not enabled") + conn.Close() + return + } + isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(r, buffConn.sniffReader())) + c.forward(isGRPC, buffConn) + } + +} + +func (c *ConnMux) forward(isGRPC bool, conn net.Conn) { + c.mu.Lock() + defer c.mu.Unlock() + + if !isGRPC && c.http != nil { + c.lg.Debug("forwarding connection to the HTTP backend", zap.String("remote address", conn.RemoteAddr().String())) + select { + case c.http.connc <- conn: + case <-c.donec: + } + return + } + + if isGRPC && c.grpc != nil { + c.lg.Debug("forwarding connection to the GRPC backend", zap.String("remote address", conn.RemoteAddr().String())) + select { + case c.grpc.connc <- conn: + case <-c.donec: + } + return + } + + log.Println("unknown connection") + conn.Close() +} + +// Close closes the listeners +func (c *ConnMux) Close() error { + time.Sleep(gracefullShutdownDuration) + c.closeDoneChans() + return c.root.Close() +} + +func (c *ConnMux) closeDoneChans() { + select { + case <-c.donec: + default: + close(c.donec) + } +} + +// muxListener is the listener exposed to the HTTP and GRPC servers +// The root listener Accept() method is overriden +// The multiplexed servers have access to the Close() and Address() +// methods on the root listener. +type muxListener struct { + net.Listener + connc chan net.Conn + donec chan struct{} +} + +var _ net.Listener = (*muxListener)(nil) + +func (l muxListener) Accept() (net.Conn, error) { + select { + case c, ok := <-l.connc: + if !ok { + return nil, net.ErrClosed + } + return c, nil + case <-l.donec: + return nil, net.ErrClosed + } +} + +// HTTPListener returns a net.Listener that will receive http requests +func (c *ConnMux) HTTPListener() net.Listener { + c.mu.Lock() + defer c.mu.Unlock() + if c.http == nil { + c.http = &muxListener{c.root, make(chan net.Conn), c.donec} + } + return c.http +} + +// GRPCListener returns a net.Listener that will receive grpc requests +func (c *ConnMux) GRPCListener() net.Listener { + c.mu.Lock() + defer c.mu.Unlock() + if c.grpc == nil { + c.grpc = &muxListener{c.root, make(chan net.Conn), c.donec} + } + return c.grpc +} + +// bufferedConn allows to peek in the buffer of the connection +// without advancing the reader. +type bufferedConn struct { + net.Conn + buf *bytes.Buffer +} + +var _ net.Conn = &bufferedConn{} + +func newBufferedConn(c net.Conn) *bufferedConn { + return &bufferedConn{ + Conn: c, + buf: new(bytes.Buffer), + } +} + +func (b *bufferedConn) Read(p []byte) (int, error) { + if b.buf == nil { + return b.Conn.Read(p) + } + n := b.buf.Len() + if n == 0 { + b.buf = nil + return b.Conn.Read(p) + } + if n < len(p) { + p = p[:n] + } + return b.buf.Read(p) +} + +func (b *bufferedConn) Close() error { + if b.buf != nil { + b.buf = nil + } + return b.Conn.Close() +} + +func (b *bufferedConn) sniffReader() io.Reader { + return io.TeeReader(b.Conn, b.buf) +} + +func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { + // check the http2 client preface + buf := make([]byte, len(http2.ClientPreface)) + n, err := r.Read(buf) + if err != nil || n < len(http2.ClientPreface) { + return false + } + + if !bytes.Equal(buf, []byte(http2.ClientPreface)) { + lg.Debug("not found http2 client preface", zap.String("preface", string(buf))) + return false + } + lg.Debug("found http2 client preface", zap.Int("bytes read", n)) + + framer := http2.NewFramer(w, r) + // identify GRPC connections matching match headers names or values defined + // on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + done := false + isGRPC := false + // use the default value for the maxDynamixTable size + // https://pkg.go.dev/golang.org/x/net/http2 + // "If zero, the default value of 4096 is used." + hdec := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of 415 (Unsupported Media Type). + // This will prevent other HTTP/2 clients from interpreting a gRPC error response, which uses status 200 (OK), as successful. + lg.Debug("found header", zap.String("name", hf.Name), zap.String("value", hf.Value)) + if hf.Name == "content-type" { + isGRPC = strings.HasPrefix(hf.Value, "application/grpc") + done = true + } + if hf.Name == ":method" && hf.Value != "POST" { + isGRPC = false + done = true + } + }) + + lg.Debug("reading frames") + for !done { + f, err := framer.ReadFrame() + if err != nil { + lg.Debug("error reading frame", zap.Error(err)) + return false + } + switch f := f.(type) { + // The SETTINGS frames received from a peer as part of the connection + // preface MUST be acknowledged (see Section 6.5.3) after sending the + // connection preface. + case *http2.SettingsFrame: + if f.IsAck() { + lg.Debug("found setting ACK frame") + continue + } + // only write the settings frame if length is zero + // otherwise when passed to an http2 server with prior knowledge + // it will send a PROTOCOL_ERROR + if f.Length == 0 { + lg.Debug("write http2 settings") + err = framer.WriteSettings() + if err != nil { + lg.Debug("error sending setting frame", zap.Error(err)) + return false + } + } + case *http2.ContinuationFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } + case *http2.HeadersFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } + } + } + return isGRPC +} diff --git a/pkg/connmux/connmux_test.go b/pkg/connmux/connmux_test.go new file mode 100644 index 000000000000..88c22bfdb964 --- /dev/null +++ b/pkg/connmux/connmux_test.go @@ -0,0 +1,291 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connmux + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "net/http" + "net/url" + "testing" + + "go.uber.org/zap" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +func newTestConnMux(t *testing.T) *ConnMux { + certFile := "../../tests/fixtures/server.crt" + keyFile := "../../tests/fixtures/server.key.insecure" + + tcpListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("error creating listener: %v", err) + } + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + t.Errorf("failed to load TLS keys: %v", err) + } + connMux := New(Config{ + Logger: zap.NewExample(), + Listener: tcpListener, + Insecure: true, + Secure: true, + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2", "http/1.1"}, + }, + }) + + t.Logf("Listening on address %s", connMux.root.Addr().String()) + return connMux +} + +func TestConnMux(t *testing.T) { + connMux := newTestConnMux(t) + // HTTP server + httpL := connMux.HTTPListener() + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "HTTP response Proto: %s %v\n", r.Proto, r.Header) + }) + h2s := &http2.Server{} + httpServer := &http.Server{ + Handler: h2c.NewHandler(mux, h2s), + } + go httpServer.Serve(httpL) + defer httpServer.Close() + + // GRPC Server + grpcServer := grpc.NewServer() + testpb.RegisterTestServiceServer(grpcServer, &dummyStubServer{body: []byte("grpc response")}) + + grpcL := connMux.GRPCListener() + go grpcServer.Serve(grpcL) + + go connMux.Serve() + defer connMux.Close() + + destHost := connMux.root.Addr().String() + destHTTP := url.URL{Scheme: "http", Host: destHost} + destHTTPS := url.URL{Scheme: "https", Host: destHost} + + // HTTP plain + t.Run("Test HTTP1 without TLS works", func(t *testing.T) { + //t.Skip() + client := &http.Client{} + verifyHTTPRequest(t, client, destHTTP.String()) + }) + + // HTTPS + t.Run("Test HTTP1 with TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }} + verifyHTTPRequest(t, client, destHTTPS.String()) + }) + + // HTTP2 plain + t.Run("Test HTTP2 without TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }} + verifyHTTPRequest(t, client, destHTTP.String()) + }) + + // HTTP2 TLS + t.Run("Test HTTP2 with TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http2.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + verifyHTTPRequest(t, client, destHTTPS.String()) + }) + + // Set up a connection to the server. + + t.Run("Test GRPC with TLS works", func(t *testing.T) { + conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + } + defer conn.Close() + verifyGRPCRequest(t, conn) + }) +} + +// TestConnMuxConcurrency tests that HTTP and GRPC connections work repet +func TestConnMuxConcurrency(t *testing.T) { + connMux := newTestConnMux(t) + // HTTP server + httpL := connMux.HTTPListener() + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "HTTP response Proto: %s %v\n", r.Proto, r.Header) + }) + h2s := &http2.Server{} + httpServer := &http.Server{ + Handler: h2c.NewHandler(mux, h2s), + } + go httpServer.Serve(httpL) + + // GRPC Server + grpcServer := grpc.NewServer() + testpb.RegisterTestServiceServer(grpcServer, &dummyStubServer{body: []byte("grpc response")}) + + grpcL := connMux.GRPCListener() + go grpcServer.Serve(grpcL) + + go connMux.Serve() + defer connMux.Close() + + destHost := connMux.root.Addr().String() + destHTTP := url.URL{Scheme: "http", Host: destHost} + destHTTPS := url.URL{Scheme: "https", Host: destHost} + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(5) + go func() { + // HTTP plain + defer wg.Done() + t.Logf("Test HTTP1 without TLS works") + client := &http.Client{} + verifyHTTPRequest(t, client, destHTTP.String()) + }() + + go func() { + // HTTPS + defer wg.Done() + t.Logf("Test HTTP1 with TLS works") + client := &http.Client{} + client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }} + verifyHTTPRequest(t, client, destHTTPS.String()) + }() + + go func() { + // HTTP2 plain + defer wg.Done() + t.Logf("Test HTTP2 without TLS works") + client := &http.Client{} + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }} + verifyHTTPRequest(t, client, destHTTP.String()) + }() + + go func() { + // HTTP2 TLS + defer wg.Done() + t.Logf("Test HTTP2 with TLS works") + client := &http.Client{} + client.Transport = &http2.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + verifyHTTPRequest(t, client, destHTTPS.String()) + }() + + go func() { + // Set up a connection to the server. + defer wg.Done() + t.Logf("Test GRPC works") + conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + } + defer conn.Close() + verifyGRPCRequest(t, conn) + }() + } + wg.Wait() + +} + +func verifyHTTPRequest(t *testing.T, client *http.Client, destination string) { + request, err := http.NewRequest("Get", destination, nil) + if err != nil { + t.Error(err) + } + response, err := client.Do(request) + if err != nil { + t.Fatal(err) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + t.Error(err) + } + t.Logf("received body: %s", string(body)) + if !strings.Contains(string(body), "HTTP response Proto") { + t.Errorf("expected \"HTTP response Proto\" , got %s", body) + } + +} + +func verifyGRPCRequest(t *testing.T, conn grpc.ClientConnInterface) { + // Contact the server and print out its response. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := testpb.NewTestServiceClient(conn) + + c.HalfDuplexCall(ctx) + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Error("failed to invoke rpc to foo (e1)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), []byte("grpc response")) { + t.Errorf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) + } + +} + +type dummyStubServer struct { + testpb.UnimplementedTestServiceServer + body []byte +} + +func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Type: testpb.PayloadType_COMPRESSABLE, + Body: d.body, + }, + }, nil +} diff --git a/pkg/connmux/doc.go b/pkg/connmux/doc.go new file mode 100644 index 000000000000..476450865dc4 --- /dev/null +++ b/pkg/connmux/doc.go @@ -0,0 +1,21 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package connmux allows to multiplex http and grpc requests connection, +// with and without TLS, using the same port and address. +// The package is inspired in the existing project https://github.com/soheilhy/cmux +// adding some features specific for the etcd use case. +// This code is not mean to be used outside of etcd. + +package connmux diff --git a/pkg/go.mod b/pkg/go.mod index 2e59ed4bbbec..1ffca9a46163 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,6 +14,8 @@ require ( google.golang.org/grpc v1.51.0 ) +require golang.org/x/net v0.8.0 + require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 48f5cdb6b38b..8ce31b1a6ac1 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -44,7 +44,6 @@ import ( "go.etcd.io/etcd/server/v3/verify" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/soheilhy/cmux" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -567,19 +566,17 @@ func (e *Etcd) servePeers() (err error) { for _, p := range e.Peers { u := p.Listener.Addr().String() - m := cmux.New(p.Listener) srv := &http.Server{ Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), // do not log user error } - go srv.Serve(m.Match(cmux.Any())) p.serve = func() error { e.cfg.logger.Info( - "cmux::serve", + "start::serve", zap.String("address", u), ) - return m.Serve() + return srv.Serve(p.Listener) } p.close = func(ctx context.Context) error { // gracefully shutdown http.Server @@ -594,7 +591,7 @@ func (e *Etcd) servePeers() (err error) { "stopped serving peer traffic", zap.String("address", u), ) - m.Close() + srv.Close() return nil } } diff --git a/server/embed/serve.go b/server/embed/serve.go index af9f9fdacbb7..63e70e34f84f 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -26,6 +26,7 @@ import ( etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw" "go.etcd.io/etcd/client/pkg/v3/transport" + connmux "go.etcd.io/etcd/pkg/v3/connmux" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" "go.etcd.io/etcd/server/v3/config" @@ -40,10 +41,10 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" gw "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -111,28 +112,19 @@ func (sctx *serveCtx) serve( sctx.lg.Info("ready to serve client requests") - m := cmux.New(sctx.l) - var server func() error onlyGRPC := splitHttp && !sctx.httpOnly onlyHttp := splitHttp && sctx.httpOnly grpcEnabled := !onlyHttp httpEnabled := !onlyGRPC - v3c := v3client.New(s) - servElection := v3election.NewElectionServer(v3c) - servLock := v3lock.NewLockServer(v3c) + cfg := connmux.Config{ + Logger: sctx.lg, + Listener: sctx.l, + } // Make sure serversC is closed even if we prematurely exit the function. defer close(sctx.serversC) - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - // GRPC gateway connects to grpc server via connection provided by grpc dial. - gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) - if err != nil { - sctx.lg.Error("registerGateway failed", zap.Error(err)) - return err - } - } + var traffic string switch { case onlyGRPC: @@ -144,55 +136,7 @@ func (sctx *serveCtx) serve( } if sctx.insecure { - var gs *grpc.Server - var srv *http.Server - if httpEnabled { - httpmux := sctx.createMux(gwmux, handler) - srv = &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err - } - } - if grpcEnabled { - gs = v3rpc.Server(s, nil, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) - } - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) - } - }(gs) - } - if onlyGRPC { - server = func() error { - return gs.Serve(sctx.l) - } - } else { - server = m.Serve - - httpl := m.Match(cmux.HTTP1()) - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, httpl) - - if grpcEnabled { - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, l net.Listener) { - errHandler(gs.Serve(l)) - }(gs, grpcl) - } - } - - sctx.serversC <- &servers{grpc: gs, http: srv} + cfg.Insecure = true sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", zap.String("traffic", traffic), @@ -201,95 +145,99 @@ func (sctx *serveCtx) serve( } if sctx.secure { - var gs *grpc.Server - var srv *http.Server - tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - if grpcEnabled { - gs = v3rpc.Server(s, tlscfg, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) - } - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) - } - }(gs) - } - if httpEnabled { - if grpcEnabled { - handler = grpcHandlerFunc(gs, handler) - } - httpmux := sctx.createMux(gwmux, handler) + sctx.lg.Info( + "serving client traffic securely", + zap.String("traffic", traffic), + zap.String("address", sctx.l.Addr().String()), + ) + cfg.Secure = true + cfg.TLSConfig = tlscfg + } - srv = &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err - } + m := connmux.New(cfg) + + v3c := v3client.New(s) + servElection := v3election.NewElectionServer(v3c) + servLock := v3lock.NewLockServer(v3c) + + // Make sure serversC is closed even if we prematurely exit the function. + defer func() { + select { + case <-sctx.serversC: + default: + close(sctx.serversC) } + }() - if onlyGRPC { - server = func() error { return gs.Serve(sctx.l) } - } else { - server = m.Serve + var gs *grpc.Server + var srv *http.Server - tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if grpcEnabled { + gs = v3rpc.Server(s, nil, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + + defer func(gs *grpc.Server) { if err != nil { - return err + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) } - go func(srvhttp *http.Server, tlsl net.Listener) { - errHandler(srvhttp.Serve(tlsl)) - }(srv, tlsl) + }(gs) + + grpcl := m.GRPCListener() + go func() { errHandler(gs.Serve(grpcl)) }() + } + + var gwmux *gw.ServeMux + if s.Cfg.EnableGRPCGateway { + // GRPC gateway connects to grpc server via connection provided by grpc dial. + gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) + if err != nil { + sctx.lg.Error("registerGateway failed", zap.Error(err)) + return err } + } - sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} - sctx.lg.Info( - "serving client traffic securely", - zap.String("traffic", traffic), - zap.String("address", sctx.l.Addr().String()), - ) + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } + httpl := m.HTTPListener() + go func() { errHandler(srv.Serve(httpl)) }() } - return server() + sctx.serversC <- &servers{secure: sctx.secure, grpc: gs, http: srv} + + return m.Serve() } func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { // todo (ahrtr): should we support configuring other parameters in the future as well? - return http2.ConfigureServer(srv, &http2.Server{ + h2s := &http2.Server{ MaxConcurrentStreams: cfg.MaxConcurrentStreams, // Override to avoid using priority scheduler which is affected by https://github.com/golang/go/issues/58804. NewWriteScheduler: http2.NewRandomWriteScheduler, - }) -} - -// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC -// connections or otherHandler otherwise. Given in gRPC docs. -func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { - if otherHandler == nil { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - grpcServer.ServeHTTP(w, r) - }) } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - otherHandler.ServeHTTP(w, r) - } - }) + // This is needed to support HTTP2 with prior knowledge, because the connmux + // pass the stream without TLS + srv.Handler = h2c.NewHandler(srv.Handler, h2s) + return http2.ConfigureServer(srv, h2s) } type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 62aea9359031..4c8154fdd40d 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -37,6 +37,7 @@ import ( "go.etcd.io/etcd/client/v3/leasing" "go.etcd.io/etcd/client/v3/namespace" "go.etcd.io/etcd/client/v3/ordering" + connmux "go.etcd.io/etcd/pkg/v3/connmux" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb" @@ -47,11 +48,11 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" "go.uber.org/zap/zapgrpc" "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -217,7 +218,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo))) } m := mustListenCMux(lg, tlsInfo) - grpcl := m.Match(cmux.HTTP2()) + grpcl := m.GRPCListener() defer func() { grpcl.Close() lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) @@ -233,11 +234,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } httpClient := mustNewHTTPClient(lg) - srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient) - - if err := http2.ConfigureServer(srvhttp, &http2.Server{ + srvhttp, httpl := mustHTTPListener(lg, m, client, proxyClient) + h2s := &http2.Server{ MaxConcurrentStreams: maxConcurrentStreams, - }); err != nil { + } + srvhttp.Handler = h2c.NewHandler(srvhttp.Handler, h2s) + if err := http2.ConfigureServer(srvhttp, h2s); err != nil { lg.Fatal("Failed to configure the http server", zap.Error(err)) } @@ -388,7 +390,7 @@ func newTLS(ca, cert, key string, requireEmptyCN bool) *transport.TLSInfo { return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: requireEmptyCN} } -func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { +func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) *connmux.ConnMux { l, err := net.Listen("tcp", grpcProxyListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -399,15 +401,28 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + cfg := connmux.Config{ + Logger: lg, + Listener: l, + } + if tlsinfo != nil { tlsinfo.CRLFile = grpcProxyListenCRL - if l, err = transport.NewTLSListener(l, tlsinfo); err != nil { - lg.Fatal("failed to create TLS listener", zap.Error(err)) + tlscfg, err := tlsinfo.ServerConfig() + if err != nil { + err := fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not valid", l.Addr().String()) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } + cfg.Secure = true + cfg.TLSConfig = tlscfg + } else { + cfg.Insecure = true } + cmux := connmux.New(cfg) lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) - return cmux.New(l) + return cmux } func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { @@ -504,7 +519,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { return server } -func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) { +func mustHTTPListener(lg *zap.Logger, m *connmux.ConnMux, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) { httpClient := mustNewHTTPClient(lg) httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) @@ -522,17 +537,7 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c Handler: httpmux, ErrorLog: log.New(io.Discard, "net/http", 0), } - - if tlsinfo == nil { - return srvhttp, m.Match(cmux.HTTP1()) - } - - srvTLS, err := tlsinfo.ServerConfig() - if err != nil { - lg.Fatal("failed to set up TLS", zap.Error(err)) - } - srvhttp.TLSConfig = srvTLS - return srvhttp, m.Match(cmux.Any()) + return srvhttp, m.HTTPListener() } func mustNewHTTPClient(lg *zap.Logger) *http.Client { diff --git a/server/etcdserver/api/rafthttp/transport.go b/server/etcdserver/api/rafthttp/transport.go index 339c5734bef6..8b0e0ca14f77 100644 --- a/server/etcdserver/api/rafthttp/transport.go +++ b/server/etcdserver/api/rafthttp/transport.go @@ -110,7 +110,7 @@ type Transport struct { Raft Raft // raft state machine, to which the Transport forwards received messages and reports status Snapshotter *snap.Snapshotter ServerStats *stats.ServerStats // used to record general transportation statistics - // used to record transportation statistics with followers when + // LeaderStats used to record transportation statistics with followers when // performing as leader in raft protocol LeaderStats *stats.LeaderStats // ErrorC is used to report detected critical errors, e.g., diff --git a/server/go.mod b/server/go.mod index e046e2f6e5c6..9b5da8f7c3d5 100644 --- a/server/go.mod +++ b/server/go.mod @@ -17,8 +17,7 @@ require ( github.com/jonboulle/clockwork v0.4.0 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/client_model v0.3.0 - github.com/soheilhy/cmux v0.1.5 - github.com/spf13/cobra v1.7.0 + github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.2 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 diff --git a/server/go.sum b/server/go.sum index 57486894fb92..9b618b117d62 100644 --- a/server/go.sum +++ b/server/go.sum @@ -208,8 +208,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -338,7 +336,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index 4f97c2bf224e..a109ea7de03c 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -71,7 +71,13 @@ func TestConnectionMultiplexing(t *testing.T) { cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}, ClientHttpSeparate: tc.separateHttpPort} clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg)) require.NoError(t, err) - defer clus.Close() + + defer func(clus *e2e.EtcdProcessCluster) error { + for _, member := range clus.Procs { + member.Kill() + } + return clus.Stop() + }(clus) var clientScenarios []e2e.ClientConnType switch tc.serverTLS { diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index c023528047b8..f06cea5ad3c2 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -64,7 +64,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/soheilhy/cmux" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" "google.golang.org/grpc" @@ -1002,16 +1001,14 @@ func (m *Member) Launch() error { } for _, ln := range m.PeerListeners { - cm := cmux.New(ln) - // don't hang on matcher after closing listener - cm.SetReadTimeout(time.Second) - - // serve http1/http2 rafthttp/grpc - ll := cm.Match(cmux.Any()) + var ll net.Listener if peerTLScfg != nil { - if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil { + ll, err = transport.NewTLSListener(ln, m.PeerTLSInfo) + if err != nil { return err } + } else { + ll = ln } hs := &httptest.Server{ Listener: ll, @@ -1025,10 +1022,7 @@ func (m *Member) Launch() error { hs.Start() donec := make(chan struct{}) - go func() { - defer close(donec) - cm.Serve() - }() + defer close(donec) closer := func() { ll.Close() hs.CloseClientConnections() diff --git a/tests/go.mod b/tests/go.mod index 5966b5f0f415..5fdc3a6b839e 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -22,7 +22,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/common v0.42.0 - github.com/soheilhy/cmux v0.1.5 github.com/stretchr/testify v1.8.2 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 diff --git a/tests/go.sum b/tests/go.sum index 003ad77836ff..43f1e5b12d17 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -234,8 +234,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -364,7 +362,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=