Skip to content

Commit bd6284f

Browse files
trushev authored and fengjian committed
[MINOR] Performance improvement of flink ITs with reused miniCluster (apache#7151)
* implement MiniCluster extension compatible with junit5
1 parent c3855e4 commit bd6284f

5 files changed

Lines changed: 97 additions & 3 deletions

File tree

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.util.AvroSchemaConverter;
3030
import org.apache.hudi.util.HoodiePipeline;
3131
import org.apache.hudi.util.StreamerUtil;
32+
import org.apache.hudi.utils.FlinkMiniCluster;
3233
import org.apache.hudi.utils.TestConfigurations;
3334
import org.apache.hudi.utils.TestData;
3435
import org.apache.hudi.utils.TestUtils;
@@ -54,6 +55,7 @@
5455
import org.apache.flink.table.types.logical.RowType;
5556
import org.apache.flink.util.TestLogger;
5657
import org.junit.jupiter.api.Test;
58+
import org.junit.jupiter.api.extension.ExtendWith;
5759
import org.junit.jupiter.api.io.TempDir;
5860
import org.junit.jupiter.params.ParameterizedTest;
5961
import org.junit.jupiter.params.provider.ValueSource;
@@ -71,6 +73,7 @@
7173
/**
7274
* Integration test for Flink Hoodie stream sink.
7375
*/
76+
@ExtendWith(FlinkMiniCluster.class)
7477
public class ITTestDataStreamWrite extends TestLogger {
7578

7679
private static final Map<String, List<String>> EXPECTED = new HashMap<>();

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.util.AvroSchemaConverter;
4040
import org.apache.hudi.util.CompactionUtil;
4141
import org.apache.hudi.util.StreamerUtil;
42+
import org.apache.hudi.utils.FlinkMiniCluster;
4243
import org.apache.hudi.utils.TestConfigurations;
4344
import org.apache.hudi.utils.TestData;
4445
import org.apache.hudi.utils.TestSQL;
@@ -56,6 +57,7 @@
5657
import org.apache.flink.table.types.DataType;
5758
import org.apache.flink.table.types.logical.RowType;
5859
import org.junit.jupiter.api.Test;
60+
import org.junit.jupiter.api.extension.ExtendWith;
5961
import org.junit.jupiter.api.io.TempDir;
6062

6163
import java.io.File;
@@ -69,6 +71,7 @@
6971
/**
7072
* IT cases for {@link HoodieFlinkClusteringJob}.
7173
*/
74+
@ExtendWith(FlinkMiniCluster.class)
7275
public class ITTestHoodieFlinkClustering {
7376

7477
private static final Map<String, String> EXPECTED = new HashMap<>();

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hudi.table.HoodieFlinkTable;
3131
import org.apache.hudi.util.CompactionUtil;
3232
import org.apache.hudi.util.StreamerUtil;
33+
import org.apache.hudi.utils.FlinkMiniCluster;
3334
import org.apache.hudi.utils.TestConfigurations;
3435
import org.apache.hudi.utils.TestData;
3536
import org.apache.hudi.utils.TestSQL;
@@ -43,6 +44,7 @@
4344
import org.apache.flink.table.api.config.ExecutionConfigOptions;
4445
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
4546
import org.junit.jupiter.api.Test;
47+
import org.junit.jupiter.api.extension.ExtendWith;
4648
import org.junit.jupiter.api.io.TempDir;
4749
import org.junit.jupiter.params.ParameterizedTest;
4850
import org.junit.jupiter.params.provider.ValueSource;
@@ -63,6 +65,7 @@
6365
/**
6466
* IT cases for {@link HoodieFlinkCompactor}.
6567
*/
68+
@ExtendWith(FlinkMiniCluster.class)
6669
public class ITTestHoodieFlinkCompactor {
6770

6871
protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
@@ -155,7 +158,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
155158
.transform("compact_task",
156159
TypeInformation.of(CompactionCommitEvent.class),
157160
new ProcessOperator<>(new CompactFunction(conf)))
158-
.setParallelism(compactionPlan.getOperations().size())
161+
.setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
159162
.addSink(new CompactionCommitSink(conf))
160163
.name("clean_commits")
161164
.uid("uid_clean_commits")
@@ -195,6 +198,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
195198
cfg.schedule = true;
196199
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
197200
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
201+
conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), FlinkMiniCluster.DEFAULT_PARALLELISM);
198202

199203
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
200204
asyncCompactionService.start(null);

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
2828
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
2929
import org.apache.hudi.util.StreamerUtil;
30+
import org.apache.hudi.utils.FlinkMiniCluster;
3031
import org.apache.hudi.utils.TestConfigurations;
3132
import org.apache.hudi.utils.TestData;
3233
import org.apache.hudi.utils.TestSQL;
@@ -44,11 +45,11 @@
4445
import org.apache.flink.table.catalog.ObjectPath;
4546
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
4647
import org.apache.flink.table.data.RowData;
47-
import org.apache.flink.test.util.AbstractTestBase;
4848
import org.apache.flink.types.Row;
4949
import org.apache.flink.util.CollectionUtil;
5050
import org.junit.jupiter.api.BeforeEach;
5151
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.api.extension.ExtendWith;
5253
import org.junit.jupiter.api.io.TempDir;
5354
import org.junit.jupiter.params.ParameterizedTest;
5455
import org.junit.jupiter.params.provider.Arguments;
@@ -77,7 +78,8 @@
7778
/**
7879
* IT cases for Hoodie table source and sink.
7980
*/
80-
public class ITTestHoodieDataSource extends AbstractTestBase {
81+
@ExtendWith(FlinkMiniCluster.class)
82+
public class ITTestHoodieDataSource {
8183
private TableEnvironment streamTableEnv;
8284
private TableEnvironment batchTableEnv;
8385

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.utils;
20+
21+
import org.apache.flink.runtime.client.JobStatusMessage;
22+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
23+
import org.apache.flink.test.util.AbstractTestBase;
24+
import org.apache.flink.test.util.MiniClusterWithClientResource;
25+
26+
import org.junit.jupiter.api.extension.AfterAllCallback;
27+
import org.junit.jupiter.api.extension.AfterEachCallback;
28+
import org.junit.jupiter.api.extension.BeforeAllCallback;
29+
import org.junit.jupiter.api.extension.ExtensionContext;
30+
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* Class for tests that run multiple tests and want to reuse the same Flink cluster.
36+
* Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
37+
*/
38+
public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
39+
private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class);
40+
41+
public static final int DEFAULT_PARALLELISM = 4;
42+
43+
private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
44+
new MiniClusterWithClientResource(
45+
new MiniClusterResourceConfiguration.Builder()
46+
.setNumberTaskManagers(1)
47+
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
48+
.build());
49+
50+
@Override
51+
public void beforeAll(ExtensionContext context) throws Exception {
52+
MINI_CLUSTER_RESOURCE.before();
53+
}
54+
55+
@Override
56+
public void afterAll(ExtensionContext context) {
57+
MINI_CLUSTER_RESOURCE.after();
58+
}
59+
60+
@Override
61+
public void afterEach(ExtensionContext context) throws Exception {
62+
cleanupRunningJobs();
63+
}
64+
65+
private void cleanupRunningJobs() throws Exception {
66+
if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
67+
// do nothing if the MiniCluster is not running
68+
LOG.warn("Mini cluster is not running after the test!");
69+
return;
70+
}
71+
72+
for (JobStatusMessage path : MINI_CLUSTER_RESOURCE.getClusterClient().listJobs().get()) {
73+
if (!path.getJobState().isTerminalState()) {
74+
try {
75+
MINI_CLUSTER_RESOURCE.getClusterClient().cancel(path.getJobId()).get();
76+
} catch (Exception ignored) {
77+
// ignore exceptions when cancelling dangling jobs
78+
}
79+
}
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)