Skip to content

Commit d411cae

Browse files
committed
[MINOR] Performance improvement of Flink ITs with reused MiniCluster
1 parent e088faa commit d411cae

5 files changed

Lines changed: 86 additions & 7 deletions

File tree

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.util.AvroSchemaConverter;
3030
import org.apache.hudi.util.HoodiePipeline;
3131
import org.apache.hudi.util.StreamerUtil;
32+
import org.apache.hudi.utils.AbstractHoodieTestBase;
3233
import org.apache.hudi.utils.TestConfigurations;
3334
import org.apache.hudi.utils.TestData;
3435
import org.apache.hudi.utils.TestUtils;
@@ -52,7 +53,6 @@
5253
import org.apache.flink.table.data.RowData;
5354
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
5455
import org.apache.flink.table.types.logical.RowType;
55-
import org.apache.flink.util.TestLogger;
5656
import org.junit.jupiter.api.Test;
5757
import org.junit.jupiter.api.io.TempDir;
5858
import org.junit.jupiter.params.ParameterizedTest;
@@ -71,7 +71,7 @@
7171
/**
7272
* Integration test for Flink Hoodie stream sink.
7373
*/
74-
public class ITTestDataStreamWrite extends TestLogger {
74+
public class ITTestDataStreamWrite extends AbstractHoodieTestBase {
7575

7676
private static final Map<String, List<String>> EXPECTED = new HashMap<>();
7777
private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.hudi.util.AvroSchemaConverter;
4040
import org.apache.hudi.util.CompactionUtil;
4141
import org.apache.hudi.util.StreamerUtil;
42+
import org.apache.hudi.utils.AbstractHoodieTestBase;
4243
import org.apache.hudi.utils.TestConfigurations;
4344
import org.apache.hudi.utils.TestData;
4445
import org.apache.hudi.utils.TestSQL;
@@ -69,7 +70,7 @@
6970
/**
7071
* IT cases for {@link HoodieFlinkClusteringJob}.
7172
*/
72-
public class ITTestHoodieFlinkClustering {
73+
public class ITTestHoodieFlinkClustering extends AbstractHoodieTestBase {
7374

7475
private static final Map<String, String> EXPECTED = new HashMap<>();
7576

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hudi.table.HoodieFlinkTable;
3131
import org.apache.hudi.util.CompactionUtil;
3232
import org.apache.hudi.util.StreamerUtil;
33+
import org.apache.hudi.utils.AbstractHoodieTestBase;
3334
import org.apache.hudi.utils.TestConfigurations;
3435
import org.apache.hudi.utils.TestData;
3536
import org.apache.hudi.utils.TestSQL;
@@ -63,7 +64,7 @@
6364
/**
6465
* IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
6566
*/
66-
public class ITTestHoodieFlinkCompactor {
67+
public class ITTestHoodieFlinkCompactor extends AbstractHoodieTestBase {
6768

6869
protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);
6970

@@ -155,7 +156,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
155156
.transform("compact_task",
156157
TypeInformation.of(CompactionCommitEvent.class),
157158
new ProcessOperator<>(new CompactFunction(conf)))
158-
.setParallelism(compactionPlan.getOperations().size())
159+
.setParallelism(4)
159160
.addSink(new CompactionCommitSink(conf))
160161
.name("clean_commits")
161162
.uid("uid_clean_commits")
@@ -195,6 +196,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
195196
cfg.schedule = true;
196197
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
197198
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
199+
conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), 4);
198200

199201
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
200202
asyncCompactionService.start(null);

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
2828
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
2929
import org.apache.hudi.util.StreamerUtil;
30+
import org.apache.hudi.utils.AbstractHoodieTestBase;
3031
import org.apache.hudi.utils.TestConfigurations;
3132
import org.apache.hudi.utils.TestData;
3233
import org.apache.hudi.utils.TestSQL;
@@ -44,7 +45,6 @@
4445
import org.apache.flink.table.catalog.ObjectPath;
4546
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
4647
import org.apache.flink.table.data.RowData;
47-
import org.apache.flink.test.util.AbstractTestBase;
4848
import org.apache.flink.types.Row;
4949
import org.apache.flink.util.CollectionUtil;
5050
import org.junit.jupiter.api.BeforeEach;
@@ -77,7 +77,7 @@
7777
/**
7878
* IT cases for Hoodie table source and sink.
7979
*/
80-
public class ITTestHoodieDataSource extends AbstractTestBase {
80+
public class ITTestHoodieDataSource extends AbstractHoodieTestBase {
8181
private TableEnvironment streamTableEnv;
8282
private TableEnvironment batchTableEnv;
8383

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.utils;
20+
21+
import org.apache.hudi.exception.HoodieException;
22+
23+
import org.apache.flink.test.util.AbstractTestBase;
24+
import org.apache.flink.test.util.MiniClusterWithClientResource;
25+
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeAll;
29+
30+
import java.lang.reflect.Field;
31+
import java.util.Arrays;
32+
33+
import static java.lang.reflect.Modifier.isPublic;
34+
import static java.lang.reflect.Modifier.isStatic;
35+
36+
/**
37+
* Hoodie base class for tests that run multiple tests and want to reuse the same Flink cluster.
38+
* Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
39+
*/
40+
public abstract class AbstractHoodieTestBase extends AbstractTestBase {
41+
42+
private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = getMiniClusterFromParentClass();
43+
44+
@BeforeAll
45+
public static void beforeAll() throws Exception {
46+
MINI_CLUSTER_RESOURCE.before();
47+
}
48+
49+
@AfterAll
50+
public static void afterAll() {
51+
MINI_CLUSTER_RESOURCE.after();
52+
}
53+
54+
@AfterEach
55+
public void afterEach() throws Exception {
56+
cleanupRunningJobs();
57+
}
58+
59+
private static MiniClusterWithClientResource getMiniClusterFromParentClass() {
60+
String fieldFlink114 = "miniClusterResource";
61+
String fieldFlink115 = "MINI_CLUSTER_RESOURCE";
62+
Field miniClusterField = Arrays.stream(AbstractTestBase.class.getDeclaredFields())
63+
.filter(f -> isPublic(f.getModifiers()) && isStatic(f.getModifiers()))
64+
.filter(f -> fieldFlink114.equals(f.getName()) || fieldFlink115.equals(f.getName()))
65+
.findAny()
66+
.orElseThrow(() -> new NoSuchFieldError(String.format(
67+
"%s not found in %s",
68+
fieldFlink115,
69+
AbstractTestBase.class.getName())));
70+
try {
71+
return (MiniClusterWithClientResource) miniClusterField.get(null);
72+
} catch (IllegalAccessException e) {
73+
throw new HoodieException("Cannot access miniCluster field", e);
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)