diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index 181ee6aa6f..4d1c3e4193 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -56,7 +56,7 @@ pub fn make_rust_processor( .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, CommitLogEntry { - offset, + offset: offset + 1, orig_message_ts: timestamp, received_p99: transformed.origin_timestamp.into_iter().collect(), }, @@ -135,7 +135,7 @@ pub fn make_rust_processor_with_replacements( .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( partition.index, CommitLogEntry { - offset, + offset: offset + 1, orig_message_ts: timestamp, received_p99: transformed.origin_timestamp.into_iter().collect(), }, @@ -220,7 +220,7 @@ pub fn make_rust_processor_row_binary>>> = + Arc::new(std::sync::Mutex::new(Vec::new())); + let captured_clone = captured.clone(); + + struct Capture(Arc>>>); + impl ProcessingStrategy> for Capture { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + fn submit( + &mut self, + message: Message>, + ) -> Result<(), SubmitError>> { + self.0.lock().unwrap().push(message.into_payload()); + Ok(()) + } + fn terminate(&mut self) {} + fn join( + &mut self, + _timeout: Option, + ) -> Result, StrategyError> { + Ok(None) + } + } + + fn noop_processor( + _payload: KafkaPayload, + _metadata: KafkaMessageMetadata, + _config: &ProcessorConfig, + ) -> anyhow::Result { + Ok(InsertBatch::default()) + } + + let partition = Partition::new(Topic::new("events-small"), 7); + let raw_offset: u64 = 42; + let concurrency = ConcurrencyConfig::new(1); + + let mut strategy = make_rust_processor( + Capture(captured_clone), + noop_processor, + "outcomes", + false, + &concurrency, + ProcessorConfig::default(), + None, + ); + + let payload = KafkaPayload::new(None, None, Some(b"{}".to_vec())); + let message = Message::new_broker_message(payload, partition, raw_offset, Utc::now()); + + strategy.submit(message).unwrap(); + strategy.poll().unwrap(); + let _ = strategy.join(None); + + let batches = captured.lock().unwrap(); + assert_eq!(batches.len(), 1); + + let offsets = batches[0].commit_log_offsets(); + let entry = offsets + .0 + .get(&partition.index) + .expect("commit log entry missing for partition"); + + assert_eq!( + entry.offset, + raw_offset + 1, + "Commit log entry must contain next-to-consume offset (raw + 1). \ + Got raw offset {}, expected {}.", + raw_offset, + raw_offset + 1, + ); + } + #[test] fn test_ip_addresses() { let test_cases = [