Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 73 additions & 71 deletions client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,89 +169,91 @@ func (s *streamTask) writeToPipe(firstPiece *PieceInfo, pw *io.PipeWriter) {
}()
var (
desired int32
cur *PieceInfo
wrote int64
piece *PieceInfo
err error
cache = make(map[int32]bool)
)
// update first piece to cache and check cur with desired
cache[firstPiece.Num] = true
cur = firstPiece
piece = firstPiece
for {
if desired == cur.Num {
for {
delete(cache, desired)
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()
desired++
cached := cache[desired]
if !cached {
break
}
}
} else {
// not desired piece, cache it
cache[cur.Num] = true
if cur.Num < desired {
s.Warnf("piece number should be equal or greater than %d, received piece number: %d", desired, cur.Num)
if desired == piece.Num || desired <= piece.OrderedNum {
desired, err = s.writeOrderedPieces(desired, piece.OrderedNum, pw)
if err != nil {
return
}
}

select {
case <-s.ctx.Done():
s.Errorf("context done due to: %s", s.ctx.Err())
s.span.RecordError(s.ctx.Err())
if err = pw.CloseWithError(s.ctx.Err()); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
case cur = <-s.pieceCh:
// FIXME check missing piece for non-block broker channel
err = fmt.Errorf("context done due to: %s", s.ctx.Err())
s.closeWithError(pw, err)
break
case piece = <-s.pieceCh:
continue
case <-s.peerTaskConductor.failCh:
ptError := fmt.Errorf("context done due to peer task fail: %d/%s",
err = fmt.Errorf("context done due to peer task fail: %d/%s",
s.peerTaskConductor.failedCode, s.peerTaskConductor.failedReason)
s.Error(ptError.Error())
s.span.RecordError(ptError)
if err = pw.CloseWithError(ptError); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
return
s.closeWithError(pw, err)
break
case <-s.peerTaskConductor.successCh:
for {
// all data wrote to local storage, and all data wrote to pipe write
if s.peerTaskConductor.readyPieces.Settled() == desired {
s.Debugf("all %d pieces wrote to pipe", desired)
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err = s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
s.writeRemainingPieces(desired, pw)
break
}
}
}

func (s *streamTask) writeOrderedPieces(desired, orderedNum int32, pw *io.PipeWriter) (int32, error) {
for {
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err := s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return desired, err
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
span.End()

desired++
if desired > orderedNum {
break
}
}
return desired, nil
}

func (s *streamTask) writeRemainingPieces(desired int32, pw *io.PipeWriter) {
for {
// all data wrote to local storage, and all data wrote to pipe write
if s.peerTaskConductor.readyPieces.Settled() == desired {
s.Debugf("all %d pieces wrote to pipe", desired)
pw.Close()
return
}
_, span := tracer.Start(s.ctx, config.SpanWriteBackPiece)
span.SetAttributes(config.AttributePiece.Int(int(desired)))
wrote, err := s.writeOnePiece(pw, desired)
if err != nil {
span.RecordError(err)
span.End()
s.span.RecordError(err)
s.Errorf("write to pipe error: %s", err)
_ = pw.CloseWithError(err)
return
}
span.SetAttributes(config.AttributePieceSize.Int(int(wrote)))
span.End()
s.Debugf("wrote piece %d to pipe, size %d", desired, wrote)
desired++
}
}

func (s *streamTask) closeWithError(pw *io.PipeWriter, err error) {
s.Error(err)
s.span.RecordError(err)
if err = pw.CloseWithError(err); err != nil {
s.Errorf("CloseWithError failed: %s", err)
}
}
27 changes: 24 additions & 3 deletions client/daemon/peer/piece_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ type pieceBroker struct {
}

type PieceInfo struct {
Num int32
Finished bool
// Num is the current piece num
Num int32
// OrderedNum is the max pieces num with ordered, eg: 0 1 2 3 5 7 8, the OrderedNum is 3
OrderedNum int32
Finished bool
}

func newPieceBroker() *pieceBroker {
Expand All @@ -38,7 +41,12 @@ func newPieceBroker() *pieceBroker {
}

func (b *pieceBroker) Start() {
subs := map[chan *PieceInfo]struct{}{}
var (
orderedNum int32 = -1
subs = map[chan *PieceInfo]struct{}{}
pieces = map[int32]struct{}{}
)

for {
select {
case <-b.stopCh:
Expand All @@ -51,6 +59,19 @@ func (b *pieceBroker) Start() {
case msgCh := <-b.unsubCh:
delete(subs, msgCh)
case msg := <-b.publishCh:
pieces[msg.Num] = struct{}{}
if orderedNum+1 == msg.Num {
orderedNum++
// search cached pieces
for {
if _, ok := pieces[orderedNum+1]; ok {
orderedNum++
} else {
break
}
}
}
msg.OrderedNum = orderedNum
for msgCh := range subs {
// msgCh is buffered, use non-blocking send to protect the broker
select {
Expand Down
140 changes: 140 additions & 0 deletions client/daemon/peer/piece_broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package peer

import (
"sync"
"testing"

testifyassert "github.com/stretchr/testify/assert"
)

func TestBroker(t *testing.T) {
var testCases = []struct {
name string
total int32
pubSeq []int32
desire []*PieceInfo
}{
{
name: "order publish",
total: 3,
pubSeq: []int32{0, 1, 2},
desire: []*PieceInfo{
{
Num: 0,
OrderedNum: 0,
Finished: false,
},
{
Num: 1,
OrderedNum: 1,
Finished: false,
},
{
Num: 2,
OrderedNum: 2,
Finished: true,
},
},
},
{
name: "partial order publish",
total: 3,
pubSeq: []int32{0, 2, 1},
desire: []*PieceInfo{
{
Num: 0,
OrderedNum: 0,
Finished: false,
},
{
Num: 2,
OrderedNum: 0,
Finished: false,
},
{
Num: 1,
OrderedNum: 2,
Finished: true,
},
},
},
{
name: "inverted order publish",
total: 3,
pubSeq: []int32{2, 1, 0},
desire: []*PieceInfo{
{
Num: 2,
OrderedNum: -1,
Finished: false,
},
{
Num: 1,
OrderedNum: -1,
Finished: false,
},
{
Num: 0,
OrderedNum: 2,
Finished: true,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert := testifyassert.New(t)
broker := newPieceBroker()
go broker.Start()
ch := broker.Subscribe()
done := make(chan struct{})
var received []*PieceInfo

wg := sync.WaitGroup{}
wg.Add(len(tc.pubSeq))
go func() {
for {
select {
case <-done:
return
case info := <-ch:
received = append(received, info)
wg.Done()
}
}
}()
var sent int32
for _, n := range tc.pubSeq {
sent++
var finished bool
if sent == tc.total {
finished = true
}
broker.Publish(&PieceInfo{
Num: n,
Finished: finished,
})
}
wg.Wait()
broker.Stop()
close(done)
assert.Equal(tc.desire, received)
})
}
}