1818
1919package org .apache .hudi .sink .compact ;
2020
21+ import org .apache .flink .streaming .api .datastream .DataStream ;
22+ import org .apache .flink .streaming .api .datastream .SingleOutputStreamOperator ;
23+ import org .apache .flink .streaming .api .functions .ProcessFunction ;
24+ import org .apache .flink .util .Collector ;
25+ import org .apache .flink .util .OutputTag ;
2126import org .apache .hudi .avro .model .HoodieCompactionPlan ;
2227import org .apache .hudi .client .HoodieFlinkWriteClient ;
2328import org .apache .hudi .common .table .HoodieTableMetaClient ;
2429import org .apache .hudi .common .table .timeline .HoodieInstant ;
2530import org .apache .hudi .common .table .timeline .HoodieTimeline ;
2631import org .apache .hudi .common .util .CompactionUtils ;
2732import org .apache .hudi .common .util .Option ;
33+ import org .apache .hudi .common .util .collection .Pair ;
2834import org .apache .hudi .configuration .FlinkOptions ;
2935import org .apache .hudi .table .HoodieFlinkTable ;
3036import org .apache .hudi .util .CompactionUtil ;
4955
5056import java .io .File ;
5157import java .util .Arrays ;
58+ import java .util .ArrayList ;
5259import java .util .HashMap ;
5360import java .util .List ;
5461import java .util .Map ;
@@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor {
6774
6875 private static final Map <String , List <String >> EXPECTED2 = new HashMap <>();
6976
77+ private static final Map <String , List <String >> EXPECTED3 = new HashMap <>();
78+
7079 static {
7180 EXPECTED1 .put ("par1" , Arrays .asList ("id1,par1,id1,Danny,23,1000,par1" , "id2,par1,id2,Stephen,33,2000,par1" ));
7281 EXPECTED1 .put ("par2" , Arrays .asList ("id3,par2,id3,Julian,53,3000,par2" , "id4,par2,id4,Fabian,31,4000,par2" ));
@@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor {
7786 EXPECTED2 .put ("par2" , Arrays .asList ("id3,par2,id3,Julian,54,3000,par2" , "id4,par2,id4,Fabian,32,4000,par2" ));
7887 EXPECTED2 .put ("par3" , Arrays .asList ("id5,par3,id5,Sophia,18,5000,par3" , "id6,par3,id6,Emma,20,6000,par3" , "id9,par3,id9,Jane,19,6000,par3" ));
7988 EXPECTED2 .put ("par4" , Arrays .asList ("id7,par4,id7,Bob,44,7000,par4" , "id8,par4,id8,Han,56,8000,par4" , "id10,par4,id10,Ella,38,7000,par4" , "id11,par4,id11,Phoebe,52,8000,par4" ));
89+
90+ EXPECTED3 .put ("par1" , Arrays .asList ("id1,par1,id1,Danny,23,1000,par1" , "id2,par1,id2,Stephen,33,2000,par1" ));
91+ EXPECTED3 .put ("par2" , Arrays .asList ("id3,par2,id3,Julian,53,3000,par2" , "id4,par2,id4,Fabian,31,4000,par2" ));
92+ EXPECTED3 .put ("par3" , Arrays .asList ("id5,par3,id5,Sophia,18,5000,par3" , "id6,par3,id6,Emma,20,6000,par3" ));
93+ EXPECTED3 .put ("par4" , Arrays .asList ("id7,par4,id7,Bob,44,7000,par4" , "id8,par4,id8,Han,56,8000,par4" ));
94+ EXPECTED3 .put ("par5" , Arrays .asList ("id12,par5,id12,Tony,27,9000,par5" , "id13,par5,id13,Jenny,72,10000,par5" ));
8095 }
8196
8297 @ TempDir
@@ -203,4 +218,114 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
203218
204219 TestData .checkWrittenFullData (tempFile , EXPECTED2 );
205220 }
221+
  /**
   * Verifies that two independently scheduled compaction plans can be executed in a single
   * Flink pipeline: each plan is read by its own source, the compact tasks are unioned, and
   * commit events are demultiplexed per plan instant via side outputs so that each plan gets
   * its own commit sink.
   *
   * @param enableChangelog whether the table runs in changelog mode (parameterized over both)
   */
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
    // Create hoodie table and insert into data.
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
    tableEnv.getConfig().getConfiguration()
        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
    Map<String, String> options = new HashMap<>();
    // Async compaction off: compaction is driven manually below so the test controls the plans.
    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
    tableEnv.executeSql(hoodieTableDDL);
    tableEnv.executeSql(TestSQL.INSERT_T1).await();

    // Wait for the write commit to land on the timeline before scheduling compaction.
    TimeUnit.SECONDS.sleep(3);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
    cfg.path = tempFile.getAbsolutePath();
    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
    CompactionUtil.setAvroSchema(conf, metaClient);
    CompactionUtil.inferChangelogMode(conf, metaClient);

    List<String> compactionInstantTimeList = new ArrayList<>(2);

    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);

    // First compaction plan, covering the initial INSERT_T1 data.
    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));

    // insert a new record to new partition, so that we can generate a new compaction plan
    String insertT1ForNewPartition = "insert into t1 values\n"
        + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
        + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
    tableEnv.executeSql(insertT1ForNewPartition).await();

    // Again, let the second write commit land before scheduling the second plan.
    TimeUnit.SECONDS.sleep(3);

    // Second compaction plan, covering the new par5 partition.
    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));

    HoodieFlinkTable<?> table = writeClient.getHoodieTable();

    // Load both compaction plans from the timeline, keyed by their instant time.
    List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2);
    for (String compactionInstantTime : compactionInstantTimeList) {
      HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
      compactionPlans.add(Pair.of(compactionInstantTime, plan));
    }

    // Mark instant as compaction inflight
    for (String compactionInstantTime : compactionInstantTimeList) {
      HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
      table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
    }
    table.getMetaClient().reloadActiveTimeline();

    // One source per plan, unioned into a single stream of CompactionPlanEvents.
    Pair<String, HoodieCompactionPlan> firstPlan = compactionPlans.get(0);
    DataStream<CompactionPlanEvent> source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft()))
        .name("compaction_source " + firstPlan.getLeft())
        .uid("uid_compaction_source " + firstPlan.getLeft());
    if (compactionPlans.size() > 1) {
      for (Pair<String, HoodieCompactionPlan> pair : compactionPlans.subList(1, compactionPlans.size())) {
        source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
            .name("compaction_source " + pair.getLeft())
            .uid("uid_compaction_source " + pair.getLeft()));
      }
    }
    // Run the compact tasks, then route each commit event to a side output tagged with
    // its plan's instant time, so commits from different plans can be sunk separately.
    SingleOutputStreamOperator<Void> operator = source.rebalance()
        .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
            new ProcessOperator<>(new CompactFunction(conf)))
        .setParallelism(1)
        .process(new ProcessFunction<CompactionCommitEvent, Void>() {
          @Override
          public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
            // Side-output tag id is the compaction instant time carried by the event.
            context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
          }
        })
        .name("group_by_compaction_plan")
        .uid("uid_group_by_compaction_plan")
        .setParallelism(1);
    // One commit sink per plan, fed from that plan's side output.
    compactionPlans.forEach(pair ->
        operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
            .addSink(new CompactionCommitSink(conf))
            .name("clean_commits " + pair.getLeft())
            .uid("uid_clean_commits_" + pair.getLeft())
            .setParallelism(1));

    env.execute("flink_hudi_compaction");
    writeClient.close();
    // EXPECTED3 includes the new par5 rows on top of the initial data set.
    TestData.checkWrittenFullData(tempFile, EXPECTED3);
  }
319+
320+ private String scheduleCompactionPlan (HoodieTableMetaClient metaClient , HoodieFlinkWriteClient <?> writeClient ) {
321+ boolean scheduled = false ;
322+ // judge whether have operation
323+ // To compute the compaction instant time and do compaction.
324+ Option <String > compactionInstantTimeOption = CompactionUtil .getCompactionInstantTime (metaClient );
325+ if (compactionInstantTimeOption .isPresent ()) {
326+ scheduled = writeClient .scheduleCompactionAtInstant (compactionInstantTimeOption .get (), Option .empty ());
327+ }
328+ assertTrue (scheduled , "The compaction plan should be scheduled" );
329+ return compactionInstantTimeOption .get ();
330+ }
206331}
0 commit comments