Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 53 additions & 32 deletions src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use futures::{FutureExt, SinkExt};
use http::{Request, Uri};
use hyper::Body;
use indoc::indoc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use snafu::{ResultExt, Snafu};
use tokio_util::codec::Encoder as _;

use crate::{
config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext, SinkDescription},
codecs::Encoder,
config::{
AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, SinkDescription,
},
event::Event,
gcp::{GcpAuthConfig, GcpCredentials, Scope, PUBSUB_URL},
http::HttpClient,
sinks::{
gcs_common::config::healthcheck_response,
util::{
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
encoding::{
EncodingConfig, EncodingConfigAdapter, StandardEncodings,
StandardEncodingsMigrator, Transformer,
},
http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
},
Expand All @@ -41,27 +49,24 @@ impl SinkBatchSettings for PubsubDefaultBatchSettings {
const TIMEOUT_SECS: f64 = 1.0;
}

#[derive(Deserialize, Serialize, Debug, Clone, Default)]
#[serde(deny_unknown_fields)]
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PubsubConfig {
pub project: String,
pub topic: String,
#[serde(default)]
pub endpoint: Option<String>,
#[serde(default = "default_skip_authentication")]
pub skip_authentication: bool,
#[serde(flatten)]
#[serde(default, flatten)]
pub auth: GcpAuthConfig,

#[serde(default)]
pub batch: BatchConfig<PubsubDefaultBatchSettings>,
#[serde(default)]
pub request: TowerRequestConfig,
#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
default
)]
pub encoding: EncodingConfigWithDefault<Encoding>,
encoding: EncodingConfigAdapter<EncodingConfig<StandardEncodings>, StandardEncodingsMigrator>,

#[serde(default)]
pub tls: Option<TlsConfig>,

#[serde(
Expand All @@ -76,19 +81,20 @@ const fn default_skip_authentication() -> bool {
false
}

/// Sink-specific encoding selector with a single `Default` variant.
///
/// Variant names are (de)serialized in `snake_case`, and `Default` is the value produced by the
/// derived `Default` implementation.
// NOTE(review): this page is a rendered diff; this enum appears to be among the removed lines
// (the new config uses `StandardEncodings` instead) — confirm against the merged file.
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum Encoding {
// The only variant; marked as the default for `derivative`.
#[derivative(Default)]
Default,
}

// Submits a `SinkDescription` for `PubsubConfig` under the name "gcp_pubsub" to `inventory`.
// The same name is used by the `typetag::serde` attribute on the `SinkConfig` impl.
inventory::submit! {
SinkDescription::new::<PubsubConfig>("gcp_pubsub")
}

impl_generate_config_from_default!(PubsubConfig);
impl GenerateConfig for PubsubConfig {
    /// Returns the example configuration for this sink as a parsed TOML value.
    ///
    /// The example sets `project`, `topic` and `encoding.codec`.
    ///
    /// # Panics
    ///
    /// Panics if the embedded example snippet is not valid TOML.
    fn generate_config() -> toml::Value {
        // `indoc!` strips the common leading indentation, so the snippet can be indented
        // alongside the surrounding code.
        let example_toml = indoc! {r#"
            project = "my-project"
            topic = "my-topic"
            encoding.codec = "json"
        "#};

        toml::from_str(example_toml).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_pubsub")]
Expand Down Expand Up @@ -120,7 +126,7 @@ impl SinkConfig for PubsubConfig {
}

/// Reports the kind of input this sink accepts.
// NOTE(review): this page is a rendered diff, so two return expressions are shown back to back.
// `Input::log()` appears to be the pre-change line and the `Input::new(...)` call, which derives
// the input type from the configured encoding, its replacement — confirm against the merged file.
fn input(&self) -> Input {
Input::log()
Input::new(self.encoding.config().input_type())
}

fn sink_type(&self) -> &'static str {
Expand All @@ -136,7 +142,8 @@ struct PubsubSink {
api_key: Option<String>,
creds: Option<GcpCredentials>,
uri_base: String,
encoding: EncodingConfigWithDefault<Encoding>,
transformer: Transformer,
encoder: Encoder<()>,
}

impl PubsubSink {
Expand All @@ -157,11 +164,16 @@ impl PubsubSink {
uri_base, config.project, config.topic,
);

let transformer = config.encoding.transformer();
let serializer = config.encoding.encoding();
let encoder = Encoder::<()>::new(serializer);

Ok(Self {
api_key: config.auth.api_key.clone(),
encoding: config.encoding.clone(),
creds,
uri_base,
transformer,
encoder,
})
}

Expand All @@ -177,17 +189,19 @@ impl PubsubSink {
}

/// Per-event encoder state returned by `PubsubSink::build_encoder` and used by the
/// `HttpEventEncoder<Value>` implementation below.
// NOTE(review): this page is a rendered diff; `encoding` appears to be the pre-change field and
// `transformer`/`encoder` its replacements — confirm against the merged file.
struct PubSubSinkEventEncoder {
encoding: EncodingConfigWithDefault<Encoding>,
// Applied to each event before it is serialized.
transformer: Transformer,
// Serializes the transformed event into a byte buffer.
encoder: Encoder<()>,
}

/// Turns a single event into the JSON value that carries it as a base64-encoded `data` item.
impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
/// Encodes `event` as `{ "data": <base64 payload> }`.
///
/// Returns `None` if the `Encoder` fails to serialize the event (see the `.ok()?` below).
// NOTE(review): this page is a rendered diff, so pre-change statements (`apply_rules`,
// `into_log` + `serde_json::to_string`, `base64::encode(&json)`) appear interleaved with their
// replacements (`transform`, `encoder.encode`, `base64::encode(&bytes)`) — confirm against the
// merged file.
fn encode_event(&mut self, mut event: Event) -> Option<Value> {
self.encoding.apply_rules(&mut event);
self.transformer.transform(&mut event);
// Scratch buffer that receives the serialized event.
let mut bytes = BytesMut::new();
// Errors are handled by `Encoder`.
self.encoder.encode(event, &mut bytes).ok()?;
// Each event needs to be base64 encoded, and put into a JSON object
// as the `data` item.
let log = event.into_log();
let json = serde_json::to_string(&log).unwrap();
Some(json!({ "data": base64::encode(&json) }))
Some(json!({ "data": base64::encode(&bytes) }))
}
}

Expand All @@ -199,7 +213,8 @@ impl HttpSink for PubsubSink {

/// Builds a `PubSubSinkEventEncoder` from clones of this sink's encoding state.
// NOTE(review): this page is a rendered diff; the `encoding` initialiser appears to be the
// pre-change line and `transformer`/`encoder` its replacements — confirm against the merged file.
fn build_encoder(&self) -> Self::Encoder {
PubSubSinkEventEncoder {
encoding: self.encoding.clone(),
transformer: self.transformer.clone(),
encoder: self.encoder.clone(),
}
}

Expand Down Expand Up @@ -249,6 +264,7 @@ mod tests {
let config: PubsubConfig = toml::from_str(indoc! {r#"
project = "project"
topic = "topic"
encoding.codec = "json"
"#})
.unwrap();
if config.build(SinkContext::new_test()).await.is_ok() {
Expand All @@ -274,11 +290,16 @@ mod integration_tests {

/// Builds a `PubsubConfig` for `topic` in `PROJECT`, pointed at `gcp::PUBSUB_ADDRESS` with
/// authentication skipped.
// NOTE(review): this page is a rendered diff, which is why `endpoint` and `skip_authentication`
// are listed twice. The `..Default::default()` form appears to be the pre-change version and the
// fully spelled-out field list (with JSON encoding) its replacement — confirm against the merged
// file.
fn config(topic: &str) -> PubsubConfig {
PubsubConfig {
endpoint: Some(gcp::PUBSUB_ADDRESS.clone()),
skip_authentication: true,
project: PROJECT.into(),
topic: topic.into(),
..Default::default()
endpoint: Some(gcp::PUBSUB_ADDRESS.clone()),
skip_authentication: true,
auth: Default::default(),
batch: Default::default(),
request: Default::default(),
// Encoding is set explicitly to the JSON standard encoding.
encoding: EncodingConfig::from(StandardEncodings::Json).into(),
tls: Default::default(),
acknowledgements: Default::default(),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
date: "2022-06-16"
title: "0.23 Upgrade Guide"
description: "An upgrade guide that addresses breaking changes in 0.23.0"
authors: ["akx", "jszwedko", "spencergilbert", "fuchsnj"]
authors: ["akx", "jszwedko", "spencergilbert", "fuchsnj", "pablosichert"]
release: "0.23.0"
hide_on_release_notes: false
badges:
Expand All @@ -14,6 +14,7 @@ Vector's 0.23.0 release includes **breaking changes**:
1. [The `.deb` package no longer enables and starts the Vector systemd service](#systemd-autostart)
2. [VRL type definition updates](#vrl-type-def)
3. ["remove_empty" option dropped from VRL's `parse_grok` and `parse_groks`](#vrl-parse_grok)
4. [`gcp_pubsub` sink requires setting "encoding" option](#sinks-mandatory-encoding)

We cover them below to help you upgrade quickly:

Expand Down Expand Up @@ -65,3 +66,14 @@ parsed = parse_grok!(.message, "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}
expected = { "timestamp": "", "level": "", "message": ""}
parsed = merge(expected, parsed)
```

#### [`gcp_pubsub` sink requires setting "encoding" option] {#sinks-mandatory-encoding}

The `gcp_pubsub` sink now supports a variety of codecs, so the `encoding` option must be set
explicitly. To keep the previous behavior of encoding your logs as JSON before publishing them
to Cloud Pub/Sub, add the following encoding option

```toml
encoding.codec = "json"
```

to the config of your `gcp_pubsub` sink.
5 changes: 4 additions & 1 deletion website/cue/reference/components/sinks/gcp_pubsub.cue
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ components: sinks: gcp_pubsub: {
compression: enabled: false
encoding: {
enabled: true
codec: enabled: false
codec: {
enabled: true
enum: ["json", "text"]
}
}
proxy: enabled: true
request: {
Expand Down