
Commit a1c8e6e

Added more test cases
1 parent 4068021 commit a1c8e6e


4 files changed: +178 / -39 lines


core/src/main/java/org/apache/iceberg/hadoop/Util.java

Lines changed: 0 additions & 1 deletion
@@ -33,7 +33,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class Util {
   private static final Logger LOG = LoggerFactory.getLogger(Util.class);
 

mr/src/main/java/org/apache/iceberg/mr/SerializationUtil.java

Lines changed: 2 additions & 6 deletions
@@ -26,8 +26,6 @@
 import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 
 
@@ -38,8 +36,7 @@ private SerializationUtil() {
 
   public static byte[] serializeToBytes(Object obj) {
     try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-         GZIPOutputStream gos = new GZIPOutputStream(baos);
-         ObjectOutputStream oos = new ObjectOutputStream(gos)) {
+         ObjectOutputStream oos = new ObjectOutputStream(baos)) {
       oos.writeObject(obj);
       return baos.toByteArray();
     } catch (IOException e) {
@@ -54,8 +51,7 @@ public static <T> T deserializeFromBytes(byte[] bytes) {
     }
 
     try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-         GZIPInputStream gis = new GZIPInputStream(bais);
-         ObjectInputStream ois = new ObjectInputStream(gis)) {
+         ObjectInputStream ois = new ObjectInputStream(bais)) {
       return (T) ois.readObject();
     } catch (IOException e) {
       throw new RuntimeIOException("Failed to deserialize object", e);
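
Note: the change above drops the GZIP wrapping around plain Java serialization. Below is a minimal, self-contained sketch of the resulting round-trip; the method names mirror SerializationUtil, but this is an illustration rather than the file's full contents (the real methods wrap failures in RuntimeIOException instead of throwing checked exceptions).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

// Illustrative sketch only: plain Java serialization with no GZIP layer, as after this commit.
public class SerializationRoundTripSketch {

  static byte[] serializeToBytes(Object obj) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
      oos.writeObject(obj);
    }
    // toByteArray() is called after close(), so the object stream is fully flushed.
    return baos.toByteArray();
  }

  @SuppressWarnings("unchecked")
  static <T> T deserializeFromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
         ObjectInputStream ois = new ObjectInputStream(bais)) {
      return (T) ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    String[] payload = {"iceberg", "mr", "split"};
    byte[] bytes = serializeToBytes(payload);
    String[] copy = deserializeFromBytes(bytes);
    System.out.println(Arrays.toString(copy)); // prints [iceberg, mr, split]
  }
}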

mr/src/main/java/org/apache/iceberg/mr/IcebergInputFormat.java renamed to mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java

Lines changed: 17 additions & 13 deletions
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr;
+package org.apache.iceberg.mr.mapreduce;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -65,6 +66,7 @@
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mr.SerializationUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.TypeUtil;
@@ -85,7 +87,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
   static final String FILTER_EXPRESSION = "iceberg.mr.filter.expression";
   static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
   static final String READ_SCHEMA = "iceberg.mr.read.schema";
-  static final String REUSE_CONTAINERS = "iceberg.mr.case.sensitive";
+  static final String REUSE_CONTAINERS = "iceberg.mr.reuse.containers";
   static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id";
   static final String SPLIT_SIZE = "iceberg.mr.split.size";
   static final String TABLE_PATH = "iceberg.mr.table.path";
@@ -197,7 +199,7 @@ public ConfigBuilder usePigTuples() {
    * can correctly apply the residual filters, then it
    * should call this api. Otherwise the current
    * api will throw an exception if the passed in
-   * filter is not completely satisfied. Note. This
+   * filter is not completely satisfied. Note: This
    * does not apply to standalone MR application
    */
   public ConfigBuilder platformAppliesFilterResiduals() {
@@ -264,7 +266,7 @@ private static void checkResiduals(Configuration conf, CombinedScanTask task) {
       if (residual != null && !residual.equals(Expressions.alwaysTrue())) {
         throw new RuntimeException(
             String.format(
-                "Filter expression %s is not completely satisfied . Additional rows " +
+                "Filter expression %s is not completely satisfied. Additional rows " +
                 "can be returned not satisfied by the filter expression", residual));
       }
     });
@@ -346,14 +348,15 @@ private Iterator<T> open(FileScanTask currentTask) {
     DataFile file = currentTask.file();
     // schema of rows returned by readers
     PartitionSpec spec = currentTask.spec();
-    Set<Integer> idColumns = spec.identitySourceIds();
     Schema readSchema = expectedSchema != null ? expectedSchema : tableSchema;
+    Set<Integer> idColumns = Sets.intersection(spec.identitySourceIds(), TypeUtil.getProjectedIds(readSchema));
     boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+
     if (hasJoinedPartitionColumns) {
-      readSchema = TypeUtil.selectNot(tableSchema, idColumns);
-      Schema identityPartitionSchema = TypeUtil.select(tableSchema, idColumns);
+      Schema readDataSchema = TypeUtil.selectNot(readSchema, idColumns);
+      Schema identityPartitionSchema = TypeUtil.select(readSchema, idColumns);
       return Iterators.transform(
-          open(currentTask, readSchema),
+          open(currentTask, readDataSchema),
           row -> withPartitionColumns(row, identityPartitionSchema, spec, file.partition()));
     } else {
       return open(currentTask, readSchema);
@@ -482,15 +485,15 @@ private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTas
 
   private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
     ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
-        .schema(readSchema)
+        .project(readSchema)
        .caseSensitive(caseSensitive)
        .split(task.start(), task.length());
     // ORC does not support reuse containers yet
     switch (inMemoryDataModel) {
       case PIG:
       case HIVE:
-        //TODO implement value readers for Pig and Hive
-        throw new UnsupportedOperationException();
+        //TODO: implement value readers for Pig and Hive
+        throw new UnsupportedOperationException("In memory representation not yet supported for Pig and Hive");
       case DEFAULT:
         //TODO: We do not have support for Iceberg generics for ORC
         throw new UnsupportedOperationException();
@@ -502,6 +505,7 @@ private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask ta
 
   private static Table findTable(Configuration conf) {
     String path = conf.get(TABLE_PATH);
+    Preconditions.checkArgument(path != null, "Table path should not be null");
     String catalogFuncClass = conf.get(CATALOG);
     if (catalogFuncClass != null) {
       Function<Configuration, Catalog> catalogFunc
@@ -521,8 +525,8 @@ private static Table findTable(Configuration conf) {
     }
   }
 
-  private static class IcebergSplit extends InputSplit implements Writable {
-    private static final String[] ANYWHERE = new String[]{"*"};
+  static class IcebergSplit extends InputSplit implements Writable {
+    static final String[] ANYWHERE = new String[]{"*"};
     private CombinedScanTask task;
     private transient String[] locations;
     private transient Configuration conf;
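
Note: taken together, the package rename and the new Preconditions check mean a caller must always supply a table location (or a table identifier plus catalog function) through the ConfigBuilder. A hedged sketch of a standalone driver wiring this up follows; configure, readFrom and filter come from this diff and setInputFormatClass from the standard Hadoop API, while the driver class name and its argument handling are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;

// Hypothetical driver: illustrates the ConfigBuilder API exercised by the tests below.
public class IcebergInputFormatDriverSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());

    // Without readFrom(...), findTable() now fails fast with "Table path should not be null".
    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
    configBuilder
        .readFrom(args[0])                                  // HadoopTables location or table identifier
        .filter(Expressions.equal("date", "2020-03-20"));   // pushed down; unsatisfied residuals throw

    job.setInputFormatClass(IcebergInputFormat.class);
    // ... set mapper, reducer and output as usual, then submit the job.
  }
}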

mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormat.java renamed to mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java

Lines changed: 159 additions & 19 deletions
@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr;
+package org.apache.iceberg.mr.mapreduce;
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
@@ -51,10 +53,12 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataWriter;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Before;
@@ -69,15 +73,15 @@
 
 @RunWith(Parameterized.class)
 public class TestIcebergInputFormat {
-  private static final Schema SCHEMA = new Schema(
+  static final Schema SCHEMA = new Schema(
       required(1, "data", Types.StringType.get()),
-      required(3, "id", Types.LongType.get()),
-      required(2, "date", Types.StringType.get()));
+      required(2, "id", Types.LongType.get()),
+      required(3, "date", Types.StringType.get()));
 
-  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
-      .identity("date")
-      .bucket("id", 1)
-      .build();
+  static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .identity("date")
+      .bucket("id", 1)
+      .build();
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -116,7 +120,10 @@ public void testUnpartitionedTable() throws Exception {
     table.newAppend()
         .appendFile(dataFile)
         .commit();
-    validate(conf, location.toString(), null, expectedRecords);
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+    validate(job, expectedRecords);
   }
 
   @Test
@@ -132,7 +139,136 @@ public void testPartitionedTable() throws Exception {
     table.newAppend()
        .appendFile(dataFile)
        .commit();
-    validate(conf, location.toString(), null, expectedRecords);
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testFilterExp() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+    DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
+        RandomGenericData.generate(table.schema(), 2, 0L));
+    table.newAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString())
+        .filter(Expressions.equal("date", "2020-03-20"));
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testResiduals() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString())
+        .filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 0)));
+
+    AssertHelpers.assertThrows(
+        "Residuals are not evaluated today for Iceberg Generics In memory model",
+        RuntimeException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
+        () -> validate(job, expectedRecords));
+  }
+
+  @Test
+  public void testProjection() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
+    Table table = tables.create(SCHEMA, SPEC,
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> inputRecords = RandomGenericData.generate(table.schema(), 1, 0L);
+    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, inputRecords);
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder
+        .readFrom(location.toString())
+        .project(projectedSchema);
+    List<Record> outputRecords = readRecords(job.getConfiguration());
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+    Assert.assertEquals(projectedSchema.asStruct(), outputRecords.get(0).struct());
+  }
+
+  @Test
+  public void testSnapshotReads() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L);
+    table.newAppend()
+        .appendFile(writeFile(table, null, format, expectedRecords))
+        .commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    table.newAppend()
+        .appendFile(writeFile(table, null, format, RandomGenericData.generate(table.schema(), 1, 0L)))
+        .commit();
+
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder
+        .readFrom(location.toString())
+        .snapshotId(snapshotId);
+
+    validate(job, expectedRecords);
+  }
+
+  @Test
+  public void testLocality() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 1, 0L);
+    table.newAppend()
+        .appendFile(writeFile(table, null, format, expectedRecords))
+        .commit();
+    Job job = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.readFrom(location.toString());
+
+    for (InputSplit split : splits(job.getConfiguration())) {
+      Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE, split.getLocations());
+    }
+
+    configBuilder.preferLocality();
+    for (InputSplit split : splits(job.getConfiguration())) {
+      Assert.assertArrayEquals(new String[]{"localhost"}, split.getLocations());
+    }
   }
 
   public static class HadoopCatalogFunc implements Function<Configuration, Catalog> {
@@ -157,22 +293,26 @@ public void testCustomCatalog() throws Exception {
     table.newAppend()
         .appendFile(dataFile)
         .commit();
-    validate(conf, tableIdentifier.toString(), HadoopCatalogFunc.class, expectedRecords);
-  }
 
-  private static void validate(
-      Configuration conf, String path, Class<? extends Function<Configuration, Catalog>> catalogFuncClass,
-      List<Record> expectedRecords) throws IOException {
     Job job = Job.getInstance(conf);
     IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
-    if (catalogFuncClass != null) {
-      configBuilder.catalogFunc(catalogFuncClass);
-    }
-    configBuilder.readFrom(path);
+    configBuilder
+        .catalogFunc(HadoopCatalogFunc.class)
+        .readFrom(tableIdentifier.toString());
+    validate(job, expectedRecords);
+  }
+
+  private static void validate(Job job, List<Record> expectedRecords) {
     List<Record> actualRecords = readRecords(job.getConfiguration());
     Assert.assertEquals(expectedRecords, actualRecords);
   }
 
+  private static <T> List<InputSplit> splits(Configuration conf) {
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+    IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
+    return icebergInputFormat.getSplits(context);
+  }
+
   private static <T> List<T> readRecords(Configuration conf) {
     TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
     IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
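
Note: the diff is cut off before the body of readRecords, so the following is only a plausible, self-contained sketch of such a helper driving IcebergInputFormat through the standard mapreduce API (getSplits, createRecordReader, nextKeyValue); everything beyond those documented calls is an assumption, not the test's actual code.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;

// Plausible readRecords-style helper; the real test helper's body is truncated in the diff above.
public class ReadRecordsSketch {

  static <T> List<T> readRecords(Configuration conf) throws Exception {
    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
    IcebergInputFormat<T> inputFormat = new IcebergInputFormat<>();
    List<T> records = new ArrayList<>();
    // Iterate every split and drain its record reader, collecting the values.
    for (InputSplit split : inputFormat.getSplits(context)) {
      try (RecordReader<Void, T> reader = inputFormat.createRecordReader(split, context)) {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
          records.add(reader.getCurrentValue());
        }
      }
    }
    return records;
  }
}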
