rightGroup);
-
+
/**
* Aggregate all counters by a group of counters
* @param rightGroup the group to be added to this group
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index a65388ffd0..19d5bbdffa 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -25,7 +25,7 @@
@Private
public enum TaskCounter {
// TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
-
+
NUM_SPECULATIONS,
/**
@@ -39,29 +39,29 @@ public enum TaskCounter {
* Alternately number of records seen by a ReduceProcessor
*/
REDUCE_INPUT_RECORDS,
-
+
REDUCE_OUTPUT_RECORDS, // Not used at the moment.
REDUCE_SKIPPED_GROUPS, // Not used at the moment.
REDUCE_SKIPPED_RECORDS, // Not used at the moment.
SPLIT_RAW_BYTES,
-
+
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS, // Not used at the moment.
/**
* Number of records written to disk in case of OnFileSortedOutput.
- *
- * Number of additional records writtent out to disk in case of
+ *
+ * Number of additional records written out to disk in case of
* ShuffledMergedInput; this represents the number of unnecessary spills to
* disk caused by lac of memory.
*/
SPILLED_RECORDS,
/**
- * Number of Inputs from which data is copied. Represents physical Inputs.
+ * Number of Inputs from which data is copied. Represents physical Inputs.
*/
NUM_SHUFFLED_INPUTS,
-
+
/**
* Number of Inputs from which data was not copied - typically due to an empty Input
*/
@@ -71,7 +71,7 @@ public enum TaskCounter {
* Number of failed copy attempts (physical inputs)
*/
NUM_FAILED_SHUFFLE_INPUTS,
-
+
MERGED_MAP_OUTPUTS,
GC_TIME_MILLIS,
CPU_MILLISECONDS,
@@ -84,7 +84,7 @@ public enum TaskCounter {
/**
* Represents the number of Input Records that were actually processed.
* Used by MRInput and ShuffledUnorderedKVInput
- *
+ *
*/
INPUT_RECORDS_PROCESSED,
@@ -93,7 +93,7 @@ public enum TaskCounter {
*/
INPUT_SPLIT_LENGTH_BYTES,
- //
+ //
/**
* Represents the number of actual output records.
* Used by MROutput, OnFileSortedOutput, and OnFileUnorderedKVOutput
@@ -112,7 +112,7 @@ public enum TaskCounter {
* spilled directly
*/
OUTPUT_LARGE_RECORDS,
-
+
SKIPPED_RECORDS, // Not used at the moment.
/**
@@ -132,19 +132,19 @@ public enum TaskCounter {
* size + overhead)
*/
OUTPUT_BYTES_PHYSICAL,
-
+
/**
* Bytes written to disk due to unnecessary spills (lac of adequate memory).
* Used by OnFileSortedOutput and ShuffledMergedInput
*/
ADDITIONAL_SPILLS_BYTES_WRITTEN,
-
+
/**
* Bytes read from disk due to previous spills (lac of adequate memory).
* Used by OnFileSortedOutput and ShuffledMergedInput
*/
ADDITIONAL_SPILLS_BYTES_READ,
-
+
/**
* Spills that were generated & read by the same task (unnecessary spills due to lac of
* adequate memory).
@@ -159,7 +159,7 @@ public enum TaskCounter {
* as final merge is avoided.
*/
SHUFFLE_CHUNK_COUNT,
-
+
INPUT_GROUPS, // Not used at the moment. Will eventually replace REDUCE_INPUT_GROUPS
/**
@@ -172,15 +172,15 @@ public enum TaskCounter {
* Uncompressed size of the data being processed by the relevant Shuffle.
* Includes serialization, file format etc overheads.
*/
- SHUFFLE_BYTES_DECOMPRESSED,
+ SHUFFLE_BYTES_DECOMPRESSED,
/**
- * Number of bytes which were shuffled directly to memory.
+ * Number of bytes which were shuffled directly to memory.
*/
SHUFFLE_BYTES_TO_MEM,
/**
- * Number of bytes which were shuffled directly to disk
+ * Number of bytes which were shuffled directly to disk
*/
SHUFFLE_BYTES_TO_DISK,
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java
index 4cb1ae94e0..9abbb82167 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java
@@ -81,10 +81,10 @@ public interface TezCounter extends Writable {
public default void aggregate(TezCounter other) {
increment(other.getValue());
};
-
+
/**
* Return the underlying object if this is a facade.
- * @return the undelying object.
+ * @return the underlying object.
*/
@Private
TezCounter getUnderlyingCounter();
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
index 92eea67440..cb4efc6fb6 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
@@ -30,7 +30,7 @@
/**
* ACL Policy Manager
- * An instance of this implements any ACL related activity when starting a session or submitting a
+ * An instance of this implements any ACL related activity when starting a session or submitting a
* DAG. It is used in the HistoryLoggingService to create domain ids and populate entities with
* domain id.
*/
@@ -41,7 +41,7 @@ public interface HistoryACLPolicyManager extends Configurable {
/**
* Take any necessary steps for setting up both Session ACLs and non session acls. This is called
* with the am configuration which contains the ACL information to be used to create a domain.
- * If the method returns a value, then its assumed to be a valid domain and used as domainId.
+ * If the method returns a value, then it's assumed to be a valid domain and used as domainId.
* If the method returns null, acls are disabled at session level, i.e use default acls at session
* level.
* If the method throws an Exception, history logging is disabled for the entire session.
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
index 3d7f2ab2b7..be58354c67 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginOnDemand.java
@@ -32,8 +32,8 @@
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
/**
- * This interface defines the routing of the event between tasks of producer and
- * consumer vertices. The routing is bi-directional. Users can customize the
+ * This interface defines the routing of the event between tasks of producer and
+ * consumer vertices. The routing is bidirectional. Users can customize the
* routing by providing an implementation of this interface.
*/
@Public
@@ -70,22 +70,22 @@ public int getSource() {
/**
* Class to provide routing metadata for {@link Event}s to be routed between
- * producer and consumer tasks. The routing data enabled the system to send
+ * producer and consumer tasks. The routing data enabled the system to send
* the event from the producer task output to the consumer task input
*/
public static class EventRouteMetadata {
private final int numEvents;
private final int[] targetIndices;
private final int[] sourceIndices;
-
+
/**
* Create an {@link EventRouteMetadata} that will create numEvents copies of
* the {@link Event} to be routed. Use this to create
* {@link EventRouteMetadata} for {@link DataMovementEvent}s or
* {@link InputFailedEvent}s where the target input indices must be
- * specified to route those events. Typically numEvents would be 1 for these
+ * specified to route those events. Typically, numEvents would be 1 for these
* events.
- *
+ *
* @param numEvents
* Number of copies of the event to be routed
* @param targetIndices
@@ -97,14 +97,14 @@ public static class EventRouteMetadata {
public static EventRouteMetadata create(int numEvents, int[] targetIndices) {
return new EventRouteMetadata(numEvents, targetIndices, null);
}
-
+
/**
* Create an {@link EventRouteMetadata} that will create numEvents copies of
* the {@link Event} to be routed. Use this to create
* {@link EventRouteMetadata} for {@link CompositeDataMovementEvent} where
* the target input indices and source output indices must be specified to
- * route those events. Typically numEvents would be 1 for these events.
- *
+ * route those events. Typically, numEvents would be 1 for these events.
+ *
* @param numEvents
* Number of copies of the event to be routed
* @param targetIndices
@@ -157,7 +157,7 @@ public int getNumEvents() {
* extending this to create a {@link EdgeManagerPluginOnDemand}, must provide
* the same constructor so that Tez can create an instance of the class at
* runtime.
- *
+ *
* @param context
* the context within which this {@link EdgeManagerPluginOnDemand}
* will run. Includes information like configuration which the user
@@ -177,7 +177,7 @@ public EdgeManagerPluginOnDemand(EdgeManagerPluginContext context) {
* @throws Exception
*/
public abstract void initialize() throws Exception;
-
+
/**
* This method will be invoked just before routing of events will begin. The
* plugin can use this opportunity to make any runtime initialization's that
@@ -187,7 +187,7 @@ public EdgeManagerPluginOnDemand(EdgeManagerPluginContext context) {
/**
* Get the number of physical inputs on the destination task
- * @param destinationTaskIndex Index of destination task for which number of
+ * @param destinationTaskIndex Index of destination task for which number of
* inputs is needed
* @return Number of physical inputs on the destination task
* @throws Exception
@@ -196,34 +196,34 @@ public EdgeManagerPluginOnDemand(EdgeManagerPluginContext context) {
/**
* Get the number of physical outputs on the source task
- * @param sourceTaskIndex Index of the source task for which number of outputs
+ * @param sourceTaskIndex Index of the source task for which number of outputs
* is needed
* @return Number of physical outputs on the source task
* @throws Exception
*/
public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception;
-
+
/**
* Get the number of destination tasks that consume data from the source task
* @param sourceTaskIndex Source task index
* @throws Exception
*/
public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception;
-
+
/**
* Return the source task index to which to send the input error event
- *
+ *
* @param destinationTaskIndex
* Destination task that reported the error
* @param destinationFailedInputIndex
- * Index of the physical input on the destination task that reported
+ * Index of the physical input on the destination task that reported
* the error
* @return Index of the source task that created the unavailable input
* @throws Exception
*/
public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
int destinationFailedInputIndex) throws Exception;
-
+
/**
* The method provides the {@link EventRouteMetadata} to route a
* {@link DataMovementEvent} produced by the given source task to the given
@@ -231,7 +231,7 @@ public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
* target input indices set to enable the routing. If the routing metadata is
* common across different events then the plugin can cache and reuse the same
* object.
- *
+ *
* @param sourceTaskIndex
* The index of the task in the source vertex of this edge that
* produced a {@link DataMovementEvent}
@@ -254,7 +254,7 @@ public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
* the target input indices and source output indices set to enable the
* routing. If the routing metadata is common across different events then the
* plugin can cache and reuse the same object.
- *
+ *
* @param sourceTaskIndex
* The index of the task in the source vertex of this edge that
* produced a {@link CompositeDataMovementEvent}
@@ -275,7 +275,7 @@ public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
* target input indices set to enable the routing. If the routing metadata is
* common across different events then the plugin can cache and reuse the same
* object.
- *
+ *
* @param sourceTaskIndex
* The index of the failed task in the source vertex of this edge.
* @param destinationTaskIndex
@@ -287,7 +287,7 @@ public abstract int routeInputErrorEventToSource(int destinationTaskIndex,
*/
public abstract @Nullable EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
int sourceTaskIndex, int destinationTaskIndex) throws Exception;
-
+
/**
* Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
* the vertex manager.
@@ -305,7 +305,7 @@ public EdgeManagerPluginContext getContext() {
* The event will be routed to every destination task index in the key of the
* map. Every physical input in the value for that task key will receive the
* input.
- *
+ *
* @param event
* Data movement event that contains the output information
* @param sourceTaskIndex
@@ -329,7 +329,7 @@ public void routeDataMovementEventToDestination(DataMovementEvent event,
* for that task key will receive the failure notification. This method will
* be called once for every source task failure and information for all
* affected destinations must be provided in that invocation.
- *
+ *
* @param sourceTaskIndex
* Source task
* @param destinationTaskAndInputIndices
@@ -341,19 +341,19 @@ public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
/**
* Return the source task index to which to send the input error event
- *
+ *
* @param event
* Input read error event. Has more information about the error
* @param destinationTaskIndex
* Destination task that reported the error
* @param destinationFailedInputIndex
- * Index of the physical input on the destination task that reported
+ * Index of the physical input on the destination task that reported
* the error
* @return Index of the source task that created the unavailable input
* @throws Exception
*/
public int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
+ int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
return routeInputErrorEventToSource(destinationTaskIndex, destinationFailedInputIndex);
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 30f33e26a9..a864dba72f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1929,7 +1929,7 @@ public TezConfiguration(boolean loadDefaults) {
/**
* Int value.
- * The maximium number of tasks running in parallel within the app master process.
+ * The maximum number of tasks running in parallel within the app master process.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
index 2d0476a721..fec9191dec 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -54,7 +54,7 @@
* create a httpclient, configured for the appropriate runtime.
*
* on hadoop 2.6+ the factory returns TimelineReaderTokenAuthenticatedStrategy, which supports
- * kerberos based auth (secure cluster) or psuedo auth (un-secure cluster).
+ * kerberos based auth (secure cluster) or pseudo auth (un-secure cluster).
*
* on hadoop 2.4 where the token delegation auth is not supported, TimelineReaderPseudoAuthenticatedStrategy
* is used which supports only unsecure timeline.
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index bda439616e..a901d8aa7a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -26,7 +26,7 @@
*
* This interface has methods which can be used by a {@link org.apache.tez.runtime.api.Processor}
* to control execution of this Input and read data from it.
- *
+ *
* Actual implementations are expected to derive from {@link AbstractLogicalInput}
*/
@Public
@@ -36,17 +36,17 @@ public interface Input {
/**
* Start any processing that the Input may need to perform. It is the
* responsibility of the Processor to start Inputs.
- *
+ *
* This typically acts as a signal to Inputs to start any Processing that they
- * may required. A blocking implementation of this method should not be used
+ * may require. A blocking implementation of this method should not be used
* as a mechanism to determine when an Input is actually ready.
- *
+ *
* This method may be invoked by the framework under certain circumstances,
* and as such requires the implementation to be non-blocking.
- *
+ *
* Inputs must be written to handle multiple start invocations - typically
* honoring only the first one.
- *
+ *
* @throws Exception
*/
public void start() throws Exception;
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
index 1ba1a90e3e..004295ddad 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -80,9 +80,9 @@ public interface TaskContext {
* @return Vertex Name
*/
public String getTaskVertexName();
-
+
/**
- * Get the index of this task's vertex in the set of vertices in the DAG. This
+ * Get the index of this task's vertex in the set of vertices in the DAG. This
* is consistent and valid across all tasks/vertices in the same DAG.
* @return index
*/
@@ -119,24 +119,24 @@ public interface TaskContext {
/**
* Returns an identifier which is unique to the specific Input, Processor or
* Output
- *
+ *
* @return a unique identifier
*/
public String getUniqueIdentifier();
-
+
/**
- * Returns a shared {@link ObjectRegistry} to hold user objects in memory
- * between tasks.
+ * Returns a shared {@link ObjectRegistry} to hold user objects in memory
+ * between tasks.
* @return {@link ObjectRegistry}
*/
public ObjectRegistry getObjectRegistry();
-
+
/**
- * Notifies the framework that progress is being made by this component.
+ * Notifies the framework that progress is being made by this component.
* This is used to identify hung components that are not making progress.
* Must be called periodically until processing has completed for this component.
- * Care must be taken to call this when real progress has been made. Simply
- * calling this continuously from a thread without regard to real work may prevent
+ * Care must be taken to call this when real progress has been made. Simply
+ * calling this continuously from a thread without regard to real work may prevent
* identification of hung components and delay/stall job completion.
*/
public void notifyProgress();
@@ -198,34 +198,34 @@ public interface TaskContext {
*/
@Nullable
public ByteBuffer getServiceProviderMetaData(String serviceName);
-
+
/**
* Request a specific amount of memory during initialization
* (initialize(..*Context)) The requester is notified of allocation via the
* provided callback handler.
- *
+ *
* Currently, (post TEZ-668) the caller will be informed about the available
* memory after initialization (I/P/O initialize(...)), and before the
* start/run invocation. There will be no other invocations on the callback.
- *
+ *
* This method can be called only once by any component. Calling it multiple
* times from within the same component will result in an error.
- *
+ *
* Each Input / Output must request memory. For Inputs / Outputs which do not
* have a specific ask, a null callback handler can be specified with a
* request size of 0.
- *
+ *
* @param size
* request size in bytes.
* @param callbackHandler
* the callback handler to be invoked once memory is assigned
*/
public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler);
-
+
/**
* Gets the total memory available to all components of the running task. This
* values will always be constant, and does not factor in any allocations.
- *
+ *
* @return the total available memory for all components of the task
*/
public long getTotalMemoryAvailableToTask();
@@ -248,8 +248,8 @@ public interface TaskContext {
* might not be guaranteed. The service returned works with tez framework, currently it provides
* thread reuse across tasks.
* Note: This is an unstable api, and is not recommended to be used by external users. Please wait
- * until API and code is stablized by use in Tez processors, input and outputs.
- * @param parallelism The expected parallelism for for this ExecutorService.
+ * until API and code is stabilized by use in Tez processors, input and outputs.
+ * @param parallelism The expected parallelism for this ExecutorService.
* @param threadNameFormat The thread name format, format will be given one parameter, threadId.
* @return An ExecutorService instance.
*/
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index bdfc6e02c1..477134e4d2 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -221,7 +221,7 @@ public static byte[] toByteArray(BitSet bits) {
}
/**
- * Convert DAGPlan to text. Skip sensitive informations like credentials.
+ * Convert DAGPlan to text. Skip sensitive information like credentials.
*
* @return a string representation of the dag plan with sensitive information removed
*/
diff --git a/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java b/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java
index b2b9266da6..8d55e6f8cf 100644
--- a/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java
+++ b/tez-common/src/main/java/org/apache/tez/common/web/ProfileServlet.java
@@ -41,7 +41,7 @@
/**
*
* Servlet that runs async-profiler as web-endpoint.
- * Following options from async-profiler can be specified as query paramater.
+ * Following options from async-profiler can be specified as query parameter.
* // -e event profiling event: cpu|alloc|lock|cache-misses etc.
* // -d duration run profiling for{@literal } seconds (integer)
* // -i interval sampling interval in nanoseconds (long)
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index 14eaa3a8db..9c99af0066 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -20,7 +20,7 @@
public enum TaskAttemptTerminationCause {
UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error propagation
-
+
TERMINATED_BY_CLIENT, // Killed by client command
TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown
TERMINATED_AT_RECOVERY, // Killed in recovery, due to can not recover running task attempt
@@ -29,7 +29,7 @@ public enum TaskAttemptTerminationCause {
TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because speculation succeeded
TERMINATED_ORPHANED, // Attempt is no longer needed by the task
-
+
APPLICATION_ERROR, // Failed due to application code error
FRAMEWORK_ERROR, // Failed due to code error in Tez code
INPUT_READ_ERROR, // Failed due to error in reading inputs
@@ -37,12 +37,12 @@ public enum TaskAttemptTerminationCause {
OUTPUT_LOST, // Failed because attempts output were reported lost
NO_PROGRESS, // Failed because no progress was being made
TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
-
+
CONTAINER_LAUNCH_FAILED, // Failed to launch container
CONTAINER_EXITED, // Container exited. Indicates gap in specific error propagation from the cluster
CONTAINER_STOPPED, // Container stopped or released by Tez
NODE_FAILED, // Node for the container failed
- NODE_DISK_ERROR, // Disk failed on the node runnign the task
+ NODE_DISK_ERROR, // Disk failed on the node running the task
COMMUNICATION_ERROR, // Equivalent to a launch failure
SERVICE_BUSY, // Service rejected the task
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
index c46aa6088b..8e6a7b65d9 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java
@@ -58,7 +58,7 @@ public static TezDAGID getInstance(ApplicationId applicationId, int id) {
Objects.requireNonNull(applicationId, "ApplicationID cannot be null");
return tezDAGIDCache.intern(new TezDAGID(applicationId, id));
}
-
+
/**
* Get a DAGID object from given parts.
* @param yarnRMIdentifier YARN RM identifier
@@ -69,7 +69,7 @@ public static TezDAGID getInstance(ApplicationId applicationId, int id) {
public static TezDAGID getInstance(String yarnRMIdentifier, int appId, int id) {
// The newly created TezDAGIds are primarily for their hashCode method, and
// will be short-lived.
- // Alternately the cache can be keyed by the hash of the incoming paramters.
+ // Alternately the cache can be keyed by the hash of the incoming parameters.
Objects.requireNonNull(yarnRMIdentifier, "yarnRMIdentifier cannot be null");
return tezDAGIDCache.intern(new TezDAGID(yarnRMIdentifier, appId, id));
}
@@ -83,7 +83,7 @@ private TezDAGID(ApplicationId applicationId, int id) {
this.applicationId = applicationId;
}
-
+
private TezDAGID(String yarnRMIdentifier, int appId, int id) {
this(ApplicationId.newInstance(Long.parseLong(yarnRMIdentifier),
appId), id);
@@ -127,7 +127,7 @@ public static TezDAGID readTezDAGID(DataInput in) throws IOException {
TezDAGID dagID = getInstance(ApplicationId.newInstance(clusterId, appId), dagIdInt);
return dagID;
}
-
+
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(applicationId.getClusterTimestamp());
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
index f7becc250f..e428317237 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java
@@ -35,7 +35,7 @@
* TezVertexID represents the immutable and unique identifier for
* a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
*
- * TezVertezID consists of 2 parts. The first part is the {@link TezDAGID},
+ * TezVertexID consists of 2 parts. The first part is the {@link TezDAGID},
* that is the Tez DAG that this vertex belongs to. The second part is
* the vertex number.
*
@@ -113,7 +113,7 @@ public void readFields(DataInput in) throws IOException {
dagId = TezDAGID.readTezDAGID(in);
super.readFields(in);
}
-
+
public static TezVertexID readTezVertexID(DataInput in) throws IOException {
TezDAGID dagID = TezDAGID.readTezDAGID(in);
int vertexIdInt = TezID.readID(in);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e9304599dc..3c99b1afd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2211,7 +2211,7 @@ public void handle(DAGEvent event) {
DAG dag = context.getCurrentDAG();
int eventDagIndex = event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
- return; // event not relevant any more
+ return; // event not relevant anymore
}
((EventHandler)dag).handle(event);
}
@@ -2225,7 +2225,7 @@ public void handle(TaskEvent event) {
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
- return; // event not relevant any more
+ return; // event not relevant anymore
}
Task task =
dag.getVertex(event.getVertexID()).
@@ -2255,7 +2255,7 @@ public void handle(TaskAttemptEvent event) {
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
- return; // event not relevant any more
+ return; // event not relevant anymore
}
Task task =
dag.getVertex(event.getVertexID()).
@@ -2274,7 +2274,7 @@ public void handle(VertexEvent event) {
int eventDagIndex =
event.getDAGID().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
- return; // event not relevant any more
+ return; // event not relevant anymore
}
Vertex vertex =
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index ce3b62bbd5..b6e942cdae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -81,7 +81,7 @@
/**
* RecoverParser is mainly for Tez AM Recovery. It would read the recovery events. (summary & non-summary)
- *
+ *
*/
public class RecoveryParser {
@@ -150,7 +150,7 @@ public DAGRecoveryData(DAGSummaryData dagSummaryData) {
// DAG is not recoverable if vertex has committer and has completed the commit (based on summary recovery events)
// but its full recovery events are not seen. (based on non-summary recovery events)
- // Unrecoverable reason: vertex is committed we cannot rerun it and if vertex recovery events are not completed
+ // Unrecoverable reason: vertex is committed we cannot rerun it and if vertex recovery events are not completed
// we cannot run other vertices that may depend on this one. So we have to abort.
public void checkRecoverableNonSummary() {
// It is OK without full recovering events if the dag is completed based on summary event.
@@ -425,8 +425,8 @@ public static void main(String argv[]) throws IOException {
}
}
- private Path getSummaryPath(Path attemptRrecoveryDataDir) {
- return TezCommonUtils.getSummaryRecoveryPath(attemptRrecoveryDataDir);
+ private Path getSummaryPath(Path attemptRecoveryDataDir) {
+ return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
}
private FSDataInputStream getSummaryStream(Path summaryPath)
@@ -644,7 +644,7 @@ private List getDAGRecoveryFiles(TezDAGID dagId) throws IOException {
/**
* 1. Read Summary Recovery file and build DAGSummaryData
- * Check whether it is recoverable based on the summary file (whether dag is
+ * Check whether it is recoverable based on the summary file (whether dag is
* in the middle of committing)
* 2. Read the non-Summary Recovery file and build DAGRecoveryData
* Check whether it is recoverable based on both the summary file and non-summary file
@@ -796,10 +796,10 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
case DAG_FINISHED:
recoveredDAGData.dagFinishedEvent = (DAGFinishedEvent)event;
skipAllOtherEvents = true;
- break;
+ break;
case DAG_COMMIT_STARTED:
case VERTEX_GROUP_COMMIT_STARTED:
- case VERTEX_GROUP_COMMIT_FINISHED:
+ case VERTEX_GROUP_COMMIT_FINISHED:
case CONTAINER_LAUNCHED:
{
// Nothing to do for now
@@ -918,27 +918,27 @@ public static class VertexRecoveryData {
private VertexFinishedEvent vertexFinishedEvent;
private Map taskRecoveryDataMap =
new HashMap();
- private boolean commited;
+ private boolean committed;
@VisibleForTesting
public VertexRecoveryData(VertexInitializedEvent vertexInitedEvent,
VertexConfigurationDoneEvent vertexReconfigureDoneEvent,
VertexStartedEvent vertexStartedEvent,
VertexFinishedEvent vertexFinishedEvent,
- Map taskRecoveryDataMap, boolean commited) {
+ Map taskRecoveryDataMap, boolean committed) {
super();
this.vertexInitedEvent = vertexInitedEvent;
this.vertexConfigurationDoneEvent = vertexReconfigureDoneEvent;
this.vertexStartedEvent = vertexStartedEvent;
this.vertexFinishedEvent = vertexFinishedEvent;
this.taskRecoveryDataMap = taskRecoveryDataMap;
- this.commited = commited;
+ this.committed = committed;
}
public VertexRecoveryData(boolean committed) {
- this.commited = committed;
+ this.committed = committed;
}
-
+
public VertexInitializedEvent getVertexInitedEvent() {
return vertexInitedEvent;
}
@@ -987,7 +987,7 @@ public boolean isVertexFinished() {
}
public boolean isVertexCommitted() {
- return this.commited;
+ return this.committed;
}
public TaskRecoveryData getTaskRecoveryData(TezTaskID taskId) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 4f9e8723aa..1e2671f5b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -50,7 +50,7 @@
@InterfaceAudience.Private
public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, VertexStateUpdateListener {
- // TODO TEZ-2003 (post) TEZ-2669 Propagate errors baack to the AM with proper error reporting
+ // TODO TEZ-2003 (post) TEZ-2669 Propagate errors back to the AM with proper error reporting
private final AppContext context;
private final TaskCommunicatorManager taskCommunicatorManager;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 51895f4afd..92954b65b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -387,7 +387,7 @@ public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason task
// and messages from the scheduler will release the container.
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
- // Fix along the same lines as TEZ-2124 by introducing an explict context.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
sendEvent(new TaskAttemptEventAttemptKilled(taskAttemptId,
diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
taskAttemptEndReason)));
@@ -400,7 +400,7 @@ public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailu
// and messages from the scheduler will release the container.
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
- // Fix along the same lines as TEZ-2124 by introducing an explict context.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
//TODO-3183. Allow the FailureType to be specified
sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
TaskAttemptEventType.TA_FAILED, taskFailureType, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
index e23d27cf6e..6954f12e89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventTaskAttemptCompleted.java
@@ -24,21 +24,21 @@
public class VertexEventTaskAttemptCompleted extends VertexEvent {
private TezTaskAttemptID attemptId;
- private TaskAttemptStateInternal attempState;
-
+ private TaskAttemptStateInternal attemptState;
+
public VertexEventTaskAttemptCompleted(TezTaskAttemptID taskAttemptId,
TaskAttemptStateInternal state) {
super(taskAttemptId.getVertexID(),
VertexEventType.V_TASK_ATTEMPT_COMPLETED);
this.attemptId = taskAttemptId;
- this.attempState = state;
+ this.attemptState = state;
}
public TezTaskAttemptID getTaskAttemptId() {
return attemptId;
}
-
+
public TaskAttemptStateInternal getTaskAttemptState() {
- return attempState;
+ return attemptState;
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index c5b4e1aa80..3c6f46cfd5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -35,8 +35,8 @@
import com.google.common.collect.Lists;
public class ScatterGatherEdgeManager extends EdgeManagerPluginOnDemand {
-
- private AtomicReference> commonRouteMeta =
+
+ private AtomicReference> commonRouteMeta =
new AtomicReference>();
private Object commonRouteMetaLock = new Object();
private int[][] sourceIndices;
@@ -55,12 +55,12 @@ public void initialize() {
public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
return getContext().getSourceVertexNumTasks();
}
-
+
@Override
public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
int physicalOutputs = getContext().getDestinationVertexNumTasks();
Preconditions.checkArgument(physicalOutputs >= 0,
- "ScatteGather edge manager must have destination vertex task parallelism specified");
+ "ScatterGather edge manager must have destination vertex task parallelism specified");
return physicalOutputs;
}
@@ -98,10 +98,10 @@ private void createIndices() {
targetIndices[i] = new int[]{i};
}
}
-
+
@Override
public void prepareForRouting() throws Exception {
- createIndices();
+ createIndices();
}
@Override
@@ -112,12 +112,12 @@ public EventRouteMetadata routeDataMovementEventToDestination(
}
return null;
}
-
+
@Override
public @Nullable CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
int sourceTaskIndex, int destinationTaskIndex)
throws Exception {
- return CompositeEventRouteMetadata.create(1, targetIndices[sourceTaskIndex][0],
+ return CompositeEventRouteMetadata.create(1, targetIndices[sourceTaskIndex][0],
sourceIndices[destinationTaskIndex][0]);
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index d2933c5b86..d08c8d3d29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -169,7 +169,7 @@ public class TaskImpl implements Task, EventHandler {
private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
-
+
private static final StateMachineFactory
stateMachineFactory
@@ -179,7 +179,7 @@ public class TaskImpl implements Task, EventHandler {
// define the state machine of Task
// Transitions from NEW state
- // Stay in NEW in recovery when Task is killed in the previous AM
+ // Stay in NEW in recovery when Task is killed in the previous AM
.addTransition(TaskStateInternal.NEW,
EnumSet.of(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED),
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
@@ -200,7 +200,7 @@ TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
TaskEventType.T_ATTEMPT_FAILED,
new AttemptFailedTransition())
- // Happens in recovery
+ // Happens in recovery
.addTransition(TaskStateInternal.SCHEDULED,
EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
@@ -213,7 +213,7 @@ TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
- .addTransition(TaskStateInternal.RUNNING,
+ .addTransition(TaskStateInternal.RUNNING,
EnumSet.of(TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_SUCCEEDED,
new AttemptSucceededTransition())
@@ -398,7 +398,7 @@ public TaskImpl(TezVertexID vertexId, int taskIndex,
stateMachineFactory.make(this), this);
augmentStateMachine();
}
-
+
@Override
public Map getAttempts() {
readLock.lock();
@@ -417,7 +417,7 @@ public Map getAttempts() {
readLock.unlock();
}
}
-
+
@Override
public TaskAttempt getAttempt(TezTaskAttemptID attemptID) {
readLock.lock();
@@ -478,7 +478,7 @@ public TezCounters getCounters() {
readLock.unlock();
}
}
-
+
TaskStatistics getStatistics() {
// simply return the stats from the best attempt
readLock.lock();
@@ -549,7 +549,7 @@ public ArrayList getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
readLock.unlock();
}
}
-
+
@Override
public TaskSpec getBaseTaskSpec() {
readLock.lock();
@@ -559,7 +559,7 @@ public TaskSpec getBaseTaskSpec() {
readLock.unlock();
}
}
-
+
@Override
public TaskLocationHint getTaskLocationHint() {
readLock.lock();
@@ -643,7 +643,7 @@ private long getLastTaskAttemptFinishTime() {
long finishTime = 0;
for (TaskAttempt at : attempts.values()) {
//select the max finish time of all attempts
- // FIXME shouldnt this not count attempts killed after an attempt succeeds
+ // FIXME shouldn't this not count attempts killed after an attempt succeeds
if (finishTime < at.getFinishTime()) {
finishTime = at.getFinishTime();
}
@@ -693,7 +693,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
LOG.debug("Commit go/no-go request from {}", taskAttemptID);
TaskState state = getState();
if (state == TaskState.SCHEDULED) {
- // the actual running task ran and is done and asking for commit. we are still stuck
+ // the actual running task ran and is done and asking for commit. we are still stuck
// in the scheduled state which indicates a backlog in event processing. lets wait for the
// backlog to clear. returning false will make the attempt come back to us.
LOG.info(
@@ -701,7 +701,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptID) {
+ "Attempt committing before state machine transitioned to running : Task {}", taskId);
return false;
}
- // at this point the attempt is no longer in scheduled state or else we would still
+ // at this point the attempt is no longer in scheduled state or else we would still
// have been in scheduled state in task impl.
if (state != TaskState.RUNNING) {
LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
@@ -897,12 +897,12 @@ private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
eventHandler.handle(new DAGEventSchedulerUpdate(
DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, attempts.get(taId)));
}
-
+
private static void unSucceed(TaskImpl task) {
task.commitAttempt = null;
task.successfulAttempt = null;
}
-
+
/**
* @return a String representation of the splits.
*
@@ -937,7 +937,7 @@ protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
this.finishTime = clock.getTime();
TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
getVertex().getName(), getLaunchTime(), this.finishTime, null,
- finalState,
+ finalState,
StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
getCounters(), failedAttempts);
this.appContext.getHistoryHandler().handle(
@@ -949,7 +949,7 @@ private void addDiagnosticInfo(String diag) {
diagnostics.add(diag);
}
}
-
+
@VisibleForTesting
int getUncompletedAttemptsCount() {
try {
@@ -988,7 +988,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (task.recoveryData != null) {
TaskStartedEvent tStartedEvent = task.recoveryData.getTaskStartedEvent();
TaskFinishedEvent tFinishedEvent = task.recoveryData.getTaskFinishedEvent();
- // If TaskStartedEvent is not seen but TaskFinishedEvent is seen, that means
+ // If TaskStartedEvent is not seen but TaskFinishedEvent is seen, that means
// Task is killed before it is started. Just send T_TERMINATE to itself to move to KILLED
if (tStartedEvent == null
&& tFinishedEvent != null) {
@@ -1238,7 +1238,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
return task.getInternalState();
}
}
-
+
private boolean shouldScheduleNewAttempt() {
return (getUncompletedAttemptsCount() == 0
&& successfulAttempt == null);
@@ -1248,7 +1248,7 @@ private static class AttemptFailedTransition implements
MultipleArcTransition {
private TezTaskAttemptID schedulingCausalTA;
-
+
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++;
@@ -1309,7 +1309,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
protected TaskStateInternal getDefaultState(TaskImpl task) {
return task.getInternalState();
}
-
+
protected TezTaskAttemptID getSchedulingCausalTA() {
return schedulingCausalTA;
}
@@ -1337,7 +1337,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
containerId, failedAttemptId, true));
}
}
-
+
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!failedAttemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
@@ -1352,7 +1352,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
task.internalError(event.getType());
}
Preconditions.checkState(castEvent.getCausalEvent() != null);
- TaskAttemptEventOutputFailed destinationEvent =
+ TaskAttemptEventOutputFailed destinationEvent =
(TaskAttemptEventOutputFailed) castEvent.getCausalEvent();
schedulingCausalTA = destinationEvent.getInputFailedEvent().getSourceInfo().getTaskAttemptID();
@@ -1371,7 +1371,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
return returnState;
}
-
+
@Override
protected TezTaskAttemptID getSchedulingCausalTA() {
return schedulingCausalTA;
@@ -1451,12 +1451,12 @@ public void onStateChanged(TaskImpl task, TaskStateInternal taskStateInternal) {
// This is a horrible hack to get around recovery issues. Without this, recovery would fail
// for successful vertices.
// With this, recovery will end up failing for DAGs making use of InputInitializerEvents
- int succesfulAttemptInt = -1;
+ int successfulAttemptInt = -1;
if (successfulAttempt != null) {
- succesfulAttemptInt = successfulAttempt.getTaskAttemptID().getId();
+ successfulAttemptInt = successfulAttempt.getTaskAttemptID().getId();
}
task.stateChangeNotifier.taskSucceeded(task.getVertex().getName(), task.getTaskID(),
- succesfulAttemptInt);
+ successfulAttemptInt);
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index 5e45e70993..cb5980bc03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -53,7 +53,7 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
private TaskSchedulerContext real;
private ExecutorService executorService;
-
+
/**
* @param real the actual TaskSchedulerAppCallback
* @param executorService the ExecutorService to be used to send these events.
@@ -90,7 +90,7 @@ public void nodesUpdated(List updatedNodes) {
@Override
public void appShutdownRequested() {
- executorService.submit(new AppShudownRequestedCallable(real));
+ executorService.submit(new AppShutdownRequestedCallable(real));
}
@Override
@@ -116,7 +116,7 @@ public float getProgress() {
throw new TezUncheckedException(e);
}
}
-
+
@Override
public void preemptContainer(ContainerId containerId) {
executorService.submit(new PreemptContainerCallable(real, containerId));
@@ -280,10 +280,10 @@ public Void call() throws Exception {
}
}
- static class AppShudownRequestedCallable extends TaskSchedulerContextCallbackBase
+ static class AppShutdownRequestedCallable extends TaskSchedulerContextCallbackBase
implements Callable {
- public AppShudownRequestedCallable(TaskSchedulerContext app) {
+ public AppShutdownRequestedCallable(TaskSchedulerContext app) {
super(app);
}
@@ -346,19 +346,19 @@ public Void call() throws Exception {
static class PreemptContainerCallable extends TaskSchedulerContextCallbackBase
implements Callable {
private final ContainerId containerId;
-
+
public PreemptContainerCallable(TaskSchedulerContext app, ContainerId id) {
super(app);
this.containerId = id;
}
-
+
@Override
public Void call() throws Exception {
app.preemptContainer(containerId);
return null;
}
}
-
+
static class GetProgressCallable extends TaskSchedulerContextCallbackBase
implements Callable {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
index a14112486a..3dd7a6a9f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEventType.java
@@ -28,14 +28,14 @@ public enum AMNodeEventType {
//Producer: TaskSchedulerEventHandler
N_TA_SUCCEEDED,
- // Producer: TaskSchedulerEventHnadler, Task(retroactive failure)
+ // Producer: TaskSchedulerEventHandler, Task(retroactive failure)
N_TA_ENDED,
-
+
//Producer: TaskScheduler via TaskSchedulerEventHandler
N_TURNED_UNHEALTHY,
N_TURNED_HEALTHY,
N_NODE_COUNT_UPDATED, // for blacklisting.
-
+
//Producer: AMNodeManager
N_IGNORE_BLACKLISTING_ENABLED,
N_IGNORE_BLACKLISTING_DISABLED,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 5b5a9c7261..d51b79df76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -283,7 +283,7 @@ private void addToEventQueue(DAGHistoryEvent event) {
public void handle(DAGHistoryEvent event) throws IOException {
if (stopped.get()) {
- LOG.warn("Igoring event as service stopped, eventType"
+ LOG.warn("Ignoring event as service stopped, eventType"
+ event.getHistoryEvent().getEventType());
return;
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
index 7d93481581..97dcda5a14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
@@ -152,7 +152,7 @@ private boolean shouldParseSpecificTaskList() {
*
*
* @param conf
- * @return a map from the vertex name to a BitSet representing tasks to be instruemented. null if
+ * @return a map from the vertex name to a BitSet representing tasks to be instrumented. null if
* the provided configuration is empty or invalid
*/
private Map getSpecificTasks(Configuration conf) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index b4871bd041..5d107ed737 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -98,7 +98,7 @@ public static List getVMCommand(
vargs.add("2>" + getTaskLogFile(LogName.STDERR));
// TODO Is this StringBuilder really required ? YARN already accepts a list of commands.
- // Final commmand
+ // Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index 6741a361c9..69cb4acf2e 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -41,7 +41,7 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
// - Report taskSuccess via a method instead of the heartbeat
// - Add methods to signal container / task state changes
// - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
- // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
+ // - Handling of containers / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
/**
@@ -75,7 +75,7 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
* This method must be invoked periodically to receive updates for a running task
*
* @param request the update from the running task.
- * @return the response that is requried by the task.
+ * @return the response that is required by the task.
* @throws IOException
* @throws TezException
*/
@@ -176,7 +176,7 @@ void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
/**
* Get an identifier for the executing context of the DAG.
- * @return a String identifier for the exeucting context.
+ * @return a String identifier for the executing context.
*/
String getCurrentAppIdentifier();
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
index c6578ffa31..d81721303c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java
@@ -155,7 +155,7 @@ public void testScatterGatherManager() {
} catch (IllegalArgumentException e) {
e.printStackTrace();
Assert.assertTrue(e.getMessage()
- .contains("ScatteGather edge manager must have destination vertex task parallelism specified"));
+ .contains("ScatterGather edge manager must have destination vertex task parallelism specified"));
}
when(mockContext.getDestinationVertexNumTasks()).thenReturn(0);
manager.getNumSourceTaskPhysicalOutputs(0);
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
index 6eaa03f9d7..361d7a9814 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -226,7 +226,7 @@ private DAG createDag(TezConfiguration tezConf, Path streamPath,
* in both cases for brevity of code. The join task can perform the join
* of its fragment of keys with all the keys of the hash side. Using an
* unpartitioned edge to transfer the complete output of the hash side to
- * be broadcasted to all fragments of the streamed side. Again, since the
+ * be broadcast to all fragments of the streamed side. Again, since the
* data is the key, the value is null. The setFromConfiguration call is
* optional and allows overriding the config options with command line
* parameters.
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
index 02156d684e..3dc9150436 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -63,7 +63,7 @@
* feeding to {@link SortMergeJoinProcessor}, just like the sort phase before
* reduce in traditional MapReduce. Then we could move forward the iterators of
* two inputs in {@link SortMergeJoinProcessor} to find the joined keys since
- * they are both sorted already.
Because of the sort implemention
+ * they are both sorted already.
Because of the sort implementation
* difference we describe above, the data requirement is also different for
* these 2 sort algorithms. For {@link HashJoinExample} It is required that keys
* in the hashFile are unique. while for {@link SortMergeJoinExample} it is
@@ -133,7 +133,7 @@ protected int validateArgs(String[] otherArgs) {
* v1 v2
* \ /
* v3
- *
+ *
* @param tezConf
* @param inputPath1
* @param inputPath2
@@ -269,7 +269,7 @@ public void run() throws Exception {
/**
* Join 2 sorted inputs both from {@link KeyValuesReader} and write output
* using {@link KeyValueWriter}
- *
+ *
* @param inputReader1
* @param inputReader2
* @param writer
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 8b1b1aee0b..cc34c2a513 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -277,7 +277,7 @@ static class ShuffleMetrics implements ChannelFutureListener {
MutableCounterLong shuffleOutputBytes;
@Metric("# of failed shuffle outputs")
MutableCounterInt shuffleOutputsFailed;
- @Metric("# of succeeeded shuffle outputs")
+ @Metric("# of succeeded shuffle outputs")
MutableCounterInt shuffleOutputsOK;
@Metric("# of current shuffle connections")
MutableGaugeInt shuffleConnections;
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
index fd01093244..e354f624f6 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
@@ -276,7 +276,7 @@ public final TaskAttemptInfo getLastTaskAttemptToFinish() {
}
/**
- * Get average task attempt duration. Includes succesful and failed tasks
+ * Get average task attempt duration. Includes successful and failed tasks
*
* @return float
*/
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
index b7d5fb5885..6e8950c86b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/internals/api/TaskReporterInterface.java
@@ -39,7 +39,7 @@ boolean taskFailed(TezTaskAttemptID taskAttemptId,
String diagnostics, EventMetaData srcMeta) throws IOException,
TezException;
- boolean taskKilled(TezTaskAttemptID taskAttemtpId, Throwable cause, String diagnostics,
+ boolean taskKilled(TezTaskAttemptID taskAttemptId, Throwable cause, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException;
void addEvents(TezTaskAttemptID taskAttemptId, Collection events);
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
index eeb24343be..81047a9f56 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskReporter.java
@@ -71,7 +71,7 @@
* Responsible for communication between tasks running in a Container and the ApplicationMaster.
* Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to
* retrieve events specific to this task.
- *
+ *
*/
public class TaskReporter implements TaskReporterInterface {
@@ -171,7 +171,7 @@ static class HeartbeatCallable implements Callable {
private AtomicInteger nonOobHeartbeatCounter = new AtomicInteger(0);
private int nextHeartbeatNumToLog = 0;
/*
- * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
+ * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
*/
private int prevCounterSendHeartbeatNum = 0;
@@ -361,7 +361,7 @@ private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException
return askedToDie.get();
}
}
-
+
@VisibleForTesting
TaskStatusUpdateEvent getStatusUpdateEvent(boolean sendCounters) {
TezCounters counters = null;
@@ -472,10 +472,10 @@ public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID,
}
@Override
- public boolean taskKilled(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
+ public boolean taskKilled(TezTaskAttemptID taskAttemptId, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
if(!isShuttingDown()) {
- return currentCallable.taskTerminated(taskAttemptID, true, null, t, diagnostics, srcMeta);
+ return currentCallable.taskTerminated(taskAttemptId, true, null, t, diagnostics, srcMeta);
}
return false;
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index ce379b5d0a..90eeeb569b 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -105,7 +105,7 @@ public class TezTaskRunner2 {
// The callable which is being used to execute the task.
private volatile TaskRunner2Callable taskRunnerCallable;
- // This instance is set only if the runner was not configured explicity and will be shutdown
+ // This instance is set only if the runner was not configured explicitly and will be shutdown
// when this task is finished.
private final TezSharedExecutor localExecutor;
@@ -297,7 +297,7 @@ public TaskRunner2Result run() {
// It's possible for the task to actually complete, and an alternate signal such as killTask/killContainer
// come in before the future has been processed by this thread. That condition is not handled - and
- // the result of the execution will be determind by the thread order.
+ // the result of the execution will be determined by the thread order.
@VisibleForTesting
void processCallableResult(TaskRunner2CallableResult executionResult) {
if (executionResult != null) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
index 05e2d8ce78..c6264fb2f6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java
@@ -548,8 +548,8 @@ Iterable getAllSourceVertexInfo() {
return srcVertexInfo.values();
}
- SourceVertexInfo getSourceVertexInfo(String vertextName) {
- return srcVertexInfo.get(vertextName);
+ SourceVertexInfo getSourceVertexInfo(String vertexName) {
+ return srcVertexInfo.get(vertexName);
}
Iterable> getBipartiteInfo() {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
index e12331c250..0a4306a866 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -49,7 +49,7 @@ public class HttpConnection extends BaseHttpConnection {
@VisibleForTesting
protected volatile HttpURLConnection connection;
private volatile DataInputStream input;
- private volatile boolean connectionSucceeed;
+ private volatile boolean connectionSucceed;
private volatile boolean cleanup;
private final JobTokenSecretManager jobTokenSecretMgr;
@@ -147,7 +147,7 @@ private boolean connect(int connectionTimeout) throws IOException {
long connectStartTime = System.currentTimeMillis();
try {
connection.connect();
- connectionSucceeed = true;
+ connectionSucceed = true;
break;
} catch (IOException ioe) {
// Don't attempt another connect if already cleanedup.
@@ -253,7 +253,7 @@ public void validate() throws IOException {
@Override
public DataInputStream getInputStream() throws IOException {
stopWatch.reset().start();
- if (connectionSucceeed) {
+ if (connectionSucceed) {
input = new DataInputStream(new BufferedInputStream(
connection.getInputStream(), httpConnParams.getBufferSize()));
}
@@ -280,7 +280,7 @@ public void cleanup(boolean disconnect) throws IOException {
input.close();
input = null;
}
- if (httpConnParams.isKeepAlive() && connectionSucceeed) {
+ if (httpConnParams.isKeepAlive() && connectionSucceed) {
// Refer:
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
readErrorStream(connection.getErrorStream());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
index acbdab7df9..31dccd089e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
@@ -156,7 +156,7 @@ public Response onCompleted() throws IOException {
}
/**
- * This method -- unlike Future.get() -- will block only as long,
+ * This method -- unlike Future.get() -- will block only as long,
* as headers arrive. This is useful for large transfers, to examine headers
* ASAP, and defer body streaming to it's fine destination and prevent
* unneeded bandwidth consumption. The response here will contain the very
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index 32e76f4c1c..45eea0110b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -86,9 +86,9 @@ public static Combiner instantiateCombiner(Configuration conf, TaskContext taskC
} catch (ClassNotFoundException e) {
throw new IOException("Unable to load combiner class: " + className);
}
-
+
Combiner combiner;
-
+
Constructor extends Combiner> ctor;
try {
ctor = clazz.getConstructor(TaskContext.class);
@@ -99,7 +99,7 @@ public static Combiner instantiateCombiner(Configuration conf, TaskContext taskC
}
return combiner;
}
-
+
@SuppressWarnings("unchecked")
public static Partitioner instantiatePartitioner(Configuration conf)
throws IOException {
@@ -171,7 +171,7 @@ public static URL constructBaseURIForShuffleHandlerDagComplete(
}
public static URL constructBaseURIForShuffleHandlerVertexComplete(
- String host, int port, String appId, int dagIdentifier, String vertexIndentifier, boolean sslShuffle)
+ String host, int port, String appId, int dagIdentifier, String vertexIdentifier, boolean sslShuffle)
throws MalformedURLException {
String httpProtocol = (sslShuffle) ? "https://" : "http://";
StringBuilder sb = new StringBuilder(httpProtocol);
@@ -185,7 +185,7 @@ public static URL constructBaseURIForShuffleHandlerVertexComplete(
sb.append("&dag=");
sb.append(dagIdentifier);
sb.append("&vertex=");
- sb.append(vertexIndentifier);
+ sb.append(vertexIdentifier);
return new URL(sb.toString());
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index 19fbf2d084..7e5f3af60f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -31,8 +31,8 @@
import org.apache.tez.common.security.JobTokenSecretManager;
/**
- *
- * utilities for generating kyes, hashes and verifying them for shuffle
+ *
+ * utilities for generating keys, hashes and verifying them for shuffle
*
*/
@InterfaceAudience.Private
@@ -42,7 +42,7 @@ public final class SecureShuffleUtils {
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
private SecureShuffleUtils() {}
-
+
/**
* Base64 encoded hash of msg
*/
@@ -124,7 +124,7 @@ public static void verifyReply(String base64Hash, String msg, JobTokenSecretMana
throw new IOException("Verification of the hashReply failed");
}
}
-
+
/**
* Shuffle specific utils - build string for encoding from URL
*
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index eb34ec2993..c700f29782 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -132,18 +132,18 @@ public String toString() {
private boolean ifileReadAhead = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT;
private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
-
+
private final JobTokenSecretManager jobTokenSecretMgr;
private final FetcherCallback fetcherCallback;
private final FetchedInputAllocator inputManager;
private final ApplicationId appId;
private final int dagIdentifier;
-
+
private final String logIdentifier;
private final String localHostname;
-
+
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
protected final int fetcherIdentifier;
@@ -173,7 +173,7 @@ public String getHost() {
private URL url;
private volatile DataInputStream input;
-
+
BaseHttpConnection httpConnection;
private HttpConnectionParams httpConnectionParams;
@@ -345,7 +345,7 @@ public void cache(String host,
DiskFetchedInput input = (DiskFetchedInput) fetchedInput;
indexRec = new TezIndexRecord(0, decompressedLength, compressedLength);
localFs.mkdirs(outputPath.getParent());
- // avoid pit-falls of speculation
+ // avoid pitfalls of speculation
tmpPath = outputPath.suffix(tmpSuffix);
// JDK7 - TODO: use Files implementation to speed up this process
localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
@@ -459,7 +459,7 @@ protected HostFetchResult doSharedFetch() throws IOException {
srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
} else {
if (findInputs() == srcAttemptsRemaining.size()) {
- // double checked after lock
+ // double-checked after lock
releaseLock(lock);
lock = null;
return doLocalDiskFetch(true);
@@ -1090,7 +1090,7 @@ private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, Throwable ioe)
/**
* Do some basic verification on the input received -- Being defensive
- *
+ *
* @param compressedLength
* @param decompressedLength
* @param fetchPartition
@@ -1120,7 +1120,7 @@ private boolean verifySanity(long compressedLength, long decompressedLength,
}
return true;
}
-
+
private InputAttemptIdentifier getNextRemainingAttempt() {
if (srcAttemptsRemaining.size() > 0) {
return srcAttemptsRemaining.values().iterator().next();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 5887dcb3ca..c277f57382 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -59,7 +59,7 @@
import com.google.common.annotations.VisibleForTesting;
class FetcherOrderedGrouped extends CallableWithNdc {
-
+
private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class);
private static final AtomicInteger nextId = new AtomicInteger(0);
@@ -240,8 +240,8 @@ private void cleanupCurrentConnection(boolean disconnect) {
/**
* The crux of the matter...
- *
- * @param host {@link MapHost} from which we need to
+ *
+ * @param host {@link MapHost} from which we need to
* shuffle available map-outputs.
*/
@VisibleForTesting
@@ -272,7 +272,7 @@ protected void copyFromHost(MapHost host) throws IOException {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
- // after putting back the remaining maps to the
+ // after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
InputAttemptFetchFailure[] failedTasks = null;
@@ -280,7 +280,7 @@ protected void copyFromHost(MapHost host) throws IOException {
InputAttemptIdentifier inputAttemptIdentifier =
remaining.entrySet().iterator().next().getValue();
// fail immediately after first failure because we dont know how much to
- // skip for this error in the input stream. So we cannot move on to the
+ // skip for this error in the input stream. So we cannot move on to the
// remaining outputs. YARN-1773. Will get to them in the next retry.
try {
failedTasks = copyMapOutput(host, input, inputAttemptIdentifier);
@@ -486,8 +486,8 @@ protected InputAttemptFetchFailure[] copyMapOutput(MapHost host, DataInputStream
if (!stopped) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
- // Don't know which one was bad, so consider this one bad and dont read
- // the remaining because we dont know where to start reading from. YARN-1773
+ // Don't know which one was bad, so consider this one bad and don't read
+ // the remaining because we don't know where to start reading from. YARN-1773
return new InputAttemptFetchFailure[] {
new InputAttemptFetchFailure(getNextRemainingAttempt()) };
} else {
@@ -651,7 +651,7 @@ private boolean shouldRetry(MapHost host, Throwable ioe) {
return false;
}
}
-
+
/**
* Do some basic verification on the input received -- Being defensive
* @param compressedLength
@@ -666,7 +666,7 @@ private boolean verifySanity(long compressedLength, long decompressedLength,
if (compressedLength < 0 || decompressedLength < 0) {
wrongLengthErrs.increment(1);
LOG.warn(logIdentifier + " invalid lengths in map output header: id: " +
- srcAttemptId + " len: " + compressedLength + ", decomp len: " +
+ srcAttemptId + " len: " + compressedLength + ", decomp len: " +
decompressedLength);
return false;
}
@@ -682,7 +682,7 @@ private boolean verifySanity(long compressedLength, long decompressedLength,
}
return true;
}
-
+
private InputAttemptIdentifier getNextRemainingAttempt() {
if (remaining.size() > 0) {
return remaining.values().iterator().next();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 9da1276b8a..eb13c03549 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -78,14 +78,14 @@
@InterfaceStability.Unstable
@SuppressWarnings(value={"rawtypes"})
public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
-
+
private static final Logger LOG = LoggerFactory.getLogger(MergeManager.class);
private final Configuration conf;
private final FileSystem localFS;
private final FileSystem rfs;
private final LocalDirAllocator localDirAllocator;
-
+
private final TezTaskOutputFiles mapOutputFile;
private final Progressable progressable = new Progressable() {
@Override
@@ -93,8 +93,8 @@ public void progress() {
inputContext.notifyProgress();
}
};
- private final Combiner combiner;
-
+ private final Combiner combiner;
+
@VisibleForTesting
final Set inMemoryMergedMapOutputs =
new TreeSet(new MapOutput.MapOutputComparator());
@@ -109,7 +109,7 @@ public void progress() {
final Set onDiskMapOutputs = new TreeSet();
@VisibleForTesting
final OnDiskMerger onDiskMerger;
-
+
private final long memoryLimit;
@VisibleForTesting
final long postMergeMemLimit;
@@ -120,13 +120,13 @@ public void progress() {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
- private final int memToMemMergeOutputsThreshold;
+ private final int memToMemMergeOutputsThreshold;
private final long mergeThreshold;
-
+
private final long initialMemoryAvailable;
private final ExceptionReporter exceptionReporter;
-
+
private final InputContext inputContext;
private final TezCounter spilledRecordsCounter;
@@ -134,16 +134,16 @@ public void progress() {
private final TezCounter reduceCombineInputCounter;
private final TezCounter mergedMapOutputsCounter;
-
+
private final TezCounter numMemToDiskMerges;
private final TezCounter numDiskToDiskMerges;
private final TezCounter additionalBytesWritten;
private final TezCounter additionalBytesRead;
-
+
private final CompressionCodec codec;
-
+
private volatile boolean finalMergeComplete = false;
-
+
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
private final int ifileBufferSize;
@@ -165,7 +165,7 @@ public void progress() {
*/
public MergeManager(Configuration conf,
FileSystem localFS,
- LocalDirAllocator localDirAllocator,
+ LocalDirAllocator localDirAllocator,
InputContext inputContext,
Combiner combiner,
TezCounter spilledRecordsCounter,
@@ -181,7 +181,7 @@ public MergeManager(Configuration conf,
this.localDirAllocator = localDirAllocator;
this.exceptionReporter = exceptionReporter;
this.initialMemoryAvailable = initialMemoryAvailable;
-
+
this.combiner = combiner;
this.reduceCombineInputCounter = reduceCombineInputCounter;
@@ -193,7 +193,7 @@ public MergeManager(Configuration conf,
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
-
+
this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
@@ -211,11 +211,11 @@ public MergeManager(Configuration conf,
}
this.ifileBufferSize = conf.getInt("io.file.buffer.size",
TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
-
+
// Figure out initial memory req start
final float maxInMemCopyUse =
conf.getFloat(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException("Invalid value for " +
@@ -235,13 +235,13 @@ public MergeManager(Configuration conf,
long maxRedBuffer = (long) (inputContext.getTotalMemoryAvailableToTask() * maxRedPer);
// Figure out initial memory req end
-
+
if (this.initialMemoryAvailable < memLimit) {
this.memoryLimit = this.initialMemoryAvailable;
} else {
this.memoryLimit = memLimit;
}
-
+
if (this.initialMemoryAvailable < maxRedBuffer) {
this.postMergeMemLimit = this.initialMemoryAvailable;
} else {
@@ -257,11 +257,11 @@ public MergeManager(Configuration conf,
+ this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
}
- this.ioSortFactor =
+ this.ioSortFactor =
conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
+ TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
-
+
final float singleShuffleMemoryLimitPercent =
conf.getFloat(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
@@ -274,31 +274,31 @@ public MergeManager(Configuration conf,
}
//TODO: Cap it to MAX_VALUE until MapOutput starts supporting > 2 GB
- this.maxSingleShuffleLimit =
+ this.maxSingleShuffleLimit =
(long) Math.min((memoryLimit * singleShuffleMemoryLimitPercent), Integer.MAX_VALUE);
- this.memToMemMergeOutputsThreshold =
+ this.memToMemMergeOutputsThreshold =
conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS,
ioSortFactor);
- this.mergeThreshold =
- (long)(this.memoryLimit *
+ this.mergeThreshold =
+ (long)(this.memoryLimit *
conf.getFloat(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT_DEFAULT));
LOG.info(inputContext.getInputOutputVertexNames() + ": MergerManager: memoryLimit=" + memoryLimit + ", " +
"maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
- "mergeThreshold=" + mergeThreshold + ", " +
+ "mergeThreshold=" + mergeThreshold + ", " +
"ioSortFactor=" + ioSortFactor + ", " +
"postMergeMem=" + postMergeMemLimit + ", " +
"memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
+
if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
- throw new RuntimeException("Invlaid configuration: "
+ throw new RuntimeException("Invalid configuration: "
+ "maxSingleShuffleLimit should be less than mergeThreshold"
+ "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
+ ", mergeThreshold: " + this.mergeThreshold);
}
-
+
boolean allowMemToMemMerge =
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
@@ -342,7 +342,7 @@ void configureAndStart() {
static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
final float maxInMemCopyUse =
conf.getFloat(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT_DEFAULT);
if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
throw new IllegalArgumentException("Invalid value for " +
@@ -353,7 +353,7 @@ static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTas
// Allow unit tests to fix Runtime memory
long memLimit = conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
(long)(maxAvailableTaskMemory * maxInMemCopyUse));
-
+
float maxRedPer = conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT,
TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
if (maxRedPer > 1.0 || maxRedPer < 0.0) {
@@ -410,7 +410,7 @@ public synchronized void waitForShuffleToMergeMemory() throws InterruptedExcepti
final private MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);
@Override
- public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
+ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier,
long requestedSize,
long compressedLength,
int fetcher
@@ -424,7 +424,7 @@ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifie
return MapOutput.createDiskMapOutput(srcAttemptIdentifier, this, compressedLength, conf,
fetcher, true, mapOutputFile);
}
-
+
// Stall shuffle if we are above the memory limit
// It is possible that all threads could just be stalling and not make
@@ -439,7 +439,7 @@ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifie
// (usedMemory + requestedSize > memoryLimit). When this thread is done
// fetching, this will automatically trigger a merge thereby unlocking
// all the stalled threads
-
+
if (usedMemory > memoryLimit) {
if (LOG.isDebugEnabled()) {
LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
@@ -448,7 +448,7 @@ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifie
}
return stallShuffle;
}
-
+
// Allow the in-memory shuffle to progress
if (LOG.isDebugEnabled()) {
LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
@@ -457,7 +457,7 @@ public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifie
}
return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
}
-
+
/**
* Unconditional Reserve is used by the Memory-to-Memory thread
*/
@@ -487,7 +487,7 @@ public synchronized void releaseCommittedMemory(long size) {
@Override
- public synchronized void closeInMemoryFile(MapOutput mapOutput) {
+ public synchronized void closeInMemoryFile(MapOutput mapOutput) {
inMemoryMapOutputs.add(mapOutput);
trackAndLogCloseInMemoryFile(mapOutput);
@@ -500,7 +500,7 @@ public synchronized void closeInMemoryFile(MapOutput mapOutput) {
// This should likely run a Combiner.
if (memToMemMerger != null) {
synchronized (memToMemMerger) {
- if (!memToMemMerger.isInProgress() &&
+ if (!memToMemMerger.isInProgress() &&
inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
memToMemMerger.startMerge(inMemoryMapOutputs);
}
@@ -559,7 +559,7 @@ private void startMemToDiskMerge() {
}
}
}
-
+
public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
inMemoryMergedMapOutputs.add(mapOutput);
if (LOG.isDebugEnabled()) {
@@ -584,7 +584,7 @@ public FileSystem getLocalFileSystem() {
@Override
public synchronized void closeOnDiskFile(FileChunk file) {
- //including only path & offset for valdiations.
+ //including only path & offset for validations.
for (FileChunk fileChunk : onDiskMapOutputs) {
if (fileChunk.getPath().equals(file.getPath())) {
//ensure offsets are not the same.
@@ -627,7 +627,7 @@ private void logCloseOnDiskFile(FileChunk file) {
* Should only be used after the Shuffle phaze is complete, otherwise can
* return an invalid state since a merge may not be in progress dur to
* inadequate inputs
- *
+ *
* @return true if the merge process is complete, otherwise false
*/
@Private
@@ -716,10 +716,10 @@ void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
/**
* Merges multiple in-memory segment to another in-memory segment
*/
- private class IntermediateMemoryToMemoryMerger
+ private class IntermediateMemoryToMemoryMerger
extends MergeThread {
-
- public IntermediateMemoryToMemoryMerger(MergeManager manager,
+
+ public IntermediateMemoryToMemoryMerger(MergeManager manager,
int mergeFactor) {
super(manager, mergeFactor, exceptionReporter);
setName("MemToMemMerger [" + TezUtilsInternal
@@ -736,7 +736,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
inputContext.notifyProgress();
- InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
+ InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier();
List inMemorySegments = new ArrayList();
MapOutput mergedMapOutputs = null;
@@ -799,13 +799,13 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
// Nothing will be materialized to disk because the sort factor is being
// set to the number of in memory segments.
// TODO Is this doing any combination ?
- TezRawKeyValueIterator rIter =
+ TezRawKeyValueIterator rIter =
TezMerger.merge(conf, rfs,
serializationContext,
inMemorySegments, inMemorySegments.size(),
new Path(inputContext.getUniqueIdentifier()),
(RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- progressable, null, null, null, null);
+ progressable, null, null, null, null);
TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
writer.close();
@@ -823,7 +823,7 @@ public void cleanup(List inputs, boolean deleteData) throws IOExcepti
//No OP
}
}
-
+
/**
* Merges multiple in-memory segment to a disk segment
*/
@@ -845,7 +845,7 @@ public InMemoryMerger(MergeManager manager) {
+ "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}
-
+
@Override
public void merge(List inputs) throws IOException, InterruptedException {
if (inputs == null || inputs.size() == 0) {
@@ -854,16 +854,16 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
numMemToDiskMerges.increment(1);
inputContext.notifyProgress();
-
- //name this output file same as the name of the first file that is
+
+ //name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
- //be absent on the disk currently. So we don't overwrite a prev.
+ //be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
//is called (we delete empty files as soon as we see them
//in the merge method)
- //figure out the mapId
+ //figure out the mapId
srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
List inMemorySegments = new ArrayList();
@@ -872,7 +872,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
int noInMemorySegments = inMemorySegments.size();
// TODO Maybe track serialized vs deserialized bytes.
-
+
// All disk writes done by this merge are overhead - due to the lack of
// adequate memory to keep all segments in memory.
outputPath = mapOutputFile.getInputFileForWrite(
@@ -889,7 +889,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
null, null);
TezRawKeyValueIterator rIter = null;
- LOG.info("Initiating in-memory merge with " + noInMemorySegments +
+ LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
tmpDir = new Path(inputContext.getUniqueIdentifier());
@@ -919,8 +919,8 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
" files in-memory complete." +
" Local file is " + outputPath + " of size " +
outFileLen);
- } catch (IOException e) {
- //make sure that we delete the ondisk file that we created
+ } catch (IOException e) {
+ //make sure that we delete the ondisk file that we created
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
@@ -966,7 +966,7 @@ public OnDiskMerger(MergeManager manager) {
+ "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}
-
+
@Override
public void merge(List inputs) throws IOException, InterruptedException {
// sanity check
@@ -978,10 +978,10 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
inputContext.notifyProgress();
long approxOutputSize = 0;
- int bytesPerSum =
+ int bytesPerSum =
conf.getInt("io.bytes.per.checksum", 512);
-
- LOG.info("OnDiskMerger: We have " + inputs.size() +
+
+ LOG.info("OnDiskMerger: We have " + inputs.size() +
" map outputs on disk. Triggering merge...");
List inputSegments = new ArrayList(inputs.size());
@@ -1004,7 +1004,7 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
}
// add the checksum length
- approxOutputSize +=
+ approxOutputSize +=
ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
// 2. Start the on-disk merge process
@@ -1053,9 +1053,9 @@ public void merge(List inputs) throws IOException, InterruptedExcepti
closeOnDiskFile(new FileChunk(outputPath, 0, outputLen));
LOG.info(inputContext.getInputOutputVertexNames() +
- " Finished merging " + inputs.size() +
- " map output files on disk of total-size " +
- approxOutputSize + "." +
+ " Finished merging " + inputs.size() +
+ " map output files on disk of total-size " +
+ approxOutputSize + "." +
" Local output file is " + outputPath + " of size " +
outputLen);
}
@@ -1074,9 +1074,9 @@ public void cleanup(List inputs, boolean deleteData) throws IOExcepti
}
}
}
-
+
private long createInMemorySegments(List inMemoryMapOutputs,
- List inMemorySegments,
+ List inMemorySegments,
long leaveBytes
) throws IOException {
long totalSize = 0L;
@@ -1093,11 +1093,11 @@ private long createInMemorySegments(List inMemoryMapOutputs,
long size = data.length;
totalSize += size;
fullSize -= size;
- IFile.Reader reader = new InMemoryReader(MergeManager.this,
+ IFile.Reader reader = new InMemoryReader(MergeManager.this,
mo.getAttemptIdentifier(),
data, 0, (int)size);
inMemorySegments.add(new Segment(reader,
- (mo.isPrimaryMapOutput() ?
+ (mo.isPrimaryMapOutput() ?
mergedMapOutputsCounter : null)));
}
// Bulk remove removed in-memory map outputs efficiently
@@ -1156,7 +1156,7 @@ private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
logFinalMergeStart(inMemoryMapOutputs, onDiskMapOutputs);
StringBuilder finalMergeLog = new StringBuilder();
-
+
inputContext.notifyProgress();
// merge config params
@@ -1177,19 +1177,19 @@ private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
final int numMemDiskSegments = memDiskSegments.size();
if (numMemDiskSegments > 0 &&
ioSortFactor > onDiskMapOutputs.size()) {
-
+
// If we reach here, it implies that we have less than io.sort.factor
- // disk segments and this will be incremented by 1 (result of the
- // memory segments merge). Since this total would still be
+ // disk segments and this will be incremented by 1 (result of the
+ // memory segments merge). Since this total would still be
// <= io.sort.factor, we will not do any more intermediate merges,
// the merge of all these disk segments would be directly fed to the
// reduce method
-
+
mergePhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
// Can not use spill id in final merge as it would clobber with other files, hence using
// Integer.MAX_VALUE
- final Path outputPath =
+ final Path outputPath =
mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, serContext,
@@ -1282,7 +1282,7 @@ public int compare(Segment o1, Segment o2) {
// build final list of segments from merged backed by disk + in-mem
List finalSegments = new ArrayList();
- long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
+ long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
finalSegments, 0);
if (LOG.isInfoEnabled()) {
finalMergeLog.append(". MemSeg: " + finalSegments.size() + ", " + inMemBytes);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 2b99739a58..9881c6e99a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -69,13 +69,13 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Shuffle implements ExceptionReporter {
-
+
private static final Logger LOG = LoggerFactory.getLogger(Shuffle.class);
private static final int PROGRESS_FREQUENCY = 2000;
-
+
private final Configuration conf;
private final InputContext inputContext;
-
+
private final ShuffleInputEventHandlerOrderedGrouped eventHandler;
@VisibleForTesting
final ShuffleScheduler scheduler;
@@ -92,9 +92,9 @@ public class Shuffle implements ExceptionReporter {
private final RunShuffleCallable runShuffleCallable;
private volatile ListenableFuture runShuffleFuture;
private final ListeningExecutorService executor;
-
+
private final String sourceDestNameTrimmed;
-
+
private AtomicBoolean isShutDown = new AtomicBoolean(false);
private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
@@ -111,7 +111,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
this.sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> "
+ TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());
-
+
this.codec = CodecUtils.getCodec(conf);
this.ifileReadAhead = conf.getBoolean(
@@ -124,11 +124,11 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
} else {
this.ifileReadAheadLength = 0;
}
-
+
Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext);
-
+
FileSystem localFS = FileSystem.getLocal(this.conf);
- LocalDirAllocator localDirAllocator =
+ LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
// TODO TEZ Get rid of Map / Reduce references.
@@ -181,7 +181,7 @@ public Shuffle(InputContext inputContext, Configuration conf, int numInputs,
inputContext,
scheduler,
ShuffleUtils.isTezShuffleHandler(conf));
-
+
ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + sourceDestNameTrimmed + "}").build());
@@ -198,13 +198,13 @@ public void handleEvents(List events) throws IOException {
}
}
-
+
/**
* Indicates whether the Shuffle and Merge processing is complete.
* @return false if not complete, true if complete or if an error occurred.
- * @throws InterruptedException
- * @throws IOException
- * @throws InputAlreadyClosedException
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws InputAlreadyClosedException
*/
public boolean isInputReady() throws IOException, InterruptedException, TezException {
if (isShutDown.get()) {
@@ -308,12 +308,12 @@ protected TezRawKeyValueIterator callInternal() throws IOException, InterruptedE
try {
kvIter = merger.close(true);
} catch (Throwable e) {
- // Set the throwable so that future.get() sees the reported errror.
+ // Set the throwable so that future.get() sees the reported error.
throwable.set(e);
throw new ShuffleError("Error while doing final merge ", e);
}
mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
-
+
inputContext.notifyProgress();
// Sanity check
synchronized (Shuffle.this) {
@@ -395,7 +395,7 @@ public synchronized void reportException(Throwable t) {
"] from thread [" + Thread.currentThread().getName());
throwable.set(t);
throwingThreadName = Thread.currentThread().getName();
- // Notify the scheduler so that the reporting thread finds the
+ // Notify the scheduler so that the reporting thread finds the
// exception immediately.
cleanupShuffleSchedulerIgnoreErrors();
}
@@ -409,7 +409,7 @@ public synchronized void killSelf(Exception exception, String message) {
inputContext.killSelf(exception, message);
}
}
-
+
public static class ShuffleError extends IOException {
private static final long serialVersionUID = 5753909320586607881L;
@@ -422,7 +422,7 @@ public static class ShuffleError extends IOException {
public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
return MergeManager.getInitialMemoryRequirement(conf, maxAvailableTaskMemory);
}
-
+
private class ShuffleRunnerFutureCallback implements FutureCallback {
@Override
public void onSuccess(TezRawKeyValueIterator result) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 9984c5af10..2088c28bf4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -30,7 +30,6 @@
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -46,7 +45,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandler {
-
+
private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);
private final ShuffleScheduler scheduler;
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 967f58250e..3416bcedf2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -396,13 +396,13 @@ public ShuffleScheduler(InputContext inputContext,
this.maxFailedUniqueFetches = Math.min(numberOfInputs, 5);
referee.start();
- this.maxFetchFailuresBeforeReporting =
+ this.maxFetchFailuresBeforeReporting =
conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT);
- this.reportReadErrorImmediately =
+ this.reportReadErrorImmediately =
conf.getBoolean(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT);
this.verifyDiskChecksum = conf.getBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
@@ -415,7 +415,7 @@ public ShuffleScheduler(InputContext inputContext,
this.maxTaskOutputAtOnce = Math.max(1, Math.min(75, conf.getInt(
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE,
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE_DEFAULT)));
-
+
this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
@@ -689,7 +689,7 @@ public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifi
LOG.warn("Duplicate fetch of input no longer needs to be fetched: "
+ srcAttemptIdentifier);
// free the resource - specially memory
-
+
// If the src does not generate data, output will be null.
if (output != null) {
output.abort();
@@ -1024,7 +1024,7 @@ private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFail
String logContext = "srcAttempt=" + srcAttempt.toString();
boolean fetcherHealthy = isFetcherHealthy(logContext);
-
+
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)doneMaps / numInputs)
@@ -1034,7 +1034,7 @@ private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFail
// duration for which the reducer is stalled
int stallDuration =
(int)(System.currentTimeMillis() - lastProgressTime);
-
+
// duration for which the reducer ran with progress
int shuffleProgressDuration =
(int)(lastProgressTime - startTime);
@@ -1096,7 +1096,7 @@ public synchronized void addKnownMapOutput(String inputHostName,
notifyAll();
}
}
-
+
public void obsoleteInput(InputAttemptIdentifier srcAttempt) {
// The incoming srcAttempt does not contain a path component.
LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt);
@@ -1123,7 +1123,7 @@ public void obsoleteInput(InputAttemptIdentifier srcAttempt) {
obsoleteInputs.add(srcAttempt);
}
}
-
+
public synchronized void putBackKnownMapOutput(MapHost host,
InputAttemptIdentifier srcAttempt) {
host.addKnownMap(srcAttempt);
@@ -1301,7 +1301,7 @@ public synchronized boolean isDone() {
static class Penalty implements Delayed {
MapHost host;
private long endTime;
-
+
Penalty(MapHost host, long delay) {
this.host = host;
this.endTime = System.currentTimeMillis() + delay;
@@ -1316,7 +1316,7 @@ public int compareTo(Delayed o) {
long other = ((Penalty) o).endTime;
return endTime == other ? 0 : (endTime < other ? -1 : 1);
}
-
+
}
/**
@@ -1350,7 +1350,7 @@ public void run() {
}
}
}
-
+
void setInputFinished(int inputIndex) {
synchronized(finishedMaps) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index c5853d4a46..fbdfbc1b93 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -37,12 +37,12 @@
import org.apache.hadoop.util.DataChecksum;
/**
* A checksum input stream, used for IFiles.
- * Used to validate the checksum of files created by {@link IFileOutputStream}.
+ * Used to validate the checksum of files created by {@link IFileOutputStream}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class IFileInputStream extends InputStream {
-
+
private final InputStream in; //The input stream to be verified for checksum.
private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file
@@ -137,16 +137,16 @@ public void close() throws IOException {
}
in.close();
}
-
+
@Override
public long skip(long n) throws IOException {
throw new IOException("Skip not supported for IFileInputStream");
}
-
+
public long getPosition() {
return (currentOffset >= dataLength) ? dataLength : currentOffset;
}
-
+
public long getSize() {
return checksumSize;
}
@@ -167,11 +167,11 @@ private void checksum(byte[] b, int off, int len) {
System.arraycopy(b, off, buffer, offset, len);
offset += len;
}
-
+
/**
* Read bytes from the stream.
* At EOF, checksum is validated, but the checksum
- * bytes are not passed back in the buffer.
+ * bytes are not passed back in the buffer.
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
@@ -207,13 +207,13 @@ public int readWithChecksum(byte[] b, int off, int len) throws IOException {
}
else if (currentOffset >= dataLength) {
// If the previous read drained off all the data, then just return
- // the checksum now. Note that checksum validation would have
+ // the checksum now. Note that checksum validation would have
// happened in the earlier read
int lenToCopy = (int) (checksumSize - (currentOffset - dataLength));
if (len < lenToCopy) {
lenToCopy = len;
}
- System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
+ System.arraycopy(csum, (int) (currentOffset - dataLength), b, off,
lenToCopy);
currentOffset += lenToCopy;
return lenToCopy;
@@ -232,21 +232,21 @@ else if (currentOffset >= dataLength) {
}
private int doRead(byte[]b, int off, int len) throws IOException {
-
+
// If we are trying to read past the end of data, just read
- // the left over data
+ // the leftover data
int origLen = len;
if (currentOffset + len > dataLength) {
len = (int) (dataLength - currentOffset);
}
-
+
int bytesRead = in.read(b, off, len);
if (bytesRead < 0) {
String mesg = " CurrentOffset=" + currentOffset +
", offset=" + offset +
", off=" + off +
- ", dataLength=" + dataLength +
+ ", dataLength=" + dataLength +
", origLen=" + origLen +
", len=" + len +
", length=" + length +
@@ -262,7 +262,7 @@ private int doRead(byte[]b, int off, int len) throws IOException {
if (disableChecksumValidation) {
return bytesRead;
}
-
+
if (currentOffset == dataLength) {
//TODO: add checksumSize to currentOffset.
// The last four bytes are checksum. Strip them and verify
@@ -272,13 +272,13 @@ private int doRead(byte[]b, int off, int len) throws IOException {
if (!sum.compare(csum, 0)) {
String mesg = "CurrentOffset=" + currentOffset +
", off=" + offset +
- ", dataLength=" + dataLength +
+ ", dataLength=" + dataLength +
", origLen=" + origLen +
", len=" + len +
", length=" + length +
", checksumSize=" + checksumSize+
", csum=" + Arrays.toString(csum) +
- ", sum=" + sum;
+ ", sum=" + sum;
LOG.info(mesg);
throw new ChecksumException("Checksum Error: " + mesg, 0);
@@ -289,11 +289,11 @@ private int doRead(byte[]b, int off, int len) throws IOException {
@Override
- public int read() throws IOException {
+ public int read() throws IOException {
b[0] = 0;
int l = read(b,0,1);
if (l < 0) return l;
-
+
// Upgrade the b[0] to an int so as not to misinterpret the
// first bit of the byte as a sign bit
int result = 0xFF & b[0];
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index d22988533c..061ba18384 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -64,70 +64,68 @@
public final class TezMerger {
private static final Logger LOG = LoggerFactory.getLogger(TezMerger.class);
-
+
// Local directories
private static final LocalDirAllocator L_DIR_ALLOC =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
private TezMerger() {}
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- CompressionCodec codec, boolean ifileReadAhead,
- int ifileReadAheadLength, int ifileBufferSize,
- Path[] inputs, boolean deleteInputs,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
- return
- new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize, false, comparator,
- reporter, null).merge(serializationContext,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
+ public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ SerializationContext serializationContext,
+ CompressionCodec codec, boolean ifileReadAhead,
+ int ifileReadAheadLength, int ifileBufferSize,
+ Path[] inputs, boolean deleteInputs,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
+ throws IOException, InterruptedException {
+ return
+ new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, false, comparator,
+ reporter, null).merge(serializationContext,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter,
+ mergePhase);
}
// Used by the in-memory merger.
- public static
- TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- List segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
+ public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ SerializationContext serializationContext,
+ List segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
+ throws IOException, InterruptedException {
// Get rid of this ?
return merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
- comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
- mergePhase);
+ comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
+ mergePhase);
}
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- List segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
+ SerializationContext serializationContext,
+ List segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, false).merge(serializationContext, mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter, mergePhase);
+ sortSegments, false).merge(serializationContext, mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter, mergePhase);
}
public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
@@ -148,85 +146,85 @@ public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- CompressionCodec codec,
- List segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- boolean considerFinalMergeForProgress,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase, boolean checkForSameKeys)
- throws IOException, InterruptedException {
+ SerializationContext serializationContext,
+ CompressionCodec codec,
+ List segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ boolean considerFinalMergeForProgress,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase, boolean checkForSameKeys)
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys).
- merge(serializationContext,
- mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
+ sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys).
+ merge(serializationContext,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter,
+ mergePhase);
}
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- CompressionCodec codec,
- List segments,
- int mergeFactor, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- boolean considerFinalMergeForProgress,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
+ SerializationContext serializationContext,
+ CompressionCodec codec,
+ List segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ boolean considerFinalMergeForProgress,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec, considerFinalMergeForProgress).
- merge(serializationContext, mergeFactor, tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
+ sortSegments, codec, considerFinalMergeForProgress).
+ merge(serializationContext, mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter,
+ mergePhase);
}
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
- SerializationContext serializationContext,
- CompressionCodec codec,
- List segments,
- int mergeFactor, int inMemSegments, Path tmpDir,
- RawComparator comparator, Progressable reporter,
- boolean sortSegments,
- TezCounter readsCounter,
- TezCounter writesCounter,
- TezCounter bytesReadCounter,
- Progress mergePhase)
- throws IOException, InterruptedException {
- return new MergeQueue(conf, fs, segments, comparator, reporter,
- sortSegments, codec, false).merge(serializationContext,
- mergeFactor, inMemSegments,
- tmpDir,
- readsCounter, writesCounter,
- bytesReadCounter,
- mergePhase);
-}
+ SerializationContext serializationContext,
+ CompressionCodec codec,
+ List segments,
+ int mergeFactor, int inMemSegments, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter bytesReadCounter,
+ Progress mergePhase)
+ throws IOException, InterruptedException {
+ return new MergeQueue(conf, fs, segments, comparator, reporter,
+ sortSegments, codec, false).merge(serializationContext,
+ mergeFactor, inMemSegments,
+ tmpDir,
+ readsCounter, writesCounter,
+ bytesReadCounter,
+ mergePhase);
+ }
public static
void writeFile(TezRawKeyValueIterator records, Writer writer,
- Progressable progressable, long recordsBeforeProgress)
- throws IOException, InterruptedException {
+ Progressable progressable, long recordsBeforeProgress)
+ throws IOException, InterruptedException {
long recordCtr = 0;
long count = 0;
- while(records.next()) {
+ while (records.next()) {
if (records.isSameKey()) {
writer.append(IFile.REPEAT_KEY, records.getValue());
count++;
} else {
writer.append(records.getKey(), records.getValue());
}
-
+
if (((recordCtr++) % recordsBeforeProgress) == 0) {
progressable.progress();
if (Thread.currentThread().isInterrupted()) {
@@ -236,7 +234,7 @@ void writeFile(TezRawKeyValueIterator records, Writer writer,
* 10000 records or so.
*/
throw new InterruptedException("Current thread=" + Thread.currentThread().getName() + " got "
- + "interrupted");
+ + "interrupted");
}
}
}
@@ -381,18 +379,18 @@ public DiskSegment(FileSystem fs, Path file,
}
public DiskSegment(FileSystem fs, Path file,
- CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLenth,
+ CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
int bufferSize, boolean preserve, TezCounter mergedMapOutputsCounter)
throws IOException {
this(fs, file, 0, fs.getFileStatus(file).getLen(), codec,
- ifileReadAhead, ifileReadAheadLenth, bufferSize, preserve,
+ ifileReadAhead, ifileReadAheadLength, bufferSize, preserve,
mergedMapOutputsCounter);
}
public DiskSegment(FileSystem fs, Path file,
long segmentOffset, long segmentLength,
CompressionCodec codec, boolean ifileReadAhead,
- int ifileReadAheadLength, int bufferSize,
+ int ifileReadAheadLength, int bufferSize,
boolean preserve) {
this(fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
ifileReadAheadLength, bufferSize, preserve, null);
@@ -473,9 +471,9 @@ static class MergeQueue
static final int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
static final int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
static final long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT;
-
+
private List segments = new ArrayList<>();
-
+
final RawComparator comparator;
private long totalBytesProcessed;
@@ -487,14 +485,14 @@ static class MergeQueue
private final boolean considerFinalMergeForProgress;
final Progressable reporter;
-
+
final DataInputBuffer key = new DataInputBuffer();
final DataInputBuffer value = new DataInputBuffer();
final DataInputBuffer nextKey = new DataInputBuffer();
final DataInputBuffer diskIFileValue = new DataInputBuffer();
-
+
Segment minSegment;
- Comparator segmentComparator =
+ Comparator segmentComparator =
new Comparator() {
public int compare(Segment o1, Segment o2) {
if (o1.getLength() == o2.getLength()) {
@@ -508,13 +506,13 @@ public int compare(Segment o1, Segment o2) {
KeyState hasNext;
DataOutputBuffer prevKey = new DataOutputBuffer();
- public MergeQueue(Configuration conf, FileSystem fs,
+ public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int ifileBufferSize,
boolean considerFinalMergeForProgress,
- RawComparator comparator, Progressable reporter,
- TezCounter mergedMapOutputsCounter)
+ RawComparator comparator, Progressable reporter,
+ TezCounter mergedMapOutputsCounter)
throws IOException {
this.conf = conf;
this.checkForSameKeys = true;
@@ -526,23 +524,23 @@ public MergeQueue(Configuration conf, FileSystem fs,
this.comparator = comparator;
this.reporter = reporter;
this.considerFinalMergeForProgress = considerFinalMergeForProgress;
-
+
for (Path file : inputs) {
if (LOG.isTraceEnabled()) {
LOG.trace("MergeQ: adding: " + file);
}
segments.add(new DiskSegment(fs, file, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize,
- !deleteInputs,
+ !deleteInputs,
(file.toString().endsWith(
- Constants.MERGED_OUTPUT_PREFIX) ?
+ Constants.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter)));
}
-
+
// Sort segments on file-lengths
segments.sort(segmentComparator);
}
-
+
public MergeQueue(Configuration conf, FileSystem fs,
List segments, RawComparator comparator,
Progressable reporter, boolean sortSegments, boolean considerFinalMergeForProgress) {
@@ -668,7 +666,7 @@ public boolean next() throws IOException {
//the same byte[] since it would corrupt the data in the inmem
//segment. So we maintain an explicit DIB for value bytes
//obtained from disk, and if the current segment is a disk
- //segment, we reset the "value" DIB to the byte[] in that (so
+ //segment, we reset the "value" DIB to the byte[] in that (so
//we reuse the disk segment DIB whenever we consider
//a disk segment).
minSegment.getValue(diskIFileValue);
@@ -703,7 +701,7 @@ protected boolean lessThan(Object a, Object b) {
return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
}
-
+
public TezRawKeyValueIterator merge(SerializationContext serializationContext,
int factor, Path tmpDir,
TezCounter readsCounter,
@@ -747,7 +745,7 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
-
+
//create the MergeStreams from the sorted map created in the constructor
//and dump the final output to a file
do {
@@ -764,9 +762,9 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
int numSegmentsToConsider = factor;
long startBytes = 0; // starting bytes of segments of this merge
while (true) {
- //extract the smallest 'factor' number of segments
+ //extract the smallest 'factor' number of segments
//Call cleanup on the empty segments (no key/value data)
- List mStream =
+ List mStream =
getSegmentDescriptors(numSegmentsToConsider);
for (Segment segment : mStream) {
// Initialize the segment at the last possible moment;
@@ -776,7 +774,7 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
long startPos = segment.getPosition();
boolean hasNext = segment.nextRawKey(nextKey);
long endPos = segment.getPosition();
-
+
if (hasNext) {
startBytes += endPos - startPos;
segmentsToMerge.add(segment);
@@ -789,7 +787,7 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
}
//if we have the desired number of segments
//or looked at all available segments, we break
- if (segmentsConsidered == factor ||
+ if (segmentsConsidered == factor ||
segments.size() == 0) {
break;
}
@@ -797,14 +795,14 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
// Get the correct # of segments in case some of them were empty.
numSegmentsToConsider = factor - segmentsConsidered;
}
-
+
//feed the streams to the priority queue
initialize(segmentsToMerge.size());
clear();
for (Segment segment : segmentsToMerge) {
put(segment);
}
-
+
//if we have lesser number of segments remaining, then just return the
//iterator, else do another single level merge
if (numSegments <= factor) { // Will always kick in if only in-mem segments are provided.
@@ -822,7 +820,7 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
}
if (totalBytes != 0) //being paranoid
progPerByte = 1.0f / (float)totalBytes;
-
+
totalBytesProcessed += startBytes;
if (totalBytes != 0)
mergeProgress.set(totalBytesProcessed * progPerByte);
@@ -845,19 +843,19 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
" intermediate segments out of a total of " +
(segments.size() + segmentsToMerge.size()));
}
-
+
long bytesProcessedInPrevMerges = totalBytesProcessed;
totalBytesProcessed += startBytes;
- //we want to spread the creation of temp files on multiple disks if
+ //we want to spread the creation of temp files on multiple disks if
//available under the space constraints
- long approxOutputSize = 0;
+ long approxOutputSize = 0;
for (Segment s : segmentsToMerge) {
- approxOutputSize += s.getLength() +
+ approxOutputSize += s.getLength() +
ChecksumFileSystem.getApproxChkSumLength(
s.getLength());
}
- Path tmpFilename =
+ Path tmpFilename =
new Path(tmpDir, "intermediate").suffix("." + passNo);
Path outputFile = L_DIR_ALLOC.getLocalPathForWrite(
@@ -873,13 +871,13 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
writeFile(this, writer, reporter, recordsBeforeProgress);
writer.close();
-
- //we finished one single level merge; now clean up the priority
+
+ //we finished one single level merge; now clean up the priority
//queue
this.close();
// Add the newly create segment to the list of segments to be merged
- Segment tempSegment =
+ Segment tempSegment =
new DiskSegment(fs, outputFile, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false);
@@ -892,8 +890,8 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
}
segments.add(pos, tempSegment);
numSegments = segments.size();
-
- // Subtract the difference between expected size of new segment and
+
+ // Subtract the difference between expected size of new segment and
// actual size of new segment(Expected size of new segment is
// inputBytesOfThisMerge) from totalBytes. Expected size and actual
// size will match(almost) if combiner is not called in merge.
@@ -903,15 +901,15 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
-
+
passNo++;
}
- //we are worried about only the first pass merge factor. So reset the
+ //we are worried about only the first pass merge factor. So reset the
//factor to what it originally was
factor = origFactor;
} while(true);
}
-
+
/**
* Determine the number of segments to merge in a given pass. Assuming more
* than factor segments, the first pass should attempt to bring the total
@@ -920,14 +918,16 @@ TezRawKeyValueIterator merge(SerializationContext serializationContext,
*/
private static int getPassFactor(int factor, int passNo, int numSegments) {
// passNo > 1 in the OR list - is that correct ?
- if (passNo > 1 || numSegments <= factor || factor == 1)
+ if (passNo > 1 || numSegments <= factor || factor == 1) {
return factor;
+ }
int mod = (numSegments - 1) % (factor - 1);
- if (mod == 0)
+ if (mod == 0) {
return factor;
+ }
return mod + 1;
}
-
+
/** Return (& remove) the requested number of segment descriptors from the
* sorted map.
*/
@@ -944,7 +944,7 @@ private List getSegmentDescriptors(int numDescriptors) {
subList.clear();
return subListCopy;
}
-
+
/**
* Compute expected size of input bytes to merges, will be used in
* calculating mergeProgress. This simulates the above merge() method and
@@ -963,13 +963,13 @@ static long computeBytesInMerges(List segments, int factor, int inMem,
// factor for 1st pass
int f = getPassFactor(factor, 1, n) + inMem;
n = numSegments;
-
+
for (int i = 0; i < numSegments; i++) {
// Not handling empty segments here assuming that it would not affect
// much in calculation of mergeProgress.
segmentSizes[i] = segments.get(i).getLength();
}
-
+
// If includeFinalMerge is true, allow the following while loop iterate
// for 1 more iteration. This is to include final merge as part of the
// computation of expected input bytes of merges
@@ -986,7 +986,7 @@ static long computeBytesInMerges(List segments, int factor, int inMem,
mergedSize += segmentSizes[offset + j];
}
totalBytes += mergedSize;
-
+
// insert new size into the sorted list
int pos = Arrays.binarySearch(segmentSizes, offset, offset + n, mergedSize);
if (pos < 0) {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index faf75866b9..dbd4794b47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -1110,7 +1110,7 @@ private void mergeAll() throws IOException {
ifileBufferSize);
while (reader.nextRawKey(keyBufferIFile)) {
// TODO Inefficient. If spills are not compressed, a direct copy should be possible
- // given the current IFile format. Also exteremely inefficient for large records,
+ // given the current IFile format. Also extremely inefficient for large records,
// since the entire record will be read into memory.
reader.nextRawValue(valBufferIFile);
writer.append(keyBufferIFile, valBufferIFile);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
index c4e8694a8a..02e8a8950a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
@@ -276,7 +276,7 @@ public UnorderedKVInputConfig.SpecificBuilder configureInput() {
/**
* Build and return an instance of the configuration
- * @return an instance of the acatual configuration
+ * @return an instance of the actual configuration
*/
public UnorderedKVEdgeConfig build() {
return new UnorderedKVEdgeConfig(outputBuilder.build(), inputBuilder.build());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
index 0d8a5aef75..08cfa6e668 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
@@ -281,7 +281,7 @@ public UnorderedKVInputConfig.SpecificBuilder configureInput() {
/**
* Build and return an instance of the configuration
- * @return an instance of the acatual configuration
+ * @return an instance of the actual configuration
*/
public UnorderedPartitionedKVEdgeConfig build() {
return new UnorderedPartitionedKVEdgeConfig(outputBuilder.build(), inputBuilder.build());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 38d5295094..28914df6bc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -55,7 +55,7 @@
/**
* {@link UnorderedKVInput} provides unordered key value input by
- * bringing together (shuffling) a set of distributed data and providing a
+ * bringing together (shuffling) a set of distributed data and providing a
* unified view to that data. There are no ordering constraints applied by
* this input.
*/
@@ -95,9 +95,9 @@ public synchronized List initialize() throws Exception {
+ getContext().getInputOutputVertexNames());
return Collections.emptyList();
} else {
- long initalMemReq = getInitialMemoryReq();
+ long initialMemReq = getInitialMemoryReq();
memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
- this.getContext().requestInitialMemory(initalMemReq, memoryUpdateCallbackHandler);
+ this.getContext().requestInitialMemory(initialMemReq, memoryUpdateCallbackHandler);
}
this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs());
@@ -215,7 +215,7 @@ public synchronized List close() throws Exception {
if (this.shuffleManager != null) {
this.shuffleManager.shutdown();
}
-
+
long dataSize = getContext().getCounters()
.findCounter(TaskCounter.SHUFFLE_BYTES_DECOMPRESSED).getValue();
getContext().getStatisticsReporter().reportDataSize(dataSize);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index c237bc15e4..8c882f67c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -35,7 +35,7 @@
/**
* Implements an {@link AbstractLogicalIOProcessor} and provides empty
* implementations of most methods and handles input/output initialization.
- * This can be used to implement simple {@link Processor}s that dont need to
+ * This can be used to implement simple {@link Processor}s that dont need to
* do event handling etc.
*/
@Public
@@ -70,9 +70,9 @@ public void run(Map _inputs, Map _o
public abstract void run() throws Exception;
/**
- * Implements input/output initialization. Can be overriden
+ * Implements input/output initialization. Can be overridden
* to implement custom behavior. Called before {@link #run()}
- * is called.
+ * is called.
* @throws Exception
*/
protected void preOp() throws Exception {
@@ -90,7 +90,7 @@ protected void preOp() throws Exception {
}
/**
- * Called after {@link #run()} is called and can be used to
+ * Called after {@link #run()} is called and can be used to
* do post-processing like committing output etc
* @throws Exception
*/
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index da4d9ea76c..d1c4708aee 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -84,13 +84,13 @@ public class FilterLinesByWord extends Configured implements Tool {
private static Logger LOG = LoggerFactory.getLogger(FilterLinesByWord.class);
public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
-
+
private boolean exitOnCompletion = false;
public FilterLinesByWord(boolean exitOnCompletion) {
this.exitOnCompletion = exitOnCompletion;
}
-
+
private static void printUsage() {
System.err.println("Usage filtelinesrbyword [-generateSplitsInClient true/]");
ToolRunner.printGenericCommandUsage(System.err);
@@ -162,7 +162,7 @@ public int run(String[] args) throws Exception {
TezClient tezSession = TezClient.create("FilterLinesByWordSession", tezConf,
commonLocalResources, credentials);
- tezSession.start(); // Why do I need to start the TezSession.
+ tezSession.start(); // Why do I need to start the TezSession?
Configuration stage1Conf = new JobConf(conf);
stage1Conf.set(FILTER_PARAM_NAME, filterWord);
@@ -248,9 +248,9 @@ public int run(String[] args) throws Exception {
return -1;
}
}
-
+
dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
-
+
} finally {
fs.delete(stagingDir, true);
tezSession.stop();
@@ -260,7 +260,7 @@ public int run(String[] args) throws Exception {
LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
}
-
+
public static void main(String[] args) throws Exception {
FilterLinesByWord fl = new FilterLinesByWord(true);
int status = ToolRunner.run(new Configuration(), fl, args);
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 00205036f4..a79b68a852 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -77,7 +77,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
private static void printUsage() {
- System.err.println("Usage filterLinesByWordOneToOne "
+ System.err.println("Usage filterLinesByWordOneToOne "
+ " [-generateSplitsInClient true/]");
ToolRunner.printGenericCommandUsage(System.err);
}
@@ -112,7 +112,7 @@ public int run(String[] otherArgs) throws Exception {
String inputPath = otherArgs[0];
String outputPath = otherArgs[1];
String filterWord = otherArgs[2];
-
+
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(outputPath))) {
@@ -148,7 +148,7 @@ public int run(String[] otherArgs) throws Exception {
TezClient tezSession = TezClient.create("FilterLinesByWordSession", tezConf,
commonLocalResources, null);
- tezSession.start(); // Why do I need to start the TezSession.
+ tezSession.start(); // Why do I need to start the TezSession?
Configuration stage1Conf = new JobConf(conf);
stage1Conf.set(FILTER_PARAM_NAME, filterWord);
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
index 227e498510..21419e7179 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/helpers/SplitsInClientOptionParser.java
@@ -48,7 +48,7 @@ public String[] getRemainingArgs() {
@SuppressWarnings("static-access")
public boolean parse(String[] args, boolean defaultVal) throws ParseException {
Preconditions.checkState(parsed == false,
- "Craete a new instance for different option sets");
+ "Create a new instance for different option sets");
parsed = true;
Options opts = new Options();
Option opt = OptionBuilder
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index efa39a3223..74fff0ee02 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -143,7 +143,7 @@ public void analyze(DagInfo dagInfo) throws TezException {
record.add(shuffleMaxSource);
record.add(Math.max(0, slowestLastEventTime) + "");
record.add(maxSourceName);
- //Finding out real_work done at vertex level might be meaningless (as it is quite posisble
+ //Finding out real_work done at vertex level might be meaningless (as it is quite possible
// that it went to starvation).
StringBuilder sb = new StringBuilder();
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index 026dd1593f..f0ce205418 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -52,7 +52,7 @@ public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer {
private final CSVResult csvResult;
/**
- * Minimum output bytes that should be chunrned out by a task
+ * Minimum output bytes that should be churned out by a task
*/
private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+ ".threshold";
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 90acf3b0a0..b1689ff829 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -41,7 +41,7 @@ public class SVGUtils {
private static int MAX_DAG_RUNTIME = 0;
private static final int SCREEN_WIDTH = 1800;
- public SVGUtils() {
+ public SVGUtils() {
}
private int Y_MAX;
@@ -71,29 +71,29 @@ public static String getTimeStr(final long millis) {
long seconds = millis - TimeUnit.MINUTES.toMillis(
TimeUnit.MILLISECONDS.toMinutes(millis));
b.append(secondFormat.format(seconds/1000.0) + "s");
-
- return b.toString();
+
+ return b.toString();
}
-
+
List svgLines = new LinkedList<>();
-
+
private final int addOffsetX(int x) {
int xOff = x + X_BASE;
X_MAX = Math.max(X_MAX, xOff);
return xOff;
}
-
+
private final int addOffsetY(int y) {
int yOff = y + Y_BASE;
Y_MAX = Math.max(Y_MAX, yOff);
return yOff;
}
-
+
private int scaleDown(int len) {
return Math.round((len * 1.0f / MAX_DAG_RUNTIME) * SCREEN_WIDTH);
}
-
- private void addRectStr(int x, int width, int y, int height,
+
+ private void addRectStr(int x, int width, int y, int height,
String fillColor, String borderColor, float opacity, String title) {
String rectStyle = "stroke: " + borderColor + "; fill: " + fillColor + "; opacity: " + opacity;
String rectStr = ""
+ " " + title +""
+ " ";
- svgLines.add(rectStr);
+ svgLines.add(rectStr);
}
-
+
private void addTextStr(int x, int y, String text, String anchor, int size, String title, boolean italic) {
String textStyle = "text-anchor: " + anchor + "; font-style: " + (italic?"italic":"normal") +
"; font-size: " + size + "px;";
@@ -118,7 +118,7 @@ private void addTextStr(int x, int y, String text, String anchor, int size, Stri
+ "";
svgLines.add(textStr);
}
-
+
private void addLineStr(int x1, int y1, int x2, int y2, String color, String title, int width) {
String style = "stroke: " + color + "; stroke-width:" + width;
String str = "";
svgLines.add(str);
}
-
+
public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
if (step.getType() != EntityType.ATTEMPT) {
// draw initial vertex or final commit overhead
StringBuilder title = new StringBuilder();
String text = null;
if (step.getType() == EntityType.VERTEX_INIT) {
- String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName();
+ String vertex = step.getAttempt().getTaskInfo().getVertexInfo().getVertexName();
text = vertex + " : Init";
title.append(text).append(TITLE_BR);
} else {
@@ -165,9 +165,9 @@ public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
int creationTimeInterval = (int) (attempt.getCreationTime() - dagStartTime);
- int allocationTimeInterval = attempt.getAllocationTime() > 0 ?
+ int allocationTimeInterval = attempt.getAllocationTime() > 0 ?
(int) (attempt.getAllocationTime() - dagStartTime) : 0;
- int launchTimeInterval = attempt.getStartTime() > 0 ?
+ int launchTimeInterval = attempt.getStartTime() > 0 ?
(int) (attempt.getStartTime() - dagStartTime) : 0;
int finishTimeInterval = (int) (attempt.getFinishTime() - dagStartTime);
LOG.debug(attempt.getTaskAttemptId() + " " + creationTimeInterval + " "
@@ -178,7 +178,7 @@ public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
title.append("Critical Path Dependency: " + step.getReason()).append(TITLE_BR);
title.append("Completion Status: " + attempt.getDetailedStatus()).append(TITLE_BR);
title.append(
- "Critical Time Contribution: " +
+ "Critical Time Contribution: " +
getTimeStr(step.getStopCriticalTime() - step.getStartCriticalTime())).append(TITLE_BR);
title.append("Critical start at: " + getTimeStr(startCriticalTimeInterval)).append(TITLE_BR);
title.append("Critical stop at: " + getTimeStr(stopCriticalTimeInterval)).append(TITLE_BR);
@@ -201,29 +201,29 @@ public void drawStep(CriticalPathStep step, long dagStartTime, int yOffset) {
if (launchTimeInterval > 0) {
addRectStr(allocationTimeInterval, launchTimeInterval - allocationTimeInterval,
yOffset * STEP_GAP, STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY,
- titleStr);
+ titleStr);
addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP,
STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
} else {
// no launch - so allocate to finish drawn - ended while launching
addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP,
- STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
}
} else {
// no allocation - so create to finish drawn - ended while allocating
addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP,
- STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
+ STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
}
addTextStr((finishTimeInterval + creationTimeInterval) / 2,
- (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE,
+ (yOffset * STEP_GAP + STEP_GAP / 2), attempt.getShortName(), "middle", TEXT_SIZE,
titleStr, !attempt.isSucceeded());
}
}
private void drawCritical(DagInfo dagInfo, List criticalPath) {
long dagStartTime = dagInfo.getStartTime();
- int dagStartTimeInterval = 0; // this is 0 since we are offseting from the dag start time
+ int dagStartTimeInterval = 0; // this is 0 since we are offsetting from the dag start time
int dagFinishTimeInterval = (int) (dagInfo.getFinishTime() - dagStartTime);
if (dagInfo.getFinishTime() <= 0) {
// AM crashed. no dag finish time written
@@ -231,13 +231,13 @@ private void drawCritical(DagInfo dagInfo, List criticalPath)
- dagStartTime);
}
MAX_DAG_RUNTIME = dagFinishTimeInterval;
-
+
// draw grid
addLineStr(dagStartTimeInterval, 0, dagFinishTimeInterval, 0, BORDER_COLOR, "", TICK);
int yGrid = (criticalPath.size() + 2)*STEP_GAP;
for (int i=0; i<11; ++i) {
int x = Math.round(((dagFinishTimeInterval - dagStartTimeInterval)/10.0f)*i);
- addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK);
+ addLineStr(x, 0, x, yGrid, BORDER_COLOR, "", TICK);
addTextStr(x, 0, getTimeStr(x), "left", TEXT_SIZE, "", false);
}
addLineStr(dagStartTimeInterval, yGrid, dagFinishTimeInterval, yGrid, BORDER_COLOR, "", TICK);
@@ -247,21 +247,21 @@ private void drawCritical(DagInfo dagInfo, List criticalPath)
// draw steps
for (int i=1; i<=criticalPath.size(); ++i) {
- CriticalPathStep step = criticalPath.get(i-1);
- drawStep(step, dagStartTime, i);
+ CriticalPathStep step = criticalPath.get(i-1);
+ drawStep(step, dagStartTime, i);
}
-
+
// draw critical path on top
for (int i=1; i<=criticalPath.size(); ++i) {
- CriticalPathStep step = criticalPath.get(i-1);
- boolean isLast = i == criticalPath.size();
-
+ CriticalPathStep step = criticalPath.get(i-1);
+ boolean isLast = i == criticalPath.size();
+
// draw critical path for step
int startCriticalTimeInterval = (int) (step.getStartCriticalTime() - dagStartTime);
int stopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
addLineStr(startCriticalTimeInterval, (i + 1) * STEP_GAP, stopCriticalTimeInterval,
(i + 1) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5);
-
+
if (isLast) {
// last step. add commit overhead
int stepStopCriticalTimeInterval = (int) (step.getStopCriticalTime() - dagStartTime);
@@ -274,12 +274,12 @@ private void drawCritical(DagInfo dagInfo, List criticalPath)
(i + 2) * STEP_GAP, CRITICAL_COLOR, "Critical Time " + step.getAttempt().getShortName(), TICK*5);
}
}
-
+
// draw legend
int legendX = 0;
int legendY = (criticalPath.size() + 2) * STEP_GAP;
int legendWidth = dagFinishTimeInterval/5;
-
+
addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, VERTEX_INIT_COMMIT_COLOR, BORDER_COLOR, RECT_OPACITY, "");
addTextStr(legendX, legendY + STEP_GAP/3, "Vertex Init/Commit Overhead", "left", TEXT_SIZE, "", false);
legendY += STEP_GAP/2;
@@ -291,17 +291,17 @@ private void drawCritical(DagInfo dagInfo, List criticalPath)
legendY += STEP_GAP/2;
addRectStr(legendX, legendWidth, legendY, STEP_GAP/2, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, "");
addTextStr(legendX, legendY + STEP_GAP/3, "Task Execution Time", "left", TEXT_SIZE, "", false);
-
+
Y_MAX += Y_BASE*2;
X_MAX += X_BASE*2;
}
-
- public void saveCriticalPathAsSVG(DagInfo dagInfo,
+
+ public void saveCriticalPathAsSVG(DagInfo dagInfo,
String fileName, List criticalPath) {
drawCritical(dagInfo, criticalPath);
saveFileStr(fileName);
}
-
+
private void saveFileStr(String fileName) {
String header = " "
+ "