Skip to content

Commit 4275c5b

Browse files
authored
transport: Re-use slice buffer reader for a stream (#8360)
1 parent ec4810c commit 4275c5b

File tree

5 files changed

+61
-39
lines changed

5 files changed

+61
-39
lines changed

internal/transport/controlbuf.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,11 @@ type earlyAbortStream struct {
157157
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
158158

159159
type dataFrame struct {
160-
streamID uint32
161-
endStream bool
162-
h []byte
163-
reader mem.Reader
160+
streamID uint32
161+
endStream bool
162+
h []byte
163+
data mem.BufferSlice
164+
processing bool
164165
// onEachWrite is called every time
165166
// a part of data is written out.
166167
onEachWrite func()
@@ -245,6 +246,7 @@ type outStream struct {
245246
itl *itemList
246247
bytesOutStanding int
247248
wq *writeQuota
249+
reader mem.Reader
248250

249251
next *outStream
250252
prev *outStream
@@ -472,7 +474,9 @@ func (c *controlBuffer) finish() {
472474
v.onOrphaned(ErrConnClosing)
473475
}
474476
case *dataFrame:
475-
_ = v.reader.Close()
477+
if !v.processing {
478+
v.data.Free()
479+
}
476480
}
477481
}
478482

@@ -661,10 +665,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
661665

662666
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
663667
str := &outStream{
664-
id: h.streamID,
665-
state: empty,
666-
itl: &itemList{},
667-
wq: h.wq,
668+
id: h.streamID,
669+
state: empty,
670+
itl: &itemList{},
671+
wq: h.wq,
672+
reader: mem.BufferSlice{}.Reader(),
668673
}
669674
l.estdStreams[h.streamID] = str
670675
}
@@ -696,10 +701,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
696701
}
697702
// Case 2: Client wants to originate stream.
698703
str := &outStream{
699-
id: h.streamID,
700-
state: empty,
701-
itl: &itemList{},
702-
wq: h.wq,
704+
id: h.streamID,
705+
state: empty,
706+
itl: &itemList{},
707+
wq: h.wq,
708+
reader: mem.BufferSlice{}.Reader(),
703709
}
704710
return l.originateStream(str, h)
705711
}
@@ -801,10 +807,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
801807
// a RST_STREAM before stream initialization thus the stream might
802808
// not be established yet.
803809
delete(l.estdStreams, c.streamID)
810+
str.reader.Close()
804811
str.deleteSelf()
805812
for head := str.itl.dequeueAll(); head != nil; head = head.next {
806813
if df, ok := head.it.(*dataFrame); ok {
807-
_ = df.reader.Close()
814+
if !df.processing {
815+
df.data.Free()
816+
}
808817
}
809818
}
810819
}
@@ -939,21 +948,27 @@ func (l *loopyWriter) processData() (bool, error) {
939948
if str == nil {
940949
return true, nil
941950
}
951+
reader := str.reader
942952
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
953+
if !dataItem.processing {
954+
dataItem.processing = true
955+
str.reader.Reset(dataItem.data)
956+
dataItem.data.Free()
957+
}
943958
// A data item is represented by a dataFrame, since it later translates into
944959
// multiple HTTP2 data frames.
945960
// Every dataFrame has two buffers; h that keeps grpc-message header and data
946961
// that is the actual message. As an optimization to keep wire traffic low, data
947962
// from data is copied to h to make as big as the maximum possible HTTP2 frame
948963
// size.
949964

950-
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
965+
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
951966
// Client sends out empty data frame with endStream = true
952967
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
953968
return false, err
954969
}
955970
str.itl.dequeue() // remove the empty data item from stream
956-
_ = dataItem.reader.Close()
971+
_ = reader.Close()
957972
if str.itl.isEmpty() {
958973
str.state = empty
959974
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -982,8 +997,8 @@ func (l *loopyWriter) processData() (bool, error) {
982997
}
983998
// Compute how much of the header and data we can send within quota and max frame length
984999
hSize := min(maxSize, len(dataItem.h))
985-
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
986-
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
1000+
dSize := min(maxSize-hSize, reader.Remaining())
1001+
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
9871002
size := hSize + dSize
9881003

9891004
var buf *[]byte
@@ -1004,7 +1019,7 @@ func (l *loopyWriter) processData() (bool, error) {
10041019
defer pool.Put(buf)
10051020

10061021
copy((*buf)[:hSize], dataItem.h)
1007-
_, _ = dataItem.reader.Read((*buf)[hSize:])
1022+
_, _ = reader.Read((*buf)[hSize:])
10081023
}
10091024

10101025
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1025,7 +1040,7 @@ func (l *loopyWriter) processData() (bool, error) {
10251040
dataItem.h = dataItem.h[hSize:]
10261041

10271042
if remainingBytes == 0 { // All the data from that message was written out.
1028-
_ = dataItem.reader.Close()
1043+
_ = reader.Close()
10291044
str.itl.dequeue()
10301045
}
10311046
if str.itl.isEmpty() {

internal/transport/http2_client.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,32 +1091,29 @@ func (t *http2Client) GracefulClose() {
10911091
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
10921092
// should proceed only if Write returns nil.
10931093
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
1094-
reader := data.Reader()
1095-
10961094
if opts.Last {
10971095
// If it's the last message, update stream state.
10981096
if !s.compareAndSwapState(streamActive, streamWriteDone) {
1099-
_ = reader.Close()
11001097
return errStreamDone
11011098
}
11021099
} else if s.getState() != streamActive {
1103-
_ = reader.Close()
11041100
return errStreamDone
11051101
}
11061102
df := &dataFrame{
11071103
streamID: s.id,
11081104
endStream: opts.Last,
11091105
h: hdr,
1110-
reader: reader,
1106+
data: data,
11111107
}
1112-
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
1113-
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1114-
_ = reader.Close()
1108+
dataLen := data.Len()
1109+
if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
1110+
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
11151111
return err
11161112
}
11171113
}
1114+
data.Ref()
11181115
if err := t.controlBuf.put(df); err != nil {
1119-
_ = reader.Close()
1116+
data.Free()
11201117
return err
11211118
}
11221119
t.incrMsgSent()

internal/transport/http2_server.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,33 +1135,30 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
11351135
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
11361136
// is returns if it fails (e.g., framing error, transport error).
11371137
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1138-
reader := data.Reader()
1139-
11401138
if !s.isHeaderSent() { // Headers haven't been written yet.
11411139
if err := t.writeHeader(s, nil); err != nil {
1142-
_ = reader.Close()
11431140
return err
11441141
}
11451142
} else {
11461143
// Writing headers checks for this condition.
11471144
if s.getState() == streamDone {
1148-
_ = reader.Close()
11491145
return t.streamContextErr(s)
11501146
}
11511147
}
11521148

11531149
df := &dataFrame{
11541150
streamID: s.id,
11551151
h: hdr,
1156-
reader: reader,
1152+
data: data,
11571153
onEachWrite: t.setResetPingStrikes,
11581154
}
1159-
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1160-
_ = reader.Close()
1155+
dataLen := data.Len()
1156+
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
11611157
return t.streamContextErr(s)
11621158
}
1159+
data.Ref()
11631160
if err := t.controlBuf.put(df); err != nil {
1164-
_ = reader.Close()
1161+
data.Free()
11651162
return err
11661163
}
11671164
t.incrMsgSent()

internal/transport/transport_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,11 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *ServerStream)
203203
}
204204
}
205205
data := newBufferSlice(p)
206+
data.Ref()
206207
conn.controlBuf.put(&dataFrame{
207208
streamID: s.id,
208209
h: nil,
209-
reader: data.Reader(),
210+
data: data,
210211
onEachWrite: func() {},
211212
})
212213
sent += len(p)
@@ -1092,11 +1093,12 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
10921093
t.Fatalf("Failed to open stream: %v", err)
10931094
}
10941095
d := newBufferSlice(make([]byte, http2MaxFrameLen))
1096+
d.Ref()
10951097
ct.controlBuf.put(&dataFrame{
10961098
streamID: s.id,
10971099
endStream: false,
10981100
h: nil,
1099-
reader: d.Reader(),
1101+
data: d,
11001102
onEachWrite: func() {},
11011103
})
11021104
// Loop until the server side stream is created.

mem/buffer_slice.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ type Reader interface {
137137
Close() error
138138
// Remaining returns the number of unread bytes remaining in the slice.
139139
Remaining() int
140+
// Reset frees the currently held buffer slice and starts reading from the
141+
// provided slice. This allows reusing the reader object.
142+
Reset(s BufferSlice)
140143
}
141144

142145
type sliceReader struct {
@@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int {
150153
return r.len
151154
}
152155

156+
func (r *sliceReader) Reset(s BufferSlice) {
157+
r.data.Free()
158+
s.Ref()
159+
r.data = s
160+
r.len = s.Len()
161+
r.bufferIdx = 0
162+
}
163+
153164
func (r *sliceReader) Close() error {
154165
r.data.Free()
155166
r.data = nil

0 commit comments

Comments
 (0)