@@ -32,6 +32,7 @@ import (
3232 "github.com/ethereum/go-ethereum/core/rawdb"
3333 "github.com/ethereum/go-ethereum/core/txpool"
3434 "github.com/ethereum/go-ethereum/core/types"
35+ "github.com/ethereum/go-ethereum/crypto"
3536 "github.com/ethereum/go-ethereum/eth/downloader"
3637 "github.com/ethereum/go-ethereum/eth/fetcher"
3738 "github.com/ethereum/go-ethereum/eth/protocols/eth"
@@ -41,7 +42,9 @@ import (
4142 "github.com/ethereum/go-ethereum/log"
4243 "github.com/ethereum/go-ethereum/metrics"
4344 "github.com/ethereum/go-ethereum/p2p"
45+ "github.com/ethereum/go-ethereum/p2p/enode"
4446 "github.com/ethereum/go-ethereum/triedb/pathdb"
47+ "golang.org/x/crypto/sha3"
4548)
4649
4750const (
@@ -84,6 +87,7 @@ type txPool interface {
8487// handlerConfig is the collection of initialization parameters to create a full
8588// node network handler.
8689type handlerConfig struct {
90+ NodeID enode.ID // P2P node ID used for tx propagation topology
8791 Database ethdb.Database // Database for direct sync insertions
8892 Chain * core.BlockChain // Blockchain to serve data from
8993 TxPool txPool // Transaction pool to propagate from
@@ -96,6 +100,7 @@ type handlerConfig struct {
96100}
97101
98102type handler struct {
103+ nodeID enode.ID
99104 networkID uint64
100105 forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
101106
@@ -137,6 +142,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
137142 config .EventMux = new (event.TypeMux ) // Nicety initialization for tests
138143 }
139144 h := & handler {
145+ nodeID : config .NodeID ,
140146 networkID : config .Network ,
141147 forkFilter : forkid .NewFilter (config .Chain ),
142148 eventMux : config .EventMux ,
@@ -614,25 +620,54 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
614620 annos = make (map [* ethPeer ][]common.Hash ) // Set peer->hash to announce
615621 )
616622 // Broadcast transactions to a batch of peers not knowing about it
617- for _ , tx := range txs {
618- peers := h .peers .peersWithoutTransaction (tx .Hash ())
623+ direct := big .NewInt (int64 (math .Sqrt (float64 (h .peers .len ())))) // Approximate number of peers to broadcast to
624+ if direct .BitLen () == 0 {
625+ direct = big .NewInt (1 )
626+ }
627+ total := new (big.Int ).Exp (direct , big .NewInt (2 ), nil ) // Stabilise total peer count a bit based on sqrt peers
619628
620- var numDirect int
629+ var (
630+ signer = types .LatestSignerForChainID (h .chain .Config ().ChainID ) // Don't care about chain status, we just need *a* sender
631+ hasher = sha3 .NewLegacyKeccak256 ().(crypto.KeccakState )
632+ hash = make ([]byte , 32 )
633+ )
634+ for _ , tx := range txs {
635+ var maybeDirect bool
621636 switch {
622637 case tx .Type () == types .BlobTxType :
623638 blobTxs ++
624639 case tx .Size () > txMaxBroadcastSize :
625640 largeTxs ++
626641 default :
627- numDirect = int ( math . Sqrt ( float64 ( len ( peers ))))
642+ maybeDirect = true
628643 }
629- // Send the tx unconditionally to a subset of our peers
630- for _ , peer := range peers [:numDirect ] {
631- txset [peer ] = append (txset [peer ], tx .Hash ())
632- }
633- // For the remaining peers, send announcement only
634- for _ , peer := range peers [numDirect :] {
635- annos [peer ] = append (annos [peer ], tx .Hash ())
644+ // Send the transaction (if it's small enough) directly to a subset of
645+ // the peers that have not received it yet, ensuring that the flow of
646+ // transactions is groupped by account to (try and) avoid nonce gaps.
647+ //
648+ // To do this, we hash the local enode IW with together with a peer's
649+ // enode ID together with the transaction sender and broadcast if
650+ // `sha(self, peer, sender) mod peers < sqrt(peers)`.
651+ for _ , peer := range h .peers .peersWithoutTransaction (tx .Hash ()) {
652+ var broadcast bool
653+ if maybeDirect {
654+ hasher .Reset ()
655+ hasher .Write (h .nodeID .Bytes ())
656+ hasher .Write (peer .Node ().ID ().Bytes ())
657+
658+ from , _ := types .Sender (signer , tx ) // Ignore error, we only use the addr as a propagation target splitter
659+ hasher .Write (from .Bytes ())
660+
661+ hasher .Read (hash )
662+ if new (big.Int ).Mod (new (big.Int ).SetBytes (hash ), total ).Cmp (direct ) < 0 {
663+ broadcast = true
664+ }
665+ }
666+ if broadcast {
667+ txset [peer ] = append (txset [peer ], tx .Hash ())
668+ } else {
669+ annos [peer ] = append (annos [peer ], tx .Hash ())
670+ }
636671 }
637672 }
638673 for peer , hashes := range txset {
0 commit comments