diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index f1ca36461ef..187815be55d 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -58,6 +58,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit ClientID: options.ClientID, ProtocolVersion: options.ProtocolVersion, AuthenticationConfig: options.AuthenticationConfig, + FetchMaxMessageBytes: options.FetchMaxMessageBytes, } saramaConsumer, err := consumerConfig.NewConsumer(logger) if err != nil { diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index ec84344d99e..ac4cbd79ea8 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -39,6 +39,8 @@ const ( SuffixTopic = ".topic" // SuffixRackID is a suffix for the consumer rack-id flag SuffixRackID = ".rack-id" + // SuffixFetchMaxMessageBytes is a suffix for the consumer fetch-max-message-bytes flag + SuffixFetchMaxMessageBytes = ".fetch-max-message-bytes" // SuffixGroupID is a suffix for the group-id flag SuffixGroupID = ".group-id" // SuffixClientID is a suffix for the client-id flag @@ -67,6 +69,8 @@ const ( DefaultEncoding = kafka.EncodingProto // DefaultDeadlockInterval is the default deadlock interval DefaultDeadlockInterval = time.Duration(0) + // DefaultFetchMaxMessageBytes is the default for kafka.consumer.fetch-max-message-bytes flag + DefaultFetchMaxMessageBytes = 1024 * 1024 // 1MB ) // Options stores the configuration options for the Ingester @@ -117,6 +121,10 @@ func AddFlags(flagSet *flag.FlagSet) { KafkaConsumerConfigPrefix+SuffixRackID, "", "Rack identifier for this client. This can be any string value which indicates where this client is located. It corresponds with the broker config `broker.rack`") + flagSet.Int( + KafkaConsumerConfigPrefix+SuffixFetchMaxMessageBytes, + DefaultFetchMaxMessageBytes, + "The maximum number of message bytes to fetch from the broker in a single request. So you must be sure this is at least as large as your largest message.") auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) } @@ -130,6 +138,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) o.RackID = v.GetString(KafkaConsumerConfigPrefix + SuffixRackID) + o.FetchMaxMessageBytes = v.GetInt32(KafkaConsumerConfigPrefix + SuffixFetchMaxMessageBytes) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 456a82fa165..bb6cf549014 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -38,6 +38,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.consumer.group-id=group1", "--kafka.consumer.client-id=client-id1", "--kafka.consumer.rack-id=rack1", + "--kafka.consumer.fetch-max-message-bytes=10485760", "--kafka.consumer.encoding=json", "--kafka.consumer.protocol-version=1.0.0", "--ingester.parallelism=5", @@ -49,6 +50,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) assert.Equal(t, "rack1", o.RackID) + assert.Equal(t, int32(10485760), o.FetchMaxMessageBytes) assert.Equal(t, "client-id1", o.ClientID) assert.Equal(t, "1.0.0", o.ProtocolVersion) assert.Equal(t, 5, o.Parallelism) @@ -108,6 +110,7 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, DefaultGroupID, o.GroupID) assert.Equal(t, DefaultClientID, o.ClientID) assert.Equal(t, DefaultParallelism, o.Parallelism) + assert.Equal(t, int32(DefaultFetchMaxMessageBytes), o.FetchMaxMessageBytes) assert.Equal(t, DefaultEncoding, o.Encoding) assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) } diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go 
index 8c849b6b228..402ffeedd56 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -42,13 +42,14 @@ type Configuration struct { auth.AuthenticationConfig `mapstructure:"authentication"` Consumer - Brokers []string `mapstructure:"brokers"` - Topic string `mapstructure:"topic"` - InitialOffset int64 - GroupID string `mapstructure:"group_id"` - ClientID string `mapstructure:"client_id"` - ProtocolVersion string `mapstructure:"protocol_version"` - RackID string `mapstructure:"rack_id"` + Brokers []string `mapstructure:"brokers"` + Topic string `mapstructure:"topic"` + InitialOffset int64 + GroupID string `mapstructure:"group_id"` + ClientID string `mapstructure:"client_id"` + ProtocolVersion string `mapstructure:"protocol_version"` + RackID string `mapstructure:"rack_id"` + FetchMaxMessageBytes int32 `mapstructure:"fetch_max_message_bytes"` } // NewConsumer creates a new kafka consumer @@ -57,6 +58,7 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID saramaConfig.RackID = c.RackID + saramaConfig.Consumer.Fetch.Default = c.FetchMaxMessageBytes if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil {