Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions charts/agent-stack-k8s/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,12 @@
"type": "string",
"default": "10s",
"title": "Controls the interval between pagination cursor resets. Increasing this value will increase the number of jobs to be scheduled but also delay picking up any jobs that were missed from the start of the query."
},
"work-queue-limit": {
"type": "integer",
"default": 1000000,
"minimum": 1,
"title": "Sets the maximum number of Jobs the controller will hold in the work queue."
}
},
"examples": [
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func AddConfigFlags(cmd *cobra.Command) {
config.DefaultQueryResetInterval,
"Controls the interval between pagination cursor resets. Increasing this value will increase the number of jobs to be scheduled but also delay picking up any jobs that were missed from the start of the query.",
)
cmd.Flags().Int(
"work-queue-limit",
config.DefaultWorkQueueLimit,
"Sets the maximum number of Jobs the controller will hold in the work queue.",
)
cmd.Flags().String(
"image-check-container-cpu-limit",
config.DefaultImageCheckContainerCPULimit,
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestReadAndParseConfig(t *testing.T) {
DefaultImagePullPolicy: "Never",
DefaultImageCheckPullPolicy: "IfNotPresent",
EnableQueuePause: true,
WorkQueueLimit: 2_000_000,
ImageCheckContainerCPULimit: "201m",
ImageCheckContainerMemoryLimit: "129Mi",

Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ enable-queue-pause: true
pagination-page-size: 1000
pagination-depth-limit: 5
query-reset-interval: 10s
work-queue-limit: 2000000
image-check-container-cpu-limit: 201m
image-check-container-memory-limit: 129Mi

Expand Down
3 changes: 3 additions & 0 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
DefaultPaginationPageSize = 1000
DefaultPaginationDepthLimit = 2
DefaultQueryResetInterval = 10 * time.Second
DefaultWorkQueueLimit = 1_000_000
DefaultImageCheckContainerCPULimit = "200m"
DefaultImageCheckContainerMemoryLimit = "128Mi"
)
Expand Down Expand Up @@ -53,6 +54,7 @@ type Config struct {
PaginationDepthLimit int `json:"pagination-depth-limit" validate:"min=1,max=20"`
QueryResetInterval time.Duration `json:"query-reset-interval" validate:"omitempty"`
EnableQueuePause bool `json:"enable-queue-pause" validate:"omitempty"`
WorkQueueLimit int `json:"work-queue-limit" validate:"omitempty"`
// Agent endpoint is set in agent-config.

K8sClientRateLimiterQPS int `json:"k8s-client-rate-limiter-qps" validate:"omitempty"`
Expand Down Expand Up @@ -158,6 +160,7 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddInt("pagination-page-size", c.PaginationPageSize)
enc.AddInt("pagination-depth-limit", c.PaginationDepthLimit)
enc.AddDuration("query-reset-interval", c.QueryResetInterval)
enc.AddInt("work-queue-limit", c.WorkQueueLimit)
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func Run(
// Limiter prevents scheduling more than cfg.MaxInFlight jobs at once
// (if configured) and is responsible for the priority queue of jobs.
// Once it figures out a job can be scheduled, it passes to the deduper.
limiter := limiter.New(ctx, logger.Named("limiter"), deduper, cfg.MaxInFlight, cfg.JobCreationConcurrency)
limiter := limiter.New(ctx, logger.Named("limiter"), deduper,
cfg.MaxInFlight,
cfg.JobCreationConcurrency,
cfg.WorkQueueLimit,
)
if err := limiter.RegisterInformer(ctx, informerFactory); err != nil {
logger.Fatal("failed to register limiter informer", zap.Error(err))
}
Expand Down
34 changes: 33 additions & 1 deletion internal/controller/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Limiter struct {
// Controls the number of workers that pass along jobs.
JobCreationConcurrency int

// Maximum number of jobs to store in the work queue.
WorkQueueLimit int

// Next handler in the chain.
handler model.JobHandler

Expand All @@ -56,7 +59,7 @@ type Limiter struct {

// New creates a Limiter. maxInFlight must be non-negative, but 0 is interpreted
// as no limit. Zero or negative concurrency is replaced with the default.
func New(ctx context.Context, logger *zap.Logger, scheduler model.JobHandler, maxInFlight, concurrency int) *Limiter {
func New(ctx context.Context, logger *zap.Logger, scheduler model.JobHandler, maxInFlight, concurrency, workQueueLimit int) *Limiter {
if maxInFlight < 0 {
// Using panic, because getting here is severe programmer error and the
// whole controller is still just starting up.
Expand All @@ -66,10 +69,14 @@ func New(ctx context.Context, logger *zap.Logger, scheduler model.JobHandler, ma
if concurrency <= 0 {
concurrency = config.DefaultJobCreationConcurrency
}
if workQueueLimit <= 0 {
workQueueLimit = config.DefaultWorkQueueLimit
}
l := &Limiter{
handler: scheduler,
MaxInFlight: maxInFlight,
JobCreationConcurrency: concurrency,
WorkQueueLimit: workQueueLimit,
logger: logger,
tokenBucket: make(chan struct{}, maxInFlight),
newWork: make(chan struct{}, concurrency),
Expand Down Expand Up @@ -158,6 +165,10 @@ func (l *Limiter) HandleMany(ctx context.Context, jobs []*api.AgentScheduledJob)
return nil
}

// Deduplicate the queue by ID. The deduper component (next in the chain)
// does this too but deals with jobs in the k8s cluster.
l.queue = inPlaceDedup(l.queue)

// Sort by priority, preserving existing ordering between jobs with equal
// priority.
slices.SortStableFunc(jobs, func(a, b *api.AgentScheduledJob) int {
Expand All @@ -166,6 +177,11 @@ func (l *Limiter) HandleMany(ctx context.Context, jobs []*api.AgentScheduledJob)
return cmp.Compare(b.Priority, a.Priority)
})

// Drop from the end of the queue to fit within the WorkQueueLimit.
// Do this after sorting so the highest priority jobs are more likely to
// remain.
l.queue = l.queue[:min(len(l.queue), l.WorkQueueLimit)]

if l.paused {
return nil
}
Expand Down Expand Up @@ -392,3 +408,19 @@ func (l *Limiter) tryReturnToken(source string) {
tokenOverflowCounter.WithLabelValues(source).Inc()
}
}

func inPlaceDedup(jobs []*api.AgentScheduledJob) []*api.AgentScheduledJob {
ids := make(map[string]struct{})
i := 0 // here's where the next unique job will go
for j, job := range jobs {
if _, exists := ids[job.ID]; exists {
continue // not unique
}
ids[job.ID] = struct{}{}
if i != j {
jobs[i] = job
}
i++
}
return jobs[:i]
}
4 changes: 2 additions & 2 deletions internal/controller/limiter/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLimiter(t *testing.T) {
t.Cleanup(cancel)

fakeSched := model.NewFakeScheduler(1, nil)
limiter := limiter.New(ctx, zaptest.NewLogger(t), fakeSched, 1, 1)
limiter := limiter.New(ctx, zaptest.NewLogger(t), fakeSched, 1, 1, -1)
fakeSched.EventHandler = limiter
fakeSched.Add(50)

Expand Down Expand Up @@ -56,7 +56,7 @@ func TestLimiter_SchedulerErrors(t *testing.T) {
defer cancel()

fakeSched := model.NewFakeScheduler(0, errors.New("invalid"))
limiter := limiter.New(ctx, zaptest.NewLogger(t), fakeSched, 1, 1)
limiter := limiter.New(ctx, zaptest.NewLogger(t), fakeSched, 1, 1, -1)
fakeSched.EventHandler = limiter
fakeSched.Add(50)

Expand Down