1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing,
13+ * software distributed under the License is distributed on an
14+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+ * KIND, either express or implied. See the License for the
16+ * specific language governing permissions and limitations
17+ * under the License.
18+ */
19+
20+ package org .apache .hudi .table .action .cluster ;
21+
22+ import org .apache .hudi .common .engine .HoodieEngineContext ;
23+ import org .apache .hudi .common .engine .HoodieLocalEngineContext ;
24+ import org .apache .hudi .common .table .timeline .HoodieActiveTimeline ;
25+ import org .apache .hudi .common .table .timeline .HoodieInstant ;
26+ import org .apache .hudi .common .testutils .MockHoodieTimeline ;
27+ import org .apache .hudi .common .util .Option ;
28+ import org .apache .hudi .config .HoodieClusteringConfig ;
29+ import org .apache .hudi .config .HoodieWriteConfig ;
30+
31+ import org .apache .hadoop .conf .Configuration ;
32+ import org .junit .jupiter .api .Assertions ;
33+ import org .junit .jupiter .api .Test ;
34+
35+ import java .util .Arrays ;
36+ import java .util .List ;
37+
38+ import static org .apache .hudi .common .model .ActionType .commit ;
39+ import static org .apache .hudi .common .model .ActionType .replacecommit ;
40+
41+ class TestClusteringPlanActionExecutor {
42+ @ Test
43+ public void testDeltaCommitsClusteringPlanScheduling () {
44+ ClusteringPlanActionExecutor inlineExecutor = getClusteringPlanActionExecutor (true );
45+ ClusteringPlanActionExecutor notInlineExecutor = getClusteringPlanActionExecutor (false );
46+
47+ List <HoodieInstant > instants =
48+ Arrays .asList (
49+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "1" ),
50+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "2" )
51+ );
52+ HoodieActiveTimeline activeTimeline = new MockHoodieTimeline (instants );
53+ Assertions .assertFalse (inlineExecutor .isScheduleClustering (activeTimeline ),
54+ "Enable inline clustering, 2 delta commits, should not schedule clustering" );
55+ Assertions .assertFalse (notInlineExecutor .isScheduleClustering (activeTimeline ),
56+ "Disable inline clustering, 2 delta commits, should not schedule clustering" );
57+
58+ instants =
59+ Arrays .asList (
60+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "1" ),
61+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "2" ),
62+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "3" )
63+ );
64+ activeTimeline = new MockHoodieTimeline (instants );
65+ Assertions .assertFalse (inlineExecutor .isScheduleClustering (activeTimeline ),
66+ "Enable inline clustering, 3 delta commits, should not schedule clustering" );
67+ Assertions .assertTrue (notInlineExecutor .isScheduleClustering (activeTimeline ),
68+ "Disable inline clustering, 3 delta commits, should schedule clustering" );
69+
70+ instants =
71+ Arrays .asList (
72+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "1" ),
73+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "2" ),
74+ new HoodieInstant (HoodieInstant .State .COMPLETED , replacecommit .name (), "3" ),
75+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "4" ),
76+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "5" ),
77+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "6" )
78+ );
79+ activeTimeline = new MockHoodieTimeline (instants );
80+ Assertions .assertFalse (inlineExecutor .isScheduleClustering (activeTimeline ),
81+ "Enable inline clustering, 3 delta commits after replacecommit, should not schedule clustering" );
82+ Assertions .assertTrue (notInlineExecutor .isScheduleClustering (activeTimeline ),
83+ "Disable inline clustering, 3 delta commits after replacecommit, should schedule clustering" );
84+
85+ instants =
86+ Arrays .asList (
87+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "1" ),
88+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "2" ),
89+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "3" ),
90+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "4" ),
91+ new HoodieInstant (HoodieInstant .State .COMPLETED , commit .name (), "5" )
92+ );
93+ activeTimeline = new MockHoodieTimeline (instants );
94+ Assertions .assertTrue (inlineExecutor .isScheduleClustering (activeTimeline ),
95+ "Enable inline clustering, 5 delta commits, should schedule clustering" );
96+ Assertions .assertTrue (notInlineExecutor .isScheduleClustering (activeTimeline ),
97+ "Disable inline clustering, 5 delta commits, should schedule clustering" );
98+ }
99+
100+ private ClusteringPlanActionExecutor getClusteringPlanActionExecutor (boolean isInlineClustering ) {
101+ HoodieEngineContext engineContext = new HoodieLocalEngineContext (new Configuration ());
102+ String instantTime = "4" ;
103+ HoodieWriteConfig config =
104+ HoodieWriteConfig .newBuilder ()
105+ .withPath ("/db/tbl" )
106+ .withClusteringConfig (
107+ HoodieClusteringConfig .newBuilder ()
108+ .withAsyncClustering (false )
109+ .withInlineClustering (isInlineClustering )
110+ .withAsyncClusteringMaxCommits (3 )
111+ .withInlineClusteringNumCommits (5 )
112+ .build ())
113+ .build ();
114+
115+ return new ClusteringPlanActionExecutor (engineContext ,
116+ config ,
117+ null ,
118+ instantTime ,
119+ Option .empty ());
120+ }
121+ }
0 commit comments