feat: add a backlog-queue to the snuba consumers #7856
Conversation
A debug target for the consumer in VS Code; it runs maturin to build the Rust crate as a pre-step.
This is my new arroyo strategy for the BLQ; see factory_v2 for usage.
Here is where I borrow the DLQ's configuration for the BLQ: if a DLQ is configured, its producer config and topic are reused for the BLQ.
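The DLQ-to-BLQ handoff can be sketched as a minimal, self-contained example with hypothetical simplified types (the real code passes the consumer's `dlq_producer_config` and `dlq_topic` through as `blq_producer_config` and `blq_topic`):

```rust
// Minimal sketch, with hypothetical simplified types, of how the DLQ
// settings double as the BLQ settings: if no DLQ is configured, both
// fields stay `None` and the BLQ step is skipped.
#[derive(Clone, Debug, PartialEq)]
struct KafkaConfig {
    bootstrap_servers: String,
}

fn blq_settings(
    dlq_producer_config: Option<KafkaConfig>,
    dlq_topic: Option<String>,
) -> (Option<KafkaConfig>, Option<String>) {
    // Reuse the DLQ producer config and topic verbatim for the backlog queue.
    (dlq_producer_config, dlq_topic)
}

fn main() {
    let cfg = Some(KafkaConfig { bootstrap_servers: "localhost:9092".into() });
    let topic = Some("dead-letters".to_string());
    let (blq_cfg, blq_topic) = blq_settings(cfg.clone(), topic.clone());
    assert_eq!(blq_cfg, cfg);
    assert_eq!(blq_topic, topic);
    // Without a DLQ there is no BLQ either.
    assert_eq!(blq_settings(None, None), (None, None));
}
```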
Here I wire the BLQ step into the beginning of our existing consumer pipeline.
and here is my feature flag, thanks @kenzoengineer
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: `ConcurrencyConfig` runtime dropped before strategy uses it
  - Moved BLQ concurrency to a long-lived `blq_concurrency` field on `ConsumerStrategyFactoryV2` and used it when constructing `Produce`, so the runtime outlives the strategy chain.
- ✅ Fixed: Duplicated BLQ router setup across two methods
  - Extracted the repeated BLQ feature-flag/producer/router block into a shared `wrap_with_blq_router` helper used by both pipeline constructors.
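The lifetime issue behind the first fix can be illustrated with a minimal, self-contained sketch (hypothetical simplified types): a value created as a local inside a builder method cannot outlive that method, while a field on the long-lived factory can be borrowed by anything the factory builds.

```rust
// Hypothetical simplified types illustrating the autofix: keep the
// concurrency config on the long-lived factory instead of a method-local.
struct ConcurrencyConfig {
    workers: usize,
}

struct Factory {
    // Lives as long as the factory, so strategies borrowing it stay valid.
    blq_concurrency: ConcurrencyConfig,
}

struct Strategy<'a> {
    concurrency: &'a ConcurrencyConfig,
}

impl Factory {
    fn build(&self) -> Strategy<'_> {
        // Borrowing the field is fine; borrowing a local created here
        // would be rejected because it drops when `build` returns.
        Strategy {
            concurrency: &self.blq_concurrency,
        }
    }
}

fn main() {
    let factory = Factory {
        blq_concurrency: ConcurrencyConfig { workers: 10 },
    };
    let strategy = factory.build();
    assert_eq!(strategy.concurrency.workers, 10);
}
```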
Or push these changes by commenting:
@cursor push 58d1d747f9
Preview (58d1d747f9)
diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs
--- a/rust_snuba/benches/processors.rs
+++ b/rust_snuba/benches/processors.rs
@@ -103,6 +103,7 @@
join_timeout_ms: None,
health_check: "arroyo".to_string(),
use_row_binary: false,
+ blq_concurrency: ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned()),
blq_producer_config: None,
blq_topic: None,
};
diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs
--- a/rust_snuba/src/consumer.rs
+++ b/rust_snuba/src/consumer.rs
@@ -285,6 +285,7 @@
join_timeout_ms,
health_check: health_check.to_string(),
use_row_binary,
+ blq_concurrency: ConcurrencyConfig::new(10),
blq_producer_config: dlq_producer_config.clone(),
blq_topic: dlq_topic,
};
diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs
--- a/rust_snuba/src/factory_v2.rs
+++ b/rust_snuba/src/factory_v2.rs
@@ -69,6 +69,7 @@
pub join_timeout_ms: Option<u64>,
pub health_check: String,
pub use_row_binary: bool,
+ pub blq_concurrency: ConcurrencyConfig,
pub blq_producer_config: Option<KafkaConfig>,
pub blq_topic: Option<Topic>,
}
@@ -267,43 +268,7 @@
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
);
- let blq_enabled_flag = options("snuba")
- .ok()
- .and_then(|o| o.get("consumer.blq_enabled").ok())
- .and_then(|v| v.as_bool())
- .unwrap_or(false);
- let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
- if let (true, Some(blq_producer_config), Some(blq_topic)) =
- (blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
- {
- let stale_threshold = TimeDelta::minutes(30);
- let static_friction = TimeDelta::minutes(2);
- tracing::info!(
- "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
- stale_threshold,
- self.blq_topic,
- static_friction
- );
- let concurrency = ConcurrencyConfig::new(10);
- let blq_producer = Produce::new(
- CommitOffsets::new(Duration::from_millis(250)),
- KafkaProducer::new(blq_producer_config.clone()),
- &concurrency,
- TopicOrPartition::Topic(blq_topic),
- );
- Box::new(
- BLQRouter::new(
- next_step,
- blq_producer,
- stale_threshold,
- Some(static_friction),
- )
- .expect("invalid BLQRouter config"),
- )
- } else {
- tracing::info!("Not using a backlog-queue",);
- Box::new(next_step)
- };
+ let next_step = self.wrap_with_blq_router(Box::new(next_step));
if let Some(path) = &self.health_check_file {
{
@@ -324,6 +289,49 @@
}
impl ConsumerStrategyFactoryV2 {
+ fn wrap_with_blq_router(
+ &self,
+ next_step: Box<dyn ProcessingStrategy<KafkaPayload>>,
+ ) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
+ let blq_enabled_flag = options("snuba")
+ .ok()
+ .and_then(|o| o.get("consumer.blq_enabled").ok())
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false);
+
+ if let (true, Some(blq_producer_config), Some(blq_topic)) =
+ (blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
+ {
+ let stale_threshold = TimeDelta::minutes(30);
+ let static_friction = TimeDelta::minutes(2);
+ tracing::info!(
+ "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
+ stale_threshold,
+ self.blq_topic,
+ static_friction
+ );
+
+ let blq_producer = Produce::new(
+ CommitOffsets::new(Duration::from_millis(250)),
+ KafkaProducer::new(blq_producer_config.clone()),
+ &self.blq_concurrency,
+ TopicOrPartition::Topic(blq_topic),
+ );
+ Box::new(
+ BLQRouter::new(
+ next_step,
+ blq_producer,
+ stale_threshold,
+ Some(static_friction),
+ )
+ .expect("invalid BLQRouter config"),
+ )
+ } else {
+ tracing::info!("Not using a backlog-queue",);
+ next_step
+ }
+ }
+
fn create_row_binary_pipeline<
T: clickhouse::Row + serde::Serialize + Clone + Send + Sync + 'static,
>(
@@ -420,43 +428,7 @@
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
);
- let blq_enabled_flag = options("snuba")
- .ok()
- .and_then(|o| o.get("consumer.blq_enabled").ok())
- .and_then(|v| v.as_bool())
- .unwrap_or(false);
- let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
- if let (true, Some(blq_producer_config), Some(blq_topic)) =
- (blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
- {
- let stale_threshold = TimeDelta::minutes(30);
- let static_friction = TimeDelta::minutes(2);
- tracing::info!(
- "Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
- stale_threshold,
- self.blq_topic,
- static_friction
- );
- let concurrency = ConcurrencyConfig::new(10);
- let blq_producer = Produce::new(
- CommitOffsets::new(Duration::from_millis(250)),
- KafkaProducer::new(blq_producer_config.clone()),
- &concurrency,
- TopicOrPartition::Topic(blq_topic),
- );
- Box::new(
- BLQRouter::new(
- next_step,
- blq_producer,
- stale_threshold,
- Some(static_friction),
- )
- .expect("invalid BLQRouter config"),
- )
- } else {
- tracing::info!("Not using a backlog-queue",);
- Box::new(next_step)
- };
+ let next_step = self.wrap_with_blq_router(Box::new(next_step));
if let Some(path) = &self.health_check_file {
if self.health_check == "snuba" {
kenzoengineer left a comment
- I'd like to see tests that validate behaviour when the option value is changed (using the thread-local override guard)
- Make sure that the codepath always calls `init_with_schemas` first
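For the first point, a thread-local override guard could look roughly like this self-contained sketch (hypothetical names; snuba's actual guard API may differ):

```rust
use std::cell::Cell;

// Hypothetical sketch of a thread-local override guard for the
// `consumer.blq_enabled` option; snuba's real guard API may differ.
thread_local! {
    static BLQ_ENABLED_OVERRIDE: Cell<Option<bool>> = Cell::new(None);
}

struct OverrideGuard {
    prev: Option<bool>,
}

impl OverrideGuard {
    fn set(value: bool) -> OverrideGuard {
        // Remember the previous override so nested guards restore correctly.
        let prev = BLQ_ENABLED_OVERRIDE.with(|c| c.replace(Some(value)));
        OverrideGuard { prev }
    }
}

impl Drop for OverrideGuard {
    fn drop(&mut self) {
        BLQ_ENABLED_OVERRIDE.with(|c| c.set(self.prev));
    }
}

fn blq_enabled() -> bool {
    // Fall back to the production default (off) when no override is set.
    BLQ_ENABLED_OVERRIDE.with(|c| c.get()).unwrap_or(false)
}

fn main() {
    assert!(!blq_enabled()); // off by default
    {
        let _guard = OverrideGuard::set(true);
        assert!(blq_enabled()); // overridden on
    }
    assert!(!blq_enabled()); // restored once the guard drops
}
```

Because the override is thread-local and restored on drop, tests can flip the flag without racing against other test threads.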
Force-pushed from 829be7f to 62e9ff7
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
// i know i shouldnt be blocking in submit but there was no better way to do it
// the pipeline cant make progress until this completes anyways so it should be fine
let flush_results = self.producer.join(Some(Duration::from_secs(5))).unwrap();
Hardcoded 5s join timeout with unwrap risks crash loops
Medium Severity
When transitioning from RoutingStale to Flushing, self.producer.join(Some(Duration::from_secs(5))) is called with .unwrap(). If the Kafka broker is slow or unreachable, the 5-second timeout may expire, causing join() to return a StrategyError which .unwrap() converts into a panic with a generic error message. Under sustained broker latency this creates a tight crash loop: stale messages arrive → transition → timeout → panic → restart → repeat.
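A non-panicking alternative might look like this minimal sketch (hypothetical simplified stand-ins for the real producer and arroyo's `StrategyError`): log the timeout and report "not done yet" so the caller can retry on the next poll instead of unwrapping into a panic.

```rust
use std::time::Duration;

// Hypothetical simplified stand-ins for the real producer strategy and
// arroyo's StrategyError; the point is the error handling, not the types.
#[derive(Debug)]
struct StrategyError;

struct Producer {
    healthy: bool,
}

impl Producer {
    fn join(&self, _timeout: Option<Duration>) -> Result<(), StrategyError> {
        if self.healthy { Ok(()) } else { Err(StrategyError) }
    }
}

/// Returns true once the flush completed; on timeout it logs and returns
/// false so the caller can retry on the next poll instead of panicking.
fn try_flush(producer: &Producer) -> bool {
    match producer.join(Some(Duration::from_secs(5))) {
        Ok(()) => true,
        Err(e) => {
            eprintln!("BLQ flush timed out: {:?}; will retry next poll", e);
            false
        }
    }
}

fn main() {
    assert!(!try_flush(&Producer { healthy: false }));
    assert!(try_flush(&Producer { healthy: true }));
}
```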



https://www.notion.so/sentry/project-snuba-backlog-queue-BLQ-3148b10e4b5d809ba443ed6b79606bc6
This adds a `BLQRouter` step to snuba consumers: if a message is fresh it gets passed along as usual; if a message is stale it is routed onto the backlog queue. We are using the DLQ as the backlog queue. It is guarded behind a feature flag that is off by default.

todo: the following are todos I will ensure get done as follow-ups before enabling
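The routing rule above (fresh messages pass through, messages older than the 30-minute stale threshold go to the backlog) can be sketched in a self-contained way; the real `BLQRouter` also applies a 2-minute static friction when switching modes, which is omitted here for brevity.

```rust
use std::time::{Duration, SystemTime};

// Self-contained sketch of the BLQ routing rule; the real BLQRouter also
// applies a 2-minute static friction, omitted here for brevity.
const STALE_THRESHOLD: Duration = Duration::from_secs(30 * 60);

#[derive(Debug, PartialEq)]
enum Route {
    Main,    // fresh: pass along to the normal pipeline
    Backlog, // stale: produce onto the backlog queue (the DLQ topic)
}

fn route(message_ts: SystemTime, now: SystemTime) -> Route {
    match now.duration_since(message_ts) {
        Ok(age) if age > STALE_THRESHOLD => Route::Backlog,
        // Fresh message, or clock skew put the timestamp in the future.
        _ => Route::Main,
    }
}

fn main() {
    let now = SystemTime::now();
    assert_eq!(route(now, now), Route::Main);
    assert_eq!(route(now - Duration::from_secs(3600), now), Route::Backlog);
}
```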