Merged · Changes from 10 commits
@@ -28,6 +28,7 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -37,6 +38,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -291,6 +293,15 @@ public static boolean tableExists(String basePath, org.apache.hadoop.conf.Config
}
}

/**
* Returns whether the hoodie table data exists.
*/
public static boolean tableDataExists(HoodieTableMetaClient metaClient) {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = activeTimeline.getLastCommitMetadataWithValidData();
return instantAndCommitMetadata.isPresent();
}
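The helper above treats a table as empty until its active timeline contains at least one commit with valid data. A minimal usage sketch, assuming the standard Hudi meta client builder; the wrapping helper itself is hypothetical and not part of this PR:

```java
// Hypothetical helper (not part of this PR): build a meta client and
// hand it back only when the table already has committed data.
public static HoodieTableMetaClient metaClientIfDataExists(
    String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
  HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
      .setConf(hadoopConf)
      .setBasePath(basePath)
      .build();
  // tableDataExists is empty-aware: a created but never-written table yields null.
  return StreamerUtil.tableDataExists(metaClient) ? metaClient : null;
}
```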

/**
* Generates the bucket ID using format {partition path}_{fileID}.
*/
@@ -333,7 +344,12 @@ public static HoodieTableMetaClient metaClientForReader(
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING) && !tableExists(basePath, hadoopConf)) {
return null;
} else {
return createMetaClient(basePath, hadoopConf);
HoodieTableMetaClient metaClient = createMetaClient(basePath, hadoopConf);
if (tableDataExists(metaClient)) {
return metaClient;
} else {
return null;
}
}
}

@@ -77,6 +77,9 @@ void beforeEach(HoodieTableType tableType, Map<String, String> options) throws I
options.forEach((key, value) -> conf.setString(key, value));

StreamerUtil.initTableIfNotExists(conf);
}

void createTableSource() {
this.tableSource = getTableSource(conf);
}
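Splitting createTableSource() out of beforeEach lets each test build the table source only after its first commit; with the change above, a source created against a data-less table gets a null reader meta client and fails later. The resulting per-test flow looks roughly like this (a sketch assembled from the diff; test annotations are omitted and the exact assertions are assumptions):

```java
void testRead(HoodieTableType tableType) throws Exception {
  beforeEach(tableType);                              // init the table only, no source yet
  TestData.writeData(TestData.DATA_SET_INSERT, conf); // first commit with real data
  createTableSource();                                // meta client now sees a non-empty table
  InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
  List<RowData> result = readData(inputFormat);
  TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
}
```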

@@ -87,6 +90,7 @@ void testRead(HoodieTableType tableType) throws Exception {

TestData.writeData(TestData.DATA_SET_INSERT, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();

List<RowData> result = readData(inputFormat);
@@ -128,6 +132,7 @@ void testReadBaseAndLogFiles() throws Exception {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
createTableSource();

InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();

@@ -185,7 +190,7 @@ void testReadBaseAndLogFilesWithDeletes() throws Exception {
// write another commit using logs and read again.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

@@ -238,6 +243,7 @@ void testReadBaseAndLogFilesWithDisorderUpdateDelete(boolean compact) throws Exc
// write another commit using logs and read again.
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, compact);
TestData.writeData(TestData.DATA_SET_DISORDER_UPDATE_DELETE, conf);
createTableSource();

InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
@@ -270,7 +276,7 @@ void testReadWithDeletesMOR() throws Exception {

// write another commit to read again
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
@@ -293,7 +299,7 @@ void testReadWithDeletesCOW() throws Exception {

// write another commit to read again
TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(CopyOnWriteInputFormat.class));

@@ -312,7 +318,7 @@ void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
beforeEach(tableType);

TestData.writeData(TestData.DATA_SET_INSERT, conf);

createTableSource();
Map<String, String> prunedPartitions = new HashMap<>();
prunedPartitions.put("partition", "par1");
// prune to only be with partition 'par1'
@@ -337,6 +343,7 @@ void testReadChangesMergedMOR() throws Exception {
// write another commit to read again
TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

@@ -368,7 +375,7 @@ void testReadChangesUnMergedMOR() throws Exception {

// write another commit to read again
TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);

createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

@@ -459,6 +466,7 @@ void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
beforeEach(tableType, options);

TestData.writeData(TestData.DATA_SET_INSERT, conf);
createTableSource();
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
List<RowData> result = readData(inputFormat);
TestData.assertRowDataEquals(result, TestData.DATA_SET_INSERT);
@@ -481,7 +489,7 @@ void testReadMORWithCompactionPlanScheduled() throws Exception {
List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
TestData.writeData(dataset, conf);
}

createTableSource();
Contributor: Can we explain a little more why this test has been affected by your change?

Contributor Author: If the getTableSource method is called before any data is written, the metaClient will be null, and a call to tableSource.reset() in the unit test then throws a NullPointerException.

Contributor: Thanks, I think the change is risky, because the null value is only handled in HoodieTableSource#getStreamInputFormat. Originally I put a reader meta client there for the case where the streaming reader starts but there is no data in the table yet (which is a common case for streaming reads). So let's limit the change to streaming read only; then I think we do not need to change the test cases.
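A minimal sketch of the narrowing suggested here, assuming the method signature and the derivation of basePath from the surrounding diff context (illustrative only, not the merged code):

```java
// Return a null meta client only on the streaming-read path; batch reads
// keep the original behavior even when the table is still empty.
public static HoodieTableMetaClient metaClientForReader(
    Configuration conf, org.apache.hadoop.conf.Configuration hadoopConf) {
  final String basePath = conf.getString(FlinkOptions.PATH);
  if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
    if (!tableExists(basePath, hadoopConf)) {
      return null; // a streaming reader may start before the table exists
    }
    HoodieTableMetaClient metaClient = createMetaClient(basePath, hadoopConf);
    // Hypothetical narrowing: treat an empty table like a missing one,
    // but only for streaming reads.
    return tableDataExists(metaClient) ? metaClient : null;
  }
  return createMetaClient(basePath, hadoopConf);
}
```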

Contributor Author: OK, got it! 😄

Contributor Author: @danny0405 Hi Danny, I have updated the code; please help review it, thank you!

InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
