Skip to content

Commit afbc774

Browse files
committed
TEZ-4227 Introduce convenient methods in TezID subclasses
Change-Id: I6cabfa75e9b6b62e41ba8c2cc5e3d2d1a8a49102
1 parent 211b59b commit afbc774

83 files changed

Lines changed: 741 additions & 684 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.apache.tez.dag.records;
2+
3+
import org.apache.hadoop.yarn.api.records.ApplicationId;
4+
5+
public interface DAGIDHolder {
6+
TezDAGID getDAGId();
7+
8+
default ApplicationId getApplicationId() {
9+
return getDAGId().getApplicationId();
10+
}
11+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.tez.dag.records;
2+
3+
public interface TaskAttemptIDHolder extends TaskIDHolder {
4+
TezTaskAttemptID getTaskAttemptID();
5+
6+
@Override
7+
default TezTaskID getTaskID() {
8+
return getTaskAttemptID().getTaskID();
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.tez.dag.records;
2+
3+
public interface TaskIDHolder extends VertexIDHolder {
4+
TezTaskID getTaskID();
5+
6+
@Override
7+
default TezVertexID getVertexID() {
8+
return getTaskID().getVertexID();
9+
}
10+
}

tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
*/
4646
@InterfaceAudience.Public
4747
@InterfaceStability.Stable
48-
public class TezTaskAttemptID extends TezID {
48+
public class TezTaskAttemptID extends TezID implements TaskIDHolder {
4949
public static final String ATTEMPT = "attempt";
5050
private TezTaskID taskId;
5151

@@ -73,6 +73,7 @@ private TezTaskAttemptID(TezTaskID taskId, int id) {
7373
}
7474

7575
/** Returns the {@link TezTaskID} object that this task attempt belongs to */
76+
@Override
7677
public TezTaskID getTaskID() {
7778
return taskId;
7879
}

tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
*/
4242
@InterfaceAudience.Public
4343
@InterfaceStability.Stable
44-
public class TezTaskID extends TezID {
44+
public class TezTaskID extends TezID implements VertexIDHolder {
4545
public static final String TASK = "task";
4646
private final int serializingHash;
4747

@@ -79,6 +79,7 @@ public int getSerializingHash() {
7979
}
8080

8181
/** Returns the {@link TezVertexID} object that this task belongs to */
82+
@Override
8283
public TezVertexID getVertexID() {
8384
return vertexId;
8485
}

tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*/
4545
@InterfaceAudience.Public
4646
@InterfaceStability.Stable
47-
public class TezVertexID extends TezID {
47+
public class TezVertexID extends TezID implements DAGIDHolder {
4848
public static final String VERTEX = "vertex";
4949
static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() {
5050

@@ -79,6 +79,7 @@ private TezVertexID(TezDAGID dagId, int id) {
7979
}
8080

8181
/** Returns the {@link TezDAGID} object that this tip belongs to */
82+
@Override
8283
public TezDAGID getDAGId() {
8384
return dagId;
8485
}
@@ -158,5 +159,4 @@ public static TezVertexID fromString(String vertexIdStr) {
158159
}
159160
return null;
160161
}
161-
162162
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.tez.dag.records;
2+
3+
public interface VertexIDHolder extends DAGIDHolder {
4+
TezVertexID getVertexID();
5+
6+
@Override
7+
default TezDAGID getDAGId() {
8+
return getVertexID().getDAGId();
9+
}
10+
}

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2187,12 +2187,12 @@ private class TaskEventDispatcher implements EventHandler<TaskEvent> {
21872187
public void handle(TaskEvent event) {
21882188
DAG dag = context.getCurrentDAG();
21892189
int eventDagIndex =
2190-
event.getTaskID().getVertexID().getDAGId().getId();
2190+
event.getDAGId().getId();
21912191
if (dag == null || eventDagIndex != dag.getID().getId()) {
21922192
return; // event not relevant any more
21932193
}
21942194
Task task =
2195-
dag.getVertex(event.getTaskID().getVertexID()).
2195+
dag.getVertex(event.getVertexID()).
21962196
getTask(event.getTaskID());
21972197
((EventHandler<TaskEvent>)task).handle(event);
21982198
}
@@ -2217,13 +2217,13 @@ private class TaskAttemptEventDispatcher
22172217
public void handle(TaskAttemptEvent event) {
22182218
DAG dag = context.getCurrentDAG();
22192219
int eventDagIndex =
2220-
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
2220+
event.getDAGId().getId();
22212221
if (dag == null || eventDagIndex != dag.getID().getId()) {
22222222
return; // event not relevant any more
22232223
}
22242224
Task task =
2225-
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID()).
2226-
getTask(event.getTaskAttemptID().getTaskID());
2225+
dag.getVertex(event.getVertexID()).
2226+
getTask(event.getTaskID());
22272227
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
22282228
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
22292229
}
@@ -2236,13 +2236,13 @@ private class VertexEventDispatcher
22362236
public void handle(VertexEvent event) {
22372237
DAG dag = context.getCurrentDAG();
22382238
int eventDagIndex =
2239-
event.getVertexId().getDAGId().getId();
2239+
event.getDAGId().getId();
22402240
if (dag == null || eventDagIndex != dag.getID().getId()) {
22412241
return; // event not relevant any more
22422242
}
22432243

22442244
Vertex vertex =
2245-
dag.getVertex(event.getVertexId());
2245+
dag.getVertex(event.getVertexID());
22462246
((EventHandler<VertexEvent>) vertex).handle(event);
22472247
}
22482248
}

tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -846,19 +846,19 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
846846
case TASK_STARTED:
847847
{
848848
TaskStartedEvent taskStartedEvent = (TaskStartedEvent) event;
849-
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getTaskID().getVertexID());
849+
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskStartedEvent.getVertexID());
850850
Preconditions.checkArgument(vertexRecoveryData != null,
851-
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getTaskID().getVertexID());
851+
"Invalid TaskStartedEvent, its vertex does not exist:" + taskStartedEvent.getVertexID());
852852
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskStartedEvent.getTaskID());
853853
taskRecoveryData.taskStartedEvent = taskStartedEvent;
854854
break;
855855
}
856856
case TASK_FINISHED:
857857
{
858858
TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent) event;
859-
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getTaskID().getVertexID());
859+
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(taskFinishedEvent.getVertexID());
860860
Preconditions.checkArgument(vertexRecoveryData != null,
861-
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getTaskID().getVertexID());
861+
"Invalid TaskFinishedEvent, its vertex does not exist:" + taskFinishedEvent.getVertexID());
862862
TaskRecoveryData taskRecoveryData = vertexRecoveryData.maybeCreateTaskRecoveryData(taskFinishedEvent.getTaskID());
863863
taskRecoveryData.taskFinishedEvent = taskFinishedEvent;
864864
break;
@@ -867,7 +867,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
867867
{
868868
TaskAttemptStartedEvent taStartedEvent = (TaskAttemptStartedEvent)event;
869869
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
870-
taStartedEvent.getTaskAttemptID().getTaskID().getVertexID());
870+
taStartedEvent.getVertexID());
871871
Preconditions.checkArgument(vertexRecoveryData != null,
872872
"Invalid TaskAttemptStartedEvent, its vertexId does not exist, taId=" + taStartedEvent.getTaskAttemptID());
873873
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap
@@ -882,7 +882,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
882882
{
883883
TaskAttemptFinishedEvent taFinishedEvent = (TaskAttemptFinishedEvent)event;
884884
VertexRecoveryData vertexRecoveryData = recoveredDAGData.vertexRecoveryDataMap.get(
885-
taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID());
885+
taFinishedEvent.getVertexID());
886886
Preconditions.checkArgument(vertexRecoveryData != null,
887887
"Invalid TaskAttemtFinishedEvent, its vertexId does not exist, taId=" + taFinishedEvent.getTaskAttemptID());
888888
TaskRecoveryData taskRecoveryData = vertexRecoveryData.taskRecoveryDataMap

tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,14 +350,14 @@ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
350350
}
351351
}
352352
if (!eventsForVertex.isEmpty()) {
353-
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
353+
TezVertexID vertexId = taskAttemptID.getVertexID();
354354
sendEvent(
355355
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
356356
}
357357
taskHeartbeatHandler.pinged(taskAttemptID);
358358
eventInfo = context
359359
.getCurrentDAG()
360-
.getVertex(taskAttemptID.getTaskID().getVertexID())
360+
.getVertex(taskAttemptID.getVertexID())
361361
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
362362
request.getMaxEvents());
363363
}
@@ -442,7 +442,7 @@ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
442442

443443
DAG job = context.getCurrentDAG();
444444
Task task =
445-
job.getVertex(taskAttemptId.getTaskID().getVertexID()).
445+
job.getVertex(taskAttemptId.getVertexID()).
446446
getTask(taskAttemptId.getTaskID());
447447
return task.canCommit(taskAttemptId);
448448
}

0 commit comments

Comments
 (0)