From c70c63118c934f826e9798c5e018fb26902b9ea9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Wed, 17 Sep 2025 00:11:46 +0900 Subject: [PATCH 1/8] feat: Improve graceful shutdown and add AOF handling This commit enhances the asynchronous shutdown process in `Worker.cs` with the following changes: - The `StopAsync` method now waits up to 30 seconds for existing connections to complete before termination. - Added logic to flush the AOF (Append-Only File) buffer and create a checkpoint on shutdown. This commit operation is only performed if AOF is enabled. - Implemented the new `WaitForActiveConnectionsToComplete` method to check the status of active connections with a retry mechanism. - Called `GC.SuppressFinalize(this)` in the `Dispose` method to prevent unnecessary finalization. --- hosting/Windows/Garnet.worker/Worker.cs | 120 +++++++++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index d69adb7e3c0..a777b86b7f0 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; @@ -43,8 +44,57 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// Indicates that the shutdown process should no longer be graceful. public override async Task StopAsync(CancellationToken cancellationToken) { - Dispose(); - await base.StopAsync(cancellationToken).ConfigureAwait(false); + try + { + // Wait for existing connections to complete (with timeout) + /* I consider Configurable via args, but args directly inject into garnet server's creation + If we want to make it configurable, We need to make a consent + Garnet.Worker.exe can have config file or env var or specific args for timeout + but I have scare about args, cause exsiting args are directly injected into garnet server's creation + So I decide to make it fixed value for now. 30 seconds should be enough for most scenarios. + */ + var timeout = TimeSpan.FromSeconds(30); + await WaitForActiveConnectionsToComplete(timeout, cancellationToken); + + // Flush AOF buffer and create checkpoint using Store API + if (server?.Store != null) + { + try + { + // CommitAOFAsync returns false if AOF is disabled + var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); + + if (commitSuccess) + { + // Wait only if commit was successful + await server.Store.WaitForCommitAsync(cancellationToken); + Console.WriteLine("AOF operations completed successfully."); + } + else + { + Console.WriteLine("AOF commit skipped (likely disabled or unavailable)."); + } + } + catch (Exception ex) + { + Console.WriteLine($"Error during graceful AOF operations: {ex.Message}"); + } + } + } + catch (OperationCanceledException) + { + // Force shutdown on cancellation request + } + catch (Exception ex) + { + Console.WriteLine($"Error during graceful shutdown: {ex.Message}"); + } + finally + { + // Resource cleanup + Dispose(); + await base.StopAsync(cancellationToken); + } } public override void Dispose() @@ -55,6 +105,72 @@ public override void Dispose() } server?.Dispose(); _isDisposed = true; + GC.SuppressFinalize(this); // This line resolve CA1816 Info: Dispose methods should call SuppressFinalize + } + + /// + /// Waits for active connections to complete within the specified timeout period. + /// + /// The timeout duration to wait for connections to complete + /// Cancellation token used to cancel the operation when shutdown is requested + /// A task that represents the asynchronous wait operation + private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, CancellationToken cancellationToken) + { + if (server?.Metrics == null) return; + + var stopwatch = Stopwatch.StartNew(); + var consecutiveErrors = 0; + const int maxConsecutiveErrors = 3; + + while (stopwatch.Elapsed < timeout && !cancellationToken.IsCancellationRequested) + { + try + { + // Use Metrics API to get SERVER info metrics + var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); + + // Find connected_clients metric without LINQ + var activeConnections = 0; + for (int i = 0; i < serverMetrics.Length; i++) + { + if (serverMetrics[i].Name == "connected_clients") + { + if (int.TryParse(serverMetrics[i].Value, out activeConnections)) + break; + } + } + + if (activeConnections == 0) + { + Console.WriteLine("All connections have been closed gracefully."); + break; + } + + Console.WriteLine($"Waiting for {activeConnections} active connections to complete..."); + consecutiveErrors = 0; // Reset error counter on success + await Task.Delay(100, cancellationToken); + } + catch (Exception ex) + { + consecutiveErrors++; + Console.WriteLine($"Error checking active connections: {ex.Message}"); + + // Break only after multiple consecutive errors + if (consecutiveErrors >= maxConsecutiveErrors) + { + Console.WriteLine($"Too many consecutive errors ({consecutiveErrors}). Stopping connection check."); + break; + } + + // Continue with longer delay after error + await Task.Delay(500, cancellationToken); + } + } + + if (stopwatch.Elapsed >= timeout) + { + Console.WriteLine($"Timeout reached after {timeout.TotalSeconds} seconds. Some connections may still be active."); + } } } } \ No newline at end of file From 48913d10881f9cfb373f979b262900d2b40b80f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Wed, 17 Sep 2025 00:18:37 +0900 Subject: [PATCH 2/8] =?UTF-8?q?=F0=9F=9A=A8=20Fix=20formatting=20error=20i?= =?UTF-8?q?n=20CI=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hosting/Windows/Garnet.worker/Worker.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index a777b86b7f0..03d1d6ae198 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -63,7 +63,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) { // CommitAOFAsync returns false if AOF is disabled var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); - + if (commitSuccess) { // Wait only if commit was successful @@ -128,7 +128,7 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella { // Use Metrics API to get SERVER info metrics var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); - + // Find connected_clients metric without LINQ var activeConnections = 0; for (int i = 0; i < serverMetrics.Length; i++) @@ -154,14 +154,14 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella { consecutiveErrors++; Console.WriteLine($"Error checking active connections: {ex.Message}"); - + // Break only after multiple consecutive errors if (consecutiveErrors >= maxConsecutiveErrors) { Console.WriteLine($"Too many consecutive errors ({consecutiveErrors}). Stopping connection check."); break; } - + // Continue with longer delay after error await Task.Delay(500, cancellationToken); } From 08cfc340efb0be50ecf501897f0ff80eeeedb373 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Wed, 17 Sep 2025 00:40:03 +0900 Subject: [PATCH 3/8] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20Fix=20Explain=20by=20C?= =?UTF-8?q?opilot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- hosting/Windows/Garnet.worker/Worker.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index 03d1d6ae198..ff94339f00f 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -47,12 +47,8 @@ public override async Task StopAsync(CancellationToken cancellationToken) try { // Wait for existing connections to complete (with timeout) - /* I consider Configurable via args, but args directly inject into garnet server's creation - If we want to make it configurable, We need to make a consent - Garnet.Worker.exe can have config file or env var or specific args for timeout - but I have scare about args, cause exsiting args are directly injected into garnet server's creation - So I decide to make it fixed value for now. 30 seconds should be enough for most scenarios. - */ + // Consider making timeout configurable via configuration file or environment variable. + // Currently using fixed 30-second timeout to avoid conflicts with existing server arguments. var timeout = TimeSpan.FromSeconds(30); await WaitForActiveConnectionsToComplete(timeout, cancellationToken); From 286537ba2dff0cb25dc69165c9b0ca90deb7eb6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Wed, 17 Sep 2025 00:55:50 +0900 Subject: [PATCH 4/8] Refactor connection polling logic and add helper method Simplifies the polling interval logic for active connections by using a delay array and removing consecutive error tracking. Extracts active connection count retrieval into a new GetActiveConnectionCount() helper method for clarity and reuse. --- hosting/Windows/Garnet.worker/Worker.cs | 59 ++++++++++++++----------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index ff94339f00f..ecff5ae3213 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -115,26 +115,16 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella if (server?.Metrics == null) return; var stopwatch = Stopwatch.StartNew(); - var consecutiveErrors = 0; - const int maxConsecutiveErrors = 3; + + // Simple polling intervals: 50ms -> 300ms -> 1000ms + var delays = new[] { 50, 300, 1000 }; + var delayIndex = 0; while (stopwatch.Elapsed < timeout && !cancellationToken.IsCancellationRequested) { try { - // Use Metrics API to get SERVER info metrics - var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); - - // Find connected_clients metric without LINQ - var activeConnections = 0; - for (int i = 0; i < serverMetrics.Length; i++) - { - if (serverMetrics[i].Name == "connected_clients") - { - if (int.TryParse(serverMetrics[i].Value, out activeConnections)) - break; - } - } + var activeConnections = GetActiveConnectionCount(); if (activeConnections == 0) { @@ -143,22 +133,18 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella } Console.WriteLine($"Waiting for {activeConnections} active connections to complete..."); - consecutiveErrors = 0; // Reset error counter on success - await Task.Delay(100, cancellationToken); + + // Use current delay and progress to next one (up to max) + var currentDelay = delays[delayIndex]; + if (delayIndex < delays.Length - 1) delayIndex++; + + await Task.Delay(currentDelay, cancellationToken); } catch (Exception ex) { - consecutiveErrors++; Console.WriteLine($"Error checking active connections: {ex.Message}"); - - // Break only after multiple consecutive errors - if (consecutiveErrors >= maxConsecutiveErrors) - { - Console.WriteLine($"Too many consecutive errors ({consecutiveErrors}). Stopping connection check."); - break; - } - - // Continue with longer delay after error + // Reset to fastest polling on error and wait a bit longer + delayIndex = 0; await Task.Delay(500, cancellationToken); } } @@ -168,5 +154,24 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella Console.WriteLine($"Timeout reached after {timeout.TotalSeconds} seconds. Some connections may still be active."); } } + + /// + /// Gets the current number of active connections from server metrics. + /// + /// Number of active connections, or 0 if not found + private int GetActiveConnectionCount() + { + var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); + + for (int i = 0; i < serverMetrics.Length; i++) + { + if (serverMetrics[i].Name == "connected_clients") + { + return int.TryParse(serverMetrics[i].Value, out var count) ? count : 0; + } + } + + return 0; + } } } \ No newline at end of file From 82da9ab042424f9382bb7935ac599b93d791d661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Wed, 17 Sep 2025 01:01:56 +0900 Subject: [PATCH 5/8] Fix resource cleanup and null checks in Worker Moved Dispose() call after base.StopAsync in the finally block to ensure proper resource cleanup order. Added null checks for server and server.Metrics in GetActiveConnectionCount to prevent possible null reference exceptions. --- hosting/Windows/Garnet.worker/Worker.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index ecff5ae3213..7f6960039cb 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -88,8 +88,8 @@ public override async Task StopAsync(CancellationToken cancellationToken) finally { // Resource cleanup - Dispose(); await base.StopAsync(cancellationToken); + Dispose(); } } @@ -161,6 +161,8 @@ private async Task WaitForActiveConnectionsToComplete(TimeSpan timeout, Cancella /// Number of active connections, or 0 if not found private int GetActiveConnectionCount() { + if (server == null || server.Metrics == null) + return 0; var serverMetrics = server.Metrics.GetInfoMetrics(Garnet.common.InfoMetricsType.SERVER); for (int i = 0; i < serverMetrics.Length; i++) From 4b87aa6fdb81415dcd9d239c3d44a155d5477da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Fri, 10 Oct 2025 13:25:30 +0900 Subject: [PATCH 6/8] Add checkpoint support for tiered storage on shutdown During graceful shutdown, the worker now checks if tiered storage is enabled and takes a checkpoint using StoreWrapper if so. If tiered storage is not enabled, it falls back to flushing the AOF buffer as before. This ensures data consistency for both storage modes. --- hosting/Windows/Garnet.worker/Worker.cs | 56 +++++++++++++++++++------ 1 file changed, 43 insertions(+), 13 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index 7f6960039cb..6f8d93e974b 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; +using Garnet.server; namespace Garnet { @@ -52,28 +53,57 @@ public override async Task StopAsync(CancellationToken cancellationToken) var timeout = TimeSpan.FromSeconds(30); await WaitForActiveConnectionsToComplete(timeout, cancellationToken); - // Flush AOF buffer and create checkpoint using Store API - if (server?.Store != null) + // Take checkpoint if tiered storage is enabled, otherwise flush AOF buffer + if (server != null) { try { - // CommitAOFAsync returns false if AOF is disabled - var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); + // Access storeWrapper field using reflection + var storeWrapperField = server.GetType().GetField("storeWrapper", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + var storeWrapper = storeWrapperField?.GetValue(server) as StoreWrapper; - if (commitSuccess) + if (storeWrapper != null) { - // Wait only if commit was successful - await server.Store.WaitForCommitAsync(cancellationToken); - Console.WriteLine("AOF operations completed successfully."); - } - else - { - Console.WriteLine("AOF commit skipped (likely disabled or unavailable)."); + bool enableStorageTier = storeWrapper.serverOptions.EnableStorageTier; + + if (enableStorageTier) + { + // Checkpoint takes priority when both tiered storage and AOF are enabled + Console.WriteLine("Taking checkpoint for tiered storage..."); + bool checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken); + + if (checkpointSuccess) + { + Console.WriteLine("Checkpoint completed successfully."); + } + else + { + Console.WriteLine("Checkpoint skipped (another checkpoint in progress)."); + } + } + else if (server.Store != null) + { + // Flush AOF buffer if AOF is enabled + // CommitAOFAsync returns false if AOF is disabled + var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); + + if (commitSuccess) + { + // Wait only if commit was successful + await server.Store.WaitForCommitAsync(cancellationToken); + Console.WriteLine("AOF operations completed successfully."); + } + else + { + Console.WriteLine("AOF commit skipped (likely disabled or unavailable)."); + } + } } } catch (Exception ex) { - Console.WriteLine($"Error during graceful AOF operations: {ex.Message}"); + Console.WriteLine($"Error during graceful shutdown operations: {ex.Message}"); } } } From 2a9943e6c37ad7da9e12bd08254681ce8632e3ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Fri, 10 Oct 2025 13:29:36 +0900 Subject: [PATCH 7/8] =?UTF-8?q?=F0=9F=9A=A8=20Fix=20Code=20Formmating?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hosting/Windows/Garnet.worker/Worker.cs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index 6f8d93e974b..093bcf6f5e8 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -5,8 +5,8 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Hosting; using Garnet.server; +using Microsoft.Extensions.Hosting; namespace Garnet { @@ -59,20 +59,19 @@ public override async Task StopAsync(CancellationToken cancellationToken) try { // Access storeWrapper field using reflection - var storeWrapperField = server.GetType().GetField("storeWrapper", + var storeWrapperField = server.GetType().GetField("storeWrapper", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); - var storeWrapper = storeWrapperField?.GetValue(server) as StoreWrapper; - if (storeWrapper != null) + if (storeWrapperField?.GetValue(server) is StoreWrapper storeWrapper) { - bool enableStorageTier = storeWrapper.serverOptions.EnableStorageTier; + var enableStorageTier = storeWrapper.serverOptions.EnableStorageTier; if (enableStorageTier) { // Checkpoint takes priority when both tiered storage and AOF are enabled Console.WriteLine("Taking checkpoint for tiered storage..."); - bool checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken); - + var checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken); + if (checkpointSuccess) { Console.WriteLine("Checkpoint completed successfully."); @@ -91,7 +90,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) if (commitSuccess) { // Wait only if commit was successful - await server.Store.WaitForCommitAsync(cancellationToken); + _ = await server.Store.WaitForCommitAsync(cancellationToken); Console.WriteLine("AOF operations completed successfully."); } else From 58e80911e76798b541ca5c6e51e1b6b17230e256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=9C=A0=EC=84=9D=28Yu=20Seok=20Kim=29?= Date: Fri, 10 Oct 2025 14:12:42 +0900 Subject: [PATCH 8/8] =?UTF-8?q?=F0=9F=90=9B=20Fix=20bug=20like=20#1390?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit save AOF commit first and secondary take checkpoint to ensure state preserving --- hosting/Windows/Garnet.worker/Worker.cs | 36 +++++++++++-------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/hosting/Windows/Garnet.worker/Worker.cs b/hosting/Windows/Garnet.worker/Worker.cs index 093bcf6f5e8..9f6656c5471 100644 --- a/hosting/Windows/Garnet.worker/Worker.cs +++ b/hosting/Windows/Garnet.worker/Worker.cs @@ -53,22 +53,34 @@ public override async Task StopAsync(CancellationToken cancellationToken) var timeout = TimeSpan.FromSeconds(30); await WaitForActiveConnectionsToComplete(timeout, cancellationToken); - // Take checkpoint if tiered storage is enabled, otherwise flush AOF buffer if (server != null) { try { - // Access storeWrapper field using reflection var storeWrapperField = server.GetType().GetField("storeWrapper", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); if (storeWrapperField?.GetValue(server) is StoreWrapper storeWrapper) { var enableStorageTier = storeWrapper.serverOptions.EnableStorageTier; + var enableAOF = storeWrapper.serverOptions.EnableAOF; + + // ensure all AOF operations are committed before checkpointing or shutdown + if (enableAOF) + { + Console.WriteLine("Committing AOF before checkpoint/shutdown..."); + var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); + + if (commitSuccess) + { + _ = await server.Store.WaitForCommitAsync(cancellationToken); + Console.WriteLine("AOF committed successfully."); + } + } if (enableStorageTier) { - // Checkpoint takes priority when both tiered storage and AOF are enabled + // take checkpoint with all AOF operations committed Console.WriteLine("Taking checkpoint for tiered storage..."); var checkpointSuccess = storeWrapper.TakeCheckpoint(background: false, logger: null, token: cancellationToken); @@ -81,23 +93,6 @@ public override async Task StopAsync(CancellationToken cancellationToken) Console.WriteLine("Checkpoint skipped (another checkpoint in progress)."); } } - else if (server.Store != null) - { - // Flush AOF buffer if AOF is enabled - // CommitAOFAsync returns false if AOF is disabled - var commitSuccess = await server.Store.CommitAOFAsync(cancellationToken); - - if (commitSuccess) - { - // Wait only if commit was successful - _ = await server.Store.WaitForCommitAsync(cancellationToken); - Console.WriteLine("AOF operations completed successfully."); - } - else - { - Console.WriteLine("AOF commit skipped (likely disabled or unavailable)."); - } - } } } catch (Exception ex) @@ -116,7 +111,6 @@ public override async Task StopAsync(CancellationToken cancellationToken) } finally { - // Resource cleanup await base.StopAsync(cancellationToken); Dispose(); }