Skip to content

Commit ae72bee

Browse files
committed
WIP parse http2 connection headers
The only reliable method to detect a GRPC connection seems to be parsing the content-type header. We need to be able to establish the HTTP2 connection so the client sends us the headers.
1 parent b319e7a commit ae72bee

1 file changed

Lines changed: 79 additions & 78 deletions

File tree

pkg/connmux/connmux.go

Lines changed: 79 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package connmux
1616

1717
import (
18-
"bufio"
1918
"bytes"
2019
"crypto/tls"
2120
"io"
@@ -92,37 +91,24 @@ func (c *ConnMux) Serve() error {
9291
}
9392
}
9493

95-
// peekHTTP2PrefaceBytes define the bytes we need to peek to be able to differentiate between HTTP and GRPC.
96-
// Since GRPC uses HTTP2 we need to match the connection preface
97-
// https://httpwg.org/specs/rfc9113.html#preface (24 octects)
98-
// 3.5. HTTP/2 Connection Preface
99-
// The client connection preface starts with a sequence of 24 octets, which in hex notation is:
100-
// 0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a
101-
// That is, the connection preface starts with the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n).
102-
// This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
103-
// 4.1. Frame Format
104-
// All frames begin with a fixed 9-octet header followed by a variable-length payload.
105-
// min peek size is 24 + 9 = 33
106-
const peekHTTP2PrefaceBytes = 33
107-
10894
func (c *ConnMux) serve(conn net.Conn) {
10995
// avoid to get blocked in any read operation
11096
conn.SetDeadline(time.Now().Add(readDeadlineTimeout))
11197
defer conn.SetReadDeadline(time.Time{})
11298

113-
var proxiedConn bufferedConn
114-
99+
buf := make([]byte, 1024)
115100
buffConn := newBufferedConn(conn)
116101
// Check if is TLS or plain TCP
117-
b, err := buffConn.Peek(peekHTTP2PrefaceBytes)
118-
if err != nil {
102+
_, err := buffConn.sniffReader().Read(buf)
103+
if err != nil && err != io.EOF {
119104
// exit since it will panic trying to detect if is TLS
120105
c.lg.Error("error reading from the connection", zap.Error(err))
121106
conn.Close()
122107
return
123108
}
124-
// is TLS
125-
isTLS := b[0] == 0x16
109+
c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(buf)), zap.String("buffer content", string(buf[0:])))
110+
// Check if is TLS or plain TCP
111+
isTLS := buf[0] == 0x16
126112
if isTLS {
127113
if !c.secure {
128114
c.lg.Error("secure connections not enabled")
@@ -137,40 +123,20 @@ func (c *ConnMux) serve(conn net.Conn) {
137123
conn.Close()
138124
return
139125
}
140-
141-
proxiedConn = newBufferedConn(tlsConn)
142126
// It is a "new" connection obtained after the handshake so we have to do another read
143-
// to get the new data, but be careful to not get blocked in the read if there are no enough data.
144-
_, err = proxiedConn.Peek(peekHTTP2PrefaceBytes)
145-
if err != nil {
146-
// try to see how far we go, it will be discarded by the server if we route it to the wrong one
147-
c.lg.Error("error reading from the TLS connection", zap.Error(err))
148-
}
127+
proxiedConn := newBufferedConn(tlsConn)
128+
isGRPC := isGRPCConnection(c.lg, proxiedConn, proxiedConn.sniffReader())
129+
c.forward(isGRPC, proxiedConn)
149130
} else {
150131
if !c.insecure {
151132
c.lg.Error("insecure connections not enabled")
152133
conn.Close()
153134
return
154135
}
155-
proxiedConn = buffConn
136+
isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(bytes.NewReader(buf), buffConn.sniffReader()))
137+
c.forward(isGRPC, buffConn)
156138
}
157139

158-
// read the whole buffer
159-
b, err = proxiedConn.Peek(proxiedConn.r.Buffered())
160-
if err != nil && err != io.EOF {
161-
c.lg.Error("error reading", zap.Error(err))
162-
conn.Close()
163-
return
164-
}
165-
c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(b)), zap.String("buffer content", string(b)))
166-
reader := bytes.NewReader(b)
167-
isHTTP2 := isHTTP2Connection(reader)
168-
// if is not http2 it is not grpc
169-
if !isHTTP2 {
170-
c.forward(false, proxiedConn)
171-
} else {
172-
c.forward(isGRPCConnection(c.lg, reader), proxiedConn)
173-
}
174140
}
175141

176142
func (c *ConnMux) forward(isGRPC bool, conn net.Conn) {
@@ -261,75 +227,110 @@ func (c *ConnMux) GRPCListener() net.Listener {
261227
// bufferedConn allows to peek in the buffer of the connection
262228
// without advancing the reader.
263229
type bufferedConn struct {
264-
r *bufio.Reader
265230
net.Conn
231+
buf *bytes.Buffer
266232
}
267233

268-
func newBufferedConn(c net.Conn) bufferedConn {
269-
return bufferedConn{bufio.NewReader(c), c}
234+
var _ net.Conn = &bufferedConn{}
235+
236+
var bufferPool = sync.Pool{
237+
New: func() interface{} {
238+
return new(bytes.Buffer)
239+
},
270240
}
271241

272-
func (b bufferedConn) Peek(n int) ([]byte, error) {
273-
return b.r.Peek(n)
242+
func newBufferedConn(c net.Conn) bufferedConn {
243+
return bufferedConn{
244+
Conn: c,
245+
buf: bufferPool.Get().(*bytes.Buffer),
246+
}
274247
}
275248

276249
func (b bufferedConn) Read(p []byte) (int, error) {
277-
return b.r.Read(p)
250+
if b.buf.Len() > 0 {
251+
n, err := b.buf.Read(p)
252+
if err == io.EOF {
253+
// Don't return EOF yet. More readers remain.
254+
return n, nil
255+
}
256+
return n, err
257+
}
258+
// return the buffer to the pool
259+
b.buf.Reset()
260+
bufferPool.Put(b.buf)
261+
return b.Conn.Read(p)
278262
}
279263

280-
func isHTTP2Connection(r io.Reader) bool {
264+
func (b bufferedConn) Close() error {
265+
// In case we didn't have time to return the buffer to the pool
266+
if b.buf != nil {
267+
// return the buffer to the pool
268+
b.buf.Reset()
269+
bufferPool.Put(b.buf)
270+
}
271+
return b.Conn.Close()
272+
}
273+
274+
func (b bufferedConn) sniffReader() io.Reader {
275+
return io.TeeReader(b.Conn, b.buf)
276+
}
277+
278+
func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool {
281279
// check the http2 client preface
282280
buf := make([]byte, len(http2.ClientPreface))
283281
n, err := r.Read(buf)
284282
if err != nil || n != len(http2.ClientPreface) {
285283
return false
286284
}
287-
return bytes.Equal(buf, []byte(http2.ClientPreface))
288-
}
289285

290-
func isGRPCConnection(lg *zap.Logger, r io.Reader) bool {
286+
if !bytes.Equal(buf, []byte(http2.ClientPreface)) {
287+
lg.Debug("not found http2 client preface", zap.String("preface", string(buf)))
288+
return false
289+
}
290+
lg.Debug("found http2 client preface")
291+
292+
// identify GRPC connections matching match headers names or values defined
293+
// on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
291294
done := false
292295
isGRPC := false
293-
// check if len of the settings frame is 0 or the headers with the "content-type"
294-
// indicates is an grpc connection.
295-
framer := http2.NewFramer(io.Discard, r)
296+
framer := http2.NewFramer(w, r)
296297
// use the default value for the maxDynamixTable size
297298
// https://pkg.go.dev/golang.org/x/net/http2
298299
// "If zero, the default value of 4096 is used."
299300
hdec := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
300-
// match headers names or values based on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
301-
if strings.Contains(hf.Name, "grpc") {
302-
isGRPC = true
303-
done = true
304-
lg.Debug("found grpc header", zap.String("header", hf.Name))
305-
return
306-
}
301+
// If Content-Type does not begin with "application/grpc", gRPC servers SHOULD respond with HTTP status of 415 (Unsupported Media Type).
302+
// This will prevent other HTTP/2 clients from interpreting a gRPC error response, which uses status 200 (OK), as successful.
303+
lg.Debug("found header", zap.String("name", hf.Name), zap.String("value", hf.Value))
307304
if hf.Name == "content-type" {
308-
isGRPC = strings.Contains(hf.Value, "application/grpc")
309-
lg.Debug("found content-type header", zap.String("content-type", hf.Value))
305+
isGRPC = strings.HasPrefix(hf.Value, "application/grpc")
310306
}
311307
done = true
312308
})
313-
for {
309+
err = framer.WriteSettingsAck()
310+
if err != nil {
311+
lg.Debug("error acking setting frame", zap.Error(err))
312+
return false
313+
}
314+
lg.Debug("ack settings")
315+
for !done {
314316
f, err := framer.ReadFrame()
315317
if err != nil {
316-
break
318+
lg.Debug("error reading frame", zap.Error(err))
319+
return false
317320
}
318321
switch f := f.(type) {
322+
// The SETTINGS frames received from a peer as part of the connection
323+
// preface MUST be acknowledged (see Section 6.5.3) after sending the
324+
// connection preface.
319325
case *http2.SettingsFrame:
320-
// Observed behavior is that etcd GRPC clients sends an empty setting frame
321-
// and block waiting for an answer.
322-
if f.Length == 0 {
323-
isGRPC = true
324-
done = true
325-
lg.Debug("found setting frame with zero length")
326+
lg.Debug("found setting frame")
327+
if f.IsAck() {
328+
continue
326329
}
330+
327331
case *http2.HeadersFrame:
328332
hdec.Write(f.HeaderBlockFragment())
329333
}
330-
if done {
331-
break
332-
}
333334
}
334335
return isGRPC
335336
}

0 commit comments

Comments
 (0)