Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,28 @@

package org.apache.spark.k8s.operator.reconciler.observers;

import static org.apache.spark.k8s.operator.Constants.DRIVER_READY;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverReady;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.Pod;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.PodUtils;

/** Observes whether driver is ready */
public class AppDriverReadyObserver extends BaseAppDriverObserver {
@Override
public Optional<ApplicationState> observe(
Pod driver, ApplicationSpec spec, ApplicationStatus currentStatus) {
if (ApplicationStateSummary.DriverReady.ordinal()
<= currentStatus.getCurrentState().getCurrentStateSummary().ordinal()) {
Pod driver, ApplicationSpec spec, ApplicationStatus status) {
if (DriverReady.ordinal() <= status.getCurrentState().getCurrentStateSummary().ordinal()) {
return Optional.empty();
}
if (PodUtils.isPodReady(driver)) {
return Optional.of(
new ApplicationState(ApplicationStateSummary.DriverReady, Constants.DRIVER_READY));
return Optional.of(new ApplicationState(DriverReady, DRIVER_READY));
}
return observeDriverTermination(driver, true, spec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.spark.k8s.operator.status.ApplicationStatus;

/**
* Observes whether driver reaches running state (in other words, whether its at least scheduled)
* Observes whether driver pod reaches running state (in other words, whether its at least
* scheduled). Note that this means `DriverStarted` and we need to wait `DriverReady` as a next
* step.
*/
public class AppDriverRunningObserver extends BaseAppDriverObserver {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,28 @@

package org.apache.spark.k8s.operator.reconciler.observers;

import static org.apache.spark.k8s.operator.Constants.DRIVER_RUNNING;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.DriverStarted;

import java.util.Optional;

import io.fabric8.kubernetes.api.model.Pod;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.k8s.operator.utils.PodUtils;

public class AppDriverStartObserver extends BaseAppDriverObserver {
@Override
public Optional<ApplicationState> observe(
Pod driver, ApplicationSpec spec, ApplicationStatus currentStatus) {
if (ApplicationStateSummary.DriverStarted.ordinal()
if (DriverStarted.ordinal()
<= currentStatus.getCurrentState().getCurrentStateSummary().ordinal()) {
return Optional.empty();
}
if (PodUtils.isDriverPodStarted(driver, spec)) {
return Optional.of(
new ApplicationState(ApplicationStateSummary.DriverStarted, Constants.DRIVER_RUNNING));
return Optional.of(new ApplicationState(DriverStarted, DRIVER_RUNNING));
}
return observeDriverTermination(driver, false, spec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,11 @@ public class AppDriverTimeoutObserver extends BaseAppDriverObserver {
* This helps to avoid resource deadlock when app cannot proceed. Such states include:
*
* <ul>
* <li>DRIVER_REQUESTED -> goes to DRIVER_LAUNCH_TIMED_OUT if driver pod cannot be scheduled or
* cannot start running
* <li>DRIVER_REQUESTED -> goes to DRIVER_LAUNCH_TIMED_OUT if driver pod cannot be scheduled or
* cannot start running
* <li>DRIVER_STARTED -> goes to SPARK_SESSION_INITIALIZATION_TIMED_OUT if Spark session cannot
* be initialized
* <li>DRIVER_READY / EXECUTOR_REQUESTED / EXECUTOR_SCHEDULED /
* INITIALIZED_BELOW_THRESHOLD_EXECUTORS -> go to EXECUTORS_LAUNCH_TIMED_OUT if app cannot
* acquire at least minimal executors in given time
* <li>DriverRequested goes to DriverStartTimedOut if driver pod cannot be scheduled or cannot
* start running
* <li>DriverStarted goes to DriverReadyTimedOut if Spark session cannot be initialized
* <li>DriverReady and InitializedBelowThresholdExecutors goes to ExecutorsStartTimedOut if app
* cannot acquire at least minimal executors in the given time
* </ul>
*
* <p>Operator will NOT proactively stop the app if it has acquired enough executors and later
Expand All @@ -57,36 +53,36 @@ public class AppDriverTimeoutObserver extends BaseAppDriverObserver {
*/
@Override
public Optional<ApplicationState> observe(
Pod driver, ApplicationSpec spec, ApplicationStatus currentStatus) {
Pod driver, ApplicationSpec spec, ApplicationStatus status) {
long timeoutThreshold;
Supplier<ApplicationState> supplier;
ApplicationTimeoutConfig timeoutConfig =
spec.getApplicationTolerations().getApplicationTimeoutConfig();
switch (currentStatus.getCurrentState().getCurrentStateSummary()) {
case DriverRequested:
ApplicationState state = status.getCurrentState();
switch (state.getCurrentStateSummary()) {
case DriverRequested -> {
timeoutThreshold = timeoutConfig.getDriverStartTimeoutMillis();
supplier = SparkAppStatusUtils::driverLaunchTimedOut;
break;
case DriverStarted:
}
case DriverStarted -> {
timeoutThreshold = timeoutConfig.getDriverReadyTimeoutMillis();
supplier = SparkAppStatusUtils::driverReadyTimedOut;
break;
case DriverReady:
case InitializedBelowThresholdExecutors:
}
case DriverReady, InitializedBelowThresholdExecutors -> {
timeoutThreshold = timeoutConfig.getExecutorStartTimeoutMillis();
supplier = SparkAppStatusUtils::executorLaunchTimedOut;
break;
default:
}
default -> {
// No timeout check needed for other states
return Optional.empty();
}
}
Instant lastTransitionTime =
Instant.parse(currentStatus.getCurrentState().getLastTransitionTime());
Instant lastTransitionTime = Instant.parse(state.getLastTransitionTime());
if (timeoutThreshold > 0L
&& lastTransitionTime.plusMillis(timeoutThreshold).isBefore(Instant.now())) {
ApplicationState state = supplier.get();
state.setLastObservedDriverStatus(driver.getStatus());
return Optional.of(state);
ApplicationState appState = supplier.get();
appState.setLastObservedDriverStatus(driver.getStatus());
return Optional.of(appState);
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

package org.apache.spark.k8s.operator.reconciler.observers;

import static org.apache.spark.k8s.operator.Constants.*;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Failed;
import static org.apache.spark.k8s.operator.status.ApplicationStateSummary.Succeeded;

import java.util.List;
import java.util.Optional;

import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import lombok.extern.slf4j.Slf4j;

import org.apache.spark.k8s.operator.Constants;
import org.apache.spark.k8s.operator.spec.ApplicationSpec;
import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
import org.apache.spark.k8s.operator.status.ApplicationState;
Expand Down Expand Up @@ -68,83 +72,71 @@ public abstract class BaseAppDriverObserver
*
* @param driverPod the driverPod
* @param driverReady whether SparkContext / SparkSession has ever been initialized for this pod
* @param spec the application spec
* @return the ApplicationState to be updated if pod is terminated. Returning empty if pod is
* still running
*/
protected Optional<ApplicationState> observeDriverTermination(
final Pod driverPod, final boolean driverReady, final ApplicationSpec spec) {
if (driverPod.getStatus() == null
|| driverPod.getStatus().getContainerStatuses() == null
|| driverPod.getStatus().getContainerStatuses().isEmpty()) {
PodStatus status = driverPod.getStatus();
if (status == null
|| status.getContainerStatuses() == null
|| status.getContainerStatuses().isEmpty()) {
log.warn("Cannot determine driver pod status, the pod may in pending state.");
return Optional.empty();
}

if (PodPhase.FAILED.equals(PodPhase.getPhase(driverPod))) {
ApplicationState applicationState =
new ApplicationState(ApplicationStateSummary.Failed, Constants.DRIVER_FAILED_MESSAGE);
if ("Evicted".equalsIgnoreCase(driverPod.getStatus().getReason())) {
applicationState =
new ApplicationState(
ApplicationStateSummary.DriverEvicted, Constants.DRIVER_FAILED_MESSAGE);
ApplicationState state = new ApplicationState(Failed, DRIVER_FAILED_MESSAGE);
if ("Evicted".equalsIgnoreCase(status.getReason())) {
state = new ApplicationState(ApplicationStateSummary.DriverEvicted, DRIVER_FAILED_MESSAGE);
}
applicationState.setLastObservedDriverStatus(driverPod.getStatus());
return Optional.of(applicationState);
state.setLastObservedDriverStatus(status);
return Optional.of(state);
}

if (PodPhase.SUCCEEDED.equals(PodPhase.getPhase(driverPod))) {
ApplicationState state;
if (driverReady) {
state =
new ApplicationState(
ApplicationStateSummary.Succeeded, Constants.DRIVER_COMPLETED_MESSAGE);
state = new ApplicationState(Succeeded, DRIVER_COMPLETED_MESSAGE);
} else {
state =
new ApplicationState(
ApplicationStateSummary.Failed,
Constants.DRIVER_TERMINATED_BEFORE_INITIALIZATION_MESSAGE);
state.setLastObservedDriverStatus(driverPod.getStatus());
state = new ApplicationState(Failed, DRIVER_TERMINATED_BEFORE_INITIALIZATION_MESSAGE);
state.setLastObservedDriverStatus(status);
}
return Optional.of(state);
}

List<ContainerStatus> initContainerStatusList =
driverPod.getStatus().getInitContainerStatuses();
List<ContainerStatus> initContainerStatusList = status.getInitContainerStatuses();
if (initContainerStatusList != null
&& initContainerStatusList.parallelStream().anyMatch(PodUtils::isContainerFailed)) {
ApplicationState applicationState =
new ApplicationState(
ApplicationStateSummary.Failed, Constants.DRIVER_FAILED_INIT_CONTAINERS_MESSAGE);
applicationState.setLastObservedDriverStatus(driverPod.getStatus());
new ApplicationState(Failed, DRIVER_FAILED_INIT_CONTAINERS_MESSAGE);
applicationState.setLastObservedDriverStatus(status);
return Optional.of(applicationState);
}
List<ContainerStatus> containerStatusList = driverPod.getStatus().getContainerStatuses();
List<ContainerStatus> containerStatusList = status.getContainerStatuses();
List<ContainerStatus> terminatedCriticalContainers =
ModelUtils.findDriverMainContainerStatus(spec, containerStatusList).stream()
.filter(PodUtils::isContainerTerminated)
.toList();

if (!terminatedCriticalContainers.isEmpty()) {
ApplicationState applicationState;
ApplicationState state;
if (terminatedCriticalContainers.parallelStream().anyMatch(PodUtils::isContainerFailed)) {
applicationState =
new ApplicationState(ApplicationStateSummary.Failed, Constants.DRIVER_FAILED_MESSAGE);
state = new ApplicationState(Failed, DRIVER_FAILED_MESSAGE);
} else {
applicationState =
new ApplicationState(
ApplicationStateSummary.Succeeded, Constants.DRIVER_SUCCEEDED_MESSAGE);
state = new ApplicationState(Succeeded, DRIVER_SUCCEEDED_MESSAGE);
}
applicationState.setLastObservedDriverStatus(driverPod.getStatus());
return Optional.of(applicationState);
state.setLastObservedDriverStatus(status);
return Optional.of(state);
}
boolean driverRestarted =

boolean isDriverRestarted =
ModelUtils.findDriverMainContainerStatus(spec, containerStatusList).stream()
.anyMatch(PodUtils::isContainerRestarted);

if (driverRestarted) {
ApplicationState state =
new ApplicationState(ApplicationStateSummary.Failed, Constants.DRIVER_RESTARTED_MESSAGE);
state.setLastObservedDriverStatus(driverPod.getStatus());
if (isDriverRestarted) {
ApplicationState state = new ApplicationState(Failed, DRIVER_RESTARTED_MESSAGE);
state.setLastObservedDriverStatus(status);
return Optional.of(state);
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@
import org.apache.spark.k8s.operator.status.BaseStatus;

/**
* Observe given secondary resource, return state to be updated if applicable. These observers do
* not act on secondary resource. They only observe secondary resource status and update owner
* SparkApplication status if needed
* Observe given secondary resource, return state to be updated if applicable. These observers only
* observe secondary resource status and update the status of owner, SparkApplication, if needed.
*/
public abstract class BaseSecondaryResourceObserver<
S,
Expand Down