Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;

/// <summary>
/// If true, disables the prefetching during replay, i.e. while the commit log
/// is replayed as part of partition recovery. The default is false.
/// </summary>
public bool DisablePrefetchDuringReplay { get; set; } = false;

/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ partial class BlobManager : ICheckpointManager, ILogCommitManager
BlobUtilsV12.BlockBlobClients eventLogCommitBlob;
BlobLeaseClient leaseClient;

BlobUtilsV12.BlockBlobClients checkpointCompletedBlob;

BlobUtilsV12.BlobDirectory pageBlobPartitionDirectory;
BlobUtilsV12.BlobDirectory blockBlobPartitionDirectory;

Expand Down Expand Up @@ -419,6 +421,7 @@ public async Task StartAsync()

this.eventLogCommitBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(CommitBlobName);
this.leaseClient = this.eventLogCommitBlob.WithRetries.GetBlobLeaseClient();
this.checkpointCompletedBlob = this.blockBlobPartitionDirectory.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

AzureStorageDevice createDevice(string name) =>
new AzureStorageDevice(name, this.blockBlobPartitionDirectory.GetSubDirectory(name), this.pageBlobPartitionDirectory.GetSubDirectory(name), this, true);
Expand Down Expand Up @@ -1057,10 +1060,11 @@ IEnumerable<Guid> ICheckpointManager.GetLogCheckpointTokens()

internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
{
BlobUtilsV12.BlockBlobClients checkpointCompletedBlob = default;
string jsonString = null;
DateTimeOffset lastModified = default;

try
{
string jsonString = null;

if (this.UseLocalFiles)
{
Expand All @@ -1076,24 +1080,24 @@ internal async Task<bool> FindCheckpointsAsync(bool logIsEmpty)
else
{
var partDir = this.blockBlobPartitionDirectory;
checkpointCompletedBlob = partDir.GetBlockBlobClient(this.GetCheckpointCompletedBlobName());

await this.PerformWithRetriesAsync(
semaphore: null,
requireLease: true,
"BlockBlobClient.DownloadContentAsync",
"FindCheckpointsAsync",
"",
checkpointCompletedBlob.Name,
this.checkpointCompletedBlob.Name,
1000,
true,
failIfReadonly: false,
async (numAttempts) =>
{
try
{
Azure.Response<BlobDownloadResult> downloadResult = await checkpointCompletedBlob.WithRetries.DownloadContentAsync();
Azure.Response<BlobDownloadResult> downloadResult = await this.checkpointCompletedBlob.WithRetries.DownloadContentAsync();
jsonString = downloadResult.Value.Content.ToString();
lastModified = downloadResult.Value.Details.LastModified;
this.CheckpointInfoETag = downloadResult.Value.Details.ETag;
return 1;
}
Expand All @@ -1105,22 +1109,58 @@ await this.PerformWithRetriesAsync(
});
}

if (jsonString == null)
{
return false;
}
else
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
return true;
}
}
catch (Exception e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not find any checkpoint", this.checkpointCompletedBlob.Name, e, true, this.PartitionErrorHandler.IsTerminated);
throw;
}

if (jsonString == null)
{
return false;
}

try
{
// read the fields from the json to update the checkpoint info
JsonConvert.PopulateObject(jsonString, this.CheckpointInfo);
}
catch (JsonException e)
{
this.HandleStorageError(nameof(FindCheckpointsAsync), "could not parse json file describing last checkpoint", this.checkpointCompletedBlob.Name, e, true, false);
throw;
}

if (this.CheckpointInfo.RecoveryAttempts > 0 || DateTimeOffset.UtcNow - lastModified > TimeSpan.FromMinutes(5))
{
this.CheckpointInfo.RecoveryAttempts++;

this.TraceHelper.FasterProgress($"Incremented recovery attempt counter to {this.CheckpointInfo.RecoveryAttempts} in {this.checkpointCompletedBlob.Name}.");

await this.WriteCheckpointMetadataAsync();

if (this.CheckpointInfo.RecoveryAttempts > 3 && this.CheckpointInfo.RecoveryAttempts < 30)
{
this.TraceHelper.BoostTracing = true;
}
}

return true;
Comment on lines +1135 to +1156
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this could fail indefinitely, should we have a cap on how big this integer can grow? Maybe if it's larger than ~100, it's not worth increasing it further, or is it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why capping the counter itself would be useful. No matter how large, it will still give us useful information (also in the traces).

Or did you mean to cap the actual recovery attempts?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the extreme: I just worry about the integer getting too large to represent, and then causing another class of issues. In general, I think there's no benefit in increasing this counter past ~10k, for example. I'd prefer to have an upper limit here. After ~10k, we know it is simply "too many" anyway. I do feel a bit more strongly about this.

}

/// <summary>
/// Resets the recovery attempt counter of the current checkpoint info.
/// Does nothing if the counter is not positive. Otherwise the counter is set to zero,
/// boosted tracing is switched off, the checkpoint metadata is written back, and a
/// progress event is traced.
/// </summary>
/// <returns>A task that completes once the metadata has been written and the event traced.</returns>
public async Task ClearRecoveryAttempts()
{
    // nothing to clear, so skip the metadata write entirely
    if (this.CheckpointInfo.RecoveryAttempts <= 0)
    {
        return;
    }

    this.TraceHelper.BoostTracing = false;
    this.CheckpointInfo.RecoveryAttempts = 0;

    // persist the reset counter before reporting it
    await this.WriteCheckpointMetadataAsync();

    this.TraceHelper.FasterProgress($"Cleared recovery attempt counter in {this.checkpointCompletedBlob.Name}.");
}

void ICheckpointManager.CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
Expand Down Expand Up @@ -1436,7 +1476,7 @@ await this.PerformWithRetriesAsync(
}
}

internal async Task FinalizeCheckpointCompletedAsync()
internal async Task WriteCheckpointMetadataAsync()
{
var jsonText = JsonConvert.SerializeObject(this.CheckpointInfo, Formatting.Indented);
if (this.UseLocalFiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,46 @@ namespace DurableTask.Netherite.Faster
/// <summary>
/// Describes a checkpoint: the FASTER tokens, the log and input queue positions,
/// and the number of recovery attempts made for it. All members are JSON properties.
/// </summary>
[JsonObject]
class CheckpointInfo
{
    /// <summary>
    /// The FasterKV token for the last index checkpoint taken before this checkpoint.
    /// </summary>
    [JsonProperty]
    public Guid IndexToken { get; set; }

    /// <summary>
    /// The FasterKV token for this checkpoint.
    /// </summary>
    [JsonProperty]
    public Guid LogToken { get; set; }

    /// <summary>
    /// The FasterLog position for this checkpoint.
    /// </summary>
    [JsonProperty]
    public long CommitLogPosition { get; set; }

    /// <summary>
    /// The input queue (event hubs) position for this checkpoint.
    /// </summary>
    [JsonProperty]
    public long InputQueuePosition { get; set; }

    /// <summary>
    /// If the input queue position is a batch, the position within the batch.
    /// </summary>
    [JsonProperty]
    public int InputQueueBatchPosition { get; set; }

    /// <summary>
    /// The input queue fingerprint for this checkpoint.
    /// </summary>
    [JsonProperty]
    public string InputQueueFingerprint { get; set; }

    /// <summary>
    /// A count of instances associated with this checkpoint.
    /// NOTE(review): the exact meaning is not visible here; confirm against the code that sets it.
    /// </summary>
    [JsonProperty]
    public long NumberInstances { get; set; }

    /// <summary>
    /// The number of recovery attempts that have been made for this checkpoint.
    /// </summary>
    // Marked explicitly like the other members. With the default opt-out member
    // serialization of [JsonObject] this property was already serialized, so the
    // persisted format is unchanged.
    [JsonProperty]
    public int RecoveryAttempts { get; set; }
}
}
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/StorageLayer/Faster/FasterKV.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void RunTask() {

public override async Task FinalizeCheckpointCompletedAsync(Guid guid)
{
await this.blobManager.FinalizeCheckpointCompletedAsync();
await this.blobManager.WriteCheckpointMetadataAsync();

if (this.cacheDebugger == null)
{
Expand Down Expand Up @@ -739,7 +739,7 @@ public override async Task RunPrefetchSession(IAsyncEnumerable<TrackedObjectKey>
long lastReport = 0;
void ReportProgress(int elapsedMillisecondsThreshold)
{
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold)
if (stopwatch.ElapsedMilliseconds - lastReport >= elapsedMillisecondsThreshold || this.TraceHelper.BoostTracing)
{
this.blobManager.TraceHelper.FasterProgress(
$"FasterKV PrefetchSession {sessionId} elapsed={stopwatch.Elapsed.TotalSeconds:F2}s issued={numberIssued} pending={maxConcurrency - prefetchSemaphore.CurrentCount} hits={numberHits} misses={numberMisses}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public FasterTraceHelper(ILogger logger, LogLevel logLevelLimit, ILogger perform
this.partitionId = (int) partitionId;
}

public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace;
public bool IsTracingAtMostDetailedLevel => this.logLevelLimit == LogLevel.Trace || this.BoostTracing;

public bool BoostTracing { get; set; }

// ----- faster storage layer events

Expand Down Expand Up @@ -139,7 +141,7 @@ public void FasterProgress(Func<string> constructString)

public void FasterStorageProgress(string details)
{
if (this.logLevelLimit <= LogLevel.Trace)
if (this.logLevelLimit <= LogLevel.Trace || this.BoostTracing)
{
this.logger.LogTrace("Part{partition:D2} {details}", this.partitionId, details);
EtwSource.Log.FasterStorageProgress(this.account, this.taskHub, this.partitionId, details, TraceUtils.AppName, TraceUtils.ExtensionVersion);
Expand Down
8 changes: 6 additions & 2 deletions src/DurableTask.Netherite/StorageLayer/Faster/LogWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected override async Task Process(IList<PartitionUpdateEvent> batch)
}
}

public async Task ReplayCommitLog(long from, StoreWorker worker)
public async Task ReplayCommitLog(long from, StoreWorker worker, bool enablePrefetch)
{
// this procedure is called by StoreWorker during recovery. It replays all the events
// that were committed to the log but are not reflected in the loaded store checkpoint.
Expand All @@ -220,7 +220,11 @@ public async Task ReplayCommitLog(long from, StoreWorker worker)

var fetchTask = this.FetchEvents(from, replayChannel.Writer, prefetchChannel.Writer);
var replayTask = Task.Run(() => this.ReplayEvents(replayChannel.Reader, worker));
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));

if (enablePrefetch)
{
var prefetchTask = Task.Run(() => worker.RunPrefetchSession(prefetchChannel.Reader.ReadAllAsync(this.cancellationToken)));
}

await fetchTask;
await replayTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ class PartitionStorage : IPartitionState
readonly ILogger logger;
readonly ILogger performanceLogger;
readonly MemoryTracker memoryTracker;
//readonly CloudStorageAccount storageAccount;
//readonly string localFileDirectory;
//readonly CloudStorageAccount pageBlobStorageAccount;

Partition partition;
BlobManager blobManager;
Expand Down Expand Up @@ -193,7 +190,10 @@ async Task TerminationWrapper(Task what)
if (this.log.TailAddress > (long)this.storeWorker.CommitLogPosition)
{
// replay log as the store checkpoint lags behind the log
await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker));

bool disablePrefetch = this.blobManager.CheckpointInfo.RecoveryAttempts > 6 || this.settings.DisablePrefetchDuringReplay;
Comment thread
davidmrdavid marked this conversation as resolved.
Outdated

await this.TerminationWrapper(this.storeWorker.ReplayCommitLog(this.logWorker, prefetch: !disablePrefetch));
}
}
catch (OperationCanceledException) when (this.partition.ErrorHandler.IsTerminated)
Expand All @@ -215,6 +215,8 @@ async Task TerminationWrapper(Task what)
}

this.TraceHelper.FasterProgress("Recovery complete");

await this.blobManager.ClearRecoveryAttempts();
}
this.blobManager.FaultInjector?.Started(this.blobManager);
return this.storeWorker.InputQueuePosition;
Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -681,16 +681,16 @@ protected override async Task Process(IList<PartitionEvent> batch)
return target;
}

public async Task ReplayCommitLog(LogWorker logWorker)
public async Task ReplayCommitLog(LogWorker logWorker, bool prefetch)
{
var startPosition = this.CommitLogPosition;
this.traceHelper.FasterProgress($"Replaying log from {startPosition}");
this.traceHelper.FasterProgress($"Replaying log from {startPosition} prefetch={prefetch}");

var stopwatch = new System.Diagnostics.Stopwatch();
stopwatch.Start();

this.effectTracker.IsReplaying = true;
await logWorker.ReplayCommitLog(startPosition, this);
await logWorker.ReplayCommitLog(startPosition, this, prefetch);
stopwatch.Stop();
this.effectTracker.IsReplaying = false;

Expand Down