@@ -2817,6 +2817,18 @@ private VertexState setupVertex() {
28172817 return VertexState .INITED ;
28182818 }
28192819
2820+ private boolean canSkipInitializingParents () {
2821+ // Both cases use RootInputVertexManager. RootInputVertexManager can start tasks even though
2822+ // any parents are not fully initialized.
2823+ if (vertexPlan .hasVertexManagerPlugin ()) {
2824+ final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
2825+ .convertVertexManagerPluginDescriptorFromDAGPlan (vertexPlan .getVertexManagerPlugin ());
2826+ return pluginDesc .getClassName ().equals (RootInputVertexManager .class .getName ());
2827+ } else {
2828+ return inputsWithInitializers != null ;
2829+ }
2830+ }
2831+
28202832 private boolean isVertexInitSkippedInParentVertices () {
28212833 for (Map .Entry <Vertex , Edge > entry : sourceVertices .entrySet ()) {
28222834 if (!(((VertexImpl ) entry .getKey ()).isVertexInitSkipped ())) {
@@ -2843,7 +2855,7 @@ private void assignVertexManager() throws TezException {
28432855 if (recoveryData != null && recoveryData .shouldSkipInit ()
28442856 && (recoveryData .isVertexTasksStarted () ||
28452857 recoveryData .getVertexConfigurationDoneEvent ().isSetParallelismCalled ())
2846- && isVertexInitSkippedInParentVertices ()) {
2858+ && ( canSkipInitializingParents () || isVertexInitSkippedInParentVertices () )) {
28472859 // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
28482860 VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData .getVertexConfigurationDoneEvent ();
28492861 if (LOG .isInfoEnabled ()) {
0 commit comments