Skip to content

Commit c76d75f

Browse files
authored
grpc: Move some stats handler calls to gRPC layer, and add local address to peer.Peer (#6716)
1 parent 6e14274 commit c76d75f

File tree

8 files changed

+135
-161
lines changed

8 files changed

+135
-161
lines changed

internal/transport/handler_server.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,25 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
7575
return nil, errors.New(msg)
7676
}
7777

78+
var localAddr net.Addr
79+
if la := r.Context().Value(http.LocalAddrContextKey); la != nil {
80+
localAddr, _ = la.(net.Addr)
81+
}
82+
var authInfo credentials.AuthInfo
83+
if r.TLS != nil {
84+
authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
85+
}
86+
p := peer.Peer{
87+
Addr: strAddr(r.RemoteAddr),
88+
LocalAddr: localAddr,
89+
AuthInfo: authInfo,
90+
}
7891
st := &serverHandlerTransport{
7992
rw: w,
8093
req: r,
8194
closedCh: make(chan struct{}),
8295
writes: make(chan func()),
96+
peer: p,
8397
contentType: contentType,
8498
contentSubtype: contentSubtype,
8599
stats: stats,
@@ -134,6 +148,8 @@ type serverHandlerTransport struct {
134148

135149
headerMD metadata.MD
136150

151+
peer peer.Peer
152+
137153
closeOnce sync.Once
138154
closedCh chan struct{} // closed on Close
139155

@@ -165,7 +181,13 @@ func (ht *serverHandlerTransport) Close(err error) {
165181
})
166182
}
167183

168-
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
184+
func (ht *serverHandlerTransport) Peer() *peer.Peer {
185+
return &peer.Peer{
186+
Addr: ht.peer.Addr,
187+
LocalAddr: ht.peer.LocalAddr,
188+
AuthInfo: ht.peer.AuthInfo,
189+
}
190+
}
169191

170192
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
171193
// the empty string if unknown.
@@ -347,10 +369,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
347369
return err
348370
}
349371

350-
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
372+
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
351373
// With this transport type there will be exactly 1 stream: this HTTP request.
352-
353-
ctx := ht.req.Context()
354374
var cancel context.CancelFunc
355375
if ht.timeoutSet {
356376
ctx, cancel = context.WithTimeout(ctx, ht.timeout)
@@ -370,34 +390,19 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
370390
ht.Close(errors.New("request is done processing"))
371391
}()
372392

393+
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
373394
req := ht.req
374-
375395
s := &Stream{
376-
id: 0, // irrelevant
377-
requestRead: func(int) {},
378-
cancel: cancel,
379-
buf: newRecvBuffer(),
380-
st: ht,
381-
method: req.URL.Path,
382-
recvCompress: req.Header.Get("grpc-encoding"),
383-
contentSubtype: ht.contentSubtype,
384-
}
385-
pr := &peer.Peer{
386-
Addr: ht.RemoteAddr(),
387-
}
388-
if req.TLS != nil {
389-
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
390-
}
391-
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
392-
s.ctx = peer.NewContext(ctx, pr)
393-
for _, sh := range ht.stats {
394-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
395-
inHeader := &stats.InHeader{
396-
FullMethod: s.method,
397-
RemoteAddr: ht.RemoteAddr(),
398-
Compression: s.recvCompress,
399-
}
400-
sh.HandleRPC(s.ctx, inHeader)
396+
id: 0, // irrelevant
397+
ctx: ctx,
398+
requestRead: func(int) {},
399+
cancel: cancel,
400+
buf: newRecvBuffer(),
401+
st: ht,
402+
method: req.URL.Path,
403+
recvCompress: req.Header.Get("grpc-encoding"),
404+
contentSubtype: ht.contentSubtype,
405+
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
401406
}
402407
s.trReader = &transportReader{
403408
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},

internal/transport/handler_server_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
314314
st.ht.WriteStatus(s, status.New(codes.OK, ""))
315315
}
316316
st.ht.HandleStreams(
317-
func(s *Stream) { go handleStream(s) },
317+
context.Background(), func(s *Stream) { go handleStream(s) },
318318
)
319319
wantHeader := http.Header{
320320
"Date": nil,
@@ -347,7 +347,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
347347
st.ht.WriteStatus(s, status.New(statusCode, msg))
348348
}
349349
st.ht.HandleStreams(
350-
func(s *Stream) { go handleStream(s) },
350+
context.Background(), func(s *Stream) { go handleStream(s) },
351351
)
352352
wantHeader := http.Header{
353353
"Date": nil,
@@ -396,7 +396,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
396396
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
397397
}
398398
ht.HandleStreams(
399-
func(s *Stream) { go runStream(s) },
399+
context.Background(), func(s *Stream) { go runStream(s) },
400400
)
401401
wantHeader := http.Header{
402402
"Date": nil,
@@ -448,7 +448,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
448448
func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
449449
st := newHandleStreamTest(t)
450450
st.ht.HandleStreams(
451-
func(s *Stream) { go handleStream(st, s) },
451+
context.Background(), func(s *Stream) { go handleStream(st, s) },
452452
)
453453
}
454454

@@ -481,7 +481,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
481481
hst.ht.WriteStatus(s, st)
482482
}
483483
hst.ht.HandleStreams(
484-
func(s *Stream) { go handleStream(s) },
484+
context.Background(), func(s *Stream) { go handleStream(s) },
485485
)
486486
wantHeader := http.Header{
487487
"Date": nil,

internal/transport/http2_server.go

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,12 @@ var serverConnectionCounter uint64
6969
// http2Server implements the ServerTransport interface with HTTP2.
7070
type http2Server struct {
7171
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
72-
ctx context.Context
7372
done chan struct{}
7473
conn net.Conn
7574
loopy *loopyWriter
7675
readerDone chan struct{} // sync point to enable testing.
7776
writerDone chan struct{} // sync point to enable testing.
78-
remoteAddr net.Addr
79-
localAddr net.Addr
80-
authInfo credentials.AuthInfo // auth info about the connection
77+
peer peer.Peer
8178
inTapHandle tap.ServerInHandle
8279
framer *framer
8380
// The max number of concurrent streams.
@@ -243,13 +240,15 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
243240
}
244241

245242
done := make(chan struct{})
243+
peer := peer.Peer{
244+
Addr: conn.RemoteAddr(),
245+
LocalAddr: conn.LocalAddr(),
246+
AuthInfo: authInfo,
247+
}
246248
t := &http2Server{
247-
ctx: setConnection(context.Background(), rawConn),
248249
done: done,
249250
conn: conn,
250-
remoteAddr: conn.RemoteAddr(),
251-
localAddr: conn.LocalAddr(),
252-
authInfo: authInfo,
251+
peer: peer,
253252
framer: framer,
254253
readerDone: make(chan struct{}),
255254
writerDone: make(chan struct{}),
@@ -267,8 +266,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
267266
bufferPool: newBufferPool(),
268267
}
269268
t.logger = prefixLoggerForServerTransport(t)
270-
// Add peer information to the http2server context.
271-
t.ctx = peer.NewContext(t.ctx, t.getPeer())
272269

273270
t.controlBuf = newControlBuffer(t.done)
274271
if dynamicWindow {
@@ -277,15 +274,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
277274
updateFlowControl: t.updateFlowControl,
278275
}
279276
}
280-
for _, sh := range t.stats {
281-
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
282-
RemoteAddr: t.remoteAddr,
283-
LocalAddr: t.localAddr,
284-
})
285-
connBegin := &stats.ConnBegin{}
286-
sh.HandleConn(t.ctx, connBegin)
287-
}
288-
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
277+
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
289278
if err != nil {
290279
return nil, err
291280
}
@@ -342,7 +331,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
342331

343332
// operateHeaders takes action on the decoded headers. Returns an error if fatal
344333
// error encountered and transport needs to close, otherwise returns nil.
345-
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
334+
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
346335
// Acquire max stream ID lock for entire duration
347336
t.maxStreamMu.Lock()
348337
defer t.maxStreamMu.Unlock()
@@ -369,10 +358,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
369358

370359
buf := newRecvBuffer()
371360
s := &Stream{
372-
id: streamID,
373-
st: t,
374-
buf: buf,
375-
fc: &inFlow{limit: uint32(t.initialWindowSize)},
361+
id: streamID,
362+
st: t,
363+
buf: buf,
364+
fc: &inFlow{limit: uint32(t.initialWindowSize)},
365+
headerWireLength: int(frame.Header().Length),
376366
}
377367
var (
378368
// if false, content-type was missing or invalid
@@ -511,9 +501,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
511501
s.state = streamReadDone
512502
}
513503
if timeoutSet {
514-
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
504+
s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
515505
} else {
516-
s.ctx, s.cancel = context.WithCancel(t.ctx)
506+
s.ctx, s.cancel = context.WithCancel(ctx)
517507
}
518508

519509
// Attach the received metadata to the context.
@@ -592,18 +582,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
592582
s.requestRead = func(n int) {
593583
t.adjustWindow(s, uint32(n))
594584
}
595-
for _, sh := range t.stats {
596-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
597-
inHeader := &stats.InHeader{
598-
FullMethod: s.method,
599-
RemoteAddr: t.remoteAddr,
600-
LocalAddr: t.localAddr,
601-
Compression: s.recvCompress,
602-
WireLength: int(frame.Header().Length),
603-
Header: mdata.Copy(),
604-
}
605-
sh.HandleRPC(s.ctx, inHeader)
606-
}
607585
s.ctxDone = s.ctx.Done()
608586
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
609587
s.trReader = &transportReader{
@@ -629,7 +607,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
629607
// HandleStreams receives incoming streams using the given handler. This is
630608
// typically run in a separate goroutine.
631609
// traceCtx attaches trace to ctx and returns the new context.
632-
func (t *http2Server) HandleStreams(handle func(*Stream)) {
610+
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
633611
defer close(t.readerDone)
634612
for {
635613
t.controlBuf.throttle()
@@ -664,7 +642,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
664642
}
665643
switch frame := frame.(type) {
666644
case *http2.MetaHeadersFrame:
667-
if err := t.operateHeaders(frame, handle); err != nil {
645+
if err := t.operateHeaders(ctx, frame, handle); err != nil {
668646
t.Close(err)
669647
break
670648
}
@@ -1242,10 +1220,6 @@ func (t *http2Server) Close(err error) {
12421220
for _, s := range streams {
12431221
s.cancel()
12441222
}
1245-
for _, sh := range t.stats {
1246-
connEnd := &stats.ConnEnd{}
1247-
sh.HandleConn(t.ctx, connEnd)
1248-
}
12491223
}
12501224

12511225
// deleteStream deletes the stream s from transport's active streams.
@@ -1311,10 +1285,6 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
13111285
})
13121286
}
13131287

1314-
func (t *http2Server) RemoteAddr() net.Addr {
1315-
return t.remoteAddr
1316-
}
1317-
13181288
func (t *http2Server) Drain(debugData string) {
13191289
t.mu.Lock()
13201290
defer t.mu.Unlock()
@@ -1397,11 +1367,11 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
13971367
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
13981368
LocalFlowControlWindow: int64(t.fc.getSize()),
13991369
SocketOptions: channelz.GetSocketOption(t.conn),
1400-
LocalAddr: t.localAddr,
1401-
RemoteAddr: t.remoteAddr,
1370+
LocalAddr: t.peer.LocalAddr,
1371+
RemoteAddr: t.peer.Addr,
14021372
// RemoteName :
14031373
}
1404-
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1374+
if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
14051375
s.Security = au.GetSecurityValue()
14061376
}
14071377
s.RemoteFlowControlWindow = t.getOutFlowWindow()
@@ -1433,10 +1403,12 @@ func (t *http2Server) getOutFlowWindow() int64 {
14331403
}
14341404
}
14351405

1436-
func (t *http2Server) getPeer() *peer.Peer {
1406+
// Peer returns the peer of the transport.
1407+
func (t *http2Server) Peer() *peer.Peer {
14371408
return &peer.Peer{
1438-
Addr: t.remoteAddr,
1439-
AuthInfo: t.authInfo, // Can be nil
1409+
Addr: t.peer.Addr,
1410+
LocalAddr: t.peer.LocalAddr,
1411+
AuthInfo: t.peer.AuthInfo, // Can be nil
14401412
}
14411413
}
14421414

@@ -1461,6 +1433,6 @@ func GetConnection(ctx context.Context) net.Conn {
14611433
// SetConnection adds the connection to the context to be able to get
14621434
// information about the destination ip and port for an incoming RPC. This also
14631435
// allows any unary or streaming interceptors to see the connection.
1464-
func setConnection(ctx context.Context, conn net.Conn) context.Context {
1436+
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
14651437
return context.WithValue(ctx, connectionKey{}, conn)
14661438
}

0 commit comments

Comments
 (0)