Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ To get the integration test running locally, you will need:

1. A valid Buildkite API token with GraphQL enabled.
Comment thread
zhming0 marked this conversation as resolved.
2. A valid Buildkite Agent Token in your target Buildkite Cluster.
3. The name of your Buildkite organization (slug) and your target Buildkite Cluster UUID.
3. Your target Buildkite Cluster UUID.
Comment thread
zhming0 marked this conversation as resolved.
4. Depending on test cases, you may also need SSH keys, please keep reading.
5. Your shell environment will need CLI write access to a Kubernetes cluster such as the one provided by https://orbstack.dev/.

Expand All @@ -78,7 +78,6 @@ It's generally convenient to supply the API token, your Buildkite organization n

```bash
export BUILDKITE_TOKEN="bkua_**************"
export ORG="your-cool-org-slug"
export CLUSTER_UUID="UUID-UUID-UUID-UUID"
```

Expand All @@ -87,7 +86,7 @@ export CLUSTER_UUID="UUID-UUID-UUID-UUID"
To run the controller locally, with the environment variables, run the following example. Note that in this example the queue is overridden to ensure jobs from the default queue, which is "", are picked up by the Buildkite agent.

```bash
just run --org $ORG --buildkite-token $BUILDKITE_TOKEN --debug --tags 'queue=,os=linux'
just run --buildkite-token $BUILDKITE_TOKEN --debug --tags 'queue=,os=linux'
```

## Testing locally
Expand All @@ -113,13 +112,13 @@ go test -v -cover `go list ./... | grep -v internal/integration`
To run the integration tests, with the overrides from your environment, you can use the following command:

```bash
just test -timeout 10m -v ./internal/integration/... -args --org $ORG --buildkite-token $BUILDKITE_TOKEN
just test -timeout 10m -v ./internal/integration/... -args --buildkite-token $BUILDKITE_TOKEN
```

To run a single test, following goes `-run` convention will provide this capability:

```bash
just test -timeout 10m -v ./internal/integration/... -run TestImagePullBackOffFailed -args --org $ORG --buildkite-token $BUILDKITE_TOKEN
just test -timeout 10m -v ./internal/integration/... -run TestImagePullBackOffFailed -args --buildkite-token $BUILDKITE_TOKEN
```

## Token scopes
Expand Down Expand Up @@ -209,7 +208,7 @@ running a integration test.
In this case, you can choose to supply some inputs via CLI parameters instead of environment variables:

```bash
just run --org my-org --buildkite-token my-api-token --debug --cluster-uuid my-cluster-uuid
just run --buildkite-token my-api-token --debug --cluster-uuid my-cluster-uuid
```

### Local deployment with Helm
Expand All @@ -235,8 +234,6 @@ With config.yaml being a file containing [required Helm values](values.yaml), su
```yaml
agentToken: "abcdef"
graphqlToken: "12345"
config:
org: "my-buildkite-org"
```

The `config` key contains configuration passed directly to the binary, and so supports all the keys documented in [the example](examples/config.yaml).
Expand Down
4 changes: 1 addition & 3 deletions api/agent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"
)

func NewAgentClient(token, endpoint, orgSlug, clusterID, queue string, agentQueryRules []string) (*AgentClient, error) {
func NewAgentClient(token, endpoint, clusterID, queue string, agentQueryRules []string) (*AgentClient, error) {
if endpoint == "" {
endpoint = "https://agent.buildkite.com/v3"
}
Expand All @@ -29,7 +29,6 @@ func NewAgentClient(token, endpoint, orgSlug, clusterID, queue string, agentQuer
Timeout: 60 * time.Second,
Transport: NewLogger(NewAuthedTransportWithToken(http.DefaultTransport, token)),
},
orgSlug: orgSlug,
clusterID: clusterID,
queue: queue,
}, nil
Expand All @@ -39,7 +38,6 @@ func NewAgentClient(token, endpoint, orgSlug, clusterID, queue string, agentQuer
type AgentClient struct {
endpoint *url.URL
httpClient *http.Client
orgSlug string
clusterID string // or "unclustered"
queue string
}
Expand Down
58 changes: 58 additions & 0 deletions api/agent_token_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package api

import (
"context"
"net/http"
"net/url"
"time"
)

// This is a special agent client: it can only be used to query the GET /token API.
// Meaning for this client to function, there isn't a need for a cluster id nor queue id.
func NewAgentTokenClient(token, endpoint string) (*AgentTokenClient, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we combine this with AgentClient later on?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my plan but AgentClient needs org, cluster to function. On a quick glance it doesn't seem quite obvious to me on how to combine these (nor on the benefit), so I kept them separate for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you showed that org is unnecessary. I'm thinking that the work AgentTokenClient does could be part of "AgentClient connection setup", so that NewAgentClient no longer needs the cluster ID as an arg. That leaves the queue parameter, which isn't needed for AgentTokenClient, but the arg could be passed an empty string for clients that don't need to get scheduled jobs, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean now. I agree.

if endpoint == "" {
endpoint = "https://agent.buildkite.com/v3"
}
endpointURL, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
return &AgentTokenClient{
endpoint: endpointURL,
httpClient: &http.Client{
Timeout: 60 * time.Second,
Transport: NewLogger(NewAuthedTransportWithToken(http.DefaultTransport, token)),
},
}, nil
}

type AgentTokenClient struct {
endpoint *url.URL
httpClient *http.Client
}

// AgentTokenIdentity describes the token identity information.
type AgentTokenIdentity struct {
UUID string `json:"uuid"`
Description string `json:"description"`
TokenType string `json:"token_type"`
OrganizationSlug string `json:"organization_slug"`
OrganizationUUID string `json:"organization_uuid"`
ClusterUUID string `json:"cluster_uuid"`
ClusterName string `json:"cluster_name"`
OrganizationQueueUUID string `json:"organization_queue_uuid"`
OrganizationQueueKey string `json:"organization_queue_key"`
}

// GetJobState gets the state of a specific job.
func (c *AgentTokenClient) GetTokenIdentity(ctx context.Context) (result *AgentTokenIdentity, retryAfter time.Duration, err error) {
u := c.endpoint.JoinPath("token")

resp, err := c.httpClient.Get(u.String())
if err != nil {
return nil, 0, err
}
defer resp.Body.Close()

return decodeResponse[AgentTokenIdentity](resp)
}
1 change: 0 additions & 1 deletion cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func AddConfigFlags(cmd *cobra.Command) {
cmd.Flags().String("buildkite-token", "", "Deprecated - Buildkite API token with GraphQL scopes")

// in the config file
cmd.Flags().String("org", "", "Buildkite organization name to watch")
cmd.Flags().String(
"image",
config.DefaultAgentImage,
Expand Down
1 change: 0 additions & 1 deletion cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func TestReadAndParseConfig(t *testing.T) {
K8sClientRateLimiterQPS: 20,
K8sClientRateLimiterBurst: 30,
Namespace: "my-buildkite-ns",
Org: "my-buildkite-org",
Tags: []string{"queue=my-queue", "priority=high"},
ClusterUUID: "beefcafe-abbe-baba-abba-deedcedecade",
PrometheusPort: 9216,
Expand Down
1 change: 0 additions & 1 deletion examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ max-in-flight: 100
k8s-client-rate-limiter-qps: 20
k8s-client-rate-limiter-burst: 30
namespace: my-buildkite-ns
org: my-buildkite-org
prometheus-port: 9216
default-image-pull-policy: Never
default-image-check-pull-policy: IfNotPresent
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Config struct {
JobPrefix string `json:"job-prefix" validate:"required"`
MaxInFlight int `json:"max-in-flight" validate:"min=0"`
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
PrometheusPort uint16 `json:"prometheus-port" validate:"omitempty"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
Expand Down Expand Up @@ -96,10 +95,12 @@ type Config struct {
// then the pod will malfunction.
AllowPodSpecPatchUnsafeCmdMod bool `json:"allow-pod-spec-patch-unsafe-command-modification" validate:"omitempty"`

// BuildkiteToken and GraphQLEndpoint are deprecated - they are only used
// for integration tests.
// These are only used for integration tests.
BuildkiteToken string `json:"buildkite-token" validate:"omitempty"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
// FIXME: This is unused. Only keeping here temporarily to ease our transition.
// Once we promote our new version of k8s stack into our own CI, we can remove this line.
Org string `json:"org" validate:"omitempty"`
}

type stringSlice []string
Expand All @@ -122,7 +123,6 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddInt("job-creation-concurrency", c.JobCreationConcurrency)
enc.AddInt("max-in-flight", c.MaxInFlight)
enc.AddString("namespace", c.Namespace)
enc.AddString("org", c.Org)
if err := enc.AddArray("tags", c.Tags); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func Run(
agentClient, err := api.NewAgentClient(
agentToken,
agentEndpoint,
cfg.Org,
cfg.ClusterUUID,
queue,
cfg.Tags,
Expand All @@ -113,7 +112,6 @@ func Run(
// Monitor polls Buildkite for jobs. It passes them to Limiter.
m, err := monitor.New(logger.Named("monitor"), k8sClient, agentClient, monitor.Config{
Namespace: cfg.Namespace,
Org: cfg.Org,
ClusterUUID: cfg.ClusterUUID,
Queue: queue,
MaxInFlight: cfg.MaxInFlight,
Expand Down
3 changes: 0 additions & 3 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type Config struct {
Queue string
MaxInFlight int
PollInterval time.Duration
Org string
Tags []string // same as TagMap but in k=v form
TagMap map[string]string // same as Tags but in map form
EnableQueuePause bool
Expand All @@ -39,8 +38,6 @@ type Config struct {
}

func New(logger *zap.Logger, k8s kubernetes.Interface, agentClient *api.AgentClient, cfg Config) (*Monitor, error) {
logger = logger.With(zap.String("org", cfg.Org))

// Poll no more frequently than every 1s (please don't DoS us).
cfg.PollInterval = max(cfg.PollInterval, time.Second)

Expand Down
25 changes: 17 additions & 8 deletions internal/integration/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestCleanupOrphanedPipelines(t *testing.T) {
ctx := context.Background()
graphqlClient := api.NewGraphQLClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint)

pipelines, err := api.SearchPipelines(ctx, graphqlClient, cfg.Org, "test-", 100)
pipelines, err := api.SearchPipelines(ctx, graphqlClient, getOrgSlug(t), "test-", 100)
require.NoError(t, err)

numPipelines := len(pipelines.Organization.Pipelines.Edges)
Expand All @@ -32,11 +32,18 @@ func TestCleanupOrphanedPipelines(t *testing.T) {
wg.Add(numPipelines)
for _, pipeline := range pipelines.Organization.Pipelines.Edges {
pipeline := pipeline // prevent loop variable capture

tc := testcase{
T: t,
GraphQL: api.NewGraphQLClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
PipelineName: pipeline.Node.Name,
}.Init()

t.Run(pipeline.Node.Name, func(t *testing.T) {
builds, err := api.GetBuilds(
ctx,
graphqlClient,
fmt.Sprintf("%s/%s", cfg.Org, pipeline.Node.Name),
fmt.Sprintf("%s/%s", tc.Org, pipeline.Node.Name),
[]api.BuildStates{api.BuildStatesRunning},
100,
)
Expand All @@ -51,11 +58,6 @@ func TestCleanupOrphanedPipelines(t *testing.T) {
assert.NoError(t, err)
}

tc := testcase{
T: t,
GraphQL: api.NewGraphQLClient(cfg.BuildkiteToken, cfg.GraphQLEndpoint),
PipelineName: pipeline.Node.Name,
}.Init()
tc.deletePipeline(ctx)
})
}
Expand All @@ -69,7 +71,7 @@ func (t testcase) deletePipeline(ctx context.Context) {
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.Exponential(time.Second, 5*time.Second)),
).DoWithContext(ctx, func(r *roko.Retrier) error {
resp, err := t.Buildkite.Pipelines.Delete(cfg.Org, t.PipelineName)
resp, err := t.Buildkite.Pipelines.Delete(t.Org, t.PipelineName)
if err != nil {
if resp.StatusCode == http.StatusNotFound {
return nil
Expand All @@ -86,3 +88,10 @@ func (t testcase) deletePipeline(ctx context.Context) {
t.Logf("deleted pipeline! %s", t.PipelineName)
})
}

func getOrgSlug(t *testing.T) string {
tc := testcase{
T: t,
}.Init()
return tc.Org
}
8 changes: 4 additions & 4 deletions internal/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ func TestMaxInFlightLimited(t *testing.T) {

for {
build, _, err := tc.Buildkite.Builds.Get(
cfg.Org,
tc.Org,
tc.PipelineName,
strconv.Itoa(buildID),
nil,
)
if err != nil {
t.Fatalf("tc.Buildkite.Builds.Get(%q, %q, %d, nil) error = %v", cfg.Org, tc.PipelineName, buildID, err)
t.Fatalf("tc.Buildkite.Builds.Get(%q, %q, %d, nil) error = %v", tc.Org, tc.PipelineName, buildID, err)
}

switch *build.State {
Expand Down Expand Up @@ -329,13 +329,13 @@ func TestMaxInFlightUnlimited(t *testing.T) {
fetchBuildStateLoop:
for {
build, _, err := tc.Buildkite.Builds.Get(
cfg.Org,
tc.Org,
tc.PipelineName,
strconv.Itoa(buildID),
nil,
)
if err != nil {
t.Fatalf("tc.Buildkite.Builds.Get(%q, %q, %d, nil) error = %v", cfg.Org, tc.PipelineName, buildID, err)
t.Fatalf("tc.Buildkite.Builds.Get(%q, %q, %d, nil) error = %v", tc.Org, tc.PipelineName, buildID, err)
}

switch *build.State {
Expand Down
Loading