diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 34b3a7d5da753..31a87d5f4fbab 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -1,20 +1,28 @@ -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use futures::{FutureExt, SinkExt}; use http::{Request, Uri}; use hyper::Body; +use indoc::indoc; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use snafu::{ResultExt, Snafu}; +use tokio_util::codec::Encoder as _; use crate::{ - config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription}, + codecs::Encoder, + config::{ + AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription, + }, event::Event, gcp::{GcpAuthConfig, GcpCredentials, Scope, PUBSUB_URL}, http::HttpClient, sinks::{ gcs_common::config::healthcheck_response, util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfig, EncodingConfigAdapter, StandardEncodings, + StandardEncodingsMigrator, Transformer, + }, http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig, }, @@ -41,27 +49,24 @@ impl SinkBatchSettings for PubsubDefaultBatchSettings { const TIMEOUT_SECS: f64 = 1.0; } -#[derive(Deserialize, Serialize, Debug, Clone, Default)] -#[serde(deny_unknown_fields)] +#[derive(Deserialize, Serialize, Debug, Clone)] pub struct PubsubConfig { pub project: String, pub topic: String, + #[serde(default)] pub endpoint: Option, #[serde(default = "default_skip_authentication")] pub skip_authentication: bool, - #[serde(flatten)] + #[serde(default, flatten)] pub auth: GcpAuthConfig, #[serde(default)] pub batch: BatchConfig, #[serde(default)] pub request: TowerRequestConfig, - #[serde( - skip_serializing_if = "crate::serde::skip_serializing_if_default", - default - )] - pub encoding: EncodingConfigWithDefault, + encoding: EncodingConfigAdapter, StandardEncodingsMigrator>, + #[serde(default)] pub tls: Option, #[serde( @@ -76,19 +81,20 @@ const fn default_skip_authentication() -> bool { false } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - inventory::submit! { SinkDescription::new::("gcp_pubsub") } -impl_generate_config_from_default!(PubsubConfig); +impl GenerateConfig for PubsubConfig { + fn generate_config() -> toml::Value { + toml::from_str(indoc! {r#" + project = "my-project" + topic = "my-topic" + encoding.codec = "json" + "#}) + .unwrap() + } +} #[async_trait::async_trait] #[typetag::serde(name = "gcp_pubsub")] @@ -120,7 +126,7 @@ impl SinkConfig for PubsubConfig { } fn input(&self) -> Input { - Input::log() + Input::new(self.encoding.config().input_type()) } fn sink_type(&self) -> &'static str { @@ -136,7 +142,8 @@ struct PubsubSink { api_key: Option, creds: Option, uri_base: String, - encoding: EncodingConfigWithDefault, + transformer: Transformer, + encoder: Encoder<()>, } impl PubsubSink { @@ -157,11 +164,16 @@ impl PubsubSink { uri_base, config.project, config.topic, ); + let transformer = config.encoding.transformer(); + let serializer = config.encoding.encoding(); + let encoder = Encoder::<()>::new(serializer); + Ok(Self { api_key: config.auth.api_key.clone(), - encoding: config.encoding.clone(), creds, uri_base, + transformer, + encoder, }) } @@ -177,17 +189,19 @@ impl PubsubSink { } struct PubSubSinkEventEncoder { - encoding: EncodingConfigWithDefault, + transformer: Transformer, + encoder: Encoder<()>, } impl HttpEventEncoder for PubSubSinkEventEncoder { fn encode_event(&mut self, mut event: Event) -> Option { - self.encoding.apply_rules(&mut event); + self.transformer.transform(&mut event); + let mut bytes = BytesMut::new(); + // Errors are handled by `Encoder`. + self.encoder.encode(event, &mut bytes).ok()?; // Each event needs to be base64 encoded, and put into a JSON object // as the `data` item. - let log = event.into_log(); - let json = serde_json::to_string(&log).unwrap(); - Some(json!({ "data": base64::encode(&json) })) + Some(json!({ "data": base64::encode(&bytes) })) } } @@ -199,7 +213,8 @@ impl HttpSink for PubsubSink { fn build_encoder(&self) -> Self::Encoder { PubSubSinkEventEncoder { - encoding: self.encoding.clone(), + transformer: self.transformer.clone(), + encoder: self.encoder.clone(), } } @@ -249,6 +264,7 @@ mod tests { let config: PubsubConfig = toml::from_str(indoc! {r#" project = "project" topic = "topic" + encoding.codec = "json" "#}) .unwrap(); if config.build(SinkContext::new_test()).await.is_ok() { @@ -274,11 +290,16 @@ mod integration_tests { fn config(topic: &str) -> PubsubConfig { PubsubConfig { - endpoint: Some(gcp::PUBSUB_ADDRESS.clone()), - skip_authentication: true, project: PROJECT.into(), topic: topic.into(), - ..Default::default() + endpoint: Some(gcp::PUBSUB_ADDRESS.clone()), + skip_authentication: true, + auth: Default::default(), + batch: Default::default(), + request: Default::default(), + encoding: EncodingConfig::from(StandardEncodings::Json).into(), + tls: Default::default(), + acknowledgements: Default::default(), } } diff --git a/website/content/en/highlights/2022-05-23-0-23-0-upgrade-guide.md b/website/content/en/highlights/2022-05-23-0-23-0-upgrade-guide.md index d7c184da62fd4..b0bc24c2e5333 100644 --- a/website/content/en/highlights/2022-05-23-0-23-0-upgrade-guide.md +++ b/website/content/en/highlights/2022-05-23-0-23-0-upgrade-guide.md @@ -2,7 +2,7 @@ date: "2022-06-16" title: "0.23 Upgrade Guide" description: "An upgrade guide that addresses breaking changes in 0.23.0" -authors: ["akx", "jszwedko", "spencergilbert", "fuchsnj"] +authors: ["akx", "jszwedko", "spencergilbert", "fuchsnj", "pablosichert"] release: "0.23.0" hide_on_release_notes: false badges: @@ -14,6 +14,7 @@ Vector's 0.23.0 release includes **breaking changes**: 1. [The `.deb` package no longer enables and starts the Vector systemd service](#systemd-autostart) 2. [VRL type definition updates](#vrl-type-def) 3. ["remove_empty" option dropped from VRL's `parse_grok` and `parse_groks`](#vrl-parse_grok) +4. [`gcp_pubsub` sink requires setting "encoding" option](#sinks-mandatory-encoding) We cover them below to help you upgrade quickly: @@ -65,3 +66,14 @@ parsed = parse_grok!(.message, "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} expected = { "timestamp": "", "level": "", "message": ""} parsed = merge(expected, parsed) ``` + +#### [`gcp_pubsub` sink requires setting "encoding" option] {#sinks-mandatory-encoding} + +The `gcp_pubsub` sink now supports a variety of codecs. To encode your logs as JSON before +publishing them to Cloud Pub/Sub, add the following encoding option + +```toml +encoding.codec = "json" +``` + +to the config of your `gcp_pubsub` sink. diff --git a/website/cue/reference/components/sinks/gcp_pubsub.cue b/website/cue/reference/components/sinks/gcp_pubsub.cue index 1ea05de3c1eca..de1db51723660 100644 --- a/website/cue/reference/components/sinks/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/gcp_pubsub.cue @@ -26,7 +26,10 @@ components: sinks: gcp_pubsub: { compression: enabled: false encoding: { enabled: true - codec: enabled: false + codec: { + enabled: true + enum: ["json", "text"] + } } proxy: enabled: true request: {