diff --git a/tez-api/findbugs-exclude.xml b/tez-api/findbugs-exclude.xml index 10d27f7119..de8f3824cd 100644 --- a/tez-api/findbugs-exclude.xml +++ b/tez-api/findbugs-exclude.xml @@ -131,4 +131,24 @@ + + + + + + + + + + + + + + + + + + diff --git a/tez-api/pom.xml b/tez-api/pom.xml index 17836a49a8..ff1f9d2e6a 100644 --- a/tez-api/pom.xml +++ b/tez-api/pom.xml @@ -73,6 +73,11 @@ org.apache.hadoop hadoop-yarn-client + + org.apache.hadoop + hadoop-registry + ${hadoop.version} + org.apache.commons commons-collections4 diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java new file mode 100644 index 0000000000..8453f9836c --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +import java.util.Objects; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +/** + * Record representing an Application Master (AM) instance within Tez. + *

+ * This class can be serialized to and from a {@link ServiceRecord}, enabling + * storage and retrieval of AM metadata in external systems. Some constructors + * and methods are not necessarily used within the Tez codebase itself, but + * are part of the Tez API and intended for Tez clients that manage or interact + * with Tez unmanaged sessions. + */ +@InterfaceAudience.Public +public class AMRecord { + private static final String APP_ID_RECORD_KEY = "appId"; + private static final String HOST_RECORD_KEY = "host"; + private static final String PORT_RECORD_KEY = "port"; + private static final String OPAQUE_ID_KEY = "id"; + + private final ApplicationId appId; + private final String host; + private final int port; + private final String id; + + /** + * Creates a new {@code AMRecord} with the given application ID, host, port, and identifier. + *

+ * If the provided identifier is {@code null}, it will be converted to an empty string. + *

+ * Although this constructor may not be used directly within Tez internals, + * it is part of the public API for Tez clients that handle unmanaged sessions. + * + * @param appId the {@link ApplicationId} of the Tez application + * @param host the hostname where the Application Master is running + * @param port the port number on which the Application Master is listening + * @param id an opaque identifier for the record; if {@code null}, defaults to an empty string + */ + public AMRecord(ApplicationId appId, String host, int port, String id) { + this.appId = appId; + this.host = host; + this.port = port; + //If id is not provided, convert to empty string + this.id = (id == null) ? "" : id; + } + + /** + * Copy constructor. + *

+ * Creates a new {@code AMRecord} by copying the fields of another instance. + *

+ * This constructor is mainly useful for client-side logic and session handling, + * and may not be invoked directly within the Tez codebase. + * + * @param other the {@code AMRecord} instance to copy + */ + public AMRecord(AMRecord other) { + this.appId = other.getApplicationId(); + this.host = other.getHost(); + this.port = other.getPort(); + this.id = other.getId(); + } + + /** + * Constructs a new {@code AMRecord} from a {@link ServiceRecord}. + *

+ * This allows conversion from serialized metadata back into an in-memory {@code AMRecord}. + *

+ * While not always used in Tez internals, it exists in the Tez API so + * clients can reconstruct AM information when working with unmanaged sessions. + * + * @param serviceRecord the {@link ServiceRecord} containing AM metadata + * @throws IllegalArgumentException if required keys are missing or invalid + */ + public AMRecord(ServiceRecord serviceRecord) { + this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY)); + this.host = serviceRecord.get(HOST_RECORD_KEY); + this.port = Integer.parseInt(serviceRecord.get(PORT_RECORD_KEY)); + this.id = serviceRecord.get(OPAQUE_ID_KEY); + } + + public ApplicationId getApplicationId() { + return appId; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getId() { + return id; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof AMRecord otherRecord) { + return appId.equals(otherRecord.appId) + && host.equals(otherRecord.host) + && port == otherRecord.port + && id.equals(otherRecord.id); + } else { + return false; + } + } + + /** + * Converts this {@code AMRecord} into a {@link ServiceRecord}. + *

+ * The returned {@link ServiceRecord} contains the Application Master metadata + * (application ID, host, port, and opaque identifier) so that it can be stored + * in an external registry or retrieved later. + *

+ * While this method may not be directly used within Tez internals, + * it is part of the Tez public API and is intended for Tez clients + * that interact with unmanaged sessions or otherwise need to + * persist/reconstruct Application Master information. + * + * @return a {@link ServiceRecord} populated with the values of this {@code AMRecord} + */ + public ServiceRecord toServiceRecord() { + ServiceRecord serviceRecord = new ServiceRecord(); + serviceRecord.set(APP_ID_RECORD_KEY, appId); + serviceRecord.set(HOST_RECORD_KEY, host); + serviceRecord.set(PORT_RECORD_KEY, port); + serviceRecord.set(OPAQUE_ID_KEY, id); + return serviceRecord; + } + + @Override + public int hashCode() { + return Objects.hash(appId, host, port, id); + } +} diff --git a/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java b/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java new file mode 100644 index 0000000000..f08bdfc8db --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/registry/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Public +@Evolving +package org.apache.tez.client.registry; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; \ No newline at end of file diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index a95099dbe0..2176141d06 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2349,4 +2349,11 @@ static Set getPropertySet() { @ConfigurationScope(Scope.AM) @ConfigurationProperty public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs"; + + /** + * String value. The class to be used for the AM registry. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_REGISTRY_CLASS = TEZ_AM_PREFIX + "registry.class"; } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 9bf5e0503d..71aff74801 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -102,6 +102,9 @@ public final class TezConstants { /// Version-related Environment variables public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION"; + //Arbitrary opaque ID to identify AM instances from AMRegistryClient + public static final String TEZ_AM_UUID = "TEZ_AM_UUID"; + private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn"; private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber"; diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 9b65f2f452..79f9f15a64 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -415,7 +415,7 @@ protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemp versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto) : new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto); + versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, null); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java new file mode 100644 index 0000000000..8c6ab58984 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/AMRegistry.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client.registry; + +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.client.registry.AMRecord; + +/** + * Base class for AMRegistry implementations. + * The specific implementation class is configured by `tez.am.registry.class`. + * + * Implementations should handle the relevant service lifecycle operations: + * `init`, `serviceStart`, `serviceStop`, etc. + * - `init` and `serviceStart` are invoked during `DAGAppMaster.serviceInit`. + * - `serviceStop` is invoked on `DAGAppMaster` shutdown. + */ +public abstract class AMRegistry extends AbstractService { + + /* Implementations should provide a public no-arg constructor. */ + protected AMRegistry(String name) { + super(name); + } + + /* Under typical usage, add() will be called once automatically with an AMRecord + for the DAGClientServer that services an AM. */ + public abstract void add(AMRecord server) throws Exception; + + /* Under typical usage, implementations should remove any stale AMRecords upon serviceStop. */ + public abstract void remove(AMRecord server) throws Exception; + +} \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java new file mode 100644 index 0000000000..943f5197bd --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Public +@Evolving +package org.apache.tez.dag.api.client.registry; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a8c17ed7de..99579c7ff7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -97,6 +97,7 @@ import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; +import org.apache.tez.client.registry.AMRecord; import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.AsyncDispatcherConcurrent; import org.apache.tez.common.ContainerSignatureMatcher; @@ -125,6 +126,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientHandler; import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.api.client.registry.AMRegistry; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; @@ -182,6 +184,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.AMRegistryUtils; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; @@ -244,6 +247,7 @@ public class DAGAppMaster extends AbstractService { private String appName; private final ApplicationAttemptId appAttemptID; private final ContainerId containerID; + private String amUUID; private final String nmHost; private final int nmPort; private final int nmHttpPort; @@ -350,7 +354,8 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String [] localDirs, String[] logDirs, String clientVersion, - Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { + Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, + String amUUID) { super(DAGAppMaster.class.getName()); this.mdcContext = LoggingUtils.setupLog4j(); this.clock = clock; @@ -358,6 +363,7 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, this.appSubmitTime = appSubmitTime; this.appAttemptID = applicationAttemptId; this.containerID = containerId; + this.amUUID = amUUID; this.nmHost = nmHost; this.nmPort = nmPort; this.nmHttpPort = nmHttpPort; @@ -632,6 +638,10 @@ protected void serviceInit(final Configuration conf) throws Exception { .setDaemon(true).setNameFormat("App Shared Pool - #%d").build()); execService = MoreExecutors.listeningDecorator(rawExecutor); + AMRegistry amRegistry = AMRegistryUtils.createAMRegistry(conf); + initAmRegistry(appAttemptID.getApplicationId(), amUUID, amRegistry, clientRpcServer); + addIfService(amRegistry, false); + initServices(conf); super.serviceInit(conf); @@ -659,6 +669,23 @@ protected void initClientRpcServer() { addIfService(clientRpcServer, true); } + @VisibleForTesting + public static void initAmRegistry(ApplicationId appId, String amUUID, AMRegistry amRegistry, + DAGClientServer dagClientServer) { + if (amRegistry != null) { + dagClientServer.registerServiceListener((service) -> { + if (service.isInState(STATE.STARTED)) { + AMRecord amRecord = AMRegistryUtils.recordForDAGClientServer(appId, amUUID, dagClientServer); + try { + amRegistry.add(amRecord); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + } + } + @VisibleForTesting protected DAGAppMasterShutdownHandler createShutdownHandler() { return new DAGAppMasterShutdownHandler(); @@ -2382,6 +2409,7 @@ public static void main(String[] args) { String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV); String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV); + String amUUID = System.getenv(TezConstants.TEZ_AM_UUID); if (clientVersion == null) { clientVersion = VersionInfo.UNKNOWN; } @@ -2446,7 +2474,7 @@ public static void main(String[] args) { System.getenv(Environment.PWD.name()), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())), - clientVersion, credentials, jobUserName, amPluginDescriptorProto); + clientVersion, credentials, jobUserName, amPluginDescriptorProto, amUUID); ShutdownHookManager.get().addShutdownHook( new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java index e0c8443577..71eafd8965 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java @@ -37,7 +37,7 @@ public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName, - pluginDescriptorProto); + pluginDescriptorProto, null); } @Override diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java new file mode 100644 index 0000000000..13cc27cbac --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/AMRegistryUtils.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.utils; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.api.client.registry.AMRegistry; + +public final class AMRegistryUtils { + + private AMRegistryUtils() {} + + public static AMRecord recordForDAGClientServer(ApplicationId appId, String opaqueId, + DAGClientServer dagClientServer) { + InetSocketAddress address = dagClientServer.getBindAddress(); + return new AMRecord(appId, address.getHostName(), address.getPort(), opaqueId); + } + + public static AMRegistry createAMRegistry(Configuration conf) throws Exception { + String tezAMRegistryClass = conf.get(TezConfiguration.TEZ_AM_REGISTRY_CLASS); + return tezAMRegistryClass == null ? null : ReflectionUtils.createClazzInstance(tezAMRegistryClass); + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java new file mode 100644 index 0000000000..dc8cc4acf7 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/registry/TestAMRegistry.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client.registry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.net.InetSocketAddress; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClientHandler; +import org.apache.tez.dag.api.client.DAGClientServer; +import org.apache.tez.dag.app.DAGAppMaster; +import org.apache.tez.dag.utils.AMRegistryUtils; + +import org.junit.Test; + +public class TestAMRegistry { + + @Test(timeout = 5000) + public void testAMRegistryFactory() throws Exception { + Configuration conf = new Configuration(); + AMRegistry amRegistry = AMRegistryUtils.createAMRegistry(conf); + assertNull(amRegistry); + String className = SkeletonAMRegistry.class.getName(); + conf.set(TezConfiguration.TEZ_AM_REGISTRY_CLASS, className); + amRegistry = AMRegistryUtils.createAMRegistry(conf); + assertNotNull(amRegistry); + assertEquals(className, amRegistry.getClass().getName()); + } + + @Test(timeout = 5000) + public void testRecordForDagServer() { + DAGClientServer dagClientServer = mock(DAGClientServer.class); + when(dagClientServer.getBindAddress()).thenReturn(new InetSocketAddress("testhost", 1000)); + ApplicationId appId = ApplicationId.newInstance(0, 1); + String id = UUID.randomUUID().toString(); + AMRecord record = AMRegistryUtils.recordForDAGClientServer(appId, id, dagClientServer); + assertEquals(appId, record.getApplicationId()); + assertEquals("testhost", record.getHost()); + assertEquals(1000, record.getPort()); + assertEquals(record.getId(), id); + } + + @Test(timeout = 20000) + public void testAMRegistryService() throws Exception { + DAGClientHandler dagClientHandler = mock(DAGClientHandler.class); + ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); + ApplicationId appId = ApplicationId.newInstance(0, 1); + String uuid = UUID.randomUUID().toString(); + when(appAttemptId.getApplicationId()).thenReturn(appId); + AMRegistry amRegistry = mock(AMRegistry.class); + FileSystem fs = mock(FileSystem.class); + DAGClientServer dagClientServer = new DAGClientServer(dagClientHandler, appAttemptId, fs); + try { + DAGAppMaster.initAmRegistry(appAttemptId.getApplicationId(), uuid, amRegistry, dagClientServer); + dagClientServer.init(new Configuration()); + dagClientServer.start(); + AMRecord record = AMRegistryUtils.recordForDAGClientServer(appId, uuid, dagClientServer); + verify(amRegistry, times(1)).add(record); + } finally { + dagClientServer.stop(); + } + } + + public static class SkeletonAMRegistry extends AMRegistry { + public SkeletonAMRegistry() { + super("SkeletonAMRegistry"); + } + @Override public void add(AMRecord server) throws Exception { } + @Override public void remove(AMRecord server) throws Exception { } + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index fbab519376..213d85b892 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -500,7 +500,7 @@ public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId c Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), - credentials, jobUserName, null); + credentials, jobUserName, null, null); shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.launcherGoFlag = launcherGoFlag; this.initFailFlag = initFailFlag; diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index 40a8e20cd5..85a8248b95 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -552,7 +552,7 @@ public void testBadProgress() throws Exception { TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, new String[] {TEST_DIR.toString()}, new TezApiVersionInfo().getVersion(), amCreds, - "someuser", null)); + "someuser", null, null)); when(am.getState()).thenReturn(DAGAppMasterState.RUNNING); am.init(conf); am.start(); @@ -637,7 +637,7 @@ private void testDagCredentials(boolean doMerge) throws IOException { TEST_DIR.toString(), new String[] {TEST_DIR.toString()}, new String[] {TEST_DIR.toString()}, new TezApiVersionInfo().getVersion(), amCreds, - "someuser", null); + "someuser", null, null); am.init(conf); am.start(); @@ -758,7 +758,7 @@ public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) { super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(), new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() }, - new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null); + new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null, null); } public static Credentials createCredentials() {