Skip to content

Commit a59f4c5

Browse files
elizabethengelmani-norden
authored andcommitted
Gracefully exit geth command(#4)
1 parent 46882f7 commit a59f4c5

File tree

3 files changed

+124
-34
lines changed

3 files changed

+124
-34
lines changed

statediff/service/service.go

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,34 +49,69 @@ func (StateDiffService) APIs() []rpc.API {
4949
return []rpc.API{}
5050
}
5151

52-
func (sds *StateDiffService) Loop(events chan core.ChainEvent) {
53-
for elem := range events {
54-
currentBlock := elem.Block
55-
parentHash := currentBlock.ParentHash()
56-
parentBlock := sds.BlockChain.GetBlockByHash(parentHash)
57-
58-
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock)
59-
if err != nil {
60-
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
61-
} else {
62-
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
52+
func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
53+
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
54+
defer chainEventSub.Unsubscribe()
55+
56+
blocksCh := make(chan *types.Block, 10)
57+
errCh := chainEventSub.Err()
58+
quitCh := make(chan struct{})
59+
60+
go func() {
61+
HandleChainEventChLoop:
62+
for {
63+
select {
64+
//Notify chain event channel of events
65+
case chainEvent := <-chainEventCh:
66+
log.Debug("Event received from chainEventCh", "event", chainEvent)
67+
blocksCh <- chainEvent.Block
68+
//if node stopped
69+
case err := <-errCh:
70+
log.Warn("Error from chain event subscription, breaking loop.", "error", err)
71+
break HandleChainEventChLoop
72+
}
73+
}
74+
close(quitCh)
75+
}()
76+
77+
//loop through chain events until no more
78+
HandleBlockChLoop:
79+
for {
80+
select {
81+
case block := <-blocksCh:
82+
currentBlock := block
83+
parentHash := currentBlock.ParentHash()
84+
parentBlock := sds.BlockChain.GetBlockByHash(parentHash)
85+
if parentBlock == nil {
86+
log.Error("Parent block is nil, skipping this block",
87+
"parent block hash", parentHash.String(),
88+
"current block number", currentBlock.Number())
89+
break HandleBlockChLoop
90+
}
91+
92+
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock)
93+
if err != nil {
94+
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
95+
} else {
96+
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
97+
}
98+
case <-quitCh:
99+
log.Debug("Quitting the statediff block channel")
100+
return
63101
}
64102
}
65103
}
66104

67-
var eventsChannel chan core.ChainEvent
68-
69105
func (sds *StateDiffService) Start(server *p2p.Server) error {
70106
log.Info("Starting statediff service")
71-
eventsChannel := make(chan core.ChainEvent, 10)
72-
sds.BlockChain.SubscribeChainEvent(eventsChannel)
73-
go sds.Loop(eventsChannel)
107+
108+
chainEventCh := make(chan core.ChainEvent, 10)
109+
go sds.Loop(chainEventCh)
110+
74111
return nil
75112
}
76113

77114
func (StateDiffService) Stop() error {
78115
log.Info("Stopping statediff service")
79-
close(eventsChannel)
80-
81116
return nil
82117
}

statediff/service/service_test.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@ import (
99
"github.com/ethereum/go-ethereum/common"
1010
"github.com/ethereum/go-ethereum/core"
1111
"github.com/ethereum/go-ethereum/core/types"
12-
service2 "github.com/ethereum/go-ethereum/statediff/service"
12+
s "github.com/ethereum/go-ethereum/statediff/service"
1313
"github.com/ethereum/go-ethereum/statediff/testhelpers/mocks"
1414
)
1515

1616
func TestServiceLoop(t *testing.T) {
17-
testServiceLoop(t)
17+
testErrorInChainEventLoop(t)
18+
testErrorInBlockLoop(t)
1819
}
1920

2021
var (
21-
eventsChannel = make(chan core.ChainEvent, 10)
22+
eventsChannel = make(chan core.ChainEvent, 1)
2223

2324
parentHeader1 = types.Header{Number: big.NewInt(rand.Int63())}
2425
parentHeader2 = types.Header{Number: big.NewInt(rand.Int63())}
@@ -31,29 +32,30 @@ var (
3132

3233
header1 = types.Header{ParentHash: parentHash1}
3334
header2 = types.Header{ParentHash: parentHash2}
35+
header3 = types.Header{ParentHash: common.HexToHash("parent hash")}
3436

3537
block1 = types.NewBlock(&header1, nil, nil, nil)
3638
block2 = types.NewBlock(&header2, nil, nil, nil)
39+
block3 = types.NewBlock(&header3, nil, nil, nil)
3740

3841
event1 = core.ChainEvent{Block: block1}
3942
event2 = core.ChainEvent{Block: block2}
43+
event3 = core.ChainEvent{Block: block3}
4044
)
4145

42-
func testServiceLoop(t *testing.T) {
43-
eventsChannel <- event1
44-
eventsChannel <- event2
45-
46+
func testErrorInChainEventLoop(t *testing.T) {
47+
//the first chain event causes and error (in blockchain mock)
4648
extractor := mocks.Extractor{}
47-
close(eventsChannel)
4849

4950
blockChain := mocks.BlockChain{}
50-
service := service2.StateDiffService{
51+
service := s.StateDiffService{
5152
Builder: nil,
5253
Extractor: &extractor,
5354
BlockChain: &blockChain,
5455
}
5556

56-
blockChain.SetParentBlockToReturn([]*types.Block{parentBlock1, parentBlock2})
57+
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2})
58+
blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3})
5759
service.Loop(eventsChannel)
5860

5961
//parent and current blocks are passed to the extractor
@@ -75,3 +77,31 @@ func testServiceLoop(t *testing.T) {
7577
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes)
7678
}
7779
}
80+
81+
func testErrorInBlockLoop(t *testing.T) {
82+
//second block's parent block can't be found
83+
extractor := mocks.Extractor{}
84+
85+
blockChain := mocks.BlockChain{}
86+
service := s.StateDiffService{
87+
Builder: nil,
88+
Extractor: &extractor,
89+
BlockChain: &blockChain,
90+
}
91+
92+
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil})
93+
blockChain.SetChainEvents([]core.ChainEvent{event1, event2})
94+
service.Loop(eventsChannel)
95+
96+
//only the first current block (and it's parent) are passed to the extractor
97+
expectedCurrentBlocks := []types.Block{*block1}
98+
if !reflect.DeepEqual(extractor.CurrentBlocks, expectedCurrentBlocks) {
99+
t.Error("Test failure:", t.Name())
100+
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedCurrentBlocks)
101+
}
102+
expectedParentBlocks := []types.Block{*parentBlock1}
103+
if !reflect.DeepEqual(extractor.ParentBlocks, expectedParentBlocks) {
104+
t.Error("Test failure:", t.Name())
105+
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedParentBlocks)
106+
}
107+
}
Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mocks
22

33
import (
4+
"errors"
5+
46
"github.com/ethereum/go-ethereum/common"
57
"github.com/ethereum/go-ethereum/core"
68
"github.com/ethereum/go-ethereum/core/types"
@@ -11,24 +13,47 @@ type BlockChain struct {
1113
ParentHashesLookedUp []common.Hash
1214
parentBlocksToReturn []*types.Block
1315
callCount int
16+
ChainEvents []core.ChainEvent
1417
}
1518

16-
func (mc *BlockChain) SetParentBlockToReturn(blocks []*types.Block) {
19+
func (mc *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) {
1720
mc.parentBlocksToReturn = blocks
1821
}
1922

2023
func (mc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
2124
mc.ParentHashesLookedUp = append(mc.ParentHashesLookedUp, hash)
2225

23-
var parentBlock types.Block
26+
var parentBlock *types.Block
2427
if len(mc.parentBlocksToReturn) > 0 {
25-
parentBlock = *mc.parentBlocksToReturn[mc.callCount]
28+
parentBlock = mc.parentBlocksToReturn[mc.callCount]
2629
}
2730

2831
mc.callCount++
29-
return &parentBlock
32+
return parentBlock
33+
}
34+
35+
func (bc *BlockChain) SetChainEvents(chainEvents []core.ChainEvent) {
36+
bc.ChainEvents = chainEvents
3037
}
3138

32-
func (BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
33-
panic("implement me")
39+
func (bc *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
40+
subErr := errors.New("Subscription Error")
41+
42+
var eventCounter int
43+
subscription := event.NewSubscription(func(quit <-chan struct{}) error {
44+
for _, chainEvent := range bc.ChainEvents {
45+
if eventCounter > 1 {
46+
return subErr
47+
}
48+
select {
49+
case ch <- chainEvent:
50+
case <-quit:
51+
return nil
52+
}
53+
eventCounter++
54+
}
55+
return nil
56+
})
57+
58+
return subscription
3459
}

0 commit comments

Comments
 (0)