Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestReadAndParseConfig(t *testing.T) {
ImagePullBackOffGracePeriod: 60 * time.Second,
JobCancelCheckerPollInterval: 10 * time.Second,
EmptyJobGracePeriod: 50 * time.Second,
PodPendingTimeout: 15 * time.Minute,
PollInterval: 5 * time.Second,
HTTPTimeout: 1 * time.Minute,
JobCreationConcurrency: 5,
Expand Down Expand Up @@ -234,6 +235,7 @@ func TestConfigPrecedence(t *testing.T) {
assert.Equal(t, 25, cfg.MaxInFlight)
assert.Equal(t, "default", cfg.Namespace)
assert.Equal(t, 10*time.Minute, cfg.JobTTL)
assert.Equal(t, 15*time.Minute, cfg.PodPendingTimeout)
assert.False(t, cfg.Debug)
})

Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type CLI struct {
ImagePullBackOffGracePeriod *time.Duration `kong:"name='image-pull-backoff-grace-period',help='Duration after starting a pod that the controller will wait before considering cancelling a job due to ImagePullBackOff (e.g. when the podSpec specifies container images that cannot be pulled) (default: 30s)'"`
JobCancelCheckerPollInterval *time.Duration `kong:"help='Controls the interval between job state queries while a pod is still Pending (default: 5s)'"`
EmptyJobGracePeriod *time.Duration `kong:"help='Duration after starting a Kubernetes job that the controller will wait before considering failing the job due to a missing pod (e.g. when the podSpec specifies a missing service account) (default: 30s)'"`
PodPendingTimeout *time.Duration `kong:"help='Duration after a pod enters Pending state that the controller will wait before failing the job (default: 15m)'"`

// Image settings
DefaultImagePullPolicy corev1.PullPolicy `kong:"help='Configures a default image pull policy for containers that do not specify a pull policy and non-init containers created by the stack itself'"`
Expand Down Expand Up @@ -152,6 +153,7 @@ func newConfigWithDefaults() *config.Config {
ImagePullBackOffGracePeriod: 30 * time.Second,
JobCancelCheckerPollInterval: 5 * time.Second,
EmptyJobGracePeriod: 30 * time.Second,
PodPendingTimeout: config.DefaultPodPendingTimeout,
PaginationPageSize: 1000,
PaginationDepthLimit: 2,
QueryResetInterval: 10 * time.Second,
Expand Down Expand Up @@ -230,6 +232,7 @@ func convertDurations(m map[string]any) {
"image-pull-backoff-grace-period": {},
"job-cancel-checker-poll-interval": {},
"empty-job-grace-period": {},
"pod-pending-timeout": {},
}

for key, value := range m {
Expand Down
1 change: 1 addition & 0 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ default-termination-grace-period-seconds: 80
image-pull-backoff-grace-period: 60s
job-cancel-checker-poll-interval: 10s
empty-job-grace-period: 50s
pod-pending-timeout: 15m
poll-interval: 5s
job-creation-concurrency: 5
max-in-flight: 100
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
DefaultImagePullBackOffGracePeriod = 30 * time.Second
DefaultJobCancelCheckerPollInterval = 5 * time.Second
DefaultEmptyJobGracePeriod = 30 * time.Second
DefaultPodPendingTimeout = 15 * time.Minute
DefaultJobCreationConcurrency = 25
DefaultK8sClientRateLimiterQPS = 10
DefaultK8sClientRateLimiterBurst = 20
Expand Down Expand Up @@ -83,6 +84,7 @@ type Config struct {
ImagePullBackOffGracePeriod time.Duration `json:"image-pull-backoff-grace-period" validate:"omitempty"`
JobCancelCheckerPollInterval time.Duration `json:"job-cancel-checker-poll-interval" validate:"omitempty"`
EmptyJobGracePeriod time.Duration `json:"empty-job-grace-period" validate:"omitempty"`
PodPendingTimeout time.Duration `json:"pod-pending-timeout" validate:"omitempty"`

// WorkspaceVolume allows supplying a volume for /workspace. By default
// an EmptyDir volume is created for it.
Expand Down Expand Up @@ -173,7 +175,6 @@ func setEnvNegatedOpt(ctr *corev1.Container, name string, value *bool) {
return
}
setEnv(ctr, name, strconv.FormatBool(!*value))

}

// setEnvCommaSep sets an env var to a comma-separated list of values, if not
Expand Down
175 changes: 175 additions & 0 deletions internal/controller/scheduler/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"cmp"
"context"
"fmt"
"log/slog"
"regexp"
"slices"
Expand Down Expand Up @@ -35,6 +36,10 @@ type podWatcher struct {
// creation before it cancels the job.
imagePullBackOffGracePeriod time.Duration

// podPendingTimeout waits this duration for a pod that has entered
// pending state before it will cancel the job
podPendingTimeout time.Duration

// Jobs that we've failed, cancelled, or were found to be in a terminal
// state.
ignoredJobsMu sync.RWMutex
Expand All @@ -45,6 +50,10 @@ type podWatcher struct {
watchingForImageFailureMu sync.Mutex
watchingForImageFailure map[uuid.UUID]*corev1.Pod

// Pods being watched for exceeding the pending timeout
watchingForPendingTimeoutMu sync.Mutex
watchingForPendingTimeout map[uuid.UUID]*corev1.Pod

// Buildkite job checker for handling job cancellation
// There is an argument to be made that these should live outside the podWatcher.
// But, since these are designed to work for Pending pods only, podWatcher largely impact how these work.
Expand Down Expand Up @@ -75,22 +84,28 @@ type podWatcher struct {
// early.
func NewPodWatcher(logger *slog.Logger, k8s kubernetes.Interface, agentClient *api.AgentClient, cfg *config.Config) *podWatcher {
imagePullBackOffGracePeriod := cfg.ImagePullBackOffGracePeriod
podPendingTimeout := cfg.PodPendingTimeout
if imagePullBackOffGracePeriod <= 0 {
imagePullBackOffGracePeriod = config.DefaultImagePullBackOffGracePeriod
}
jobCancelCheckerInterval := cfg.JobCancelCheckerPollInterval
if jobCancelCheckerInterval <= 0 {
jobCancelCheckerInterval = config.DefaultJobCancelCheckerPollInterval
}
if podPendingTimeout <= 0 {
podPendingTimeout = config.DefaultPodPendingTimeout
}

pw := &podWatcher{
logger: logger,
k8s: k8s,
agentClient: agentClient,
cfg: cfg,
imagePullBackOffGracePeriod: imagePullBackOffGracePeriod,
podPendingTimeout: podPendingTimeout,
ignoredJobs: make(map[uuid.UUID]struct{}),
watchingForImageFailure: make(map[uuid.UUID]*corev1.Pod),
watchingForPendingTimeout: make(map[uuid.UUID]*corev1.Pod),
}
pw.batchBkJobChecker = NewBatchBuildkiteJobChecker(logger, agentClient, k8s, jobCancelCheckerInterval)
podWatcherIgnoredJobsGaugeFunc = func() int {
Expand Down Expand Up @@ -118,6 +133,7 @@ func (w *podWatcher) RegisterInformer(ctx context.Context, factory informers.Sha
w.resourceEventHandlerCtx = ctx // 😡
go factory.Start(ctx.Done())
go w.imageFailureChecker(ctx, w.logger)
go w.pendingTimeoutChecker(ctx, w.logger)
return nil
}

Expand All @@ -138,6 +154,7 @@ func (w *podWatcher) OnDelete(previousState any) {

// No need to continue watching for image-related failures or cancellation.
w.stopWatchingForImageFailure(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)
w.stopCheckingBuildkiteJob(jobUUID)

// The pod is gone, so we can stop ignoring it (if it comes back).
Expand Down Expand Up @@ -190,12 +207,14 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
// Running: the agent container has started or is about to start, and it
// can handle the cancellation and exit.
w.stopCheckingBuildkiteJob(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)

default:
// Succeeded, Failed: it's already over.
// Unknown: probably shouldn't interfere.
w.stopWatchingForImageFailure(jobUUID)
w.stopCheckingBuildkiteJob(jobUUID)
w.stopWatchingForPendingTimeout(jobUUID)
}

if w.isIgnored(jobUUID) {
Expand All @@ -213,6 +232,7 @@ func (w *podWatcher) runChecks(ctx context.Context, pod *corev1.Pod) {
switch pod.Status.Phase {
case corev1.PodPending:
w.watchForImageFailure(jobUUID, pod)
w.watchForPendingTimeout(jobUUID, pod)
w.batchBkJobChecker.AddJob(jobUUID, pod.ObjectMeta)

case corev1.PodRunning:
Expand Down Expand Up @@ -463,6 +483,149 @@ func (w *podWatcher) imageFailureChecker(ctx context.Context, log *slog.Logger)
}
}

// pendingTimeoutChecker is a goroutine that periodically checks pending pods
// to see if they have exceeded the pending timeout.
func (w *podWatcher) pendingTimeoutChecker(ctx context.Context, log *slog.Logger) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return

case <-ticker.C:
// continue below
}
Comment thread
jeremybumsted marked this conversation as resolved.
Outdated

var timedOutPods []*corev1.Pod

// Check all pending pods for timeout.
w.watchingForPendingTimeoutMu.Lock()
for jobUUID, pod := range w.watchingForPendingTimeout {
if w.podHasExceededPendingTimeout(log, pod) {
log.Debug(
"pod has exceeded pending timeout",
"pod_name", pod.Name,
)
timedOutPods = append(timedOutPods, pod)
delete(w.watchingForPendingTimeout, jobUUID)
}
}
w.watchingForPendingTimeoutMu.Unlock()

// Fail the corresponding jobs on Buildkite and delete the pods.
for _, pod := range timedOutPods {
w.failForPendingTimeout(ctx, log, pod)
}
}
}

// podHasExceededPendingTimeout checks if a pod has been in Pending state for
// longer than the configured timeout. It excludes pods that are already being
// handled by the image failure checker (ImagePullBackOff, ErrImageNeverPull, etc).
func (w *podWatcher) podHasExceededPendingTimeout(log *slog.Logger, pod *corev1.Pod) bool {
// Only check pods that are actually in Pending state
if pod.Status.Phase != corev1.PodPending {
return false
}

// Exclude pods that are already being handled by image failure checker
// Check init containers for image-related failures
for _, containerStatus := range pod.Status.InitContainerStatuses {
waiting := containerStatus.State.Waiting
if waiting != nil {
switch waiting.Reason {
case "ImagePullBackOff", "ErrImageNeverPull", "InvalidImageName":
// This pod is already being handled by the image failure checker
log.Debug("Excluding pod from pending timeout check due to image failure", "reason", waiting.Reason)
return false
}
}
}

// Check regular containers for image-related failures
for _, containerStatus := range pod.Status.ContainerStatuses {
waiting := containerStatus.State.Waiting
if waiting != nil {
switch waiting.Reason {
case "ImagePullBackOff", "ErrImageNeverPull", "InvalidImageName":
// This pod is already being handled by the image failure checker
log.Debug("Excluding pod from pending timeout check due to image failure", "reason", waiting.Reason)
return false
}
}
}

// Check if the pod has exceeded the timeout
// Use CreationTimestamp instead of StartTime because StartTime is only set
// when a pod is assigned to a node. For pods that can't be scheduled
// (e.g., node selector mismatch, insufficient resources), StartTime remains nil.
createdAt := pod.CreationTimestamp.Time
if createdAt.IsZero() {
// Shouldn't happen, but be safe
return false
}

if time.Since(createdAt) < w.podPendingTimeout {
// Pod hasn't been pending long enough
return false
}

return true
Comment thread
jeremybumsted marked this conversation as resolved.
Outdated
}

// failForPendingTimeout fails the job on Buildkite and deletes the pod.
func (w *podWatcher) failForPendingTimeout(ctx context.Context, log *slog.Logger, pod *corev1.Pod) {
log = loggerForObject(log, pod)
jobUUID, err := jobUUIDForObject(pod)
if err != nil {
log.Error("Could not find Job UUID from pod metadata", "error", err)
return
}

log.Info("Pod has been pending too long. Failing job.")

// Create the failure message
pendingDuration := time.Since(pod.CreationTimestamp.Time)
message := fmt.Sprintf("The pod has been in Pending state for %s without starting.\n", pendingDuration.Round(time.Second))

// Add pod conditions to help diagnose the issue
if len(pod.Status.Conditions) > 0 {
message += "\nPod Conditions:\n"
for _, condition := range pod.Status.Conditions {
message += fmt.Sprintf(" - %s: %s (Reason: %s, Message: %s)\n",
condition.Type, condition.Status, condition.Reason, condition.Message)
}
}

failureInfo := FailureInfo{
Message: message,
Reason: agent.SignalReasonStackError,
}

// Fail the job on Buildkite
if err := failForK8sObject(ctx, log, pod, failureInfo, w.agentClient); err != nil {
log.Error("Could not fail Buildkite job", "error", err)
podWatcherBuildkiteJobFailErrorsCounter.Inc()

// If the error is permanent, ignore the job to prevent infinite retry
if api.IsPermanentError(err) {
w.ignoreJob(jobUUID)
}
return
}
podWatcherBuildkiteJobFailsCounter.Inc()

// Delete the pod
if err := forcefullyDeletePod(ctx, log, w.k8s, &pod.ObjectMeta, "pending_timeout"); err != nil {
return
}

// Mark the job as ignored to avoid further processing
w.ignoreJob(jobUUID)
}

// failingPod captures information about a pending or running pod that is now
// failing.
type failingPod struct {
Expand Down Expand Up @@ -587,6 +750,18 @@ func (w *podWatcher) stopWatchingForImageFailure(jobUUID uuid.UUID) {
delete(w.watchingForImageFailure, jobUUID)
}

func (w *podWatcher) watchForPendingTimeout(jobUUID uuid.UUID, pod *corev1.Pod) {
w.watchingForPendingTimeoutMu.Lock()
defer w.watchingForPendingTimeoutMu.Unlock()
w.watchingForPendingTimeout[jobUUID] = pod
}

func (w *podWatcher) stopWatchingForPendingTimeout(jobUUID uuid.UUID) {
w.watchingForPendingTimeoutMu.Lock()
defer w.watchingForPendingTimeoutMu.Unlock()
delete(w.watchingForPendingTimeout, jobUUID)
}

// All container-\d containers will have the agent installed as their PID 1.
// Therefore, their lifecycle is well monitored in our backend, allowing us to terminate them if they fail to start.
//
Expand Down
Loading