Skip to content

Commit b471333

Browse files
authored
enhancement(splunk_hec sink, humio_logs sink, codecs): Integrate encoding::Encoder with splunk_hec/humio_logs sink (#12495)
* Integrate `encoding::Encoder` with `splunk_hec` sink * Test that `encode_json` implementation matches `encode` * Rename `encode_json` -> `to_json_value` * Make sink input type dependent on available codec input type Signed-off-by: Pablo Sichert <[email protected]>
1 parent d5f6913 commit b471333

File tree

13 files changed

+294
-157
lines changed

13 files changed

+294
-157
lines changed

lib/codecs/src/encoding/format/json.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ impl JsonSerializer {
4040
pub const fn new() -> Self {
4141
Self
4242
}
43+
44+
/// Encode event and represent it as JSON value.
45+
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, serde_json::Error> {
46+
match event {
47+
Event::Log(log) => serde_json::to_value(&log),
48+
Event::Metric(metric) => serde_json::to_value(&metric),
49+
Event::Trace(trace) => serde_json::to_value(&trace),
50+
}
51+
}
4352
}
4453

4554
impl Encoder<Event> for JsonSerializer {
@@ -76,4 +85,19 @@ mod tests {
7685

7786
assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#);
7887
}
88+
89+
#[test]
90+
fn serialize_equals_to_json_value() {
91+
let event = Event::from(btreemap! {
92+
"foo" => Value::from("bar")
93+
});
94+
let mut serializer = JsonSerializer::new();
95+
let mut bytes = BytesMut::new();
96+
97+
serializer.encode(event.clone(), &mut bytes).unwrap();
98+
99+
let json = serializer.to_json_value(event).unwrap();
100+
101+
assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
102+
}
79103
}

lib/codecs/src/encoding/format/native_json.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ impl NativeJsonSerializerConfig {
2828
#[derive(Debug, Clone)]
2929
pub struct NativeJsonSerializer;
3030

31+
impl NativeJsonSerializer {
32+
/// Creates a new `NativeJsonSerializer`.
33+
pub const fn new() -> Self {
34+
Self
35+
}
36+
37+
/// Encode event and represent it as native JSON value.
38+
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, serde_json::Error> {
39+
serde_json::to_value(&event)
40+
}
41+
}
42+
3143
impl Encoder<Event> for NativeJsonSerializer {
3244
type Error = vector_core::Error;
3345

@@ -36,3 +48,40 @@ impl Encoder<Event> for NativeJsonSerializer {
3648
serde_json::to_writer(writer, &event).map_err(Into::into)
3749
}
3850
}
51+
52+
#[cfg(test)]
53+
mod tests {
54+
use bytes::BytesMut;
55+
use vector_common::btreemap;
56+
use vector_core::event::Value;
57+
58+
use super::*;
59+
60+
#[test]
61+
fn serialize_json() {
62+
let event = Event::from(btreemap! {
63+
"foo" => Value::from("bar")
64+
});
65+
let mut serializer = NativeJsonSerializer::new();
66+
let mut bytes = BytesMut::new();
67+
68+
serializer.encode(event, &mut bytes).unwrap();
69+
70+
assert_eq!(bytes.freeze(), r#"{"log":{"foo":"bar"}}"#);
71+
}
72+
73+
#[test]
74+
fn serialize_equals_to_json_value() {
75+
let event = Event::from(btreemap! {
76+
"foo" => Value::from("bar")
77+
});
78+
let mut serializer = NativeJsonSerializer::new();
79+
let mut bytes = BytesMut::new();
80+
81+
serializer.encode(event.clone(), &mut bytes).unwrap();
82+
83+
let json = serializer.to_json_value(event).unwrap();
84+
85+
assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
86+
}
87+
}

lib/codecs/src/encoding/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,38 @@ pub enum Serializer {
290290
Text(TextSerializer),
291291
}
292292

293+
impl Serializer {
294+
/// Check if the serializer supports encoding to JSON via `Serializer::to_json_value`.
295+
pub fn supports_json(&self) -> bool {
296+
match self {
297+
Serializer::Json(_) | Serializer::NativeJson(_) => true,
298+
Serializer::Logfmt(_)
299+
| Serializer::Text(_)
300+
| Serializer::Native(_)
301+
| Serializer::RawMessage(_) => false,
302+
}
303+
}
304+
305+
/// Encode event and represent it as JSON value.
306+
///
307+
/// # Panics
308+
///
309+
/// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
310+
/// if you need to determine the capability to encode to JSON at runtime.
311+
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, serde_json::Error> {
312+
match self {
313+
Serializer::Json(serializer) => serializer.to_json_value(event),
314+
Serializer::NativeJson(serializer) => serializer.to_json_value(event),
315+
Serializer::Logfmt(_)
316+
| Serializer::Text(_)
317+
| Serializer::Native(_)
318+
| Serializer::RawMessage(_) => {
319+
panic!("Serializer does not support JSON")
320+
}
321+
}
322+
}
323+
}
324+
293325
impl From<JsonSerializer> for Serializer {
294326
fn from(serializer: JsonSerializer) -> Self {
295327
Self::Json(serializer)

src/sinks/humio/logs.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use serde::{Deserialize, Serialize};
22

3-
use super::{host_key, Encoding};
3+
use super::host_key;
44
use crate::{
55
config::{
66
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription,
@@ -11,9 +11,12 @@ use crate::{
1111
acknowledgements::HecClientAcknowledgementsConfig, timestamp_key,
1212
SplunkHecDefaultBatchSettings,
1313
},
14-
logs::config::HecLogsSinkConfig,
14+
logs::config::{HecEncoding, HecEncodingMigrator, HecLogsSinkConfig},
15+
},
16+
util::{
17+
encoding::{EncodingConfig, EncodingConfigAdapter},
18+
BatchConfig, Compression, TowerRequestConfig,
1519
},
16-
util::{encoding::EncodingConfig, BatchConfig, Compression, TowerRequestConfig},
1720
Healthcheck, VectorSink,
1821
},
1922
template::Template,
@@ -23,13 +26,14 @@ use crate::{
2326
const HOST: &str = "https://cloud.humio.com";
2427

2528
#[derive(Clone, Debug, Deserialize, Serialize)]
29+
#[serde(deny_unknown_fields)]
2630
pub struct HumioLogsConfig {
2731
pub(super) token: String,
2832
// Deprecated name
2933
#[serde(alias = "host")]
3034
pub(super) endpoint: Option<String>,
3135
pub(super) source: Option<Template>,
32-
pub(super) encoding: EncodingConfig<Encoding>,
36+
pub(super) encoding: EncodingConfigAdapter<EncodingConfig<HecEncoding>, HecEncodingMigrator>,
3337
pub(super) event_type: Option<Template>,
3438
#[serde(default = "host_key")]
3539
pub(super) host_key: String,
@@ -70,7 +74,7 @@ impl GenerateConfig for HumioLogsConfig {
7074
token: "${HUMIO_TOKEN}".to_owned(),
7175
endpoint: None,
7276
source: None,
73-
encoding: Encoding::Json.into(),
77+
encoding: EncodingConfig::from(HecEncoding::Json).into(),
7478
event_type: None,
7579
indexed_fields: vec![],
7680
index: None,
@@ -95,7 +99,7 @@ impl SinkConfig for HumioLogsConfig {
9599
}
96100

97101
fn input(&self) -> Input {
98-
Input::log()
102+
Input::new(self.encoding.config().input_type())
99103
}
100104

101105
fn sink_type(&self) -> &'static str {
@@ -120,7 +124,7 @@ impl HumioLogsConfig {
120124
sourcetype: self.event_type.clone(),
121125
source: self.source.clone(),
122126
timestamp_nanos_key: self.timestamp_nanos_key.clone(),
123-
encoding: self.encoding.clone().into_encoding(),
127+
encoding: self.encoding.clone(),
124128
compression: self.compression,
125129
batch: self.batch,
126130
request: self.request,
@@ -302,7 +306,7 @@ mod integration_tests {
302306
token: token.to_string(),
303307
endpoint: Some(humio_address()),
304308
source: None,
305-
encoding: Encoding::Json.into(),
309+
encoding: EncodingConfig::from(HecEncoding::Json).into(),
306310
event_type: None,
307311
host_key: log_schema().host_key().to_string(),
308312
indexed_fields: vec![],

src/sinks/humio/metrics.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@ use indoc::indoc;
55
use serde::{Deserialize, Serialize};
66
use vector_core::{sink::StreamSink, transform::Transform};
77

8-
use super::{host_key, logs::HumioLogsConfig, Encoding};
8+
use super::{host_key, logs::HumioLogsConfig};
99
use crate::{
1010
config::{
1111
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription,
1212
TransformConfig, TransformContext,
1313
},
1414
event::{Event, EventArray, EventContainer},
1515
sinks::{
16-
splunk_hec::common::SplunkHecDefaultBatchSettings,
17-
util::{encoding::EncodingConfig, BatchConfig, Compression, TowerRequestConfig},
16+
splunk_hec::{
17+
common::SplunkHecDefaultBatchSettings,
18+
logs::config::{HecEncoding, HecEncodingMigrator},
19+
},
20+
util::{
21+
encoding::{EncodingConfig, EncodingConfigAdapter},
22+
BatchConfig, Compression, TowerRequestConfig,
23+
},
1824
Healthcheck, VectorSink,
1925
},
2026
template::Template,
@@ -23,6 +29,7 @@ use crate::{
2329
};
2430

2531
#[derive(Clone, Debug, Deserialize, Serialize)]
32+
#[serde(deny_unknown_fields)]
2633
struct HumioMetricsConfig {
2734
#[serde(flatten)]
2835
transform: MetricToLogConfig,
@@ -31,7 +38,7 @@ struct HumioMetricsConfig {
3138
#[serde(alias = "host")]
3239
pub(in crate::sinks::humio) endpoint: Option<String>,
3340
source: Option<Template>,
34-
encoding: EncodingConfig<Encoding>,
41+
encoding: EncodingConfigAdapter<EncodingConfig<HecEncoding>, HecEncodingMigrator>,
3542
event_type: Option<Template>,
3643
#[serde(default = "host_key")]
3744
host_key: String,

src/sinks/humio/mod.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,6 @@
11
pub mod logs;
22
pub mod metrics;
33

4-
use serde::{Deserialize, Serialize};
5-
6-
use crate::sinks::splunk_hec;
7-
8-
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
9-
#[serde(rename_all = "snake_case")]
10-
enum Encoding {
11-
Json,
12-
Text,
13-
}
14-
15-
impl From<Encoding> for splunk_hec::logs::encoder::HecLogsEncoder {
16-
fn from(v: Encoding) -> Self {
17-
match v {
18-
Encoding::Json => splunk_hec::logs::encoder::HecLogsEncoder::Json,
19-
Encoding::Text => splunk_hec::logs::encoder::HecLogsEncoder::Text,
20-
}
21-
}
22-
}
23-
244
fn host_key() -> String {
255
crate::config::log_schema().host_key().to_string()
266
}

src/sinks/splunk_hec/logs/config.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use std::sync::Arc;
22

3+
use codecs::{encoding::SerializerConfig, JsonSerializerConfig, TextSerializerConfig};
34
use futures_util::FutureExt;
45
use serde::{Deserialize, Serialize};
56
use tower::ServiceBuilder;
67
use vector_core::sink::VectorSink;
78

89
use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
910
use crate::{
11+
codecs::Encoder,
1012
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
1113
http::HttpClient,
1214
sinks::{
@@ -17,15 +19,37 @@ use crate::{
1719
timestamp_key, SplunkHecDefaultBatchSettings,
1820
},
1921
util::{
20-
encoding::EncodingConfig, http::HttpRetryLogic, BatchConfig, Compression,
21-
ServiceBuilderExt, TowerRequestConfig,
22+
encoding::{EncodingConfig, EncodingConfigAdapter, EncodingConfigMigrator},
23+
http::HttpRetryLogic,
24+
BatchConfig, Compression, ServiceBuilderExt, TowerRequestConfig,
2225
},
2326
Healthcheck,
2427
},
2528
template::Template,
2629
tls::TlsConfig,
2730
};
2831

32+
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
33+
#[serde(rename_all = "snake_case")]
34+
pub enum HecEncoding {
35+
Json,
36+
Text,
37+
}
38+
39+
#[derive(Debug, Clone, Serialize, Deserialize)]
40+
pub struct HecEncodingMigrator;
41+
42+
impl EncodingConfigMigrator for HecEncodingMigrator {
43+
type Codec = HecEncoding;
44+
45+
fn migrate(codec: &Self::Codec) -> SerializerConfig {
46+
match codec {
47+
HecEncoding::Text => TextSerializerConfig::new().into(),
48+
HecEncoding::Json => JsonSerializerConfig::new().into(),
49+
}
50+
}
51+
}
52+
2953
#[derive(Deserialize, Serialize, Debug, Clone)]
3054
#[serde(deny_unknown_fields)]
3155
pub struct HecLogsSinkConfig {
@@ -40,7 +64,7 @@ pub struct HecLogsSinkConfig {
4064
pub index: Option<Template>,
4165
pub sourcetype: Option<Template>,
4266
pub source: Option<Template>,
43-
pub encoding: EncodingConfig<HecLogsEncoder>,
67+
pub encoding: EncodingConfigAdapter<EncodingConfig<HecEncoding>, HecEncodingMigrator>,
4468
#[serde(default)]
4569
pub compression: Compression,
4670
#[serde(default)]
@@ -66,7 +90,7 @@ impl GenerateConfig for HecLogsSinkConfig {
6690
index: None,
6791
sourcetype: None,
6892
source: None,
69-
encoding: HecLogsEncoder::Text.into(),
93+
encoding: EncodingConfig::from(HecEncoding::Text).into(),
7094
compression: Compression::default(),
7195
batch: BatchConfig::default(),
7296
request: TowerRequestConfig::default(),
@@ -96,7 +120,7 @@ impl SinkConfig for HecLogsSinkConfig {
96120
}
97121

98122
fn input(&self) -> Input {
99-
Input::log()
123+
Input::new(self.encoding.config().input_type())
100124
}
101125

102126
fn sink_type(&self) -> &'static str {
@@ -120,8 +144,15 @@ impl HecLogsSinkConfig {
120144
None
121145
};
122146

147+
let transformer = self.encoding.transformer();
148+
let serializer = self.encoding.clone().encoding();
149+
let encoder = Encoder::<()>::new(serializer);
150+
let encoder = HecLogsEncoder {
151+
transformer,
152+
encoder,
153+
};
123154
let request_builder = HecLogsRequestBuilder {
124-
encoding: self.encoding.clone(),
155+
encoder,
125156
compression: self.compression,
126157
};
127158

@@ -167,7 +198,6 @@ impl HecLogsSinkConfig {
167198

168199
// Add a compatibility alias to avoid breaking existing configs
169200
#[derive(Clone, Debug, Deserialize, Serialize)]
170-
#[serde(deny_unknown_fields)]
171201
struct HecSinkCompatConfig {
172202
#[serde(flatten)]
173203
config: HecLogsSinkConfig,

0 commit comments

Comments
 (0)