Skip to content
Closed
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c70c631
feat: Improve graceful shutdown and add AOF handling
yuseok-kim-edushare Sep 16, 2025
48913d1
🚨 Fix formatting error in CI test
yuseok-kim-edushare Sep 16, 2025
08cfc34
✏️ Fix Explain by Copilot
yuseok-kim-edushare Sep 16, 2025
286537b
Refactor connection polling logic and add helper method
yuseok-kim-edushare Sep 16, 2025
82da9ab
Fix resource cleanup and null checks in Worker
yuseok-kim-edushare Sep 16, 2025
ff7f6d4
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Sep 30, 2025
c5c906f
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 8, 2025
d2602f8
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 9, 2025
4b87aa6
Add checkpoint support for tiered storage on shutdown
yuseok-kim-edushare Oct 10, 2025
2a9943e
🚨 Fix Code Formmating
yuseok-kim-edushare Oct 10, 2025
58e8091
🐛 Fix bug like #1390
yuseok-kim-edushare Oct 10, 2025
f5372bf
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 19, 2025
d8a46a2
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 21, 2025
fe006e3
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 26, 2025
969ec81
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Oct 31, 2025
350044b
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Nov 3, 2025
74acd5a
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Nov 4, 2025
827b54d
Merge branch 'main' into feature/win-service-graceful-shutdown
yuseok-kim-edushare Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 144 additions & 2 deletions hosting/Windows/Garnet.worker/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Garnet.server;
using Microsoft.Extensions.Hosting;

namespace Garnet
Expand Down Expand Up @@ -43,8 +45,75 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
Dispose();
await base.StopAsync(cancellationToken).ConfigureAwait(false);
try
{
// Wait for existing connections to complete (with timeout)
// Consider making timeout configurable via configuration file or environment variable.
// Currently using fixed 30-second timeout to avoid conflicts with existing server arguments.
var timeout = TimeSpan.FromSeconds(30);
await WaitForActiveConnectionsToComplete(timeout, cancellationToken);

if (server != null)
{
try
{
var storeWrapperField = server.GetType().GetField("storeWrapper",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of trying to operate from outside using reflection, why not add a StopAsync method to GarnetServer, so that it can perform the graceful shutdown without reflection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Your suggestion is reasonable!
Instead of using complex reflexion in Worker, Graceful Shutdown logic insertion into Garnet Server more simply maintanable and understandable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, look forward to the update!

System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);

if (storeWrapperField?.GetValue(server) is StoreWrapper storeWrapper)
{
var enableStorageTier = storeWrapper.serverOptions.EnableStorageTier;
var enableAOF = storeWrapper.serverOptions.EnableAOF;

// ensure all AOF operations are committed before checkpointing or shutdown
if (enableAOF)
{
Console.WriteLine("Committing AOF before checkpoint/shutdown...");
var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken);

if (commitSuccess)
{
_ = await server.Store.WaitForCommitAsync(cancellationToken);
Console.WriteLine("AOF committed successfully.");
}
}

if (enableStorageTier)
{
// take checkpoint with all AOF operations committed
Console.WriteLine("Taking checkpoint for tiered storage...");
var checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken);

if (checkpointSuccess)
{
Console.WriteLine("Checkpoint completed successfully.");
}
else
{
Console.WriteLine("Checkpoint skipped (another checkpoint in progress).");
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"Error during graceful shutdown operations: {ex.Message}");
}
}
}
catch (OperationCanceledException)
{
// Force shutdown on cancellation request
}
catch (Exception ex)
{
Console.WriteLine($"Error during graceful shutdown: {ex.Message}");
}
finally
{
await base.StopAsync(cancellationToken);
Dispose();
}
}

public override void Dispose()
Expand All @@ -55,6 +124,79 @@ public override void Dispose()
}
server?.Dispose();
_isDisposed = true;
GC.SuppressFinalize(this); // This line resolve CA1816 Info: Dispose methods should call SuppressFinalize
}

/// <summary>
/// Waits for active connections to complete within the specified timeout period.
/// </summary>
/// <param name="timeout">The timeout duration to wait for connections to complete</param>
/// <param name="cancellationToken">Cancellation token used to cancel the operation when shutdown is requested</param>
/// <returns>A task that represents the asynchronous wait operation</returns>
private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, CancellationToken cancellationToken)
{
if (server?.Metrics == null) return;

var stopwatch = Stopwatch.StartNew();

// Simple polling intervals: 50ms -> 300ms -> 1000ms
var delays = new[] { 50, 300, 1000 };
var delayIndex = 0;

while (stopwatch.Elapsed < timeout && !cancellationToken.IsCancellationRequested)
{
try
{
var activeConnections = GetActiveConnectionCount();

if (activeConnections == 0)
{
Console.WriteLine("All connections have been closed gracefully.");
break;
}

Console.WriteLine($"Waiting for {activeConnections} active connections to complete...");

// Use current delay and progress to next one (up to max)
var currentDelay = delays[delayIndex];
if (delayIndex < delays.Length - 1) delayIndex++;

await Task.Delay(currentDelay, cancellationToken);
}
catch (Exception ex)
{
Console.WriteLine($"Error checking active connections: {ex.Message}");
// Reset to fastest polling on error and wait a bit longer
delayIndex = 0;
await Task.Delay(500, cancellationToken);
}
}

if (stopwatch.Elapsed >= timeout)
{
Console.WriteLine($"Timeout reached after {timeout.TotalSeconds} seconds. Some connections may still be active.");
}
}

/// <summary>
/// Gets the current number of active connections from server metrics.
/// </summary>
/// <returns>Number of active connections, or 0 if not found</returns>
private int GetActiveConnectionCount()
{
if (server == null || server.Metrics == null)
return 0;
var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER);

for (int i = 0; i < serverMetrics.Length; i++)
{
if (serverMetrics[i].Name == "connected_clients")
{
return int.TryParse(serverMetrics[i].Value, out var count) ? count : 0;
}
}

return 0;
}
}
}
Loading