Skip to content

Commit 187827a

Browse files
committed
Make logs subscription channel size configurable
1 parent 010b230 commit 187827a

19 files changed

+79
-69
lines changed

cmd/rpcdaemon/cli/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func RootCommand() (*cobra.Command, *httpcfg.HttpCfg) {
143143
rootCmd.PersistentFlags().IntVar(&cfg.MaxGetProofRewindBlockCount, utils.RpcMaxGetProofRewindBlockCount.Name, utils.RpcMaxGetProofRewindBlockCount.Value, utils.RpcMaxGetProofRewindBlockCount.Usage)
144144
rootCmd.PersistentFlags().Uint64Var(&cfg.OtsMaxPageSize, utils.OtsSearchMaxCapFlag.Name, utils.OtsSearchMaxCapFlag.Value, utils.OtsSearchMaxCapFlag.Usage)
145145
rootCmd.PersistentFlags().DurationVar(&cfg.RPCSlowLogThreshold, utils.RPCSlowFlag.Name, utils.RPCSlowFlag.Value, utils.RPCSlowFlag.Usage)
146+
rootCmd.PersistentFlags().IntVar(&cfg.WebsocketSubscribeLogsChannelSize, utils.WSSubscribeLogsChannelSize.Name, utils.WSSubscribeLogsChannelSize.Value, utils.WSSubscribeLogsChannelSize.Usage)
146147

147148
if err := rootCmd.MarkPersistentFlagFilename("rpc.accessList", "json"); err != nil {
148149
panic(err)

cmd/rpcdaemon/cli/httpcfg/http_cfg.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,22 @@ type HttpCfg struct {
4040
AuthRpcPort int
4141
PrivateApiAddr string
4242

43-
API []string
44-
Gascap uint64
45-
MaxTraces uint64
46-
WebsocketPort int
47-
WebsocketEnabled bool
48-
WebsocketCompression bool
49-
RpcAllowListFilePath string
50-
RpcBatchConcurrency uint
51-
RpcStreamingDisable bool
52-
DBReadConcurrency int
53-
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
54-
TxPoolApiAddr string
55-
StateCache kvcache.CoherentConfig
56-
Snap ethconfig.BlocksFreezing
57-
Sync ethconfig.Sync
43+
API []string
44+
Gascap uint64
45+
MaxTraces uint64
46+
WebsocketPort int
47+
WebsocketEnabled bool
48+
WebsocketCompression bool
49+
WebsocketSubscribeLogsChannelSize int
50+
RpcAllowListFilePath string
51+
RpcBatchConcurrency uint
52+
RpcStreamingDisable bool
53+
DBReadConcurrency int
54+
TraceCompatibility bool // Bug for bug compatibility for trace_ routines with OpenEthereum
55+
TxPoolApiAddr string
56+
StateCache kvcache.CoherentConfig
57+
Snap ethconfig.BlocksFreezing
58+
Sync ethconfig.Sync
5859

5960
// GRPC server
6061
GRPCServerEnabled bool

cmd/utils/flags.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,11 @@ var (
488488
Usage: "HTTP path prefix on which JSON-RPC is served. Use '/' to serve on all paths.",
489489
Value: "",
490490
}
491+
WSSubscribeLogsChannelSize = cli.IntFlag{
492+
Name: "ws.api.subscribelogs.channelsize",
493+
Usage: "Size of the channel used for websocket logs subscriptions",
494+
Value: 128,
495+
}
491496
ExecFlag = cli.StringFlag{
492497
Name: "exec",
493498
Usage: "Execute JavaScript statement",

turbo/cli/flags.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -464,20 +464,21 @@ func setEmbeddedRpcDaemon(ctx *cli.Context, cfg *nodecfg.Config, logger log.Logg
464464
WriteTimeout: ctx.Duration(AuthRpcWriteTimeoutFlag.Name),
465465
IdleTimeout: ctx.Duration(HTTPIdleTimeoutFlag.Name),
466466
},
467-
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
468-
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
469-
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
470-
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
471-
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
472-
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
473-
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
474-
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
475-
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
476-
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
477-
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
478-
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
479-
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
480-
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
467+
EvmCallTimeout: ctx.Duration(EvmCallTimeoutFlag.Name),
468+
WebsocketPort: ctx.Int(utils.WSPortFlag.Name),
469+
WebsocketEnabled: ctx.IsSet(utils.WSEnabledFlag.Name),
470+
WebsocketSubscribeLogsChannelSize: ctx.Int(utils.WSSubscribeLogsChannelSize.Name),
471+
RpcBatchConcurrency: ctx.Uint(utils.RpcBatchConcurrencyFlag.Name),
472+
RpcStreamingDisable: ctx.Bool(utils.RpcStreamingDisableFlag.Name),
473+
DBReadConcurrency: ctx.Int(utils.DBReadConcurrencyFlag.Name),
474+
RpcAllowListFilePath: ctx.String(utils.RpcAccessListFlag.Name),
475+
Gascap: ctx.Uint64(utils.RpcGasCapFlag.Name),
476+
MaxTraces: ctx.Uint64(utils.TraceMaxtracesFlag.Name),
477+
TraceCompatibility: ctx.Bool(utils.RpcTraceCompatFlag.Name),
478+
BatchLimit: ctx.Int(utils.RpcBatchLimit.Name),
479+
ReturnDataLimit: ctx.Int(utils.RpcReturnDataLimit.Name),
480+
AllowUnprotectedTxs: ctx.Bool(utils.AllowUnprotectedTxs.Name),
481+
MaxGetProofRewindBlockCount: ctx.Int(utils.RpcMaxGetProofRewindBlockCount.Name),
481482

482483
OtsMaxPageSize: ctx.Uint64(utils.OtsSearchMaxCapFlag.Name),
483484

turbo/engineapi/engine_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (e *EngineServer) Start(
8989
) {
9090
base := jsonrpc.NewBaseApi(filters, stateCache, blockReader, agg, httpConfig.WithDatadir, httpConfig.EvmCallTimeout, engineReader, httpConfig.Dirs)
9191

92-
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, e.logger)
92+
ethImpl := jsonrpc.NewEthAPI(base, db, eth, txPool, mining, httpConfig.Gascap, httpConfig.ReturnDataLimit, httpConfig.AllowUnprotectedTxs, httpConfig.MaxGetProofRewindBlockCount, httpConfig.WebsocketSubscribeLogsChannelSize, e.logger)
9393

9494
// engineImpl := NewEngineAPI(base, db, engineBackend)
9595
// e.startEngineMessageHandler()

turbo/jsonrpc/corner_cases_support_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
1818
require := require.New(t)
1919
m, _, _ := rpcdaemontest.CreateTestSentry(t)
2020
api := NewEthAPI(newBaseApiForTest(m),
21-
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
21+
m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
2222
ctx := context.Background()
2323

2424
a, err := api.GetTransactionByBlockNumberAndIndex(ctx, 10_000, 1)

turbo/jsonrpc/daemon.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
2222
logger log.Logger,
2323
) (list []rpc.API) {
2424
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, cfg.Dirs)
25-
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, logger)
25+
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, cfg.AllowUnprotectedTxs, cfg.MaxGetProofRewindBlockCount, cfg.WebsocketSubscribeLogsChannelSize, logger)
2626
erigonImpl := NewErigonAPI(base, db, eth)
2727
txpoolImpl := NewTxPoolAPI(base, db, txPool)
2828
netImpl := NewNetAPIImpl(eth)

turbo/jsonrpc/debug_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestTraceBlockByNumber(t *testing.T) {
5252
agg := m.HistoryV3Components()
5353
stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
5454
baseApi := NewBaseApi(nil, stateCache, m.BlockReader, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine, m.Dirs)
55-
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
55+
ethApi := NewEthAPI(baseApi, m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
5656
api := NewPrivateDebugAPI(baseApi, m.DB, 0)
5757
for _, tt := range debugTraceTransactionTests {
5858
var buf bytes.Buffer
@@ -97,7 +97,7 @@ func TestTraceBlockByNumber(t *testing.T) {
9797

9898
func TestTraceBlockByHash(t *testing.T) {
9999
m, _, _ := rpcdaemontest.CreateTestSentry(t)
100-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
100+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
101101
api := NewPrivateDebugAPI(newBaseApiForTest(m), m.DB, 0)
102102
for _, tt := range debugTraceTransactionTests {
103103
var buf bytes.Buffer

turbo/jsonrpc/erigon_receipts_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestGetLogs(t *testing.T) {
2929
assert := assert.New(t)
3030
m, _, _ := rpcdaemontest.CreateTestSentry(t)
3131
{
32-
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, log.New())
32+
ethApi := NewEthAPI(newBaseApiForTest(m), m.DB, nil, nil, nil, 5000000, 100_000, false, 100_000, 128, log.New())
3333

3434
logs, err := ethApi.GetLogs(context.Background(), filters.FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(10)})
3535
assert.NoError(err)

turbo/jsonrpc/eth_api.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,11 +328,12 @@ type APIImpl struct {
328328
ReturnDataLimit int
329329
AllowUnprotectedTxs bool
330330
MaxGetProofRewindBlockCount int
331+
SubscribeLogsChannelSize int
331332
logger log.Logger
332333
}
333334

334335
// NewEthAPI returns APIImpl instance
335-
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, logger log.Logger) *APIImpl {
336+
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int, allowUnprotectedTxs bool, maxGetProofRewindBlockCount int, subscribeLogsChannelSize int, logger log.Logger) *APIImpl {
336337
if gascap == 0 {
337338
gascap = uint64(math.MaxUint64 / 2)
338339
}
@@ -348,6 +349,7 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpoo
348349
AllowUnprotectedTxs: allowUnprotectedTxs,
349350
ReturnDataLimit: returnDataLimit,
350351
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
352+
SubscribeLogsChannelSize: subscribeLogsChannelSize,
351353
logger: logger,
352354
}
353355
}

0 commit comments

Comments
 (0)