-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Add Kafka exporter and receiver configuration #5703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 8 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
f02e2fc
Add configs for collector and ingester
joeyyy09 a728c2e
Update collector config
joeyyy09 fc7da59
Update ingester config
joeyyy09 53173c5
Add initial kafka_test.go
joeyyy09 b2133cc
Add debug logs in the config
joeyyy09 2ccff53
Update conig files for UI
joeyyy09 c9c32cc
Update conig files
joeyyy09 802413a
Add kafka e2e setup
joeyyy09 819ee21
Add kafka e2e setup
joeyyy09 d02136e
Add Readme for Kafka
joeyyy09 3cf0151
Update cmd/jaeger/internal/integration/README.md
joeyyy09 d09eaa4
Update cmd/jaeger/internal/integration/README.md
joeyyy09 0acf969
Update cmd/jaeger/internal/integration/README.md
joeyyy09 17ebd78
Fixes
joeyyy09 dfcba73
Fix broken syntax
joeyyy09 fe794c1
Add injectStorageCleaner parameter
joeyyy09 6ab1285
fix merge conflicts
joeyyy09 66ef1f9
fix merge conflicts
joeyyy09 4d4a882
Modify E2EStorageIntegration struct
joeyyy09 7cc6a6f
Fixes
joeyyy09 29581a4
Fixes
joeyyy09 580bdc9
Update Lint
joeyyy09 fea62c6
Fix Lint
joeyyy09 284acc9
Add e2eCleanup in e2eInitialize
joeyyy09 4ba5917
Fixes
joeyyy09 06e68ac
Lint fixes
joeyyy09 07f903e
fix ui
yurishkuro 46121f5
Merge branch 'main' into kafka-config
yurishkuro 4d624d6
schedule clean-up after reader/writer are created
yurishkuro d7c70cf
remove duplicate call to cleanup
yurishkuro File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| service: | ||
| pipelines: | ||
| traces: | ||
| receivers: [otlp, jaeger] | ||
| processors: [batch] | ||
| exporters: [kafka] | ||
|
|
||
| receivers: | ||
| otlp: | ||
| protocols: | ||
| grpc: | ||
| http: | ||
| jaeger: | ||
| protocols: | ||
| grpc: | ||
| thrift_binary: | ||
| thrift_compact: | ||
| thrift_http: | ||
|
|
||
| processors: | ||
| batch: | ||
|
|
||
| exporters: | ||
| kafka: | ||
| brokers: | ||
| - localhost:9092 | ||
| topic: "jaeger-spans" | ||
| encoding: otlp_proto |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| service: | ||
| extensions: [jaeger_storage, jaeger_query] | ||
| pipelines: | ||
| traces: | ||
| receivers: [kafka] | ||
| processors: [batch] | ||
| exporters: [jaeger_storage_exporter] | ||
| telemetry: | ||
| metrics: | ||
| address: 0.0.0.0:8889 | ||
| logs: | ||
| level: debug | ||
|
|
||
| extensions: | ||
| jaeger_query: | ||
| trace_storage: some_storage | ||
|
|
||
| jaeger_storage: | ||
| backends: | ||
| some_storage: | ||
| memory: | ||
| max_traces: 100000 | ||
|
|
||
| receivers: | ||
| kafka: | ||
| brokers: | ||
| - localhost:9092 | ||
| topic: "jaeger-spans" | ||
| encoding: otlp_proto | ||
| initial_offset: earliest | ||
|
|
||
| processors: | ||
| batch: | ||
|
|
||
| exporters: | ||
| jaeger_storage_exporter: | ||
| trace_storage: some_storage |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| // Copyright (c) 2024 The Jaeger Authors. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package integration | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "net/http" | ||
| "os" | ||
| "os/exec" | ||
| "path/filepath" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/jaegertracing/jaeger/plugin/storage/integration" | ||
| "github.com/jaegertracing/jaeger/ports" | ||
| ) | ||
|
|
||
| // KafkaCollectorIntegration handles the Jaeger collector with a custom configuration. | ||
| type KafkaCollectorIntegration struct { | ||
| CollectorConfigFile string | ||
| collectorCmd *exec.Cmd | ||
| } | ||
|
|
||
| // e2eInitialize starts the Jaeger collector with the provided config file. | ||
| func (s *KafkaCollectorIntegration) e2eInitialize(t *testing.T) { | ||
| collectorConfigFile := createStorageCleanerConfig(t, s.CollectorConfigFile, "kafka") | ||
|
|
||
| // Start the collector | ||
| t.Logf("Starting Jaeger collector in the background with config file %s", collectorConfigFile) | ||
| s.startProcess(t, "collector", collectorConfigFile) | ||
| } | ||
|
|
||
| func (s *KafkaCollectorIntegration) startProcess(t *testing.T, processType, configFile string) { | ||
joeyyy09 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| outFile, err := os.OpenFile( | ||
| filepath.Join(t.TempDir(), fmt.Sprintf("jaeger_%s_output_logs.txt", processType)), | ||
| os.O_CREATE|os.O_WRONLY, | ||
| os.ModePerm, | ||
| ) | ||
| require.NoError(t, err) | ||
| t.Logf("Writing the Jaeger %s output logs into %s", processType, outFile.Name()) | ||
|
|
||
| errFile, err := os.OpenFile( | ||
| filepath.Join(t.TempDir(), fmt.Sprintf("jaeger_%s_error_logs.txt", processType)), | ||
| os.O_CREATE|os.O_WRONLY, | ||
| os.ModePerm, | ||
| ) | ||
| require.NoError(t, err) | ||
| t.Logf("Writing the Jaeger %s error logs into %s", processType, errFile.Name()) | ||
|
|
||
| cmd := &exec.Cmd{ | ||
| Path: fmt.Sprintf("./cmd/jaeger/jaeger-%s", processType), | ||
| Args: []string{fmt.Sprintf("jaeger-%s", processType), "--config", configFile}, | ||
| Dir: "../../../..", | ||
| Stdout: outFile, | ||
| Stderr: errFile, | ||
| } | ||
| require.NoError(t, cmd.Start()) | ||
|
|
||
| require.Eventually(t, func() bool { | ||
| url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP) | ||
joeyyy09 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| t.Logf("Checking if Jaeger %s is available on %s", processType, url) | ||
| ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) | ||
| defer cancel() | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
| require.NoError(t, err) | ||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| t.Log(err) | ||
| return false | ||
| } | ||
| defer resp.Body.Close() | ||
| return resp.StatusCode == http.StatusOK | ||
| }, 30*time.Second, 500*time.Millisecond, fmt.Sprintf("Jaeger %s did not start", processType)) | ||
| t.Logf("Jaeger %s is ready", processType) | ||
|
|
||
| if processType == "collector" { | ||
| s.collectorCmd = cmd | ||
| } | ||
| } | ||
|
|
||
| // e2eCleanUp stops the processes. | ||
| func (s *KafkaCollectorIntegration) e2eCleanUp(t *testing.T) { | ||
| if s.collectorCmd != nil { | ||
| require.NoError(t, s.collectorCmd.Process.Kill()) | ||
| } | ||
| } | ||
|
|
||
| func TestKafkaStorage(t *testing.T) { | ||
| integration.SkipUnlessEnv(t, "kafka") | ||
|
|
||
| // Initialize and start the collector | ||
| collector := &KafkaCollectorIntegration{ | ||
| CollectorConfigFile: "../../collector-with-kafka.yaml", | ||
| } | ||
| collector.e2eInitialize(t) | ||
| t.Cleanup(func() { | ||
| collector.e2eCleanUp(t) | ||
| }) | ||
|
|
||
| // Reuse E2EStorageIntegration for the ingester | ||
| ingester := &E2EStorageIntegration{ | ||
| ConfigFile: "../../ingester-remote-storage.yaml", | ||
| StorageIntegration: integration.StorageIntegration{ | ||
| CleanUp: purge, | ||
| GetDependenciesReturnsSource: true, | ||
| SkipArchiveTest: true, | ||
| }, | ||
| } | ||
| ingester.e2eInitialize(t, "kafka") | ||
| t.Cleanup(func() { | ||
| ingester.e2eCleanUp(t) | ||
| }) | ||
|
|
||
| // Run the span store tests | ||
| ingester.RunSpanStoreTests(t) | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.