Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestReadAndParseConfig(t *testing.T) {
ImagePullBackOffGracePeriod: 60 * time.Second,
JobCancelCheckerPollInterval: 10 * time.Second,
EmptyJobGracePeriod: 50 * time.Second,
PodPendingTimeout: 15 * time.Minute,
PollInterval: 5 * time.Second,
HTTPTimeout: 1 * time.Minute,
JobCreationConcurrency: 5,
Expand Down Expand Up @@ -234,6 +235,7 @@ func TestConfigPrecedence(t *testing.T) {
assert.Equal(t, 25, cfg.MaxInFlight)
assert.Equal(t, "default", cfg.Namespace)
assert.Equal(t, 10*time.Minute, cfg.JobTTL)
assert.Equal(t, 15*time.Minute, cfg.PodPendingTimeout)
assert.False(t, cfg.Debug)
})

Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type CLI struct {
ImagePullBackOffGracePeriod *time.Duration `kong:"name='image-pull-backoff-grace-period',help='Duration after starting a pod that the controller will wait before considering cancelling a job due to ImagePullBackOff (e.g. when the podSpec specifies container images that cannot be pulled) (default: 30s)'"`
JobCancelCheckerPollInterval *time.Duration `kong:"help='Controls the interval between job state queries while a pod is still Pending (default: 5s)'"`
EmptyJobGracePeriod *time.Duration `kong:"help='Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account) (default: 30s)'"`
PodPendingTimeout *time.Duration `kong:"help='Duration after a pod enters Pending state that the controller will wait before failing the job (default: 15m)'"`

// Image settings
DefaultImagePullPolicy corev1.PullPolicy `kong:"help='Configures a default image pull policy for containers that do not specify a pull policy and non-init containers created by the stack itself'"`
Expand Down Expand Up @@ -152,6 +153,7 @@ func newConfigWithDefaults() *config.Config {
ImagePullBackOffGracePeriod: 30 * time.Second,
JobCancelCheckerPollInterval: 5 * time.Second,
EmptyJobGracePeriod: 30 * time.Second,
PodPendingTimeout: config.DefaultPodPendingTimeout,
PaginationPageSize: 1000,
PaginationDepthLimit: 2,
QueryResetInterval: 10 * time.Second,
Expand Down Expand Up @@ -230,6 +232,7 @@ func convertDurations(m map[string]any) {
"image-pull-backoff-grace-period": {},
"job-cancel-checker-poll-interval": {},
"empty-job-grace-period": {},
"pod-pending-timeout": {},
}

for key, value := range m {
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default-termination-grace-period-seconds: 80
image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
empty-job-grace-period: 50s
pod-pending-timeout: 15m
poll-interval: 5s
job-creation-concurrency: 5
max-in-flight: 100
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
DefaultImagePullBackOffGracePeriod = 30 * time.Second
DefaultJobCancelCheckerPollInterval = 5 * time.Second
DefaultEmptyJobGracePeriod = 30 * time.Second
DefaultPodPendingTimeout = 15 * time.Minute
DefaultJobCreationConcurrency = 25
DefaultK8sClientRateLimiterQPS = 10
DefaultK8sClientRateLimiterBurst = 20
Expand Down Expand Up @@ -83,6 +84,7 @@ type Config struct {
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`
PodPendingTimeout time.Duration `json:"pod-pending-timeout" validate:"omitempty"`

// WorkspaceVolume allows supplying a volume for /workspace. By default
// an EmptyDir volume is created for it.
Expand Down Expand Up @@ -173,7 +175,6 @@ func setEnvNegatedOpt(ctr *corev1.Container, name string, value *bool) {
return
}
setEnv(ctr, name, strconv.FormatBool(!*value))

}

// setEnvCommaSep sets an env var to a comma-separated list of values, if not
Expand Down
164 changes: 164 additions & 0 deletions internal/controller/scheduler/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"cmp"
"context"
"fmt"
"log/slog"
"regexp"
"slices"
Expand Down Expand Up @@ -35,6 +36,10 @@ type podWatcher struct {
// creation before it cancels the job.
imagePullBackOffGracePeriod time.Duration

// podPendingTimeout is how long a pod may remain in the Pending phase,
// measured from the pod's creation timestamp, before the job is failed
// and the pod is deleted.
podPendingTimeout time.Duration

// Jobs that we've failed, cancelled, or were found to be in a terminal
// state.
ignoredJobsMu sync.RWMutex
Expand All @@ -45,6 +50,10 @@ type podWatcher struct {
watchingForImageFailureMu sync.Mutex
watchingForImageFailure map[uuid.UUID]*corev1.Pod

// Pods being watched for exceeding the pending timeout
watchingForPendingTimeoutMu sync.Mutex
watchingForPendingTimeout map[uuid.UUID]*corev1.Pod

// Buildkite job checker for handling job cancellation
// There is an argument to be made that these should live outside the podWatcher.
// But since these are designed to work only for Pending pods, the podWatcher largely determines how they work.
Expand Down Expand Up @@ -75,22 +84,28 @@ type podWatcher struct {
// early.
func NewPodWatcher(logger *slog.Logger, k8s kubernetes.Interface, agentClient *api.AgentClient, cfg *config.Config) *podWatcher {
imagePullBackOffGracePeriod := cfg.ImagePullBackOffGracePeriod
podPendingTimeout := cfg.PodPendingTimeout
if imagePullBackOffGracePeriod <= 0 {
imagePullBackOffGracePeriod = config.DefaultImagePullBackOffGracePeriod
}
jobCancelCheckerInterval := cfg.JobCancelCheckerPollInterval
if jobCancelCheckerInterval <= 0 {
jobCancelCheckerInterval = config.DefaultJobCancelCheckerPollInterval
}
if podPendingTimeout <= 0 {
podPendingTimeout = config.DefaultPodPendingTimeout
}

pw := &podWatcher{
logger: logger,
k8s: k8s,
agentClient: agentClient,
cfg: cfg,
imagePullBackOffGracePeriod: imagePullBackOffGracePeriod,
podPendingTimeout: podPendingTimeout,
ignoredJobs: make(map[uuid.UUID]struct{}),
watchingForImageFailure: make(map[uuid.UUID]*corev1.Pod),
watchingForPendingTimeout: make(map[uuid.UUID]*corev1.Pod),
}
pw.batchBkJobChecker = NewBatchBuildkiteJobChecker(logger, agentClient, k8s, jobCancelCheckerInterval)
podWatcherIgnoredJobsGaugeFunc = func() int {
Expand Down Expand Up @@ -118,6 +133,7 @@ func (w *podWatcher) RegisterInformer(ctx context.Context, factory informers.Sha
w.resourceEventHandlerCtx = ctx // 😡
go factory.Start(ctx.Done())
go w.imageFailureChecker(ctx, w.logger)
go w.pendingTimeoutChecker(ctx, w.logger)
return nil
}

Expand All @@ -138,6 +154,7 @@ func (w *podWatcher) OnDelete(previousState any) {

// No need to continue watching for image-related failures, pending timeouts, or cancellation.
w.stopWatchingForImageFailure(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)
w.stopCheckingBuildkiteJob(jobUUID)

// The pod is gone, so we can stop ignoring it (if it comes back).
Expand Down Expand Up @@ -190,12 +207,14 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
// Running: the agent container has started or is about to start, and it
// can handle the cancellation and exit.
w.stopCheckingBuildkiteJob(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)

default:
// Succeeded, Failed: it's already over.
// Unknown: probably shouldn't interfere.
w.stopWatchingForImageFailure(jobUUID)
w.stopCheckingBuildkiteJob(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)
}

if w.isIgnored(jobUUID) {
Expand All @@ -213,6 +232,7 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
switch pod.Status.Phase {
case corev1.PodPending:
w.watchForImageFailure(jobUUID, pod)
w.watchForPendingTimeout(jobUUID, pod)
w.batchBkJobChecker.AddJob(jobUUID, pod.ObjectMeta)

case corev1.PodRunning:
Expand Down Expand Up @@ -463,6 +483,138 @@ func (w *podWatcher) imageFailureChecker(ctx context.Context, log *slog.Logger)
}
}

// pendingTimeoutChecker is a goroutine body that runs until ctx is done.
// Once per second it scans the pods registered in watchingForPendingTimeout,
// removes every pod that podHasExceededPendingTimeout reports as timed out,
// and then passes each removed pod to failForPendingTimeout.
//
// The map is only touched while watchingForPendingTimeoutMu is held; the
// calls to failForPendingTimeout happen after the mutex has been released.
func (w *podWatcher) pendingTimeoutChecker(ctx context.Context, log *slog.Logger) {
	// Use NewTicker rather than time.Tick so the ticker can be stopped when
	// this goroutine returns. Before Go 1.23 a ticker created by time.Tick
	// was never recovered by the garbage collector.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			// continue below
		}

		var timedOutPods []*corev1.Pod

		// Check all pending pods for timeout. Timed-out pods are removed from
		// the map here so that they are only collected once by this loop.
		w.watchingForPendingTimeoutMu.Lock()
		for jobUUID, pod := range w.watchingForPendingTimeout {
			if w.podHasExceededPendingTimeout(log, pod) {
				log.Debug(
					"pod has exceeded pending timeout",
					"pod_name", pod.Name,
				)
				timedOutPods = append(timedOutPods, pod)
				delete(w.watchingForPendingTimeout, jobUUID)
			}
		}
		w.watchingForPendingTimeoutMu.Unlock()

		// Fail the corresponding jobs on Buildkite and delete the pods.
		for _, pod := range timedOutPods {
			w.failForPendingTimeout(ctx, log, pod)
		}
	}
}

// podHasExceededPendingTimeout reports whether pod is in the Pending phase and
// was created at least podPendingTimeout ago.
//
// It returns false for any pod that is not Pending, and also for any pod with
// an init container or regular container waiting with reason ImagePullBackOff,
// ErrImageNeverPull, or InvalidImageName; those pods are expected to be dealt
// with by the image failure checker instead.
func (w *podWatcher) podHasExceededPendingTimeout(log *slog.Logger, pod *corev1.Pod) bool {
	// Pods in any other phase never count as timed out.
	if pod.Status.Phase != corev1.PodPending {
		return false
	}

	// Init container statuses are inspected first, then regular containers.
	statusGroups := [][]corev1.ContainerStatus{
		pod.Status.InitContainerStatuses,
		pod.Status.ContainerStatuses,
	}
	for _, group := range statusGroups {
		for i := range group {
			waiting := group[i].State.Waiting
			if waiting == nil {
				continue
			}
			reason := waiting.Reason
			if reason == "ImagePullBackOff" || reason == "ErrImageNeverPull" || reason == "InvalidImageName" {
				// Leave image-related failures to the image failure checker.
				log.Debug("Excluding pod from pending timeout check due to image failure", "reason", reason)
				return false
			}
		}
	}

	// Age is measured from CreationTimestamp rather than StartTime: StartTime
	// is only set once a pod is assigned to a node, so it stays nil for pods
	// that cannot be scheduled (node selector mismatch, insufficient
	// resources, and so on).
	age := time.Since(pod.CreationTimestamp.Time)
	return age >= w.podPendingTimeout
}

// failForPendingTimeout fails the Buildkite job belonging to pod and then
// force-deletes the pod.
//
// The failure message states how long ago the pod was created and lists the
// pod's conditions, if any. If failing the job returns an error, the pod is
// not deleted, and the job is added to the ignored set only when the error is
// permanent. If deleting the pod returns an error, the job is not added to the
// ignored set. When both steps succeed, the job is ignored from then on.
func (w *podWatcher) failForPendingTimeout(ctx context.Context, log *slog.Logger, pod *corev1.Pod) {
	podLog := loggerForObject(log, pod)

	jobUUID, err := jobUUIDForObject(pod)
	if err != nil {
		podLog.Error("Could not find Job UUID from pod metadata", "error", err)
		return
	}

	podLog.Info("Pod has been pending too long. Failing job.")

	// Build the failure message: a headline followed by one line per pod
	// condition, to help diagnose why the pod never started.
	elapsed := time.Since(pod.CreationTimestamp.Time).Round(time.Second)
	msg := fmt.Appendf(nil, "The pod has been in Pending state for %s without starting.\n", elapsed)
	if conditions := pod.Status.Conditions; len(conditions) > 0 {
		msg = append(msg, "\nPod Conditions:\n"...)
		for _, c := range conditions {
			msg = fmt.Appendf(msg, " - %s: %s (Reason: %s, Message: %s)\n",
				c.Type, c.Status, c.Reason, c.Message)
		}
	}

	info := FailureInfo{
		Message: string(msg),
		Reason:  agent.SignalReasonStackError,
	}

	// Step 1: fail the job on Buildkite.
	if failErr := failForK8sObject(ctx, podLog, pod, info, w.agentClient); failErr != nil {
		podLog.Error("Could not fail Buildkite job", "error", failErr)
		podWatcherBuildkiteJobFailErrorsCounter.Inc()

		// A permanent error will not succeed on retry, so ignore the job to
		// prevent retrying forever.
		if api.IsPermanentError(failErr) {
			w.ignoreJob(jobUUID)
		}
		return
	}
	podWatcherBuildkiteJobFailsCounter.Inc()

	// Step 2: delete the pod.
	if delErr := forcefullyDeletePod(ctx, podLog, w.k8s, &pod.ObjectMeta, "pending_timeout"); delErr != nil {
		return
	}

	// Step 3: mark the job as ignored to avoid further processing.
	w.ignoreJob(jobUUID)
}

// failingPod captures information about a pending or running pod that is now
// failing.
type failingPod struct {
Expand Down Expand Up @@ -587,6 +739,18 @@ func (w *podWatcher) stopWatchingForImageFailure(jobUUID uuid.UUID) {
delete(w.watchingForImageFailure, jobUUID)
}

// watchForPendingTimeout records pod under jobUUID in the set of pods checked
// for the pending timeout. If jobUUID is already present, the stored pod is
// replaced with the one given here. The map is guarded by
// watchingForPendingTimeoutMu.
func (w *podWatcher) watchForPendingTimeout(jobUUID uuid.UUID, pod *corev1.Pod) {
w.watchingForPendingTimeoutMu.Lock()
defer w.watchingForPendingTimeoutMu.Unlock()
w.watchingForPendingTimeout[jobUUID] = pod
}

// stopWatchingForPendingTimeout removes jobUUID from the set of pods checked
// for the pending timeout. It is a no-op if jobUUID is not present. The map is
// guarded by watchingForPendingTimeoutMu.
func (w *podWatcher) stopWatchingForPendingTimeout(jobUUID uuid.UUID) {
w.watchingForPendingTimeoutMu.Lock()
defer w.watchingForPendingTimeoutMu.Unlock()
delete(w.watchingForPendingTimeout, jobUUID)
}

// All container-\d containers will have the agent installed as their PID 1.
// Therefore, their lifecycle is well monitored in our backend, allowing us to terminate them if they fail to start.
//
Expand Down
Loading