
Commit 689aad2

swarm/network/stream: address PR comments
1 parent c32d961 commit 689aad2

File tree

swarm/network/stream/snapshot_sync_test.go
swarm/network/stream/stream.go
swarm/network/stream/streamer_test.go

3 files changed: +22 -284 lines

swarm/network/stream/snapshot_sync_test.go

Lines changed: 0 additions & 266 deletions
@@ -106,43 +106,6 @@ func TestSyncingViaGlobalSync(t *testing.T) {
 	}
 }
 
-func TestSyncingViaDirectSubscribe(t *testing.T) {
-	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
-		t.Skip("Flaky on mac on travis")
-	}
-	//if nodes/chunks have been provided via commandline,
-	//run the tests with these values
-	if *nodes != 0 && *chunks != 0 {
-		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
-		err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
-		if err != nil {
-			t.Fatal(err)
-		}
-	} else {
-		var nodeCnt []int
-		var chnkCnt []int
-		//if the `longrunning` flag has been provided
-		//run more test combinations
-		if *longrunning {
-			chnkCnt = []int{1, 8, 32, 256, 1024}
-			nodeCnt = []int{32, 16}
-		} else {
-			//default test
-			chnkCnt = []int{4, 32}
-			nodeCnt = []int{32, 16}
-		}
-		for _, chnk := range chnkCnt {
-			for _, n := range nodeCnt {
-				log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
-				err := testSyncingViaDirectSubscribe(t, chnk, n)
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
-		}
-	}
-}
-
 var simServiceMap = map[string]simulation.ServiceFunc{
 	"streamer": streamerFunc,
 }
@@ -323,235 +286,6 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
 	})
 }
 
-/*
-The test generates the given number of chunks
-
-For every chunk generated, the nearest node addresses
-are identified, we verify that the nodes closer to the
-chunk addresses actually do have the chunks in their local stores.
-
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. The snapshot should have 'streamer' in its service list.
-*/
-func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
-
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				Retrieval: RetrievalDisabled,
-				Syncing:   SyncingRegisterOnly,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
-	defer sim.Close()
-
-	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
-	defer cancelSimRun()
-
-	conf := &synctestConfig{}
-	//map of discover ID to indexes of chunks expected at that ID
-	conf.idToChunksMap = make(map[enode.ID][]int)
-	//map of overlay address to discover ID
-	conf.addrToIDMap = make(map[string]enode.ID)
-	//array where the generated chunk hashes will be stored
-	conf.hashes = make([]storage.Address, 0)
-
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
-	if err != nil {
-		return err
-	}
-
-	if _, err := sim.WaitTillHealthy(ctx); err != nil {
-		return err
-	}
-
-	disconnections := sim.PeerEvents(
-		context.Background(),
-		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Drop(),
-	)
-
-	var disconnected atomic.Value
-	go func() {
-		for d := range disconnections {
-			if d.Error != nil {
-				log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-				disconnected.Store(true)
-			}
-		}
-	}()
-
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
-		nodeIDs := sim.UpNodeIDs()
-		for _, n := range nodeIDs {
-			//get the kademlia overlay address from this ID
-			a := n.Bytes()
-			//append it to the array of all overlay addresses
-			conf.addrs = append(conf.addrs, a)
-			//the proximity calculation is on overlay addr,
-			//the p2p/simulations check func triggers on enode.ID,
-			//so we need to know which overlay addr maps to which nodeID
-			conf.addrToIDMap[string(a)] = n
-		}
-
-		var subscriptionCount int
-
-		filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
-		eventC := sim.PeerEvents(ctx, nodeIDs, filter)
-
-		for j, node := range nodeIDs {
-			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
-			//start syncing!
-			item, ok := sim.NodeItem(node, bucketKeyRegistry)
-			if !ok {
-				return fmt.Errorf("No registry")
-			}
-			registry := item.(*Registry)
-
-			var cnt int
-			cnt, err = startSyncing(registry, conf)
-			if err != nil {
-				return err
-			}
-			//increment the number of subscriptions we need to wait for
-			//by the count returned from startSyncing (SYNC subscriptions)
-			subscriptionCount += cnt
-		}
-
-		for e := range eventC {
-			if e.Error != nil {
-				return e.Error
-			}
-			subscriptionCount--
-			if subscriptionCount == 0 {
-				break
-			}
-		}
-		//select a random node for upload
-		node := sim.Net.GetRandomUpNode()
-		item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
-		if !ok {
-			return fmt.Errorf("No localstore")
-		}
-		lstore := item.(*storage.LocalStore)
-		hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
-		if err != nil {
-			return err
-		}
-		conf.hashes = append(conf.hashes, hashes...)
-		mapKeysToNodes(conf)
-
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
-
-		var globalStore mock.GlobalStorer
-		if *useMockStore {
-			globalStore = mockmem.NewGlobalStore()
-		}
-		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
-		// or until the timeout is reached.
-	REPEAT:
-		for {
-			for _, id := range nodeIDs {
-				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					if *useMockStore {
-						//use the globalStore if the mockStore should be used; in that case,
-						//the complete localStore stack is bypassed for getting the chunk
-						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
-					} else {
-						//use the actual localstore
-						item, ok := sim.NodeItem(id, bucketKeyStore)
-						if !ok {
-							return fmt.Errorf("Error accessing localstore")
-						}
-						lstore := item.(*storage.LocalStore)
-						_, err = lstore.Get(ctx, chunk)
-					}
-					if err != nil {
-						log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						// Do not get crazy with logging the warn message
-						time.Sleep(500 * time.Millisecond)
-						continue REPEAT
-					}
-					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
-				}
-			}
-			return nil
-		}
-	})
-
-	if result.Error != nil {
-		return result.Error
-	}
-
-	if yes, ok := disconnected.Load().(bool); ok && yes {
-		t.Fatal("disconnect events received")
-	}
-	log.Info("Simulation ended")
-	return nil
-}
-
-//the server func to start syncing
-//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
-//the kademlia's `EachBin` function.
-//returns the number of subscriptions requested
-func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
-	var err error
-	kad := r.delivery.kad
-	subCnt := 0
-	//iterate over each bin and solicit needed subscription to bins
-	kad.EachConn(nil, 255, func(p *network.Peer, po int, _ bool) bool {
-		//identify begin and start index of the bin(s) we want to subscribe to
-		subCnt++
-		err = r.RequestSubscription(conf.addrToIDMap[string(p.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
-		if err != nil {
-			log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
-			return false
-		}
-		return true
-
-	})
-	return subCnt, nil
-}
-
 //map chunk keys to addresses which are responsible
 func mapKeysToNodes(conf *synctestConfig) {
 	nodemap := make(map[string][]int)

swarm/network/stream/stream.go

Lines changed: 13 additions & 10 deletions
@@ -87,9 +87,13 @@ type Registry struct {
 	intervalsStore state.Store
 	autoRetrieval  bool // automatically subscribe to retrieve request stream
 	maxPeerServers int
-	balance        protocols.Balance // implements protocols.Balance, for accounting
-	prices         protocols.Prices  // implements protocols.Prices, provides prices to accounting
-	spec           *protocols.Spec   // this protocol's spec
+	spec           *protocols.Spec   //this protocol's spec
+	balance        protocols.Balance //implements protocols.Balance, for accounting
+	prices         protocols.Prices  //implements protocols.Prices, provides prices to accounting
+	// the subscriptionFunc is used to determine what to do in order to perform subscriptions
+	// usually we would start to really subscribe to nodes, but for tests other functionality may be needed
+	// (see TestRequestPeerSubscriptions in streamer_test.go)
+	subscriptionFunc func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool
 }
 
 // RegistryOptions holds optional values for NewRegistry constructor.
@@ -124,6 +128,8 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
 		maxPeerServers: options.MaxPeerServers,
 		balance:        balance,
 	}
+	//assign the default subscription func: actually do request subscriptions from nodes
+	streamer.subscriptionFunc = streamer.doRequestSubscription
 	streamer.setupSpec()
 
 	streamer.api = NewAPI(streamer)
@@ -467,7 +473,7 @@ func (r *Registry) updateSyncing() {
 	r.peersMu.RUnlock()
 
 	// start requesting subscriptions from peers
-	r.requestPeerSubscriptions(kad, subs, r.doRequestSubscription)
+	r.requestPeerSubscriptions(kad, subs)
 
 	// remove SYNC servers that do not need to be subscribed
 	for id, streams := range subs {
@@ -498,10 +504,7 @@
 // * a map of subscriptions
 // * the actual function to subscribe
 // (in case of the test, it doesn't do real subscriptions)
-func (r *Registry) requestPeerSubscriptions(
-	kad *network.Kademlia,
-	subs map[enode.ID]map[Stream]struct{},
-	subscriptionFunc func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool) {
+func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) {
 
 	var startPo int
 	var endPo int
@@ -527,7 +530,7 @@ func (r *Registry) requestPeerSubscriptions(
 
 		for bin := startPo; bin <= endPo; bin++ {
 			//do the actual subscription
-			ok = subscriptionFunc(p, uint8(bin), subs)
+			ok = r.subscriptionFunc(p, uint8(bin), subs)
 		}
 		return ok
 	})
@@ -537,7 +540,7 @@
 func (r *Registry) doRequestSubscription(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
 	log.Debug(fmt.Sprintf("Requesting subscription by: registry %s from peer %s for bin: %d", r.addr, p.ID(), bin))
 	// bin is always less then 256 and it is safe to convert it to type uint8
-	stream := NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true)
+	stream := NewStream("SYNC", FormatSyncBinKey(bin), true)
 	if streams, ok := subs[p.ID()]; ok {
 		// delete live and history streams from the map, so that it won't be removed with a Quit request
 		delete(streams, stream)
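
The stream.go changes make subscription behaviour pluggable through the Registry's unexported subscriptionFunc field instead of an extra parameter on requestPeerSubscriptions. A minimal sketch of how code inside the stream package might use the hook after this commit; the helper name exampleRecordSubscriptions and the recording map are hypothetical and not part of the commit:

// exampleRecordSubscriptions is a hypothetical, test-only helper: it installs a
// stub subscriptionFunc that records which bin would be requested from each peer
// instead of sending a real subscription request, then triggers the kademlia walk.
// It assumes it lives inside the stream package, so unexported fields are reachable.
func exampleRecordSubscriptions(k *network.Kademlia) map[enode.ID][]uint8 {
	recorded := make(map[enode.ID][]uint8) // peer ID -> bins the registry would subscribe to

	r := &Registry{}
	r.subscriptionFunc = func(p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool {
		// record the bin instead of performing a real subscription
		recorded[p.ID()] = append(recorded[p.ID()], bin)
		return true
	}
	r.requestPeerSubscriptions(k, nil)

	return recorded
}

This mirrors what TestRequestPeerSubscriptions does below: assign the stub to r.subscriptionFunc first, then call r.requestPeerSubscriptions(k, nil).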

swarm/network/stream/streamer_test.go

Lines changed: 9 additions & 8 deletions
@@ -962,7 +962,7 @@ func TestHasPriceImplementation(t *testing.T) {
 TestRequestPeerSubscriptions is a unit test for stream's pull sync subscriptions.
 
 The test does:
-	* assign each known connected peer to a bin map
+	* assign each connected peer to a bin map
 	* build up a known kademlia in advance
 	* run the EachConn function, which returns supposed subscription bins
 	* store all supposed bins per peer in a map
@@ -1067,7 +1067,8 @@ func TestRequestPeerSubscriptions(t *testing.T) {
 	// create just a simple Registry object in order to be able to call...
 	r := &Registry{}
 	// ...the requestPeerSubscriptions function, which contains the logic for subscriptions
-	r.requestPeerSubscriptions(k, nil, requestSubscriptionFunc)
+	r.subscriptionFunc = requestSubscriptionFunc
+	r.requestPeerSubscriptions(k, nil)
 	// calculate the kademlia depth
 	kdepth := k.NeighbourhoodDepth()
 
@@ -1077,27 +1078,27 @@
 		// for every peer...
 		for _, peer := range peers {
 			// ...get its (fake) subscriptions
-			fakeSubs := fakeSubscriptions[peer]
-			// if the peer's bin is below the kademlia depth...
+			fakeSubsForPeer := fakeSubscriptions[peer]
+			// if the peer's bin is shallower than the kademlia depth...
 			if bin < kdepth {
 				// (iterate all (fake) subscriptions)
-				for _, subbin := range fakeSubs {
+				for _, subbin := range fakeSubsForPeer {
 					// ...only the peer's bin should be "subscribed"
 					// (and thus have only one subscription)
-					if subbin != bin || len(fakeSubs) != 1 {
+					if subbin != bin || len(fakeSubsForPeer) != 1 {
 						t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
 					}
 				}
 			} else { //if the peer's bin is equal or higher than the kademlia depth...
 				// (iterate all (fake) subscriptions)
-				for i, subbin := range fakeSubs {
+				for i, subbin := range fakeSubsForPeer {
 					// ...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed"
 					// as we start from depth we can use the iteration index to check
 					if subbin != i+kdepth {
 						t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin)
 					}
 					// the last "subscription" should be k.MaxProxDisplay
-					if i == len(fakeSubs)-1 && subbin != k.MaxProxDisplay {
+					if i == len(fakeSubsForPeer)-1 && subbin != k.MaxProxDisplay {
 						t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin)
 					}
 				}
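
For reference, the rule that TestRequestPeerSubscriptions asserts (a peer in a bin shallower than the kademlia depth gets only its own bin; a peer at or beyond the depth gets every bin from the depth up to MaxProxDisplay) can be expressed as a small helper. This is an illustrative sketch only; expectedSubscriptionBins is a hypothetical name and is not part of the commit:

// expectedSubscriptionBins is a hypothetical helper illustrating which SYNC bins
// the registry is expected to request from a peer sitting in bin peerBin, given
// the current kademlia depth and the kademlia's MaxProxDisplay value.
func expectedSubscriptionBins(peerBin, depth, maxProxDisplay int) []int {
	if peerBin < depth {
		// peer shallower than depth: subscribe only to the peer's own bin
		return []int{peerBin}
	}
	// peer at or beyond depth: subscribe to every bin from depth up to MaxProxDisplay
	bins := make([]int, 0, maxProxDisplay-depth+1)
	for bin := depth; bin <= maxProxDisplay; bin++ {
		bins = append(bins, bin)
	}
	return bins
}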
