Changes from 2 commits
@@ -80,6 +80,12 @@ public static FileIndex instance(Path path, Configuration conf, RowType rowType)
return new FileIndex(path, conf, rowType);
}

+public static FileIndex instance(Path path, Configuration conf, RowType rowType, List<ResolvedExpression> filters) {
+  FileIndex fileIndex = instance(path, conf, rowType);
Contributor:
Nice catch ~ Should we also keep this method: instance(Path path, Configuration conf, RowType rowType)?

Contributor Author (@trushev, Nov 2, 2022):

It is a really good question.
Quick answer -- yes, we should keep it, because the method is used in IncrementalInputSplits without any filter at all.
But why is there no filter in IncrementalInputSplits? It looks like FileIndex is useless in that path -- I mean, the data skipping feature skips nothing.
Maybe we should pass the filters from HoodieTableSource here:

case FlinkOptions.QUERY_TYPE_INCREMENTAL:
  IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
      .conf(conf)
      .path(FilePathUtils.toFlinkPath(path))
      .rowType(this.tableRowType)
      .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
      .requiredPartitions(getRequiredPartitionPaths()).build();

I think that is not part of this fix, which is about copy() being invoked by HepPlanner.
WDYT about a separate ticket/PR for this problem, if one does not already exist? :)
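
A minimal sketch of that idea, assuming IncrementalInputSplits.Builder were extended with a hypothetical filters(...) option (nothing in this diff provides one):

case FlinkOptions.QUERY_TYPE_INCREMENTAL:
  IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
      .conf(conf)
      .path(FilePathUtils.toFlinkPath(path))
      .rowType(this.tableRowType)
      .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
      .requiredPartitions(getRequiredPartitionPaths())
      // hypothetical option: forward the pushed-down filters so the
      // FileIndex used for incremental reads can actually skip files
      .filters(this.filters)
      .build();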

Contributor:

Agree, we can fix it in another PR.

Contributor:

Another solution is to add a FileIndex param in the constructor so that there is no need to re-construct it again and again.
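
A minimal sketch of that suggestion; the parameter list is abridged and illustrative rather than the actual Hudi constructor:

public HoodieTableSource(
    ResolvedSchema schema,
    Path path,
    List<String> partitionKeys,
    String defaultPartName,
    Configuration conf,
    @Nullable FileIndex fileIndex) {
  // ... other field assignments elided ...
  // Reuse the FileIndex handed over (e.g. by copy()) so pushed-down
  // filters survive; only build a fresh one when none is given.
  this.fileIndex = fileIndex == null
      ? FileIndex.instance(this.path, this.conf, this.tableRowType)
      : fileIndex;
}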

Contributor Author (@trushev):

Thanks, you are right.

  • Added a FileIndex param in the constructor
  • Replaced this.filters with a local variable in applyFilters(filters)
  • Removed the field List<ResolvedExpression> filters, as it is no longer needed
  • Reverted instance(path, conf, rowType, filters), as it is no longer needed

A sketch of the reworked applyFilters(filters) follows this file's diff.

+  fileIndex.setFilters(filters);
+  return fileIndex;
+}

/**
* Returns the partition path key and values as a list of map, each map item in the list
* is a mapping of the partition key name to its actual partition value. For example, say
@@ -298,4 +304,9 @@ private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuratio

return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
}

+@VisibleForTesting
+public List<ResolvedExpression> getFilters() {
+  return filters;
+}
}
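
A hypothetical sketch of the reworked applyFilters(filters) described in the bullets above; ExpressionUtils.isSimpleCallExpression is an assumed helper, and Result is Flink's SupportsFilterPushDown.Result:

@Override
public Result applyFilters(List<ResolvedExpression> filters) {
  // Keep the accepted filters in a local variable instead of a field ...
  List<ResolvedExpression> callFilters = filters.stream()
      .filter(ExpressionUtils::isSimpleCallExpression) // assumed helper
      .collect(Collectors.toList());
  // ... and hand them to the FileIndex, which now travels with the source.
  this.fileIndex.setFilters(callFilters);
  // Report every filter as remaining so the planner still applies them all.
  return Result.of(callFilters, new ArrayList<>(filters));
}
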
@@ -164,7 +164,7 @@ public HoodieTableSource(
this.filters = filters == null ? Collections.emptyList() : filters;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
+this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType, this.filters);
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}

@@ -543,4 +543,9 @@ public FileStatus[] getReadFiles() {
}
return fileIndex.getFilesInPartitions();
}

+@VisibleForTesting
+FileIndex getFileIndex() {
+  return fileIndex;
+}
}
@@ -28,6 +28,9 @@
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
@@ -40,13 +43,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

@@ -115,16 +120,7 @@ void testGetInputFormat() throws Exception {

@Test
void testGetTableAvroSchema() {
-    final String path = tempFile.getAbsolutePath();
-    conf = TestConfigurations.getDefaultConf(path);
-    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
-
-    HoodieTableSource tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getPath()),
-        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
-        "default-par",
-        conf);
+    HoodieTableSource tableSource = getEmptyStreamingSource();
assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
.map(Schema.Field::name)
@@ -137,4 +133,31 @@ void testGetTableAvroSchema() {
+ "uuid,name,age,ts,partition";
assertThat(schemaFields, is(expected));
}

+@Test
+void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
+  HoodieTableSource tableSource = getEmptyStreamingSource();
+  ResolvedExpression mockExpression = new CallExpression(
+      BuiltInFunctionDefinitions.IN,
+      Collections.emptyList(),
+      TestConfigurations.ROW_DATA_TYPE);
+  List<ResolvedExpression> expectedFilters = Collections.singletonList(mockExpression);
+  tableSource.applyFilters(expectedFilters);
+  HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+  List<ResolvedExpression> actualFilters = copiedSource.getFileIndex().getFilters();
+  assertEquals(expectedFilters, actualFilters);
+}

+private HoodieTableSource getEmptyStreamingSource() {
+  final String path = tempFile.getAbsolutePath();
+  conf = TestConfigurations.getDefaultConf(path);
+  conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
+
+  return new HoodieTableSource(
+      TestConfigurations.TABLE_SCHEMA,
+      new Path(tempFile.getPath()),
+      Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+      "default-par",
+      conf);
+}
}
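
For the new test to pass, copy() has to hand the existing FileIndex to the copied source instead of letting the constructor rebuild it. A hypothetical sketch using the extra constructor param discussed in the review thread:

@Override
public DynamicTableSource copy() {
  // Pass the current FileIndex along so the filters pushed down via
  // applyFilters(...) are not lost when HepPlanner copies the source.
  return new HoodieTableSource(
      schema, path, partitionKeys, defaultPartName, conf, fileIndex);
}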