Skip to content

Commit 92cc9fe

Browse files
rickbrouwerkmoonwright
authored andcommitted
fix: resolve race condition in NSQ scaler tests by fixing atomic counter usage (kedacore#6843)
Signed-off-by: Rick Brouwer <[email protected]> Signed-off-by: kmoonwright <[email protected]>
1 parent 1c79b2d commit 92cc9fe

File tree

1 file changed

+108
-64
lines changed

1 file changed

+108
-64
lines changed

pkg/scalers/nsq_scaler_test.go

Lines changed: 108 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import (
77
"net/http"
88
"net/http/httptest"
99
"net/url"
10+
"strings"
1011
"testing"
12+
"time"
1113

1214
"github.com/go-logr/logr"
1315
"github.com/stretchr/testify/assert"
@@ -106,6 +108,81 @@ var nsqMetricIdentifiers = []nsqMetricIdentifier{
106108
{&parseNSQMetadataTestDataset[0], 1, "s1-nsq-topic-channel", "AverageValue"},
107109
}
108110

111+
// Create mock handlers that return fixed responses
112+
func createMockNSQdHandler(depth int64, statsError bool) http.HandlerFunc {
113+
return func(w http.ResponseWriter, r *http.Request) {
114+
if statsError {
115+
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
116+
return
117+
}
118+
w.Header().Set("Content-Type", "application/json")
119+
response := fmt.Sprintf(`{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, depth)
120+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
121+
}
122+
}
123+
124+
func createMockLookupdHandler(hostname, port string, lookupError bool) http.HandlerFunc {
125+
return func(w http.ResponseWriter, r *http.Request) {
126+
if lookupError {
127+
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
128+
return
129+
}
130+
w.Header().Set("Content-Type", "application/json")
131+
response := fmt.Sprintf(`{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, hostname, port)
132+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
133+
}
134+
}
135+
136+
func createMockNSQdDepthHandler(statsError, channelPaused bool) http.HandlerFunc {
137+
return func(w http.ResponseWriter, r *http.Request) {
138+
if statsError {
139+
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
140+
return
141+
}
142+
w.Header().Set("Content-Type", "application/json")
143+
var response string
144+
if channelPaused {
145+
response = `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`
146+
} else {
147+
response = `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`
148+
}
149+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
150+
}
151+
}
152+
153+
func createMockLookupdDepthHandler(hostname, port string, lookupError, topicNotExist, producersNotExist bool) http.HandlerFunc {
154+
return func(w http.ResponseWriter, r *http.Request) {
155+
if lookupError {
156+
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
157+
return
158+
}
159+
w.Header().Set("Content-Type", "application/json")
160+
161+
var response string
162+
switch {
163+
case topicNotExist:
164+
response = `{"message": "TOPIC_NOT_FOUND"}`
165+
case producersNotExist:
166+
response = `{"producers":[]}`
167+
default:
168+
response = fmt.Sprintf(`{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, hostname, port)
169+
}
170+
171+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
172+
}
173+
}
174+
175+
func createMockServerWithResponse(statusCode int, response string) http.HandlerFunc {
176+
return func(w http.ResponseWriter, r *http.Request) {
177+
if statusCode != http.StatusOK {
178+
http.Error(w, "Internal Server Error", statusCode)
179+
return
180+
}
181+
w.Header().Set("Content-Type", "application/json")
182+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
183+
}
184+
}
185+
109186
func TestNSQParseMetadata(t *testing.T) {
110187
for _, testData := range parseNSQMetadataTestDataset {
111188
config := scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}
@@ -162,33 +239,27 @@ func TestNSQGetMetricsAndActivity(t *testing.T) {
162239
},
163240
}
164241
for _, tc := range testCases {
165-
mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
166-
w.WriteHeader(http.StatusOK)
167-
// nosemgrep: no-fprintf-to-responsewriter
168-
fmt.Fprintf(w, `{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, tc.expectedDepth)
169-
}))
242+
mockNSQdServer := httptest.NewServer(createMockNSQdHandler(tc.expectedDepth, tc.statsError))
170243
defer mockNSQdServer.Close()
171244

172245
parsedNSQdURL, err := url.Parse(mockNSQdServer.URL)
173246
assert.Nil(t, err)
174247

175-
mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
176-
w.WriteHeader(http.StatusOK)
177-
// nosemgrep: no-fprintf-to-responsewriter
178-
fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port())
179-
}))
248+
mockNSQLookupdServer := httptest.NewServer(createMockLookupdHandler(parsedNSQdURL.Hostname(), parsedNSQdURL.Port(), tc.lookupError))
180249
defer mockNSQLookupdServer.Close()
181250

182251
parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL)
183252
assert.Nil(t, err)
184253

185254
nsqlookupdHost := net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port())
186255

256+
activationThreshold := fmt.Sprintf("%d", tc.activationdDepthThreshold)
257+
187258
config := scalersconfig.ScalerConfig{TriggerMetadata: map[string]string{
188259
"nsqLookupdHTTPAddresses": nsqlookupdHost,
189260
"topic": "topic",
190261
"channel": "channel",
191-
"activationDepthThreshold": fmt.Sprintf("%d", tc.activationdDepthThreshold),
262+
"activationDepthThreshold": activationThreshold,
192263
}}
193264
meta, err := parseNSQMetadata(&config)
194265
assert.Nil(t, err)
@@ -281,45 +352,13 @@ func TestNSQGetTopicChannelDepth(t *testing.T) {
281352
}
282353

283354
for _, tc := range testCases {
284-
mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
285-
if tc.statsError {
286-
w.WriteHeader(http.StatusInternalServerError)
287-
return
288-
}
289-
if tc.channelPaused {
290-
w.WriteHeader(http.StatusOK)
291-
fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`)
292-
return
293-
}
294-
295-
w.WriteHeader(http.StatusOK)
296-
fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`)
297-
}))
355+
mockNSQdServer := httptest.NewServer(createMockNSQdDepthHandler(tc.statsError, tc.channelPaused))
298356
defer mockNSQdServer.Close()
299357

300358
parsedNSQdURL, err := url.Parse(mockNSQdServer.URL)
301359
assert.Nil(t, err)
302360

303-
mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
304-
if tc.lookupError {
305-
w.WriteHeader(http.StatusInternalServerError)
306-
return
307-
}
308-
if tc.topicNotExist {
309-
w.WriteHeader(http.StatusOK)
310-
fmt.Fprint(w, `{"message": "TOPIC_NOT_FOUND"}`)
311-
return
312-
}
313-
if tc.producersNotExist {
314-
w.WriteHeader(http.StatusOK)
315-
fmt.Fprint(w, `{"producers":[]}`)
316-
return
317-
}
318-
319-
w.WriteHeader(http.StatusOK)
320-
// nosemgrep: no-fprintf-to-responsewriter
321-
fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port())
322-
}))
361+
mockNSQLookupdServer := httptest.NewServer(createMockLookupdDepthHandler(parsedNSQdURL.Hostname(), parsedNSQdURL.Port(), tc.lookupError, tc.topicNotExist, tc.producersNotExist))
323362
defer mockNSQLookupdServer.Close()
324363

325364
parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL)
@@ -368,7 +407,8 @@ func TestNSQGetTopicProducers(t *testing.T) {
368407
},
369408
{
370409
statusAndResponses: []statusAndResponse{
371-
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}, {"broadcast_address":"nsqd-1","http_port":4161}]}`},
410+
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`},
411+
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-1","http_port":4161}]}`},
372412
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-2","http_port":8161}]}`},
373413
},
374414
expectedNSQdHosts: []string{"nsqd-0:4161", "nsqd-1:4161", "nsqd-2:8161"},
@@ -393,12 +433,18 @@ func TestNSQGetTopicProducers(t *testing.T) {
393433
}
394434

395435
for _, tc := range testCases {
396-
callCount := atomic.NewInt32(-1)
436+
callCount := atomic.NewInt32(0)
397437
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
438+
index := int(callCount.Load())
398439
callCount.Inc()
399-
w.WriteHeader(tc.statusAndResponses[callCount.Load()].status)
400-
// nosemgrep: no-fprintf-to-responsewriter
401-
fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response)
440+
441+
statusResponse := tc.statusAndResponses[index]
442+
if statusResponse.status != http.StatusOK {
443+
http.Error(w, "Internal Server Error", statusResponse.status)
444+
return
445+
}
446+
w.Header().Set("Content-Type", "application/json")
447+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(statusResponse.response))
402448
}))
403449
defer mockServer.Close()
404450

@@ -465,11 +511,7 @@ func TestNSQGetLookup(t *testing.T) {
465511

466512
s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
467513
for _, tc := range testCases {
468-
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
469-
w.WriteHeader(tc.serverStatus)
470-
// nosemgrep: no-fprintf-to-responsewriter
471-
fmt.Fprint(w, tc.serverResponse)
472-
}))
514+
mockServer := httptest.NewServer(createMockServerWithResponse(tc.serverStatus, tc.serverResponse))
473515
defer mockServer.Close()
474516

475517
parsedURL, err := url.Parse(mockServer.URL)
@@ -577,12 +619,18 @@ func TestNSQAggregateDepth(t *testing.T) {
577619

578620
s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
579621
for _, tc := range testCases {
580-
callCount := atomic.NewInt32(-1)
622+
callCount := atomic.NewInt32(0)
581623
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
624+
index := int(callCount.Load())
582625
callCount.Inc()
583-
w.WriteHeader(tc.statusAndResponses[callCount.Load()].status)
584-
// nosemgrep: no-fprintf-to-responsewriter
585-
fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response)
626+
627+
statusResponse := tc.statusAndResponses[index]
628+
if statusResponse.status != http.StatusOK {
629+
http.Error(w, "Internal Server Error", statusResponse.status)
630+
return
631+
}
632+
w.Header().Set("Content-Type", "application/json")
633+
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(statusResponse.response))
586634
}))
587635
defer mockServer.Close()
588636

@@ -641,11 +689,7 @@ func TestNSQGetStats(t *testing.T) {
641689

642690
s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
643691
for _, tc := range testCases {
644-
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
645-
w.WriteHeader(tc.serverStatus)
646-
// nosemgrep: no-fprintf-to-responsewriter
647-
fmt.Fprint(w, tc.serverResponse)
648-
}))
692+
mockServer := httptest.NewServer(createMockServerWithResponse(tc.serverStatus, tc.serverResponse))
649693
defer mockServer.Close()
650694

651695
parsedURL, err := url.Parse(mockServer.URL)

0 commit comments

Comments
 (0)