Skip to content

Commit 18ad3bf

Browse files
committed
TEZ-4474: Added config to fail AM in case of missing recovery files
1 parent 39e5a8e commit 18ad3bf

3 files changed

Lines changed: 219 additions & 32 deletions

File tree

tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,18 @@ public TezConfiguration(boolean loadDefaults) {
18281828
TEZ_PREFIX + "dag.recovery.enabled";
18291829
public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
18301830

1831+
1832+
/**
1833+
* Boolean value. When set, this enables AM to fail when DAG recovery is enabled and
1834+
* restarted app master did not find anything to recover
1835+
* Expert level setting.
1836+
*/
1837+
@ConfigurationScope(Scope.AM)
1838+
@ConfigurationProperty(type="boolean")
1839+
public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA =
1840+
TEZ_AM_PREFIX + "failure.on.missing.recovery.data";
1841+
public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT = false;
1842+
18311843
/**
18321844
* Int value. Size in bytes for the IO buffer size while processing the recovery file.
18331845
* Expert level setting.

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,10 +1916,7 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
19161916
LOG.info("Recovering data from previous attempts"
19171917
+ ", currentAttemptId=" + this.appAttemptID.getAttemptId());
19181918
this.state = DAGAppMasterState.RECOVERING;
1919-
RecoveryParser recoveryParser = new RecoveryParser(
1920-
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
1921-
DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
1922-
return recoveredDAGData;
1919+
return parseDAGFromRecoveryData();
19231920
}
19241921
} finally {
19251922
hadoopShim.clearHadoopCallerContext();
@@ -1928,6 +1925,27 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
19281925
return null;
19291926
}
19301927

1928+
private DAGRecoveryData parseDAGFromRecoveryData() throws IOException {
1929+
RecoveryParser recoveryParser = new RecoveryParser(
1930+
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
1931+
DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
1932+
1933+
/**
1934+
* Parsed recovery data can be NULL in scenarios where AM shutdown prematurely during the first attempt
1935+
* due to some FATAL error, if that happens recovery stream is not closed and no data is flushed on File System
1936+
* In cases like above, in next future attempts of application, recovery returns NULL instead of failing the DAG
1937+
* This config when enabled, throws an IOException for such cases, and it assumes that caller will catch these
1938+
* IOExceptions and will fail the DAG, which happens currently, JIRA: https://issues.apache.org/jira/browse/TEZ-4474
1939+
*/
1940+
if(Objects.isNull(recoveredDAGData) && amConf.getBoolean(
1941+
TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA,
1942+
TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT)) {
1943+
throw new IOException(String.format("Found nothing to recover in currentAttemptId=%s from recovery data dir=%s",
1944+
this.appAttemptID.getAttemptId(), this.recoveryDataDir));
1945+
}
1946+
return recoveredDAGData;
1947+
}
1948+
19311949
@Override
19321950
public void serviceStart() throws Exception {
19331951
//start all the components

tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java

Lines changed: 185 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,54 +14,32 @@
1414

1515
package org.apache.tez.dag.app;
1616

17-
import org.apache.hadoop.yarn.util.MonotonicClock;
18-
import org.apache.tez.dag.app.dag.DAGState;
19-
import org.apache.tez.dag.app.dag.Vertex;
20-
import org.apache.tez.dag.records.TezVertexID;
21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertNotNull;
23-
import static org.junit.Assert.assertNull;
24-
import static org.junit.Assert.assertTrue;
25-
import static org.mockito.Mockito.mock;
26-
import static org.mockito.Mockito.spy;
27-
import static org.mockito.Mockito.verify;
28-
import static org.mockito.Mockito.when;
29-
30-
import java.io.ByteArrayInputStream;
31-
import java.io.DataInput;
32-
import java.io.DataInputStream;
33-
import java.io.DataOutput;
34-
import java.io.File;
35-
import java.io.FileOutputStream;
36-
import java.io.IOException;
37-
import java.nio.ByteBuffer;
38-
import java.util.HashMap;
39-
import java.util.LinkedList;
40-
import java.util.List;
41-
import java.util.Map;
42-
43-
import org.apache.tez.common.Preconditions;
4417
import com.google.common.collect.BiMap;
4518
import com.google.common.collect.HashBiMap;
4619
import com.google.common.collect.Lists;
4720
import com.google.protobuf.ByteString;
48-
4921
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FSDataInputStream;
5023
import org.apache.hadoop.fs.FSDataOutputStream;
24+
import org.apache.hadoop.fs.FileStatus;
5125
import org.apache.hadoop.fs.FileSystem;
5226
import org.apache.hadoop.fs.FileUtil;
5327
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.fs.permission.FsPermission;
5429
import org.apache.hadoop.io.Text;
5530
import org.apache.hadoop.security.Credentials;
5631
import org.apache.hadoop.security.UserGroupInformation;
5732
import org.apache.hadoop.security.token.SecretManager;
5833
import org.apache.hadoop.security.token.Token;
5934
import org.apache.hadoop.security.token.TokenIdentifier;
35+
import org.apache.hadoop.util.Progressable;
6036
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
6137
import org.apache.hadoop.yarn.api.records.ApplicationId;
6238
import org.apache.hadoop.yarn.api.records.ContainerId;
39+
import org.apache.hadoop.yarn.util.MonotonicClock;
6340
import org.apache.hadoop.yarn.util.SystemClock;
6441
import org.apache.tez.client.TezApiVersionInfo;
42+
import org.apache.tez.common.Preconditions;
6543
import org.apache.tez.common.TezCommonUtils;
6644
import org.apache.tez.common.TezUtils;
6745
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -78,13 +56,44 @@
7856
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
7957
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
8058
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
59+
import org.apache.tez.dag.app.dag.DAGState;
60+
import org.apache.tez.dag.app.dag.Vertex;
8161
import org.apache.tez.dag.app.dag.impl.DAGImpl;
8262
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
8363
import org.apache.tez.dag.records.TezDAGID;
64+
import org.apache.tez.dag.records.TezVertexID;
8465
import org.junit.After;
8566
import org.junit.Assert;
8667
import org.junit.Before;
8768
import org.junit.Test;
69+
import org.mockito.ArgumentCaptor;
70+
71+
import java.io.ByteArrayInputStream;
72+
import java.io.DataInput;
73+
import java.io.DataInputStream;
74+
import java.io.DataOutput;
75+
import java.io.File;
76+
import java.io.FileNotFoundException;
77+
import java.io.FileOutputStream;
78+
import java.io.IOException;
79+
import java.lang.reflect.Field;
80+
import java.net.URI;
81+
import java.nio.ByteBuffer;
82+
import java.util.HashMap;
83+
import java.util.LinkedList;
84+
import java.util.List;
85+
import java.util.Map;
86+
87+
import static org.junit.Assert.assertEquals;
88+
import static org.junit.Assert.assertNotNull;
89+
import static org.junit.Assert.assertNull;
90+
import static org.junit.Assert.assertTrue;
91+
import static org.mockito.ArgumentMatchers.any;
92+
import static org.mockito.Mockito.mock;
93+
import static org.mockito.Mockito.spy;
94+
import static org.mockito.Mockito.times;
95+
import static org.mockito.Mockito.verify;
96+
import static org.mockito.Mockito.when;
8897

8998
public class TestDAGAppMaster {
9099

@@ -97,6 +106,65 @@ public class TestDAGAppMaster {
97106
private static final File TEST_DIR = new File(System.getProperty("test.build.data"),
98107
TestDAGAppMaster.class.getName()).getAbsoluteFile();
99108

109+
private final FileSystem spyFs = spy(new FileSystem() {
110+
@Override
111+
public URI getUri() {
112+
return null;
113+
}
114+
115+
@Override
116+
public FSDataInputStream open(Path path, int i) throws IOException {
117+
return null;
118+
}
119+
120+
@Override
121+
public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b,
122+
int i, short i1, long l, Progressable progressable)
123+
throws IOException {
124+
return null;
125+
}
126+
127+
@Override
128+
public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
129+
return null;
130+
}
131+
132+
@Override
133+
public boolean rename(Path path, Path path1) throws IOException {
134+
return false;
135+
}
136+
137+
@Override
138+
public boolean delete(Path path, boolean b) throws IOException {
139+
return false;
140+
}
141+
142+
@Override
143+
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
144+
return new FileStatus[0];
145+
}
146+
147+
@Override
148+
public void setWorkingDirectory(Path path) {
149+
150+
}
151+
152+
@Override
153+
public Path getWorkingDirectory() {
154+
return null;
155+
}
156+
157+
@Override
158+
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
159+
return false;
160+
}
161+
162+
@Override
163+
public FileStatus getFileStatus(Path path) throws IOException {
164+
return null;
165+
}
166+
});
167+
100168
@Before
101169
public void setup() {
102170
FileUtil.fullyDelete(TEST_DIR);
@@ -332,6 +400,95 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
332400
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
333401
}
334402

403+
@Test(timeout = 60000)
404+
public void testShutdownTezAMWithMissingRecoveryAndFailureOnMissingData() throws Exception {
405+
406+
TezConfiguration conf = new TezConfiguration();
407+
conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
408+
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
409+
conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
410+
conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
411+
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
412+
413+
/*
414+
Setting very high timeout because in case when TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA is set, it should
415+
not time out, it should get shutdown earlier only without the timeout flow kicking in
416+
*/
417+
conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1000000000);
418+
ApplicationId appId = ApplicationId.newInstance(1, 1);
419+
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
420+
421+
when(spyFs.exists(any())).thenReturn(false);
422+
423+
DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
424+
425+
dam.init(conf);
426+
Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
427+
field.setAccessible(true);
428+
field.set(dam, spyFs);
429+
430+
dam.start();
431+
432+
ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
433+
// This ensures that recovery data file system was called for getting summary files, and it will return false
434+
verify(spyFs, times(3)).exists(captor.capture());
435+
436+
Assert.assertTrue(captor.getAllValues().get(2).toString().contains("/recovery/1/summary"));
437+
Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
438+
439+
verify(dam.mockScheduler).setShouldUnregisterFlag();
440+
verify(dam.mockShutdown).shutdown();
441+
442+
/*
443+
* Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is set,
444+
* DAG will be in ERRORed state if recovery was missing for attempts > 1
445+
*/
446+
assertEquals(DAGAppMasterState.ERROR, dam.getState());
447+
}
448+
449+
@Test
450+
public void testShutdownTezAMWithMissingRecoveryAndNoFailureOnMissingData() throws Exception {
451+
452+
TezConfiguration conf = new TezConfiguration();
453+
conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
454+
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
455+
conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
456+
conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, false);
457+
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
458+
conf.setInt(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, 1);
459+
ApplicationId appId = ApplicationId.newInstance(1, 1);
460+
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
461+
462+
when(spyFs.exists(any())).thenReturn(false);
463+
464+
DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
465+
466+
dam.init(conf);
467+
Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
468+
field.setAccessible(true);
469+
field.set(dam, spyFs);
470+
471+
dam.start();
472+
// Waiting for session timeout interval to kick in, which is set to 1 s
473+
Thread.sleep(2000);
474+
475+
ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
476+
// This ensures that recovery data file system was called for getting summary files, and it will return false
477+
verify(spyFs, times(3)).exists(captor.capture());
478+
479+
Assert.assertTrue(captor.getAllValues().get(2).toString().contains("/recovery/1/summary"));
480+
Assert.assertTrue(captor.getAllValues().get(1).toString().contains("/recovery/1/RecoveryFatalErrorOccurred"));
481+
482+
verify(dam.mockScheduler).setShouldUnregisterFlag();
483+
verify(dam.mockShutdown).shutdown();
484+
485+
/*
486+
* Since the TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA config is unset,
487+
* DAG will be in SUCCEEDED state if recovery was missing and timeout got triggered for attempts > 1
488+
*/
489+
assertEquals(DAGAppMasterState.SUCCEEDED, dam.getState());
490+
}
491+
335492
private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map,
336493
int numExpected, boolean verifyPayload,
337494
String... expectedNames) throws

0 commit comments

Comments
 (0)