Skip to content

Commit c32d961

Browse files
committed
swarm/network: remove kademlia.EachBin; fix RequestSubscriptions and add unit test
1 parent 822bd51 commit c32d961

File tree

5 files changed

+228
-207
lines changed

5 files changed

+228
-207
lines changed

swarm/network/kademlia.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -356,40 +356,6 @@ func (k *Kademlia) Off(p *Peer) {
356356
}
357357
}
358358

359-
// EachBin is a two level nested iterator
360-
// The outer iterator returns all bins that have known peers, in order from shallowest to deepest
361-
// The inner iterator returns all peers per bin returned by the outer iterator, in no defined order
362-
// TODO the po returned by the inner iterator is not reliable. However, it is not being used in this method
363-
func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) {
364-
k.lock.RLock()
365-
defer k.lock.RUnlock()
366-
367-
var startPo int
368-
var endPo int
369-
kadDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
370-
371-
k.conns.EachBin(base, pof, o, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool {
372-
//if the peer's bin is smaller than the kademlia depth,
373-
//only the peer's bin should be considered
374-
if po < kadDepth {
375-
startPo = po
376-
endPo = po
377-
} else {
378-
//if the peer's bin is equal or higher than the kademlia depth,
379-
//each bin from the depth up to k.MaxProxDisplay should be considered
380-
startPo = kadDepth
381-
endPo = k.MaxProxDisplay
382-
}
383-
384-
for bin := startPo; bin <= endPo; bin++ {
385-
f(func(val pot.Val, _ int) bool {
386-
return eachBinFunc(val.(*Peer), bin)
387-
})
388-
}
389-
return true
390-
})
391-
}
392-
393359
// EachConn is an iterator with args (base, po, f) applies f to each live peer
394360
// that has proximity order po or less as measured from the base
395361
// if base is nil, kademlia base address is used

swarm/network/kademlia_test.go

Lines changed: 0 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -888,155 +888,3 @@ func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer {
888888
}
889889
return NewPeer(bp, kad)
890890
}
891-
892-
/*
893-
TestEachBin is a unit test for the kademlia's `EachBin` function.
894-
That function is actually used only for streamer subscriptions.
895-
896-
Thus, the test does:
897-
* assigns each known connected peer to a bin map
898-
* build up a known kademlia in advance
899-
* runs the EachBin function, which returns supposed subscription bins
900-
* store all supposed bins per peer in a map
901-
* check that all peers have the expected subscriptions
902-
903-
This kad table and its peers are copied from TestKademliaCase1,
904-
it represents an edge case but for the purpose of a unit test for
905-
the `EachBin` function it is ok.
906-
907-
Addresses used in this test are discovered as part of the simulation network
908-
in higher level tests for streaming. They were generated randomly.
909-
910-
=========================================================================
911-
Thu Dec 13 14:21:47 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1
912-
population: 49 (49), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4
913-
000 18 8196 835f 8958 8e23 | 18 8196 (0) 835f (0) 8958 (0) 8e23 (0)
914-
001 14 2690 28f0 2850 3a51 | 14 2690 (0) 28f0 (0) 2850 (0) 3a51 (0)
915-
002 11 4d72 4a45 4375 4607 | 11 4d72 (0) 4a45 (0) 4375 (0) 4607 (0)
916-
003 1 646e | 1 646e (0)
917-
004 3 769c 76d1 7656 | 3 769c (0) 76d1 (0) 7656 (0)
918-
============ DEPTH: 5 ==========================================
919-
005 1 7a48 | 1 7a48 (0)
920-
006 1 7cbd | 1 7cbd (0)
921-
007 0 | 0
922-
008 0 | 0
923-
009 0 | 0
924-
010 0 | 0
925-
011 0 | 0
926-
012 0 | 0
927-
013 0 | 0
928-
014 0 | 0
929-
015 0 | 0
930-
=========================================================================
931-
*/
932-
func TestEachBin(t *testing.T) {
933-
//the pivot address; this is the actual kademlia node
934-
pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e"
935-
936-
//a map of bin number to addresses from the given kademlia
937-
binMap := make(map[int][]string)
938-
binMap[0] = []string{
939-
"835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc",
940-
"81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009",
941-
}
942-
binMap[1] = []string{
943-
"28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19",
944-
"2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e",
945-
}
946-
binMap[2] = []string{
947-
"4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51",
948-
"4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112",
949-
}
950-
binMap[3] = []string{
951-
"646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d",
952-
}
953-
binMap[4] = []string{
954-
"7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3",
955-
"76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0",
956-
"769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a",
957-
}
958-
binMap[5] = []string{
959-
"7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8",
960-
}
961-
binMap[6] = []string{
962-
"7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0",
963-
}
964-
965-
//create the pivot's kademlia
966-
addr := common.FromHex(pivotAddr)
967-
k := NewKademlia(addr, NewKadParams())
968-
969-
//construct the peers and the kademlia
970-
for _, binaddrs := range binMap {
971-
for _, a := range binaddrs {
972-
addr := common.FromHex(a)
973-
k.On(NewPeer(&BzzPeer{BzzAddr: &BzzAddr{OAddr: addr}}, k))
974-
}
975-
}
976-
977-
//TODO: check kad table is same
978-
//currently k.String() prints date so it will never be the same :)
979-
//--> implement JSON representation of kad table
980-
log.Debug(k.String())
981-
982-
//simulate that we would do subscriptions: just store the bin numbers
983-
fakeSubscriptions := make(map[string][]int)
984-
//define the function which should run for each connection
985-
eachBinFunc := func(p *Peer, bin int) bool {
986-
//get the peer ID
987-
peerstr := fmt.Sprintf("%x", p.Over())
988-
//create the array of bins per peer
989-
if _, ok := fakeSubscriptions[peerstr]; !ok {
990-
fakeSubscriptions[peerstr] = make([]int, 0)
991-
}
992-
//store the (fake) bin subscription
993-
fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], bin)
994-
return true
995-
}
996-
//run the k.EachBin function
997-
k.EachBin(addr[:], pot.DefaultPof(k.MaxProxDisplay), 0, eachBinFunc)
998-
//calculate the kademlia depth
999-
kdepth := k.NeighbourhoodDepth()
1000-
1001-
//now, check that all peers have the expected (fake) subscriptions
1002-
//iterate the bin map
1003-
for bin, peers := range binMap {
1004-
//for every peer...
1005-
for _, peer := range peers {
1006-
//...get its (fake) subscriptions
1007-
fakeSubs := fakeSubscriptions[peer]
1008-
//if the peer's bin is below the kademlia depth...
1009-
if bin < kdepth {
1010-
//(iterate all (fake) subscriptions)
1011-
for _, subbin := range fakeSubs {
1012-
//...only the peer's bin should be "subscribed"
1013-
//(and thus have only one subscription)
1014-
if subbin != bin || len(fakeSubs) != 1 {
1015-
t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
1016-
}
1017-
}
1018-
} else { //if the peer's bin is equal or higher than the kademlia depth...
1019-
//(iterate all (fake) subscriptions)
1020-
for i, subbin := range fakeSubs {
1021-
//...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed"
1022-
// as we start from depth we can use the iteration index to check
1023-
if subbin != i+kdepth {
1024-
t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
1025-
}
1026-
//the last "subscription" should be k.MaxProxDisplay
1027-
if i == len(fakeSubs)-1 && subbin != k.MaxProxDisplay {
1028-
t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin)
1029-
}
1030-
}
1031-
}
1032-
}
1033-
}
1034-
1035-
//print some output
1036-
for p, subs := range fakeSubscriptions {
1037-
log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p))
1038-
for _, bin := range subs {
1039-
log.Debug(fmt.Sprintf("%d,", bin))
1040-
}
1041-
}
1042-
}

swarm/network/stream/snapshot_sync_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,10 +538,10 @@ func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
538538
kad := r.delivery.kad
539539
subCnt := 0
540540
//iterate over each bin and solicit needed subscription to bins
541-
kad.EachBin(r.addr[:], pof, 0, func(conn *network.Peer, po int) bool {
541+
kad.EachConn(nil, 255, func(p *network.Peer, po int, _ bool) bool {
542542
//identify begin and start index of the bin(s) we want to subscribe to
543543
subCnt++
544-
err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
544+
err = r.RequestSubscription(conf.addrToIDMap[string(p.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
545545
if err != nil {
546546
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
547547
return false

swarm/network/stream/stream.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/ethereum/go-ethereum/swarm/log"
3434
"github.com/ethereum/go-ethereum/swarm/network"
3535
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
36-
"github.com/ethereum/go-ethereum/swarm/pot"
3736
"github.com/ethereum/go-ethereum/swarm/state"
3837
"github.com/ethereum/go-ethereum/swarm/storage"
3938
)
@@ -467,24 +466,8 @@ func (r *Registry) updateSyncing() {
467466
}
468467
r.peersMu.RUnlock()
469468

470-
// request subscriptions for all nodes and bins
471-
kad.EachBin(r.addr[:], pot.DefaultPof(256), 0, func(p *network.Peer, bin int) bool {
472-
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
473-
474-
// bin is always less then 256 and it is safe to convert it to type uint8
475-
stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
476-
if streams, ok := subs[p.ID()]; ok {
477-
// delete live and history streams from the map, so that it won't be removed with a Quit request
478-
delete(streams, stream)
479-
delete(streams, getHistoryStream(stream))
480-
}
481-
err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
482-
if err != nil {
483-
log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
484-
return false
485-
}
486-
return true
487-
})
469+
// start requesting subscriptions from peers
470+
r.requestPeerSubscriptions(kad, subs, r.doRequestSubscription)
488471

489472
// remove SYNC servers that do not need to be subscribed
490473
for id, streams := range subs {
@@ -505,6 +488,69 @@ func (r *Registry) updateSyncing() {
505488
}
506489
}
507490

491+
// requestPeerSubscriptions calls on each live peer in the kademlia table
492+
// and sends a `RequestSubscription` to peers according to their bin
493+
// and their relationship with kademlia's depth.
494+
// Also check `TestRequestPeerSubscriptions` in order to understand the
495+
// expected behavior.
496+
// The function expects:
497+
// * the kademlia
498+
// * a map of subscriptions
499+
// * the actual function to subscribe
500+
// (in case of the test, it doesn't do real subscriptions)
501+
func (r *Registry) requestPeerSubscriptions(
502+
kad *network.Kademlia,
503+
subs map[enode.ID]map[Stream]struct{},
504+
subscriptionFunc func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool) {
505+
506+
var startPo int
507+
var endPo int
508+
var ok bool
509+
510+
// kademlia's depth
511+
kadDepth := kad.NeighbourhoodDepth()
512+
// request subscriptions for all nodes and bins
513+
// nil as base takes the node's base; we need to pass 255 as `EachConn` runs
514+
// from deepest bins backwards
515+
kad.EachConn(nil, 255, func(p *network.Peer, po int, _ bool) bool {
516+
//if the peer's bin is shallower than the kademlia depth,
517+
//only the peer's bin should be subscribed
518+
if po < kadDepth {
519+
startPo = po
520+
endPo = po
521+
} else {
522+
//if the peer's bin is equal or deeper than the kademlia depth,
523+
//each bin from the depth up to k.MaxProxDisplay should be subscribed
524+
startPo = kadDepth
525+
endPo = kad.MaxProxDisplay
526+
}
527+
528+
for bin := startPo; bin <= endPo; bin++ {
529+
//do the actual subscription
530+
ok = subscriptionFunc(p, uint8(bin), subs)
531+
}
532+
return ok
533+
})
534+
}
535+
536+
// doRequestSubscription sends the actual RequestSubscription to the peer
537+
func (r *Registry) doRequestSubscription(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
538+
log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
539+
// bin is always less then 256 and it is safe to convert it to type uint8
540+
stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
541+
if streams, ok := subs[p.ID()]; ok {
542+
// delete live and history streams from the map, so that it won't be removed with a Quit request
543+
delete(streams, stream)
544+
delete(streams, getHistoryStream(stream))
545+
}
546+
err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High)
547+
if err != nil {
548+
log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream)
549+
return false
550+
}
551+
return true
552+
}
553+
508554
func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
509555
peer := protocols.NewPeer(p, rw, r.spec)
510556
bp := network.NewBzzPeer(peer)

0 commit comments

Comments
 (0)