Skip to content

Commit 243a73d

Browse files
committed
TEZ-4474: Added config to fail AM in case of missing recovery files
1 parent 39e5a8e commit 243a73d

File tree

3 files changed

+138
-28
lines changed

3 files changed

+138
-28
lines changed

tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,18 @@ public TezConfiguration(boolean loadDefaults) {
18281828
TEZ_PREFIX + "dag.recovery.enabled";
18291829
public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
18301830

1831+
1832+
/**
1833+
* Boolean value. When set to true, the AM fails if DAG recovery is enabled and
1834+
* the restarted app master does not find anything to recover.
1835+
* Expert level setting.
1836+
*/
1837+
@ConfigurationScope(Scope.AM)
1838+
@ConfigurationProperty(type="boolean")
1839+
public static final String TEZ_AM_FAILURE_ON_MISSING_RECOVERY =
1840+
TEZ_AM_PREFIX + "failure.on.missing.recovery";
1841+
public static final boolean TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DEFAULT = false;
1842+
18311843
/**
18321844
* Int value. Size in bytes for the IO buffer size while processing the recovery file.
18331845
* Expert level setting.

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1919,6 +1919,13 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
19191919
RecoveryParser recoveryParser = new RecoveryParser(
19201920
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
19211921
DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
1922+
1923+
if(Objects.isNull(recoveredDAGData) && amConf.getBoolean(
1924+
TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY,
1925+
TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DEFAULT)) {
1926+
throw new IOException("Found nothing to recover in currentAttemptId= "
1927+
+ this.appAttemptID.getAttemptId());
1928+
}
19221929
return recoveredDAGData;
19231930
}
19241931
} finally {

tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java

Lines changed: 119 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,54 +14,32 @@
1414

1515
package org.apache.tez.dag.app;
1616

17-
import org.apache.hadoop.yarn.util.MonotonicClock;
18-
import org.apache.tez.dag.app.dag.DAGState;
19-
import org.apache.tez.dag.app.dag.Vertex;
20-
import org.apache.tez.dag.records.TezVertexID;
21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertNotNull;
23-
import static org.junit.Assert.assertNull;
24-
import static org.junit.Assert.assertTrue;
25-
import static org.mockito.Mockito.mock;
26-
import static org.mockito.Mockito.spy;
27-
import static org.mockito.Mockito.verify;
28-
import static org.mockito.Mockito.when;
29-
30-
import java.io.ByteArrayInputStream;
31-
import java.io.DataInput;
32-
import java.io.DataInputStream;
33-
import java.io.DataOutput;
34-
import java.io.File;
35-
import java.io.FileOutputStream;
36-
import java.io.IOException;
37-
import java.nio.ByteBuffer;
38-
import java.util.HashMap;
39-
import java.util.LinkedList;
40-
import java.util.List;
41-
import java.util.Map;
42-
43-
import org.apache.tez.common.Preconditions;
4417
import com.google.common.collect.BiMap;
4518
import com.google.common.collect.HashBiMap;
4619
import com.google.common.collect.Lists;
4720
import com.google.protobuf.ByteString;
48-
4921
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.FSDataInputStream;
5023
import org.apache.hadoop.fs.FSDataOutputStream;
24+
import org.apache.hadoop.fs.FileStatus;
5125
import org.apache.hadoop.fs.FileSystem;
5226
import org.apache.hadoop.fs.FileUtil;
5327
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.fs.permission.FsPermission;
5429
import org.apache.hadoop.io.Text;
5530
import org.apache.hadoop.security.Credentials;
5631
import org.apache.hadoop.security.UserGroupInformation;
5732
import org.apache.hadoop.security.token.SecretManager;
5833
import org.apache.hadoop.security.token.Token;
5934
import org.apache.hadoop.security.token.TokenIdentifier;
35+
import org.apache.hadoop.util.Progressable;
6036
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
6137
import org.apache.hadoop.yarn.api.records.ApplicationId;
6238
import org.apache.hadoop.yarn.api.records.ContainerId;
39+
import org.apache.hadoop.yarn.util.MonotonicClock;
6340
import org.apache.hadoop.yarn.util.SystemClock;
6441
import org.apache.tez.client.TezApiVersionInfo;
42+
import org.apache.tez.common.Preconditions;
6543
import org.apache.tez.common.TezCommonUtils;
6644
import org.apache.tez.common.TezUtils;
6745
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -78,14 +56,43 @@
7856
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
7957
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
8058
import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
59+
import org.apache.tez.dag.app.dag.DAGState;
60+
import org.apache.tez.dag.app.dag.Vertex;
8161
import org.apache.tez.dag.app.dag.impl.DAGImpl;
8262
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
8363
import org.apache.tez.dag.records.TezDAGID;
64+
import org.apache.tez.dag.records.TezVertexID;
8465
import org.junit.After;
8566
import org.junit.Assert;
8667
import org.junit.Before;
8768
import org.junit.Test;
8869

70+
import java.io.ByteArrayInputStream;
71+
import java.io.DataInput;
72+
import java.io.DataInputStream;
73+
import java.io.DataOutput;
74+
import java.io.File;
75+
import java.io.FileNotFoundException;
76+
import java.io.FileOutputStream;
77+
import java.io.IOException;
78+
import java.lang.reflect.Field;
79+
import java.net.URI;
80+
import java.nio.ByteBuffer;
81+
import java.util.HashMap;
82+
import java.util.LinkedList;
83+
import java.util.List;
84+
import java.util.Map;
85+
86+
import static org.junit.Assert.assertEquals;
87+
import static org.junit.Assert.assertNotNull;
88+
import static org.junit.Assert.assertNull;
89+
import static org.junit.Assert.assertTrue;
90+
import static org.mockito.ArgumentMatchers.any;
91+
import static org.mockito.Mockito.mock;
92+
import static org.mockito.Mockito.spy;
93+
import static org.mockito.Mockito.verify;
94+
import static org.mockito.Mockito.when;
95+
8996
public class TestDAGAppMaster {
9097

9198
private static final String TEST_KEY = "TEST_KEY";
@@ -332,6 +339,90 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
332339
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
333340
}
334341

342+
/**
 * Starts a {@code DAGAppMasterForTest} for application attempt 2 with both
 * {@code DAG_RECOVERY_ENABLED} and {@code TEZ_AM_FAILURE_ON_MISSING_RECOVERY} set to true,
 * then checks that the scheduler's unregister flag was set, that shutdown was invoked,
 * and that the app master reports {@code DAGAppMasterState.ERROR}.
 */
@Test
343+
public void testShutdownTezAMWithMissingRecovery() throws Exception {
344+
345+
TezConfiguration conf = new TezConfiguration();
346+
conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
347+
conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
348+
conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
349+
// Turn on the behavior under test: fail when recovery finds nothing.
conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY, true);
350+
conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
351+
ApplicationId appId = ApplicationId.newInstance(1, 1);
352+
// Attempt id 2 is used here; presumably so the app master treats this as a restart
// and attempts recovery -- confirm against DAGAppMaster.
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
353+
354+
// Minimal FileSystem stub wrapped in a Mockito spy: every overridden method returns
// null, false, or an empty array, and does no I/O.
FileSystem spyRecoveryFs = spy(new FileSystem() {
355+
@Override
356+
public URI getUri() {
357+
return null;
358+
}
359+
360+
@Override
361+
public FSDataInputStream open(Path path, int i) throws IOException {
362+
return null;
363+
}
364+
365+
@Override
366+
public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException {
367+
return null;
368+
}
369+
370+
@Override
371+
public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
372+
return null;
373+
}
374+
375+
@Override
376+
public boolean rename(Path path, Path path1) throws IOException {
377+
return false;
378+
}
379+
380+
@Override
381+
public boolean delete(Path path, boolean b) throws IOException {
382+
return false;
383+
}
384+
385+
@Override
386+
public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
387+
return new FileStatus[0];
388+
}
389+
390+
@Override
391+
public void setWorkingDirectory(Path path) {
392+
393+
}
394+
395+
@Override
396+
public Path getWorkingDirectory() {
397+
return null;
398+
}
399+
400+
@Override
401+
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
402+
return false;
403+
}
404+
405+
@Override
406+
public FileStatus getFileStatus(Path path) throws IOException {
407+
return null;
408+
}
409+
});
410+
// Stub exists() so the spy reports every path as absent.
when(spyRecoveryFs.exists(any())).thenReturn(false);
411+
412+
DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
413+
dam.init(conf);
414+
dam.start();
415+
416+
// Replace the private "recoveryFS" field declared on the superclass of
// DAGAppMasterForTest with the spy, via reflection.
// NOTE(review): this injection happens after init() and start() have already been
// called; if recovery runs during start(), it will not have used this spy -- confirm
// whether the injection should happen between init() and start().
Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
417+
field.setAccessible(true);
418+
field.set(dam, spyRecoveryFs);
419+
420+
// Expected outcome: unregister flag set, shutdown invoked, state is ERROR.
verify(dam.mockScheduler).setShouldUnregisterFlag();
421+
verify(dam.mockShutdown).shutdown();
422+
423+
assertEquals(DAGAppMasterState.ERROR, dam.getState());
424+
}
425+
335426
private void verifyDescAndMap(List<NamedEntityDescriptor> descriptors, BiMap<String, Integer> map,
336427
int numExpected, boolean verifyPayload,
337428
String... expectedNames) throws

0 commit comments

Comments
 (0)