2727import java .util .HashMap ;
2828import java .util .Map ;
2929import java .util .Set ;
30+ import java .util .concurrent .TimeUnit ;
3031
3132import com .google .common .annotations .VisibleForTesting ;
3233import org .apache .hadoop .security .UserGroupInformation ;
34+ import org .apache .tez .common .CachedEntity ;
3335import org .apache .tez .common .Preconditions ;
3436
3537import org .apache .hadoop .yarn .exceptions .ApplicationNotFoundException ;
@@ -58,13 +60,15 @@ public class DAGClientImpl extends DAGClient {
5860 private final String dagId ;
5961 private final TezConfiguration conf ;
6062 private final FrameworkClient frameworkClient ;
61-
63+ /**
64+ * Container to cache the last {@link DAGStatus}.
65+ */
66+ private final CachedEntity <DAGStatus > cachedDAGStatusRef ;
6267 @ VisibleForTesting
6368 protected DAGClientInternal realClient ;
64- private boolean dagCompleted = false ;
69+ private volatile boolean dagCompleted = false ;
6570 @ VisibleForTesting
6671 protected boolean isATSEnabled = false ;
67- private DAGStatus cachedDagStatus = null ;
6872 Map <String , VertexStatus > cachedVertexStatus = new HashMap <String , VertexStatus >();
6973
7074 private static final long SLEEP_FOR_COMPLETION = 500 ;
@@ -110,6 +114,28 @@ public DAGClientImpl(ApplicationId appId, String dagId, TezConfiguration conf,
110114 this .diagnoticsWaitTimeout = conf .getLong (
111115 TezConfiguration .TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS ,
112116 TezConfiguration .TEZ_CLIENT_DIAGNOSTICS_WAIT_TIMEOUT_MS_DEFAULT );
117+ cachedDAGStatusRef = initCacheDAGRefFromConf (conf );
118+ }
119+
120+ /**
121+ * Constructs a new {@link CachedEntity} for {@link DAGStatus}.
122+ * @param tezConf TEZ configuration parameters.
123+ * @return a caching entry to hold the {@link DAGStatus}.
124+ */
125+ protected CachedEntity <DAGStatus > initCacheDAGRefFromConf (TezConfiguration tezConf ) {
126+ long clientDAGStatusCacheTimeOut = tezConf .getLong (
127+ TezConfiguration .TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS ,
128+ TezConfiguration .TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT );
129+ if (clientDAGStatusCacheTimeOut <= 0 ) {
130+ LOG .error ("DAG Status cache timeout interval should be positive. Enforcing default value." );
131+ clientDAGStatusCacheTimeOut =
132+ TezConfiguration .TEZ_CLIENT_DAG_STATUS_CACHE_TIMEOUT_SECS_DEFAULT ;
133+ }
134+ return new CachedEntity <>(TimeUnit .SECONDS , clientDAGStatusCacheTimeOut );
135+ }
136+
137+ protected CachedEntity <DAGStatus > getCachedDAGStatusRef () {
138+ return cachedDAGStatusRef ;
113139 }
114140
115141 @ Override
@@ -133,13 +159,11 @@ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
133159 }
134160
135161 long startTime = System .currentTimeMillis ();
136- boolean refreshStatus ;
137- DAGStatus dagStatus ;
138- if (cachedDagStatus != null ) {
139- dagStatus = cachedDagStatus ;
140- refreshStatus = true ;
141- } else {
142- // For the first lookup only. After this cachedDagStatus should be populated.
162+
163+ DAGStatus dagStatus = cachedDAGStatusRef .getValue ();
164+ boolean refreshStatus = true ;
165+ if (dagStatus == null ) {
166+ // the first lookup only or when the cachedDAG has expired
143167 dagStatus = getDAGStatus (statusOptions );
144168 refreshStatus = false ;
145169 }
@@ -221,13 +245,14 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOpti
221245 final DAGStatus dagStatus = getDAGStatusViaAM (statusOptions , timeout );
222246
223247 if (!dagCompleted ) {
224- if (dagStatus != null ) {
225- cachedDagStatus = dagStatus ;
248+ if (dagStatus != null ) { // update the cached DAGStatus
249+ cachedDAGStatusRef . setValue ( dagStatus ) ;
226250 return dagStatus ;
227251 }
228- if (cachedDagStatus != null ) {
252+ DAGStatus cachedDAG = cachedDAGStatusRef .getValue ();
253+ if (cachedDAG != null ) {
229254 // could not get from AM (not reachable/ was killed). return cached status.
230- return cachedDagStatus ;
255+ return cachedDAG ;
231256 }
232257 }
233258
@@ -253,8 +278,11 @@ protected DAGStatus getDAGStatusInternal(@Nullable Set<StatusGetOpts> statusOpti
253278
254279 // dag completed and Timeline service is either not enabled or does not have completion status
255280 // return cached status if completion info is present.
256- if (dagCompleted && cachedDagStatus != null && cachedDagStatus .isCompleted ()) {
257- return cachedDagStatus ;
281+ if (dagCompleted ) {
282+ DAGStatus cachedDag = cachedDAGStatusRef .getValue ();
283+ if (cachedDag != null && cachedDag .isCompleted ()) {
284+ return cachedDag ;
285+ }
258286 }
259287
260288 // everything else fails rely on RM.
@@ -377,9 +405,11 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
377405 LOG .info ("DAG is no longer running - application not found by YARN" , e );
378406 dagCompleted = true ;
379407 } catch (TezException e ) {
380- // can be either due to a n/w issue of due to AM completed.
408+ // can be either due to a n/w issue or due to AM completed.
409+ LOG .info ("Cannot retrieve DAG Status due to TezException: {}" , e .getMessage ());
381410 } catch (IOException e ) {
382- // can be either due to a n/w issue of due to AM completed.
411+ // can be either due to a n/w issue or due to AM completed.
412+ LOG .info ("Cannot retrieve DAG Status due to IOException: {}" , e .getMessage ());
383413 }
384414
385415 if (dagStatus == null && !dagCompleted ) {
0 commit comments