-
-
Notifications
You must be signed in to change notification settings - Fork 13
Add SQS/SNS message bus implementation #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
e182d7b
Add SQS/SNS message bus implementation
niemyjski 7a4a4ea
Add per-message-type topic routing and optimize topic creation
niemyjski f2e3d9d
Consolidate duplicate exception handlers in subscriber loop
niemyjski 29cf0f3
Address PR feedback: use range syntax and improve test cleanup
niemyjski 60eec82
Fix deadlock in GetOrCreateTopicArnAsync causing CI tests to hang
niemyjski 678a05f
Fix ArgumentOutOfRangeException in tests with short topic names
niemyjski 4e917e8
Address PR feedback: improve exception handling and use LINQ
niemyjski d18e97c
Updates Foundatio package versions to 13.0.0-beta1.17 in project files
niemyjski File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
12 changes: 12 additions & 0 deletions
12
src/Foundatio.AWS/Messaging/SQSMessageBusConnectionStringBuilder.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| namespace Foundatio.Messaging; | ||
|
|
||
| public class SQSMessageBusConnectionStringBuilder : AmazonConnectionStringBuilder | ||
| { | ||
| public SQSMessageBusConnectionStringBuilder() | ||
| { | ||
| } | ||
|
|
||
| public SQSMessageBusConnectionStringBuilder(string connectionString) : base(connectionString) | ||
| { | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,264 @@ | ||
| using System; | ||
| using Amazon; | ||
| using Amazon.Runtime; | ||
|
|
||
| namespace Foundatio.Messaging; | ||
|
|
||
| public class SQSMessageBusOptions : SharedMessageBusOptions | ||
| { | ||
| /// <summary> | ||
| /// The connection string containing AWS credentials and configuration. | ||
| /// Format: "AccessKey=xxx;SecretKey=xxx;Region=us-east-1" or "ServiceUrl=http://localhost:4566;AccessKey=xxx;SecretKey=xxx" | ||
| /// </summary> | ||
| public string ConnectionString { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The AWS credentials to use for authentication. If not specified, uses default credential resolution. | ||
| /// </summary> | ||
| /// <seealso cref="Amazon.Runtime.AWSCredentials"/> | ||
| public AWSCredentials Credentials { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The AWS region endpoint. If not specified, uses default region resolution. | ||
| /// </summary> | ||
| /// <seealso cref="Amazon.RegionEndpoint"/> | ||
| public RegionEndpoint Region { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The service URL for LocalStack or custom endpoints. When set, overrides the region endpoint. | ||
| /// </summary> | ||
| public string ServiceUrl { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Whether the SNS topic can be created if it doesn't exist. Default is true. | ||
| /// </summary> | ||
| public bool CanCreateTopic { get; set; } = true; | ||
|
|
||
| /// <summary> | ||
| /// The name of the SQS subscription queue. If not specified, a unique queue name will be generated. | ||
| /// Use this for durable subscriptions that persist across restarts. | ||
| /// </summary> | ||
| public string SubscriptionQueueName { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Whether the subscription queue should be automatically deleted when the message bus is disposed. | ||
| /// Default is true for ephemeral queues, set to false for durable subscriptions. | ||
| /// </summary> | ||
| public bool SubscriptionQueueAutoDelete { get; set; } = true; | ||
|
|
||
| /// <summary> | ||
| /// The timeout for reading messages from the SQS queue using long polling. Default is 20 seconds. | ||
| /// </summary> | ||
| public TimeSpan ReadQueueTimeout { get; set; } = TimeSpan.FromSeconds(20); | ||
|
|
||
| /// <summary> | ||
| /// The interval between dequeue attempts when no messages are available. Default is 1 second. | ||
| /// </summary> | ||
| public TimeSpan DequeueInterval { get; set; } = TimeSpan.FromSeconds(1); | ||
|
|
||
| /// <summary> | ||
| /// The visibility timeout for messages. Messages will be invisible to other consumers | ||
| /// for this duration after being received. If not set, uses SQS default (30 seconds). | ||
| /// </summary> | ||
| public TimeSpan? MessageVisibilityTimeout { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Whether to use SQS managed server-side encryption (SSE-SQS). Default is false. | ||
| /// </summary> | ||
| public bool SqsManagedSseEnabled { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The KMS master key ID for server-side encryption (SSE-KMS). | ||
| /// </summary> | ||
| public string KmsMasterKeyId { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// The KMS data key reuse period in seconds. Default is 300 seconds (5 minutes). | ||
| /// </summary> | ||
| public int KmsDataKeyReusePeriodSeconds { get; set; } = 300; | ||
| } | ||
|
|
||
| public class SQSMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder<SQSMessageBusOptions, SQSMessageBusOptionsBuilder> | ||
| { | ||
| /// <summary> | ||
| /// Sets the connection string containing AWS credentials and configuration. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder ConnectionString(string connectionString) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(connectionString); | ||
| Target.ConnectionString = connectionString; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the AWS credentials to use for authentication. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder Credentials(AWSCredentials credentials) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(credentials); | ||
| Target.Credentials = credentials; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the AWS credentials using access key and secret key. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder Credentials(string accessKey, string secretKey) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(accessKey); | ||
| ArgumentException.ThrowIfNullOrEmpty(secretKey); | ||
|
|
||
| Target.Credentials = new BasicAWSCredentials(accessKey, secretKey); | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the AWS region endpoint. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder Region(RegionEndpoint region) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(region); | ||
| Target.Region = region; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the AWS region by system name (e.g., "us-east-1"). | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder Region(string region) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(region); | ||
| Target.Region = RegionEndpoint.GetBySystemName(region); | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the service URL for LocalStack or custom endpoints. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder ServiceUrl(string serviceUrl) | ||
| { | ||
| Target.ServiceUrl = serviceUrl; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets whether the SNS topic can be created if it doesn't exist. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder CanCreateTopic(bool enabled) | ||
| { | ||
| Target.CanCreateTopic = enabled; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Enables automatic topic creation if it doesn't exist. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder EnableCreateTopic() => CanCreateTopic(true); | ||
|
|
||
| /// <summary> | ||
| /// Disables automatic topic creation. An exception will be thrown if the topic doesn't exist. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder DisableCreateTopic() => CanCreateTopic(false); | ||
|
|
||
| /// <summary> | ||
| /// Sets the subscription queue name for durable subscriptions. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder SubscriptionQueueName(string queueName) | ||
| { | ||
| Target.SubscriptionQueueName = queueName; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Configures whether the subscription queue should be auto-deleted on dispose. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder SubscriptionQueueAutoDelete(bool autoDelete) | ||
| { | ||
| Target.SubscriptionQueueAutoDelete = autoDelete; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Configures a durable subscription that persists across restarts. | ||
| /// Sets the queue name and disables auto-delete. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder UseDurableSubscription(string queueName) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(queueName); | ||
|
|
||
| Target.SubscriptionQueueName = queueName; | ||
| Target.SubscriptionQueueAutoDelete = false; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the timeout for reading messages from the SQS queue using long polling. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder ReadQueueTimeout(TimeSpan timeout) | ||
| { | ||
| ArgumentOutOfRangeException.ThrowIfLessThan(timeout, TimeSpan.Zero); | ||
|
|
||
| Target.ReadQueueTimeout = timeout; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the interval between dequeue attempts when no messages are available. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder DequeueInterval(TimeSpan interval) | ||
| { | ||
| ArgumentOutOfRangeException.ThrowIfLessThan(interval, TimeSpan.Zero); | ||
|
|
||
| Target.DequeueInterval = interval; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sets the visibility timeout for messages. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder MessageVisibilityTimeout(TimeSpan timeout) | ||
| { | ||
| ArgumentOutOfRangeException.ThrowIfLessThan(timeout, TimeSpan.Zero); | ||
|
|
||
| Target.MessageVisibilityTimeout = timeout; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Enables KMS server-side encryption (SSE-KMS) with the specified master key. | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder UseKmsEncryption(string kmsMasterKeyId, int kmsKeyReusePeriodSeconds = 300) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(kmsMasterKeyId); | ||
| Target.KmsMasterKeyId = kmsMasterKeyId; | ||
| Target.KmsDataKeyReusePeriodSeconds = kmsKeyReusePeriodSeconds; | ||
| Target.SqsManagedSseEnabled = false; | ||
| return this; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Enables SQS managed server-side encryption (SSE-SQS). | ||
| /// </summary> | ||
| public SQSMessageBusOptionsBuilder UseSqsManagedEncryption() | ||
| { | ||
| Target.SqsManagedSseEnabled = true; | ||
| Target.KmsMasterKeyId = null; | ||
| return this; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public override SQSMessageBusOptions Build() | ||
| { | ||
| if (String.IsNullOrEmpty(Target.ConnectionString)) | ||
| return Target; | ||
|
|
||
| var connectionString = new SQSMessageBusConnectionStringBuilder(Target.ConnectionString); | ||
| Target.Credentials ??= connectionString.GetCredentials(); | ||
| Target.Region ??= connectionString.GetRegion(); | ||
|
|
||
| if (String.IsNullOrEmpty(Target.ServiceUrl) && !String.IsNullOrEmpty(connectionString.ServiceUrl)) | ||
| Target.ServiceUrl = connectionString.ServiceUrl; | ||
|
|
||
| return Target; | ||
| } | ||
| } |
49 changes: 49 additions & 0 deletions
49
src/Foundatio.AWS/Messaging/SQSMessageBusServiceCollectionExtensions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| using System; | ||
| using Foundatio.Messaging; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace Foundatio.Extensions.Hosting.Messaging; | ||
|
|
||
| public static class SQSMessageBusServiceCollectionExtensions | ||
| { | ||
| /// <summary> | ||
| /// Adds an SQS message bus to the service collection. | ||
| /// </summary> | ||
| public static IServiceCollection AddSQSMessageBus( | ||
| this IServiceCollection services, | ||
| Action<SQSMessageBusOptionsBuilder> configure) | ||
| { | ||
| ArgumentNullException.ThrowIfNull(configure); | ||
|
|
||
| services.AddSingleton<IMessageBus>(sp => | ||
| { | ||
| var builder = new SQSMessageBusOptionsBuilder(); | ||
| builder.LoggerFactory(sp.GetService<ILoggerFactory>()); | ||
| configure(builder); | ||
| return new SQSMessageBus(builder.Build()); | ||
| }); | ||
|
|
||
| services.AddSingleton<IMessagePublisher>(sp => sp.GetRequiredService<IMessageBus>()); | ||
| services.AddSingleton<IMessageSubscriber>(sp => sp.GetRequiredService<IMessageBus>()); | ||
|
|
||
| return services; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Adds an SQS message bus to the service collection using a connection string. | ||
| /// </summary> | ||
| public static IServiceCollection AddSQSMessageBus( | ||
| this IServiceCollection services, | ||
| string connectionString, | ||
| Action<SQSMessageBusOptionsBuilder> configure = null) | ||
| { | ||
| ArgumentException.ThrowIfNullOrEmpty(connectionString); | ||
|
|
||
| return services.AddSQSMessageBus(builder => | ||
| { | ||
| builder.ConnectionString(connectionString); | ||
| configure?.Invoke(builder); | ||
| }); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent namespace: This file uses the namespace 'Foundatio.Extensions.Hosting.Messaging' but other files in this codebase use namespaces like 'Foundatio.Storage', 'Foundatio.Queues', and 'Foundatio.Messaging' directly. For consistency with the codebase conventions, consider using 'Foundatio.Messaging' or creating a dedicated extensions namespace that follows the established pattern (e.g., 'Foundatio.AWS.Extensions' like the existing TaskExtensions.cs uses).