Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 108 additions & 64 deletions pkg/scalers/nsq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -106,6 +108,81 @@ var nsqMetricIdentifiers = []nsqMetricIdentifier{
{&parseNSQMetadataTestDataset[0], 1, "s1-nsq-topic-channel", "AverageValue"},
}

// Create mock handlers that return fixed responses
func createMockNSQdHandler(depth int64, statsError bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if statsError {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
response := fmt.Sprintf(`{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, depth)
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
}
}

func createMockLookupdHandler(hostname, port string, lookupError bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if lookupError {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
response := fmt.Sprintf(`{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, hostname, port)
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
}
}

func createMockNSQdDepthHandler(statsError, channelPaused bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if statsError {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
var response string
if channelPaused {
response = `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`
} else {
response = `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`
}
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
}
}

func createMockLookupdDepthHandler(hostname, port string, lookupError, topicNotExist, producersNotExist bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if lookupError {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")

var response string
switch {
case topicNotExist:
response = `{"message": "TOPIC_NOT_FOUND"}`
case producersNotExist:
response = `{"producers":[]}`
default:
response = fmt.Sprintf(`{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, hostname, port)
}

http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
}
}

func createMockServerWithResponse(statusCode int, response string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if statusCode != http.StatusOK {
http.Error(w, "Internal Server Error", statusCode)
return
}
w.Header().Set("Content-Type", "application/json")
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(response))
}
}

func TestNSQParseMetadata(t *testing.T) {
for _, testData := range parseNSQMetadataTestDataset {
config := scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}
Expand Down Expand Up @@ -162,33 +239,27 @@ func TestNSQGetMetricsAndActivity(t *testing.T) {
},
}
for _, tc := range testCases {
mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprintf(w, `{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, tc.expectedDepth)
}))
mockNSQdServer := httptest.NewServer(createMockNSQdHandler(tc.expectedDepth, tc.statsError))
defer mockNSQdServer.Close()

parsedNSQdURL, err := url.Parse(mockNSQdServer.URL)
assert.Nil(t, err)

mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port())
}))
mockNSQLookupdServer := httptest.NewServer(createMockLookupdHandler(parsedNSQdURL.Hostname(), parsedNSQdURL.Port(), tc.lookupError))
defer mockNSQLookupdServer.Close()

parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL)
assert.Nil(t, err)

nsqlookupdHost := net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port())

activationThreshold := fmt.Sprintf("%d", tc.activationdDepthThreshold)

config := scalersconfig.ScalerConfig{TriggerMetadata: map[string]string{
"nsqLookupdHTTPAddresses": nsqlookupdHost,
"topic": "topic",
"channel": "channel",
"activationDepthThreshold": fmt.Sprintf("%d", tc.activationdDepthThreshold),
"activationDepthThreshold": activationThreshold,
}}
meta, err := parseNSQMetadata(&config)
assert.Nil(t, err)
Expand Down Expand Up @@ -281,45 +352,13 @@ func TestNSQGetTopicChannelDepth(t *testing.T) {
}

for _, tc := range testCases {
mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tc.statsError {
w.WriteHeader(http.StatusInternalServerError)
return
}
if tc.channelPaused {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`)
return
}

w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`)
}))
mockNSQdServer := httptest.NewServer(createMockNSQdDepthHandler(tc.statsError, tc.channelPaused))
defer mockNSQdServer.Close()

parsedNSQdURL, err := url.Parse(mockNSQdServer.URL)
assert.Nil(t, err)

mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tc.lookupError {
w.WriteHeader(http.StatusInternalServerError)
return
}
if tc.topicNotExist {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"message": "TOPIC_NOT_FOUND"}`)
return
}
if tc.producersNotExist {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"producers":[]}`)
return
}

w.WriteHeader(http.StatusOK)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port())
}))
mockNSQLookupdServer := httptest.NewServer(createMockLookupdDepthHandler(parsedNSQdURL.Hostname(), parsedNSQdURL.Port(), tc.lookupError, tc.topicNotExist, tc.producersNotExist))
defer mockNSQLookupdServer.Close()

parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL)
Expand Down Expand Up @@ -368,7 +407,8 @@ func TestNSQGetTopicProducers(t *testing.T) {
},
{
statusAndResponses: []statusAndResponse{
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}, {"broadcast_address":"nsqd-1","http_port":4161}]}`},
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`},
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-1","http_port":4161}]}`},
{http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-2","http_port":8161}]}`},
},
expectedNSQdHosts: []string{"nsqd-0:4161", "nsqd-1:4161", "nsqd-2:8161"},
Expand All @@ -393,12 +433,18 @@ func TestNSQGetTopicProducers(t *testing.T) {
}

for _, tc := range testCases {
callCount := atomic.NewInt32(-1)
callCount := atomic.NewInt32(0)
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
index := int(callCount.Load())
callCount.Inc()
w.WriteHeader(tc.statusAndResponses[callCount.Load()].status)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response)

statusResponse := tc.statusAndResponses[index]
if statusResponse.status != http.StatusOK {
http.Error(w, "Internal Server Error", statusResponse.status)
return
}
w.Header().Set("Content-Type", "application/json")
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(statusResponse.response))
}))
defer mockServer.Close()

Expand Down Expand Up @@ -465,11 +511,7 @@ func TestNSQGetLookup(t *testing.T) {

s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
for _, tc := range testCases {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.serverStatus)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprint(w, tc.serverResponse)
}))
mockServer := httptest.NewServer(createMockServerWithResponse(tc.serverStatus, tc.serverResponse))
defer mockServer.Close()

parsedURL, err := url.Parse(mockServer.URL)
Expand Down Expand Up @@ -577,12 +619,18 @@ func TestNSQAggregateDepth(t *testing.T) {

s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
for _, tc := range testCases {
callCount := atomic.NewInt32(-1)
callCount := atomic.NewInt32(0)
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
index := int(callCount.Load())
callCount.Inc()
w.WriteHeader(tc.statusAndResponses[callCount.Load()].status)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response)

statusResponse := tc.statusAndResponses[index]
if statusResponse.status != http.StatusOK {
http.Error(w, "Internal Server Error", statusResponse.status)
return
}
w.Header().Set("Content-Type", "application/json")
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(statusResponse.response))
}))
defer mockServer.Close()

Expand Down Expand Up @@ -641,11 +689,7 @@ func TestNSQGetStats(t *testing.T) {

s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"}
for _, tc := range testCases {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.serverStatus)
// nosemgrep: no-fprintf-to-responsewriter
fmt.Fprint(w, tc.serverResponse)
}))
mockServer := httptest.NewServer(createMockServerWithResponse(tc.serverStatus, tc.serverResponse))
defer mockServer.Close()

parsedURL, err := url.Parse(mockServer.URL)
Expand Down
Loading