Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions api/fake_agent_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type FakeAgentServer struct {

// ReserveError configures an error message to return.
ReserveError string

// NotificationCalls records all notification batches sent to the server.
NotificationCalls [][]stacksapi.StackNotification
}

// NewFakeAgentServer creates and starts a fake agent API server.
Expand All @@ -43,6 +46,7 @@ func NewFakeAgentServer() *FakeAgentServer {
mux := http.NewServeMux()
mux.HandleFunc("/stacks/register", fake.handleRegisterStack)
mux.HandleFunc("/stacks/test-stack/scheduled-jobs/batch-reserve", fake.handleReserveJobs)
mux.HandleFunc("/stacks/test-stack/notifications", fake.handleNotifications)

fake.server = httptest.NewServer(mux)
return fake
Expand Down Expand Up @@ -146,3 +150,31 @@ func (f *FakeAgentServer) handleReserveJobs(w http.ResponseWriter, r *http.Reque
log.Printf("fake: failed to write reserve jobs response: %v", err)
}
}

// handleNotifications serves the fake's stack notifications endpoint.
//
// Only POST is accepted; any other method gets a 405. The JSON request body is
// decoded into a stacksapi.CreateStackNotificationsRequest (400 on decode
// failure), and the request's Notifications slice is appended to
// f.NotificationCalls as a single batch while holding f.mu. The reply is a
// JSON-encoded zero-valued stacksapi.CreateStackNotificationsResponse.
func (f *FakeAgentServer) handleNotifications(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var payload stacksapi.CreateStackNotificationsRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&payload)
	if decodeErr != nil {
		http.Error(w, decodeErr.Error(), http.StatusBadRequest)
		return
	}

	// Record this request's notifications as one batch.
	f.mu.Lock()
	f.NotificationCalls = append(f.NotificationCalls, payload.Notifications)
	f.mu.Unlock()

	// Encode into a buffer first so an encoding failure can still be
	// reported with an error status before anything is written to w.
	body := new(bytes.Buffer)
	encodeErr := json.NewEncoder(body).Encode(stacksapi.CreateStackNotificationsResponse{})
	if encodeErr != nil {
		http.Error(w, "fake: failed to encode response: "+encodeErr.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_, writeErr := w.Write(body.Bytes())
	if writeErr != nil {
		log.Printf("fake: failed to write notifications response: %v", writeErr)
	}
}
122 changes: 122 additions & 0 deletions api/stack_notification_batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package api

import (
"context"
"fmt"
"log/slog"
"net/url"
"testing"
"time"

"github.com/buildkite/stacksapi"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

// TestNotificationBatcher_SendsBatchesPeriodically adds three notifications to
// a started batcher, waits for its periodic flush, and checks that the fake
// agent server received them as exactly one batch with matching contents
// (ignoring the Timestamp field).
func TestNotificationBatcher_SendsBatchesPeriodically(t *testing.T) {
	t.Parallel()

	server := NewFakeAgentServer()
	defer server.Close()

	serverURL, err := url.Parse(server.URL())
	if err != nil {
		t.Fatalf("url.Parse(%q) error = %v", server.URL(), err)
	}
	client, err := stacksapi.NewClient("fake-token", stacksapi.WithBaseURL(serverURL))
	if err != nil {
		t.Fatalf("NewClient() error = %v", err)
	}

	nb := newNotificationBatcher("test-stack", client, slog.Default())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := nb.start(ctx); err != nil {
		t.Fatalf("start() error = %v", err)
	}

	// Add notifications
	want := []stacksapi.StackNotification{
		{JobUUID: "job-A", Detail: "detail A"},
		{JobUUID: "job-B", Detail: "detail B"},
		{JobUUID: "job-C", Detail: "detail C"},
	}
	for _, n := range want {
		if err := nb.add(ctx, n); err != nil {
			t.Fatalf("add() error = %v", err)
		}
	}

	// Wait for ticker to flush (interval is 100ms)
	time.Sleep(250 * time.Millisecond)

	// Snapshot the recorded calls while holding the server's mutex. The
	// handler appends to NotificationCalls under the same mutex from the HTTP
	// server's goroutine, and time.Sleep alone gives no happens-before edge,
	// so reading the field unlocked would be a data race.
	server.mu.Lock()
	calls := append([][]stacksapi.StackNotification(nil), server.NotificationCalls...)
	server.mu.Unlock()

	if got, wantLen := len(calls), 1; got != wantLen {
		t.Fatalf("number of notification calls = %d, want %d", got, wantLen)
	}

	if diff := cmp.Diff(want, calls[0], cmpopts.IgnoreFields(stacksapi.StackNotification{}, "Timestamp")); diff != "" {
		t.Errorf("notifications mismatch (-want +got):\n%s", diff)
	}

	cancel()
	nb.waitDone()
}

func TestNotificationBatcher_ChunksBatchesBySize(t *testing.T) {
t.Parallel()

server := NewFakeAgentServer()
defer server.Close()

serverURL, _ := url.Parse(server.URL())
client, err := stacksapi.NewClient("fake-token", stacksapi.WithBaseURL(serverURL))
if err != nil {
t.Fatalf("NewClient() error = %v", err)
}

nb := newNotificationBatcher("test-stack", client, slog.Default())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := nb.start(ctx); err != nil {
t.Fatalf("start() error = %v", err)
}

// Add more than maxNotificationsPerBatch (1000) notifications
for i := range 2500 {
note := stacksapi.StackNotification{
JobUUID: fmt.Sprintf("job-%d", i),
Detail: "detail",
}
if err := nb.add(ctx, note); err != nil {
t.Fatalf("i=%d add(%v) error = %v", i, note, err)
}
}

// Wait for ticker to flush (interval is 100ms)
time.Sleep(250 * time.Millisecond)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a better mechanism for ensuring the notifications have flushed?

Could we take advantage of https://pkg.go.dev/testing/synctest ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL synctest; I gave it a good play, but I don't think it works for our use case. The basic premise of synctest is like a poor man's DST: inside synctest.Test, when all goroutines are "durably blocked", time flies fast. It fast-forwards to the next point in time at which at least one goroutine can be unblocked, then to the next one, and so on, as if it were a single-threaded cooperative concurrency model.

The test goroutine can also enter durably blocked state by issuing time.Sleep or synctest.Wait.

Sadly "durably blocked" does not include:

System calls like file or network I/O
External event handling (such as reading from a socket)

This is precisely what the fake agent server is doing.

As a result, our fake agent server (blocked on the real I/O connection accept) and client (blocked on readLoop/writeLoop) will never enter the "durably blocked" state, meaning the entire synctest block will be blocked. (Remember, it's single-threaded: one running goroutine can cause the entire thread to block.)

I think it's in theory possible to instrument fake agent server and client so it can become "durably blocked" during test, but I don't think the benefits justify the effort yet.


// Should be chunked into 3 calls: 1000 + 1000 + 500
if got, want := len(server.NotificationCalls), 3; got != want {
t.Fatalf("number of notification calls = %d, want %d", got, want)
}

// Each batch should be at most 1000
for i, batch := range server.NotificationCalls {
if len(batch) > 1000 {
t.Errorf("batch %d size = %d, want <= 1000", i, len(batch))
}
}

// Total should be 2500
total := 0
for _, batch := range server.NotificationCalls {
total += len(batch)
}
if got, want := total, 2500; got != want {
t.Errorf("total notifications = %d, want %d", got, want)
}

cancel()
nb.waitDone()
}