Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.core.execution.RestoreMode;

import org.apache.flink.shaded.guava31.com.google.common.io.Resources;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ limitations under the License.
<artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>

<properties>
<starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
<!-- TODO: Update this, when StarRocks releases a 1.20 compatible connector. -->
<starrocks.connector.version>1.2.10_flink-1.19</starrocks.connector.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -108,4 +109,4 @@ limitations under the License.
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.cdc.connectors.mongodb;

import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
Expand All @@ -41,7 +41,7 @@
* Basic class for testing MongoDB source, this contains a MongoDB container which enables change
* streams.
*/
public class LegacyMongoDBTestBase extends AbstractTestBase {
public class LegacyMongoDBTestBase extends AbstractTestBaseJUnit4 {

private static final Logger LOG = LoggerFactory.getLogger(LegacyMongoDBTestBase.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.cdc.connectors.mysql;

import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -33,7 +33,7 @@
* Basic class for testing MySQL binlog source, this contains a MySQL container which enables
* binlog.
*/
public abstract class LegacyMySqlTestBase extends AbstractTestBase {
public abstract class LegacyMySqlTestBase extends AbstractTestBaseJUnit4 {

private static final Logger LOG = LoggerFactory.getLogger(LegacyMySqlTestBase.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.types.Row;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -59,7 +59,7 @@
import static org.junit.Assert.assertTrue;

/** Basic class for testing Database Polardbx which supported the mysql protocol. */
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
public abstract class PolardbxSourceTestBase extends AbstractTestBaseJUnit4 {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.cdc.connectors.oceanbase.testutils.OceanBaseCdcMetadata;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import org.junit.ClassRule;

Expand All @@ -43,7 +43,7 @@
import static org.junit.Assert.assertTrue;

/** Basic class for testing OceanBase source. */
public abstract class OceanBaseTestBase extends AbstractTestBase {
public abstract class OceanBaseTestBase extends AbstractTestBaseJUnit4 {

private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.types.Row;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -63,7 +63,7 @@
* Basic class for testing PostgreSQL source, this contains a PostgreSQL container which enables wal
* log.
*/
public abstract class PostgresTestBase extends AbstractTestBase {
public abstract class PostgresTestBase extends AbstractTestBaseJUnit4 {
private static final Logger LOG = LoggerFactory.getLogger(PostgresTestBase.class);
public static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
public static final String DEFAULT_DB = "postgres";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.cdc.connectors.sqlserver;

import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
Expand Down Expand Up @@ -49,7 +49,7 @@
import static org.junit.Assert.assertNotNull;

/** Utility class for sqlserver tests. */
public class SqlServerTestBase extends AbstractTestBase {
public class SqlServerTestBase extends AbstractTestBaseJUnit4 {
private static final Logger LOG = LoggerFactory.getLogger(SqlServerTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.flink.cdc.connectors.tidb;

import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import com.alibaba.dcm.DnsCacheManipulator;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -53,7 +53,7 @@
import static org.junit.Assert.assertNotNull;

/** Utility class for tidb tests. */
public class TiDBTestBase extends AbstractTestBase {
public class TiDBTestBase extends AbstractTestBaseJUnit4 {
private static final Logger LOG = LoggerFactory.getLogger(TiDBTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.cdc.connectors.vitess;

import org.apache.flink.cdc.connectors.vitess.container.VitessContainer;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;

import org.junit.BeforeClass;
import org.slf4j.Logger;
Expand All @@ -43,7 +43,7 @@
import static org.junit.Assert.assertNotNull;

/** Basic class for testing Vitess source, this contains a Vitess container. */
public abstract class VitessTestBase extends AbstractTestBase {
public abstract class VitessTestBase extends AbstractTestBaseJUnit4 {

private static final Logger LOG = LoggerFactory.getLogger(VitessTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
Expand Down
3 changes: 2 additions & 1 deletion flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ limitations under the License.
<flink-major-1.19>1.19</flink-major-1.19>
<flink-major-1.20>1.20</flink-major-1.20>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
<!-- TODO: Update this, when StarRocks releases a 1.20 compatible connector. -->
<starrocks.connector.version>1.2.10_flink-${flink-major-1.19}</starrocks.connector.version>
<paimon.version>0.9.0</paimon.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@
import java.util.List;
import java.util.concurrent.CyclicBarrier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -486,7 +486,9 @@ public void testDuplicate() throws Exception {
// --------------------------------------------------------------------------------------------

private void deepEquals(String message, T should, T is) {
assertThat(message, is, CustomEqualityMatcher.deeplyEquals(should).withChecker(checker));
assertThat(is)
.as(message)
.matches(CustomEqualityMatcher.deeplyEquals(should).withChecker(checker));
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -580,11 +582,12 @@ public void run() {
T serdeTestItem = serializer.deserialize(dataInputDeserializer);
T copySerdeTestItem = serializer.copy(serdeTestItem);
dataOutputSerializer.clear();

assertThat(
"Serialization/Deserialization cycle resulted in an object that are not equal to the original.",
copySerdeTestItem,
CustomEqualityMatcher.deeplyEquals(testItem).withChecker(checker));
assertThat(copySerdeTestItem)
.as(
"Serialization/Deserialization cycle resulted in an object that are not equal to the original.")
.matches(
CustomEqualityMatcher.deeplyEquals(testItem)
.withChecker(checker));

// try to enforce some upper bound to the test time
if (System.nanoTime() >= endTimeNanos) {
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ limitations under the License.
<flink.reuseForks>true</flink.reuseForks>

<!-- dependencies versions -->
<flink.version>1.19.1</flink.version>
<flink.major.version>1.19</flink.major.version>
<flink.version>1.20.0</flink.version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ferenc-csaky Could you bump this version to 1.20.1 as we just released it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my original idea as well, but that generated more and more changes (docs, CI, etc), and 1.19.1 has to be bumped to 1.19.2 as well anyways. I'm happy to update both patch versions in a follow-up PR in 1 chunk. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As they're hotfix versions, follow-up PR makes sense to me, thanks for your quick response.

<flink.major.version>1.20</flink.major.version>
<flink.shaded.version>17.0</flink.shaded.version>
<debezium.version>1.9.8.Final</debezium.version>
<tikv.version>3.2.0</tikv.version>
Expand Down
Loading