From 58463cc5f4452eb0e4b92eb2bc0b47d13699d375 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 6 Jan 2020 15:18:21 +0800 Subject: [PATCH 1/3] les: move execqueue into utilities package execqueue is a util for executing queued functions in a serial order which is used by both les server and les client. Move it to common package. --- les/peer.go | 15 ++++---- les/{execqueue.go => utilities/exec_queue.go} | 37 ++++++++++--------- .../exec_queue_test.go} | 20 +++++----- 3 files changed, 36 insertions(+), 36 deletions(-) rename les/{execqueue.go => utilities/exec_queue.go} (71%) rename les/{execqueue_test.go => utilities/exec_queue_test.go} (83%) diff --git a/les/peer.go b/les/peer.go index d308fd249efb..579787313c89 100644 --- a/les/peer.go +++ b/les/peer.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" + "github.com/ethereum/go-ethereum/les/utilities" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -135,7 +136,7 @@ type peerCommons struct { headInfo blockInfo // Latest block information. // Background task queue for caching peer tasks and executing in order. - sendQueue *execQueue + sendQueue *utilities.ExecQueue // Flow control agreement. fcParams flowcontrol.ServerParams // The config for token bucket. @@ -153,13 +154,13 @@ func (p *peerCommons) isFrozen() bool { // canQueue returns an indicator whether the peer can queue a operation. func (p *peerCommons) canQueue() bool { - return p.sendQueue.canQueue() && !p.isFrozen() + return p.sendQueue.CanQueue() && !p.isFrozen() } // queueSend caches a peer operation in the background task queue. // Please ensure to check `canQueue` before call this function func (p *peerCommons) queueSend(f func()) bool { - return p.sendQueue.queue(f) + return p.sendQueue.Queue(f) } // mustQueueSend starts a for loop and retry the caching if failed. @@ -337,7 +338,7 @@ func (p *peerCommons) handshake(td *big.Int, head common.Hash, headNum uint64, g // close closes the channel and notifies all background routines to exit. func (p *peerCommons) close() { close(p.closeCh) - p.sendQueue.quit() + p.sendQueue.Quit() } // serverPeer represents each node to which the client is connected. @@ -375,7 +376,7 @@ func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2 id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: newExecQueue(100), + sendQueue: utilities.NewExecQueue(100), closeCh: make(chan struct{}), }, trusted: trusted, @@ -407,7 +408,7 @@ func (p *serverPeer) rejectUpdate(size uint64) bool { // frozen. func (p *serverPeer) freeze() { if atomic.CompareAndSwapUint32(&p.frozen, 0, 1) { - p.sendQueue.clear() + p.sendQueue.Clear() } } @@ -652,7 +653,7 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: newExecQueue(100), + sendQueue: utilities.NewExecQueue(100), closeCh: make(chan struct{}), }, errCh: make(chan error, 1), diff --git a/les/execqueue.go b/les/utilities/exec_queue.go similarity index 71% rename from les/execqueue.go rename to les/utilities/exec_queue.go index e0c88a990f07..8a771780516e 100644 --- a/les/execqueue.go +++ b/les/utilities/exec_queue.go @@ -14,35 +14,35 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utilities import "sync" -// execQueue implements a queue that executes function calls in a single thread, +// ExecQueue implements a queue that executes function calls in a single thread, // in the same order as they have been queued. -type execQueue struct { +type ExecQueue struct { mu sync.Mutex cond *sync.Cond funcs []func() closeWait chan struct{} } -// newExecQueue creates a new execution queue. -func newExecQueue(capacity int) *execQueue { - q := &execQueue{funcs: make([]func(), 0, capacity)} +// NewExecQueue creates a new execution Queue. +func NewExecQueue(capacity int) *ExecQueue { + q := &ExecQueue{funcs: make([]func(), 0, capacity)} q.cond = sync.NewCond(&q.mu) go q.loop() return q } -func (q *execQueue) loop() { +func (q *ExecQueue) loop() { for f := q.waitNext(false); f != nil; f = q.waitNext(true) { f() } close(q.closeWait) } -func (q *execQueue) waitNext(drop bool) (f func()) { +func (q *ExecQueue) waitNext(drop bool) (f func()) { q.mu.Lock() if drop && len(q.funcs) > 0 { // Remove the function that just executed. We do this here instead of when @@ -60,20 +60,20 @@ func (q *execQueue) waitNext(drop bool) (f func()) { return f } -func (q *execQueue) isClosed() bool { +func (q *ExecQueue) isClosed() bool { return q.closeWait != nil } -// canQueue returns true if more function calls can be added to the execution queue. -func (q *execQueue) canQueue() bool { +// CanQueue returns true if more function calls can be added to the execution Queue. +func (q *ExecQueue) CanQueue() bool { q.mu.Lock() ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) q.mu.Unlock() return ok } -// queue adds a function call to the execution queue. Returns true if successful. -func (q *execQueue) queue(f func()) bool { +// Queue adds a function call to the execution Queue. Returns true if successful. +func (q *ExecQueue) Queue(f func()) bool { q.mu.Lock() ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) if ok { @@ -84,16 +84,17 @@ func (q *execQueue) queue(f func()) bool { return ok } -// clear drops all queued functions -func (q *execQueue) clear() { +// Clear drops all queued functions. +func (q *ExecQueue) Clear() { q.mu.Lock() q.funcs = q.funcs[:0] q.mu.Unlock() } -// quit stops the exec queue. -// quit waits for the current execution to finish before returning. -func (q *execQueue) quit() { +// Quit stops the exec Queue. +// +// Quit waits for the current execution to finish before returning. +func (q *ExecQueue) Quit() { q.mu.Lock() if !q.isClosed() { q.closeWait = make(chan struct{}) diff --git a/les/execqueue_test.go b/les/utilities/exec_queue_test.go similarity index 83% rename from les/execqueue_test.go rename to les/utilities/exec_queue_test.go index cd45b03f221d..81a04c381d5d 100644 --- a/les/execqueue_test.go +++ b/les/utilities/exec_queue_test.go @@ -14,21 +14,19 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utilities -import ( - "testing" -) +import "testing" func TestExecQueue(t *testing.T) { var ( N = 10000 - q = newExecQueue(N) + q = NewExecQueue(N) counter int execd = make(chan int) testexit = make(chan struct{}) ) - defer q.quit() + defer q.Quit() defer close(testexit) check := func(state string, wantOK bool) { @@ -40,11 +38,11 @@ func TestExecQueue(t *testing.T) { case <-testexit: } } - if q.canQueue() != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) + if q.CanQueue() != wantOK { + t.Fatalf("CanQueue() == %t for %s", !wantOK, state) } - if q.queue(qf) != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) + if q.Queue(qf) != wantOK { + t.Fatalf("Queue() == %t for %s", !wantOK, state) } } @@ -57,6 +55,6 @@ func TestExecQueue(t *testing.T) { t.Fatal("execution out of order") } } - q.quit() + q.Quit() check("closed queue", false) } From 21111205515ddb28d62305c6c719214cef1da43c Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 6 Jan 2020 15:36:41 +0800 Subject: [PATCH 2/3] les: move randselect to utilities package weighted_random_selector is a helpful tool for randomly select items maintained in a set but based on the item weight. It's used anywhere is LES package, mainly by les client but will be used in les server with very high chance. So move it into a common package as the second step for les separation. --- les/distributor.go | 9 ++-- les/serverpool.go | 39 ++++++++------- .../weighted_select.go} | 50 +++++++++---------- .../weighted_select_test.go} | 10 ++-- 4 files changed, 54 insertions(+), 54 deletions(-) rename les/{randselect.go => utilities/weighted_select.go} (82%) rename les/{randselect_test.go => utilities/weighted_select_test.go} (92%) diff --git a/les/distributor.go b/les/distributor.go index 4d2be1b8f9f0..fab1d3f1b862 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -22,6 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/les/utilities" ) // requestDistributor implements a mechanism that distributes requests to @@ -194,7 +195,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { elem := d.reqQueue.Front() var ( bestWait time.Duration - sel *weightedRandomSelect + sel *utilities.WeightedRandomSelect ) d.peerLock.RLock() @@ -219,9 +220,9 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { wait, bufRemain := peer.waitBefore(cost) if wait == 0 { if sel == nil { - sel = newWeightedRandomSelect() + sel = utilities.NewWeightedRandomSelect() } - sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) + sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { if bestWait == 0 || wait < bestWait { bestWait = wait @@ -239,7 +240,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { } if sel != nil { - c := sel.choose().(selectPeerItem) + c := sel.Choose().(selectPeerItem) return c.peer, c.req, 0 } return nil, nil, bestWait diff --git a/les/serverpool.go b/les/serverpool.go index ec99a2d98298..c72353825db1 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/les/utilities" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -129,7 +130,7 @@ type serverPool struct { adjustStats chan poolStatAdjust knownQueue, newQueue poolEntryQueue - knownSelect, newSelect *weightedRandomSelect + knownSelect, newSelect *utilities.WeightedRandomSelect knownSelected, newSelected int fastDiscover bool connCh chan *connReq @@ -152,8 +153,8 @@ func newServerPool(db ethdb.Database, ulcServers []string) *serverPool { disconnCh: make(chan *disconnReq), registerCh: make(chan *registerReq), closeCh: make(chan struct{}), - knownSelect: newWeightedRandomSelect(), - newSelect: newWeightedRandomSelect(), + knownSelect: utilities.NewWeightedRandomSelect(), + newSelect: utilities.NewWeightedRandomSelect(), fastDiscover: true, trustedNodes: parseTrustedNodes(ulcServers), } @@ -402,8 +403,8 @@ func (pool *serverPool) eventLoop() { entry.lastConnected = addr entry.addr = make(map[string]*poolEntryAddress) entry.addr[addr.strKey()] = addr - entry.addrSelect = *newWeightedRandomSelect() - entry.addrSelect.update(addr) + entry.addrSelect = *utilities.NewWeightedRandomSelect() + entry.addrSelect.Update(addr) req.result <- entry } @@ -459,7 +460,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { entry = &poolEntry{ node: node, addr: make(map[string]*poolEntryAddress), - addrSelect: *newWeightedRandomSelect(), + addrSelect: *utilities.NewWeightedRandomSelect(), shortRetry: shortRetryCnt, } pool.entries[node.ID()] = entry @@ -477,7 +478,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { entry.addr[addr.strKey()] = addr } addr.lastSeen = now - entry.addrSelect.update(addr) + entry.addrSelect.Update(addr) if !entry.known { pool.newQueue.setLatest(entry) } @@ -505,7 +506,7 @@ func (pool *serverPool) loadNodes() { pool.entries[e.node.ID()] = e if pool.trustedNodes[e.node.ID()] == nil { pool.knownQueue.setLatest(e) - pool.knownSelect.update((*knownEntry)(e)) + pool.knownSelect.Update((*knownEntry)(e)) } } } @@ -556,8 +557,8 @@ func (pool *serverPool) saveNodes() { // Note that it is called by the new/known queues from which the entry has already // been removed so removing it from the queues is not necessary. func (pool *serverPool) removeEntry(entry *poolEntry) { - pool.newSelect.remove((*discoveredEntry)(entry)) - pool.knownSelect.remove((*knownEntry)(entry)) + pool.newSelect.Remove((*discoveredEntry)(entry)) + pool.knownSelect.Remove((*knownEntry)(entry)) entry.removed = true delete(pool.entries, entry.node.ID()) } @@ -586,8 +587,8 @@ func (pool *serverPool) setRetryDial(entry *poolEntry) { // updateCheckDial is called when an entry can potentially be dialed again. It updates // its selection weights and checks if new dials can/should be made. func (pool *serverPool) updateCheckDial(entry *poolEntry) { - pool.newSelect.update((*discoveredEntry)(entry)) - pool.knownSelect.update((*knownEntry)(entry)) + pool.newSelect.Update((*discoveredEntry)(entry)) + pool.knownSelect.Update((*knownEntry)(entry)) pool.checkDial() } @@ -596,7 +597,7 @@ func (pool *serverPool) updateCheckDial(entry *poolEntry) { func (pool *serverPool) checkDial() { fillWithKnownSelects := !pool.fastDiscover for pool.knownSelected < targetKnownSelect { - entry := pool.knownSelect.choose() + entry := pool.knownSelect.Choose() if entry == nil { fillWithKnownSelects = false break @@ -604,7 +605,7 @@ func (pool *serverPool) checkDial() { pool.dial((*poolEntry)(entry.(*knownEntry)), true) } for pool.knownSelected+pool.newSelected < targetServerCount { - entry := pool.newSelect.choose() + entry := pool.newSelect.Choose() if entry == nil { break } @@ -615,7 +616,7 @@ func (pool *serverPool) checkDial() { // is over, we probably won't find more in the near future so select more // known entries if possible for pool.knownSelected < targetServerCount { - entry := pool.knownSelect.choose() + entry := pool.knownSelect.Choose() if entry == nil { break } @@ -636,7 +637,7 @@ func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { } else { pool.newSelected++ } - addr := entry.addrSelect.choose().(*poolEntryAddress) + addr := entry.addrSelect.Choose().(*poolEntryAddress) log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) entry.dialed = addr go func() { @@ -684,7 +685,7 @@ type poolEntry struct { addr map[string]*poolEntryAddress node *enode.Node lastConnected, dialed *poolEntryAddress - addrSelect weightedRandomSelect + addrSelect utilities.WeightedRandomSelect lastDiscovered mclock.AbsTime known, knownSelected, trusted bool @@ -734,8 +735,8 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr - e.addrSelect = *newWeightedRandomSelect() - e.addrSelect.update(addr) + e.addrSelect = *utilities.NewWeightedRandomSelect() + e.addrSelect.Update(addr) e.lastConnected = addr e.connectStats = entry.CStat e.delayStats = entry.DStat diff --git a/les/randselect.go b/les/utilities/weighted_select.go similarity index 82% rename from les/randselect.go rename to les/utilities/weighted_select.go index 8efe0c94d397..85dd13cb5ec6 100644 --- a/les/randselect.go +++ b/les/utilities/weighted_select.go @@ -14,43 +14,30 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utilities -import ( - "math/rand" -) +import "math/rand" // wrsItem interface should be implemented by any entries that are to be selected from -// a weightedRandomSelect set. Note that recalculating monotonously decreasing item -// weights on-demand (without constantly calling update) is allowed +// a WeightedRandomSelect set. Note that recalculating monotonously decreasing item +// weights on-demand (without constantly calling Update) is allowed type wrsItem interface { Weight() int64 } -// weightedRandomSelect is capable of weighted random selection from a set of items -type weightedRandomSelect struct { +// WeightedRandomSelect is capable of weighted random selection from a set of items +type WeightedRandomSelect struct { root *wrsNode idx map[wrsItem]int } -// newWeightedRandomSelect returns a new weightedRandomSelect structure -func newWeightedRandomSelect() *weightedRandomSelect { - return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} -} - -// update updates an item's weight, adds it if it was non-existent or removes it if -// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. -func (w *weightedRandomSelect) update(item wrsItem) { - w.setWeight(item, item.Weight()) -} - -// remove removes an item from the set -func (w *weightedRandomSelect) remove(item wrsItem) { - w.setWeight(item, 0) +// NewWeightedRandomSelect returns a new WeightedRandomSelect structure +func NewWeightedRandomSelect() *WeightedRandomSelect { + return &WeightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} } // setWeight sets an item's weight to a specific value (removes it if zero) -func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { +func (w *WeightedRandomSelect) setWeight(item wrsItem, weight int64) { idx, ok := w.idx[item] if ok { w.root.setWeight(idx, weight) @@ -71,11 +58,22 @@ func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { } } -// choose randomly selects an item from the set, with a chance proportional to its +// Update updates an item's weight, adds it if it was non-existent or removes it if +// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. +func (w *WeightedRandomSelect) Update(item wrsItem) { + w.setWeight(item, item.Weight()) +} + +// Remove removes an item from the set +func (w *WeightedRandomSelect) Remove(item wrsItem) { + w.setWeight(item, 0) +} + +// Choose randomly selects an item from the set, with a chance proportional to its // current weight. If the weight of the chosen element has been decreased since the // last stored value, returns it with a newWeight/oldWeight chance, otherwise just // updates its weight and selects another one -func (w *weightedRandomSelect) choose() wrsItem { +func (w *WeightedRandomSelect) Choose() wrsItem { for { if w.root.sumWeight == 0 { return nil @@ -154,7 +152,7 @@ func (n *wrsNode) setWeight(idx int, weight int64) int64 { return diff } -// choose recursively selects an item from the tree and returns it along with its weight +// Choose recursively selects an item from the tree and returns it along with its weight func (n *wrsNode) choose(val int64) (wrsItem, int64) { for i, w := range n.weights { if val < w { diff --git a/les/randselect_test.go b/les/utilities/weighted_select_test.go similarity index 92% rename from les/randselect_test.go rename to les/utilities/weighted_select_test.go index 9ae7726ddd09..4e81c0c0863d 100644 --- a/les/randselect_test.go +++ b/les/utilities/weighted_select_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package les +package utilities import ( "math/rand" @@ -36,15 +36,15 @@ func (t *testWrsItem) Weight() int64 { func TestWeightedRandomSelect(t *testing.T) { testFn := func(cnt int) { - s := newWeightedRandomSelect() + s := NewWeightedRandomSelect() w := -1 list := make([]testWrsItem, cnt) for i := range list { list[i] = testWrsItem{idx: i, widx: &w} - s.update(&list[i]) + s.Update(&list[i]) } w = rand.Intn(cnt) - c := s.choose() + c := s.Choose() if c == nil { t.Errorf("expected item, got nil") } else { @@ -53,7 +53,7 @@ func TestWeightedRandomSelect(t *testing.T) { } } w = -2 - if s.choose() != nil { + if s.Choose() != nil { t.Errorf("expected nil, got item") } } From 6b7a2569f0d50d84caa0f66146dcf6340a01779a Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 31 Mar 2020 17:24:28 +0800 Subject: [PATCH 3/3] les: rename to utils --- les/distributor.go | 6 +++--- les/peer.go | 8 ++++---- les/serverpool.go | 16 ++++++++-------- les/{utilities => utils}/exec_queue.go | 2 +- les/{utilities => utils}/exec_queue_test.go | 2 +- les/{utilities => utils}/weighted_select.go | 2 +- les/{utilities => utils}/weighted_select_test.go | 2 +- 7 files changed, 19 insertions(+), 19 deletions(-) rename les/{utilities => utils}/exec_queue.go (99%) rename les/{utilities => utils}/exec_queue_test.go (98%) rename les/{utilities => utils}/weighted_select.go (99%) rename les/{utilities => utils}/weighted_select_test.go (98%) diff --git a/les/distributor.go b/les/distributor.go index fab1d3f1b862..97d2ccdfea7a 100644 --- a/les/distributor.go +++ b/les/distributor.go @@ -22,7 +22,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/les/utilities" + "github.com/ethereum/go-ethereum/les/utils" ) // requestDistributor implements a mechanism that distributes requests to @@ -195,7 +195,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { elem := d.reqQueue.Front() var ( bestWait time.Duration - sel *utilities.WeightedRandomSelect + sel *utils.WeightedRandomSelect ) d.peerLock.RLock() @@ -220,7 +220,7 @@ func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { wait, bufRemain := peer.waitBefore(cost) if wait == 0 { if sel == nil { - sel = utilities.NewWeightedRandomSelect() + sel = utils.NewWeightedRandomSelect() } sel.Update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) } else { diff --git a/les/peer.go b/les/peer.go index 579787313c89..c2f4235eb486 100644 --- a/les/peer.go +++ b/les/peer.go @@ -32,7 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/les/flowcontrol" - "github.com/ethereum/go-ethereum/les/utilities" + "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" @@ -136,7 +136,7 @@ type peerCommons struct { headInfo blockInfo // Latest block information. // Background task queue for caching peer tasks and executing in order. - sendQueue *utilities.ExecQueue + sendQueue *utils.ExecQueue // Flow control agreement. fcParams flowcontrol.ServerParams // The config for token bucket. @@ -376,7 +376,7 @@ func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2 id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: utilities.NewExecQueue(100), + sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, trusted: trusted, @@ -653,7 +653,7 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite id: peerIdToString(p.ID()), version: version, network: network, - sendQueue: utilities.NewExecQueue(100), + sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, errCh: make(chan error, 1), diff --git a/les/serverpool.go b/les/serverpool.go index c72353825db1..d1c53295a7a8 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -30,7 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/les/utilities" + "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -130,7 +130,7 @@ type serverPool struct { adjustStats chan poolStatAdjust knownQueue, newQueue poolEntryQueue - knownSelect, newSelect *utilities.WeightedRandomSelect + knownSelect, newSelect *utils.WeightedRandomSelect knownSelected, newSelected int fastDiscover bool connCh chan *connReq @@ -153,8 +153,8 @@ func newServerPool(db ethdb.Database, ulcServers []string) *serverPool { disconnCh: make(chan *disconnReq), registerCh: make(chan *registerReq), closeCh: make(chan struct{}), - knownSelect: utilities.NewWeightedRandomSelect(), - newSelect: utilities.NewWeightedRandomSelect(), + knownSelect: utils.NewWeightedRandomSelect(), + newSelect: utils.NewWeightedRandomSelect(), fastDiscover: true, trustedNodes: parseTrustedNodes(ulcServers), } @@ -403,7 +403,7 @@ func (pool *serverPool) eventLoop() { entry.lastConnected = addr entry.addr = make(map[string]*poolEntryAddress) entry.addr[addr.strKey()] = addr - entry.addrSelect = *utilities.NewWeightedRandomSelect() + entry.addrSelect = *utils.NewWeightedRandomSelect() entry.addrSelect.Update(addr) req.result <- entry } @@ -460,7 +460,7 @@ func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { entry = &poolEntry{ node: node, addr: make(map[string]*poolEntryAddress), - addrSelect: *utilities.NewWeightedRandomSelect(), + addrSelect: *utils.NewWeightedRandomSelect(), shortRetry: shortRetryCnt, } pool.entries[node.ID()] = entry @@ -685,7 +685,7 @@ type poolEntry struct { addr map[string]*poolEntryAddress node *enode.Node lastConnected, dialed *poolEntryAddress - addrSelect utilities.WeightedRandomSelect + addrSelect utils.WeightedRandomSelect lastDiscovered mclock.AbsTime known, knownSelected, trusted bool @@ -735,7 +735,7 @@ func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) e.addr = make(map[string]*poolEntryAddress) e.addr[addr.strKey()] = addr - e.addrSelect = *utilities.NewWeightedRandomSelect() + e.addrSelect = *utils.NewWeightedRandomSelect() e.addrSelect.Update(addr) e.lastConnected = addr e.connectStats = entry.CStat diff --git a/les/utilities/exec_queue.go b/les/utils/exec_queue.go similarity index 99% rename from les/utilities/exec_queue.go rename to les/utils/exec_queue.go index 8a771780516e..a8f9b84acb80 100644 --- a/les/utilities/exec_queue.go +++ b/les/utils/exec_queue.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package utilities +package utils import "sync" diff --git a/les/utilities/exec_queue_test.go b/les/utils/exec_queue_test.go similarity index 98% rename from les/utilities/exec_queue_test.go rename to les/utils/exec_queue_test.go index 81a04c381d5d..98601c448662 100644 --- a/les/utilities/exec_queue_test.go +++ b/les/utils/exec_queue_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package utilities +package utils import "testing" diff --git a/les/utilities/weighted_select.go b/les/utils/weighted_select.go similarity index 99% rename from les/utilities/weighted_select.go rename to les/utils/weighted_select.go index 85dd13cb5ec6..fbf1f37d627a 100644 --- a/les/utilities/weighted_select.go +++ b/les/utils/weighted_select.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package utilities +package utils import "math/rand" diff --git a/les/utilities/weighted_select_test.go b/les/utils/weighted_select_test.go similarity index 98% rename from les/utilities/weighted_select_test.go rename to les/utils/weighted_select_test.go index 4e81c0c0863d..e1969e1a61fa 100644 --- a/les/utilities/weighted_select_test.go +++ b/les/utils/weighted_select_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package utilities +package utils import ( "math/rand"