Skip to content

Commit a0049fe

Browse files
authored
Make logs subscription channel size configurable (#9810)
This PR makes the channel that is used to send logs to subscriptions configurable so logs are not dropped when the channel gets filled. See issue 9699. This is just an initial version since I wanted to gather some feedback and was unsure if this is the correct approach to solve this.
1 parent 3829bfe commit a0049fe

19 files changed

+81
-71
lines changed

cmd/rpcdaemon/cli/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
145145
rootCmd.PersistentFlags().IntVar(&cfg.MaxGetProofRewindBlockCount, utils.RpcMaxGetProofRewindBlockCount.Name, utils.RpcMaxGetProofRewindBlockCount.Value, utils.RpcMaxGetProofRewindBlockCount.Usage)
146146
rootCmd.PersistentFlags().Uint64Var(&cfg.OtsMaxPageSize, utils.OtsSearchMaxCapFlag.Name, utils.OtsSearchMaxCapFlag.Value, utils.OtsSearchMaxCapFlag.Usage)
147147
rootCmd.PersistentFlags().DurationVar(&cfg.RPCSlowLogThreshold, utils.RPCSlowFlag.Name, utils.RPCSlowFlag.Value, utils.RPCSlowFlag.Usage)
148+
rootCmd.PersistentFlags().IntVar(&cfg.WebsocketSubscribeLogsChannelSize, utils.WSSubscribeLogsChannelSize.Name, utils.WSSubscribeLogsChannelSize.Value, utils.WSSubscribeLogsChannelSize.Usage)
148149

149150
if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
150151
panic(err)

cmd/rpcdaemon/cli/httpcfg/http_cfg.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,22 @@ type HttpCfg struct {
4040
AuthRpcPort int
4141
PrivateApiAddr string
4242

43-
API []string
44-
Gascap uint64
45-
MaxTraces uint64
46-
WebsocketPort int
47-
WebsocketEnabled bool
48-
WebsocketCompression bool
49-
RpcAllowListFilePath string
50-
RpcBatchConcurrency uint
51-
RpcStreamingDisable bool
52-
DBReadConcurrency int
53-
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
54-
TxPoolApiAddr string
55-
StateCache kvcache.CoherentConfig
56-
Snap ethconfig.BlocksFreezing
57-
Sync ethconfig.Sync
43+
API []string
44+
Gascap uint64
45+
MaxTraces uint64
46+
WebsocketPort int
47+
WebsocketEnabled bool
48+
WebsocketCompression bool
49+
WebsocketSubscribeLogsChannelSize int
50+
RpcAllowListFilePath string
51+
RpcBatchConcurrency uint
52+
RpcStreamingDisable bool
53+
DBReadConcurrency int
54+
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
55+
TxPoolApiAddr string
56+
StateCache kvcache.CoherentConfig
57+
Snap ethconfig.BlocksFreezing
58+
Sync ethconfig.Sync
5859

5960
// GRPC server
6061
GRPCServerEnabled bool

cmd/utils/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,11 @@ var (
489489
Usage: "HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths.",
490490
Value: "",
491491
}
492+
WSSubscribeLogsChannelSize = cli.IntFlag{
493+
Name: "ws.api.subscribelogs.channelsize",
494+
Usage: "Size of the channel used for websocket logs subscriptions",
495+
Value: 8192,
496+
}
492497
ExecFlag = cli.StringFlag{
493498
Name: "exec",
494499
Usage: "Execute JavaScript statement",

turbo/cli/flags.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -476,22 +476,23 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
476476
WriteTimeout: ctx.Duration(AuthRpcWriteTimeoutFlag.Name),
477477
IdleTimeout: ctx.Duration(HTTPIdleTimeoutFlag.Name),
478478
},
479-
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
480-
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
481-
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
482-
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
483-
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
484-
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
485-
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
486-
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
487-
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
488-
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
489-
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
490-
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
491-
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
492-
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
493-
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
494-
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
479+
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
480+
OverlayGetLogsTimeout: ctx.Duration(OverlayGetLogsFlag.Name),
481+
OverlayReplayBlockTimeout: ctx.Duration(OverlayReplayBlockFlag.Name),
482+
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
483+
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
484+
WebsocketSubscribeLogsChannelSize: ctx.Int(utils.WSSubscribeLogsChannelSize.Name),
485+
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
486+
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
487+
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
488+
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
489+
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
490+
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
491+
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
492+
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
493+
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
494+
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
495+
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
495496

496497
OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),
497498

turbo/engineapi/engine_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (e *EngineServer) Start(
8989
) {
9090
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
9191

92-
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, e.logger)
92+
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)
9393

9494
// engineImpl := NewEngineAPI(base, db, engineBackend)
9595
// e.startEngineMessageHandler()

turbo/jsonrpc/corner_cases_support_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
1818
require := require.New(t)
1919
m, _, _ := rpcdaemontest.CreateTestSentry(t)
2020
api := NewEthAPI(newBaseApiForTest(m),
21-
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
21+
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
2222
ctx := context.Background()
2323

2424
a, err := api.GetTransactionByBlockNumberAndIndex(ctx, 10_000, 1)

turbo/jsonrpc/daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
2222
logger log.Logger,
2323
) (list []rpc.API) {
2424
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
25-
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, logger)
25+
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger)
2626
erigonImpl := NewErigonAPI(base, db, eth)
2727
txpoolImpl := NewTxPoolAPI(base, db, txPool)
2828
netImpl := NewNetAPIImpl(eth)

turbo/jsonrpc/debug_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestTraceBlockByNumber(t *testing.T) {
5252
agg := m.HistoryV3Components()
5353
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
5454
baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs)
55-
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
55+
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
5656
api := NewPrivateDebugAPI(baseApi, m.DB, 0)
5757
for _, tt := range debugTraceTransactionTests {
5858
var buf bytes.Buffer
@@ -97,7 +97,7 @@ func TestTraceBlockByNumber(t *testing.T) {
9797

9898
func TestTraceBlockByHash(t *testing.T) {
9999
m, _, _ := rpcdaemontest.CreateTestSentry(t)
100-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
100+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
101101
api := NewPrivateDebugAPI(newBaseApiForTest(m), m.DB, 0)
102102
for _, tt := range debugTraceTransactionTests {
103103
var buf bytes.Buffer

turbo/jsonrpc/erigon_receipts_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestGetLogs(t *testing.T) {
2929
assert := assert.New(t)
3030
m, _, _ := rpcdaemontest.CreateTestSentry(t)
3131
{
32-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
32+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
3333

3434
logs, err := ethApi.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(10)})
3535
assert.NoError(err)

turbo/jsonrpc/eth_api.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,11 +328,12 @@ type APIImpl struct {
328328
ReturnDataLimit int
329329
AllowUnprotectedTxs bool
330330
MaxGetProofRewindBlockCount int
331+
SubscribeLogsChannelSize int
331332
logger log.Logger
332333
}
333334

334335
// NewEthAPI returns APIImpl instance
335-
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, logger log.Logger) *APIImpl {
336+
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, subscribeLogsChannelSize int, logger log.Logger) *APIImpl {
336337
if gascap == 0 {
337338
gascap = uint64(math.MaxUint64 / 2)
338339
}
@@ -348,6 +349,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
348349
AllowUnprotectedTxs: allowUnprotectedTxs,
349350
ReturnDataLimit: returnDataLimit,
350351
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
352+
SubscribeLogsChannelSize: subscribeLogsChannelSize,
351353
logger: logger,
352354
}
353355
}

0 commit comments

Comments
 (0)