Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 88 additions & 3 deletions rust_snuba/src/strategies/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn make_rust_processor(
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
CommitLogEntry {
offset,
offset: offset + 1,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to confirm two things with the EAP team:

  1. Were these offset semantics intended behavior for any reason?
  2. Are there any consumers of Snuba commit log entries other than the SynchronizedConsumer implementation (post-process forwarder consumers)? If so and they rely on the current offset semantics, we may have to rethink our approach here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For (2), I believe the only other consumer of the Snuba commit log is the subscription consumer. It looks to me like the subscription consumer cares only about offset intervals (not absolute values), so I think it should be unaffected by the change in this PR (except, perhaps, immediately after rollout).

orig_message_ts: timestamp,
received_p99: transformed.origin_timestamp.into_iter().collect(),
},
Expand Down Expand Up @@ -131,7 +131,7 @@ pub fn make_rust_processor_with_replacements(
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
CommitLogEntry {
offset,
offset: offset + 1,
orig_message_ts: timestamp,
received_p99: transformed.origin_timestamp.into_iter().collect(),
},
Expand Down Expand Up @@ -214,7 +214,7 @@ pub fn make_rust_processor_row_binary<T: Clone + Send + Sync + 'static>(
.with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([(
partition.index,
CommitLogEntry {
offset,
offset: offset + 1,
orig_message_ts: timestamp,
received_p99: transformed.origin_timestamp.into_iter().collect(),
},
Expand Down Expand Up @@ -474,7 +474,15 @@ mod tests {
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::types::{Message, Partition, Topic};

use std::sync::Arc;
use std::time::Duration;

use sentry_arroyo::processing::strategies::{
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
};

use crate::types::InsertBatch;
use crate::types::RowData;
use crate::Noop;

#[test]
Expand Down Expand Up @@ -518,6 +526,83 @@ mod tests {
let _ = strategy.join(None);
}

/// The commit log offset produced by the processor must use next-to-consume
/// semantics (current offset + 1).
#[test]
fn commit_log_entry_uses_next_offset() {
    /// Terminal sink strategy that records every batch submitted to it so
    /// the test can inspect the commit log metadata after processing.
    struct Capture(Arc<std::sync::Mutex<Vec<BytesInsertBatch<RowData>>>>);

    impl ProcessingStrategy<BytesInsertBatch<RowData>> for Capture {
        fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
            Ok(None)
        }

        fn submit(
            &mut self,
            message: Message<BytesInsertBatch<RowData>>,
        ) -> Result<(), SubmitError<BytesInsertBatch<RowData>>> {
            // Record the payload; never back-pressure the pipeline.
            self.0.lock().unwrap().push(message.into_payload());
            Ok(())
        }

        fn terminate(&mut self) {}

        fn join(
            &mut self,
            _timeout: Option<Duration>,
        ) -> Result<Option<CommitRequest>, StrategyError> {
            Ok(None)
        }
    }

    /// Message processor stub: accepts any payload and yields an empty batch,
    /// so the test exercises only the offset bookkeeping, not row parsing.
    fn noop_processor(
        _payload: KafkaPayload,
        _metadata: KafkaMessageMetadata,
        _config: &ProcessorConfig,
    ) -> anyhow::Result<InsertBatch> {
        Ok(InsertBatch::default())
    }

    let sink: Arc<std::sync::Mutex<Vec<BytesInsertBatch<RowData>>>> =
        Arc::new(std::sync::Mutex::new(Vec::new()));

    let partition = Partition::new(Topic::new("events-small"), 7);
    let raw_offset: u64 = 42;
    let concurrency = ConcurrencyConfig::new(1);

    let mut strategy = make_rust_processor(
        Capture(Arc::clone(&sink)),
        noop_processor,
        "outcomes",
        false,
        &concurrency,
        ProcessorConfig::default(),
        None,
    );

    // Feed a single broker message at a known raw offset through the
    // processor, then drain it so the capture sink is guaranteed populated.
    let payload = KafkaPayload::new(None, None, Some(b"{}".to_vec()));
    let message = Message::new_broker_message(payload, partition, raw_offset, Utc::now());

    strategy.submit(message).unwrap();
    strategy.poll().unwrap();
    let _ = strategy.join(None);

    let batches = sink.lock().unwrap();
    assert_eq!(batches.len(), 1);

    // Bind the offsets map to a local so the entry borrow outlives the lookup.
    let offsets = batches[0].commit_log_offsets();
    let entry = offsets
        .0
        .get(&partition.index)
        .expect("commit log entry missing for partition");

    assert_eq!(
        entry.offset,
        raw_offset + 1,
        "Commit log entry must contain next-to-consume offset (raw + 1). \
        Got raw offset {}, expected {}.",
        raw_offset,
        raw_offset + 1,
    );
}

#[test]
fn test_ip_addresses() {
let test_cases = [
Expand Down
Loading