From c5b8ef004dc73ef28ac07e8a24234a733b352e4b Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 15 Oct 2022 14:32:01 -0700 Subject: [PATCH 1/2] [HUDI-5038] Increase default num_instants to fetch for incremental source --- .../org/apache/hudi/utilities/sources/HoodieIncrSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index a6b979fcbf1a7..3c162a0522f00 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -52,7 +52,7 @@ public static class Config { * {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched. */ static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants"; - static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1; + static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 5; /** * {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that needs to be added to source table after From 1dade489cc8baafe3008f0447d35dd2ac54d18cb Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 21 Oct 2022 16:49:57 -0700 Subject: [PATCH 2/2] Fix test --- .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index ec163aec33b9a..8218bcd00e82f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -2101,6 +2101,7 @@ public void 
testHoodieIncrFallback() throws Exception { HoodieDeltaStreamer.Config downstreamCfg = TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT, true, null); + downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1"); new HoodieDeltaStreamer(downstreamCfg, jsc).sync(); insertInTable(tableBasePath, 9, WriteOperationType.UPSERT); @@ -2112,6 +2113,8 @@ public void testHoodieIncrFallback() throws Exception { downstreamCfg.configs = new ArrayList<>(); } + // Remove the hoodie.deltastreamer.source.hoodieincr.num_instants=1 override added earlier in this test; it is re-added as 10 below + downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1); downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key() + "=true"); //Adding this conf to make testing easier :) downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");