diff --git a/.vscode/launch.json b/.vscode/launch.json index 3525943710..32ecfca6fd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -33,13 +33,13 @@ "--no-strict-offset-reset", "--use-rust-processor", "--enforce-schema", - "--blq-stale-age-mins=30" ], "cwd": "${workspaceFolder}", "sourceLanguages": ["rust"], "env": { "DOGSTATSD_HOST": "localhost" - } + }, + "preLaunchTask": "uvx maturin develop" }, // will only debug the python (first half) { @@ -55,7 +55,6 @@ "--no-strict-offset-reset", "--use-rust-processor", "--enforce-schema", - "--blq-stale-age-mins=30" ], "cwd": "${workspaceFolder}", "justMyCode": false diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000000..b2df099987 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,6 @@ +{ + "version": "2.0.0", + "tasks": [ + {"label": "uvx maturin develop", "type": "shell", "command": "uvx maturin develop", "options": {"cwd": "${workspaceFolder}/rust_snuba"}} + ] +} diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs index f9ee3d4c27..8f6793224b 100644 --- a/rust_snuba/benches/processors.rs +++ b/rust_snuba/benches/processors.rs @@ -104,6 +104,8 @@ fn create_factory( join_timeout_ms: None, health_check: "arroyo".to_string(), use_row_binary: false, + blq_producer_config: None, + blq_topic: None, }; Box::new(factory) } diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ae8c5b22b6..2d70c05737 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -192,10 +192,21 @@ pub fn consumer_impl( // DLQ policy applies only if we are not skipping writes, otherwise we don't want to be // writing to the DLQ topics in prod. + + let dlq_producer_config = consumer_config.dlq_topic.as_ref().map(|dlq_topic_config| { + KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config.clone())) + }); + + let dlq_topic = consumer_config + .dlq_topic + .as_ref() + .map(|dlq_topic_config| Topic::new(&dlq_topic_config.physical_topic_name)); + let dlq_policy = consumer_config.dlq_topic.map(|dlq_topic_config| { - let producer_config = - KafkaConfig::new_producer_config(vec![], Some(dlq_topic_config.broker_config)); - let producer = KafkaProducer::new(producer_config); + let producer = KafkaProducer::new(KafkaConfig::new_producer_config( + vec![], + Some(dlq_topic_config.broker_config), + )); let kafka_dlq_producer = Box::new(KafkaDlqProducer::new( producer, @@ -276,6 +287,8 @@ pub fn consumer_impl( join_timeout_ms, health_check: health_check.to_string(), use_row_binary, + blq_producer_config: dlq_producer_config.clone(), + blq_topic: dlq_topic, }; let processor = StreamProcessor::with_kafka(config, factory, topic, dlq_policy); diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 91758b4422..620d48fb81 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -2,6 +2,9 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use sentry_options::options; + +use chrono::TimeDelta; use sentry::{Hub, SentryFutureExt}; use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::backends::kafka::producer::KafkaProducer; @@ -24,6 +27,8 @@ use crate::config; use crate::metrics::global_tags::set_global_tag; use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; + +use crate::strategies::blq_router::BLQRouter; use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; use crate::strategies::commit_log::ProduceCommitLog; @@ -37,6 +42,10 @@ use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; +// BLQ configuration +const BLQ_STALE_THRESHOLD: TimeDelta = TimeDelta::minutes(30); +const BLQ_STATIC_FRICTION: Option = Some(TimeDelta::minutes(2)); + pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, pub env_config: config::EnvConfig, @@ -63,6 +72,8 @@ pub struct ConsumerStrategyFactoryV2 { pub join_timeout_ms: Option, pub health_check: String, pub use_row_binary: bool, + pub blq_producer_config: Option, + pub blq_topic: Option, } impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { @@ -87,6 +98,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { fn create(&self) -> Box> { if self.use_row_binary { + tracing::info!("Using row_binary pipeline"); return match self .storage_config .message_processor @@ -263,6 +275,34 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { next_step, Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); + + let next_step: Box> = + if let (true, Some(blq_producer_config), Some(blq_topic)) = ( + self.should_use_blq(), + &self.blq_producer_config, + self.blq_topic, + ) { + tracing::info!( + "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", + BLQ_STALE_THRESHOLD, + self.blq_topic, + BLQ_STATIC_FRICTION + ); + Box::new( + BLQRouter::new( + next_step, + blq_producer_config.clone(), + blq_topic, + BLQ_STALE_THRESHOLD, + BLQ_STATIC_FRICTION, + ) + .expect("invalid BLQRouter config"), + ) + } else { + tracing::info!("Not using a backlog-queue",); + Box::new(next_step) + }; + if let Some(path) = &self.health_check_file { { if self.health_check == "snuba" { @@ -282,6 +322,15 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } impl ConsumerStrategyFactoryV2 { + fn should_use_blq(&self) -> bool { + let flag = options("snuba") + .ok() + .and_then(|o| o.get("consumer.blq_enabled").ok()) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + flag && self.blq_producer_config.is_some() && self.blq_topic.is_some() + } + fn create_row_binary_pipeline< T: clickhouse::Row + serde::Serialize @@ -390,6 +439,33 @@ impl ConsumerStrategyFactoryV2 { Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), ); + let next_step: Box> = + if let (true, Some(blq_producer_config), Some(blq_topic)) = ( + self.should_use_blq(), + &self.blq_producer_config, + self.blq_topic, + ) { + tracing::info!( + "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}", + BLQ_STALE_THRESHOLD, + self.blq_topic, + BLQ_STATIC_FRICTION, + ); + Box::new( + BLQRouter::new( + next_step, + blq_producer_config.clone(), + blq_topic, + BLQ_STALE_THRESHOLD, + BLQ_STATIC_FRICTION, + ) + .expect("invalid BLQRouter config"), + ) + } else { + tracing::info!("Not using a backlog-queue",); + Box::new(next_step) + }; + if let Some(path) = &self.health_check_file { if self.health_check == "snuba" { tracing::info!( @@ -435,12 +511,18 @@ impl TaskRunner for SchemaValidator { #[cfg(test)] mod tests { use super::*; + use sentry_arroyo::backends::kafka::config::KafkaConfig; use sentry_arroyo::processing::strategies::{ CommitRequest, ProcessingStrategy, StrategyError, SubmitError, }; use sentry_arroyo::types::{BrokerMessage, InnerMessage, Partition, Topic}; + use sentry_options::init_with_schemas; + use sentry_options::testing::override_options; + use serde_json::json; + use std::sync::Once; use std::sync::{Arc, Mutex}; + // ----------- BYTES_INSERT_BATCH ------------------ /// A next-step that records every batch it receives. struct RecordingStep { batches: Arc>>>, @@ -666,4 +748,109 @@ mod tests { assert_eq!(batches[0].len(), 5); assert_eq!(batches[0].num_bytes(), 200_000); // 5 * 40KB accumulated but didn't trigger flush } + + // --------- BLQ ------------- + fn make_factory( + blq_producer_config: Option, + blq_topic: Option, + ) -> ConsumerStrategyFactoryV2 { + ConsumerStrategyFactoryV2 { + storage_config: config::StorageConfig { + name: "test".to_string(), + clickhouse_table_name: "test".to_string(), + clickhouse_cluster: config::ClickhouseConfig { + host: "localhost".to_string(), + port: 9000, + secure: false, + http_port: 8123, + user: "default".to_string(), + password: "".to_string(), + database: "default".to_string(), + }, + message_processor: config::MessageProcessorConfig { + python_class_name: "Test".to_string(), + python_module: "test".to_string(), + }, + }, + env_config: config::EnvConfig::default(), + logical_topic_name: "test".to_string(), + max_batch_size: 100, + max_batch_time: Duration::from_secs(1), + processing_concurrency: ConcurrencyConfig::new(1), + clickhouse_concurrency: ConcurrencyConfig::new(1), + commitlog_concurrency: ConcurrencyConfig::new(1), + replacements_concurrency: ConcurrencyConfig::new(1), + async_inserts: false, + python_max_queue_depth: None, + use_rust_processor: false, + health_check_file: None, + enforce_schema: false, + commit_log_producer: None, + replacements_config: None, + physical_consumer_group: "test".to_string(), + physical_topic_name: Topic::new("test"), + accountant_topic_config: config::TopicConfig { + physical_topic_name: "test".to_string(), + logical_topic_name: "test".to_string(), + broker_config: HashMap::new(), + quantized_rebalance_consumer_group_delay_secs: None, + }, + stop_at_timestamp: None, + batch_write_timeout: None, + join_timeout_ms: None, + health_check: "arroyo".to_string(), + use_row_binary: false, + blq_producer_config, + blq_topic, + max_batch_size_calculation: config::BatchSizeCalculation::Rows, + } + } + + static INIT: Once = Once::new(); + fn init_config() { + INIT.call_once(|| init_with_schemas(&[("snuba", crate::SNUBA_SCHEMA)]).unwrap()); + } + + fn blq_kafka_config() -> KafkaConfig { + KafkaConfig::new_config(vec!["localhost:9092".to_string()], None) + } + + #[test] + fn test_should_not_use_blq_when_no_flag() { + init_config(); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_flag_disabled() { + init_config(); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(false))]).unwrap(); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_no_producer_config() { + init_config(); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); + let factory = make_factory(None, Some(Topic::new("blq"))); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_not_use_blq_when_no_topic() { + init_config(); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); + let factory = make_factory(Some(blq_kafka_config()), None); + assert!(!factory.should_use_blq()); + } + + #[test] + fn test_should_use_blq_when_all_conditions_met() { + init_config(); + let _guard = override_options(&[("snuba", "consumer.blq_enabled", json!(true))]).unwrap(); + let factory = make_factory(Some(blq_kafka_config()), Some(Topic::new("blq"))); + assert!(factory.should_use_blq()); + } } diff --git a/rust_snuba/src/strategies/blq_router.rs b/rust_snuba/src/strategies/blq_router.rs new file mode 100644 index 0000000000..be309a79f5 --- /dev/null +++ b/rust_snuba/src/strategies/blq_router.rs @@ -0,0 +1,373 @@ +//! # BLQ Router +//! +//! BLQ Router is an arroyo strategy that re-directs stale messages (by timestamp) to a configured backlog-queue topic. +//! Non-stale messages will be passively forwarded along to the next step in the arroyo strategy pipeline. +//! +//! ## Implementation +//! its essentially a FSM +//! +//! ```text +//! > forward to next step +//! ┌fresh─┐ +//! │ ▼ +//! ┌──┴─────────┐ +//! ┌fresh─►│ Forwarding ├──stale──► PANIC +//! │ └────────────┘ +//! │ ▲ +//! ┌──────┐ │ │ +//! ────►│ Idle ├─┤ fresh +//! └──────┘ │ │ +//! │ ┌──────┴───────┐ +//! └stale─►│ RoutingStale │ +//! └─┬────────────┘ +//! │ ▲ +//! └─stale──┘ +//! > redirect to blq +//! ``` +//! +//! the reason for the panic is that there may be accumulated data downstream that needs to be flushed before we start +//! redirecting to backlog and committing those messages. The most reliable way to do this is crashing the consumer, +//! when it comes back alive the first messages it gets will be stale so it will go straight from idle to RoutingStale. + +use std::time::Duration; + +use chrono::{TimeDelta, Utc}; +use sentry_arroyo::backends::kafka::config::KafkaConfig; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::strategies::commit_offsets::CommitOffsets; +use sentry_arroyo::processing::strategies::produce::Produce; +use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; +use sentry_arroyo::processing::strategies::{ + CommitRequest, ProcessingStrategy, StrategyError, SubmitError, +}; +use sentry_arroyo::types::{Message, Topic, TopicOrPartition}; + +#[derive(Debug, PartialEq)] +enum State { + Idle, // no messages have gone through the router yet + RoutingStale, // router is directing stale messages to the backlog-queue (BLQ) + // we have processed all stale messages and are now flushing (finishing producing to BLQ) + // when we transition to this state we will have CommitRequest for what was flushed, and poll + // will be responsible for returning it + Flushing(Option), + Forwarding, // router is forwarding non-stale messages along to the next strategy +} + +pub struct BLQRouter { + next_step: Next, + stale_threshold: TimeDelta, + state: State, + producer: ProduceStrategy, + static_friction: Option, + + // We have to keep this around ourself bc strategies::produce::Produce didn't define their lifetimes well + _concurrency: Option, +} + +impl BLQRouter> +where + Next: ProcessingStrategy + 'static, +{ + /// next_step, + /// is where fresh messages get forwarded to + /// stale_threshold, + /// messages older than the stale_threshold will get sent to the producer + /// static_friction, + /// Once we enter stale routing mode (at STALE_THRESHOLD), + /// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. + /// This is because we want a higher threshold to enter the stale routing state + /// but a lower threshold to stay in it, so we don't flip-flop at the boundary. + /// Best practice would be no greater than a small percent of stable_threshold like 10% + /// ex: stale_threshold=10m, static_friction=1m + /// and the implication is 9m old messages will now be sent to the BLQ in some cases + pub fn new( + next_step: Next, + blq_producer_config: KafkaConfig, + blq_topic: Topic, + stale_threshold: TimeDelta, + static_friction: Option, + ) -> Result { + let concurrency = ConcurrencyConfig::new(10); + let blq_producer = Produce::new( + CommitOffsets::new(Duration::from_millis(250)), + KafkaProducer::new(blq_producer_config), + &concurrency, + TopicOrPartition::Topic(blq_topic), + ); + let mut router = + Self::new_with_strategy(next_step, blq_producer, stale_threshold, static_friction)?; + router._concurrency = Some(concurrency); + Ok(router) + } +} + +impl BLQRouter +where + Next: ProcessingStrategy + 'static, + ProduceStrategy: ProcessingStrategy + 'static, +{ + /// next_step, + /// is where fresh messages get forwarded to + /// producer, + /// ProcessingStrategy that submits messages to the BLQ, + /// stale messages will get submitted to it. + /// stale_threshold, + /// messages older than the stale_threshold will get sent to the producer + /// static_friction, + /// Once we enter stale routing mode (at STALE_THRESHOLD), + /// we keep routing messages that are at least (STALE_THRESHOLD - STATIC_FRICTION_SECS) seconds old. + /// This is because we want a higher threshold to enter the stale routing state + /// but a lower threshold to stay in it, so we don't flip-flop at the boundary. + /// Best practice would be no greater than a small percent of stable_threshold like 10% + /// ex: stale_threshold=10m, static_friction=1m + /// and the implication is 9m old messages will now be sent to the BLQ in some cases + fn new_with_strategy( + next_step: Next, + blq_producer: ProduceStrategy, + stale_threshold: TimeDelta, + static_friction: Option, + ) -> Result { + if stale_threshold <= TimeDelta::zero() { + return Err("stale_threshold must be positive"); + } + if let Some(friction) = static_friction { + if friction >= stale_threshold { + return Err("static_friction must be less than stale_threshold"); + } + } + Ok(Self { + next_step, + stale_threshold, + state: State::Idle, + producer: blq_producer, + static_friction, + _concurrency: None, + }) + } +} + +impl ProcessingStrategy for BLQRouter +where + Next: ProcessingStrategy + 'static, + ProduceStrategy: ProcessingStrategy + 'static, +{ + fn poll(&mut self) -> Result, StrategyError> { + let produce_result = self.producer.poll(); + let next_step_result = self.next_step.poll(); + match &mut self.state { + State::RoutingStale => produce_result, + State::Forwarding | State::Idle => next_step_result, + State::Flushing(commits) => { + let commits = commits.take(); + self.state = State::Forwarding; + Ok(commits) + } + } + } + + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + let msg_ts = message + .timestamp() + .expect("Expected kafka message to always have a timestamp, but there wasn't one"); + let elapsed = Utc::now() - msg_ts; + + let threshold = match (&self.state, self.static_friction) { + (State::RoutingStale, Some(friction)) => self.stale_threshold - friction, + _ => self.stale_threshold, + }; + let is_stale = elapsed > threshold; + match (is_stale, &self.state) { + (true, State::Forwarding) => { + // When we transition from Forwarding to RoutingStale, there may be + // state in memory held downstream. We crash the consumer to get rid of internal state + // when it restarts it will have no internal state (State::Empty) and the first message in + // the topic will be stale. + panic!("Resetting consumer state to begin processing the stale backlog") + } + (true, State::Idle) | (true, State::RoutingStale) => { + // route the stale message to the BLQ + if self.state == State::Idle { + self.state = State::RoutingStale; + } + self.producer.submit(message) + } + (false, State::Idle) | (false, State::Forwarding) => { + // Forward the fresh message along to the next step + if self.state == State::Idle { + self.state = State::Forwarding; + } + self.next_step.submit(message) + } + (false, State::RoutingStale) => { + // We hit a fresh message, so we are done routing the backlog. + // Finish producing and committing all the state messages and + // then switch back to forwarding fresh. + + // i know i shouldnt be blocking in submit but there was no better way to do it + // the pipeline cant make progress until this completes anyways so it should be fine + let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap(); + self.state = State::Flushing(flush_results); + Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message }, + )) + } + (true, State::Flushing(_)) | (false, State::Flushing(_)) => { + Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message }, + )) + } + } + } + + fn terminate(&mut self) { + self.producer.terminate(); + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + let producer_result = self.producer.join(timeout); + let next_step_result = self.next_step.join(timeout); + match &self.state { + State::RoutingStale => producer_result, + State::Forwarding | State::Idle => next_step_result, + State::Flushing(commits) => Ok(commits.clone()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::DateTime; + use sentry_arroyo::types::{Partition, Topic}; + + struct MockStrategy { + submitted: Vec>, + join_called: bool, + terminate_called: bool, + } + + impl MockStrategy { + fn new() -> Self { + Self { + submitted: vec![], + join_called: false, + terminate_called: false, + } + } + } + + impl ProcessingStrategy for MockStrategy { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + + fn submit( + &mut self, + message: Message, + ) -> Result<(), SubmitError> { + self.submitted.push(message); + Ok(()) + } + + fn terminate(&mut self) { + self.terminate_called = true; + } + + fn join( + &mut self, + _timeout: Option, + ) -> Result, StrategyError> { + self.join_called = true; + Ok(None) + } + } + + fn make_message(timestamp: DateTime) -> Message { + Message::new_broker_message( + KafkaPayload::new(None, None, Some(b"test".to_vec())), + Partition::new(Topic::new("test"), 0), + 0, + timestamp, + ) + } + + #[test] + #[should_panic(expected = "Resetting consumer state to begin processing the stale backlog")] + fn test_fresh_to_stale() { + /* + This tests that the BLQRouter forwards business-as-usual fresh messages through it + and crashes when it hits its first stale message + */ + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + TimeDelta::seconds(10), + None, + ) + .unwrap(); + // consuming messages as normal + for _ in 0..10 { + router.submit(make_message(Utc::now())).unwrap(); + _ = router.poll(); + } + assert_eq!(router.state, State::Forwarding); + // now theres a stale message, consumer should crash + _ = router.submit(make_message(Utc::now() - TimeDelta::seconds(20))); + } + + fn submit_with_retry( + router: &mut BLQRouter, + message: Message, + max_retries: usize, + ) -> Result<(), SubmitError> { + let mut msg = message; + for _ in 0..max_retries { + match router.submit(msg) { + Ok(()) => return Ok(()), + Err(SubmitError::MessageRejected(rejected)) => { + _ = router.poll(); + msg = rejected.message; + } + Err(e) => return Err(e), + } + } + Err(SubmitError::MessageRejected( + sentry_arroyo::processing::strategies::MessageRejected { message: msg }, + )) + } + + #[test] + fn test_stale_to_fresh() { + /* + This tests that the BLQRouter properly routes stale messages to the BLQ + and then switches back to forwarding fresh messages once the backlog is burned + */ + let mut router = BLQRouter::new_with_strategy( + MockStrategy::new(), + MockStrategy::new(), + TimeDelta::seconds(10), + Some(TimeDelta::seconds(1)), + ) + .unwrap(); + // backlog of 10 stale messages + for _ in 0..10 { + router + .submit(make_message(Utc::now() - TimeDelta::minutes(1))) + .unwrap(); + _ = router.poll(); + } + assert_eq!(router.state, State::RoutingStale); + assert!(!router.producer.join_called); + // now we are back to fresh messages + for _ in 0..5 { + submit_with_retry(&mut router, make_message(Utc::now()), 3).unwrap(); + _ = router.poll(); + } + assert_eq!(router.state, State::Forwarding); + assert!(router.producer.join_called); + assert_eq!(router.producer.submitted.len(), 10); + assert_eq!(router.next_step.submitted.len(), 5); + } +} diff --git a/rust_snuba/src/strategies/mod.rs b/rust_snuba/src/strategies/mod.rs index 77b8776314..3ac88c5b28 100644 --- a/rust_snuba/src/strategies/mod.rs +++ b/rust_snuba/src/strategies/mod.rs @@ -1,5 +1,6 @@ pub mod accepted_outcomes; pub mod accountant; +pub mod blq_router; pub mod clickhouse; pub mod commit_log; pub mod healthcheck; diff --git a/sentry-options/schemas/snuba/schema.json b/sentry-options/schemas/snuba/schema.json index c5c5bbf2e2..5ae70d23a1 100644 --- a/sentry-options/schemas/snuba/schema.json +++ b/sentry-options/schemas/snuba/schema.json @@ -6,6 +6,11 @@ "type": "boolean", "default": false, "description": "true to use the item timestamp, false for the received timestamp" + }, + "consumer.blq_enabled": { + "type": "boolean", + "default": false, + "description": "enable backlog queue in snuba consumers" } } }