@@ -73,7 +73,7 @@ public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(Ja
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();

     final HoodieTimeline activeCommitTimeline =
-        srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+        srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Contributor Author:
Note to reviewer: I checked the implementation of MORIncrementalRelation and it seems to use the metaClient.getCommitsAndCompactionTimeline() API.
Ref:


So I used the same here.
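
For readers following along, a rough sketch of why the timeline choice matters for MOR tables (based on this thread, not the exact MORIncrementalRelation code):

    // Sketch: getCommitTimeline() only covers commit/replacecommit actions,
    // so completed deltacommits on a MOR table would be skipped.
    HoodieTimeline commitsOnly =
        srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();

    // getCommitsAndCompactionTimeline() also includes deltacommit and
    // compaction instants, which is what MORIncrementalRelation relies on.
    HoodieTimeline allWrites =
        srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();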

Member:
Eventually, we should replace this API and simply use metaClient.getActiveTimeline().getWriteTimeline() as much as possible. I don't think this API brings any real benefit apart from filtering out certain action types (deltacommit and compaction) for a COW table. Anyway, such commits won't be there for a COW table, and the active timeline has already been loaded by that time.

Contributor Author:
got it.
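
A minimal sketch of the reviewer's suggested replacement, assuming the same srcMetaClient as in the diff above:

    // getWriteTimeline() spans commit, deltacommit, compaction and
    // replacecommit actions, so it behaves the same for MOR tables without
    // the special-purpose API.
    final HoodieTimeline activeCommitTimeline =
        srcMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants();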


     String beginInstantTime = beginInstant.orElseGet(() -> {
       if (missingCheckpointStrategy != null) {
@@ -22,31 +22,42 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -55,20 +66,39 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {

  private HoodieTestDataGenerator dataGen;
  private HoodieTableMetaClient metaClient;
  private HoodieTableType tableType = COPY_ON_WRITE;

  @BeforeEach
  public void setUp() throws IOException {
    dataGen = new HoodieTestDataGenerator();
    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
  }

-  @Test
-  public void testHoodieIncrSource() throws IOException {

  @Override
  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
    props = HoodieTableMetaClient.withPropertyBuilder()
        .setTableName(RAW_TRIPS_TEST_NAME)
        .setTableType(tableType)
        .setPayloadClass(HoodieAvroPayload.class)
        .fromProperties(props)
        .build();
    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
  }

  private static Stream<Arguments> tableTypeParams() {
    return Arrays.stream(new HoodieTableType[][] {{HoodieTableType.COPY_ON_WRITE}, {HoodieTableType.MERGE_ON_READ}}).map(Arguments::of);
  }

  @ParameterizedTest
  @MethodSource("tableTypeParams")
  public void testHoodieIncrSource(HoodieTableType tableType) throws IOException {
    this.tableType = tableType;
    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
    HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(3).build())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .enable(false).build())
Member:
Why false? Let's keep the default?

Contributor Author:
It messes with metadata table compaction/archival, and so data table archival does not kick in. I just want to simulate archival in the data table. Also, in this test there is no real benefit with metadata enabled; we are just interested in the timeline files.

Member:
got it

        .build();

    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
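
The rest of the test body is truncated in this view, so here is a hypothetical sketch of how the archival config above could be exercised; the write loop is illustrative only, and jsc() is assumed from SparkClientFunctionalTestHarness, not quoted from the PR:

    // With archiveCommitsWith(2, 3) and retainCommits(1), pushing more than
    // three completed commits should archive the oldest instants, which the
    // incremental source then has to handle via its missing-checkpoint strategy.
    for (int i = 0; i < 4; i++) {
      String instant = writeClient.startCommit();
      List<HoodieRecord> records = dataGen.generateInserts(instant, 10);
      JavaRDD<WriteStatus> statuses = writeClient.insert(jsc().parallelize(records, 1), instant);
      assertNoWriteErrors(statuses.collect());
    }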