132132import org .apache .tez .dag .api .records .DAGProtos .ConfigurationProto ;
133133import org .apache .tez .dag .api .records .DAGProtos .DAGPlan ;
134134import org .apache .tez .dag .api .records .DAGProtos .PlanLocalResourcesProto ;
135- import org .apache .tez .dag .api .records .DAGProtos .TezNamedEntityDescriptorProto ;
136135import org .apache .tez .dag .api .records .DAGProtos .VertexPlan ;
137136import org .apache .tez .dag .app .RecoveryParser .DAGRecoveryData ;
138137import org .apache .tez .dag .app .dag .DAG ;
195194
196195import com .google .common .annotations .VisibleForTesting ;
197196import com .google .common .base .Function ;
198- import com .google .common .collect .BiMap ;
199- import com .google .common .collect .HashBiMap ;
200- import com .google .common .collect .Lists ;
201197import com .google .common .collect .Maps ;
202198import com .google .common .util .concurrent .ListeningExecutorService ;
203199import com .google .common .util .concurrent .MoreExecutors ;
@@ -254,7 +250,6 @@ public class DAGAppMaster extends AbstractService {
254250 private final String workingDirectory ;
255251 private final String [] localDirs ;
256252 private final String [] logDirs ;
257- private final AMPluginDescriptorProto amPluginDescriptorProto ;
258253 private HadoopShim hadoopShim ;
259254 private ContainerSignatureMatcher containerSignatureMatcher ;
260255 private AMContainerMap containers ;
@@ -312,11 +307,8 @@ public class DAGAppMaster extends AbstractService {
312307 private FileSystem recoveryFS ;
313308
314309 private ListeningExecutorService execService ;
310+ private final PluginManager pluginManager ;
315311
316- // TODO May not need to be a bidi map
317- private final BiMap <String , Integer > taskSchedulers = HashBiMap .create ();
318- private final BiMap <String , Integer > containerLaunchers = HashBiMap .create ();
319- private final BiMap <String , Integer > taskCommunicators = HashBiMap .create ();
320312
321313 /**
322314 * set of already executed dag names.
@@ -376,7 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
376368 this .dagVersionInfo = new TezDagVersionInfo ();
377369 this .clientVersion = clientVersion ;
378370 this .amCredentials = credentials ;
379- this .amPluginDescriptorProto = pluginDescriptorProto ;
380371 this .appMasterUgi = UserGroupInformation
381372 .createRemoteUser (jobUserName );
382373 this .appMasterUgi .addCredentials (amCredentials );
@@ -387,6 +378,8 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
387378 LOG .info ("Created DAGAppMaster for application " + applicationAttemptId
388379 + ", versionInfo=" + dagVersionInfo );
389380 TezCommonUtils .logCredentials (LOG , this .appMasterUgi .getCredentials (), "am" );
381+
382+ this .pluginManager = new PluginManager (pluginDescriptorProto );
390383 }
391384
392385 // Pull this WebAppUtils function into Tez until YARN-4186
@@ -451,18 +444,10 @@ protected void serviceInit(final Configuration conf) throws Exception {
451444
452445 UserPayload defaultPayload = TezUtils .createUserPayloadFromConf (amConf );
453446
454- List <NamedEntityDescriptor > taskSchedulerDescriptors = Lists .newLinkedList ();
455- List <NamedEntityDescriptor > containerLauncherDescriptors = Lists .newLinkedList ();
456- List <NamedEntityDescriptor > taskCommunicatorDescriptors = Lists .newLinkedList ();
457-
458- parseAllPlugins (taskSchedulerDescriptors , taskSchedulers , containerLauncherDescriptors ,
459- containerLaunchers , taskCommunicatorDescriptors , taskCommunicators , amPluginDescriptorProto ,
460- isLocal , defaultPayload );
461-
462-
463- LOG .info (buildPluginComponentLog (taskSchedulerDescriptors , taskSchedulers , "TaskSchedulers" ));
464- LOG .info (buildPluginComponentLog (containerLauncherDescriptors , containerLaunchers , "ContainerLaunchers" ));
465- LOG .info (buildPluginComponentLog (taskCommunicatorDescriptors , taskCommunicators , "TaskCommunicators" ));
447+ PluginManager .PluginDescriptors pluginDescriptors = pluginManager .parseAllPlugins (isLocal , defaultPayload );
448+ List <NamedEntityDescriptor > taskSchedulerDescriptors = pluginDescriptors .getTaskSchedulerDescriptors ();
449+ List <NamedEntityDescriptor > containerLauncherDescriptors = pluginDescriptors .getContainerLauncherDescriptors ();
450+ List <NamedEntityDescriptor > taskCommunicatorDescriptors = pluginDescriptors .getTaskCommunicatorDescriptors ();
466451
467452 boolean disableVersionCheck = conf .getBoolean (
468453 TezConfiguration .TEZ_AM_DISABLE_CLIENT_VERSION_CHECK ,
@@ -1672,32 +1657,32 @@ public Credentials getAppCredentials() {
16721657
16731658 @ Override
16741659 public Integer getTaskCommunicatorIdentifier (String name ) {
1675- return taskCommunicators .get (name );
1660+ return pluginManager . getTaskCommunicators () .get (name );
16761661 }
16771662
16781663 @ Override
16791664 public Integer getTaskScheduerIdentifier (String name ) {
1680- return taskSchedulers .get (name );
1665+ return pluginManager . getTaskSchedulers () .get (name );
16811666 }
16821667
16831668 @ Override
16841669 public Integer getContainerLauncherIdentifier (String name ) {
1685- return containerLaunchers .get (name );
1670+ return pluginManager . getContainerLaunchers () .get (name );
16861671 }
16871672
16881673 @ Override
16891674 public String getTaskCommunicatorName (int taskCommId ) {
1690- return taskCommunicators .inverse ().get (taskCommId );
1675+ return pluginManager . getTaskCommunicators () .inverse ().get (taskCommId );
16911676 }
16921677
16931678 @ Override
16941679 public String getTaskSchedulerName (int schedulerId ) {
1695- return taskSchedulers .inverse ().get (schedulerId );
1680+ return pluginManager . getTaskSchedulers () .inverse ().get (schedulerId );
16961681 }
16971682
16981683 @ Override
16991684 public String getContainerLauncherName (int launcherId ) {
1700- return containerLaunchers .inverse ().get (launcherId );
1685+ return pluginManager . getContainerLaunchers () .inverse ().get (launcherId );
17011686 }
17021687
17031688 @ Override
@@ -2732,128 +2717,6 @@ public String getWebUIAddress() {
27322717 return webUIService == null ? null : webUIService .getBaseUrl ();
27332718 }
27342719
2735- @ VisibleForTesting
2736- public static void parseAllPlugins (
2737- List <NamedEntityDescriptor > taskSchedulerDescriptors , BiMap <String , Integer > taskSchedulerPluginMap ,
2738- List <NamedEntityDescriptor > containerLauncherDescriptors , BiMap <String , Integer > containerLauncherPluginMap ,
2739- List <NamedEntityDescriptor > taskCommDescriptors , BiMap <String , Integer > taskCommPluginMap ,
2740- AMPluginDescriptorProto amPluginDescriptorProto , boolean isLocal , UserPayload defaultPayload ) {
2741-
2742- boolean tezYarnEnabled ;
2743- boolean uberEnabled ;
2744- if (!isLocal ) {
2745- if (amPluginDescriptorProto == null ) {
2746- tezYarnEnabled = true ;
2747- uberEnabled = false ;
2748- } else {
2749- tezYarnEnabled = amPluginDescriptorProto .getContainersEnabled ();
2750- uberEnabled = amPluginDescriptorProto .getUberEnabled ();
2751- }
2752- } else {
2753- tezYarnEnabled = false ;
2754- uberEnabled = true ;
2755- }
2756-
2757- parsePlugin (taskSchedulerDescriptors , taskSchedulerPluginMap ,
2758- (amPluginDescriptorProto == null || amPluginDescriptorProto .getTaskSchedulersCount () == 0 ?
2759- null :
2760- amPluginDescriptorProto .getTaskSchedulersList ()),
2761- tezYarnEnabled , uberEnabled , defaultPayload );
2762- processSchedulerDescriptors (taskSchedulerDescriptors , isLocal , defaultPayload , taskSchedulerPluginMap );
2763-
2764- parsePlugin (containerLauncherDescriptors , containerLauncherPluginMap ,
2765- (amPluginDescriptorProto == null ||
2766- amPluginDescriptorProto .getContainerLaunchersCount () == 0 ? null :
2767- amPluginDescriptorProto .getContainerLaunchersList ()),
2768- tezYarnEnabled , uberEnabled , defaultPayload );
2769-
2770- parsePlugin (taskCommDescriptors , taskCommPluginMap ,
2771- (amPluginDescriptorProto == null ||
2772- amPluginDescriptorProto .getTaskCommunicatorsCount () == 0 ? null :
2773- amPluginDescriptorProto .getTaskCommunicatorsList ()),
2774- tezYarnEnabled , uberEnabled , defaultPayload );
2775- }
2776-
2777-
2778- @ VisibleForTesting
2779- public static void parsePlugin (List <NamedEntityDescriptor > resultList ,
2780- BiMap <String , Integer > pluginMap , List <TezNamedEntityDescriptorProto > namedEntityDescriptorProtos ,
2781- boolean tezYarnEnabled , boolean uberEnabled , UserPayload defaultPayload ) {
2782-
2783- if (tezYarnEnabled ) {
2784- // Default classnames will be populated by individual components
2785- NamedEntityDescriptor r = new NamedEntityDescriptor (
2786- TezConstants .getTezYarnServicePluginName (), null ).setUserPayload (defaultPayload );
2787- addDescriptor (resultList , pluginMap , r );
2788- }
2789-
2790- if (uberEnabled ) {
2791- // Default classnames will be populated by individual components
2792- NamedEntityDescriptor r = new NamedEntityDescriptor (
2793- TezConstants .getTezUberServicePluginName (), null ).setUserPayload (defaultPayload );
2794- addDescriptor (resultList , pluginMap , r );
2795- }
2796-
2797- if (namedEntityDescriptorProtos != null ) {
2798- for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos ) {
2799- NamedEntityDescriptor namedEntityDescriptor = DagTypeConverters
2800- .convertNamedDescriptorFromProto (namedEntityDescriptorProto );
2801- addDescriptor (resultList , pluginMap , namedEntityDescriptor );
2802- }
2803- }
2804- }
2805-
2806- @ VisibleForTesting
2807- static void addDescriptor (List <NamedEntityDescriptor > list , BiMap <String , Integer > pluginMap ,
2808- NamedEntityDescriptor namedEntityDescriptor ) {
2809- list .add (namedEntityDescriptor );
2810- pluginMap .put (list .get (list .size () - 1 ).getEntityName (), list .size () - 1 );
2811- }
2812-
2813- @ VisibleForTesting
2814- static void processSchedulerDescriptors (List <NamedEntityDescriptor > descriptors , boolean isLocal ,
2815- UserPayload defaultPayload ,
2816- BiMap <String , Integer > schedulerPluginMap ) {
2817- if (isLocal ) {
2818- boolean foundUberServiceName = false ;
2819- for (NamedEntityDescriptor descriptor : descriptors ) {
2820- if (descriptor .getEntityName ().equals (TezConstants .getTezUberServicePluginName ())) {
2821- foundUberServiceName = true ;
2822- break ;
2823- }
2824- }
2825- Preconditions .checkState (foundUberServiceName );
2826- } else {
2827- boolean foundYarn = false ;
2828- for (int i = 0 ; i < descriptors .size (); i ++) {
2829- if (descriptors .get (i ).getEntityName ().equals (TezConstants .getTezYarnServicePluginName ())) {
2830- foundYarn = true ;
2831- break ;
2832- }
2833- }
2834- if (!foundYarn ) {
2835- NamedEntityDescriptor yarnDescriptor =
2836- new NamedEntityDescriptor (TezConstants .getTezYarnServicePluginName (), null )
2837- .setUserPayload (defaultPayload );
2838- addDescriptor (descriptors , schedulerPluginMap , yarnDescriptor );
2839- }
2840- }
2841- }
2842-
2843- String buildPluginComponentLog (List <NamedEntityDescriptor > namedEntityDescriptors , BiMap <String , Integer > map ,
2844- String component ) {
2845- StringBuilder sb = new StringBuilder ();
2846- sb .append ("AM Level configured " ).append (component ).append (": " );
2847- for (int i = 0 ; i < namedEntityDescriptors .size (); i ++) {
2848- sb .append ("[" ).append (i ).append (":" ).append (map .inverse ().get (i ))
2849- .append (":" ).append (namedEntityDescriptors .get (i ).getClassName ()).append ("]" );
2850- if (i != namedEntityDescriptors .size () - 1 ) {
2851- sb .append ("," );
2852- }
2853- }
2854- return sb .toString ();
2855- }
2856-
28572720 public void vertexComplete (TezVertexID completedVertexID , Set <NodeId > nodesList ) {
28582721 getContainerLauncherManager ().vertexComplete (completedVertexID , jobTokenSecretManager , nodesList );
28592722 }
0 commit comments