Skip to content

Commit 7df574c

Browse files
Merge pull request #250 from coinbase/patrick/server-closed-error
[statefulsyncer] Add Variadic Initializer
2 parents a61aa5d + 80535de commit 7df574c

File tree

7 files changed

+180
-56
lines changed

7 files changed

+180
-56
lines changed

fetcher/utils.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ const (
3737
// clientTimeout is returned when a request exceeds the set
3838
// HTTP timeout setting.
3939
clientTimeout = "Client.Timeout exceeded"
40+
41+
// serverClosedIdleConnection is returned when the client
42+
// attempts to make a request on a connection that was closed
43+
// by the server.
44+
serverClosedIdleConnection = "server closed idle connection"
4045
)
4146

4247
// Backoff wraps backoff.BackOff so we can
@@ -66,6 +71,7 @@ func transientError(err error) bool {
6671
if errors.Is(err, client.ErrRetriable) ||
6772
strings.Contains(err.Error(), io.EOF.Error()) ||
6873
strings.Contains(err.Error(), connectionResetByPeer) ||
74+
strings.Contains(err.Error(), serverClosedIdleConnection) ||
6975
strings.Contains(err.Error(), clientTimeout) {
7076
return true
7177
}

statefulsyncer/configuration.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2020 Coinbase, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package statefulsyncer
16+
17+
import (
18+
"time"
19+
)
20+
21+
// Option is used to overwrite default values in
22+
// StatefulSyncer construction. Any Option not provided
23+
// falls back to the default value.
24+
type Option func(s *StatefulSyncer)
25+
26+
// WithCacheSize overrides the default cache size.
27+
func WithCacheSize(cacheSize int) Option {
28+
return func(s *StatefulSyncer) {
29+
s.cacheSize = cacheSize
30+
}
31+
}
32+
33+
// WithPastBlockLimit overrides the default past block limit
34+
func WithPastBlockLimit(blocks int) Option {
35+
return func(s *StatefulSyncer) {
36+
s.pastBlockLimit = blocks
37+
}
38+
}
39+
40+
// WithMaxConcurrency overrides the default max concurrency.
41+
func WithMaxConcurrency(concurrency int64) Option {
42+
return func(s *StatefulSyncer) {
43+
s.maxConcurrency = concurrency
44+
}
45+
}
46+
47+
// WithAdjustmentWindow overrides the default adjustment window.
48+
func WithAdjustmentWindow(adjustmentWindow int64) Option {
49+
return func(s *StatefulSyncer) {
50+
s.adjustmentWindow = adjustmentWindow
51+
}
52+
}
53+
54+
// WithPruneSleepTime overrides the default prune sleep time.
55+
func WithPruneSleepTime(sleepTime int) Option {
56+
return func(s *StatefulSyncer) {
57+
s.pruneSleepTime = time.Duration(sleepTime) * time.Second
58+
}
59+
}

statefulsyncer/stateful_syncer.go

Lines changed: 36 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,22 @@ import (
1919
"errors"
2020
"fmt"
2121
"log"
22-
"math/big"
2322
"time"
2423

2524
"github.com/coinbase/rosetta-sdk-go/fetcher"
2625
"github.com/coinbase/rosetta-sdk-go/storage"
2726
"github.com/coinbase/rosetta-sdk-go/syncer"
2827
"github.com/coinbase/rosetta-sdk-go/types"
28+
"github.com/coinbase/rosetta-sdk-go/utils"
2929
)
3030

3131
var _ syncer.Handler = (*StatefulSyncer)(nil)
3232
var _ syncer.Helper = (*StatefulSyncer)(nil)
3333

3434
const (
35-
// pruneSleepTime is how long we sleep between
35+
// DefaultPruneSleepTime is how long we sleep between
3636
// pruning attempts.
37-
pruneSleepTime = 10 * time.Second
37+
DefaultPruneSleepTime = 30 * time.Minute
3838

3939
// pruneBuffer is the cushion we apply to pastBlockLimit
4040
// when pruning.
@@ -54,9 +54,12 @@ type StatefulSyncer struct {
5454
counterStorage *storage.CounterStorage
5555
logger Logger
5656
workers []storage.BlockWorker
57-
cacheSize int
58-
maxConcurrency int64
59-
pastBlockLimit int
57+
58+
cacheSize int
59+
maxConcurrency int64
60+
pastBlockLimit int
61+
adjustmentWindow int64
62+
pruneSleepTime time.Duration
6063
}
6164

6265
// Logger is used by the statefulsyncer to
@@ -87,22 +90,31 @@ func New(
8790
logger Logger,
8891
cancel context.CancelFunc,
8992
workers []storage.BlockWorker,
90-
cacheSize int,
91-
maxConcurrency int64,
92-
pastBlockLimit int,
93+
options ...Option,
9394
) *StatefulSyncer {
94-
return &StatefulSyncer{
95+
s := &StatefulSyncer{
9596
network: network,
9697
fetcher: fetcher,
9798
cancel: cancel,
9899
blockStorage: blockStorage,
99100
counterStorage: counterStorage,
100101
workers: workers,
101102
logger: logger,
102-
cacheSize: cacheSize,
103-
maxConcurrency: maxConcurrency,
104-
pastBlockLimit: pastBlockLimit,
103+
104+
// Optional args
105+
cacheSize: syncer.DefaultCacheSize,
106+
maxConcurrency: syncer.DefaultMaxConcurrency,
107+
pastBlockLimit: syncer.DefaultPastBlockLimit,
108+
adjustmentWindow: syncer.DefaultAdjustmentWindow,
109+
pruneSleepTime: DefaultPruneSleepTime,
105110
}
111+
112+
// Override defaults with any provided options
113+
for _, opt := range options {
114+
opt(s)
115+
}
116+
117+
return s
106118
}
107119

108120
// Sync starts a new sync run after properly initializing blockStorage.
@@ -135,6 +147,7 @@ func (s *StatefulSyncer) Sync(ctx context.Context, startIndex int64, endIndex in
135147
syncer.WithPastBlocks(pastBlocks),
136148
syncer.WithCacheSize(s.cacheSize),
137149
syncer.WithMaxConcurrency(s.maxConcurrency),
150+
syncer.WithAdjustmentWindow(s.adjustmentWindow),
138151
)
139152

140153
return syncer.Sync(ctx, startIndex, endIndex)
@@ -148,10 +161,16 @@ func (s *StatefulSyncer) Sync(ctx context.Context, startIndex int64, endIndex in
148161
// pruning strategies during syncing.
149162
func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
150163
for ctx.Err() == nil {
164+
// We don't use a timer pattern because s.pruneSleepTime is defined
165+
// as the time between pruning runs. Using a timer would only guarantee
166+
// that the difference between starts of each pruning run are s.pruneSleepTime.
167+
if err := utils.ContextSleep(ctx, s.pruneSleepTime); err != nil {
168+
return err
169+
}
170+
151171
headBlock, err := s.blockStorage.GetHeadBlockIdentifier(ctx)
152172
if headBlock == nil && errors.Is(err, storage.ErrHeadBlockNotFound) {
153173
// this will occur when we are waiting for the first block to be synced
154-
time.Sleep(pruneSleepTime)
155174
continue
156175
}
157176
if err != nil {
@@ -161,7 +180,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
161180
oldestIndex, err := s.blockStorage.GetOldestBlockIndex(ctx)
162181
if oldestIndex == -1 && errors.Is(err, storage.ErrOldestIndexMissing) {
163182
// this will occur when we have yet to store the oldest index
164-
time.Sleep(pruneSleepTime)
165183
continue
166184
}
167185
if err != nil {
@@ -174,7 +192,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
174192
}
175193

176194
if pruneableIndex < oldestIndex {
177-
time.Sleep(pruneSleepTime)
178195
continue
179196
}
180197

@@ -196,8 +213,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
196213

197214
log.Println(pruneMessage)
198215
}
199-
200-
time.Sleep(pruneSleepTime)
201216
}
202217

203218
return ctx.Err()
@@ -215,23 +230,7 @@ func (s *StatefulSyncer) BlockAdded(ctx context.Context, block *types.Block) err
215230
)
216231
}
217232

218-
if err := s.logger.AddBlockStream(ctx, block); err != nil {
219-
return nil
220-
}
221-
222-
// Update Counters
223-
_, _ = s.counterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1))
224-
_, _ = s.counterStorage.Update(
225-
ctx,
226-
storage.TransactionCounter,
227-
big.NewInt(int64(len(block.Transactions))),
228-
)
229-
opCount := int64(0)
230-
for _, txn := range block.Transactions {
231-
opCount += int64(len(txn.Operations))
232-
}
233-
_, _ = s.counterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount))
234-
233+
_ = s.logger.AddBlockStream(ctx, block)
235234
return nil
236235
}
237236

@@ -250,14 +249,8 @@ func (s *StatefulSyncer) BlockRemoved(
250249
)
251250
}
252251

253-
if err := s.logger.RemoveBlockStream(ctx, blockIdentifier); err != nil {
254-
return nil
255-
}
256-
257-
// Update Counters
258-
_, _ = s.counterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1))
259-
260-
return err
252+
_ = s.logger.RemoveBlockStream(ctx, blockIdentifier)
253+
return nil
261254
}
262255

263256
// NetworkStatus is called by the syncer to get the current

storage/counter_storage.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919
"fmt"
2020
"math/big"
21+
22+
"github.com/coinbase/rosetta-sdk-go/types"
2123
)
2224

2325
const (
@@ -76,6 +78,8 @@ const (
7678
counterNamespace = "counter"
7779
)
7880

81+
var _ BlockWorker = (*CounterStorage)(nil)
82+
7983
// CounterStorage implements counter-specific storage methods
8084
// on top of a Database and DatabaseTransaction interface.
8185
type CounterStorage struct {
@@ -162,3 +166,56 @@ func (c *CounterStorage) Get(ctx context.Context, counter string) (*big.Int, err
162166

163167
return transactionalGet(ctx, counter, transaction)
164168
}
169+
170+
// AddingBlock is called by BlockStorage when adding a block.
171+
func (c *CounterStorage) AddingBlock(
172+
ctx context.Context,
173+
block *types.Block,
174+
transaction DatabaseTransaction,
175+
) (CommitWorker, error) {
176+
_, err := c.UpdateTransactional(
177+
ctx,
178+
transaction,
179+
BlockCounter,
180+
big.NewInt(1),
181+
)
182+
if err != nil {
183+
return nil, err
184+
}
185+
186+
_, err = c.UpdateTransactional(
187+
ctx,
188+
transaction,
189+
TransactionCounter,
190+
big.NewInt(int64(len(block.Transactions))),
191+
)
192+
if err != nil {
193+
return nil, err
194+
}
195+
196+
opCount := int64(0)
197+
for _, txn := range block.Transactions {
198+
opCount += int64(len(txn.Operations))
199+
}
200+
_, err = c.UpdateTransactional(
201+
ctx,
202+
transaction,
203+
OperationCounter,
204+
big.NewInt(opCount),
205+
)
206+
if err != nil {
207+
return nil, err
208+
}
209+
210+
return nil, nil
211+
}
212+
213+
// RemovingBlock is called by BlockStorage when removing a block.
214+
func (c *CounterStorage) RemovingBlock(
215+
ctx context.Context,
216+
block *types.Block,
217+
transaction DatabaseTransaction,
218+
) (CommitWorker, error) {
219+
_, err := c.UpdateTransactional(ctx, transaction, OrphanCounter, big.NewInt(1))
220+
return nil, err
221+
}

syncer/configuration.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,10 @@ func WithMaxConcurrency(concurrency int64) Option {
5858
s.maxConcurrency = concurrency
5959
}
6060
}
61+
62+
// WithAdjustmentWindow overrides the default adjustment window.
63+
func WithAdjustmentWindow(adjustmentWindow int64) Option {
64+
return func(s *Syncer) {
65+
s.adjustmentWindow = adjustmentWindow
66+
}
67+
}

syncer/syncer.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,17 @@ func New(
3737
options ...Option,
3838
) *Syncer {
3939
s := &Syncer{
40-
network: network,
41-
helper: helper,
42-
handler: handler,
43-
concurrency: DefaultConcurrency,
44-
cacheSize: DefaultCacheSize,
45-
maxConcurrency: DefaultMaxConcurrency,
46-
sizeMultiplier: DefaultSizeMultiplier,
47-
cancel: cancel,
48-
pastBlocks: []*types.BlockIdentifier{},
49-
pastBlockLimit: DefaultPastBlockLimit,
40+
network: network,
41+
helper: helper,
42+
handler: handler,
43+
concurrency: DefaultConcurrency,
44+
cacheSize: DefaultCacheSize,
45+
maxConcurrency: DefaultMaxConcurrency,
46+
sizeMultiplier: DefaultSizeMultiplier,
47+
cancel: cancel,
48+
pastBlocks: []*types.BlockIdentifier{},
49+
pastBlockLimit: DefaultPastBlockLimit,
50+
adjustmentWindow: DefaultAdjustmentWindow,
5051
}
5152

5253
// Override defaults with any provided options
@@ -408,7 +409,7 @@ func (s *Syncer) adjustWorkers() bool {
408409
shouldCreate := false
409410
if estimatedMaxCache+max < float64(s.cacheSize) &&
410411
s.concurrency < s.maxConcurrency &&
411-
s.lastAdjustment > defaultAdjustmentWindow {
412+
s.lastAdjustment > s.adjustmentWindow {
412413
s.goalConcurrency++
413414
s.concurrency++
414415
s.lastAdjustment = 0

syncer/types.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ const (
5858
// of block sizes to keep when adjusting concurrency.
5959
defaultTrailingWindow = 1000
6060

61-
// defaultAdjustmentWindow is how frequently we will
61+
// DefaultAdjustmentWindow is how frequently we will
6262
// consider increasing our concurrency.
63-
defaultAdjustmentWindow = 10
63+
DefaultAdjustmentWindow = 5
6464

6565
// DefaultSizeMultiplier is used to pad our average size adjustment.
6666
// This can be used to account for the overhead associated with processing
@@ -146,6 +146,7 @@ type Syncer struct {
146146
goalConcurrency int64
147147
recentBlockSizes []int
148148
lastAdjustment int64
149+
adjustmentWindow int64
149150
concurrencyLock sync.Mutex
150151

151152
// doneLoading is used to coordinate adding goroutines

0 commit comments

Comments
 (0)