Skip to content

Commit 58d1d74

Browse files
committed
Fix BLQ concurrency lifetime and deduplicate router setup
1 parent 4e9ad1c commit 58d1d74

File tree

3 files changed

+48
-74
lines changed

3 files changed

+48
-74
lines changed

rust_snuba/benches/processors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ fn create_factory(
103103
join_timeout_ms: None,
104104
health_check: "arroyo".to_string(),
105105
use_row_binary: false,
106+
blq_concurrency: ConcurrencyConfig::with_runtime(concurrency, RUNTIME.handle().to_owned()),
106107
blq_producer_config: None,
107108
blq_topic: None,
108109
};

rust_snuba/src/consumer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ pub fn consumer_impl(
285285
join_timeout_ms,
286286
health_check: health_check.to_string(),
287287
use_row_binary,
288+
blq_concurrency: ConcurrencyConfig::new(10),
288289
blq_producer_config: dlq_producer_config.clone(),
289290
blq_topic: dlq_topic,
290291
};

rust_snuba/src/factory_v2.rs

Lines changed: 46 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub struct ConsumerStrategyFactoryV2 {
6969
pub join_timeout_ms: Option<u64>,
7070
pub health_check: String,
7171
pub use_row_binary: bool,
72+
pub blq_concurrency: ConcurrencyConfig,
7273
pub blq_producer_config: Option<KafkaConfig>,
7374
pub blq_topic: Option<Topic>,
7475
}
@@ -267,43 +268,7 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
267268
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
268269
);
269270

270-
let blq_enabled_flag = options("snuba")
271-
.ok()
272-
.and_then(|o| o.get("consumer.blq_enabled").ok())
273-
.and_then(|v| v.as_bool())
274-
.unwrap_or(false);
275-
let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
276-
if let (true, Some(blq_producer_config), Some(blq_topic)) =
277-
(blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
278-
{
279-
let stale_threshold = TimeDelta::minutes(30);
280-
let static_friction = TimeDelta::minutes(2);
281-
tracing::info!(
282-
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
283-
stale_threshold,
284-
self.blq_topic,
285-
static_friction
286-
);
287-
let concurrency = ConcurrencyConfig::new(10);
288-
let blq_producer = Produce::new(
289-
CommitOffsets::new(Duration::from_millis(250)),
290-
KafkaProducer::new(blq_producer_config.clone()),
291-
&concurrency,
292-
TopicOrPartition::Topic(blq_topic),
293-
);
294-
Box::new(
295-
BLQRouter::new(
296-
next_step,
297-
blq_producer,
298-
stale_threshold,
299-
Some(static_friction),
300-
)
301-
.expect("invalid BLQRouter config"),
302-
)
303-
} else {
304-
tracing::info!("Not using a backlog-queue",);
305-
Box::new(next_step)
306-
};
271+
let next_step = self.wrap_with_blq_router(Box::new(next_step));
307272

308273
if let Some(path) = &self.health_check_file {
309274
{
@@ -324,6 +289,49 @@ impl ProcessingStrategyFactory<KafkaPayload> for ConsumerStrategyFactoryV2 {
324289
}
325290

326291
impl ConsumerStrategyFactoryV2 {
292+
fn wrap_with_blq_router(
293+
&self,
294+
next_step: Box<dyn ProcessingStrategy<KafkaPayload>>,
295+
) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
296+
let blq_enabled_flag = options("snuba")
297+
.ok()
298+
.and_then(|o| o.get("consumer.blq_enabled").ok())
299+
.and_then(|v| v.as_bool())
300+
.unwrap_or(false);
301+
302+
if let (true, Some(blq_producer_config), Some(blq_topic)) =
303+
(blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
304+
{
305+
let stale_threshold = TimeDelta::minutes(30);
306+
let static_friction = TimeDelta::minutes(2);
307+
tracing::info!(
308+
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
309+
stale_threshold,
310+
self.blq_topic,
311+
static_friction
312+
);
313+
314+
let blq_producer = Produce::new(
315+
CommitOffsets::new(Duration::from_millis(250)),
316+
KafkaProducer::new(blq_producer_config.clone()),
317+
&self.blq_concurrency,
318+
TopicOrPartition::Topic(blq_topic),
319+
);
320+
Box::new(
321+
BLQRouter::new(
322+
next_step,
323+
blq_producer,
324+
stale_threshold,
325+
Some(static_friction),
326+
)
327+
.expect("invalid BLQRouter config"),
328+
)
329+
} else {
330+
tracing::info!("Not using a backlog-queue",);
331+
next_step
332+
}
333+
}
334+
327335
fn create_row_binary_pipeline<
328336
T: clickhouse::Row + serde::Serialize + Clone + Send + Sync + 'static,
329337
>(
@@ -420,43 +428,7 @@ impl ConsumerStrategyFactoryV2 {
420428
Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))),
421429
);
422430

423-
let blq_enabled_flag = options("snuba")
424-
.ok()
425-
.and_then(|o| o.get("consumer.blq_enabled").ok())
426-
.and_then(|v| v.as_bool())
427-
.unwrap_or(false);
428-
let next_step: Box<dyn ProcessingStrategy<KafkaPayload>> =
429-
if let (true, Some(blq_producer_config), Some(blq_topic)) =
430-
(blq_enabled_flag, &self.blq_producer_config, self.blq_topic)
431-
{
432-
let stale_threshold = TimeDelta::minutes(30);
433-
let static_friction = TimeDelta::minutes(2);
434-
tracing::info!(
435-
"Routing all messages older than {:?} to the topic {:?} with static_friction {:?}",
436-
stale_threshold,
437-
self.blq_topic,
438-
static_friction
439-
);
440-
let concurrency = ConcurrencyConfig::new(10);
441-
let blq_producer = Produce::new(
442-
CommitOffsets::new(Duration::from_millis(250)),
443-
KafkaProducer::new(blq_producer_config.clone()),
444-
&concurrency,
445-
TopicOrPartition::Topic(blq_topic),
446-
);
447-
Box::new(
448-
BLQRouter::new(
449-
next_step,
450-
blq_producer,
451-
stale_threshold,
452-
Some(static_friction),
453-
)
454-
.expect("invalid BLQRouter config"),
455-
)
456-
} else {
457-
tracing::info!("Not using a backlog-queue",);
458-
Box::new(next_step)
459-
};
431+
let next_step = self.wrap_with_blq_router(Box::new(next_step));
460432

461433
if let Some(path) = &self.health_check_file {
462434
if self.health_check == "snuba" {

0 commit comments

Comments
 (0)