Skip to content

Commit 42046c0

Browse files
jim3magaius-qi
authored andcommitted
chore: optimize stream peer task (#1186)
Signed-off-by: Jim Ma <[email protected]>
1 parent bab1dde commit 42046c0

File tree

3 files changed

+237
-74
lines changed

3 files changed

+237
-74
lines changed

client/daemon/peer/peertask_stream.go

Lines changed: 73 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -169,89 +169,91 @@ func (s *streamTask) writeToPipe(firstPiece *PieceInfo, pw *io.PipeWriter) {
169169
}()
170170
var (
171171
desired int32
172-
cur *PieceInfo
173-
wrote int64
172+
piece *PieceInfo
174173
err error
175-
cache = make(map[int32]bool)
176174
)
177-
// update first piece to cache and check cur with desired
178-
cache[firstPiece.Num] = true
179-
cur = firstPiece
175+
piece = firstPiece
180176
for {
181-
if desired == cur.Num {
182-
for {
183-
delete(cache, desired)
184-
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
185-
span.SetAttributes(config.AttributePiece.Int(int(desired)))
186-
wrote, err = s.writeOnePiece(pw, desired)
187-
if err != nil {
188-
span.RecordError(err)
189-
span.End()
190-
s.Errorf("write to pipe error: %s", err)
191-
_ = pw.CloseWithError(err)
192-
return
193-
}
194-
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
195-
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
196-
span.End()
197-
desired++
198-
cached := cache[desired]
199-
if !cached {
200-
break
201-
}
202-
}
203-
} else {
204-
// not desired piece, cache it
205-
cache[cur.Num] = true
206-
if cur.Num < desired {
207-
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur.Num)
177+
if desired == piece.Num || desired <= piece.OrderedNum {
178+
desired, err = s.writeOrderedPieces(desired, piece.OrderedNum, pw)
179+
if err != nil {
180+
return
208181
}
209182
}
210183

211184
select {
212185
case <-s.ctx.Done():
213-
s.Errorf("context done due to: %s", s.ctx.Err())
214-
s.span.RecordError(s.ctx.Err())
215-
if err = pw.CloseWithError(s.ctx.Err()); err != nil {
216-
s.Errorf("CloseWithError failed: %s", err)
217-
}
218-
return
219-
case cur = <-s.pieceCh:
220-
// FIXME check missing piece for non-block broker channel
186+
err = fmt.Errorf("context done due to: %s", s.ctx.Err())
187+
s.closeWithError(pw, err)
188+
break
189+
case piece = <-s.pieceCh:
221190
continue
222191
case <-s.peerTaskConductor.failCh:
223-
ptError := fmt.Errorf("context done due to peer task fail: %d/%s",
192+
err = fmt.Errorf("context done due to peer task fail: %d/%s",
224193
s.peerTaskConductor.failedCode, s.peerTaskConductor.failedReason)
225-
s.Error(ptError.Error())
226-
s.span.RecordError(ptError)
227-
if err = pw.CloseWithError(ptError); err != nil {
228-
s.Errorf("CloseWithError failed: %s", err)
229-
}
230-
return
194+
s.closeWithError(pw, err)
195+
break
231196
case <-s.peerTaskConductor.successCh:
232-
for {
233-
// all data wrote to local storage, and all data wrote to pipe write
234-
if s.peerTaskConductor.readyPieces.Settled() == desired {
235-
s.Debugf("all %d pieces wrote to pipe", desired)
236-
pw.Close()
237-
return
238-
}
239-
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
240-
span.SetAttributes(config.AttributePiece.Int(int(desired)))
241-
wrote, err = s.writeOnePiece(pw, desired)
242-
if err != nil {
243-
span.RecordError(err)
244-
span.End()
245-
s.span.RecordError(err)
246-
s.Errorf("write to pipe error: %s", err)
247-
_ = pw.CloseWithError(err)
248-
return
249-
}
250-
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
251-
span.End()
252-
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
253-
desired++
254-
}
197+
s.writeRemainingPieces(desired, pw)
198+
break
255199
}
256200
}
257201
}
202+
203+
func (s *streamTask) writeOrderedPieces(desired, orderedNum int32, pw *io.PipeWriter) (int32, error) {
204+
for {
205+
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
206+
span.SetAttributes(config.AttributePiece.Int(int(desired)))
207+
wrote, err := s.writeOnePiece(pw, desired)
208+
if err != nil {
209+
span.RecordError(err)
210+
span.End()
211+
s.Errorf("write to pipe error: %s", err)
212+
_ = pw.CloseWithError(err)
213+
return desired, err
214+
}
215+
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
216+
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
217+
span.End()
218+
219+
desired++
220+
if desired > orderedNum {
221+
break
222+
}
223+
}
224+
return desired, nil
225+
}
226+
227+
func (s *streamTask) writeRemainingPieces(desired int32, pw *io.PipeWriter) {
228+
for {
229+
// all data wrote to local storage, and all data wrote to pipe write
230+
if s.peerTaskConductor.readyPieces.Settled() == desired {
231+
s.Debugf("all %d pieces wrote to pipe", desired)
232+
pw.Close()
233+
return
234+
}
235+
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
236+
span.SetAttributes(config.AttributePiece.Int(int(desired)))
237+
wrote, err := s.writeOnePiece(pw, desired)
238+
if err != nil {
239+
span.RecordError(err)
240+
span.End()
241+
s.span.RecordError(err)
242+
s.Errorf("write to pipe error: %s", err)
243+
_ = pw.CloseWithError(err)
244+
return
245+
}
246+
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
247+
span.End()
248+
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
249+
desired++
250+
}
251+
}
252+
253+
func (s *streamTask) closeWithError(pw *io.PipeWriter, err error) {
254+
s.Error(err)
255+
s.span.RecordError(err)
256+
if err = pw.CloseWithError(err); err != nil {
257+
s.Errorf("CloseWithError failed: %s", err)
258+
}
259+
}

client/daemon/peer/piece_broker.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ type pieceBroker struct {
2424
}
2525

2626
type PieceInfo struct {
27-
Num int32
28-
Finished bool
27+
// Num is the current piece num
28+
Num int32
29+
// OrderedNum is the max pieces num with ordered, eg: 0 1 2 3 5 7 8, the OrderedNum is 3
30+
OrderedNum int32
31+
Finished bool
2932
}
3033

3134
func newPieceBroker() *pieceBroker {
@@ -38,7 +41,12 @@ func newPieceBroker() *pieceBroker {
3841
}
3942

4043
func (b *pieceBroker) Start() {
41-
subs := map[chan *PieceInfo]struct{}{}
44+
var (
45+
orderedNum int32 = -1
46+
subs = map[chan *PieceInfo]struct{}{}
47+
pieces = map[int32]struct{}{}
48+
)
49+
4250
for {
4351
select {
4452
case <-b.stopCh:
@@ -51,6 +59,19 @@ func (b *pieceBroker) Start() {
5159
case msgCh := <-b.unsubCh:
5260
delete(subs, msgCh)
5361
case msg := <-b.publishCh:
62+
pieces[msg.Num] = struct{}{}
63+
if orderedNum+1 == msg.Num {
64+
orderedNum++
65+
// search cached pieces
66+
for {
67+
if _, ok := pieces[orderedNum+1]; ok {
68+
orderedNum++
69+
} else {
70+
break
71+
}
72+
}
73+
}
74+
msg.OrderedNum = orderedNum
5475
for msgCh := range subs {
5576
// msgCh is buffered, use non-blocking send to protect the broker
5677
select {
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2022 The Dragonfly Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package peer
18+
19+
import (
20+
"sync"
21+
"testing"
22+
23+
testifyassert "github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestBroker(t *testing.T) {
27+
var testCases = []struct {
28+
name string
29+
total int32
30+
pubSeq []int32
31+
desire []*PieceInfo
32+
}{
33+
{
34+
name: "order publish",
35+
total: 3,
36+
pubSeq: []int32{0, 1, 2},
37+
desire: []*PieceInfo{
38+
{
39+
Num: 0,
40+
OrderedNum: 0,
41+
Finished: false,
42+
},
43+
{
44+
Num: 1,
45+
OrderedNum: 1,
46+
Finished: false,
47+
},
48+
{
49+
Num: 2,
50+
OrderedNum: 2,
51+
Finished: true,
52+
},
53+
},
54+
},
55+
{
56+
name: "partial order publish",
57+
total: 3,
58+
pubSeq: []int32{0, 2, 1},
59+
desire: []*PieceInfo{
60+
{
61+
Num: 0,
62+
OrderedNum: 0,
63+
Finished: false,
64+
},
65+
{
66+
Num: 2,
67+
OrderedNum: 0,
68+
Finished: false,
69+
},
70+
{
71+
Num: 1,
72+
OrderedNum: 2,
73+
Finished: true,
74+
},
75+
},
76+
},
77+
{
78+
name: "inverted order publish",
79+
total: 3,
80+
pubSeq: []int32{2, 1, 0},
81+
desire: []*PieceInfo{
82+
{
83+
Num: 2,
84+
OrderedNum: -1,
85+
Finished: false,
86+
},
87+
{
88+
Num: 1,
89+
OrderedNum: -1,
90+
Finished: false,
91+
},
92+
{
93+
Num: 0,
94+
OrderedNum: 2,
95+
Finished: true,
96+
},
97+
},
98+
},
99+
}
100+
for _, tc := range testCases {
101+
t.Run(tc.name, func(t *testing.T) {
102+
assert := testifyassert.New(t)
103+
broker := newPieceBroker()
104+
go broker.Start()
105+
ch := broker.Subscribe()
106+
done := make(chan struct{})
107+
var received []*PieceInfo
108+
109+
wg := sync.WaitGroup{}
110+
wg.Add(len(tc.pubSeq))
111+
go func() {
112+
for {
113+
select {
114+
case <-done:
115+
return
116+
case info := <-ch:
117+
received = append(received, info)
118+
wg.Done()
119+
}
120+
}
121+
}()
122+
var sent int32
123+
for _, n := range tc.pubSeq {
124+
sent++
125+
var finished bool
126+
if sent == tc.total {
127+
finished = true
128+
}
129+
broker.Publish(&PieceInfo{
130+
Num: n,
131+
Finished: finished,
132+
})
133+
}
134+
wg.Wait()
135+
broker.Stop()
136+
close(done)
137+
assert.Equal(tc.desire, received)
138+
})
139+
}
140+
}

0 commit comments

Comments
 (0)