Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/agent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strconv"
"strings"
"time"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
)

// This is a special keyword supported on backend for polling from the current default queue in the cluster.
Expand Down Expand Up @@ -127,6 +129,18 @@ func (c *AgentClient) GetScheduledJobs(ctx context.Context, afterCursor string,
now := time.Now()
for _, j := range result.Jobs {
j.QueriedAt = now
if c.queue == defaultQueueKey {
// When we poll from default queue, we don't know the queue key, so in rest of the system queue="".
// The job might contain a queue key `agents: queue: default`, in that case it will cause mismatch in local
// job queue key "" vs our configuration queue key "default".
// So this is forcing them to be equal.
j.AgentQueryRules = agenttags.RemoveTag(j.AgentQueryRules, "queue")
} else {
// Ensure the job has the queue tag. We queried a queue-specific
// endpoint, but it may be the default queue, which doesn't require
// `agents: queue: ...`, so the queue tag might not be present.
j.AgentQueryRules = agenttags.SetTag(j.AgentQueryRules, "queue", c.queue)
}
}
}
return result, retryAfter, err
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/agenttags/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func SetTag(tags []string, key, val string) []string {
return tags
}

func RemoveTag(tags []string, key string) []string {
return slices.DeleteFunc(tags, func(tag string) bool {
return strings.HasPrefix(tag, key+"=")
})
}

// labelsFromTagMap converts map[key->value] to map[tag.buildkite.com/key->value],
// with k8s compatibility checks
func labelsFromTagMap(m map[string]string) (map[string]string, []error) {
Expand Down
7 changes: 0 additions & 7 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,6 @@ func (m *Monitor) passJobsToNextHandler(
filteredJobs := make([]*api.AgentScheduledJob, 0, len(jobs))

for _, job := range jobs {
// Ensure the job has the queue tag. We queried a queue-specific
// endpoint, but it may be the default queue, which doesn't require
// `agents: queue: ...`, so the queue tag might not be present.
if m.cfg.Queue != "" {
job.AgentQueryRules = agenttags.SetTag(job.AgentQueryRules, "queue", m.cfg.Queue)
}

// Convert the job tags to a map.
jobTags, tagErrs := agenttags.TagMapFromTags(job.AgentQueryRules)
if len(tagErrs) != 0 {
Expand Down
10 changes: 10 additions & 0 deletions internal/integration/fixtures/explicit-default-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
agents:
# Target the default queue, but provide a different specific tag
# so that the job is picked up by the test controller.
pseudoQueue: "{{.pseudoQueue}}"

queue: default # This test assumes the default queue is called "default".

steps:
- label: ":wave:"
command: "echo Hi there"
29 changes: 29 additions & 0 deletions internal/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,35 @@ func TestDefaultQueue(t *testing.T) {
tc.AssertLogsContain(build, "Hi there")
}

func TestExplicitDefaultQueue(t *testing.T) {
// Note: this test assumes the default queue is called "default".
// This happens to be the case for our CI setup.
// TODO: generalise the test to work with any name default queue once the
// controller can function without setting the queue explicitly.
tc := testcase{
T: t,
Fixture: "explicit-default-queue.yaml",
Repo: repoHTTP,
GraphQL: api.NewGraphQLClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
CustomQueue: "default",
}.Init()
ctx := context.Background()
pipeline := tc.createPipelineWithCleanup(ctx, "default", map[string]string{
// You may wonder what this pseudoQueue is.
// Since in this particular test case, the queue is static, without extra tag, we will be effectively limiting
// test concurrency to 1.
// 2 builds running at the same time might conflict with each other.
"pseudoQueue": tc.ShortPipelineName(),
})
// Start a controller without queue tag, it will listen to default queue
tc.StartController(ctx, cfg,
"pseudoQueue="+tc.ShortPipelineName(),
)
build := tc.TriggerBuild(ctx, *pipeline.GraphQLID)
tc.AssertSuccess(ctx, build)
tc.AssertLogsContain(build, "Hi there")
}

func TestPodSpecPatchInStep(t *testing.T) {
tc := testcase{
T: t,
Expand Down