Skip to content

Commit 2927ced

Browse files
committed
Integrate encoding::Encoder with gcp_pubsub sink
Signed-off-by: Pablo Sichert <[email protected]>
1 parent 0741d87 commit 2927ced

File tree

1 file changed

+52
-31
lines changed

1 file changed

+52
-31
lines changed

src/sinks/gcp/pubsub.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
1-
use bytes::Bytes;
1+
use bytes::{Bytes, BytesMut};
22
use futures::{FutureExt, SinkExt};
33
use http::{Request, Uri};
44
use hyper::Body;
5+
use indoc::indoc;
56
use serde::{Deserialize, Serialize};
67
use serde_json::{json, Value};
78
use snafu::{ResultExt, Snafu};
9+
use tokio_util::codec::Encoder as _;
810

911
use crate::{
10-
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription},
12+
codecs::Encoder,
13+
config::{
14+
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription,
15+
},
1116
event::Event,
1217
gcp::{GcpAuthConfig, GcpCredentials, Scope, PUBSUB_URL},
1318
http::HttpClient,
1419
sinks::{
1520
gcs_common::config::healthcheck_response,
1621
util::{
17-
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
22+
encoding::{
23+
EncodingConfig, EncodingConfigAdapter, StandardEncodings,
24+
StandardEncodingsMigrator, Transformer,
25+
},
1826
http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
1927
BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
2028
},
@@ -41,27 +49,25 @@ impl SinkBatchSettings for PubsubDefaultBatchSettings {
4149
const TIMEOUT_SECS: f64 = 1.0;
4250
}
4351

44-
#[derive(Deserialize, Serialize, Debug, Clone, Default)]
45-
#[serde(deny_unknown_fields)]
52+
#[derive(Deserialize, Serialize, Debug, Clone)]
4653
pub struct PubsubConfig {
4754
pub project: String,
4855
pub topic: String,
56+
#[serde(default)]
4957
pub endpoint: Option<String>,
5058
#[serde(default = "default_skip_authentication")]
5159
pub skip_authentication: bool,
52-
#[serde(flatten)]
60+
#[serde(default, flatten)]
5361
pub auth: GcpAuthConfig,
5462

5563
#[serde(default)]
5664
pub batch: BatchConfig<PubsubDefaultBatchSettings>,
5765
#[serde(default)]
5866
pub request: TowerRequestConfig,
59-
#[serde(
60-
skip_serializing_if = "crate::serde::skip_serializing_if_default",
61-
default
62-
)]
63-
pub encoding: EncodingConfigWithDefault<Encoding>,
67+
#[serde(flatten)]
68+
encoding: EncodingConfigAdapter<EncodingConfig<StandardEncodings>, StandardEncodingsMigrator>,
6469

70+
#[serde(default)]
6571
pub tls: Option<TlsConfig>,
6672

6773
#[serde(
@@ -76,19 +82,20 @@ const fn default_skip_authentication() -> bool {
7682
false
7783
}
7884

79-
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
80-
#[serde(rename_all = "snake_case")]
81-
#[derivative(Default)]
82-
pub enum Encoding {
83-
#[derivative(Default)]
84-
Default,
85-
}
86-
8785
inventory::submit! {
8886
SinkDescription::new::<PubsubConfig>("gcp_pubsub")
8987
}
9088

91-
impl_generate_config_from_default!(PubsubConfig);
89+
impl GenerateConfig for PubsubConfig {
90+
fn generate_config() -> toml::Value {
91+
toml::from_str(indoc! {r#"
92+
project = "my-project"
93+
topic = "my-topic"
94+
encoding.codec = "json"
95+
"#})
96+
.unwrap()
97+
}
98+
}
9299

93100
#[async_trait::async_trait]
94101
#[typetag::serde(name = "gcp_pubsub")]
@@ -136,7 +143,8 @@ struct PubsubSink {
136143
api_key: Option<String>,
137144
creds: Option<GcpCredentials>,
138145
uri_base: String,
139-
encoding: EncodingConfigWithDefault<Encoding>,
146+
transformer: Transformer,
147+
encoder: Encoder<()>,
140148
}
141149

142150
impl PubsubSink {
@@ -157,11 +165,16 @@ impl PubsubSink {
157165
uri_base, config.project, config.topic,
158166
);
159167

168+
let transformer = config.encoding.transformer();
169+
let serializer = config.encoding.encoding();
170+
let encoder = Encoder::<()>::new(serializer);
171+
160172
Ok(Self {
161173
api_key: config.auth.api_key.clone(),
162-
encoding: config.encoding.clone(),
163174
creds,
164175
uri_base,
176+
transformer,
177+
encoder,
165178
})
166179
}
167180

@@ -177,17 +190,19 @@ impl PubsubSink {
177190
}
178191

179192
struct PubSubSinkEventEncoder {
180-
encoding: EncodingConfigWithDefault<Encoding>,
193+
transformer: Transformer,
194+
encoder: Encoder<()>,
181195
}
182196

183197
impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
184198
fn encode_event(&mut self, mut event: Event) -> Option<Value> {
185-
self.encoding.apply_rules(&mut event);
199+
self.transformer.transform(&mut event);
200+
let mut bytes = BytesMut::new();
201+
// Errors are handled by `Encoder`.
202+
self.encoder.encode(event, &mut bytes).ok()?;
186203
// Each event needs to be base64 encoded, and put into a JSON object
187204
// as the `data` item.
188-
let log = event.into_log();
189-
let json = serde_json::to_string(&log).unwrap();
190-
Some(json!({ "data": base64::encode(&json) }))
205+
Some(json!({ "data": base64::encode(&bytes) }))
191206
}
192207
}
193208

@@ -199,7 +214,8 @@ impl HttpSink for PubsubSink {
199214

200215
fn build_encoder(&self) -> Self::Encoder {
201216
PubSubSinkEventEncoder {
202-
encoding: self.encoding.clone(),
217+
transformer: self.transformer.clone(),
218+
encoder: self.encoder.clone(),
203219
}
204220
}
205221

@@ -274,11 +290,16 @@ mod integration_tests {
274290

275291
fn config(topic: &str) -> PubsubConfig {
276292
PubsubConfig {
277-
endpoint: Some(gcp::PUBSUB_ADDRESS.clone()),
278-
skip_authentication: true,
279293
project: PROJECT.into(),
280294
topic: topic.into(),
281-
..Default::default()
295+
endpoint: Some(gcp::PUBSUB_ADDRESS.clone()),
296+
skip_authentication: true,
297+
auth: Default::default(),
298+
batch: Default::default(),
299+
request: Default::default(),
300+
encoding: EncodingConfig::from(StandardEncodings::Json).into(),
301+
tls: Default::default(),
302+
acknowledgements: Default::default(),
282303
}
283304
}
284305

0 commit comments

Comments
 (0)