Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
}
}
// Wait until the test timeout passes to ensure proper cleanup
time.Sleep(syncChallengeTimeout + 100*time.Millisecond)
time.Sleep(syncChallengeTimeout + 300*time.Millisecond)

// Verify that the remote peer is maintained or dropped
if drop {
Expand Down
5 changes: 5 additions & 0 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package les
import (
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -132,6 +133,10 @@ func (h *clientHandler) handle(p *serverPeer) error {
if p.poolEntry != nil {
h.backend.serverPool.registered(p.poolEntry)
}
// Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0)

// Spawn a main loop to handle all incoming messages.
for {
if err := h.handleMsg(p); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions les/odr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
defer tearDown()

client.handler.synchronise(client.peer.speer)

// Ensure the client has synced all necessary data.
clientHead := client.handler.backend.blockchain.CurrentHeader()
if clientHead.Number.Uint64() != 4 {
Expand Down
14 changes: 7 additions & 7 deletions les/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type peerCommons struct {
network uint64 // Network ID being on.
frozen uint32 // Flag whether the peer is frozen.
announceType uint64 // New block announcement type.
serving uint32 // The status indicates the peer is served.
headInfo blockInfo // Latest block information.

// Background task queue for caching peer tasks and executing in order.
Expand Down Expand Up @@ -636,13 +637,12 @@ type clientPeer struct {

// responseLock ensures that responses are queued in the same order as
// RequestProcessed is called
responseLock sync.Mutex
server bool
invalidCount uint32 // Counter the invalid request the client peer has made.
responseCount uint64 // Counter to generate an unique id for request processing.
errCh chan error
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
balanceTracker *balanceTracker // set by clientPool.connect, used and removed by serverHandler
responseLock sync.Mutex
server bool
invalidCount uint32 // Counter the invalid request the client peer has made.
responseCount uint64 // Counter to generate an unique id for request processing.
errCh chan error
fcClient *flowcontrol.ClientNode // Server side mirror token bucket.
}

func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer {
Expand Down
1 change: 0 additions & 1 deletion les/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true)
defer tearDown()
client.handler.synchronise(client.peer.speer)

// Ensure the client has synced all necessary data.
clientHead := client.handler.backend.blockchain.CurrentHeader()
Expand Down
3 changes: 3 additions & 0 deletions les/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func (h *serverHandler) handle(p *clientPeer) error {
clientConnectionGauge.Update(int64(h.server.peers.len()))
connectionTimer.Update(time.Duration(mclock.Now() - connectedAt))
}()
// Mark the peer starts to be served.
atomic.StoreUint32(&p.serving, 1)
defer atomic.StoreUint32(&p.serving, 0)

// Spawn a main loop to handle all incoming messages.
for {
Expand Down
23 changes: 6 additions & 17 deletions les/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,12 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
}

// Create connected peer pair.
peer1, err1, peer2, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
peer1, peer2, err := newTestPeerPair("peer", protocol, server.handler, client.handler)
if err != nil {
t.Fatalf("Failed to connect testing peers %v", err)
}
defer peer1.close()
defer peer2.close()
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2:
t.Fatalf("peer 2 handshake error: %v", err)
}

select {
case err := <-done:
Expand Down Expand Up @@ -208,17 +204,10 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
done <- fmt.Errorf("blockchain length mismatch, want %d, got %d", expected, header.Number)
}
}

// Create connected peer pair.
_, err1, _, err2 := newTestPeerPair("peer", 2, server.handler, client.handler)
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2:
t.Fatalf("peer 2 handshake error: %v", err)
if _, _, err := newTestPeerPair("peer", 2, server.handler, client.handler); err != nil {
t.Fatalf("Failed to connect testing peers %v", err)
}

select {
case err := <-done:
if err != nil {
Expand Down
41 changes: 30 additions & 11 deletions les/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package les
import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -347,7 +349,7 @@ func (p *testPeer) close() {
p.app.Close()
}

func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, <-chan error, *testPeer, <-chan error) {
func newTestPeerPair(name string, version int, server *serverHandler, client *clientHandler) (*testPeer, *testPeer, error) {
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()

Expand All @@ -371,11 +373,25 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
go func() {
select {
case <-client.closeCh:
errc1 <- p2p.DiscQuitting
case errc1 <- client.handle(peer2):
errc2 <- p2p.DiscQuitting
case errc2 <- client.handle(peer2):
}
}()
return &testPeer{cpeer: peer1, net: net, app: app}, errc1, &testPeer{speer: peer2, net: app, app: net}, errc2
// Ensure the connection is established or exits when any error occurs
for {
select {
case err := <-errc1:
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
case err := <-errc2:
return nil, nil, fmt.Errorf("Failed to establish protocol connection %v", err)
default:
}
if atomic.LoadUint32(&peer1.serving) == 1 && atomic.LoadUint32(&peer2.serving) == 1 {
break
}
time.Sleep(50 * time.Millisecond)
}
return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil
}

// handshake simulates a trivial handshake that expects the same state from the
Expand Down Expand Up @@ -514,17 +530,20 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
callback(scIndexer, sbIndexer, sbtIndexer)
}
var (
err error
speer, cpeer *testPeer
err1, err2 <-chan error
)
if connect {
cpeer, err1, speer, err2 = newTestPeerPair("peer", protocol, server, client)
done := make(chan struct{})
client.syncDone = func() { close(done) }
cpeer, speer, err = newTestPeerPair("peer", protocol, server, client)
if err != nil {
t.Fatalf("Failed to connect testing peers %v", err)
}
select {
case <-time.After(time.Millisecond * 300):
case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2:
t.Fatalf("peer 2 handshake error: %v", err)
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("test peer did not connect and sync within 3s")
}
}
s := &testServer{
Expand Down