
Commit a53db54

Author: Andrei Ionescu (committed)
Fix Iceberg Reader for nested partitions
1 parent c344934 · commit a53db54

3 files changed

Lines changed: 76 additions & 4 deletions


core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java

Lines changed: 4 additions & 3 deletions
@@ -81,13 +81,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
     List<Types.NestedField> expectedFields = struct.fields();
     for (int i = 0; i < expectedFields.size(); i += 1) {
       Types.NestedField field = expectedFields.get(i);
+      String sanitizedFieldName = AvroSchemaUtil.sanitize(field.name());
 
       // detect reordering
-      if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
+      if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) {
         hasChange = true;
       }
 
-      Schema.Field avroField = updateMap.get(field.name());
+      Schema.Field avroField = updateMap.get(sanitizedFieldName);
 
       if (avroField != null) {
         updatedFields.add(avroField);
@@ -123,7 +124,7 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
       return null;
     }
 
-    String expectedName = expectedField.name();
+    String expectedName = AvroSchemaUtil.sanitize(expectedField.name());
 
     this.current = expectedField.type();
     try {
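
The projection visitor above matches expected Iceberg fields against the fields of the Avro file schema, and Avro field names are stored in sanitized form whenever the original name is not a legal Avro identifier. Looking up the raw Iceberg name therefore misses exactly those fields, which is presumably what broke reads once a nested column such as nestedData.moreData became a partition source. A minimal, self-contained sketch of the mismatch; the sanitize() below is a hypothetical stand-in for AvroSchemaUtil.sanitize(), not its real rules:

import java.util.HashMap;
import java.util.Map;

public class SanitizedNameLookupSketch {
  // Hypothetical sanitizer: replace characters that Avro identifiers cannot contain.
  static String sanitize(String name) {
    return name.replaceAll("[^A-Za-z0-9_]", "_");
  }

  public static void main(String[] args) {
    // The Avro-side map is keyed by sanitized names, like updateMap in the patch.
    Map<String, String> updateMap = new HashMap<>();
    updateMap.put(sanitize("nestedData.moreData"), "avro field");

    String icebergName = "nestedData.moreData";
    System.out.println(updateMap.get(icebergName));            // null: the raw name misses
    System.out.println(updateMap.get(sanitize(icebergName)));  // "avro field": the sanitized name matches
  }
}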

spark/src/main/java/org/apache/iceberg/spark/source/Reader.java

Lines changed: 10 additions & 1 deletion
@@ -24,6 +24,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
@@ -396,7 +397,15 @@ private Iterator<InternalRow> open(FileScanTask task) {
     // schema or rows returned by readers
     Schema finalSchema = expectedSchema;
     PartitionSpec spec = task.spec();
-    Set<Integer> idColumns = spec.identitySourceIds();
+
+    Set<Integer> idColumns = Sets.newHashSet();
+    for (Integer i : spec.identitySourceIds()) {
+      if (spec.schema().columns().stream()
+          .noneMatch(j -> j.type().isStructType() && j.type().asStructType().field(i) != null)
+      ) {
+        idColumns.add(i);
+      }
+    }
 
     // schema needed for the projection and filtering
     StructType sparkType = SparkSchemaUtil.convert(finalSchema);
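
Read against the test schema added below, spec.identitySourceIds() returns the source field ids of both identity partitions, 1 for id and 5 for nestedData.moreData. The new loop drops any id that belongs to a field nested inside a top-level struct column, so only id 1 remains in idColumns and the nested value is no longer handled as a top-level identity column by the reader. A standalone sketch of that filtering logic on plain collections, illustrative only and not the Iceberg API:

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class NestedIdentityFilterSketch {
  public static void main(String[] args) {
    // Top-level columns of the test schema, as column id -> ids of fields nested in it.
    Map<Integer, Set<Integer>> structChildren = new LinkedHashMap<>();
    structChildren.put(1, Set.of());        // id
    structChildren.put(2, Set.of());        // data
    structChildren.put(3, Set.of(4, 5));    // nestedData struct (children: id=4, moreData=5)

    // Source field ids of the identity partitions: identity("id"), identity("nestedData.moreData").
    Set<Integer> identitySourceIds = Set.of(1, 5);

    // Same shape as the patched loop: keep only ids that are not children of a struct column.
    Set<Integer> idColumns = new LinkedHashSet<>();
    for (Integer i : identitySourceIds) {
      boolean nestedInStruct = structChildren.values().stream()
          .anyMatch(children -> children.contains(i));
      if (!nestedInStruct) {
        idColumns.add(i);
      }
    }

    System.out.println(idColumns); // prints [1]: only the top-level identity column remains
  }
}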

spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java

Lines changed: 62 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -338,4 +339,65 @@ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
     Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
+
+  @Test
+  public void testNestedPartitioning() throws IOException {
+    Schema nestedSchema = new Schema(
+        optional(1, "id", Types.IntegerType.get()),
+        optional(2, "data", Types.StringType.get()),
+        optional(3, "nestedData", Types.StructType.of(
+            optional(4, "id", Types.IntegerType.get()),
+            optional(5, "moreData", Types.StringType.get())))
+    );
+
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(new Configuration());
+    PartitionSpec spec = PartitionSpec.builderFor(nestedSchema)
+        .identity("id")
+        .identity("nestedData.moreData")
+        .build();
+    Table table = tables.create(nestedSchema, spec, location.toString());
+
+    List<String> jsons = Lists.newArrayList(
+        "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"} }",
+        "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"} }",
+        "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"} }",
+        "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"} }"
+    );
+    Dataset<Row> df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema))
+        .json(spark.createDataset(jsons, Encoders.STRING()));
+
+    // TODO: incoming columns must be ordered according to the table's schema
+    df.select("id", "data", "nestedData").write()
+        .format("iceberg")
+        .mode("append")
+        .save(location.toString());
+
+    table.refresh();
+
+    Dataset<Row> result = spark.read()
+        .format("iceberg")
+        .load(location.toString());
+
+    List<Row> actual = result.orderBy("id").collectAsList();
+    Assert.assertEquals("Number of rows should match", jsons.size(), actual.size());
+    Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0));
+    Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1));
+    Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1));
+    Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0));
+    Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1));
+    Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1));
+    Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0));
+    Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1));
+    Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1));
+    Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0));
+    Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1));
+    Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0));
+    Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1));
+  }
 }
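
A possible follow-up check, not part of this commit: with the reader fix, the nested column backing the second identity partition can be filtered like any other column when the table is read back. The sketch below reuses spark and location from testNestedPartitioning() above:

// Hypothetical extra assertion, appended to testNestedPartitioning().
Dataset<Row> p1Rows = spark.read()
    .format("iceberg")
    .load(location.toString())
    .where("nestedData.moreData = 'p1'");
Assert.assertEquals("Two rows belong to partition p1", 2, p1Rows.count());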
