From ea82a6d0ebbfcc2e0ba2d3ef16d6d07de9cff217 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 4 Mar 2020 14:42:59 +0800 Subject: [PATCH 1/4] eth: increase timeout allowance --- eth/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/handler_test.go b/eth/handler_test.go index 4a4e1f955956..8db12b670425 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -534,7 +534,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo } } // Wait until the test timeout passes to ensure proper cleanup - time.Sleep(syncChallengeTimeout + 100*time.Millisecond) + time.Sleep(syncChallengeTimeout + 300*time.Millisecond) // Verify that the remote peer is maintained or dropped if drop { From 6e85f25f6cb0a5326158ac6e3c6b4ebf9dfbe95a Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 4 Mar 2020 16:19:50 +0800 Subject: [PATCH 2/4] les: fix unit test --- les/client_handler.go | 5 +++++ les/odr_test.go | 2 -- les/peer.go | 2 +- les/request_test.go | 1 - les/server_handler.go | 3 +++ les/sync_test.go | 23 ++++++----------------- les/test_helper.go | 41 ++++++++++++++++++++++++++++++----------- 7 files changed, 45 insertions(+), 32 deletions(-) diff --git a/les/client_handler.go b/les/client_handler.go index d04574c8c7fa..9d8b98901259 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -19,6 +19,7 @@ package les import ( "math/big" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -132,6 +133,10 @@ func (h *clientHandler) handle(p *serverPeer) error { if p.poolEntry != nil { h.backend.serverPool.registered(p.poolEntry) } + // Mark the peer starts to be served. + atomic.StoreUint32(&p.serving, 1) + defer atomic.StoreUint32(&p.serving, 0) + // Spawn a main loop to handle all incoming messages. for { if err := h.handleMsg(p); err != nil { diff --git a/les/odr_test.go b/les/odr_test.go index bbe439dfec82..958a741ff759 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -186,8 +186,6 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) defer tearDown() - client.handler.synchronise(client.peer.speer) - // Ensure the client has synced all necessary data. clientHead := client.handler.backend.blockchain.CurrentHeader() if clientHead.Number.Uint64() != 4 { diff --git a/les/peer.go b/les/peer.go index 28ec201bc9a7..5a76de0477e0 100644 --- a/les/peer.go +++ b/les/peer.go @@ -131,6 +131,7 @@ type peerCommons struct { network uint64 // Network ID being on. frozen uint32 // Flag whether the peer is frozen. announceType uint64 // New block announcement type. + serving uint32 // The status indicates the peer is served. headInfo blockInfo // Latest block information. // Background task queue for caching peer tasks and executing in order. @@ -642,7 +643,6 @@ type clientPeer struct { responseCount uint64 // Counter to generate an unique id for request processing. errCh chan error fcClient *flowcontrol.ClientNode // Server side mirror token bucket. - balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler } func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer { diff --git a/les/request_test.go b/les/request_test.go index f58ebca9c1d9..e20b06fda5ee 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -81,7 +81,6 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true) defer tearDown() - client.handler.synchronise(client.peer.speer) // Ensure the client has synced all necessary data. clientHead := client.handler.backend.blockchain.CurrentHeader() diff --git a/les/server_handler.go b/les/server_handler.go index 186bdcbb03f4..9543e6ed98a5 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -157,6 +157,9 @@ func (h *serverHandler) handle(p *clientPeer) error { clientConnectionGauge.Update(int64(h.server.peers.len())) connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) }() + // Mark the peer starts to be served. + atomic.StoreUint32(&p.serving, 1) + defer atomic.StoreUint32(&p.serving, 0) // Spawn a main loop to handle all incoming messages. for { diff --git a/les/sync_test.go b/les/sync_test.go index 77b82deb7a25..c128a8c9f72d 100644 --- a/les/sync_test.go +++ b/les/sync_test.go @@ -109,16 +109,12 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { } // Create connected peer pair. - peer1, err1, peer2, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler) + peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler) + if err != nil { + t.Fatalf("Failed to connect testing peers %v", err) + } defer peer1.close() defer peer2.close() - select { - case <-time.After(time.Millisecond * 100): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 2 handshake error: %v", err) - } select { case err := <-done: @@ -208,17 +204,10 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { done <- fmt.Errorf("blockchain length mismatch, want %d, got %d", expected, header.Number) } } - // Create connected peer pair. - _, err1, _, err2 := newTestPeerPair("peer", 2, server.handler, client.handler) - select { - case <-time.After(time.Millisecond * 100): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 2 handshake error: %v", err) + if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil { + t.Fatalf("Failed to connect testing peers %v", err) } - select { case err := <-done: if err != nil { diff --git a/les/test_helper.go b/les/test_helper.go index d9ffe32db205..cface525650c 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -22,7 +22,9 @@ package les import ( "context" "crypto/rand" + "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -347,7 +349,7 @@ func (p *testPeer) close() { p.app.Close() } -func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, <-chan error, *testPeer, <-chan error) { +func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) { // Create a message pipe to communicate through app, net := p2p.MsgPipe() @@ -371,11 +373,25 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl go func() { select { case <-client.closeCh: - errc1 <- p2p.DiscQuitting - case errc1 <- client.handle(peer2): + errc2 <- p2p.DiscQuitting + case errc2 <- client.handle(peer2): } }() - return &testPeer{cpeer: peer1, net: net, app: app}, errc1, &testPeer{speer: peer2, net: app, app: net}, errc2 + // Ensure the connection is established or exits when any error occurs + for { + select { + case err := <-errc1: + return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) + case err := <-errc2: + return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err) + default: + } + if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 { + break + } + time.Sleep(50 * time.Millisecond) + } + return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil } // handshake simulates a trivial handshake that expects the same state from the @@ -514,17 +530,20 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer callback(scIndexer, sbIndexer, sbtIndexer) } var ( + err error speer, cpeer *testPeer - err1, err2 <-chan error ) if connect { - cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client) + done := make(chan struct{}) + client.syncDone = func() {close(done)} + cpeer, speer, err = newTestPeerPair("peer", protocol, server, client) + if err != nil { + t.Fatalf("Failed to connect testing peers %v", err) + } select { - case <-time.After(time.Millisecond * 300): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 2 handshake error: %v", err) + case <-done: + case <-time.NewTimer(time.Second * 3).C: + t.Fatal("Timeout") } } s := &testServer{ From 51881bf76f5475d71bd8151f082031fe93a5cff0 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 10 Mar 2020 18:11:58 +0800 Subject: [PATCH 3/4] les: fix linter --- les/peer.go | 12 ++++++------ les/test_helper.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/les/peer.go b/les/peer.go index 5a76de0477e0..ff298319595c 100644 --- a/les/peer.go +++ b/les/peer.go @@ -637,12 +637,12 @@ type clientPeer struct { // responseLock ensures that responses are queued in the same order as // RequestProcessed is called - responseLock sync.Mutex - server bool - invalidCount uint32 // Counter the invalid request the client peer has made. - responseCount uint64 // Counter to generate an unique id for request processing. - errCh chan error - fcClient *flowcontrol.ClientNode // Server side mirror token bucket. + responseLock sync.Mutex + server bool + invalidCount uint32 // Counter the invalid request the client peer has made. + responseCount uint64 // Counter to generate an unique id for request processing. + errCh chan error + fcClient *flowcontrol.ClientNode // Server side mirror token bucket. } func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer { diff --git a/les/test_helper.go b/les/test_helper.go index cface525650c..23f8319e957a 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -535,7 +535,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer ) if connect { done := make(chan struct{}) - client.syncDone = func() {close(done)} + client.syncDone = func() { close(done) } cpeer, speer, err = newTestPeerPair("peer", protocol, server, client) if err != nil { t.Fatalf("Failed to connect testing peers %v", err) From 8af579824c069d743aab667d843bd714e3bac7e7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 11 Mar 2020 11:34:21 +0100 Subject: [PATCH 4/4] les: use time.After --- les/test_helper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/les/test_helper.go b/les/test_helper.go index 23f8319e957a..1f02d2529998 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -542,8 +542,8 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer } select { case <-done: - case <-time.NewTimer(time.Second * 3).C: - t.Fatal("Timeout") + case <-time.After(3 * time.Second): + t.Fatal("test peer did not connect and sync within 3s") } } s := &testServer{