Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Foundatio.AWS/Extensions/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
Expand All @@ -22,9 +23,9 @@ internal static FileSpec ToFileInfo(this S3Object blob)
return new FileSpec
{
Path = blob.Key,
Size = blob.Size,
Modified = blob.LastModified.ToUniversalTime(),
Created = blob.LastModified.ToUniversalTime() // TODO: Need to fix this
Size = blob.Size.GetValueOrDefault(),
Created = blob.LastModified?.ToUniversalTime() ?? DateTime.MinValue, // TODO: Need to fix this
Modified = blob.LastModified?.ToUniversalTime() ?? DateTime.MinValue,
};
}

Expand Down
6 changes: 3 additions & 3 deletions src/Foundatio.AWS/Foundatio.AWS.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
<PackageTags>$(PackageTags);Amazon;AWS;S3</PackageTags>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AWSSDK.CloudWatch" Version="3.7.403.10" />
<PackageReference Include="AWSSDK.S3" Version="3.7.417.9" />
<PackageReference Include="AWSSDK.SQS" Version="3.7.400.165" />
<PackageReference Include="AWSSDK.CloudWatch" Version="4.0.1.4" />
<PackageReference Include="AWSSDK.S3" Version="4.0.1.6" />
<PackageReference Include="AWSSDK.SQS" Version="4.0.0.9" />

<PackageReference Include="Foundatio" Version="11.1.1-alpha.0.11" Condition="'$(ReferenceFoundatioSource)' == '' OR '$(ReferenceFoundatioSource)' == 'false'" />
<ProjectReference Include="..\..\..\Foundatio\src\Foundatio\Foundatio.csproj" Condition="'$(ReferenceFoundatioSource)' == 'true'" />
Expand Down
17 changes: 8 additions & 9 deletions src/Foundatio.AWS/Queues/AttributeExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Amazon.SQS.Model;
using ThirdParty.Json.LitJson;

namespace Foundatio.Queues;

Expand All @@ -16,7 +16,7 @@ public static int ApproximateReceiveCount(this IDictionary<string, string> attri
if (!attributes.TryGetValue("ApproximateReceiveCount", out string v))
return 0;

int.TryParse(v, out int value);
Int32.TryParse(v, out int value);
return value;
}

Expand All @@ -29,7 +29,7 @@ public static DateTime SentTimestamp(this IDictionary<string, string> attributes
if (!attributes.TryGetValue("SentTimestamp", out string v))
return DateTime.MinValue;

if (!long.TryParse(v, out long value))
if (!Int64.TryParse(v, out long value))
return DateTime.MinValue;

return DateTimeOffset.FromUnixTimeMilliseconds(value).DateTime;
Expand Down Expand Up @@ -65,16 +65,15 @@ public static string DeadLetterQueue(this IDictionary<string, string> attributes
if (!attributes.TryGetValue("RedrivePolicy", out string v))
return null;

if (string.IsNullOrEmpty(v))
if (String.IsNullOrEmpty(v))
return null;

var redrivePolicy = JsonMapper.ToObject(v);

string arn = redrivePolicy["deadLetterTargetArn"]?.ToString();
if (string.IsNullOrEmpty(arn))
using var redrivePolicy = JsonDocument.Parse(v);
string arn = redrivePolicy.RootElement.GetProperty("deadLetterTargetArn").GetString();
if (String.IsNullOrEmpty(arn))
return null;

var parts = arn.Split(':');
string[] parts = arn.Split(':');
return parts.LastOrDefault();
}
}
28 changes: 15 additions & 13 deletions src/Foundatio.AWS/Queues/SQSQueue.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.Runtime.Credentials;
using Amazon.SQS;
using Amazon.SQS.Model;
using Foundatio.AsyncEx;
using Foundatio.Extensions;
using Foundatio.Serializer;
using Microsoft.Extensions.Logging;
using ThirdParty.Json.LitJson;

namespace Foundatio.Queues;

Expand All @@ -33,7 +34,7 @@ public SQSQueue(SQSQueueOptions<T> options) : base(options)
// TODO: Flow through the options like retries and the like.
_client = new Lazy<AmazonSQSClient>(() =>
{
var credentials = options.Credentials ?? FallbackCredentialsFactory.GetCredentials();
var credentials = options.Credentials ?? DefaultAWSCredentialsIdentityResolver.GetCredentials();

if (String.IsNullOrEmpty(options.ServiceUrl))
{
Expand Down Expand Up @@ -104,14 +105,15 @@ protected override async Task<string> EnqueueImplAsync(T data, QueueEntryOptions
message.MessageDeduplicationId = options.UniqueId;

if (!String.IsNullOrEmpty(options.CorrelationId))
message.MessageAttributes.Add("CorrelationId", new MessageAttributeValue
{
DataType = "String",
StringValue = options.CorrelationId
});
{
message.MessageAttributes ??= new Dictionary<string, MessageAttributeValue>();
message.MessageAttributes.Add("CorrelationId", new MessageAttributeValue { DataType = "String", StringValue = options.CorrelationId });
}

if (options.Properties is not null)
{
message.MessageAttributes ??= new Dictionary<string, MessageAttributeValue>();

foreach (var property in options.Properties)
message.MessageAttributes.Add(property.Key, new MessageAttributeValue
{
Expand Down Expand Up @@ -163,7 +165,7 @@ Task<ReceiveMessageResponse> ReceiveMessageAsync()
var response = await ReceiveMessageAsync().AnyContext();

// retry loop
while ((response == null || response.Messages.Count == 0) && !linkedCancellationToken.IsCancellationRequested)
while ((response?.Messages is null || response.Messages.Count == 0) && !linkedCancellationToken.IsCancellationRequested)
{
if (_options.DequeueInterval > TimeSpan.Zero)
{
Expand All @@ -180,7 +182,7 @@ Task<ReceiveMessageResponse> ReceiveMessageAsync()
response = await ReceiveMessageAsync().AnyContext();
}

if (response == null || response.Messages.Count == 0)
if (response?.Messages is null || response.Messages.Count == 0)
{
_logger.LogTrace("Response null or 0 message count");
return null;
Expand All @@ -190,7 +192,7 @@ Task<ReceiveMessageResponse> ReceiveMessageAsync()

_logger.LogTrace("Received message {MessageId} IsCancellationRequested={IsCancellationRequested}", response.Messages[0].MessageId, linkedCancellationToken.IsCancellationRequested);

var message = response.Messages.First();
var message = response.Messages[0];
string body = message.Body;
var data = _serializer.Deserialize<T>(body);
var entry = new SQSQueueEntry<T>(message, data, this);
Expand Down Expand Up @@ -455,15 +457,15 @@ protected virtual async Task CreateQueueAsync()

int maxReceiveCount = Math.Max(_options.Retries + 1, 1);
// step 4, set retry policy
var redrivePolicy = new JsonData
var redrivePolicy = new JsonObject
{
["maxReceiveCount"] = maxReceiveCount.ToString(),
["deadLetterTargetArn"] = deadAttributeResponse.QueueARN
};

var attributes = new Dictionary<string, string>
{
[QueueAttributeName.RedrivePolicy] = JsonMapper.ToJson(redrivePolicy)
[QueueAttributeName.RedrivePolicy] = redrivePolicy.ToJsonString()
};

var setAttributeRequest = new SetQueueAttributesRequest(_queueUrl, attributes);
Expand Down
8 changes: 5 additions & 3 deletions src/Foundatio.AWS/Queues/SQSQueueEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ public class SQSQueueEntry<T>
public SQSQueueEntry(Message message, T value, IQueue<T> queue)
: base(message.MessageId, message.CorrelationId(), value, queue, message.SentTimestamp(), message.ApproximateReceiveCount())
{

foreach (var property in message.MessageAttributes.Where(a => a.Key != "CorrelationId"))
Properties.Add(property.Key, property.Value.StringValue);
if (message.MessageAttributes is not null)
{
foreach (var property in message.MessageAttributes.Where(a => a.Key != "CorrelationId"))
Properties.Add(property.Key, property.Value.StringValue);
}

UnderlyingMessage = message;
}
Expand Down
23 changes: 12 additions & 11 deletions src/Foundatio.AWS/Storage/S3FileStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Amazon;
using Amazon.Runtime;
using Amazon.Runtime.Credentials;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Util;
Expand Down Expand Up @@ -42,7 +43,7 @@ public S3FileStorage(S3FileStorageOptions options)
_cannedAcl = options.CannedACL;
_allowInMemoryStream = options.AllowInMemoryStream;

var credentials = options.Credentials ?? FallbackCredentialsFactory.GetCredentials();
var credentials = options.Credentials ?? DefaultAWSCredentialsIdentityResolver.GetCredentials();

if (String.IsNullOrEmpty(options.ServiceUrl))
{
Expand Down Expand Up @@ -150,8 +151,8 @@ public async Task<FileSpec> GetFileInfoAsync(string path)
{
Path = req.Key,
Size = response.ContentLength,
Created = response.LastModified.ToUniversalTime(), // TODO: Need to fix this
Modified = response.LastModified.ToUniversalTime()
Created = response.LastModified?.ToUniversalTime() ?? DateTime.MinValue, // TODO: Need to fix this
Modified = response.LastModified?.ToUniversalTime() ?? DateTime.MinValue
};
}
catch (AmazonS3Exception ex)
Expand Down Expand Up @@ -309,7 +310,7 @@ public async Task<bool> ExistsAsync(string path)
const int PAGE_SIZE = 100;

var listRequest = new ListObjectsV2Request { BucketName = _bucket, Prefix = criteria.Prefix, MaxKeys = PAGE_SIZE };
var deleteRequest = new DeleteObjectsRequest { BucketName = _bucket };
var deleteRequest = new DeleteObjectsRequest { BucketName = _bucket, Objects = [] };
var errors = new List<DeleteError>();

ListObjectsV2Response listResponse;
Expand All @@ -318,15 +319,15 @@ public async Task<bool> ExistsAsync(string path)
listResponse = await _client.ListObjectsV2Async(listRequest, cancellationToken).AnyContext();
listRequest.ContinuationToken = listResponse.NextContinuationToken;

var keys = listResponse.S3Objects.MatchesPattern(criteria.Pattern).Select(o => new KeyVersion { Key = o.Key }).ToArray();
if (keys.Length == 0)
var keys = listResponse.S3Objects?.MatchesPattern(criteria.Pattern).Select(o => new KeyVersion { Key = o.Key }).ToArray();
if (keys is not { Length: > 0 })
continue;

deleteRequest.Objects.AddRange(keys);

_logger.LogInformation("Deleting {FileCount} files matching {SearchPattern}", keys.Length, searchPattern);
var deleteResponse = await _client.DeleteObjectsAsync(deleteRequest, cancellationToken).AnyContext();
if (deleteResponse.DeleteErrors.Count > 0)
if (deleteResponse.DeleteErrors is { Count: > 0 })
{
// retry 1 time, continue on.
var deleteRetryRequest = new DeleteObjectsRequest { BucketName = _bucket };
Expand All @@ -339,7 +340,7 @@ public async Task<bool> ExistsAsync(string path)
_logger.LogTrace("Deleted {FileCount} files matching {SearchPattern}", deleteResponse.DeletedObjects.Count, searchPattern);
count += deleteResponse.DeletedObjects.Count;
deleteRequest.Objects.Clear();
} while (listResponse.IsTruncated && !cancellationToken.IsCancellationRequested);
} while (listResponse.IsTruncated.GetValueOrDefault() && !cancellationToken.IsCancellationRequested);

if (errors.Count > 0)
{
Expand Down Expand Up @@ -382,9 +383,9 @@ private async Task<NextPageResult> GetFiles(SearchCriteria criteria, int pageSiz
return new NextPageResult
{
Success = response.HttpStatusCode.IsSuccessful(),
HasMore = response.IsTruncated,
Files = response.S3Objects.MatchesPattern(criteria.Pattern).Select(blob => blob.ToFileInfo()).Where(spec => spec is not null && !spec.IsDirectory()).ToList(),
NextPageFunc = response.IsTruncated ? _ => GetFiles(criteria, pageSize, cancellationToken, response.NextContinuationToken) : null
HasMore = response.IsTruncated.GetValueOrDefault(),
Files = response.S3Objects?.MatchesPattern(criteria.Pattern).Select(blob => blob.ToFileInfo()).Where(spec => spec is not null && !spec.IsDirectory()).ToList() ?? [],
NextPageFunc = response.IsTruncated.GetValueOrDefault() ? _ => GetFiles(criteria, pageSize, cancellationToken, response.NextContinuationToken) : null
};
}

Expand Down