Skip to content

Commit 71a7ab9

Browse files
authored
MR: Add InputFormat (#843)
1 parent 2dfc455 commit 71a7ab9

File tree

8 files changed

+1175
-19
lines changed

8 files changed

+1175
-19
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ spark/tmp/
3535
.project
3636
.settings
3737
bin/
38+
39+
# Hive/metastore files
40+
metastore_db/

build.gradle

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,24 @@ project(':iceberg-hive') {
220220
}
221221
}
222222

223+
project(':iceberg-mr') {
224+
dependencies {
225+
compile project(':iceberg-api')
226+
compile project(':iceberg-core')
227+
compile project(':iceberg-orc')
228+
compile project(':iceberg-parquet')
229+
compile project(':iceberg-data')
230+
231+
compileOnly("org.apache.hadoop:hadoop-client") {
232+
exclude group: 'org.apache.avro', module: 'avro'
233+
}
234+
235+
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
236+
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
237+
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
238+
}
239+
}
240+
223241
project(':iceberg-orc') {
224242
dependencies {
225243
compile project(':iceberg-api')

core/src/main/java/org/apache/iceberg/hadoop/Util.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,23 @@
1919

2020
package org.apache.iceberg.hadoop;
2121

22+
import com.google.common.collect.Sets;
2223
import java.io.IOException;
24+
import java.util.Arrays;
25+
import java.util.Set;
2326
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.BlockLocation;
2428
import org.apache.hadoop.fs.FileSystem;
2529
import org.apache.hadoop.fs.Path;
30+
import org.apache.iceberg.CombinedScanTask;
31+
import org.apache.iceberg.FileScanTask;
2632
import org.apache.iceberg.exceptions.RuntimeIOException;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
public class Util {
37+
private static final Logger LOG = LoggerFactory.getLogger(Util.class);
2738

28-
class Util {
2939
private Util() {
3040
}
3141

@@ -36,4 +46,21 @@ public static FileSystem getFs(Path path, Configuration conf) {
3646
throw new RuntimeIOException(e, "Failed to get file system for path: %s", path);
3747
}
3848
}
49+
50+
public static String[] blockLocations(CombinedScanTask task, Configuration conf) {
51+
Set<String> locationSets = Sets.newHashSet();
52+
for (FileScanTask f : task.files()) {
53+
Path path = new Path(f.file().path().toString());
54+
try {
55+
FileSystem fs = path.getFileSystem(conf);
56+
for (BlockLocation b : fs.getFileBlockLocations(path, f.start(), f.length())) {
57+
locationSets.addAll(Arrays.asList(b.getHosts()));
58+
}
59+
} catch (IOException ioe) {
60+
LOG.warn("Failed to get block locations for path {}", path, ioe);
61+
}
62+
}
63+
64+
return locationSets.toArray(new String[0]);
65+
}
3966
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.mr;
21+
22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.IOException;
25+
import java.io.ObjectInputStream;
26+
import java.io.ObjectOutputStream;
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.Base64;
29+
import org.apache.iceberg.exceptions.RuntimeIOException;
30+
31+
public class SerializationUtil {
32+
33+
private SerializationUtil() {
34+
}
35+
36+
public static byte[] serializeToBytes(Object obj) {
37+
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
38+
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
39+
oos.writeObject(obj);
40+
return baos.toByteArray();
41+
} catch (IOException e) {
42+
throw new RuntimeIOException("Failed to serialize object", e);
43+
}
44+
}
45+
46+
@SuppressWarnings("unchecked")
47+
public static <T> T deserializeFromBytes(byte[] bytes) {
48+
if (bytes == null) {
49+
return null;
50+
}
51+
52+
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
53+
ObjectInputStream ois = new ObjectInputStream(bais)) {
54+
return (T) ois.readObject();
55+
} catch (IOException e) {
56+
throw new RuntimeIOException("Failed to deserialize object", e);
57+
} catch (ClassNotFoundException e) {
58+
throw new RuntimeException("Could not read object ", e);
59+
}
60+
}
61+
62+
public static String serializeToBase64(Object obj) {
63+
byte[] bytes = serializeToBytes(obj);
64+
return new String(Base64.getMimeEncoder().encode(bytes), StandardCharsets.UTF_8);
65+
}
66+
67+
public static <T> T deserializeFromBase64(String base64) {
68+
if (base64 == null) {
69+
return null;
70+
}
71+
byte[] bytes = Base64.getMimeDecoder().decode(base64.getBytes(StandardCharsets.UTF_8));
72+
return deserializeFromBytes(bytes);
73+
}
74+
}

0 commit comments

Comments
 (0)