diff --git a/lambda-events/src/event/sqs/mod.rs b/lambda-events/src/event/sqs/mod.rs index 0a380734..258fd203 100644 --- a/lambda-events/src/event/sqs/mod.rs +++ b/lambda-events/src/event/sqs/mod.rs @@ -154,6 +154,126 @@ pub struct SqsBatchResponse { pub other: serde_json::Map, } +impl SqsBatchResponse { + /// Add a failed message ID to the batch response. + /// + /// When processing SQS messages in batches, you can use this helper method to + /// register individual message failures. Lambda will automatically return failed + /// messages to the queue for reprocessing while successfully processed messages + /// will be deleted. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's SQS event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + /// + /// # Example + /// + /// ```rust,no_run + /// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse}; + /// use lambda_runtime::{service_fn, Error, LambdaEvent}; + /// + /// async fn function_handler( + /// event: LambdaEvent, + /// ) -> Result { + /// // Start from a default response + /// let mut response = SqsBatchResponse::default(); + /// + /// for record in event.payload.records { + /// let message_id = record.message_id.clone().unwrap_or_default(); + /// + /// // Try to process the message + /// if let Err(e) = process_record(&record).await { + /// println!("Failed to process message {}: {}", message_id, e); + /// + /// // Use the helper to register the failure + /// response.add_failure(message_id); + /// } + /// } + /// + /// Ok(response) + /// } + /// + /// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> { + /// // Your message processing logic here + /// Ok(()) + /// } + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Error> { + /// lambda_runtime::run(service_fn(function_handler)).await + /// } + /// ``` + pub fn add_failure(&mut self, message_id: impl Into) { + self.batch_item_failures.push(BatchItemFailure { + item_identifier: message_id.into(), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }); + } + + /// Set multiple failed message IDs at once. + /// + /// This is a convenience method for setting all batch item failures in one call. + /// It replaces any previously registered failures. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's SQS event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + /// + /// # Example + /// + /// ```rust,no_run + /// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse}; + /// use lambda_runtime::{service_fn, Error, LambdaEvent}; + /// + /// async fn function_handler( + /// event: LambdaEvent, + /// ) -> Result { + /// let mut failed_ids = Vec::new(); + /// + /// for record in event.payload.records { + /// let message_id = record.message_id.clone().unwrap_or_default(); + /// + /// // Try to process the message + /// if let Err(e) = process_record(&record).await { + /// println!("Failed to process message {}: {}", message_id, e); + /// failed_ids.push(message_id); + /// } + /// } + /// + /// // Set all failures at once + /// let mut response = SqsBatchResponse::default(); + /// response.set_failures(failed_ids); + /// + /// Ok(response) + /// } + /// + /// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> { + /// // Your message processing logic here + /// Ok(()) + /// } + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Error> { + /// lambda_runtime::run(service_fn(function_handler)).await + /// } + /// ``` + pub fn set_failures(&mut self, message_ids: I) + where + I: IntoIterator, + S: Into, + { + self.batch_item_failures = message_ids + .into_iter() + .map(|id| BatchItemFailure { + item_identifier: id.into(), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }) + .collect(); + } +} + #[non_exhaustive] #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] @@ -335,4 +455,33 @@ mod test { let reparsed: SqsApiEventObj = serde_json::from_slice(output.as_bytes()).unwrap(); assert_eq!(parsed, reparsed); } + + #[test] + #[cfg(feature = "sqs")] + fn example_sqs_batch_response_add_failure() { + let mut response = SqsBatchResponse::default(); + response.add_failure("msg-1".to_string()); + response.add_failure("msg-2".to_string()); + + assert_eq!(response.batch_item_failures.len(), 2); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1"); + assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2"); + } + + #[test] + #[cfg(feature = "sqs")] + fn example_sqs_batch_response_set_failures() { + let mut response = SqsBatchResponse::default(); + response.set_failures(vec!["msg-1", "msg-2", "msg-3"]); + + assert_eq!(response.batch_item_failures.len(), 3); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1"); + assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2"); + assert_eq!(response.batch_item_failures[2].item_identifier, "msg-3"); + + // Test that set_failures replaces existing failures + response.set_failures(vec!["msg-4".to_string()]); + assert_eq!(response.batch_item_failures.len(), 1); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-4"); + } } diff --git a/lambda-events/src/event/streams/mod.rs b/lambda-events/src/event/streams/mod.rs index 9f2391c8..bf6dec8a 100644 --- a/lambda-events/src/event/streams/mod.rs +++ b/lambda-events/src/event/streams/mod.rs @@ -17,6 +17,48 @@ pub struct KinesisEventResponse { pub other: serde_json::Map, } +impl KinesisEventResponse { + /// Add a failed item identifier to the batch response. + /// + /// When processing Kinesis records in batches, you can use this helper method to + /// register individual record failures. Lambda will automatically retry failed + /// records while successfully processed records will be checkpointed. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's Kinesis event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + pub fn add_failure(&mut self, item_identifier: impl Into) { + self.batch_item_failures.push(KinesisBatchItemFailure { + item_identifier: Some(item_identifier.into()), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }); + } + + /// Set multiple failed item identifiers at once. + /// + /// This is a convenience method for setting all batch item failures in one call. + /// It replaces any previously registered failures. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's Kinesis event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + pub fn set_failures(&mut self, item_identifiers: I) + where + I: IntoIterator, + S: Into, + { + self.batch_item_failures = item_identifiers + .into_iter() + .map(|id| KinesisBatchItemFailure { + item_identifier: Some(id.into()), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }) + .collect(); + } +} + /// `KinesisBatchItemFailure` is the individual record which failed processing. #[non_exhaustive] #[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)] @@ -94,3 +136,53 @@ pub struct SqsBatchItemFailure { #[serde(flatten)] pub other: serde_json::Map, } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn kinesis_event_response_add_failure() { + let mut response = KinesisEventResponse::default(); + response.add_failure("seq-1"); + response.add_failure("seq-2".to_string()); + + assert_eq!(response.batch_item_failures.len(), 2); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-1".to_string()) + ); + assert_eq!( + response.batch_item_failures[1].item_identifier, + Some("seq-2".to_string()) + ); + } + + #[test] + fn kinesis_event_response_set_failures() { + let mut response = KinesisEventResponse::default(); + response.set_failures(vec!["seq-1", "seq-2", "seq-3"]); + + assert_eq!(response.batch_item_failures.len(), 3); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-1".to_string()) + ); + assert_eq!( + response.batch_item_failures[1].item_identifier, + Some("seq-2".to_string()) + ); + assert_eq!( + response.batch_item_failures[2].item_identifier, + Some("seq-3".to_string()) + ); + + // Test that set_failures replaces existing failures + response.set_failures(vec!["seq-4".to_string()]); + assert_eq!(response.batch_item_failures.len(), 1); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-4".to_string()) + ); + } +}