Skip to content

Commit c583b70

Browse files
committed
TEZ-4548: InputDataInformationEvent to be read from serialized payload from filesystem
1 parent 34bb628 commit c583b70

9 files changed

Lines changed: 188 additions & 37 deletions

File tree

tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public final class InputDataInformationEvent extends Event {
4949
private final int sourceIndex;
5050
private int targetIndex; // TODO Likely to be multiple at a later point.
5151
private final ByteBuffer userPayload;
52+
private String serializedPath;
53+
5254
private final Object userPayloadObject;
5355

5456

@@ -79,6 +81,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex,
7981
return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null);
8082
}
8183

84+
public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) {
85+
InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null);
86+
event.serializedPath = serializedPath;
87+
return event;
88+
}
89+
8290
public int getSourceIndex() {
8391
return this.sourceIndex;
8492
}
@@ -90,11 +98,15 @@ public int getTargetIndex() {
9098
public void setTargetIndex(int target) {
9199
this.targetIndex = target;
92100
}
93-
101+
102+
public String getSerializedPath() {
103+
return serializedPath;
104+
}
105+
94106
public ByteBuffer getUserPayload() {
95107
return userPayload == null ? null : userPayload.asReadOnlyBuffer();
96108
}
97-
109+
98110
public Object getDeserializedUserPayload() {
99111
return this.userPayloadObject;
100112
}
@@ -104,5 +116,5 @@ public String toString() {
104116
return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
105117
+ targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
106118
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
107-
}
119+
}
108120
}

tez-api/src/main/proto/Events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ message RootInputDataInformationEventProto {
5858
optional int32 source_index = 1;
5959
optional int32 target_index = 2;
6060
optional bytes user_payload = 3;
61+
optional bytes serialized_path = 4;
6162
}
6263

6364
message CompositeEventProto {

tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java renamed to tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.tez.runtime.api.event;
19+
package org.apache.tez.runtime.api.events;
2020

2121
import java.nio.ByteBuffer;
2222

23-
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
24-
import org.apache.tez.runtime.api.events.DataMovementEvent;
2523
import org.junit.Assert;
2624
import org.junit.Test;
2725

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.tez.runtime.api.events;
19+
20+
import java.nio.ByteBuffer;
21+
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import com.google.common.base.Charsets;
26+
27+
public class TestInputDataInformationEvent {
28+
29+
@Test
30+
public void testApiPayloadOrPath() {
31+
InputDataInformationEvent eventWithSerializedPayload =
32+
InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes()));
33+
// event created by createWithSerializedPayload should contain serialized payload
34+
// but not a path or a deserialized payload
35+
Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString());
36+
Assert.assertNull(eventWithSerializedPayload.getSerializedPath());
37+
Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload());
38+
39+
InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2");
40+
// event created by eventWithObjectPayload should contain a deserialized payload
41+
// but not a path or serialized payload
42+
Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload());
43+
Assert.assertNull(eventWithObjectPayload.getSerializedPath());
44+
Assert.assertNull(eventWithObjectPayload.getUserPayload());
45+
46+
InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello");
47+
// event created by createWithSerializedPath should contain a path
48+
// but neither serialized nor deserialized payload
49+
Assert.assertEquals("file://hello", eventWithPath.getSerializedPath());
50+
Assert.assertNull(eventWithPath.getUserPayload());
51+
Assert.assertNull(eventWithPath.getDeserializedUserPayload());
52+
}
53+
}

tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,9 +272,8 @@ public List<Event> call() throws Exception {
272272
List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
273273
@Override
274274
public List<Event> run() throws Exception {
275-
LOG.info(
276-
"Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
277-
" on vertex " + initializerWrapper.getVertexLogIdentifier());
275+
LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(),
276+
initializerWrapper.getVertexLogIdentifier());
278277
try {
279278
TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
280279
initializerWrapper.vertexId);

tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.tez.mapreduce.hadoop;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collections;
@@ -30,19 +31,23 @@
3031
import java.util.Objects;
3132

3233
import com.google.common.base.Function;
34+
import com.google.common.base.Strings;
35+
3336
import org.apache.tez.common.Preconditions;
3437
import com.google.common.collect.Iterables;
3538
import com.google.common.collect.Lists;
3639
import com.google.protobuf.ByteString;
3740

3841
import org.apache.tez.runtime.api.InputContext;
42+
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
3943
import org.slf4j.Logger;
4044
import org.slf4j.LoggerFactory;
4145
import org.apache.hadoop.classification.InterfaceAudience;
4246
import org.apache.hadoop.classification.InterfaceAudience.Public;
4347
import org.apache.hadoop.classification.InterfaceStability;
4448
import org.apache.hadoop.classification.InterfaceStability.Unstable;
4549
import org.apache.hadoop.conf.Configuration;
50+
import org.apache.hadoop.fs.FSDataInputStream;
4651
import org.apache.hadoop.fs.FileStatus;
4752
import org.apache.hadoop.fs.FileSystem;
4853
import org.apache.hadoop.fs.Path;
@@ -72,6 +77,7 @@
7277
import org.apache.tez.mapreduce.input.MRInput;
7378
import org.apache.tez.mapreduce.input.MRInputLegacy;
7479
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
80+
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
7581

7682
@Public
7783
@Unstable
@@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
889895
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
890896
}
891897

898+
public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
899+
return !Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromFs(initEvent, jobConf)
900+
: readProtoFromPayload(initEvent);
901+
}
902+
903+
private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
904+
String serializedPath = initEvent.getSerializedPath();
905+
Path filePath = new Path(serializedPath);
906+
LOG.info("Reading InputDataInformationEvent from path: {}", filePath);
907+
908+
MRSplitProto splitProto = null;
909+
FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);
910+
911+
try (FSDataInputStream in = fs.open(filePath)) {
912+
splitProto = MRSplitProto.parseFrom(in);
913+
fs.delete(filePath, false);
914+
}
915+
return splitProto;
916+
}
917+
918+
private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
919+
ByteBuffer payload = initEvent.getUserPayload();
920+
LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit());
921+
return MRSplitProto.parseFrom(ByteString.copyFrom(payload));
922+
}
892923
}

tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import java.util.concurrent.locks.Condition;
2828
import java.util.concurrent.locks.ReentrantLock;
2929

30-
import com.google.protobuf.ByteString;
31-
3230
import org.apache.tez.runtime.api.ProgressFailedException;
3331
import org.slf4j.Logger;
3432
import org.slf4j.LoggerFactory;
@@ -72,6 +70,7 @@
7270
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
7371

7472
import org.apache.tez.common.Preconditions;
73+
7574
import com.google.common.collect.Lists;
7675

7776
/**
@@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
672671
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
673672
}
674673
Objects.requireNonNull(initEvent, "InitEvent must be specified");
675-
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
674+
MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf);
676675
Object splitObj = null;
677676
long splitLength = -1;
678677
if (useNewApi) {

tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@
4242
import org.apache.hadoop.yarn.api.records.LocalResource;
4343
import org.apache.tez.dag.api.DataSourceDescriptor;
4444
import org.apache.tez.dag.api.TaskLocationHint;
45+
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
46+
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
47+
import org.junit.After;
4548
import org.junit.Assert;
49+
import org.junit.Before;
4650
import org.junit.BeforeClass;
4751
import org.junit.Test;
4852

53+
import com.google.protobuf.ByteString;
54+
4955
public class TestMRInputHelpers {
5056

5157
protected static MiniDFSCluster dfsCluster;
@@ -56,9 +62,12 @@ public class TestMRInputHelpers {
5662
private static Path oldSplitsDir;
5763
private static Path newSplitsDir;
5864

59-
private static String TEST_ROOT_DIR = "target"
65+
private static final String TEST_ROOT_DIR = "target"
6066
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir";
6167

68+
private static final Path LOCAL_TEST_ROOT_DIR = new Path("target"
69+
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
70+
6271
@BeforeClass
6372
public static void setup() throws IOException {
6473
try {
@@ -188,6 +197,43 @@ public void testInputSplitLocalResourceCreation() throws Exception {
188197
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
189198
}
190199

200+
@Test
201+
public void testInputEventSerializedPayload() throws IOException {
202+
MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build();
203+
204+
InputDataInformationEvent initEvent =
205+
InputDataInformationEvent.createWithSerializedPayload(0, proto.toByteString().asReadOnlyByteBuffer());
206+
MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf));
207+
208+
Assert.assertEquals(proto, protoFromEvent);
209+
}
210+
211+
@Test
212+
public void testInputEventSerializedPath() throws IOException {
213+
MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build();
214+
215+
FileSystem localFs = FileSystem.getLocal(conf);
216+
Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
217+
218+
Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload");
219+
220+
try (FSDataOutputStream out = localFs.create(serializedPath)) {
221+
proto.writeTo(out);
222+
}
223+
224+
// event file is present on fs
225+
Assert.assertTrue(localFs.exists(serializedPath));
226+
227+
InputDataInformationEvent initEvent =
228+
InputDataInformationEvent.createWithSerializedPath(0, serializedPath.toUri().toString());
229+
MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf));
230+
231+
Assert.assertEquals(proto, protoFromEvent);
232+
233+
// event file is delete after read
234+
Assert.assertFalse(localFs.exists(serializedPath));
235+
}
236+
191237
private void verifyLocationHints(Path inputSplitsDir,
192238
List<TaskLocationHint> actual) throws Exception {
193239
JobID jobId = new JobID("dummy", 1);
@@ -232,30 +278,32 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits
232278
@Test(timeout = 5000)
233279
public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception {
234280
FileSystem localFs = FileSystem.getLocal(conf);
235-
Path LOCAL_TEST_ROOT_DIR = new Path("target"
236-
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
281+
Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
237282

238-
try {
239-
localFs.mkdirs(LOCAL_TEST_ROOT_DIR);
240-
241-
Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR);
283+
DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);
242284

243-
DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir);
244-
245-
Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
285+
Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
246286

247-
Assert.assertEquals(2, localResources.size());
248-
Assert.assertTrue(localResources.containsKey(
249-
MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
250-
Assert.assertTrue(localResources.containsKey(
251-
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
287+
Assert.assertEquals(2, localResources.size());
288+
Assert.assertTrue(localResources.containsKey(
289+
MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
290+
Assert.assertTrue(localResources.containsKey(
291+
MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
252292

253-
for (LocalResource lr : localResources.values()) {
254-
Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
255-
}
256-
} finally {
257-
localFs.delete(LOCAL_TEST_ROOT_DIR, true);
293+
for (LocalResource lr : localResources.values()) {
294+
Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme()));
258295
}
259296
}
260297

298+
@Before
299+
public void before() throws IOException {
300+
FileSystem localFs = FileSystem.getLocal(conf);
301+
localFs.mkdirs(LOCAL_TEST_ROOT_DIR);
302+
}
303+
304+
@After
305+
public void after() throws IOException {
306+
FileSystem localFs = FileSystem.getLocal(conf);
307+
localFs.delete(LOCAL_TEST_ROOT_DIR, true);
308+
}
261309
}

tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
package org.apache.tez.common;
2020

21+
import com.google.common.base.Charsets;
2122
import com.google.protobuf.ByteString;
2223

24+
import java.nio.ByteBuffer;
25+
2326
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
2427
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
2528
import org.apache.tez.runtime.api.events.DataMovementEvent;
@@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto(
135138
if (event.getUserPayload() != null) {
136139
builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
137140
}
141+
if (event.getSerializedPath() != null) {
142+
builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8)));
143+
}
138144
return builder.build();
139145
}
140146

141-
public static InputDataInformationEvent
142-
convertRootInputDataInformationEventFromProto(
147+
public static InputDataInformationEvent convertRootInputDataInformationEventFromProto(
143148
EventProtos.RootInputDataInformationEventProto proto) {
144-
InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
145-
proto.getSourceIndex(),
146-
proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null);
149+
ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null;
150+
InputDataInformationEvent diEvent = null;
151+
if (!proto.getSerializedPath().isEmpty()) {
152+
diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(),
153+
proto.getSerializedPath().toStringUtf8());
154+
} else {
155+
diEvent = InputDataInformationEvent.createWithSerializedPayload(proto.getSourceIndex(), payload);
156+
}
147157
diEvent.setTargetIndex(proto.getTargetIndex());
148158
return diEvent;
149159
}

0 commit comments

Comments
 (0)