From 19fa6d52dd5e5ec57a99a24939c23a050e5ccda5 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Fri, 17 Mar 2023 19:35:24 +0000 Subject: [PATCH 01/15] connection multiplexer to multiplex http and grpc over same socket Introduce a new socket connection multiplexer that multiplexes the same ip:port to an http and a grpc backends. The multiplexer is able to terminate TLS and forward the request to the corresponding grpc or http backends Signed-off-by: Antonio Ojea --- pkg/connmux/connmux.go | 335 ++++++++++++++++++++++++++++++++++++ pkg/connmux/connmux_test.go | 283 ++++++++++++++++++++++++++++++ pkg/connmux/doc.go | 21 +++ pkg/go.mod | 2 + 4 files changed, 641 insertions(+) create mode 100644 pkg/connmux/connmux.go create mode 100644 pkg/connmux/connmux_test.go create mode 100644 pkg/connmux/doc.go diff --git a/pkg/connmux/connmux.go b/pkg/connmux/connmux.go new file mode 100644 index 000000000000..bf9b86fb0537 --- /dev/null +++ b/pkg/connmux/connmux.go @@ -0,0 +1,335 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connmux + +import ( + "bufio" + "bytes" + "crypto/tls" + "io" + "log" + "net" + "strings" + "sync" + "time" + + "go.uber.org/zap" + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" +) + +const ( + // gracefullShutdownDuration is the time to wait before closing the listeners + gracefullShutdownDuration = 200 * time.Millisecond + // readDeadlineTimeout is the maximum time to wait for a read operation + readDeadlineTimeout = 1 * time.Second +) + +// ConnMux can multiple multiple HTTP and GRPC connections in the same address and port. +// It is able to multiplex connections with and without TLS. +// ConnMux can only forward connections to one HTTP or one GRPC server. +// If only one of those is available it will forward all connections directly. +type ConnMux struct { + lg *zap.Logger + root net.Listener + donec chan struct{} + + secure bool // serve TLS + insecure bool // serve insecure + tlsConfig *tls.Config + + mu sync.Mutex + http *muxListener + grpc *muxListener +} + +type Config struct { + Logger *zap.Logger + Listener net.Listener + Secure bool + Insecure bool + TLSConfig *tls.Config +} + +// New creates a new ConnMux. +func New(cfg Config) *ConnMux { + // defaulting + if cfg.Logger == nil { + cfg.Logger = zap.NewNop() + } + + return &ConnMux{ + lg: cfg.Logger, + root: cfg.Listener, + insecure: cfg.Insecure, + secure: cfg.Secure, + tlsConfig: cfg.TLSConfig, + donec: make(chan struct{}), + } +} + +// Serve starts serving connections +func (c *ConnMux) Serve() error { + for { + conn, err := c.root.Accept() + if err != nil { + c.lg.Error("connection error", zap.Error(err)) + return c.Close() + } + go c.serve(conn) + } +} + +// peekHTTP2PrefaceBytes define the bytes we need to peek to be able to differentiate between HTTP and GRPC. +// Since GRPC uses HTTP2 we need to match the connection preface +// https://httpwg.org/specs/rfc9113.html#preface (24 octects) +// 3.5. HTTP/2 Connection Preface +// The client connection preface starts with a sequence of 24 octets, which in hex notation is: +// 0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a +// That is, the connection preface starts with the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n). +// This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty. +// 4.1. Frame Format +// All frames begin with a fixed 9-octet header followed by a variable-length payload. +// min peek size is 24 + 9 = 33 +const peekHTTP2PrefaceBytes = 33 + +func (c *ConnMux) serve(conn net.Conn) { + // avoid to get blocked in any read operation + conn.SetDeadline(time.Now().Add(readDeadlineTimeout)) + defer conn.SetReadDeadline(time.Time{}) + + var proxiedConn bufferedConn + + buffConn := newBufferedConn(conn) + // Check if is TLS or plain TCP + b, err := buffConn.Peek(peekHTTP2PrefaceBytes) + if err != nil { + // exit since it will panic trying to detect if is TLS + c.lg.Error("error reading from the connection", zap.Error(err)) + conn.Close() + return + } + // is TLS + isTLS := b[0] == 0x16 + if isTLS { + if !c.secure { + c.lg.Error("secure connections not enabled") + conn.Close() + return + } + // Establish the TLS connection + tlsConn := tls.Server(buffConn, c.tlsConfig) + err = tlsConn.Handshake() + if err != nil { + c.lg.Error("error establishing the TLS connection", zap.Error(err)) + conn.Close() + return + } + + proxiedConn = newBufferedConn(tlsConn) + // It is a "new" connection obtained after the handshake so we have to do another read + // to get the new data, but be careful to not get blocked in the read if there are no enough data. + _, err = proxiedConn.Peek(peekHTTP2PrefaceBytes) + if err != nil { + // try to see how far we go, it will be discarded by the server if we route it to the wrong one + c.lg.Error("error reading from the TLS connection", zap.Error(err)) + } + } else { + if !c.insecure { + c.lg.Error("insecure connections not enabled") + conn.Close() + return + } + proxiedConn = buffConn + } + + // read the whole buffer + b, err = proxiedConn.Peek(proxiedConn.r.Buffered()) + if err != nil && err != io.EOF { + c.lg.Error("error reading", zap.Error(err)) + conn.Close() + return + } + c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(b)), zap.String("buffer content", string(b))) + reader := bytes.NewReader(b) + isHTTP2 := isHTTP2Connection(reader) + // if is not http2 it is not grpc + if !isHTTP2 { + c.forward(false, proxiedConn) + } else { + c.forward(isGRPCConnection(c.lg, reader), proxiedConn) + } +} + +func (c *ConnMux) forward(isGRPC bool, conn net.Conn) { + c.mu.Lock() + defer c.mu.Unlock() + + if !isGRPC && c.http != nil { + c.lg.Debug("forwarding connection to the HTTP backend", zap.String("remote address", conn.RemoteAddr().String())) + select { + case c.http.connc <- conn: + case <-c.donec: + } + return + } + + if c.grpc != nil { + c.lg.Debug("forwarding connection to the GRPC backend", zap.String("remote address", conn.RemoteAddr().String())) + select { + case c.grpc.connc <- conn: + case <-c.donec: + } + return + } + + log.Println("unknown connection") + conn.Close() +} + +// Close closes the listeners +func (c *ConnMux) Close() error { + time.Sleep(gracefullShutdownDuration) + c.closeDoneChans() + return c.root.Close() +} + +func (c *ConnMux) closeDoneChans() { + select { + case <-c.donec: + default: + close(c.donec) + } +} + +// muxListener is the listener exposed to the HTTP and GRPC servers +// The root listener Accept() method is overriden +// The multiplexed servers have access to the Close() and Address() +// methods on the root listener. +type muxListener struct { + net.Listener + connc chan net.Conn + donec chan struct{} +} + +var _ net.Listener = (*muxListener)(nil) + +func (l muxListener) Accept() (net.Conn, error) { + select { + case c, ok := <-l.connc: + if !ok { + return nil, net.ErrClosed + } + return c, nil + case <-l.donec: + return nil, net.ErrClosed + } +} + +// HTTPListener returns a net.Listener that will receive http requests +func (c *ConnMux) HTTPListener() net.Listener { + c.mu.Lock() + defer c.mu.Unlock() + if c.http == nil { + c.http = &muxListener{c.root, make(chan net.Conn), c.donec} + } + return c.http +} + +// GRPCListener returns a net.Listener that will receive grpc requests +func (c *ConnMux) GRPCListener() net.Listener { + c.mu.Lock() + defer c.mu.Unlock() + if c.grpc == nil { + c.grpc = &muxListener{c.root, make(chan net.Conn), c.donec} + } + return c.grpc +} + +// bufferedConn allows to peek in the buffer of the connection +// without advancing the reader. +type bufferedConn struct { + r *bufio.Reader + net.Conn +} + +func newBufferedConn(c net.Conn) bufferedConn { + return bufferedConn{bufio.NewReader(c), c} +} + +func (b bufferedConn) Peek(n int) ([]byte, error) { + return b.r.Peek(n) +} + +func (b bufferedConn) Read(p []byte) (int, error) { + return b.r.Read(p) +} + +func isHTTP2Connection(r io.Reader) bool { + // check the http2 client preface + buf := make([]byte, len(http2.ClientPreface)) + n, err := r.Read(buf) + if err != nil || n != len(http2.ClientPreface) { + return false + } + return bytes.Equal(buf, []byte(http2.ClientPreface)) +} + +func isGRPCConnection(lg *zap.Logger, r io.Reader) bool { + done := false + isGRPC := false + // check if len of the settings frame is 0 or the headers with the "content-type" + // indicates is an grpc connection. + framer := http2.NewFramer(io.Discard, r) + // use the default value for the maxDynamixTable size + // https://pkg.go.dev/golang.org/x/net/http2 + // "If zero, the default value of 4096 is used." + hdec := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + // match headers names or values based on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md + if strings.Contains(hf.Name, "grpc") { + isGRPC = true + done = true + lg.Debug("found grpc header", zap.String("header", hf.Name)) + return + } + if hf.Name == "content-type" { + isGRPC = strings.Contains(hf.Value, "application/grpc") + lg.Debug("found content-type header", zap.String("content-type", hf.Value)) + } + done = true + }) + for { + f, err := framer.ReadFrame() + if err != nil { + break + } + switch f := f.(type) { + case *http2.SettingsFrame: + // Observed behavior is that etcd GRPC clients sends an empty setting frame + // and block waiting for an answer. + if f.Length == 0 { + isGRPC = true + done = true + lg.Debug("found setting frame with zero length") + } + case *http2.HeadersFrame: + hdec.Write(f.HeaderBlockFragment()) + } + if done { + break + } + } + return isGRPC +} diff --git a/pkg/connmux/connmux_test.go b/pkg/connmux/connmux_test.go new file mode 100644 index 000000000000..468d9ee8acce --- /dev/null +++ b/pkg/connmux/connmux_test.go @@ -0,0 +1,283 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connmux + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "net/http" + "net/url" + "testing" + + "go.uber.org/zap" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +func newTestConnMux(t *testing.T) *ConnMux { + certFile := "../../tests/fixtures/server.crt" + keyFile := "../../tests/fixtures/server.key.insecure" + + tcpListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("error creating listener: %v", err) + } + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + t.Errorf("failed to load TLS keys: %v", err) + } + connMux := New(Config{ + Logger: zap.NewExample(), + Listener: tcpListener, + Insecure: true, + Secure: true, + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{"h2", "http/1.1"}, + }, + }) + + t.Logf("Listening on address %s", connMux.root.Addr().String()) + return connMux +} + +func TestConnMux(t *testing.T) { + connMux := newTestConnMux(t) + // HTTP server + httpL := connMux.HTTPListener() + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "HTTP response Proto: %s %v\n", r.Proto, r.Header) + }) + h2s := &http2.Server{} + httpServer := &http.Server{ + Handler: h2c.NewHandler(mux, h2s), + } + http2.ConfigureServer(httpServer, h2s) + go httpServer.Serve(httpL) + + // GRPC Server + grpcServer := grpc.NewServer() + testpb.RegisterTestServiceServer(grpcServer, &dummyStubServer{body: []byte("grpc response")}) + + grpcL := connMux.GRPCListener() + go grpcServer.Serve(grpcL) + + go connMux.Serve() + defer connMux.Close() + + destHost := connMux.root.Addr().String() + destHTTP := url.URL{Scheme: "http", Host: destHost} + destHTTPS := url.URL{Scheme: "https", Host: destHost} + + // HTTP plain + t.Logf("Test HTTP1 without TLS works") + client := &http.Client{} + verifyHTTPRequest(t, client, destHTTP.String()) + + // HTTPS + t.Logf("Test HTTP1 with TLS works") + client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }} + verifyHTTPRequest(t, client, destHTTPS.String()) + + // HTTP2 plain + t.Logf("Test HTTP2 without TLS works") + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }} + verifyHTTPRequest(t, client, destHTTP.String()) + + // HTTP2 TLS + t.Logf("Test HTTP2 with TLS works") + client.Transport = &http2.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + verifyHTTPRequest(t, client, destHTTPS.String()) + + // Set up a connection to the server. + t.Logf("Test GRPC works") + conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + } + defer conn.Close() + verifyGRPCRequest(t, conn) + +} + +// TestConnMuxConcurrency tests that HTTP and GRPC connections work repet +func TestConnMuxConcurrency(t *testing.T) { + connMux := newTestConnMux(t) + // HTTP server + httpL := connMux.HTTPListener() + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "HTTP response Proto: %s %v\n", r.Proto, r.Header) + }) + h2s := &http2.Server{} + httpServer := &http.Server{ + Handler: h2c.NewHandler(mux, h2s), + } + http2.ConfigureServer(httpServer, h2s) + go httpServer.Serve(httpL) + + // GRPC Server + grpcServer := grpc.NewServer() + testpb.RegisterTestServiceServer(grpcServer, &dummyStubServer{body: []byte("grpc response")}) + + grpcL := connMux.GRPCListener() + go grpcServer.Serve(grpcL) + + go connMux.Serve() + defer connMux.Close() + + destHost := connMux.root.Addr().String() + destHTTP := url.URL{Scheme: "http", Host: destHost} + destHTTPS := url.URL{Scheme: "https", Host: destHost} + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(5) + go func() { + // HTTP plain + defer wg.Done() + t.Logf("Test HTTP1 without TLS works") + client := &http.Client{} + verifyHTTPRequest(t, client, destHTTP.String()) + }() + + go func() { + // HTTPS + defer wg.Done() + t.Logf("Test HTTP1 with TLS works") + client := &http.Client{} + client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }} + verifyHTTPRequest(t, client, destHTTPS.String()) + }() + + go func() { + // HTTP2 plain + defer wg.Done() + t.Logf("Test HTTP2 without TLS works") + client := &http.Client{} + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }} + verifyHTTPRequest(t, client, destHTTP.String()) + }() + + go func() { + // HTTP2 TLS + defer wg.Done() + t.Logf("Test HTTP2 with TLS works") + client := &http.Client{} + client.Transport = &http2.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + verifyHTTPRequest(t, client, destHTTPS.String()) + }() + + go func() { + // Set up a connection to the server. + defer wg.Done() + t.Logf("Test GRPC works") + conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + } + defer conn.Close() + verifyGRPCRequest(t, conn) + }() + } + wg.Wait() + +} + +func verifyHTTPRequest(t *testing.T, client *http.Client, destination string) { + request, err := http.NewRequest("Get", destination, nil) + if err != nil { + t.Error(err) + } + response, err := client.Do(request) + if err != nil { + t.Error(err) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + t.Error(err) + } + t.Logf("received body: %s", string(body)) + if !strings.Contains(string(body), "HTTP response Proto") { + t.Errorf("expected \"HTTP response Proto\" , got %s", body) + } + +} + +func verifyGRPCRequest(t *testing.T, conn grpc.ClientConnInterface) { + // Contact the server and print out its response. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + c := testpb.NewTestServiceClient(conn) + + c.HalfDuplexCall(ctx) + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Error("failed to invoke rpc to foo (e1)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), []byte("grpc response")) { + t.Errorf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) + } + +} + +type dummyStubServer struct { + testpb.UnimplementedTestServiceServer + body []byte +} + +func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Type: testpb.PayloadType_COMPRESSABLE, + Body: d.body, + }, + }, nil +} diff --git a/pkg/connmux/doc.go b/pkg/connmux/doc.go new file mode 100644 index 000000000000..476450865dc4 --- /dev/null +++ b/pkg/connmux/doc.go @@ -0,0 +1,21 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package connmux allows to multiplex http and grpc requests connection, +// with and without TLS, using the same port and address. +// The package is inspired in the existing project https://github.com/soheilhy/cmux +// adding some features specific for the etcd use case. +// This code is not mean to be used outside of etcd. + +package connmux diff --git a/pkg/go.mod b/pkg/go.mod index 2e59ed4bbbec..1ffca9a46163 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,6 +14,8 @@ require ( google.golang.org/grpc v1.51.0 ) +require golang.org/x/net v0.8.0 + require ( github.com/benbjohnson/clock v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect From f201fef787dea307dc5590f8faeb132b8db99196 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 10:22:47 +0000 Subject: [PATCH 02/15] improve error matching for closed connections Signed-off-by: Antonio Ojea --- client/pkg/transport/listener.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/client/pkg/transport/listener.go b/client/pkg/transport/listener.go index 5e0e13e25a73..c562b107a278 100644 --- a/client/pkg/transport/listener.go +++ b/client/pkg/transport/listener.go @@ -586,11 +586,19 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { return cfg, nil } -// IsClosedConnError returns true if the error is from closing listener, cmux. +// IsClosedConnError returns true if the error is from closing listener. // copied from golang.org/x/net/http2/http2.go func IsClosedConnError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, net.ErrClosed) { + return true + } // 'use of closed network connection' (Go <=1.8) // 'use of closed file or network connection' (Go >1.8, internal/poll.ErrClosing) - // 'mux: listener closed' (cmux.ErrListenerClosed) - return err != nil && strings.Contains(err.Error(), "closed") + if strings.Contains(err.Error(), "closed") { + return true + } + return false } From 2a905c7bedbb4319aafc5aadb5f5c153be254e3d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 20:17:40 +0000 Subject: [PATCH 03/15] fix e2e tests cmux Signed-off-by: Antonio Ojea --- tests/e2e/cmux_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/e2e/cmux_test.go b/tests/e2e/cmux_test.go index 4f97c2bf224e..a109ea7de03c 100644 --- a/tests/e2e/cmux_test.go +++ b/tests/e2e/cmux_test.go @@ -71,7 +71,13 @@ func TestConnectionMultiplexing(t *testing.T) { cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}, ClientHttpSeparate: tc.separateHttpPort} clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg)) require.NoError(t, err) - defer clus.Close() + + defer func(clus *e2e.EtcdProcessCluster) error { + for _, member := range clus.Procs { + member.Kill() + } + return clus.Stop() + }(clus) var clientScenarios []e2e.ClientConnType switch tc.serverTLS { From 254dc4f0d84792af1be5b6585ecfe562d69e1369 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 10:16:55 +0000 Subject: [PATCH 04/15] stop using cmux in etcd it was just adding one element in the path when it can be used directly, as it is configured as a passthrough. Signed-off-by: Antonio Ojea --- server/embed/etcd.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 48f5cdb6b38b..8ce31b1a6ac1 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -44,7 +44,6 @@ import ( "go.etcd.io/etcd/server/v3/verify" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/soheilhy/cmux" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -567,19 +566,17 @@ func (e *Etcd) servePeers() (err error) { for _, p := range e.Peers { u := p.Listener.Addr().String() - m := cmux.New(p.Listener) srv := &http.Server{ Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), // do not log user error } - go srv.Serve(m.Match(cmux.Any())) p.serve = func() error { e.cfg.logger.Info( - "cmux::serve", + "start::serve", zap.String("address", u), ) - return m.Serve() + return srv.Serve(p.Listener) } p.close = func(ctx context.Context) error { // gracefully shutdown http.Server @@ -594,7 +591,7 @@ func (e *Etcd) servePeers() (err error) { "stopped serving peer traffic", zap.String("address", u), ) - m.Close() + srv.Close() return nil } } From a51bad3c5413c54f0304108c33bc61c1db539522 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 10:17:50 +0000 Subject: [PATCH 05/15] replace cmux by the new sockproxy sockproxy allows to listen in the same listener both in http and grpc with or without TLS enabled, allowing to remove the duplicate logic for secure and insecure, and to remove the http2 handler for splitting the grpc and http traffic. Signed-off-by: Antonio Ojea --- server/embed/serve.go | 208 ++++++++++++++++-------------------------- 1 file changed, 78 insertions(+), 130 deletions(-) diff --git a/server/embed/serve.go b/server/embed/serve.go index af9f9fdacbb7..63e70e34f84f 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -26,6 +26,7 @@ import ( etcdservergw "go.etcd.io/etcd/api/v3/etcdserverpb/gw" "go.etcd.io/etcd/client/pkg/v3/transport" + connmux "go.etcd.io/etcd/pkg/v3/connmux" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" "go.etcd.io/etcd/server/v3/config" @@ -40,10 +41,10 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" gw "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -111,28 +112,19 @@ func (sctx *serveCtx) serve( sctx.lg.Info("ready to serve client requests") - m := cmux.New(sctx.l) - var server func() error onlyGRPC := splitHttp && !sctx.httpOnly onlyHttp := splitHttp && sctx.httpOnly grpcEnabled := !onlyHttp httpEnabled := !onlyGRPC - v3c := v3client.New(s) - servElection := v3election.NewElectionServer(v3c) - servLock := v3lock.NewLockServer(v3c) + cfg := connmux.Config{ + Logger: sctx.lg, + Listener: sctx.l, + } // Make sure serversC is closed even if we prematurely exit the function. defer close(sctx.serversC) - var gwmux *gw.ServeMux - if s.Cfg.EnableGRPCGateway { - // GRPC gateway connects to grpc server via connection provided by grpc dial. - gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) - if err != nil { - sctx.lg.Error("registerGateway failed", zap.Error(err)) - return err - } - } + var traffic string switch { case onlyGRPC: @@ -144,55 +136,7 @@ func (sctx *serveCtx) serve( } if sctx.insecure { - var gs *grpc.Server - var srv *http.Server - if httpEnabled { - httpmux := sctx.createMux(gwmux, handler) - srv = &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure http server failed", zap.Error(err)) - return err - } - } - if grpcEnabled { - gs = v3rpc.Server(s, nil, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) - } - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err)) - } - }(gs) - } - if onlyGRPC { - server = func() error { - return gs.Serve(sctx.l) - } - } else { - server = m.Serve - - httpl := m.Match(cmux.HTTP1()) - go func(srvhttp *http.Server, tlsLis net.Listener) { - errHandler(srvhttp.Serve(tlsLis)) - }(srv, httpl) - - if grpcEnabled { - grpcl := m.Match(cmux.HTTP2()) - go func(gs *grpc.Server, l net.Listener) { - errHandler(gs.Serve(l)) - }(gs, grpcl) - } - } - - sctx.serversC <- &servers{grpc: gs, http: srv} + cfg.Insecure = true sctx.lg.Info( "serving client traffic insecurely; this is strongly discouraged!", zap.String("traffic", traffic), @@ -201,95 +145,99 @@ func (sctx *serveCtx) serve( } if sctx.secure { - var gs *grpc.Server - var srv *http.Server - tlscfg, tlsErr := tlsinfo.ServerConfig() if tlsErr != nil { return tlsErr } - if grpcEnabled { - gs = v3rpc.Server(s, tlscfg, nil, gopts...) - v3electionpb.RegisterElectionServer(gs, servElection) - v3lockpb.RegisterLockServer(gs, servLock) - if sctx.serviceRegister != nil { - sctx.serviceRegister(gs) - } - defer func(gs *grpc.Server) { - if err != nil { - sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) - gs.Stop() - sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) - } - }(gs) - } - if httpEnabled { - if grpcEnabled { - handler = grpcHandlerFunc(gs, handler) - } - httpmux := sctx.createMux(gwmux, handler) + sctx.lg.Info( + "serving client traffic securely", + zap.String("traffic", traffic), + zap.String("address", sctx.l.Addr().String()), + ) + cfg.Secure = true + cfg.TLSConfig = tlscfg + } - srv = &http.Server{ - Handler: createAccessController(sctx.lg, s, httpmux), - TLSConfig: tlscfg, - ErrorLog: logger, // do not log user error - } - if err := configureHttpServer(srv, s.Cfg); err != nil { - sctx.lg.Error("Configure https server failed", zap.Error(err)) - return err - } + m := connmux.New(cfg) + + v3c := v3client.New(s) + servElection := v3election.NewElectionServer(v3c) + servLock := v3lock.NewLockServer(v3c) + + // Make sure serversC is closed even if we prematurely exit the function. + defer func() { + select { + case <-sctx.serversC: + default: + close(sctx.serversC) } + }() - if onlyGRPC { - server = func() error { return gs.Serve(sctx.l) } - } else { - server = m.Serve + var gs *grpc.Server + var srv *http.Server - tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo) + if grpcEnabled { + gs = v3rpc.Server(s, nil, nil, gopts...) + v3electionpb.RegisterElectionServer(gs, servElection) + v3lockpb.RegisterLockServer(gs, servLock) + if sctx.serviceRegister != nil { + sctx.serviceRegister(gs) + } + + defer func(gs *grpc.Server) { if err != nil { - return err + sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err)) + gs.Stop() + sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err)) } - go func(srvhttp *http.Server, tlsl net.Listener) { - errHandler(srvhttp.Serve(tlsl)) - }(srv, tlsl) + }(gs) + + grpcl := m.GRPCListener() + go func() { errHandler(gs.Serve(grpcl)) }() + } + + var gwmux *gw.ServeMux + if s.Cfg.EnableGRPCGateway { + // GRPC gateway connects to grpc server via connection provided by grpc dial. + gwmux, err = sctx.registerGateway(grpcDialForRestGatewayBackends) + if err != nil { + sctx.lg.Error("registerGateway failed", zap.Error(err)) + return err } + } - sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} - sctx.lg.Info( - "serving client traffic securely", - zap.String("traffic", traffic), - zap.String("address", sctx.l.Addr().String()), - ) + if httpEnabled { + httpmux := sctx.createMux(gwmux, handler) + + srv = &http.Server{ + Handler: createAccessController(sctx.lg, s, httpmux), + ErrorLog: logger, // do not log user error + } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } + httpl := m.HTTPListener() + go func() { errHandler(srv.Serve(httpl)) }() } - return server() + sctx.serversC <- &servers{secure: sctx.secure, grpc: gs, http: srv} + + return m.Serve() } func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { // todo (ahrtr): should we support configuring other parameters in the future as well? - return http2.ConfigureServer(srv, &http2.Server{ + h2s := &http2.Server{ MaxConcurrentStreams: cfg.MaxConcurrentStreams, // Override to avoid using priority scheduler which is affected by https://github.com/golang/go/issues/58804. NewWriteScheduler: http2.NewRandomWriteScheduler, - }) -} - -// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC -// connections or otherHandler otherwise. Given in gRPC docs. -func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { - if otherHandler == nil { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - grpcServer.ServeHTTP(w, r) - }) } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - otherHandler.ServeHTTP(w, r) - } - }) + // This is needed to support HTTP2 with prior knowledge, because the connmux + // pass the stream without TLS + srv.Handler = h2c.NewHandler(srv.Handler, h2s) + return http2.ConfigureServer(srv, h2s) } type registerHandlerFunc func(context.Context, *gw.ServeMux, *grpc.ClientConn) error From 7ff9a644f5092ab64d31fbcfb578eb3bb895401f Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 10:29:26 +0000 Subject: [PATCH 06/15] remove cmux from integration test Signed-off-by: Antonio Ojea --- tests/framework/integration/cluster.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index c023528047b8..f06cea5ad3c2 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -64,7 +64,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/soheilhy/cmux" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" "google.golang.org/grpc" @@ -1002,16 +1001,14 @@ func (m *Member) Launch() error { } for _, ln := range m.PeerListeners { - cm := cmux.New(ln) - // don't hang on matcher after closing listener - cm.SetReadTimeout(time.Second) - - // serve http1/http2 rafthttp/grpc - ll := cm.Match(cmux.Any()) + var ll net.Listener if peerTLScfg != nil { - if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil { + ll, err = transport.NewTLSListener(ln, m.PeerTLSInfo) + if err != nil { return err } + } else { + ll = ln } hs := &httptest.Server{ Listener: ll, @@ -1025,10 +1022,7 @@ func (m *Member) Launch() error { hs.Start() donec := make(chan struct{}) - go func() { - defer close(donec) - cm.Serve() - }() + defer close(donec) closer := func() { ll.Close() hs.CloseClientConnections() From 099234f023a3bc58088266e083fe6057f68cead4 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 14:02:41 +0000 Subject: [PATCH 07/15] remove cmux from grpc_proxy Signed-off-by: Antonio Ojea --- server/etcdmain/grpc_proxy.go | 49 +++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 62aea9359031..4c8154fdd40d 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -37,6 +37,7 @@ import ( "go.etcd.io/etcd/client/v3/leasing" "go.etcd.io/etcd/client/v3/namespace" "go.etcd.io/etcd/client/v3/ordering" + connmux "go.etcd.io/etcd/pkg/v3/connmux" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb" @@ -47,11 +48,11 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" "go.uber.org/zap/zapgrpc" "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -217,7 +218,7 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { lg.Info("gRPC proxy server TLS", zap.String("tls-info", fmt.Sprintf("%+v", tlsInfo))) } m := mustListenCMux(lg, tlsInfo) - grpcl := m.Match(cmux.HTTP2()) + grpcl := m.GRPCListener() defer func() { grpcl.Close() lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) @@ -233,11 +234,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } httpClient := mustNewHTTPClient(lg) - srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient) - - if err := http2.ConfigureServer(srvhttp, &http2.Server{ + srvhttp, httpl := mustHTTPListener(lg, m, client, proxyClient) + h2s := &http2.Server{ MaxConcurrentStreams: maxConcurrentStreams, - }); err != nil { + } + srvhttp.Handler = h2c.NewHandler(srvhttp.Handler, h2s) + if err := http2.ConfigureServer(srvhttp, h2s); err != nil { lg.Fatal("Failed to configure the http server", zap.Error(err)) } @@ -388,7 +390,7 @@ func newTLS(ca, cert, key string, requireEmptyCN bool) *transport.TLSInfo { return &transport.TLSInfo{TrustedCAFile: ca, CertFile: cert, KeyFile: key, EmptyCN: requireEmptyCN} } -func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { +func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) *connmux.ConnMux { l, err := net.Listen("tcp", grpcProxyListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -399,15 +401,28 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + cfg := connmux.Config{ + Logger: lg, + Listener: l, + } + if tlsinfo != nil { tlsinfo.CRLFile = grpcProxyListenCRL - if l, err = transport.NewTLSListener(l, tlsinfo); err != nil { - lg.Fatal("failed to create TLS listener", zap.Error(err)) + tlscfg, err := tlsinfo.ServerConfig() + if err != nil { + err := fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not valid", l.Addr().String()) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } + cfg.Secure = true + cfg.TLSConfig = tlscfg + } else { + cfg.Insecure = true } + cmux := connmux.New(cfg) lg.Info("listening for gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) - return cmux.New(l) + return cmux } func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { @@ -504,7 +519,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { return server } -func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) { +func mustHTTPListener(lg *zap.Logger, m *connmux.ConnMux, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) { httpClient := mustNewHTTPClient(lg) httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) @@ -522,17 +537,7 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c Handler: httpmux, ErrorLog: log.New(io.Discard, "net/http", 0), } - - if tlsinfo == nil { - return srvhttp, m.Match(cmux.HTTP1()) - } - - srvTLS, err := tlsinfo.ServerConfig() - if err != nil { - lg.Fatal("failed to set up TLS", zap.Error(err)) - } - srvhttp.TLSConfig = srvTLS - return srvhttp, m.Match(cmux.Any()) + return srvhttp, m.HTTPListener() } func mustNewHTTPClient(lg *zap.Logger) *http.Client { From 1e051bb75070ea4b864af8fba8874a73f0d4ff85 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 14:02:54 +0000 Subject: [PATCH 08/15] remove cmux go.mod Signed-off-by: Antonio Ojea --- go.mod | 1 - go.sum | 3 --- 2 files changed, 4 deletions(-) diff --git a/go.mod b/go.mod index 33784a434c31..857bb921b31a 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,6 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect - github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect diff --git a/go.sum b/go.sum index f4132506e504..3454e9af5f02 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -386,7 +384,6 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= From acd13967ad84b1ffccc989bf2c4b72c24bd38ca3 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 14:05:50 +0000 Subject: [PATCH 09/15] remove cmux gomod from server Signed-off-by: Antonio Ojea --- server/go.mod | 3 +-- server/go.sum | 3 --- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/server/go.mod b/server/go.mod index e046e2f6e5c6..9b5da8f7c3d5 100644 --- a/server/go.mod +++ b/server/go.mod @@ -17,8 +17,7 @@ require ( github.com/jonboulle/clockwork v0.4.0 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/client_model v0.3.0 - github.com/soheilhy/cmux v0.1.5 - github.com/spf13/cobra v1.7.0 + github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.8.2 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 diff --git a/server/go.sum b/server/go.sum index 57486894fb92..9b618b117d62 100644 --- a/server/go.sum +++ b/server/go.sum @@ -208,8 +208,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -338,7 +336,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= From f23d4a474e0d910aae2cb34d917e20639505e9c4 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 14:06:11 +0000 Subject: [PATCH 10/15] remove cmux gomod from tests Signed-off-by: Antonio Ojea --- tests/go.mod | 1 - tests/go.sum | 3 --- 2 files changed, 4 deletions(-) diff --git a/tests/go.mod b/tests/go.mod index 5966b5f0f415..5fdc3a6b839e 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -22,7 +22,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/common v0.42.0 - github.com/soheilhy/cmux v0.1.5 github.com/stretchr/testify v1.8.2 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 diff --git a/tests/go.sum b/tests/go.sum index 003ad77836ff..43f1e5b12d17 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -234,8 +234,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= -github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= -github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -364,7 +362,6 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= From 995acf710ebcee4f278287b3d715e9738c77a159 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sat, 18 Mar 2023 14:07:24 +0000 Subject: [PATCH 11/15] update bom Signed-off-by: Antonio Ojea --- bill-of-materials.json | 9 --------- 1 file changed, 9 deletions(-) diff --git a/bill-of-materials.json b/bill-of-materials.json index db45756dc769..c15e0596ddf8 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -368,15 +368,6 @@ } ] }, - { - "project": "github.com/soheilhy/cmux", - "licenses": [ - { - "type": "Apache License 2.0", - "confidence": 1 - } - ] - }, { "project": "github.com/spf13/cobra", "licenses": [ From 59e13d710b885452e2ae20ada8773565a5bcd87f Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sun, 19 Mar 2023 23:52:43 +0000 Subject: [PATCH 12/15] fix godoc linter error on rafthttp transport Signed-off-by: Antonio Ojea --- server/etcdserver/api/rafthttp/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/etcdserver/api/rafthttp/transport.go b/server/etcdserver/api/rafthttp/transport.go index 339c5734bef6..8b0e0ca14f77 100644 --- a/server/etcdserver/api/rafthttp/transport.go +++ b/server/etcdserver/api/rafthttp/transport.go @@ -110,7 +110,7 @@ type Transport struct { Raft Raft // raft state machine, to which the Transport forwards received messages and reports status Snapshotter *snap.Snapshotter ServerStats *stats.ServerStats // used to record general transportation statistics - // used to record transportation statistics with followers when + // LeaderStats used to record transportation statistics with followers when // performing as leader in raft protocol LeaderStats *stats.LeaderStats // ErrorC is used to report detected critical errors, e.g., From 6d4ed0bd0259547a0d1427ffe7784588063e61c2 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Tue, 28 Mar 2023 14:07:15 +0000 Subject: [PATCH 13/15] WIP parse http2 connection headers The only reliable method to detect a GRPC connection seems to be parsing the content-type header. We need to be able to establish the HTTP2 connection so the client sends us the headers. Signed-off-by: Antonio Ojea --- pkg/connmux/connmux.go | 157 +++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 78 deletions(-) diff --git a/pkg/connmux/connmux.go b/pkg/connmux/connmux.go index bf9b86fb0537..ff5774f0223d 100644 --- a/pkg/connmux/connmux.go +++ b/pkg/connmux/connmux.go @@ -15,7 +15,6 @@ package connmux import ( - "bufio" "bytes" "crypto/tls" "io" @@ -92,37 +91,24 @@ func (c *ConnMux) Serve() error { } } -// peekHTTP2PrefaceBytes define the bytes we need to peek to be able to differentiate between HTTP and GRPC. -// Since GRPC uses HTTP2 we need to match the connection preface -// https://httpwg.org/specs/rfc9113.html#preface (24 octects) -// 3.5. HTTP/2 Connection Preface -// The client connection preface starts with a sequence of 24 octets, which in hex notation is: -// 0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a -// That is, the connection preface starts with the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n). -// This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty. -// 4.1. Frame Format -// All frames begin with a fixed 9-octet header followed by a variable-length payload. -// min peek size is 24 + 9 = 33 -const peekHTTP2PrefaceBytes = 33 - func (c *ConnMux) serve(conn net.Conn) { // avoid to get blocked in any read operation conn.SetDeadline(time.Now().Add(readDeadlineTimeout)) defer conn.SetReadDeadline(time.Time{}) - var proxiedConn bufferedConn - + buf := make([]byte, 1024) buffConn := newBufferedConn(conn) // Check if is TLS or plain TCP - b, err := buffConn.Peek(peekHTTP2PrefaceBytes) - if err != nil { + _, err := buffConn.sniffReader().Read(buf) + if err != nil && err != io.EOF { // exit since it will panic trying to detect if is TLS c.lg.Error("error reading from the connection", zap.Error(err)) conn.Close() return } - // is TLS - isTLS := b[0] == 0x16 + c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(buf)), zap.String("buffer content", string(buf[0:]))) + // Check if is TLS or plain TCP + isTLS := buf[0] == 0x16 if isTLS { if !c.secure { c.lg.Error("secure connections not enabled") @@ -137,40 +123,20 @@ func (c *ConnMux) serve(conn net.Conn) { conn.Close() return } - - proxiedConn = newBufferedConn(tlsConn) // It is a "new" connection obtained after the handshake so we have to do another read - // to get the new data, but be careful to not get blocked in the read if there are no enough data. - _, err = proxiedConn.Peek(peekHTTP2PrefaceBytes) - if err != nil { - // try to see how far we go, it will be discarded by the server if we route it to the wrong one - c.lg.Error("error reading from the TLS connection", zap.Error(err)) - } + proxiedConn := newBufferedConn(tlsConn) + isGRPC := isGRPCConnection(c.lg, proxiedConn, proxiedConn.sniffReader()) + c.forward(isGRPC, proxiedConn) } else { if !c.insecure { c.lg.Error("insecure connections not enabled") conn.Close() return } - proxiedConn = buffConn + isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(bytes.NewReader(buf), buffConn.sniffReader())) + c.forward(isGRPC, buffConn) } - // read the whole buffer - b, err = proxiedConn.Peek(proxiedConn.r.Buffered()) - if err != nil && err != io.EOF { - c.lg.Error("error reading", zap.Error(err)) - conn.Close() - return - } - c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(b)), zap.String("buffer content", string(b))) - reader := bytes.NewReader(b) - isHTTP2 := isHTTP2Connection(reader) - // if is not http2 it is not grpc - if !isHTTP2 { - c.forward(false, proxiedConn) - } else { - c.forward(isGRPCConnection(c.lg, reader), proxiedConn) - } } func (c *ConnMux) forward(isGRPC bool, conn net.Conn) { @@ -261,75 +227,110 @@ func (c *ConnMux) GRPCListener() net.Listener { // bufferedConn allows to peek in the buffer of the connection // without advancing the reader. type bufferedConn struct { - r *bufio.Reader net.Conn + buf *bytes.Buffer } -func newBufferedConn(c net.Conn) bufferedConn { - return bufferedConn{bufio.NewReader(c), c} +var _ net.Conn = &bufferedConn{} + +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, } -func (b bufferedConn) Peek(n int) ([]byte, error) { - return b.r.Peek(n) +func newBufferedConn(c net.Conn) bufferedConn { + return bufferedConn{ + Conn: c, + buf: bufferPool.Get().(*bytes.Buffer), + } } func (b bufferedConn) Read(p []byte) (int, error) { - return b.r.Read(p) + if b.buf.Len() > 0 { + n, err := b.buf.Read(p) + if err == io.EOF { + // Don't return EOF yet. More readers remain. + return n, nil + } + return n, err + } + // return the buffer to the pool + b.buf.Reset() + bufferPool.Put(b.buf) + return b.Conn.Read(p) } -func isHTTP2Connection(r io.Reader) bool { +func (b bufferedConn) Close() error { + // In case we didn't have time to return the buffer to the pool + if b.buf != nil { + // return the buffer to the pool + b.buf.Reset() + bufferPool.Put(b.buf) + } + return b.Conn.Close() +} + +func (b bufferedConn) sniffReader() io.Reader { + return io.TeeReader(b.Conn, b.buf) +} + +func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { // check the http2 client preface buf := make([]byte, len(http2.ClientPreface)) n, err := r.Read(buf) if err != nil || n != len(http2.ClientPreface) { return false } - return bytes.Equal(buf, []byte(http2.ClientPreface)) -} -func isGRPCConnection(lg *zap.Logger, r io.Reader) bool { + if !bytes.Equal(buf, []byte(http2.ClientPreface)) { + lg.Debug("not found http2 client preface", zap.String("preface", string(buf))) + return false + } + lg.Debug("found http2 client preface") + + // identify GRPC connections matching match headers names or values defined + // on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md done := false isGRPC := false - // check if len of the settings frame is 0 or the headers with the "content-type" - // indicates is an grpc connection. - framer := http2.NewFramer(io.Discard, r) + framer := http2.NewFramer(w, r) // use the default value for the maxDynamixTable size // https://pkg.go.dev/golang.org/x/net/http2 // "If zero, the default value of 4096 is used." hdec := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { - // match headers names or values based on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md - if strings.Contains(hf.Name, "grpc") { - isGRPC = true - done = true - lg.Debug("found grpc header", zap.String("header", hf.Name)) - return - } + // If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of 415 (Unsupported Media Type). + // This will prevent other HTTP/2 clients from interpreting a gRPC error response, which uses status 200 (OK), as successful. + lg.Debug("found header", zap.String("name", hf.Name), zap.String("value", hf.Value)) if hf.Name == "content-type" { - isGRPC = strings.Contains(hf.Value, "application/grpc") - lg.Debug("found content-type header", zap.String("content-type", hf.Value)) + isGRPC = strings.HasPrefix(hf.Value, "application/grpc") } done = true }) - for { + err = framer.WriteSettingsAck() + if err != nil { + lg.Debug("error acking setting frame", zap.Error(err)) + return false + } + lg.Debug("ack settings") + for !done { f, err := framer.ReadFrame() if err != nil { - break + lg.Debug("error reading frame", zap.Error(err)) + return false } switch f := f.(type) { + // The SETTINGS frames received from a peer as part of the connection + // preface MUST be acknowledged (see Section 6.5.3) after sending the + // connection preface. case *http2.SettingsFrame: - // Observed behavior is that etcd GRPC clients sends an empty setting frame - // and block waiting for an answer. - if f.Length == 0 { - isGRPC = true - done = true - lg.Debug("found setting frame with zero length") + lg.Debug("found setting frame") + if f.IsAck() { + continue } + case *http2.HeadersFrame: hdec.Write(f.HeaderBlockFragment()) } - if done { - break - } } return isGRPC } From 6a5e8d4da1b47b9bbe6eed8575c94f70d73d0edd Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Wed, 29 Mar 2023 20:41:01 +0000 Subject: [PATCH 14/15] progress on heuristics grpc clients block waiting for the settings frame. http2 fails if we write the settings frame we have to detect that the client didn't send more than 33 bytes: 24 preface + 9 settings (this seems to be optional) to write the settings frame. Signed-off-by: Antonio Ojea --- pkg/connmux/connmux.go | 104 ++++++++++++++++++++++-------------- pkg/connmux/connmux_test.go | 70 +++++++++++++----------- 2 files changed, 105 insertions(+), 69 deletions(-) diff --git a/pkg/connmux/connmux.go b/pkg/connmux/connmux.go index ff5774f0223d..ba08f5669f9f 100644 --- a/pkg/connmux/connmux.go +++ b/pkg/connmux/connmux.go @@ -84,7 +84,7 @@ func (c *ConnMux) Serve() error { for { conn, err := c.root.Accept() if err != nil { - c.lg.Error("connection error", zap.Error(err)) + c.lg.Error("connection accept error", zap.Error(err)) return c.Close() } go c.serve(conn) @@ -96,17 +96,18 @@ func (c *ConnMux) serve(conn net.Conn) { conn.SetDeadline(time.Now().Add(readDeadlineTimeout)) defer conn.SetReadDeadline(time.Time{}) - buf := make([]byte, 1024) + buf := make([]byte, 512) buffConn := newBufferedConn(conn) // Check if is TLS or plain TCP - _, err := buffConn.sniffReader().Read(buf) + n, err := buffConn.sniffReader().Read(buf) if err != nil && err != io.EOF { // exit since it will panic trying to detect if is TLS c.lg.Error("error reading from the connection", zap.Error(err)) conn.Close() return } - c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(buf)), zap.String("buffer content", string(buf[0:]))) + r := bytes.NewReader(buf[:n]) + c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("read size", n), zap.String("buffer content", string(buf[:n]))) // Check if is TLS or plain TCP isTLS := buf[0] == 0x16 if isTLS { @@ -133,7 +134,7 @@ func (c *ConnMux) serve(conn net.Conn) { conn.Close() return } - isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(bytes.NewReader(buf), buffConn.sniffReader())) + isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(r, buffConn.sniffReader())) c.forward(isGRPC, buffConn) } @@ -152,7 +153,7 @@ func (c *ConnMux) forward(isGRPC bool, conn net.Conn) { return } - if c.grpc != nil { + if isGRPC && c.grpc != nil { c.lg.Debug("forwarding connection to the GRPC backend", zap.String("remote address", conn.RemoteAddr().String())) select { case c.grpc.connc <- conn: @@ -227,6 +228,7 @@ func (c *ConnMux) GRPCListener() net.Listener { // bufferedConn allows to peek in the buffer of the connection // without advancing the reader. type bufferedConn struct { + mu sync.Mutex net.Conn buf *bytes.Buffer } @@ -239,61 +241,83 @@ var bufferPool = sync.Pool{ }, } -func newBufferedConn(c net.Conn) bufferedConn { - return bufferedConn{ +func newBufferedConn(c net.Conn) *bufferedConn { + return &bufferedConn{ Conn: c, buf: bufferPool.Get().(*bytes.Buffer), } } -func (b bufferedConn) Read(p []byte) (int, error) { - if b.buf.Len() > 0 { - n, err := b.buf.Read(p) - if err == io.EOF { - // Don't return EOF yet. More readers remain. - return n, nil - } - return n, err - } - // return the buffer to the pool - b.buf.Reset() - bufferPool.Put(b.buf) - return b.Conn.Read(p) +func (b *bufferedConn) Read(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return io.MultiReader(b.buf, b.Conn).Read(p) } -func (b bufferedConn) Close() error { - // In case we didn't have time to return the buffer to the pool - if b.buf != nil { - // return the buffer to the pool +func (b *bufferedConn) Close() error { + // return the buffer to the pool + defer func() { + b.mu.Lock() + defer b.mu.Unlock() b.buf.Reset() bufferPool.Put(b.buf) - } + }() return b.Conn.Close() } -func (b bufferedConn) sniffReader() io.Reader { +func (b *bufferedConn) sniffReader() io.Reader { return io.TeeReader(b.Conn, b.buf) } func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { // check the http2 client preface - buf := make([]byte, len(http2.ClientPreface)) + buf := make([]byte, 512) n, err := r.Read(buf) - if err != nil || n != len(http2.ClientPreface) { + if err != nil || n < len(http2.ClientPreface) { return false } - if !bytes.Equal(buf, []byte(http2.ClientPreface)) { + if !bytes.Equal(buf[:len(http2.ClientPreface)], []byte(http2.ClientPreface)) { lg.Debug("not found http2 client preface", zap.String("preface", string(buf))) return false } - lg.Debug("found http2 client preface") + lg.Debug("found http2 client preface", zap.Int("bytes read", n)) + + reader := r + if n > 33 { + reader = io.MultiReader(bytes.NewReader(buf[len(http2.ClientPreface):]), r) + } + framer := http2.NewFramer(w, reader) + // GRPC blocks until receive the Settings frame + // This means we should have the preface 24 + setting frame 9 + // HTTP2 fails if we write it before forwarding + if n <= 33 { + lg.Debug("write http2 settings") + err = framer.WriteSettings() + if err != nil { + lg.Debug("error sending setting frame", zap.Error(err)) + return false + } + } + // The server connection preface consists of a potentially empty + // SETTINGS frame (Section 6.5) that MUST be the first frame the server + // sends in the HTTP/2 connection. + f, err := framer.ReadFrame() + if err != nil { + lg.Debug("error reading frame", zap.Error(err)) + return false + } + // The SETTINGS frames received from a peer as part of the connection + // preface MUST be acknowledged (see Section 6.5.3) after sending the + // connection preface. + if _, ok := f.(*http2.SettingsFrame); !ok { + lg.Debug("expected setting frame") + } // identify GRPC connections matching match headers names or values defined // on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md done := false isGRPC := false - framer := http2.NewFramer(w, r) // use the default value for the maxDynamixTable size // https://pkg.go.dev/golang.org/x/net/http2 // "If zero, the default value of 4096 is used." @@ -306,12 +330,8 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { } done = true }) - err = framer.WriteSettingsAck() - if err != nil { - lg.Debug("error acking setting frame", zap.Error(err)) - return false - } - lg.Debug("ack settings") + + lg.Debug("reading frames") for !done { f, err := framer.ReadFrame() if err != nil { @@ -323,10 +343,16 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { // preface MUST be acknowledged (see Section 6.5.3) after sending the // connection preface. case *http2.SettingsFrame: - lg.Debug("found setting frame") if f.IsAck() { + lg.Debug("found setting ACK frame") continue } + lg.Debug("found setting frame") + err := framer.WriteSettingsAck() + if err != nil { + lg.Debug("error writing settings frame", zap.Error(err)) + return false + } case *http2.HeadersFrame: hdec.Write(f.HeaderBlockFragment()) diff --git a/pkg/connmux/connmux_test.go b/pkg/connmux/connmux_test.go index 468d9ee8acce..fb0946f0659d 100644 --- a/pkg/connmux/connmux_test.go +++ b/pkg/connmux/connmux_test.go @@ -78,6 +78,7 @@ func TestConnMux(t *testing.T) { } http2.ConfigureServer(httpServer, h2s) go httpServer.Serve(httpL) + defer httpServer.Close() // GRPC Server grpcServer := grpc.NewServer() @@ -94,45 +95,54 @@ func TestConnMux(t *testing.T) { destHTTPS := url.URL{Scheme: "https", Host: destHost} // HTTP plain - t.Logf("Test HTTP1 without TLS works") - client := &http.Client{} - verifyHTTPRequest(t, client, destHTTP.String()) + t.Run("Test HTTP1 without TLS works", func(t *testing.T) { + //t.Skip() + client := &http.Client{} + verifyHTTPRequest(t, client, destHTTP.String()) + }) // HTTPS - t.Logf("Test HTTP1 with TLS works") - client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }} - verifyHTTPRequest(t, client, destHTTPS.String()) + t.Run("Test HTTP1 with TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }} + verifyHTTPRequest(t, client, destHTTPS.String()) + }) // HTTP2 plain - t.Logf("Test HTTP2 without TLS works") - client.Transport = &http2.Transport{ - AllowHTTP: true, - DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { - return net.Dial(netw, addr) - }} - verifyHTTPRequest(t, client, destHTTP.String()) + t.Run("Test HTTP2 without TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http2.Transport{ + AllowHTTP: true, + DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(netw, addr) + }} + verifyHTTPRequest(t, client, destHTTP.String()) + }) // HTTP2 TLS - t.Logf("Test HTTP2 with TLS works") - client.Transport = &http2.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - NextProtos: []string{http2.NextProtoTLS}, - }, - } - verifyHTTPRequest(t, client, destHTTPS.String()) + t.Run("Test HTTP2 with TLS works", func(t *testing.T) { + client := &http.Client{} + client.Transport = &http2.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{http2.NextProtoTLS}, + }, + } + verifyHTTPRequest(t, client, destHTTPS.String()) + }) // Set up a connection to the server. - t.Logf("Test GRPC works") - conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Errorf("did not connect: %v", err) - } - defer conn.Close() - verifyGRPCRequest(t, conn) + t.Run("Test GRPC with TLS works", func(t *testing.T) { + conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Errorf("did not connect: %v", err) + } + defer conn.Close() + verifyGRPCRequest(t, conn) + }) } // TestConnMuxConcurrency tests that HTTP and GRPC connections work repet From 56ac233c71c387e34fa7934fe0ebfade897d8548 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sun, 30 Apr 2023 21:59:04 +0000 Subject: [PATCH 15/15] got it working, the problem was the connection header Signed-off-by: Antonio Ojea --- pkg/connmux/connmux.go | 102 +++++++++++++++--------------------- pkg/connmux/connmux_test.go | 4 +- 2 files changed, 44 insertions(+), 62 deletions(-) diff --git a/pkg/connmux/connmux.go b/pkg/connmux/connmux.go index ba08f5669f9f..e6d2a3e9769c 100644 --- a/pkg/connmux/connmux.go +++ b/pkg/connmux/connmux.go @@ -34,6 +34,8 @@ const ( gracefullShutdownDuration = 200 * time.Millisecond // readDeadlineTimeout is the maximum time to wait for a read operation readDeadlineTimeout = 1 * time.Second + // maxConnRead read ahead up to 512 bytes to identify if the connection is GRPC + maxConnRead = 512 ) // ConnMux can multiple multiple HTTP and GRPC connections in the same address and port. @@ -96,7 +98,7 @@ func (c *ConnMux) serve(conn net.Conn) { conn.SetDeadline(time.Now().Add(readDeadlineTimeout)) defer conn.SetReadDeadline(time.Time{}) - buf := make([]byte, 512) + buf := make([]byte, maxConnRead) buffConn := newBufferedConn(conn) // Check if is TLS or plain TCP n, err := buffConn.sniffReader().Read(buf) @@ -228,40 +230,38 @@ func (c *ConnMux) GRPCListener() net.Listener { // bufferedConn allows to peek in the buffer of the connection // without advancing the reader. type bufferedConn struct { - mu sync.Mutex net.Conn buf *bytes.Buffer } var _ net.Conn = &bufferedConn{} -var bufferPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} - func newBufferedConn(c net.Conn) *bufferedConn { return &bufferedConn{ Conn: c, - buf: bufferPool.Get().(*bytes.Buffer), + buf: new(bytes.Buffer), } } func (b *bufferedConn) Read(p []byte) (int, error) { - b.mu.Lock() - defer b.mu.Unlock() - return io.MultiReader(b.buf, b.Conn).Read(p) + if b.buf == nil { + return b.Conn.Read(p) + } + n := b.buf.Len() + if n == 0 { + b.buf = nil + return b.Conn.Read(p) + } + if n < len(p) { + p = p[:n] + } + return b.buf.Read(p) } func (b *bufferedConn) Close() error { - // return the buffer to the pool - defer func() { - b.mu.Lock() - defer b.mu.Unlock() - b.buf.Reset() - bufferPool.Put(b.buf) - }() + if b.buf != nil { + b.buf = nil + } return b.Conn.Close() } @@ -271,49 +271,19 @@ func (b *bufferedConn) sniffReader() io.Reader { func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { // check the http2 client preface - buf := make([]byte, 512) + buf := make([]byte, len(http2.ClientPreface)) n, err := r.Read(buf) if err != nil || n < len(http2.ClientPreface) { return false } - if !bytes.Equal(buf[:len(http2.ClientPreface)], []byte(http2.ClientPreface)) { + if !bytes.Equal(buf, []byte(http2.ClientPreface)) { lg.Debug("not found http2 client preface", zap.String("preface", string(buf))) return false } lg.Debug("found http2 client preface", zap.Int("bytes read", n)) - reader := r - if n > 33 { - reader = io.MultiReader(bytes.NewReader(buf[len(http2.ClientPreface):]), r) - } - framer := http2.NewFramer(w, reader) - // GRPC blocks until receive the Settings frame - // This means we should have the preface 24 + setting frame 9 - // HTTP2 fails if we write it before forwarding - if n <= 33 { - lg.Debug("write http2 settings") - err = framer.WriteSettings() - if err != nil { - lg.Debug("error sending setting frame", zap.Error(err)) - return false - } - } - // The server connection preface consists of a potentially empty - // SETTINGS frame (Section 6.5) that MUST be the first frame the server - // sends in the HTTP/2 connection. - f, err := framer.ReadFrame() - if err != nil { - lg.Debug("error reading frame", zap.Error(err)) - return false - } - // The SETTINGS frames received from a peer as part of the connection - // preface MUST be acknowledged (see Section 6.5.3) after sending the - // connection preface. - if _, ok := f.(*http2.SettingsFrame); !ok { - lg.Debug("expected setting frame") - } - + framer := http2.NewFramer(w, r) // identify GRPC connections matching match headers names or values defined // on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md done := false @@ -327,8 +297,12 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { lg.Debug("found header", zap.String("name", hf.Name), zap.String("value", hf.Value)) if hf.Name == "content-type" { isGRPC = strings.HasPrefix(hf.Value, "application/grpc") + done = true + } + if hf.Name == ":method" && hf.Value != "POST" { + isGRPC = false + done = true } - done = true }) lg.Debug("reading frames") @@ -347,15 +321,25 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool { lg.Debug("found setting ACK frame") continue } - lg.Debug("found setting frame") - err := framer.WriteSettingsAck() - if err != nil { - lg.Debug("error writing settings frame", zap.Error(err)) + // only write the settings frame if length is zero + // otherwise when passed to an http2 server with prior knowledge + // it will send a PROTOCOL_ERROR + if f.Length == 0 { + lg.Debug("write http2 settings") + err = framer.WriteSettings() + if err != nil { + lg.Debug("error sending setting frame", zap.Error(err)) + return false + } + } + case *http2.ContinuationFrame: + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { return false } - case *http2.HeadersFrame: - hdec.Write(f.HeaderBlockFragment()) + if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil { + return false + } } } return isGRPC diff --git a/pkg/connmux/connmux_test.go b/pkg/connmux/connmux_test.go index fb0946f0659d..88c22bfdb964 100644 --- a/pkg/connmux/connmux_test.go +++ b/pkg/connmux/connmux_test.go @@ -76,7 +76,6 @@ func TestConnMux(t *testing.T) { httpServer := &http.Server{ Handler: h2c.NewHandler(mux, h2s), } - http2.ConfigureServer(httpServer, h2s) go httpServer.Serve(httpL) defer httpServer.Close() @@ -158,7 +157,6 @@ func TestConnMuxConcurrency(t *testing.T) { httpServer := &http.Server{ Handler: h2c.NewHandler(mux, h2s), } - http2.ConfigureServer(httpServer, h2s) go httpServer.Serve(httpL) // GRPC Server @@ -247,7 +245,7 @@ func verifyHTTPRequest(t *testing.T, client *http.Client, destination string) { } response, err := client.Do(request) if err != nil { - t.Error(err) + t.Fatal(err) } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body)