diff --git a/azureappconfiguration/azureappconfiguration.go b/azureappconfiguration/azureappconfiguration.go index 0372bde..9ea60f5 100644 --- a/azureappconfiguration/azureappconfiguration.go +++ b/azureappconfiguration/azureappconfiguration.go @@ -43,23 +43,25 @@ type AzureAppConfiguration struct { featureFlags map[string]any // Settings configured from Options - kvSelectors []Selector - ffEnabled bool - ffSelectors []Selector - trimPrefixes []string - watchedSettings []WatchedSetting + kvSelectors []Selector + ffEnabled bool + ffSelectors []Selector + trimPrefixes []string + watchedSettings []WatchedSetting + loadBalancingEnabled bool // Settings used for refresh scenarios - sentinelETags map[WatchedSetting]*azcore.ETag - watchAll bool - kvETags map[Selector][]*azcore.ETag - ffETags map[Selector][]*azcore.ETag - keyVaultRefs map[string]string // unversioned Key Vault references - kvRefreshTimer refresh.Condition - secretRefreshTimer refresh.Condition - ffRefreshTimer refresh.Condition - onRefreshSuccess []func() - tracingOptions tracing.Options + sentinelETags map[WatchedSetting]*azcore.ETag + watchAll bool + kvETags map[Selector][]*azcore.ETag + ffETags map[Selector][]*azcore.ETag + keyVaultRefs map[string]string // unversioned Key Vault references + kvRefreshTimer refresh.Condition + secretRefreshTimer refresh.Condition + ffRefreshTimer refresh.Condition + onRefreshSuccess []func() + tracingOptions tracing.Options + lastSuccessfulEndpoint string // Clients talking to Azure App Configuration/Azure Key Vault service clientManager clientManager @@ -103,6 +105,7 @@ func Load(ctx context.Context, authentication AuthenticationOptions, options *Op azappcfg.featureFlags = make(map[string]any) azappcfg.kvSelectors = deduplicateSelectors(options.Selectors) azappcfg.ffEnabled = options.FeatureFlagOptions.Enabled + azappcfg.loadBalancingEnabled = options.LoadBalancingEnabled azappcfg.trimPrefixes = options.TrimKeyPrefixes azappcfg.clientManager = clientManager @@ -631,6 +634,10 @@ func (azappcfg *AzureAppConfiguration) executeFailoverPolicy(ctx context.Context azappcfg.clientManager.refreshClients(ctx) return fmt.Errorf("no client is available to connect to the target App Configuration store") } + // If load balancing is enabled, rotate the clients so that the next client to be used is not the last successful one + if azappcfg.loadBalancingEnabled && azappcfg.lastSuccessfulEndpoint != "" && len(clients) > 1 { + rotateClientsToNextEndpoint(clients, azappcfg.lastSuccessfulEndpoint) + } if manager, ok := azappcfg.clientManager.(*configurationClientManager); ok { azappcfg.tracingOptions.ReplicaCount = len(manager.dynamicClients) @@ -651,6 +658,10 @@ func (azappcfg *AzureAppConfiguration) executeFailoverPolicy(ctx context.Context } clientWrapper.updateBackoffStatus(true) + // Update the last successful endpoint for load balancing + if azappcfg.loadBalancingEnabled { + azappcfg.lastSuccessfulEndpoint = clientWrapper.endpoint + } return nil } @@ -748,6 +759,10 @@ func configureTracingOptions(options *Options) tracing.Options { tracingOption.KeyVaultConfigured = true } + if options.LoadBalancingEnabled { + tracingOption.IsLoadBalancingEnabled = true + } + if options.FeatureFlagOptions.Enabled { tracingOption.FeatureFlagTracing = &tracing.FeatureFlagTracing{} } @@ -854,6 +869,56 @@ func (azappcfg *AzureAppConfiguration) updateFeatureFlagTracing(featureFlag map[ } } +func rotateClientsToNextEndpoint(clients []*configurationClientWrapper, lastSuccessfulEndpoint string) { + if len(clients) <= 1 { + return + } + + // Find the index of the last successful endpoint + lastSuccessfulIndex := -1 + for i, clientWrapper := range clients { + if clientWrapper.endpoint == lastSuccessfulEndpoint { + lastSuccessfulIndex = i + break + } + } + + // If we found the last successful endpoint, rotate to the next one + if lastSuccessfulIndex >= 0 { + nextClientIndex := (lastSuccessfulIndex + 1) % len(clients) + if nextClientIndex > 0 { + rotateSliceInPlace(clients, nextClientIndex) + } + } +} + +// rotateSliceInPlace rotates a slice left by k positions using the reverse-reverse algorithm. +func rotateSliceInPlace[T any](slice []T, k int) { + if len(slice) <= 1 { + return + } + + k = k % len(slice) + if k == 0 { + return + } + + // Reverse the entire slice + for i, j := 0, len(slice)-1; i < j; i, j = i+1, j-1 { + slice[i], slice[j] = slice[j], slice[i] + } + + // Reverse the first len(slice)-k elements + for i, j := 0, len(slice)-k-1; i < j; i, j = i+1, j-1 { + slice[i], slice[j] = slice[j], slice[i] + } + + // Reverse the remaining elements + for i, j := len(slice)-k, len(slice)-1; i < j; i, j = i+1, j-1 { + slice[i], slice[j] = slice[j], slice[i] + } +} + func isFailoverable(err error) bool { if err == nil { return false diff --git a/azureappconfiguration/client_manager.go b/azureappconfiguration/client_manager.go index 8de6404..e80c574 100644 --- a/azureappconfiguration/client_manager.go +++ b/azureappconfiguration/client_manager.go @@ -182,13 +182,11 @@ func (manager *configurationClientManager) processSrvTargetHosts(srvTargetHosts if strings.EqualFold(targetEndpoint, manager.endpoint) { continue // Skip primary endpoint } - client, err := manager.newConfigurationClient(targetEndpoint) if err != nil { log.Printf("failed to create client for replica %s: %v", targetEndpoint, err) continue // Continue with other replicas instead of returning } - newDynamicClients = append(newDynamicClients, &configurationClientWrapper{ endpoint: targetEndpoint, client: client, @@ -349,7 +347,6 @@ func (client *configurationClientWrapper) getBackoffDuration() time.Duration { // Cap the exponent to prevent overflow exponent := math.Min(float64(client.failedAttempts-1), float64(safeShiftLimit)) calculatedMilliseconds := float64(minBackoffDuration.Milliseconds()) * math.Pow(2, exponent) - if calculatedMilliseconds > float64(maxBackoffDuration.Milliseconds()) || calculatedMilliseconds <= 0 { calculatedMilliseconds = float64(maxBackoffDuration.Milliseconds()) } diff --git a/azureappconfiguration/failover_test.go b/azureappconfiguration/failover_test.go index b24bf52..665407c 100644 --- a/azureappconfiguration/failover_test.go +++ b/azureappconfiguration/failover_test.go @@ -340,6 +340,292 @@ func TestIsFailoverable(t *testing.T) { } } +// Test executeFailoverPolicy with load balancing enabled - should rotate clients +func TestExecuteFailoverPolicy_LoadBalancing_RotateClients(t *testing.T) { + mockClientManager := new(mockClientManager) + + client1 := &azappconfig.Client{} + client2 := &azappconfig.Client{} + client3 := &azappconfig.Client{} + + clientWrappers := []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1, failedAttempts: 0}, + {endpoint: "https://replica1.azconfig.io", client: client2, failedAttempts: 0}, + {endpoint: "https://replica2.azconfig.io", client: client3, failedAttempts: 0}, + } + + mockClientManager.On("getClients", mock.Anything).Return(clientWrappers, nil) + + azappcfg := &AzureAppConfiguration{ + clientManager: mockClientManager, + loadBalancingEnabled: true, + lastSuccessfulEndpoint: "https://primary.azconfig.io", // Last successful was the first client + } + + var usedClient *azappconfig.Client + operation := func(client *azappconfig.Client) error { + usedClient = client + return nil // Success + } + + err := azappcfg.executeFailoverPolicy(context.Background(), operation) + + assert.NoError(t, err) + // After rotation, the second client (replica1) should be used first + assert.Equal(t, client2, usedClient, "Should use the next client after rotation") + assert.Equal(t, "https://replica1.azconfig.io", azappcfg.lastSuccessfulEndpoint, "Should update last successful endpoint") + mockClientManager.AssertExpectations(t) +} + +// Test executeFailoverPolicy with load balancing enabled - last client was successful +func TestExecuteFailoverPolicy_LoadBalancing_LastClientSuccessful(t *testing.T) { + mockClientManager := new(mockClientManager) + + client1 := &azappconfig.Client{} + client2 := &azappconfig.Client{} + client3 := &azappconfig.Client{} + + clientWrappers := []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1, failedAttempts: 0}, + {endpoint: "https://replica1.azconfig.io", client: client2, failedAttempts: 0}, + {endpoint: "https://replica2.azconfig.io", client: client3, failedAttempts: 0}, + } + + mockClientManager.On("getClients", mock.Anything).Return(clientWrappers, nil) + + azappcfg := &AzureAppConfiguration{ + clientManager: mockClientManager, + loadBalancingEnabled: true, + lastSuccessfulEndpoint: "https://replica2.azconfig.io", // Last successful was the last client + } + + var usedClient *azappconfig.Client + operation := func(client *azappconfig.Client) error { + usedClient = client + return nil // Success + } + + err := azappcfg.executeFailoverPolicy(context.Background(), operation) + + assert.NoError(t, err) + // After rotation, should wrap around to the first client + assert.Equal(t, client1, usedClient, "Should wrap around to the first client") + assert.Equal(t, "https://primary.azconfig.io", azappcfg.lastSuccessfulEndpoint, "Should update last successful endpoint") + mockClientManager.AssertExpectations(t) +} + +// Test executeFailoverPolicy with load balancing disabled - should not rotate +func TestExecuteFailoverPolicy_LoadBalancing_Disabled(t *testing.T) { + mockClientManager := new(mockClientManager) + + client1 := &azappconfig.Client{} + client2 := &azappconfig.Client{} + + clientWrappers := []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1, failedAttempts: 0}, + {endpoint: "https://replica.azconfig.io", client: client2, failedAttempts: 0}, + } + + mockClientManager.On("getClients", mock.Anything).Return(clientWrappers, nil) + + azappcfg := &AzureAppConfiguration{ + clientManager: mockClientManager, + loadBalancingEnabled: false, // Load balancing disabled + lastSuccessfulEndpoint: "https://primary.azconfig.io", + } + + var usedClient *azappconfig.Client + operation := func(client *azappconfig.Client) error { + usedClient = client + return nil // Success + } + + err := azappcfg.executeFailoverPolicy(context.Background(), operation) + + assert.NoError(t, err) + // Should use the first client (no rotation) + assert.Equal(t, client1, usedClient, "Should use the first client when load balancing is disabled") + // lastSuccessfulEndpoint should not be updated when load balancing is disabled + assert.Equal(t, "https://primary.azconfig.io", azappcfg.lastSuccessfulEndpoint) + mockClientManager.AssertExpectations(t) +} + +// Test executeFailoverPolicy with load balancing - single client +func TestExecuteFailoverPolicy_LoadBalancing_SingleClient(t *testing.T) { + mockClientManager := new(mockClientManager) + + client1 := &azappconfig.Client{} + + clientWrappers := []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1, failedAttempts: 0}, + } + + mockClientManager.On("getClients", mock.Anything).Return(clientWrappers, nil) + + azappcfg := &AzureAppConfiguration{ + clientManager: mockClientManager, + loadBalancingEnabled: true, + lastSuccessfulEndpoint: "https://primary.azconfig.io", + } + + var usedClient *azappconfig.Client + operation := func(client *azappconfig.Client) error { + usedClient = client + return nil // Success + } + + err := azappcfg.executeFailoverPolicy(context.Background(), operation) + + assert.NoError(t, err) + // Should use the only client available + assert.Equal(t, client1, usedClient, "Should use the only available client") + assert.Equal(t, "https://primary.azconfig.io", azappcfg.lastSuccessfulEndpoint, "Should update last successful endpoint") + mockClientManager.AssertExpectations(t) +} + +// Test rotateClientsToNextEndpoint function directly +func TestRotateClientsToNextEndpoint(t *testing.T) { + client1 := &azappconfig.Client{} + client2 := &azappconfig.Client{} + client3 := &azappconfig.Client{} + + tests := []struct { + name string + clients []*configurationClientWrapper + lastSuccessfulEndpoint string + expectedFirstClient string + }{ + { + name: "rotate from first to second", + clients: []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1}, + {endpoint: "https://replica1.azconfig.io", client: client2}, + {endpoint: "https://replica2.azconfig.io", client: client3}, + }, + lastSuccessfulEndpoint: "https://primary.azconfig.io", + expectedFirstClient: "https://replica1.azconfig.io", + }, + { + name: "rotate from middle to next", + clients: []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1}, + {endpoint: "https://replica1.azconfig.io", client: client2}, + {endpoint: "https://replica2.azconfig.io", client: client3}, + }, + lastSuccessfulEndpoint: "https://replica1.azconfig.io", + expectedFirstClient: "https://replica2.azconfig.io", + }, + { + name: "rotate from last to first (wrap around)", + clients: []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1}, + {endpoint: "https://replica1.azconfig.io", client: client2}, + {endpoint: "https://replica2.azconfig.io", client: client3}, + }, + lastSuccessfulEndpoint: "https://replica2.azconfig.io", + expectedFirstClient: "https://primary.azconfig.io", + }, + { + name: "single client - no rotation", + clients: []*configurationClientWrapper{ + {endpoint: "https://primary.azconfig.io", client: client1}, + }, + lastSuccessfulEndpoint: "https://primary.azconfig.io", + expectedFirstClient: "https://primary.azconfig.io", + }, + { + name: "empty clients - no panic", + clients: []*configurationClientWrapper{}, + lastSuccessfulEndpoint: "https://primary.azconfig.io", + expectedFirstClient: "", // No clients, so no first client + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Make a copy to avoid modifying the original slice + clientsCopy := make([]*configurationClientWrapper, len(tt.clients)) + copy(clientsCopy, tt.clients) + + rotateClientsToNextEndpoint(clientsCopy, tt.lastSuccessfulEndpoint) + + if len(clientsCopy) > 0 { + assert.Equal(t, tt.expectedFirstClient, clientsCopy[0].endpoint, "First client after rotation should match expected") + } + }) + } +} + +// Test rotateSliceInPlace function directly +func TestRotateSliceInPlace(t *testing.T) { + tests := []struct { + name string + input []int + k int + expected []int + }{ + { + name: "rotate by 1", + input: []int{1, 2, 3, 4, 5}, + k: 1, + expected: []int{2, 3, 4, 5, 1}, + }, + { + name: "rotate by 2", + input: []int{1, 2, 3, 4, 5}, + k: 2, + expected: []int{3, 4, 5, 1, 2}, + }, + { + name: "rotate by length (no change)", + input: []int{1, 2, 3, 4, 5}, + k: 5, + expected: []int{1, 2, 3, 4, 5}, + }, + { + name: "rotate by more than length", + input: []int{1, 2, 3, 4, 5}, + k: 7, // 7 % 5 = 2 + expected: []int{3, 4, 5, 1, 2}, + }, + { + name: "rotate by 0", + input: []int{1, 2, 3, 4, 5}, + k: 0, + expected: []int{1, 2, 3, 4, 5}, + }, + { + name: "single element", + input: []int{1}, + k: 1, + expected: []int{1}, + }, + { + name: "two elements rotate by 1", + input: []int{1, 2}, + k: 1, + expected: []int{2, 1}, + }, + { + name: "empty slice", + input: []int{}, + k: 1, + expected: []int{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Make a copy to avoid modifying the original slice + inputCopy := make([]int, len(tt.input)) + copy(inputCopy, tt.input) + + rotateSliceInPlace(inputCopy, tt.k) + assert.Equal(t, tt.expected, inputCopy, "Rotated slice should match expected result") + }) + } +} + // Test client wrapper backoff behavior func TestClientWrapper_UpdateBackoffStatus(t *testing.T) { client := &configurationClientWrapper{ diff --git a/azureappconfiguration/internal/tracing/tracing.go b/azureappconfiguration/internal/tracing/tracing.go index 9b1fa86..cd39858 100644 --- a/azureappconfiguration/internal/tracing/tracing.go +++ b/azureappconfiguration/internal/tracing/tracing.go @@ -42,6 +42,7 @@ const ( AIChatCompletionConfigurationTag = "AICC" FailoverRequestTag = "Failover" ReplicaCountKey = "ReplicaCount" + LoadBalancingEnabledTag = "LB" // Feature flag usage tracing FeatureFilterTypeKey = "Filter" @@ -73,6 +74,7 @@ type Options struct { UseAIChatCompletionConfiguration bool IsFailoverRequest bool ReplicaCount int + IsLoadBalancingEnabled bool FeatureFlagTracing *FeatureFlagTracing } @@ -130,6 +132,10 @@ func CreateCorrelationContextHeader(ctx context.Context, options Options) http.H features = append(features, AIChatCompletionConfigurationTag) } + if options.IsLoadBalancingEnabled { + features = append(features, LoadBalancingEnabledTag) + } + if len(features) > 0 { featureStr := FeaturesKey + "=" + strings.Join(features, DelimiterPlus) output = append(output, featureStr) diff --git a/azureappconfiguration/options.go b/azureappconfiguration/options.go index 5824133..2950db7 100644 --- a/azureappconfiguration/options.go +++ b/azureappconfiguration/options.go @@ -39,6 +39,10 @@ type Options struct { // ReplicaDiscoveryEnabled specifies whether to enable replica discovery for the Azure App Configuration service. // It defaults to true, which allows the provider to discover and use replicas for improved availability. ReplicaDiscoveryEnabled *bool + + // LoadBalancingEnabled specifies whether to enable load balancing across multiple replicas of the Azure App Configuration service. + // It defaults to false. + LoadBalancingEnabled bool } // AuthenticationOptions contains parameters for authenticating with the Azure App Configuration service.