Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions lambda-events/src/event/sqs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,126 @@ pub struct SqsBatchResponse {
pub other: serde_json::Map<String, Value>,
}

impl SqsBatchResponse {
/// Add a failed message ID to the batch response.
///
/// When processing SQS messages in batches, you can use this helper method to
/// register individual message failures. Lambda will automatically return failed
/// messages to the queue for reprocessing while successfully processed messages
/// will be deleted.
///
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
/// to be enabled in your Lambda function's SQS event source mapping configuration.
/// Without this setting, Lambda will retry the entire batch on any failure.
///
/// # Example
///
/// ```rust,no_run
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: any reason this needs to be no_run?

/// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse};
/// use lambda_runtime::{service_fn, Error, LambdaEvent};
///
/// async fn function_handler(
/// event: LambdaEvent<SqsEvent>,
/// ) -> Result<SqsBatchResponse, Error> {
/// // Start from a default response
/// let mut response = SqsBatchResponse::default();
///
/// for record in event.payload.records {
/// let message_id = record.message_id.clone().unwrap_or_default();
///
/// // Try to process the message
/// if let Err(e) = process_record(&record).await {
/// println!("Failed to process message {}: {}", message_id, e);
///
/// // Use the helper to register the failure
/// response.add_failure(message_id);
/// }
/// }
///
/// Ok(response)
/// }
///
/// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> {
/// // Your message processing logic here
/// Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// lambda_runtime::run(service_fn(function_handler)).await
/// }
/// ```
pub fn add_failure(&mut self, message_id: impl Into<String>) {
self.batch_item_failures.push(BatchItemFailure {
Copy link
Collaborator

@jlizen jlizen Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to use ..Default::default() here to reduce churn in this function as the struct evolves? We shouldn't be limited by #[non_exhaustive] locally to the crate?

Ie:

self.batch_item_failures.push(BatchItemFailure {
    item_identifier: message_id.into(),
    ...Default::default()
};

We also should state in the doc comment that defaults will be used for all fields besides item_identifier.

item_identifier: message_id.into(),
#[cfg(feature = "catch-all-fields")]
other: serde_json::Map::new(),
});
}

/// Set multiple failed message IDs at once.
///
/// This is a convenience method for setting all batch item failures in one call.
/// It replaces any previously registered failures.
///
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
/// to be enabled in your Lambda function's SQS event source mapping configuration.
/// Without this setting, Lambda will retry the entire batch on any failure.
///
/// # Example
///
/// ```rust,no_run
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: any reason this needs to be no_run?

/// use aws_lambda_events::event::sqs::{SqsEvent, SqsBatchResponse};
/// use lambda_runtime::{service_fn, Error, LambdaEvent};
///
/// async fn function_handler(
/// event: LambdaEvent<SqsEvent>,
/// ) -> Result<SqsBatchResponse, Error> {
/// let mut failed_ids = Vec::new();
///
/// for record in event.payload.records {
/// let message_id = record.message_id.clone().unwrap_or_default();
///
/// // Try to process the message
/// if let Err(e) = process_record(&record).await {
/// println!("Failed to process message {}: {}", message_id, e);
/// failed_ids.push(message_id);
/// }
/// }
///
/// // Set all failures at once
/// let mut response = SqsBatchResponse::default();
/// response.set_failures(failed_ids);
///
/// Ok(response)
/// }
///
/// async fn process_record(record: &aws_lambda_events::event::sqs::SqsMessage) -> Result<(), Error> {
/// // Your message processing logic here
/// Ok(())
/// }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Error> {
/// lambda_runtime::run(service_fn(function_handler)).await
/// }
/// ```
pub fn set_failures<I, S>(&mut self, message_ids: I)
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.batch_item_failures = message_ids
.into_iter()
.map(|id| BatchItemFailure {
Copy link
Collaborator

@jlizen jlizen Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback using ..Default::default() for future proofing, mentioning that defaults are used for other fields.

item_identifier: id.into(),
#[cfg(feature = "catch-all-fields")]
other: serde_json::Map::new(),
})
.collect();
}
}

#[non_exhaustive]
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -335,4 +455,33 @@ mod test {
let reparsed: SqsApiEventObj<CustStruct> = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);
}

#[test]
#[cfg(feature = "sqs")]
fn example_sqs_batch_response_add_failure() {
let mut response = SqsBatchResponse::default();
response.add_failure("msg-1".to_string());
response.add_failure("msg-2".to_string());

assert_eq!(response.batch_item_failures.len(), 2);
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1");
assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2");
}

#[test]
#[cfg(feature = "sqs")]
fn example_sqs_batch_response_set_failures() {
let mut response = SqsBatchResponse::default();
response.set_failures(vec!["msg-1", "msg-2", "msg-3"]);

assert_eq!(response.batch_item_failures.len(), 3);
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-1");
assert_eq!(response.batch_item_failures[1].item_identifier, "msg-2");
assert_eq!(response.batch_item_failures[2].item_identifier, "msg-3");

// Test that set_failures replaces existing failures
response.set_failures(vec!["msg-4".to_string()]);
assert_eq!(response.batch_item_failures.len(), 1);
assert_eq!(response.batch_item_failures[0].item_identifier, "msg-4");
}
}
92 changes: 92 additions & 0 deletions lambda-events/src/event/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,48 @@ pub struct KinesisEventResponse {
pub other: serde_json::Map<String, Value>,
}

impl KinesisEventResponse {
/// Add a failed item identifier to the batch response.
///
/// When processing Kinesis records in batches, you can use this helper method to
/// register individual record failures. Lambda will automatically retry failed
/// records while successfully processed records will be checkpointed.
///
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
/// Without this setting, Lambda will retry the entire batch on any failure.
pub fn add_failure(&mut self, item_identifier: impl Into<String>) {
self.batch_item_failures.push(KinesisBatchItemFailure {
Copy link
Collaborator

@jlizen jlizen Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback using ..Default::default() for future proofing, mentioning that defaults are used for other fields.

item_identifier: Some(item_identifier.into()),
#[cfg(feature = "catch-all-fields")]
other: serde_json::Map::new(),
});
}

/// Set multiple failed item identifiers at once.
///
/// This is a convenience method for setting all batch item failures in one call.
/// It replaces any previously registered failures.
///
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
/// Without this setting, Lambda will retry the entire batch on any failure.
pub fn set_failures<I, S>(&mut self, item_identifiers: I)
Copy link
Collaborator

@jlizen jlizen Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback using ..Default::default() for future proofing, mentioning that defaults are used for other fields.

where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.batch_item_failures = item_identifiers
.into_iter()
.map(|id| KinesisBatchItemFailure {
item_identifier: Some(id.into()),
#[cfg(feature = "catch-all-fields")]
other: serde_json::Map::new(),
})
.collect();
}
}

/// `KinesisBatchItemFailure` is the individual record which failed processing.
#[non_exhaustive]
#[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)]
Expand Down Expand Up @@ -94,3 +136,53 @@ pub struct SqsBatchItemFailure {
#[serde(flatten)]
pub other: serde_json::Map<String, Value>,
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn kinesis_event_response_add_failure() {
let mut response = KinesisEventResponse::default();
response.add_failure("seq-1");
response.add_failure("seq-2".to_string());

assert_eq!(response.batch_item_failures.len(), 2);
assert_eq!(
response.batch_item_failures[0].item_identifier,
Some("seq-1".to_string())
);
assert_eq!(
response.batch_item_failures[1].item_identifier,
Some("seq-2".to_string())
);
}

#[test]
fn kinesis_event_response_set_failures() {
let mut response = KinesisEventResponse::default();
response.set_failures(vec!["seq-1", "seq-2", "seq-3"]);

assert_eq!(response.batch_item_failures.len(), 3);
assert_eq!(
response.batch_item_failures[0].item_identifier,
Some("seq-1".to_string())
);
assert_eq!(
response.batch_item_failures[1].item_identifier,
Some("seq-2".to_string())
);
assert_eq!(
response.batch_item_failures[2].item_identifier,
Some("seq-3".to_string())
);

// Test that set_failures replaces existing failures
response.set_failures(vec!["seq-4".to_string()]);
assert_eq!(response.batch_item_failures.len(), 1);
assert_eq!(
response.batch_item_failures[0].item_identifier,
Some("seq-4".to_string())
);
}
}
Loading