Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 65 additions & 32 deletions internal/trie/node/branch_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"io"
"runtime"

"github.com/ChainSafe/gossamer/internal/trie/codec"
"github.com/ChainSafe/gossamer/internal/trie/pools"
Expand Down Expand Up @@ -113,43 +114,72 @@ func (b *Branch) Encode(buffer Buffer) (err error) {
}
}

const parallel = false // TODO Done in pull request #2081
if parallel {
err = encodeChildrenInParallel(b.Children, buffer)
} else {
err = encodeChildrenSequentially(b.Children, buffer)
}
err = encodeChildrenOpportunisticParallel(b.Children, buffer)
if err != nil {
return fmt.Errorf("cannot encode children of branch: %w", err)
}

return nil
}

func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
type result struct {
index int
buffer *bytes.Buffer
err error
// encodingAsyncResult carries the outcome of encoding one child of a
// branch: the child's index in the 16-slot children array, the pooled
// buffer holding the child's encoding, and any error hit while encoding.
// The consumer of the results channel is responsible for returning the
// buffer to the encoding buffer pool once its bytes have been processed.
type encodingAsyncResult struct {
	index  int
	buffer *bytes.Buffer
	err    error
}

// runEncodeChild encodes child into a buffer taken from the encoding
// buffer pool and pushes the result (index, buffer, error) on the results
// channel. A non-nil rateLimit means the call was launched in its own
// goroutine; in that case one token is drained from rateLimit before
// returning, freeing a slot for another parallel encoding goroutine.
func runEncodeChild(child Node, index int,
	results chan<- encodingAsyncResult, rateLimit <-chan struct{}) {
	encodingBuffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
	encodingBuffer.Reset()
	// The buffer is returned to the pool by the consumer of the
	// results channel, after its data has been processed.

	encodingErr := encodeChild(child, encodingBuffer)

	results <- encodingAsyncResult{
		index:  index,
		buffer: encodingBuffer,
		err:    encodingErr,
	}

	if rateLimit == nil {
		// Ran synchronously in the caller's goroutine;
		// no rate limit token to release.
		return
	}
	<-rateLimit
}

// parallelLimit caps how many goroutines may concurrently encode branch
// children, to avoid using too much memory on goroutine stacks.
var parallelLimit = runtime.NumCPU()

// parallelEncodingRateLimit acts as a counting semaphore of capacity
// parallelLimit: a token is sent before spawning an encoding goroutine
// and received back by that goroutine when it finishes.
var parallelEncodingRateLimit = make(chan struct{}, parallelLimit)

resultsCh := make(chan result)
// encodeChildrenOpportunisticParallel encodes children in parallel eventually.
// Leaves are encoded in a blocking way, and branches are encoded in separate
// goroutines IF they are less than the parallelLimit number of goroutines already
// running. This is designed to limit the total number of goroutines in order to
// avoid using too much memory on the stack.
func encodeChildrenOpportunisticParallel(children [16]Node, buffer io.Writer) (err error) {
// Buffered channels since children might be encoded in this
// goroutine or another one.
resultsCh := make(chan encodingAsyncResult, len(children))

for i, child := range children {
go func(index int, child Node) {
buffer := pools.EncodingBuffers.Get().(*bytes.Buffer)
buffer.Reset()
// buffer is put back in the pool after processing its
// data in the select block below.

err := encodeChild(child, buffer)

resultsCh <- result{
index: index,
buffer: buffer,
err: err,
}
}(i, child)
if isNodeNil(child) || child.Type() == LeafType {
runEncodeChild(child, i, resultsCh, nil)
continue
}

// Branch child
select {
case parallelEncodingRateLimit <- struct{}{}:
// We have a goroutine available to encode
// the branch in parallel.
go runEncodeChild(child, i, resultsCh, parallelEncodingRateLimit)
default:
// we reached the maximum parallel goroutines
// so encode this branch in this goroutine
runEncodeChild(child, i, resultsCh, nil)
}
}

currentIndex := 0
Expand All @@ -166,7 +196,7 @@ func encodeChildrenInParallel(children [16]Node, buffer io.Writer) (err error) {
for currentIndex < len(children) &&
resultBuffers[currentIndex] != nil {
bufferSlice := resultBuffers[currentIndex].Bytes()
if len(bufferSlice) > 0 {
if err == nil && len(bufferSlice) > 0 {
// note buffer.Write copies the byte slice given as argument
_, writeErr := buffer.Write(bufferSlice)
if writeErr != nil && err == nil {
Expand Down Expand Up @@ -203,17 +233,20 @@ func encodeChildrenSequentially(children [16]Node, buffer io.Writer) (err error)
return nil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
var isNil bool
switch impl := child.(type) {
func isNodeNil(n Node) (isNil bool) {
switch impl := n.(type) {
case *Branch:
isNil = impl == nil
case *Leaf:
isNil = impl == nil
default:
isNil = child == nil
isNil = n == nil
}
if isNil {
return isNil
}

func encodeChild(child Node, buffer io.Writer) (err error) {
if isNodeNil(child) {
return nil
}

Expand Down
97 changes: 90 additions & 7 deletions internal/trie/node/branch_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package node

import (
"bytes"
"io"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -251,7 +253,7 @@ func Test_Branch_Encode(t *testing.T) {
wrappedErr: errTest,
errMessage: "cannot write encoded value to buffer: test error",
},
"buffer write error for children encoded sequentially": {
"buffer write error for children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -280,10 +282,10 @@ func Test_Branch_Encode(t *testing.T) {
},
wrappedErr: errTest,
errMessage: "cannot encode children of branch: " +
"cannot encode child at index 3: " +
"failed to write child to buffer: test error",
"cannot write encoding of child at index 3: " +
"test error",
},
"success with sequential children encoding": {
"success with children encoding": {
branch: &Branch{
Key: []byte{1, 2, 3},
Value: []byte{100},
Expand Down Expand Up @@ -346,7 +348,46 @@ func Test_Branch_Encode(t *testing.T) {
}
}

func Test_encodeChildrenInParallel(t *testing.T) {
// Opportunistic parallel: 13781602 ns/op 14419488 B/op 323575 allocs/op
// Sequentially: 24269268 ns/op 20126525 B/op 327668 allocs/op
// Benchmark_encodeChildrenOpportunisticParallel measures encoding a full
// tree of branch children, discarding the output. Reference numbers:
// Opportunistic parallel: 13781602 ns/op 14419488 B/op 323575 allocs/op
// Sequentially: 24269268 ns/op 20126525 B/op 327668 allocs/op
func Benchmark_encodeChildrenOpportunisticParallel(b *testing.B) {
	const (
		valueBytesSize = 10
		depth          = 3 // do not raise above 4
	)

	children := populateChildren(valueBytesSize, depth)

	b.Run("", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			_ = encodeChildrenOpportunisticParallel(children, io.Discard)
		}
	})
}

// populateChildren builds a full 16-ary tree of the given depth for
// benchmarking. At depth zero every child is a leaf; otherwise every
// child is a branch whose children are built recursively one level
// shallower. Keys and values all alias the same valueSize-byte slice.
func populateChildren(valueSize, depth int) (children [16]Node) {
	filler := make([]byte, valueSize)

	for i := range children {
		if depth == 0 {
			children[i] = &Leaf{
				Key:   filler,
				Value: filler,
			}
			continue
		}

		children[i] = &Branch{
			Key:      filler,
			Value:    filler,
			Children: populateChildren(valueSize, depth-1),
		}
	}

	return children
}

func Test_encodeChildrenOpportunisticParallel(t *testing.T) {
t.Parallel()

testCases := map[string]struct {
Expand Down Expand Up @@ -393,7 +434,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
},
},
},
"encoding error": {
"leaf encoding error": {
children: [16]Node{
nil, nil, nil, nil,
nil, nil, nil, nil,
Expand All @@ -413,6 +454,23 @@ func Test_encodeChildrenInParallel(t *testing.T) {
errMessage: "cannot write encoding of child at index 11: " +
"test error",
},
"branch encoding": {
// Note this may run in parallel or not depending on other tests
// running in parallel.
children: [16]Node{
&Branch{
Key: []byte{1},
Children: [16]Node{
&Leaf{Key: []byte{1}},
},
},
},
writes: []writeCall{
{
written: []byte{32, 129, 1, 1, 0, 12, 65, 1, 0},
},
},
},
}

for name, testCase := range testCases {
Expand All @@ -434,7 +492,7 @@ func Test_encodeChildrenInParallel(t *testing.T) {
previousCall = call
}

err := encodeChildrenInParallel(testCase.children, buffer)
err := encodeChildrenOpportunisticParallel(testCase.children, buffer)

if testCase.wrappedErr != nil {
assert.ErrorIs(t, err, testCase.wrappedErr)
Expand All @@ -444,6 +502,31 @@ func Test_encodeChildrenInParallel(t *testing.T) {
}
})
}

t.Run("opportunist parallel branch encoding", func(t *testing.T) {
t.Parallel()

var children [16]Node
for i := range children {
children[i] = &Branch{}
}

buffer := bytes.NewBuffer(nil)

err := encodeChildrenOpportunisticParallel(children, buffer)

require.NoError(t, err)
expectedBytes := []byte{
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0,
0xc, 0x80, 0x0, 0x0, 0xc, 0x80, 0x0, 0x0}
assert.Equal(t, expectedBytes, buffer.Bytes())
})
}

func Test_encodeChildrenSequentially(t *testing.T) {
Expand Down