Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,19 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo
gs.tracer.SendRPC(rpc, p)
}

// maxProtobufOverhead denotes the worst-case protobuf encoding overhead for a
// single embedded message, used as an upper bound when tracking RPC sizes.
// It is based on this RPC.Size() excerpt:
// l = e.Size()
// n += 1 + l + sovRpc(uint64(l))
// where:
// - 1 is a field key size (see https://github.com/gogo/protobuf/blob/f67b8970b736e53dbd7d0a27146c8f1ac52f74e5/plugin/size/size.go#L499)
//
// and appears to be 1 for all RPC's fields
// - sovRpc is the number of bytes needed to varint-encode some uint64 value.
//
// Assuming that the message size is at most 10^10, the number of bytes needed to varint-encode it is 5.
const maxProtobufOverhead = 1 + 5

// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible.
// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs.
// If an RPC is too large and can't be split further (e.g. Message data is
Expand All @@ -1433,6 +1446,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
out[0].from = elems[0].from
}

lastSize := out[len(out)-1].Size()
for _, elem := range elems {
lastRPC := out[len(out)-1]

Expand All @@ -1441,10 +1455,15 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
// old behavior. In the future let's not merge messages. Since,
// it may increase message latency.
for _, msg := range elem.GetPublish() {
if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
lastRPC.Publish = append(lastRPC.Publish, msg)
// do not use lastRPC.Size() here to avoid lastRPC.Publish iteration calling Size on each element.
// use msg.Size() + maxProtobufOverhead upper bound instead.
lastSize += msg.Size() + maxProtobufOverhead
if lastSize > limit {
lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
lastRPC.Publish = append(lastRPC.Publish, msg)
lastSize = lastRPC.Size() // recompute exactly: the new RPC holds a single Publish entry, so this is cheap
out = append(out, lastRPC)
}
}
Expand Down
47 changes: 47 additions & 0 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
mrand "math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -2418,6 +2419,7 @@ func TestFragmentRPCFunction(t *testing.T) {
ensureBelowLimit(results)
msgsPerRPC := limit / msgSize
expectedRPCs := nMessages / msgsPerRPC
expectedRPCs += 1 // one extra RPC is produced because the conservative size upper bound splits slightly earlier
if len(results) != expectedRPCs {
t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results))
}
Expand Down Expand Up @@ -3675,3 +3677,48 @@ func TestPublishDuplicateMessage(t *testing.T) {
t.Fatal("Duplicate message should not return an error")
}
}

func BenchmarkAppendOrMergeRPC(b *testing.B) {
makeTestRPC := func(numMsgs int, msgSize int) RPC {
msgs := make([]*pb.Message, numMsgs)
payload := make([]byte, msgSize)
for i := range msgs {
msgs[i] = &pb.Message{
Data: payload,
From: []byte(strconv.Itoa(i)),
}
}
return RPC{
RPC: pb.RPC{
Publish: msgs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In your workload, do you see RPCs being split primarily due to many messages in a single RPC? I ask because we could add some optimizations if so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, more messages added into Publish list, and the max RPC size logic iterates over Publish

},
}
}
b.Run("small", func(b *testing.B) {
r := mrand.New(mrand.NewSource(99))
const numRPCs = 3
const msgSize = 1024
rpcs := make([]RPC, numRPCs)
for i := 0; i < numRPCs; i++ {
rpcs[i] = makeTestRPC(5, msgSize+r.Intn(100))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = appendOrMergeRPC(nil, DefaultMaxMessageSize, rpcs...)
}
})

b.Run("large", func(b *testing.B) {
r := mrand.New(mrand.NewSource(99))
const numRPCs = 30
const msgSize = 50 * 1024
rpcs := make([]RPC, numRPCs)
for i := 0; i < numRPCs; i++ {
rpcs[i] = makeTestRPC(20, msgSize+r.Intn(100))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = appendOrMergeRPC(nil, DefaultMaxMessageSize, rpcs...)
}
})
}
Loading