Skip to content

Commit f269e8d

Browse files
authored
fix(codecs): Improve deserialization of EncodingConfigAdapter / fix overriding framing (#12750)
* Improve `EncodingConfigAdapter` deserialization * Add note about generic error message of untagged enum EncodingConfig Signed-off-by: Pablo Sichert <[email protected]>
1 parent f238e5e commit f269e8d

File tree

19 files changed

+364
-303
lines changed

19 files changed

+364
-303
lines changed

lib/codecs/src/encoding/framing/character_delimited.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl CharacterDelimitedEncoderConfig {
2626
}
2727

2828
/// Options for building a `CharacterDelimitedEncoder`.
29-
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
29+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
3030
pub struct CharacterDelimitedEncoderOptions {
3131
/// The character that delimits byte sequences.
3232
#[serde(with = "vector_core::serde::ascii_char")]

lib/codecs/src/encoding/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl From<std::io::Error> for Error {
4949
// Unfortunately, copying options of the nested enum variants is necessary
5050
// since `serde` doesn't allow `flatten`ing these:
5151
// https://github.com/serde-rs/serde/issues/1402.
52-
#[derive(Debug, Clone, Deserialize, Serialize)]
52+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
5353
#[serde(tag = "method", rename_all = "snake_case")]
5454
pub enum FramingConfig {
5555
/// Configures the `BytesEncoder`.

src/sinks/aws_cloudwatch_logs/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,12 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
8080
}
8181

8282
#[derive(Deserialize, Serialize, Debug, Clone)]
83+
#[serde(deny_unknown_fields)]
8384
pub struct CloudwatchLogsSinkConfig {
8485
pub group_name: Template,
8586
pub stream_name: Template,
8687
#[serde(flatten)]
8788
pub region: RegionOrEndpoint,
88-
#[serde(flatten)]
8989
pub encoding:
9090
EncodingConfigAdapter<EncodingConfig<StandardEncodings>, StandardEncodingsMigrator>,
9191
pub create_missing_group: Option<bool>,

src/sinks/aws_kinesis_firehose/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ impl SinkBatchSettings for KinesisFirehoseDefaultBatchSettings {
5151
}
5252

5353
#[derive(Deserialize, Serialize, Debug, Clone)]
54+
#[serde(deny_unknown_fields)]
5455
pub struct KinesisFirehoseSinkConfig {
5556
pub stream_name: String,
5657
#[serde(flatten)]
5758
pub region: RegionOrEndpoint,
58-
#[serde(flatten)]
5959
pub encoding:
6060
EncodingConfigAdapter<EncodingConfig<StandardEncodings>, StandardEncodingsMigrator>,
6161
#[serde(default)]

src/sinks/aws_kinesis_streams/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,12 @@ impl SinkBatchSettings for KinesisDefaultBatchSettings {
104104
}
105105

106106
#[derive(Deserialize, Serialize, Debug, Clone)]
107+
#[serde(deny_unknown_fields)]
107108
pub struct KinesisSinkConfig {
108109
pub stream_name: String,
109110
pub partition_key_field: Option<String>,
110111
#[serde(flatten)]
111112
pub region: RegionOrEndpoint,
112-
#[serde(flatten)]
113113
pub encoding:
114114
EncodingConfigAdapter<EncodingConfig<StandardEncodings>, StandardEncodingsMigrator>,
115115
#[serde(default)]

src/sinks/aws_s3/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
3636
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
3737

3838
#[derive(Deserialize, Serialize, Debug, Clone)]
39+
#[serde(deny_unknown_fields)]
3940
pub struct S3SinkConfig {
4041
pub bucket: String,
4142
pub key_prefix: Option<String>,

src/sinks/aws_sqs/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ impl EncodingConfigMigrator for EncodingMigrator {
4646
}
4747

4848
#[derive(Deserialize, Serialize, Debug, Clone)]
49+
#[serde(deny_unknown_fields)]
4950
pub struct SqsSinkConfig {
5051
pub queue_url: String,
5152
#[serde(flatten)]
5253
pub region: RegionOrEndpoint,
53-
#[serde(flatten)]
5454
pub encoding: EncodingConfigAdapter<EncodingConfig<Encoding>, EncodingMigrator>,
5555
pub message_group_id: Option<String>,
5656
pub message_deduplication_id: Option<String>,

src/sinks/azure_blob/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::{
3131
};
3232

3333
#[derive(Deserialize, Serialize, Debug, Clone)]
34+
#[serde(deny_unknown_fields)]
3435
pub struct AzureBlobSinkConfig {
3536
pub connection_string: String,
3637
pub(super) container_name: String,

src/sinks/file/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl EncodingConfigWithFramingMigrator for EncodingMigrator {
6161
}
6262

6363
#[derive(Deserialize, Serialize, Debug)]
64+
#[serde(deny_unknown_fields)]
6465
pub struct FileSinkConfig {
6566
pub path: Template,
6667
pub idle_timeout_secs: Option<u64>,

src/sinks/gcp/cloud_storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub enum GcsHealthcheckError {
6262
const NAME: &str = "gcp_cloud_storage";
6363

6464
#[derive(Deserialize, Serialize, Debug)]
65+
#[serde(deny_unknown_fields)]
6566
pub struct GcsSinkConfig {
6667
bucket: String,
6768
acl: Option<GcsPredefinedAcl>,

0 commit comments

Comments
 (0)