From 735080125c5f2872ce352fb5f0451de1a7c0c807 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 08:07:40 -0600 Subject: [PATCH 01/11] explicitly set numCPU in errgroup --- storage/modules/balance_storage.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/storage/modules/balance_storage.go b/storage/modules/balance_storage.go index 5aff76bbe..523106bc9 100644 --- a/storage/modules/balance_storage.go +++ b/storage/modules/balance_storage.go @@ -20,6 +20,7 @@ import ( "fmt" "log" "math/big" + "runtime" "sync" "github.com/neilotoole/errgroup" @@ -143,6 +144,7 @@ type BalanceStorage struct { db database.Database helper BalanceStorageHelper handler BalanceStorageHandler + numCPU int // To scale up write concurrency on reconciliation, // we don't update the global reconciliation counter @@ -159,6 +161,7 @@ func NewBalanceStorage( ) *BalanceStorage { return &BalanceStorage{ db: db, + numCPU: runtime.NumCPU(), pendingReconciliationMutex: new(utils.PriorityMutex), } } @@ -195,7 +198,7 @@ func (b *BalanceStorage) AddingBlock( var newAccountsLock sync.Mutex // Concurrent execution limited to runtime.NumCPU - g, gctx := errgroup.WithContextN(ctx, 0, 0) + g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range changes { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -270,7 +273,7 @@ func (b *BalanceStorage) RemovingBlock( var staleAccountsMutex sync.Mutex // Concurrent execution limited to runtime.NumCPU - g, gctx := errgroup.WithContextN(ctx, 0, 0) + g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range changes { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration From 6a850d3ce60095fbb91f2cc48b5bf033d3e38909 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 08:36:30 -0600 Subject: [PATCH 02/11] First pass at overhauling reconciler --- reconciler/reconciler.go | 103 +++++++++++++++++++++++---------------- reconciler/types.go | 7 +-- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index f7fa554cd..d74b2a421 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -28,6 +28,10 @@ import ( "github.com/coinbase/rosetta-sdk-go/utils" ) +const ( + shardBuffer = 2 +) + // New creates a new Reconciler. func New( helper Helper, @@ -45,9 +49,9 @@ func New( highWaterMark: -1, seenAccounts: map[string]struct{}{}, inactiveQueue: []*InactiveEntry{}, + inactiveQueueMutex: new(utils.PriorityMutex), backlogSize: defaultBacklogSize, lastIndexChecked: -1, - queueMap: map[string]*utils.BST{}, } for _, opt := range options { @@ -57,6 +61,10 @@ func New( // Create change queue r.changeQueue = make(chan *parser.BalanceChange, r.backlogSize) + // Create queueMap + desiredShardCount := shardBuffer * (r.ActiveConcurrency + r.InactiveConcurrency) + r.queueMap = utils.NewShardedMap(desiredShardCount) + return r } @@ -98,7 +106,7 @@ func (r *Reconciler) wrappedInactiveEnqueue( accountCurrency *types.AccountCurrency, liveBlock *types.BlockIdentifier, ) { - if err := r.inactiveAccountQueue(true, accountCurrency, liveBlock); err != nil { + if err := r.inactiveAccountQueue(true, accountCurrency, liveBlock, false); err != nil { log.Printf( "%s: unable to queue account %s", err.Error(), @@ -110,17 +118,18 @@ func (r *Reconciler) wrappedInactiveEnqueue( // addToQueueMap adds a *types.AccountCurrency // to the prune map at the provided index. func (r *Reconciler) addToQueueMap( - acctCurrency *types.AccountCurrency, + m map[string]interface{}, + key string, index int64, ) { - key := types.Hash(acctCurrency) - if _, ok := r.queueMap[key]; !ok { - r.queueMap[key] = &utils.BST{} + if _, ok := m[key]; !ok { + m[key] = &utils.BST{} } - existing := r.queueMap[key].Get(index) + bst := m[key].(*utils.BST) + existing := bst.Get(index) if existing == nil { - r.queueMap[key].Set(index, 1) + bst.Set(index, 1) } else { existing.Value++ } @@ -183,16 +192,20 @@ func (r *Reconciler) QueueChanges( Account: change.Account, Currency: change.Currency, } - err := r.inactiveAccountQueue(false, acctCurrency, block) + + r.inactiveQueueMutex.Lock(true) + err := r.inactiveAccountQueue(false, acctCurrency, block, true) + r.inactiveQueueMutex.Unlock() if err != nil { return err } // Add change to queueMap before enqueuing to ensure // there is no possible race. - r.queueMapMutex.Lock() - r.addToQueueMap(acctCurrency, change.Block.Index) - r.queueMapMutex.Unlock() + key := types.Hash(acctCurrency) + m := r.queueMap.Lock(key, true) + r.addToQueueMap(m, key, change.Block.Index) + r.queueMap.Unlock(key) // Add change to active queue r.wrappedActiveEnqueue(ctx, change) @@ -499,8 +512,12 @@ func (r *Reconciler) inactiveAccountQueue( inactive bool, accountCurrency *types.AccountCurrency, liveBlock *types.BlockIdentifier, + hasLock bool, ) error { - r.inactiveQueueMutex.Lock() + if !hasLock { + r.inactiveQueueMutex.Lock(false) + defer r.inactiveQueueMutex.Unlock() + } // Only enqueue the first time we see an account on an active reconciliation. shouldEnqueueInactive := false @@ -594,33 +611,34 @@ func (r *Reconciler) updateQueueMap( prune bool, ) error { key := types.Hash(acctCurrency) + m := r.queueMap.Lock(key, false) + bst := m[key].(*utils.BST) - r.queueMapMutex.Lock() - existing := r.queueMap[key].Get(index) + existing := bst.Get(index) existing.Value-- if existing.Value > 0 { - r.queueMapMutex.Unlock() + r.queueMap.Unlock(key) return nil } // Cleanup indexes when we don't need them anymore - r.queueMap[key].Delete(index) + bst.Delete(index) // Don't prune if there are items for this AccountCurrency // less than this change. - if !r.queueMap[key].Empty() && - index >= r.queueMap[key].Min().Key { - r.queueMapMutex.Unlock() + if !bst.Empty() && + index >= bst.Min().Key { + r.queueMap.Unlock(key) return nil } // Cleanup keys when we don't need them anymore - if r.queueMap[key].Empty() { - delete(r.queueMap, key) + if bst.Empty() { + delete(m, key) } // Unlock before pruning as this could take some time - r.queueMapMutex.Unlock() + r.queueMap.Unlock(key) // Attempt to prune historical balances that will not be used // anymore. @@ -760,24 +778,10 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit return ctx.Err() } - // Lock BST while determining if we should attempt reconciliation - // to ensure we don't allow any accounts to be pruned at retrieved - // head index. Although this appears to be a long time to hold - // this mutex, this lookup takes less than a millisecond. - r.queueMapMutex.Lock() - - shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx) - if !shouldAttempt { - r.queueMapMutex.Unlock() - time.Sleep(inactiveReconciliationSleep) - continue - } - - r.inactiveQueueMutex.Lock() + r.inactiveQueueMutex.Lock(false) queueLen := len(r.inactiveQueue) if queueLen == 0 { r.inactiveQueueMutex.Unlock() - r.queueMapMutex.Unlock() r.debugLog( "no accounts ready for inactive reconciliation (0 accounts in queue)", ) @@ -786,6 +790,21 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit } nextAcct := r.inactiveQueue[0] + key := types.Hash(nextAcct.Entry) + + // Lock BST while determining if we should attempt reconciliation + // to ensure we don't allow any accounts to be pruned at retrieved + // head index. Although this appears to be a long time to hold + // this mutex, this lookup takes less than a millisecond. + m := r.queueMap.Lock(key, false) + + shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx) + if !shouldAttempt { + r.queueMap.Unlock(key) + time.Sleep(inactiveReconciliationSleep) + continue + } + nextValidIndex := int64(-1) if nextAcct.LastCheck != nil { // block is set to nil when loaded from previous run nextValidIndex = nextAcct.LastCheck.Index + r.inactiveFrequency @@ -797,8 +816,8 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // Add nextAcct to queueMap before returning // queueMapMutex lock. - r.addToQueueMap(nextAcct.Entry, head.Index) - r.queueMapMutex.Unlock() + r.addToQueueMap(m, key, head.Index) + r.queueMap.Unlock(key) amount, block, err := r.bestLiveBalance( ctx, @@ -872,13 +891,13 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // Always re-enqueue accounts after they have been inactively // reconciled. If we don't re-enqueue, we will never check // these accounts again. - err = r.inactiveAccountQueue(true, nextAcct.Entry, block) + err = r.inactiveAccountQueue(true, nextAcct.Entry, block, false) if err != nil { return err } } else { r.inactiveQueueMutex.Unlock() - r.queueMapMutex.Unlock() + r.queueMap.Unlock(key) r.debugLog( "no accounts ready for inactive reconciliation (%d accounts in queue, will reconcile next account at index %d)", queueLen, diff --git a/reconciler/types.go b/reconciler/types.go index 080378863..3594d7c80 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -254,7 +254,7 @@ type Reconciler struct { // inactiveQueueMutex needed because we can't peek at the tip // of a channel to determine when it is ready to look at. - inactiveQueueMutex sync.Mutex + inactiveQueueMutex *utils.PriorityMutex // lastIndexChecked is the last block index reconciled actively. lastIndexMutex sync.Mutex @@ -264,6 +264,7 @@ type Reconciler struct { // in the active reconciliation queue and being actively // reconciled. It ensures we don't accidentally attempt to prune // computed balances being used by other goroutines. - queueMap map[string]*utils.BST - queueMapMutex sync.Mutex + queueMap *utils.ShardedMap + // queueMap map[string]*utils.BST + // queueMapMutex sync.Mutex } From d17ec9f142372519bd6b7ef57ebc12a822455d1c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 08:42:02 -0600 Subject: [PATCH 03/11] update tests --- reconciler/reconciler_test.go | 55 +++++++++++++++++++++++++++++++---- reconciler/types.go | 2 -- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index cea9c4520..ce1eb184d 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -28,6 +28,7 @@ import ( mockDatabase "github.com/coinbase/rosetta-sdk-go/mocks/storage/database" "github.com/coinbase/rosetta-sdk-go/parser" "github.com/coinbase/rosetta-sdk-go/types" + "github.com/coinbase/rosetta-sdk-go/utils" ) func TestNewReconciler(t *testing.T) { @@ -566,6 +567,7 @@ func TestInactiveAccountQueue(t *testing.T) { false, accountCurrency, block, + false, ) assert.Nil(t, err) assertContainsAllAccounts(t, r.seenAccounts, []*types.AccountCurrency{accountCurrency}) @@ -582,6 +584,7 @@ func TestInactiveAccountQueue(t *testing.T) { false, accountCurrency2, block2, + false, ) assert.Nil(t, err) assertContainsAllAccounts( @@ -608,6 +611,7 @@ func TestInactiveAccountQueue(t *testing.T) { false, accountCurrency, block, + false, ) assert.Nil(t, err) assertContainsAllAccounts( @@ -623,6 +627,7 @@ func TestInactiveAccountQueue(t *testing.T) { true, accountCurrency, block, + false, ) assert.Nil(t, err) assertContainsAllAccounts( @@ -643,6 +648,7 @@ func TestInactiveAccountQueue(t *testing.T) { true, accountCurrency2, block2, + false, ) assert.Nil(t, err) assertContainsAllAccounts( @@ -760,6 +766,18 @@ func mockReconcilerCalls( } } +func shardsEmpty(m *utils.ShardedMap, keys []string) bool { + for _, k := range keys { + s := m.Lock(k, false) + m.Unlock(k) + if len(s) > 0 { + return true + } + } + + return false +} + func TestReconcile_SuccessOnlyActive(t *testing.T) { var ( block = &types.BlockIdentifier{ @@ -930,7 +948,13 @@ func TestReconcile_SuccessOnlyActive(t *testing.T) { assert.Equal(t, block2.Index, r.LastIndexReconciled()) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) - assert.Equal(t, 0, len(r.queueMap)) + assert.True(t, shardsEmpty( + r.queueMap, + []string{ + types.Hash(accountCurrency), + types.Hash(accountCurrency2), + }, + )) mtxn.AssertExpectations(t) mtxn2.AssertExpectations(t) mtxn3.AssertExpectations(t) @@ -2826,7 +2850,13 @@ func TestPruningRaceCondition(t *testing.T) { <-d assert.Equal(t, block2.Index, r.LastIndexReconciled()) - assert.Equal(t, 0, len(r.queueMap)) + assert.True(t, shardsEmpty( + r.queueMap, + []string{ + types.Hash(accountCurrency), + types.Hash(accountCurrency2), + }, + )) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) mtxn.AssertExpectations(t) @@ -2950,7 +2980,12 @@ func TestPruningHappyPath(t *testing.T) { <-d assert.Equal(t, block2.Index, r.LastIndexReconciled()) - assert.Equal(t, 0, len(r.queueMap)) + assert.True(t, shardsEmpty( + r.queueMap, + []string{ + types.Hash(accountCurrency), + }, + )) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) mtxn.AssertExpectations(t) @@ -3064,7 +3099,12 @@ func TestPruningReorg(t *testing.T) { <-d assert.Equal(t, blockB.Index, r.LastIndexReconciled()) - assert.Equal(t, 0, len(r.queueMap)) + assert.True(t, shardsEmpty( + r.queueMap, + []string{ + types.Hash(accountCurrency), + }, + )) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) mtxn.AssertExpectations(t) @@ -3213,7 +3253,12 @@ func TestPruningRaceConditionInactive(t *testing.T) { <-d assert.Equal(t, block.Index, r.LastIndexReconciled()) - assert.Equal(t, 0, len(r.queueMap)) + assert.True(t, shardsEmpty( + r.queueMap, + []string{ + types.Hash(accountCurrency), + }, + )) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) mtxn.AssertExpectations(t) diff --git a/reconciler/types.go b/reconciler/types.go index 3594d7c80..c694ee607 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -265,6 +265,4 @@ type Reconciler struct { // reconciled. It ensures we don't accidentally attempt to prune // computed balances being used by other goroutines. queueMap *utils.ShardedMap - // queueMap map[string]*utils.BST - // queueMapMutex sync.Mutex } From 09126a836acd1ccc6407e8189ce8549c3a499510 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 08:46:03 -0600 Subject: [PATCH 04/11] fix tests --- reconciler/reconciler.go | 4 ++++ reconciler/reconciler_test.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index d74b2a421..fd438db5f 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -29,6 +29,10 @@ import ( ) const ( + // shardBuffer is multiplied by inactive concurrency + + // active concurrency to determine how many shards should + // be created in the queueMap. The more shards created, + // the less lock contention we will encounter. shardBuffer = 2 ) diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index ce1eb184d..7a98b0c79 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -771,11 +771,11 @@ func shardsEmpty(m *utils.ShardedMap, keys []string) bool { s := m.Lock(k, false) m.Unlock(k) if len(s) > 0 { - return true + return false } } - return false + return true } func TestReconcile_SuccessOnlyActive(t *testing.T) { From b08ab96c172ebbdd8cff798bc9cd1920beebbcc1 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 09:08:26 -0600 Subject: [PATCH 05/11] passing all tests --- reconciler/reconciler.go | 49 +++++++++++++++++++++++++++++------ reconciler/reconciler_test.go | 14 +++++++--- reconciler/types.go | 23 ++++++++++++++++ 3 files changed, 75 insertions(+), 11 deletions(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index fd438db5f..3b24996e5 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -28,14 +28,6 @@ import ( "github.com/coinbase/rosetta-sdk-go/utils" ) -const ( - // shardBuffer is multiplied by inactive concurrency + - // active concurrency to determine how many shards should - // be created in the queueMap. The more shards created, - // the less lock contention we will encounter. - shardBuffer = 2 -) - // New creates a new Reconciler. func New( helper Helper, @@ -56,6 +48,7 @@ func New( inactiveQueueMutex: new(utils.PriorityMutex), backlogSize: defaultBacklogSize, lastIndexChecked: -1, + processQueue: make(chan *blockRequest, processQueueBacklog), } for _, opt := range options { @@ -145,6 +138,42 @@ func (r *Reconciler) QueueChanges( ctx context.Context, block *types.BlockIdentifier, balanceChanges []*parser.BalanceChange, +) error { + // If the processQueue fills up, we block + // until some items are dequeued. + select { + case r.processQueue <- &blockRequest{ + Block: block, + Changes: balanceChanges, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// queueWorker processes items from the processQueue. +func (r *Reconciler) queueWorker(ctx context.Context) error { + for { + select { + case req := <-r.processQueue: + if err := r.queueChanges(ctx, req.Block, req.Changes); err != nil { + return err + } + case <-ctx.Done(): + // We return nil here instead of ctx.Err because + // we don't want to swallow any errors returned by the + // reconciliation threads. + return nil + } + } +} + +// queueChanges processes a block for reconciliation. +func (r *Reconciler) queueChanges( + ctx context.Context, + block *types.BlockIdentifier, + balanceChanges []*parser.BalanceChange, ) error { // Ensure all interestingAccounts are checked for _, account := range r.interestingAccounts { @@ -916,6 +945,10 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit // If any goroutine errors, the function will return an error. func (r *Reconciler) Reconcile(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return r.queueWorker(ctx) + }) + for j := 0; j < r.ActiveConcurrency; j++ { g.Go(func() error { return r.reconcileActiveAccounts(ctx) diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index 7a98b0c79..9dabeecf2 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -1100,7 +1100,8 @@ func TestReconcile_HighWaterMark(t *testing.T) { }, }) assert.NoError(t, err) - assert.Equal(t, r.QueueSize(), 4) // includes interesting accounts + assert.Equal(t, 2, len(r.processQueue)) + assert.Equal(t, 0, r.QueueSize()) // queue size is 0 before starting worker go func() { err := r.Reconcile(ctx) @@ -1110,6 +1111,9 @@ func TestReconcile_HighWaterMark(t *testing.T) { time.Sleep(1 * time.Second) cancel() + assert.Equal(t, 0, len(r.processQueue)) + assert.Equal(t, 0, r.QueueSize()) + mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) mtxn.AssertExpectations(t) @@ -1270,7 +1274,8 @@ func TestReconcile_FailureOnlyActive(t *testing.T) { }, }) assert.NoError(t, err) - assert.Equal(t, r.QueueSize(), 1) + assert.Equal(t, 1, len(r.processQueue)) + assert.Equal(t, 0, r.QueueSize()) // queue size is 0 before starting worker go func() { err := r.Reconcile(ctx) @@ -1279,6 +1284,8 @@ func TestReconcile_FailureOnlyActive(t *testing.T) { }() time.Sleep(1 * time.Second) + assert.Equal(t, 0, len(r.processQueue)) + assert.Equal(t, 0, r.QueueSize()) mockHelper.AssertExpectations(t) mockHandler.AssertExpectations(t) @@ -2215,7 +2222,8 @@ func TestReconcile_EnqueueCancel(t *testing.T) { change, }) assert.NoError(t, err) - assert.Equal(t, r.QueueSize(), 1) + assert.Equal(t, 1, len(r.processQueue)) + assert.Equal(t, 0, r.QueueSize()) // queue size is 0 before starting worker go func() { err := r.Reconcile(ctx) diff --git a/reconciler/types.go b/reconciler/types.go index c694ee607..0e9c4abe2 100644 --- a/reconciler/types.go +++ b/reconciler/types.go @@ -108,6 +108,16 @@ const ( // change that we consider safe to prune. We are very conservative // here to prevent removing balances we may need in a reorg. safeBalancePruneDepth = int64(500) // nolint:gomnd + + // shardBuffer is multiplied by inactive concurrency + + // active concurrency to determine how many shards should + // be created in the queueMap. The more shards created, + // the less lock contention we will encounter. + shardBuffer = 2 + + // processQueueBacklog is the maximum number of blocks + // we can get behind the syncing loop without blocking. + processQueueBacklog = 1000 ) // Helper functions are used by Reconciler to compare @@ -211,6 +221,13 @@ type InactiveEntry struct { LastCheck *types.BlockIdentifier } +// blockRequest is used to enqueue processed +// blocks for reconciliation. +type blockRequest struct { + Block *types.BlockIdentifier + Changes []*parser.BalanceChange +} + // Reconciler contains all logic to reconcile balances of // types.AccountIdentifiers returned in types.Operations // by a Rosetta Server. @@ -265,4 +282,10 @@ type Reconciler struct { // reconciled. It ensures we don't accidentally attempt to prune // computed balances being used by other goroutines. queueMap *utils.ShardedMap + + // processQueue is a buffered channel of recently processed + // blocks that must be parsed for reconciliation. We enqueue + // blocks asynchronously so that we don't slow down the sync + // loop. + processQueue chan *blockRequest } From ba93359133db58c67ca9be46f0357deb5aeea31b Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 10:43:44 -0600 Subject: [PATCH 06/11] cleanup test --- reconciler/reconciler.go | 13 ++++--------- reconciler/reconciler_test.go | 6 ------ storage/modules/counter_storage.go | 2 ++ 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 3b24996e5..21c93354d 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -161,10 +161,7 @@ func (r *Reconciler) queueWorker(ctx context.Context) error { return err } case <-ctx.Done(): - // We return nil here instead of ctx.Err because - // we don't want to swallow any errors returned by the - // reconciliation threads. - return nil + return ctx.Err() } } } @@ -806,11 +803,7 @@ func (r *Reconciler) shouldAttemptInactiveReconciliation( func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit ctx context.Context, ) error { - for { - if ctx.Err() != nil { - return ctx.Err() - } - + for ctx.Err() == nil { r.inactiveQueueMutex.Lock(false) queueLen := len(r.inactiveQueue) if queueLen == 0 { @@ -939,6 +932,8 @@ func (r *Reconciler) reconcileInactiveAccounts( // nolint:gocognit time.Sleep(inactiveReconciliationSleep) } } + + return ctx.Err() } // Reconcile starts the active and inactive Reconciler goroutines. diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index 9dabeecf2..20ee36cd6 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -2123,8 +2123,6 @@ func TestReconcile_FailureOnlyInactive(t *testing.T) { opts..., ) ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - mtxn := &mockDatabase.Transaction{} mtxn.On("Discard", mock.Anything).Once() mockHelper.On("DatabaseTransaction", mock.Anything).Return(mtxn).Once() @@ -2133,10 +2131,6 @@ func TestReconcile_FailureOnlyInactive(t *testing.T) { mtxn2.On( "Discard", mock.Anything, - ).Run( - func(args mock.Arguments) { - cancel() - }, ).Once() mockHelper.On("DatabaseTransaction", mock.Anything).Return(mtxn2).Once() mockReconcilerCalls( diff --git a/storage/modules/counter_storage.go b/storage/modules/counter_storage.go index 7cedd5ae1..159ae9298 100644 --- a/storage/modules/counter_storage.go +++ b/storage/modules/counter_storage.go @@ -110,6 +110,8 @@ func getCounterKey(counter string) []byte { return []byte(fmt.Sprintf("%s/%s", counterNamespace, counter)) } +// BigIntGet attempts to fetch a *big.Int +// from a given key in a database.Transaction. func BigIntGet( ctx context.Context, key []byte, From 266b1ec5315e01a7089b3c677eab650eb07640e3 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 10:44:29 -0600 Subject: [PATCH 07/11] increase pruning frequency --- statefulsyncer/stateful_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statefulsyncer/stateful_syncer.go b/statefulsyncer/stateful_syncer.go index a33b006f7..1218789e9 100644 --- a/statefulsyncer/stateful_syncer.go +++ b/statefulsyncer/stateful_syncer.go @@ -35,7 +35,7 @@ var _ syncer.Helper = (*StatefulSyncer)(nil) const ( // DefaultPruneSleepTime is how long we sleep between // pruning attempts. - DefaultPruneSleepTime = 30 * time.Minute + DefaultPruneSleepTime = 10 * time.Minute // pruneBuffer is the cushion we apply to pastBlockLimit // when pruning. From 27fb8fba97e06102932503fd14d94d0e24fb626d Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 10:49:46 -0600 Subject: [PATCH 08/11] use scoped write transaction in block storage --- storage/modules/block_storage.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index 5ea940ac8..d40a356ee 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -49,6 +49,10 @@ const ( // transactionNamespace is prepended to any stored // transaction. transactionNamespace = "transaction" + + // blockSyncIdentifier is the identifier used to acquire + // a database lock. + blockSyncIdentifier = "blockSyncIdentifier" ) type blockTransaction struct { @@ -185,7 +189,7 @@ func (b *BlockStorage) pruneBlock( // we don't hit the database tx size maximum. As a result, it is possible // that we prune a collection of blocks, encounter an error, and cannot // rollback the pruning operations. - dbTx := b.db.Transaction(ctx) + dbTx := b.db.WriteTransaction(ctx, blockSyncIdentifier, false) defer dbTx.Discard(ctx) oldestIndex, err := b.GetOldestBlockIndexTransactional(ctx, dbTx) @@ -561,7 +565,7 @@ func (b *BlockStorage) AddBlock( ctx context.Context, block *types.Block, ) error { - transaction := b.db.Transaction(ctx) + transaction := b.db.WriteTransaction(ctx, blockSyncIdentifier, true) defer transaction.Discard(ctx) // Store all transactions in order and check for duplicates @@ -672,7 +676,7 @@ func (b *BlockStorage) RemoveBlock( ctx context.Context, blockIdentifier *types.BlockIdentifier, ) error { - transaction := b.db.Transaction(ctx) + transaction := b.db.WriteTransaction(ctx, blockSyncIdentifier, true) defer transaction.Discard(ctx) block, err := b.GetBlockTransactional( From 687fd1e2983b66112e9b1e31e648776ce83f972a Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 10:52:02 -0600 Subject: [PATCH 09/11] use limited error group in block storage --- storage/modules/block_storage.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/storage/modules/block_storage.go b/storage/modules/block_storage.go index d40a356ee..4d41bafd9 100644 --- a/storage/modules/block_storage.go +++ b/storage/modules/block_storage.go @@ -19,9 +19,10 @@ import ( "errors" "fmt" "log" + "runtime" "strconv" - "golang.org/x/sync/errgroup" + "github.com/neilotoole/errgroup" "github.com/coinbase/rosetta-sdk-go/storage/database" "github.com/coinbase/rosetta-sdk-go/storage/encoder" @@ -97,7 +98,8 @@ type BlockWorker interface { // BlockStorage implements block specific storage methods // on top of a database.Database and database.Transaction interface. type BlockStorage struct { - db database.Database + db database.Database + numCPU int workers []BlockWorker } @@ -107,7 +109,8 @@ func NewBlockStorage( db database.Database, ) *BlockStorage { return &BlockStorage{ - db: db, + db: db, + numCPU: runtime.NumCPU(), } } @@ -606,7 +609,7 @@ func (b *BlockStorage) AddBlock( return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err) } - g, gctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range block.Transactions { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration @@ -689,7 +692,7 @@ func (b *BlockStorage) RemoveBlock( } // Remove all transaction hashes - g, gctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU) for i := range block.Transactions { // We need to set variable before calling goroutine // to avoid getting an updated pointer as loop iteration From 45ea3f6fcd3a861c928335c5f1a3fb5603ae8c3c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 10:53:49 -0600 Subject: [PATCH 10/11] use limited errgroup in CoinStorage --- storage/modules/coin_storage.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/storage/modules/coin_storage.go b/storage/modules/coin_storage.go index b7a3f1f77..ac9e8c582 100644 --- a/storage/modules/coin_storage.go +++ b/storage/modules/coin_storage.go @@ -18,9 +18,10 @@ import ( "context" "fmt" "math/big" + "runtime" "strings" - "golang.org/x/sync/errgroup" + "github.com/neilotoole/errgroup" "github.com/coinbase/rosetta-sdk-go/asserter" "github.com/coinbase/rosetta-sdk-go/storage/database" @@ -39,7 +40,8 @@ var _ BlockWorker = (*CoinStorage)(nil) // CoinStorage implements storage methods for storing // UTXOs. type CoinStorage struct { - db database.Database + db database.Database + numCPU int helper CoinStorageHelper asserter *asserter.Asserter @@ -65,6 +67,7 @@ func NewCoinStorage( ) *CoinStorage { return &CoinStorage{ db: db, + numCPU: runtime.NumCPU(), helper: helper, asserter: asserter, } @@ -295,7 +298,7 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit } } - g, gctx := errgroup.WithContext(ctx) + g, gctx := errgroup.WithContextN(ctx, c.numCPU, c.numCPU) for identifier, val := range addCoins { if _, ok := removeCoins[identifier]; ok { continue From dcc06b11198343d5ee071e233e955bacf2a05ee1 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 1 Dec 2020 11:28:18 -0600 Subject: [PATCH 11/11] use segmented transaction for counter writes --- storage/modules/counter_storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/modules/counter_storage.go b/storage/modules/counter_storage.go index 159ae9298..461ec4df8 100644 --- a/storage/modules/counter_storage.go +++ b/storage/modules/counter_storage.go @@ -163,7 +163,7 @@ func (c *CounterStorage) Update( counter string, amount *big.Int, ) (*big.Int, error) { - dbTx := c.db.Transaction(ctx) + dbTx := c.db.WriteTransaction(ctx, counter, false) defer dbTx.Discard(ctx) newVal, err := c.UpdateTransactional(ctx, dbTx, counter, amount)