Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.AbstractHoodieTestBase;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
Expand All @@ -52,7 +53,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -71,7 +71,7 @@
/**
* Integration test for Flink Hoodie stream sink.
*/
public class ITTestDataStreamWrite extends TestLogger {
public class ITTestDataStreamWrite extends AbstractHoodieTestBase {

private static final Map<String, List<String>> EXPECTED = new HashMap<>();
private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.AbstractHoodieTestBase;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
Expand Down Expand Up @@ -69,7 +70,7 @@
/**
* IT cases for {@link HoodieFlinkClusteringJob}.
*/
public class ITTestHoodieFlinkClustering {
public class ITTestHoodieFlinkClustering extends AbstractHoodieTestBase {

private static final Map<String, String> EXPECTED = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.AbstractHoodieTestBase;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
Expand Down Expand Up @@ -63,7 +64,7 @@
/**
* IT cases for {@link org.apache.hudi.common.model.HoodieRecord}.
*/
public class ITTestHoodieFlinkCompactor {
public class ITTestHoodieFlinkCompactor extends AbstractHoodieTestBase {

protected static final Logger LOG = LoggerFactory.getLogger(ITTestHoodieFlinkCompactor.class);

Expand Down Expand Up @@ -155,7 +156,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionPlan.getOperations().size())
.setParallelism(4)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
Expand Down Expand Up @@ -195,6 +196,7 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce
cfg.schedule = true;
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), 4);

HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
asyncCompactionService.start(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.AbstractHoodieTestBase;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
Expand All @@ -44,7 +45,6 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -77,7 +77,7 @@
/**
* IT cases for Hoodie table source and sink.
*/
public class ITTestHoodieDataSource extends AbstractTestBase {
public class ITTestHoodieDataSource extends AbstractHoodieTestBase {
private TableEnvironment streamTableEnv;
private TableEnvironment batchTableEnv;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utils;

import org.apache.hudi.exception.HoodieException;

import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;

import java.lang.reflect.Field;
import java.util.Arrays;

import static java.lang.reflect.Modifier.isPublic;
import static java.lang.reflect.Modifier.isStatic;

/**
* Hoodie base class for tests that run multiple tests and want to reuse the same Flink cluster.
* Unlike {@link AbstractTestBase}, this class is designed to run with JUnit 5.
*/
public abstract class AbstractHoodieTestBase extends AbstractTestBase {

private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = getMiniClusterFromParentClass();

@BeforeAll
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the Junit4 ClassRule can not work correctly here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ClassRule does not work
We should call MINI_CLUSTER_RESOURCE.before() and MINI_CLUSTER_RESOURCE.after() using JUnit 5

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some mvn bridge plugin for Junit4 and Junit5, did you try to use that ?

Copy link
Contributor Author

@trushev trushev Nov 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially, we want to run junit5 org.junit.jupiter.api.Test tests in class inherited from class contained junit4 org.junit.ClassRule. Pseudocode:

class HoodieITCase extends FlinkBaseTest {
  run classRule.before() // junit4
  run @Test test1()      // junit5
  run @Test test2()      // junit5
  run classRule.after()  // junit4
}

I didn't find solution better than inroduced by this PR. Do you know mvn plugin that runs such tests?

Copy link
Contributor

@danny0405 danny0405 Nov 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the vintage engine work here:

                <dependency>
			<groupId>org.junit.vintage</groupId>
			<artifactId>junit-vintage-engine</artifactId>
			<version>5.5.2</version>
			<scope>test</scope>
		</dependency>

Copy link
Contributor Author

@trushev trushev Nov 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, there is annotation @EnableRuleMigrationSupport from

<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter-migrationsupport</artifactId>
  <version>${junit.jupiter.version}</version>
  <scope>test</scope>
</dependency>

and it works for @Rule

@EnableRuleMigrationSupport
class MyRuleTest {

  @org.junit.Rule
  public ExternalResource myRule = new ExternalResource() {
    public void before() {
      System.out.println("before");
    }
  };

  @org.junit.jupiter.api.Test
  public void test() {
    System.out.println("test");
  }
}
$ mvn test -Dtest=MyRuleTest
before
test

but for some reason it does not work for @ClassRule (our case)

@EnableRuleMigrationSupport
class MyClassRuleTest {

  @org.junit.ClassRule
  public static ExternalResource myRule = new ExternalResource() {
    public void before() {
      System.out.println("before");
    }
  };

  @org.junit.jupiter.api.Test
  public void test() {
    System.out.println("test");
  }
}
$ mvn test -Dtest=MyClassRuleTest
test

There is only @Rule mentioned in official doc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, some compatibility tools should be used here, or maybe we can implement a AbstractTestBase of our own for Junit5, there is no much code here BTW.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implemented as junit5 extension

public static void beforeAll() throws Exception {
MINI_CLUSTER_RESOURCE.before();
}

@AfterAll
public static void afterAll() {
MINI_CLUSTER_RESOURCE.after();
}

@AfterEach
public void afterEach() throws Exception {
cleanupRunningJobs();
}

private static MiniClusterWithClientResource getMiniClusterFromParentClass() {
String fieldFlink114 = "miniClusterResource";
String fieldFlink115 = "MINI_CLUSTER_RESOURCE";
Field miniClusterField = Arrays.stream(AbstractTestBase.class.getDeclaredFields())
.filter(f -> isPublic(f.getModifiers()) && isStatic(f.getModifiers()))
.filter(f -> fieldFlink114.equals(f.getName()) || fieldFlink115.equals(f.getName()))
.findAny()
.orElseThrow(() -> new NoSuchFieldError(String.format(
"%s not found in %s",
fieldFlink115,
AbstractTestBase.class.getName())));
try {
return (MiniClusterWithClientResource) miniClusterField.get(null);
} catch (IllegalAccessException e) {
throw new HoodieException("Cannot access miniCluster field", e);
}
}
}