3131import org .apache .hudi .avro .model .HoodieSliceInfo ;
3232import org .apache .hudi .client .SparkRDDWriteClient ;
3333import org .apache .hudi .client .WriteStatus ;
34- import org .apache .hudi .client .transaction . lock . InProcessLockProvider ;
34+ import org .apache .hudi .client .common . HoodieSparkEngineContext ;
3535import org .apache .hudi .common .HoodieCleanStat ;
3636import org .apache .hudi .common .bootstrap .TestBootstrapIndex ;
3737import org .apache .hudi .common .config .HoodieMetadataConfig ;
7373import org .apache .hudi .common .util .collection .Pair ;
7474import org .apache .hudi .config .HoodieCleanConfig ;
7575import org .apache .hudi .config .HoodieCompactionConfig ;
76- import org .apache .hudi .config .HoodieLockConfig ;
7776import org .apache .hudi .config .HoodieWriteConfig ;
7877import org .apache .hudi .exception .HoodieIOException ;
7978import org .apache .hudi .index .HoodieIndex ;
8685
8786import org .apache .hadoop .fs .FSDataOutputStream ;
8887import org .apache .hadoop .fs .Path ;
89- import org .apache .log4j .LogManager ;
90- import org .apache .log4j .Logger ;
9188import org .apache .spark .api .java .JavaRDD ;
9289import org .junit .jupiter .api .Test ;
9390import org .junit .jupiter .params .ParameterizedTest ;
10198import java .util .Arrays ;
10299import java .util .Collections ;
103100import java .util .HashMap ;
104- import java .util .HashSet ;
105101import java .util .List ;
106102import java .util .Map ;
107- import java .util .Set ;
108103import java .util .TreeSet ;
109104import java .util .UUID ;
110105import java .util .concurrent .TimeUnit ;
132127public class TestCleaner extends HoodieClientTestBase {
133128
134129 private static final int BIG_BATCH_INSERT_SIZE = 500 ;
135- private static final Logger LOG = LogManager . getLogger ( TestCleaner . class ) ;
130+ private static final int PARALLELISM = 10 ;
136131
137132 /**
138133 * Helper method to do first batch of insert for clean by versions/commits tests.
139134 *
140- * @param cfg Hoodie Write Config
135+ * @param context Spark engine context
136+ * @param metaClient Hoodie table meta client
141137 * @param client Hoodie Client
142138 * @param recordGenFunction Function to generate records for insertion
143139 * @param insertFn Insertion API for testing
144140 * @throws Exception in case of error
145141 */
146- private Pair <String , JavaRDD <WriteStatus >> insertFirstBigBatchForClientCleanerTest (HoodieWriteConfig cfg , SparkRDDWriteClient client ,
142+ public static Pair <String , JavaRDD <WriteStatus >> insertFirstBigBatchForClientCleanerTest (
143+ HoodieSparkEngineContext context ,
144+ HoodieTableMetaClient metaClient ,
145+ SparkRDDWriteClient client ,
147146 Function2 <List <HoodieRecord >, String , Integer > recordGenFunction ,
148- Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > insertFn ,
149- HoodieCleaningPolicy cleaningPolicy ) throws Exception {
147+ Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > insertFn ) throws Exception {
150148
151149 /*
152150 * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -155,7 +153,7 @@ private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTe
155153 String newCommitTime = client .startCommit ();
156154
157155 List <HoodieRecord > records = recordGenFunction .apply (newCommitTime , BIG_BATCH_INSERT_SIZE );
158- JavaRDD <HoodieRecord > writeRecords = jsc . parallelize (records , 5 );
156+ JavaRDD <HoodieRecord > writeRecords = context . getJavaSparkContext (). parallelize (records , PARALLELISM );
159157
160158 JavaRDD <WriteStatus > statuses = insertFn .apply (client , writeRecords , newCommitTime );
161159 // Verify there are no errors
@@ -174,8 +172,8 @@ private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTe
174172 assertTrue (table .getCompletedCleanTimeline ().empty ());
175173
176174 if (client .getConfig ().shouldAutoCommit ()) {
177- HoodieIndex index = SparkHoodieIndexFactory .createIndex (cfg );
178- List <HoodieRecord > taggedRecords = tagLocation (index , jsc . parallelize (records , 1 ), table ).collect ();
175+ HoodieIndex index = SparkHoodieIndexFactory .createIndex (client . getConfig () );
176+ List <HoodieRecord > taggedRecords = tagLocation (index , context , context . getJavaSparkContext (). parallelize (records , PARALLELISM ), table ).collect ();
179177 checkTaggedRecords (taggedRecords , newCommitTime );
180178 }
181179 return Pair .of (newCommitTime , statuses );
@@ -184,16 +182,17 @@ private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTe
184182 /**
185183 * Helper method to do first batch of insert for clean by versions/commits tests.
186184 *
187- * @param cfg Hoodie Write Config
185+ * @param context Spark engine context
188186 * @param client Hoodie Client
189187 * @param recordGenFunction Function to generate records for insertion
190188 * @param insertFn Insertion API for testing
191189 * @throws Exception in case of error
192190 */
193- private Pair <String , JavaRDD <WriteStatus >> insertFirstFailedBigBatchForClientCleanerTest (HoodieWriteConfig cfg , SparkRDDWriteClient client ,
194- Function2 <List <HoodieRecord >, String , Integer > recordGenFunction ,
195- Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > insertFn ,
196- HoodieCleaningPolicy cleaningPolicy ) throws Exception {
191+ public static Pair <String , JavaRDD <WriteStatus >> insertFirstFailedBigBatchForClientCleanerTest (
192+ HoodieSparkEngineContext context ,
193+ SparkRDDWriteClient client ,
194+ Function2 <List <HoodieRecord >, String , Integer > recordGenFunction ,
195+ Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > insertFn ) throws Exception {
197196
198197 /*
199198 * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -202,7 +201,7 @@ private Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCle
202201 String newCommitTime = client .startCommit ();
203202
204203 List <HoodieRecord > records = recordGenFunction .apply (newCommitTime , BIG_BATCH_INSERT_SIZE );
205- JavaRDD <HoodieRecord > writeRecords = jsc .parallelize (records , 5 );
204+ JavaRDD <HoodieRecord > writeRecords = context . getJavaSparkContext () .parallelize (records , 5 );
206205
207206 JavaRDD <WriteStatus > statuses = insertFn .apply (client , writeRecords , newCommitTime );
208207 // Verify there are no errors
@@ -359,8 +358,7 @@ private void testInsertAndCleanByVersions(
359358 final Function2 <List <HoodieRecord >, String , Integer > recordUpsertGenWrappedFunction =
360359 generateWrapRecordsFn (isPreppedAPI , cfg , dataGen ::generateUniqueUpdates );
361360
362- insertFirstBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
363- HoodieCleaningPolicy .KEEP_LATEST_FILE_VERSIONS );
361+ insertFirstBigBatchForClientCleanerTest (context , metaClient , client , recordInsertGenWrappedFunction , insertFn );
364362
365363 Map <HoodieFileGroupId , FileSlice > compactionFileIdToLatestFileSlice = new HashMap <>();
366364 metaClient = HoodieTableMetaClient .reload (metaClient );
@@ -458,15 +456,6 @@ private void testInsertAndCleanByVersions(
458456 }
459457 }
460458
461- /**
462- * Test Clean-By-Commits using insert/upsert API.
463- */
464- @ ParameterizedTest
465- @ ValueSource (booleans = {true , false })
466- public void testInsertAndCleanByCommits (boolean isAsync ) throws Exception {
467- testInsertAndCleanByCommits (SparkRDDWriteClient ::insert , SparkRDDWriteClient ::upsert , false , isAsync );
468- }
469-
470459 /**
471460 * Test Clean-By-Commits using insert/upsert API.
472461 */
@@ -475,117 +464,6 @@ public void testFailedInsertAndCleanByCommits() throws Exception {
475464 testFailedInsertAndCleanByCommits (SparkRDDWriteClient ::insert , false );
476465 }
477466
478- /**
479- * Test Clean-By-Commits using prepped version of insert/upsert API.
480- */
481- @ Test
482- public void testInsertPreppedAndCleanByCommits () throws Exception {
483- testInsertAndCleanByCommits (SparkRDDWriteClient ::insertPreppedRecords , SparkRDDWriteClient ::upsertPreppedRecords ,
484- true , false );
485- }
486-
487- /**
488- * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
489- */
490- @ Test
491- public void testBulkInsertPreppedAndCleanByCommits () throws Exception {
492- testInsertAndCleanByCommits (
493- (client , recordRDD , instantTime ) -> client .bulkInsertPreppedRecords (recordRDD , instantTime , Option .empty ()),
494- SparkRDDWriteClient ::upsertPreppedRecords , true , false );
495- }
496-
497- /**
498- * Test Clean-By-Commits using bulk-insert/upsert API.
499- */
500- @ Test
501- public void testBulkInsertAndCleanByCommits () throws Exception {
502- testInsertAndCleanByCommits (SparkRDDWriteClient ::bulkInsert , SparkRDDWriteClient ::upsert , false , false );
503- }
504-
505- /**
506- * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
507- *
508- * @param insertFn Insert API to be tested
509- * @param upsertFn Upsert API to be tested
510- * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
511- * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
512- * @throws Exception in case of errors
513- */
514- private void testInsertAndCleanByCommits (
515- Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > insertFn ,
516- Function3 <JavaRDD <WriteStatus >, SparkRDDWriteClient , JavaRDD <HoodieRecord >, String > upsertFn , boolean isPreppedAPI , boolean isAsync )
517- throws Exception {
518- int maxCommits = 3 ; // keep upto 3 commits from the past
519- HoodieWriteConfig cfg = getConfigBuilder ()
520- .withCleanConfig (HoodieCleanConfig .newBuilder ()
521- .withCleanerPolicy (HoodieCleaningPolicy .KEEP_LATEST_COMMITS ).withAsyncClean (isAsync ).retainCommits (maxCommits ).build ())
522- .withParallelism (1 , 1 ).withBulkInsertParallelism (1 ).withFinalizeWriteParallelism (1 ).withDeleteParallelism (1 )
523- .withConsistencyGuardConfig (ConsistencyGuardConfig .newBuilder ().withConsistencyCheckEnabled (true ).build ())
524- .withLockConfig (HoodieLockConfig .newBuilder ().withLockProvider (InProcessLockProvider .class ).build ())
525- .build ();
526- SparkRDDWriteClient client = getHoodieWriteClient (cfg );
527-
528- final Function2 <List <HoodieRecord >, String , Integer > recordInsertGenWrappedFunction =
529- generateWrapRecordsFn (isPreppedAPI , cfg , dataGen ::generateInserts );
530-
531- final Function2 <List <HoodieRecord >, String , Integer > recordUpsertGenWrappedFunction =
532- generateWrapRecordsFn (isPreppedAPI , cfg , dataGen ::generateUniqueUpdates );
533-
534- insertFirstBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
535- HoodieCleaningPolicy .KEEP_LATEST_COMMITS );
536-
537- // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
538- for (int i = 0 ; i < 8 ; i ++) {
539- String newCommitTime = makeNewCommitTime ();
540- try {
541- client .startCommitWithTime (newCommitTime );
542- List <HoodieRecord > records = recordUpsertGenWrappedFunction .apply (newCommitTime , 100 );
543-
544- List <WriteStatus > statuses = upsertFn .apply (client , jsc .parallelize (records , 1 ), newCommitTime ).collect ();
545- // Verify there are no errors
546- assertNoWriteErrors (statuses );
547-
548- metaClient = HoodieTableMetaClient .reload (metaClient );
549- HoodieTable table1 = HoodieSparkTable .create (cfg , context , metaClient );
550- HoodieTimeline activeTimeline = table1 .getCompletedCommitsTimeline ();
551- HoodieInstant lastInstant = activeTimeline .lastInstant ().get ();
552- if (cfg .isAsyncClean ()) {
553- activeTimeline = activeTimeline .findInstantsBefore (lastInstant .getTimestamp ());
554- }
555- // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
556- // commit
557- Option <HoodieInstant > earliestRetainedCommit = activeTimeline .nthFromLastInstant (maxCommits );
558- Set <HoodieInstant > acceptableCommits = activeTimeline .getInstants ().collect (Collectors .toSet ());
559- if (earliestRetainedCommit .isPresent ()) {
560- acceptableCommits
561- .removeAll (activeTimeline .findInstantsInRange ("000" , earliestRetainedCommit .get ().getTimestamp ())
562- .getInstants ().collect (Collectors .toSet ()));
563- acceptableCommits .add (earliestRetainedCommit .get ());
564- }
565-
566- TableFileSystemView fsView = table1 .getFileSystemView ();
567- // Need to ensure the following
568- for (String partitionPath : dataGen .getPartitionPaths ()) {
569- List <HoodieFileGroup > fileGroups = fsView .getAllFileGroups (partitionPath ).collect (Collectors .toList ());
570- for (HoodieFileGroup fileGroup : fileGroups ) {
571- Set <String > commitTimes = new HashSet <>();
572- fileGroup .getAllBaseFiles ().forEach (value -> {
573- LOG .debug ("Data File - " + value );
574- commitTimes .add (value .getCommitTime ());
575- });
576- if (cfg .isAsyncClean ()) {
577- commitTimes .remove (lastInstant .getTimestamp ());
578- }
579- assertEquals (acceptableCommits .stream ().map (HoodieInstant ::getTimestamp ).collect (Collectors .toSet ()), commitTimes ,
580- "Only contain acceptable versions of file should be present" );
581- }
582- }
583- } catch (IOException ioe ) {
584- throw new RuntimeException (ioe );
585- }
586- }
587- }
588-
589467 /**
590468 * Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective.
591469 *
@@ -612,22 +490,18 @@ private void testFailedInsertAndCleanByCommits(
612490 final Function2 <List <HoodieRecord >, String , Integer > recordInsertGenWrappedFunction =
613491 generateWrapRecordsFn (isPreppedAPI , cfg , dataGen ::generateInserts );
614492
615- Pair <String , JavaRDD <WriteStatus >> result = insertFirstBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
616- HoodieCleaningPolicy .KEEP_LATEST_COMMITS );
493+ Pair <String , JavaRDD <WriteStatus >> result = insertFirstBigBatchForClientCleanerTest (context , metaClient , client , recordInsertGenWrappedFunction , insertFn );
617494 client .commit (result .getLeft (), result .getRight ());
618495
619496 HoodieTable table = HoodieSparkTable .create (client .getConfig (), context , metaClient );
620497 assertTrue (table .getCompletedCleanTimeline ().empty ());
621498
622- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
623- HoodieCleaningPolicy .KEEP_LATEST_COMMITS );
499+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
624500
625- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
626- HoodieCleaningPolicy .KEEP_LATEST_COMMITS );
501+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
627502
628503 Pair <String , JavaRDD <WriteStatus >> ret =
629- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
630- HoodieCleaningPolicy .KEEP_LATEST_COMMITS );
504+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
631505 // Await till enough time passes such that the last failed commits heartbeats are expired
632506 await ().atMost (10 , TimeUnit .SECONDS ).until (() -> client .getHeartbeatClient ()
633507 .isHeartbeatExpired (ret .getLeft ()));
@@ -1352,24 +1226,20 @@ private void testInsertAndCleanFailedWritesByVersions(
13521226 final Function2 <List <HoodieRecord >, String , Integer > recordInsertGenWrappedFunction =
13531227 generateWrapRecordsFn (isPreppedAPI , cfg , dataGen ::generateInserts );
13541228
1355- Pair <String , JavaRDD <WriteStatus >> result = insertFirstBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
1356- HoodieCleaningPolicy .KEEP_LATEST_FILE_VERSIONS );
1229+ Pair <String , JavaRDD <WriteStatus >> result = insertFirstBigBatchForClientCleanerTest (context , metaClient , client , recordInsertGenWrappedFunction , insertFn );
13571230
13581231 client .commit (result .getLeft (), result .getRight ());
13591232
13601233 HoodieTable table = HoodieSparkTable .create (client .getConfig (), context , metaClient );
13611234
13621235 assertTrue (table .getCompletedCleanTimeline ().empty ());
13631236
1364- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
1365- HoodieCleaningPolicy .KEEP_LATEST_FILE_VERSIONS );
1237+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
13661238
1367- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
1368- HoodieCleaningPolicy .KEEP_LATEST_FILE_VERSIONS );
1239+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
13691240
13701241 Pair <String , JavaRDD <WriteStatus >> ret =
1371- insertFirstFailedBigBatchForClientCleanerTest (cfg , client , recordInsertGenWrappedFunction , insertFn ,
1372- HoodieCleaningPolicy .KEEP_LATEST_FILE_VERSIONS );
1242+ insertFirstFailedBigBatchForClientCleanerTest (context , client , recordInsertGenWrappedFunction , insertFn );
13731243
13741244 // Await till enough time passes such that the last failed commits heartbeats are expired
13751245 await ().atMost (10 , TimeUnit .SECONDS ).until (() -> client .getHeartbeatClient ()
0 commit comments