Skip to content

Commit 51f0188

Browse files
prontclaude
andcommitted
feat(opentelemetry source): Support per-signal OTLP decoding configuration (#24455)
Allow independent configuration of OTLP decoding for logs, metrics, and traces. Maintains backward compatibility with boolean configuration. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent c788f93 commit 51f0188

File tree

2 files changed

+429
-18
lines changed

2 files changed

+429
-18
lines changed

src/sources/opentelemetry/config.rs

Lines changed: 135 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,100 @@ pub const LOGS: &str = "logs";
4747
pub const METRICS: &str = "metrics";
4848
pub const TRACES: &str = "traces";
4949

50+
/// Configuration for OTLP decoding behavior.
51+
///
52+
/// This configuration controls how OpenTelemetry Protocol (OTLP) data is decoded for each
53+
/// signal type (logs, metrics, traces).
54+
///
55+
/// When a signal is configured to use OTLP decoding (`true`), the raw OTLP format is preserved,
56+
/// allowing the data to be forwarded to downstream OTLP collectors without transformation.
57+
/// When set to `false`, the signal is converted to Vector's native event format, enabling
58+
/// compatibility with Vector's transforms and sinks.
59+
///
60+
/// # Important Considerations
61+
///
62+
/// When OTLP decoding is enabled for metrics:
63+
/// - Metrics are parsed as logs while preserving the OTLP format
64+
/// - Vector's metric transforms will NOT be compatible with this output
65+
/// - The events can be forwarded directly to a downstream OTLP collector
66+
///
67+
/// # Configuration Examples
68+
///
69+
/// This configuration supports both a simple boolean form (for backward compatibility)
70+
/// and a per-signal configuration:
71+
///
72+
/// ## Simple boolean form (applies to all signals):
73+
/// ```yaml
74+
/// use_otlp_decoding: true # All signals preserve OTLP format
75+
/// # or
76+
/// use_otlp_decoding: false # All signals use Vector native format (default)
77+
/// ```
78+
///
79+
/// ## Per-signal configuration:
80+
/// ```yaml
81+
/// use_otlp_decoding:
82+
/// logs: false # Convert to Vector native format
83+
/// metrics: false # Convert to Vector native format
84+
/// traces: true # Preserve OTLP format
85+
/// ```
86+
#[configurable_component]
87+
#[derive(Clone, Debug, Default, PartialEq, Eq)]
88+
#[serde(deny_unknown_fields)]
89+
pub struct OtlpDecodingConfig {
90+
/// Whether to use OTLP decoding for logs.
91+
///
92+
/// When `true`, logs preserve their OTLP format.
93+
/// When `false` (default), logs are converted to Vector's native format.
94+
#[serde(default)]
95+
pub logs: bool,
96+
97+
/// Whether to use OTLP decoding for metrics.
98+
///
99+
/// When `true`, metrics preserve their OTLP format but are processed as logs.
100+
/// When `false` (default), metrics are converted to Vector's native metric format.
101+
#[serde(default)]
102+
pub metrics: bool,
103+
104+
/// Whether to use OTLP decoding for traces.
105+
///
106+
/// When `true`, traces preserve their OTLP format.
107+
/// When `false` (default), traces are converted to Vector's native format.
108+
#[serde(default)]
109+
pub traces: bool,
110+
}
111+
112+
impl From<bool> for OtlpDecodingConfig {
113+
/// Converts a boolean value to an OtlpDecodingConfig.
114+
///
115+
/// This provides backward compatibility with the previous boolean configuration.
116+
/// - `true` enables OTLP decoding for all signals
117+
/// - `false` disables OTLP decoding for all signals (uses Vector native format)
118+
fn from(value: bool) -> Self {
119+
Self {
120+
logs: value,
121+
metrics: value,
122+
traces: value,
123+
}
124+
}
125+
}
126+
127+
impl OtlpDecodingConfig {
128+
/// Returns true if any signal is configured to use OTLP decoding.
129+
pub const fn any_enabled(&self) -> bool {
130+
self.logs || self.metrics || self.traces
131+
}
132+
133+
/// Returns true if all signals are configured to use OTLP decoding.
134+
pub const fn all_enabled(&self) -> bool {
135+
self.logs && self.metrics && self.traces
136+
}
137+
138+
/// Returns true if signals have mixed configuration (some enabled, some disabled).
139+
pub const fn is_mixed(&self) -> bool {
140+
self.any_enabled() && !self.all_enabled()
141+
}
142+
}
143+
50144
/// Configuration for the `opentelemetry` source.
51145
#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
52146
#[derive(Clone, Debug)]
@@ -67,14 +161,14 @@ pub struct OpentelemetryConfig {
67161
#[serde(default)]
68162
pub log_namespace: Option<bool>,
69163

70-
/// Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly.
164+
/// Configuration for how OTLP data is decoded for each signal type.
71165
///
72-
/// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format.
73-
/// This means that components that work on metrics, will not be compatible with this output.
74-
/// However, these events can be forwarded directly to a downstream OTEL collector.
166+
/// This field supports both a simple boolean form (for backward compatibility) and a
167+
/// per-signal configuration. See [`OtlpDecodingConfig`] for detailed documentation
168+
/// and configuration examples.
75169
#[configurable(derived)]
76-
#[serde(default)]
77-
pub use_otlp_decoding: bool,
170+
#[serde(default, deserialize_with = "bool_or_struct")]
171+
pub use_otlp_decoding: OtlpDecodingConfig,
78172
}
79173

80174
/// Configuration for the `opentelemetry` gRPC server.
@@ -152,18 +246,24 @@ impl GenerateConfig for OpentelemetryConfig {
152246
http: example_http_config(),
153247
acknowledgements: Default::default(),
154248
log_namespace: None,
155-
use_otlp_decoding: false,
249+
use_otlp_decoding: OtlpDecodingConfig::default(),
156250
})
157251
.unwrap()
158252
}
159253
}
160254

161255
impl OpentelemetryConfig {
162-
fn get_signal_deserializer(
256+
pub(crate) fn get_signal_deserializer(
163257
&self,
164258
signal_type: OtlpSignalType,
165259
) -> vector_common::Result<Option<OtlpDeserializer>> {
166-
if self.use_otlp_decoding {
260+
let should_use_otlp = match signal_type {
261+
OtlpSignalType::Logs => self.use_otlp_decoding.logs,
262+
OtlpSignalType::Metrics => self.use_otlp_decoding.metrics,
263+
OtlpSignalType::Traces => self.use_otlp_decoding.traces,
264+
};
265+
266+
if should_use_otlp {
167267
Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
168268
signal_type,
169269
]))))
@@ -183,6 +283,17 @@ impl SourceConfig for OpentelemetryConfig {
183283

184284
let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
185285

286+
// Log info message when using mixed OTLP decoding formats
287+
if self.use_otlp_decoding.is_mixed() {
288+
info!(
289+
message = "Using mixed OTLP decoding configuration.",
290+
logs_otlp = self.use_otlp_decoding.logs,
291+
metrics_otlp = self.use_otlp_decoding.metrics,
292+
traces_otlp = self.use_otlp_decoding.traces,
293+
note = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format."
294+
);
295+
}
296+
186297
let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
187298
let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
188299
let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
@@ -352,16 +463,25 @@ impl SourceConfig for OpentelemetryConfig {
352463
}
353464
};
354465

355-
let metrics_output = if self.use_otlp_decoding {
466+
let logs_output = if self.use_otlp_decoding.logs {
467+
SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS)
468+
} else {
469+
SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS)
470+
};
471+
472+
let metrics_output = if self.use_otlp_decoding.metrics {
356473
SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
357474
} else {
358475
SourceOutput::new_metrics().with_port(METRICS)
359476
};
360-
vec![
361-
SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
362-
metrics_output,
363-
SourceOutput::new_traces().with_port(TRACES),
364-
]
477+
478+
let traces_output = if self.use_otlp_decoding.traces {
479+
SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(TRACES)
480+
} else {
481+
SourceOutput::new_traces().with_port(TRACES)
482+
};
483+
484+
vec![logs_output, metrics_output, traces_output]
365485
}
366486

367487
fn resources(&self) -> Vec<Resource> {

0 commit comments

Comments
 (0)