Skip to content

Commit ece880d

Browse files
paulojmdiastommyers-elastic
authored andcommitted
[receiver/kafkametricsreceiver] Add support for using franz-go client under a feature gate (open-telemetry#43019)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR adds franz-go support to the kafkametricsreceiver, behind a feature gate (`receiver.kafkametricsreceiver.UseFranzGo`). <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#41480 <!--Describe what testing was performed and which tests were added.--> #### Testing Tested locally and added tests for trying to cover all the current Sarama ones. <!--Describe the documentation added.--> #### Documentation Updated README.md with reference about opt-in using franz-go library. <!--Please delete paragraphs that you did not use before submitting.--> --------- Signed-off-by: Paulo Dias <[email protected]>
1 parent 48fab2f commit ece880d

File tree

18 files changed

+1140
-9
lines changed

18 files changed

+1140
-9
lines changed

.chloggen/feat_41480.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "receiver/kafkametricsreceiver"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add support for using franz-go client under a feature gate"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [41480]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ require (
9797
github.com/pierrec/lz4/v4 v4.1.22 // indirect
9898
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
9999
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
100+
github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect
100101
github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect
101102
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect
102103
github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect

extension/observer/kafkatopicsobserver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ require (
6868
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
6969
github.com/stretchr/objx v0.5.2 // indirect
7070
github.com/twmb/franz-go v1.19.5 // indirect
71+
github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect
7172
github.com/twmb/franz-go/pkg/kmsg v1.11.2 // indirect
7273
github.com/twmb/franz-go/pkg/sasl/kerberos v1.1.0 // indirect
7374
github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect

extension/observer/kafkatopicsobserver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/kafka/franz_client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
krb5client "github.com/jcmturner/gokrb5/v8/client"
1515
krb5config "github.com/jcmturner/gokrb5/v8/config"
1616
"github.com/jcmturner/gokrb5/v8/keytab"
17+
"github.com/twmb/franz-go/pkg/kadm"
1718
"github.com/twmb/franz-go/pkg/kgo"
1819
"github.com/twmb/franz-go/pkg/kmsg"
1920
"github.com/twmb/franz-go/pkg/kversion"
@@ -179,6 +180,39 @@ func NewFranzConsumerGroup(ctx context.Context, clientCfg configkafka.ClientConf
179180
return kgo.NewClient(opts...)
180181
}
181182

183+
// NewFranzClient creates a franz-go client using the same commonOpts used for producer/consumer.
184+
func NewFranzClient(
185+
ctx context.Context,
186+
clientCfg configkafka.ClientConfig,
187+
logger *zap.Logger,
188+
opts ...kgo.Opt,
189+
) (*kgo.Client, error) {
190+
opts, err := commonOpts(ctx, clientCfg, logger, opts...)
191+
if err != nil {
192+
return nil, err
193+
}
194+
return kgo.NewClient(opts...)
195+
}
196+
197+
// NewFranzClusterAdminClient creates a kadm admin client from a freshly created franz client.
198+
func NewFranzClusterAdminClient(
199+
ctx context.Context,
200+
clientCfg configkafka.ClientConfig,
201+
logger *zap.Logger,
202+
opts ...kgo.Opt,
203+
) (*kadm.Client, *kgo.Client, error) {
204+
cl, err := NewFranzClient(ctx, clientCfg, logger, opts...)
205+
if err != nil {
206+
return nil, nil, err
207+
}
208+
return kadm.NewClient(cl), cl, nil
209+
}
210+
211+
// NewFranzAdminFromClient returns a kadm admin bound to an existing kgo client.
212+
func NewFranzAdminFromClient(cl *kgo.Client) *kadm.Client {
213+
return kadm.NewClient(cl)
214+
}
215+
182216
func commonOpts(ctx context.Context, clientCfg configkafka.ClientConfig,
183217
logger *zap.Logger,
184218
opts ...kgo.Opt,

internal/kafka/franz_client_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,3 +594,25 @@ func TestFranzClient_ProtocolVersion(t *testing.T) {
594594
})
595595
}
596596
}
597+
598+
func TestNewFranzClient_And_Admin(t *testing.T) {
599+
_, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, "meta-topic"))
600+
tl := zaptest.NewLogger(t, zaptest.Level(zap.WarnLevel))
601+
602+
// Plain client
603+
cl, err := NewFranzClient(t.Context(), clientCfg, tl)
604+
require.NoError(t, err)
605+
t.Cleanup(cl.Close)
606+
607+
// Admin from fresh client
608+
ad, cl2, err := NewFranzClusterAdminClient(t.Context(), clientCfg, tl)
609+
require.NoError(t, err)
610+
t.Cleanup(func() { ad.Close(); cl2.Close() })
611+
612+
// Metadata via admin should return brokers & topic
613+
md, err := ad.Metadata(t.Context(), "meta-topic")
614+
require.NoError(t, err)
615+
assert.NotEmpty(t, md.Brokers)
616+
_, ok := md.Topics["meta-topic"]
617+
assert.True(t, ok)
618+
}

internal/kafka/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ require (
5858
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5959
github.com/rogpeppe/go-internal v1.13.1 // indirect
6060
github.com/twmb/franz-go v1.19.5
61-
github.com/twmb/franz-go/pkg/kadm v1.16.1 // indirect
61+
github.com/twmb/franz-go/pkg/kadm v1.16.1
6262
github.com/twmb/franz-go/pkg/kmsg v1.11.2
6363
github.com/twmb/franz-go/plugin/kzap v1.1.2
6464
github.com/xdg-go/pbkdf2 v1.0.0 // indirect

receiver/kafkametricsreceiver/README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@ This receiver supports Kafka versions:
2424

2525
## Getting Started
2626

27+
> [!NOTE]
28+
> You can opt-in to use [`franz-go`](https://github.com/twmb/franz-go) client by enabling the feature gate
29+
> `receiver.kafkametricsreceiver.UseFranzGo` when you run the OpenTelemetry Collector. See the following page
30+
> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates)
31+
2732
Required settings (no defaults):
2833

2934
- `scrapers`: any combination of the following scrapers can be enabled.
3035
- `topics`
3136
- `consumers`
3237
- `brokers`
33-
38+
3439
Metrics collected by the associated scraper are listed in [metadata.yaml](metadata.yaml)
3540

3641
Optional Settings (with defaults):
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"strconv"
10+
"time"
11+
12+
"github.com/twmb/franz-go/pkg/kadm"
13+
"github.com/twmb/franz-go/pkg/kgo"
14+
"go.opentelemetry.io/collector/component"
15+
"go.opentelemetry.io/collector/pdata/pcommon"
16+
"go.opentelemetry.io/collector/pdata/pmetric"
17+
"go.opentelemetry.io/collector/receiver"
18+
"go.opentelemetry.io/collector/scraper"
19+
"go.opentelemetry.io/collector/scraper/scrapererror"
20+
"go.uber.org/zap"
21+
22+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
23+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
24+
)
25+
26+
type brokerScraperFranz struct {
27+
// franz-go handles (lazy created on first scrape)
28+
adm *kadm.Client
29+
cl *kgo.Client
30+
31+
settings receiver.Settings
32+
config Config
33+
mb *metadata.MetricsBuilder
34+
}
35+
36+
func (s *brokerScraperFranz) start(_ context.Context, _ component.Host) error {
37+
s.mb = metadata.NewMetricsBuilder(s.config.MetricsBuilderConfig, s.settings)
38+
return nil
39+
}
40+
41+
func (s *brokerScraperFranz) shutdown(context.Context) error {
42+
if s.adm != nil {
43+
s.adm.Close()
44+
s.adm = nil
45+
}
46+
if s.cl != nil {
47+
s.cl.Close()
48+
s.cl = nil
49+
}
50+
return nil
51+
}
52+
53+
func (s *brokerScraperFranz) ensureClients(ctx context.Context) error {
54+
if s.cl != nil && s.adm != nil {
55+
return nil
56+
}
57+
adm, cl, err := kafka.NewFranzClusterAdminClient(ctx, s.config.ClientConfig, s.settings.Logger)
58+
if err != nil {
59+
return fmt.Errorf("failed to create franz-go admin client: %w", err)
60+
}
61+
s.adm = adm
62+
s.cl = cl
63+
return nil
64+
}
65+
66+
func (s *brokerScraperFranz) scrape(ctx context.Context) (pmetric.Metrics, error) {
67+
scrapeErrs := scrapererror.ScrapeErrors{}
68+
69+
if err := s.ensureClients(ctx); err != nil {
70+
return pmetric.Metrics{}, err
71+
}
72+
73+
now := pcommon.NewTimestampFromTime(time.Now())
74+
rb := s.mb.NewResourceBuilder()
75+
rb.SetKafkaClusterAlias(s.config.ClusterAlias)
76+
77+
// ---- brokers count ----
78+
bdetails, err := s.adm.ListBrokers(ctx)
79+
if err != nil {
80+
// If we cannot list brokers, emit what we have (resource attrs) and return the error
81+
scrapeErrs.Add(err)
82+
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine()
83+
}
84+
brokerIDs := bdetails.NodeIDs()
85+
s.mb.RecordKafkaBrokersDataPoint(now, int64(len(brokerIDs)))
86+
87+
// If log retention metric is disabled, we are done.
88+
if !s.config.Metrics.KafkaBrokerLogRetentionPeriod.Enabled {
89+
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine()
90+
}
91+
92+
res, err := s.adm.DescribeBrokerConfigs(ctx, brokerIDs...)
93+
if err != nil {
94+
s.settings.Logger.Warn("franz-go: DescribeBrokerConfigs failed", zap.Error(err))
95+
scrapeErrs.AddPartial(len(brokerIDs), fmt.Errorf("DescribeBrokerConfigs: %w", err))
96+
}
97+
98+
// Iterate the result and record the metric for each broker entry we can parse.
99+
for _, bid := range brokerIDs {
100+
bidStr := strconv.Itoa(int(bid))
101+
102+
// Look up this broker's config set by resource name (broker id as string).
103+
cfg, _ := res.On(bidStr, nil) // fn can be nil to just return the entry
104+
if cfg.Err != nil {
105+
scrapeErrs.AddPartial(1, fmt.Errorf("broker %s: %w", bidStr, cfg.Err))
106+
continue
107+
}
108+
109+
for _, kv := range cfg.Configs {
110+
// kadm.Config has Key and MaybeValue() for the string value.
111+
// We only care about log.retention.hours here.
112+
if kv.Key != logRetentionHours {
113+
continue
114+
}
115+
raw := kv.MaybeValue()
116+
hrs, convErr := strconv.Atoi(raw)
117+
if convErr != nil {
118+
scrapeErrs.AddPartial(1, fmt.Errorf("broker %s: cannot parse %s=%q: %w", bidStr, logRetentionHours, raw, convErr))
119+
continue
120+
}
121+
sec := int64(hrs) * 3600
122+
s.mb.RecordKafkaBrokerLogRetentionPeriodDataPoint(now, sec, bidStr)
123+
}
124+
}
125+
126+
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine()
127+
}
128+
129+
// factory for franz-go scraper (internal; selected via gate at the call site later)
130+
func createBrokerScraperFranz(_ context.Context, cfg Config, settings receiver.Settings) (scraper.Metrics, error) {
131+
s := &brokerScraperFranz{
132+
settings: settings,
133+
config: cfg,
134+
}
135+
return scraper.NewMetrics(
136+
s.scrape,
137+
scraper.WithStart(s.start),
138+
scraper.WithShutdown(s.shutdown),
139+
)
140+
}

0 commit comments

Comments
 (0)