-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Datadog Connector Component #25065
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Datadog Connector Component #25065
Changes from 2 commits
82275b5
4fd7e86
f11b7ed
93da989
54e327a
c4541e0
1c08283
852ef82
806f7fb
84f7252
160032a
9d64c12
dfd0fd7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| include ../../Makefile.Common |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| # Datadog Connector | ||
|
|
||
| ## Description | ||
|
|
||
| The Datadog Connector is a connector component that computes Datadog APM Stats pre-sampling in the event that your traces pipeline is sampled using components such as the tailsamplingprocessor or probabilisticsamplerprocessor. | ||
|
|
||
| The connector is most applicable when using the sampling components such as the [tailsamplingprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor#tail-sampling-processor), or the [probabilisticsamplerprocessor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/probabilisticsamplerprocessor) in one of your pipelines. The sampled pipeline should be duplicated and the `datadog` connector should be added to the the pipeline that is not being sampled to ensure that Datadog APM Stats are accurate in the backend. | ||
|
|
||
| ## Usage | ||
|
|
||
| To use the Datadog Connector, add the connector to one set of the duplicated pipelines while sampling the other. The Datadog Connector will compute APM Stats on all spans that it sees. Here is an example on how to add it to a pipeline using the [probabilisticsampler]: | ||
|
|
||
| <table> | ||
| <tr> | ||
| <td> Before </td> <td> After </td> | ||
| </tr> | ||
| <tr> | ||
| <td valign="top"> | ||
|
|
||
| ```yaml | ||
| # ... | ||
| processors: | ||
| # ... | ||
| probabilistic_sampler: | ||
| sampling_percentage: 20 | ||
| # add the "datadog" processor definition | ||
| datadog: | ||
|
|
||
| exporters: | ||
| datadog: | ||
| api: | ||
| key: ${env:DD_API_KEY} | ||
|
|
||
| service: | ||
| pipelines: | ||
| traces: | ||
| receivers: [otlp] | ||
| # prepend it to the sampler in your pipeline: | ||
| processors: [batch, datadog, probabilistic_sampler] | ||
| exporters: [datadog] | ||
|
|
||
| metrics: | ||
| receivers: [otlp] | ||
| processors: [batch] | ||
| exporters: [datadog] | ||
| ``` | ||
|
|
||
| </td><td valign="top"> | ||
|
|
||
| ```yaml | ||
| # ... | ||
| processors: | ||
| probabilistic_sampler: | ||
| sampling_percentage: 20 | ||
|
|
||
| connectors: | ||
| # add the "datadog" connector definition and further configurations | ||
| datadog/connector: | ||
|
|
||
| exporters: | ||
| datadog: | ||
| api: | ||
| key: ${env:DD_API_KEY} | ||
|
|
||
| service: | ||
| pipelines: | ||
| traces: | ||
| receivers: [otlp] | ||
| processors: [batch] | ||
| exporters: [datadog/connector] | ||
|
|
||
| traces/2: # this pipeline uses sampling | ||
| receivers: [otlp] | ||
| processors: [batch, probabilistic_sampler] | ||
| exporters: [datadog] | ||
|
|
||
| metrics: | ||
| receivers: [datadog/connector] | ||
| processors: [batch] | ||
| exporters: [datadog] | ||
| ``` | ||
| </tr></table> | ||
|
|
||
| Here we have two traces pipelines that ingest the same data but one is being sampled. The one that is sampled has its data sent to the datadog backend for you to see the sampled subset of the total traces sent across. The other non-sampled pipeline of traces sends its data to the metrics pipeline to be used in the APM stats. This unsampled pipeline gives the full picture of how much data the application emits in traces. | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector" | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" | ||
| "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" | ||
| "go.opentelemetry.io/collector/component" | ||
| "go.opentelemetry.io/collector/consumer" | ||
| "go.opentelemetry.io/collector/pdata/ptrace" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog" | ||
| ) | ||
|
|
||
| // connectorImp is the schema for connector | ||
| type connectorImp struct { | ||
| metricsConsumer consumer.Metrics // the next component in the pipeline to ingest data after connector | ||
| logger *zap.Logger | ||
| started bool | ||
|
|
||
| // agent specifies the agent used to ingest traces and output APM Stats. | ||
| // It is implemented by the traceagent structure; replaced in tests. | ||
| agent datadog.Ingester | ||
|
|
||
| // translator specifies the translator used to transform APM Stats Payloads | ||
| // from the agent to OTLP Metrics. | ||
| translator *metrics.Translator | ||
|
|
||
| // in specifies the channel through which the agent will output Stats Payloads | ||
| // resulting from ingested traces. | ||
| in chan *pb.StatsPayload | ||
|
|
||
| // exit specifies the exit channel, which will be closed upon shutdown. | ||
| exit chan struct{} | ||
| } | ||
|
|
||
| // function to create a new connector | ||
| func newConnector(logger *zap.Logger, _ component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) { | ||
| logger.Info("Building datadog connector") | ||
|
|
||
| in := make(chan *pb.StatsPayload, 100) | ||
| trans, err := metrics.NewTranslator(logger) | ||
|
|
||
| ctx := context.Background() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return &connectorImp{ | ||
| logger: logger, | ||
| agent: datadog.NewAgent(ctx, in), | ||
| translator: trans, | ||
| in: in, | ||
| metricsConsumer: nextConsumer, | ||
| exit: make(chan struct{}), | ||
| }, nil | ||
| } | ||
|
|
||
| // Start implements the component.Component interface. | ||
| func (c *connectorImp) Start(_ context.Context, _ component.Host) error { | ||
| c.logger.Info("Starting datadogconnector") | ||
| c.started = true | ||
| c.agent.Start() | ||
| go c.run() | ||
| return nil | ||
| } | ||
|
|
||
| // Shutdown implements the component.Component interface. | ||
| func (c *connectorImp) Shutdown(context.Context) error { | ||
| c.logger.Info("Shutting down datadog connector") | ||
| c.started = false | ||
| c.agent.Stop() | ||
| c.exit <- struct{}{} // signal exit | ||
| <-c.exit // wait for close | ||
| return nil | ||
| } | ||
|
|
||
| // Capabilities implements the consumer interface. | ||
| // tells use whether the component(connector) will mutate the data passed into it. if set to true the processor does modify the data | ||
| func (c *connectorImp) Capabilities() consumer.Capabilities { | ||
| return consumer.Capabilities{MutatesData: false} | ||
| } | ||
|
|
||
| func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { | ||
| c.agent.Ingest(ctx, traces) | ||
| return nil | ||
| } | ||
|
|
||
| // run awaits incoming stats resulting from the agent's ingestion, converts them | ||
| // to metrics and flushes them using the configured metrics exporter. | ||
| func (c *connectorImp) run() { | ||
| defer close(c.exit) | ||
| for { | ||
| select { | ||
| case stats := <-c.in: | ||
| if len(stats.Stats) == 0 { | ||
| continue | ||
| } | ||
| // APM stats as metrics | ||
| mx := c.translator.StatsPayloadToMetrics(stats) | ||
| ctx := context.TODO() | ||
|
|
||
| // send metrics to the consumer or next component in pipeline | ||
| if err := c.metricsConsumer.ConsumeMetrics(ctx, mx); err != nil { | ||
| c.logger.Error("Failed ConsumeMetrics", zap.Error(err)) | ||
| return | ||
| } | ||
| case <-c.exit: | ||
| return | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package datadogconnector | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "go.opentelemetry.io/collector/component" | ||
| "go.opentelemetry.io/collector/connector/connectortest" | ||
| "go.opentelemetry.io/collector/consumer/consumertest" | ||
| ) | ||
|
|
||
| var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface | ||
gord02 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // create test to create a connector, check that basic code compiles | ||
| func TestNewConnector(t *testing.T) { | ||
|
|
||
| factory := NewFactory() | ||
|
|
||
| creationParams := connectortest.NewNopCreateSettings() | ||
| cfg := factory.CreateDefaultConfig().(*Config) | ||
|
|
||
| // Test | ||
| traceConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop()) | ||
| smc := traceConnector.(*connectorImp) | ||
gord02 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| assert.Nil(t, err) | ||
gord02 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| assert.NotNil(t, smc) // checks if the created connector implements the connectorImp struct | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -0,0 +1,48 @@ | ||||
| // Copyright The OpenTelemetry Authors | ||||
| // SPDX-License-Identifier: Apache-2.0 | ||||
|
|
||||
| //go:generate mdatagen metadata.yaml | ||||
|
|
||||
| package datadogconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector" | ||||
|
|
||||
| import ( | ||||
| "context" | ||||
|
|
||||
| "go.opentelemetry.io/collector/component" | ||||
| "go.opentelemetry.io/collector/connector" | ||||
| "go.opentelemetry.io/collector/consumer" | ||||
| ) | ||||
|
|
||||
| const ( | ||||
| // this is the name used to refer to the connector in the config.yaml | ||||
| typeStr = "datadog" | ||||
| ) | ||||
|
|
||||
| // NewFactory creates a factory for tailtracer connector. | ||||
| func NewFactory() connector.Factory { | ||||
| // OTel connector factory to make a factory for connectors | ||||
| return connector.NewFactory( | ||||
| // metadata.Type, | ||||
| typeStr, | ||||
| createDefaultConfig, | ||||
| // connector.WithTracesToMetrics(createTracesToMetricsConnector, metadata.TracesStability)) | ||||
| connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelAlpha)) | ||||
|
||||
| //go:generate mdatagen metadata.yaml |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| module github.com/open-telemetry/opentelemetry-collector-contrib/connector/datadogconnector | ||
|
|
||
| go 1.19 | ||
|
|
||
| require ( | ||
| github.com/DataDog/datadog-agent/pkg/proto v0.48.0-beta.1 | ||
songy23 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.7.0 | ||
| github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog v0.82.0 | ||
| github.com/stretchr/testify v1.8.4 | ||
| go.opentelemetry.io/collector/component v0.82.0 | ||
| go.opentelemetry.io/collector/connector v0.82.0 | ||
| go.opentelemetry.io/collector/consumer v0.82.0 | ||
| go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 | ||
| go.uber.org/zap v1.25.0 | ||
| ) | ||
|
|
||
| require ( | ||
| github.com/DataDog/datadog-agent/pkg/obfuscate v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/trace v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/util/cgroups v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/util/log v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/util/pointer v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-agent/pkg/util/scrubber v0.48.0-beta.1 // indirect | ||
| github.com/DataDog/datadog-go/v5 v5.1.1 // indirect | ||
| github.com/DataDog/go-tuf v1.0.1-0.5.2 // indirect | ||
| github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.7.0 // indirect | ||
| github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.7.0 // indirect | ||
| github.com/DataDog/sketches-go v1.4.2 // indirect | ||
| github.com/Microsoft/go-winio v0.6.1 // indirect | ||
| github.com/cespare/xxhash/v2 v2.2.0 // indirect | ||
| github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect | ||
| github.com/containerd/cgroups v1.0.4 // indirect | ||
| github.com/coreos/go-systemd/v22 v22.5.0 // indirect | ||
| github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect | ||
| github.com/docker/go-units v0.5.0 // indirect | ||
| github.com/dustin/go-humanize v1.0.1 // indirect | ||
| github.com/go-ole/go-ole v1.2.6 // indirect | ||
| github.com/godbus/dbus/v5 v5.0.6 // indirect | ||
| github.com/gogo/protobuf v1.3.2 // indirect | ||
| github.com/golang/mock v1.6.0 // indirect | ||
| github.com/golang/protobuf v1.5.3 // indirect | ||
| github.com/google/uuid v1.3.0 // indirect | ||
| github.com/json-iterator/go v1.1.12 // indirect | ||
| github.com/karrick/godirwalk v1.17.0 // indirect | ||
| github.com/knadh/koanf v1.5.0 // indirect | ||
| github.com/knadh/koanf/v2 v2.0.1 // indirect | ||
| github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c // indirect | ||
| github.com/mitchellh/copystructure v1.2.0 // indirect | ||
| github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect | ||
| github.com/mitchellh/reflectwalk v1.0.2 // indirect | ||
| github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
| github.com/modern-go/reflect2 v1.0.2 // indirect | ||
| github.com/opencontainers/runtime-spec v1.1.0-rc.3 // indirect | ||
| github.com/outcaste-io/ristretto v0.2.1 // indirect | ||
| github.com/patrickmn/go-cache v2.1.0+incompatible // indirect | ||
| github.com/philhofer/fwd v1.1.2 // indirect | ||
| github.com/pkg/errors v0.9.1 // indirect | ||
| github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect | ||
| github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect | ||
| github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect | ||
| github.com/shirou/gopsutil/v3 v3.23.7 // indirect | ||
| github.com/shoenig/go-m1cpu v0.1.6 // indirect | ||
| github.com/tinylib/msgp v1.1.8 // indirect | ||
| github.com/tklauser/go-sysconf v0.3.11 // indirect | ||
| github.com/tklauser/numcpus v0.6.0 // indirect | ||
| github.com/yusufpapurcu/wmi v1.2.3 // indirect | ||
| go.opentelemetry.io/collector v0.82.0 // indirect | ||
| go.opentelemetry.io/collector/config/configtelemetry v0.82.0 // indirect | ||
| go.opentelemetry.io/collector/confmap v0.82.0 // indirect | ||
| go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect | ||
| go.opentelemetry.io/collector/semconv v0.82.0 // indirect | ||
| go.opentelemetry.io/otel v1.16.0 // indirect | ||
| go.opentelemetry.io/otel/metric v1.16.0 // indirect | ||
| go.opentelemetry.io/otel/trace v1.16.0 // indirect | ||
| go.uber.org/atomic v1.11.0 // indirect | ||
| go.uber.org/multierr v1.11.0 // indirect | ||
| golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect | ||
| golang.org/x/mod v0.12.0 // indirect | ||
| golang.org/x/net v0.14.0 // indirect | ||
| golang.org/x/sys v0.11.0 // indirect | ||
| golang.org/x/text v0.12.0 // indirect | ||
| golang.org/x/time v0.3.0 // indirect | ||
| golang.org/x/tools v0.12.0 // indirect | ||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect | ||
| google.golang.org/grpc v1.57.0 // indirect | ||
| google.golang.org/protobuf v1.31.0 // indirect | ||
| gopkg.in/yaml.v2 v2.4.0 // indirect | ||
| gopkg.in/yaml.v3 v3.0.1 // indirect | ||
| ) | ||
|
|
||
gord02 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog => ../../internal/datadog | ||
|
|
||
| replace github.com/DataDog/datadog-agent/pkg/proto => github.com/DataDog/datadog-agent/pkg/proto v0.48.0-beta.1 | ||
|
|
||
| replace github.com/DataDog/datadog-agent/pkg/trace => github.com/DataDog/datadog-agent/pkg/trace v0.48.0-beta.1 | ||
Uh oh!
There was an error while loading. Please reload this page.