diff --git a/.chloggen/feat_41480.yaml b/.chloggen/feat_41480.yaml new file mode 100644 index 0000000000000..771cc1414a0d7 --- /dev/null +++ b/.chloggen/feat_41480.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "receiver/kafkametricsreceiver" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for using franz-go client under a feature gate" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41480] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 45f15256bd2e2..b81a3fb2c1782 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -97,6 +97,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect + github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect diff --git a/extension/observer/kafkatopicsobserver/go.mod b/extension/observer/kafkatopicsobserver/go.mod index 27c47ed9e1b78..2d264660a8d73 100644 --- a/extension/observer/kafkatopicsobserver/go.mod +++ b/extension/observer/kafkatopicsobserver/go.mod @@ -68,6 +68,7 @@ require ( github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/franz-go v1.19.5 // indirect + github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect diff --git a/extension/observer/kafkatopicsobserver/go.sum b/extension/observer/kafkatopicsobserver/go.sum index 67b033da6c8fd..b40ce2e87f2ee 100644 --- a/extension/observer/kafkatopicsobserver/go.sum +++ b/extension/observer/kafkatopicsobserver/go.sum @@ -142,6 +142,8 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD 
github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw= +github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250729165834-29dc44e616cd h1:NFxge3WnAb3kSHroE2RAlbFBCb1ED2ii4nQ0arr38Gs= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250729165834-29dc44e616cd/go.mod h1:udxwmMC3r4xqjwrSrMi8p9jpqMDNpC2YwexpDSUmQtw= github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= diff --git a/internal/kafka/franz_client.go b/internal/kafka/franz_client.go index cb840cf7d6f1a..1760a7d693658 100644 --- a/internal/kafka/franz_client.go +++ b/internal/kafka/franz_client.go @@ -14,6 +14,7 @@ import ( krb5client "github.com/jcmturner/gokrb5/v8/client" krb5config "github.com/jcmturner/gokrb5/v8/config" "github.com/jcmturner/gokrb5/v8/keytab" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" @@ -179,6 +180,39 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf return kgo.NewClient(opts...) } +// NewFranzClient creates a franz-go client using the same commonOpts used for producer/consumer. +func NewFranzClient( + ctx context.Context, + clientCfg configkafka.ClientConfig, + logger *zap.Logger, + opts ...kgo.Opt, +) (*kgo.Client, error) { + opts, err := commonOpts(ctx, clientCfg, logger, opts...) 
+ if err != nil { + return nil, err + } + return kgo.NewClient(opts...) +} + +// NewFranzClusterAdminClient creates a kadm admin client from a freshly created franz client. +func NewFranzClusterAdminClient( + ctx context.Context, + clientCfg configkafka.ClientConfig, + logger *zap.Logger, + opts ...kgo.Opt, +) (*kadm.Client, *kgo.Client, error) { + cl, err := NewFranzClient(ctx, clientCfg, logger, opts...) + if err != nil { + return nil, nil, err + } + return kadm.NewClient(cl), cl, nil +} + +// NewFranzAdminFromClient returns a kadm admin bound to an existing kgo client. +func NewFranzAdminFromClient(cl *kgo.Client) *kadm.Client { + return kadm.NewClient(cl) +} + func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig, logger *zap.Logger, opts ...kgo.Opt, diff --git a/internal/kafka/franz_client_test.go b/internal/kafka/franz_client_test.go index 8ee01f7d4a427..364cc417723f9 100644 --- a/internal/kafka/franz_client_test.go +++ b/internal/kafka/franz_client_test.go @@ -594,3 +594,25 @@ func TestFranzClient_ProtocolVersion(t *testing.T) { }) } } + +func TestNewFranzClient_And_Admin(t *testing.T) { + _, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, "meta-topic")) + tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel)) + + // Plain client + cl, err := NewFranzClient(t.Context(), clientCfg, tl) + require.NoError(t, err) + t.Cleanup(cl.Close) + + // Admin from fresh client + ad, cl2, err := NewFranzClusterAdminClient(t.Context(), clientCfg, tl) + require.NoError(t, err) + t.Cleanup(func() { ad.Close(); cl2.Close() }) + + // Metadata via admin should return brokers & topic + md, err := ad.Metadata(t.Context(), "meta-topic") + require.NoError(t, err) + assert.NotEmpty(t, md.Brokers) + _, ok := md.Topics["meta-topic"] + assert.True(t, ok) +} diff --git a/internal/kafka/go.mod b/internal/kafka/go.mod index 75346f13a3b47..564afac7353fc 100644 --- a/internal/kafka/go.mod +++ b/internal/kafka/go.mod @@ -58,7 +58,7 @@ require ( 
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/twmb/franz-go v1.19.5 - github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect + github.com/twmb/franz-go/pkg/kadm v1.16.1 github.com/twmb/franz-go/pkg/kmsg v1.11.2 github.com/twmb/franz-go/plugin/kzap v1.1.2 github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/receiver/kafkametricsreceiver/README.md b/receiver/kafkametricsreceiver/README.md index 86c9a3c0f3748..035912e21cb42 100644 --- a/receiver/kafkametricsreceiver/README.md +++ b/receiver/kafkametricsreceiver/README.md @@ -24,13 +24,18 @@ This receiver supports Kafka versions: ## Getting Started +> [!NOTE] +> You can opt-in to use [`franz-go`](https://github.com/twmb/franz-go) client by enabling the feature gate +> `receiver.kafkametricsreceiver.UseFranzGo` when you run the OpenTelemetry Collector. See the following page +> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates) + Required settings (no defaults): - `scrapers`: any combination of the following scrapers can be enabled. 
- `topics` - `consumers` - `brokers` - + Metrics collected by the associated scraper are listed in [metadata.yaml](metadata.yaml) Optional Settings (with defaults): diff --git a/receiver/kafkametricsreceiver/broker_scraper_franz.go b/receiver/kafkametricsreceiver/broker_scraper_franz.go new file mode 100644 index 0000000000000..e11710fbf5bc5 --- /dev/null +++ b/receiver/kafkametricsreceiver/broker_scraper_franz.go @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver" + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +type brokerScraperFranz struct { + // franz-go handles (lazy created on first scrape) + adm *kadm.Client + cl *kgo.Client + + settings receiver.Settings + config Config + mb *metadata.MetricsBuilder +} + +func (s *brokerScraperFranz) start(_ context.Context, _ component.Host) error { + s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings) + return nil +} + +func (s *brokerScraperFranz) shutdown(context.Context) error { + if s.adm != nil { + s.adm.Close() + s.adm = nil + } + if s.cl != nil { + s.cl.Close() + s.cl = nil + } + return nil +} + +func (s *brokerScraperFranz) ensureClients(ctx context.Context) error { 
+ if s.cl != nil && s.adm != nil { + return nil + } + adm, cl, err := kafka.NewFranzClusterAdminClient(ctx, s.config.ClientConfig, s.settings.Logger) + if err != nil { + return fmt.Errorf("failed to create franz-go admin client: %w", err) + } + s.adm = adm + s.cl = cl + return nil +} + +func (s *brokerScraperFranz) scrape(ctx context.Context) (pmetric.Metrics, error) { + scrapeErrs := scrapererror.ScrapeErrors{} + + if err := s.ensureClients(ctx); err != nil { + return pmetric.Metrics{}, err + } + + now := pcommon.NewTimestampFromTime(time.Now()) + rb := s.mb.NewResourceBuilder() + rb.SetKafkaClusterAlias(s.config.ClusterAlias) + + // ---- brokers count ---- + bdetails, err := s.adm.ListBrokers(ctx) + if err != nil { + // If we cannot list brokers, emit what we have (resource attrs) and return the error + scrapeErrs.Add(err) + return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine() + } + brokerIDs := bdetails.NodeIDs() + s.mb.RecordKafkaBrokersDataPoint(now, int64(len(brokerIDs))) + + // If log retention metric is disabled, we are done. + if !s.config.Metrics.KafkaBrokerLogRetentionPeriod.Enabled { + return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine() + } + + res, err := s.adm.DescribeBrokerConfigs(ctx, brokerIDs...) + if err != nil { + s.settings.Logger.Warn("franz-go: DescribeBrokerConfigs failed", zap.Error(err)) + scrapeErrs.AddPartial(len(brokerIDs), fmt.Errorf("DescribeBrokerConfigs: %w", err)) + } + + // Iterate the result and record the metric for each broker entry we can parse. + for _, bid := range brokerIDs { + bidStr := strconv.Itoa(int(bid)) + + // Look up this broker's config set by resource name (broker id as string). + cfg, _ := res.On(bidStr, nil) // fn can be nil to just return the entry + if cfg.Err != nil { + scrapeErrs.AddPartial(1, fmt.Errorf("broker %s: %w", bidStr, cfg.Err)) + continue + } + + for _, kv := range cfg.Configs { + // kadm.Config has Key and MaybeValue() for the string value. 
+ // We only care about log.retention.hours here. + if kv.Key != logRetentionHours { + continue + } + raw := kv.MaybeValue() + hrs, convErr := strconv.Atoi(raw) + if convErr != nil { + scrapeErrs.AddPartial(1, fmt.Errorf("broker %s: cannot parse %s=%q: %w", bidStr, logRetentionHours, raw, convErr)) + continue + } + sec := int64(hrs) * 3600 + s.mb.RecordKafkaBrokerLogRetentionPeriodDataPoint(now, sec, bidStr) + } + } + + return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine() +} + +// factory for franz-go scraper (internal; selected via gate at the call site later) +func createBrokerScraperFranz(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) { + s := &brokerScraperFranz{ + settings: settings, + config: cfg, + } + return scraper.NewMetrics( + s.scrape, + scraper.WithStart(s.start), + scraper.WithShutdown(s.shutdown), + ) +} diff --git a/receiver/kafkametricsreceiver/broker_scraper_franz_test.go b/receiver/kafkametricsreceiver/broker_scraper_franz_test.go new file mode 100644 index 0000000000000..72795d140a64f --- /dev/null +++ b/receiver/kafkametricsreceiver/broker_scraper_franz_test.go @@ -0,0 +1,152 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/scraper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/kafkatest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +// helper to make a minimal franz config for tests +func franzTestConfig(t *testing.T) Config { + t.Helper() + _, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, 
"meta-topic")) + cfg := Config{ + ClientConfig: clientCfg, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + ClusterAlias: "test-cluster", + } + // keep retention metric disabled here (kfake does not expose broker config values) + cfg.Metrics.KafkaBrokerLogRetentionPeriod.Enabled = false + return cfg +} + +func TestBrokerScraperFranz_CreateStartScrapeShutdown(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTestConfig(t) + + // create franz-backed scraper directly (unit test) + var s scraper.Metrics + var err error + + s, err = createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + // Start + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + + // Scrape + md, err := s.ScrapeMetrics(t.Context()) + require.NoError(t, err) + + // Validate resource + at least the brokers count metric exists + require.Equal(t, 1, md.ResourceMetrics().Len()) + rm := md.ResourceMetrics().At(0) + sm := rm.ScopeMetrics() + require.Equal(t, 1, sm.Len()) + ms := sm.At(0).Metrics() + + var foundBrokers bool + for i := 0; i < ms.Len(); i++ { + m := ms.At(i) + if m.Name() == "kafka.brokers" { + foundBrokers = true + // With one kfake cluster, we should have >=1 broker; exact count comes from kfake + require.GreaterOrEqual(t, m.Sum().DataPoints().At(0).IntValue(), int64(1)) + break + } + } + require.True(t, foundBrokers, "expected kafka.brokers metric") + + // Shutdown + require.NoError(t, s.Shutdown(t.Context())) +} + +func TestBrokerScraperFranz_EmptyClusterAlias(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTestConfig(t) + cfg.ClusterAlias = "" // ensure empty alias behaves like Sarama test + + s, err := createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + md, err := s.ScrapeMetrics(t.Context()) + 
require.NoError(t, err) + + require.Equal(t, 1, md.ResourceMetrics().Len()) + rm := md.ResourceMetrics().At(0) + + // attribute should be absent when alias is empty + _, ok := rm.Resource().Attributes().Get("kafka.cluster.alias") + require.False(t, ok) + + require.NoError(t, s.Shutdown(t.Context())) +} + +func TestBrokerScraperFranz_Create(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTestConfig(t) + s, err := createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) +} + +func TestBrokerScraperFranz_Start(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTestConfig(t) + s, err := createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + require.NoError(t, s.Shutdown(t.Context())) +} + +// func TestBrokerScraperFranz_ScrapeHandlesClientError(t *testing.T) { +// setFranzGo(t, true) + +// // stub the ctor to fail +// orig := newFranzAdminClient +// t.Cleanup(func() { newFranzAdminClient = orig }) +// newFranzAdminClient = func(ctx context.Context, cfg configkafka.ClientConfig, lg *zap.Logger, opts ...kgo.Opt) (*kadm.Client, *kgo.Client, error) { +// return nil, nil, errors.New("new franz admin failed") +// } + +// cfg := franzTestConfig(t) +// s, err := createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) +// require.NoError(t, err) + +// _, err = s.ScrapeMetrics(t.Context()) +// require.Error(t, err) +// } + +func TestBrokerScraperFranz_ShutdownWithoutStart_OK(t *testing.T) { + setFranzGo(t, true) + + // create a legit cfg but never start + _, _ = kafkatest.NewCluster(t, kfake.SeedTopics(1, "meta-topic")) + cfg := franzTestConfig(t) + + s, err := createBrokerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + // should 
not panic / should return nil + require.NoError(t, s.Shutdown(t.Context())) +} diff --git a/receiver/kafkametricsreceiver/consumer_scraper_franz.go b/receiver/kafkametricsreceiver/consumer_scraper_franz.go new file mode 100644 index 0000000000000..3a7df23c8bf03 --- /dev/null +++ b/receiver/kafkametricsreceiver/consumer_scraper_franz.go @@ -0,0 +1,196 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver" + +import ( + "context" + "fmt" + "regexp" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/scraper" + "go.uber.org/multierr" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +type consumerScraperFranz struct { + adm *kadm.Client + cl *kgo.Client + + settings receiver.Settings + groupFilter *regexp.Regexp + topicFilter *regexp.Regexp + config Config + mb *metadata.MetricsBuilder +} + +func (s *consumerScraperFranz) start(_ context.Context, _ component.Host) error { + s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings) + return nil +} + +func (s *consumerScraperFranz) shutdown(_ context.Context) error { + if s.adm != nil { + s.adm.Close() + s.adm = nil + } + if s.cl != nil { + s.cl.Close() + s.cl = nil + } + return nil +} + +func (s *consumerScraperFranz) ensureClients(ctx context.Context) error { + if s.adm != nil && s.cl != nil { + return nil + } + adm, cl, err := kafka.NewFranzClusterAdminClient(ctx, 
s.config.ClientConfig, s.settings.Logger) + if err != nil { + return fmt.Errorf("failed to create franz-go admin client: %w", err) + } + s.adm = adm + s.cl = cl + return nil +} + +func (s *consumerScraperFranz) scrape(ctx context.Context) (pmetric.Metrics, error) { + if err := s.ensureClients(ctx); err != nil { + return pmetric.Metrics{}, err + } + + var scrapeErr error + + // 1) list & filter groups + lgs, err := s.adm.ListGroups(ctx) + if err != nil { + return pmetric.Metrics{}, fmt.Errorf("franz-go: ListGroups failed: %w", err) + } + var matchedGrpIDs []string + for _, g := range lgs.Sorted() { + if s.groupFilter.MatchString(g.Group) { + matchedGrpIDs = append(matchedGrpIDs, g.Group) + } + } + + // 2) list & filter topics + td, err := s.adm.ListTopics(ctx) // non-internal only, same as sarama default + if err != nil { + return pmetric.Metrics{}, fmt.Errorf("franz-go: ListTopics failed: %w", err) + } + var matchedTopics []string + for t := range td { + if s.topicFilter.MatchString(t) { + matchedTopics = append(matchedTopics, t) + } + } + + // 3) compute partition list + end offsets for matched topics + endOffsets, err := s.adm.ListEndOffsets(ctx, matchedTopics...) 
+ if err != nil { + return pmetric.Metrics{}, fmt.Errorf("franz-go: ListEndOffsets failed: %w", err) + } + // Build helpers equivalent to Sarama path + topicPartitions := make(map[string][]int32, len(matchedTopics)) + topicPartitionOffset := make(map[string]map[int32]int64, len(matchedTopics)) + endOffsets.Each(func(lo kadm.ListedOffset) { + // lo.Topic, lo.Partition, lo.Offset + if _, ok := topicPartitions[lo.Topic]; !ok { + topicPartitions[lo.Topic] = []int32{} + } + if _, ok := topicPartitionOffset[lo.Topic]; !ok { + topicPartitionOffset[lo.Topic] = map[int32]int64{} + } + topicPartitions[lo.Topic] = append(topicPartitions[lo.Topic], lo.Partition) + topicPartitionOffset[lo.Topic][lo.Partition] = lo.Offset + }) + + // 4) describe groups for member counts + dgs, err := s.adm.DescribeGroups(ctx, matchedGrpIDs...) + if err != nil { + return pmetric.Metrics{}, fmt.Errorf("franz-go: DescribeGroups failed: %w", err) + } + + now := pcommon.NewTimestampFromTime(time.Now()) + + // 5) per group: fetch committed offsets for matched topics and compute metrics + gs := dgs.Sorted() + for i := range gs { + g := &gs[i] + grpID := g.Group + s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(g.Members)), grpID) + + offsets, ferr := s.adm.FetchOffsetsForTopics(ctx, grpID, matchedTopics...) 
+ if ferr != nil { + scrapeErr = multierr.Append(scrapeErr, fmt.Errorf("franz-go: FetchOffsetsForTopics(%s) failed: %w", grpID, ferr)) + continue + } + + for topic, parts := range topicPartitions { + isConsumed := false + var lagSum int64 + var offsetSum int64 + + for _, p := range parts { + consumerOffset := int64(-1) + if or, ok := offsets.Lookup(topic, p); ok && or.Err == nil { + consumerOffset = or.At + } + offsetSum += consumerOffset + s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, consumerOffset, grpID, topic, int64(p)) + + var consumerLag int64 = -1 + if consumerOffset != -1 { + isConsumed = true + if end, ok := topicPartitionOffset[topic][p]; ok { + consumerLag = end - consumerOffset + lagSum += consumerLag + } + } + s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, grpID, topic, int64(p)) + } + + if isConsumed { + s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, grpID, topic) + s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, grpID, topic) + } + } + } + + rb := s.mb.NewResourceBuilder() + rb.SetKafkaClusterAlias(s.config.ClusterAlias) + + return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErr +} + +// Factory helper for franz-go path (selected under the feature gate later). 
+func createConsumerScraperFranz(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) { + groupFilter, err := regexp.Compile(cfg.GroupMatch) + if err != nil { + return nil, fmt.Errorf("failed to compile group_match: %w", err) + } + topicFilter, err := regexp.Compile(cfg.TopicMatch) + if err != nil { + return nil, fmt.Errorf("failed to compile topic filter: %w", err) + } + s := &consumerScraperFranz{ + settings: settings, + groupFilter: groupFilter, + topicFilter: topicFilter, + config: cfg, + } + return scraper.NewMetrics( + s.scrape, + scraper.WithStart(s.start), + scraper.WithShutdown(s.shutdown), + ) +} diff --git a/receiver/kafkametricsreceiver/consumer_scraper_franz_test.go b/receiver/kafkametricsreceiver/consumer_scraper_franz_test.go new file mode 100644 index 0000000000000..f758f167be362 --- /dev/null +++ b/receiver/kafkametricsreceiver/consumer_scraper_franz_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/scraper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/kafkatest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +// franz test config helper: one seed topic, permissive filters +func franzConsumerTestConfig(t *testing.T) Config { + t.Helper() + _, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, "meta-topic")) + cfg := Config{ + ClientConfig: clientCfg, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + ClusterAlias: "test-cluster-franz", + // Use permissive filters so 
ListEndOffsets/ListStartOffsets have topics. + TopicMatch: ".*", + GroupMatch: ".*", + } + return cfg +} + +func TestConsumerScraperFranz_CreateStartScrapeShutdown(t *testing.T) { + // Feature gate not required when calling createConsumerScraperFranz directly, + // but enabling keeps parity with receiver selection tests later. + setFranzGo(t, true) + + cfg := franzConsumerTestConfig(t) + + var s scraper.Metrics + var err error + + s, err = createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + + // Scrape should succeed even if there are no consumer groups in kfake. + md, err := s.ScrapeMetrics(t.Context()) + require.NoError(t, err) + require.NotNil(t, md) + // With kfake, there may be zero consumer groups; allow zero metrics. + require.GreaterOrEqual(t, md.DataPointCount(), 0) + + require.NoError(t, s.Shutdown(t.Context())) +} + +func TestConsumerScraperFranz_InvalidTopicRegex(t *testing.T) { + setFranzGo(t, true) + cfg := franzConsumerTestConfig(t) + cfg.TopicMatch = "[" // invalid regex + + s, err := createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.Error(t, err) + require.Nil(t, s) +} + +func TestConsumerScraperFranz_InvalidGroupRegex(t *testing.T) { + setFranzGo(t, true) + cfg := franzConsumerTestConfig(t) + cfg.GroupMatch = "[" // invalid regex + + s, err := createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.Error(t, err) + require.Nil(t, s) +} + +func TestConsumerScraperFranz_ShutdownWithoutStart_OK(t *testing.T) { + setFranzGo(t, true) + + cfg := franzConsumerTestConfig(t) + + s, err := createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + // Shutdown prior to Start should be a no-op + require.NoError(t, 
s.Shutdown(t.Context())) +} + +func TestConsumerScraperFranz_EmptyClusterAlias(t *testing.T) { + setFranzGo(t, true) + + cfg := franzConsumerTestConfig(t) + cfg.ClusterAlias = "" + + s, err := createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + md, err := s.ScrapeMetrics(t.Context()) + require.NoError(t, err) + + // With kfake there may be zero consumer groups → zero RMs. Only assert the alias when one exists. + if md.ResourceMetrics().Len() > 0 { + rm := md.ResourceMetrics().At(0) + _, ok := rm.Resource().Attributes().Get("kafka.cluster.alias") + require.False(t, ok) + } + + require.NoError(t, s.Shutdown(t.Context())) +} + +// (Optional) If you want a direct filter parity check like the Sarama unit did: +func TestConsumerScraperFranz_FilterCompilesLikeSarama(t *testing.T) { + setFranzGo(t, true) + + // Just prove that a typical defaultGroupMatch compiles and can be set. + // (Your Sarama tests reference defaultGroupMatch; here we mimic that behavior.) 
+ r := regexp.MustCompile(defaultGroupMatch) + require.NotNil(t, r) +} diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod index 368a262918569..b6ea7b0737743 100644 --- a/receiver/kafkametricsreceiver/go.mod +++ b/receiver/kafkametricsreceiver/go.mod @@ -8,11 +8,15 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.136.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka v0.136.0 github.com/stretchr/testify v1.11.1 + github.com/twmb/franz-go v1.19.5 + github.com/twmb/franz-go/pkg/kadm v1.16.1 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20250729165834-29dc44e616cd go.opentelemetry.io/collector/component v1.42.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/component/componenttest v0.136.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/confmap v1.42.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/consumer v1.42.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/consumer/consumertest v0.136.1-0.20251002223229-5ec1466578ef + go.opentelemetry.io/collector/featuregate v1.42.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/filter v0.136.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/pdata v1.42.1-0.20251002223229-5ec1466578ef go.opentelemetry.io/collector/receiver v1.42.1-0.20251002223229-5ec1466578ef @@ -73,7 +77,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect github.com/stretchr/objx v0.5.2 // indirect - github.com/twmb/franz-go v1.19.5 // indirect github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect 
github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect @@ -86,7 +89,6 @@ require ( go.opentelemetry.io/collector/config/configtls v1.42.1-0.20251002223229-5ec1466578ef // indirect go.opentelemetry.io/collector/consumer/consumererror v0.136.1-0.20251002223229-5ec1466578ef // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.136.1-0.20251002223229-5ec1466578ef // indirect - go.opentelemetry.io/collector/featuregate v1.42.1-0.20251002223229-5ec1466578ef // indirect go.opentelemetry.io/collector/internal/telemetry v0.136.1-0.20251002223229-5ec1466578ef // indirect go.opentelemetry.io/collector/pdata/pprofile v0.136.1-0.20251002223229-5ec1466578ef // indirect go.opentelemetry.io/collector/pipeline v1.42.1-0.20251002223229-5ec1466578ef // indirect diff --git a/receiver/kafkametricsreceiver/go.sum b/receiver/kafkametricsreceiver/go.sum index 29ef6a959abde..b551efe837a40 100644 --- a/receiver/kafkametricsreceiver/go.sum +++ b/receiver/kafkametricsreceiver/go.sum @@ -142,6 +142,8 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD github.com/twmb/franz-go v1.7.0/go.mod h1:PMze0jNfNghhih2XHbkmTFykbMF5sJqmNJB31DOOzro= github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= +github.com/twmb/franz-go/pkg/kadm v1.16.1 h1:IEkrhTljgLHJ0/hT/InhXGjPdmWfFvxp7o/MR7vJ8cw= +github.com/twmb/franz-go/pkg/kadm v1.16.1/go.mod h1:Ue/ye1cc9ipsQFg7udFbbGiFNzQMqiH73fGC2y0rwyc= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250729165834-29dc44e616cd h1:NFxge3WnAb3kSHroE2RAlbFBCb1ED2ii4nQ0arr38Gs= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250729165834-29dc44e616cd/go.mod h1:udxwmMC3r4xqjwrSrMi8p9jpqMDNpC2YwexpDSUmQtw= github.com/twmb/franz-go/pkg/kmsg v1.2.0/go.mod 
h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go
index 45755dd5194f0..c81aae292df9a 100644
--- a/receiver/kafkametricsreceiver/receiver.go
+++ b/receiver/kafkametricsreceiver/receiver.go
@@ -15,6 +15,7 @@ import (
 	"github.com/IBM/sarama"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/featuregate"
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/scraper"
 	"go.opentelemetry.io/collector/scraper/scraperhelper"
@@ -23,13 +24,28 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
+// franzGoFeatureGateName is the name of the feature gate for the franz-go client.
+const franzGoFeatureGateName = "receiver.kafkametricsreceiver.UseFranzGo"
+
+// franzGoFeatureGate is a feature gate that controls whether the Kafka metrics
+// receiver uses the franz-go client or the Sarama client to collect metrics. When
+// enabled, this receiver will use the franz-go client, which is more performant and
+// has better support for modern Kafka features.
+var franzGoFeatureGate = featuregate.GlobalRegistry().MustRegister(
+	franzGoFeatureGateName, featuregate.StageAlpha,
+	featuregate.WithRegisterDescription("When enabled, the Kafka Metrics receiver will use the franz-go client to connect to Kafka."),
+	featuregate.WithRegisterFromVersion("v0.137.0"),
+)
+
 type createKafkaScraper func(context.Context, Config, receiver.Settings) (scraper.Metrics, error)
 
 var (
 	brokersScraperType   = component.MustNewType("brokers")
 	topicsScraperType    = component.MustNewType("topics")
 	consumersScraperType = component.MustNewType("consumers")
-	allScrapers          = map[string]createKafkaScraper{
+
+	// Default (Sarama) factories; franz-go selection happens in scrapersForCurrentGate().
+ allScrapers = map[string]createKafkaScraper{ brokersScraperType.String(): createBrokerScraper, topicsScraperType.String(): createTopicsScraper, consumersScraperType.String(): createConsumerScraper, @@ -39,23 +55,41 @@ var ( newClusterAdmin = sarama.NewClusterAdminFromClient ) +// scrapersForCurrentGate returns the appropriate scraper factory map +// depending on whether the franz-go feature gate is enabled. +func scrapersForCurrentGate() map[string]createKafkaScraper { + if franzGoFeatureGate.IsEnabled() { + // Use franz-go implementations + return map[string]createKafkaScraper{ + brokersScraperType.String(): createBrokerScraperFranz, + topicsScraperType.String(): createTopicsScraperFranz, + consumersScraperType.String(): createConsumerScraperFranz, + } + } + // Fall back to Sarama implementations + return allScrapers +} + var newMetricsReceiver = func( ctx context.Context, config Config, params receiver.Settings, consumer consumer.Metrics, ) (receiver.Metrics, error) { + // Choose scrapers according to the feature gate at receiver construction time. 
+ activeScrapers := scrapersForCurrentGate() + scraperControllerOptions := make([]scraperhelper.ControllerOption, 0, len(config.Scrapers)) - for _, scraper := range config.Scrapers { - if s, ok := allScrapers[scraper]; ok { - s, err := s(ctx, config, params) + for _, key := range config.Scrapers { + if factory, ok := activeScrapers[key]; ok { + s, err := factory(ctx, config, params) if err != nil { return nil, err } scraperControllerOptions = append(scraperControllerOptions, scraperhelper.AddScraper(metadata.Type, s)) continue } - return nil, fmt.Errorf("no scraper found for key: %s", scraper) + return nil, fmt.Errorf("no scraper found for key: %s", key) } return scraperhelper.NewMetricsController( diff --git a/receiver/kafkametricsreceiver/receiver_test.go b/receiver/kafkametricsreceiver/receiver_test.go index f5fe5f94f5fb1..6d9948e2fd3f3 100644 --- a/receiver/kafkametricsreceiver/receiver_test.go +++ b/receiver/kafkametricsreceiver/receiver_test.go @@ -6,10 +6,13 @@ package kafkametricsreceiver import ( "context" "errors" + "reflect" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -18,6 +21,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) +func setFranzGo(tb testing.TB, value bool) { + currentFranzState := franzGoFeatureGate.IsEnabled() + require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoFeatureGate.ID(), value)) + tb.Cleanup(func() { + require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoFeatureGate.ID(), currentFranzState)) + }) +} + func TestNewReceiver_invalid_scraper_error(t *testing.T) { c := createDefaultConfig().(*Config) c.Scrapers = 
[]string{"brokers", "cpu"} @@ -69,3 +80,57 @@ func TestNewReceiver_handles_scraper_error(t *testing.T) { assert.Error(t, err) assert.Nil(t, r) } + +func TestReceiver_UsesSaramaWhenFranzDisabled(t *testing.T) { + setFranzGo(t, false) + + m := scrapersForCurrentGate() + require.NotNil(t, m) + + saramaBrokers := allScrapers[brokersScraperType.String()] + saramaTopics := allScrapers[topicsScraperType.String()] + saramaConsumers := allScrapers[consumersScraperType.String()] + + require.Equal(t, + reflect.ValueOf(saramaBrokers).Pointer(), + reflect.ValueOf(m[brokersScraperType.String()]).Pointer(), + "brokers scraper should match allScrapers when gate is disabled", + ) + require.Equal(t, + reflect.ValueOf(saramaTopics).Pointer(), + reflect.ValueOf(m[topicsScraperType.String()]).Pointer(), + "topics scraper should match allScrapers when gate is disabled", + ) + require.Equal(t, + reflect.ValueOf(saramaConsumers).Pointer(), + reflect.ValueOf(m[consumersScraperType.String()]).Pointer(), + "consumers scraper should match allScrapers when gate is disabled", + ) +} + +func TestReceiver_UsesFranzWhenFranzEnabled(t *testing.T) { + setFranzGo(t, true) + + m := scrapersForCurrentGate() + require.NotNil(t, m) + + franzBrokers := allScrapers[brokersScraperType.String()] + franzTopics := allScrapers[topicsScraperType.String()] + franzConsumers := allScrapers[consumersScraperType.String()] + + require.NotEqual(t, + reflect.ValueOf(franzBrokers).Pointer(), + reflect.ValueOf(m[brokersScraperType.String()]).Pointer(), + "brokers scraper should NOT be the Sarama default when gate is enabled", + ) + require.NotEqual(t, + reflect.ValueOf(franzTopics).Pointer(), + reflect.ValueOf(m[topicsScraperType.String()]).Pointer(), + "topics scraper should NOT be the Sarama default when gate is enabled", + ) + require.NotEqual(t, + reflect.ValueOf(franzConsumers).Pointer(), + reflect.ValueOf(m[consumersScraperType.String()]).Pointer(), + "consumers scraper should NOT be the Sarama default when gate is 
enabled", + ) +} diff --git a/receiver/kafkametricsreceiver/topic_scraper_franz.go b/receiver/kafkametricsreceiver/topic_scraper_franz.go new file mode 100644 index 0000000000000..036d7b9c4de36 --- /dev/null +++ b/receiver/kafkametricsreceiver/topic_scraper_franz.go @@ -0,0 +1,223 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver" + +import ( + "context" + "fmt" + "regexp" + "strconv" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kgo" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +type topicScraperFranz struct { + adm *kadm.Client + cl *kgo.Client + + settings receiver.Settings + topicFilter *regexp.Regexp + config Config + mb *metadata.MetricsBuilder +} + +func (s *topicScraperFranz) start(_ context.Context, _ component.Host) error { + s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings) + return nil +} + +func (s *topicScraperFranz) shutdown(context.Context) error { + if s.adm != nil { + s.adm.Close() + s.adm = nil + } + if s.cl != nil { + s.cl.Close() + s.cl = nil + } + return nil +} + +func (s *topicScraperFranz) ensureClients(ctx context.Context) error { + if s.adm != nil && s.cl != nil { + return nil + } + adm, cl, err := kafka.NewFranzClusterAdminClient(ctx, s.config.ClientConfig, s.settings.Logger) + if err != nil { 
+ return fmt.Errorf("failed to create franz-go admin client: %w", err) + } + s.adm = adm + s.cl = cl + return nil +} + +func (s *topicScraperFranz) scrape(ctx context.Context) (pmetric.Metrics, error) { + if err := s.ensureClients(ctx); err != nil { + return pmetric.Metrics{}, err + } + + scrapeErrs := scrapererror.ScrapeErrors{} + now := pcommon.NewTimestampFromTime(time.Now()) + + // 1) list topics (with metadata details) + td, err := s.adm.ListTopics(ctx) + if err != nil { + s.settings.Logger.Error("franz-go: ListTopics failed", zap.Error(err)) + return pmetric.Metrics{}, fmt.Errorf("franz-go: ListTopics failed: %w", err) + } + + // filter topic names first + var matched []string + for name := range td { + if s.topicFilter.MatchString(name) { + matched = append(matched, name) + } + } + + // 2) offsets for matched topics (newest & oldest) + endOffs, err := s.adm.ListEndOffsets(ctx, matched...) + if err != nil { + scrapeErrs.Add(fmt.Errorf("franz-go: ListEndOffsets failed: %w", err)) + } + startOffs, err := s.adm.ListStartOffsets(ctx, matched...) + if err != nil { + scrapeErrs.Add(fmt.Errorf("franz-go: ListStartOffsets failed: %w", err)) + } + + // 3) per-topic configs & replication factor + if s.config.Metrics.KafkaTopicLogRetentionPeriod.Enabled || + s.config.Metrics.KafkaTopicLogRetentionSize.Enabled || + s.config.Metrics.KafkaTopicMinInsyncReplicas.Enabled || + s.config.Metrics.KafkaTopicReplicationFactor.Enabled { + // replication factor: derive from first partition's replica count + if s.config.Metrics.KafkaTopicReplicationFactor.Enabled { + for _, topic := range matched { + if det, ok := td[topic]; ok { + var rf int + for _, pd := range det.Partitions { + rf = len(pd.Replicas) + break // first partition is enough; RF should be consistent + } + if rf > 0 { + s.mb.RecordKafkaTopicReplicationFactorDataPoint(now, int64(rf), topic) + } + } + } + } + + rcs, derr := s.adm.DescribeTopicConfigs(ctx, matched...) 
+ if derr != nil { + s.settings.Logger.Warn("franz-go: DescribeTopicConfigs failed", zap.Error(derr)) + scrapeErrs.AddPartial(len(matched), fmt.Errorf("DescribeTopicConfigs: %w", derr)) + } else { + for _, topic := range matched { + rc, _ := rcs.On(topic, nil) + if rc.Err != nil { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s: %w", topic, rc.Err)) + continue + } + for _, kv := range rc.Configs { + switch kv.Key { + case minInsyncReplicas: + if s.config.Metrics.KafkaTopicMinInsyncReplicas.Enabled { + if v, err := strconv.Atoi(kv.MaybeValue()); err == nil { + s.mb.RecordKafkaTopicMinInsyncReplicasDataPoint(now, int64(v), topic) + } else { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s: parse %s=%q: %w", topic, minInsyncReplicas, kv.MaybeValue(), err)) + } + } + case retentionMs: + if s.config.Metrics.KafkaTopicLogRetentionPeriod.Enabled { + if v, err := strconv.Atoi(kv.MaybeValue()); err == nil { + // seconds = ms / 1000 + s.mb.RecordKafkaTopicLogRetentionPeriodDataPoint(now, int64(v/1000), topic) + } else { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s: parse %s=%q: %w", topic, retentionMs, kv.MaybeValue(), err)) + } + } + case retentionBytes: + if s.config.Metrics.KafkaTopicLogRetentionSize.Enabled { + if v, err := strconv.Atoi(kv.MaybeValue()); err == nil { + s.mb.RecordKafkaTopicLogRetentionSizeDataPoint(now, int64(v), topic) + } else { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s: parse %s=%q: %w", topic, retentionBytes, kv.MaybeValue(), err)) + } + } + } + } + } + } + } + + // 4) per-topic partitions & per-partition metrics + for _, topic := range matched { + det, ok := td[topic] + if !ok { + continue + } + // partitions count + s.mb.RecordKafkaTopicPartitionsDataPoint(now, int64(len(det.Partitions)), topic) + + // iterate partitions without copying large structs + for pid := range det.Partitions { + pd := det.Partitions[pid] + + // replicas + if s.config.Metrics.KafkaPartitionReplicas.Enabled { + s.mb.RecordKafkaPartitionReplicasDataPoint(now, 
int64(len(pd.Replicas)), topic, int64(pid)) + } + // in-sync replicas + if s.config.Metrics.KafkaPartitionReplicasInSync.Enabled { + s.mb.RecordKafkaPartitionReplicasInSyncDataPoint(now, int64(len(pd.ISR)), topic, int64(pid)) + } + + // offsets: newest/current and oldest (use .Offset) + if or, ok := endOffs.Lookup(topic, pid); ok && or.Err == nil { + s.mb.RecordKafkaPartitionCurrentOffsetDataPoint(now, or.Offset, topic, int64(pid)) + } else if ok && or.Err != nil { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s partition %d: end offset error: %w", topic, pid, or.Err)) + } + + if or, ok := startOffs.Lookup(topic, pid); ok && or.Err == nil { + s.mb.RecordKafkaPartitionOldestOffsetDataPoint(now, or.Offset, topic, int64(pid)) + } else if ok && or.Err != nil { + scrapeErrs.AddPartial(1, fmt.Errorf("topic %s partition %d: start offset error: %w", topic, pid, or.Err)) + } + } + } + + rb := s.mb.NewResourceBuilder() + rb.SetKafkaClusterAlias(s.config.ClusterAlias) + return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine() +} + +// Factory helper for franz-go path (selected via feature gate elsewhere). 
+func createTopicsScraperFranz(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) { + topicFilter, err := regexp.Compile(cfg.TopicMatch) + if err != nil { + return nil, fmt.Errorf("failed to compile topic filter: %w", err) + } + s := &topicScraperFranz{ + settings: settings, + topicFilter: topicFilter, + config: cfg, + } + return scraper.NewMetrics( + s.scrape, + scraper.WithStart(s.start), + scraper.WithShutdown(s.shutdown), + ) +} diff --git a/receiver/kafkametricsreceiver/topic_scraper_franz_test.go b/receiver/kafkametricsreceiver/topic_scraper_franz_test.go new file mode 100644 index 0000000000000..2591c883a71f8 --- /dev/null +++ b/receiver/kafkametricsreceiver/topic_scraper_franz_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkametricsreceiver + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kfake" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/scraper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/kafkatest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" +) + +// minimal franz test config for topics +func franzTopicsTestConfig(t *testing.T) Config { + t.Helper() + _, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, "topic-a")) + cfg := Config{ + ClientConfig: clientCfg, + MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), + ClusterAlias: "franz-topics", + TopicMatch: ".*", // allow our seeded topic + } + return cfg +} + +func TestTopicScraperFranz_CreateStartScrapeShutdown(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTopicsTestConfig(t) + + var s scraper.Metrics + var err error + + s, err = 
createTopicsScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + + md, err := s.ScrapeMetrics(t.Context()) + require.NoError(t, err) + require.NotNil(t, md) + + // With kfake, depending on metadata & offsets availability, data points may be zero. + // The key here is: scrape does not error. + _ = md // intentionally not asserting counts + + require.NoError(t, s.Shutdown(t.Context())) +} + +func TestTopicScraperFranz_InvalidTopicRegex(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTopicsTestConfig(t) + cfg.TopicMatch = "[" // invalid + + s, err := createTopicsScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.Error(t, err) + require.Nil(t, s) +} + +func TestTopicScraperFranz_EmptyClusterAlias(t *testing.T) { + setFranzGo(t, true) + + cfg := franzTopicsTestConfig(t) + cfg.ClusterAlias = "" + + s, err := createTopicsScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type)) + require.NoError(t, err) + require.NotNil(t, s) + + require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost())) + md, err := s.ScrapeMetrics(t.Context()) + require.NoError(t, err) + + // Only assert alias absence if a resource entry exists. + if md.ResourceMetrics().Len() > 0 { + rm := md.ResourceMetrics().At(0) + _, ok := rm.Resource().Attributes().Get("kafka.cluster.alias") + require.False(t, ok) + } + + require.NoError(t, s.Shutdown(t.Context())) +} + +// Optional parity check: default regex compiles (like Sarama test style) +func TestTopicScraperFranz_FilterCompilesLikeSarama(t *testing.T) { + setFranzGo(t, true) + + r := regexp.MustCompile(defaultTopicMatch) + require.NotNil(t, r) +}