Commit 3d22a19

trushev authored and Alexey Kudinkin committed
[HUDI-5147] Flink data skipping doesn't work when HepPlanner calls copy() on HoodieTableSource (apache#7113)
1 parent 9524fa8 commit 3d22a19
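Background: Flink's HepPlanner may call DynamicTableSource#copy() on a HoodieTableSource after filter push-down has already run. Previously, copy() did not carry the FileIndex along: the copied source rebuilt a fresh FileIndex without the filters that applyFilters() had pushed into it, so data skipping was silently disabled on the copy. The fix threads the existing FileIndex through the constructor so a copied source keeps the pushed-down filters. The snippet below is only a minimal sketch of that interaction, mirroring the unit test added in this commit (getEmptyStreamingSource() is the test helper introduced below, and mockExpression stands in for a planner-produced filter):

// Sketch of the sequence the planner drives; see TestHoodieTableSource below.
HoodieTableSource tableSource = getEmptyStreamingSource();
tableSource.applyFilters(Collections.singletonList(mockExpression));

// HepPlanner may copy the source while applying its rewrite rules.
HoodieTableSource copied = (HoodieTableSource) tableSource.copy();

// Before this commit the copy created a brand-new FileIndex, so the filters were
// gone and data skipping could not prune any files; after this commit the same
// FileIndex instance is handed to the copy and the pushed filters survive.
List<ResolvedExpression> filters = copied.getFileIndex().getFilters();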

3 files changed: +53 −18 lines

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java

Lines changed: 5 additions & 0 deletions
@@ -298,4 +298,9 @@ private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuratio
 
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
+
+  @VisibleForTesting
+  public List<ResolvedExpression> getFilters() {
+    return filters;
+  }
 }

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java

Lines changed: 15 additions & 8 deletions
@@ -126,7 +126,6 @@ public class HoodieTableSource implements
 
   private int[] requiredPos;
   private long limit;
-  private List<ResolvedExpression> filters;
 
   private List<Map<String, String>> requiredPartitions;
 
@@ -145,25 +144,26 @@ public HoodieTableSource(
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
-      @Nullable Long limit,
-      @Nullable List<ResolvedExpression> filters) {
+      @Nullable Long limit) {
     this.schema = schema;
     this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
     this.path = path;
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = fileIndex == null
+        ? FileIndex.instance(this.path, this.conf, this.tableRowType)
+        : fileIndex;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
        ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
        : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
-    this.filters = filters == null ? Collections.emptyList() : filters;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
 
@@ -214,7 +214,7 @@ public ChangelogMode getChangelogMode() {
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, requiredPartitions, requiredPos, limit, filters);
+        conf, fileIndex, requiredPartitions, requiredPos, limit);
   }
 
   @Override
@@ -224,8 +224,10 @@ public String asSummaryString() {
 
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
-    this.filters = filters.stream().filter(ExpressionUtils::isSimpleCallExpression).collect(Collectors.toList());
-    this.fileIndex.setFilters(this.filters);
+    List<ResolvedExpression> callExpressionFilters = filters.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+    this.fileIndex.setFilters(callExpressionFilters);
     // refuse all the filters now
     return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
   }
@@ -513,4 +515,9 @@ public FileStatus[] getReadFiles() {
     }
     return fileIndex.getFilesInPartitions();
   }
+
+  @VisibleForTesting
+  FileIndex getFileIndex() {
+    return fileIndex;
+  }
 }
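For context on how the fix surfaces to users: data skipping only prunes files when the filters pushed into HoodieTableSource actually reach the FileIndex, even if the planner copies the source mid-optimization. Below is a hedged end-to-end sketch using the Flink Table API; the option keys 'read.data.skipping.enabled' and 'metadata.enabled', the table path, and the table name are assumptions not taken from this diff and may differ across Hudi releases:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiDataSkippingSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Register a Hudi table; the data-skipping / metadata option keys are assumptions.
    tEnv.executeSql(
        "CREATE TABLE t1 (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/hudi/t1',\n"
            + "  'metadata.enabled' = 'true',\n"
            + "  'read.data.skipping.enabled' = 'true'\n"
            + ")");

    // The planner pushes `age > 20` into HoodieTableSource#applyFilters; when a
    // HepPlanner rule later copies the source, the copy must keep the same
    // FileIndex (and therefore the pushed filters) for data skipping to prune files.
    tEnv.executeSql("SELECT uuid, name, age FROM t1 WHERE age > 20").print();
  }
}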

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java

Lines changed: 33 additions & 10 deletions
@@ -28,6 +28,9 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
@@ -40,13 +43,15 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
@@ -115,16 +120,7 @@ void testGetInputFormat() throws Exception {
 
   @Test
   void testGetTableAvroSchema() {
-    final String path = tempFile.getAbsolutePath();
-    conf = TestConfigurations.getDefaultConf(path);
-    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
-
-    HoodieTableSource tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getPath()),
-        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
-        "default-par",
-        conf);
+    HoodieTableSource tableSource = getEmptyStreamingSource();
     assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
     final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
         .map(Schema.Field::name)
@@ -137,4 +133,31 @@ void testGetTableAvroSchema() {
         + "uuid,name,age,ts,partition";
     assertThat(schemaFields, is(expected));
   }
+
+  @Test
+  void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
+    HoodieTableSource tableSource = getEmptyStreamingSource();
+    ResolvedExpression mockExpression = new CallExpression(
+        BuiltInFunctionDefinitions.IN,
+        Collections.emptyList(),
+        TestConfigurations.ROW_DATA_TYPE);
+    List<ResolvedExpression> expectedFilters = Collections.singletonList(mockExpression);
+    tableSource.applyFilters(expectedFilters);
+    HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+    List<ResolvedExpression> actualFilters = copiedSource.getFileIndex().getFilters();
+    assertEquals(expectedFilters, actualFilters);
+  }
+
+  private HoodieTableSource getEmptyStreamingSource() {
+    final String path = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(path);
+    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
+
+    return new HoodieTableSource(
+        TestConfigurations.TABLE_SCHEMA,
+        new Path(tempFile.getPath()),
+        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+        "default-par",
+        conf);
+  }
 }
