Skip to content

Commit f284d63

Browse files
committed
PARQUET-1765: Invalid filteredRowCount in InternalParquetRecordReader (#747)
(cherry picked from commit 8c1bc9b)
1 parent 49de5b4 commit f284d63

3 files changed

Lines changed: 64 additions & 2 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -180,12 +180,14 @@ public void initialize(ParquetFileReader reader, ParquetReadOptions options) {
180180
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
181181
this.requestedSchema = readContext.getRequestedSchema();
182182
this.columnCount = requestedSchema.getPaths().size();
183+
// Setting the projection schema before running any filtering (e.g. getting filtered record count)
184+
// because projection impacts filtering
185+
reader.setRequestedSchema(requestedSchema);
183186
this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
184187
this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
185188
this.total = reader.getFilteredRecordCount();
186189
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
187190
this.filterRecords = options.useRecordFilter();
188-
reader.setRequestedSchema(requestedSchema);
189191
LOG.info("RecordReader initialized will read a total of {} records.", total);
190192
}
191193

@@ -201,13 +203,15 @@ public void initialize(ParquetFileReader reader, Configuration configuration)
201203
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
202204
this.requestedSchema = readContext.getRequestedSchema();
203205
this.columnCount = requestedSchema.getPaths().size();
206+
// Setting the projection schema before running any filtering (e.g. getting filtered record count)
207+
// because projection impacts filtering
208+
reader.setRequestedSchema(requestedSchema);
204209
this.recordConverter = readSupport.prepareForRead(
205210
configuration, fileMetadata, fileSchema, readContext);
206211
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
207212
this.total = reader.getFilteredRecordCount();
208213
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
209214
this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
210-
reader.setRequestedSchema(requestedSchema);
211215
LOG.info("RecordReader initialized will read a total of {} records.", total);
212216
}
213217

parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -199,6 +199,10 @@ public int hashCode() {
199199
public String toString() {
200200
return "User [id=" + id + ", name=" + name + ", phoneNumbers=" + phoneNumbers + ", location=" + location + "]";
201201
}
202+
203+
public User cloneWithName(String name) {
204+
return new User(id, name, phoneNumbers, location);
205+
}
202206
}
203207

204208
public static SimpleGroup groupFromUser(User user) {
@@ -257,6 +261,10 @@ private static Location getLocation(Group location) {
257261
}
258262

259263
private static boolean isNull(Group group, String field) {
264+
// Use null value if the field is not in the group schema
265+
if (!group.getType().containsField(field)) {
266+
return true;
267+
}
260268
int repetition = group.getFieldRepetitionCount(field);
261269
if (repetition == 0) {
262270
return true;

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java

Lines changed: 50 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
package org.apache.parquet.hadoop;
2020

2121
import static java.util.Collections.emptyList;
22+
import static java.util.stream.Collectors.toList;
2223
import static org.apache.parquet.filter2.predicate.FilterApi.and;
2324
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
2425
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
@@ -33,6 +34,12 @@
3334
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
3435
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
3536
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
37+
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
38+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
39+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
40+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
41+
import static org.apache.parquet.schema.Types.optional;
42+
import static org.apache.parquet.schema.Types.required;
3643
import static org.junit.Assert.assertEquals;
3744
import static org.junit.Assert.assertFalse;
3845
import static org.junit.Assert.assertTrue;
@@ -64,9 +71,12 @@
6471
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
6572
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
6673
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
74+
import org.apache.parquet.hadoop.api.ReadSupport;
6775
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
6876
import org.apache.parquet.hadoop.example.GroupReadSupport;
6977
import org.apache.parquet.io.api.Binary;
78+
import org.apache.parquet.schema.MessageType;
79+
import org.apache.parquet.schema.Types;
7080
import org.junit.AfterClass;
7181
import org.junit.BeforeClass;
7282
import org.junit.Test;
@@ -87,6 +97,19 @@ public class TestColumnIndexFiltering {
8797
private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
8898
private static final Path FILE_V1 = createTempFile();
8999
private static final Path FILE_V2 = createTempFile();
100+
private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
101+
.required(INT64).named("id")
102+
.optionalGroup()
103+
.addField(optional(DOUBLE).named("lon"))
104+
.addField(optional(DOUBLE).named("lat"))
105+
.named("location")
106+
.optionalGroup()
107+
.repeatedGroup()
108+
.addField(required(INT64).named("number"))
109+
.addField(optional(BINARY).as(stringType()).named("kind"))
110+
.named("phone")
111+
.named("phoneNumbers")
112+
.named("user_without_name");
90113

91114
@Parameters
92115
public static Collection<Object[]> params() {
@@ -199,6 +222,16 @@ private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean u
199222
.useColumnIndexFilter(useColumnIndexFilter));
200223
}
201224

225+
private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
226+
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
227+
.withFilter(filter)
228+
.useDictionaryFilter(useOtherFiltering)
229+
.useStatsFilter(useOtherFiltering)
230+
.useRecordFilter(useOtherFiltering)
231+
.useColumnIndexFilter(useColumnIndexFilter)
232+
.set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
233+
}
234+
202235
// Assumes that both lists are in the same order
203236
private static void assertContains(Stream<User> expected, List<User> actual) {
204237
Iterator<User> expIt = expected.iterator();
@@ -441,4 +474,21 @@ record -> record.getId() == 1234,
441474
or(eq(longColumn("id"), 1234l),
442475
userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
443476
}
477+
478+
@Test
479+
public void testFilteringWithProjection() throws IOException {
480+
// All rows shall be retrieved because all values in column 'name' shall be handled as null values
481+
assertEquals(
482+
DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
483+
readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
484+
485+
// Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
486+
assertEquals(
487+
emptyList(),
488+
readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
489+
assertEquals(
490+
emptyList(),
491+
readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
492+
SCHEMA_WITHOUT_NAME, false, true));
493+
}
444494
}

0 commit comments

Comments (0)