diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index d69adb7e3c0..9f6656c5471 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -2,8 +2,10 @@ // Licensed under the MIT license. using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; +using Garnet.server; using Microsoft.Extensions.Hosting; namespace Garnet @@ -43,8 +45,75 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// Indicates that the shutdown process should no longer be graceful. public override async Task StopAsync(CancellationToken cancellationToken) { - Dispose(); - await base.StopAsync(cancellationToken).ConfigureAwait(false); + try + { + // Wait for existing connections to complete (with timeout) + // Consider making timeout configurable via configuration file or environment variable. + // Currently using fixed 30-second timeout to avoid conflicts with existing server arguments. + var timeout = TimeSpan.FromSeconds(30); + await WaitForActiveConnectionsToComplete(timeout, cancellationToken); + + if (server != null) + { + try + { + var storeWrapperField = server.GetType().GetField("storeWrapper", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + + if (storeWrapperField?.GetValue(server) is StoreWrapper storeWrapper) + { + var enableStorageTier = storeWrapper.serverOptions.EnableStorageTier; + var enableAOF = storeWrapper.serverOptions.EnableAOF; + + // ensure all AOF operations are committed before checkpointing or shutdown + if (enableAOF) + { + Console.WriteLine("Committing AOF before checkpoint/shutdown..."); + var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); + + if (commitSuccess) + { + _ = await server.Store.WaitForCommitAsync(cancellationToken); + Console.WriteLine("AOF committed successfully."); + } + } + + if (enableStorageTier) + { + // take checkpoint with all AOF operations committed + Console.WriteLine("Taking checkpoint for tiered storage..."); + var checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken); + + if (checkpointSuccess) + { + Console.WriteLine("Checkpoint completed successfully."); + } + else + { + Console.WriteLine("Checkpoint skipped (another checkpoint in progress)."); + } + } + } + } + catch (Exception ex) + { + Console.WriteLine($"Error during graceful shutdown operations: {ex.Message}"); + } + } + } + catch (OperationCanceledException) + { + // Force shutdown on cancellation request + } + catch (Exception ex) + { + Console.WriteLine($"Error during graceful shutdown: {ex.Message}"); + } + finally + { + await base.StopAsync(cancellationToken); + Dispose(); + } } public override void Dispose() @@ -55,6 +124,79 @@ public override void Dispose() } server?.Dispose(); _isDisposed = true; + GC.SuppressFinalize(this); // This line resolve CA1816 Info: Dispose methods should call SuppressFinalize + } + + /// + /// Waits for active connections to complete within the specified timeout period. + /// + /// The timeout duration to wait for connections to complete + /// Cancellation token used to cancel the operation when shutdown is requested + /// A task that represents the asynchronous wait operation + private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, CancellationToken cancellationToken) + { + if (server?.Metrics == null) return; + + var stopwatch = Stopwatch.StartNew(); + + // Simple polling intervals: 50ms -> 300ms -> 1000ms + var delays = new[] { 50, 300, 1000 }; + var delayIndex = 0; + + while (stopwatch.Elapsed < timeout && !cancellationToken.IsCancellationRequested) + { + try + { + var activeConnections = GetActiveConnectionCount(); + + if (activeConnections == 0) + { + Console.WriteLine("All connections have been closed gracefully."); + break; + } + + Console.WriteLine($"Waiting for {activeConnections} active connections to complete..."); + + // Use current delay and progress to next one (up to max) + var currentDelay = delays[delayIndex]; + if (delayIndex < delays.Length - 1) delayIndex++; + + await Task.Delay(currentDelay, cancellationToken); + } + catch (Exception ex) + { + Console.WriteLine($"Error checking active connections: {ex.Message}"); + // Reset to fastest polling on error and wait a bit longer + delayIndex = 0; + await Task.Delay(500, cancellationToken); + } + } + + if (stopwatch.Elapsed >= timeout) + { + Console.WriteLine($"Timeout reached after {timeout.TotalSeconds} seconds. Some connections may still be active."); + } + } + + /// + /// Gets the current number of active connections from server metrics. + /// + /// Number of active connections, or 0 if not found + private int GetActiveConnectionCount() + { + if (server == null || server.Metrics == null) + return 0; + var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); + + for (int i = 0; i < serverMetrics.Length; i++) + { + if (serverMetrics[i].Name == "connected_clients") + { + return int.TryParse(serverMetrics[i].Value, out var count) ? count : 0; + } + } + + return 0; } } } \ No newline at end of file