Skip to content

Commit 5d73ed7

Browse files
increase limit by multiplier
1 parent 1caffcf commit 5d73ed7

File tree

4 files changed

+25
-12
lines changed

4 files changed

+25
-12
lines changed

storage/modules/balance_storage.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"runtime"
2424
"sync"
2525

26-
"github.com/neilotoole/errgroup"
27-
2826
"github.com/coinbase/rosetta-sdk-go/asserter"
2927
"github.com/coinbase/rosetta-sdk-go/parser"
3028
"github.com/coinbase/rosetta-sdk-go/storage/database"
@@ -198,7 +196,7 @@ func (b *BalanceStorage) AddingBlock(
198196
var newAccountsLock sync.Mutex
199197

200198
// Concurrent execution limited to runtime.NumCPU
201-
g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU)
199+
g, gctx := utils.ErrGroupWithContext(ctx, b.numCPU)
202200
for i := range changes {
203201
// We need to set variable before calling goroutine
204202
// to avoid getting an updated pointer as loop iteration
@@ -273,7 +271,7 @@ func (b *BalanceStorage) RemovingBlock(
273271
var staleAccountsMutex sync.Mutex
274272

275273
// Concurrent execution limited to runtime.NumCPU
276-
g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU)
274+
g, gctx := utils.ErrGroupWithContext(ctx, b.numCPU)
277275
for i := range changes {
278276
// We need to set variable before calling goroutine
279277
// to avoid getting an updated pointer as loop iteration

storage/modules/block_storage.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"strconv"
2424
"strings"
2525

26-
"github.com/neilotoole/errgroup"
27-
2826
"github.com/coinbase/rosetta-sdk-go/storage/database"
2927
"github.com/coinbase/rosetta-sdk-go/storage/encoder"
3028
storageErrs "github.com/coinbase/rosetta-sdk-go/storage/errors"
@@ -252,7 +250,7 @@ func (b *BlockStorage) pruneBlock(
252250
blockIdentifier := blockResponse.Block.BlockIdentifier
253251

254252
// Remove all transaction hashes
255-
g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU)
253+
g, gctx := utils.ErrGroupWithContext(ctx, b.numCPU)
256254
for i := range blockResponse.OtherTransactions {
257255
// We need to set variable before calling goroutine
258256
// to avoid getting an updated pointer as loop iteration
@@ -644,7 +642,7 @@ func (b *BlockStorage) AddBlock(
644642
return fmt.Errorf("%w: %v", storageErrs.ErrBlockStoreFailed, err)
645643
}
646644

647-
g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU)
645+
g, gctx := utils.ErrGroupWithContext(ctx, b.numCPU)
648646
for i := range block.Transactions {
649647
// We need to set variable before calling goroutine
650648
// to avoid getting an updated pointer as loop iteration
@@ -727,7 +725,7 @@ func (b *BlockStorage) RemoveBlock(
727725
}
728726

729727
// Remove all transaction hashes
730-
g, gctx := errgroup.WithContextN(ctx, b.numCPU, b.numCPU)
728+
g, gctx := utils.ErrGroupWithContext(ctx, b.numCPU)
731729
for i := range block.Transactions {
732730
// We need to set variable before calling goroutine
733731
// to avoid getting an updated pointer as loop iteration

storage/modules/coin_storage.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"runtime"
2222
"strings"
2323

24-
"github.com/neilotoole/errgroup"
25-
2624
"github.com/coinbase/rosetta-sdk-go/asserter"
2725
"github.com/coinbase/rosetta-sdk-go/storage/database"
2826
"github.com/coinbase/rosetta-sdk-go/storage/errors"
@@ -298,7 +296,7 @@ func (c *CoinStorage) updateCoins( // nolint:gocognit
298296
}
299297
}
300298

301-
g, gctx := errgroup.WithContextN(ctx, c.numCPU, c.numCPU)
299+
g, gctx := utils.ErrGroupWithContext(ctx, c.numCPU)
302300
for identifier, val := range addCoins {
303301
if _, ok := removeCoins[identifier]; ok {
304302
continue

utils/errgroup.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
6+
"github.com/neilotoole/errgroup"
7+
)
8+
9+
const (
10+
workerMultiplier = 3
11+
queueMultiplier = 20
12+
)
13+
14+
// ErrGroupWithContext returns a drop-in replacement for `sync/errgroup` that
15+
// limits the number of concurrently running goroutines to some function
16+
// of the number of CPUs.
17+
func ErrGroupWithContext(ctx context.Context, numCPU int) (*errgroup.Group, context.Context) {
18+
return errgroup.WithContextN(ctx, numCPU*workerMultiplier, numCPU*queueMultiplier)
19+
}

0 commit comments

Comments
 (0)