Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .vscode/launch.json
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug target for the consumer in VS Code; runs maturin as a pre-launch step to build the Rust extension.

Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
"--no-strict-offset-reset",
"--use-rust-processor",
"--enforce-schema",
"--blq-stale-age-mins=30"
],
"cwd": "${workspaceFolder}",
"sourceLanguages": ["rust"],
"env": {
"DOGSTATSD_HOST": "localhost"
}
},
"preLaunchTask": "uvx maturin develop"
},
// will only debug the python (first half)
{
Expand All @@ -55,7 +55,6 @@
"--no-strict-offset-reset",
"--use-rust-processor",
"--enforce-schema",
"--blq-stale-age-mins=30"
],
"cwd": "${workspaceFolder}",
"justMyCode": false
Expand Down
6 changes: 6 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"version": "2.0.0",
"tasks": [
{"label": "uvx maturin develop", "type": "shell", "command": "uvx maturin develop", "options": {"cwd": "${workspaceFolder}/rust_snuba"}}
]
}
2 changes: 2 additions & 0 deletions rust_snuba/benches/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ fn create_factory(
join_timeout_ms: None,
health_check: "arroyo".to_string(),
use_row_binary: false,
blq_producer_config: None,
blq_topic: None,
};
Box::new(factory)
}
Expand Down
19 changes: 16 additions & 3 deletions rust_snuba/src/consumer.rs
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where I reuse the DLQ configuration for the BLQ: if a DLQ is configured, its broker config and physical topic are used for the BLQ as well.

Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,21 @@ pub fn consumer_impl(

// DLQ policy applies only if we are not skipping writes, otherwise we don't want to be
// writing to the DLQ topics in prod.

let dlq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| {
KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config.clone()))
});

let dlq_topic = consumer_config
.dlq_topic
.as_ref()
.map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name));

let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| {
let producer_config =
KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config));
let producer = KafkaProducer::new(producer_config);
let producer = KafkaProducer::new(KafkaConfig::new_producer_config(
vec![],
Some(dlq_topic_config.broker_config),
));

let kafka_dlq_producer = Box::new(KafkaDlqProducer::new(
producer,
Expand Down Expand Up @@ -276,6 +287,8 @@ pub fn consumer_impl(
join_timeout_ms,
health_check: health_check.to_string(),
use_row_binary,
blq_producer_config: dlq_producer_config.clone(),
blq_topic: dlq_topic,
};

let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy);
Expand Down
187 changes: 187 additions & 0 deletions rust_snuba/src/factory_v2.rs
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I wire the BLQ routing step into the beginning of our existing consumer pipeline.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use sentry_options::options;

use chrono::TimeDelta;
use sentry::{Hub, SentryFutureExt};
use sentry_arroyo::backends::kafka::config::KafkaConfig;
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
Expand All @@ -24,6 +27,8 @@ use crate::config;
use crate::metrics::global_tags::set_global_tag;
use crate::processors::{self, get_cogs_label};
use crate::strategies::accountant::RecordCogs;

use crate::strategies::blq_router::BLQRouter;
use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep;
use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep;
use crate::strategies::commit_log::ProduceCommitLog;
Expand All @@ -37,6 +42,10 @@ use crate::strategies::python::PythonTransformStep;
use crate::strategies::replacements::ProduceReplacements;
use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch};

// BLQ configuration
const BLQ_STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30);
const BLQ_STATIC_FRICTION: Option<TimeDelta> = Some(TimeDelta::minutes(2));

pub struct ConsumerStrategyFactoryV2 {
pub storage_config: config::StorageConfig,
pub env_config: config::EnvConfig,
Expand All @@ -63,6 +72,8 @@ pub struct ConsumerStrategyFactoryV2 {
pub join_timeout_ms: Option<u64>,
pub health_check: String,
pub use_row_binary: bool,
pub blq_producer_config: Option<KafkaConfig>,
pub blq_topic: Option<Topic>,
}

impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
Expand All @@ -87,6 +98,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {

fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
if self.use_row_binary {
tracing::info!("Using row_binary pipeline");
return match self
.storage_config
.message_processor
Expand Down Expand Up @@ -263,6 +275,34 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
next_step,
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
);

let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
if let (true, Some(blq_producer_config), Some(blq_topic)) = (
self.should_use_blq(),
&self.blq_producer_config,
self.blq_topic,
) {
tracing::info!(
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
BLQ_STALE_THRESHOLD,
self.blq_topic,
BLQ_STATIC_FRICTION
);
Box::new(
BLQRouter::new(
next_step,
blq_producer_config.clone(),
blq_topic,
BLQ_STALE_THRESHOLD,
BLQ_STATIC_FRICTION,
)
.expect("invalid BLQRouter config"),
)
} else {
tracing::info!("Not using a backlog-queue",);
Box::new(next_step)
};

if let Some(path) = &self.health_check_file {
{
if self.health_check == "snuba" {
Expand All @@ -282,6 +322,15 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
}

impl ConsumerStrategyFactoryV2 {
fn should_use_blq(&self) -> bool {
let flag = options("snuba")
.ok()
.and_then(|o| o.get("consumer.blq_enabled").ok())
.and_then(|v| v.as_bool())
.unwrap_or(false);
flag && self.blq_producer_config.is_some() && self.blq_topic.is_some()
}

fn create_row_binary_pipeline<
T: clickhouse::Row
+ serde::Serialize
Expand Down Expand Up @@ -390,6 +439,33 @@ impl ConsumerStrategyFactoryV2 {
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
);

let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
if let (true, Some(blq_producer_config), Some(blq_topic)) = (
self.should_use_blq(),
&self.blq_producer_config,
self.blq_topic,
) {
tracing::info!(
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
BLQ_STALE_THRESHOLD,
self.blq_topic,
BLQ_STATIC_FRICTION,
);
Box::new(
BLQRouter::new(
next_step,
blq_producer_config.clone(),
blq_topic,
BLQ_STALE_THRESHOLD,
BLQ_STATIC_FRICTION,
)
.expect("invalid BLQRouter config"),
)
} else {
tracing::info!("Not using a backlog-queue",);
Box::new(next_step)
};

if let Some(path) = &self.health_check_file {
if self.health_check == "snuba" {
tracing::info!(
Expand Down Expand Up @@ -435,12 +511,18 @@ impl TaskRunner<KafkaPayload, KafkaPayload, anyhow::Error> for SchemaValidator {
#[cfg(test)]
mod tests {
use super::*;
use sentry_arroyo::backends::kafka::config::KafkaConfig;
use sentry_arroyo::processing::strategies::{
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
};
use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic};
use sentry_options::init_with_schemas;
use sentry_options::testing::set_override;
use serde_json::json;
use std::sync::Once;
use std::sync::{Arc, Mutex};

// ----------- BYTES_INSERT_BATCH ------------------
/// A next-step that records every batch it receives.
struct RecordingStep {
batches: Arc<Mutex<Vec<BytesInsertBatch<RowData>>>>,
Expand Down Expand Up @@ -666,4 +748,109 @@ mod tests {
assert_eq!(batches[0].len(), 5);
assert_eq!(batches[0].num_bytes(), 200_000); // 5 * 40KB accumulated but didn't trigger flush
}

    // --------- BLQ -------------
    /// Builds a `ConsumerStrategyFactoryV2` populated with inert placeholder
    /// values, varying only the two BLQ-related fields under test.
    ///
    /// NOTE(review): the localhost ClickHouse/Kafka endpoints below are never
    /// contacted by these tests — the factory is only used to exercise
    /// `should_use_blq` — which is presumably why dummy values suffice here.
    fn make_factory(
        blq_producer_config: Option<KafkaConfig>,
        blq_topic: Option<Topic>,
    ) -> ConsumerStrategyFactoryV2 {
        ConsumerStrategyFactoryV2 {
            storage_config: config::StorageConfig {
                name: "test".to_string(),
                clickhouse_table_name: "test".to_string(),
                // Placeholder cluster config; no connection is ever opened.
                clickhouse_cluster: config::ClickhouseConfig {
                    host: "localhost".to_string(),
                    port: 9000,
                    secure: false,
                    http_port: 8123,
                    user: "default".to_string(),
                    password: "".to_string(),
                    database: "default".to_string(),
                },
                message_processor: config::MessageProcessorConfig {
                    python_class_name: "Test".to_string(),
                    python_module: "test".to_string(),
                },
            },
            env_config: config::EnvConfig::default(),
            logical_topic_name: "test".to_string(),
            max_batch_size: 100,
            max_batch_time: Duration::from_secs(1),
            // Single-threaded concurrency for every stage keeps the test light.
            processing_concurrency: ConcurrencyConfig::new(1),
            clickhouse_concurrency: ConcurrencyConfig::new(1),
            commitlog_concurrency: ConcurrencyConfig::new(1),
            replacements_concurrency: ConcurrencyConfig::new(1),
            async_inserts: false,
            python_max_queue_depth: None,
            use_rust_processor: false,
            health_check_file: None,
            enforce_schema: false,
            commit_log_producer: None,
            replacements_config: None,
            physical_consumer_group: "test".to_string(),
            physical_topic_name: Topic::new("test"),
            accountant_topic_config: config::TopicConfig {
                physical_topic_name: "test".to_string(),
                logical_topic_name: "test".to_string(),
                broker_config: HashMap::new(),
                quantized_rebalance_consumer_group_delay_secs: None,
            },
            stop_at_timestamp: None,
            batch_write_timeout: None,
            join_timeout_ms: None,
            health_check: "arroyo".to_string(),
            use_row_binary: false,
            // The two fields actually under test.
            blq_producer_config,
            blq_topic,
            max_batch_size_calculation: config::BatchSizeCalculation::Rows,
        }
    }

static INIT: Once = Once::new();
fn init_config() {
INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap());
}

fn blq_kafka_config() -> KafkaConfig {
KafkaConfig::new_config(vec!["localhost:9092".to_string()], None)
}

#[test]
fn test_should_not_use_blq_when_no_flag() {
init_config();
let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq")));
assert!(!factory.should_use_blq());
}

#[test]
fn test_should_not_use_blq_when_flag_disabled() {
init_config();
let _guard = set_override("snuba", "consumer.blq_enabled", json!(false));
let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq")));
assert!(!factory.should_use_blq());
}

#[test]
fn test_should_not_use_blq_when_no_producer_config() {
init_config();
let _guard = set_override("snuba", "consumer.blq_enabled", json!(true));
let factory = make_factory(None, Some(Topic::new("blq")));
assert!(!factory.should_use_blq());
}

#[test]
fn test_should_not_use_blq_when_no_topic() {
init_config();
let _guard = set_override("snuba", "consumer.blq_enabled", json!(true));
let factory = make_factory(Some(blq_kafka_config()), None);
assert!(!factory.should_use_blq());
}

#[test]
fn test_should_use_blq_when_all_conditions_met() {
init_config();
let _guard = set_override("snuba", "consumer.blq_enabled", json!(true));
let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq")));
assert!(factory.should_use_blq());
}
}
Loading
Loading