Skip to content

Commit 814e9a5

Browse files
committed
progress on heuristics
grpc clients block waiting for the settings frame. http2 fails if we write the settings frame we have to detect that the client didn't send more than 33 bytes: 24 preface + 9 settings (this seems to be optional) to write the settings frame.
1 parent ae72bee commit 814e9a5

2 files changed

Lines changed: 105 additions & 69 deletions

File tree

pkg/connmux/connmux.go

Lines changed: 65 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (c *ConnMux) Serve() error {
8484
for {
8585
conn, err := c.root.Accept()
8686
if err != nil {
87-
c.lg.Error("connection error", zap.Error(err))
87+
c.lg.Error("connection accept error", zap.Error(err))
8888
return c.Close()
8989
}
9090
go c.serve(conn)
@@ -96,17 +96,18 @@ func (c *ConnMux) serve(conn net.Conn) {
9696
conn.SetDeadline(time.Now().Add(readDeadlineTimeout))
9797
defer conn.SetReadDeadline(time.Time{})
9898

99-
buf := make([]byte, 1024)
99+
buf := make([]byte, 512)
100100
buffConn := newBufferedConn(conn)
101101
// Check if is TLS or plain TCP
102-
_, err := buffConn.sniffReader().Read(buf)
102+
n, err := buffConn.sniffReader().Read(buf)
103103
if err != nil && err != io.EOF {
104104
// exit since it will panic trying to detect if is TLS
105105
c.lg.Error("error reading from the connection", zap.Error(err))
106106
conn.Close()
107107
return
108108
}
109-
c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("buffer size", len(buf)), zap.String("buffer content", string(buf[0:])))
109+
r := bytes.NewReader(buf[:n])
110+
c.lg.Debug("connection received", zap.String("remote address", conn.RemoteAddr().String()), zap.Int("read size", n), zap.String("buffer content", string(buf[:n])))
110111
// Check if is TLS or plain TCP
111112
isTLS := buf[0] == 0x16
112113
if isTLS {
@@ -133,7 +134,7 @@ func (c *ConnMux) serve(conn net.Conn) {
133134
conn.Close()
134135
return
135136
}
136-
isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(bytes.NewReader(buf), buffConn.sniffReader()))
137+
isGRPC := isGRPCConnection(c.lg, buffConn, io.MultiReader(r, buffConn.sniffReader()))
137138
c.forward(isGRPC, buffConn)
138139
}
139140

@@ -152,7 +153,7 @@ func (c *ConnMux) forward(isGRPC bool, conn net.Conn) {
152153
return
153154
}
154155

155-
if c.grpc != nil {
156+
if isGRPC && c.grpc != nil {
156157
c.lg.Debug("forwarding connection to the GRPC backend", zap.String("remote address", conn.RemoteAddr().String()))
157158
select {
158159
case c.grpc.connc <- conn:
@@ -227,6 +228,7 @@ func (c *ConnMux) GRPCListener() net.Listener {
227228
// bufferedConn allows to peek in the buffer of the connection
228229
// without advancing the reader.
229230
type bufferedConn struct {
231+
mu sync.Mutex
230232
net.Conn
231233
buf *bytes.Buffer
232234
}
@@ -239,61 +241,83 @@ var bufferPool = sync.Pool{
239241
},
240242
}
241243

242-
func newBufferedConn(c net.Conn) bufferedConn {
243-
return bufferedConn{
244+
func newBufferedConn(c net.Conn) *bufferedConn {
245+
return &bufferedConn{
244246
Conn: c,
245247
buf: bufferPool.Get().(*bytes.Buffer),
246248
}
247249
}
248250

249-
func (b bufferedConn) Read(p []byte) (int, error) {
250-
if b.buf.Len() > 0 {
251-
n, err := b.buf.Read(p)
252-
if err == io.EOF {
253-
// Don't return EOF yet. More readers remain.
254-
return n, nil
255-
}
256-
return n, err
257-
}
258-
// return the buffer to the pool
259-
b.buf.Reset()
260-
bufferPool.Put(b.buf)
261-
return b.Conn.Read(p)
251+
func (b *bufferedConn) Read(p []byte) (int, error) {
252+
b.mu.Lock()
253+
defer b.mu.Unlock()
254+
return io.MultiReader(b.buf, b.Conn).Read(p)
262255
}
263256

264-
func (b bufferedConn) Close() error {
265-
// In case we didn't have time to return the buffer to the pool
266-
if b.buf != nil {
267-
// return the buffer to the pool
257+
func (b *bufferedConn) Close() error {
258+
// return the buffer to the pool
259+
defer func() {
260+
b.mu.Lock()
261+
defer b.mu.Unlock()
268262
b.buf.Reset()
269263
bufferPool.Put(b.buf)
270-
}
264+
}()
271265
return b.Conn.Close()
272266
}
273267

274-
func (b bufferedConn) sniffReader() io.Reader {
268+
func (b *bufferedConn) sniffReader() io.Reader {
275269
return io.TeeReader(b.Conn, b.buf)
276270
}
277271

278272
func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool {
279273
// check the http2 client preface
280-
buf := make([]byte, len(http2.ClientPreface))
274+
buf := make([]byte, 512)
281275
n, err := r.Read(buf)
282-
if err != nil || n != len(http2.ClientPreface) {
276+
if err != nil || n < len(http2.ClientPreface) {
283277
return false
284278
}
285279

286-
if !bytes.Equal(buf, []byte(http2.ClientPreface)) {
280+
if !bytes.Equal(buf[:len(http2.ClientPreface)], []byte(http2.ClientPreface)) {
287281
lg.Debug("not found http2 client preface", zap.String("preface", string(buf)))
288282
return false
289283
}
290-
lg.Debug("found http2 client preface")
284+
lg.Debug("found http2 client preface", zap.Int("bytes read", n))
285+
286+
reader := r
287+
if n > 33 {
288+
reader = io.MultiReader(bytes.NewReader(buf[len(http2.ClientPreface):]), r)
289+
}
290+
framer := http2.NewFramer(w, reader)
291+
// GRPC blocks until receive the Settings frame
292+
// This means we should have the preface 24 + setting frame 9
293+
// HTTP2 fails if we write it before forwarding
294+
if n <= 33 {
295+
lg.Debug("write http2 settings")
296+
err = framer.WriteSettings()
297+
if err != nil {
298+
lg.Debug("error sending setting frame", zap.Error(err))
299+
return false
300+
}
301+
}
302+
// The server connection preface consists of a potentially empty
303+
// SETTINGS frame (Section 6.5) that MUST be the first frame the server
304+
// sends in the HTTP/2 connection.
305+
f, err := framer.ReadFrame()
306+
if err != nil {
307+
lg.Debug("error reading frame", zap.Error(err))
308+
return false
309+
}
310+
// The SETTINGS frames received from a peer as part of the connection
311+
// preface MUST be acknowledged (see Section 6.5.3) after sending the
312+
// connection preface.
313+
if _, ok := f.(*http2.SettingsFrame); !ok {
314+
lg.Debug("expected setting frame")
315+
}
291316

292317
// identify GRPC connections matching match headers names or values defined
293318
// on https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
294319
done := false
295320
isGRPC := false
296-
framer := http2.NewFramer(w, r)
297321
// use the default value for the maxDynamixTable size
298322
// https://pkg.go.dev/golang.org/x/net/http2
299323
// "If zero, the default value of 4096 is used."
@@ -306,12 +330,8 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool {
306330
}
307331
done = true
308332
})
309-
err = framer.WriteSettingsAck()
310-
if err != nil {
311-
lg.Debug("error acking setting frame", zap.Error(err))
312-
return false
313-
}
314-
lg.Debug("ack settings")
333+
334+
lg.Debug("reading frames")
315335
for !done {
316336
f, err := framer.ReadFrame()
317337
if err != nil {
@@ -323,10 +343,16 @@ func isGRPCConnection(lg *zap.Logger, w io.Writer, r io.Reader) bool {
323343
// preface MUST be acknowledged (see Section 6.5.3) after sending the
324344
// connection preface.
325345
case *http2.SettingsFrame:
326-
lg.Debug("found setting frame")
327346
if f.IsAck() {
347+
lg.Debug("found setting ACK frame")
328348
continue
329349
}
350+
lg.Debug("found setting frame")
351+
err := framer.WriteSettingsAck()
352+
if err != nil {
353+
lg.Debug("error writing settings frame", zap.Error(err))
354+
return false
355+
}
330356

331357
case *http2.HeadersFrame:
332358
hdec.Write(f.HeaderBlockFragment())

pkg/connmux/connmux_test.go

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestConnMux(t *testing.T) {
7878
}
7979
http2.ConfigureServer(httpServer, h2s)
8080
go httpServer.Serve(httpL)
81+
defer httpServer.Close()
8182

8283
// GRPC Server
8384
grpcServer := grpc.NewServer()
@@ -94,45 +95,54 @@ func TestConnMux(t *testing.T) {
9495
destHTTPS := url.URL{Scheme: "https", Host: destHost}
9596

9697
// HTTP plain
97-
t.Logf("Test HTTP1 without TLS works")
98-
client := &http.Client{}
99-
verifyHTTPRequest(t, client, destHTTP.String())
98+
t.Run("Test HTTP1 without TLS works", func(t *testing.T) {
99+
//t.Skip()
100+
client := &http.Client{}
101+
verifyHTTPRequest(t, client, destHTTP.String())
102+
})
100103

101104
// HTTPS
102-
t.Logf("Test HTTP1 with TLS works")
103-
client.Transport = &http.Transport{TLSClientConfig: &tls.Config{
104-
InsecureSkipVerify: true,
105-
}}
106-
verifyHTTPRequest(t, client, destHTTPS.String())
105+
t.Run("Test HTTP1 with TLS works", func(t *testing.T) {
106+
client := &http.Client{}
107+
client.Transport = &http.Transport{TLSClientConfig: &tls.Config{
108+
InsecureSkipVerify: true,
109+
}}
110+
verifyHTTPRequest(t, client, destHTTPS.String())
111+
})
107112

108113
// HTTP2 plain
109-
t.Logf("Test HTTP2 without TLS works")
110-
client.Transport = &http2.Transport{
111-
AllowHTTP: true,
112-
DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
113-
return net.Dial(netw, addr)
114-
}}
115-
verifyHTTPRequest(t, client, destHTTP.String())
114+
t.Run("Test HTTP2 without TLS works", func(t *testing.T) {
115+
client := &http.Client{}
116+
client.Transport = &http2.Transport{
117+
AllowHTTP: true,
118+
DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
119+
return net.Dial(netw, addr)
120+
}}
121+
verifyHTTPRequest(t, client, destHTTP.String())
122+
})
116123

117124
// HTTP2 TLS
118-
t.Logf("Test HTTP2 with TLS works")
119-
client.Transport = &http2.Transport{
120-
TLSClientConfig: &tls.Config{
121-
InsecureSkipVerify: true,
122-
NextProtos: []string{http2.NextProtoTLS},
123-
},
124-
}
125-
verifyHTTPRequest(t, client, destHTTPS.String())
125+
t.Run("Test HTTP2 with TLS works", func(t *testing.T) {
126+
client := &http.Client{}
127+
client.Transport = &http2.Transport{
128+
TLSClientConfig: &tls.Config{
129+
InsecureSkipVerify: true,
130+
NextProtos: []string{http2.NextProtoTLS},
131+
},
132+
}
133+
verifyHTTPRequest(t, client, destHTTPS.String())
134+
})
126135

127136
// Set up a connection to the server.
128-
t.Logf("Test GRPC works")
129-
conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
130-
if err != nil {
131-
t.Errorf("did not connect: %v", err)
132-
}
133-
defer conn.Close()
134-
verifyGRPCRequest(t, conn)
135137

138+
t.Run("Test GRPC with TLS works", func(t *testing.T) {
139+
conn, err := grpc.Dial(destHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
140+
if err != nil {
141+
t.Errorf("did not connect: %v", err)
142+
}
143+
defer conn.Close()
144+
verifyGRPCRequest(t, conn)
145+
})
136146
}
137147

138148
// TestConnMuxConcurrency tests that HTTP and GRPC connections work repet

0 commit comments

Comments
 (0)