From cbecf2ba26817d9298373f5c0d2a96f56a43bf75 Mon Sep 17 00:00:00 2001 From: Luciano Mammino Date: Mon, 24 Nov 2025 18:21:18 +0100 Subject: [PATCH 1/5] feat: Improve ergonomics of SqsBatchResponse --- lambda-events/src/event/sqs/mod.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/lambda-events/src/event/sqs/mod.rs b/lambda-events/src/event/sqs/mod.rs index 0a380734..c9ce2047 100644 --- a/lambda-events/src/event/sqs/mod.rs +++ b/lambda-events/src/event/sqs/mod.rs @@ -154,6 +154,17 @@ pub struct SqsBatchResponse { pub other: serde_json::Map, } +impl SqsBatchResponse { + /// Add a failed message ID to the batch response + pub fn add_failure(&mut self, message_id: String) -> () { + self.batch_item_failures.push(BatchItemFailure { + item_identifier: message_id, + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }); + } +} + #[non_exhaustive] #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] @@ -335,4 +346,16 @@ mod test { let reparsed: SqsApiEventObj = serde_json::from_slice(output.as_bytes()).unwrap(); assert_eq!(parsed, reparsed); } + + #[test] + #[cfg(feature = "sqs")] + fn example_sqs_batch_response_add_failure() { + let mut response = SqsBatchResponse::default(); + response.add_failure("msg-1".to_string()); + response.add_failure("msg-2".to_string()); + + assert_eq!(response.batch_item_failures.len(), 2); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1"); + assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2"); + } } From a80c1e56682b9e8eb28c0eba2a06cbbf2533585d Mon Sep 17 00:00:00 2001 From: Luciano Mammino Date: Mon, 24 Nov 2025 18:32:34 +0100 Subject: [PATCH 2/5] chore: fmt + clippy --- lambda-events/src/event/sqs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-events/src/event/sqs/mod.rs b/lambda-events/src/event/sqs/mod.rs index c9ce2047..d34fb4f5 100644 --- a/lambda-events/src/event/sqs/mod.rs +++ b/lambda-events/src/event/sqs/mod.rs @@ -156,7 +156,7 @@ pub struct SqsBatchResponse { impl SqsBatchResponse { /// Add a failed message ID to the batch response - pub fn add_failure(&mut self, message_id: String) -> () { + pub fn add_failure(&mut self, message_id: String) { self.batch_item_failures.push(BatchItemFailure { item_identifier: message_id, #[cfg(feature = "catch-all-fields")] From ecdc5c65dc54e6ed825a7097b39bcb8d2ea83a43 Mon Sep 17 00:00:00 2001 From: Luciano Mammino Date: Mon, 24 Nov 2025 18:47:51 +0100 Subject: [PATCH 3/5] chore: better docs --- lambda-events/src/event/sqs/mod.rs | 49 +++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/lambda-events/src/event/sqs/mod.rs b/lambda-events/src/event/sqs/mod.rs index d34fb4f5..039d8ca8 100644 --- a/lambda-events/src/event/sqs/mod.rs +++ b/lambda-events/src/event/sqs/mod.rs @@ -155,7 +155,54 @@ pub struct SqsBatchResponse { } impl SqsBatchResponse { - /// Add a failed message ID to the batch response + /// Add a failed message ID to the batch response. + /// + /// When processing SQS messages in batches, you can use this helper method to + /// register individual message failures. Lambda will automatically return failed + /// messages to the queue for reprocessing while successfully processed messages + /// will be deleted. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's SQS event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + /// + /// # Example + /// + /// ```rust,no_run + /// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse}; + /// use lambda_runtime::{service_fn, Error, LambdaEvent}; + /// + /// async fn function_handler( + /// event: LambdaEvent, + /// ) -> Result { + /// // Start from a default response + /// let mut response = SqsBatchResponse::default(); + /// + /// for record in event.payload.records { + /// let message_id = record.message_id.clone().unwrap_or_default(); + /// + /// // Try to process the message + /// if let Err(e) = process_record(&record).await { + /// println!("Failed to process message {}: {}", message_id, e); + /// + /// // Use the helper to register the failure + /// response.add_failure(message_id); + /// } + /// } + /// + /// Ok(response) + /// } + /// + /// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> { + /// // Your message processing logic here + /// Ok(()) + /// } + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Error> { + /// lambda_runtime::run(service_fn(function_handler)).await + /// } + /// ``` pub fn add_failure(&mut self, message_id: String) { self.batch_item_failures.push(BatchItemFailure { item_identifier: message_id, From f94fb4d698ffdd0cee1d79e973a574a31dded24d Mon Sep 17 00:00:00 2001 From: Luciano Mammino Date: Tue, 25 Nov 2025 09:53:27 +0100 Subject: [PATCH 4/5] feat: add set_failures helper and improve add_failure ergonomics --- lambda-events/src/event/sqs/mod.rs | 83 +++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/lambda-events/src/event/sqs/mod.rs b/lambda-events/src/event/sqs/mod.rs index 039d8ca8..258fd203 100644 --- a/lambda-events/src/event/sqs/mod.rs +++ b/lambda-events/src/event/sqs/mod.rs @@ -203,13 +203,75 @@ impl SqsBatchResponse { /// lambda_runtime::run(service_fn(function_handler)).await /// } /// ``` - pub fn add_failure(&mut self, message_id: String) { + pub fn add_failure(&mut self, message_id: impl Into) { self.batch_item_failures.push(BatchItemFailure { - item_identifier: message_id, + item_identifier: message_id.into(), #[cfg(feature = "catch-all-fields")] other: serde_json::Map::new(), }); } + + /// Set multiple failed message IDs at once. + /// + /// This is a convenience method for setting all batch item failures in one call. + /// It replaces any previously registered failures. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's SQS event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + /// + /// # Example + /// + /// ```rust,no_run + /// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse}; + /// use lambda_runtime::{service_fn, Error, LambdaEvent}; + /// + /// async fn function_handler( + /// event: LambdaEvent, + /// ) -> Result { + /// let mut failed_ids = Vec::new(); + /// + /// for record in event.payload.records { + /// let message_id = record.message_id.clone().unwrap_or_default(); + /// + /// // Try to process the message + /// if let Err(e) = process_record(&record).await { + /// println!("Failed to process message {}: {}", message_id, e); + /// failed_ids.push(message_id); + /// } + /// } + /// + /// // Set all failures at once + /// let mut response = SqsBatchResponse::default(); + /// response.set_failures(failed_ids); + /// + /// Ok(response) + /// } + /// + /// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> { + /// // Your message processing logic here + /// Ok(()) + /// } + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Error> { + /// lambda_runtime::run(service_fn(function_handler)).await + /// } + /// ``` + pub fn set_failures(&mut self, message_ids: I) + where + I: IntoIterator, + S: Into, + { + self.batch_item_failures = message_ids + .into_iter() + .map(|id| BatchItemFailure { + item_identifier: id.into(), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }) + .collect(); + } } #[non_exhaustive] @@ -405,4 +467,21 @@ mod test { assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1"); assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2"); } + + #[test] + #[cfg(feature = "sqs")] + fn example_sqs_batch_response_set_failures() { + let mut response = SqsBatchResponse::default(); + response.set_failures(vec!["msg-1", "msg-2", "msg-3"]); + + assert_eq!(response.batch_item_failures.len(), 3); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1"); + assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2"); + assert_eq!(response.batch_item_failures[2].item_identifier, "msg-3"); + + // Test that set_failures replaces existing failures + response.set_failures(vec!["msg-4".to_string()]); + assert_eq!(response.batch_item_failures.len(), 1); + assert_eq!(response.batch_item_failures[0].item_identifier, "msg-4"); + } } From acb971f6cda55e293d328512bd8d96a90e010848 Mon Sep 17 00:00:00 2001 From: Luciano Mammino Date: Tue, 25 Nov 2025 15:07:57 +0100 Subject: [PATCH 5/5] feat(kinesis): add add_failure and set_failures helpers to KinesisEventResponse --- lambda-events/src/event/streams/mod.rs | 92 ++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/lambda-events/src/event/streams/mod.rs b/lambda-events/src/event/streams/mod.rs index 9f2391c8..bf6dec8a 100644 --- a/lambda-events/src/event/streams/mod.rs +++ b/lambda-events/src/event/streams/mod.rs @@ -17,6 +17,48 @@ pub struct KinesisEventResponse { pub other: serde_json::Map, } +impl KinesisEventResponse { + /// Add a failed item identifier to the batch response. + /// + /// When processing Kinesis records in batches, you can use this helper method to + /// register individual record failures. Lambda will automatically retry failed + /// records while successfully processed records will be checkpointed. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's Kinesis event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + pub fn add_failure(&mut self, item_identifier: impl Into) { + self.batch_item_failures.push(KinesisBatchItemFailure { + item_identifier: Some(item_identifier.into()), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }); + } + + /// Set multiple failed item identifiers at once. + /// + /// This is a convenience method for setting all batch item failures in one call. + /// It replaces any previously registered failures. + /// + /// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures` + /// to be enabled in your Lambda function's Kinesis event source mapping configuration. + /// Without this setting, Lambda will retry the entire batch on any failure. + pub fn set_failures(&mut self, item_identifiers: I) + where + I: IntoIterator, + S: Into, + { + self.batch_item_failures = item_identifiers + .into_iter() + .map(|id| KinesisBatchItemFailure { + item_identifier: Some(id.into()), + #[cfg(feature = "catch-all-fields")] + other: serde_json::Map::new(), + }) + .collect(); + } +} + /// `KinesisBatchItemFailure` is the individual record which failed processing. #[non_exhaustive] #[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)] @@ -94,3 +136,53 @@ pub struct SqsBatchItemFailure { #[serde(flatten)] pub other: serde_json::Map, } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn kinesis_event_response_add_failure() { + let mut response = KinesisEventResponse::default(); + response.add_failure("seq-1"); + response.add_failure("seq-2".to_string()); + + assert_eq!(response.batch_item_failures.len(), 2); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-1".to_string()) + ); + assert_eq!( + response.batch_item_failures[1].item_identifier, + Some("seq-2".to_string()) + ); + } + + #[test] + fn kinesis_event_response_set_failures() { + let mut response = KinesisEventResponse::default(); + response.set_failures(vec!["seq-1", "seq-2", "seq-3"]); + + assert_eq!(response.batch_item_failures.len(), 3); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-1".to_string()) + ); + assert_eq!( + response.batch_item_failures[1].item_identifier, + Some("seq-2".to_string()) + ); + assert_eq!( + response.batch_item_failures[2].item_identifier, + Some("seq-3".to_string()) + ); + + // Test that set_failures replaces existing failures + response.set_failures(vec!["seq-4".to_string()]); + assert_eq!(response.batch_item_failures.len(), 1); + assert_eq!( + response.batch_item_failures[0].item_identifier, + Some("seq-4".to_string()) + ); + } +}